Pipelines batch processam conjuntos de dados em intervalos definidos. O Airflow é otimizado para esse padrão: cada DagRun processa uma janela de tempo específica (o ds), garantindo idempotência e rastreabilidade.
Pipeline 1: Ingestão incremental de API para Data Lake
Busca dados de uma API REST e salva no GCS/S3 particionado por data.
from airflow.decorators import dag, task
from datetime import datetime, timedelta
import requests, json
@dag(
dag_id="ingestao_api_pedidos",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=True, # True para backfill histórico
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=10),
"retry_exponential_backoff": True,
},
tags=["ingestao", "api", "pedidos"],
)
def ingestao_api_pedidos():
@task
def extrair_pedidos(data_ref: str) -> str:
"""Busca pedidos da API para a data de referência."""
response = requests.get(
"https://api.empresa.com/v1/pedidos",
params={"data": data_ref, "pagina": 1, "limite": 10000},
headers={"Authorization": "Bearer {{ var.value.api_token }}"},
timeout=30,
)
response.raise_for_status()
caminho_local = f"/tmp/pedidos_{data_ref.replace('-', '')}.json"
with open(caminho_local, "w") as f:
json.dump(response.json(), f)
return caminho_local
@task
def validar_e_transformar(caminho_local: str, data_ref: str) -> str:
import pandas as pd
df = pd.read_json(caminho_local)
assert len(df) > 0, f"Nenhum registro encontrado para {data_ref}"
assert "pedido_id" in df.columns, "Coluna pedido_id ausente"
df["data_carga"] = datetime.utcnow()
df["data_ref"] = data_ref
caminho_parquet = f"/tmp/pedidos_{data_ref.replace('-', '')}.parquet"
df.to_parquet(caminho_parquet, index=False)
return caminho_parquet
@task
def carregar_gcs(caminho_parquet: str, data_ref: str) -> None:
from airflow.providers.google.cloud.hooks.gcs import GCSHook
hook = GCSHook(gcp_conn_id="google_cloud_default")
data_formatada = data_ref.replace("-", "")
hook.upload(
bucket_name="data-lake-prod",
object_name=f"bronze/pedidos/dt={data_formatada}/pedidos.parquet",
filename=caminho_parquet,
mime_type="application/octet-stream",
)
@task
def limpar_temporarios(caminhos: list[str]) -> None:
import os
for caminho in caminhos:
if os.path.exists(caminho):
os.remove(caminho)
data_ref = "{{ ds }}"
arquivo_json = extrair_pedidos(data_ref=data_ref)
arquivo_parquet = validar_e_transformar(caminho_local=arquivo_json, data_ref=data_ref)
carregar_gcs(caminho_parquet=arquivo_parquet, data_ref=data_ref)
limpar_temporarios(caminhos=[arquivo_json, arquivo_parquet])
ingestao_api_pedidos()Pipeline 2: ETL completo com arquitetura medalhão (BigQuery)
Processa dados da camada Bronze para Silver e Gold no BigQuery.
from airflow.decorators import dag, task
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
PROJECT = "meu-projeto-gcp"
DATASET_BRONZE = "bronze"
DATASET_SILVER = "silver"
DATASET_GOLD = "gold"
@dag(
dag_id="etl_pedidos_medallion",
start_date=datetime(2026, 1, 1),
schedule="0 3 * * *", # todo dia às 03:00 UTC
catchup=False,
default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
tags=["etl", "bigquery", "medallion"],
)
def etl_pedidos_medallion():
with TaskGroup("bronze_para_silver") as silver_tasks:
limpar_silver = BigQueryInsertJobOperator(
task_id="limpar_silver_pedidos",
configuration={
"query": {
"query": f"""
DELETE FROM `{PROJECT}.{DATASET_SILVER}.pedidos`
WHERE DATE(data_pedido) = '{{{{ ds }}}}'
""",
"useLegacySql": False,
}
},
)
transformar_silver = BigQueryInsertJobOperator(
task_id="transformar_silver_pedidos",
configuration={
"query": {
"query": f"""
INSERT INTO `{PROJECT}.{DATASET_SILVER}.pedidos`
SELECT
pedido_id,
cliente_id,
CAST(valor_total AS NUMERIC) AS valor_total,
LOWER(TRIM(status)) AS status,
TIMESTAMP(data_pedido) AS data_pedido,
DATE(data_pedido) AS data_ref,
CURRENT_TIMESTAMP() AS atualizado_em
FROM `{PROJECT}.{DATASET_BRONZE}.pedidos`
WHERE DATE(_PARTITIONTIME) = '{{{{ ds }}}}'
AND pedido_id IS NOT NULL
AND valor_total > 0
""",
"useLegacySql": False,
}
},
)
limpar_silver >> transformar_silver
with TaskGroup("silver_para_gold") as gold_tasks:
calcular_ltv = BigQueryInsertJobOperator(
task_id="calcular_ltv_clientes",
configuration={
"query": {
"query": f"""
CREATE OR REPLACE TABLE `{PROJECT}.{DATASET_GOLD}.ltv_clientes` AS
SELECT
cliente_id,
COUNT(DISTINCT pedido_id) AS qtd_pedidos,
SUM(valor_total) AS ltv_total,
AVG(valor_total) AS ticket_medio,
MIN(data_pedido) AS primeira_compra,
MAX(data_pedido) AS ultima_compra,
DATE_DIFF(MAX(data_pedido), MIN(data_pedido), DAY) AS dias_ativo
FROM `{PROJECT}.{DATASET_SILVER}.pedidos`
WHERE status = 'concluido'
GROUP BY 1
""",
"useLegacySql": False,
}
},
)
@task
def validar_contagens() -> None:
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
hook = BigQueryHook(gcp_conn_id="google_cloud_default")
client = hook.get_client()
query = f"""
SELECT
(SELECT COUNT(*) FROM `{PROJECT}.{DATASET_BRONZE}.pedidos`
WHERE DATE(_PARTITIONTIME) = '{{{{ ds }}}}') AS bronze_count,
(SELECT COUNT(*) FROM `{PROJECT}.{DATASET_SILVER}.pedidos`
WHERE data_ref = '{{{{ ds }}}}') AS silver_count
"""
resultado = list(client.query(query))[0]
bronze = resultado["bronze_count"]
silver = resultado["silver_count"]
if silver == 0:
raise ValueError(f"Silver vazio! Bronze tinha {bronze} registros.")
if silver < bronze * 0.95:
raise ValueError(f"Perda > 5%: bronze={bronze}, silver={silver}")
silver_tasks >> gold_tasks >> validar_contagens()
etl_pedidos_medallion()Pipeline 3: Spark na arquitetura medalhão (AWS + EMR)
Dispara um job Spark via EMR para processar dados em larga escala.
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime, timedelta
EMR_CLUSTER_ID = "j-XXXXXXXXXXXXX"
S3_BUCKET = "data-lake-prod"
S3_SCRIPTS = f"s3://{S3_BUCKET}/scripts/spark"
@dag(
dag_id="spark_emr_pedidos",
start_date=datetime(2026, 1, 1),
schedule="0 4 * * *",
catchup=False,
default_args={"retries": 1, "retry_delay": timedelta(minutes=15)},
tags=["spark", "emr", "batch"],
)
def spark_emr_pedidos():
# 1. Aguardar arquivo de ingestão
aguardar_raw = S3KeySensor(
task_id="aguardar_raw",
bucket_name=S3_BUCKET,
bucket_key="raw/pedidos/{{ ds_nodash }}/_SUCCESS",
aws_conn_id="aws_default",
mode="reschedule",
poke_interval=300,
timeout=7200,
)
# 2. Submeter step Spark no EMR
submeter_spark = EmrAddStepsOperator(
task_id="submeter_spark",
job_flow_id=EMR_CLUSTER_ID,
steps=[
{
"Name": "transformar-pedidos-{{ ds }}",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--deploy-mode", "cluster",
"--driver-memory", "4g",
"--executor-memory", "8g",
"--executor-cores", "4",
"--num-executors", "10",
f"{S3_SCRIPTS}/transformar_pedidos.py",
"--data-date", "{{ ds }}",
"--input-path", f"s3://{S3_BUCKET}/raw/pedidos/{{{{ ds_nodash }}}}/",
"--output-path", f"s3://{S3_BUCKET}/silver/pedidos/",
],
},
}
],
aws_conn_id="aws_default",
)
# 3. Aguardar conclusão do step
aguardar_spark = EmrStepSensor(
task_id="aguardar_spark",
job_flow_id=EMR_CLUSTER_ID,
step_id="{{ task_instance.xcom_pull('submeter_spark')[0] }}",
aws_conn_id="aws_default",
mode="reschedule",
poke_interval=60,
)
# 4. Validar output
validar_output = S3KeySensor(
task_id="validar_output",
bucket_name=S3_BUCKET,
bucket_key="silver/pedidos/dt={{ ds_nodash }}/_SUCCESS",
aws_conn_id="aws_default",
mode="reschedule",
poke_interval=60,
timeout=300,
)
aguardar_raw >> submeter_spark >> aguardar_spark >> validar_output
spark_emr_pedidos()Pipeline 4: dbt + Airflow (orquestrar modelos dbt)
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from datetime import datetime
DBT_PROJECT_DIR = "/opt/airflow/dbt/projeto"
DBT_PROFILES_DIR = "/opt/airflow/dbt/profiles"
@dag(
dag_id="dbt_transformacoes",
start_date=datetime(2026, 1, 1),
schedule="0 5 * * *",
catchup=False,
tags=["dbt", "transformacao"],
)
def dbt_transformacoes():
rodar_dbt = BashOperator(
task_id="rodar_dbt_run",
bash_command=f"""
cd {DBT_PROJECT_DIR} && \
dbt run \
--profiles-dir {DBT_PROFILES_DIR} \
--target prod \
--vars '{{"data_ref": "{{{{ ds }}}}"}}' \
--select tag:daily
""",
)
testar_dbt = BashOperator(
task_id="testar_dbt_test",
bash_command=f"""
cd {DBT_PROJECT_DIR} && \
dbt test \
--profiles-dir {DBT_PROFILES_DIR} \
--target prod \
--select tag:daily
""",
)
rodar_dbt >> testar_dbt
dbt_transformacoes()Pipeline 5: Backfill de múltiplas partições
Usa Dynamic Task Mapping para reprocessar várias datas em um único DagRun.
from airflow.decorators import dag, task
from datetime import datetime, date, timedelta
@dag(
dag_id="backfill_pedidos",
start_date=datetime(2026, 1, 1),
schedule=None, # disparo manual
catchup=False,
params={
"data_inicio": "2026-01-01",
"data_fim": "2026-01-31",
},
tags=["backfill", "utilitario"],
)
def backfill_pedidos():
@task
def gerar_datas(data_inicio: str, data_fim: str) -> list[str]:
inicio = date.fromisoformat(data_inicio)
fim = date.fromisoformat(data_fim)
datas = []
atual = inicio
while atual <= fim:
datas.append(atual.isoformat())
atual += timedelta(days=1)
return datas
@task(max_active_tis_per_dag=5) # processa até 5 datas em paralelo
def processar_data(data_ref: str) -> dict:
print(f"Reprocessando {data_ref}")
# lógica de reprocessamento aqui
return {"data": data_ref, "status": "ok"}
@task
def resumo(resultados: list[dict]) -> None:
sucesso = sum(1 for r in resultados if r["status"] == "ok")
print(f"Backfill concluído: {sucesso}/{len(resultados)} datas processadas")
datas = gerar_datas(
data_inicio="{{ params.data_inicio }}",
data_fim="{{ params.data_fim }}",
)
resultados = processar_data.expand(data_ref=datas)
resumo(resultados)
backfill_pedidos()Ver também: airflow | airflow-conceitos | airflow-dag-desenvolvimento | airflow-pipelines-streaming | arquitetura-medalhao | pipeline-de-dados | gcp-bigquery | spark | gcp-cloud-composer | airflow-aws-mwaa