DAG (Directed Acyclic Graph)

O DAG é a unidade central do Airflow. Define o conjunto de tarefas e as dependências entre elas. “Acyclic” significa que não pode haver ciclos: um task não pode depender de si mesmo, direta ou indiretamente.

graph LR
    A[extrair] --> B[validar]
    B --> C[transformar]
    C --> D[carregar_dw]
    C --> E[carregar_lake]
    D --> F[notificar]
    E --> F

Cada execução do DAG é um DagRun, identificado por um run_id e um logical_date (data lógica de processamento, antes chamada de execution_date).

Estados de um DagRun

EstadoSignificado
runningPelo menos uma task em execução
successTodas as tasks concluídas com sucesso
failedPelo menos uma task falhou e não há mais tentativas
queuedAguardando o executor

Task e TaskInstance

Task é a definição de uma unidade de trabalho dentro do DAG. TaskInstance é a execução concreta de uma Task em um DagRun específico.

Estados de uma TaskInstance

none → scheduled → queued → running → success
                                    → failed → up_for_retry → queued
                                    → skipped

Operators

Operators são os blocos de construção das tasks. Cada operator encapsula uma ação específica.

Categorias

TipoDescriçãoExemplos
Action OperatorsExecutam uma ação diretamentePythonOperator, BashOperator, EmailOperator
Transfer OperatorsMovem dados entre sistemasGCSToGCSOperator, S3ToRedshiftOperator
SensorsAguardam uma condição externaFileSensor, HttpSensor, S3KeySensor

Operators mais comuns

from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator  # placeholder / join
from airflow.operators.email import EmailOperator
 
processar = PythonOperator(
    task_id="processar_dados",
    python_callable=minha_funcao,
    op_kwargs={"data_ref": "{{ ds }}"},  # templating Jinja
)
 
executar_script = BashOperator(
    task_id="executar_script",
    bash_command="python /opt/scripts/processar.py --data {{ ds }}",
)

Sensors

Sensors são operators especiais que ficam verificando uma condição até ela ser verdadeira (ou atingir timeout).

from airflow.sensors.filesystem import FileSensor
from airflow.sensors.http_sensor import HttpSensor
 
aguardar_arquivo = FileSensor(
    task_id="aguardar_arquivo",
    filepath="/data/input/arquivo_{{ ds_nodash }}.csv",
    poke_interval=60,    # verifica a cada 60s
    timeout=3600,        # falha após 1h
    mode="poke",         # mantém worker ocupado
)
 
# mode="reschedule": libera o worker entre verificações (recomendado para waits longos)
aguardar_api = HttpSensor(
    task_id="aguardar_api",
    http_conn_id="api_externa",
    endpoint="/status",
    poke_interval=30,
    mode="reschedule",
)

Sensores Deferríveis (Airflow 2.2+)

Usam async I/O via Triggerer: não bloqueiam workers durante a espera. Ideal para waits longos.

from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor
 
# AsyncBigQueryTableExistenceSensor é deferrable por padrão no provider
aguardar_tabela = BigQueryTableExistenceSensor(
    task_id="aguardar_tabela",
    project_id="meu-projeto",
    dataset_id="staging",
    table_id="pedidos_{{ ds_nodash }}",
    deferrable=True,
)

Hooks

Hooks são interfaces para sistemas externos (banco de dados, APIs, cloud storage). Os operators usam hooks internamente, mas você pode usá-los diretamente em PythonOperator.

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
 
def consultar_postgres(**context):
    hook = PostgresHook(postgres_conn_id="postgres_dw")
    df = hook.get_pandas_df("SELECT * FROM pedidos WHERE data = %s", parameters=["{{ ds }}"])
    return df.to_dict()
 
def upload_gcs(**context):
    hook = GCSHook(gcp_conn_id="google_cloud_default")
    hook.upload(
        bucket_name="meu-bucket",
        object_name=f"output/{{ ds }}/resultado.parquet",
        filename="/tmp/resultado.parquet",
    )

Connections

Connections armazenam credenciais e endpoints para sistemas externos. São gerenciadas no banco de metadados e acessíveis via ID.

  • Criação: Admin → Connections na UI, ou via CLI: airflow connections add
  • Variáveis de ambiente: AIRFLOW_CONN_{CONN_ID} em formato URI
# Definir connection via variável de ambiente (sem tocar no banco)
export AIRFLOW_CONN_POSTGRES_DW="postgresql://user:senha@host:5432/database"
export AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT="google-cloud-platform://?project=meu-projeto"

