
Gold Layer

The Gold layer represents the final stage of data refinement and transformation, where the data is prepared for advanced analytics and decision-making.

Listing all files in the silver layer

display(dbutils.fs.ls(f"/mnt/{storageAccountName}/silver/"))

Loading the Delta Lake tables from the silver container in Azure Data Lake Storage into DataFrames

df_circuits              = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/circuits")
df_constructor_results   = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/constructor_results")
df_constructor_standings = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/constructor_standings")
df_constructors          = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/constructors")
df_driver_standings      = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/driver_standings")
df_drivers               = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/drivers")
df_lap_times             = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/lap_times")
df_pit_stops             = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/pit_stops")
df_qualifying            = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/qualifying")
df_races                 = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/races")
df_results               = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/results")
df_seasons               = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/seasons")
df_sprint_results        = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/sprint_results")
df_status                = spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/status")
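
The SQL cells below reference the silver data by name (drivers, circuits, status, races, results). If those names are not already registered as tables in the metastore, one way to make them visible to Spark SQL is to register the DataFrames loaded above as temporary views. A minimal sketch, assuming the view names below are the ones the queries expect:

# Register the silver DataFrames as temporary views so the SQL cells
# below can reference them by name.
silver_views = {
    "drivers": df_drivers,
    "circuits": df_circuits,
    "status": df_status,
    "races": df_races,
    "results": df_results,
}

for view_name, df in silver_views.items():
    df.createOrReplaceTempView(view_name)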

Creating the tables of the relational model

Creating the dim_driver table

%sql
USE spark_catalog.default;

CREATE TABLE IF NOT EXISTS dim_driver (
    SK_DRIVER BIGINT GENERATED BY DEFAULT AS IDENTITY,
    IDENTIFIER_DRIVER INTEGER,
    REFERENCE_DRIVER VARCHAR(100),
    FORENAME VARCHAR(50),
    NATIONALITY VARCHAR(50)
)
USING delta
LOCATION 'dbfs:/mnt/datalakeea44c854c651c494/gold/dim_driver'

Inserting data into the dim_driver table

%sql

WITH drivers_relacional AS (
  SELECT 
    try_cast(IDENTIFIER_DRIVER AS INT) AS IDENTIFIER_DRIVER,
    REFERENCE_DRIVER,
    FORENAME,
    NATIONALITY
  FROM drivers
)

MERGE INTO dim_driver AS d
USING drivers_relacional AS dr
ON d.IDENTIFIER_DRIVER = dr.IDENTIFIER_DRIVER

WHEN MATCHED AND (
     d.REFERENCE_DRIVER <> dr.REFERENCE_DRIVER OR
     d.FORENAME <> dr.FORENAME OR
     d.NATIONALITY <> dr.NATIONALITY
) THEN
  UPDATE SET 
    REFERENCE_DRIVER = dr.REFERENCE_DRIVER,
    FORENAME = dr.FORENAME,
    NATIONALITY = dr.NATIONALITY

WHEN NOT MATCHED THEN
  INSERT (IDENTIFIER_DRIVER, REFERENCE_DRIVER, FORENAME, NATIONALITY)
  VALUES (dr.IDENTIFIER_DRIVER, dr.REFERENCE_DRIVER, dr.FORENAME, dr.NATIONALITY);
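
After the merge, a quick sanity check is to compare the number of distinct drivers in the silver source with the number of rows in the dimension. A sketch, assuming both drivers and dim_driver are queryable through Spark SQL:

# The dimension should hold exactly one row per source driver.
src_count = spark.table("drivers").select("IDENTIFIER_DRIVER").distinct().count()
dim_count = spark.table("dim_driver").count()
print(f"drivers: {src_count} | dim_driver: {dim_count}")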

Creating the dim_circuits table

%sql
USE spark_catalog.default;

CREATE TABLE IF NOT EXISTS dim_circuits (
    SK_CIRCUITS BIGINT GENERATED BY DEFAULT AS IDENTITY,
    IDENTIFIER_CIRCUIT INTEGER,
    REFERENCE_CIRCUIT VARCHAR(100),
    COUNTRY VARCHAR(50)
)
USING delta
LOCATION 'dbfs:/mnt/datalakeea44c854c651c494/gold/dim_circuits'

Inserting data into the dim_circuits table

%sql
WITH circuits_relacional AS (
  SELECT 
    try_cast(IDENTIFIER_CIRCUIT AS INT) AS IDENTIFIER_CIRCUIT,
    REFERENCE_CIRCUIT,
    COUNTRY
  FROM circuits
)

MERGE INTO dim_circuits AS dc
USING circuits_relacional AS cr
ON dc.IDENTIFIER_CIRCUIT = cr.IDENTIFIER_CIRCUIT

WHEN MATCHED AND (
     dc.REFERENCE_CIRCUIT <> cr.REFERENCE_CIRCUIT OR
     dc.COUNTRY <> cr.COUNTRY
) THEN
  UPDATE SET 
    REFERENCE_CIRCUIT = cr.REFERENCE_CIRCUIT,
    COUNTRY = cr.COUNTRY

WHEN NOT MATCHED THEN
  INSERT (IDENTIFIER_CIRCUIT, REFERENCE_CIRCUIT, COUNTRY)
  VALUES (cr.IDENTIFIER_CIRCUIT, cr.REFERENCE_CIRCUIT, cr.COUNTRY);

Creating the dim_status table

%sql
USE spark_catalog.default;

CREATE TABLE IF NOT EXISTS dim_status (
    SK_STATUS BIGINT GENERATED BY DEFAULT AS IDENTITY,
    IDENTIFIER_STATUS INTEGER,
    STATUS VARCHAR(20)
)
USING delta
LOCATION 'dbfs:/mnt/datalakeea44c854c651c494/gold/dim_status'

Inserting data into the dim_status table

%sql
WITH status_relacional AS (
  SELECT 
    try_cast(IDENTIFIER_STATUS AS INT) AS IDENTIFIER_STATUS,
    STATUS
  FROM status
)

MERGE INTO dim_status AS ds
USING status_relacional AS sr
ON ds.IDENTIFIER_STATUS = sr.IDENTIFIER_STATUS

WHEN MATCHED AND (
     ds.STATUS <> sr.STATUS
) THEN
  UPDATE SET 
    ds.STATUS = sr.STATUS

WHEN NOT MATCHED THEN
  INSERT (IDENTIFIER_STATUS, STATUS)
  VALUES (sr.IDENTIFIER_STATUS, sr.STATUS);

Creating the dim_time table

from pyspark.sql.functions import expr, date_format, row_number
from pyspark.sql.window import Window

# Define the desired date range
data_inicial = "1950-05-12"
data_final = "2024-12-07"

# Compute the number of days in the range
num_dias = spark.sql(f"SELECT datediff('{data_final}', '{data_inicial}')").collect()[0][0]

# Create a DataFrame with a column holding the date sequence
df_calendario = spark.range(0, num_dias + 1) \
    .selectExpr(f"date_add(to_date('{data_inicial}'), CAST(id AS INT)) AS Data")

# Extract the date components
df_tempo = df_calendario.selectExpr(
    "Data",
    "year(Data) AS Ano",
    "month(Data) AS Mes",
    "(CASE month(Data) \
        WHEN 1 THEN 'JANEIRO' \
        WHEN 2 THEN 'FEVEREIRO' \
        WHEN 3 THEN 'MARCO' \
        WHEN 4 THEN 'ABRIL' \
        WHEN 5 THEN 'MAIO' \
        WHEN 6 THEN 'JUNHO' \
        WHEN 7 THEN 'JULHO' \
        WHEN 8 THEN 'AGOSTO' \
        WHEN 9 THEN 'SETEMBRO' \
        WHEN 10 THEN 'OUTUBRO' \
        WHEN 11 THEN 'NOVEMBRO' \
        WHEN 12 THEN 'DEZEMBRO' \
    END) AS NomeMes",
    "day(Data) AS Dia",
    "(CASE dayofweek(Data) \
        WHEN 1 THEN 'DOMINGO' \
        WHEN 2 THEN 'SEGUNDA-FEIRA' \
        WHEN 3 THEN 'TERCA-FEIRA' \
        WHEN 4 THEN 'QUARTA-FEIRA' \
        WHEN 5 THEN 'QUINTA-FEIRA' \
        WHEN 6 THEN 'SEXTA-FEIRA' \
        WHEN 7 THEN 'SABADO' \
    END) AS NomeDiaSemana",
    "dayofweek(Data) AS NumeroDiaSemana"
)

# Define the window used to generate the surrogate key
windowSpec = Window.orderBy("Data")

# Add the sk_time column
df_tempo_sk = df_tempo.withColumn("sk_time", row_number().over(windowSpec))

# Display the resulting DataFrame
df_tempo_sk.display()

# Save as a Delta table in the gold container
df_tempo_sk.write.format("delta").option("path", f"/mnt/{storageAccountName}/gold/dim_time").saveAsTable("dim_time")
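
A quick way to confirm that the generated calendar covers the whole requested interval is to check its bounds and row count, which should equal the number of days plus one. A sketch using the DataFrame built above:

from pyspark.sql import functions as F

# The calendar must span data_inicial..data_final with no gaps:
# rows == num_dias + 1.
df_tempo_sk.agg(
    F.min("Data").alias("first_date"),
    F.max("Data").alias("last_date"),
    F.count(F.lit(1)).alias("rows"),
).show()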

Creating the dim_races table

%sql
USE spark_catalog.default;

CREATE TABLE IF NOT EXISTS dim_races (
  SK_RACES BIGINT GENERATED BY DEFAULT AS IDENTITY,
  SK_TIME BIGINT,
  IDENTIFIER_RACE INTEGER
)
USING delta
LOCATION 'dbfs:/mnt/datalakeea44c854c651c494/gold/dim_races';

Inserting data into the dim_races table

%sql
WITH races_data AS (
  SELECT
    IDENTIFIER_RACE,
    TO_DATE(DATE, 'yyyy-MM-dd') AS race_date
  FROM
    races
),

races_com_sk_time AS (
  SELECT
    r.IDENTIFIER_RACE,
    dt.sk_time
  FROM
    races_data r
  JOIN
    dim_time dt
  ON
    r.race_date = dt.Data
)

MERGE INTO dim_races dr
USING races_com_sk_time rst
ON dr.IDENTIFIER_RACE = rst.IDENTIFIER_RACE

WHEN NOT MATCHED THEN
  INSERT (SK_TIME, IDENTIFIER_RACE)
  VALUES (rst.sk_time, rst.IDENTIFIER_RACE);
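
Because races_com_sk_time is built with an inner join against dim_time, any race whose date falls outside the 1950-05-12 to 2024-12-07 calendar range is silently skipped by the merge. A sketch for spotting such rows, assuming the races view and the dim_time table are both queryable:

# Races whose dates have no matching row in dim_time are dropped by the
# inner join; list them so the calendar range can be extended if needed.
spark.sql("""
    SELECT r.IDENTIFIER_RACE, r.DATE
    FROM races r
    LEFT ANTI JOIN dim_time dt
      ON TO_DATE(r.DATE, 'yyyy-MM-dd') = dt.Data
""").show()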

Creating the fat_crash_history fact table

%sql
CREATE TABLE IF NOT EXISTS fat_crash_history (
   FK_RACES             INT,
   FK_STATUS            INT,
   FK_DRIVER            INT,
   FK_CIRCUITS          INT
)
USING delta
LOCATION '/mnt/datalakeea44c854c651c494/gold/fat_crash_history'

Loading data into the fat_crash_history fact table

%sql
CREATE OR REPLACE TABLE fat_crash_history
USING delta
LOCATION '/mnt/datalakeea44c854c651c494/gold/fat_crash_history'
AS
SELECT
  d.SK_DRIVER     AS FK_DRIVER,
  s.SK_STATUS     AS FK_STATUS,
  r.SK_RACES      AS FK_RACES,
  c.SK_CIRCUITS   AS FK_CIRCUITS
FROM results res
JOIN races ra 
  ON res.IDENTIFIER_RACE = ra.IDENTIFIER_RACE
JOIN dim_driver d 
  ON res.IDENTIFIER_DRIVER = d.IDENTIFIER_DRIVER
JOIN dim_status s 
  ON res.IDENTIFIER_STATUS = s.IDENTIFIER_STATUS
JOIN dim_races r 
  ON res.IDENTIFIER_RACE = r.IDENTIFIER_RACE
JOIN dim_circuits c 
  ON ra.IDENTIFIER_CIRCUIT = c.IDENTIFIER_CIRCUIT
WHERE res.IDENTIFIER_STATUS IN (3, 4);
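
The WHERE clause hard-codes IDENTIFIER_STATUS IN (3, 4), which in this dataset presumably corresponds to the accident/collision statuses. Before relying on the hard-coded IDs, it is worth confirming their labels against dim_status; a sketch:

# Confirm which status labels the hard-coded IDs 3 and 4 refer to.
spark.sql("""
    SELECT IDENTIFIER_STATUS, STATUS
    FROM dim_status
    WHERE IDENTIFIER_STATUS IN (3, 4)
""").show()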

Creating the fat_driver_history fact table

%sql
CREATE TABLE IF NOT EXISTS fat_driver_history (
   FK_RACES             INT,
   FK_DRIVER            INT,
   FK_CIRCUITS          INT,
   FASTESTLAPTIME       INT
)
USING delta
LOCATION '/mnt/datalakeea44c854c651c494/gold/fat_driver_history'

Loading data into the fat_driver_history fact table

%sql
CREATE OR REPLACE TABLE fat_driver_history
USING delta
LOCATION '/mnt/datalakeea44c854c651c494/gold/fat_driver_history'
AS
SELECT
  d.SK_DRIVER        AS FK_DRIVER,
  r.SK_RACES         AS FK_RACES,
  c.SK_CIRCUITS      AS FK_CIRCUITS,
  res.FASTESTLAPTIME AS FASTESTLAPTIME
FROM results res
JOIN races ra 
  ON res.IDENTIFIER_RACE = ra.IDENTIFIER_RACE
JOIN dim_driver d 
  ON res.IDENTIFIER_DRIVER = d.IDENTIFIER_DRIVER
JOIN dim_races r 
  ON res.IDENTIFIER_RACE = r.IDENTIFIER_RACE
JOIN dim_circuits c 
  ON ra.IDENTIFIER_CIRCUIT = c.IDENTIFIER_CIRCUIT
WHERE res.FASTESTLAPTIME IS NOT NULL;
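
With the dimensions and facts in place, the star schema can be queried directly. For example, counting fastest-lap records per circuit by joining the fact back to dim_circuits (a sketch using only the tables defined above):

# Example consumption of the gold model: fact rows per circuit.
spark.sql("""
    SELECT c.REFERENCE_CIRCUIT, COUNT(*) AS laps_recorded
    FROM fat_driver_history f
    JOIN dim_circuits c
      ON f.FK_CIRCUITS = c.SK_CIRCUITS
    GROUP BY c.REFERENCE_CIRCUIT
    ORDER BY laps_recorded DESC
""").show(10)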