Camada Gold
A camada Gold representa a etapa final de refinamento e transformação dos dados, onde eles são preparados para análises avançadas e tomadas de decisão.
Mostrando todos os arquivos da camada silver
# List everything stored under the silver container of the mounted storage
# account so the available inputs can be inspected before they are loaded.
silver_listing = dbutils.fs.ls(f"/mnt/{storageAccountName}/silver/")
display(silver_listing)
Gerando um DataFrame para cada tabela Delta Lake do container silver do Azure Data Lake Storage
def _load_silver_table(table_name):
    """Return a DataFrame reading the Delta table `table_name` from the silver container."""
    return spark.read.format('delta').load(f"/mnt/{storageAccountName}/silver/{table_name}")


# One DataFrame per silver table; the variable name mirrors the folder name.
df_circuits = _load_silver_table("circuits")
df_constructor_results = _load_silver_table("constructor_results")
df_constructor_standings = _load_silver_table("constructor_standings")
df_constructors = _load_silver_table("constructors")
df_driver_standings = _load_silver_table("driver_standings")
df_drivers = _load_silver_table("drivers")
df_lap_times = _load_silver_table("lap_times")
df_pit_stops = _load_silver_table("pit_stops")
df_qualifying = _load_silver_table("qualifying")
df_races = _load_silver_table("races")
df_results = _load_silver_table("results")
df_seasons = _load_silver_table("seasons")
df_sprint_results = _load_silver_table("sprint_results")
df_status = _load_silver_table("status")
Criação das tabelas do modelo relacional
Criação da tabela dim_driver
%sql
-- Driver dimension, stored as an external Delta table in the gold container.
-- Created only when it does not exist yet, so the cell can be re-run safely.
USE spark_catalog.default;

CREATE TABLE IF NOT EXISTS dim_driver (
    SK_DRIVER         BIGINT GENERATED BY DEFAULT AS IDENTITY,  -- surrogate key, generated when not supplied
    IDENTIFIER_DRIVER INT,                                      -- source identifier, used as the MERGE key
    REFERENCE_DRIVER  VARCHAR(100),
    FORENAME          VARCHAR(50),
    NATIONALITY       VARCHAR(50)
)
USING DELTA
-- NOTE(review): the storage account name is hard-coded in this path — confirm it matches the mounted account.
LOCATION 'dbfs:/mnt/datalakeea44c854c651c494/gold/dim_driver'
Inserção de dados na tabela dim_driver
%sql
-- Upsert dim_driver from the `drivers` source relation.
--   * existing drivers (same IDENTIFIER_DRIVER) are updated only when an attribute changed;
--   * unknown drivers are inserted (SK_DRIVER is generated by the identity column).
WITH drivers_relacional AS (
    SELECT
        try_cast(IDENTIFIER_DRIVER AS INT) AS IDENTIFIER_DRIVER,
        REFERENCE_DRIVER,
        FORENAME,
        NATIONALITY
    FROM drivers
    -- An identifier that cannot be cast becomes NULL; a NULL key never satisfies the
    -- ON condition, so such a row would be inserted again on every run. Skip it.
    WHERE try_cast(IDENTIFIER_DRIVER AS INT) IS NOT NULL
)
MERGE INTO dim_driver AS d
USING drivers_relacional AS dr
    ON d.IDENTIFIER_DRIVER = dr.IDENTIFIER_DRIVER
-- `<=>` is the null-safe equality operator: unlike `<>`, it also detects a change
-- between NULL and a value (with `<>` that comparison is NULL and the update is skipped).
WHEN MATCHED AND (
    NOT (d.REFERENCE_DRIVER <=> dr.REFERENCE_DRIVER)
    OR NOT (d.FORENAME <=> dr.FORENAME)
    OR NOT (d.NATIONALITY <=> dr.NATIONALITY)
) THEN
    UPDATE SET
        REFERENCE_DRIVER = dr.REFERENCE_DRIVER,
        FORENAME = dr.FORENAME,
        NATIONALITY = dr.NATIONALITY
