Spark SQL é o módulo do Spark responsável por processar dados estruturados via SQL ou via DataFrame API. Por baixo, o Catalyst Optimizer transforma o código escrito pelo usuário em um plano de execução otimizado antes de qualquer dado ser processado.

Catalyst Optimizer

O Catalyst converte o código da aplicação em código executável em quatro passos:

graph LR
    A[Código do<br/>usuário<br/>SQL ou DataFrame] --> B[Unresolved<br/>Logical Plan]
    B --> C[Resolved<br/>Logical Plan<br/>Catálogo]
    C --> D[Optimized<br/>Logical Plan<br/>regras de otimização]
    D --> E[Physical<br/>Plans<br/>várias estratégias]
    E --> F[Selected<br/>Physical Plan<br/>baseado em custo]
    F --> G[Código gerado<br/>WholeStageCodegen]
  1. Parsing: o código SQL ou DataFrame é convertido em um plano lógico com referências não resolvidas (nomes de colunas e tabelas ainda não validados)
  2. Analysis: as referências são resolvidas consultando o catálogo de metadados (tipos das colunas, existência das tabelas)
  3. Logical Optimization: o Catalyst aplica regras de otimização determinísticas ao plano lógico
  4. Physical Planning: o plano lógico otimizado é convertido em um ou mais planos físicos concretos. O melhor plano é selecionado pelo Cost Based Optimizer (CBO) com base em estatísticas das tabelas

Otimizações automáticas do Catalyst

OtimizaçãoO que faz
Predicate PushdownMove filtros para o mais perto possível da fonte de dados, lendo menos dados
Column PruningLê apenas as colunas realmente usadas na query, eliminando leitura desnecessária
Constant FoldingAvalia expressões constantes em tempo de compilação (3 * 4 vira 12 no plano)
Join ReorderingReordena joins para processar primeiro as tabelas menores
Broadcast Join ConversionConverte Sort-Merge Join em Broadcast Join quando uma tabela é pequena

Inspecionar planos de execução

O método explain() expõe o plano de execução para diagnóstico e otimização:

df = (
    spark.read.parquet("gs://lake/pedidos/")
    .filter("status = 'ativo'")
    .groupBy("regiao")
    .agg(F.sum("valor").alias("receita"))
)
 
# Plano físico resumido (ponto de partida)
df.explain()
 
# Plano completo: lógico não resolvido, lógico resolvido, otimizado e físico
df.explain("extended")
 
# Formato legível com seções separadas (Spark 3+, recomendado)
df.explain("formatted")
 
# Plano com métricas reais (executa o job)
df.explain("cost")

O que procurar no plano físico:

== Physical Plan ==
AdaptiveSparkPlan (1)                       <- AQE habilitado
+- HashAggregate (2)                        <- aggregation final no Driver side
   +- Exchange hashpartitioning(regiao)     <- SHUFFLE aqui: fronteira de Stage
      +- HashAggregate (3)                  <- aggregation parcial antes do shuffle
         +- Filter (status = ativo)         <- Predicate Pushdown: filtro aplicado cedo
            +- FileScan parquet [...cols]   <- Column Pruning: só lê colunas necessárias
                 PartitionFilters: []
                 PushedFilters: [IsNotNull(status), EqualTo(status,ativo)]
  • Exchange: indica um Shuffle (fronteira de Stage)
  • BroadcastHashJoin ou BroadcastExchange: broadcast join (sem shuffle)
  • SortMergeJoin: join com shuffle nos dois lados (mais custoso)
  • FileScan com PushedFilters: filtros empurrados para a leitura do Parquet

Adaptive Query Execution (AQE)

O AQE reotimiza o plano físico em tempo de execução com base nas estatísticas reais dos dados, disponível desde o Spark 3.0 e habilitado por padrão no Spark 3.2+:

spark.conf.set("spark.sql.adaptive.enabled", "true")

O AQE realiza três tipos de otimização automática:

1. Coalesce de partições após shuffle

Após um shuffle, partições pequenas são mescladas automaticamente para reduzir overhead de scheduling:

# Em vez de criar 200 partições pequenas (padrão)
# o AQE mescla partições vazias ou pequenas em partições de tamanho razoável
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.targetPostShuffleInputSize", "64m")

2. Conversão dinâmica para Broadcast Join

Durante a execução, se uma tabela for menor do que o threshold, o AQE converte um Sort-Merge Join planejado em um Broadcast Join:

spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10m")  # 10 MB

3. Handling de skew automaticamente

O AQE detecta partições muito maiores que as outras e as divide automaticamente, redistribuindo a carga:

spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")

Spark SQL: consultas e views

Qualquer DataFrame pode ser consultado como uma tabela temporária via SQL:

