AI Batch Processing — OpenAI Batch API, Cost Savings, and Pipeline Design
Author: Sanjeev Sharma (@webcoderspeed1)
Introduction
Real-time inference is expensive. When latency isn't critical, batch processing delivers 50% cost savings. The OpenAI Batch API processes thousands of requests asynchronously, optimizing throughput. This guide covers batch API mechanics, pipeline design, and when to batch.
- OpenAI Batch API: 50% Cost Reduction
- Batch Job Submission (JSONL Format)
- Async Polling for Completion
- Batch Size Limits and Chunking
- Error Handling Per Item
- Mixing Models in Batch
- Pipeline Design: Offline Inference
- Embedding Bulk Documents
- Classification at Scale
- Report Generation Pipelines
- Checklist
- Conclusion
OpenAI Batch API: 50% Cost Reduction
The Batch API costs exactly half of standard API pricing:
# Standard API costs (per 1K tokens)
standard_rates = {
    "gpt-4-turbo": {"input": 0.01, "output": 0.03},
    "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
}

# Batch API costs (50% reduction)
batch_rates = {
    "gpt-4-turbo": {"input": 0.005, "output": 0.015},
    "gpt-3.5-turbo": {"input": 0.00025, "output": 0.00075},
}

# Example: 1M input tokens, 500K output tokens on gpt-4-turbo
input_tokens = 1_000_000
output_tokens = 500_000

standard_cost = (input_tokens / 1000) * 0.01 + (output_tokens / 1000) * 0.03
batch_cost = (input_tokens / 1000) * 0.005 + (output_tokens / 1000) * 0.015
savings = standard_cost - batch_cost

print(f"Standard: ${standard_cost:,.2f}")  # Standard: $25.00
print(f"Batch: ${batch_cost:,.2f}")        # Batch: $12.50
print(f"Savings: ${savings:,.2f} ({savings / standard_cost * 100:.1f}%)")  # Savings: $12.50 (50.0%)
Trade-off: batches complete within a 24-hour window (often sooner, but with no latency guarantee). Perfect for non-real-time tasks.
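The arithmetic above generalizes into a small helper (a sketch; the rates are the per-1K-token figures from the tables above and should be checked against current pricing):

```python
def job_cost(input_tokens: int, output_tokens: int,
             input_rate: float, output_rate: float) -> float:
    """Cost of a job given per-1K-token rates."""
    return (input_tokens / 1000) * input_rate + (output_tokens / 1000) * output_rate

def batch_savings(input_tokens: int, output_tokens: int,
                  input_rate: float, output_rate: float) -> float:
    """Absolute savings from the Batch API's 50% discount."""
    standard = job_cost(input_tokens, output_tokens, input_rate, output_rate)
    return standard / 2  # batch is half price on both input and output

print(job_cost(1_000_000, 500_000, 0.01, 0.03))       # 25.0
print(batch_savings(1_000_000, 500_000, 0.01, 0.03))  # 12.5
```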
Batch Job Submission (JSONL Format)
OpenAI requires JSONL format (one JSON object per line):
import json
import openai

def create_batch_file(requests: list[dict], filename: str) -> str:
    """Write requests to a JSONL file for batch submission."""
    with open(filename, "w") as f:
        for i, request in enumerate(requests):
            batch_request = {
                "custom_id": f"request-{i}",  # Unique ID for tracking
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": request["model"],
                    "messages": request["messages"],
                    "max_tokens": request.get("max_tokens", 1000),
                },
            }
            f.write(json.dumps(batch_request) + "\n")
    return filename

# Example: batch classify documents
documents = [
    "This product is amazing and works perfectly!",
    "Terrible experience, would not recommend",
    "Good quality but a bit expensive",
] * 1000  # 3000 documents

requests = []
for doc in documents:
    requests.append({
        "model": "gpt-3.5-turbo",
        "messages": [
            {"role": "system", "content": "Classify sentiment as positive, negative, or neutral"},
            {"role": "user", "content": doc},
        ],
        "max_tokens": 10,
    })

# Create JSONL file
batch_file = create_batch_file(requests, "batch_requests.jsonl")

# Submit: upload the file first, then create the batch from its file ID
client = openai.OpenAI(api_key="sk-...")
uploaded = client.files.create(file=open(batch_file, "rb"), purpose="batch")
batch_response = client.batches.create(
    input_file_id=uploaded.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)

batch_id = batch_response.id
print(f"Batch submitted: {batch_id}")
print(f"Status: {batch_response.status}")
The JSONL format is strict: one JSON object per line, properly escaped strings, and a unique custom_id per request.
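A cheap pre-upload sanity check catches malformed lines and duplicate IDs before you burn a submission (a minimal sketch; the required-key set mirrors the batch request shape above):

```python
import json

REQUIRED_KEYS = {"custom_id", "method", "url", "body"}

def validate_jsonl(path: str) -> list[str]:
    """Return a list of error messages; an empty list means the file looks valid."""
    errors = []
    seen_ids = set()
    with open(path) as f:
        for lineno, line in enumerate(f, start=1):
            try:
                obj = json.loads(line)
            except json.JSONDecodeError as e:
                errors.append(f"line {lineno}: invalid JSON ({e})")
                continue
            if not isinstance(obj, dict):
                errors.append(f"line {lineno}: not a JSON object")
                continue
            missing = REQUIRED_KEYS - obj.keys()
            if missing:
                errors.append(f"line {lineno}: missing keys {sorted(missing)}")
            cid = obj.get("custom_id")
            if cid in seen_ids:
                errors.append(f"line {lineno}: duplicate custom_id {cid!r}")
            seen_ids.add(cid)
    return errors
```

Run it on the JSONL file right before upload; fixing a bad line locally is much faster than waiting for a batch to fail server-side.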
Async Polling for Completion
Poll for completion without blocking:
import json
import time
import openai
from datetime import datetime

class BatchPoller:
    def __init__(self, api_key: str):
        self.client = openai.OpenAI(api_key=api_key)
        self.batch_id = None

    def submit_batch(self, input_file: str) -> str:
        """Upload the JSONL file, submit the batch, and return the batch ID."""
        uploaded = self.client.files.create(
            file=open(input_file, "rb"), purpose="batch"
        )
        response = self.client.batches.create(
            input_file_id=uploaded.id,
            endpoint="/v1/chat/completions",
            completion_window="24h",
        )
        self.batch_id = response.id
        return self.batch_id

    def poll_status(self, batch_id: str, poll_interval: int = 30) -> dict:
        """Poll batch status until it reaches a terminal state."""
        while True:
            batch = self.client.batches.retrieve(batch_id)
            print(f"{datetime.now()}: {batch.status}")
            print(f"  Total: {batch.request_counts.total}")
            print(f"  Completed: {batch.request_counts.completed}")
            print(f"  Failed: {batch.request_counts.failed}")
            if batch.status in ["completed", "failed", "expired", "cancelled"]:
                return batch.model_dump()
            time.sleep(poll_interval)

    def get_results(self, batch_id: str) -> list[dict]:
        """Download and parse batch results from the output file."""
        batch = self.client.batches.retrieve(batch_id)
        if batch.status != "completed":
            raise ValueError(f"Batch not ready: {batch.status}")
        output = self.client.files.content(batch.output_file_id)
        return [json.loads(line) for line in output.text.splitlines() if line]

# Usage
poller = BatchPoller(api_key="sk-...")

# Submit
batch_id = poller.submit_batch("batch_requests.jsonl")

# Poll (this loop blocks; run it in a worker to keep doing other work)
status = poller.poll_status(batch_id, poll_interval=60)

# Get and parse results
results = poller.get_results(batch_id)
for result in results[:5]:
    custom_id = result["custom_id"]
    response = result["response"]["body"]["choices"][0]["message"]["content"]
    print(f"{custom_id}: {response}")
Run polling as a background job (Celery, Airflow) so it doesn't block your main process.
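Whatever scheduler runs the poll, an exponential backoff avoids hammering the status endpoint on a job that may run for hours (a framework-agnostic sketch; the 30-second start and 30-minute cap are arbitrary assumptions, not API requirements):

```python
def backoff_schedule(initial: float = 30.0, factor: float = 2.0,
                     cap: float = 1800.0, max_polls: int = 50) -> list[float]:
    """Poll intervals in seconds: 30, 60, 120, ... capped at 30 minutes."""
    intervals = []
    delay = initial
    for _ in range(max_polls):
        intervals.append(min(delay, cap))
        delay *= factor
    return intervals

schedule = backoff_schedule()
print(schedule[:6])  # [30.0, 60.0, 120.0, 240.0, 480.0, 960.0]
```

Feed each interval to your scheduler's countdown/ETA parameter (e.g. a Celery task that re-queues itself) instead of sleeping in-process.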
Batch Size Limits and Chunking
OpenAI caps each batch at 50,000 requests and a 200 MB input file, and each model has a quota on enqueued batch tokens. Chunk larger jobs:
import json
from typing import Iterator

class BatchChunker:
    def __init__(self, max_tokens: int = 10_000_000, max_requests: int = 50_000):
        self.max_tokens = max_tokens  # example enqueued-token budget per batch
        self.max_requests = max_requests

    def chunk_requests(self, requests: list[dict]) -> Iterator[list[dict]]:
        """Chunk requests into batches within the token and request limits."""
        current_chunk: list[dict] = []
        current_tokens = 0.0
        for request in requests:
            # Estimate tokens (rough heuristic: ~1 token per 4 characters)
            estimated_tokens = len(json.dumps(request)) / 4
            over_tokens = current_tokens + estimated_tokens > self.max_tokens
            over_count = len(current_chunk) >= self.max_requests
            if current_chunk and (over_tokens or over_count):
                # Flush current chunk
                yield current_chunk
                current_chunk = []
                current_tokens = 0.0
            current_chunk.append(request)
            current_tokens += estimated_tokens
        if current_chunk:
            yield current_chunk

# Usage: split a very large request set into multiple batch jobs
chunker = BatchChunker()
large_request_set = generate_large_request_set()  # your own loader

for batch_num, chunk in enumerate(chunker.chunk_requests(large_request_set)):
    print(f"Chunk {batch_num}: {len(chunk)} requests")
    create_batch_file(chunk, f"batch_{batch_num}.jsonl")
Chunk large datasets into multiple batch jobs.
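Token estimates ignore the input-file size cap, so a byte-based splitter is a useful companion (a sketch assuming a 200 MB file limit; verify the figure against current docs):

```python
import json

MAX_FILE_BYTES = 200 * 1024 * 1024  # assumed 200 MB input-file limit

def chunk_by_bytes(requests: list[dict], max_bytes: int = MAX_FILE_BYTES) -> list[list[dict]]:
    """Split requests so each chunk's serialized JSONL stays under max_bytes."""
    chunks: list[list[dict]] = []
    current: list[dict] = []
    size = 0
    for req in requests:
        line_bytes = len(json.dumps(req).encode("utf-8")) + 1  # +1 for newline
        if current and size + line_bytes > max_bytes:
            chunks.append(current)
            current, size = [], 0
        current.append(req)
        size += line_bytes
    if current:
        chunks.append(current)
    return chunks
```

Measuring the UTF-8 byte length of each serialized line makes the check exact, unlike the character-count token heuristic.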
Error Handling Per Item
Some requests fail. Handle gracefully:
import json
import openai

def process_batch_results(batch_id: str) -> dict:
    """Process results, tracking successes and failures per request."""
    client = openai.OpenAI(api_key="sk-...")
    batch = client.batches.retrieve(batch_id)
    # Note: requests that failed outright may land in batch.error_file_id instead
    output = client.files.content(batch.output_file_id)

    successes = []
    failures = []
    for line in output.text.splitlines():
        if not line:
            continue
        result = json.loads(line)
        custom_id = result["custom_id"]
        response = result.get("response") or {}
        if result.get("error") is None and response.get("status_code") == 200:
            # Success
            successes.append({
                "custom_id": custom_id,
                "response": response["body"],
            })
        else:
            # Failure
            failures.append({
                "custom_id": custom_id,
                "error": result.get("error") or response.get("body", {}).get("error"),
                "status_code": response.get("status_code"),
            })

    total = len(successes) + len(failures)
    return {
        "total": total,
        "succeeded": len(successes),
        "failed": len(failures),
        "success_rate": len(successes) / total if total else 0.0,
        "successes": successes,
        "failures": failures,
    }

# Process results
results = process_batch_results("batch-12345")
print(f"Success rate: {results['success_rate']:.1%}")
print(f"Failed items: {len(results['failures'])}")

# Retry failures in a fresh batch
if results["failures"]:
    retry_ids = [f["custom_id"] for f in results["failures"]]
    print(f"Resubmitting {len(retry_ids)} failed requests")
    # Create a new batch containing only the failed requests
Track success/failure per request. Retry failures separately.
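Building the retry batch is a pure mapping from failed custom_ids back to the original requests, which is easy to keep correct (a sketch assuming you retained the originals keyed by custom_id):

```python
def build_retry_requests(original: dict[str, dict], failed_ids: list[str]) -> list[dict]:
    """Select the original request bodies whose custom_ids failed."""
    missing = [cid for cid in failed_ids if cid not in original]
    if missing:
        raise KeyError(f"Unknown custom_ids: {missing}")
    # Keep the same custom_id so results from the retry batch line up
    return [dict(original[cid], custom_id=cid) for cid in failed_ids]

# Usage: index originals by custom_id, then select the failures
originals = {f"request-{i}": {"model": "gpt-3.5-turbo", "messages": []} for i in range(3)}
retry = build_retry_requests(originals, ["request-1"])
print(len(retry))  # 1
```

Reusing the same custom_ids means downstream result-parsing code works unchanged on the retry batch.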
Mixing Models in Batch
Batch multiple models in one job:
import json
import openai

def create_multimodel_batch(requests: list[dict]) -> str:
    """Submit a batch whose requests specify different models."""
    batch_requests = []
    for i, request in enumerate(requests):
        # Each request can specify its own model
        batch_requests.append({
            "custom_id": f"request-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": request.get("model", "gpt-3.5-turbo"),  # model per request
                "messages": request["messages"],
                "max_tokens": request.get("max_tokens", 1000),
            },
        })

    # Write to JSONL
    with open("multimodel_batch.jsonl", "w") as f:
        for br in batch_requests:
            f.write(json.dumps(br) + "\n")

    # Upload and submit
    client = openai.OpenAI(api_key="sk-...")
    uploaded = client.files.create(
        file=open("multimodel_batch.jsonl", "rb"), purpose="batch"
    )
    batch = client.batches.create(
        input_file_id=uploaded.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )
    return batch.id

# Example: use gpt-4 for complex questions, gpt-3.5 for simple ones
requests = [
    {"model": "gpt-4-turbo", "messages": [{"role": "user", "content": "Complex question"}]},
    {"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": "Simple question"}]},
]
batch_id = create_multimodel_batch(requests)
Mixing models balances cost and quality. Note that every request in a batch must target the same endpoint, and OpenAI's current docs should be checked on whether mixed models are supported within one batch; if not, split requests by model and submit one batch per model.
Pipeline Design: Offline Inference
Build end-to-end batch processing pipelines:
import json
import openai

class AIBatchPipeline:
    """Batch processing pipeline for AI tasks."""

    def __init__(self, api_key: str):
        self.client = openai.OpenAI(api_key=api_key)

    def embed_documents_batch(self, documents: list[str]) -> str:
        """Batch-embed a large document set; returns the batch ID."""
        requests = [
            {
                "custom_id": f"embed-{i}",
                "method": "POST",
                "url": "/v1/embeddings",
                "body": {"model": "text-embedding-3-small", "input": doc},
            }
            for i, doc in enumerate(documents)
        ]
        return self._submit_batch(requests, endpoint="/v1/embeddings")

    def classify_documents_batch(self, documents: list[str], categories: list[str]) -> str:
        """Batch-classify documents; returns the batch ID."""
        requests = [
            {
                "custom_id": f"classify-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-3.5-turbo",
                    "messages": [
                        {"role": "system", "content": f"Classify into: {', '.join(categories)}"},
                        {"role": "user", "content": doc},
                    ],
                    "max_tokens": 10,
                },
            }
            for i, doc in enumerate(documents)
        ]
        return self._submit_batch(requests, endpoint="/v1/chat/completions")

    def _submit_batch(self, requests: list[dict], endpoint: str) -> str:
        """Write requests to JSONL, upload, and submit the batch job."""
        batch_file = "batch_temp.jsonl"
        with open(batch_file, "w") as f:
            for req in requests:
                f.write(json.dumps(req) + "\n")
        uploaded = self.client.files.create(
            file=open(batch_file, "rb"), purpose="batch"
        )
        batch = self.client.batches.create(
            input_file_id=uploaded.id,
            endpoint=endpoint,  # must match the URL in each request
            completion_window="24h",
        )
        return batch.id

# Usage in a data pipeline
pipeline = AIBatchPipeline(api_key="sk-...")
documents = load_documents_from_db()  # e.g. 100K documents (your own loader)

# Start batch embedding; the job runs server-side for up to 24 hours
embed_batch_id = pipeline.embed_documents_batch(documents)

# Continue with other work, then fetch results later
results = get_batch_results(embed_batch_id)  # e.g. BatchPoller.get_results
save_embeddings_to_vector_db(results)  # your own sink
Batch pipelines integrate with data workflows for cost-effective processing.
Embedding Bulk Documents
Batch embeddings at massive scale:
import json
import openai

def submit_embedding_batches(documents: list[str], batch_size: int = 10_000) -> list[str]:
    """Submit embedding batch jobs for a large document set; returns batch IDs."""
    client = openai.OpenAI(api_key="sk-...")
    batch_ids = []

    for start in range(0, len(documents), batch_size):
        chunk = documents[start : start + batch_size]
        requests = [
            {
                "custom_id": f"{start + i}",  # global document index
                "method": "POST",
                "url": "/v1/embeddings",
                "body": {"model": "text-embedding-3-small", "input": doc},
            }
            for i, doc in enumerate(chunk)
        ]

        # Write and submit one batch job per chunk
        batch_file = f"embed_batch_{start}.jsonl"
        with open(batch_file, "w") as f:
            for req in requests:
                f.write(json.dumps(req) + "\n")
        uploaded = client.files.create(file=open(batch_file, "rb"), purpose="batch")
        batch_job = client.batches.create(
            input_file_id=uploaded.id,
            endpoint="/v1/embeddings",
            completion_window="24h",
        )
        batch_ids.append(batch_job.id)
        print(f"Submitted batch starting at doc {start}: {batch_job.id}")

    # Poll each batch ID and collect embeddings once the jobs complete
    return batch_ids
Batch embedding halves an already-cheap operation; the absolute cost depends on document length and the embedding model's per-token rate.
Classification at Scale
Classify millions of items cheaply:
import json
import openai

def batch_classify(items: list[dict], categories: list[str]) -> str:
    """Submit a classification batch for a large item set; returns the batch ID."""
    client = openai.OpenAI(api_key="sk-...")

    requests = [
        {
            "custom_id": f"item-{item['id']}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-3.5-turbo",
                "messages": [
                    {
                        "role": "system",
                        "content": f"Classify into: {', '.join(categories)}. Return ONLY the category name.",
                    },
                    {"role": "user", "content": item["text"]},
                ],
                "max_tokens": 5,
            },
        }
        for item in items
    ]

    # Write and submit
    with open("classify_batch.jsonl", "w") as f:
        for req in requests:
            f.write(json.dumps(req) + "\n")
    uploaded = client.files.create(
        file=open("classify_batch.jsonl", "rb"), purpose="batch"
    )
    batch = client.batches.create(
        input_file_id=uploaded.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )
    return batch.id

# Example: classify 1M user reviews
reviews = load_reviews()  # your own loader returning {"id": ..., "text": ...} objects
batch_id = batch_classify(reviews, categories=["positive", "negative", "neutral"])

# Fetch results once the batch completes (within the 24-hour window)
results = get_batch_results(batch_id)  # e.g. BatchPoller.get_results

# Count classifications
classifications: dict[str, int] = {}
for result in results:
    category = result["response"]["body"]["choices"][0]["message"]["content"].strip()
    classifications[category] = classifications.get(category, 0) + 1

print(f"Classification summary: {classifications}")
Batch classification runs at roughly half the standard-API price; the absolute cost for 1M items depends on prompt length and the model chosen.
Report Generation Pipelines
Generate reports offline:
import json
import openai

def generate_monthly_reports(company_data: list[dict]) -> str:
    """Submit a report-generation batch for many companies; returns the batch ID."""
    client = openai.OpenAI(api_key="sk-...")

    requests = [
        {
            "custom_id": f"report-{company['id']}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-4-turbo",
                "messages": [
                    {"role": "system", "content": "Generate a 500-word monthly report"},
                    {
                        "role": "user",
                        "content": f"Company metrics: {json.dumps(company['metrics'])}",
                    },
                ],
                "max_tokens": 1000,
            },
        }
        for company in company_data
    ]

    # Write and submit
    with open("reports_batch.jsonl", "w") as f:
        for req in requests:
            f.write(json.dumps(req) + "\n")
    uploaded = client.files.create(
        file=open("reports_batch.jsonl", "rb"), purpose="batch"
    )
    batch = client.batches.create(
        input_file_id=uploaded.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )
    return batch.id

# Generate reports for 1000 companies
batch_id = generate_monthly_reports(company_data)

# Poll for completion, then save reports to the database
poll_batch_status(batch_id)  # e.g. BatchPoller.poll_status
results = get_batch_results(batch_id)  # e.g. BatchPoller.get_results
for result in results:
    company_id = result["custom_id"].removeprefix("report-")
    report = result["response"]["body"]["choices"][0]["message"]["content"]
    save_report_to_db(company_id, report)  # your own sink
Batch report generation costs ~50% less than real-time generation.
Checklist
- Identify non-real-time AI tasks (embedding, classification, reporting)
- Calculate cost savings (typically 40-50%)
- Design JSONL batch format with unique custom_ids
- Implement polling mechanism for batch completion
- Chunk requests into <10M token batches
- Handle per-item errors and implement retry
- Integrate into data pipeline (Airflow, Celery, etc.)
- Monitor batch job success rates
- Set up scheduled batch jobs for recurring tasks
- Document the SLA (24-hour completion window; expired batches return partial results)
Conclusion
The OpenAI Batch API delivers 50% cost savings for non-time-critical inference. Process embeddings, classifications, and reports offline at massive scale. Design pipelines for end-to-end automation, schedule recurring batches, and integrate with data warehouses. At 1M+ tokens per day, batch processing becomes essential for cost control.