A AWS oferece duas formas de rodar Airflow: o serviço gerenciado MWAA e a instalação manual em instâncias EC2 (ou EKS).
Comparativo: MWAA vs. EC2 vs. EKS
| MWAA | EC2 (Docker Compose) | EKS (Helm) | |
|---|---|---|---|
| Operação | Totalmente gerenciada | Manual | Semi-gerenciada |
| Setup | Minutos via Console/CLI | Horas (configuração manual) | Horas (Helm + K8s) |
| Escalabilidade | Auto (max_workers configurável) | Manual | Auto com KEDA |
| Custo | Alto (instâncias dedicadas + overhead) | Baixo a médio | Médio a alto |
| Controle | Limitado (versão Airflow fixa pelo AWS) | Total | Alto |
| Ideal para | Times sem ops/DevOps dedicado | Projetos menores, POC | Produção cloud-native |
Amazon MWAA (Managed Workflows for Apache Airflow)
O MWAA é o Airflow gerenciado da AWS: AWS controla o provisionamento, patching, backups e escalabilidade dos workers.
Arquitetura do MWAA
graph TB subgraph VPC["VPC Privada"] subgraph MWAA["Amazon MWAA"] SCH[Scheduler] WS[Webserver] WK[Workers - CeleryExecutor] end RDS[(Aurora PostgreSQL\nMetadata DB)] ELA[(ElastiCache Redis\nBroker Celery)] end S3[(S3 Bucket\nDAGs + Plugins + Requirements)] CW[CloudWatch\nLogs] S3 --> SCH MWAA --> CW
Os DAGs são lidos diretamente de um bucket S3. Não há acesso SSH ao ambiente: tudo via S3, UI e API.
Criando um ambiente MWAA
Via AWS Console
- S3 → criar bucket para DAGs (ex:
airflow-dags-prod) - MWAA → Create environment
- Configurar: nome, versão Airflow, tamanho da classe de worker, VPC, subnets privadas
- Apontar para o bucket S3 e pasta de DAGs
Via Terraform
# Módulo Terraform para MWAA
resource "aws_mwaa_environment" "airflow_prod" {
name = "airflow-prod"
airflow_version = "2.9.2"
environment_class = "mw1.medium" # mw1.small | mw1.medium | mw1.large | mw1.xlarge
max_workers = 10
min_workers = 1
source_bucket_arn = aws_s3_bucket.airflow_dags.arn
dag_s3_path = "dags/"
plugins_s3_path = "plugins.zip"
requirements_s3_path = "requirements.txt"
network_configuration {
security_group_ids = [aws_security_group.airflow.id]
subnet_ids = aws_subnet.private[*].id
}
logging_configuration {
dag_processing_logs {
enabled = true
log_level = "WARNING"
}
scheduler_logs {
enabled = true
log_level = "INFO"
}
task_logs {
enabled = true
log_level = "INFO"
}
webserver_logs {
enabled = true
log_level = "ERROR"
}
worker_logs {
enabled = true
log_level = "INFO"
}
}
execution_role_arn = aws_iam_role.mwaa_execution.arn
airflow_configuration_options = {
"core.load_examples" = "false"
"core.default_timezone" = "America/Sao_Paulo"
"scheduler.dag_dir_list_interval" = "30"
}
tags = {
Environment = "prod"
Team = "data-engineering"
}
}IAM Role para MWAA
A execution role precisa de permissões para S3, CloudWatch, Secrets Manager e os serviços que os DAGs vão acessar.
resource "aws_iam_role_policy" "mwaa_execution" {
name = "mwaa-execution-policy"
role = aws_iam_role.mwaa_execution.id
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Action = ["s3:GetObject*", "s3:GetBucket*", "s3:List*"],
Resource = ["${aws_s3_bucket.airflow_dags.arn}", "${aws_s3_bucket.airflow_dags.arn}/*"]
},
{
Effect = "Allow",
Action = ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:DescribeLogGroups"],
Resource = "arn:aws:logs:*:*:log-group:airflow-*"
},
{
Effect = "Allow",
Action = ["cloudwatch:PutMetricData"],
Resource = "*"
},
# Adicionar permissões para serviços que os DAGs vão usar:
{
Effect = "Allow",
Action = ["glue:*", "athena:*", "s3:*"],
Resource = "*"
}
]
})
}Deploy de DAGs no MWAA
# Sincronizar DAGs para o S3 (manual)
aws s3 sync ./dags/ s3://airflow-dags-prod/dags/ --delete
# Com CI/CD (GitHub Actions)
# .github/workflows/deploy-dags.yml
# - on push para main, sincroniza s3://airflow-dags-prod/dags/# requirements.txt no S3 (s3://airflow-dags-prod/requirements.txt)
apache-airflow-providers-amazon==8.20.0
apache-airflow-providers-google==10.15.0
pandas==2.0.3Após atualizar
requirements.txtno S3, é necessário atualizar o ambiente MWAA no Console ou via CLI para ele reinstalar as dependências.
Operadores AWS no MWAA
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator
# Aguardar arquivo no S3
aguardar_arquivo = S3KeySensor(
task_id="aguardar_arquivo",
bucket_name="data-lake-prod",
bucket_key="raw/pedidos/{{ ds_nodash }}/*.parquet",
wildcard_match=True,
aws_conn_id="aws_default",
mode="reschedule",
poke_interval=120,
timeout=7200,
)
# Executar Glue Job
processar_glue = GlueJobOperator(
task_id="processar_glue",
job_name="transformar-pedidos",
script_args={
"--DATA_DATE": "{{ ds }}",
"--SOURCE_BUCKET": "data-lake-prod",
},
aws_conn_id="aws_default",
wait_for_completion=True,
)
# Executar query no Redshift
carregar_redshift = RedshiftDataOperator(
task_id="carregar_redshift",
cluster_identifier="redshift-prod",
database="analytics",
db_user="airflow",
sql="""
DELETE FROM gold.pedidos WHERE data_ref = '{{ ds }}';
INSERT INTO gold.pedidos
SELECT * FROM staging.pedidos WHERE data_ref = '{{ ds }}';
""",
aws_conn_id="aws_default",
wait_for_completion=True,
)
# Copiar arquivo no S3
mover_para_processed = S3CopyObjectOperator(
task_id="mover_para_processed",
source_bucket_name="data-lake-prod",
source_bucket_key="raw/pedidos/{{ ds_nodash }}/pedidos.parquet",
dest_bucket_name="data-lake-prod",
dest_bucket_key="processed/pedidos/{{ ds_nodash }}/pedidos.parquet",
)Connection para AWS
# Via variável de ambiente (funciona no MWAA)
# O MWAA usa automaticamente a execution role — sem necessidade de credentials estáticas
# Definir a connection como aws_default com tipo "Amazon Web Services" e deixar credentials vazias
# Para testes locais:
export AIRFLOW_CONN_AWS_DEFAULT='aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI@?region_name=us-east-1'Airflow no EC2 (instalação manual)
Para times que precisam de mais controle ou menor custo, é possível instalar o Airflow diretamente em instâncias EC2.
Arquitetura recomendada para EC2
graph TB subgraph VPC EC2_SCH["EC2: Scheduler + Webserver\nt3.medium"] EC2_W1["EC2: Worker 1\nm5.xlarge"] EC2_W2["EC2: Worker 2\nm5.xlarge"] RDS[(RDS PostgreSQL\ndb.t3.medium)] ELA[(ElastiCache Redis\ncache.t3.micro)] end ALB[Application Load Balancer\n:80/:443] --> EC2_SCH EC2_SCH --> RDS EC2_SCH --> ELA EC2_W1 --> RDS EC2_W1 --> ELA EC2_W2 --> RDS EC2_W2 --> ELA
User data script para EC2 (Amazon Linux 2)
#!/bin/bash
# Script de inicialização da instância EC2
# Dependências do sistema
yum update -y
yum install -y python3.11 python3.11-pip git docker
# Docker Compose
curl -L "https://github.com/docker/compose/releases/latest/download/docker-compose-$(uname -s)-$(uname -m)" \
-o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
# Clonar repositório com docker-compose.yml e DAGs
git clone https://github.com/minha-org/airflow-infra.git /opt/airflow
cd /opt/airflow
# Variáveis de ambiente (idealmente via AWS Secrets Manager + env vars)
cat > .env << EOF
AIRFLOW_UID=1000
AIRFLOW__CORE__FERNET_KEY=$(python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())")
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:${DB_PASS}@${RDS_HOST}:5432/airflow
AIRFLOW__CELERY__BROKER_URL=redis://${REDIS_HOST}:6379/0
EOF
# Inicializar e subir
docker-compose up airflow-init
docker-compose up -d
# Serviço systemd para restart automático
cat > /etc/systemd/system/airflow.service << EOF
[Unit]
Description=Apache Airflow
After=docker.service
Requires=docker.service
[Service]
WorkingDirectory=/opt/airflow
ExecStart=/usr/local/bin/docker-compose up
ExecStop=/usr/local/bin/docker-compose down
Restart=always
[Install]
WantedBy=multi-user.target
EOF
systemctl enable airflow
systemctl start airflowDAGs via S3 (EC2)
Manter DAGs no S3 e sincronizar na instância com um cron:
# /opt/airflow/sync-dags.sh
#!/bin/bash
aws s3 sync s3://airflow-dags-prod/dags/ /opt/airflow/dags/ --delete
# crontab -e
# */1 * * * * /opt/airflow/sync-dags.sh >> /var/log/airflow-sync.log 2>&1Secrets Manager integration (EC2)
# Buscar secrets do AWS Secrets Manager em tempo de execução
import boto3, json
def get_secret(secret_name: str) -> dict:
client = boto3.client("secretsmanager", region_name="us-east-1")
response = client.get_secret_value(SecretId=secret_name)
return json.loads(response["SecretString"])
@task
def extrair_db():
creds = get_secret("airflow/prod/postgres-creds")
# usar creds["username"], creds["password"]Classes de workers MWAA
| Classe | vCPU | RAM | Quando usar |
|---|---|---|---|
mw1.micro | 0.25 | 512MB | Desenvolvimento, baixo volume |
mw1.small | 0.5 | 1GB | Cargas leves |
mw1.medium | 2 | 4GB | Uso geral de produção |
mw1.large | 4 | 8GB | Pipelines intensivos |
mw1.xlarge | 8 | 16GB | Cargas pesadas |
mw1.2xlarge | 16 | 32GB | Máximo disponível |
Ver também: airflow | airflow-dag-desenvolvimento | airflow-deploy-kubernetes | gcp-cloud-composer | terraform-cloud-aws | pipeline-de-dados