Cloud Composer é o serviço de orquestração de workflows do GCP baseado em Apache Airflow. Elimina a necessidade de instalar, configurar e operar o Airflow manualmente.
É a escolha natural para orquestrar pipelines que envolvem serviços GCP (BigQuery, GCS, Dataflow, etc.), pois inclui providers nativos e integração com IAM.
Versões
| Versão | Base | Infra | Diferencial |
|---|---|---|---|
| Composer 2 | Airflow 2.x | GKE Autopilot | Auto-scaling de workers, ambiente mais leve |
| Composer 3 | Airflow 2.x | Serverless (sem GKE visível) | Totalmente gerenciado, startup mais rápido |
Composer 3 é o padrão atual. Composer 1 (Airflow 1.x) está em fim de vida.
Arquitetura interna
graph LR subgraph Composer3["Cloud Composer 3"] S[Scheduler] W[Webserver] WK[Workers] end GCS[(GCS Bucket)] --> S GCS --> WK
DAGs são arquivos Python copiados para o bucket do ambiente. O scheduler detecta mudanças automaticamente.
Operadores nativos GCP
O apache-airflow-providers-google inclui centenas de operadores. Os mais usados em engenharia de dados:
BigQuery
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryInsertJobOperator,
BigQueryCreateEmptyTableOperator,
BigQueryDeleteTableOperator,
)
executar_query = BigQueryInsertJobOperator(
task_id="executar_query",
configuration={
"query": {
"query": "SELECT * FROM `projeto.dataset.tabela`",
"useLegacySql": False,
}
},
project_id="meu-projeto",
)Cloud Storage
from airflow.providers.google.cloud.operators.gcs import (
GCSCreateBucketOperator,
GCSDeleteObjectsOperator,
GCSSynchronizeBucketsOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
mover_arquivo = GCSToGCSOperator(
task_id="mover_arquivo",
source_bucket="bucket-origem",
source_object="raw/arquivo.csv",
destination_bucket="bucket-destino",
destination_object="processed/arquivo.csv",
move_object=True,
)
carregar_bq = GCSToBigQueryOperator(
task_id="carregar_bq",
bucket="meu-bucket",
source_objects=["raw/*.parquet"],
destination_project_dataset_table="projeto.dataset.tabela",
source_format="PARQUET",
write_disposition="WRITE_TRUNCATE",
)Dataflow
from airflow.providers.google.cloud.operators.dataflow import (
DataflowCreatePythonJobOperator,
DataflowTemplatedJobStartOperator,
)Variáveis e conexões
- Variables:
Admin > Variablesna UI. Acesso viaVariable.get("chave") - Connections: credenciais para sistemas externos. A conexão
google_cloud_defaultusa a Service Account do ambiente automaticamente.
Boas práticas com DAGs
from airflow import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
with DAG(
dag_id="pipeline_dados",
start_date=days_ago(1),
schedule_interval="0 6 * * *", # todo dia às 6h UTC
catchup=False, # não executa runs passados
default_args={
"retries": 2,
"retry_delay": timedelta(minutes=5),
},
tags=["dados", "bigquery"],
) as dag:
...catchup=Falseevita execução em cascata de runs históricos- Usar
tagspara filtrar DAGs na UI - Evitar lógica pesada no nível do módulo (executada no parse, não na task)
Autenticação
O Cloud Composer usa uma Service Account própria do ambiente. Conceder ao SA as roles necessárias no IAM do projeto (ex: roles/bigquery.dataEditor, roles/storage.objectAdmin).
Ver também: gcp | gcp-bigquery | gcp-cloud-storage | gcp-dataflow | gcp-boas-praticas | pipeline-de-dados