Estilo clássico vs. TaskFlow API
O Airflow 2.0 introduziu a TaskFlow API como forma pythônica de definir DAGs. Os dois estilos coexistem e podem ser misturados.
Estilo clássico (context manager)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def extrair(**context):
data = context["ds"]
# lógica de extração
return {"registros": 1000}
def carregar(**context):
ti = context["ti"]
resultado = ti.xcom_pull(task_ids="extrair")
print(f"Carregando {resultado['registros']} registros")
with DAG(
dag_id="pipeline_classico",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
default_args={
"retries": 2,
"retry_delay": timedelta(minutes=5),
"owner": "data-eng",
},
tags=["dados", "exemplo"],
) as dag:
t_extrair = PythonOperator(
task_id="extrair",
python_callable=extrair,
)
t_carregar = PythonOperator(
task_id="carregar",
python_callable=carregar,
)
t_extrair >> t_carregarTaskFlow API (recomendado para código Python)
from airflow.decorators import dag, task
from datetime import datetime, timedelta
@dag(
dag_id="pipeline_taskflow",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
tags=["dados", "exemplo"],
)
def pipeline_taskflow():
@task
def extrair(data_ref: str) -> dict:
# lógica de extração
return {"registros": 1000, "data": data_ref}
@task
def transformar(dados: dict) -> list:
return [{"id": i} for i in range(dados["registros"])]
@task
def carregar(registros: list) -> None:
print(f"Carregando {len(registros)} registros")
dados = extrair(data_ref="{{ ds }}")
registros = transformar(dados)
carregar(registros)
pipeline_taskflow() # instancia o DAGO XCom é gerenciado automaticamente: o retorno de cada @task vira input da próxima.
Configuração de agendamento
# Cron expression
schedule="0 6 * * *" # todo dia às 06:00 UTC
schedule="0 6 * * 1" # toda segunda-feira às 06:00 UTC
schedule="0 */4 * * *" # a cada 4 horas
# Atalhos
schedule="@daily" # equivale a "0 0 * * *"
schedule="@hourly"
schedule="@weekly"
schedule="@monthly"
schedule=None # sem agendamento (disparo manual)
# Timedelta (Airflow 2.2+)
from datetime import timedelta
schedule=timedelta(hours=6)
# Dataset scheduling (Airflow 2.4+) — ver seção abaixoDataset Scheduling (data-aware)
DAGs podem ser disparadas automaticamente quando outros DAGs produzem um dataset.
from airflow import Dataset
# DAG produtora
dataset_pedidos = Dataset("s3://lake/bronze/pedidos/")
@dag(schedule="@daily")
def ingerir_pedidos():
@task(outlets=[dataset_pedidos]) # declara que produz o dataset
def carregar():
pass
carregar()
ingerir_pedidos()
# DAG consumidora — dispara automaticamente quando dataset_pedidos for atualizado
@dag(schedule=[dataset_pedidos])
def processar_pedidos():
@task
def transformar():
pass
transformar()
processar_pedidos()Dynamic Task Mapping (Airflow 2.3+)
Cria tasks dinamicamente em tempo de execução, com base em dados do run anterior.
from airflow.decorators import dag, task
@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def pipeline_dinamico():
@task
def listar_arquivos() -> list[str]:
# retorna lista de arquivos a processar
return ["arquivo_A.csv", "arquivo_B.csv", "arquivo_C.csv"]
@task
def processar_arquivo(nome_arquivo: str) -> dict:
print(f"Processando {nome_arquivo}")
return {"arquivo": nome_arquivo, "linhas": 1000}
@task
def consolidar(resultados: list[dict]) -> None:
total = sum(r["linhas"] for r in resultados)
print(f"Total: {total} linhas processadas")
arquivos = listar_arquivos()
# .expand() cria uma TaskInstance por item da lista
resultados = processar_arquivo.expand(nome_arquivo=arquivos)
consolidar(resultados)
pipeline_dinamico()Dependências entre tasks
# Operador >>
t1 >> t2 >> t3
# Múltiplos upstream
[t1, t2] >> t3
# Múltiplos downstream
t1 >> [t2, t3]
# Método set_upstream / set_downstream
t2.set_upstream(t1)
# Fan-out e fan-in
t_inicio >> [t_a, t_b, t_c] >> t_fimParâmetros (Params)
Permitem passar valores ao disparar o DAG manualmente via UI ou API.
from airflow.models.param import Param
@dag(
params={
"data_inicio": Param("2026-01-01", type="string", format="date"),
"modo": Param("incremental", enum=["incremental", "full"]),
"limite": Param(1000, type="integer", minimum=1),
}
)
def pipeline_parametrizado():
@task
def processar(data_inicio: str = "{{ params.data_inicio }}",
modo: str = "{{ params.modo }}"):
print(f"Modo: {modo}, desde: {data_inicio}")
processar()Callbacks e alertas
from airflow.utils.email import send_email
def on_failure_callback(context):
dag_id = context["dag"].dag_id
task_id = context["task_instance"].task_id
erro = context.get("exception")
send_email(
to="[email protected]",
subject=f"[FALHA] {dag_id} / {task_id}",
html_content=f"<p>Task falhou: {erro}</p>",
)
def on_success_callback(context):
print("DAG concluída com sucesso")
with DAG(
dag_id="pipeline_com_alertas",
on_failure_callback=on_failure_callback,
on_success_callback=on_success_callback,
...
) as dag:
...Boas práticas
Estrutura do repositório de DAGs
dags/
├── dag_pedidos.py
├── dag_clientes.py
├── utils/ # helpers compartilhados
│ ├── __init__.py
│ ├── notificacoes.py
│ └── validacoes.py
└── sql/ # queries referenciadas nas tasks
├── pedidos_silver.sql
└── ltv_gold.sql
O que fazer e o que evitar
| Fazer | Evitar |
|---|---|
Definir catchup=False por padrão | Lógica pesada no nível do módulo (executada no parse) |
Usar tags para organizar DAGs na UI | Importar bibliotecas pesadas fora de funções |
Configurar retries e retry_delay | Chamar Variable.get() ou acessar o banco no nível do módulo |
Usar sensores com mode="reschedule" para waits longos | Tasks que duram mais de alguns minutos (particionar em steps menores) |
Nomear tasks de forma descritiva (ingerir_pedidos, não task1) | Hardcodar datas: use {{ ds }} |
| Passar dados grandes via storage externo + XCom com referência | Passar DataFrames via XCom |
Usar TaskGroup para organizar DAGs grandes | DAGs com mais de ~50 tasks sem agrupamento |
| Testar DAGs localmente antes de fazer push | Alterar o start_date de uma DAG ativa (cria confusão de runs) |
Idempotência
Cada task deve poder ser re-executada sem efeitos colaterais duplicados.
@task
def carregar_tabela(data: str) -> None:
# RUIM: INSERT pode duplicar dados
# hook.run("INSERT INTO tabela SELECT ...")
# BOM: MERGE ou TRUNCATE+INSERT garante idempotência
hook.run(f"""
DELETE FROM tabela WHERE data_ref = '{data}';
INSERT INTO tabela SELECT * FROM staging WHERE data_ref = '{data}';
""")Particionamento temporal
Prefira processar janelas de tempo específicas (o ds do run) em vez de processar “todos os dados novos”.
@task
def extrair(data_ref: str) -> None:
query = f"""
SELECT * FROM origem
WHERE DATE(criado_em) = '{data_ref}'
"""
# processa apenas o dia do run — re-executável sem duplicarTestando DAGs localmente
# Testar se o DAG carrega sem erro de parse
python dags/meu_dag.py
# Listar tasks de um DAG
airflow tasks list meu_dag
# Executar uma task específica
airflow tasks test meu_dag nome_da_task 2026-01-01
# Disparar um DagRun manualmente
airflow dags trigger meu_dag --conf '{"modo": "full"}'Ver também: airflow | airflow-conceitos | airflow-deploy-local | airflow-pipelines-batch | airflow-pipelines-streaming | python-engenharia-dados | pipeline-de-dados