Qualidade de dados em Python pode ser garantida em diferentes pontos do pipeline-de-dados: na borda da ingestão (validação de schema e valores), após transformações (contagens, nulos, unicidade) e no destino (freshness, consistência).
Pydantic: validação de contratos de entrada
Pydantic valida estruturas de dados Python com tipagem. Ideal para validar payloads de APIs, eventos Kafka, e configurações de pipeline.
from pydantic import BaseModel, field_validator
from datetime import date
from typing import Optional
class Pedido(BaseModel):
pedido_id: int
cliente_id: int
valor_total: float
data_pedido: date
status: str
desconto: Optional[float] = None
@field_validator("status")
@classmethod
def status_valido(cls, v: str) -> str:
allowed = {"pendente", "concluido", "cancelado"}
if v not in allowed:
raise ValueError(f"status deve ser um de {allowed}")
return v
@field_validator("valor_total")
@classmethod
def valor_positivo(cls, v: float) -> float:
if v < 0:
raise ValueError("valor_total não pode ser negativo")
return v
# Uso
pedido = Pedido(**payload_dict) # lança ValidationError se inválido
pedidos = [Pedido(**p) for p in lista] # valida uma listaPandera: validação de DataFrames
Pandera adiciona validação tipada a DataFrames Pandas e Polars. Ótimo para validar dados após leitura de arquivos ou antes de carregar em banco.
import pandera as pa
from pandera import Column, DataFrameSchema, Check
schema = DataFrameSchema({
"pedido_id": Column(int, Check.greater_than(0), nullable=False),
"cliente_id": Column(int, nullable=False),
"valor_total": Column(float, Check.greater_than_or_equal_to(0)),
"status": Column(str, Check.isin(["pendente", "concluido", "cancelado"])),
"data_pedido": Column("datetime64[ns]"),
})
# Lança SchemaError se o DataFrame não passa
df_validado = schema.validate(df)Com a API de classes (mais expressiva):
from pandera.typing import DataFrame, Series
class PedidoSchema(pa.DataFrameModel):
pedido_id: Series[int] = pa.Field(gt=0)
cliente_id: Series[int]
valor_total: Series[float] = pa.Field(ge=0)
status: Series[str] = pa.Field(isin=["pendente", "concluido", "cancelado"])
class Config:
coerce = True # tenta converter tipos automaticamente
@pa.check_types
def transformar(df: DataFrame[PedidoSchema]) -> DataFrame[PedidoSchema]:
return dfGreat Expectations: testes de qualidade em escala
Great Expectations (GX) é a ferramenta mais completa para qualidade de dados em produção. Define Expectations (expectativas sobre os dados) que são executadas e geram relatórios.
import great_expectations as gx
context = gx.get_context()
# Criar suite de expectativas
suite = context.add_expectation_suite("pedidos_suite")
validator = context.get_validator(batch_request=..., expectation_suite_name="pedidos_suite")
# Definir expectativas
validator.expect_column_values_to_not_be_null("pedido_id")
validator.expect_column_values_to_be_unique("pedido_id")
validator.expect_column_values_to_be_in_set("status", ["pendente", "concluido", "cancelado"])
validator.expect_column_values_to_be_between("valor_total", min_value=0)
validator.expect_table_row_count_to_be_between(min_value=1)
validator.save_expectation_suite()
# Executar e gerar relatório HTML
results = context.run_checkpoint(checkpoint_name="pedidos_checkpoint")Quando usar cada ferramenta
| Ferramenta | Melhor para |
|---|---|
| Pydantic | Validação de eventos/payloads individuais, contratos de API, configs |
| Pandera | Validação de DataFrames no código de pipeline, contratos entre funções |
| Great Expectations | Qualidade em produção, relatórios, rastreamento de anomalias ao longo do tempo |
Padrão prático num pipeline
# 1. Ler dados
df = pd.read_parquet("s3://bucket/raw/pedidos/")
# 2. Validar schema e valores antes de transformar
schema.validate(df) # Pandera: explode se dados inválidos
# 3. Transformar
df_clean = transformar(df)
# 4. Carregar
df_clean.to_sql(...)
# 5. GX verifica o destino (rodado por orquestrador)Ver também: python-engenharia-dados | python-pandas | python-polars | pipeline-de-dados