Conectar Python a bancos de dados é essencial para ingestão e carga de dados em pipelines. Existem duas camadas principais: drivers nativos (baixo nível) e abstrações (SQLAlchemy).
Camadas de conexão
Python app
└── SQLAlchemy (abstração / ORM)
└── psycopg2 / pyodbc / cx_Oracle / pymysql (driver nativo)
└── Banco de dados
Regra geral: use SQLAlchemy para portabilidade e integração com Pandas/Polars. Use o driver nativo diretamente quando precisar de controle fino ou máxima performance em operações bulk.
SQLAlchemy: conexão e execução SQL
from sqlalchemy import create_engine, text
# Connection strings por banco
engine_pg = create_engine("postgresql+psycopg2://user:pwd@host:5432/db")
engine_mssql = create_engine("mssql+pyodbc://user:pwd@host/db?driver=ODBC+Driver+18+for+SQL+Server")
engine_mysql = create_engine("mysql+pymysql://user:pwd@host:3306/db")
engine_sqlite = create_engine("sqlite:///local.db")
# Execução de queries
with engine_pg.connect() as conn:
resultado = conn.execute(text("SELECT * FROM schema.tabela WHERE status = :status"), {"status": "ativo"})
linhas = resultado.fetchall()
# Transação explícita
with engine_pg.begin() as conn: # commit automático ao sair sem exceção
conn.execute(text("UPDATE tabela SET status = 'processado' WHERE id = :id"), {"id": 1})Integração com Pandas e Polars
import pandas as pd
import polars as pl
# Pandas ↔ banco
df = pd.read_sql("SELECT * FROM schema.tabela", con=engine)
df.to_sql("tabela_destino", con=engine, schema="schema", if_exists="append", index=False)
# Polars ↔ banco (via connection string ADBC ou Pandas intermediário)
df_pl = pl.read_database_uri(
"SELECT * FROM schema.tabela",
uri="postgresql://user:pwd@host:5432/db"
)Carga bulk: alta performance
Para carregar grandes volumes, to_sql do Pandas é lento (INSERT linha a linha). Alternativas:
import io
import psycopg2
# PostgreSQL: COPY via psycopg2 (muito mais rápido)
conn = psycopg2.connect("postgresql://user:pwd@host:5432/db")
cur = conn.cursor()
buffer = io.StringIO()
df.to_csv(buffer, index=False, header=False)
buffer.seek(0)
cur.copy_expert(
"COPY schema.tabela (col1, col2, col3) FROM STDIN WITH CSV",
buffer
)
conn.commit()# SQL Server: fast_executemany
engine_mssql = create_engine(
"mssql+pyodbc://...",
fast_executemany=True # usa batch insert do driver ODBC
)
df.to_sql("tabela", con=engine_mssql, if_exists="append", index=False)Variáveis de ambiente para credenciais
Nunca hardcodar senhas. Usar variáveis de ambiente ou secret managers:
import os
from sqlalchemy import create_engine
DB_URL = os.environ["DATABASE_URL"] # ex: postgresql://user:pwd@host/db
engine = create_engine(DB_URL)Em pipelines Airflow, usar PostgresHook / MsSqlHook para gerenciar conexões via UI:
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id="meu_postgres")
df = hook.get_pandas_df("SELECT * FROM tabela")
engine = hook.get_sqlalchemy_engine()Principais drivers por banco
| Banco | Driver Python | Instalação |
|---|---|---|
| PostgreSQL | psycopg2-binary | pip install psycopg2-binary |
| SQL Server | pyodbc | pip install pyodbc + ODBC Driver |
| MySQL / MariaDB | pymysql | pip install pymysql |
| SQLite | embutido | - |
| BigQuery | google-cloud-bigquery | pip install google-cloud-bigquery[pandas] |
| Snowflake | snowflake-sqlalchemy | pip install snowflake-sqlalchemy |
Ver também: python-engenharia-dados | python-pandas | db-relacional | sqlserver-visao-geral