Spark UI
A Spark UI é a primeira ferramenta de diagnóstico. Acessível em http://driver-host:4040 durante a execução do job (ou no histórico via Spark History Server):
Abas essenciais
| Aba | O que ver |
|---|---|
| Jobs | Lista de jobs, status (running, succeeded, failed), duração total |
| Stages | DAG de stages, tempo por stage, tasks com skew, volume de shuffle |
| Storage | DataFrames em cache, memória utilizada por RDD/DataFrame |
| Environment | Configurações ativas do Spark (confirmar se as configs foram aplicadas) |
| Executors | Memória usada, tempo de GC, tasks ativas, falhas por executor |
| SQL | Planos de execução com métricas reais, bytess lidos/escritos por operação |
Métricas por Stage: o que significa cada número
Dentro de uma Stage, cada Task exibe:
- Duration: tempo de execução da task
- GC Time: tempo gasto em garbage collection (deve ser < 10% do Duration)
- Input Size / Records: dados lidos da fonte ou shuffle read
- Shuffle Write Size / Records: dados escritos para o próximo stage
- Spill (Memory): dados que saíram da memória para disco (ruim)
- Spill (Disk): dados lidos do spill em disco
Problemas comuns e diagnóstico
1. OOM no Driver (OutOfMemoryError: Java heap space)
Sintomas: job falha com java.lang.OutOfMemoryError no processo do Driver.
Causas e soluções:
# Causa: collect() de dados grandes (traz tudo para o Driver)
dados = df.collect() # perigoso para DataFrames grandes
dados = df.take(1000) # melhor: limitar a quantidade retornada
# Causa: broadcast de tabela maior que a memória do Driver
resultado = df.join(F.broadcast(df_grande), ...)
# Solução: aumentar memória do Driver ou não usar broadcast
spark.conf.set("spark.driver.memory", "8g")
spark.conf.set("spark.driver.maxResultSize", "2g")
# Causa: resultado de groupBy.count() acumulando no Driver em streaming (complete mode)
# Solução: usar update mode ou adicionar watermark + append mode
# Conferir limite de memória para results coletados pelo Driver
spark.conf.get("spark.driver.maxResultSize") # padrão: 1g2. OOM no Executor
Sintomas: ExecutorLostFailure, SparkException: Task failed while writing rows ou java.lang.OutOfMemoryError nos logs do Executor.
Causas e soluções:
# Causa: partição muito grande (não cabe na memória da task)
# Verificar: Spark UI > Stages > Tasks > "Input Size" (máx muito acima da média)
# Solução 1: aumentar memória do Executor
spark.conf.set("spark.executor.memory", "8g")
# Solução 2: aumentar overhead para PySpark (Python usa memória fora do heap JVM)
spark.conf.set("spark.executor.memoryOverhead", "2g")
# Solução 3: reparticionar para reduzir tamanho de cada partição
df = df.repartition(500)
# Causa: UDF Python com vazamento de memória ou objetos grandes
# Solução: usar Pandas UDFs que processam bateladas e liberam memória
# Causa: spill de shuffle para disco acumulando
# Solução: aumentar spark.memory.fraction ou reduzir shuffle.partitions
spark.conf.set("spark.memory.fraction", "0.75")3. Data Skew
Sintomas: uma Stage demora muito mais que o esperado. No Spark UI > Stage > Tasks, uma ou poucas tasks têm duração muito maior que as outras.
Diagnóstico:
# Verificar distribuição de dados pela chave de join ou groupBy
df.groupBy("chave_problematica") \
.count() \
.orderBy(F.col("count").desc()) \
.show(20)Soluções:
# Solução 1: habilitar AQE skew handling (mais simples)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Solução 2: usar broadcast join se a tabela menor for pequena o suficiente
resultado = df_grande.join(F.broadcast(df_pequeno), on="chave")
# Solução 3: salting manual (para casos que o AQE não cobre)
# Ver spark-performance para a técnica completa de salting4. GC Pressure
Sintomas: GC Time alto no Spark UI (> 10% do Duration), jobs lentos com uso alto de memória.
Causas e soluções:
# Causa: muitos objetos Java de curta duração na heap (common em UDFs Python old-style)
# Solução 1: usar Pandas UDFs ou funções nativas
# Solução 2: usar serialização Kryo (reduz o tamanho dos objetos na heap)
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# Solução 3: usar off-heap memory (Tungsten bypassa o GC do JVM)
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "2g")
# Solução 4: aumentar memória do Executor (mais espaço = GC menos frequente)
spark.conf.set("spark.executor.memory", "8g")5. Slow Stage por Shuffle Excessivo
Sintomas: no Spark UI, a stage tem Shuffle Write ou Shuffle Read com dezenas ou centenas de GBs.
Diagnóstico:
# Verificar plano de execução para entender quais operações geram shuffle
df.explain("formatted")
# Procurar por "Exchange hashpartitioning" ou "Exchange rangepartitioning" no planoSoluções:
# Evitar sortBy desnecessário (usa shuffle global para ordenação total)
# Substituir por orderBy dentro de partições quando ordenação global não é necessária
# Reduzir número de colunas antes do groupBy/join (column pruning manual)
df_slim = df.select("coluna_chave", "coluna_valor")
resultado = df_slim.groupBy("coluna_chave").sum("coluna_valor")
# Usar broadcast para tabelas pequenas
# Pré-agregar antes do join (reduz volume que vai para o shuffle)
df_agg = df_detalhe.groupBy("chave").agg(F.sum("valor").alias("total"))
resultado = df_principal.join(df_agg, on="chave")6. Spill para Disco
Sintomas: no Spark UI > Stage > Tasks, as colunas “Spill (Memory)” e “Spill (Disk)” mostram valores significativos.
Spill indica que a memória de execução foi insuficiente para uma operação (aggregation, sort, shuffle), forçando escrita temporária em disco.
Soluções:
# Aumentar fração de memória dedicada à execução
spark.conf.set("spark.memory.fraction", "0.75")
# Reduzir tamanho das partições (mais partições, cada uma menor)
spark.conf.set("spark.sql.shuffle.partitions", "400")
# Aumentar memória do Executor
spark.conf.set("spark.executor.memory", "8g")7. Job Falha e Reinicia (Executor Lost / Task Killed)
Sintomas: tasks reiniciando repetidamente, mensagens de ExecutorLostFailure nos logs.
# Aumentar tentativas de task e executor antes de falhar o job
spark.conf.set("spark.task.maxFailures", "4")
spark.conf.set("spark.stage.maxConsecutiveAttempts", "4")
spark.conf.set("spark.executor.heartbeatInterval", "20s")
spark.conf.set("spark.network.timeout", "300s")
spark.conf.set("spark.rpc.message.maxSize", "256") # MB, para mensagens grandesLogs e diagnóstico
# Habilitar log do Spark (nível INFO para ver mais detalhes)
import logging
logging.getLogger("pyspark").setLevel(logging.WARN)
# No cluster (via configuração)
spark.conf.set("spark.eventLog.enabled", "true")
spark.conf.set("spark.eventLog.dir", "gs://meu-bucket/spark-logs/")Logs no Dataproc (GCP)
# Ver logs de um job no Dataproc
gcloud dataproc jobs describe JOB_ID --region=REGIAO
# Logs detalhados de Executors via Cloud Logging
gcloud logging read "resource.type=cloud_dataproc_job AND resource.labels.job_id=JOB_ID"Configurações de diagnóstico recomendadas
spark = SparkSession.builder \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.config("spark.eventLog.enabled", "true") \
.config("spark.eventLog.dir", "gs://bucket/spark-history/") \
.config("spark.history.ui.port", "18080") \
.config("spark.executor.memory", "4g") \
.config("spark.executor.memoryOverhead", "1g") \
.config("spark.driver.memory", "2g") \
.config("spark.driver.maxResultSize", "1g") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.task.maxFailures", "4") \
.config("spark.network.timeout", "300s") \
.getOrCreate()Referência rápida de diagnóstico
Job lento?
Abrir Spark UI > Jobs > encontrar stage mais lenta
Stage específica lenta?
Tasks tab > comparar Duration (min/median/max)
max >> median?
Skew → checar "Input Size" das tasks, usar AQE ou salting
GC Time alto (>10% do Duration)?
Memória insuficiente → aumentar executor.memory ou usar Kryo
Spill > 0?
Partições muito grandes → aumentar shuffle.partitions, aumentar memória
Shuffle Read/Write muito alto?
Verificar se broadcast join pode ser usado
Verificar plano com explain("formatted")
OOM no Driver?
Remover collect() desnecessário, aumentar driver.memory
Ver também: spark | spark-arquitetura | spark-performance | spark-sql