Lakehouse no Databricks com PySpark Core

Ingestão de CSV/JSON, schema complexo, transformação sem SQL, Parquet e organização mínima de projeto

Por Fábio Linhares • Instituto Infnet

Do notebook executável à solução técnica defensável

Este conjunto de questões foi elaborado para conduzir o leitor por um percurso que vai além da execução mecânica de comandos em notebook. Ao longo das atividades, o desafio não será apenas fazer o código rodar, mas compreender o problema, interpretar o enunciado com precisão, escolher dados adequados, aplicar transformações coerentes e demonstrar, com evidências, que cada decisão técnica foi correta. Em outras palavras, o que se exige aqui não é só uso instrumental do Databricks ou do PySpark, mas raciocínio de engenharia de dados: ler, transformar, validar, persistir e apresentar.

Nas etapas seguintes, o leitor enfrentará problemas típicos de ambientes reais de dados. Será preciso lidar com diferentes formatos de arquivo, como CSV e JSON, interpretar estruturas simples e aninhadas, reconhecer o papel do schema na leitura correta dos dados, filtrar registros inválidos, selecionar atributos realmente relevantes, tratar valores ausentes com critério e persistir o resultado em formato mais adequado ao processamento analítico. Além disso, as questões exigem um passo importante de maturidade técnica: sair do notebook como espaço único de trabalho e começar a organizar a solução como projeto, separando lógica em módulos Python e declarando dependências de forma minimamente profissional.

Se conseguir resolver todas as questões, o leitor demonstrará competências centrais de um praticante de engenharia de dados em ambiente Lakehouse. Entre elas, estarão a capacidade de trabalhar com PySpark Core usando a API de DataFrames sem depender de SQL, ler e interpretar schemas complexos, manipular dados semi-estruturados, aplicar limpeza orientada a contexto, persistir dados de forma reprodutível e estruturar entregas técnicas de modo mais claro, organizado e defensável. Também terá desenvolvido uma habilidade que diferencia quem apenas executa de quem realmente entende: a capacidade de justificar tecnicamente suas escolhas e provar, por meio de checkpoints e validações, que a solução atende ao que foi pedido.

Mais do que respostas isoladas, estas questões treinam um modo de pensar. Quem percorre esse caminho com rigor começa a adquirir visão de fluxo: entende que ingestão, transformação, persistência e organização do código não são etapas desconectadas, mas partes de uma mesma solução. Passa também a desenvolver qualidades valiosas em contexto profissional, como atenção ao enunciado, disciplina de validação, clareza na apresentação da solução, cuidado com evidências e senso de responsabilidade técnica. Em termos práticos, isso significa sair de um nível em que se "roda exemplos" para um nível em que se consegue construir, explicar e defender uma solução de dados com mais autonomia.

Em síntese, quem resolver satisfatoriamente este conjunto de questões terá dado um passo importante na transição entre o uso introdutório de notebooks e uma prática mais sólida de engenharia de dados no Databricks. Terá aprendido não apenas a chegar ao resultado, mas a mostrar por que aquele resultado faz sentido, como foi obtido e quais evidências sustentam sua validade. É esse tipo de competência, técnica, organizada e verificável, que as atividades a seguir buscam formar.

EXERCÍCIO 01
Estratégia correta antes de codar
Definir um caminho de resolução coerente antes de implementar as questões.

O enunciado cobra cinco movimentos encadeados: carregar dados reais, provar entendimento do schema, transformar com PySpark puro, persistir em Parquet e organizar isso como projeto Python mínimo. O caminho mais seguro é montar um fluxo único e reaproveitar o mesmo DataFrame ao longo da resolução.

O dataset ideal deve ter pelo menos um CSV, pelo menos um JSON, algum aninhamento no JSON, uma coluna de status para identificar sensores defeituosos, alguns nulos para demonstrar tratamento e colunas numéricas de leitura. O notebook deve seguir esta ordem: leitura dos arquivos, inspeção do schema, transformação com DataFrame API, persistência em Parquet, uso do módulo .py e evidências finais.

Escolha um dataset que permita demonstrar naturalmente todas as exigências do enunciado; se o dataset for "bonito demais", a resposta fica artificial.
EXERCÍCIO 02
Carregar CSV e JSON para DataFrames do Spark
Mostrar como ler formatos diferentes com PySpark e provar que a ingestão funcionou.

A primeira etapa consiste em carregar um dataset de sensores, composto por um CSV estruturado e um JSON potencialmente semi-estruturado, como DataFrames do Spark. Não basta ler os arquivos: é necessário exibir schema e amostra para provar que a leitura ocorreu corretamente.

