Lakehouse no Databricks com PySpark Core
Ingestão de CSV/JSON, schema complexo, transformação sem SQL, Parquet e organização mínima de projeto
Este conjunto de questões foi elaborado para conduzir o leitor por um percurso que vai além da execução mecânica de comandos em notebook. Ao longo das atividades, o desafio não será apenas fazer o código rodar, mas compreender o problema, interpretar o enunciado com precisão, escolher dados adequados, aplicar transformações coerentes e demonstrar, com evidências, que cada decisão técnica foi correta. Em outras palavras, o que se exige aqui não é só uso instrumental do Databricks ou do PySpark, mas raciocínio de engenharia de dados: ler, transformar, validar, persistir e apresentar.
Nas etapas seguintes, o leitor enfrentará problemas típicos de ambientes reais de dados. Será preciso lidar com diferentes formatos de arquivo, como CSV e JSON, interpretar estruturas simples e aninhadas, reconhecer o papel do schema na leitura correta dos dados, filtrar registros inválidos, selecionar atributos realmente relevantes, tratar valores ausentes com critério e persistir o resultado em formato mais adequado ao processamento analítico. Além disso, as questões exigem um passo importante de maturidade técnica: sair do notebook como espaço único de trabalho e começar a organizar a solução como projeto, separando lógica em módulos Python e declarando dependências de forma minimamente profissional.
Se conseguir resolver todas as questões, o leitor demonstrará competências centrais de um praticante de engenharia de dados em ambiente Lakehouse. Entre elas, estarão a capacidade de trabalhar com PySpark Core usando a API de DataFrames sem depender de SQL, ler e interpretar schemas complexos, manipular dados semi-estruturados, aplicar limpeza orientada a contexto, persistir dados de forma reprodutível e estruturar entregas técnicas de modo mais claro, organizado e defensável. Também terá desenvolvido uma habilidade que diferencia quem apenas executa de quem realmente entende: a capacidade de justificar tecnicamente suas escolhas e provar, por meio de checkpoints e validações, que a solução atende ao que foi pedido.
Mais do que respostas isoladas, estas questões treinam um modo de pensar. Quem percorre esse caminho com rigor começa a adquirir visão de fluxo: entende que ingestão, transformação, persistência e organização do código não são etapas desconectadas, mas partes de uma mesma solução. Passa também a desenvolver qualidades valiosas em contexto profissional, como atenção ao enunciado, disciplina de validação, clareza na apresentação da solução, cuidado com evidências e senso de responsabilidade técnica. Em termos práticos, isso significa sair de um nível em que se "roda exemplos" para um nível em que se consegue construir, explicar e defender uma solução de dados com mais autonomia.
Em síntese, quem resolver satisfatoriamente este conjunto de questões terá dado um passo importante na transição entre o uso introdutório de notebooks e uma prática mais sólida de engenharia de dados no Databricks. Terá aprendido não apenas a chegar ao resultado, mas a mostrar por que aquele resultado faz sentido, como foi obtido e quais evidências sustentam sua validade. É esse tipo de competência, técnica, organizada e verificável, que as atividades a seguir buscam formar.
O enunciado cobra cinco movimentos encadeados: carregar dados reais, provar entendimento do schema, transformar com PySpark puro, persistir em Parquet e organizar isso como projeto Python mínimo. O caminho mais seguro é montar um fluxo único e reaproveitar o mesmo DataFrame ao longo da resolução.
O dataset ideal deve ter pelo menos um CSV, pelo menos um JSON, algum aninhamento no JSON, uma coluna de status para identificar sensores defeituosos, alguns nulos para demonstrar tratamento e colunas numéricas de leitura. O notebook deve seguir esta ordem: leitura dos arquivos, inspeção do schema, transformação com DataFrame API, persistência em Parquet, uso do módulo .py e evidências finais.
A primeira etapa consiste em carregar um dataset de sensores, composto por um CSV estruturado e um JSON potencialmente semi-estruturado, como DataFrames do Spark. Não basta ler os arquivos: é necessário exibir schema e amostra para provar que a leitura ocorreu corretamente.
Na defesa, o ponto central é mostrar que o CSV foi tratado como dado estruturado, que o JSON pode exigir cuidado adicional e que as opções de leitura adotadas têm justificativa técnica.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
csv_path = "/FileStore/tables/sensores/sensores.csv"
json_path = "/FileStore/tables/sensores/sensores.json"
df_csv = (
spark.read
.option("header", True)
.option("inferSchema", True)
.csv(csv_path)
)
df_json = (
spark.read
.option("multiLine", True)
.json(json_path)
)
print("CSV:")
df_csv.printSchema()
df_csv.show(5, truncate=False)
print("JSON:")
df_json.printSchema()
df_json.show(5, truncate=False)Nesta etapa, o foco é provar que o leitor não apenas carregou o JSON, mas entendeu a sua estrutura. O procedimento correto é inspecionar o schema, identificar os subcampos relevantes e então acessá-los com select e col.
Esse ponto avalia leitura e interpretação de schema, não apenas sintaxe.
from pyspark.sql.functions import col
df_json.printSchema()
df_json.select(
col("device.id").alias("sensor_id"),
col("device.type").alias("sensor_type"),
col("reading.temperature").alias("temperature"),
col("reading.humidity").alias("humidity"),
col("metadata.location").alias("location"),
col("status"),
col("timestamp")
).show(5, truncate=False)printSchema() é pouco; o ganho real está em provar que você sabe navegar no schema.O enunciado foi explícito: sem SQL. A etapa 2.1 será resolvida com uso combinado de withColumn(), filter() e, na sequência, select(), sempre pela DataFrame API. Aqui o fluxo começa pela preparação de um DataFrame base a partir do JSON, com normalização de campos e filtragem de sensores defeituosos.
Mais importante que "fazer funcionar" é justificar tecnicamente a normalização de status, evitando inconsistências por espaços e variação de maiúsculas/minúsculas. O valor "defeituoso" é apenas um exemplo e deve ser adaptado ao marcador real usado no dataset.
from pyspark.sql.functions import col, lower, trim
df_base = (
df_json
.withColumn("sensor_id", col("device.id"))
.withColumn("sensor_type", col("device.type"))
.withColumn("temperature", col("reading.temperature").cast("double"))
.withColumn("humidity", col("reading.humidity").cast("double"))
.withColumn("location", col("metadata.location"))
.withColumn("status_norm", lower(trim(col("status"))))
)
df_ok = df_base.filter(col("status_norm") != "defeituoso")
df_base.groupBy("status_norm").count().show()
df_ok.groupBy("status_norm").count().show()A etapa de seleção é onde você transforma um DataFrame amplo em estrutura objetiva para processamento e defesa. Coluna relevante é coluna usada: identificador, contexto, tempo e medidas principais.
Esse recorte deixa claro que houve critério técnico, não apenas carregamento bruto do JSON. A escolha também se justifica pela finalidade analítica e operacional: manter rastreabilidade do sensor, referência temporal e métricas que realmente serão persistidas e analisadas.
df_sel = df_ok.select(
"sensor_id",
"sensor_type",
"timestamp",
"location",
"temperature",
"humidity"
)
df_sel.printSchema()
df_sel.show(5, truncate=False)na.fill e na.drop de forma justificada.Nesta etapa, a regra prática é separar campos críticos de medidas numéricas. Para campos-chave como sensor_id e timestamp, descarte é normalmente o caminho mais seguro. Para medidas, preenchimento por média costuma ser defensável.
O enunciado aceitava tanto na.fill quanto na.drop; aqui a solução combina os dois com critério técnico. Feche a questão com evidência de nulos após o tratamento para provar o resultado.
from pyspark.sql.functions import col, avg, count, when
media_temp = df_sel.select(avg("temperature").alias("media_temp")).first()["media_temp"]
media_humidity = df_sel.select(avg("humidity").alias("media_humidity")).first()["media_humidity"]
df_clean = (
df_sel
.na.fill({
"temperature": media_temp,
"humidity": media_humidity
})
.na.drop(subset=["sensor_id", "timestamp"])
)
df_clean.select([
count(when(col(c).isNull(), c)).alias(c) for c in df_clean.columns
]).show()na.drop() global sem justificativa; isso costuma eliminar linhas demais e enfraquece sua argumentação.A persistência em Parquet deve ser apresentada como etapa técnica: formato colunar, bom para análise e reexecução controlada do processo. A gravação precisa ser seguida de releitura para validar que a escrita realmente funcionou.
O exemplo usa um caminho temporário em DBFS, mas o mesmo raciocínio vale para S3 quando o ambiente estiver configurado com armazenamento externo. Sem releitura, a demonstração fica incompleta.
output_path = "dbfs:/tmp/tp2_sensores_parquet"
df_clean.write.mode("overwrite").parquet(output_path)
df_parquet = spark.read.parquet(output_path)
df_parquet.printSchema()
df_parquet.show(5, truncate=False)
display(dbutils.fs.ls(output_path))Esta etapa, junto da próxima, responde ao item 4.1 do enunciado. O objetivo é extrair a lógica de limpeza para um módulo .py, manter o notebook como orquestrador e favorecer reutilização.
O avaliador quer ver separação de responsabilidades, reaproveitamento e coerência entre notebook, módulo e configuração. Na prática, o notebook principal deve importar esse módulo e chamar a pipeline de limpeza, em vez de duplicar a lógica.
# src/cleaning.py
from pyspark.sql.functions import col, lower, trim, avg
def preparar_base(df):
return (
df
.withColumn("sensor_id", col("device.id"))
.withColumn("sensor_type", col("device.type"))
.withColumn("temperature", col("reading.temperature").cast("double"))
.withColumn("humidity", col("reading.humidity").cast("double"))
.withColumn("location", col("metadata.location"))
.withColumn("status_norm", lower(trim(col("status"))))
)
def filtrar_defeituosos(df):
return df.filter(col("status_norm") != "defeituoso")
def selecionar_colunas(df):
return df.select(
"sensor_id",
"sensor_type",
"timestamp",
"location",
"temperature",
"humidity"
)
def tratar_nulos(df):
media_temp = df.select(avg("temperature").alias("media_temp")).first()["media_temp"]
media_humidity = df.select(avg("humidity").alias("media_humidity")).first()["media_humidity"]
return (
df
.na.fill({
"temperature": media_temp,
"humidity": media_humidity
})
.na.drop(subset=["sensor_id", "timestamp"])
)
def pipeline_limpeza(df):
df1 = preparar_base(df)
df2 = filtrar_defeituosos(df1)
df3 = selecionar_colunas(df2)
df4 = tratar_nulos(df3)
return df4.py que não é usado pelo notebook passa sensação de encenação técnica.pyproject.toml e fechar a defesa com checkpoints.Esta continuação fecha o item 4.1 do enunciado. O pyproject.toml deve ser simples, coerente e suficiente para o contexto do exercício. Além disso, a entrega precisa ser defensável: notebook, arquivos lidos, schema, transformações, Parquet gravado, módulo Python e configuração de projeto.
A defesa boa não é a que lê código linha por linha, mas a que mostra raciocínio, justificativa e checkpoints objetivos.
[project]
name = "tp2-sensores-databricks"
version = "0.1.0"
description = "Projeto de engenharia de dados com PySpark Core no Databricks"
requires-python = ">=3.10"
dependencies = [
"pyspark"
]
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"