Conectar Python a bancos de dados é essencial para ingestão e carga de dados em pipelines. Existem duas camadas principais: drivers nativos (baixo nível) e abstrações (SQLAlchemy).

Camadas de conexão

Python app
    └── SQLAlchemy (abstração / ORM)
            └── psycopg2 / pyodbc / cx_Oracle / pymysql (driver nativo)
                    └── Banco de dados

Regra geral: use SQLAlchemy para portabilidade e integração com Pandas/Polars. Use o driver nativo diretamente quando precisar de controle fino ou máxima performance em operações bulk.

SQLAlchemy: conexão e execução SQL

from sqlalchemy import create_engine, text
 
# Connection strings por banco
engine_pg    = create_engine("postgresql+psycopg2://user:pwd@host:5432/db")
engine_mssql = create_engine("mssql+pyodbc://user:pwd@host/db?driver=ODBC+Driver+18+for+SQL+Server")
engine_mysql = create_engine("mysql+pymysql://user:pwd@host:3306/db")
engine_sqlite = create_engine("sqlite:///local.db")
 
# Execução de queries
with engine_pg.connect() as conn:
    resultado = conn.execute(text("SELECT * FROM schema.tabela WHERE status = :status"), {"status": "ativo"})
    linhas = resultado.fetchall()
 
# Transação explícita
with engine_pg.begin() as conn:    # commit automático ao sair sem exceção
    conn.execute(text("UPDATE tabela SET status = 'processado' WHERE id = :id"), {"id": 1})

Integração com Pandas e Polars

import pandas as pd
import polars as pl
 
# Pandas ↔ banco
df = pd.read_sql("SELECT * FROM schema.tabela", con=engine)
df.to_sql("tabela_destino", con=engine, schema="schema", if_exists="append", index=False)
 
# Polars ↔ banco (via connection string ADBC ou Pandas intermediário)
df_pl = pl.read_database_uri(
    "SELECT * FROM schema.tabela",
    uri="postgresql://user:pwd@host:5432/db"
)

Carga bulk: alta performance

Para carregar grandes volumes, to_sql do Pandas é lento (INSERT linha a linha). Alternativas:

import io
import psycopg2
 
# PostgreSQL: COPY via psycopg2 (muito mais rápido)
conn = psycopg2.connect("postgresql://user:pwd@host:5432/db")
cur = conn.cursor()
 
buffer = io.StringIO()
df.to_csv(buffer, index=False, header=False)
buffer.seek(0)
 
cur.copy_expert(
    "COPY schema.tabela (col1, col2, col3) FROM STDIN WITH CSV",
    buffer
)
conn.commit()
# SQL Server: fast_executemany
engine_mssql = create_engine(
    "mssql+pyodbc://...",
    fast_executemany=True    # usa batch insert do driver ODBC
)
df.to_sql("tabela", con=engine_mssql, if_exists="append", index=False)

Variáveis de ambiente para credenciais

Nunca hardcodar senhas. Usar variáveis de ambiente ou secret managers:

import os
from sqlalchemy import create_engine
 
DB_URL = os.environ["DATABASE_URL"]   # ex: postgresql://user:pwd@host/db
engine = create_engine(DB_URL)

Em pipelines Airflow, usar PostgresHook / MsSqlHook para gerenciar conexões via UI:

from airflow.providers.postgres.hooks.postgres import PostgresHook
 
hook = PostgresHook(postgres_conn_id="meu_postgres")
df = hook.get_pandas_df("SELECT * FROM tabela")
engine = hook.get_sqlalchemy_engine()

Principais drivers por banco

BancoDriver PythonInstalação
PostgreSQLpsycopg2-binarypip install psycopg2-binary
SQL Serverpyodbcpip install pyodbc + ODBC Driver
MySQL / MariaDBpymysqlpip install pymysql
SQLiteembutido-
BigQuerygoogle-cloud-bigquerypip install google-cloud-bigquery[pandas]
Snowflakesnowflake-sqlalchemypip install snowflake-sqlalchemy

Ver também: python-engenharia-dados | python-pandas | db-relacional | sqlserver-visao-geral