Na defesa, o ponto central é mostrar que o CSV foi tratado como dado estruturado, que o JSON pode exigir cuidado adicional e que as opções de leitura adotadas têm justificativa técnica.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

csv_path = "/FileStore/tables/sensores/sensores.csv"
json_path = "/FileStore/tables/sensores/sensores.json"

df_csv = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(csv_path)
)

df_json = (
    spark.read
    .option("multiLine", True)
    .json(json_path)
)

print("CSV:")
df_csv.printSchema()
df_csv.show(5, truncate=False)

print("JSON:")
df_json.printSchema()
df_json.show(5, truncate=False)
Ler o arquivo e não mostrar schema nem amostra equivale a pedir que o avaliador acredite em você sem evidência.
EXERCÍCIO 03
Demonstrar leitura correta de schema complexo
Ensinar a interpretar JSON aninhado e acessar subcampos de forma explícita.

Nesta etapa, o foco é provar que o leitor não apenas carregou o JSON, mas entendeu a sua estrutura. O procedimento correto é inspecionar o schema, identificar os subcampos relevantes e então acessá-los com select e col.

Esse ponto avalia leitura e interpretação de schema, não apenas sintaxe.

from pyspark.sql.functions import col

df_json.printSchema()

df_json.select(
    col("device.id").alias("sensor_id"),
    col("device.type").alias("sensor_type"),
    col("reading.temperature").alias("temperature"),
    col("reading.humidity").alias("humidity"),
    col("metadata.location").alias("location"),
    col("status"),
    col("timestamp")
).show(5, truncate=False)
Mostrar apenas o printSchema() é pouco; o ganho real está em provar que você sabe navegar no schema.
EXERCÍCIO 04 (2.1.1)
Filtrar leituras de sensores defeituosos
Aplicar filtragem com DataFrame API após normalização de status.

O enunciado foi explícito: sem SQL. A etapa 2.1 será resolvida com uso combinado de withColumn(), filter() e, na sequência, select(), sempre pela DataFrame API. Aqui o fluxo começa pela preparação de um DataFrame base a partir do JSON, com normalização de campos e filtragem de sensores defeituosos.

Mais importante que "fazer funcionar" é justificar tecnicamente a normalização de status, evitando inconsistências por espaços e variação de maiúsculas/minúsculas. O valor "defeituoso" é apenas um exemplo e deve ser adaptado ao marcador real usado no dataset.

from pyspark.sql.functions import col, lower, trim

df_base = (
    df_json
    .withColumn("sensor_id", col("device.id"))
    .withColumn("sensor_type", col("device.type"))
    .withColumn("temperature", col("reading.temperature").cast("double"))
    .withColumn("humidity", col("reading.humidity").cast("double"))
    .withColumn("location", col("metadata.location"))
    .withColumn("status_norm", lower(trim(col("status"))))
)

df_ok = df_base.filter(col("status_norm") != "defeituoso")

df_base.groupBy("status_norm").count().show()
df_ok.groupBy("status_norm").count().show()
O erro clássico aqui é filtrar direto em coluna suja; normalize primeiro para evitar falso positivo e falso negativo.
EXERCÍCIO 05 (2.1.2)
Selecionar apenas as colunas relevantes
Reduzir o DataFrame ao que realmente será usado na análise.

A etapa de seleção é onde você transforma um DataFrame amplo em estrutura objetiva para processamento e defesa. Coluna relevante é coluna usada: identificador, contexto, tempo e medidas principais.

Esse recorte deixa claro que houve critério técnico, não apenas carregamento bruto do JSON. A escolha também se justifica pela finalidade analítica e operacional: manter rastreabilidade do sensor, referência temporal e métricas que realmente serão persistidas e analisadas.

df_sel = df_ok.select(
    "sensor_id",
    "sensor_type",
    "timestamp",
    "location",
    "temperature",
    "humidity"
)

df_sel.printSchema()
df_sel.show(5, truncate=False)
Manter colunas intermediárias sem necessidade enfraquece a defesa; mostre que o schema final foi intencional.
EXERCÍCIO 06 (2.1.3)
Tratar valores nulos com critério técnico
Combinar na.fill e na.drop de forma justificada.

Nesta etapa, a regra prática é separar campos críticos de medidas numéricas. Para campos-chave como sensor_id e timestamp, descarte é normalmente o caminho mais seguro. Para medidas, preenchimento por média costuma ser defensável.

O enunciado aceitava tanto na.fill quanto na.drop; aqui a solução combina os dois com critério técnico. Feche a questão com evidência de nulos após o tratamento para provar o resultado.

