O Kubernetes é o ambiente de produção mais robusto para o Airflow. Cada task pode rodar em um Pod isolado com recursos próprios (CPU/memória), e o cluster escala automaticamente.
Formas de deploy no K8s
| Abordagem | Executor | Tasks rodam em | Quando usar |
|---|---|---|---|
| Airflow com KubernetesExecutor | KubernetesExecutor | Pod efêmero por task | Isolamento total, cargas variáveis |
| Airflow com CeleryExecutor + workers em K8s | CeleryExecutor | Workers persistentes (Pods) | Migração de Celery para K8s |
| Airflow com KubernetesPodOperator | Qualquer executor | Pod customizado por task | Tasks com imagens Docker diferentes |
Helm Chart oficial
O projeto mantém um Helm chart em airflow.apache.org/docs/helm-chart.
Instalação
helm repo add apache-airflow https://airflow.apache.org
helm repo update
# Instalar em namespace dedicado
kubectl create namespace airflow
helm install airflow apache-airflow/airflow \
--namespace airflow \
--version 1.13.1 \
--values values.yamlvalues.yaml básico
# values.yaml
executor: KubernetesExecutor
# Imagem customizada com dependências
images:
airflow:
repository: meu-registry/airflow-custom
tag: 2.9.3
pullPolicy: IfNotPresent
# DAGs via GitSync (recomendado para produção)
dags:
gitSync:
enabled: true
repo: https://github.com/minha-org/airflow-dags.git
branch: main
rev: HEAD
depth: 1
maxFailures: 3
subPath: dags
credentialsSecret: airflow-git-credentials # Secret K8s com token/SSH
# Backend de metadados externo (RDS, Cloud SQL, etc.)
data:
metadataConnection:
user: airflow
pass: ~ # usar Secret
host: postgres-host.interno
port: 5432
db: airflow
sslmode: require
# Recursos para os componentes core
scheduler:
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1"
webserver:
resources:
requests:
memory: "512Mi"
cpu: "250m"
service:
type: ClusterIP
# Ingress para expor a UI
ingress:
web:
enabled: true
annotations:
kubernetes.io/ingress.class: nginx
hosts:
- name: airflow.minha-empresa.com
tls:
enabled: true
secretName: airflow-tls
# Variáveis de ambiente e segredos
env:
- name: AIRFLOW__CORE__LOAD_EXAMPLES
value: "false"
- name: AIRFLOW__CORE__DEFAULT_TIMEZONE
value: "America/Sao_Paulo"
# Secrets do K8s mapeados como env vars
secret:
- envName: AIRFLOW__CORE__FERNET_KEY
secretName: airflow-secrets
secretKey: fernet-key
# Redis (para CeleryExecutor — desnecessário com KubernetesExecutor)
redis:
enabled: false
# Workers (para CeleryExecutor)
workers:
enabled: false
# Flower (monitor do Celery — desnecessário com KubernetesExecutor)
flower:
enabled: false
# Triggerer (para sensores deferríveis)
triggerer:
enabled: true
resources:
requests:
memory: "256Mi"
cpu: "100m"Atualizar configuração
# Atualizar values
helm upgrade airflow apache-airflow/airflow \
--namespace airflow \
--values values.yaml
# Verificar status
kubectl get pods -n airflow
kubectl get svc -n airflow
# Logs do scheduler
kubectl logs -n airflow -l component=scheduler -f
# Acessar UI via port-forward
kubectl port-forward svc/airflow-webserver 8080:8080 -n airflowKubernetesExecutor em detalhe
O scheduler cria Pods no K8s diretamente, sem workers persistentes. O worker_container_repository define a imagem usada nos Pods de task.
# Trecho do values.yaml para KubernetesExecutor
executor: KubernetesExecutor
# Template de Pod para as tasks (configuração global)
workers:
podAnnotations:
prometheus.io/scrape: "true"
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "2Gi"
cpu: "1"Pod Template File
Permite customizar o Pod de task no nível do DAG:
from airflow import DAG
from airflow.decorators import task
@dag(
dag_id="pipeline_k8s",
...
)
def pipeline_k8s():
@task(
executor_config={
"KubernetesExecutor": {
"request_memory": "1Gi",
"limit_memory": "4Gi",
"request_cpu": "500m",
"limit_cpu": "2",
"labels": {"workload": "heavy-processing"},
"tolerations": [
{
"key": "workload",
"operator": "Equal",
"value": "compute",
"effect": "NoSchedule",
}
],
}
}
)
def processar_pesado():
pass
processar_pesado()KubernetesPodOperator
Executa qualquer imagem Docker como uma task do Airflow. Útil para isolar dependências ou usar linguagens diferentes.
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
processar_spark = KubernetesPodOperator(
task_id="processar_spark",
name="spark-job-{{ ds_nodash }}",
namespace="airflow",
image="apache/spark-py:3.5.0",
cmds=["python", "/opt/spark/jobs/processar_pedidos.py"],
arguments=["--data", "{{ ds }}"],
env_vars={
"SPARK_MASTER": "k8s://https://kubernetes.default.svc",
},
resources=k8s.V1ResourceRequirements(
requests={"memory": "2Gi", "cpu": "1"},
limits={"memory": "4Gi", "cpu": "2"},
),
image_pull_policy="Always",
get_logs=True,
is_delete_operator_pod=True, # remove o pod após conclusão
in_cluster=True, # assume que Airflow roda no mesmo cluster
)Passar segredos para o Pod
from kubernetes.client import models as k8s
processar = KubernetesPodOperator(
task_id="processar",
image="meu-registry/meu-job:latest",
secrets=[
k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name="credenciais-db",
key="password",
)
)
],
env_from=[
k8s.V1EnvFromSource(
secret_ref=k8s.V1SecretEnvSource(name="env-secrets")
)
],
)GitSync: sincronização de DAGs
O GitSync mantém os DAGs sincronizados com um repositório Git sem necessidade de rebuild da imagem.
graph LR GH[GitHub/GitLab] -->|pull a cada 60s| GS[GitSync sidecar] GS --> VOL[(Volume compartilhado)] VOL --> SCH[Scheduler] VOL --> WK[Workers / Pods]
# Criar secret para autenticação no repositório privado
kubectl create secret generic airflow-git-credentials \
--from-literal=GIT_SYNC_USERNAME=meu-usuario \
--from-literal=GIT_SYNC_PASSWORD=ghp_token \
-n airflowAutoscaling com KEDA
O KEDA (Kubernetes Event-Driven Autoscaling) pode escalar workers do CeleryExecutor com base no tamanho da fila.
# values.yaml com KEDA
workers:
keda:
enabled: true
minReplicaCount: 1
maxReplicaCount: 10
pollingInterval: 10
cooldownPeriod: 30
# escala com base em tasks na fila do Celery
triggers:
- type: postgresql
metadata:
targetQueryValue: "1"
query: >
SELECT COUNT(*) FROM task_instance
WHERE state = 'queued'Secrets com External Secrets Operator
Em produção, evite armazenar segredos diretamente nos values. Use o ESO para sincronizar do AWS Secrets Manager, GCP Secret Manager ou HashiCorp Vault.
# ExternalSecret CR
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
name: airflow-secrets
namespace: airflow
spec:
refreshInterval: 1h
secretStoreRef:
name: aws-secrets-store
kind: SecretStore
target:
name: airflow-secrets
data:
- secretKey: fernet-key
remoteRef:
key: airflow/prod/fernet-key
- secretKey: webserver-secret
remoteRef:
key: airflow/prod/webserver-secretVer também: airflow | airflow-dag-desenvolvimento | airflow-deploy-local | Kubernetes | Helm | kubectl | ArgoCD | gcp-cloud-composer | airflow-aws-mwaa