Structured Streaming é o modelo de processamento de fluxos do Spark, disponível desde o Spark 2.0. Trata um stream de dados como uma tabela que cresce continuamente: o mesmo código e as mesmas APIs do DataFrame funcionam em batch e em streaming, com garantias de exactly-once.

Modelo conceitual

graph LR
    subgraph Fonte de dados infinita
        E1[evento]
        E2[evento]
        E3[...]
    end

    subgraph Spark Structured Streaming
        T[Tabela de entrada<br/>ilimitada]
        Q[Query<br/>DataFrame API]
        R[Tabela de resultado]
    end

    subgraph Sink
        S[Kafka / GCS / BQ / Delta]
    end

    E1 & E2 & E3 --> T
    T --> Q --> R --> S

A engine processa novos dados periodicamente (micro-batch) ou de forma contínua, atualizando a tabela de resultado conforme o output mode configurado. O estado intermediário e os offsets são salvos em checkpoints para garantia de exatamente-uma-vez.

Fontes suportadas

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
 
spark = SparkSession.builder.appName("streaming-app").getOrCreate()
 
# Kafka (fonte mais comum em produção)
df_kafka = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe",        "topico-pedidos")
    .option("startingOffsets",  "latest")   # "earliest" para reprocessar desde o início
    .option("maxOffsetsPerTrigger", "10000")
    .load()
)
 
# O payload do Kafka vem como bytes em key e value; deserializar manualmente
schema_pedido = StructType([
    StructField("pedido_id",  StringType()),
    StructField("cliente_id", StringType()),
    StructField("valor",      DoubleType()),
    StructField("event_time", StringType()),
])
 
df_pedidos = (
    df_kafka
    .select(F.from_json(F.col("value").cast("string"), schema_pedido).alias("d"))
    .select("d.*")
    .withColumn("event_ts", F.to_timestamp("event_time"))
)
 
# Delta Lake como fonte de streaming (Change Data Feed)
df_delta = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .load("gs://lake/bronze/pedidos/")
)
 
# Cloud Storage / GCS: arquivos chegando continuamente
df_gcs = (
    spark.readStream
    .format("parquet")
    .schema(schema_pedido)
    .load("gs://lake/landing/pedidos/")
)
 
# Rate source: geração sintética para testes de carga
df_rate = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1000)
    .load()
)

Triggers

O trigger controla a frequência com que o Spark processa novos dados:

from pyspark.sql.streaming import Trigger
 
# ProcessingTime: processa a cada N segundos (micro-batch com intervalo fixo)
query = df.writeStream \
    .trigger(Trigger.ProcessingTime("30 seconds")) \
    .start(...)
 
# Contínuo, sem pausa: processa o próximo batch assim que o anterior termina
query = df.writeStream \
    .trigger(Trigger.ProcessingTime("0 seconds")) \
    .start(...)
 
# Once: processa todos os dados disponíveis uma única vez e para
# Substitui um job batch agendado com a semântica de streaming
query = df.writeStream \
    .trigger(Trigger.Once()) \
    .start(...)
 
# AvailableNow (Spark 3.3+): como Once, mas processa em múltiplos micro-batches
# Recomendado em vez de Once para datasets maiores
query = df.writeStream \
    .trigger(Trigger.AvailableNow()) \
    .start(...)

Trigger.AvailableNow() é a evolução moderna do Trigger.Once() para jobs agendados que usam a semântica de streaming sem manter processo rodando continuamente.

Output Modes

Controla quais linhas são escritas no sink a cada micro-batch:

ModoComportamentoCompatível com
appendApenas novas linhas adicionadas desde o último batchQueries sem agregação ou com watermark
completeToda a tabela de resultado é reescrita a cada batchAgregações sem watermark (estado completo)
updateApenas linhas adicionadas ou modificadas desde o último batchAgregações com ou sem watermark
# Append: dados brutos para um data lake
query = (
    df_pedidos
    .writeStream
    .outputMode("append")
    .format("delta")
    .option("path",               "gs://lake/bronze/pedidos/")
    .option("checkpointLocation", "gs://lake/checkpoints/pedidos-bronze/")
    .start()
)
 
# Complete: contagem acumulada em memória (para consulta ad-hoc em dev)
contagem = df_pedidos.groupBy("cliente_id").count()
query = (
    contagem
    .writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("contagem_live")
    .start()
)
spark.sql("SELECT * FROM contagem_live ORDER BY count DESC LIMIT 10").show()

Watermarks

Watermarks permitem ao Spark liberar o estado de janelas antigas após um tempo configurável, tratando eventos atrasados de forma controlada:

# Definir watermark de 10 minutos sobre a coluna de event time
df_com_watermark = (
    df_pedidos
    .withWatermark("event_ts", "10 minutes")
)

O watermark funciona assim:

Tempo máximo de evento observado até agora: 14:50
Watermark de 10 minutos: 14:40

Eventos com event_time >= 14:40: aceitos e incluídos nas agregações
Eventos com event_time < 14:40:  considerados atrasados demais, descartados
Estado de janelas anteriores a 14:40: pode ser liberado da memória