from pyspark.sql.functions import col, avg, count, when

media_temp = df_sel.select(avg("temperature").alias("media_temp")).first()["media_temp"]
media_humidity = df_sel.select(avg("humidity").alias("media_humidity")).first()["media_humidity"]

df_clean = (
    df_sel
    .na.fill({
        "temperature": media_temp,
        "humidity": media_humidity
    })
    .na.drop(subset=["sensor_id", "timestamp"])
)

df_clean.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df_clean.columns
]).show()
Evite na.drop() global sem justificativa; isso costuma eliminar linhas demais e enfraquece sua argumentação.
EXERCÍCIO 07
Persistir o resultado em Parquet
Gravar o DataFrame tratado em formato colunar e validar a persistência.

A persistência em Parquet deve ser apresentada como etapa técnica: formato colunar, bom para análise e reexecução controlada do processo. A gravação precisa ser seguida de releitura para validar que a escrita realmente funcionou.

O exemplo usa um caminho temporário em DBFS, mas o mesmo raciocínio vale para S3 quando o ambiente estiver configurado com armazenamento externo. Sem releitura, a demonstração fica incompleta.

output_path = "dbfs:/tmp/tp2_sensores_parquet"

df_clean.write.mode("overwrite").parquet(output_path)

df_parquet = spark.read.parquet(output_path)

df_parquet.printSchema()
df_parquet.show(5, truncate=False)

display(dbutils.fs.ls(output_path))
Gravar e não reler é como salvar um arquivo sem nunca verificar se ele existe.
EXERCÍCIO 08
Organizar a solução como projeto Python mínimo
Extrair a lógica principal do notebook para módulo reutilizável.

Esta etapa, junto da próxima, responde ao item 4.1 do enunciado. O objetivo é extrair a lógica de limpeza para um módulo .py, manter o notebook como orquestrador e favorecer reutilização.

O avaliador quer ver separação de responsabilidades, reaproveitamento e coerência entre notebook, módulo e configuração. Na prática, o notebook principal deve importar esse módulo e chamar a pipeline de limpeza, em vez de duplicar a lógica.

# src/cleaning.py
from pyspark.sql.functions import col, lower, trim, avg

def preparar_base(df):
    return (
        df
        .withColumn("sensor_id", col("device.id"))
        .withColumn("sensor_type", col("device.type"))
        .withColumn("temperature", col("reading.temperature").cast("double"))
        .withColumn("humidity", col("reading.humidity").cast("double"))
        .withColumn("location", col("metadata.location"))
        .withColumn("status_norm", lower(trim(col("status"))))
    )

def filtrar_defeituosos(df):
    return df.filter(col("status_norm") != "defeituoso")

def selecionar_colunas(df):
    return df.select(
        "sensor_id",
        "sensor_type",
        "timestamp",
        "location",
        "temperature",
        "humidity"
    )

def tratar_nulos(df):
    media_temp = df.select(avg("temperature").alias("media_temp")).first()["media_temp"]
    media_humidity = df.select(avg("humidity").alias("media_humidity")).first()["media_humidity"]

    return (
        df
        .na.fill({
            "temperature": media_temp,
            "humidity": media_humidity
        })
        .na.drop(subset=["sensor_id", "timestamp"])
    )

def pipeline_limpeza(df):
    df1 = preparar_base(df)
    df2 = filtrar_defeituosos(df1)
    df3 = selecionar_colunas(df2)
    df4 = tratar_nulos(df3)
    return df4
Criar um .py que não é usado pelo notebook passa sensação de encenação técnica.
EXERCÍCIO 09
Declarar dependências e evidenciar a entrega
Completar o projeto com pyproject.toml e fechar a defesa com checkpoints.

Esta continuação fecha o item 4.1 do enunciado. O pyproject.toml deve ser simples, coerente e suficiente para o contexto do exercício. Além disso, a entrega precisa ser defensável: notebook, arquivos lidos, schema, transformações, Parquet gravado, módulo Python e configuração de projeto.

A defesa boa não é a que lê código linha por linha, mas a que mostra raciocínio, justificativa e checkpoints objetivos.

[project]
name = "tp2-sensores-databricks"
version = "0.1.0"
description = "Projeto de engenharia de dados com PySpark Core no Databricks"
requires-python = ">=3.10"
dependencies = [
  "pyspark"
]

[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
No fechamento, destaque as armadilhas: não usar SQL quando proibido, não esconder schema, não pular a releitura do Parquet e não deixar o projeto desorganizado.