A AWS oferece duas formas de rodar Airflow: o serviço gerenciado MWAA e a instalação manual em instâncias EC2 (ou EKS).

Comparativo: MWAA vs. EC2 vs. EKS

MWAAEC2 (Docker Compose)EKS (Helm)
OperaçãoTotalmente gerenciadaManualSemi-gerenciada
SetupMinutos via Console/CLIHoras (configuração manual)Horas (Helm + K8s)
EscalabilidadeAuto (max_workers configurável)ManualAuto com KEDA
CustoAlto (instâncias dedicadas + overhead)Baixo a médioMédio a alto
ControleLimitado (versão Airflow fixa pelo AWS)TotalAlto
Ideal paraTimes sem ops/DevOps dedicadoProjetos menores, POCProdução cloud-native

Amazon MWAA (Managed Workflows for Apache Airflow)

O MWAA é o Airflow gerenciado da AWS: AWS controla o provisionamento, patching, backups e escalabilidade dos workers.

Arquitetura do MWAA

graph TB
    subgraph VPC["VPC Privada"]
        subgraph MWAA["Amazon MWAA"]
            SCH[Scheduler]
            WS[Webserver]
            WK[Workers - CeleryExecutor]
        end
        RDS[(Aurora PostgreSQL\nMetadata DB)]
        ELA[(ElastiCache Redis\nBroker Celery)]
    end
    S3[(S3 Bucket\nDAGs + Plugins + Requirements)]
    CW[CloudWatch\nLogs]
    S3 --> SCH
    MWAA --> CW

Os DAGs são lidos diretamente de um bucket S3. Não há acesso SSH ao ambiente: tudo via S3, UI e API.

Criando um ambiente MWAA

Via AWS Console

  1. S3 → criar bucket para DAGs (ex: airflow-dags-prod)
  2. MWAA → Create environment
  3. Configurar: nome, versão Airflow, tamanho da classe de worker, VPC, subnets privadas
  4. Apontar para o bucket S3 e pasta de DAGs

Via Terraform

# Módulo Terraform para MWAA
resource "aws_mwaa_environment" "airflow_prod" {
  name               = "airflow-prod"
  airflow_version    = "2.9.2"
  environment_class  = "mw1.medium"  # mw1.small | mw1.medium | mw1.large | mw1.xlarge
  max_workers        = 10
  min_workers        = 1
 
  source_bucket_arn    = aws_s3_bucket.airflow_dags.arn
  dag_s3_path          = "dags/"
  plugins_s3_path      = "plugins.zip"
  requirements_s3_path = "requirements.txt"
 
  network_configuration {
    security_group_ids = [aws_security_group.airflow.id]
    subnet_ids         = aws_subnet.private[*].id
  }
 
  logging_configuration {
    dag_processing_logs {
      enabled   = true
      log_level = "WARNING"
    }
    scheduler_logs {
      enabled   = true
      log_level = "INFO"
    }
    task_logs {
      enabled   = true
      log_level = "INFO"
    }
    webserver_logs {
      enabled   = true
      log_level = "ERROR"
    }
    worker_logs {
      enabled   = true
      log_level = "INFO"
    }
  }
 
  execution_role_arn = aws_iam_role.mwaa_execution.arn
 
  airflow_configuration_options = {
    "core.load_examples"          = "false"
    "core.default_timezone"       = "America/Sao_Paulo"
    "scheduler.dag_dir_list_interval" = "30"
  }
 
  tags = {
    Environment = "prod"
    Team        = "data-engineering"
  }
}

IAM Role para MWAA

A execution role precisa de permissões para S3, CloudWatch, Secrets Manager e os serviços que os DAGs vão acessar.

resource "aws_iam_role_policy" "mwaa_execution" {
  name = "mwaa-execution-policy"
  role = aws_iam_role.mwaa_execution.id
 
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect   = "Allow",
        Action   = ["s3:GetObject*", "s3:GetBucket*", "s3:List*"],
        Resource = ["${aws_s3_bucket.airflow_dags.arn}", "${aws_s3_bucket.airflow_dags.arn}/*"]
      },
      {
        Effect   = "Allow",
        Action   = ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:DescribeLogGroups"],
        Resource = "arn:aws:logs:*:*:log-group:airflow-*"
      },
      {
        Effect   = "Allow",
        Action   = ["cloudwatch:PutMetricData"],
        Resource = "*"
      },
      # Adicionar permissões para serviços que os DAGs vão usar:
      {
        Effect   = "Allow",
        Action   = ["glue:*", "athena:*", "s3:*"],
        Resource = "*"
      }
    ]
  })
}

Deploy de DAGs no MWAA

# Sincronizar DAGs para o S3 (manual)
aws s3 sync ./dags/ s3://airflow-dags-prod/dags/ --delete
 
# Com CI/CD (GitHub Actions)
# .github/workflows/deploy-dags.yml
# - on push para main, sincroniza s3://airflow-dags-prod/dags/
# requirements.txt no S3 (s3://airflow-dags-prod/requirements.txt)
apache-airflow-providers-amazon==8.20.0
apache-airflow-providers-google==10.15.0
pandas==2.0.3

Após atualizar requirements.txt no S3, é necessário atualizar o ambiente MWAA no Console ou via CLI para ele reinstalar as dependências.

Operadores AWS no MWAA

from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator
 
# Aguardar arquivo no S3
aguardar_arquivo = S3KeySensor(
    task_id="aguardar_arquivo",
    bucket_name="data-lake-prod",
    bucket_key="raw/pedidos/{{ ds_nodash }}/*.parquet",
    wildcard_match=True,
    aws_conn_id="aws_default",
    mode="reschedule",
    poke_interval=120,
    timeout=7200,
)
 
