Skip to content

Batch Processing

Learn how to efficiently process large volumes of interviews, studies, and synthetic users at scale.

Overview

When working with production workloads, you'll often need to:

  • Process multiple interviews in parallel
  • Iterate through paginated lists of results
  • Filter and batch process specific subsets of data
  • Handle rate limiting and retries gracefully

This guide covers best practices for batch processing operations.

Iterating Through Paginated Results

Most list endpoints return paginated results. Here's how to iterate through all items:

List All Interviews

python
from syntheticusers import ApiClient, Configuration, InterviewsApi

# Point the client at the v1 API and authenticate with a bearer token.
configuration = Configuration(
    host="https://api.syntheticusers.com/api/v1",
    access_token="your-access-token"
)

with ApiClient(configuration) as api_client:
    interviews_api = InterviewsApi(api_client)

    project_id = "your-project-id"
    page_size = 50  # Adjust based on your needs
    all_interviews = []
    page = 1

    # Keep requesting pages until the server returns a short page,
    # which signals there are no further results.
    while True:
        batch = interviews_api.list_interviews_v1(
            project_id=project_id,
            page=page,
            page_size=page_size
        )

        all_interviews.extend(batch.items)

        # A page smaller than page_size is the last one.
        if len(batch.items) < page_size:
            break

        page += 1

    print(f"Total interviews: {len(all_interviews)}")

Helper Function for Pagination

Create a reusable helper to paginate any endpoint:

python
def paginate_all(api_method, **kwargs):
    """
    Collect every item from a paginated list endpoint.

    Args:
        api_method: Bound API list method to call
            (e.g. interviews_api.list_interviews_v1).
        **kwargs: Extra arguments forwarded to the API method. An optional
            ``page_size`` key (default 50) controls the page length and is
            not forwarded verbatim — it is popped and re-sent explicitly.

    Returns:
        A list with the items from every page, in server order.
    """
    page_size = kwargs.pop('page_size', 50)
    collected = []
    page = 1

    while True:
        batch = api_method(page=page, page_size=page_size, **kwargs)
        collected.extend(batch.items)

        # A short page means the server has run out of results.
        if len(batch.items) < page_size:
            return collected

        page += 1

# Usage: fetch every completed interview in the project, across all pages.
all_interviews = paginate_all(
    interviews_api.list_interviews_v1,
    project_id="your-project-id",
    status="completed"
)

Filtering and Batch Processing

Use filters to process specific subsets of data efficiently.

Process Interviews by Status

python
from syntheticusers import ApiClient, Configuration, InterviewsApi, SummariesApi

with ApiClient(configuration) as api_client:
    interviews_api = InterviewsApi(api_client)
    summaries_api = SummariesApi(api_client)

    # Get all completed interviews that need analysis
    # (paginate_all is the pagination helper defined earlier in this guide).
    completed_interviews = paginate_all(
        interviews_api.list_interviews_v1,
        project_id="your-project-id",
        status="completed"
    )

    print(f"Processing {len(completed_interviews)} completed interviews...")

    # Process each interview; a failure on one interview is reported
    # and does not stop the rest of the batch.
    for interview in completed_interviews:
        try:
            # Example: Generate summary for each interview
            summary = summaries_api.create_summary_v1(
                project_id="your-project-id",
                summary_create={
                    "study_id": interview.study_id,
                    "interview_ids": [interview.id]
                }
            )
            print(f"✓ Generated summary for interview {interview.id}")
        except Exception as e:
            # NOTE(review): broad `except Exception` keeps the batch running;
            # narrow it (e.g. to ApiException) if failures should surface.
            print(f"✗ Failed to process interview {interview.id}: {e}")

Batch by Audience

python
# Get all synthetic users in an audience
from syntheticusers import SyntheticUsersApi

synthetic_users_api = SyntheticUsersApi(api_client)

users = paginate_all(
    synthetic_users_api.list_synthetic_users_v1,
    project_id="your-project-id",
    audience_id="your-audience-id"
)

