Componentes principais
graph TD subgraph Máquina do usuário APP[Aplicação PySpark] end subgraph Driver Process SC[SparkContext] DAG[DAGScheduler] TS[TaskScheduler] end subgraph Cluster Manager CM[YARN / Kubernetes / Standalone] end subgraph Worker Node 1 E1[Executor 1<br/>N threads em paralelo<br/>Cache de dados] end subgraph Worker Node 2 E2[Executor 2<br/>N threads em paralelo<br/>Cache de dados] end APP --> SC SC --> DAG --> TS --> CM CM --> E1 CM --> E2
Driver
O Driver é o processo central da aplicação. Ele é responsável por:
- Manter o
SparkContexte aSparkSession - Converter o código da aplicação em um DAG de operações
- Dividir o DAG em Stages e alocar Tasks para os Executors
- Coletar resultados quando a aplicação chama
collect()ou similar - Coordenar os Executors durante toda a execução
O Driver roda na máquina que inicia a aplicação (modo client) ou em um nó do cluster (modo cluster). Em produção, o modo cluster é recomendado porque o processo do Driver não fica dependente da conectividade da máquina cliente.
Cluster Manager
O Cluster Manager aloca recursos (CPU e memória) para o Driver e os Executors. O Spark não impõe um Cluster Manager específico:
| Cluster Manager | Contexto de uso |
|---|---|
| Standalone | Cluster simples do próprio Spark, sem dependência de YARN ou Kubernetes |
| YARN | Integração com ecossistema Hadoop, comum em ambientes legados |
| Kubernetes | Padrão atual para cloud-native, cada Executor é um Pod |
| local[N] | Modo de desenvolvimento, simula N Executors em threads locais |
Executor
Processos JVM rodando nos Worker Nodes. Cada Executor:
- Recebe Tasks do Driver e as executa em paralelo (uma thread por core alocado)
- Armazena dados em cache quando a aplicação chama
persist()oucache() - Persiste durante toda a vida da aplicação (salvo com Dynamic Allocation habilitado)
- Reporta o progresso e resultados de volta ao Driver
Modelo de execução: Aplicação, Job, Stage e Task
graph LR AP[Aplicação] --> J1[Job 1<br/>por Action] AP --> J2[Job 2<br/>por Action] J1 --> S1[Stage 1<br/>transf. narrow] J1 --> S2[Stage 2<br/>após o shuffle] S1 --> T1[Task 1<br/>partição 1] S1 --> T2[Task 2<br/>partição 2] S1 --> TN[Task N<br/>partição N] S2 --> TK[Task 1] S2 --> TM[Task N]
- Aplicação: todo o programa PySpark, desde o
SparkSession.builderaté ospark.stop() - Job: criado cada vez que uma Action é chamada. Um Job representa todo o trabalho necessário para produzir o resultado daquela Action
- Stage: o DAGScheduler divide o Job em Stages nas fronteiras de Shuffle. Dentro de um Stage, todas as transformações são “narrow” (sem movimentação de dados entre partições)
- Task: a menor unidade de trabalho. Cada Task processa exatamente uma partição e roda em uma thread de um Executor. Um Stage tem tantas Tasks quanto o número de partições
Lazy Evaluation (avaliação preguiçosa)
Transformações como filter(), select(), groupBy() e join() não executam imediatamente. Elas constroem um plano (DAG). A execução ocorre apenas quando uma Action é chamada:
df = spark.read.parquet("s3://lake/pedidos/") # nada executa
df2 = df.filter(df.status == "ativo") # nada executa
df3 = df2.groupBy("cliente_id").count() # nada executa
df3.show() # apenas aqui o Spark executa tudo, otimizando o plano completoA avaliação preguiçosa permite que o Catalyst Optimizer (ver spark-sql) analise o plano completo antes de executar, combinando transformações e eliminando trabalho desnecessário.
Transformações Narrow vs Wide
A distinção entre transformações narrow e wide determina onde o Spark cria as fronteiras de Stage:
graph LR subgraph Narrow: sem shuffle P1A[Partição 1<br/>Entrada] --> P1B[Partição 1<br/>Saída] P2A[Partição 2<br/>Entrada] --> P2B[Partição 2<br/>Saída] end
graph LR subgraph Wide: com shuffle Q1A[Partição 1] --> Q1B[Partição 1] Q1A --> Q2B[Partição 2] Q2A[Partição 2] --> Q1B Q2A --> Q2B end
| Tipo | Exemplos | Comportamento |
|---|---|---|
| Narrow | filter, select, withColumn, map, union | Cada partição de entrada produz exatamente uma partição de saída, sem precisar de dados de outras partições |
| Wide | groupBy, join, orderBy, repartition, distinct | Dados de múltiplas partições de entrada podem ir para a mesma partição de saída, exigindo Shuffle |
Cada transformação Wide cria uma fronteira de Stage no DAG.
Shuffle
O Shuffle é o processo de redistribuir dados entre os Executors para que registros com a mesma chave fiquem na mesma partição. É a operação mais custosa do Spark:
sequenceDiagram participant M1 as Executor 1 (Map) participant M2 as Executor 2 (Map) participant R1 as Executor 1 (Reduce) participant R2 as Executor 2 (Reduce) M1->>M1: Shuffle write (serializa, grava em disco) M2->>M2: Shuffle write (serializa, grava em disco) R1->>M1: Lê dados de key range A R1->>M2: Lê dados de key range A R2->>M1: Lê dados de key range B R2->>M2: Lê dados de key range B R1->>R1: Shuffle read (desserializa, agrega) R2->>R2: Shuffle read (desserializa, agrega)
O custo do Shuffle vem de três partes:
- Shuffle write: serialização e escrita em disco nos Executors do Stage de map
- Transferência de rede: movimentação dos dados entre os Executors
- Shuffle read: leitura, desserialização e ordenação nos Executors do Stage de reduce
O número de partições após um Shuffle é controlado por spark.sql.shuffle.partitions (padrão: 200). Para datasets menores esse valor é alto demais e cria overhead de scheduling. Para datasets grandes é baixo demais e causa spill para disco. Ver spark-performance para estratégias de ajuste.
Modelo de memória do Executor
spark.executor.memory (ex: 4g)
├── Reserved Memory (~300 MB, overhead interno do Spark)
├── User Memory (~40% do restante)
│ └── Estruturas de dados criadas pelo usuário, UDFs, broadcasting
└── Spark Memory (~60% do restante)
├── Execution Memory (dinâmico)
│ └── Shuffle, sort, aggregation, join, hash tables
└── Storage Memory (dinâmico)
└── cache(), persist(), broadcast variables
A divisão entre Execution Memory e Storage Memory é dinâmica (Unified Memory Manager, desde Spark 1.6): quando a Execution Memory precisa de mais espaço, ela pode despejar blocos do Storage Memory (eviction do cache). O comportamento inverso também ocorre quando há espaço livre de execution e o storage precisa crescer.
Além do spark.executor.memory, existe o spark.executor.memoryOverhead (padrão: 10% do executor memory), que aloca memória fora do heap JVM para uso de código nativo, Python workers (PySpark) e containers do Kubernetes.
Configurações relevantes:
spark = SparkSession.builder \
.config("spark.executor.memory", "4g") \
.config("spark.executor.memoryOverhead", "1g") \
.config("spark.driver.memory", "2g") \
.config("spark.driver.maxResultSize", "1g") \
.config("spark.memory.fraction", "0.6") \
.config("spark.memory.storageFraction", "0.5") \
.getOrCreate()Broadcast Variables e Accumulators
Broadcast Variables
Variáveis enviadas uma única vez do Driver para todos os Executors e mantidas em cache local. Evitam o envio repetido em cada Task:
from pyspark.sql import functions as F
# Dicionário de lookups para enriquecer registros
tabela_municipios = spark.sparkContext.broadcast(
{row["codigo"]: row["nome"] for row in df_municipios.collect()}
)
@F.udf("string")
def nome_municipio(codigo):
return tabela_municipios.value.get(codigo, "desconhecido")
df = df.withColumn("municipio", nome_municipio(F.col("codigo_municipio")))Accumulators
Contadores distribuídos que os Executors atualizam e o Driver lê ao final. Úteis para métricas de qualidade de dados:
registros_invalidos = spark.sparkContext.accumulator(0)
def validar(row):
if row["valor"] is None or row["valor"] < 0:
registros_invalidos.add(1)
df.foreach(validar)
print(f"Registros inválidos encontrados: {registros_invalidos.value}")Dynamic Allocation
Com Dynamic Allocation, o Spark pede e devolve Executors conforme a demanda da aplicação, em vez de manter um número fixo durante toda a execução:
spark = SparkSession.builder \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.dynamicAllocation.minExecutors", "1") \
.config("spark.dynamicAllocation.maxExecutors", "50") \
.config("spark.dynamicAllocation.initialExecutors", "5") \
.getOrCreate()Dynamic Allocation é especialmente útil em jobs com fases de processamento muito distintas (ex: ingesta leve seguida de aggregation pesada).
Ver também: spark | spark-apis | spark-sql | spark-performance | spark-troubleshooting