# Executar Glue Job
processar_glue = GlueJobOperator(
    task_id="processar_glue",
    job_name="transformar-pedidos",
    script_args={
        "--DATA_DATE": "{{ ds }}",
        "--SOURCE_BUCKET": "data-lake-prod",
    },
    aws_conn_id="aws_default",
    wait_for_completion=True,
)
 
# Executar query no Redshift
carregar_redshift = RedshiftDataOperator(
    task_id="carregar_redshift",
    cluster_identifier="redshift-prod",
    database="analytics",
    db_user="airflow",
    sql="""
        DELETE FROM gold.pedidos WHERE data_ref = '{{ ds }}';
        INSERT INTO gold.pedidos
        SELECT * FROM staging.pedidos WHERE data_ref = '{{ ds }}';
    """,
    aws_conn_id="aws_default",
    wait_for_completion=True,
)
 
# Copiar arquivo no S3
mover_para_processed = S3CopyObjectOperator(
    task_id="mover_para_processed",
    source_bucket_name="data-lake-prod",
    source_bucket_key="raw/pedidos/{{ ds_nodash }}/pedidos.parquet",
    dest_bucket_name="data-lake-prod",
    dest_bucket_key="processed/pedidos/{{ ds_nodash }}/pedidos.parquet",
)

Connection para AWS

# Via variável de ambiente (funciona no MWAA)
# O MWAA usa automaticamente a execution role — sem necessidade de credentials estáticas
# Definir a connection como aws_default com tipo "Amazon Web Services" e deixar credentials vazias
 
# Para testes locais:
export AIRFLOW_CONN_AWS_DEFAULT='aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI@?region_name=us-east-1'

Airflow no EC2 (instalação manual)

Para times que precisam de mais controle ou menor custo, é possível instalar o Airflow diretamente em instâncias EC2.

Arquitetura recomendada para EC2

graph TB
    subgraph VPC
        EC2_SCH["EC2: Scheduler + Webserver\nt3.medium"]
        EC2_W1["EC2: Worker 1\nm5.xlarge"]
        EC2_W2["EC2: Worker 2\nm5.xlarge"]
        RDS[(RDS PostgreSQL\ndb.t3.medium)]
        ELA[(ElastiCache Redis\ncache.t3.micro)]
    end
    ALB[Application Load Balancer\n:80/:443] --> EC2_SCH
    EC2_SCH --> RDS
    EC2_SCH --> ELA
    EC2_W1 --> RDS
    EC2_W1 --> ELA
    EC2_W2 --> RDS
    EC2_W2 --> ELA

User data script para EC2 (Amazon Linux 2)

#!/bin/bash
# Script de inicialização da instância EC2
 
# Dependências do sistema
yum update -y
yum install -y python3.11 python3.11-pip git docker
 
# Docker Compose
curl -L "https://github.com/docker/compose/releases/latest/download/docker-compose-$(uname -s)-$(uname -m)" \
  -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
 
# Clonar repositório com docker-compose.yml e DAGs
git clone https://github.com/minha-org/airflow-infra.git /opt/airflow
cd /opt/airflow
 
# Variáveis de ambiente (idealmente via AWS Secrets Manager + env vars)
cat > .env << EOF
AIRFLOW_UID=1000
AIRFLOW__CORE__FERNET_KEY=$(python3 -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())")
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:${DB_PASS}@${RDS_HOST}:5432/airflow
AIRFLOW__CELERY__BROKER_URL=redis://${REDIS_HOST}:6379/0
EOF
 
# Inicializar e subir
docker-compose up airflow-init
docker-compose up -d
 
# Serviço systemd para restart automático
cat > /etc/systemd/system/airflow.service << EOF
[Unit]
Description=Apache Airflow
After=docker.service
Requires=docker.service
 
[Service]
WorkingDirectory=/opt/airflow
ExecStart=/usr/local/bin/docker-compose up
ExecStop=/usr/local/bin/docker-compose down
Restart=always
 
[Install]
WantedBy=multi-user.target
EOF
 
systemctl enable airflow
systemctl start airflow

DAGs via S3 (EC2)

Manter DAGs no S3 e sincronizar na instância com um cron:

# /opt/airflow/sync-dags.sh
#!/bin/bash
aws s3 sync s3://airflow-dags-prod/dags/ /opt/airflow/dags/ --delete
 
# crontab -e
# */1 * * * * /opt/airflow/sync-dags.sh >> /var/log/airflow-sync.log 2>&1

Secrets Manager integration (EC2)

# Buscar secrets do AWS Secrets Manager em tempo de execução
import boto3, json
 
def get_secret(secret_name: str) -> dict:
    client = boto3.client("secretsmanager", region_name="us-east-1")
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])
 
@task
def extrair_db():
    creds = get_secret("airflow/prod/postgres-creds")
    # usar creds["username"], creds["password"]

Classes de workers MWAA

ClassevCPURAMQuando usar
mw1.micro0.25512MBDesenvolvimento, baixo volume
mw1.small0.51GBCargas leves
mw1.medium24GBUso geral de produção
mw1.large48GBPipelines intensivos
mw1.xlarge816GBCargas pesadas
mw1.2xlarge1632GBMáximo disponível

Ver também: airflow | airflow-dag-desenvolvimento | airflow-deploy-kubernetes | gcp-cloud-composer | terraform-cloud-aws | pipeline-de-dados