WHEN NOT MATCHED THEN
    INSERT (IDENTIFIER_DRIVER, REFERENCE_DRIVER, FORENAME, NATIONALITY)
    VALUES (dr.IDENTIFIER_DRIVER, dr.REFERENCE_DRIVER, dr.FORENAME, dr.NATIONALITY);
Criação da tabela dim_circuits
%sql
-- Circuit dimension, stored as an external Delta table in the gold container.
-- Created only when it does not exist yet, so the cell can be re-run safely.
USE spark_catalog.default;

CREATE TABLE IF NOT EXISTS dim_circuits (
    SK_CIRCUITS        BIGINT GENERATED BY DEFAULT AS IDENTITY,  -- surrogate key, generated when not supplied
    IDENTIFIER_CIRCUIT INT,                                      -- source identifier, used as the MERGE key
    REFERENCE_CIRCUIT  VARCHAR(100),
    COUNTRY            VARCHAR(50)
)
USING DELTA
-- NOTE(review): the storage account name is hard-coded in this path — confirm it matches the mounted account.
LOCATION 'dbfs:/mnt/datalakeea44c854c651c494/gold/dim_circuits'
Inserção de dados na tabela dim_circuits
%sql
-- Upsert dim_circuits from the `circuits` source relation.
--   * existing circuits (same IDENTIFIER_CIRCUIT) are updated only when an attribute changed;
--   * unknown circuits are inserted (SK_CIRCUITS is generated by the identity column).
WITH circuits_relacional AS (
    SELECT
        try_cast(IDENTIFIER_CIRCUIT AS INT) AS IDENTIFIER_CIRCUIT,
        REFERENCE_CIRCUIT,
        COUNTRY
    FROM circuits
    -- An identifier that cannot be cast becomes NULL; a NULL key never satisfies the
    -- ON condition, so such a row would be inserted again on every run. Skip it.
    WHERE try_cast(IDENTIFIER_CIRCUIT AS INT) IS NOT NULL
)
MERGE INTO dim_circuits AS dc
USING circuits_relacional AS cr
    ON dc.IDENTIFIER_CIRCUIT = cr.IDENTIFIER_CIRCUIT
-- `<=>` is the null-safe equality operator: unlike `<>`, it also detects a change
-- between NULL and a value (with `<>` that comparison is NULL and the update is skipped).
WHEN MATCHED AND (
    NOT (dc.REFERENCE_CIRCUIT <=> cr.REFERENCE_CIRCUIT)
    OR NOT (dc.COUNTRY <=> cr.COUNTRY)
) THEN
    UPDATE SET
        REFERENCE_CIRCUIT = cr.REFERENCE_CIRCUIT,
        COUNTRY = cr.COUNTRY
WHEN NOT MATCHED THEN
    INSERT (IDENTIFIER_CIRCUIT, REFERENCE_CIRCUIT, COUNTRY)
    VALUES (cr.IDENTIFIER_CIRCUIT, cr.REFERENCE_CIRCUIT, cr.COUNTRY);
Criação da tabela dim_status
%sql
-- Status dimension, stored as an external Delta table in the gold container.
-- Created only when it does not exist yet, so the cell can be re-run safely.
USE spark_catalog.default;