print(f"Found {len(users)} synthetic users in audience")

# Bucket the users by persona type so each group can be processed together.
from collections import defaultdict

by_persona = defaultdict(list)
for user in users:
    # Fall back to "default" when a persona has no explicit type.
    by_persona[user.persona.get("type", "default")].append(user)

for persona_type, users_list in by_persona.items():
    print(f"{persona_type}: {len(users_list)} users")

Parallel Processing

For I/O-bound operations such as concurrent API calls, use a thread pool — threads release the GIL while waiting on the network. (For CPU-bound work, prefer a process pool instead, since threads will not run Python code in parallel.)

Using ThreadPoolExecutor

python
from concurrent.futures import ThreadPoolExecutor, as_completed
from syntheticusers import ApiClient, Configuration, InterviewsApi

def process_interview(interview_id, api_client):
    """Fetch one interview and report the outcome as a result dict.

    Never raises: any exception is captured and returned as a failure
    record so a parallel batch can keep running.
    """
    try:
        api = InterviewsApi(api_client)
        record = api.get_interview_v1(interview_id)
        # Placeholder: real per-interview work on `record` goes here.
        return {"success": True, "interview_id": interview_id}
    except Exception as exc:
        return {"success": False, "interview_id": interview_id, "error": str(exc)}

with ApiClient(configuration) as api_client:
    interviews_api = InterviewsApi(api_client)

    # Get all interviews to process
    interviews = paginate_all(
        interviews_api.list_interviews_v1,
        project_id="your-project-id"
    )

    interview_ids = [i.id for i in interviews]

    # Process in parallel (max 5 concurrent). A modest worker count helps
    # stay under typical API rate limits; raise it with care.
    with ThreadPoolExecutor(max_workers=5) as executor:
        # Map each future back to its interview id so results are traceable.
        futures = {
            executor.submit(process_interview, iid, api_client): iid 
            for iid in interview_ids
        }

        # as_completed yields futures in completion order, not submit order.
        for future in as_completed(futures):
            result = future.result()
            if result["success"]:
                print(f"✓ Processed {result['interview_id']}")
            else:
                print(f"✗ Failed {result['interview_id']}: {result['error']}")

Rate Limiting Best Practices

Handle rate limits gracefully with exponential backoff:

python
import time
from syntheticusers.exceptions import ApiException

def api_call_with_retry(api_method, max_retries=3, **kwargs):
    """
    Call an API method, retrying rate-limited (HTTP 429) calls with
    exponential backoff.

    Args:
        api_method: The API method to call.
        max_retries: Maximum number of attempts (must be >= 1).
        **kwargs: Arguments to pass to the API method.

    Returns:
        The API response from the first successful attempt.

    Raises:
        ValueError: If max_retries is less than 1.
        ApiException: Re-raised immediately for non-429 errors, or once
            the final attempt is still rate limited.
    """
    # Guard clause: with max_retries <= 0 the loop body would never run
    # and the original code silently returned None — fail loudly instead.
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return api_method(**kwargs)
        except ApiException as e:
            # Retry only on rate limiting (429), and only while attempts remain.
            if e.status == 429 and attempt < max_retries - 1:
                # Exponential backoff: 1s, 2s, 4s, etc.
                wait_time = 2 ** attempt
                print(f"Rate limited. Waiting {wait_time}s before retry...")
                time.sleep(wait_time)
            else:
                raise

# Usage: a single GET wrapped with backoff so transient 429s are retried.
with ApiClient(configuration) as api_client:
    interviews_api = InterviewsApi(api_client)

    interview = api_call_with_retry(
        interviews_api.get_interview_v1,
        interview_id="some-interview-id"
    )

Bulk Operations

Use bulk endpoints when available for better performance:

Extend an Audience

python
from syntheticusers import AudiencesApi

