DAG (Directed Acyclic Graph)
O DAG é a unidade central do Airflow. Define o conjunto de tarefas e as dependências entre elas. “Acyclic” significa que não pode haver ciclos: um task não pode depender de si mesmo, direta ou indiretamente.
graph LR A[extrair] --> B[validar] B --> C[transformar] C --> D[carregar_dw] C --> E[carregar_lake] D --> F[notificar] E --> F
Cada execução do DAG é um DagRun, identificado por um run_id e um logical_date (data lógica de processamento, antes chamada de execution_date).
Estados de um DagRun
| Estado | Significado |
|---|---|
running | Pelo menos uma task em execução |
success | Todas as tasks concluídas com sucesso |
failed | Pelo menos uma task falhou e não há mais tentativas |
queued | Aguardando o executor |
Task e TaskInstance
Task é a definição de uma unidade de trabalho dentro do DAG. TaskInstance é a execução concreta de uma Task em um DagRun específico.
Estados de uma TaskInstance
none → scheduled → queued → running → success
→ failed → up_for_retry → queued
→ skipped
Operators
Operators são os blocos de construção das tasks. Cada operator encapsula uma ação específica.
Categorias
| Tipo | Descrição | Exemplos |
|---|---|---|
| Action Operators | Executam uma ação diretamente | PythonOperator, BashOperator, EmailOperator |
| Transfer Operators | Movem dados entre sistemas | GCSToGCSOperator, S3ToRedshiftOperator |
| Sensors | Aguardam uma condição externa | FileSensor, HttpSensor, S3KeySensor |
Operators mais comuns
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator # placeholder / join
from airflow.operators.email import EmailOperator
processar = PythonOperator(
task_id="processar_dados",
python_callable=minha_funcao,
op_kwargs={"data_ref": "{{ ds }}"}, # templating Jinja
)
executar_script = BashOperator(
task_id="executar_script",
bash_command="python /opt/scripts/processar.py --data {{ ds }}",
)Sensors
Sensors são operators especiais que ficam verificando uma condição até ela ser verdadeira (ou atingir timeout).
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.http_sensor import HttpSensor
aguardar_arquivo = FileSensor(
task_id="aguardar_arquivo",
filepath="/data/input/arquivo_{{ ds_nodash }}.csv",
poke_interval=60, # verifica a cada 60s
timeout=3600, # falha após 1h
mode="poke", # mantém worker ocupado
)
# mode="reschedule": libera o worker entre verificações (recomendado para waits longos)
aguardar_api = HttpSensor(
task_id="aguardar_api",
http_conn_id="api_externa",
endpoint="/status",
poke_interval=30,
mode="reschedule",
)Sensores Deferríveis (Airflow 2.2+)
Usam async I/O via Triggerer: não bloqueiam workers durante a espera. Ideal para waits longos.
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor
# AsyncBigQueryTableExistenceSensor é deferrable por padrão no provider
aguardar_tabela = BigQueryTableExistenceSensor(
task_id="aguardar_tabela",
project_id="meu-projeto",
dataset_id="staging",
table_id="pedidos_{{ ds_nodash }}",
deferrable=True,
)Hooks
Hooks são interfaces para sistemas externos (banco de dados, APIs, cloud storage). Os operators usam hooks internamente, mas você pode usá-los diretamente em PythonOperator.
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
def consultar_postgres(**context):
hook = PostgresHook(postgres_conn_id="postgres_dw")
df = hook.get_pandas_df("SELECT * FROM pedidos WHERE data = %s", parameters=["{{ ds }}"])
return df.to_dict()
def upload_gcs(**context):
hook = GCSHook(gcp_conn_id="google_cloud_default")
hook.upload(
bucket_name="meu-bucket",
object_name=f"output/{{ ds }}/resultado.parquet",
filename="/tmp/resultado.parquet",
)Connections
Connections armazenam credenciais e endpoints para sistemas externos. São gerenciadas no banco de metadados e acessíveis via ID.
- Criação: Admin → Connections na UI, ou via CLI:
airflow connections add - Variáveis de ambiente:
AIRFLOW_CONN_{CONN_ID}em formato URI
# Definir connection via variável de ambiente (sem tocar no banco)
export AIRFLOW_CONN_POSTGRES_DW="postgresql://user:senha@host:5432/database"
export AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT="google-cloud-platform://?project=meu-projeto"Variables
Variables são pares chave-valor globais, acessíveis em qualquer DAG ou task.
from airflow.models import Variable
# Leitura simples
bucket = Variable.get("gcs_bucket_nome")
# Com valor padrão e deserialização JSON
config = Variable.get("pipeline_config", deserialize_json=True, default_var={})
data_limite = config.get("data_limite", "2026-01-01")Evite chamar
Variable.get()no nível do módulo (fora de funções/operators). O scheduler parseia os DAGs frequentemente e acessar o banco a cada parse é custoso. Use templates Jinja oudefault_argsquando possível.
XCom (Cross-Communication)
XCom permite que tasks troquem pequenos valores entre si. Os valores são armazenados no metadata DB.
# Task que produz um valor
def extrair(**context):
resultado = {"total": 1500, "status": "ok"}
context["ti"].xcom_push(key="resultado_extracao", value=resultado)
# Task que consome o valor
def processar(**context):
ti = context["ti"]
resultado = ti.xcom_pull(task_ids="extrair", key="resultado_extracao")
print(f"Total: {resultado['total']}")Com a TaskFlow API, o XCom é implícito via retorno de função:
@task
def extrair():
return {"total": 1500, "status": "ok"}
@task
def processar(dados):
print(f"Total: {dados['total']}")
# Na DAG: processar(extrair()) — o XCom é gerenciado automaticamenteXCom não é adequado para DataFrames ou objetos grandes. Use armazenamento externo (GCS, S3, banco) e passe apenas o caminho/referência via XCom.
TaskGroup
TaskGroups agrupam tasks visualmente na UI e permitem organizar DAGs complexas.
from airflow.utils.task_group import TaskGroup
with TaskGroup("ingestao") as grupo_ingestao:
extrair_pedidos = PythonOperator(task_id="extrair_pedidos", ...)
extrair_clientes = PythonOperator(task_id="extrair_clientes", ...)
with TaskGroup("transformacao") as grupo_transformacao:
transformar = PythonOperator(task_id="transformar", ...)
grupo_ingestao >> grupo_transformacaoPools
Pools limitam a execução paralela de tasks, evitando sobrecarga em sistemas externos.
# Criação: Admin → Pools na UI, ou via CLI:
# airflow pools set api_externa 5 "Limite de 5 calls paralelos para a API externa"
consultar_api = PythonOperator(
task_id="consultar_api",
python_callable=chamar_api,
pool="api_externa", # task só executa se houver slot disponível no pool
pool_slots=1,
)Templating Jinja
O Airflow suporta templates Jinja nos parâmetros dos operators. As variáveis mais usadas:
| Variável | Valor |
|---|---|
{{ ds }} | logical_date no formato YYYY-MM-DD |
{{ ds_nodash }} | logical_date no formato YYYYMMDD |
{{ ts }} | Timestamp ISO completo |
{{ dag.dag_id }} | ID do DAG |
{{ run_id }} | ID único do DagRun |
{{ prev_ds }} | ds da execução anterior |
{{ next_ds }} | ds da próxima execução programada |
{{ params.chave }} | Parâmetros passados ao DagRun |
carregar = BigQueryInsertJobOperator(
task_id="carregar",
configuration={
"query": {
"query": """
INSERT INTO `projeto.dataset.tabela`
SELECT * FROM `projeto.staging.pedidos_{{ ds_nodash }}`
""",
"useLegacySql": False,
}
},
)Trigger Rules
Por padrão, uma task só executa quando todas as tasks upstream concluíram com sucesso (all_success). Esse comportamento pode ser alterado:
| Trigger Rule | Executa quando… |
|---|---|
all_success (padrão) | Todas upstream tiveram sucesso |
all_failed | Todas upstream falharam |
all_done | Todas upstream concluíram (qualquer estado) |
one_failed | Pelo menos uma upstream falhou |
one_success | Pelo menos uma upstream teve sucesso |
none_failed | Nenhuma upstream falhou (success ou skipped) |
notificar_falha = PythonOperator(
task_id="notificar_falha",
python_callable=enviar_alerta,
trigger_rule="one_failed", # executa se qualquer task anterior falhou
)Ver também: airflow | airflow-dag-desenvolvimento | airflow-pipelines-batch | airflow-pipelines-streaming | pipeline-de-dados