Camada Silver
A camada Silver é responsável pela limpeza e padronização dos dados brutos da camada Bronze, garantindo que os dados estejam estruturados de maneira consistente.
Mostrando todos os arquivos da camada bronze
# List everything under the bronze mount so the available source tables are visible.
_bronze_root = f"/mnt/{storageAccountName}/bronze"
display(dbutils.fs.ls(_bronze_root))
Gerando um dataframe para cada tabela Delta Lake do container bronze no Azure Data Lake Storage
def _read_bronze(table_name):
    """Return a DataFrame over the Delta table ``table_name`` in the bronze mount."""
    return spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/{table_name}")


# One DataFrame per bronze Delta table.
df_circuits = _read_bronze("circuits")
df_constructor_results = _read_bronze("constructor_results")
df_constructor_standings = _read_bronze("constructor_standings")
df_constructors = _read_bronze("constructors")
df_driver_standings = _read_bronze("driver_standings")
df_drivers = _read_bronze("drivers")
df_lap_times = _read_bronze("lap_times")
df_pit_stops = _read_bronze("pit_stops")
df_qualifying = _read_bronze("qualifying")
df_races = _read_bronze("races")
df_results = _read_bronze("results")
df_seasons = _read_bronze("seasons")
df_sprint_results = _read_bronze("sprint_results")
df_status = _read_bronze("status")
Adicionando metadados de data e hora de processamento e nome do arquivo de origem
from pyspark.sql.functions import current_timestamp, lit


def _with_silver_metadata(df, source_name):
    """Append the silver audit columns to ``df``.

    ``data_hora_silver`` holds ``current_timestamp()``; ``nome_arquivo`` holds the
    literal ``source_name`` (the bronze table the rows came from).
    """
    stamped = df.withColumn("data_hora_silver", current_timestamp())
    return stamped.withColumn("nome_arquivo", lit(source_name))


df_circuits = _with_silver_metadata(df_circuits, "circuits")
df_constructor_results = _with_silver_metadata(df_constructor_results, "constructor_results")
df_constructor_standings = _with_silver_metadata(df_constructor_standings, "constructor_standings")
df_constructors = _with_silver_metadata(df_constructors, "constructors")
df_driver_standings = _with_silver_metadata(df_driver_standings, "driver_standings")
df_drivers = _with_silver_metadata(df_drivers, "drivers")
df_lap_times = _with_silver_metadata(df_lap_times, "lap_times")
df_pit_stops = _with_silver_metadata(df_pit_stops, "pit_stops")
df_qualifying = _with_silver_metadata(df_qualifying, "qualifying")
df_races = _with_silver_metadata(df_races, "races")
df_results = _with_silver_metadata(df_results, "results")
df_seasons = _with_silver_metadata(df_seasons, "seasons")
df_sprint_results = _with_silver_metadata(df_sprint_results, "sprint_results")
df_status = _with_silver_metadata(df_status, "status")
Limpeza e padronização dos dados
# Rename columns to upper case, adjusting the names to match the data dictionary.
# Each mapping below is applied in insertion order with withColumnRenamed.
#['circuitId', 'circuitRef', 'name', 'location', 'country', 'lat', 'lng', 'alt', 'url', 'data_hora_bronze', 'nome_arquivo', 'data_hora_silver']
_circuits_renames = {
    "circuitId": "IDENTIFIER_CIRCUIT",
    "circuitRef": "REFERENCE_CIRCUIT",
    "name": "NAME",
    "location": "LOCATION",
    "country": "COUNTRY",
    "lat": "LATITUDE",
    "lng": "LONGITUDE",
    "alt": "ALTITUDE",
    "url": "URL",
    "data_hora_bronze": "DATA_HORA_BRONZE",
    "nome_arquivo": "NOME_ARQUIVO",
    "data_hora_silver": "DATA_HORA_SILVER",
}
for _src, _dst in _circuits_renames.items():
    df_circuits = df_circuits.withColumnRenamed(_src, _dst)
# Bronze -> silver column names for constructor_results.
#['constructorResultsId', 'raceId', 'constructorId', 'points', 'status', 'data_hora_bronze', 'nome_arquivo', 'data_hora_silver']
_constructor_results_renames = {
    "constructorResultsId": "IDENTIFIER_CONSTRUCTOR_RESULTS",
    "raceId": "IDENTIFIER_RACE",
    "constructorId": "IDENTIFIER_CONSTRUCTOR",
    "points": "POINTS",
    "status": "STATUS",
    "data_hora_bronze": "DATA_HORA_BRONZE",
    "nome_arquivo": "NOME_ARQUIVO",
    "data_hora_silver": "DATA_HORA_SILVER",
}
for _src, _dst in _constructor_results_renames.items():
    df_constructor_results = df_constructor_results.withColumnRenamed(_src, _dst)
