AI Workflow Orchestration — Temporal, Prefect, and Airflow for AI Pipelines

Introduction

Long-running AI tasks need orchestration. Workflows that call LLMs, process data, and make decisions must handle failures gracefully, log intermediate results, and enable rollback. Traditional job schedulers (cron) and simple queues (Celery) break at scale. Modern orchestration platforms like Temporal, Prefect, and Airflow solve this. This guide covers production patterns.

Why AI Workflows Need Orchestration

AI operations are fundamentally different:

# Problem 1: Long-running, potentially failing tasks
import openai

def llm_inference_task():
    # This might take 10 seconds or hang for 2 minutes
    client = openai.OpenAI()
    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": "Complex task"}],
        timeout=120,
    )
    return response

# Problem 2: Expensive retries
# At $0.03 per 1K tokens, a failed 500K-token batch run wastes $15
# Need smart retry with exponential backoff

# Problem 3: Intermediate state
# If task fails halfway, need to resume from checkpoint
# Simple retry restarts from beginning (wasted cost)

# Problem 4: Complex dependencies
# Task A → Task B (depends on A) → Task C (depends on A and B)
# C should run after both finish, not before

# Problem 5: Human approval steps
# Generate content → Human review → Publish
# Automation shouldn't publish without approval

# Solution: Workflow orchestration
# - Captures intermediate state
# - Enables intelligent retries
# - Manages dependencies
# - Tracks cost per workflow run
# - Allows human intervention

import time

class WorkflowOrchestrator:
    """Base class for workflow orchestration"""
    def __init__(self):
        self.state = {}
        self.cost_tracking = {}

    def execute_task_with_retry(
        self,
        task_fn,
        task_name: str,
        max_retries: int = 3,
        backoff_factor: float = 2.0,
    ):
        """Execute with intelligent retry and cost tracking"""
        for attempt in range(max_retries):
            try:
                result = task_fn()
                self.state[task_name] = result
                return result
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                wait_time = 2 ** attempt * backoff_factor
                print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s")
                time.sleep(wait_time)

Orchestration becomes critical at scale: >10 AI tasks/day or >$1000/month in API costs.
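All three platforms covered below express retry as an initial interval, a multiplier, and a cap. The schedule that produces can be sketched as a pure function (illustrative, not any platform's actual API):

```python
def backoff_schedule(
    initial: float = 1.0,
    factor: float = 2.0,
    maximum: float = 60.0,
    attempts: int = 5,
) -> list[float]:
    """Wait time before each retry: initial * factor**n, capped at maximum."""
    return [min(initial * factor ** n, maximum) for n in range(attempts)]

# Defaults give 1s, 2s, 4s, 8s, 16s -- the same schedule the Temporal
# RetryPolicy examples below configure declaratively.
```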

Temporal for Durable AI Workflows

Temporal is designed for long-running, reliable operations:

from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.common import RetryPolicy
from datetime import timedelta
import openai

# Define activities (individual steps)
@activity.defn
async def call_llm(prompt: str) -> str:
    """Call LLM with timeout and cost tracking"""
    client = openai.OpenAI(api_key="sk-...")

    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=1000,
        timeout=30,
    )

    # Track spend per call (GPT-4 Turbo: $0.01/1K input, $0.03/1K output)
    cost = (
        response.usage.prompt_tokens * 0.01 / 1000 +
        response.usage.completion_tokens * 0.03 / 1000
    )
    activity.logger.info(f"call_llm cost: ${cost:.4f}")

    return response.choices[0].message.content

@activity.defn
async def embed_text(text: str) -> list[float]:
    """Embed text with the embeddings API"""
    client = openai.OpenAI(api_key="sk-...")

    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )

    return response.data[0].embedding

@activity.defn
async def store_in_vector_db(
    text_id: str,
    embedding: list[float],
) -> bool:
    """Store embedding in vector database"""
    import time

    # `index` is a placeholder for your vector DB client handle
    # (e.g. a Pinecone Index or a Qdrant collection)
    index.upsert([
        {
            "id": text_id,
            "values": embedding,
            "metadata": {"stored_at": time.time()},
        }
    ])
    return True

# Define workflow (orchestration logic)
# The Python SDK defines workflows as classes with a @workflow.run method
@workflow.defn
class RagIndexingWorkflow:
    """Index documents for RAG"""

    @workflow.run
    async def run(self, documents: list[dict]) -> dict:
        results = {
            "indexed": [],
            "failed": [],
        }

        for doc in documents:
            try:
                # Activity 1: Extract insights with LLM
                summary = await workflow.execute_activity(
                    call_llm,
                    args=[f"Summarize: {doc['content']}"],
                    start_to_close_timeout=timedelta(seconds=60),
                    retry_policy=RetryPolicy(
                        initial_interval=timedelta(seconds=1),
                        maximum_interval=timedelta(seconds=60),
                        maximum_attempts=3,
                        backoff_coefficient=2.0,
                    ),
                )

                # Activity 2: Embed text
                embedding = await workflow.execute_activity(
                    embed_text,
                    args=[summary],
                    start_to_close_timeout=timedelta(seconds=30),
                )

                # Activity 3: Store in vector DB
                stored = await workflow.execute_activity(
                    store_in_vector_db,
                    args=[doc["id"], embedding],
                    start_to_close_timeout=timedelta(seconds=10),
                )

                if stored:
                    results["indexed"].append(doc["id"])

            except Exception as e:
                results["failed"].append({
                    "id": doc["id"],
                    "error": str(e),
                })

        return results

# Run workflow
async def run_workflow():
    client = await Client.connect("localhost:7233")

    documents = [
        {"id": "doc-1", "content": "AI infrastructure guide"},
        {"id": "doc-2", "content": "Vector databases"},
    ]

    result = await client.execute_workflow(
        RagIndexingWorkflow.run,
        documents,
        id="rag-workflow-001",
        task_queue="rag_tasks",
    )

    print(f"Workflow result: {result}")

# Key benefits:
# 1. Automatic retry with exponential backoff
# 2. Survives server restart (durability)
# 3. Inspect execution history
# 4. Pause/resume workflows
# 5. Cost tracking per activity

Temporal excels at:

  • Long-running workflows (>1 hour)
  • Complex retry logic
  • Distributed tracing
  • Workflow versioning

Prefect for Data + AI Pipelines

Prefect focuses on data pipelines with AI steps:

from prefect import flow, task, get_run_logger
import openai

@task(name="Load Documents", retries=2, retry_delay_seconds=10)
def load_documents(source_path: str) -> list[dict]:
    """Load documents from source"""
    logger = get_run_logger()
    logger.info(f"Loading documents from {source_path}")

    # Stub data -- replace with a real loader for source_path
    documents = [
        {"id": f"doc-{i}", "content": f"Content {i}"}
        for i in range(100)
    ]

    return documents

@task(name="Classify with LLM", retries=3)
def classify_document(doc: dict) -> dict:
    """Classify document using LLM"""
    client = openai.OpenAI(api_key="sk-...")

    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "system",
                "content": "Classify the document category",
            },
            {"role": "user", "content": doc["content"]},
        ],
        max_tokens=10,
    )

    return {
        "id": doc["id"],
        "category": response.choices[0].message.content,
        "cost": (
            response.usage.prompt_tokens * 0.0005 / 1000 +
            response.usage.completion_tokens * 0.0015 / 1000
        ),
    }

@task(name="Save Results")
def save_results(classifications: list[dict]) -> int:
    """Save classifications to database"""
    saved_count = 0

    for classification in classifications:
        # Save to database
        saved_count += 1

    return saved_count

@flow(name="Document Classification Flow", retries=1)
def classify_documents_flow():
    """End-to-end document classification"""
    # Load data
    documents = load_documents("s3://docs/2026-03")

    # Classify in parallel
    classifications = []
    for doc in documents:
        result = classify_document.submit(doc)
        classifications.append(result)

    # Wait for all and collect results
    completed = [c.result() for c in classifications]

    # Calculate total cost
    total_cost = sum(c["cost"] for c in completed)
    print(f"Total cost: ${total_cost:.2f}")

    # Save results
    saved = save_results(completed)

    return {
        "classified": len(completed),
        "saved": saved,
        "cost": total_cost,
    }

# Run flow
if __name__ == "__main__":
    result = classify_documents_flow()
    print(f"Flow result: {result}")

Prefect excels at:

  • Data transformation + AI tasks
  • Parallel execution
  • Dynamic task graphs
  • Rich monitoring UI

Temporal: Durable Workflows With Retry Policies

Temporal's strength is handling failures and recovery:

from temporalio import workflow, activity
from temporalio.common import RetryPolicy
from datetime import timedelta
import asyncio

@activity.defn
async def process_batch(batch_id: str) -> dict:
    """Process a batch of items (might fail mid-way)"""
    import random

    processed = []

    for i in range(100):
        # Simulate work without blocking the event loop
        await asyncio.sleep(0.1)

        # Simulate occasional failures
        if random.random() < 0.1:  # 10% failure rate
            raise Exception(f"Item {i} failed")

        processed.append(i)

    return {"batch_id": batch_id, "processed": processed}

@workflow.defn
class BatchProcessingWorkflow:
    """Process multiple batches with smart retry"""

    @workflow.run
    async def run(self, batch_ids: list[str]) -> dict:
        results = {}

        for batch_id in batch_ids:
            try:
                # Retry policy: exponential backoff
                # - Start: 1 second
                # - Max: 60 seconds
                # - Max attempts: 5
                # - Multiplier: 2.0 (1s, 2s, 4s, 8s, 16s)
                result = await workflow.execute_activity(
                    process_batch,
                    args=[batch_id],
                    start_to_close_timeout=timedelta(minutes=5),
                    retry_policy=RetryPolicy(
                        initial_interval=timedelta(seconds=1),
                        maximum_interval=timedelta(seconds=60),
                        maximum_attempts=5,
                        backoff_coefficient=2.0,
                    ),
                )
                results[batch_id] = result
            except Exception as e:
                # After retries exhausted, log and continue
                results[batch_id] = {"error": str(e)}

        return results

# Key advantage: Temporal retries preserve state
# If activity fails at item 50/100:
# - Retry starts from item 50 (if activity tracks progress)
# - Or retries entire batch (if stateless)
# - No lost computation

# With naive retry (queue-based):
# - Entire batch restarts
# - Wasted API calls
# - Could exceed rate limits

# With Temporal:
# - Checkpoints enable partial retry
# - Automatic backoff prevents rate-limit errors
# - Cost-efficient

Temporal's recovery semantics are critical for expensive AI tasks.
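The checkpoint pattern is easy to demonstrate outside Temporal: persist the index of the last completed item, and have retries resume from it. A sketch, where the `checkpoints` dict stands in for durable storage (Temporal would keep this in workflow state or an external store):

```python
def process_with_checkpoint(items, checkpoints, key, fail_at=None):
    """Process items, persisting progress so a retry resumes mid-batch."""
    processed = []
    start = checkpoints.get(key, 0)       # resume point from the last attempt
    for i in range(start, len(items)):
        if fail_at is not None and i == fail_at:
            raise RuntimeError(f"item {i} failed")   # simulated failure
        processed.append(items[i])        # the expensive work happens here
        checkpoints[key] = i + 1          # checkpoint after every item
    return processed

# If the first attempt fails at item 50, the retry starts at 50, not 0,
# so the first 50 items' API cost is never paid twice.
```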

Airflow for Batch DAGs

Apache Airflow is best for batch data pipelines:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import openai

default_args = {
    "owner": "ai-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "nightly_embedding_pipeline",
    default_args=default_args,
    description="Embed documents nightly",
    schedule="0 2 * * *",  # 2 AM daily
    start_date=datetime(2026, 1, 1),
    catchup=False,
)

def extract_documents(**context):
    """Extract documents from warehouse"""
    # Note: XCom is meant for small payloads; in production, write the
    # documents to object storage and return a reference instead
    documents = [
        {"id": f"doc-{i}", "content": f"Content {i}"}
        for i in range(10000)
    ]
    return {"documents": documents, "count": len(documents)}

def embed_documents(**context):
    """Embed documents using batch API"""
    docs = context["task_instance"].xcom_pull(
        task_ids="extract_documents",
    )["documents"]

    client = openai.OpenAI(api_key="sk-...")

    # Note: for 10,000 docs, chunk the input (the API caps inputs per request)
    embeddings = client.embeddings.create(
        model="text-embedding-3-small",
        input=[d["content"] for d in docs],
    )

    return {
        "embedding_count": len(embeddings.data),
        # Assumes ~125 tokens/doc at $0.02 per 1M tokens (batch pricing)
        "cost": (len(docs) * 125 / 1_000_000) * 0.02,
    }

def store_embeddings(**context):
    """Store in vector database"""
    embedding_count = context["task_instance"].xcom_pull(
        task_ids="embed_documents",
    )["embedding_count"]

    # Store to Pinecone, Qdrant, etc.
    print(f"Stored {embedding_count} embeddings")

    return True

# Define DAG tasks
extract = PythonOperator(
    task_id="extract_documents",
    python_callable=extract_documents,
    dag=dag,
)

embed = PythonOperator(
    task_id="embed_documents",
    python_callable=embed_documents,
    dag=dag,
)

store = PythonOperator(
    task_id="store_embeddings",
    python_callable=store_embeddings,
    dag=dag,
)

# Define dependencies
extract >> embed >> store

Airflow DAG visualization:

extract_documents → embed_documents → store_embeddings
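One caveat on the embed_documents task above: it sends all 10,000 contents in a single request, but embedding endpoints cap the number of inputs per call (OpenAI's documented limit is 2,048 inputs), so large batches need chunking. A minimal helper:

```python
def chunked(items: list, size: int = 2048) -> list[list]:
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i : i + size] for i in range(0, len(items), size)]

# Each chunk then becomes one embeddings.create(...) call:
# for batch in chunked([d["content"] for d in docs]):
#     client.embeddings.create(model="text-embedding-3-small", input=batch)
```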

Airflow excels at:

  • Scheduled batch pipelines
  • Complex task dependencies
  • Visual DAG representation
  • Rich monitoring

Task Dependency Graphs

Orchestrate complex workflows:

from prefect import flow, task

@task
def task_a() -> str:
    return "A result"

@task
def task_b() -> str:
    return "B result"

@task
def task_c(a_result: str, b_result: str) -> str:
    return f"C depends on {a_result} and {b_result}"

@task
def task_d(c_result: str) -> str:
    return f"D depends on {c_result}"

@flow
def complex_workflow():
    # .submit() returns futures, so A and B actually run concurrently;
    # calling tasks directly would execute them one after another
    a_result = task_a.submit()
    b_result = task_b.submit()
    c_result = task_c.submit(a_result, b_result)
    d_result = task_d.submit(c_result)
    return d_result.result()

# Dependency graph:
#   Task A ──┐
#            ├──→ Task C ──→ Task D
#   Task B ──┘

# Execution:
# - A and B run in parallel
# - C waits for both A and B
# - D waits for C

Parallel execution reduces total runtime.
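The ready-set scheduling an orchestrator performs over this graph is the same computation Python's stdlib graphlib does; a sketch of the A/B → C → D graph above:

```python
from graphlib import TopologicalSorter

# Edges mirror the flow above: C needs A and B, D needs C
graph = {"C": {"A", "B"}, "D": {"C"}}
ts = TopologicalSorter(graph)
ts.prepare()

order = []
while ts.is_active():
    ready = sorted(ts.get_ready())   # every node here can run in parallel
    order.append(ready)
    ts.done(*ready)

# order == [["A", "B"], ["C"], ["D"]]: A/B form one parallel wave, then C, then D
```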

Parallel Execution of AI Tasks

Maximize throughput by overlapping I/O-bound API calls:

from concurrent.futures import ThreadPoolExecutor, as_completed
import openai

class ParallelAIExecutor:
    def __init__(self, max_workers: int = 10):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.client = openai.OpenAI(api_key="sk-...")

    def classify_batch(self, items: list[str]) -> list[dict]:
        """Classify items in parallel"""
        futures = {}

        for i, item in enumerate(items):
            future = self.executor.submit(self._classify_one, item)
            futures[future] = i

        results = [None] * len(items)

        for future in as_completed(futures):
            idx = futures[future]
            try:
                results[idx] = future.result()
            except Exception as e:
                results[idx] = {"error": str(e)}

        return results

    def _classify_one(self, item: str) -> dict:
        """Classify single item"""
        response = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {
                    "role": "system",
                    "content": "Classify into: positive/negative",
                },
                {"role": "user", "content": item},
            ],
            max_tokens=5,
        )

        return {
            "item": item,
            "classification": response.choices[0].message.content,
            "cost": (
                response.usage.prompt_tokens * 0.0005 / 1000 +
                response.usage.completion_tokens * 0.0015 / 1000
            ),
        }

# Usage
executor = ParallelAIExecutor(max_workers=10)

items = ["Great product!"] * 1000
results = executor.classify_batch(items)

total_cost = sum(r["cost"] for r in results if "cost" in r)
print(f"Classified {len(results)} items for ${total_cost:.2f}")

Parallel execution: 10× throughput at same cost.
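More workers also means more requests per minute, and providers enforce rate limits. A minimal token-bucket limiter shared across the thread pool (an illustrative sketch, not a library API) keeps the pool under a target rate:

```python
import threading
import time

class TokenBucket:
    """Allow at most `rate` acquisitions per second across all threads."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate                  # tokens added per second
        self.capacity = capacity          # burst size
        self.tokens = float(capacity)
        self.updated = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self) -> None:
        """Block until a token is available, then consume it."""
        while True:
            with self.lock:
                now = time.monotonic()
                # Refill proportionally to elapsed time, capped at capacity
                self.tokens = min(
                    self.capacity,
                    self.tokens + (now - self.updated) * self.rate,
                )
                self.updated = now
                if self.tokens >= 1.0:
                    self.tokens -= 1.0
                    return
            time.sleep(0.01)              # back off outside the lock

# Each ThreadPoolExecutor worker calls bucket.acquire() before its API call.
```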

Human Approval Steps

Workflows that need human review:

from prefect import flow, task
import openai

@task
def generate_content(topic: str) -> str:
    """Generate content with LLM"""
    client = openai.OpenAI(api_key="sk-...")

    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[
            {"role": "user", "content": f"Write article about {topic}"},
        ],
        max_tokens=2000,
    )

    return response.choices[0].message.content

@task
def request_approval(content: str) -> dict:
    """Request human approval via webhook"""
    import requests
    from prefect.runtime import flow_run

    # Send to approval system
    response = requests.post(
        "https://approval-service/request",
        json={
            "content": content,
            "type": "article",
            "workflow_id": flow_run.id,
        },
    )

    return response.json()

@task
def publish_content(content: str) -> bool:
    """Publish to website"""
    # Only called if approved
    # Save to database/CMS
    return True

@flow
def content_generation_with_approval():
    # Generate
    content = generate_content("AI Trends 2026")

    # Wait for approval (can timeout)
    approval = request_approval(content)

    if approval["status"] == "approved":
        published = publish_content(content)
        return {"published": published}
    else:
        return {"rejected": True}

Approval gates prevent accidental publishing.
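Approval services are usually asynchronous, so in practice the flow polls for a decision until a deadline passes. A sketch of the polling side, where `fetch_status` stands in for the real status-API call:

```python
import time

def wait_for_approval(fetch_status, timeout_s=3600.0, poll_s=1.0) -> str:
    """Poll fetch_status() until a decision arrives or the deadline passes."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = fetch_status()   # e.g. GET <approval-service>/status/<request_id>
        if status in ("approved", "rejected"):
            return status
        time.sleep(poll_s)
    return "timeout"  # treat silence as rejection: never publish by default
```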

Workflow Versioning

Handle model/prompt updates safely:

# Temporal versions workflow code with workflow.patched(), which records a
# marker in history so already-running workflows keep replaying the old branch
@workflow.defn
class DocumentIndexing:
    @workflow.run
    async def run(self, doc: dict) -> dict:
        if workflow.patched("embedding-v2"):
            # New runs: use the better embedding model
            embedding = await workflow.execute_activity(
                embed_text_v2,  # New activity
                args=[doc["content"]],
                start_to_close_timeout=timedelta(seconds=30),
            )
        else:
            # Runs started before the patch replay this original branch
            embedding = await workflow.execute_activity(
                embed_text,
                args=[doc["content"]],
                start_to_close_timeout=timedelta(seconds=30),
            )
        return {"id": doc["id"], "embedding": embedding}

# In-flight workflows continue deterministically on the old branch
# New workflows take the patched branch
# No compatibility breakage

# Once no old runs remain, retire the branch with workflow.deprecate_patch()

Versioning allows safe model updates in production.

Cost Tracking Per Workflow Run

Monitor spending:

class WorkflowCostTracker:
    def __init__(self):
        self.costs = {}

    def track_activity_cost(
        self,
        workflow_id: str,
        activity_name: str,
        cost_usd: float,
    ):
        """Track cost per activity"""
        if workflow_id not in self.costs:
            self.costs[workflow_id] = {}

        if activity_name not in self.costs[workflow_id]:
            self.costs[workflow_id][activity_name] = 0

        self.costs[workflow_id][activity_name] += cost_usd

    def get_workflow_cost(self, workflow_id: str) -> float:
        """Get total cost for workflow"""
        return sum(self.costs.get(workflow_id, {}).values())

    def enforce_budget_limit(self, workflow_id: str, limit_usd: float):
        """Raise if the workflow's spend exceeds the limit"""
        current = self.get_workflow_cost(workflow_id)

        if current > limit_usd:
            raise Exception(f"Workflow exceeded budget: ${current} > ${limit_usd}")

# Usage
tracker = WorkflowCostTracker()

# During workflow execution
tracker.track_activity_cost("workflow-001", "llm_inference", 0.50)
tracker.track_activity_cost("workflow-001", "embedding", 0.10)
tracker.track_activity_cost("workflow-001", "vector_db_store", 0.05)

total = tracker.get_workflow_cost("workflow-001")
print(f"Workflow-001 cost: ${total:.2f}")

# Alert if over budget
tracker.enforce_budget_limit("workflow-001", 1.00)  # Max $1

Cost tracking prevents runaway spending.
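The per-call cost arithmetic repeated throughout the examples can be centralized in one helper keyed by model. The rates below copy the illustrative figures used earlier in this guide; real pricing changes, so treat the table as configuration:

```python
# (input $, output $) per 1K tokens -- illustrative figures from the examples
# above, not current list prices; load these from config in production
PRICING = {
    "gpt-4-turbo": (0.01, 0.03),
    "gpt-3.5-turbo": (0.0005, 0.0015),
}

def estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Dollar cost of a single call, from the response's usage counts."""
    input_rate, output_rate = PRICING[model]
    return (prompt_tokens * input_rate + completion_tokens * output_rate) / 1000
```

Feed it `response.usage.prompt_tokens` and `response.usage.completion_tokens`, then pass the result to `track_activity_cost`.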

Checklist

  • Identify long-running AI tasks (>5 seconds)
  • Choose platform: Temporal (reliability), Prefect (data+AI), Airflow (batch)
  • Design retry policy (exponential backoff, max attempts)
  • Implement cost tracking per workflow
  • Set budget limits and alerts
  • Design task dependencies
  • Use parallel execution where possible
  • Plan human approval gates for sensitive operations
  • Document workflow versions and migration path
  • Monitor workflow execution metrics

Conclusion

Workflow orchestration is essential for production AI systems. Temporal excels at long-running, reliable operations with sophisticated retry. Prefect handles data + AI seamlessly. Airflow dominates batch schedules. All enable cost tracking, dependency management, and failure recovery. Choose based on workload: reliability (Temporal), data transformation (Prefect), or batch ETL (Airflow). At >10K workflows/day, orchestration becomes mandatory for cost control and reliability.