Getting Started with Apache Airflow

Apache Airflow is an open-source platform created by Airbnb in 2014 (now an Apache Software Foundation project) to programmatically author, schedule, and monitor workflows. Airflow uses directed acyclic graphs (DAGs) to manage workflow orchestration. Tasks and dependencies are defined in Python code, creating a clear and maintainable workflow.
Key components of Airflow include:
Web server: Flask-based UI to inspect, trigger, and debug DAGs
Scheduler: Daemon responsible for scheduling workflows
Metadata database: Stores state information (SQLite by default, but production environments typically use PostgreSQL or MySQL)
Executor: Determines how tasks are executed (SequentialExecutor is default, but CeleryExecutor or KubernetesExecutor are recommended for production)
Airflow 2.0+ introduced important improvements including a stable REST API, smart sensors (since deprecated and removed in Airflow 2.4 in favor of deferrable operators), and the TaskFlow API, which simplifies DAG creation.
Firing Up Your First Airflow Instance
Okay, so getting Airflow running used to be a total nightmare. Back in 2019, I wasted an entire weekend trying to install it on Ubuntu 18.04 - dependency hell doesn't even begin to describe it. These days, thankfully, it's way less painful.
Docker: The Path of Least Resistance
Look, just use Docker if you can. Trust me on this one:
docker pull apache/airflow
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.3/docker-compose.yaml'
mkdir -p
After that finishes doing its thing (might take a few mins depending on your internet), head over to http://localhost:8080. Username and password are both "airflow" - yeah, super secure, I know. Change that ASAP if you're doing anything real with it.
The "I Don't Do Docker" Method
Some of my coworkers refuse to use Docker for whatever reason. Fine, here's the pip route:
python -m venv airflow-env
source airflow-env/bin/activate
pip install apache-airflow
airflow db init
airflow users create \
--username admin \
--firstname Your \
--lastname Name \
--role Admin \
--email your.email@example.com \
--password PASSWORD
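airflow webserver --port 8080
airflow scheduler  # in a second terminal (same venv); without a scheduler, no DAG runs get scheduled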
This approach gave me weird SQLite errors once when my disk was full. Took forever to figure that out.
The Airflow UI: Where the Magic (Sometimes) Happens
Once you've got Airflow running, you'll see the UI. It's... functional? Not exactly winning any design awards, but it gets the job done:
The main screen shows all your DAGs. Green = good, red = something's broken. You'll see the latter plenty as you learn, lol. The UI has different views that are actually pretty useful:
Graph View: My personal favorite - shows how tasks connect. Great for debugging.
Tree View (replaced by the Grid view in Airflow 2.3): Good for seeing patterns over time - like "why does this task fail every Monday?"
Task Duration: Helped me catch a query that was gradually getting slower over time
Gantt Chart: Honestly rarely use this one, but some people love it
The Calendar view is weirdly hidden in a dropdown menu - took me ages to find it first time around.
The Pieces That Make Airflow Tick
Behind the scenes, Airflow is made up of:
Webserver: The UI part. Crashes occasionally after upgrades, keep an eye on it.
Scheduler: The brain of the operation. When mine gets overloaded, everything grinds to a halt.
Executor: Runs your actual tasks. The default "SequentialExecutor" is garbage for anything real - switch to LocalExecutor at minimum (it needs Postgres or MySQL rather than SQLite).
Metadata Database: Stores all the state info. Started with SQLite but switched to Postgres after some corrupted database headaches.
DAG Directory: Where your workflow code lives. Pro-tip: use version control on this folder!
Triggerer: (Added in Airflow 2.2) Enables deferrable operators that can wait for external events without consuming worker slots.
I've got about 50 DAGs running in production now, but my first one was just a simple ETL job to pull data from our CRM into our data warehouse. Start simple - Airflow has a steep enough learning curve without trying to boil the ocean on day one.
Building Data Pipelines with Apache Airflow
ETL Spells: Extracting, Transforming, and Loading Data

I've been using Airflow for about three years now, and honestly, it's a game-changer for ETL processes. Not that it doesn't drive me up the wall sometimes - especially when my DAGs fail at 2AM and wake up the on-call team (sorry Dave).
Let's look at a practical pipeline that pulls data from an API, transforms it, and dumps it into a database. This is similar to what we built last quarter to track weather patterns:
from datetime import datetime
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
default_args = {
    'owner': 'melissa',
    'start_date': datetime(2023, 1, 1),
    'retries': 3,
}
dag = DAG(
    'weather_data_pipeline',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)
create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='postgres_default',
    sql="""
    CREATE TABLE IF NOT EXISTS weather_data (
        date DATE,
        city TEXT,
        temperature FLOAT,
        conditions TEXT
    )
    """,
    dag=dag
)
def extract_weather_data():
    # Simulated API call: hard-coded Celsius readings for each city
    cities = ['New York', 'London', 'Tokyo']
    results = []
    for city in cities:
        results.append({
            'city': city,
            'temperature': 25.0,
            'conditions': 'Sunny'
        })
    return results
def transform_weather_data(**context):
    data = context['ti'].xcom_pull(task_ids='extract_weather')
    df = pd.DataFrame(data)
    # Stamp rows with the run's logical date (ds) rather than datetime.now(), so reruns and backfills stay consistent
    df['date'] = context['ds']
    # Celsius -> Fahrenheit
    df['temperature'] = df['temperature'] * 9 / 5 + 32
    return df.to_dict('records')
def build_insert_sql(**context):
    data = context['ti'].xcom_pull(task_ids='transform_weather')
    insert_statements = []
    for record in data:
        # String-built SQL breaks on values containing quotes and is open to injection;
        # acceptable for this toy data, but prefer parameterized inserts for real input
        stmt = f"""
        INSERT INTO weather_data (date, city, temperature, conditions)
        VALUES (
            '{record['date']}',
            '{record['city']}',
            {record['temperature']},
            '{record['conditions']}'
        );
        """
        insert_statements.append(stmt)
    return "\n".join(insert_statements)
extract_weather = PythonOperator(
    task_id='extract_weather',
    python_callable=extract_weather_data,
    dag=dag
)
transform_weather = PythonOperator(
    task_id='transform_weather',
    python_callable=transform_weather_data,
    dag=dag
)
# The SQL-building step runs as its own task so load_weather has an XCom value to pull
build_sql = PythonOperator(
    task_id='build_insert_sql',
    python_callable=build_insert_sql,
    dag=dag
)
load_weather = PostgresOperator(
    task_id='load_weather',
    postgres_conn_id='postgres_default',
    # Rendered at runtime to the SQL string returned by build_insert_sql
    sql="{{ ti.xcom_pull(task_ids='build_insert_sql') }}",
    dag=dag
)
create_table >> extract_weather >> transform_weather >> build_sql >> load_weather
The code above isn't perfect - we're hitting a simulated API without rate limiting, and there's no exception handling. In production, you'd want to add both. Just last month our OpenWeatherMap integration crashed because we hit their 60 calls/minute limit during a maintenance window. Fun times explaining that to management!
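Here's one way to add both, sketched as plain Python so it could sit inside `extract_weather_data`. It assumes you supply your own `fetch(city)` function (hypothetical here, e.g. a thin wrapper around `requests.get` that calls `raise_for_status()`). The 60 calls/minute default mirrors the limit mentioned above, but check your provider's actual quota. Every attempt counts toward the limit, and a city that keeps failing is recorded in an `errors` map instead of crashing the whole batch.

```python
import time
from typing import Callable, Dict, List, Tuple


def fetch_all_throttled(
    cities: List[str],
    fetch: Callable[[str], dict],
    calls_per_minute: int = 60,
    max_attempts: int = 3,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Tuple[List[dict], Dict[str, str]]:
    """Call fetch(city) for each city, spacing calls under the rate limit and retrying failures.

    Returns (results, errors): successful payloads plus a city -> last error message map.
    """
    min_interval = 60.0 / calls_per_minute
    last_call = None
    results: List[dict] = []
    errors: Dict[str, str] = {}
    for city in cities:
        for attempt in range(1, max_attempts + 1):
            # Every attempt is a real API call, so retries respect the limit too
            if last_call is not None:
                wait = min_interval - (clock() - last_call)
                if wait > 0:
                    sleep(wait)
            last_call = clock()
            try:
                results.append(fetch(city))
                break
            except Exception as exc:  # narrow this to your HTTP client's errors in real code
                if attempt == max_attempts:
                    errors[city] = str(exc)
    return results, errors
```

The `sleep` and `clock` parameters exist mainly so the function can be tested without real waiting. Whether a partial batch should fail the task is a judgment call; raising when `errors` is non-empty hands the problem to Airflow's own retries.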
That Time We Rescued a Retail Client's Data Pipeline
Back in 2021, we consulted for this mid-sized retail chain (can't name names, but they're big in South America). Their Airflow setup was a complete mess. They had these monster DAGs that tried to do everything at once - data validation, transformation, reporting, you name it.
Their main pipeline crawled along at 4h17m on average. The poor data team had to come in early just to make sure the overnight jobs finished before business hours. After a three-week sprint, we managed to:
Split their monolithic DAGs into focused micro-pipelines
Fix a memory leak in their custom operator (they were loading entire CSV files into memory...yikes)
Tweak their executor config from Sequential to Celery with proper worker sizing
End result? Runtime dropped to 72 minutes. Not quite the 60 minutes we promised, but the client was thrilled anyway. The data team celebrated by sending us a case of beer, which mysteriously disappeared before reaching my desk (looking at you, DevOps team).
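The CSV memory fix boils down to streaming rows instead of loading whole files. A minimal sketch of that idea with the standard `csv` module, assuming a hypothetical sales file with `store` and `amount` columns (not the client's actual schema): `csv.DictReader` reads one row at a time, and `iter_batches` groups rows so that only one batch is held in memory, which is also the natural place for a bulk insert. With pandas, `read_csv(..., chunksize=...)` gives a similar chunked iterator.

```python
import csv
import io
from typing import Dict, Iterable, Iterator, List, TextIO, TypeVar

T = TypeVar("T")


def iter_batches(rows: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield lists of at most batch_size items without materializing the whole input."""
    batch: List[T] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def total_sales_by_store(csv_file: TextIO, batch_size: int = 10_000) -> Dict[str, float]:
    """Aggregate a (hypothetical) sales CSV with `store` and `amount` columns, batch by batch."""
    totals: Dict[str, float] = {}
    reader = csv.DictReader(csv_file)  # reads lazily, one row at a time
    for batch in iter_batches(reader, batch_size):
        # Only this batch is in memory; a bulk insert would go here in a real pipeline
        for row in batch:
            totals[row["store"]] = totals.get(row["store"], 0.0) + float(row["amount"])
    return totals


sample = io.StringIO("store,amount\nA,10.5\nB,2\nA,1.5\n")
print(total_sales_by_store(sample, batch_size=2))  # {'A': 12.0, 'B': 2.0}
```

In a real task you'd pass `open(path, newline="")` instead of the in-memory sample.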
Operators I Actually Use
Airflow comes with tons of operators, but tbh, I mostly rely on these:
PythonOperator: For when you need to do actual work
BashOperator: Quick and dirty file operations or curl requests
PostgresOperator/MySqlOperator: SQL execution (obviously)
S3KeySensor: Waiting for files to show up (though it times out constantly)
HttpSensor: Checking if an endpoint is alive
EmailOperator: For those "help everything is on fire" alerts
I know there are fancier options in those provider packages, but most of the time these cover 90% of what I need. The rest is custom operators we've built in-house.
When Things Go Sideways (and They Will)
My first Airflow project failed spectacularly when our main ETL job silently corrupted data for THREE DAYS before anyone noticed. Since then, I'm paranoid about error handling:
from datetime import timedelta
from airflow.operators.python import PythonOperator
# Assumes `dag` and `might_fail_function` are already defined in this DAG file
task = PythonOperator(
    task_id='fragile_task',
    python_callable=might_fail_function,
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,  # BaseOperator's name for this flag; it rejects unknown kwargs like `exponential_backoff`
    max_retry_delay=timedelta(hours=1),
    dag=dag
)
And for the love of all things holy, set up email alerts:
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # Emails only go out once SMTP (or another email backend) is configured in airflow.cfg
    'email': ['data-team@company.com', 'your.personal@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}
Pro tip: create a dedicated Slack channel for Airflow alerts. Your phone will thank you for not blowing up with emails at 3AM.
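To sanity-check what the retry settings in the `fragile_task` example add up to, here's a simplified model of capped exponential backoff in plain Python. It's an approximation, not Airflow's code: as far as I know, Airflow also mixes a deterministic per-try jitter into the doubled delay, so real waits can run somewhat longer than these base values (still capped by `max_retry_delay`). The function name is hypothetical.

```python
from datetime import timedelta
from typing import List


def approx_retry_delays(
    retries: int, retry_delay: timedelta, max_retry_delay: timedelta
) -> List[timedelta]:
    """Base wait before each retry under capped exponential backoff.

    The n-th retry waits retry_delay * 2**(n - 1), but never more than max_retry_delay.
    """
    delays = []
    for retry_number in range(1, retries + 1):
        delay = retry_delay * (2 ** (retry_number - 1))
        delays.append(min(delay, max_retry_delay))
    return delays


# Same numbers as the fragile_task example: 3 retries, 5 minutes base, 1 hour cap
for n, d in enumerate(approx_retry_delays(3, timedelta(minutes=5), timedelta(hours=1)), start=1):
    print(f"retry {n}: ~{d}")
```

With those settings the base waits are 5, 10, and 20 minutes, so the cap only starts to matter once you allow more retries.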
Advanced DAG Features and Testing
Conjuring Dynamic DAGs
We ran into this problem last month - too many similar DAGs duplicating code everywhere. Turns out you can generate DAGs programmatically based on whatever external factors you need. Pretty handy stuff.
Here's a snippet from our refactoring that generates separate DAGs for each data source:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
data_sources = [
    {"name": "sales", "url": "api.example.com/sales", "schedule": "0 0 * * *"},
    {"name": "marketing", "url": "api.example.com/marketing", "schedule": "0 12 * * *"},
    {"name": "support", "url": "api.example.com/support", "schedule": "0 6 * * *"}
]
def extract_data(ds, url=None):
    # `ds` is injected from the task context; `url` comes from op_kwargs
    print(f"Extracting data from {url} for {ds}")
    return {"data": "sample_data"}
def process_data(ti):
    data = ti.xcom_pull(task_ids='extract')
    print(f"Processing data: {data}")
    return {"processed": True}
for source in data_sources:
    dag_id = f"process_{source['name']}_data"
    default_args = {
        'owner': 'data_wizard',
        'start_date': datetime(2023, 1, 1),
        'retries': 1,
    }
    dag = DAG(
        dag_id,
        default_args=default_args,
        schedule_interval=source['schedule'],
        catchup=False
    )
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
        # Pass per-source values explicitly instead of closing over the loop variable
        op_kwargs={'url': source['url']},
        dag=dag
    )
    process = PythonOperator(
        task_id='process',
        python_callable=process_data,
        dag=dag
    )
    extract >> process
    # Register each DAG under a unique module-level name so the DagBag can find it
    globals()[dag_id] = dag
Fair warning though - dynamic DAG generation can get messy fast. We tried implementing something similar for our client reporting system with 30+ different variations, and debugging became absolute hell. Took three days to track down why certain DAGs weren't registering properly (turned out to be a scope issue with the function definitions - ugh).
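A frequent cause of scope bugs in generated DAGs is Python's late binding of closures; this sketch shows the general pitfall, not necessarily the exact bug above. A function defined inside a loop looks up loop variables when it's called, not when it's defined, so every generated callable ends up seeing the last value. Passing values explicitly (as `op_kwargs` does in the example) or binding them as default arguments avoids it. The function names here are hypothetical.

```python
from typing import Callable, List


def make_tasks_buggy(names: List[str]) -> List[Callable[[], str]]:
    callables = []
    for name in names:
        def run():
            # `name` is looked up when run() is called, i.e. after the loop finished
            return f"processing {name}"
        callables.append(run)
    return callables


def make_tasks_fixed(names: List[str]) -> List[Callable[[], str]]:
    callables = []
    for name in names:
        def run(name=name):
            # The default argument freezes this iteration's value of `name`
            return f"processing {name}"
        callables.append(run)
    return callables


sources = ["sales", "marketing", "support"]
buggy = [f() for f in make_tasks_buggy(sources)]
fixed = [f() for f in make_tasks_fixed(sources)]
print(buggy)  # ['processing support', 'processing support', 'processing support']
print(fixed)  # ['processing sales', 'processing marketing', 'processing support']
```

`functools.partial(run, name)` is another common way to freeze the value.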
Testing Your DAG Spells
Nobody told me about DAG testing when I started with Airflow. Wasted so much time deploying broken DAGs that failed at 3am and ruined my sleep schedule. Don't be me.
Basic testing looks something like this:
from airflow.models import DagBag
def test_dag_loaded():
    """Test that the DAG is loaded correctly"""
    # include_examples=False skips Airflow's bundled example DAGs
    dagbag = DagBag(include_examples=False)
    assert dagbag.import_errors == {}
    dag = dagbag.get_dag(dag_id='my_first_spell')
    assert dag is not None
    assert len(dag.tasks) == 3
def test_dag_structure():
    """Test that the DAG structure is correct"""
    dagbag = DagBag(include_examples=False)
    dag = dagbag.get_dag(dag_id='my_first_spell')
    extract_task = dag.get_task('extract_task')
    transform_task = dag.get_task('transform_task')
    load_task = dag.get_task('load_task')
    # extract feeds transform directly, but not load
    downstream_tasks = extract_task.downstream_list
    assert transform_task in downstream_tasks
    assert load_task not in downstream_tasks
    # transform feeds load
    downstream_tasks = transform_task.downstream_list
    assert load_task in downstream_tasks
Then just run it with pytest.
BTW, we ended up setting up a pre-commit hook to run these tests automatically. Saved us countless headaches, especially after that one time when a well-meaning intern accidentally flipped a dependency and created a circular reference. The poor kid was mortified, but at least our tests caught it before it hit prod.
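In Airflow 2, a cyclic DAG fails to load when the file is parsed and the error lands in `DagBag.import_errors`, which is why the `test_dag_loaded` check above catches a flipped dependency. To show what such a check boils down to, here's a standalone sketch using Kahn's algorithm over a hypothetical `{task: downstream tasks}` map. It's an illustration, not Airflow's implementation.

```python
from collections import deque
from typing import Dict, List, Set


def topological_order(downstream: Dict[str, Set[str]]) -> List[str]:
    """Return tasks in a valid run order, or raise ValueError if the graph has a cycle."""
    tasks = set(downstream) | {t for deps in downstream.values() for t in deps}
    indegree = {t: 0 for t in tasks}
    for deps in downstream.values():
        for t in deps:
            indegree[t] += 1
    # Start from tasks with no upstream dependencies (sorted for a deterministic order)
    ready = deque(sorted(t for t, d in indegree.items() if d == 0))
    order: List[str] = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for nxt in sorted(downstream.get(task, ())):
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(tasks):
        # Tasks still waiting on upstream work sit on, or downstream of, a cycle
        stuck = sorted(t for t, d in indegree.items() if d > 0)
        raise ValueError(f"cycle detected; could not order: {stuck}")
    return order


print(topological_order({"extract_task": {"transform_task"}, "transform_task": {"load_task"}}))
```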
Sharing Data Between Tasks
OK so passing data between tasks... XCom is your friend here, but also potentially your worst enemy.
def push_data(**context):
    data = {"spell_power": 9000}
    context['ti'].xcom_push(key='spell_stats', value=data)
def pull_data(**context):
    # task_ids must match the task_id of the task that ran push_data
    data = context['ti'].xcom_pull(task_ids='push_task', key='spell_stats')
    print(f"Spell power: {data['spell_power']}")
XCom is great for small bits of data, but we learned the hard way not to use it for anything substantial. Tried to pass a 50MB JSON through XCom once and basically brought our metadata DB to its knees. Had to explain to my boss why all our pipelines were suddenly failing... not my finest moment.
For bigger stuff, just dump it in S3 or a database and pass the reference. Your ops team will thank you. Or at least not hunt you down with pitchforks.
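Here's a minimal sketch of the pass-a-reference pattern. A local directory stands in for S3 so the example runs anywhere; in a real DAG you'd swap the file I/O for your object-store client (for example the Amazon provider's S3 hook) and point it at storage every worker can reach. The helper names are hypothetical. The upstream task would return the path, which is all XCom stores, and the downstream task would `xcom_pull` it and load the payload.

```python
import json
import os
import tempfile
import uuid


def save_payload(payload: dict, base_dir: str) -> str:
    """Write a large payload to shared storage and return its location."""
    os.makedirs(base_dir, exist_ok=True)
    # A unique name per write avoids collisions between concurrent runs
    path = os.path.join(base_dir, f"payload-{uuid.uuid4().hex}.json")
    with open(path, "w") as f:
        json.dump(payload, f)
    return path  # this short string is what goes into XCom


def load_payload(path: str) -> dict:
    """Read the payload back in the downstream task."""
    with open(path) as f:
        return json.load(f)


# Usage: the reference stays tiny no matter how big the payload gets
storage_dir = tempfile.mkdtemp()
big_payload = {"rows": list(range(100_000))}
ref = save_payload(big_payload, storage_dir)
print(f"XCom would carry {len(ref)} characters instead of the full payload")
```

You'll also want a cleanup policy (an S3 lifecycle rule, or a final task that deletes the object), or the storage quietly fills up.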
Machine Learning Workflows in Airflow
Building End-to-End ML Pipelines

I've been trying to streamline our ML processes for months, and honestly, Airflow has been a lifesaver. After trying about five different orchestration tools (and hating most of them), here's the approach that actually worked for us.
This example builds a complete ML pipeline for the Titanic dataset. Nothing fancy - just the classics:
import json
import os
import pickle
from datetime import datetime, timedelta
import pandas as pd
from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator
default_args = {
    'owner': 'ml_wizard',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'titanic_ml_pipeline',
    default_args=default_args,
    description='Train and deploy a Titanic survival model',
    schedule_interval='@weekly',
    catchup=False
)
data_path = '/opt/airflow/data/titanic.csv'
model_path = '/opt/airflow/models/titanic_model.pkl'
model_metrics_path = '/opt/airflow/models/titanic_metrics.json'
def extract_and_prepare_data():
    """Fetch and prepare the Titanic dataset"""
    df = pd.read_csv(data_path)
    df['Sex'] = df['Sex'].map({'female': 0, 'male': 1})
    # Assign back: column-level fillna(inplace=True) is chained assignment, which recent pandas warns about
    df['Age'] = df['Age'].fillna(df['Age'].median())
    df['Embarked'] = df['Embarked'].fillna(df['Embarked'].mode()[0])
    df['FamilySize'] = df['SibSp'] + df['Parch'] + 1
    df['IsAlone'] = (df['FamilySize'] == 1).astype(int)
    df['Embarked'] = df['Embarked'].map({'S': 0, 'C': 1, 'Q': 2})
    features = ['Pclass', 'Sex', 'Age', 'Fare', 'FamilySize', 'IsAlone', 'Embarked']
    X = df[features]
    y = df['Survived']
    # Fine for a dataset this small; for bigger data, write to storage and pass the path through XCom
    processed_data = {
        'X': X.to_dict('records'),
        'y': y.tolist(),
        'features': features
    }
    return processed_data
def train_model(**context):
    """Train a machine learning model"""
    # Heavy imports live inside the callable so they don't slow down DAG parsing
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
    from sklearn.model_selection import train_test_split
    data = context['ti'].xcom_pull(task_ids='prepare_data')
    X = pd.DataFrame(data['X'])
    y = pd.Series(data['y'])
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    os.makedirs(os.path.dirname(model_path), exist_ok=True)
    with open(model_path, 'wb') as f:
        pickle.dump(model, f)
    y_pred = model.predict(X_test)
    # Plain floats keep the JSON file and the XCom payload simple
    metrics = {
        'accuracy': float(accuracy_score(y_test, y_pred)),
        'precision': float(precision_score(y_test, y_pred)),
        'recall': float(recall_score(y_test, y_pred)),
        'f1': float(f1_score(y_test, y_pred))
    }
    os.makedirs(os.path.dirname(model_metrics_path), exist_ok=True)
    with open(model_metrics_path, 'w') as f:
        json.dump(metrics, f)
    return metrics
def evaluate_model_performance(**context):
    """Decide whether to deploy the model based on performance"""
    metrics = context['ti'].xcom_pull(task_ids='train_model')
    if metrics['accuracy'] > 0.8:
        return "Model performance acceptable, proceed with deployment"
    # AirflowFailException fails the task without retrying; a retry would just re-read the same metrics
    raise AirflowFailException(f"Model accuracy {metrics['accuracy']:.3f} below threshold (0.8)")
def deploy_model(**context):
    """Deploy the model to a serving environment (placeholder: only logs the metrics)"""
    metrics = context['ti'].xcom_pull(task_ids='train_model')
    print(f"Model metrics: {metrics}")
    print("Model successfully deployed!")
    return "Model deployment completed"
prepare_data = PythonOperator(
    task_id='prepare_data',
    python_callable=extract_and_prepare_data,
    dag=dag
)
train_model_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)
evaluate_model = PythonOperator(
    task_id='evaluate_model',
    python_callable=evaluate_model_performance,
    dag=dag
)
deploy_model_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag
)
prepare_data >> train_model_task >> evaluate_model >> deploy_model_task
Look, I know this isn't the fanciest setup, but it works. We had to rebuild our pipeline three times before landing on this structure. The first version didn't use XComs properly and data kept disappearing between tasks (ugh). Then we tried storing intermediate results in Redis, which was a spectacular fail when we ran out of memory during a training run.
Something that isn't obvious from this example - you really need to handle permissions properly. I spent an entire Thursday debugging why our models weren't saving, only to discover the container was running as the wrong user. Still mad about that one.
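A cheap guard against that: have the pipeline actually try writing to its output directory before doing any real work, and fail with a readable message (including the effective user id) if it can't. This is a hypothetical helper sketched in plain Python; in the Titanic DAG you might call it with `os.path.dirname(model_path)` from a small PythonOperator placed before `prepare_data`.

```python
import os
import tempfile


def check_writable(directory: str) -> None:
    """Raise PermissionError with a clear message if this process can't write to `directory`."""
    probe = os.path.join(directory, ".write_probe")
    try:
        os.makedirs(directory, exist_ok=True)
        # Writing a real file exercises the permissions this process actually has
        with open(probe, "w") as f:
            f.write("ok")
        os.remove(probe)
    except OSError as exc:
        uid = os.geteuid() if hasattr(os, "geteuid") else "unknown"
        raise PermissionError(
            f"Cannot write to {directory!r} as uid {uid}: {exc}"
        ) from exc


# Usage against a scratch directory; in a DAG you'd pass the model output folder
scratch = os.path.join(tempfile.mkdtemp(), "models")
check_writable(scratch)
print(f"{scratch} is writable")
```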
Integrating with ML Platforms
For bigger projects, we usually connect Airflow with specialized ML platforms. MLflow has been decent for us:
from datetime import datetime
import mlflow
import mlflow.sklearn
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
default_args = {
    'owner': 'ml_wizard',
    'start_date': datetime(2023, 1, 1),
}
dag = DAG(
    'mlflow_integration',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)
def train_with_mlflow():
    mlflow.set_tracking_uri("http://mlflow-server:5000")
    with mlflow.start_run(run_name="airflow_triggered_run"):
        df = pd.read_csv('/opt/airflow/data/wine-quality.csv')
        X = df.drop('quality', axis=1)
        y = df['quality']
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
        n_estimators = 100
        max_depth = 10
        # Hyperparameters, the metric, and the fitted model all land in the same MLflow run
        mlflow.log_param("n_estimators", n_estimators)
        mlflow.log_param("max_depth", max_depth)
        model = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        mlflow.log_metric("accuracy", accuracy)
        mlflow.sklearn.log_model(model, "random_forest_model")
        return f"Run completed with accuracy: {accuracy}"
mlflow_task = PythonOperator(
    task_id='train_with_mlflow',
    python_callable=train_with_mlflow,
    dag=dag
)
I have a love-hate relationship with MLflow. On one hand, it's saved us from our own terrible model versioning system (which was basically renamed pickle files - don't judge). On the other hand, its UI freezes all the time when we have too many runs. We've also had issues with experiment permissions when new team members join.
The experiment tracking is worth the headache though. Before this, Sarah would literally email model metrics to everyone. Dark times.
Handling Model Deployment with Airflow
We've been using SageMaker for some deployments. It's... fine. More expensive than we'd like, but it beats maintaining our own prediction servers.
from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerModelOperator
dag = DAG(
    'sagemaker_deployment',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)
# Registers the model with SageMaker; serving it still needs an endpoint config and an endpoint
create_model = SageMakerModelOperator(
    task_id='create_sagemaker_model',
    config={
        'ModelName': 'airflow-deployed-model',
        'PrimaryContainer': {
            'Image': '123456789012.dkr.ecr.us-west-2.amazonaws.com/my-custom-image:latest',
            'ModelDataUrl': 's3://my-bucket/model.tar.gz'
        },
        'ExecutionRoleArn': 'arn:aws:iam::123456789012:role/service-role/AmazonSageMaker-ExecutionRole'
    },
    aws_conn_id='aws_default',
    dag=dag
)
This example is simplified. In practice, we've had to add retry logic for when AWS decides to have connectivity issues (happens more than you'd think). Also, watch your CloudWatch logs like a hawk - we once had a model silently failing because of a mismatched tensor dimension that only happened with specific inputs.
Tip from painful experience: set up proper monitoring from day one. We once had a model in production for three weeks before realizing it was returning garbage predictions for a specific customer segment. Cost us a client. Now we log prediction distributions and check for drift daily.
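The exact drift check isn't spelled out above, so here's one common option rather than a description of that setup: the Population Stability Index (PSI), which compares how predictions are spread across buckets today versus a baseline. It assumes scores are probabilities in [0, 1]; the bin count, the epsilon floor, and the sample data are assumptions. Commonly quoted rules of thumb treat PSI above roughly 0.25 as a significant shift, but calibrate thresholds on your own data.

```python
import math
from typing import List, Sequence


def psi(expected: Sequence[float], actual: Sequence[float], bins: int = 10, eps: float = 1e-6) -> float:
    """Population Stability Index between a baseline and a current sample of scores in [0, 1].

    Scores go into equal-width bins; PSI = sum((a - e) * ln(a / e)) over bins, where a and e
    are each sample's share of the bin (floored at eps to avoid log(0)).
    """
    def shares(values: Sequence[float]) -> List[float]:
        counts = [0] * bins
        for v in values:
            # Clamp so 1.0 (and any stray out-of-range score) lands in an edge bin
            idx = min(max(int(v * bins), 0), bins - 1)
            counts[idx] += 1
        return [max(c / len(values), eps) for c in counts]

    e, a = shares(expected), shares(actual)
    return sum((ai - ei) * math.log(ai / ei) for ai, ei in zip(a, e))


baseline = [i / 100 for i in range(100)]        # e.g. last month's predicted probabilities
today = [min(0.99, p + 0.3) for p in baseline]  # a sample skewed toward high scores
print(f"PSI vs itself: {psi(baseline, baseline):.3f}")
print(f"PSI vs shifted sample: {psi(baseline, today):.3f}")
```

Running this per customer segment, not just over all predictions, is what would have caught a problem confined to one segment.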
Integrating Airflow with Other Tools

Honestly, I was super hesitant about Airflow integration for the longest time. The docs make it look straightforward, but... yeah. Not always the case in real life. I've got a few battle-tested examples that might help you avoid the headaches I went through.
Slack Notifications: Stay Informed About Your Spells
Slack notifications saved our on-call rotation last year. Our team kept missing email alerts until our VP started asking uncomfortable questions about SLAs.
from datetime import datetime
from airflow import DAG
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
# ID of the Airflow connection that stores the Slack webhook (Admin > Connections)
SLACK_CONN_ID = 'slack_webhook'
def success_notification(context):
    success_alert = SlackWebhookOperator(
        task_id='slack_success',
        # Recent apache-airflow-providers-slack releases call this slack_webhook_conn_id;
        # older releases used http_conn_id
        slack_webhook_conn_id=SLACK_CONN_ID,
        message=f"DAG {context['dag'].dag_id} completed successfully!",
        channel="#airflow-alerts",
        username="Airflow Wizard"
    )
    return success_alert.execute(context=context)
def failure_notification(context):
    failure_alert = SlackWebhookOperator(
        task_id='slack_fail',
        slack_webhook_conn_id=SLACK_CONN_ID,
        message=f"DAG {context['dag'].dag_id} failed on task {context['task'].task_id}",
        channel="#airflow-alerts",
        username="Airflow Wizard"
    )
    return failure_alert.execute(context=context)
default_args = {'owner': 'airflow', 'retries': 1}
dag = DAG(
    'notify_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    default_args=default_args,
    # DAG-level callbacks fire once per DAG run, not once per task
    on_success_callback=success_notification,
    on_failure_callback=failure_notification
)
We actually changed the username to something fun because nobody was reading the alerts. Whatever works, right?
Spark Integration: Handling Big Data Spells
For big data processing, Spark is the obvious choice, though the config can be a total pain.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
# Assumes a `dag` object defined elsewhere and a Spark connection named spark_default
spark_job = SparkSubmitOperator(
    task_id='process_big_data',
    application='/path/to/spark_job.py',
    conn_id='spark_default',
    application_args=['--input', 's3://bucket/input', '--output', 's3://bucket/output'],
    conf={'spark.executor.memory': '4g'},  # heap per executor; size it to your data
    dag=dag
)
That spark.executor.memory setting is worth double-checking. First time I deployed this, our metrics job crashed spectacularly for four days straight because we underestimated how much memory it needed. Good times.
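One reason the numbers surprise people: on YARN and Kubernetes, Spark requests off-heap overhead on top of spark.executor.memory. Unless spark.executor.memoryOverhead is set, the default overhead is 10% of executor memory with a 384 MiB floor. Defaults differ for some non-JVM workloads and newer Spark versions, so check the docs for yours. Here is a back-of-the-envelope helper under those assumptions. The function names are made up for illustration.

```python
MIN_OVERHEAD_MIB = 384  # floor Spark applies to the default executor memory overhead
UNITS_MIB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def parse_mem_mib(setting: str) -> int:
    """Convert a JVM-style size such as '4g' or '512m' to MiB."""
    s = setting.strip().lower()
    if not s or s[-1] not in UNITS_MIB:
        raise ValueError(f"expected a size with a k/m/g/t suffix, got {setting!r}")
    return int(float(s[:-1]) * UNITS_MIB[s[-1]])


def executor_request_mib(executor_memory: str, overhead_factor: float = 0.10) -> int:
    """Heap plus default overhead: roughly what each executor container asks the cluster for."""
    heap = parse_mem_mib(executor_memory)
    overhead = max(int(heap * overhead_factor), MIN_OVERHEAD_MIB)
    return heap + overhead
```

With the '4g' above, each executor asks for about 4505 MiB, not 4096. That gap is worth remembering when you size nodes or pod limits.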
dbt Integration: Data Transformation Magic
dbt + Airflow is pretty sweet. Not gonna lie, it took me forever to get the hang of dbt, but now I'm a convert.
from airflow.operators.bash import BashOperator
# extract_data and load_data are tasks defined elsewhere in the same DAG
dbt_run = BashOperator(
    task_id='dbt_run',
    bash_command='cd /path/to/dbt/project && dbt run --profiles-dir .',
    dag=dag
)
dbt_test = BashOperator(
    task_id='dbt_test',
    bash_command='cd /path/to/dbt/project && dbt test --profiles-dir .',
    dag=dag
)
# Build the models, then test them before anything downstream consumes them
extract_data >> dbt_run >> dbt_test >> load_data
Our BI team loves this setup, although they still complain about our naming conventions. Can't please everyone.
Developing Custom Components
Sometimes you just gotta roll your own operators. This usually happens around 11pm when you're trying to integrate with some obscure internal system that nobody remembers how to use.
from airflow.models.baseoperator import BaseOperator
class MyCustomOperator(BaseOperator):
    """
    A custom operator that does something special
    :param custom_param: A parameter specific to this operator
    :type custom_param: str
    """
    def __init__(self, custom_param, **kwargs):
        super().__init__(**kwargs)  # forwards task_id, dag, retries, etc. to BaseOperator
        self.custom_param = custom_param
    def execute(self, context):
        """Execute the custom logic"""
        self.log.info(f"Executing with custom_param: {self.custom_param}")
        result = f"Processed {self.custom_param}"
        return result  # pushed to XCom by default
My first attempt at a custom operator crashed so badly we had to restart the whole scheduler. Still haven't lived that one down.
Creating custom hooks is another option when you need to connect to weird systems:
from airflow.hooks.base import BaseHook
class MyServiceHook(BaseHook):
    """Hook for connecting to MyService"""
    def __init__(self, my_service_conn_id='my_service_default'):
        super().__init__()
        self.conn_id = my_service_conn_id
        # Looks up the Airflow connection (a metadata DB read), so build the hook
        # inside execute(), not at the top level of a DAG file
        self.connection = self.get_connection(my_service_conn_id)
        self.host = self.connection.host
        self.port = self.connection.port
        self.login = self.connection.login
        self.password = self.connection.password
    def get_conn(self):
        """Connect to the service and return the connection object"""
        import myservice_client  # placeholder for your service's client library
        client = myservice_client.Client(
            host=self.host,
            port=self.port,
            username=self.login,
            password=self.password
        )
        return client
    def run_command(self, command):
        """Run a command on the service"""
        client = self.get_conn()
        return client.execute(command)
Honestly, I barely use custom hooks anymore. Most of what I need is already in providers, and maintaining the extra code is a hassle. But when you need 'em, you need 'em.
Monitoring, Scaling, and Operations
Monitoring Your Workflow Spells

God, monitoring Airflow is such a pain sometimes. After our third pipeline failure last month (that nobody noticed until the VP of Sales couldn't get his precious dashboard), I finally got around to setting up proper monitoring.
The most basic option is just using the Airflow UI. It works fine for small setups - that's what we used for like a year before things got messy. But honestly, relying on the UI alone is a recipe for 3 AM phone calls. Trust me on this one.
For actual grown-up monitoring:
Hook up metrics to something like StatsD or Prometheus. We use Datadog because our DevOps team was already using it for everything else. The integration was surprisingly painless.
Set up decent logging. We dumped everything into Elasticsearch which was total overkill for our use case, but hey, the infrastructure team had already built it so...
¯\_(ツ)_/¯
Here's what our metrics config looks like:
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
Just make sure your statsd host is actually running before enabling this. Made that mistake once and spent an entire afternoon wondering why metrics weren't showing up. Facepalm.
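Part of why that failure is so quiet: StatsD speaks UDP, so the sender gets no reply, and a dead daemon usually raises no error at all. The packets just vanish. Below is a small sketch of the counter line format ('<prefix>.<name>:<value>|c'). For self-testing it includes a throwaway localhost listener. The helper names and the smoke_test metric name are hypothetical.

```python
import socket


def format_counter(prefix: str, name: str, value: int = 1) -> bytes:
    """Build a StatsD counter line of the form '<prefix>.<name>:<value>|c'."""
    return f"{prefix}.{name}:{value}|c".encode("ascii")


def send_counter(host: str, port: int, prefix: str, name: str, value: int = 1) -> None:
    """Fire-and-forget one counter over UDP; nothing comes back to confirm delivery."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(format_counter(prefix, name, value), (host, port))


def local_round_trip(prefix: str = "airflow", name: str = "smoke_test") -> bytes:
    """Send a counter to a temporary UDP listener on localhost and return what it received."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as listener:
        listener.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
        listener.settimeout(2.0)
        port = listener.getsockname()[1]
        send_counter("127.0.0.1", port, prefix, name)
        data, _addr = listener.recvfrom(1024)
        return data
```

Against a real daemon you'd call send_counter("localhost", 8125, "airflow", "smoke_test") and then check that airflow.smoke_test shows up in your metrics backend. If it doesn't, the problem is the daemon or the network, not Airflow.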
Performance Optimization for Large-Scale Pipelines
Our Airflow instance started choking once we hit about 60 DAGs with a bunch of tasks each. Database connections were maxing out, tasks were queueing forever... it was a mess. Had to do a bunch of performance tweaking:
Database stuff first:
Then we switched executors. Started with SequentialExecutor (lol, don't do this in prod), then LocalExecutor, and finally went to CeleryExecutor when things got serious:
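The database and executor knobs both live in airflow.cfg. Here is a sketch of the relevant sections. It assumes Airflow 2.3+ (where the SQLAlchemy pool options sit under [database] rather than [core]), PostgreSQL for metadata, and Redis as the Celery broker. Every host, password, and number is a placeholder, not a recommendation.

```ini
[core]
executor = CeleryExecutor

[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:CHANGE_ME@db-host:5432/airflow
# Connections each Airflow process keeps open, plus how many extra it may burst to
sql_alchemy_pool_size = 10
sql_alchemy_max_overflow = 20

[celery]
broker_url = redis://redis-host:6379/0
result_backend = db+postgresql://airflow:CHANGE_ME@db-host:5432/airflow
# Task slots per Celery worker
worker_concurrency = 16
```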
Oh, and queuing! This was a game-changer for our finance data team's end-of-month jobs that would crush everything else:
task = PythonOperator(
    task_id='process_finance_data',
    python_callable=process_data,
    # Only Celery-based executors route on queue; a worker must consume it, e.g.
    # `airflow celery worker --queues finance_priority`
    queue='finance_priority',
    dag=dag
)
Sarah in data engineering insisted we implement caching for some common dataset fetching. Rolled my eyes at first, but she was right - cut our pipeline times by 40%:
def get_customer_data(ds, **context):
    from datetime import datetime, timedelta
    from airflow.models import Variable
    cache_key = f"customer_data_{ds}"
    try:
        # Variable.get raises KeyError when the variable doesn't exist yet
        cached = Variable.get(cache_key, deserialize_json=True)
        cache_date = datetime.fromtimestamp(cached.get('timestamp', 0))
        if datetime.now() - cache_date < timedelta(hours=6):
            print(f"Using cached data from {cache_date}")
            return cached['data']
    except (KeyError, ValueError, TypeError) as e:
        print(f"Cache retrieval error: {e}")
    print("Cache miss - fetching fresh data")
    data = run_expensive_query()  # defined elsewhere; must return JSON-serializable data
    try:
        # Variables live in the metadata DB, so keep cached payloads small
        Variable.set(
            cache_key,
            {"data": data, "timestamp": datetime.now().timestamp()},
            serialize_json=True
        )
    except (TypeError, ValueError) as e:
        print(f"Failed to cache data: {e}")
    return data
Not gonna lie, we had weird cache invalidation bugs for weeks. Cache invalidation and naming things, amirite?
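Those bugs are hard to chase partly because the TTL check is tangled up with calls to the metadata DB. Here is a sketch that pulls the same six-hour logic out, with the store and the clock passed in so the expiry can be unit-tested without Airflow. The cached_fetch name is hypothetical, and a plain dict stands in for Airflow Variables.

```python
import json
from datetime import datetime, timedelta
from typing import Any, Callable, MutableMapping

CACHE_TTL = timedelta(hours=6)


def cached_fetch(key: str, fetch: Callable[[], Any], store: MutableMapping[str, str],
                 now: Callable[[], datetime] = datetime.now,
                 ttl: timedelta = CACHE_TTL) -> Any:
    """Return the cached payload for key if it is younger than ttl; otherwise refetch and re-cache."""
    raw = store.get(key)
    if raw is not None:
        try:
            cached = json.loads(raw)
            cached_at = datetime.fromtimestamp(cached["timestamp"])
            if now() - cached_at < ttl:
                return cached["data"]
        except (KeyError, ValueError, TypeError):
            pass  # a corrupt entry is treated as a cache miss
    data = fetch()
    store[key] = json.dumps({"data": data, "timestamp": now().timestamp()})
    return data
```

Passing a fake `now` lets a test move time forward past the TTL boundary and assert exactly when the expensive fetch runs again.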
Dealing with Long-Running Tasks
Long-running tasks are THE WORST in Airflow. We had this one ETL process that ran for 9+ hours sometimes.
Initially we just cranked up the timeouts:
from datetime import timedelta
task = PythonOperator(
    task_id='etl_from_hell',
    python_callable=extract_transform_load,
    execution_timeout=timedelta(hours=12),  # a single try fails if it runs longer than this
    dag=dag
)
But that's just a band-aid. A better approach is to make the task async: kick it off, then check on it later:
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
trigger_job = PythonOperator(
    task_id='trigger_etl_job',
    python_callable=start_process_in_other_system,  # should return as soon as the job starts
    dag=dag
)
# ExternalTaskSensor waits for a task in another DAG's run with the same logical date,
# so etl_status_checker must run on the same schedule (or set execution_delta)
check_job = ExternalTaskSensor(
    task_id='check_etl_status',
    external_dag_id='etl_status_checker',
    external_task_id='is_complete',
    mode='reschedule',  # frees the worker slot between checks
    poke_interval=600,  # check every 10 minutes
    timeout=60 * 60 * 24,  # give up after 24 hours
    dag=dag
)
process_results = PythonOperator(
    task_id='process_etl_results',
    python_callable=do_something_with_results,
    dag=dag
)
trigger_job >> check_job >> process_results
Still feel like there's a better way, but this works OK.
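To make poke_interval and timeout concrete, here is a plain-Python sketch of the check-sleep-recheck loop. This is roughly what a sensor in poke mode does while it holds a worker slot. Reschedule mode keeps the same cadence and deadline but hands the slot back between checks. The wait_for and PollTimeout names are hypothetical, and the clock and sleep functions are injectable so the loop can be tested without actually waiting.

```python
import time
from typing import Callable


class PollTimeout(Exception):
    """Raised when the external job doesn't report completion before the deadline."""


def wait_for(check: Callable[[], bool], poke_interval: float, timeout: float,
             sleep: Callable[[float], None] = time.sleep,
             clock: Callable[[], float] = time.monotonic) -> int:
    """Call check() every poke_interval seconds until it returns True; return the number of pokes."""
    deadline = clock() + timeout
    pokes = 0
    while True:
        pokes += 1
        if check():
            return pokes
        if clock() + poke_interval > deadline:
            raise PollTimeout(f"gave up after {pokes} pokes")
        sleep(poke_interval)
```

With the settings above (600 s interval, 24 h timeout), a job that finishes on the third check has spent 20 minutes in the sensor.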
Lessons from 6 Years of Airflow Wrangling
After six years of Airflow wrangling, here’s my “I’ve made all these mistakes so you don’t have to” list:
🧠 XComs for Large Data
Just don’t.
We tried passing a 50MB dataset between tasks and nearly took down the scheduler. XComs go in the metadata database, people! Now we just pass file paths.
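Stripped of Airflow, the file-path pattern is small enough to sketch in plain Python. The producer writes to storage both tasks can see and returns only the path, and that short string is all XCom ever has to hold. This sketch assumes a shared directory (in production that might be a shared volume or object storage), and it uses JSON instead of Parquet so it needs no extra libraries. The extract and process names are hypothetical.

```python
import json
import os
import tempfile


def extract(records: list, shared_dir: str, run_id: str) -> str:
    """Producer: write the payload to shared storage and return only its path."""
    path = os.path.join(shared_dir, f"data_{run_id}.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(records, fh)
    return path  # this short string is what would travel through XCom


def process(path: str) -> int:
    """Consumer: read the payload back via the path, then clean up."""
    with open(path, encoding="utf-8") as fh:
        records = json.load(fh)
    count = len(records)  # stand-in for real processing
    os.remove(path)  # delete only after success, so a retried task can still re-read the file
    return count


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as shared_dir:
        p = extract([{"id": i} for i in range(1000)], shared_dir, "20240101T000000")
        print(p, process(p))
```

Deleting the file only after processing succeeds is deliberate. If the consumer fails and Airflow retries it, the file is still there.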
from airflow.exceptions import AirflowException
def extract_data(**context):
    try:
        data = get_big_data()  # defined elsewhere; to_parquet implies a pandas DataFrame
        filepath = f"/shared/tmp/data_{context['ts_nodash']}.parquet"
        data.to_parquet(filepath)
        return filepath  # only this path string lands in XCom
    except Exception as e:
        raise AirflowException(f"Failed to extract data: {str(e)}") from e
🧩 Task Granularity
We went too fine-grained once, with like 50 tiny tasks. DAG looked pretty but was impossible to debug. Now we aim for medium-sized tasks that each do one logical thing.
⚙️ Resource Management with Pools
Adding pools saved us from ourselves.
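A pool is essentially a shared slot counter. Every task that names the pool takes pool_slots from it while running, in any DAG, and a task waits when its slots won't fit. Here is a toy model of that accounting. The 4-slot capacity is an assumption, and the sketch admits tasks in list order, whereas the real scheduler also weighs priority_weight.

```python
from dataclasses import dataclass


@dataclass
class Pool:
    """Toy model of an Airflow pool: a fixed number of slots shared by every task that names it."""
    name: str
    slots: int
    used: int = 0

    def try_acquire(self, pool_slots: int) -> bool:
        if self.used + pool_slots > self.slots:
            return False  # not enough free slots: the task waits
        self.used += pool_slots
        return True

    def release(self, pool_slots: int) -> None:
        self.used -= pool_slots


def admit(pool: Pool, requests: list) -> list:
    """Given (task_id, pool_slots) pairs, return the task_ids that can start right now."""
    return [task_id for task_id, n in requests if pool.try_acquire(n)]
```

With 4 slots, a 2-slot task and a 1-slot task can run together, but a second 2-slot task has to wait until something finishes and releases its slots.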
task1 = PythonOperator(
    task_id='cpu_intensive_task',
    python_callable=some_function,
    pool='cpu_bound_pool',  # must already exist (Admin > Pools, or `airflow pools set`)
    pool_slots=2,  # occupies 2 of the pool's slots while running
    dag=dag
)
task2 = PythonOperator(
    task_id='another_cpu_task',
    python_callable=another_function,
    pool='cpu_bound_pool',  # pool_slots defaults to 1
    dag=dag
)
# The pool caps concurrent slot usage across every task that names it, in any DAG;
# this dependency means these two never overlap anyway
task1 >> task2
💾 File-Based Handoff
We switched to passing file paths instead of actual data. Here’s how the downstream task handles it:
import pandas as pd
from airflow.exceptions import AirflowException
# Feed the upstream path in via XCom on the PythonOperator, e.g. (assuming the
# upstream task_id is 'extract_data'):
# op_kwargs={'filepath': "{{ ti.xcom_pull(task_ids='extract_data') }}"}
def process_data(filepath):
    try:
        data = pd.read_parquet(filepath)
        # Do stuff
        # import os
        # os.remove(filepath)  # Uncomment if you want to clean up
    except Exception as e:
        raise AirflowException(f"Failed to process data: {str(e)}") from e
🛑 Before Pools = Chaos
Without pools, these would all run at once and crash our database:
task = PythonOperator(
    task_id='db_intensive_task',
    python_callable=query_function,
    pool='database_access',
    pool_slots=1,
    dag=dag
)
Our heavyweight ETL uses more slots:
from airflow.operators.python import PythonOperator
big_task = PythonOperator(
    task_id='massive_query',
    python_callable=big_query_function,
    pool='database_access',
    pool_slots=5,  # the pool needs at least 5 slots in total, or this task is never scheduled
    dag=dag
)
Development workflow
OK this still sucks. We use a dev Airflow instance, but the test cycle is so slow. Our workflow is basically:
Write DAG
Deploy to dev
Fix the 6 bugs
Deploy to prod
Fix the 2 additional bugs that only show up in prod
If anyone has a better solution, I'm all ears.
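One cheap thing that shaves a round trip: fail fast on DAG files that don't even parse, before they reach dev. Below is a minimal pre-deploy check that byte-compiles every .py file in a folder without executing it. It only catches syntax errors, not import errors or runtime bugs. For those, Airflow's own DagBag import_errors or `dag.test()` (Airflow 2.5+) go further. The function names and the "dags" folder name are assumptions.

```python
import pathlib


def syntax_errors(sources: dict) -> dict:
    """Map file name -> 'line N: message' for every source that fails to compile."""
    errors = {}
    for name, source in sources.items():
        try:
            compile(source, name, "exec")  # parse and compile only; nothing is executed
        except SyntaxError as exc:
            errors[name] = f"line {exc.lineno}: {exc.msg}"
    return errors


def check_dag_folder(dag_folder: str) -> dict:
    """Read every .py file under dag_folder and report the ones with syntax errors."""
    sources = {
        str(path): path.read_text(encoding="utf-8")
        for path in sorted(pathlib.Path(dag_folder).rglob("*.py"))
    }
    return syntax_errors(sources)
```

In CI you'd call check_dag_folder("dags") and fail the build if the result is non-empty.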
Dependencies
Our first prod issue was missing libraries. Classic. We switched to custom Docker images after that nightmare:
# Newer cncf.kubernetes provider releases expose this class from
# airflow.providers.cncf.kubernetes.operators.pod instead
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
task = KubernetesPodOperator(
    task_id='pandas_task',
    namespace='airflow',
    image='our-registry.com/data-science-env:2.1.3',  # pinned tag, so every run gets the same libraries
    cmds=['python', '/scripts/process.py'],
    dag=dag
)
Probably the best way to go if you're using K8s. We're still battling over who maintains these images though... good times.
Conclusion
Airflow has been around forever (well, since 2014). When you get to Airbnb size, you’ll need a team of 14+ data engineers just to support and maintain Airflow for the rest of the organization. Have fun!