Variables

Variables são pares chave-valor globais, acessíveis em qualquer DAG ou task.

from airflow.models import Variable
 
# Leitura simples
bucket = Variable.get("gcs_bucket_nome")
 
# Com valor padrão e deserialização JSON
config = Variable.get("pipeline_config", deserialize_json=True, default_var={})
data_limite = config.get("data_limite", "2026-01-01")

Evite chamar Variable.get() no nível do módulo (fora de funções/operators). O scheduler parseia os DAGs frequentemente e acessar o banco a cada parse é custoso. Use templates Jinja ou default_args quando possível.

XCom (Cross-Communication)

XCom permite que tasks troquem pequenos valores entre si. Os valores são armazenados no metadata DB.

# Task que produz um valor
def extrair(**context):
    resultado = {"total": 1500, "status": "ok"}
    context["ti"].xcom_push(key="resultado_extracao", value=resultado)
 
# Task que consome o valor
def processar(**context):
    ti = context["ti"]
    resultado = ti.xcom_pull(task_ids="extrair", key="resultado_extracao")
    print(f"Total: {resultado['total']}")

Com a TaskFlow API, o XCom é implícito via retorno de função:

@task
def extrair():
    return {"total": 1500, "status": "ok"}
 
@task
def processar(dados):
    print(f"Total: {dados['total']}")
 
# Na DAG: processar(extrair()) — o XCom é gerenciado automaticamente

XCom não é adequado para DataFrames ou objetos grandes. Use armazenamento externo (GCS, S3, banco) e passe apenas o caminho/referência via XCom.

TaskGroup

TaskGroups agrupam tasks visualmente na UI e permitem organizar DAGs complexas.

from airflow.utils.task_group import TaskGroup
 
with TaskGroup("ingestao") as grupo_ingestao:
    extrair_pedidos = PythonOperator(task_id="extrair_pedidos", ...)
    extrair_clientes = PythonOperator(task_id="extrair_clientes", ...)
 
with TaskGroup("transformacao") as grupo_transformacao:
    transformar = PythonOperator(task_id="transformar", ...)
 
grupo_ingestao >> grupo_transformacao

Pools

Pools limitam a execução paralela de tasks, evitando sobrecarga em sistemas externos.

# Criação: Admin → Pools na UI, ou via CLI:
# airflow pools set api_externa 5 "Limite de 5 calls paralelos para a API externa"
 
consultar_api = PythonOperator(
    task_id="consultar_api",
    python_callable=chamar_api,
    pool="api_externa",  # task só executa se houver slot disponível no pool
    pool_slots=1,
)

Templating Jinja

O Airflow suporta templates Jinja nos parâmetros dos operators. As variáveis mais usadas:

VariávelValor
{{ ds }}logical_date no formato YYYY-MM-DD
{{ ds_nodash }}logical_date no formato YYYYMMDD
{{ ts }}Timestamp ISO completo
{{ dag.dag_id }}ID do DAG
{{ run_id }}ID único do DagRun
{{ prev_ds }}ds da execução anterior
{{ next_ds }}ds da próxima execução programada
{{ params.chave }}Parâmetros passados ao DagRun
carregar = BigQueryInsertJobOperator(
    task_id="carregar",
    configuration={
        "query": {
            "query": """
                INSERT INTO `projeto.dataset.tabela`
                SELECT * FROM `projeto.staging.pedidos_{{ ds_nodash }}`
            """,
            "useLegacySql": False,
        }
    },
)

Trigger Rules

Por padrão, uma task só executa quando todas as tasks upstream concluíram com sucesso (all_success). Esse comportamento pode ser alterado:

Trigger RuleExecuta quando…
all_success (padrão)Todas upstream tiveram sucesso
all_failedTodas upstream falharam
all_doneTodas upstream concluíram (qualquer estado)
one_failedPelo menos uma upstream falhou
one_successPelo menos uma upstream teve sucesso
none_failedNenhuma upstream falhou (success ou skipped)
notificar_falha = PythonOperator(
    task_id="notificar_falha",
    python_callable=enviar_alerta,
    trigger_rule="one_failed",  # executa se qualquer task anterior falhou
)

Ver também: airflow | airflow-dag-desenvolvimento | airflow-pipelines-batch | airflow-pipelines-streaming | pipeline-de-dados