Pipelines batch processam conjuntos de dados em intervalos definidos. O Airflow é otimizado para esse padrão: cada DagRun processa uma janela de tempo específica (o ds), garantindo idempotência e rastreabilidade.

Pipeline 1: Ingestão incremental de API para Data Lake

Busca dados de uma API REST e salva no GCS/S3 particionado por data.

from airflow.decorators import dag, task
from datetime import datetime, timedelta
import requests, json
 
@dag(
    dag_id="ingestao_api_pedidos",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=True,   # True para backfill histórico
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=10),
        "retry_exponential_backoff": True,
    },
    tags=["ingestao", "api", "pedidos"],
)
def ingestao_api_pedidos():
 
    @task
    def extrair_pedidos(data_ref: str) -> str:
        """Busca pedidos da API para a data de referência."""
        response = requests.get(
            "https://api.empresa.com/v1/pedidos",
            params={"data": data_ref, "pagina": 1, "limite": 10000},
            headers={"Authorization": "Bearer {{ var.value.api_token }}"},
            timeout=30,
        )
        response.raise_for_status()
 
        caminho_local = f"/tmp/pedidos_{data_ref.replace('-', '')}.json"
        with open(caminho_local, "w") as f:
            json.dump(response.json(), f)
 
        return caminho_local
 
    @task
    def validar_e_transformar(caminho_local: str, data_ref: str) -> str:
        import pandas as pd
 
        df = pd.read_json(caminho_local)
        assert len(df) > 0, f"Nenhum registro encontrado para {data_ref}"
        assert "pedido_id" in df.columns, "Coluna pedido_id ausente"
 
        df["data_carga"] = datetime.utcnow()
        df["data_ref"] = data_ref
 
        caminho_parquet = f"/tmp/pedidos_{data_ref.replace('-', '')}.parquet"
        df.to_parquet(caminho_parquet, index=False)
        return caminho_parquet
 
    @task
    def carregar_gcs(caminho_parquet: str, data_ref: str) -> None:
        from airflow.providers.google.cloud.hooks.gcs import GCSHook
 
        hook = GCSHook(gcp_conn_id="google_cloud_default")
        data_formatada = data_ref.replace("-", "")
        hook.upload(
            bucket_name="data-lake-prod",
            object_name=f"bronze/pedidos/dt={data_formatada}/pedidos.parquet",
            filename=caminho_parquet,
            mime_type="application/octet-stream",
        )
 
    @task
    def limpar_temporarios(caminhos: list[str]) -> None:
        import os
        for caminho in caminhos:
            if os.path.exists(caminho):
                os.remove(caminho)
 
    data_ref = "{{ ds }}"
    arquivo_json = extrair_pedidos(data_ref=data_ref)
    arquivo_parquet = validar_e_transformar(caminho_local=arquivo_json, data_ref=data_ref)
    carregar_gcs(caminho_parquet=arquivo_parquet, data_ref=data_ref)
    limpar_temporarios(caminhos=[arquivo_json, arquivo_parquet])
 
ingestao_api_pedidos()

Pipeline 2: ETL completo com arquitetura medalhão (BigQuery)

Processa dados da camada Bronze para Silver e Gold no BigQuery.

from airflow.decorators import dag, task
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
 
PROJECT = "meu-projeto-gcp"
DATASET_BRONZE = "bronze"
DATASET_SILVER = "silver"
DATASET_GOLD = "gold"
 
