Pub/Sub é o serviço de mensageria assíncrona e totalmente gerenciado do GCP. Implementa o padrão publish/subscribe: produtores publicam mensagens em tópicos e consumidores recebem essas mensagens por meio de subscrições, sem acoplamento direto entre eles.
Nasceu do sistema interno do Google chamado Millwheel e é a espinha dorsal de pipelines event-driven e ingestão de streaming na GCP.
Modelo de funcionamento
flowchart LR P1[Publisher A] --> T[("Tópico")] P2[Publisher B] --> T T --> S1["Subscription 1 - pull"] T --> S2["Subscription 2 - push"] S1 --> C1["Consumer 1 - Dataflow"] S2 --> C2["Consumer 2 - Cloud Function"]
- Tópico: canal lógico onde mensagens são publicadas. Um tópico pode ter múltiplas subscrições.
- Subscrição: vínculo entre um tópico e um ou mais consumidores. Cada subscrição recebe uma cópia independente das mensagens.
- Mensagem: dado publicado, composto por
data(bytes) eattributes(mapa de strings).
Modos de entrega
| Modo | Como funciona | Quando usar |
|---|---|---|
| Pull | Consumer chama a API para buscar mensagens | Processamento batch, controle de ritmo pelo consumer |
| Push | Pub/Sub entrega via HTTP POST em um endpoint | Cloud Functions, Cloud Run, webhooks |
| BigQuery | Entrega direta em tabela BigQuery sem consumer | Ingestão de eventos em warehouse |
| Cloud Storage | Agrega mensagens e salva objetos no GCS | Arquivamento de eventos |
Publicando mensagens
from google.cloud import pubsub_v1
import json
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("meu-projeto", "meu-topico")
payload = {"evento": "compra", "valor": 150.0, "usuario_id": "u123"}
data = json.dumps(payload).encode("utf-8")
future = publisher.publish(topic_path, data, origem="app-web", ambiente="prod")
print(f"Mensagem publicada: {future.result()}")Atributos (origem, ambiente) são metadados da mensagem, úteis para filtros em subscrições.
Consumindo mensagens via Pull
from google.cloud import pubsub_v1
subscriber = pubsub_v1.SubscriberClient()
subscription = subscriber.subscription_path("meu-projeto", "minha-subscription")
def processar(message):
payload = json.loads(message.data.decode("utf-8"))
print(f"Recebido: {payload}")
message.ack() # confirma processamento; sem ack a mensagem é reentregue
with subscriber:
streaming_pull = subscriber.subscribe(subscription, callback=processar)
streaming_pull.result(timeout=60)Garantias e comportamento
- At-least-once delivery: mensagens podem ser entregues mais de uma vez. O consumer deve ser idempotente.
- Ordering: desativado por padrão. Ativar com
ordering_keyna publicação eenable_message_orderingna subscrição quando a ordem importa. - Retenção: mensagens não confirmadas ficam retidas por até 7 dias (padrão: 7 dias, mínimo: 10 min).
- Dead Letter Topic (DLT): após N tentativas sem ack, a mensagem é encaminhada a um tópico de dead letter para análise.
# Configurar dead letter na subscrição (via Terraform ou API)
dead_letter_policy = pubsub_v1.types.DeadLetterPolicy(
dead_letter_topic="projects/meu-projeto/topics/topico-dlq",
max_delivery_attempts=5,
)Filtros de subscrição
Permitem que uma subscrição receba apenas mensagens com determinados atributos, sem lógica no consumer:
# Sintaxe de filtro
attributes.ambiente = "prod" AND attributes.tipo = "compra"
Útil para fazer fan-out seletivo a partir de um único tópico.
CLI
# Criar tópico
gcloud pubsub topics create meu-topico
# Criar subscrição pull
gcloud pubsub subscriptions create minha-sub --topic=meu-topico
# Publicar mensagem de teste
gcloud pubsub topics publish meu-topico --message='{"teste": true}'
# Consumir mensagens (pull manual)
gcloud pubsub subscriptions pull minha-sub --limit=10 --auto-ackPadrões em engenharia de dados
Ingestão de eventos em tempo real: aplicações publicam eventos no Pub/Sub; Dataflow consome e transforma em streaming para o BigQuery.
Fan-out: um único tópico alimenta múltiplos sistemas em paralelo (warehouse, data lake, sistema de alertas) via subscrições independentes.
Desacoplamento de pipelines: Cloud Function publica evento ao finalizar etapa; próxima etapa consome de forma assíncrona, sem dependência direta.
Notificações do GCS: objetos criados no Cloud Storage publicam automaticamente no Pub/Sub, disparando pipelines event-driven.
Integração com o ecossistema
- Dataflow: source e sink nativo para pipelines Beam streaming
- Cloud Functions: trigger via push subscription ou evento do Eventarc
- Cloud Composer:
PubSubPublishMessageOperator,PubSubPullSensor - BigQuery: subscrição com entrega direta em tabela (BigQuery subscription)
Ver também: gcp | gcp-dataflow | gcp-cloud-functions | gcp-cloud-storage | gcp-boas-praticas | pipeline-de-dados