Change Data Feed (CDF) é o mecanismo nativo do Delta Lake no Databricks para capturar mudanças (inserts, updates, deletes) em tabelas Delta e expô-las como um stream de eventos. É o equivalente do CDC dentro do Lakehouse.
Enquanto o CDC captura mudanças de bancos de dados externos, o CDF captura mudanças entre camadas do próprio Lakehouse (por exemplo, Silver → Gold) para alimentar processos downstream sem refazer leitura completa.
Habilitando o CDF
-- Na criação da tabela
CREATE TABLE silver.pedidos (
pedido_id BIGINT,
cliente_id BIGINT,
status STRING,
updated_at TIMESTAMP
)
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
-- Em tabela existente
ALTER TABLE silver.pedidos
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');# Via PySpark na criação
spark.sql("""
CREATE TABLE silver.pedidos ...
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")Quando habilitado, o Delta Lake passa a armazenar os eventos de mudança em _change_data/ dentro do diretório da tabela.
Lendo as mudanças
Batch (leitura pontual)
# A partir de uma versão específica
df = (
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 5)
.table("silver.pedidos")
)
# Entre versões
df = (
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 5)
.option("endingVersion", 10)
.table("silver.pedidos")
)
# A partir de um timestamp
df = (
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2026-05-01 00:00:00")
.table("silver.pedidos")
)-- SQL equivalente
SELECT * FROM table_changes('silver.pedidos', 5);
SELECT * FROM table_changes('silver.pedidos', 5, 10);
SELECT * FROM table_changes('silver.pedidos', '2026-05-01');Streaming (leitura contínua)
df_stream = (
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "latest")
.table("silver.pedidos")
)Colunas de metadado
O CDF adiciona colunas especiais a cada linha retornada:
| Coluna | Tipo | Descrição |
|---|---|---|
_change_type | String | Tipo da operação |
_commit_version | Long | Versão do commit Delta |
_commit_timestamp | Timestamp | Timestamp do commit |
Valores de _change_type
| Valor | Significado |
|---|---|
insert | Linha inserida |
update_preimage | Valor anterior ao update |
update_postimage | Valor posterior ao update (novo valor) |
delete | Linha deletada |
from pyspark.sql import functions as F
# Filtrar só os valores atuais (pós-update e inserts)
df_atual = df.filter(
F.col("_change_type").isin("insert", "update_postimage")
)
# Ver só deletes
df_deletes = df.filter(F.col("_change_type") == "delete")
# Ver o antes e depois de cada update
df_before = df.filter(F.col("_change_type") == "update_preimage")
df_after = df.filter(F.col("_change_type") == "update_postimage")Caso de uso: pipeline incremental Silver → Gold
Sem CDF, o pipeline Gold precisa re-ler toda a Silver ou usar watermarks complexos. Com CDF, lê apenas o delta.
from delta.tables import DeltaTable
from pyspark.sql import functions as F
# Ler só o que mudou na Silver desde a última execução
ultima_versao_processada = spark.sql(
"SELECT max(_commit_version) FROM controle.cdf_checkpoint WHERE tabela = 'silver.pedidos'"
).collect()[0][0] or 0
changes = (
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", ultima_versao_processada + 1)
.table("silver.pedidos")
)
# Considerar só o estado final de cada registro
novos_estados = (
changes
.filter(F.col("_change_type").isin("insert", "update_postimage"))
.drop("_change_type", "_commit_version", "_commit_timestamp")
)
deletes = (
changes
.filter(F.col("_change_type") == "delete")
.select("pedido_id")
)
# Aplicar upsert na Gold
gold = DeltaTable.forName(spark, "gold.pedidos_agregado")
gold.alias("alvo").merge(
novos_estados.alias("origem"),
"alvo.pedido_id = origem.pedido_id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
# Remover deletes
gold.delete(F.col("pedido_id").isin([r.pedido_id for r in deletes.collect()]))
# Salvar checkpoint
nova_versao = changes.agg(F.max("_commit_version")).collect()[0][0]
spark.sql(f"UPDATE controle.cdf_checkpoint SET max_version = {nova_versao} WHERE tabela = 'silver.pedidos'")CDF com Lakeflow Declarative Pipelines
No Lakeflow, o APPLY CHANGES INTO usa CDF internamente para processar SCD de forma declarativa:
-- SCD Tipo 1 via CDF
APPLY CHANGES INTO LIVE.silver_clientes
FROM STREAM(LIVE.bronze_clientes)
KEYS (cliente_id)
SEQUENCE BY updated_at;
-- SCD Tipo 2 via CDF
APPLY CHANGES INTO LIVE.dim_cliente
FROM STREAM(LIVE.silver_clientes)
KEYS (cliente_id)
SEQUENCE BY updated_at
STORED AS SCD TYPE 2;Comparação: CDF vs Time Travel vs Full Scan
| Estratégia | Lê apenas delta | Captura deletes | Overhead storage |
|---|---|---|---|
| Full Scan | Não | N/A (tudo) | Nenhum |
| Time Travel | Não (lê snapshot) | Não | Nenhum |
| CDF | Sim | Sim | _change_data/ |
Considerações
- Storage adicional: o CDF armazena os eventos em
_change_data/. VACUUM remove arquivos antigos, mas os eventos dentro da retenção são mantidos. - Performance de write: escrita tem overhead mínimo pois o Delta Log já registra as operações.
- MERGE e CDF: operações MERGE geram
update_preimage+update_postimagepara cada linha modificada, útil para auditoria e para construir SCD Tipo 2. - CDF não retroage: habilitar CDF em uma tabela existente não gera histórico das mudanças anteriores. Só captura a partir do momento em que foi habilitado.
Ver também: databricks-delta-lake | databricks | databricks-lakeflow-pipelines | cdc-change-data-capture | scd-slowly-changing-dimensions | arquitetura-medalhao | pipeline-de-dados