Change Data Feed (CDF) é o mecanismo nativo do Delta Lake no Databricks para capturar mudanças (inserts, updates, deletes) em tabelas Delta e expô-las como um stream de eventos. É o equivalente do CDC dentro do Lakehouse.

Enquanto o CDC captura mudanças de bancos de dados externos, o CDF captura mudanças entre camadas do próprio Lakehouse (por exemplo, Silver → Gold) para alimentar processos downstream sem refazer leitura completa.

Habilitando o CDF

-- Na criação da tabela
CREATE TABLE silver.pedidos (
  pedido_id BIGINT,
  cliente_id BIGINT,
  status STRING,
  updated_at TIMESTAMP
)
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
 
-- Em tabela existente
ALTER TABLE silver.pedidos
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
# Via PySpark na criação
spark.sql("""
    CREATE TABLE silver.pedidos ...
    TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")

Quando habilitado, o Delta Lake passa a armazenar os eventos de mudança em _change_data/ dentro do diretório da tabela.

Lendo as mudanças

Batch (leitura pontual)

# A partir de uma versão específica
df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 5)
    .table("silver.pedidos")
)
 
# Entre versões
df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 5)
    .option("endingVersion", 10)
    .table("silver.pedidos")
)
 
# A partir de um timestamp
df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", "2026-05-01 00:00:00")
    .table("silver.pedidos")
)
-- SQL equivalente
SELECT * FROM table_changes('silver.pedidos', 5);
SELECT * FROM table_changes('silver.pedidos', 5, 10);
SELECT * FROM table_changes('silver.pedidos', '2026-05-01');

Streaming (leitura contínua)

df_stream = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "latest")
    .table("silver.pedidos")
)

Colunas de metadado

O CDF adiciona colunas especiais a cada linha retornada:

ColunaTipoDescrição
_change_typeStringTipo da operação
_commit_versionLongVersão do commit Delta
_commit_timestampTimestampTimestamp do commit

Valores de _change_type

ValorSignificado
insertLinha inserida
update_preimageValor anterior ao update
update_postimageValor posterior ao update (novo valor)
deleteLinha deletada
from pyspark.sql import functions as F
 
# Filtrar só os valores atuais (pós-update e inserts)
df_atual = df.filter(
    F.col("_change_type").isin("insert", "update_postimage")
)
 
# Ver só deletes
df_deletes = df.filter(F.col("_change_type") == "delete")
 
# Ver o antes e depois de cada update
df_before = df.filter(F.col("_change_type") == "update_preimage")
df_after  = df.filter(F.col("_change_type") == "update_postimage")

Caso de uso: pipeline incremental Silver → Gold

Sem CDF, o pipeline Gold precisa re-ler toda a Silver ou usar watermarks complexos. Com CDF, lê apenas o delta.

from delta.tables import DeltaTable
from pyspark.sql import functions as F
 
# Ler só o que mudou na Silver desde a última execução
ultima_versao_processada = spark.sql(
    "SELECT max(_commit_version) FROM controle.cdf_checkpoint WHERE tabela = 'silver.pedidos'"
).collect()[0][0] or 0
 
changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", ultima_versao_processada + 1)
    .table("silver.pedidos")
)
 
# Considerar só o estado final de cada registro
novos_estados = (
    changes
    .filter(F.col("_change_type").isin("insert", "update_postimage"))
    .drop("_change_type", "_commit_version", "_commit_timestamp")
)
 
deletes = (
    changes
    .filter(F.col("_change_type") == "delete")
    .select("pedido_id")
)
 
# Aplicar upsert na Gold
gold = DeltaTable.forName(spark, "gold.pedidos_agregado")
 
gold.alias("alvo").merge(
    novos_estados.alias("origem"),
    "alvo.pedido_id = origem.pedido_id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
 
# Remover deletes
gold.delete(F.col("pedido_id").isin([r.pedido_id for r in deletes.collect()]))
 
# Salvar checkpoint
nova_versao = changes.agg(F.max("_commit_version")).collect()[0][0]
spark.sql(f"UPDATE controle.cdf_checkpoint SET max_version = {nova_versao} WHERE tabela = 'silver.pedidos'")

CDF com Lakeflow Declarative Pipelines

No Lakeflow, o APPLY CHANGES INTO usa CDF internamente para processar SCD de forma declarativa:

-- SCD Tipo 1 via CDF
APPLY CHANGES INTO LIVE.silver_clientes
FROM STREAM(LIVE.bronze_clientes)
KEYS (cliente_id)
SEQUENCE BY updated_at;
 
-- SCD Tipo 2 via CDF
APPLY CHANGES INTO LIVE.dim_cliente
FROM STREAM(LIVE.silver_clientes)
KEYS (cliente_id)
SEQUENCE BY updated_at
STORED AS SCD TYPE 2;

Comparação: CDF vs Time Travel vs Full Scan

EstratégiaLê apenas deltaCaptura deletesOverhead storage
Full ScanNãoN/A (tudo)Nenhum
Time TravelNão (lê snapshot)NãoNenhum
CDFSimSim_change_data/

Considerações

  • Storage adicional: o CDF armazena os eventos em _change_data/. VACUUM remove arquivos antigos, mas os eventos dentro da retenção são mantidos.
  • Performance de write: escrita tem overhead mínimo pois o Delta Log já registra as operações.
  • MERGE e CDF: operações MERGE geram update_preimage + update_postimage para cada linha modificada, útil para auditoria e para construir SCD Tipo 2.
  • CDF não retroage: habilitar CDF em uma tabela existente não gera histórico das mudanças anteriores. Só captura a partir do momento em que foi habilitado.

Ver também: databricks-delta-lake | databricks | databricks-lakeflow-pipelines | cdc-change-data-capture | scd-slowly-changing-dimensions | arquitetura-medalhao | pipeline-de-dados