Janelas deslizantes com watermark

resultado = (
    df_com_watermark
    .groupBy(
        # janela de 5 minutos que avança a cada 1 minuto
        F.window("event_ts", "5 minutes", "1 minute"),
        "regiao"
    )
    .agg(
        F.count("*").alias("qtd"),
        F.sum("valor").alias("receita")
    )
)
 
query = (
    resultado
    .writeStream
    .outputMode("append")  # com watermark, o append mode é possível
    .format("delta")
    .option("checkpointLocation", "gs://lake/checkpoints/janelas/")
    .start("gs://lake/silver/pedidos-janelas/")
)

Sinks suportados

# Kafka: produzir resultados de volta a um tópico
query = (
    resultado
    .select(F.to_json(F.struct("*")).alias("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("topic",               "topico-resultado")
    .option("checkpointLocation",  "gs://lake/checkpoints/kafka-out/")
    .outputMode("append")
    .start()
)
 
# BigQuery via conector GCP
query = (
    df_pedidos
    .writeStream
    .format("bigquery")
    .option("table",               "projeto.dataset.pedidos_stream")
    .option("checkpointLocation",  "gs://lake/checkpoints/bq/")
    .outputMode("append")
    .start()
)
 
# Delta Lake
query = (
    df_pedidos
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "gs://lake/checkpoints/delta-pedidos/")
    .start("gs://lake/bronze/pedidos/")
)
 
# ForeachBatch: lógica customizada por batch (útil para sinks não nativos)
def escrever_batch(df_batch, batch_id):
    df_batch.persist()
    df_batch.write.format("delta").mode("append").save("gs://lake/bronze/")
    df_batch.filter("valor > 1000").write.format("bigquery") \
        .option("table", "projeto.dataset.alertas").mode("append").save()
    df_batch.unpersist()
 
query = (
    df_pedidos
    .writeStream
    .foreachBatch(escrever_batch)
    .option("checkpointLocation", "gs://lake/checkpoints/foreach/")
    .start()
)
 
# Console (somente desenvolvimento)
query = df_pedidos.writeStream.outputMode("append").format("console").start()

Checkpoints

O checkpoint garante exactly-once e permite retomar de onde parou:

checkpointLocation/
  commits/     <- quais batches foram confirmados no sink
  offsets/     <- posição das fontes (ex: offsets do Kafka) por batch
  state/       <- estado das operações stateful (agregações, deduplicação)
  metadata     <- metadados da query

O checkpointLocation deve ser um path persistente acessível por todos os Executors (GCS, S3, ADLS). Nunca use paths locais em produção. Alterar o schema da query ou a lógica de transformação pode invalidar o checkpoint existente.

Operações Stateful

Deduplicação

df_unico = (
    df_pedidos
    .withWatermark("event_ts", "1 hour")
    .dropDuplicates(["pedido_id"])  # remove duplicatas dentro da janela do watermark
)

Joins entre dois streams

pedidos_s   = pedidos.withWatermark("pedido_ts",   "10 minutes")
pagamentos_s = pagamentos.withWatermark("pgto_ts",  "10 minutes")
 
resultado = pedidos_s.join(
    pagamentos_s,
    on="pedido_id",
    how="inner"
)

flatMapGroupsWithState: estado arbitrário por chave

Para lógica de estado que vai além de agregações de janela, como sessões de usuário ou máquinas de estado:

from pyspark.sql.streaming.state import GroupStateTimeout
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
 
schema_estado = StructType([
    StructField("total",    DoubleType()),
    StructField("contagem", LongType()),
])
 
schema_saida = StructType([
    StructField("cliente_id", StringType()),
    StructField("total",      DoubleType()),
    StructField("contagem",   LongType()),
])
 
def acumular_sessao(cliente_id, eventos, estado):
    sessao = estado.get if estado.exists else {"total": 0.0, "contagem": 0}
    for evento in eventos:
        sessao["total"]    += evento["valor"]
        sessao["contagem"] += 1
    estado.update(sessao)
    yield (cliente_id, sessao["total"], sessao["contagem"])
 
resultado = (
    df_pedidos
    .groupBy("cliente_id")
    .flatMapGroupsWithState(
        outputMode="append",
        timeoutConf=GroupStateTimeout.ProcessingTimeTimeout
    )(acumular_sessao)
)

Monitorar queries em execução

# Listar queries ativas
spark.streams.active
 
# Aguardar término (bloqueia o processo principal; necessário para jobs de longa duração)
query.awaitTermination()
 
# Progresso do último micro-batch processado
import json
print(json.dumps(query.lastProgress, indent=2))
 
# Status atual da query
print(query.status)
 
# Parar query graciosamente
query.stop()

Campos relevantes em lastProgress:

  • processedRowsPerSecond: throughput atual
  • inputRowsPerSecond: taxa de chegada dos dados na fonte
  • durationMs: tempo de cada fase do micro-batch (scheduling, get-offset, add-batch, commit)

Ver também: spark | spark-arquitetura | spark-apis | spark-performance | spark-troubleshooting