RDD, DataFrame e Dataset

O Spark expõe três níveis de abstração para trabalhar com dados distribuídos:

APISchemaCatalystLinguagensQuando usar
RDDNãoNãoPython, Scala, JavaDados não estruturados, controle fino de particionamento, código legado
DataFrameSimSimPython, Scala, Java, R, SQLPadrão atual para todo uso geral
DatasetSimSimScala, Java apenasQuando segurança de tipo em tempo de compilação é obrigatória

Em Python, DataFrame e Dataset são a mesma coisa. Use sempre DataFrame.

Quando ainda usar RDD

# Dados não tabulares: contagem de palavras em texto livre
rdd = spark.sparkContext.textFile("gs://bucket/logs/*.txt")
palavras = rdd.flatMap(lambda linha: linha.split(" "))
contagem = palavras.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
top10 = contagem.sortBy(lambda kv: kv[1], ascending=False).take(10)

Fora de casos como esse (texto bruto, grafos, controle de particionamento customizado), prefira sempre o DataFrame.

SparkSession

Ponto de entrada único desde o Spark 2.0. Substitui o SparkContext, SQLContext e HiveContext de versões anteriores:

from pyspark.sql import SparkSession
 
spark = (
    SparkSession.builder
    .appName("meu-pipeline")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.shuffle.partitions", "100")
    .getOrCreate()
)
 
# SparkContext ainda acessível quando necessário (para RDDs, Accumulators, Broadcast)
sc = spark.sparkContext

Em jobs no Dataproc, EMR ou Databricks, a SparkSession já existe e basta chamar SparkSession.builder.getOrCreate() para obtê-la.

Transformações essenciais

Transformações são lazy: constroem o plano sem executar nada.

Seleção e filtros

from pyspark.sql import functions as F
 
df = spark.read.parquet("gs://lake/pedidos/")
 
# select: escolhe colunas, aplica expressões, renomeia
df2 = df.select(
    "pedido_id",
    "cliente_id",
    F.col("valor").alias("valor_total"),
    F.upper("regiao").alias("regiao_upper")
)
 
# selectExpr: expressões SQL como strings, mais compacto
df2 = df.selectExpr(
    "pedido_id",
    "cliente_id",
    "valor * 1.1 as valor_com_imposto",
    "UPPER(regiao) as regiao"
)
 
# filter / where (equivalentes)
df_ativo = df.filter(F.col("status") == "ativo")
df_ativo = df.where("status = 'ativo' AND valor > 0")
 
# Remover colunas
df = df.drop("coluna_desnecessaria", "outra_coluna")
 
# Remover duplicatas
df = df.dropDuplicates(["pedido_id"])
df = df.distinct()  # remove linhas inteiras duplicadas

Colunas novas e transformações

# withColumn: adiciona ou substitui coluna
df = df.withColumn("ano_mes", F.date_format("data_criacao", "yyyy-MM"))
df = df.withColumn("valor_brl", F.col("valor_usd") * F.lit(5.0))
 
# when/otherwise: condicional (equivalente ao CASE WHEN do SQL)
df = df.withColumn(
    "categoria",
    F.when(F.col("valor") > 1000, "alto")
     .when(F.col("valor") > 100,  "medio")
     .otherwise("baixo")
)
 
# Funções de string
df = df.withColumn("nome_upper",  F.upper("nome"))
df = df.withColumn("cpf_limpo",   F.regexp_replace("cpf", r"[.\-]", ""))
df = df.withColumn("dominio",     F.split("email", "@").getItem(1))
 
# Funções de data
df = df.withColumn("data_dt",     F.to_date("data_str", "yyyy-MM-dd"))
df = df.withColumn("dias_atraso", F.datediff(F.current_date(), "data_vencimento"))
df = df.withColumn("mes",         F.month("data_dt"))
df = df.withColumn("ano",         F.year("data_dt"))
 
# Lidar com nulos
df = df.withColumn("valor", F.coalesce("valor", F.lit(0.0)))
df = df.fillna({"valor": 0.0, "status": "desconhecido"})
df = df.dropna(subset=["pedido_id", "cliente_id"])
 
# Explode: transforma array em múltiplas linhas
df_items = df.withColumn("item", F.explode("itens"))
 
# Campos aninhados (struct)
df = df.withColumn("cidade", F.col("endereco.cidade"))

Agregações

# groupBy + agg
resultado = (
    df
    .groupBy("regiao", "ano_mes")
    .agg(
        F.count("*").alias("qtd_pedidos"),
        F.sum("valor").alias("receita_total"),
        F.avg("valor").alias("ticket_medio"),
        F.countDistinct("cliente_id").alias("clientes_unicos"),
        F.max("valor").alias("maior_pedido"),
        F.percentile_approx("valor", 0.5).alias("mediana_valor"),
    )
)
 
# pivot: transforma valores de uma coluna em colunas
pivot = (
    df
    .groupBy("cliente_id")
    .pivot("regiao", ["norte", "sul", "sudeste"])
    .sum("valor")
)

Window Functions

Window functions aplicam cálculos sobre um grupo de linhas relacionadas sem reduzir o número de linhas:

from pyspark.sql.window import Window
 
# Janela particionada por cliente, ordenada por data
janela_cliente = (
    Window
    .partitionBy("cliente_id")
    .orderBy("data_criacao")
)
 
# Row number, rank e dense_rank
df = df.withColumn("num_pedido",    F.row_number().over(janela_cliente))
df = df.withColumn("rank_valor",    F.rank().over(
    Window.partitionBy("regiao").orderBy(F.col("valor").desc())
))
 