CREATE TABLE IF NOT EXISTS dim_status (
    SK_STATUS         BIGINT GENERATED BY DEFAULT AS IDENTITY,  -- surrogate key, generated when not supplied
    IDENTIFIER_STATUS INT,                                      -- source identifier, used as the MERGE key
    STATUS            VARCHAR(20)
)
USING DELTA
-- NOTE(review): the storage account name is hard-coded in this path — confirm it matches the mounted account.
LOCATION 'dbfs:/mnt/datalakeea44c854c651c494/gold/dim_status'
Inserção de dados na tabela dim_status
%sql
-- Upsert dim_status from the `status` source relation.
--   * existing statuses (same IDENTIFIER_STATUS) are updated only when the label changed;
--   * unknown statuses are inserted (SK_STATUS is generated by the identity column).
WITH status_relacional AS (
    SELECT
        try_cast(IDENTIFIER_STATUS AS INT) AS IDENTIFIER_STATUS,
        STATUS
    FROM status
    -- An identifier that cannot be cast becomes NULL; a NULL key never satisfies the
    -- ON condition, so such a row would be inserted again on every run. Skip it.
    WHERE try_cast(IDENTIFIER_STATUS AS INT) IS NOT NULL
)
MERGE INTO dim_status AS ds
USING status_relacional AS sr
    ON ds.IDENTIFIER_STATUS = sr.IDENTIFIER_STATUS
-- `<=>` is the null-safe equality operator: unlike `<>`, it also detects a change
-- between NULL and a value (with `<>` that comparison is NULL and the update is skipped).
WHEN MATCHED AND NOT (ds.STATUS <=> sr.STATUS) THEN
    UPDATE SET
        STATUS = sr.STATUS
WHEN NOT MATCHED THEN
    INSERT (IDENTIFIER_STATUS, STATUS)
    VALUES (sr.IDENTIFIER_STATUS, sr.STATUS);
Criação da tabela dim_time
from pyspark.sql.functions import expr, date_format, row_number
from pyspark.sql.window import Window
# Build the calendar dimension (dim_time): one row per day in a fixed date
# range, with year/month/day parts, Portuguese month and weekday names, and a
# sequential surrogate key. The result is saved as the Delta table `dim_time`.

# Date range covered by the dimension (both bounds are included).
data_inicial = "1950-05-12"
data_final = "2024-12-07"
# Number of days between the two bounds, computed with Spark's datediff.
num_dias = spark.sql(f"SELECT datediff('{data_final}', '{data_inicial}')").collect()[0][0]
# One row per calendar day: ids 0..num_dias are added as day offsets to the start date.
df_calendario = spark.range(0, num_dias + 1) \
    .selectExpr(f"date_add(to_date('{data_inicial}'), CAST(id AS INT)) AS Data")
# Derive the date components. The continuation lines of the CASE expressions
# start at column 0 on purpose: they are inside string literals, so any
# leading whitespace would become part of the SQL text.
df_tempo = df_calendario.selectExpr(
    "Data",
    "year(Data) AS Ano",
    "month(Data) AS Mes",
    "(CASE month(Data) \
WHEN 1 THEN 'JANEIRO' \
WHEN 2 THEN 'FEVEREIRO' \
WHEN 3 THEN 'MARCO' \
WHEN 4 THEN 'ABRIL' \
WHEN 5 THEN 'MAIO' \
WHEN 6 THEN 'JUNHO' \
WHEN 7 THEN 'JULHO' \
WHEN 8 THEN 'AGOSTO' \
WHEN 9 THEN 'SETEMBRO' \
WHEN 10 THEN 'OUTUBRO' \
WHEN 11 THEN 'NOVEMBRO' \
WHEN 12 THEN 'DEZEMBRO' \
END) AS NomeMes",
    "day(Data) AS Dia",
    "(CASE dayofweek(Data) \
WHEN 1 THEN 'DOMINGO' \
WHEN 2 THEN 'SEGUNDA-FEIRA' \
WHEN 3 THEN 'TERCA-FEIRA' \
WHEN 4 THEN 'QUARTA-FEIRA' \
WHEN 5 THEN 'QUINTA-FEIRA' \
WHEN 6 THEN 'SEXTA-FEIRA' \
WHEN 7 THEN 'SABADO' \
END) AS NomeDiaSemana",
    "dayofweek(Data) AS NumeroDiaSemana"
)
# Window used to number the days chronologically. It has no partitionBy, so
# Spark evaluates it in a single partition; that is acceptable for a calendar
# of a few tens of thousands of rows.
windowSpec = Window.orderBy("Data")
# Surrogate key: 1 for data_inicial, increasing by one per day. Because it is
# derived only from the date order, re-running the cell with the same start
# date yields the same keys.
df_tempo_sk = df_tempo.withColumn("sk_time", row_number().over(windowSpec))
# Show the resulting DataFrame.
df_tempo_sk.display()
# Save as a Delta table at the gold location. mode="overwrite" makes the cell
# re-runnable: the default mode raises an error when `dim_time` already exists.
df_tempo_sk.write \
    .mode("overwrite") \
    .option("path", f"/mnt/{storageAccountName}/gold/dim_time") \
    .saveAsTable("dim_time", format="delta")
