O Airflow é fundamentalmente um orquestrador batch: não substitui Apache Kafka ou Spark Structured Streaming para streaming com estado e latência de milissegundos. Porém, suporta padrões event-driven e near-real-time com sensores, Dataset scheduling e micro-batch.
Quando usar Airflow vs. streaming nativo
| Requisito | Airflow | Kafka + Flink/Spark Streaming |
|---|---|---|
| Latência tolerável | Minutos a horas | Milissegundos a segundos |
| Eventos complexos com estado | Limitado | Nativo (CEP, joins temporais) |
| Trigger por chegada de arquivo/evento | Sim (sensores) | — |
| Orquestrar jobs de streaming | Sim | — |
| Reprocessamento de histórico | Excelente | Trabalhoso |
| Visibilidade e retry da lógica | Excelente | Depende da ferramenta |
Padrão 1: Sensor-driven: trigger por chegada de arquivo
DAG que só executa quando o arquivo do dia chega no S3/GCS.
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime, timedelta
@dag(
dag_id="processar_ao_chegar_arquivo",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
default_args={"retries": 1},
tags=["sensor", "event-driven"],
)
def processar_ao_chegar_arquivo():
aguardar_arquivo = S3KeySensor(
task_id="aguardar_arquivo_do_dia",
bucket_name="data-lake-prod",
bucket_key="inbound/pedidos/{{ ds_nodash }}/pedidos.csv",
aws_conn_id="aws_default",
mode="reschedule", # libera worker entre verificações
poke_interval=300, # verifica a cada 5 minutos
timeout=43200, # falha após 12 horas sem o arquivo
soft_fail=False,
)
@task
def processar(data_ref: str) -> None:
print(f"Processando arquivo de {data_ref}")
@task
def notificar_sucesso(data_ref: str) -> None:
print(f"Pipeline de {data_ref} concluído")
data = "{{ ds }}"
aguardar_arquivo >> processar(data_ref=data) >> notificar_sucesso(data_ref=data)
processar_ao_chegar_arquivo()Padrão 2: Micro-batch via schedule frequente
Executa o DAG a cada 5 minutos, processando janelas curtas de dados novos.
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime
@dag(
dag_id="micro_batch_eventos",
start_date=datetime(2026, 1, 1),
schedule="*/5 * * * *", # a cada 5 minutos
catchup=False,
max_active_runs=1, # garante que apenas um run execute por vez
tags=["micro-batch", "near-realtime"],
)
def micro_batch_eventos():
@task
def extrair_novos_eventos() -> list[dict]:
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id="postgres_origem")
# Watermark: busca apenas o que ainda não foi processado
ultimo_id = int(Variable.get("eventos_ultimo_id_processado", default_var=0))
rows = hook.get_records(
f"""
SELECT id, evento_tipo, payload, criado_em
FROM eventos
WHERE id > {ultimo_id}
ORDER BY id
LIMIT 10000
"""
)
return [{"id": r[0], "tipo": r[1], "payload": r[2]} for r in rows]
@task
def processar_eventos(eventos: list[dict]) -> int:
if not eventos:
print("Nenhum evento novo")
return 0
print(f"Processando {len(eventos)} eventos")
# lógica de transformação
ultimo_id = max(e["id"] for e in eventos)
return ultimo_id
@task
def atualizar_watermark(ultimo_id: int) -> None:
if ultimo_id > 0:
Variable.set("eventos_ultimo_id_processado", str(ultimo_id))
print(f"Watermark atualizado para id={ultimo_id}")
eventos = extrair_novos_eventos()
ultimo_id = processar_eventos(eventos)
atualizar_watermark(ultimo_id)
micro_batch_eventos()Use
max_active_runs=1para evitar que dois runs do mesmo DAG processem o mesmo lote simultaneamente quando o schedule é muito frequente.
Padrão 3: Dataset Scheduling (Airflow 2.4+)
Encadeia DAGs automaticamente quando dados são produzidos, sem polling.
graph LR DAG_A[ingerir_pedidos] -->|produz| DS[(dataset:\nbronze/pedidos)] DS -->|dispara| DAG_B[transformar_pedidos] DAG_B -->|produz| DS2[(dataset:\nsilver/pedidos)] DS2 -->|dispara| DAG_C[calcular_metricas]
from airflow import Dataset
from airflow.decorators import dag, task
from datetime import datetime
# Definir datasets como objetos reutilizáveis
DATASET_BRONZE_PEDIDOS = Dataset("s3://data-lake/bronze/pedidos/")
DATASET_SILVER_PEDIDOS = Dataset("s3://data-lake/silver/pedidos/")
# DAG produtora — roda diariamente
@dag(
schedule="@daily",
start_date=datetime(2026, 1, 1),
catchup=False,
)
def ingerir_pedidos():
@task(outlets=[DATASET_BRONZE_PEDIDOS]) # sinaliza produção do dataset
def ingerir() -> None:
print("Ingerindo pedidos para bronze")
ingerir()
ingerir_pedidos()
# DAG consumidora — dispara automaticamente quando bronze é atualizado
@dag(
schedule=[DATASET_BRONZE_PEDIDOS], # trigger = dataset produzido
start_date=datetime(2026, 1, 1),
catchup=False,
)
def transformar_pedidos():
@task(outlets=[DATASET_SILVER_PEDIDOS])
def transformar() -> None:
print("Transformando bronze → silver")
transformar()
transformar_pedidos()
# Terceiro nível — dispara quando silver estiver pronto
@dag(
schedule=[DATASET_SILVER_PEDIDOS],
start_date=datetime(2026, 1, 1),
catchup=False,
)
def calcular_metricas():
@task
def calcular() -> None:
print("Calculando métricas de negócio no gold")
calcular()
calcular_metricas()Padrão 4: Consumidor Kafka (micro-batch via sensor customizado)
Airflow não é um consumer Kafka nativo, mas pode disparar processamento baseado em offsets ou contagem de mensagens.
from airflow.decorators import dag, task
from airflow.sensors.base import BaseSensorOperator
from datetime import datetime, timedelta
class KafkaTopicSensor(BaseSensorOperator):
"""Aguarda mensagens acima de um threshold no tópico Kafka."""
def __init__(self, topic: str, min_mensagens: int = 1000, **kwargs):
super().__init__(**kwargs)
self.topic = topic
self.min_mensagens = min_mensagens
def poke(self, context):
from confluent_kafka.admin import AdminClient
admin = AdminClient({"bootstrap.servers": "kafka:9092"})
metadata = admin.list_topics(topic=self.topic, timeout=10)
partitions = metadata.topics[self.topic].partitions
total_msgs = sum(
admin.list_offsets({...}).result().get(tp, {}).get("offset", 0)
for tp in partitions
)
self.log.info(f"Mensagens no tópico {self.topic}: {total_msgs}")
return total_msgs >= self.min_mensagens
@dag(
dag_id="processar_kafka_acumulado",
start_date=datetime(2026, 1, 1),
schedule="*/15 * * * *", # verifica a cada 15 minutos
catchup=False,
max_active_runs=1,
tags=["kafka", "micro-batch"],
)
def processar_kafka_acumulado():
aguardar_mensagens = KafkaTopicSensor(
task_id="aguardar_mensagens_kafka",
topic="eventos-pedidos",
min_mensagens=500,
mode="reschedule",
poke_interval=60,
timeout=900,
)
@task
def consumir_e_processar() -> None:
from confluent_kafka import Consumer
consumer = Consumer({
"bootstrap.servers": "kafka:9092",
"group.id": "airflow-batch-consumer",
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
})
consumer.subscribe(["eventos-pedidos"])
mensagens = []
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
break
mensagens.append(msg.value().decode())
# processar lote
print(f"Processados {len(mensagens)} eventos")
consumer.commit()
consumer.close()
aguardar_mensagens >> consumir_e_processar()
processar_kafka_acumulado()Padrão 5: Orquestrar job de Spark Streaming
O Airflow não executa o job de streaming em si, mas pode iniciar, monitorar e reiniciar jobs de Spark Structured Streaming rodando no Databricks ou EMR.
from airflow.decorators import dag, task
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from airflow.providers.databricks.sensors.databricks import DatabricksJobRunSensor
from datetime import datetime
@dag(
dag_id="gerenciar_streaming_pedidos",
start_date=datetime(2026, 1, 1),
schedule="@once", # disparo manual para iniciar
catchup=False,
tags=["databricks", "streaming"],
)
def gerenciar_streaming_pedidos():
iniciar_job_streaming = DatabricksRunNowOperator(
task_id="iniciar_streaming",
databricks_conn_id="databricks_default",
job_id=12345, # ID do job Databricks de streaming
notebook_params={
"kafka_bootstrap": "kafka.empresa.com:9092",
"topico_entrada": "eventos-pedidos",
"checkpoint_path": "s3://data-lake/checkpoints/streaming-pedidos/",
"output_path": "s3://data-lake/silver/pedidos-streaming/",
},
)
monitorar = DatabricksJobRunSensor(
task_id="monitorar_streaming",
databricks_conn_id="databricks_default",
run_id="{{ task_instance.xcom_pull('iniciar_streaming')['run_id'] }}",
poke_interval=300, # verifica a cada 5 minutos
)
iniciar_job_streaming >> monitorar
gerenciar_streaming_pedidos()Padrão 6: Pipeline reativo com múltiplas fontes
Aguarda dados de múltiplas fontes antes de processar (fan-in).
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.empty import EmptyOperator
from datetime import datetime
@dag(
dag_id="pipeline_multifonte",
start_date=datetime(2026, 1, 1),
schedule="0 8 * * *",
catchup=False,
tags=["multi-fonte", "fan-in"],
)
def pipeline_multifonte():
# Aguardar chegada de dados de 3 fontes distintas
aguardar_pedidos = S3KeySensor(
task_id="aguardar_pedidos",
bucket_name="data-lake",
bucket_key="raw/pedidos/{{ ds_nodash }}/_SUCCESS",
mode="reschedule",
poke_interval=120,
timeout=14400,
)
aguardar_clientes = S3KeySensor(
task_id="aguardar_clientes",
bucket_name="data-lake",
bucket_key="raw/clientes/{{ ds_nodash }}/_SUCCESS",
mode="reschedule",
poke_interval=120,
timeout=14400,
)
aguardar_produtos = S3KeySensor(
task_id="aguardar_produtos",
bucket_name="data-lake",
bucket_key="raw/produtos/{{ ds_nodash }}/_SUCCESS",
mode="reschedule",
poke_interval=120,
timeout=14400,
)
# Join: só processa quando as 3 fontes chegarem
todas_fontes_prontas = EmptyOperator(
task_id="todas_fontes_prontas",
trigger_rule="all_success",
)
@task
def processar_enriquecido(data_ref: str) -> None:
print(f"Processando dados enriquecidos de {data_ref}")
@task
def notificar(data_ref: str) -> None:
print(f"Pipeline concluído para {data_ref}")
[aguardar_pedidos, aguardar_clientes, aguardar_produtos] >> todas_fontes_prontas
data = "{{ ds }}"
todas_fontes_prontas >> processar_enriquecido(data_ref=data) >> notificar(data_ref=data)
pipeline_multifonte()Boas práticas para pipelines event-driven
| Prática | Por quê |
|---|---|
Usar mode="reschedule" em sensores | Evita workers ociosos durante waits longos |
Configurar timeout em todos os sensores | Garante que o pipeline falhe em vez de ficar preso indefinidamente |
Usar max_active_runs=1 em micro-batch | Evita dois runs processando o mesmo intervalo simultaneamente |
Usar soft_fail=True em sensores não-críticos | Permite que o pipeline continue mesmo sem o dado opcional |
| Preferir Dataset scheduling a sensores de tempo | Mais explícito sobre dependências de dados entre DAGs |
| Persistir watermarks em Variables ou banco | Garante que micro-batch retome do ponto correto após falha |
Ver também: airflow | airflow-conceitos | airflow-pipelines-batch | airflow-dag-desenvolvimento | pipeline-de-dados | spark-streaming | Apache Kafka | databricks | cdc-change-data-capture