Batch Completion
Batch Completion enables you to process large volumes of LLM requests (e.g., 1,000+ requests) while automatically respecting AI Connection and AI Resource capacity limits (TPM/RPM). Unlike traditional message queueing systems that process messages concurrently, Batch Completion intelligently schedules requests based on available capacity, ensuring you never exceed rate limits.
Overview
Batch Completion is designed for scenarios where you need to process many LLM requests efficiently while staying within provider rate limits. The system uses Redis-based queuing with capacity-aware scheduling to ensure optimal throughput without violating TPM (Tokens Per Minute) or RPM (Requests Per Minute) constraints.
Key Benefits
- Capacity-Aware Processing: Automatically respects AI Connection and AI Resource capacity limits (TPM/RPM)
- Intelligent Scheduling: Queues requests and processes them based on available capacity, not just concurrency
- Multiple Response Methods: Choose between Server-Sent Events (SSE), callback URLs, or async polling
- Automatic Retry: Built-in retry logic for transient failures
- Batch-Level Capacity: Override capacity limits per batch for custom rate limiting
- Scalable: Handles thousands of requests efficiently with Redis-based distributed queuing
- Progress Tracking: Real-time progress updates via SSE or callback events
When to Use Batch Completion
Batch Completion is ideal for:
- Batch Document Processing: Processing large volumes of documents (summarization, extraction, analysis)
- Data Pipeline Processing: Bulk processing of data that requires LLM analysis
- Content Generation at Scale: Generating content for multiple items while respecting rate limits
- Migration Tasks: Migrating or transforming data using LLM operations
- Scheduled Jobs: Processing large batches on a schedule
Architecture
The Batch Completion system uses Redis as a distributed queue with multiple data structures to manage capacity-aware processing:
Queue System Components
- Main Queue ({workspaceId}:{environmentId}:{resourceId}:queue): Sorted set containing pending items, ordered by creation timestamp
- Processing Queue ({workspaceId}:{environmentId}:{resourceId}:processing-queue): Sorted set of items currently being processed with a visibility timeout
- Global Processing Queue: Tracks all in-flight items across all resources for monitoring and reprocessing
- Active Resources (global-active-resources-batch): Sorted set of resources that have items ready to process
- Standby Resources (standby-resources-batch): Sorted set of resources waiting for capacity to become available
- Batch Stream ({workspaceId}:{environmentId}:{batchId}:batch-queue-stream): Redis Stream for SSE event delivery
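The key layout above can be sketched as a small helper. This is an illustration only; the function name and return shape are ours, and the real key construction lives in the server code:

```python
def queue_keys(workspace_id: str, environment_id: str,
               resource_id: str, batch_id: str) -> dict:
    """Build the Redis key names used by the batch queue system."""
    scope = f"{workspace_id}:{environment_id}"
    return {
        "main_queue": f"{scope}:{resource_id}:queue",
        "processing_queue": f"{scope}:{resource_id}:processing-queue",
        "active_resources": "global-active-resources-batch",
        "standby_resources": "standby-resources-batch",
        "batch_stream": f"{scope}:{batch_id}:batch-queue-stream",
    }

keys = queue_keys("ws", "env", "res", "batch")
print(keys["main_queue"])  # ws:env:res:queue
```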
How It Works
Batch Processing Flow
The following sequence diagram illustrates the complete flow of batch processing:
1. Batch Creation
When you create a batch, the system:
- Validates all items and calculates estimated token counts
- Creates batch and item records in PostgreSQL
- Groups items by AI Resource
- Enqueues items into Redis queues (one queue per resource)
- Adds resources to the active resources set
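The grouping and enqueue steps can be sketched in plain Python. The dicts below stand in for Redis sorted sets (scores are creation timestamps), and the real system also persists batch and item records to PostgreSQL:

```python
import time
from collections import defaultdict

def enqueue_batch(items):
    """Sketch of batch creation: group items by AI Resource, enqueue each with
    a creation-timestamp score (mimicking ZADD on the per-resource main queue),
    and mark every touched resource as active."""
    queues = defaultdict(dict)   # resource_id -> {item_id: score}
    active_resources = set()
    now = time.time()
    for i, item in enumerate(items):
        resource_id = item["resourceId"]
        # A tiny offset preserves creation order within the same timestamp.
        queues[resource_id][f"item-{i}"] = now + i * 1e-6
        active_resources.add(resource_id)
    return queues, active_resources

queues, active = enqueue_batch([
    {"resourceId": "res-a"}, {"resourceId": "res-b"}, {"resourceId": "res-a"},
])
print(sorted(active))        # ['res-a', 'res-b']
print(len(queues["res-a"]))  # 2
```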
2. Capacity-Aware Processing
The consumer workers continuously:
- Select the oldest active resources (up to 10 at a time)
- Lock each resource to prevent concurrent processing
- Check available capacity (RPM/TPM) for the resource
- Retrieve items from the queue, respecting:
  - Available requests (RPM limit)
  - Available tokens (TPM limit)
  - Estimated token count per item
- Move items from main queue to processing queue
- Process items concurrently (up to 1,000 concurrent tasks)
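The capacity-respecting retrieval step can be sketched as follows. This is a simplified model of the scheduler's budget check, not the actual worker code:

```python
def take_within_capacity(queue, available_requests, available_tokens):
    """Pull items oldest-first while both the RPM budget (available_requests)
    and the TPM budget (available_tokens) allow it. Each queue entry is
    (item_id, estimated_tokens), ordered oldest first."""
    taken = []
    for item_id, estimated_tokens in queue:
        if available_requests <= 0 or estimated_tokens > available_tokens:
            break  # out of budget -- remaining items stay in the main queue
        taken.append(item_id)
        available_requests -= 1
        available_tokens -= estimated_tokens
    return taken

batch = take_within_capacity(
    [("a", 400), ("b", 500), ("c", 800)],
    available_requests=10,
    available_tokens=1000,
)
print(batch)  # ['a', 'b'] -- item "c" would exceed the remaining token budget
```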
3. Capacity Management
The system checks capacity from multiple sources:
- AI Connection Capacity: Base capacity defined on the connection
- AI Resource Capacity: Resource-level capacity overrides
- Batch-Level Capacity: Optional per-batch capacity override
- Discovered Capacity: Automatically discovered provider limits (if available)
If capacity is exhausted:
- Items remain in the main queue
- Resource is moved to standby resources
- Resource is reactivated when capacity becomes available
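A plausible resolution of the capacity sources can be sketched as a precedence lookup. The precedence shown (batch override over resource override over connection) is an assumption for illustration; see Capacity Management for the exact rules:

```python
def resolve_capacity(connection_cap, resource_cap=None, batch_cap=None):
    """Return the effective capacity, preferring the most specific enabled
    limit. Each cap is a dict like {"requests": ..., "tokens": ..., "enabled": ...}."""
    for cap in (batch_cap, resource_cap, connection_cap):
        if cap and cap.get("enabled", True):
            return cap
    return connection_cap

cap = resolve_capacity(
    {"requests": 500, "tokens": 500_000},
    batch_cap={"requests": 50, "tokens": 50_000, "enabled": True},
)
print(cap["requests"])  # 50 -- the batch-level override wins
```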
4. Retry and Error Handling
- Retryable Errors: Items are moved back to the main queue with a delay
- Non-Retryable Errors: Items are marked as failed and removed
- Visibility Timeout: Items in processing queue expire after a timeout and are reprocessed if not completed
- Reprocessing Loop: Continuously monitors for expired items and reprocesses them
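The visibility-timeout mechanics can be sketched in plain Python. Dicts of item to timestamp stand in for the Redis sorted sets; the real reprocessing loop runs continuously against Redis:

```python
import time

def requeue_expired(processing_queue, main_queue, visibility_timeout=300, now=None):
    """Move items whose visibility timeout has expired from the processing
    queue back to the main queue so they are picked up again. The 300-second
    default is illustrative, not the system's actual timeout."""
    now = time.time() if now is None else now
    expired = [item for item, started in processing_queue.items()
               if now - started > visibility_timeout]
    for item in expired:
        main_queue[item] = processing_queue.pop(item)  # retry with original score
    return expired

processing = {"item-1": 0.0, "item-2": 400.0}
main = {}
print(requeue_expired(processing, main, visibility_timeout=300, now=500.0))
# ['item-1'] -- item-2 is still within its visibility window
```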
5. Progress Updates
Progress is tracked and delivered via:
- SSE (SYNC type): Real-time events as items complete
- Callback URLs (CALLBACK type): HTTP POST to your endpoint
- Polling (ASYNC type): Query batch status via API
API Usage
Batch Types
VM-X AI supports three batch types:
- SYNC: Stream results via Server-Sent Events (SSE)
- ASYNC: Process in background, poll for status
- CALLBACK: Process in background, receive HTTP callbacks
Creating a Batch (SYNC)
- Python
- Node.js
- cURL
import requests
import json
workspace_id = "6c41dc1b-910c-4358-beef-2c609d38db31"
environment_id = "6c1957ca-77ca-49b3-8fa1-0590281b8b44"
# Create batch with SYNC type
response = requests.post(
    f"http://localhost:3000/v1/completion-batch/{workspace_id}/{environment_id}",
    headers={
        "Authorization": "Bearer your-vmx-api-key",
        "Content-Type": "application/json",
    },
    json={
        "type": "SYNC",
        "items": [
            {
                "resourceId": "your-resource-id",
                "request": {
                    "messages": [
                        {"role": "user", "content": "Summarize this document: ..."}
                    ]
                }
            },
            {
                "resourceId": "your-resource-id",
                "request": {
                    "messages": [
                        {"role": "user", "content": "Extract key points from: ..."}
                    ]
                }
            }
        ]
    },
    stream=True  # Enable streaming for SSE
)

# Process SSE events
for line in response.iter_lines():
    if line:
        data = line.decode('utf-8')
        if data.startswith('data: '):
            payload = data[6:].strip()
            if payload in ('[DONE]', '[ERROR]'):  # stream terminator markers
                break
            event_data = json.loads(payload)
            if event_data.get('action') == 'item-completed':
                print(f"Item completed: {event_data['payload']['itemId']}")
            elif event_data.get('action') == 'batch-completed':
                print("Batch completed!")
                break
import axios from 'axios';
const workspaceId = '6c41dc1b-910c-4358-beef-2c609d38db31';
const environmentId = '6c1957ca-77ca-49b3-8fa1-0590281b8b44';
// Create batch with SYNC type
const response = await axios.post(
  `http://localhost:3000/v1/completion-batch/${workspaceId}/${environmentId}`,
  {
    type: 'SYNC',
    items: [
      {
        resourceId: 'your-resource-id',
        request: {
          messages: [{ role: 'user', content: 'Summarize this document: ...' }],
        },
      },
      {
        resourceId: 'your-resource-id',
        request: {
          messages: [{ role: 'user', content: 'Extract key points from: ...' }],
        },
      },
    ],
  },
  {
    headers: {
      Authorization: 'Bearer your-vmx-api-key',
      'Content-Type': 'application/json',
    },
    responseType: 'stream',
  }
);
// Process SSE events (buffer in case a chunk splits a line)
let buffer = '';
response.data.on('data', (chunk) => {
  buffer += chunk.toString();
  const lines = buffer.split('\n');
  buffer = lines.pop() ?? ''; // keep any incomplete trailing line
  for (const line of lines) {
    if (!line.startsWith('data: ')) continue;
    const payload = line.slice(6).trim();
    if (payload === '[DONE]' || payload === '[ERROR]') continue; // stream terminator markers
    const event = JSON.parse(payload);
    if (event.action === 'item-completed') {
      console.log(`Item completed: ${event.payload.itemId}`);
    } else if (event.action === 'batch-completed') {
      console.log('Batch completed!');
    }
  }
});
curl -N -X POST \
  "http://localhost:3000/v1/completion-batch/{workspaceId}/{environmentId}" \
  -H "Authorization: Bearer your-vmx-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "type": "SYNC",
    "items": [
      {
        "resourceId": "your-resource-id",
        "request": {
          "messages": [
            {"role": "user", "content": "Summarize this document: ..."}
          ]
        }
      }
    ]
  }'
Creating a Batch (ASYNC)
- Python
- Node.js
import requests
import time
workspace_id = "6c41dc1b-910c-4358-beef-2c609d38db31"
environment_id = "6c1957ca-77ca-49b3-8fa1-0590281b8b44"
# Create batch
response = requests.post(
    f"http://localhost:3000/v1/completion-batch/{workspace_id}/{environment_id}",
    headers={
        "Authorization": "Bearer your-vmx-api-key",
        "Content-Type": "application/json",
    },
    json={
        "type": "ASYNC",
        "items": [
            {
                "resourceId": "your-resource-id",
                "request": {
                    "messages": [
                        {"role": "user", "content": "Process document..."}
                    ]
                }
            }
        ]
    }
)
batch = response.json()
batch_id = batch["batchId"]

# Poll for status
while True:
    status_response = requests.get(
        f"http://localhost:3000/v1/completion-batch/{workspace_id}/{environment_id}/{batch_id}",
        headers={"Authorization": "Bearer your-vmx-api-key"}
    )
    batch_status = status_response.json()
    print(f"Progress: {batch_status['completedPercentage']}%")
    if batch_status["status"] in ["COMPLETED", "FAILED", "CANCELLED"]:
        break
    time.sleep(5)  # Poll every 5 seconds
import axios from 'axios';
const workspaceId = '6c41dc1b-910c-4358-beef-2c609d38db31';
const environmentId = '6c1957ca-77ca-49b3-8fa1-0590281b8b44';
// Create batch
const { data: batch } = await axios.post(
  `http://localhost:3000/v1/completion-batch/${workspaceId}/${environmentId}`,
  {
    type: 'ASYNC',
    items: [
      {
        resourceId: 'your-resource-id',
        request: {
          messages: [{ role: 'user', content: 'Process document...' }],
        },
      },
    ],
  },
  {
    headers: {
      Authorization: 'Bearer your-vmx-api-key',
      'Content-Type': 'application/json',
    },
  }
);
const batchId = batch.batchId;

// Poll for status
const pollStatus = async () => {
  while (true) {
    const { data: batchStatus } = await axios.get(
      `http://localhost:3000/v1/completion-batch/${workspaceId}/${environmentId}/${batchId}`,
      {
        headers: {
          Authorization: 'Bearer your-vmx-api-key',
        },
      }
    );
    console.log(`Progress: ${batchStatus.completedPercentage}%`);
    if (['COMPLETED', 'FAILED', 'CANCELLED'].includes(batchStatus.status)) {
      break;
    }
    await new Promise((resolve) => setTimeout(resolve, 5000)); // Poll every 5 seconds
  }
};
await pollStatus();
Creating a Batch (CALLBACK)
- Python
- Node.js
import requests
workspace_id = "6c41dc1b-910c-4358-beef-2c609d38db31"
environment_id = "6c1957ca-77ca-49b3-8fa1-0590281b8b44"
# Create batch with callback
response = requests.post(
    f"http://localhost:3000/v1/completion-batch/{workspace_id}/{environment_id}",
    headers={
        "Authorization": "Bearer your-vmx-api-key",
        "Content-Type": "application/json",
    },
    json={
        "type": "CALLBACK",
        "callbackOptions": {
            "url": "https://your-app.com/batch-callback",
            "headers": {
                "X-API-Key": "your-secret-key"
            },
            "events": ["item_update", "batch_update"]
        },
        "items": [
            {
                "resourceId": "your-resource-id",
                "request": {
                    "messages": [
                        {"role": "user", "content": "Process document..."}
                    ]
                }
            }
        ]
    }
)
batch = response.json()
print(f"Batch created: {batch['batchId']}")
import axios from 'axios';
const workspaceId = '6c41dc1b-910c-4358-beef-2c609d38db31';
const environmentId = '6c1957ca-77ca-49b3-8fa1-0590281b8b44';
// Create batch with callback
const { data: batch } = await axios.post(
  `http://localhost:3000/v1/completion-batch/${workspaceId}/${environmentId}`,
  {
    type: 'CALLBACK',
    callbackOptions: {
      url: 'https://your-app.com/batch-callback',
      headers: {
        'X-API-Key': 'your-secret-key',
      },
      events: ['item_update', 'batch_update'],
    },
    items: [
      {
        resourceId: 'your-resource-id',
        request: {
          messages: [{ role: 'user', content: 'Process document...' }],
        },
      },
    ],
  },
  {
    headers: {
      Authorization: 'Bearer your-vmx-api-key',
      'Content-Type': 'application/json',
    },
  }
);
console.log(`Batch created: ${batch.batchId}`);
Batch-Level Capacity
Override capacity limits for a specific batch:
{
  "type": "ASYNC",
  "capacity": [
    {
      "period": "MINUTE",
      "requests": 50,
      "tokens": 50000,
      "enabled": true
    }
  ],
  "items": [...]
}
This allows you to:
- Process batches at a slower rate to avoid overwhelming downstream systems
- Allocate specific capacity to a batch for cost control
- Test different rate limits without modifying AI Resource configuration
Endpoint Reference
All endpoints are scoped under /v1/completion-batch and require either an authenticated user (workspace member) or a workspace API key with completion-batch permissions. The resourceId on each item is enforced against the API key's resources allowlist when an API key is used.
| Method | Path | Purpose |
|---|---|---|
| POST | /v1/completion-batch/{workspaceId}/{environmentId} | Create a batch. Body is CreateCompletionBatchDto (type: SYNC or ASYNC) or CreateCompletionCallbackBatchDto (type: CALLBACK + callbackOptions). For SYNC, the response is an SSE stream; for ASYNC / CALLBACK, the response is the created CompletionBatchDto. |
| GET | /v1/completion-batch/{workspaceId}/{environmentId}/{batchId} | Poll for batch status. Returns CompletionBatchDto with completedPercentage. Query params: includesUsers (default true), includesItems (default true). |
| GET | /v1/completion-batch/{workspaceId}/{environmentId}/{batchId}/{itemId} | Fetch a single item (CompletionBatchItemEntity), including its response once COMPLETED. |
| POST | /v1/completion-batch/{workspaceId}/{environmentId}/{batchId}/cancel | Cancel the batch and mark any still-PENDING items as CANCELLED. |
SSE Event Types
When using the SYNC type, the response is text/event-stream. Each event is a JSON object delivered on a data: line; its action discriminator is one of:
- batch-created: Batch was created and queued. payload is the full CompletionBatchEntity
- item-running: Item started processing. payload is the CompletionBatchItemEntity
- item-completed: Item completed successfully. payload is the CompletionBatchItemEntity (with response populated)
- item-failed: Item failed with an error. payload is the CompletionBatchItemEntity (with errorMessage populated)
- batch-completed: All items completed. payload is the CompletionBatchEntity
- batch-failed: Batch failed (some items failed). payload is the CompletionBatchEntity
Two terminator markers close the stream:
- data: [DONE] — emitted after the last event when the batch finishes normally
- data: [ERROR] — emitted (after a data: { "error": ... } line) if the SSE pipeline itself errors
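A small parser makes the terminator handling concrete. This is a sketch of client-side handling, not part of the API itself; the synthetic "done"/"error" actions are our own convention for signaling the terminators to calling code:

```python
import json

def parse_sse_line(line: str):
    """Parse one line of a SYNC batch SSE stream. Returns the decoded event
    dict, a synthetic marker for the [DONE]/[ERROR] terminators, or None for
    non-data lines (comments, blank keep-alives, etc.)."""
    if not line.startswith("data: "):
        return None
    payload = line[6:].strip()
    if payload == "[DONE]":
        return {"action": "done"}
    if payload == "[ERROR]":
        return {"action": "error"}
    return json.loads(payload)

print(parse_sse_line("data: [DONE]"))                        # {'action': 'done'}
print(parse_sse_line('data: {"action": "item-completed"}'))  # {'action': 'item-completed'}
```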
Lifecycle States
Both CompletionBatchEntity.status (the parent batch) and CompletionBatchItemEntity.status (each item) draw from the same enum, PublicCompletionBatchRequestStatus:
- PENDING: Created and queued, waiting on capacity / a worker
- RUNNING: At least one item is in flight (or, for an item, the worker has picked it up)
- COMPLETED: All items completed successfully (batch) / item completed and response is populated
- FAILED: At least one item failed and could not be retried (batch) / item exhausted retries with a non-retryable error (item)
- CANCELLED: The batch was cancelled via POST /v1/completion-batch/{workspaceId}/{environmentId}/{batchId}/cancel — pending items are also marked CANCELLED in the same transaction
The batch tracks per-state counters (pending, running, completed, failed) along with totalItems and token totals (totalEstimatedPromptTokens, totalPromptTokens, totalOutputTokens). GET responses on the batch also surface a derived completedPercentage.
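For illustration, the derived progress field can be computed from those counters as follows. Treat the exact formula (and whether failed items count toward progress) as an assumption; the server's definition is authoritative:

```python
def completed_percentage(batch):
    """Compute a progress percentage from a batch's per-state counters.
    Items in a terminal state (completed or failed) count toward progress."""
    total = batch["totalItems"]
    if total == 0:
        return 0.0
    return round(100 * (batch["completed"] + batch["failed"]) / total, 2)

print(completed_percentage({"totalItems": 8, "completed": 5, "failed": 1}))  # 75.0
```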
Callback Events
When using CALLBACK type, your endpoint receives HTTP POST requests. The events array on callbackOptions accepts the following values (lowercase wire format, defined by CompletionBatchCallbackEvent in packages/api/src/gateway/batch/dto/callback-options.dto.ts):
- item_update: Send a callback when any item transitions state (running / completed / failed / cancelled)
- batch_update: Send a callback when the batch itself transitions state (completed / failed)
- all: Subscribe to every event
The HTTP POST body uses the event field with the uppercase names — ITEM_UPDATE / BATCH_UPDATE — and a payload field containing the full CompletionBatchItemEntity or CompletionBatchEntity.
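A receiver's dispatch logic can be sketched as a plain function, which you would wire into whatever HTTP framework serves your callback URL. The entity field names read here (id, status) are assumptions for illustration:

```python
def handle_callback(body: dict) -> str:
    """Dispatch on the uppercase event discriminator in a callback POST body.
    payload is the full CompletionBatchItemEntity or CompletionBatchEntity."""
    event, payload = body["event"], body["payload"]
    if event == "ITEM_UPDATE":
        return f"item {payload.get('id')} is now {payload.get('status')}"
    if event == "BATCH_UPDATE":
        return f"batch {payload.get('id')} is now {payload.get('status')}"
    return "ignored"

print(handle_callback({
    "event": "ITEM_UPDATE",
    "payload": {"id": "item-1", "status": "COMPLETED"},
}))  # item item-1 is now COMPLETED
```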
Best Practices
1. Estimate Batch Size
- Calculate estimated tokens per item
- Ensure total tokens don't exceed your capacity
- Consider processing time based on your TPM/RPM limits
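The processing-time estimate can be sketched with simple arithmetic: the batch is gated each minute by whichever budget (requests or tokens) runs out first. This gives a rough lower bound, ignoring retries and queueing overhead:

```python
import math

def estimate_batch_minutes(item_count, avg_tokens_per_item, rpm, tpm):
    """Lower-bound wall-clock estimate for a batch under RPM/TPM limits."""
    by_requests = item_count / rpm
    by_tokens = item_count * avg_tokens_per_item / tpm
    return math.ceil(max(by_requests, by_tokens))

# 1,000 items averaging 2,000 tokens against 500 RPM / 200,000 TPM:
print(estimate_batch_minutes(1_000, 2_000, rpm=500, tpm=200_000))  # 10
```

Here the token budget dominates: 2,000,000 total tokens at 200,000 TPM takes about 10 minutes, even though 500 RPM could clear the requests in 2.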
2. Choose the Right Type
- SYNC: Use for interactive applications or when you need immediate feedback
- ASYNC: Use for background processing with polling
- CALLBACK: Use for webhook-based integrations
3. Handle Errors
- Implement retry logic for transient failures
- Monitor batch status regularly
- Set up alerts for failed batches
4. Monitor Capacity
- Monitor your capacity usage
- Adjust batch-level capacity if needed
- Consider processing during off-peak hours for large batches
5. Optimize Item Size
- Keep items reasonably sized
- Avoid extremely large prompts that consume too many tokens
- Consider splitting very large documents into smaller chunks
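A minimal chunker for that last point might look like this. It splits on paragraph boundaries by character count; a production version would count tokens with the model's actual tokenizer instead:

```python
def chunk_text(text: str, max_chars: int = 4000):
    """Split a document into paragraph-aligned chunks no longer than
    max_chars, suitable for submitting as separate batch items."""
    chunks, current = [], ""
    for para in text.split("\n\n"):
        if current and len(current) + len(para) + 2 > max_chars:
            chunks.append(current)
            current = para
        else:
            current = f"{current}\n\n{para}" if current else para
    if current:
        chunks.append(current)
    return chunks

doc = "\n\n".join(f"Paragraph {i} " + "x" * 100 for i in range(10))
print(len(chunk_text(doc, max_chars=300)))  # 5
```

Each chunk then becomes one item in the batch request, so capacity-aware scheduling spreads the document across the available TPM/RPM budget.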
Troubleshooting
Batch Stuck in PENDING
- Check Capacity: Verify you have available capacity (RPM/TPM)
- Check Workers: Ensure batch consumer workers are running
- Check Redis: Verify Redis connectivity and queue status
- Review Logs: Check application logs for errors
Items Failing
- Check Error Messages: Review item error messages in batch status
- Verify Resource: Ensure AI Resource is configured correctly
- Check API Keys: Verify API keys have access to the resource
- Review Provider Errors: Check underlying provider error messages
Slow Processing
- Check Capacity Limits: Low TPM/RPM limits will slow processing
- Monitor Queue Depth: Large queues may take time to process
- Check Worker Count: More workers can process items faster
- Review Item Complexity: Complex prompts take longer to process
Next Steps
- AI Resources — what each batch item routes to via resourceId
- AI Connections — where the connection-level TPM/RPM caps that batch consumers respect are defined
- Capacity Management — resource-level capacity overrides that the batch scheduler honors
- Prioritization — pool definitions interact with batch consumers when the same resource is shared with online traffic
- Chat Completions API — the request shape used inside each item's request field
- Anthropic Messages API, Responses API — alternate request shapes that each batch item's request may use
- Usage Monitoring — batch token totals are reported through the same usage pipeline