# Agregação acumulada dentro da partição
df = df.withColumn("valor_acumulado", F.sum("valor").over(
    janela_cliente.rowsBetween(Window.unboundedPreceding, Window.currentRow)
))
 
# Valor da linha anterior / próxima linha
df = df.withColumn("valor_anterior", F.lag("valor",  1).over(janela_cliente))
df = df.withColumn("proximo_valor",  F.lead("valor", 1).over(janela_cliente))
 
# Valor máximo dentro de uma janela deslizante de 7 linhas
df = df.withColumn("max_7dias", F.max("valor").over(
    janela_cliente.rowsBetween(-6, 0)
))

Joins

# Inner join (padrão)
resultado = pedidos.join(clientes, on="cliente_id", how="inner")
 
# Tipos disponíveis: "inner", "left", "right", "full", "left_semi", "left_anti", "cross"
 
# left_semi: mantém linhas do lado esquerdo que têm correspondência no direito (sem colunas do direito)
pedidos_com_pagamento = pedidos.join(pagamentos, on="pedido_id", how="left_semi")
 
# left_anti: mantém linhas do lado esquerdo que NÃO têm correspondência
pedidos_sem_pagamento = pedidos.join(pagamentos, on="pedido_id", how="left_anti")
 
# Join com condição explícita (quando nomes de colunas diferem)
resultado = pedidos.join(
    produtos,
    pedidos.produto_id == produtos.id,
    how="left"
).drop(produtos.id)  # evitar coluna duplicada
 
# Broadcast join: força envio da tabela pequena para todos os Executors (sem shuffle)
resultado = pedidos.join(
    F.broadcast(municipios),
    on="municipio_id"
)

Ver spark-performance para quando e como usar broadcast join e os trade-offs de cada estratégia.

Reparticionamento

# repartition: aumenta ou diminui com shuffle completo
df = df.repartition(200)
df = df.repartition(50, "regiao")  # distribui por coluna (útil antes de writes particionados)
 
# coalesce: apenas reduz, sem shuffle (mais rápido que repartition para diminuir)
df = df.coalesce(10)
 
# Conferir número atual de partições
print(df.rdd.getNumPartitions())

Ações

Ações disparam a execução do plano e retornam resultados ao Driver:

df.show(20, truncate=False)              # exibe no console (Action, executa o job)
df.printSchema()                         # mostra schema (não é Action, não executa)
df.count()                               # contagem total
df.take(5)                               # retorna lista com N primeiras linhas
df.first()                               # retorna a primeira linha
df.collect()                             # retorna TODAS as linhas para o Driver (risco de OOM)
df.describe("valor", "idade").show()     # estatísticas descritivas básicas
df.summary().show()                      # estatísticas completas incluindo percentis
 
# Checar schema sem executar o job
df.schema
df.dtypes
df.columns

Leitura de dados

# Parquet (formato recomendado para data lake)
df = spark.read.parquet("gs://lake/pedidos/")
 
# Delta Lake (ACID + time travel)
df = spark.read.format("delta").load("gs://lake/silver/pedidos/")
df_ontem = spark.read.format("delta") \
    .option("versionAsOf", 5) \
    .load("gs://lake/silver/pedidos/")
 
# JSON com schema explícito (evitar inferSchema em produção: lento e impreciso)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
 
schema = StructType([
    StructField("pedido_id",  StringType(),    nullable=False),
    StructField("valor",      DoubleType(),    nullable=True),
    StructField("status",     StringType(),    nullable=True),
    StructField("criado_em",  TimestampType(), nullable=True),
])
df = spark.read.schema(schema).json("gs://lake/raw/eventos/")
 
# CSV
df = (
    spark.read
    .option("header",      "true")
    .option("sep",         ";")
    .option("encoding",    "UTF-8")
    .option("inferSchema", "false")
    .schema(schema)
    .csv("gs://lake/raw/arquivo.csv")
)
 
# JDBC com paralelismo (leitura particionada por coluna)
df = (
    spark.read
    .format("jdbc")
    .option("url",             "jdbc:postgresql://host:5432/banco")
    .option("dbtable",         "schema.tabela")
    .option("user",            "usuario")
    .option("password",        "senha")
    .option("driver",          "org.postgresql.Driver")
    .option("numPartitions",   "10")
    .option("partitionColumn", "id")
    .option("lowerBound",      "1")
    .option("upperBound",      "1000000")
    .load()
)
 
# BigQuery (conector GCP)
df = (
    spark.read
    .format("bigquery")
    .option("table", "projeto.dataset.tabela")
    .load()
)

Escrita de dados

# Parquet com particionamento por coluna
(
    df.write
    .mode("overwrite")
    .partitionBy("ano", "mes")
    .parquet("gs://lake/silver/pedidos/")
)
 
# Delta Lake: append com schema evolution
(
    df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .partitionBy("ano", "mes")
    .save("gs://lake/silver/pedidos/")
)
 
# Delta Lake: upsert (merge)
from delta.tables import DeltaTable
 
destino = DeltaTable.forPath(spark, "gs://lake/silver/pedidos/")
(
    destino.alias("d")
    .merge(df.alias("s"), "d.pedido_id = s.pedido_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
 
# BigQuery
(
    df.write
    .format("bigquery")
    .option("table", "projeto.dataset.tabela")
    .mode("overwrite")
    .save()
)
 
# Modos de escrita disponíveis
# "overwrite": substitui todos os dados existentes
# "append":    adiciona ao existente sem sobrescrever
# "ignore":    não faz nada se o destino já existe
# "error":     falha com exceção se o destino já existe (padrão)

Ver também: spark | spark-arquitetura | spark-sql | spark-performance