df.createOrReplaceTempView("pedidos")
 
resultado = spark.sql("""
    SELECT
        regiao,
        DATE_FORMAT(data_criacao, 'yyyy-MM') AS ano_mes,
        COUNT(*)                             AS qtd,
        SUM(valor)                           AS receita,
        AVG(valor)                           AS ticket_medio
    FROM pedidos
    WHERE status = 'ativo'
    GROUP BY 1, 2
    HAVING SUM(valor) > 1000
    ORDER BY 1, 2
""")
 
# Views globais: visíveis entre diferentes SparkSessions na mesma aplicação
df.createOrReplaceGlobalTempView("pedidos_global")
spark.sql("SELECT * FROM global_temp.pedidos_global LIMIT 10")

Catálogo e Metastore

O catálogo mantém metadados de tabelas, views e databases. Em ambientes com Hive Metastore (Dataproc, Databricks com Unity Catalog), as tabelas persistem entre sessões:

# Listar databases e tabelas
spark.catalog.listDatabases()
spark.catalog.listTables("meu_database")
 
# Criar tabela gerenciada (dados armazenados no warehouse do Spark/Hive)
spark.sql("""
    CREATE TABLE IF NOT EXISTS meu_db.pedidos (
        pedido_id  STRING,
        cliente_id STRING,
        valor      DOUBLE,
        status     STRING,
        criado_em  TIMESTAMP
    )
    USING delta
    PARTITIONED BY (DATE(criado_em))
""")
 
# Criar tabela externa (dados permanecem no path especificado)
spark.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS meu_db.pedidos_ext
    USING parquet
    LOCATION 'gs://lake/silver/pedidos/'
""")
 
# Computar estatísticas para melhorar o CBO
spark.sql("ANALYZE TABLE meu_db.pedidos COMPUTE STATISTICS FOR ALL COLUMNS")

UDFs (User Defined Functions)

Python UDFs (lentas)

Python UDFs são executadas linha por linha com serialização via pickle entre a JVM e o processo Python. Funcionam mas são significativamente mais lentas que funções nativas:

from pyspark.sql.types import StringType
 
@F.udf(returnType=StringType())
def classificar_cpf(cpf: str) -> str:
    if not cpf:
        return "invalido"
    limpo = cpf.replace(".", "").replace("-", "")
    return "valido" if len(limpo) == 11 else "invalido"
 
df = df.withColumn("cpf_status", classificar_cpf(F.col("cpf")))

Pandas UDFs (vetorizadas via Arrow, muito mais rápidas)

Pandas UDFs processam colunas inteiras como pd.Series usando Apache Arrow, eliminando a serialização linha a linha:

from pyspark.sql.functions import pandas_udf
import pandas as pd
 
# UDF sobre uma coluna (Series -> Series)
@pandas_udf(returnType=StringType())
def classificar_cpf_rapido(serie: pd.Series) -> pd.Series:
    return serie.str.replace(r"[.\-]", "", regex=True).str.len().map(
        lambda n: "valido" if n == 11 else "invalido"
    )
 
df = df.withColumn("cpf_status", classificar_cpf_rapido(F.col("cpf")))
 
# UDF agrupada: recebe DataFrame inteiro de um grupo, retorna DataFrame
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
 
schema_saida = StructType([
    StructField("cliente_id", StringType()),
    StructField("ltv", DoubleType()),
])
 
@pandas_udf(schema_saida, PandasUDFType.GROUPED_MAP)
def calcular_ltv(df: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({
        "cliente_id": [df["cliente_id"].iloc[0]],
        "ltv": [df["valor"].sum()],
    })
 
resultado = df.groupBy("cliente_id").apply(calcular_ltv)

Boas práticas com UDFs

  • Prefira sempre funções nativas do pyspark.sql.functions antes de criar UDFs
  • Use Pandas UDFs quando precisar de lógica Python que as funções nativas não cobrem
  • Evite Python UDFs convencionais em produção com grandes volumes de dados
  • UDFs em Scala/Java são as mais rápidas por rodar na mesma JVM, mas exigem compilar um JAR

Cost Based Optimizer (CBO)

O CBO usa estatísticas de tabela para escolher a melhor estratégia física. Ele só funciona quando as estatísticas foram coletadas:

# Habilitar CBO
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
 
# Coletar estatísticas (necessário antes de o CBO funcionar)
spark.sql("ANALYZE TABLE meu_db.pedidos COMPUTE STATISTICS FOR ALL COLUMNS")
 
# Verificar estatísticas coletadas
spark.sql("DESCRIBE EXTENDED meu_db.pedidos").show(truncate=False)

Ver também: spark | spark-arquitetura | spark-apis | spark-performance | spark-troubleshooting