Estilo clássico vs. TaskFlow API

O Airflow 2.0 introduziu a TaskFlow API como forma pythônica de definir DAGs. Os dois estilos coexistem e podem ser misturados.

Estilo clássico (context manager)

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
 
def extrair(**context):
    data = context["ds"]
    # lógica de extração
    return {"registros": 1000}
 
def carregar(**context):
    ti = context["ti"]
    resultado = ti.xcom_pull(task_ids="extrair")
    print(f"Carregando {resultado['registros']} registros")
 
with DAG(
    dag_id="pipeline_classico",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        "owner": "data-eng",
    },
    tags=["dados", "exemplo"],
) as dag:
 
    t_extrair = PythonOperator(
        task_id="extrair",
        python_callable=extrair,
    )
 
    t_carregar = PythonOperator(
        task_id="carregar",
        python_callable=carregar,
    )
 
    t_extrair >> t_carregar

TaskFlow API (recomendado para código Python)

from airflow.decorators import dag, task
from datetime import datetime, timedelta
 
@dag(
    dag_id="pipeline_taskflow",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
    tags=["dados", "exemplo"],
)
def pipeline_taskflow():
 
    @task
    def extrair(data_ref: str) -> dict:
        # lógica de extração
        return {"registros": 1000, "data": data_ref}
 
    @task
    def transformar(dados: dict) -> list:
        return [{"id": i} for i in range(dados["registros"])]
 
    @task
    def carregar(registros: list) -> None:
        print(f"Carregando {len(registros)} registros")
 
    dados = extrair(data_ref="{{ ds }}")
    registros = transformar(dados)
    carregar(registros)
 
pipeline_taskflow()  # instancia o DAG

O XCom é gerenciado automaticamente: o retorno de cada @task vira input da próxima.

Configuração de agendamento

# Cron expression
schedule="0 6 * * *"       # todo dia às 06:00 UTC
schedule="0 6 * * 1"       # toda segunda-feira às 06:00 UTC
schedule="0 */4 * * *"     # a cada 4 horas
 
# Atalhos
schedule="@daily"           # equivale a "0 0 * * *"
schedule="@hourly"
schedule="@weekly"
schedule="@monthly"
schedule=None               # sem agendamento (disparo manual)
 
# Timedelta (Airflow 2.2+)
from datetime import timedelta
schedule=timedelta(hours=6)
 
# Dataset scheduling (Airflow 2.4+) — ver seção abaixo

Dataset Scheduling (data-aware)

DAGs podem ser disparadas automaticamente quando outros DAGs produzem um dataset.

from airflow import Dataset
 
# DAG produtora
dataset_pedidos = Dataset("s3://lake/bronze/pedidos/")
 
@dag(schedule="@daily")
def ingerir_pedidos():
    @task(outlets=[dataset_pedidos])  # declara que produz o dataset
    def carregar():
        pass
    carregar()
 
ingerir_pedidos()
 
# DAG consumidora — dispara automaticamente quando dataset_pedidos for atualizado
@dag(schedule=[dataset_pedidos])
def processar_pedidos():
    @task
    def transformar():
        pass
    transformar()
 
processar_pedidos()

Dynamic Task Mapping (Airflow 2.3+)

Cria tasks dinamicamente em tempo de execução, com base em dados do run anterior.

from airflow.decorators import dag, task
 
@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def pipeline_dinamico():
 
    @task
    def listar_arquivos() -> list[str]:
        # retorna lista de arquivos a processar
        return ["arquivo_A.csv", "arquivo_B.csv", "arquivo_C.csv"]
 
    @task
    def processar_arquivo(nome_arquivo: str) -> dict:
        print(f"Processando {nome_arquivo}")
        return {"arquivo": nome_arquivo, "linhas": 1000}
 
    @task
    def consolidar(resultados: list[dict]) -> None:
        total = sum(r["linhas"] for r in resultados)
        print(f"Total: {total} linhas processadas")
 
    arquivos = listar_arquivos()
    # .expand() cria uma TaskInstance por item da lista
    resultados = processar_arquivo.expand(nome_arquivo=arquivos)
    consolidar(resultados)
 
pipeline_dinamico()

