Componentes principais

graph TD
    subgraph Máquina do usuário
        APP[Aplicação PySpark]
    end

    subgraph Driver Process
        SC[SparkContext]
        DAG[DAGScheduler]
        TS[TaskScheduler]
    end

    subgraph Cluster Manager
        CM[YARN / Kubernetes / Standalone]
    end

    subgraph Worker Node 1
        E1[Executor 1<br/>N threads em paralelo<br/>Cache de dados]
    end

    subgraph Worker Node 2
        E2[Executor 2<br/>N threads em paralelo<br/>Cache de dados]
    end

    APP --> SC
    SC --> DAG --> TS --> CM
    CM --> E1
    CM --> E2

Driver

O Driver é o processo central da aplicação. Ele é responsável por:

  • Manter o SparkContext e a SparkSession
  • Converter o código da aplicação em um DAG de operações
  • Dividir o DAG em Stages e alocar Tasks para os Executors
  • Coletar resultados quando a aplicação chama collect() ou similar
  • Coordenar os Executors durante toda a execução

O Driver roda na máquina que inicia a aplicação (modo client) ou em um nó do cluster (modo cluster). Em produção, o modo cluster é recomendado porque o processo do Driver não fica dependente da conectividade da máquina cliente.

Cluster Manager

O Cluster Manager aloca recursos (CPU e memória) para o Driver e os Executors. O Spark não impõe um Cluster Manager específico:

Cluster ManagerContexto de uso
StandaloneCluster simples do próprio Spark, sem dependência de YARN ou Kubernetes
YARNIntegração com ecossistema Hadoop, comum em ambientes legados
KubernetesPadrão atual para cloud-native, cada Executor é um Pod
local[N]Modo de desenvolvimento, simula N Executors em threads locais

Executor

Processos JVM rodando nos Worker Nodes. Cada Executor:

  • Recebe Tasks do Driver e as executa em paralelo (uma thread por core alocado)
  • Armazena dados em cache quando a aplicação chama persist() ou cache()
  • Persiste durante toda a vida da aplicação (salvo com Dynamic Allocation habilitado)
  • Reporta o progresso e resultados de volta ao Driver

Modelo de execução: Aplicação, Job, Stage e Task

graph LR
    AP[Aplicação] --> J1[Job 1<br/>por Action]
    AP --> J2[Job 2<br/>por Action]
    J1 --> S1[Stage 1<br/>transf. narrow]
    J1 --> S2[Stage 2<br/>após o shuffle]
    S1 --> T1[Task 1<br/>partição 1]
    S1 --> T2[Task 2<br/>partição 2]
    S1 --> TN[Task N<br/>partição N]
    S2 --> TK[Task 1]
    S2 --> TM[Task N]
  • Aplicação: todo o programa PySpark, desde o SparkSession.builder até o spark.stop()
  • Job: criado cada vez que uma Action é chamada. Um Job representa todo o trabalho necessário para produzir o resultado daquela Action
  • Stage: o DAGScheduler divide o Job em Stages nas fronteiras de Shuffle. Dentro de um Stage, todas as transformações são “narrow” (sem movimentação de dados entre partições)
  • Task: a menor unidade de trabalho. Cada Task processa exatamente uma partição e roda em uma thread de um Executor. Um Stage tem tantas Tasks quanto o número de partições

Lazy Evaluation (avaliação preguiçosa)

Transformações como filter(), select(), groupBy() e join() não executam imediatamente. Elas constroem um plano (DAG). A execução ocorre apenas quando uma Action é chamada:

df = spark.read.parquet("s3://lake/pedidos/")  # nada executa
df2 = df.filter(df.status == "ativo")          # nada executa
df3 = df2.groupBy("cliente_id").count()        # nada executa
 
df3.show()  # apenas aqui o Spark executa tudo, otimizando o plano completo

A avaliação preguiçosa permite que o Catalyst Optimizer (ver spark-sql) analise o plano completo antes de executar, combinando transformações e eliminando trabalho desnecessário.

Transformações Narrow vs Wide

A distinção entre transformações narrow e wide determina onde o Spark cria as fronteiras de Stage:

graph LR
    subgraph Narrow: sem shuffle
        P1A[Partição 1<br/>Entrada] --> P1B[Partição 1<br/>Saída]
        P2A[Partição 2<br/>Entrada] --> P2B[Partição 2<br/>Saída]
    end
graph LR
    subgraph Wide: com shuffle
        Q1A[Partição 1] --> Q1B[Partição 1]
        Q1A --> Q2B[Partição 2]
        Q2A[Partição 2] --> Q1B
        Q2A --> Q2B
    end
