Ir para o conteúdo

Camada Bronze

A camada Bronze realiza a leitura dos arquivos da Landing e os transforma em tabelas Delta.

Mostrando todos os arquivos da camada landing-zone

display(dbutils.fs.ls(f"/mnt/{storageAccountName}/landing-zone"))

Gerando um dataframe para cada arquivo a partir dos arquivos CSV gravado no container landing-zone do Azure Data Lake Storage

df_circuits = spark.read.option("infeschema", "true").option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/circuits.csv")
df_constructor_results   = spark.read.option("infeschema", "true").option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/constructor_results.csv")
df_constructor_standings = spark.read.option("infeschema", "true").option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/constructor_standings.csv")
df_constructors     = spark.read.option("infeschema", "true").option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/constructors.csv")
df_driver_standings = spark.read.option("infeschema", "true").option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/driver_standings.csv")
df_drivers    = spark.read.option("infeschema", "true").option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/drivers.csv")
df_lap_times  = spark.read.option("infeschema", "true").option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/lap_times.csv")
df_pit_stops  = spark.read.option("infeschema", "true").option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/pit_stops.csv")
df_qualifying = spark.read.option("infeschema", "true").option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/qualifying.csv")
df_races   = spark.read.option("infeschema", "true").option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/races.csv")
df_results = spark.read.option("infeschema", "true").option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/results.csv")
df_seasons = spark.read.option("infeschema", "true").option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/seasons.csv")
df_sprint_results = spark.read.option("infeschema", "true").option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/sprint_results.csv")
df_status  = spark.read.option("infeschema", "true").option("header", "true").csv(f"/mnt/{storageAccountName}/landing-zone/status.csv")

Adicionando metadados de data e hora de processamento e nome do arquivo de origem

from pyspark.sql.functions import current_timestamp, lit

df_circuits = df_circuits.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("circuits.csv"))
df_constructor_results   = df_constructor_results.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("constructor_results.csv"))
df_constructor_standings = df_constructor_standings.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("constructor_standings.csv"))
df_constructors  = df_constructors.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("constructors.csv"))
df_driver_standings = df_driver_standings.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("driver_standings.csv"))
df_drivers       = df_drivers.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("drivers.csv"))
df_lap_times     = df_lap_times.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("lap_times.csv"))
df_pit_stops     = df_pit_stops.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("pit_stops.csv"))
df_qualifying    = df_qualifying.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("qualifying.csv"))
df_races         = df_races.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("races.csv"))
df_results       = df_results.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("results.csv"))
df_seasons       = df_seasons.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("seasons.csv"))
df_sprint_results = df_sprint_results.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("sprint_results.csv"))
df_status        = df_status.withColumn("data_hora_bronze", current_timestamp()).withColumn("nome_arquivo", lit("status.csv"))

Salvando os dataframes em delta lake (formato de arquivo) no data lake (repositorio cloud)

df_circuits.write.format('delta').save(f"/mnt/{storageAccountName}/bronze/circuits")
df_constructor_results.write.format('delta').save(f"/mnt/{storageAccountName}/bronze/constructor_results")
df_constructor_standings.write.format('delta').save(f"/mnt/{storageAccountName}/bronze/constructor_standings")
df_constructors.write.format('delta').save(f"/mnt/{storageAccountName}/bronze/constructors")
df_driver_standings.write.format('delta').save(f"/mnt/{storageAccountName}/bronze/driver_standings")
df_drivers.write.format('delta').save(f"/mnt/{storageAccountName}/bronze/drivers")
df_lap_times.write.format('delta').save(f"/mnt/{storageAccountName}/bronze/lap_times")
df_pit_stops.write.format('delta').save(f"/mnt/{storageAccountName}/bronze/pit_stops")
df_qualifying.write.format('delta').save(f"/mnt/{storageAccountName}/bronze/qualifying")
df_races.write.format('delta').save(f"/mnt/{storageAccountName}/bronze/races")
df_results.write.format('delta').save(f"/mnt/{storageAccountName}/bronze/results")
df_seasons.write.format('delta').save(f"/mnt/{storageAccountName}/bronze/seasons")
df_sprint_results.write.format('delta').save(f"/mnt/{storageAccountName}/bronze/sprint_results")
df_status.write.format('delta').save(f"/mnt/{storageAccountName}/bronze/status")

Verificando os dados gravados em delta na camada bronze

display(dbutils.fs.ls(f"/mnt/{storageAccountName}/bronze/"))

Lendo um exemplo de um delta lake para validar a existencia dos dados e das colunas do metadados

spark.read.format('delta').load(f'/mnt/{storageAccountName}/bronze/drivers').limit(10).display()