Spark SQL é o módulo do Spark responsável por processar dados estruturados via SQL ou via DataFrame API. Por baixo, o Catalyst Optimizer transforma o código escrito pelo usuário em um plano de execução otimizado antes de qualquer dado ser processado.
Catalyst Optimizer
O Catalyst converte o código da aplicação em código executável em quatro passos:
graph LR A[Código do<br/>usuário<br/>SQL ou DataFrame] --> B[Unresolved<br/>Logical Plan] B --> C[Resolved<br/>Logical Plan<br/>Catálogo] C --> D[Optimized<br/>Logical Plan<br/>regras de otimização] D --> E[Physical<br/>Plans<br/>várias estratégias] E --> F[Selected<br/>Physical Plan<br/>baseado em custo] F --> G[Código gerado<br/>WholeStageCodegen]
- Parsing: o código SQL ou DataFrame é convertido em um plano lógico com referências não resolvidas (nomes de colunas e tabelas ainda não validados)
- Analysis: as referências são resolvidas consultando o catálogo de metadados (tipos das colunas, existência das tabelas)
- Logical Optimization: o Catalyst aplica regras de otimização determinísticas ao plano lógico
- Physical Planning: o plano lógico otimizado é convertido em um ou mais planos físicos concretos. O melhor plano é selecionado pelo Cost Based Optimizer (CBO) com base em estatísticas das tabelas
Otimizações automáticas do Catalyst
| Otimização | O que faz |
|---|---|
| Predicate Pushdown | Move filtros para o mais perto possível da fonte de dados, lendo menos dados |
| Column Pruning | Lê apenas as colunas realmente usadas na query, eliminando leitura desnecessária |
| Constant Folding | Avalia expressões constantes em tempo de compilação (3 * 4 vira 12 no plano) |
| Join Reordering | Reordena joins para processar primeiro as tabelas menores |
| Broadcast Join Conversion | Converte Sort-Merge Join em Broadcast Join quando uma tabela é pequena |
Inspecionar planos de execução
O método explain() expõe o plano de execução para diagnóstico e otimização:
df = (
spark.read.parquet("gs://lake/pedidos/")
.filter("status = 'ativo'")
.groupBy("regiao")
.agg(F.sum("valor").alias("receita"))
)
# Plano físico resumido (ponto de partida)
df.explain()
# Plano completo: lógico não resolvido, lógico resolvido, otimizado e físico
df.explain("extended")
# Formato legível com seções separadas (Spark 3+, recomendado)
df.explain("formatted")
# Plano com métricas reais (executa o job)
df.explain("cost")O que procurar no plano físico:
== Physical Plan ==
AdaptiveSparkPlan (1) <- AQE habilitado
+- HashAggregate (2) <- aggregation final no Driver side
+- Exchange hashpartitioning(regiao) <- SHUFFLE aqui: fronteira de Stage
+- HashAggregate (3) <- aggregation parcial antes do shuffle
+- Filter (status = ativo) <- Predicate Pushdown: filtro aplicado cedo
+- FileScan parquet [...cols] <- Column Pruning: só lê colunas necessárias
PartitionFilters: []
PushedFilters: [IsNotNull(status), EqualTo(status,ativo)]
Exchange: indica um Shuffle (fronteira de Stage)BroadcastHashJoinouBroadcastExchange: broadcast join (sem shuffle)SortMergeJoin: join com shuffle nos dois lados (mais custoso)FileScancomPushedFilters: filtros empurrados para a leitura do Parquet
Adaptive Query Execution (AQE)
O AQE reotimiza o plano físico em tempo de execução com base nas estatísticas reais dos dados, disponível desde o Spark 3.0 e habilitado por padrão no Spark 3.2+:
spark.conf.set("spark.sql.adaptive.enabled", "true")O AQE realiza três tipos de otimização automática:
1. Coalesce de partições após shuffle
Após um shuffle, partições pequenas são mescladas automaticamente para reduzir overhead de scheduling:
# Em vez de criar 200 partições pequenas (padrão)
# o AQE mescla partições vazias ou pequenas em partições de tamanho razoável
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.targetPostShuffleInputSize", "64m")2. Conversão dinâmica para Broadcast Join
Durante a execução, se uma tabela for menor do que o threshold, o AQE converte um Sort-Merge Join planejado em um Broadcast Join:
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10m") # 10 MB3. Handling de skew automaticamente
O AQE detecta partições muito maiores que as outras e as divide automaticamente, redistribuindo a carga:
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")Spark SQL: consultas e views
Qualquer DataFrame pode ser consultado como uma tabela temporária via SQL:
df.createOrReplaceTempView("pedidos")
resultado = spark.sql("""
SELECT
regiao,
DATE_FORMAT(data_criacao, 'yyyy-MM') AS ano_mes,
COUNT(*) AS qtd,
SUM(valor) AS receita,
AVG(valor) AS ticket_medio
FROM pedidos
WHERE status = 'ativo'
GROUP BY 1, 2
HAVING SUM(valor) > 1000
ORDER BY 1, 2
""")
# Views globais: visíveis entre diferentes SparkSessions na mesma aplicação
df.createOrReplaceGlobalTempView("pedidos_global")
spark.sql("SELECT * FROM global_temp.pedidos_global LIMIT 10")Catálogo e Metastore
O catálogo mantém metadados de tabelas, views e databases. Em ambientes com Hive Metastore (Dataproc, Databricks com Unity Catalog), as tabelas persistem entre sessões:
# Listar databases e tabelas
spark.catalog.listDatabases()
spark.catalog.listTables("meu_database")
# Criar tabela gerenciada (dados armazenados no warehouse do Spark/Hive)
spark.sql("""
CREATE TABLE IF NOT EXISTS meu_db.pedidos (
pedido_id STRING,
cliente_id STRING,
valor DOUBLE,
status STRING,
criado_em TIMESTAMP
)
USING delta
PARTITIONED BY (DATE(criado_em))
""")
# Criar tabela externa (dados permanecem no path especificado)
spark.sql("""
CREATE EXTERNAL TABLE IF NOT EXISTS meu_db.pedidos_ext
USING parquet
LOCATION 'gs://lake/silver/pedidos/'
""")
# Computar estatísticas para melhorar o CBO
spark.sql("ANALYZE TABLE meu_db.pedidos COMPUTE STATISTICS FOR ALL COLUMNS")UDFs (User Defined Functions)
Python UDFs (lentas)
Python UDFs são executadas linha por linha com serialização via pickle entre a JVM e o processo Python. Funcionam mas são significativamente mais lentas que funções nativas:
from pyspark.sql.types import StringType
@F.udf(returnType=StringType())
def classificar_cpf(cpf: str) -> str:
if not cpf:
return "invalido"
limpo = cpf.replace(".", "").replace("-", "")
return "valido" if len(limpo) == 11 else "invalido"
df = df.withColumn("cpf_status", classificar_cpf(F.col("cpf")))Pandas UDFs (vetorizadas via Arrow, muito mais rápidas)
Pandas UDFs processam colunas inteiras como pd.Series usando Apache Arrow, eliminando a serialização linha a linha:
from pyspark.sql.functions import pandas_udf
import pandas as pd
# UDF sobre uma coluna (Series -> Series)
@pandas_udf(returnType=StringType())
def classificar_cpf_rapido(serie: pd.Series) -> pd.Series:
return serie.str.replace(r"[.\-]", "", regex=True).str.len().map(
lambda n: "valido" if n == 11 else "invalido"
)
df = df.withColumn("cpf_status", classificar_cpf_rapido(F.col("cpf")))
# UDF agrupada: recebe DataFrame inteiro de um grupo, retorna DataFrame
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema_saida = StructType([
StructField("cliente_id", StringType()),
StructField("ltv", DoubleType()),
])
@pandas_udf(schema_saida, PandasUDFType.GROUPED_MAP)
def calcular_ltv(df: pd.DataFrame) -> pd.DataFrame:
return pd.DataFrame({
"cliente_id": [df["cliente_id"].iloc[0]],
"ltv": [df["valor"].sum()],
})
resultado = df.groupBy("cliente_id").apply(calcular_ltv)Boas práticas com UDFs
- Prefira sempre funções nativas do
pyspark.sql.functionsantes de criar UDFs - Use Pandas UDFs quando precisar de lógica Python que as funções nativas não cobrem
- Evite Python UDFs convencionais em produção com grandes volumes de dados
- UDFs em Scala/Java são as mais rápidas por rodar na mesma JVM, mas exigem compilar um JAR
Cost Based Optimizer (CBO)
O CBO usa estatísticas de tabela para escolher a melhor estratégia física. Ele só funciona quando as estatísticas foram coletadas:
# Habilitar CBO
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
# Coletar estatísticas (necessário antes de o CBO funcionar)
spark.sql("ANALYZE TABLE meu_db.pedidos COMPUTE STATISTICS FOR ALL COLUMNS")
# Verificar estatísticas coletadas
spark.sql("DESCRIBE EXTENDED meu_db.pedidos").show(truncate=False)Ver também: spark | spark-arquitetura | spark-apis | spark-performance | spark-troubleshooting