Spark UI

A Spark UI é a primeira ferramenta de diagnóstico. Acessível em http://driver-host:4040 durante a execução do job (ou no histórico via Spark History Server):

Abas essenciais

AbaO que ver
JobsLista de jobs, status (running, succeeded, failed), duração total
StagesDAG de stages, tempo por stage, tasks com skew, volume de shuffle
StorageDataFrames em cache, memória utilizada por RDD/DataFrame
EnvironmentConfigurações ativas do Spark (confirmar se as configs foram aplicadas)
ExecutorsMemória usada, tempo de GC, tasks ativas, falhas por executor
SQLPlanos de execução com métricas reais, bytess lidos/escritos por operação

Métricas por Stage: o que significa cada número

Dentro de uma Stage, cada Task exibe:

  • Duration: tempo de execução da task
  • GC Time: tempo gasto em garbage collection (deve ser < 10% do Duration)
  • Input Size / Records: dados lidos da fonte ou shuffle read
  • Shuffle Write Size / Records: dados escritos para o próximo stage
  • Spill (Memory): dados que saíram da memória para disco (ruim)
  • Spill (Disk): dados lidos do spill em disco

Problemas comuns e diagnóstico

1. OOM no Driver (OutOfMemoryError: Java heap space)

Sintomas: job falha com java.lang.OutOfMemoryError no processo do Driver.

Causas e soluções:

# Causa: collect() de dados grandes (traz tudo para o Driver)
dados = df.collect()         # perigoso para DataFrames grandes
dados = df.take(1000)        # melhor: limitar a quantidade retornada
 
# Causa: broadcast de tabela maior que a memória do Driver
resultado = df.join(F.broadcast(df_grande), ...)
# Solução: aumentar memória do Driver ou não usar broadcast
spark.conf.set("spark.driver.memory", "8g")
spark.conf.set("spark.driver.maxResultSize", "2g")
 
# Causa: resultado de groupBy.count() acumulando no Driver em streaming (complete mode)
# Solução: usar update mode ou adicionar watermark + append mode
 
# Conferir limite de memória para results coletados pelo Driver
spark.conf.get("spark.driver.maxResultSize")  # padrão: 1g

2. OOM no Executor

Sintomas: ExecutorLostFailure, SparkException: Task failed while writing rows ou java.lang.OutOfMemoryError nos logs do Executor.

Causas e soluções:

# Causa: partição muito grande (não cabe na memória da task)
# Verificar: Spark UI > Stages > Tasks > "Input Size" (máx muito acima da média)
 
# Solução 1: aumentar memória do Executor
spark.conf.set("spark.executor.memory", "8g")
 
# Solução 2: aumentar overhead para PySpark (Python usa memória fora do heap JVM)
spark.conf.set("spark.executor.memoryOverhead", "2g")
 
# Solução 3: reparticionar para reduzir tamanho de cada partição
df = df.repartition(500)
 
# Causa: UDF Python com vazamento de memória ou objetos grandes
# Solução: usar Pandas UDFs que processam bateladas e liberam memória
 
# Causa: spill de shuffle para disco acumulando
# Solução: aumentar spark.memory.fraction ou reduzir shuffle.partitions
spark.conf.set("spark.memory.fraction", "0.75")

3. Data Skew

Sintomas: uma Stage demora muito mais que o esperado. No Spark UI > Stage > Tasks, uma ou poucas tasks têm duração muito maior que as outras.

Diagnóstico:

# Verificar distribuição de dados pela chave de join ou groupBy
df.groupBy("chave_problematica") \
  .count() \
  .orderBy(F.col("count").desc()) \
  .show(20)

Soluções:

# Solução 1: habilitar AQE skew handling (mais simples)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
 
# Solução 2: usar broadcast join se a tabela menor for pequena o suficiente
resultado = df_grande.join(F.broadcast(df_pequeno), on="chave")
 
# Solução 3: salting manual (para casos que o AQE não cobre)
# Ver spark-performance para a técnica completa de salting

4. GC Pressure

Sintomas: GC Time alto no Spark UI (> 10% do Duration), jobs lentos com uso alto de memória.

Causas e soluções:

# Causa: muitos objetos Java de curta duração na heap (common em UDFs Python old-style)
# Solução 1: usar Pandas UDFs ou funções nativas
 
# Solução 2: usar serialização Kryo (reduz o tamanho dos objetos na heap)
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
 
# Solução 3: usar off-heap memory (Tungsten bypassa o GC do JVM)
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "2g")
 
# Solução 4: aumentar memória do Executor (mais espaço = GC menos frequente)
spark.conf.set("spark.executor.memory", "8g")

