Performance no Spark é determinada principalmente por três fatores: volume de dados no Shuffle, distribuição uniforme das partições e uso eficiente da memória dos Executors. A maioria dos problemas de performance no Spark tem origem em uma dessas três áreas.

Particionamento

Número de partições

O número de partições determina o paralelismo do job. Cada Task processa uma partição em uma thread de um Executor:

  • Partições demais: overhead de scheduling, muitos arquivos pequenos na escrita
  • Partições de menos: Executors ociosos, OOM se partições forem muito grandes

Regra prática: 2 a 4 vezes o total de cores disponíveis no cluster.

# Verificar partições atuais
print(df.rdd.getNumPartitions())
 
# Ajustar número de partições após shuffle
spark.conf.set("spark.sql.shuffle.partitions", "100")  # padrão é 200 (alto para datasets pequenos)
 
# Reparticionar explicitamente
df = df.repartition(100)            # com shuffle
df = df.coalesce(10)                # sem shuffle, apenas reduz
df = df.repartition(50, "regiao")   # distribui por coluna, útil antes de writes particionados

Particionamento para escrita (partition pruning)

Particionar a escrita por colunas usadas em filtros frequentes reduz os dados lidos nas queries:

# Escrita particionada por ano e mes
(
    df.write
    .mode("overwrite")
    .partitionBy("ano", "mes")
    .parquet("gs://lake/silver/pedidos/")
)
 
# Query que se beneficia da partition pruning (lê apenas partições de 2026-05)
spark.read.parquet("gs://lake/silver/pedidos/") \
    .filter("ano = 2026 AND mes = 5") \
    .count()

O problema dos arquivos pequenos

Muitas partições ao escrever geram muitos arquivos pequenos, prejudicando a performance de leituras futuras. Soluções:

# Reduzir partições antes de escrever (coalesce não gera shuffle)
df.coalesce(20).write.parquet("gs://lake/output/")
 
# Repartir por coluna para equalizar tamanho dos arquivos e manter partição
df.repartition(20, "regiao").write.partitionBy("regiao").parquet("gs://lake/output/")
 
# No Delta Lake: OPTIMIZE compacta arquivos pequenos automaticamente
spark.sql("OPTIMIZE meu_db.pedidos")

Broadcast Join

O Broadcast Join elimina o Shuffle ao enviar a tabela menor para todos os Executors uma única vez:

graph LR
    subgraph Sem broadcast
        T1[Tabela grande] -->|shuffle| S1[Shuffle]
        T2[Tabela pequena] -->|shuffle| S1
        S1 --> R1[Resultado]
    end

    subgraph Com broadcast
        T3[Tabela grande] --> J1[Join local]
        T4[Tabela pequena] -->|broadcast sem shuffle| E1[Executor 1]
        T4 -->|broadcast| E2[Executor 2]
        E1 --> J1
        J1 --> R2[Resultado]
    end
from pyspark.sql import functions as F
 
# Automático: Catalyst usa broadcast quando a tabela é menor que o threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10m")  # padrão 10 MB
 
# Forçar broadcast com hint (quando o Catalyst não detecta automaticamente)
resultado = df_grande.join(
    F.broadcast(df_pequeno),
    on="municipio_id"
)
 
# Desabilitar broadcast (para diagnosticar problemas de OOM por broadcast grande)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

Limite prático do broadcast: tabelas maiores que 100-200 MB na memória serializada causam OOM no Driver (que precisa coletar os dados para enviar aos Executors).

Caching e Persist

Cache reutiliza dados calculados sem recomputar a partir do source. Só faz sentido quando o mesmo DataFrame é usado mais de uma vez no código:

from pyspark.storagelevel import StorageLevel
 
# cache() é equivalente a persist(StorageLevel.MEMORY_AND_DISK)
df.cache()
 
# persist com controle fino do storage level
df.persist(StorageLevel.MEMORY_ONLY)          # mais rápido, pode causar recomputação se não couber
df.persist(StorageLevel.MEMORY_AND_DISK)      # mais seguro, spill para disco se necessário
df.persist(StorageLevel.DISK_ONLY)            # para dados grandes que não cabem em memória
df.persist(StorageLevel.MEMORY_AND_DISK_SER)  # serializado: menos memória, mais CPU para desserializar
 
# Forçar materialização do cache (cache é lazy: só ocorre quando uma Action for chamada)
df.cache()
df.count()  # força a materialização
 
# Liberar cache quando não precisar mais (importante para não desperdiçar memória dos Executors)
df.unpersist()

Antipadrão: chamar cache() em DataFrames usados apenas uma vez, pois a serialização e o armazenamento consomem memória sem benefício.

Data Skew

Skew ocorre quando algumas partições têm muito mais dados que outras, criando tarefas “straggler” que atrasam toda a Stage:

Partição 1: 5 MB   (task termina em 2s)
Partição 2: 5 MB   (task termina em 2s)
Partição 3: 800 MB (task demora 3 minutos)  <- STRAGGLER

AQE Skew Handling (automático, Spark 3.0+)

O Adaptive Query Execution detecta e divide partições com skew automaticamente:

spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Partição é considerada com skew se for mais de N vezes a mediana E maior que M bytes
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor",        "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")

Salting (tratamento manual de skew)

Quando o AQE não é suficiente, adicionar um prefixo aleatório (salt) à chave de join distribui as partições artificialmente:

import random
 
NUM_SALTS = 10
 
# Adicionar salt aleatório à tabela maior
df_grande = df_grande.withColumn(
    "chave_salt",
    F.concat(F.col("chave"), F.lit("_"), (F.rand() * NUM_SALTS).cast("int").cast("string"))
)
 
# Explode a tabela menor com todos os valores de salt possíveis
salt_values = spark.range(NUM_SALTS).withColumnRenamed("id", "salt")
df_pequeno = df_pequeno.crossJoin(salt_values).withColumn(
    "chave_salt",
    F.concat(F.col("chave"), F.lit("_"), F.col("salt").cast("string"))
).drop("salt")
 
# Join pela chave saltada (sem skew agora)
resultado = df_grande.join(df_pequeno, on="chave_salt").drop("chave_salt")

Diagnosticar skew no Spark UI

  1. Ir para a Stage com longa duração no Spark UI
  2. Clicar na Stage e ver a distribuição das Tasks
  3. Comparar min, median e max de “Duration” e “Input Size”
  4. Se max for muito maior que median, há skew

Bucketing

Bucketing pré-particiona dados por uma coluna de join ao escrever, eliminando o shuffle em joins futuros entre tabelas bucketizadas:

# Escrever tabela bucketizada (uma vez, na preparação dos dados)
(
    df_pedidos
    .write
    .mode("overwrite")
    .bucketBy(50, "cliente_id")
    .sortBy("cliente_id")
    .saveAsTable("meu_db.pedidos_bucketed")
)
 
(
    df_clientes
    .write
    .mode("overwrite")
    .bucketBy(50, "cliente_id")
    .sortBy("cliente_id")
    .saveAsTable("meu_db.clientes_bucketed")
)
 
# Join posterior sem shuffle (ambas as tabelas já estão co-particionadas)
resultado = (
    spark.table("meu_db.pedidos_bucketed")
    .join(spark.table("meu_db.clientes_bucketed"), on="cliente_id")
)
resultado.explain()  # deve mostrar "SortMergeJoin" sem "Exchange" (sem shuffle)

O bucketing é eficaz para tabelas grandes que são juntadas frequentemente pelas mesmas chaves.

Serialização

Kryo é significativamente mais rápido e compacto que o serializador Java padrão para tipos customizados:

spark = SparkSession.builder \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrationRequired", "false") \
    .getOrCreate()

Para RDDs com tipos customizados, registrar as classes melhora ainda mais a performance:

// Scala: registrar classes para Kryo
spark.conf.set("spark.kryo.classesToRegister", "com.exemplo.MinhaClasse")

Configurações críticas de performance

# Número de partições após shuffle (ajustar conforme o volume de dados)
spark.conf.set("spark.sql.shuffle.partitions", "200")
 
# AQE: adaptação automática em tempo de execução
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
 
# Broadcast join automático até 50 MB
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m")
 
# Compressão do shuffle (snappy é o padrão, lz4 é mais rápido)
spark.conf.set("spark.io.compression.codec", "lz4")
 
# Spill para disco quando memória de execução é insuficiente
spark.conf.set("spark.memory.fraction",        "0.6")
spark.conf.set("spark.memory.storageFraction", "0.5")
 
# Dynamic Allocation: ajusta executors conforme a demanda
spark.conf.set("spark.dynamicAllocation.enabled",         "true")
spark.conf.set("spark.dynamicAllocation.minExecutors",     "1")
spark.conf.set("spark.dynamicAllocation.maxExecutors",     "50")
spark.conf.set("spark.dynamicAllocation.initialExecutors", "5")

Checklist de otimização

Ao investigar um job lento, seguir esta ordem:

  1. Shuffle excessivo: verificar no Spark UI quantos bytes são lidos/escritos no shuffle. Reduzir operações wide ou usar broadcast join
  2. Skew: comparar duração e tamanho das Tasks na Stage mais lenta. Usar AQE ou salting
  3. Partições: verificar se o número é compatível com o volume de dados. Ajustar shuffle.partitions
  4. Cache: verificar se DataFrames reutilizados estão em cache e se o cache está materializado
  5. GC: verificar no aba Executors do Spark UI se o tempo de GC é alto. Ver spark-troubleshooting
  6. UDFs Python: substituir por Pandas UDFs ou funções nativas do pyspark.sql.functions

Ver também: spark | spark-arquitetura | spark-sql | spark-troubleshooting