Lakeflow Declarative Pipelines (LDP), anteriormente chamado de Delta Live Tables (DLT), é o framework declarativo do Databricks para construir pipelines de dados confiáveis. Você declara o que as tabelas devem conter; o Databricks gerencia como executar, orquestrar, escalar e re-executar.

Por que usar LDP em vez de notebooks Spark puros

AspectoSpark puro (notebooks/jobs)Lakeflow Declarative Pipelines
Dependências entre tabelasManual (você define a ordem)Automático (inferido pelo grafo)
Qualidade de dadosCódigo customExpectations declarativas nativas
Retry e recoveryManualAutomático
MonitoramentoLogs do jobDashboard de pipeline + event log
IdempotênciaVocê garanteGarantido pelo framework
Streaming + batchDois jobs separadosUm único pipeline (unified)

Conceitos fundamentais

  • Pipeline: conjunto de tabelas e views relacionadas, executadas como unidade
  • Streaming Table: tabela atualizada incrementalmente (append / CDC), processando apenas dados novos
  • Materialized View: tabela recalculada completamente a cada execução (como uma view materializada)
  • View: view temporária dentro do pipeline (não persiste como tabela)
  • Expectations: regras de qualidade declarativas aplicadas durante a execução

Estrutura básica de um pipeline LDP

# src/pipelines/bronze_pedidos.py
import dlt
from pyspark.sql import functions as F
 
# 1. Ingestão Auto Loader → Streaming Table (Bronze)
@dlt.table(
    name="bronze_pedidos_raw",
    comment="Ingestão bruta de pedidos via Auto Loader, append-only",
    table_properties={"quality": "bronze"},
)
def bronze_pedidos_raw():
    return (
        spark.readStream
        .format("cloudFiles")                          # Auto Loader
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/Volumes/bronze/schemas/pedidos/")
        .load("/Volumes/bronze/landing/pedidos/")
    )
 
 
# 2. Limpeza → Streaming Table (Silver) com Expectations
@dlt.table(
    name="silver_pedidos",
    comment="Pedidos limpos e validados, camada Silver",
    partition_cols=["ano", "mes"],
)
@dlt.expect_or_drop("pedido_id não nulo", "pedido_id IS NOT NULL")
@dlt.expect_or_drop("valor positivo",     "valor_total > 0")
@dlt.expect("status válido",              "status IN ('pendente', 'concluido', 'cancelado')")
def silver_pedidos():
    return (
        dlt.read_stream("bronze_pedidos_raw")
        .withColumn("data_pedido", F.to_date("data_pedido_str", "yyyy-MM-dd"))
        .withColumn("ano",  F.year("data_pedido"))
        .withColumn("mes",  F.month("data_pedido"))
        .select("pedido_id", "cliente_id", "data_pedido", "ano", "mes", "valor_total", "status")
    )
 
 
# 3. Agregação → Materialized View (Gold)
@dlt.table(
    name="gold_kpis_diarios",
    comment="KPIs diários de vendas, camada Gold",
)
def gold_kpis_diarios():
    return (
        dlt.read("silver_pedidos")
        .filter(F.col("status") == "concluido")
        .groupBy("data_pedido")
        .agg(
            F.sum("valor_total").alias("receita_total"),
            F.count("pedido_id").alias("qtd_pedidos"),
        )
    )

Expectations (qualidade de dados)

Expectations são constraints declaradas com decoradores. O comportamento quando violadas é configurável:

DecoradorComportamento ao violar
@dlt.expect("nome", "condição")Registra violação nas métricas, mantém a linha
@dlt.expect_or_drop("nome", "condição")Remove a linha (quarentena)
@dlt.expect_or_fail("nome", "condição")Falha o pipeline (stop on error)
@dlt.expect_all(dict)Múltiplas expectations com o mesmo comportamento
@dlt.expect_all_or_drop(dict)Múltiplas, remove linhas inválidas
@dlt.table
@dlt.expect_all_or_drop({
    "pedido_id não nulo": "pedido_id IS NOT NULL",
    "valor positivo":     "valor_total > 0",
    "status válido":      "status IN ('pendente', 'concluido', 'cancelado')",
    "data válida":        "data_pedido >= '2020-01-01'",
})
def silver_pedidos():
    ...

Métricas de violação ficam disponíveis no event log da pipeline e no dashboard de qualidade.

Auto Loader

Auto Loader (cloudFiles) é a forma nativa e eficiente de ingerir arquivos novos incrementalmente de object storage.

@dlt.table
def bronze_raw():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")           # csv, parquet, avro, text...
        .option("cloudFiles.schemaLocation", "/Volumes/bronze/schemas/raw/")
        .option("cloudFiles.inferColumnTypes", "true")
        # Opcionalmente, schema explícito:
        # .schema(schema_explicito)
        .load("/Volumes/bronze/landing/raw/")
    )
  • Detecta arquivos novos sem precisar fazer LIST completo (usa notificações do cloud)
  • Schema inference automática com evolução de schema
  • Checkpointing automático, sem reprocessar arquivos já lidos

Apply Changes (CDC)

Para pipelines CDC (Change Data Capture), LDP tem suporte nativo via apply_changes:

dlt.create_streaming_table("silver_clientes_scd1")
 
dlt.apply_changes(
    target="silver_clientes_scd1",
    source="bronze_clientes_cdc",       # tabela com eventos CDC
    keys=["cliente_id"],                 # chave de unicidade
    sequence_by="timestamp",             # campo para ordenar eventos
    apply_as_deletes=F.expr("op = 'D'"),
    except_column_list=["op", "timestamp"],
    stored_as_scd_type=1,               # SCD Type 1 (sobrescreve) ou 2 (histórico)
)

Com scd_type=2, LDP mantém histórico completo de mudanças com colunas __START_AT e __END_AT.

Modos de execução

ModoQuando usar
TriggeredExecuta uma vez e para, como um job batch agendado
ContinuousFica rodando, processa novos dados com baixa latência (~segundos)

Configurado no pipeline ou no DAB (continuous: true/false).

Nomenclatura de tabelas

Tabelas LDP vivem no Unity Catalog dentro do catalog.schema configurado no pipeline:

catalog: gold
schema: vendas
→ tabelas: gold.vendas.bronze_pedidos_raw
           gold.vendas.silver_pedidos
           gold.vendas.gold_kpis_diarios

Referências entre notebooks do mesmo pipeline

# notebook A define bronze_pedidos_raw
# notebook B pode referenciar:
dlt.read("bronze_pedidos_raw")          # batch
dlt.read_stream("bronze_pedidos_raw")   # streaming
 
# Tabelas externas (fora do pipeline): acesso normal via Spark
df = spark.table("silver.outros.dim_clientes")

Monitoramento e event log

Cada execução gera um event log com métricas detalhadas:

-- Event log gerado automaticamente pelo LDP
SELECT
  timestamp,
  event_type,
  details:flow_progress.metrics.num_output_rows    AS linhas_saida,
  details:flow_progress.metrics.num_dropped_rows   AS linhas_descartadas,
  details:flow_progress.data_quality.expectations  AS violacoes
FROM event_log("gold.vendas.silver_pedidos")
ORDER BY timestamp DESC;

Ver também: databricks | databricks-delta-lake | databricks-jobs | databricks-asset-bundles | databricks-unity-catalog | arquitetura-medalhao