Structured Streaming é o modelo de processamento de fluxos do Spark, disponível desde o Spark 2.0. Trata um stream de dados como uma tabela que cresce continuamente: o mesmo código e as mesmas APIs do DataFrame funcionam em batch e em streaming, com garantias de exactly-once.
Modelo conceitual
graph LR subgraph Fonte de dados infinita E1[evento] E2[evento] E3[...] end subgraph Spark Structured Streaming T[Tabela de entrada<br/>ilimitada] Q[Query<br/>DataFrame API] R[Tabela de resultado] end subgraph Sink S[Kafka / GCS / BQ / Delta] end E1 & E2 & E3 --> T T --> Q --> R --> S
A engine processa novos dados periodicamente (micro-batch) ou de forma contínua, atualizando a tabela de resultado conforme o output mode configurado. O estado intermediário e os offsets são salvos em checkpoints para garantia de exatamente-uma-vez.
Fontes suportadas
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
spark = SparkSession.builder.appName("streaming-app").getOrCreate()
# Kafka (fonte mais comum em produção)
df_kafka = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("subscribe", "topico-pedidos")
.option("startingOffsets", "latest") # "earliest" para reprocessar desde o início
.option("maxOffsetsPerTrigger", "10000")
.load()
)
# O payload do Kafka vem como bytes em key e value; deserializar manualmente
schema_pedido = StructType([
StructField("pedido_id", StringType()),
StructField("cliente_id", StringType()),
StructField("valor", DoubleType()),
StructField("event_time", StringType()),
])
df_pedidos = (
df_kafka
.select(F.from_json(F.col("value").cast("string"), schema_pedido).alias("d"))
.select("d.*")
.withColumn("event_ts", F.to_timestamp("event_time"))
)
# Delta Lake como fonte de streaming (Change Data Feed)
df_delta = (
spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.load("gs://lake/bronze/pedidos/")
)
# Cloud Storage / GCS: arquivos chegando continuamente
df_gcs = (
spark.readStream
.format("parquet")
.schema(schema_pedido)
.load("gs://lake/landing/pedidos/")
)
# Rate source: geração sintética para testes de carga
df_rate = (
spark.readStream
.format("rate")
.option("rowsPerSecond", 1000)
.load()
)Triggers
O trigger controla a frequência com que o Spark processa novos dados:
from pyspark.sql.streaming import Trigger
# ProcessingTime: processa a cada N segundos (micro-batch com intervalo fixo)
query = df.writeStream \
.trigger(Trigger.ProcessingTime("30 seconds")) \
.start(...)
# Contínuo, sem pausa: processa o próximo batch assim que o anterior termina
query = df.writeStream \
.trigger(Trigger.ProcessingTime("0 seconds")) \
.start(...)
# Once: processa todos os dados disponíveis uma única vez e para
# Substitui um job batch agendado com a semântica de streaming
query = df.writeStream \
.trigger(Trigger.Once()) \
.start(...)
# AvailableNow (Spark 3.3+): como Once, mas processa em múltiplos micro-batches
# Recomendado em vez de Once para datasets maiores
query = df.writeStream \
.trigger(Trigger.AvailableNow()) \
.start(...)Trigger.AvailableNow() é a evolução moderna do Trigger.Once() para jobs agendados que usam a semântica de streaming sem manter processo rodando continuamente.
Output Modes
Controla quais linhas são escritas no sink a cada micro-batch:
| Modo | Comportamento | Compatível com |
|---|---|---|
| append | Apenas novas linhas adicionadas desde o último batch | Queries sem agregação ou com watermark |
| complete | Toda a tabela de resultado é reescrita a cada batch | Agregações sem watermark (estado completo) |
| update | Apenas linhas adicionadas ou modificadas desde o último batch | Agregações com ou sem watermark |
# Append: dados brutos para um data lake
query = (
df_pedidos
.writeStream
.outputMode("append")
.format("delta")
.option("path", "gs://lake/bronze/pedidos/")
.option("checkpointLocation", "gs://lake/checkpoints/pedidos-bronze/")
.start()
)
# Complete: contagem acumulada em memória (para consulta ad-hoc em dev)
contagem = df_pedidos.groupBy("cliente_id").count()
query = (
contagem
.writeStream
.outputMode("complete")
.format("memory")
.queryName("contagem_live")
.start()
)
spark.sql("SELECT * FROM contagem_live ORDER BY count DESC LIMIT 10").show()Watermarks
Watermarks permitem ao Spark liberar o estado de janelas antigas após um tempo configurável, tratando eventos atrasados de forma controlada:
# Definir watermark de 10 minutos sobre a coluna de event time
df_com_watermark = (
df_pedidos
.withWatermark("event_ts", "10 minutes")
)O watermark funciona assim:
Tempo máximo de evento observado até agora: 14:50
Watermark de 10 minutos: 14:40
Eventos com event_time >= 14:40: aceitos e incluídos nas agregações
Eventos com event_time < 14:40: considerados atrasados demais, descartados
Estado de janelas anteriores a 14:40: pode ser liberado da memória
Janelas deslizantes com watermark
resultado = (
df_com_watermark
.groupBy(
# janela de 5 minutos que avança a cada 1 minuto
F.window("event_ts", "5 minutes", "1 minute"),
"regiao"
)
.agg(
F.count("*").alias("qtd"),
F.sum("valor").alias("receita")
)
)
query = (
resultado
.writeStream
.outputMode("append") # com watermark, o append mode é possível
.format("delta")
.option("checkpointLocation", "gs://lake/checkpoints/janelas/")
.start("gs://lake/silver/pedidos-janelas/")
)Sinks suportados
# Kafka: produzir resultados de volta a um tópico
query = (
resultado
.select(F.to_json(F.struct("*")).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("topic", "topico-resultado")
.option("checkpointLocation", "gs://lake/checkpoints/kafka-out/")
.outputMode("append")
.start()
)
# BigQuery via conector GCP
query = (
df_pedidos
.writeStream
.format("bigquery")
.option("table", "projeto.dataset.pedidos_stream")
.option("checkpointLocation", "gs://lake/checkpoints/bq/")
.outputMode("append")
.start()
)
# Delta Lake
query = (
df_pedidos
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "gs://lake/checkpoints/delta-pedidos/")
.start("gs://lake/bronze/pedidos/")
)
# ForeachBatch: lógica customizada por batch (útil para sinks não nativos)
def escrever_batch(df_batch, batch_id):
df_batch.persist()
df_batch.write.format("delta").mode("append").save("gs://lake/bronze/")
df_batch.filter("valor > 1000").write.format("bigquery") \
.option("table", "projeto.dataset.alertas").mode("append").save()
df_batch.unpersist()
query = (
df_pedidos
.writeStream
.foreachBatch(escrever_batch)
.option("checkpointLocation", "gs://lake/checkpoints/foreach/")
.start()
)
# Console (somente desenvolvimento)
query = df_pedidos.writeStream.outputMode("append").format("console").start()Checkpoints
O checkpoint garante exactly-once e permite retomar de onde parou:
checkpointLocation/
commits/ <- quais batches foram confirmados no sink
offsets/ <- posição das fontes (ex: offsets do Kafka) por batch
state/ <- estado das operações stateful (agregações, deduplicação)
metadata <- metadados da query
O checkpointLocation deve ser um path persistente acessível por todos os Executors (GCS, S3, ADLS). Nunca use paths locais em produção. Alterar o schema da query ou a lógica de transformação pode invalidar o checkpoint existente.
Operações Stateful
Deduplicação
df_unico = (
df_pedidos
.withWatermark("event_ts", "1 hour")
.dropDuplicates(["pedido_id"]) # remove duplicatas dentro da janela do watermark
)Joins entre dois streams
pedidos_s = pedidos.withWatermark("pedido_ts", "10 minutes")
pagamentos_s = pagamentos.withWatermark("pgto_ts", "10 minutes")
resultado = pedidos_s.join(
pagamentos_s,
on="pedido_id",
how="inner"
)flatMapGroupsWithState: estado arbitrário por chave
Para lógica de estado que vai além de agregações de janela, como sessões de usuário ou máquinas de estado:
from pyspark.sql.streaming.state import GroupStateTimeout
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
schema_estado = StructType([
StructField("total", DoubleType()),
StructField("contagem", LongType()),
])
schema_saida = StructType([
StructField("cliente_id", StringType()),
StructField("total", DoubleType()),
StructField("contagem", LongType()),
])
def acumular_sessao(cliente_id, eventos, estado):
sessao = estado.get if estado.exists else {"total": 0.0, "contagem": 0}
for evento in eventos:
sessao["total"] += evento["valor"]
sessao["contagem"] += 1
estado.update(sessao)
yield (cliente_id, sessao["total"], sessao["contagem"])
resultado = (
df_pedidos
.groupBy("cliente_id")
.flatMapGroupsWithState(
outputMode="append",
timeoutConf=GroupStateTimeout.ProcessingTimeTimeout
)(acumular_sessao)
)Monitorar queries em execução
# Listar queries ativas
spark.streams.active
# Aguardar término (bloqueia o processo principal; necessário para jobs de longa duração)
query.awaitTermination()
# Progresso do último micro-batch processado
import json
print(json.dumps(query.lastProgress, indent=2))
# Status atual da query
print(query.status)
# Parar query graciosamente
query.stop()Campos relevantes em lastProgress:
processedRowsPerSecond: throughput atualinputRowsPerSecond: taxa de chegada dos dados na fontedurationMs: tempo de cada fase do micro-batch (scheduling, get-offset, add-batch, commit)
Ver também: spark | spark-arquitetura | spark-apis | spark-performance | spark-troubleshooting