Cloud Composer é o serviço de orquestração de workflows do GCP baseado em Apache Airflow. Elimina a necessidade de instalar, configurar e operar o Airflow manualmente.

É a escolha natural para orquestrar pipelines que envolvem serviços GCP (BigQuery, GCS, Dataflow, etc.), pois inclui providers nativos e integração com IAM.

Versões

VersãoBaseInfraDiferencial
Composer 2Airflow 2.xGKE AutopilotAuto-scaling de workers, ambiente mais leve
Composer 3Airflow 2.xServerless (sem GKE visível)Totalmente gerenciado, startup mais rápido

Composer 3 é o padrão atual. Composer 1 (Airflow 1.x) está em fim de vida.

Arquitetura interna

graph LR
    subgraph Composer3["Cloud Composer 3"]
        S[Scheduler]
        W[Webserver]
        WK[Workers]
    end
    GCS[(GCS Bucket)] --> S
    GCS --> WK

DAGs são arquivos Python copiados para o bucket do ambiente. O scheduler detecta mudanças automaticamente.

Operadores nativos GCP

O apache-airflow-providers-google inclui centenas de operadores. Os mais usados em engenharia de dados:

BigQuery

from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
    BigQueryCreateEmptyTableOperator,
    BigQueryDeleteTableOperator,
)
 
executar_query = BigQueryInsertJobOperator(
    task_id="executar_query",
    configuration={
        "query": {
            "query": "SELECT * FROM `projeto.dataset.tabela`",
            "useLegacySql": False,
        }
    },
    project_id="meu-projeto",
)

Cloud Storage

from airflow.providers.google.cloud.operators.gcs import (
    GCSCreateBucketOperator,
    GCSDeleteObjectsOperator,
    GCSSynchronizeBucketsOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
 
mover_arquivo = GCSToGCSOperator(
    task_id="mover_arquivo",
    source_bucket="bucket-origem",
    source_object="raw/arquivo.csv",
    destination_bucket="bucket-destino",
    destination_object="processed/arquivo.csv",
    move_object=True,
)
 
carregar_bq = GCSToBigQueryOperator(
    task_id="carregar_bq",
    bucket="meu-bucket",
    source_objects=["raw/*.parquet"],
    destination_project_dataset_table="projeto.dataset.tabela",
    source_format="PARQUET",
    write_disposition="WRITE_TRUNCATE",
)

Dataflow

from airflow.providers.google.cloud.operators.dataflow import (
    DataflowCreatePythonJobOperator,
    DataflowTemplatedJobStartOperator,
)

Variáveis e conexões

  • Variables: Admin > Variables na UI. Acesso via Variable.get("chave")
  • Connections: credenciais para sistemas externos. A conexão google_cloud_default usa a Service Account do ambiente automaticamente.

Boas práticas com DAGs

from airflow import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
 
with DAG(
    dag_id="pipeline_dados",
    start_date=days_ago(1),
    schedule_interval="0 6 * * *",      # todo dia às 6h UTC
    catchup=False,                        # não executa runs passados
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    },
    tags=["dados", "bigquery"],
) as dag:
    ...
  • catchup=False evita execução em cascata de runs históricos
  • Usar tags para filtrar DAGs na UI
  • Evitar lógica pesada no nível do módulo (executada no parse, não na task)

Autenticação

O Cloud Composer usa uma Service Account própria do ambiente. Conceder ao SA as roles necessárias no IAM do projeto (ex: roles/bigquery.dataEditor, roles/storage.objectAdmin).

Ver também: gcp | gcp-bigquery | gcp-cloud-storage | gcp-dataflow | gcp-boas-praticas | pipeline-de-dados