with ApiClient(configuration) as api_client:
    audiences_api = AudiencesApi(api_client)

    # Add 50 more users to an existing audience in one bulk call,
    # instead of creating users one at a time.
    result = audiences_api.extend_audience_v1(
        audience_id="your-audience-id",
        audience_extend={
            "size": 50
        }
    )

    print(f"Added {len(result.new_users)} users to audience")

Regenerate All Interviews

python
from syntheticusers import StudiesApi

with ApiClient(configuration) as api_client:
    studies_api = StudiesApi(api_client)

    # Regenerate all interviews in a study with a single bulk request,
    # rather than looping over per-interview regenerate calls.
    result = studies_api.regenerate_all_interviews_v1(
        study_id="your-study-id"
    )

    print(f"Regenerating {result.total_interviews} interviews")

Progress Tracking

Track progress for long-running batch operations:

python
from tqdm import tqdm  # pip install tqdm

with ApiClient(configuration) as api_client:
    interviews_api = InterviewsApi(api_client)

    interviews = paginate_all(
        interviews_api.list_interviews_v1,
        project_id="your-project-id"
    )

    # Process with progress bar
    results = []
    for interview in tqdm(interviews, desc="Processing interviews"):
        try:
            # Your processing logic here
            # NOTE(review): the process_interview helper defined earlier in
            # this guide takes (interview_id, api_client) — adapt the call
            # (or supply your own one-argument function) before running.
            result = process_interview(interview)
            results.append(result)
        except Exception as e:
            print(f"\nError processing {interview.id}: {e}")

    print(f"\nCompleted: {len(results)}/{len(interviews)}")

Error Handling Strategies

Implement robust error handling for batch operations:

python
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def batch_process_with_error_handling(items, process_func):
    """
    Process a batch of items with comprehensive error handling.

    Args:
        items: List of items to process; each item must expose an ``id``
            attribute, which is used for logging.
        process_func: Function called with a single item. Its return value
            is recorded; any exception it raises marks that item as failed
            without stopping the batch.

    Returns:
        Dictionary with "successful" and "failed" record lists, plus
        "start_time"/"end_time" (datetime) and "duration" in seconds.
    """
    # Resolve the logger locally so this helper is self-contained rather
    # than depending on a module-level `logger` global.
    log = logging.getLogger(__name__)

    results = {
        "successful": [],
        "failed": [],
        "start_time": datetime.now()
    }

    for item in items:
        try:
            result = process_func(item)
        except Exception as e:
            results["failed"].append({
                "item": item,
                "error": str(e)
            })
            # logger.exception records the full traceback, which a plain
            # error message loses; lazy %-args avoid eager f-string work.
            log.exception("✗ Failed %s: %s", item.id, e)
        else:
            results["successful"].append({
                "item": item,
                "result": result
            })
            log.info("✓ Processed %s", item.id)

    results["end_time"] = datetime.now()
    results["duration"] = (results["end_time"] - results["start_time"]).total_seconds()

    log.info(
        """
    Batch Processing Summary:
    - Total: %d
    - Successful: %d
    - Failed: %d
    - Duration: %.2fs
    """,
        len(items),
        len(results["successful"]),
        len(results["failed"]),
        results["duration"],
    )

    return results

# Usage
def process_single_interview(interview):
    """Placeholder processing hook; replace with real per-interview logic."""
    return {"processed": True}

with ApiClient(configuration) as api_client:
    interviews_api = InterviewsApi(api_client)

    interviews = paginate_all(
        interviews_api.list_interviews_v1,
        project_id="your-project-id"
    )

    results = batch_process_with_error_handling(
        interviews,
        process_single_interview
    )

    # Save failed items for retry: one "<id>: <error>" line per failure,
    # so the file can be parsed later to re-drive just the failed subset.
    if results["failed"]:
        with open("failed_interviews.txt", "w") as f:
            for item in results["failed"]:
                f.write(f"{item['item'].id}: {item['error']}\n")

Next Steps

Released under the MIT License.