O Airflow é fundamentalmente um orquestrador batch: não substitui Apache Kafka ou Spark Structured Streaming para streaming com estado e latência de milissegundos. Porém, suporta padrões event-driven e near-real-time com sensores, Dataset scheduling e micro-batch.

Quando usar Airflow vs. streaming nativo

RequisitoAirflowKafka + Flink/Spark Streaming
Latência tolerávelMinutos a horasMilissegundos a segundos
Eventos complexos com estadoLimitadoNativo (CEP, joins temporais)
Trigger por chegada de arquivo/eventoSim (sensores)
Orquestrar jobs de streamingSim
Reprocessamento de históricoExcelenteTrabalhoso
Visibilidade e retry da lógicaExcelenteDepende da ferramenta

Padrão 1: Sensor-driven: trigger por chegada de arquivo

DAG que só executa quando o arquivo do dia chega no S3/GCS.

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime, timedelta
 
@dag(
    dag_id="processar_ao_chegar_arquivo",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    default_args={"retries": 1},
    tags=["sensor", "event-driven"],
)
def processar_ao_chegar_arquivo():
 
    aguardar_arquivo = S3KeySensor(
        task_id="aguardar_arquivo_do_dia",
        bucket_name="data-lake-prod",
        bucket_key="inbound/pedidos/{{ ds_nodash }}/pedidos.csv",
        aws_conn_id="aws_default",
        mode="reschedule",       # libera worker entre verificações
        poke_interval=300,       # verifica a cada 5 minutos
        timeout=43200,           # falha após 12 horas sem o arquivo
        soft_fail=False,
    )
 
    @task
    def processar(data_ref: str) -> None:
        print(f"Processando arquivo de {data_ref}")
 
    @task
    def notificar_sucesso(data_ref: str) -> None:
        print(f"Pipeline de {data_ref} concluído")
 
    data = "{{ ds }}"
    aguardar_arquivo >> processar(data_ref=data) >> notificar_sucesso(data_ref=data)
 
processar_ao_chegar_arquivo()

Padrão 2: Micro-batch via schedule frequente

Executa o DAG a cada 5 minutos, processando janelas curtas de dados novos.

from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime
 
@dag(
    dag_id="micro_batch_eventos",
    start_date=datetime(2026, 1, 1),
    schedule="*/5 * * * *",  # a cada 5 minutos
    catchup=False,
    max_active_runs=1,       # garante que apenas um run execute por vez
    tags=["micro-batch", "near-realtime"],
)
def micro_batch_eventos():
 
    @task
    def extrair_novos_eventos() -> list[dict]:
        from airflow.providers.postgres.hooks.postgres import PostgresHook
 
        hook = PostgresHook(postgres_conn_id="postgres_origem")
        # Watermark: busca apenas o que ainda não foi processado
        ultimo_id = int(Variable.get("eventos_ultimo_id_processado", default_var=0))
 
        rows = hook.get_records(
            f"""
            SELECT id, evento_tipo, payload, criado_em
            FROM eventos
            WHERE id > {ultimo_id}
            ORDER BY id
            LIMIT 10000
            """
        )
        return [{"id": r[0], "tipo": r[1], "payload": r[2]} for r in rows]
 
    @task
    def processar_eventos(eventos: list[dict]) -> int:
        if not eventos:
            print("Nenhum evento novo")
            return 0
 
        print(f"Processando {len(eventos)} eventos")
        # lógica de transformação
        ultimo_id = max(e["id"] for e in eventos)
        return ultimo_id
 
    @task
    def atualizar_watermark(ultimo_id: int) -> None:
        if ultimo_id > 0:
            Variable.set("eventos_ultimo_id_processado", str(ultimo_id))
            print(f"Watermark atualizado para id={ultimo_id}")
 
    eventos = extrair_novos_eventos()
    ultimo_id = processar_eventos(eventos)
    atualizar_watermark(ultimo_id)
 
micro_batch_eventos()

Use max_active_runs=1 para evitar que dois runs do mesmo DAG processem o mesmo lote simultaneamente quando o schedule é muito frequente.

Padrão 3: Dataset Scheduling (Airflow 2.4+)

Encadeia DAGs automaticamente quando dados são produzidos, sem polling.

