Batch Processing
Learn how to efficiently process large volumes of interviews, studies, and synthetic users at scale.
Overview
When working with production workloads, you'll often need to:
- Process multiple interviews in parallel
- Iterate through paginated lists of results
- Filter and batch process specific subsets of data
- Handle rate limiting and retries gracefully
This guide covers best practices for batch processing operations.
Iterating Through Paginated Results
Most list endpoints return paginated results. Here's how to iterate through all items:
List All Interviews
```python
from syntheticusers import ApiClient, Configuration, InterviewsApi

configuration = Configuration(
    host="https://api.syntheticusers.com/api/v1",
    access_token="your-access-token"
)

with ApiClient(configuration) as api_client:
    interviews_api = InterviewsApi(api_client)

    # Iterate through all interviews in a project
    project_id = "your-project-id"
    page = 1
    page_size = 50  # Adjust based on your needs

    all_interviews = []
    while True:
        # Fetch page of results
        response = interviews_api.list_interviews_v1(
            project_id=project_id,
            page=page,
            page_size=page_size
        )

        # Add items to our collection
        all_interviews.extend(response.items)

        # Check if we've reached the end
        if len(response.items) < page_size:
            break

        page += 1

    print(f"Total interviews: {len(all_interviews)}")
```
print(f"Total interviews: {len(all_interviews)}")Helper Function for Pagination
Create a reusable helper to paginate any endpoint:
```python
def paginate_all(api_method, **kwargs):
    """
    Generic pagination helper for any list endpoint.

    Args:
        api_method: The API method to call (e.g., interviews_api.list_interviews_v1)
        **kwargs: Additional arguments to pass to the API method

    Returns:
        List of all items across all pages
    """
    page = 1
    page_size = kwargs.pop('page_size', 50)
    all_items = []

    while True:
        response = api_method(page=page, page_size=page_size, **kwargs)
        all_items.extend(response.items)

        if len(response.items) < page_size:
            break

        page += 1

    return all_items

# Usage
all_interviews = paginate_all(
    interviews_api.list_interviews_v1,
    project_id="your-project-id",
    status="completed"
)
```
Filtering and Batch Processing
Use filters to process specific subsets of data efficiently.
Process Interviews by Status
```python
from syntheticusers import ApiClient, Configuration, InterviewsApi, SummariesApi

with ApiClient(configuration) as api_client:
    interviews_api = InterviewsApi(api_client)
    summaries_api = SummariesApi(api_client)

    # Get all completed interviews that need analysis
    completed_interviews = paginate_all(
        interviews_api.list_interviews_v1,
        project_id="your-project-id",
        status="completed"
    )

    print(f"Processing {len(completed_interviews)} completed interviews...")

    # Process each interview
    for interview in completed_interviews:
        try:
            # Example: Generate summary for each interview
            summary = summaries_api.create_summary_v1(
                project_id="your-project-id",
                summary_create={
                    "study_id": interview.study_id,
                    "interview_ids": [interview.id]
                }
            )
            print(f"✓ Generated summary for interview {interview.id}")
        except Exception as e:
            print(f"✗ Failed to process interview {interview.id}: {e}")
```
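The request body above passes `interview_ids` as a list, so the endpoint may accept several IDs in one call. If it does, grouping interviews by study and requesting one summary per study cuts the number of API calls; a hedged sketch:

```python
from collections import defaultdict

# Group completed interviews by study (one summary request per study)
by_study = defaultdict(list)
for interview in completed_interviews:
    by_study[interview.study_id].append(interview.id)

for study_id, interview_ids in by_study.items():
    # Assumption: create_summary_v1 accepts multiple interview IDs at once
    summary = summaries_api.create_summary_v1(
        project_id="your-project-id",
        summary_create={
            "study_id": study_id,
            "interview_ids": interview_ids
        }
    )
    print(f"✓ Summary for study {study_id} covers {len(interview_ids)} interviews")
```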
print(f"✗ Failed to process interview {interview.id}: {e}")Batch by Audience
```python
from collections import defaultdict

from syntheticusers import SyntheticUsersApi

# Get all synthetic users in an audience
synthetic_users_api = SyntheticUsersApi(api_client)

users = paginate_all(
    synthetic_users_api.list_synthetic_users_v1,
    project_id="your-project-id",
    audience_id="your-audience-id"
)

print(f"Found {len(users)} synthetic users in audience")

# Group by some attribute for batch processing
by_persona = defaultdict(list)
for user in users:
    persona_type = user.persona.get("type", "default")
    by_persona[persona_type].append(user)

for persona_type, users_list in by_persona.items():
    print(f"{persona_type}: {len(users_list)} users")
```
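Once grouped, you can work through each cohort in fixed-size batches. A small self-contained helper (the `chunked` function is hypothetical, and each user object is assumed to expose an `.id` attribute):

```python
def chunked(items, size):
    """Yield successive fixed-size slices of a list."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

# Process each persona cohort in batches of 10
for persona_type, users_list in by_persona.items():
    for batch in chunked(users_list, 10):
        # Assumption: each synthetic user object exposes an `.id` attribute
        ids = [user.id for user in batch]
        print(f"{persona_type}: processing batch of {len(ids)} users")
        # ...call your batch operation with `ids` here
```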
print(f"{persona_type}: {len(users_list)} users")Parallel Processing
API calls are I/O-bound, so Python threads work well for running many requests concurrently:
Using ThreadPoolExecutor
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

from syntheticusers import ApiClient, Configuration, InterviewsApi

def process_interview(interview_id, api_client):
    """Process a single interview"""
    try:
        interviews_api = InterviewsApi(api_client)
        # Perform some operation
        interview = interviews_api.get_interview_v1(interview_id)
        # Do something with the interview
        return {"success": True, "interview_id": interview_id}
    except Exception as e:
        return {"success": False, "interview_id": interview_id, "error": str(e)}

with ApiClient(configuration) as api_client:
    interviews_api = InterviewsApi(api_client)

    # Get all interviews to process
    interviews = paginate_all(
        interviews_api.list_interviews_v1,
        project_id="your-project-id"
    )
    interview_ids = [i.id for i in interviews]

    # Process in parallel (max 5 concurrent)
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {
            executor.submit(process_interview, iid, api_client): iid
            for iid in interview_ids
        }

        for future in as_completed(futures):
            result = future.result()
            if result["success"]:
                print(f"✓ Processed {result['interview_id']}")
            else:
                print(f"✗ Failed {result['interview_id']}: {result['error']}")
```
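The example above shares one `ApiClient` across worker threads. If your generated client turns out not to be thread-safe (worth verifying for your SDK version), a variant that opens a short-lived client per task is a safer sketch:

```python
def process_interview_isolated(interview_id):
    """Variant that opens its own client, so no connection state is shared."""
    try:
        with ApiClient(configuration) as client:
            api = InterviewsApi(client)
            api.get_interview_v1(interview_id)
        return {"success": True, "interview_id": interview_id}
    except Exception as e:
        return {"success": False, "interview_id": interview_id, "error": str(e)}

with ThreadPoolExecutor(max_workers=5) as executor:
    for result in executor.map(process_interview_isolated, interview_ids):
        status = "✓" if result["success"] else "✗"
        print(f"{status} {result['interview_id']}")
```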
print(f"✗ Failed {result['interview_id']}: {result['error']}")Rate Limiting Best Practices
Handle rate limits gracefully with exponential backoff:
```python
import time

from syntheticusers.exceptions import ApiException

def api_call_with_retry(api_method, max_retries=3, **kwargs):
    """
    Call an API method with exponential backoff retry logic.

    Args:
        api_method: The API method to call
        max_retries: Maximum number of retry attempts
        **kwargs: Arguments to pass to the API method

    Returns:
        The API response

    Raises:
        ApiException: If all retries are exhausted
    """
    for attempt in range(max_retries):
        try:
            return api_method(**kwargs)
        except ApiException as e:
            # Check if it's a rate limit error (429)
            if e.status == 429 and attempt < max_retries - 1:
                # Exponential backoff: 1s, 2s, 4s, etc.
                wait_time = 2 ** attempt
                print(f"Rate limited. Waiting {wait_time}s before retry...")
                time.sleep(wait_time)
            else:
                raise

# Usage
with ApiClient(configuration) as api_client:
    interviews_api = InterviewsApi(api_client)

    interview = api_call_with_retry(
        interviews_api.get_interview_v1,
        interview_id="some-interview-id"
    )
```
Bulk Operations
Use bulk endpoints when available for better performance:
Extend an Audience
```python
from syntheticusers import AudiencesApi

with ApiClient(configuration) as api_client:
    audiences_api = AudiencesApi(api_client)

    # Add 50 more users to an existing audience
    result = audiences_api.extend_audience_v1(
        audience_id="your-audience-id",
        audience_extend={
            "size": 50
        }
    )

    print(f"Added {len(result.new_users)} users to audience")
```
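For large extensions, several smaller requests may be gentler on timeouts and rate limits than one big one. A sketch, assuming each call adds `size` users on top of the current audience:

```python
target = 200  # total new users to add (example value)
step = 50     # users per request

added = 0
while added < target:
    result = audiences_api.extend_audience_v1(
        audience_id="your-audience-id",
        audience_extend={"size": min(step, target - added)}
    )
    if not result.new_users:
        break  # stop if the API returns nothing, to avoid looping forever
    added += len(result.new_users)
    print(f"Progress: {added}/{target} users added")
```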
print(f"Added {len(result.new_users)} users to audience")Regenerate All Interviews
```python
from syntheticusers import StudiesApi

with ApiClient(configuration) as api_client:
    studies_api = StudiesApi(api_client)

    # Regenerate all interviews in a study at once
    result = studies_api.regenerate_all_interviews_v1(
        study_id="your-study-id"
    )

    print(f"Regenerating {result.total_interviews} interviews")
```
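If regeneration runs asynchronously, you can poll for completion by counting completed interviews with the same `status` filter used earlier. A rough sketch (the polling interval, and polling at the project level rather than per study, are assumptions):

```python
import time

total = result.total_interviews

while True:
    # Note: counts completed interviews project-wide; narrow the filter
    # if your endpoint supports filtering by study
    completed = paginate_all(
        interviews_api.list_interviews_v1,
        project_id="your-project-id",
        status="completed"
    )
    print(f"{len(completed)}/{total} interviews completed")
    if len(completed) >= total:
        break
    time.sleep(30)  # assumed polling interval
```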
print(f"Regenerating {result.total_interviews} interviews")Progress Tracking
Track progress for long-running batch operations:
```python
from tqdm import tqdm  # pip install tqdm

with ApiClient(configuration) as api_client:
    interviews_api = InterviewsApi(api_client)

    interviews = paginate_all(
        interviews_api.list_interviews_v1,
        project_id="your-project-id"
    )

    # Process with progress bar
    results = []
    for interview in tqdm(interviews, desc="Processing interviews"):
        try:
            # Your processing logic here
            result = process_interview(interview.id, api_client)
            results.append(result)
        except Exception as e:
            print(f"\nError processing {interview.id}: {e}")

    print(f"\nCompleted: {len(results)}/{len(interviews)}")
```
print(f"\nCompleted: {len(results)}/{len(interviews)}")Error Handling Strategies
Implement robust error handling for batch operations:
```python
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def batch_process_with_error_handling(items, process_func):
    """
    Process a batch of items with comprehensive error handling.

    Args:
        items: List of items to process
        process_func: Function to process each item

    Returns:
        Dictionary with results, errors, and summary
    """
    results = {
        "successful": [],
        "failed": [],
        "start_time": datetime.now()
    }

    for item in items:
        try:
            result = process_func(item)
            results["successful"].append({
                "item": item,
                "result": result
            })
            logger.info(f"✓ Processed {item.id}")
        except Exception as e:
            results["failed"].append({
                "item": item,
                "error": str(e)
            })
            logger.error(f"✗ Failed {item.id}: {e}")

    results["end_time"] = datetime.now()
    results["duration"] = (results["end_time"] - results["start_time"]).total_seconds()

    logger.info(f"""
    Batch Processing Summary:
    - Total: {len(items)}
    - Successful: {len(results['successful'])}
    - Failed: {len(results['failed'])}
    - Duration: {results['duration']:.2f}s
    """)

    return results

# Usage
def process_single_interview(interview):
    # Your processing logic
    return {"processed": True}

with ApiClient(configuration) as api_client:
    interviews_api = InterviewsApi(api_client)

    interviews = paginate_all(
        interviews_api.list_interviews_v1,
        project_id="your-project-id"
    )

    results = batch_process_with_error_handling(
        interviews,
        process_single_interview
    )

    # Save failed items for retry
    if results["failed"]:
        with open("failed_interviews.txt", "w") as f:
            for item in results["failed"]:
                f.write(f"{item['item'].id}: {item['error']}\n")
```
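The saved file makes it easy to re-run only the failures. A sketch that parses the format written above and reuses the retry helper from earlier:

```python
# Re-run only the interviews that failed in the previous batch
with open("failed_interviews.txt") as f:
    failed_ids = [line.split(":", 1)[0] for line in f if line.strip()]

print(f"Retrying {len(failed_ids)} failed interviews...")

for interview_id in failed_ids:
    interview = api_call_with_retry(
        interviews_api.get_interview_v1,
        interview_id=interview_id
    )
    process_single_interview(interview)
```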
Next Steps
- Export Pipeline - Learn how to export and download results
- API Reference - Browse all available API methods
- Quick Start - Review the basic workflow