AI Workflow Orchestration — Temporal, Prefect, and Airflow for AI Pipelines
- Authors
- Sanjeev Sharma (@webcoderspeed1)
Introduction
Long-running AI tasks need orchestration. Workflows that call LLMs, process data, and make decisions must handle failures gracefully, log intermediate results, and enable rollback. Traditional job schedulers (cron) and simple queues (Celery) break at scale. Modern orchestration platforms like Temporal, Prefect, and Airflow solve this. This guide covers production patterns.
- Why AI Workflows Need Orchestration
- Temporal for Durable AI Workflows
- Prefect for Data + AI Pipelines
- Temporal: Durable Workflows With Retry Policies
- Airflow for Batch DAGs
- Task Dependency Graphs
- Parallel Execution of AI Tasks
- Human Approval Steps
- Workflow Versioning
- Cost Tracking Per Workflow Run
- Checklist
- Conclusion
Why AI Workflows Need Orchestration
AI operations are fundamentally different:
# Problem 1: Long-running, potentially failing tasks
import time

import openai

def llm_inference_task():
    # This might take 10 seconds or hang for 2 minutes
    client = openai.OpenAI(api_key="sk-...")
    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": "Complex task"}],
        timeout=120,
    )
    return response

# Problem 2: Expensive retries
# Cost: at $0.03 per 1K tokens, a 500K-token failed run wastes $15
# Need smart retry with exponential backoff

# Problem 3: Intermediate state
# If a task fails halfway, you need to resume from a checkpoint
# Simple retry restarts from the beginning (wasted cost)

# Problem 4: Complex dependencies
# Task A → Task B (depends on A) → Task C (depends on A and B)
# C should run after both finish, not before

# Problem 5: Human approval steps
# Generate content → Human review → Publish
# Automation shouldn't publish without approval

# Solution: Workflow orchestration
# - Captures intermediate state
# - Enables intelligent retries
# - Manages dependencies
# - Tracks cost per workflow run
# - Allows human intervention

class WorkflowOrchestrator:
    """Base class for workflow orchestration"""

    def __init__(self):
        self.state = {}
        self.cost_tracking = {}

    def execute_task_with_retry(
        self,
        task_fn,
        task_name: str,
        max_retries: int = 3,
        backoff_factor: float = 2.0,
    ):
        """Execute with intelligent retry and cost tracking"""
        for attempt in range(max_retries):
            try:
                result = task_fn()
                self.state[task_name] = result
                return result
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                wait_time = 2 ** attempt * backoff_factor
                print(f"Attempt {attempt + 1} failed ({e}), retrying in {wait_time}s")
                time.sleep(wait_time)
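The retry schedule above follows wait = 2**attempt × backoff_factor; a quick sketch makes the resulting delays explicit:

```python
def backoff_schedule(max_retries: int, backoff_factor: float = 2.0) -> list[float]:
    """Delays (in seconds) before each retry, matching 2**attempt * factor.

    The final attempt re-raises instead of waiting, so there are
    max_retries - 1 delays.
    """
    return [2 ** attempt * backoff_factor for attempt in range(max_retries - 1)]

# Three attempts at factor 2.0 wait 2s, then 4s, before giving up.
print(backoff_schedule(3))  # [2.0, 4.0]
```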
Orchestration becomes critical at scale: >10 AI tasks/day or >$1000/month in API costs.
Temporal for Durable AI Workflows
Temporal is designed for long-running, reliable operations:
import time
from datetime import timedelta

import openai
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy

# Define activities (individual steps)
@activity.defn
async def call_llm(prompt: str) -> str:
    """Call LLM with timeout and cost tracking"""
    client = openai.OpenAI(api_key="sk-...")
    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=1000,
        timeout=30,
    )
    cost = (
        response.usage.prompt_tokens * 0.01 / 1000
        + response.usage.completion_tokens * 0.03 / 1000
    )
    activity.logger.info(f"LLM call cost: ${cost:.4f}")
    return response.choices[0].message.content

@activity.defn
async def embed_text(text: str) -> list[float]:
    """Embed using batch API for cost savings"""
    client = openai.OpenAI(api_key="sk-...")
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    return response.data[0].embedding

@activity.defn
async def store_in_vector_db(
    text_id: str,
    embedding: list[float],
) -> bool:
    """Store embedding in vector database"""
    # `index` is your vector-store client (Pinecone, Qdrant, etc.)
    index.upsert([
        {
            "id": text_id,
            "values": embedding,
            "metadata": {"stored_at": time.time()},
        }
    ])
    return True
# Define workflow (orchestration logic)
# Temporal's Python SDK defines workflows as classes with a @workflow.run method
@workflow.defn
class RagIndexingWorkflow:
    """Index documents for RAG"""

    @workflow.run
    async def run(self, documents: list[dict]) -> dict:
        results = {
            "indexed": [],
            "failed": [],
            "total_cost": 0.0,
        }
        for doc in documents:
            try:
                # Activity 1: Extract insights with LLM
                summary = await workflow.execute_activity(
                    call_llm,
                    args=[f"Summarize: {doc['content']}"],
                    start_to_close_timeout=timedelta(seconds=60),
                    retry_policy=RetryPolicy(
                        initial_interval=timedelta(seconds=1),
                        maximum_interval=timedelta(seconds=60),
                        maximum_attempts=3,
                        backoff_coefficient=2.0,
                    ),
                )
                # Activity 2: Embed text
                embedding = await workflow.execute_activity(
                    embed_text,
                    args=[summary],
                    start_to_close_timeout=timedelta(seconds=30),
                )
                # Activity 3: Store in vector DB
                stored = await workflow.execute_activity(
                    store_in_vector_db,
                    args=[doc["id"], embedding],
                    start_to_close_timeout=timedelta(seconds=10),
                )
                if stored:
                    results["indexed"].append(doc["id"])
            except Exception as e:
                results["failed"].append({
                    "id": doc["id"],
                    "error": str(e),
                })
        return results

# Run workflow (assumes a worker is polling the "rag_tasks" queue)
async def run_workflow():
    client = await Client.connect("localhost:7233")
    documents = [
        {"id": "doc-1", "content": "AI infrastructure guide"},
        {"id": "doc-2", "content": "Vector databases"},
    ]
    result = await client.execute_workflow(
        RagIndexingWorkflow.run,
        documents,
        id="rag-workflow-001",
        task_queue="rag_tasks",
    )
    print(f"Workflow result: {result}")

# Key benefits:
# 1. Automatic retry with exponential backoff
# 2. Survives server restart (durability)
# 3. Inspect execution history
# 4. Pause/resume workflows
# 5. Cost tracking per activity
Temporal excels at:
- Long-running workflows (>1 hour)
- Complex retry logic
- Distributed tracing
- Workflow versioning
Prefect for Data + AI Pipelines
Prefect focuses on data pipelines with AI steps:
from prefect import flow, task, get_run_logger
import openai

@task(name="Load Documents", retries=2, retry_delay_seconds=10)
def load_documents(source_path: str) -> list[dict]:
    """Load documents from source"""
    logger = get_run_logger()
    logger.info(f"Loading documents from {source_path}")
    documents = [
        {"id": f"doc-{i}", "content": f"Content {i}"}
        for i in range(100)
    ]
    return documents

@task(name="Classify with LLM", retries=3)
def classify_document(doc: dict) -> dict:
    """Classify document using LLM"""
    client = openai.OpenAI(api_key="sk-...")
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "system",
                "content": "Classify the document category",
            },
            {"role": "user", "content": doc["content"]},
        ],
        max_tokens=10,
    )
    return {
        "id": doc["id"],
        "category": response.choices[0].message.content,
        "cost": (
            response.usage.prompt_tokens * 0.0005 / 1000
            + response.usage.completion_tokens * 0.0015 / 1000
        ),
    }

@task(name="Save Results")
def save_results(classifications: list[dict]) -> int:
    """Save classifications to database"""
    saved_count = 0
    for classification in classifications:
        # Save to database
        saved_count += 1
    return saved_count
@flow(name="Document Classification Flow", retries=1)
def classify_documents_flow():
    """End-to-end document classification"""
    # Load data
    documents = load_documents("s3://docs/2026-03")
    # Classify in parallel
    classifications = []
    for doc in documents:
        result = classify_document.submit(doc)
        classifications.append(result)
    # Wait for all and collect results
    completed = [c.result() for c in classifications]
    # Calculate total cost
    total_cost = sum(c["cost"] for c in completed)
    print(f"Total cost: ${total_cost:.2f}")
    # Save results
    saved = save_results(completed)
    return {
        "classified": len(completed),
        "saved": saved,
        "cost": total_cost,
    }

# Run flow
if __name__ == "__main__":
    result = classify_documents_flow()
    print(f"Flow result: {result}")
Prefect excels at:
- Data transformation + AI tasks
- Parallel execution
- Dynamic task graphs
- Rich monitoring UI
Temporal: Durable Workflows With Retry Policies
Temporal's strength is handling failures and recovery:
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy

@activity.defn
async def process_batch(batch_id: str) -> dict:
    """Process a batch of items (might fail mid-way)"""
    import random
    import time

    processed = []
    for i in range(100):
        # Simulate work
        time.sleep(0.1)
        # Simulate occasional failures
        if random.random() < 0.1:  # 10% failure rate
            raise Exception(f"Item {i} failed")
        processed.append(i)
    return {"batch_id": batch_id, "processed": processed}

@workflow.defn
class BatchProcessingWorkflow:
    """Process multiple batches with smart retry"""

    @workflow.run
    async def run(self, batch_ids: list[str]) -> dict:
        results = {}
        for batch_id in batch_ids:
            try:
                # Retry policy: exponential backoff
                # - Start: 1 second
                # - Max: 60 seconds
                # - Max attempts: 5
                # - Multiplier: 2.0 (waits of 1s, 2s, 4s, 8s between the 5 attempts)
                result = await workflow.execute_activity(
                    process_batch,
                    args=[batch_id],
                    start_to_close_timeout=timedelta(minutes=5),
                    retry_policy=RetryPolicy(
                        initial_interval=timedelta(seconds=1),
                        maximum_interval=timedelta(seconds=60),
                        maximum_attempts=5,
                        backoff_coefficient=2.0,
                    ),
                )
                results[batch_id] = result
            except Exception as e:
                # After retries are exhausted, log and continue
                results[batch_id] = {"error": str(e)}
        return results

# Key advantage: Temporal retries preserve state
# If an activity fails at item 50/100:
# - Retry starts from item 50 (if the activity tracks progress, e.g. via heartbeats)
# - Or retries the entire batch (if stateless)
# - No lost computation

# With naive retry (queue-based):
# - The entire batch restarts
# - Wasted API calls
# - Could exceed rate limits

# With Temporal:
# - Checkpoints enable partial retry
# - Automatic backoff prevents rate-limit errors
# - Cost-efficient
Temporal's recovery semantics are critical for expensive AI tasks.
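The checkpoint idea can be sketched without Temporal: persist the index of the last completed item, and let each retry resume from it instead of item 0. This is a minimal illustration with a plain dict standing in for durable state (Temporal heartbeats, a database row), not the SDK's actual API:

```python
def process_with_checkpoint(items, work, checkpoint: dict):
    """Resume from the last completed index after a failure.

    `checkpoint` stands in for durable state; in Temporal this would
    be activity heartbeat details.
    """
    start = checkpoint.get("next", 0)
    done = []
    for i in range(start, len(items)):
        done.append(work(items[i]))
        checkpoint["next"] = i + 1  # persist progress after each item
    return done

# Simulate a failure halfway through, then a retry that resumes.
calls = []

def flaky(x):
    calls.append(x)
    if x == 5 and calls.count(5) == 1:
        raise RuntimeError("transient failure")
    return x * 2

items = list(range(10))
ckpt = {}
try:
    process_with_checkpoint(items, flaky, ckpt)
except RuntimeError:
    pass
resumed = process_with_checkpoint(items, flaky, ckpt)  # restarts at item 5
```

Items 0 through 4 are never recomputed on the retry, which is exactly the saved API spend the comments above describe.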
Airflow for Batch DAGs
Apache Airflow is best for batch data pipelines:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import openai

default_args = {
    "owner": "ai-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "nightly_embedding_pipeline",
    default_args=default_args,
    description="Embed documents nightly",
    schedule_interval="0 2 * * *",  # 2 AM daily
    start_date=days_ago(1),
)

def extract_documents(**context):
    """Extract documents from warehouse"""
    documents = [
        {"id": f"doc-{i}", "content": f"Content {i}"}
        for i in range(10000)
    ]
    # Note: XCom is backed by the metadata DB; in production, pass a
    # reference (e.g. an S3 path) instead of a large payload
    return {"documents": documents, "count": len(documents)}

def embed_documents(**context):
    """Embed documents using batch API"""
    docs = context["task_instance"].xcom_pull(
        task_ids="extract_documents",
    )["documents"]
    client = openai.OpenAI(api_key="sk-...")
    embeddings = client.embeddings.create(
        model="text-embedding-3-small",
        input=[d["content"] for d in docs],  # chunk large inputs in practice
    )
    return {
        "embedding_count": len(embeddings.data),
        # Batch pricing, assuming ~125 tokens per document
        "cost": (len(docs) * 125 / 1_000_000) * 0.02,
    }

def store_embeddings(**context):
    """Store in vector database"""
    embedding_count = context["task_instance"].xcom_pull(
        task_ids="embed_documents",
    )["embedding_count"]
    # Store to Pinecone, Qdrant, etc.
    print(f"Stored {embedding_count} embeddings")
    return True
# Define DAG tasks
extract = PythonOperator(
    task_id="extract_documents",
    python_callable=extract_documents,
    dag=dag,
)
embed = PythonOperator(
    task_id="embed_documents",
    python_callable=embed_documents,
    dag=dag,
)
store = PythonOperator(
    task_id="store_embeddings",
    python_callable=store_embeddings,
    dag=dag,
)

# Define dependencies
extract >> embed >> store
Airflow DAG visualization:
extract_documents → embed_documents → store_embeddings
Airflow excels at:
- Scheduled batch pipelines
- Complex task dependencies
- Visual DAG representation
- Rich monitoring
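The `extract >> embed >> store` line works because Airflow operators overload the `>>` operator to record downstream edges. A toy version of the mechanism (these are illustrative stand-ins, not Airflow's real classes):

```python
class Task:
    """Minimal stand-in for an Airflow operator's dependency API."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream: list["Task"] = []

    def __rshift__(self, other: "Task") -> "Task":
        # `a >> b` registers b as downstream of a and returns b,
        # so chains like a >> b >> c build edges left to right.
        self.downstream.append(other)
        return other

extract = Task("extract_documents")
embed = Task("embed_documents")
store = Task("store_embeddings")
extract >> embed >> store
```

Returning `other` is what makes the chain associate correctly: the second `>>` operates on `embed`, not on `extract`.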
Task Dependency Graphs
Orchestrate complex workflows:
from prefect import flow, task

@task
def task_a() -> str:
    return "A result"

@task
def task_b() -> str:
    return "B result"

@task
def task_c(a_result: str, b_result: str) -> str:
    return f"C depends on {a_result} and {b_result}"

@task
def task_d(c_result: str) -> str:
    return f"D depends on {c_result}"

@flow
def complex_workflow():
    # submit() runs A and B concurrently; direct calls would serialize them
    a_future = task_a.submit()
    b_future = task_b.submit()
    c_result = task_c(a_future, b_future)  # waits for both futures
    d_result = task_d(c_result)
    return d_result

# Dependency graph:
#
#          ┌── Task A ──┐
#   Flow ──┤            ├──→ Task C ──→ Task D
#          └── Task B ──┘
#
# Execution:
# - A and B run in parallel
# - C waits for both A and B
# - D waits for C
Parallel execution reduces total runtime.
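The same diamond dependency can be expressed with the standard library's graphlib, which groups tasks into waves where everything in a wave is free to run in parallel:

```python
from graphlib import TopologicalSorter

# Edges mirror the flow above: C needs A and B, D needs C.
graph = {"C": {"A", "B"}, "D": {"C"}}
ts = TopologicalSorter(graph)
ts.prepare()

batches = []
while ts.is_active():
    ready = sorted(ts.get_ready())  # tasks with all dependencies satisfied
    batches.append(ready)           # each batch could run concurrently
    ts.done(*ready)

print(batches)  # [['A', 'B'], ['C'], ['D']]
```

The first batch contains A and B together, matching the "A and B run in parallel" note above.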
Parallel Execution of AI Tasks
Maximize GPU/CPU utilization:
from concurrent.futures import ThreadPoolExecutor, as_completed
import openai

class ParallelAIExecutor:
    def __init__(self, max_workers: int = 10):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.client = openai.OpenAI(api_key="sk-...")

    def classify_batch(self, items: list[str]) -> list[dict]:
        """Classify items in parallel"""
        futures = {}
        for i, item in enumerate(items):
            future = self.executor.submit(self._classify_one, item)
            futures[future] = i
        results = [None] * len(items)
        for future in as_completed(futures):
            idx = futures[future]
            try:
                results[idx] = future.result()
            except Exception as e:
                results[idx] = {"error": str(e)}
        return results

    def _classify_one(self, item: str) -> dict:
        """Classify single item"""
        response = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {
                    "role": "system",
                    "content": "Classify into: positive/negative",
                },
                {"role": "user", "content": item},
            ],
            max_tokens=5,
        )
        return {
            "item": item,
            "classification": response.choices[0].message.content,
            "cost": (
                response.usage.prompt_tokens * 0.0005 / 1000
                + response.usage.completion_tokens * 0.0015 / 1000
            ),
        }

# Usage
executor = ParallelAIExecutor(max_workers=10)
items = ["Great product!"] * 1000
results = executor.classify_batch(items)
total_cost = sum(r["cost"] for r in results if "cost" in r)
print(f"Classified {len(results)} items for ${total_cost:.2f}")
Parallel execution: 10× throughput at same cost.
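The throughput claim is easy to check with I/O-bound stand-ins for API calls: 20 tasks of 50 ms each finish in roughly two waves with 10 workers, instead of about a second sequentially:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_api_call(i: int) -> int:
    time.sleep(0.05)  # stand-in for network latency of one API call
    return i

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(fake_api_call, range(20)))
elapsed = time.perf_counter() - start
# ~0.1s with 10 workers vs ~1.0s sequentially for the same 20 calls
```

The speedup applies because LLM API calls are latency-bound, not CPU-bound; threads spend most of their time waiting.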
Human Approval Steps
Workflows that need human review:
import openai
import requests
from prefect import flow, task
from prefect.runtime import flow_run

@task
def generate_content(topic: str) -> str:
    """Generate content with LLM"""
    client = openai.OpenAI(api_key="sk-...")
    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[
            {"role": "user", "content": f"Write article about {topic}"},
        ],
        max_tokens=2000,
    )
    return response.choices[0].message.content

@task
def request_approval(content: str) -> dict:
    """Request human approval via webhook"""
    # Send to approval system (blocks until the service responds)
    response = requests.post(
        "https://approval-service/request",
        json={
            "content": content,
            "type": "article",
            "workflow_id": flow_run.id,
        },
    )
    return response.json()

@task
def publish_content(content: str) -> bool:
    """Publish to website"""
    # Only called if approved
    # Save to database/CMS
    return True

@flow
def content_generation_with_approval():
    # Generate
    content = generate_content("AI Trends 2026")
    # Wait for approval (can timeout)
    approval = request_approval(content)
    if approval["status"] == "approved":
        published = publish_content(content)
        return {"published": published}
    return {"rejected": True}
Approval gates prevent accidental publishing.
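The gate itself reduces to a small state machine; a sketch of the rule "publish only from approved" (class and method names are hypothetical, not from any framework):

```python
class ApprovalGate:
    """Content moves pending -> approved/rejected; publish requires approved."""

    def __init__(self):
        self.status = "pending"

    def review(self, decision: str):
        if self.status != "pending":
            raise ValueError("already reviewed")
        if decision not in ("approved", "rejected"):
            raise ValueError(f"unknown decision: {decision}")
        self.status = decision

    def publish(self) -> bool:
        # The automation path refuses to publish anything unreviewed.
        return self.status == "approved"

gate = ApprovalGate()
assert not gate.publish()  # pending: blocked
gate.review("approved")
assert gate.publish()      # approved: allowed
```

Encoding the gate as explicit states makes the failure mode (publishing unreviewed content) unrepresentable rather than merely unlikely.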
Workflow Versioning
Handle model/prompt updates safely:
Temporal's Python SDK versions a workflow with workflow.patched() rather than side-by-side definitions; histories recorded before the change replay the original branch, while new runs take the patched one:
from datetime import timedelta

from temporalio import workflow

@workflow.defn(name="document_indexing")
class DocumentIndexing:
    @workflow.run
    async def run(self, doc: dict) -> dict:
        # workflow.patched() marks a versioned change: workflows started
        # before the deploy replay the original branch; new ones take
        # the patched branch
        if workflow.patched("embedding-model-v2"):
            # Updated version: use better embedding model
            embedding = await workflow.execute_activity(
                embed_text_v2,  # new activity
                args=[doc["content"]],
                start_to_close_timeout=timedelta(seconds=30),
            )
        else:
            # Original version
            embedding = await workflow.execute_activity(
                embed_text,
                args=[doc["content"]],
                start_to_close_timeout=timedelta(seconds=30),
            )
        return {"id": doc["id"], "embedding": embedding}

# In-flight workflows started before the change keep the v1 branch
# New workflows use the v2 branch
# No compatibility breakage on replay
# Once no v1 histories remain, the else-branch can be deleted
Versioning allows safe model updates in production.
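The pinning rule, runs keep the version they started with while new runs pick up the latest, can be illustrated without Temporal. This is a toy registry to show the semantics, not the SDK's patching mechanism:

```python
class VersionedRegistry:
    """Pin each run to the definition version current at its start time."""

    def __init__(self):
        self.versions = {}  # version number -> handler
        self.latest = 0
        self.runs = {}      # run id -> pinned version

    def register(self, version: int, fn):
        self.versions[version] = fn
        self.latest = max(self.latest, version)

    def start(self, run_id: str):
        self.runs[run_id] = self.latest  # pinned at start, never changes

    def execute(self, run_id: str, arg):
        return self.versions[self.runs[run_id]](arg)

reg = VersionedRegistry()
reg.register(1, lambda d: {"model": "small", "doc": d})
reg.start("run-a")  # pinned to v1
reg.register(2, lambda d: {"model": "large", "doc": d})
reg.start("run-b")  # pinned to v2; run-a is unaffected
```

Deploying v2 after run-a started does not change run-a's behavior, which is the guarantee that makes mid-flight deploys safe.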
Cost Tracking Per Workflow Run
Monitor spending:
class WorkflowCostTracker:
    def __init__(self):
        self.costs = {}

    def track_activity_cost(
        self,
        workflow_id: str,
        activity_name: str,
        cost_usd: float,
    ):
        """Track cost per activity"""
        if workflow_id not in self.costs:
            self.costs[workflow_id] = {}
        if activity_name not in self.costs[workflow_id]:
            self.costs[workflow_id][activity_name] = 0
        self.costs[workflow_id][activity_name] += cost_usd

    def get_workflow_cost(self, workflow_id: str) -> float:
        """Get total cost for workflow"""
        return sum(self.costs.get(workflow_id, {}).values())

    def enforce_budget_limit(self, workflow_id: str, limit_usd: float):
        """Raise if the workflow has already exceeded its budget"""
        current = self.get_workflow_cost(workflow_id)
        if current > limit_usd:
            raise Exception(
                f"Workflow exceeded budget: ${current:.2f} > ${limit_usd:.2f}"
            )

# Usage
tracker = WorkflowCostTracker()

# During workflow execution
tracker.track_activity_cost("workflow-001", "llm_inference", 0.50)
tracker.track_activity_cost("workflow-001", "embedding", 0.10)
tracker.track_activity_cost("workflow-001", "vector_db_store", 0.05)

total = tracker.get_workflow_cost("workflow-001")
print(f"Workflow-001 cost: ${total:.2f}")

# Raise if over budget
tracker.enforce_budget_limit("workflow-001", 1.00)  # Max $1
Cost tracking prevents runaway spending.
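The per-activity figures fed into the tracker come from token counts; a small helper makes the arithmetic explicit (the per-1K-token rates are example values, since pricing varies by model):

```python
def token_cost(prompt_tokens: int, completion_tokens: int,
               prompt_rate: float, completion_rate: float) -> float:
    """Cost in USD given per-1K-token rates (rates here are assumptions)."""
    return (prompt_tokens * prompt_rate / 1000
            + completion_tokens * completion_rate / 1000)

# Example: 2,000 prompt + 500 completion tokens at $0.01/$0.03 per 1K
cost = token_cost(2000, 500, 0.01, 0.03)
print(f"${cost:.3f}")  # $0.035
```

In practice the token counts come straight from the API's usage object, so each activity can report its cost to the tracker as it completes.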
Checklist
- Identify long-running AI tasks (>5 seconds)
- Choose platform: Temporal (reliability), Prefect (data+AI), Airflow (batch)
- Design retry policy (exponential backoff, max attempts)
- Implement cost tracking per workflow
- Set budget limits and alerts
- Design task dependencies
- Use parallel execution where possible
- Plan human approval gates for sensitive operations
- Document workflow versions and migration path
- Monitor workflow execution metrics
Conclusion
Workflow orchestration is essential for production AI systems. Temporal excels at long-running, reliable operations with sophisticated retry. Prefect handles data + AI seamlessly. Airflow dominates batch schedules. All enable cost tracking, dependency management, and failure recovery. Choose based on workload: reliability (Temporal), data transformation (Prefect), or batch ETL (Airflow). At >10K workflows/day, orchestration becomes mandatory for cost control and reliability.