RDD, DataFrame e Dataset
O Spark expõe três níveis de abstração para trabalhar com dados distribuídos:
| API | Schema | Catalyst | Linguagens | Quando usar |
|---|---|---|---|---|
| RDD | Não | Não | Python, Scala, Java | Dados não estruturados, controle fino de particionamento, código legado |
| DataFrame | Sim | Sim | Python, Scala, Java, R, SQL | Padrão atual para todo uso geral |
| Dataset | Sim | Sim | Scala, Java apenas | Quando segurança de tipo em tempo de compilação é obrigatória |
Em Python, DataFrame e Dataset são a mesma coisa. Use sempre DataFrame.
Quando ainda usar RDD
# Dados não tabulares: contagem de palavras em texto livre
rdd = spark.sparkContext.textFile("gs://bucket/logs/*.txt")
palavras = rdd.flatMap(lambda linha: linha.split(" "))
contagem = palavras.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
top10 = contagem.sortBy(lambda kv: kv[1], ascending=False).take(10)Fora de casos como esse (texto bruto, grafos, controle de particionamento customizado), prefira sempre o DataFrame.
SparkSession
Ponto de entrada único desde o Spark 2.0. Substitui o SparkContext, SQLContext e HiveContext de versões anteriores:
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("meu-pipeline")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.shuffle.partitions", "100")
.getOrCreate()
)
# SparkContext ainda acessível quando necessário (para RDDs, Accumulators, Broadcast)
sc = spark.sparkContextEm jobs no Dataproc, EMR ou Databricks, a SparkSession já existe e basta chamar SparkSession.builder.getOrCreate() para obtê-la.
Transformações essenciais
Transformações são lazy: constroem o plano sem executar nada.
Seleção e filtros
from pyspark.sql import functions as F
df = spark.read.parquet("gs://lake/pedidos/")
# select: escolhe colunas, aplica expressões, renomeia
df2 = df.select(
"pedido_id",
"cliente_id",
F.col("valor").alias("valor_total"),
F.upper("regiao").alias("regiao_upper")
)
# selectExpr: expressões SQL como strings, mais compacto
df2 = df.selectExpr(
"pedido_id",
"cliente_id",
"valor * 1.1 as valor_com_imposto",
"UPPER(regiao) as regiao"
)
# filter / where (equivalentes)
df_ativo = df.filter(F.col("status") == "ativo")
df_ativo = df.where("status = 'ativo' AND valor > 0")
# Remover colunas
df = df.drop("coluna_desnecessaria", "outra_coluna")
# Remover duplicatas
df = df.dropDuplicates(["pedido_id"])
df = df.distinct() # remove linhas inteiras duplicadasColunas novas e transformações
# withColumn: adiciona ou substitui coluna
df = df.withColumn("ano_mes", F.date_format("data_criacao", "yyyy-MM"))
df = df.withColumn("valor_brl", F.col("valor_usd") * F.lit(5.0))
# when/otherwise: condicional (equivalente ao CASE WHEN do SQL)
df = df.withColumn(
"categoria",
F.when(F.col("valor") > 1000, "alto")
.when(F.col("valor") > 100, "medio")
.otherwise("baixo")
)
# Funções de string
df = df.withColumn("nome_upper", F.upper("nome"))
df = df.withColumn("cpf_limpo", F.regexp_replace("cpf", r"[.\-]", ""))
df = df.withColumn("dominio", F.split("email", "@").getItem(1))
# Funções de data
df = df.withColumn("data_dt", F.to_date("data_str", "yyyy-MM-dd"))
df = df.withColumn("dias_atraso", F.datediff(F.current_date(), "data_vencimento"))
df = df.withColumn("mes", F.month("data_dt"))
df = df.withColumn("ano", F.year("data_dt"))
# Lidar com nulos
df = df.withColumn("valor", F.coalesce("valor", F.lit(0.0)))
df = df.fillna({"valor": 0.0, "status": "desconhecido"})
df = df.dropna(subset=["pedido_id", "cliente_id"])
# Explode: transforma array em múltiplas linhas
df_items = df.withColumn("item", F.explode("itens"))
# Campos aninhados (struct)
df = df.withColumn("cidade", F.col("endereco.cidade"))Agregações
# groupBy + agg
resultado = (
df
.groupBy("regiao", "ano_mes")
.agg(
F.count("*").alias("qtd_pedidos"),
F.sum("valor").alias("receita_total"),
F.avg("valor").alias("ticket_medio"),
F.countDistinct("cliente_id").alias("clientes_unicos"),
F.max("valor").alias("maior_pedido"),
F.percentile_approx("valor", 0.5).alias("mediana_valor"),
)
)
# pivot: transforma valores de uma coluna em colunas
pivot = (
df
.groupBy("cliente_id")
.pivot("regiao", ["norte", "sul", "sudeste"])
.sum("valor")
)Window Functions
Window functions aplicam cálculos sobre um grupo de linhas relacionadas sem reduzir o número de linhas:
from pyspark.sql.window import Window
# Janela particionada por cliente, ordenada por data
janela_cliente = (
Window
.partitionBy("cliente_id")
.orderBy("data_criacao")
)
# Row number, rank e dense_rank
df = df.withColumn("num_pedido", F.row_number().over(janela_cliente))
df = df.withColumn("rank_valor", F.rank().over(
Window.partitionBy("regiao").orderBy(F.col("valor").desc())
))
# Agregação acumulada dentro da partição
df = df.withColumn("valor_acumulado", F.sum("valor").over(
janela_cliente.rowsBetween(Window.unboundedPreceding, Window.currentRow)
))
# Valor da linha anterior / próxima linha
df = df.withColumn("valor_anterior", F.lag("valor", 1).over(janela_cliente))
df = df.withColumn("proximo_valor", F.lead("valor", 1).over(janela_cliente))
# Valor máximo dentro de uma janela deslizante de 7 linhas
df = df.withColumn("max_7dias", F.max("valor").over(
janela_cliente.rowsBetween(-6, 0)
))Joins
# Inner join (padrão)
resultado = pedidos.join(clientes, on="cliente_id", how="inner")
# Tipos disponíveis: "inner", "left", "right", "full", "left_semi", "left_anti", "cross"
# left_semi: mantém linhas do lado esquerdo que têm correspondência no direito (sem colunas do direito)
pedidos_com_pagamento = pedidos.join(pagamentos, on="pedido_id", how="left_semi")
# left_anti: mantém linhas do lado esquerdo que NÃO têm correspondência
pedidos_sem_pagamento = pedidos.join(pagamentos, on="pedido_id", how="left_anti")
# Join com condição explícita (quando nomes de colunas diferem)
resultado = pedidos.join(
produtos,
pedidos.produto_id == produtos.id,
how="left"
).drop(produtos.id) # evitar coluna duplicada
# Broadcast join: força envio da tabela pequena para todos os Executors (sem shuffle)
resultado = pedidos.join(
F.broadcast(municipios),
on="municipio_id"
)Ver spark-performance para quando e como usar broadcast join e os trade-offs de cada estratégia.
Reparticionamento
# repartition: aumenta ou diminui com shuffle completo
df = df.repartition(200)
df = df.repartition(50, "regiao") # distribui por coluna (útil antes de writes particionados)
# coalesce: apenas reduz, sem shuffle (mais rápido que repartition para diminuir)
df = df.coalesce(10)
# Conferir número atual de partições
print(df.rdd.getNumPartitions())Ações
Ações disparam a execução do plano e retornam resultados ao Driver:
df.show(20, truncate=False) # exibe no console (Action, executa o job)
df.printSchema() # mostra schema (não é Action, não executa)
df.count() # contagem total
df.take(5) # retorna lista com N primeiras linhas
df.first() # retorna a primeira linha
df.collect() # retorna TODAS as linhas para o Driver (risco de OOM)
df.describe("valor", "idade").show() # estatísticas descritivas básicas
df.summary().show() # estatísticas completas incluindo percentis
# Checar schema sem executar o job
df.schema
df.dtypes
df.columnsLeitura de dados
# Parquet (formato recomendado para data lake)
df = spark.read.parquet("gs://lake/pedidos/")
# Delta Lake (ACID + time travel)
df = spark.read.format("delta").load("gs://lake/silver/pedidos/")
df_ontem = spark.read.format("delta") \
.option("versionAsOf", 5) \
.load("gs://lake/silver/pedidos/")
# JSON com schema explícito (evitar inferSchema em produção: lento e impreciso)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
schema = StructType([
StructField("pedido_id", StringType(), nullable=False),
StructField("valor", DoubleType(), nullable=True),
StructField("status", StringType(), nullable=True),
StructField("criado_em", TimestampType(), nullable=True),
])
df = spark.read.schema(schema).json("gs://lake/raw/eventos/")
# CSV
df = (
spark.read
.option("header", "true")
.option("sep", ";")
.option("encoding", "UTF-8")
.option("inferSchema", "false")
.schema(schema)
.csv("gs://lake/raw/arquivo.csv")
)
# JDBC com paralelismo (leitura particionada por coluna)
df = (
spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://host:5432/banco")
.option("dbtable", "schema.tabela")
.option("user", "usuario")
.option("password", "senha")
.option("driver", "org.postgresql.Driver")
.option("numPartitions", "10")
.option("partitionColumn", "id")
.option("lowerBound", "1")
.option("upperBound", "1000000")
.load()
)
# BigQuery (conector GCP)
df = (
spark.read
.format("bigquery")
.option("table", "projeto.dataset.tabela")
.load()
)Escrita de dados
# Parquet com particionamento por coluna
(
df.write
.mode("overwrite")
.partitionBy("ano", "mes")
.parquet("gs://lake/silver/pedidos/")
)
# Delta Lake: append com schema evolution
(
df.write
.format("delta")
.mode("append")
.option("mergeSchema", "true")
.partitionBy("ano", "mes")
.save("gs://lake/silver/pedidos/")
)
# Delta Lake: upsert (merge)
from delta.tables import DeltaTable
destino = DeltaTable.forPath(spark, "gs://lake/silver/pedidos/")
(
destino.alias("d")
.merge(df.alias("s"), "d.pedido_id = s.pedido_id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# BigQuery
(
df.write
.format("bigquery")
.option("table", "projeto.dataset.tabela")
.mode("overwrite")
.save()
)
# Modos de escrita disponíveis
# "overwrite": substitui todos os dados existentes
# "append": adiciona ao existente sem sobrescrever
# "ignore": não faz nada se o destino já existe
# "error": falha com exceção se o destino já existe (padrão)Ver também: spark | spark-arquitetura | spark-sql | spark-performance