Lakeflow Declarative Pipelines (LDP), anteriormente chamado de Delta Live Tables (DLT), é o framework declarativo do Databricks para construir pipelines de dados confiáveis. Você declara o que as tabelas devem conter; o Databricks gerencia como executar, orquestrar, escalar e re-executar.
Por que usar LDP em vez de notebooks Spark puros
| Aspecto | Spark puro (notebooks/jobs) | Lakeflow Declarative Pipelines |
|---|---|---|
| Dependências entre tabelas | Manual (você define a ordem) | Automático (inferido pelo grafo) |
| Qualidade de dados | Código custom | Expectations declarativas nativas |
| Retry e recovery | Manual | Automático |
| Monitoramento | Logs do job | Dashboard de pipeline + event log |
| Idempotência | Você garante | Garantido pelo framework |
| Streaming + batch | Dois jobs separados | Um único pipeline (unified) |
Conceitos fundamentais
- Pipeline: conjunto de tabelas e views relacionadas, executadas como unidade
- Streaming Table: tabela atualizada incrementalmente (append / CDC), processando apenas dados novos
- Materialized View: tabela recalculada completamente a cada execução (como uma view materializada)
- View: view temporária dentro do pipeline (não persiste como tabela)
- Expectations: regras de qualidade declarativas aplicadas durante a execução
Estrutura básica de um pipeline LDP
# src/pipelines/bronze_pedidos.py
import dlt
from pyspark.sql import functions as F
# 1. Ingestão Auto Loader → Streaming Table (Bronze)
@dlt.table(
name="bronze_pedidos_raw",
comment="Ingestão bruta de pedidos via Auto Loader, append-only",
table_properties={"quality": "bronze"},
)
def bronze_pedidos_raw():
return (
spark.readStream
.format("cloudFiles") # Auto Loader
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/Volumes/bronze/schemas/pedidos/")
.load("/Volumes/bronze/landing/pedidos/")
)
# 2. Limpeza → Streaming Table (Silver) com Expectations
@dlt.table(
name="silver_pedidos",
comment="Pedidos limpos e validados, camada Silver",
partition_cols=["ano", "mes"],
)
@dlt.expect_or_drop("pedido_id não nulo", "pedido_id IS NOT NULL")
@dlt.expect_or_drop("valor positivo", "valor_total > 0")
@dlt.expect("status válido", "status IN ('pendente', 'concluido', 'cancelado')")
def silver_pedidos():
return (
dlt.read_stream("bronze_pedidos_raw")
.withColumn("data_pedido", F.to_date("data_pedido_str", "yyyy-MM-dd"))
.withColumn("ano", F.year("data_pedido"))
.withColumn("mes", F.month("data_pedido"))
.select("pedido_id", "cliente_id", "data_pedido", "ano", "mes", "valor_total", "status")
)
# 3. Agregação → Materialized View (Gold)
@dlt.table(
name="gold_kpis_diarios",
comment="KPIs diários de vendas, camada Gold",
)
def gold_kpis_diarios():
return (
dlt.read("silver_pedidos")
.filter(F.col("status") == "concluido")
.groupBy("data_pedido")
.agg(
F.sum("valor_total").alias("receita_total"),
F.count("pedido_id").alias("qtd_pedidos"),
)
)Expectations (qualidade de dados)
Expectations são constraints declaradas com decoradores. O comportamento quando violadas é configurável:
| Decorador | Comportamento ao violar |
|---|---|
@dlt.expect("nome", "condição") | Registra violação nas métricas, mantém a linha |
@dlt.expect_or_drop("nome", "condição") | Remove a linha (quarentena) |
@dlt.expect_or_fail("nome", "condição") | Falha o pipeline (stop on error) |
@dlt.expect_all(dict) | Múltiplas expectations com o mesmo comportamento |
@dlt.expect_all_or_drop(dict) | Múltiplas, remove linhas inválidas |
@dlt.table
@dlt.expect_all_or_drop({
"pedido_id não nulo": "pedido_id IS NOT NULL",
"valor positivo": "valor_total > 0",
"status válido": "status IN ('pendente', 'concluido', 'cancelado')",
"data válida": "data_pedido >= '2020-01-01'",
})
def silver_pedidos():
...Métricas de violação ficam disponíveis no event log da pipeline e no dashboard de qualidade.
Auto Loader
Auto Loader (cloudFiles) é a forma nativa e eficiente de ingerir arquivos novos incrementalmente de object storage.
@dlt.table
def bronze_raw():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json") # csv, parquet, avro, text...
.option("cloudFiles.schemaLocation", "/Volumes/bronze/schemas/raw/")
.option("cloudFiles.inferColumnTypes", "true")
# Opcionalmente, schema explícito:
# .schema(schema_explicito)
.load("/Volumes/bronze/landing/raw/")
)- Detecta arquivos novos sem precisar fazer
LISTcompleto (usa notificações do cloud) - Schema inference automática com evolução de schema
- Checkpointing automático, sem reprocessar arquivos já lidos
Apply Changes (CDC)
Para pipelines CDC (Change Data Capture), LDP tem suporte nativo via apply_changes:
dlt.create_streaming_table("silver_clientes_scd1")
dlt.apply_changes(
target="silver_clientes_scd1",
source="bronze_clientes_cdc", # tabela com eventos CDC
keys=["cliente_id"], # chave de unicidade
sequence_by="timestamp", # campo para ordenar eventos
apply_as_deletes=F.expr("op = 'D'"),
except_column_list=["op", "timestamp"],
stored_as_scd_type=1, # SCD Type 1 (sobrescreve) ou 2 (histórico)
)Com scd_type=2, LDP mantém histórico completo de mudanças com colunas __START_AT e __END_AT.
Modos de execução
| Modo | Quando usar |
|---|---|
| Triggered | Executa uma vez e para, como um job batch agendado |
| Continuous | Fica rodando, processa novos dados com baixa latência (~segundos) |
Configurado no pipeline ou no DAB (continuous: true/false).
Nomenclatura de tabelas
Tabelas LDP vivem no Unity Catalog dentro do catalog.schema configurado no pipeline:
catalog: gold
schema: vendas
→ tabelas: gold.vendas.bronze_pedidos_raw
gold.vendas.silver_pedidos
gold.vendas.gold_kpis_diarios
Referências entre notebooks do mesmo pipeline
# notebook A define bronze_pedidos_raw
# notebook B pode referenciar:
dlt.read("bronze_pedidos_raw") # batch
dlt.read_stream("bronze_pedidos_raw") # streaming
# Tabelas externas (fora do pipeline): acesso normal via Spark
df = spark.table("silver.outros.dim_clientes")Monitoramento e event log
Cada execução gera um event log com métricas detalhadas:
-- Event log gerado automaticamente pelo LDP
SELECT
timestamp,
event_type,
details:flow_progress.metrics.num_output_rows AS linhas_saida,
details:flow_progress.metrics.num_dropped_rows AS linhas_descartadas,
details:flow_progress.data_quality.expectations AS violacoes
FROM event_log("gold.vendas.silver_pedidos")
ORDER BY timestamp DESC;Ver também: databricks | databricks-delta-lake | databricks-jobs | databricks-asset-bundles | databricks-unity-catalog | arquitetura-medalhao