AI Batch Processing — OpenAI Batch API, Cost Savings, and Pipeline Design

Introduction

Real-time inference is expensive. When latency isn't critical, batch processing delivers a 50% cost saving. The OpenAI Batch API processes thousands of requests asynchronously, trading latency for throughput. This guide covers batch API mechanics, pipeline design, and when batching makes sense.

OpenAI Batch API: 50% Cost Reduction

The Batch API costs exactly half of standard API pricing:

# Standard API costs
standard_rates = {
    "gpt-4-turbo": {
        "input": 0.01,  # per 1K tokens
        "output": 0.03,
    },
    "gpt-3.5-turbo": {
        "input": 0.0005,
        "output": 0.0015,
    },
}

# Batch API costs (50% reduction)
batch_rates = {
    "gpt-4-turbo": {
        "input": 0.005,  # 50% cheaper
        "output": 0.015,
    },
    "gpt-3.5-turbo": {
        "input": 0.00025,
        "output": 0.00075,
    },
}

# Example: 1M input tokens, 500K output tokens
input_tokens = 1_000_000
output_tokens = 500_000

standard_cost = (input_tokens / 1000) * 0.01 + (output_tokens / 1000) * 0.03
batch_cost = (input_tokens / 1000) * 0.005 + (output_tokens / 1000) * 0.015

savings = standard_cost - batch_cost
print(f"Standard: ${standard_cost:,.2f}")
print(f"Batch: ${batch_cost:,.2f}")
print(f"Savings: ${savings:,.2f} ({savings/standard_cost*100:.1f}%)")

Trade-off: batches complete within a 24-hour window (often sooner). Perfect for non-real-time tasks.
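
If it helps to make that trade-off explicit in code, here is a minimal sketch; the threshold is simply the documented 24-hour completion window, so adjust it to whatever latency your product actually tolerates:

def should_use_batch(max_acceptable_latency_hours: float) -> bool:
    """Route work to the Batch API whenever the consumer can wait out the completion window."""
    BATCH_COMPLETION_WINDOW_HOURS = 24
    return max_acceptable_latency_hours >= BATCH_COMPLETION_WINDOW_HOURS

print(should_use_batch(0.01))  # chat UI: False, stay on the standard API
print(should_use_batch(48))    # nightly analytics job: True, take the 50% discount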

Batch Job Submission (JSONL Format)

OpenAI requires JSONL format (one JSON object per line):

import json
import openai
from datetime import datetime

# Prepare batch requests in JSONL format
def create_batch_file(requests: list[dict], filename: str) -> str:
    """Create JSONL file for batch submission"""
    with open(filename, "w") as f:
        for i, request in enumerate(requests):
            batch_request = {
                "custom_id": f"request-{i}",  # Unique ID for tracking
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": request["model"],
                    "messages": request["messages"],
                    "max_tokens": request.get("max_tokens", 1000),
                },
            }
            f.write(json.dumps(batch_request) + "\n")

    return filename

# Example: batch classify documents
documents = [
    "This product is amazing and works perfectly!",
    "Terrible experience, would not recommend",
    "Good quality but a bit expensive",
] * 1000  # 3000 documents

# Create batch requests
requests = []
for i, doc in enumerate(documents):
    requests.append({
        "model": "gpt-3.5-turbo",
        "messages": [
            {
                "role": "system",
                "content": "Classify sentiment as positive, negative, or neutral",
            },
            {"role": "user", "content": doc},
        ],
        "max_tokens": 10,
    })

# Create JSONL file
batch_file = create_batch_file(requests, "batch_requests.jsonl")

# Submit batch
client = openai.OpenAI(api_key="sk-...")

# Upload the JSONL file first, then create the batch job that references it
batch_input_file = client.files.create(
    file=open(batch_file, "rb"), purpose="batch"
)

batch_response = client.batches.create(
    input_file_id=batch_input_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)

batch_id = batch_response.id
print(f"Batch submitted: {batch_id}")
print(f"Status: {batch_response.status}")

The JSONL format is strict: exactly one JSON object per line, with strings properly escaped (json.dumps handles the escaping).
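
For illustration, here is what a single line looks like when serialized; json.dumps escapes embedded quotes and newlines so each request stays on one physical line:

import json

request_line = json.dumps({
    "custom_id": "request-0",
    "method": "POST",
    "url": "/v1/chat/completions",
    "body": {
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": 'He said "great"\nand left'}],
        "max_tokens": 10,
    },
})

print(request_line)              # one line, quotes and newline escaped
assert "\n" not in request_line  # safe to join with "\n" when writing the file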

Async Polling for Completion

Poll for completion without blocking:

import json
import openai
import time
from datetime import datetime

class BatchPoller:
    def __init__(self, api_key: str):
        self.client = openai.OpenAI(api_key=api_key)
        self.batch_id = None

    def submit_batch(self, input_file: str) -> str:
        """Submit batch and return batch ID"""
        uploaded = self.client.files.create(
            file=open(input_file, "rb"), purpose="batch"
        )
        response = self.client.batches.create(
            input_file_id=uploaded.id,
            endpoint="/v1/chat/completions",
            completion_window="24h",
        )
        self.batch_id = response.id
        return self.batch_id

    def poll_status(self, batch_id: str, poll_interval: int = 30) -> dict:
        """Poll batch status until completion"""
        while True:
            batch = self.client.batches.retrieve(batch_id)

            print(f"{datetime.now()}: {batch.status}")
            print(f"  Total: {batch.request_counts.total}")
            print(f"  Completed: {batch.request_counts.completed}")
            print(f"  Failed: {batch.request_counts.failed}")

            if batch.status in ["completed", "failed", "expired", "cancelled"]:
                return batch.model_dump()

            time.sleep(poll_interval)

    def get_results(self, batch_id: str) -> list[dict]:
        """Download and parse batch results"""
        batch = self.client.batches.retrieve(batch_id)

        if batch.status != "completed":
            raise ValueError(f"Batch not ready: {batch.status}")

        # Download the output file and parse one JSON object per line
        output = self.client.files.content(batch.output_file_id)

        results = []
        for line in output.text.splitlines():
            if line.strip():
                results.append(json.loads(line))

        return results

# Usage
poller = BatchPoller(api_key="sk-...")

# Submit
batch_id = poller.submit_batch("batch_requests.jsonl")

# Poll (non-blocking)
# Can do other work while polling
status = poller.poll_status(batch_id, poll_interval=60)

# Get results
results = poller.get_results(batch_id)

# Parse results
for result in results[:5]:
    custom_id = result["custom_id"]
    response = result["response"]["body"]["choices"][0]["message"]["content"]
    print(f"{custom_id}: {response}")

Run the polling loop as a background job (Celery, Airflow) so it never blocks your application; see the sketch below.
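
As a sketch of that pattern with Celery, assuming a configured broker and the BatchPoller class above (the broker URL and retry interval are placeholders):

from celery import Celery

app = Celery("batch_jobs", broker="redis://localhost:6379/0")  # placeholder broker URL

@app.task(bind=True, max_retries=None)
def poll_batch(self, batch_id: str):
    """Check batch status; reschedule this task until the job finishes."""
    poller = BatchPoller(api_key="sk-...")
    batch = poller.client.batches.retrieve(batch_id)

    if batch.status == "completed":
        results = poller.get_results(batch_id)
        return {"batch_id": batch_id, "result_count": len(results)}
    if batch.status in ("failed", "expired", "cancelled"):
        raise RuntimeError(f"Batch {batch_id} ended with status {batch.status}")

    # Still running: check again in 5 minutes without tying up a worker
    raise self.retry(countdown=300)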

Batch Size Limits and Chunking

OpenAI limits batch size to 10M tokens. Chunk larger jobs:

import json
from typing import Iterator

class BatchChunker:
    def __init__(self, max_tokens: int = 10_000_000):
        self.max_tokens = max_tokens

    def chunk_requests(
        self,
        requests: list[dict],
    ) -> Iterator[list[dict]]:
        """Chunk requests into batches within token limit"""
        current_chunk = []
        current_tokens = 0

        for request in requests:
            # Estimate tokens (rough: 1 token per 4 characters)
            request_text = json.dumps(request)
            estimated_tokens = len(request_text) / 4

            if current_chunk and current_tokens + estimated_tokens > self.max_tokens:
                # Flush current chunk
                yield current_chunk
                current_chunk = [request]
                current_tokens = estimated_tokens
            else:
                current_chunk.append(request)
                current_tokens += estimated_tokens

        if current_chunk:
            yield current_chunk

# Usage
chunker = BatchChunker(max_tokens=10_000_000)

# Generate ~100M tokens of requests (generate_large_request_set is a placeholder for your own loader)
large_request_set = generate_large_request_set()

batch_num = 0
for chunk in chunker.chunk_requests(large_request_set):
    print(f"Chunk {batch_num}: {len(chunk)} requests")
    create_batch_file(chunk, f"batch_{batch_num}.jsonl")
    batch_num += 1

Chunk large datasets into multiple batch jobs.

Error Handling Per Item

Some requests fail. Handle gracefully:

import json
import openai

def process_batch_results(batch_id: str) -> dict:
    """Process results, track successes and failures"""
    client = openai.OpenAI(api_key="sk-...")

    batch = client.batches.retrieve(batch_id)
    output = client.files.content(batch.output_file_id)

    successes = []
    failures = []

    for line in output.text.splitlines():
        if not line.strip():
            continue

        result = json.loads(line)
        custom_id = result["custom_id"]

        if result.get("response") and result["response"]["status_code"] == 200:
            # Success
            successes.append({
                "custom_id": custom_id,
                "response": result["response"]["body"],
            })
        else:
            # Failure: a request-level error or a non-200 response
            response = result.get("response")
            failures.append({
                "custom_id": custom_id,
                "error": result.get("error") or (response and response["body"].get("error")),
                "status_code": response["status_code"] if response else None,
            })

    return {
        "total": len(successes) + len(failures),
        "succeeded": len(successes),
        "failed": len(failures),
        "success_rate": len(successes) / (len(successes) + len(failures)),
        "successes": successes,
        "failures": failures,
    }

# Process results
results = process_batch_results("batch-12345")

print(f"Success rate: {results['success_rate']:.1%}")
print(f"Failed items: {len(results['failures'])}")

# Retry failures
if results["failures"]:
    retry_ids = [f["custom_id"] for f in results["failures"]]
    print(f"Resubmitting {len(retry_ids)} failed requests")
    # Create new batch with only failures

Track success/failure per request. Retry failures separately.
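
One way to implement that retry step, assuming you kept a mapping from custom_id to the original request body when you first built the batch file (the original_requests dict below is that assumed mapping):

import json
import openai

def resubmit_failures(failures: list[dict], original_requests: dict[str, dict]) -> str:
    """Build a new batch containing only the failed custom_ids and submit it."""
    client = openai.OpenAI(api_key="sk-...")

    retry_file = "retry_batch.jsonl"
    with open(retry_file, "w") as f:
        for failure in failures:
            custom_id = failure["custom_id"]
            f.write(json.dumps({
                "custom_id": custom_id,                # keep the same ID for traceability
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": original_requests[custom_id],  # same body as the first attempt
            }) + "\n")

    uploaded = client.files.create(file=open(retry_file, "rb"), purpose="batch")
    batch = client.batches.create(
        input_file_id=uploaded.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )
    return batch.id

retry_batch_id = resubmit_failures(results["failures"], original_requests)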

Mixing Models in Batch

Batch multiple models in one job:

def create_multimodel_batch(requests: list[dict]) -> str:
    """Submit batch with mixed models"""
    batch_requests = []

    for i, request in enumerate(requests):
        # Each request can specify different model
        batch_request = {
            "custom_id": f"request-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": request.get("model", "gpt-3.5-turbo"),  # Model per request
                "messages": request["messages"],
                "max_tokens": request.get("max_tokens", 1000),
            },
        }
        batch_requests.append(batch_request)

    # Write to JSONL
    with open("multimodel_batch.jsonl", "w") as f:
        for br in batch_requests:
            f.write(json.dumps(br) + "\n")

    # Submit: upload the file, then create the batch job
    client = openai.OpenAI(api_key="sk-...")
    uploaded = client.files.create(
        file=open("multimodel_batch.jsonl", "rb"), purpose="batch"
    )
    batch = client.batches.create(
        input_file_id=uploaded.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )

    return batch.id

# Example: use gpt-4 for complex, gpt-3.5 for simple
requests = [
    {
        "model": "gpt-4-turbo",
        "messages": [{"role": "user", "content": "Complex question"}],
    },
    {
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": "Simple question"}],
    },
]

batch_id = create_multimodel_batch(requests)

Mix models to balance cost and quality.
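
A simple way to decide which model each request gets is a routing heuristic. This sketch uses prompt length and a few keywords as the signal; both thresholds are arbitrary placeholders for your own criteria:

def pick_model(prompt: str) -> str:
    """Send long or analysis-heavy prompts to gpt-4-turbo, everything else to gpt-3.5-turbo."""
    needs_reasoning = any(kw in prompt.lower() for kw in ("analyze", "compare", "explain why"))
    return "gpt-4-turbo" if needs_reasoning or len(prompt) > 2000 else "gpt-3.5-turbo"

requests = [
    {"model": pick_model(doc), "messages": [{"role": "user", "content": doc}]}
    for doc in documents
]
batch_id = create_multimodel_batch(requests)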

Pipeline Design: Offline Inference

Build end-to-end batch processing pipelines:

from datetime import datetime
import json
import asyncio

import openai

class AIBatchPipeline:
    """Batch processing pipeline for AI tasks"""

    def __init__(self, api_key: str):
        self.client = openai.OpenAI(api_key=api_key)

    async def embed_documents_batch(
        self,
        documents: list[str],
        output_file: str,
    ) -> str:
        """Batch embed 1M+ documents"""
        requests = []

        for i, doc in enumerate(documents):
            requests.append({
                "custom_id": f"embed-{i}",
                "method": "POST",
                "url": "/v1/embeddings",
                "body": {
                    "model": "text-embedding-3-small",
                    "input": doc,
                },
            })

        return self._submit_batch(requests, output_file, endpoint="/v1/embeddings")

    async def classify_documents_batch(
        self,
        documents: list[str],
        categories: list[str],
        output_file: str,
    ) -> str:
        """Batch classify documents"""
        requests = []

        for i, doc in enumerate(documents):
            requests.append({
                "custom_id": f"classify-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-3.5-turbo",
                    "messages": [
                        {
                            "role": "system",
                            "content": f"Classify into: {', '.join(categories)}",
                        },
                        {"role": "user", "content": doc},
                    ],
                    "max_tokens": 10,
                },
            })

        return self._submit_batch(requests, output_file, endpoint="/v1/chat/completions")

    def _submit_batch(
        self, requests: list[dict], batch_file: str, endpoint: str
    ) -> str:
        """Write requests to JSONL, upload the file, and create the batch job"""
        with open(batch_file, "w") as f:
            for req in requests:
                f.write(json.dumps(req) + "\n")

        uploaded = self.client.files.create(
            file=open(batch_file, "rb"), purpose="batch"
        )
        batch = self.client.batches.create(
            input_file_id=uploaded.id,
            endpoint=endpoint,
            completion_window="24h",
        )

        return batch.id

# Usage in data pipeline
pipeline = AIBatchPipeline(api_key="sk-...")

documents = load_documents_from_db()  # 100K documents

# Start batch embedding
embed_batch_id = asyncio.run(
    pipeline.embed_documents_batch(documents, "embeddings.jsonl")
)

# Batch runs in the background (up to 24 hours)
# Continue with other work

# Later: fetch results (e.g., with BatchPoller.get_results from earlier)
results = get_batch_results(embed_batch_id)
save_embeddings_to_vector_db(results)

Batch pipelines integrate with data workflows for cost-effective processing.
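
For recurring workloads, the submit/poll/store steps map directly onto an orchestrator. A minimal sketch with Airflow's TaskFlow API, assuming the AIBatchPipeline and BatchPoller classes above plus your own load_documents_from_db and save_embeddings_to_vector_db helpers:

import asyncio
from datetime import datetime

from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def nightly_embedding_batch():
    @task
    def submit() -> str:
        docs = load_documents_from_db()
        pipeline = AIBatchPipeline(api_key="sk-...")
        return asyncio.run(pipeline.embed_documents_batch(docs, "embeddings.jsonl"))

    @task
    def collect(batch_id: str) -> None:
        poller = BatchPoller(api_key="sk-...")
        poller.poll_status(batch_id, poll_interval=300)  # blocks only this task
        save_embeddings_to_vector_db(poller.get_results(batch_id))

    collect(submit())

nightly_embedding_batch()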

Embedding Bulk Documents

Batch embeddings at massive scale:

def batch_embed_documents(
    documents: list[str],
    batch_size: int = 10000,
) -> list[str]:
    """Submit embedding batches for 1M+ documents; returns the batch job IDs"""
    client = openai.OpenAI(api_key="sk-...")

    batch_ids = []

    for batch_num in range(0, len(documents), batch_size):
        batch = documents[batch_num : batch_num + batch_size]

        # Create batch requests
        requests = []
        for i, doc in enumerate(batch):
            requests.append({
                "custom_id": f"{batch_num + i}",
                "method": "POST",
                "url": "/v1/embeddings",
                "body": {
                    "model": "text-embedding-3-small",
                    "input": doc,
                },
            })

        # Write JSONL, upload, and submit
        batch_file = f"embed_batch_{batch_num}.jsonl"
        with open(batch_file, "w") as f:
            for req in requests:
                f.write(json.dumps(req) + "\n")

        uploaded = client.files.create(
            file=open(batch_file, "rb"), purpose="batch"
        )
        batch_job = client.batches.create(
            input_file_id=uploaded.id,
            endpoint="/v1/embeddings",
            completion_window="24h",
        )

        batch_ids.append(batch_job.id)
        print(f"Submitted batch {batch_num}: {batch_job.id}")

    # Poll each batch ID and collect the embeddings once the jobs complete
    return batch_ids

Batch embedding cost: roughly half of standard pricing. Depending on average document length, embedding 1M documents can come in under $50 batched versus roughly double that at standard rates.
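
A quick back-of-the-envelope check of that claim (text-embedding-3-small rates at the time of writing; the average document length is an assumption to replace with your own numbers):

# text-embedding-3-small: $0.02 per 1M tokens standard, $0.01 per 1M tokens via Batch API
num_documents = 1_000_000
avg_tokens_per_doc = 4_000   # assumption: fairly long documents

total_tokens = num_documents * avg_tokens_per_doc
standard_cost = total_tokens / 1_000_000 * 0.02
batch_cost = total_tokens / 1_000_000 * 0.01

print(f"Standard: ${standard_cost:,.2f}")  # $80.00
print(f"Batch:    ${batch_cost:,.2f}")     # $40.00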

Classification at Scale

Classify millions of items cheaply:

def batch_classify(
    items: list[dict],
    categories: list[str],
) -> str:
    """Classify 1M+ items via batch"""
    client = openai.OpenAI(api_key="sk-...")

    # Create batch job
    requests = []
    for i, item in enumerate(items):
        requests.append({
            "custom_id": f"item-{item['id']}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-3.5-turbo",
                "messages": [
                    {
                        "role": "system",
                        "content": f"Classify into: {', '.join(categories)}. Return ONLY the category name.",
                    },
                    {
                        "role": "user",
                        "content": item["text"],
                    },
                ],
                "max_tokens": 5,
            },
        })

    # Write and submit
    with open("classify_batch.jsonl", "w") as f:
        for req in requests:
            f.write(json.dumps(req) + "\n")

    with open("classify_batch.jsonl", "rb") as f:
        batch = client.beta.batches.create(
            input_file=f,
            endpoint="/v1/chat/completions",
            completion_window="24h",
        )

    return batch.id

# Example: classify user reviews (1M reviews)
reviews = load_reviews()  # 1M review objects

batch_id = batch_classify(
    reviews,
    categories=["positive", "negative", "neutral"],
)

# Get results once the batch completes (within 24 hours), e.g. with BatchPoller.get_results
results = get_batch_results(batch_id)

# Count classifications
classifications = {}
for result in results:
    category = result["response"]["body"]["choices"][0]["message"]["content"].strip()
    classifications[category] = classifications.get(category, 0) + 1

print(f"Classification summary: {classifications}")

Classify 1M items for roughly half of what the standard API charges for the same job; with gpt-3.5-turbo and short inputs, that is typically tens of dollars.

Report Generation Pipelines

Generate reports offline:

def generate_monthly_reports(company_data: list[dict]) -> str:
    """Generate reports for 1000+ companies"""
    client = openai.OpenAI(api_key="sk-...")

    requests = []

    for company in company_data:
        requests.append({
            "custom_id": f"report-{company['id']}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4-turbo",
                "messages": [
                    {
                        "role": "system",
                        "content": "Generate a 500-word monthly report",
                    },
                    {
                        "role": "user",
                        "content": f"Company metrics: {json.dumps(company['metrics'])}",
                    },
                ],
                "max_tokens": 1000,
            },
        })

    # Submit batch
    with open("reports_batch.jsonl", "w") as f:
        for req in requests:
            f.write(json.dumps(req) + "\n")

    with open("reports_batch.jsonl", "rb") as f:
        batch = client.beta.batches.create(
            input_file=f,
            endpoint="/v1/chat/completions",
            completion_window="24h",
        )

    return batch.id

# Generate reports for 1000 companies
batch_id = generate_monthly_reports(company_data)

# Poll for completion (e.g., with BatchPoller.poll_status from earlier)
poll_batch_status(batch_id)

# Save reports to database
results = get_batch_results(batch_id)
for result in results:
    company_id = result["custom_id"].replace("report-", "")
    report = result["response"]["body"]["choices"][0]["message"]["content"]
    save_report_to_db(company_id, report)

Batch report generation costs ~50% less than real-time generation.

Checklist

  • Identify non-real-time AI tasks (embedding, classification, reporting)
  • Calculate cost savings (50% off per-token rates with the OpenAI Batch API)
  • Design JSONL batch format with unique custom_ids
  • Implement polling mechanism for batch completion
  • Chunk requests into <10M token batches
  • Handle per-item errors and implement retry
  • Integrate into data pipeline (Airflow, Celery, etc.)
  • Monitor batch job success rates
  • Set up scheduled batch jobs for recurring tasks
  • Document the SLA (24-hour completion window)

Conclusion

The OpenAI Batch API delivers 50% cost savings for non-time-critical inference. Process embeddings, classifications, and reports offline at massive scale. Design pipelines for end-to-end automation, schedule recurring batches, and integrate with data warehouses. At millions of tokens per day, batch processing becomes essential for cost control.