5. Slow Stage por Shuffle Excessivo

Sintomas: no Spark UI, a stage tem Shuffle Write ou Shuffle Read com dezenas ou centenas de GBs.

Diagnóstico:

# Verificar plano de execução para entender quais operações geram shuffle
df.explain("formatted")
# Procurar por "Exchange hashpartitioning" ou "Exchange rangepartitioning" no plano

Soluções:

# Evitar sortBy desnecessário (usa shuffle global para ordenação total)
# Substituir por orderBy dentro de partições quando ordenação global não é necessária
 
# Reduzir número de colunas antes do groupBy/join (column pruning manual)
df_slim = df.select("coluna_chave", "coluna_valor")
resultado = df_slim.groupBy("coluna_chave").sum("coluna_valor")
 
# Usar broadcast para tabelas pequenas
# Pré-agregar antes do join (reduz volume que vai para o shuffle)
df_agg = df_detalhe.groupBy("chave").agg(F.sum("valor").alias("total"))
resultado = df_principal.join(df_agg, on="chave")

6. Spill para Disco

Sintomas: no Spark UI > Stage > Tasks, as colunas “Spill (Memory)” e “Spill (Disk)” mostram valores significativos.

Spill indica que a memória de execução foi insuficiente para uma operação (aggregation, sort, shuffle), forçando escrita temporária em disco.

Soluções:

# Aumentar fração de memória dedicada à execução
spark.conf.set("spark.memory.fraction", "0.75")
 
# Reduzir tamanho das partições (mais partições, cada uma menor)
spark.conf.set("spark.sql.shuffle.partitions", "400")
 
# Aumentar memória do Executor
spark.conf.set("spark.executor.memory", "8g")

7. Job Falha e Reinicia (Executor Lost / Task Killed)

Sintomas: tasks reiniciando repetidamente, mensagens de ExecutorLostFailure nos logs.

# Aumentar tentativas de task e executor antes de falhar o job
spark.conf.set("spark.task.maxFailures",                    "4")
spark.conf.set("spark.stage.maxConsecutiveAttempts",        "4")
spark.conf.set("spark.executor.heartbeatInterval",         "20s")
spark.conf.set("spark.network.timeout",                    "300s")
spark.conf.set("spark.rpc.message.maxSize",               "256")  # MB, para mensagens grandes

Logs e diagnóstico

# Habilitar log do Spark (nível INFO para ver mais detalhes)
import logging
logging.getLogger("pyspark").setLevel(logging.WARN)
 
# No cluster (via configuração)
spark.conf.set("spark.eventLog.enabled", "true")
spark.conf.set("spark.eventLog.dir",     "gs://meu-bucket/spark-logs/")

Logs no Dataproc (GCP)

# Ver logs de um job no Dataproc
gcloud dataproc jobs describe JOB_ID --region=REGIAO
 
# Logs detalhados de Executors via Cloud Logging
gcloud logging read "resource.type=cloud_dataproc_job AND resource.labels.job_id=JOB_ID"

Configurações de diagnóstico recomendadas

spark = SparkSession.builder \
    .config("spark.sql.adaptive.enabled",                       "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled",    "true") \
    .config("spark.sql.adaptive.skewJoin.enabled",              "true") \
    .config("spark.eventLog.enabled",                           "true") \
    .config("spark.eventLog.dir",                               "gs://bucket/spark-history/") \
    .config("spark.history.ui.port",                            "18080") \
    .config("spark.executor.memory",                            "4g") \
    .config("spark.executor.memoryOverhead",                    "1g") \
    .config("spark.driver.memory",                              "2g") \
    .config("spark.driver.maxResultSize",                       "1g") \
    .config("spark.sql.shuffle.partitions",                     "200") \
    .config("spark.task.maxFailures",                           "4") \
    .config("spark.network.timeout",                            "300s") \
    .getOrCreate()

Referência rápida de diagnóstico

Job lento?
  Abrir Spark UI > Jobs > encontrar stage mais lenta
  
Stage específica lenta?
  Tasks tab > comparar Duration (min/median/max)
  
max >> median?
  Skew → checar "Input Size" das tasks, usar AQE ou salting
  
GC Time alto (>10% do Duration)?
  Memória insuficiente → aumentar executor.memory ou usar Kryo
  
Spill > 0?
  Partições muito grandes → aumentar shuffle.partitions, aumentar memória
  
Shuffle Read/Write muito alto?
  Verificar se broadcast join pode ser usado
  Verificar plano com explain("formatted")
  
OOM no Driver?
  Remover collect() desnecessário, aumentar driver.memory

Ver também: spark | spark-arquitetura | spark-performance | spark-sql