Performance no Spark é determinada principalmente por três fatores: volume de dados no Shuffle, distribuição uniforme das partições e uso eficiente da memória dos Executors. A maioria dos problemas de performance no Spark tem origem em uma dessas três áreas.
Particionamento
Número de partições
O número de partições determina o paralelismo do job. Cada Task processa uma partição em uma thread de um Executor:
- Partições demais: overhead de scheduling, muitos arquivos pequenos na escrita
- Partições de menos: Executors ociosos, OOM se partições forem muito grandes
Regra prática: 2 a 4 vezes o total de cores disponíveis no cluster.
# Verificar partições atuais
print(df.rdd.getNumPartitions())
# Ajustar número de partições após shuffle
spark.conf.set("spark.sql.shuffle.partitions", "100") # padrão é 200 (alto para datasets pequenos)
# Reparticionar explicitamente
df = df.repartition(100) # com shuffle
df = df.coalesce(10) # sem shuffle, apenas reduz
df = df.repartition(50, "regiao") # distribui por coluna, útil antes de writes particionadosParticionamento para escrita (partition pruning)
Particionar a escrita por colunas usadas em filtros frequentes reduz os dados lidos nas queries:
# Escrita particionada por ano e mes
(
df.write
.mode("overwrite")
.partitionBy("ano", "mes")
.parquet("gs://lake/silver/pedidos/")
)
# Query que se beneficia da partition pruning (lê apenas partições de 2026-05)
spark.read.parquet("gs://lake/silver/pedidos/") \
.filter("ano = 2026 AND mes = 5") \
.count()O problema dos arquivos pequenos
Muitas partições ao escrever geram muitos arquivos pequenos, prejudicando a performance de leituras futuras. Soluções:
# Reduzir partições antes de escrever (coalesce não gera shuffle)
df.coalesce(20).write.parquet("gs://lake/output/")
# Repartir por coluna para equalizar tamanho dos arquivos e manter partição
df.repartition(20, "regiao").write.partitionBy("regiao").parquet("gs://lake/output/")
# No Delta Lake: OPTIMIZE compacta arquivos pequenos automaticamente
spark.sql("OPTIMIZE meu_db.pedidos")Broadcast Join
O Broadcast Join elimina o Shuffle ao enviar a tabela menor para todos os Executors uma única vez:
graph LR subgraph Sem broadcast T1[Tabela grande] -->|shuffle| S1[Shuffle] T2[Tabela pequena] -->|shuffle| S1 S1 --> R1[Resultado] end subgraph Com broadcast T3[Tabela grande] --> J1[Join local] T4[Tabela pequena] -->|broadcast sem shuffle| E1[Executor 1] T4 -->|broadcast| E2[Executor 2] E1 --> J1 J1 --> R2[Resultado] end
from pyspark.sql import functions as F
# Automático: Catalyst usa broadcast quando a tabela é menor que o threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10m") # padrão 10 MB
# Forçar broadcast com hint (quando o Catalyst não detecta automaticamente)
resultado = df_grande.join(
F.broadcast(df_pequeno),
on="municipio_id"
)
# Desabilitar broadcast (para diagnosticar problemas de OOM por broadcast grande)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")Limite prático do broadcast: tabelas maiores que 100-200 MB na memória serializada causam OOM no Driver (que precisa coletar os dados para enviar aos Executors).
Caching e Persist
Cache reutiliza dados calculados sem recomputar a partir do source. Só faz sentido quando o mesmo DataFrame é usado mais de uma vez no código:
from pyspark.storagelevel import StorageLevel
# cache() é equivalente a persist(StorageLevel.MEMORY_AND_DISK)
df.cache()
# persist com controle fino do storage level
df.persist(StorageLevel.MEMORY_ONLY) # mais rápido, pode causar recomputação se não couber
df.persist(StorageLevel.MEMORY_AND_DISK) # mais seguro, spill para disco se necessário
df.persist(StorageLevel.DISK_ONLY) # para dados grandes que não cabem em memória
df.persist(StorageLevel.MEMORY_AND_DISK_SER) # serializado: menos memória, mais CPU para desserializar
# Forçar materialização do cache (cache é lazy: só ocorre quando uma Action for chamada)
df.cache()
df.count() # força a materialização
# Liberar cache quando não precisar mais (importante para não desperdiçar memória dos Executors)
df.unpersist()Antipadrão: chamar cache() em DataFrames usados apenas uma vez, pois a serialização e o armazenamento consomem memória sem benefício.
Data Skew
Skew ocorre quando algumas partições têm muito mais dados que outras, criando tarefas “straggler” que atrasam toda a Stage:
Partição 1: 5 MB (task termina em 2s)
Partição 2: 5 MB (task termina em 2s)
Partição 3: 800 MB (task demora 3 minutos) <- STRAGGLER
AQE Skew Handling (automático, Spark 3.0+)
O Adaptive Query Execution detecta e divide partições com skew automaticamente:
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Partição é considerada com skew se for mais de N vezes a mediana E maior que M bytes
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")Salting (tratamento manual de skew)
Quando o AQE não é suficiente, adicionar um prefixo aleatório (salt) à chave de join distribui as partições artificialmente:
import random
NUM_SALTS = 10
# Adicionar salt aleatório à tabela maior
df_grande = df_grande.withColumn(
"chave_salt",
F.concat(F.col("chave"), F.lit("_"), (F.rand() * NUM_SALTS).cast("int").cast("string"))
)
# Explode a tabela menor com todos os valores de salt possíveis
salt_values = spark.range(NUM_SALTS).withColumnRenamed("id", "salt")
df_pequeno = df_pequeno.crossJoin(salt_values).withColumn(
"chave_salt",
F.concat(F.col("chave"), F.lit("_"), F.col("salt").cast("string"))
).drop("salt")
# Join pela chave saltada (sem skew agora)
resultado = df_grande.join(df_pequeno, on="chave_salt").drop("chave_salt")Diagnosticar skew no Spark UI
- Ir para a Stage com longa duração no Spark UI
- Clicar na Stage e ver a distribuição das Tasks
- Comparar
min,medianemaxde “Duration” e “Input Size” - Se
maxfor muito maior quemedian, há skew
Bucketing
Bucketing pré-particiona dados por uma coluna de join ao escrever, eliminando o shuffle em joins futuros entre tabelas bucketizadas:
# Escrever tabela bucketizada (uma vez, na preparação dos dados)
(
df_pedidos
.write
.mode("overwrite")
.bucketBy(50, "cliente_id")
.sortBy("cliente_id")
.saveAsTable("meu_db.pedidos_bucketed")
)
(
df_clientes
.write
.mode("overwrite")
.bucketBy(50, "cliente_id")
.sortBy("cliente_id")
.saveAsTable("meu_db.clientes_bucketed")
)
# Join posterior sem shuffle (ambas as tabelas já estão co-particionadas)
resultado = (
spark.table("meu_db.pedidos_bucketed")
.join(spark.table("meu_db.clientes_bucketed"), on="cliente_id")
)
resultado.explain() # deve mostrar "SortMergeJoin" sem "Exchange" (sem shuffle)O bucketing é eficaz para tabelas grandes que são juntadas frequentemente pelas mesmas chaves.
Serialização
Kryo é significativamente mais rápido e compacto que o serializador Java padrão para tipos customizados:
spark = SparkSession.builder \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.kryo.registrationRequired", "false") \
.getOrCreate()Para RDDs com tipos customizados, registrar as classes melhora ainda mais a performance:
// Scala: registrar classes para Kryo
spark.conf.set("spark.kryo.classesToRegister", "com.exemplo.MinhaClasse")Configurações críticas de performance
# Número de partições após shuffle (ajustar conforme o volume de dados)
spark.conf.set("spark.sql.shuffle.partitions", "200")
# AQE: adaptação automática em tempo de execução
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Broadcast join automático até 50 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m")
# Compressão do shuffle (snappy é o padrão, lz4 é mais rápido)
spark.conf.set("spark.io.compression.codec", "lz4")
# Spill para disco quando memória de execução é insuficiente
spark.conf.set("spark.memory.fraction", "0.6")
spark.conf.set("spark.memory.storageFraction", "0.5")
# Dynamic Allocation: ajusta executors conforme a demanda
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "1")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "50")
spark.conf.set("spark.dynamicAllocation.initialExecutors", "5")Checklist de otimização
Ao investigar um job lento, seguir esta ordem:
- Shuffle excessivo: verificar no Spark UI quantos bytes são lidos/escritos no shuffle. Reduzir operações wide ou usar broadcast join
- Skew: comparar duração e tamanho das Tasks na Stage mais lenta. Usar AQE ou salting
- Partições: verificar se o número é compatível com o volume de dados. Ajustar
shuffle.partitions - Cache: verificar se DataFrames reutilizados estão em cache e se o cache está materializado
- GC: verificar no aba Executors do Spark UI se o tempo de GC é alto. Ver spark-troubleshooting
- UDFs Python: substituir por Pandas UDFs ou funções nativas do
pyspark.sql.functions
Ver também: spark | spark-arquitetura | spark-sql | spark-troubleshooting