Criação da tabela dim_races
%sql
-- Race dimension, stored as an external Delta table in the gold container.
-- Created only when it does not exist yet, so the cell can be re-run safely.
USE spark_catalog.default;

CREATE TABLE IF NOT EXISTS dim_races (
    SK_RACES        BIGINT GENERATED BY DEFAULT AS IDENTITY,  -- surrogate key, generated when not supplied
    SK_TIME         BIGINT,                                   -- filled from dim_time.sk_time by the load cell
    IDENTIFIER_RACE INT                                       -- source identifier, used as the MERGE key
)
USING DELTA
-- NOTE(review): the storage account name is hard-coded in this path — confirm it matches the mounted account.
LOCATION 'dbfs:/mnt/datalakeea44c854c651c494/gold/dim_races';
Inserção de dados na tabela dim_races
%sql
-- Upsert dim_races from the `races` source relation, resolving each race date
-- to its dim_time surrogate key.
-- NOTE(review): races whose date is NULL or has no row in dim_time are dropped
-- by the inner join below — confirm dim_time covers every race date.
WITH races_data AS (
    -- Parse the race date so it can be compared with dim_time.Data.
    SELECT
        IDENTIFIER_RACE,
        TO_DATE(DATE, 'yyyy-MM-dd') AS race_date
    FROM
        races
),
races_com_sk_time AS (
    -- Attach the calendar surrogate key of the race day.
    SELECT
        r.IDENTIFIER_RACE,
        dt.sk_time
    FROM
        races_data r
    JOIN
        dim_time dt
    ON
        r.race_date = dt.Data
)
MERGE INTO dim_races dr
USING races_com_sk_time rst
    ON dr.IDENTIFIER_RACE = rst.IDENTIFIER_RACE
-- Keep SK_TIME in sync when the date of an already-loaded race changes, as the
-- other dimension merges do for their attributes. `<=>` is null-safe equality.
WHEN MATCHED AND NOT (dr.SK_TIME <=> rst.sk_time) THEN
    UPDATE SET
        SK_TIME = rst.sk_time
WHEN NOT MATCHED THEN
    INSERT (SK_TIME, IDENTIFIER_RACE)
    VALUES (rst.sk_time, rst.IDENTIFIER_RACE);
Criação da tabela fato fat_crash_history
%sql
-- Fact table of crash events: one foreign key per related dimension.
-- IF NOT EXISTS keeps the cell re-runnable (a plain CREATE TABLE fails once the
-- table exists). The keys are BIGINT to match the BIGINT surrogate keys of the
-- dimension tables they point to.
-- The load cell for this table rebuilds it with CREATE OR REPLACE TABLE ... AS SELECT.
CREATE TABLE IF NOT EXISTS fat_crash_history (
    FK_RACES    BIGINT,  -- dim_races.SK_RACES
    FK_STATUS   BIGINT,  -- dim_status.SK_STATUS
    FK_DRIVER   BIGINT,  -- dim_driver.SK_DRIVER
    FK_CIRCUITS BIGINT   -- dim_circuits.SK_CIRCUITS
)
USING delta
LOCATION '/mnt/datalakeea44c854c651c494/gold/fat_crash_history'
Inserção de dados na tabela fato fat_crash_history
%sql
-- Rebuild fat_crash_history from scratch: every result row with one of the
-- selected status identifiers, expressed through the surrogate keys of the
-- driver, status, race and circuit dimensions. Inner joins mean a result is
-- kept only when all four dimension rows exist.
CREATE OR REPLACE TABLE fat_crash_history
USING DELTA
LOCATION '/mnt/datalakeea44c854c651c494/gold/fat_crash_history'
AS
SELECT
    d.SK_DRIVER   AS FK_DRIVER,
    s.SK_STATUS   AS FK_STATUS,
    r.SK_RACES    AS FK_RACES,
    c.SK_CIRCUITS AS FK_CIRCUITS