# Bronze -> silver column names for constructor_standings.
# Every source column is listed, including 'points', which was previously not renamed
# and leaked into silver as lowercase 'points' (driver_standings already maps it to POINTS).
#['constructorStandingsId', 'raceId', 'constructorId', 'points', 'position', 'positionText', 'wins', 'data_hora_bronze', 'nome_arquivo', 'data_hora_silver']
_constructor_standings_renames = {
    "constructorStandingsId": "IDENTIFIER_CONSTRUCTOR_STANDINGS",
    "raceId": "IDENTIFIER_RACE",
    "constructorId": "IDENTIFIER_CONSTRUCTOR",
    "points": "POINTS",
    "position": "POSITION",
    "positionText": "POSITION_TEXT",
    "wins": "WINS",
    "data_hora_bronze": "DATA_HORA_BRONZE",
    "nome_arquivo": "NOME_ARQUIVO",
    "data_hora_silver": "DATA_HORA_SILVER",
}
for _src, _dst in _constructor_standings_renames.items():
    df_constructor_standings = df_constructor_standings.withColumnRenamed(_src, _dst)
# Bronze -> silver column names for constructors.
#['constructorId', 'constructorRef', 'name', 'nationality', 'url', 'data_hora_bronze', 'nome_arquivo', 'data_hora_silver']
_constructors_renames = {
    "constructorId": "IDENTIFIER_CONSTRUCTOR",
    "constructorRef": "REFERENCE_CONSTRUCTOR",
    "name": "NAME",
    "nationality": "NATIONALITY",
    "url": "URL",
    "data_hora_bronze": "DATA_HORA_BRONZE",
    "nome_arquivo": "NOME_ARQUIVO",
    "data_hora_silver": "DATA_HORA_SILVER",
}
for _src, _dst in _constructors_renames.items():
    df_constructors = df_constructors.withColumnRenamed(_src, _dst)
# Bronze -> silver column names for driver_standings.
#['driverStandingsId', 'raceId', 'driverId', 'points', 'position', 'positionText', 'wins', 'data_hora_bronze', 'nome_arquivo', 'data_hora_silver']
_driver_standings_renames = {
    "driverStandingsId": "IDENTIFIER_DRIVER_STANDINGS",
    "raceId": "IDENTIFIER_RACE",
    "driverId": "IDENTIFIER_DRIVER",
    "points": "POINTS",
    "position": "POSITION",
    "positionText": "POSITION_TEXT",
    "wins": "WINS",
    "data_hora_bronze": "DATA_HORA_BRONZE",
    "nome_arquivo": "NOME_ARQUIVO",
    "data_hora_silver": "DATA_HORA_SILVER",
}
for _src, _dst in _driver_standings_renames.items():
    df_driver_standings = df_driver_standings.withColumnRenamed(_src, _dst)
# Bronze -> silver column names for drivers.
#['driverId', 'driverRef', 'number', 'code', 'forename', 'surname', 'dob', 'nationality', 'url', 'data_hora_bronze', 'nome_arquivo', 'data_hora_silver']
_drivers_renames = {
    "driverId": "IDENTIFIER_DRIVER",
    "driverRef": "REFERENCE_DRIVER",
    "number": "NUMBER",
    "code": "CODE",
    "forename": "FORENAME",
    "surname": "SURNAME",
    "dob": "DOB",
    "nationality": "NATIONALITY",
    "url": "URL",
    "data_hora_bronze": "DATA_HORA_BRONZE",
    "nome_arquivo": "NOME_ARQUIVO",
    "data_hora_silver": "DATA_HORA_SILVER",
}
for _src, _dst in _drivers_renames.items():
    df_drivers = df_drivers.withColumnRenamed(_src, _dst)
# Bronze -> silver column names for lap_times.
# Source names must match the bronze schema exactly: withColumnRenamed is a silent no-op
# when the column is not found, so 'RACEID' (used previously) only matched 'raceId'
# because spark.sql.caseSensitive defaults to false.
#['raceId', 'driverId', 'lap', 'position', 'time', 'milliseconds', 'data_hora_bronze', 'nome_arquivo', 'data_hora_silver']
_lap_times_renames = {
    "raceId": "IDENTIFIER_RACE",
    "driverId": "IDENTIFIER_DRIVER",
    "lap": "LAP",
    "position": "POSITION",
    "time": "TIME",
    "milliseconds": "MILLISECONDS",
    "data_hora_bronze": "DATA_HORA_BRONZE",
    "nome_arquivo": "NOME_ARQUIVO",
    "data_hora_silver": "DATA_HORA_SILVER",
}
for _src, _dst in _lap_times_renames.items():
    df_lap_times = df_lap_times.withColumnRenamed(_src, _dst)