@dag(
    dag_id="etl_pedidos_medallion",
    start_date=datetime(2026, 1, 1),
    schedule="0 3 * * *",   # todo dia às 03:00 UTC
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
    tags=["etl", "bigquery", "medallion"],
)
def etl_pedidos_medallion():
 
    with TaskGroup("bronze_para_silver") as silver_tasks:
 
        limpar_silver = BigQueryInsertJobOperator(
            task_id="limpar_silver_pedidos",
            configuration={
                "query": {
                    "query": f"""
                        DELETE FROM `{PROJECT}.{DATASET_SILVER}.pedidos`
                        WHERE DATE(data_pedido) = '{{{{ ds }}}}'
                    """,
                    "useLegacySql": False,
                }
            },
        )
 
        transformar_silver = BigQueryInsertJobOperator(
            task_id="transformar_silver_pedidos",
            configuration={
                "query": {
                    "query": f"""
                        INSERT INTO `{PROJECT}.{DATASET_SILVER}.pedidos`
                        SELECT
                            pedido_id,
                            cliente_id,
                            CAST(valor_total AS NUMERIC)       AS valor_total,
                            LOWER(TRIM(status))                AS status,
                            TIMESTAMP(data_pedido)             AS data_pedido,
                            DATE(data_pedido)                  AS data_ref,
                            CURRENT_TIMESTAMP()                AS atualizado_em
                        FROM `{PROJECT}.{DATASET_BRONZE}.pedidos`
                        WHERE DATE(_PARTITIONTIME) = '{{{{ ds }}}}'
                          AND pedido_id IS NOT NULL
                          AND valor_total > 0
                    """,
                    "useLegacySql": False,
                }
            },
        )
 
        limpar_silver >> transformar_silver
 
    with TaskGroup("silver_para_gold") as gold_tasks:
 
        calcular_ltv = BigQueryInsertJobOperator(
            task_id="calcular_ltv_clientes",
            configuration={
                "query": {
                    "query": f"""
                        CREATE OR REPLACE TABLE `{PROJECT}.{DATASET_GOLD}.ltv_clientes` AS
                        SELECT
                            cliente_id,
                            COUNT(DISTINCT pedido_id)        AS qtd_pedidos,
                            SUM(valor_total)                 AS ltv_total,
                            AVG(valor_total)                 AS ticket_medio,
                            MIN(data_pedido)                 AS primeira_compra,
                            MAX(data_pedido)                 AS ultima_compra,
                            DATE_DIFF(MAX(data_pedido), MIN(data_pedido), DAY) AS dias_ativo
                        FROM `{PROJECT}.{DATASET_SILVER}.pedidos`
                        WHERE status = 'concluido'
                        GROUP BY 1
                    """,
                    "useLegacySql": False,
                }
            },
        )
 
    @task
    def validar_contagens() -> None:
        from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 
        hook = BigQueryHook(gcp_conn_id="google_cloud_default")
        client = hook.get_client()
 
        query = f"""
            SELECT
                (SELECT COUNT(*) FROM `{PROJECT}.{DATASET_BRONZE}.pedidos`
                 WHERE DATE(_PARTITIONTIME) = '{{{{ ds }}}}') AS bronze_count,
                (SELECT COUNT(*) FROM `{PROJECT}.{DATASET_SILVER}.pedidos`
                 WHERE data_ref = '{{{{ ds }}}}') AS silver_count
        """
        resultado = list(client.query(query))[0]
        bronze = resultado["bronze_count"]
        silver = resultado["silver_count"]
 
        if silver == 0:
            raise ValueError(f"Silver vazio! Bronze tinha {bronze} registros.")
        if silver < bronze * 0.95:
            raise ValueError(f"Perda > 5%: bronze={bronze}, silver={silver}")
 
    silver_tasks >> gold_tasks >> validar_contagens()
 
etl_pedidos_medallion()

Pipeline 3: Spark na arquitetura medalhão (AWS + EMR)

Dispara um job Spark via EMR para processar dados em larga escala.

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime, timedelta
 
EMR_CLUSTER_ID = "j-XXXXXXXXXXXXX"
S3_BUCKET = "data-lake-prod"
S3_SCRIPTS = f"s3://{S3_BUCKET}/scripts/spark"
 
