PySpark é a API Python do Apache Spark. Permite processar datasets que não cabem em uma única máquina, distribuindo o trabalho em um cluster.
Use PySpark quando Pandas ou Polars já não sustentam o volume, geralmente acima de dezenas de GB ou quando há necessidade de processamento distribuído com Big Data.
SparkSession: ponto de entrada
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("pipeline-vendas")
.config("spark.sql.shuffle.partitions", "200")
.getOrCreate()
)Em plataformas gerenciadas (Databricks, EMR, Dataproc), a SparkSession já vem disponível como spark.
Leitura e escrita
# Leitura
df = spark.read.parquet("s3://bucket/dados/")
df = spark.read.csv("s3://bucket/arquivo.csv", header=True, inferSchema=True)
df = spark.read.format("delta").load("s3://bucket/delta/tabela/")
df = spark.read.format("jdbc").options(
url="jdbc:postgresql://host:5432/db",
dbtable="schema.tabela",
user="usr", password="pwd"
).load()
# Escrita
df.write.parquet("s3://bucket/saida/", mode="overwrite")
df.write.format("delta").mode("append").partitionBy("ano", "mes").save("s3://bucket/delta/")
df.write.format("delta").mode("overwrite").saveAsTable("gold.ltv_clientes")Operações essenciais
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
# Inspeção
df.printSchema()
df.show(5)
df.count()
# Seleção e filtros
df.select("col1", "col2")
df.filter(F.col("status") == "ativo")
df.filter((F.col("valor") > 0) & F.col("data").isNotNull())
# Transformações
df = df.withColumn("preco_com_imposto", F.col("preco") * 1.1)
df = df.withColumn("ano", F.year("data"))
df = df.withColumn("nome_upper", F.upper("nome"))
df = df.drop("coluna_desnecessaria")
df = df.dropna(subset=["campo_obrigatorio"])
df = df.fillna({"campo": 0})
# Agrupamentos
df.groupBy("categoria").agg(
F.sum("valor").alias("total"),
F.count("pedido_id").alias("qtd"),
F.max("data").alias("ultima_compra"),
)
# Joins
df_left.join(df_right, on="chave", how="left")
df_left.join(df_right, df_left["id"] == df_right["cliente_id"], "inner")
# Window functions
from pyspark.sql.window import Window
w = Window.partitionBy("cliente_id").orderBy("data")
df = df.withColumn("rank_compra", F.rank().over(w))
df = df.withColumn("valor_acumulado", F.sum("valor").over(w))SQL no PySpark
# Registrar DataFrame como view temporária e usar SQL
df.createOrReplaceTempView("pedidos")
resultado = spark.sql("""
SELECT
cliente_id,
COUNT(pedido_id) AS qtd_pedidos,
SUM(valor_total) AS ltv
FROM pedidos
WHERE status = 'concluido'
GROUP BY cliente_id
""")Conceitos importantes
Lazy evaluation: transformações (filter, select, join) constroem um plano de execução. Só executam com uma action (show, count, write, collect).
Particionamento: Spark divide os dados em partições. O padrão de shuffle.partitions (200) pode ser ajustado conforme o volume.
Broadcast join: quando um dos DataFrames é pequeno, use F.broadcast() para evitar shuffle:
df_grande.join(F.broadcast(df_pequeno), on="chave")Cuidado com .collect(): traz todos os dados para o driver (memória local). Só usar em datasets pequenos.
PySpark no ecossistema
- Roda em Databricks (ver databricks), AWS EMR, Google Dataproc, Azure HDInsight, ou standalone
- Integra nativamente com Delta Lake (ver databricks-delta-lake), Iceberg, S3/GCS/ADLS
- Orquestrado via Apache Airflow (
SparkSubmitOperator) ou Databricks Jobs
Ver também: python-engenharia-dados | python-pandas | python-polars | ferramentas-engenharia-dados | big-data