# Bronze -> silver column names for pit_stops.
#['raceId', 'driverId', 'stop', 'lap', 'time', 'duration', 'milliseconds', 'data_hora_bronze', 'nome_arquivo', 'data_hora_silver']
_pit_stops_renames = {
    "raceId": "IDENTIFIER_RACE",
    "driverId": "IDENTIFIER_DRIVER",
    "stop": "STOP",
    "lap": "LAP",
    "time": "TIME",
    "duration": "DURATION",
    "milliseconds": "MILLISECONDS",
    "data_hora_bronze": "DATA_HORA_BRONZE",
    "nome_arquivo": "NOME_ARQUIVO",
    "data_hora_silver": "DATA_HORA_SILVER",
}
for _src, _dst in _pit_stops_renames.items():
    df_pit_stops = df_pit_stops.withColumnRenamed(_src, _dst)
# Bronze -> silver column names for qualifying.
#['qualifyId', 'raceId', 'driverId', 'constructorId', 'number', 'position', 'q1', 'q2', 'q3', 'data_hora_bronze', 'nome_arquivo', 'data_hora_silver']
_qualifying_renames = {
    "qualifyId": "IDENTIFIER_QUALIFYING",
    "raceId": "IDENTIFIER_RACE",
    "driverId": "IDENTIFIER_DRIVER",
    "constructorId": "IDENTIFIER_CONSTRUCTOR",
    "number": "NUMBER",
    "position": "POSITION",
    "q1": "Q1",
    "q2": "Q2",
    "q3": "Q3",
    "data_hora_bronze": "DATA_HORA_BRONZE",
    "nome_arquivo": "NOME_ARQUIVO",
    "data_hora_silver": "DATA_HORA_SILVER",
}
for _src, _dst in _qualifying_renames.items():
    df_qualifying = df_qualifying.withColumnRenamed(_src, _dst)
# Bronze -> silver column names for races.
# Source names must match the bronze schema exactly: withColumnRenamed is a silent no-op
# when the column is not found, so 'RACEID' (used previously) only matched 'raceId'
# because spark.sql.caseSensitive defaults to false.
#['raceId', 'year', 'round', 'circuitId', 'name', 'date', 'time', 'url', 'fp1_date', 'fp1_time', 'fp2_date', 'fp2_time', 'fp3_date', 'fp3_time', 'quali_date', 'quali_time', 'sprint_date', 'sprint_time', 'data_hora_bronze', 'nome_arquivo', 'data_hora_silver']
_races_renames = {
    "raceId": "IDENTIFIER_RACE",
    "year": "YEAR",
    "round": "ROUND",
    "circuitId": "IDENTIFIER_CIRCUIT",
    "name": "NAME",
    "date": "DATE",
    "time": "TIME",
    "url": "URL",
    "fp1_date": "FP1_DATE",
    "fp1_time": "FP1_TIME",
    "fp2_date": "FP2_DATE",
    "fp2_time": "FP2_TIME",
    "fp3_date": "FP3_DATE",
    "fp3_time": "FP3_TIME",
    "quali_date": "QUALI_DATE",
    "quali_time": "QUALI_TIME",
    "sprint_date": "SPRINT_DATE",
    "sprint_time": "SPRINT_TIME",
    "data_hora_bronze": "DATA_HORA_BRONZE",
    "nome_arquivo": "NOME_ARQUIVO",
    "data_hora_silver": "DATA_HORA_SILVER",
}
for _src, _dst in _races_renames.items():
    df_races = df_races.withColumnRenamed(_src, _dst)
# Bronze -> silver column names for results.
#['resultId', 'raceId', 'driverId', 'constructorId', 'number', 'grid', 'position', 'positionText', 'positionOrder', 'points', 'laps', 'time', 'milliseconds', 'fastestLap', 'rank', 'fastestLapTime', 'fastestLapSpeed', 'statusId', 'data_hora_bronze', 'nome_arquivo', 'data_hora_silver']
_results_renames = {
    "resultId": "IDENTIFIER_RESULT",
    "raceId": "IDENTIFIER_RACE",
    "driverId": "IDENTIFIER_DRIVER",
    "constructorId": "IDENTIFIER_CONSTRUCTOR",
    "number": "NUMBER",
    "grid": "GRID",
    "position": "POSITION",
    "positionText": "POSITION_TEXT",
    "positionOrder": "POSITION_ORDER",
    "points": "POINTS",
    "laps": "LAPS",
    "time": "TIME",
    "milliseconds": "MILLISECONDS",
    "fastestLap": "FASTESTLAP",
    "rank": "RANK",
    "fastestLapTime": "FASTESTLAPTIME",
    "fastestLapSpeed": "FASTESTLAPSPEED",
    "statusId": "IDENTIFIER_STATUS",
    "data_hora_bronze": "DATA_HORA_BRONZE",
    "nome_arquivo": "NOME_ARQUIVO",
    "data_hora_silver": "DATA_HORA_SILVER",
}
for _src, _dst in _results_renames.items():
    df_results = df_results.withColumnRenamed(_src, _dst)