@dag(
    dag_id="spark_emr_pedidos",
    start_date=datetime(2026, 1, 1),
    schedule="0 4 * * *",
    catchup=False,
    default_args={"retries": 1, "retry_delay": timedelta(minutes=15)},
    tags=["spark", "emr", "batch"],
)
def spark_emr_pedidos():
 
    # 1. Aguardar arquivo de ingestão
    aguardar_raw = S3KeySensor(
        task_id="aguardar_raw",
        bucket_name=S3_BUCKET,
        bucket_key="raw/pedidos/{{ ds_nodash }}/_SUCCESS",
        aws_conn_id="aws_default",
        mode="reschedule",
        poke_interval=300,
        timeout=7200,
    )
 
    # 2. Submeter step Spark no EMR
    submeter_spark = EmrAddStepsOperator(
        task_id="submeter_spark",
        job_flow_id=EMR_CLUSTER_ID,
        steps=[
            {
                "Name": "transformar-pedidos-{{ ds }}",
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                        "spark-submit",
                        "--deploy-mode", "cluster",
                        "--driver-memory", "4g",
                        "--executor-memory", "8g",
                        "--executor-cores", "4",
                        "--num-executors", "10",
                        f"{S3_SCRIPTS}/transformar_pedidos.py",
                        "--data-date", "{{ ds }}",
                        "--input-path", f"s3://{S3_BUCKET}/raw/pedidos/{{{{ ds_nodash }}}}/",
                        "--output-path", f"s3://{S3_BUCKET}/silver/pedidos/",
                    ],
                },
            }
        ],
        aws_conn_id="aws_default",
    )
 
    # 3. Aguardar conclusão do step
    aguardar_spark = EmrStepSensor(
        task_id="aguardar_spark",
        job_flow_id=EMR_CLUSTER_ID,
        step_id="{{ task_instance.xcom_pull('submeter_spark')[0] }}",
        aws_conn_id="aws_default",
        mode="reschedule",
        poke_interval=60,
    )
 
    # 4. Validar output
    validar_output = S3KeySensor(
        task_id="validar_output",
        bucket_name=S3_BUCKET,
        bucket_key="silver/pedidos/dt={{ ds_nodash }}/_SUCCESS",
        aws_conn_id="aws_default",
        mode="reschedule",
        poke_interval=60,
        timeout=300,
    )
 
    aguardar_raw >> submeter_spark >> aguardar_spark >> validar_output
 
spark_emr_pedidos()

Pipeline 4: dbt + Airflow (orquestrar modelos dbt)

from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from datetime import datetime
 
DBT_PROJECT_DIR = "/opt/airflow/dbt/projeto"
DBT_PROFILES_DIR = "/opt/airflow/dbt/profiles"
 
@dag(
    dag_id="dbt_transformacoes",
    start_date=datetime(2026, 1, 1),
    schedule="0 5 * * *",
    catchup=False,
    tags=["dbt", "transformacao"],
)
def dbt_transformacoes():
 
    rodar_dbt = BashOperator(
        task_id="rodar_dbt_run",
        bash_command=f"""
            cd {DBT_PROJECT_DIR} && \
            dbt run \
              --profiles-dir {DBT_PROFILES_DIR} \
              --target prod \
              --vars '{{"data_ref": "{{{{ ds }}}}"}}' \
              --select tag:daily
        """,
    )
 
    testar_dbt = BashOperator(
        task_id="testar_dbt_test",
        bash_command=f"""
            cd {DBT_PROJECT_DIR} && \
            dbt test \
              --profiles-dir {DBT_PROFILES_DIR} \
              --target prod \
              --select tag:daily
        """,
    )
 
    rodar_dbt >> testar_dbt
 
dbt_transformacoes()

Pipeline 5: Backfill de múltiplas partições

Usa Dynamic Task Mapping para reprocessar várias datas em um único DagRun.

from airflow.decorators import dag, task
from datetime import datetime, date, timedelta
 
@dag(
    dag_id="backfill_pedidos",
    start_date=datetime(2026, 1, 1),
    schedule=None,   # disparo manual
    catchup=False,
    params={
        "data_inicio": "2026-01-01",
        "data_fim": "2026-01-31",
    },
    tags=["backfill", "utilitario"],
)
def backfill_pedidos():
 
    @task
    def gerar_datas(data_inicio: str, data_fim: str) -> list[str]:
        inicio = date.fromisoformat(data_inicio)
        fim = date.fromisoformat(data_fim)
        datas = []
        atual = inicio
        while atual <= fim:
            datas.append(atual.isoformat())
            atual += timedelta(days=1)
        return datas
 
    @task(max_active_tis_per_dag=5)  # processa até 5 datas em paralelo
    def processar_data(data_ref: str) -> dict:
        print(f"Reprocessando {data_ref}")
        # lógica de reprocessamento aqui
        return {"data": data_ref, "status": "ok"}
 
    @task
    def resumo(resultados: list[dict]) -> None:
        sucesso = sum(1 for r in resultados if r["status"] == "ok")
        print(f"Backfill concluído: {sucesso}/{len(resultados)} datas processadas")
 
    datas = gerar_datas(
        data_inicio="{{ params.data_inicio }}",
        data_fim="{{ params.data_fim }}",
    )
    resultados = processar_data.expand(data_ref=datas)
    resumo(resultados)
 
backfill_pedidos()

Ver também: airflow | airflow-conceitos | airflow-dag-desenvolvimento | airflow-pipelines-streaming | arquitetura-medalhao | pipeline-de-dados | gcp-bigquery | spark | gcp-cloud-composer | airflow-aws-mwaa