graph LR
    DAG_A[ingerir_pedidos] -->|produz| DS[(dataset:\nbronze/pedidos)]
    DS -->|dispara| DAG_B[transformar_pedidos]
    DAG_B -->|produz| DS2[(dataset:\nsilver/pedidos)]
    DS2 -->|dispara| DAG_C[calcular_metricas]
from airflow import Dataset
from airflow.decorators import dag, task
from datetime import datetime
 
# Definir datasets como objetos reutilizáveis
DATASET_BRONZE_PEDIDOS = Dataset("s3://data-lake/bronze/pedidos/")
DATASET_SILVER_PEDIDOS = Dataset("s3://data-lake/silver/pedidos/")
 
# DAG produtora — roda diariamente
@dag(
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def ingerir_pedidos():
 
    @task(outlets=[DATASET_BRONZE_PEDIDOS])  # sinaliza produção do dataset
    def ingerir() -> None:
        print("Ingerindo pedidos para bronze")
 
    ingerir()
 
ingerir_pedidos()
 
# DAG consumidora — dispara automaticamente quando bronze é atualizado
@dag(
    schedule=[DATASET_BRONZE_PEDIDOS],  # trigger = dataset produzido
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def transformar_pedidos():
 
    @task(outlets=[DATASET_SILVER_PEDIDOS])
    def transformar() -> None:
        print("Transformando bronze → silver")
 
    transformar()
 
transformar_pedidos()
 
# Terceiro nível — dispara quando silver estiver pronto
@dag(
    schedule=[DATASET_SILVER_PEDIDOS],
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def calcular_metricas():
 
    @task
    def calcular() -> None:
        print("Calculando métricas de negócio no gold")
 
    calcular()
 
calcular_metricas()

Padrão 4: Consumidor Kafka (micro-batch via sensor customizado)

Airflow não é um consumer Kafka nativo, mas pode disparar processamento baseado em offsets ou contagem de mensagens.

from airflow.decorators import dag, task
from airflow.sensors.base import BaseSensorOperator
from datetime import datetime, timedelta
 
class KafkaTopicSensor(BaseSensorOperator):
    """Aguarda mensagens acima de um threshold no tópico Kafka."""
 
    def __init__(self, topic: str, min_mensagens: int = 1000, **kwargs):
        super().__init__(**kwargs)
        self.topic = topic
        self.min_mensagens = min_mensagens
 
    def poke(self, context):
        from confluent_kafka.admin import AdminClient
 
        admin = AdminClient({"bootstrap.servers": "kafka:9092"})
        metadata = admin.list_topics(topic=self.topic, timeout=10)
        partitions = metadata.topics[self.topic].partitions
 
        total_msgs = sum(
            admin.list_offsets({...}).result().get(tp, {}).get("offset", 0)
            for tp in partitions
        )
 
        self.log.info(f"Mensagens no tópico {self.topic}: {total_msgs}")
        return total_msgs >= self.min_mensagens
 
@dag(
    dag_id="processar_kafka_acumulado",
    start_date=datetime(2026, 1, 1),
    schedule="*/15 * * * *",   # verifica a cada 15 minutos
    catchup=False,
    max_active_runs=1,
    tags=["kafka", "micro-batch"],
)
def processar_kafka_acumulado():
 
    aguardar_mensagens = KafkaTopicSensor(
        task_id="aguardar_mensagens_kafka",
        topic="eventos-pedidos",
        min_mensagens=500,
        mode="reschedule",
        poke_interval=60,
        timeout=900,
    )
 
    @task
    def consumir_e_processar() -> None:
        from confluent_kafka import Consumer
 
        consumer = Consumer({
            "bootstrap.servers": "kafka:9092",
            "group.id": "airflow-batch-consumer",
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
        })
        consumer.subscribe(["eventos-pedidos"])
 
        mensagens = []
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                break
            mensagens.append(msg.value().decode())
 
        # processar lote
        print(f"Processados {len(mensagens)} eventos")
        consumer.commit()
        consumer.close()
 
    aguardar_mensagens >> consumir_e_processar()
 
processar_kafka_acumulado()

Padrão 5: Orquestrar job de Spark Streaming

O Airflow não executa o job de streaming em si, mas pode iniciar, monitorar e reiniciar jobs de Spark Structured Streaming rodando no Databricks ou EMR.

from airflow.decorators import dag, task
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from airflow.providers.databricks.sensors.databricks import DatabricksJobRunSensor
from datetime import datetime
 
@dag(
    dag_id="gerenciar_streaming_pedidos",
    start_date=datetime(2026, 1, 1),
    schedule="@once",   # disparo manual para iniciar
    catchup=False,
    tags=["databricks", "streaming"],
)
def gerenciar_streaming_pedidos():
 
    iniciar_job_streaming = DatabricksRunNowOperator(
        task_id="iniciar_streaming",
        databricks_conn_id="databricks_default",
        job_id=12345,   # ID do job Databricks de streaming
        notebook_params={
            "kafka_bootstrap": "kafka.empresa.com:9092",
            "topico_entrada": "eventos-pedidos",
            "checkpoint_path": "s3://data-lake/checkpoints/streaming-pedidos/",
            "output_path": "s3://data-lake/silver/pedidos-streaming/",
        },
    )
 
    monitorar = DatabricksJobRunSensor(
        task_id="monitorar_streaming",
        databricks_conn_id="databricks_default",
        run_id="{{ task_instance.xcom_pull('iniciar_streaming')['run_id'] }}",
        poke_interval=300,  # verifica a cada 5 minutos
    )
 
    iniciar_job_streaming >> monitorar
 
gerenciar_streaming_pedidos()

Padrão 6: Pipeline reativo com múltiplas fontes

Aguarda dados de múltiplas fontes antes de processar (fan-in).

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.empty import EmptyOperator
from datetime import datetime
 
@dag(
    dag_id="pipeline_multifonte",
    start_date=datetime(2026, 1, 1),
    schedule="0 8 * * *",
    catchup=False,
    tags=["multi-fonte", "fan-in"],
)
def pipeline_multifonte():
 
    # Aguardar chegada de dados de 3 fontes distintas
    aguardar_pedidos = S3KeySensor(
        task_id="aguardar_pedidos",
        bucket_name="data-lake",
        bucket_key="raw/pedidos/{{ ds_nodash }}/_SUCCESS",
        mode="reschedule",
        poke_interval=120,
        timeout=14400,
    )
 
    aguardar_clientes = S3KeySensor(
        task_id="aguardar_clientes",
        bucket_name="data-lake",
        bucket_key="raw/clientes/{{ ds_nodash }}/_SUCCESS",
        mode="reschedule",
        poke_interval=120,
        timeout=14400,
    )
 
    aguardar_produtos = S3KeySensor(
        task_id="aguardar_produtos",
        bucket_name="data-lake",
        bucket_key="raw/produtos/{{ ds_nodash }}/_SUCCESS",
        mode="reschedule",
        poke_interval=120,
        timeout=14400,
    )
 
    # Join: só processa quando as 3 fontes chegarem
    todas_fontes_prontas = EmptyOperator(
        task_id="todas_fontes_prontas",
        trigger_rule="all_success",
    )
 
    @task
    def processar_enriquecido(data_ref: str) -> None:
        print(f"Processando dados enriquecidos de {data_ref}")
 
    @task
    def notificar(data_ref: str) -> None:
        print(f"Pipeline concluído para {data_ref}")
 
    [aguardar_pedidos, aguardar_clientes, aguardar_produtos] >> todas_fontes_prontas
    data = "{{ ds }}"
    todas_fontes_prontas >> processar_enriquecido(data_ref=data) >> notificar(data_ref=data)
 
pipeline_multifonte()

Boas práticas para pipelines event-driven

PráticaPor quê
Usar mode="reschedule" em sensoresEvita workers ociosos durante waits longos
Configurar timeout em todos os sensoresGarante que o pipeline falhe em vez de ficar preso indefinidamente
Usar max_active_runs=1 em micro-batchEvita dois runs processando o mesmo intervalo simultaneamente
Usar soft_fail=True em sensores não-críticosPermite que o pipeline continue mesmo sem o dado opcional
Preferir Dataset scheduling a sensores de tempoMais explícito sobre dependências de dados entre DAGs
Persistir watermarks em Variables ou bancoGarante que micro-batch retome do ponto correto após falha

Ver também: airflow | airflow-conceitos | airflow-pipelines-batch | airflow-dag-desenvolvimento | pipeline-de-dados | spark-streaming | Apache Kafka | databricks | cdc-change-data-capture