Dependências entre tasks

# Operador >>
t1 >> t2 >> t3
 
# Múltiplos upstream
[t1, t2] >> t3
 
# Múltiplos downstream
t1 >> [t2, t3]
 
# Método set_upstream / set_downstream
t2.set_upstream(t1)
 
# Fan-out e fan-in
t_inicio >> [t_a, t_b, t_c] >> t_fim

Parâmetros (Params)

Permitem passar valores ao disparar o DAG manualmente via UI ou API.

from airflow.models.param import Param
 
@dag(
    params={
        "data_inicio": Param("2026-01-01", type="string", format="date"),
        "modo": Param("incremental", enum=["incremental", "full"]),
        "limite": Param(1000, type="integer", minimum=1),
    }
)
def pipeline_parametrizado():
 
    @task
    def processar(data_inicio: str = "{{ params.data_inicio }}",
                  modo: str = "{{ params.modo }}"):
        print(f"Modo: {modo}, desde: {data_inicio}")
 
    processar()

Callbacks e alertas

from airflow.utils.email import send_email
 
def on_failure_callback(context):
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    erro = context.get("exception")
    send_email(
        to="[email protected]",
        subject=f"[FALHA] {dag_id} / {task_id}",
        html_content=f"<p>Task falhou: {erro}</p>",
    )
 
def on_success_callback(context):
    print("DAG concluída com sucesso")
 
with DAG(
    dag_id="pipeline_com_alertas",
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback,
    ...
) as dag:
    ...

Boas práticas

Estrutura do repositório de DAGs

dags/
├── dag_pedidos.py
├── dag_clientes.py
├── utils/           # helpers compartilhados
│   ├── __init__.py
│   ├── notificacoes.py
│   └── validacoes.py
└── sql/             # queries referenciadas nas tasks
    ├── pedidos_silver.sql
    └── ltv_gold.sql

O que fazer e o que evitar

FazerEvitar
Definir catchup=False por padrãoLógica pesada no nível do módulo (executada no parse)
Usar tags para organizar DAGs na UIImportar bibliotecas pesadas fora de funções
Configurar retries e retry_delayChamar Variable.get() ou acessar o banco no nível do módulo
Usar sensores com mode="reschedule" para waits longosTasks que duram mais de alguns minutos (particionar em steps menores)
Nomear tasks de forma descritiva (ingerir_pedidos, não task1)Hardcodar datas: use {{ ds }}
Passar dados grandes via storage externo + XCom com referênciaPassar DataFrames via XCom
Usar TaskGroup para organizar DAGs grandesDAGs com mais de ~50 tasks sem agrupamento
Testar DAGs localmente antes de fazer pushAlterar o start_date de uma DAG ativa (cria confusão de runs)

Idempotência

Cada task deve poder ser re-executada sem efeitos colaterais duplicados.

@task
def carregar_tabela(data: str) -> None:
    # RUIM: INSERT pode duplicar dados
    # hook.run("INSERT INTO tabela SELECT ...")
 
    # BOM: MERGE ou TRUNCATE+INSERT garante idempotência
    hook.run(f"""
        DELETE FROM tabela WHERE data_ref = '{data}';
        INSERT INTO tabela SELECT * FROM staging WHERE data_ref = '{data}';
    """)

Particionamento temporal

Prefira processar janelas de tempo específicas (o ds do run) em vez de processar “todos os dados novos”.

@task
def extrair(data_ref: str) -> None:
    query = f"""
        SELECT * FROM origem
        WHERE DATE(criado_em) = '{data_ref}'
    """
    # processa apenas o dia do run — re-executável sem duplicar

Testando DAGs localmente

# Testar se o DAG carrega sem erro de parse
python dags/meu_dag.py
 
# Listar tasks de um DAG
airflow tasks list meu_dag
 
# Executar uma task específica
airflow tasks test meu_dag nome_da_task 2026-01-01
 
# Disparar um DagRun manualmente
airflow dags trigger meu_dag --conf '{"modo": "full"}'

Ver também: airflow | airflow-conceitos | airflow-deploy-local | airflow-pipelines-batch | airflow-pipelines-streaming | python-engenharia-dados | pipeline-de-dados