FROM results AS res
-- `races` is needed only to reach the circuit of each result.
INNER JOIN races AS ra
    ON res.IDENTIFIER_RACE = ra.IDENTIFIER_RACE
INNER JOIN dim_driver AS d
    ON res.IDENTIFIER_DRIVER = d.IDENTIFIER_DRIVER
INNER JOIN dim_status AS s
    ON res.IDENTIFIER_STATUS = s.IDENTIFIER_STATUS
INNER JOIN dim_races AS r
    ON res.IDENTIFIER_RACE = r.IDENTIFIER_RACE
INNER JOIN dim_circuits AS c
    ON ra.IDENTIFIER_CIRCUIT = c.IDENTIFIER_CIRCUIT
-- NOTE(review): 3 and 4 are presumably the crash-related status identifiers — confirm against dim_status.
WHERE res.IDENTIFIER_STATUS IN (3, 4);
Criação da tabela fato fat_driver_history
%sql
-- Fact table of driver results: foreign keys to the race, driver and circuit
-- dimensions plus the fastest lap time.
-- IF NOT EXISTS keeps the cell re-runnable (a plain CREATE TABLE fails once the
-- table exists). The keys are BIGINT to match the BIGINT surrogate keys of the
-- dimension tables they point to.
-- The load cell for this table rebuilds it with CREATE OR REPLACE TABLE ... AS SELECT.
CREATE TABLE IF NOT EXISTS fat_driver_history (
    FK_RACES       BIGINT,  -- dim_races.SK_RACES
    FK_DRIVER      BIGINT,  -- dim_driver.SK_DRIVER
    FK_CIRCUITS    BIGINT,  -- dim_circuits.SK_CIRCUITS
    -- NOTE(review): declared int here, but the load cell copies results.FASTESTLAPTIME
    -- unchanged — confirm the source column type and unit.
    FASTESTLAPTIME int
)
USING delta
LOCATION '/mnt/datalakeea44c854c651c494/gold/fat_driver_history'
Inserção de dados na tabela fato fat_driver_history
%sql
-- Rebuild fat_driver_history from scratch: every result row that has a fastest
-- lap time, expressed through the surrogate keys of the driver, race and
-- circuit dimensions. Inner joins mean a result is kept only when all three
-- dimension rows exist.
CREATE OR REPLACE TABLE fat_driver_history
USING DELTA
LOCATION '/mnt/datalakeea44c854c651c494/gold/fat_driver_history'
AS
SELECT
    d.SK_DRIVER        AS FK_DRIVER,
    r.SK_RACES         AS FK_RACES,
    c.SK_CIRCUITS      AS FK_CIRCUITS,
    res.FASTESTLAPTIME AS FASTESTLAPTIME
FROM results AS res
-- `races` is needed only to reach the circuit of each result.
INNER JOIN races AS ra
    ON res.IDENTIFIER_RACE = ra.IDENTIFIER_RACE
INNER JOIN dim_driver AS d
    ON res.IDENTIFIER_DRIVER = d.IDENTIFIER_DRIVER
INNER JOIN dim_races AS r
    ON res.IDENTIFIER_RACE = r.IDENTIFIER_RACE
INNER JOIN dim_circuits AS c
    ON ra.IDENTIFIER_CIRCUIT = c.IDENTIFIER_CIRCUIT
WHERE res.FASTESTLAPTIME IS NOT NULL;