# Bronze -> silver column names for seasons.
#['year', 'url', 'data_hora_bronze', 'nome_arquivo', 'data_hora_silver']
_seasons_renames = {
    "year": "YEAR",
    "url": "URL",
    "data_hora_bronze": "DATA_HORA_BRONZE",
    "nome_arquivo": "NOME_ARQUIVO",
    "data_hora_silver": "DATA_HORA_SILVER",
}
for _src, _dst in _seasons_renames.items():
    df_seasons = df_seasons.withColumnRenamed(_src, _dst)
# Bronze -> silver column names for sprint_results.
#['resultId', 'raceId', 'driverId', 'constructorId', 'number', 'grid', 'position', 'positionText', 'positionOrder', 'points', 'laps', 'time', 'milliseconds', 'fastestLap', 'fastestLapTime', 'statusId', 'data_hora_bronze', 'nome_arquivo', 'data_hora_silver']
_sprint_results_renames = {
    "resultId": "IDENTIFIER_RESULT",
    "raceId": "IDENTIFIER_RACE",
    "driverId": "IDENTIFIER_DRIVER",
    "constructorId": "IDENTIFIER_CONSTRUCTOR",
    "number": "NUMBER",
    "grid": "GRID",
    "position": "POSITION",
    "positionText": "POSITION_TEXT",
    "positionOrder": "POSITION_ORDER",
    "points": "POINTS",
    "laps": "LAPS",
    "time": "TIME",
    "milliseconds": "MILLISECONDS",
    "fastestLap": "FASTESTLAP",
    "fastestLapTime": "FASTESTLAPTIME",
    "statusId": "IDENTIFIER_STATUS",
    "data_hora_bronze": "DATA_HORA_BRONZE",
    "nome_arquivo": "NOME_ARQUIVO",
    "data_hora_silver": "DATA_HORA_SILVER",
}
for _src, _dst in _sprint_results_renames.items():
    df_sprint_results = df_sprint_results.withColumnRenamed(_src, _dst)
# Bronze -> silver column names for status.
#['statusId', 'status', 'data_hora_bronze', 'nome_arquivo', 'data_hora_silver']
_status_renames = {
    "statusId": "IDENTIFIER_STATUS",
    "status": "STATUS",
    "data_hora_bronze": "DATA_HORA_BRONZE",
    "nome_arquivo": "NOME_ARQUIVO",
    "data_hora_silver": "DATA_HORA_SILVER",
}
for _src, _dst in _status_renames.items():
    df_status = df_status.withColumnRenamed(_src, _dst)
Salvando os dataframes no formato Delta Lake no data lake (repositório na nuvem)
# Write every silver DataFrame as a Delta table under the silver mount.
# The notebook is a full refresh (it rereads all of bronze and restamps data_hora_silver),
# so each table is overwritten. The default save mode ("errorifexists") made every re-run
# fail on the first existing silver path.
# NOTE(review): if a table's schema changes between runs, Delta may additionally require
# .option("overwriteSchema", "true") — confirm against the target tables.
_silver_tables = {
    "circuits": df_circuits,
    "constructor_results": df_constructor_results,
    "constructor_standings": df_constructor_standings,
    "constructors": df_constructors,
    "driver_standings": df_driver_standings,
    "drivers": df_drivers,
    "lap_times": df_lap_times,
    "pit_stops": df_pit_stops,
    "qualifying": df_qualifying,
    "races": df_races,
    "results": df_results,
    "seasons": df_seasons,
    "sprint_results": df_sprint_results,
    "status": df_status,
}
for _table_name, _df in _silver_tables.items():
    (
        _df.write
        .format('delta')
        .mode("overwrite")
        .save(f"/mnt/{storageAccountName}/silver/{_table_name}")
    )
Verificando os dados gravados em delta na camada silver
# List the silver mount to confirm each table directory was written.
_silver_root = f"/mnt/{storageAccountName}/silver/"
display(dbutils.fs.ls(_silver_root))