TipoExemplosComportamento
Narrowfilter, select, withColumn, map, unionCada partição de entrada produz exatamente uma partição de saída, sem precisar de dados de outras partições
WidegroupBy, join, orderBy, repartition, distinctDados de múltiplas partições de entrada podem ir para a mesma partição de saída, exigindo Shuffle

Cada transformação Wide cria uma fronteira de Stage no DAG.

Shuffle

O Shuffle é o processo de redistribuir dados entre os Executors para que registros com a mesma chave fiquem na mesma partição. É a operação mais custosa do Spark:

sequenceDiagram
    participant M1 as Executor 1 (Map)
    participant M2 as Executor 2 (Map)
    participant R1 as Executor 1 (Reduce)
    participant R2 as Executor 2 (Reduce)

    M1->>M1: Shuffle write (serializa, grava em disco)
    M2->>M2: Shuffle write (serializa, grava em disco)
    R1->>M1: Lê dados de key range A
    R1->>M2: Lê dados de key range A
    R2->>M1: Lê dados de key range B
    R2->>M2: Lê dados de key range B
    R1->>R1: Shuffle read (desserializa, agrega)
    R2->>R2: Shuffle read (desserializa, agrega)

O custo do Shuffle vem de três partes:

  1. Shuffle write: serialização e escrita em disco nos Executors do Stage de map
  2. Transferência de rede: movimentação dos dados entre os Executors
  3. Shuffle read: leitura, desserialização e ordenação nos Executors do Stage de reduce

O número de partições após um Shuffle é controlado por spark.sql.shuffle.partitions (padrão: 200). Para datasets menores esse valor é alto demais e cria overhead de scheduling. Para datasets grandes é baixo demais e causa spill para disco. Ver spark-performance para estratégias de ajuste.

Modelo de memória do Executor

spark.executor.memory (ex: 4g)
├── Reserved Memory (~300 MB, overhead interno do Spark)
├── User Memory (~40% do restante)
│     └── Estruturas de dados criadas pelo usuário, UDFs, broadcasting
└── Spark Memory (~60% do restante)
      ├── Execution Memory (dinâmico)
      │     └── Shuffle, sort, aggregation, join, hash tables
      └── Storage Memory (dinâmico)
            └── cache(), persist(), broadcast variables

A divisão entre Execution Memory e Storage Memory é dinâmica (Unified Memory Manager, desde Spark 1.6): quando a Execution Memory precisa de mais espaço, ela pode despejar blocos do Storage Memory (eviction do cache). O comportamento inverso também ocorre quando há espaço livre de execution e o storage precisa crescer.

Além do spark.executor.memory, existe o spark.executor.memoryOverhead (padrão: 10% do executor memory), que aloca memória fora do heap JVM para uso de código nativo, Python workers (PySpark) e containers do Kubernetes.

Configurações relevantes:

spark = SparkSession.builder \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.memoryOverhead", "1g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.driver.maxResultSize", "1g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()

Broadcast Variables e Accumulators

Broadcast Variables

Variáveis enviadas uma única vez do Driver para todos os Executors e mantidas em cache local. Evitam o envio repetido em cada Task:

from pyspark.sql import functions as F
 
# Dicionário de lookups para enriquecer registros
tabela_municipios = spark.sparkContext.broadcast(
    {row["codigo"]: row["nome"] for row in df_municipios.collect()}
)
 
@F.udf("string")
def nome_municipio(codigo):
    return tabela_municipios.value.get(codigo, "desconhecido")
 
df = df.withColumn("municipio", nome_municipio(F.col("codigo_municipio")))

Accumulators

Contadores distribuídos que os Executors atualizam e o Driver lê ao final. Úteis para métricas de qualidade de dados:

registros_invalidos = spark.sparkContext.accumulator(0)
 
def validar(row):
    if row["valor"] is None or row["valor"] < 0:
        registros_invalidos.add(1)
 
df.foreach(validar)
print(f"Registros inválidos encontrados: {registros_invalidos.value}")

Dynamic Allocation

Com Dynamic Allocation, o Spark pede e devolve Executors conforme a demanda da aplicação, em vez de manter um número fixo durante toda a execução:

spark = SparkSession.builder \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "1") \
    .config("spark.dynamicAllocation.maxExecutors", "50") \
    .config("spark.dynamicAllocation.initialExecutors", "5") \
    .getOrCreate()

Dynamic Allocation é especialmente útil em jobs com fases de processamento muito distintas (ex: ingesta leve seguida de aggregation pesada).

Ver também: spark | spark-apis | spark-sql | spark-performance | spark-troubleshooting