PySpark é a API Python do Apache Spark. Permite processar datasets que não cabem em uma única máquina, distribuindo o trabalho em um cluster.

Use PySpark quando Pandas ou Polars já não sustentam o volume, geralmente acima de dezenas de GB ou quando há necessidade de processamento distribuído com Big Data.

SparkSession: ponto de entrada

from pyspark.sql import SparkSession
 
spark = (
    SparkSession.builder
    .appName("pipeline-vendas")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

Em plataformas gerenciadas (Databricks, EMR, Dataproc), a SparkSession já vem disponível como spark.

Leitura e escrita

# Leitura
df = spark.read.parquet("s3://bucket/dados/")
df = spark.read.csv("s3://bucket/arquivo.csv", header=True, inferSchema=True)
df = spark.read.format("delta").load("s3://bucket/delta/tabela/")
df = spark.read.format("jdbc").options(
    url="jdbc:postgresql://host:5432/db",
    dbtable="schema.tabela",
    user="usr", password="pwd"
).load()
 
# Escrita
df.write.parquet("s3://bucket/saida/", mode="overwrite")
df.write.format("delta").mode("append").partitionBy("ano", "mes").save("s3://bucket/delta/")
df.write.format("delta").mode("overwrite").saveAsTable("gold.ltv_clientes")

Operações essenciais

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
 
# Inspeção
df.printSchema()
df.show(5)
df.count()
 
# Seleção e filtros
df.select("col1", "col2")
df.filter(F.col("status") == "ativo")
df.filter((F.col("valor") > 0) & F.col("data").isNotNull())
 
# Transformações
df = df.withColumn("preco_com_imposto", F.col("preco") * 1.1)
df = df.withColumn("ano", F.year("data"))
df = df.withColumn("nome_upper", F.upper("nome"))
df = df.drop("coluna_desnecessaria")
df = df.dropna(subset=["campo_obrigatorio"])
df = df.fillna({"campo": 0})
 
# Agrupamentos
df.groupBy("categoria").agg(
    F.sum("valor").alias("total"),
    F.count("pedido_id").alias("qtd"),
    F.max("data").alias("ultima_compra"),
)
 
# Joins
df_left.join(df_right, on="chave", how="left")
df_left.join(df_right, df_left["id"] == df_right["cliente_id"], "inner")
 
# Window functions
from pyspark.sql.window import Window
 
w = Window.partitionBy("cliente_id").orderBy("data")
df = df.withColumn("rank_compra", F.rank().over(w))
df = df.withColumn("valor_acumulado", F.sum("valor").over(w))

SQL no PySpark

# Registrar DataFrame como view temporária e usar SQL
df.createOrReplaceTempView("pedidos")
 
resultado = spark.sql("""
    SELECT
        cliente_id,
        COUNT(pedido_id)  AS qtd_pedidos,
        SUM(valor_total)  AS ltv
    FROM pedidos
    WHERE status = 'concluido'
    GROUP BY cliente_id
""")

Conceitos importantes

Lazy evaluation: transformações (filter, select, join) constroem um plano de execução. Só executam com uma action (show, count, write, collect).

Particionamento: Spark divide os dados em partições. O padrão de shuffle.partitions (200) pode ser ajustado conforme o volume.

Broadcast join: quando um dos DataFrames é pequeno, use F.broadcast() para evitar shuffle:

df_grande.join(F.broadcast(df_pequeno), on="chave")

Cuidado com .collect(): traz todos os dados para o driver (memória local). Só usar em datasets pequenos.

PySpark no ecossistema

Ver também: python-engenharia-dados | python-pandas | python-polars | ferramentas-engenharia-dados | big-data