Assistant Calling Agent Module

Overview

The Assistant Calling Agent is a backend agent that calls other assistants and manages cross-assistant communication within the Aitana platform. It supports discovery of available assistants, sequential or parallel calling strategies, context sharing, and real-time streaming of responses.

Location: backend/agents/assistant_calling_agent.py

Key Features

  1. Discovery - Find available assistants based on user access permissions
  2. Execution Strategies - Sequential or parallel calling of multiple assistants
  3. Context Sharing - Share chat history, selected items, and file uploads between assistants
  4. Access Control - Tag-based recursion limits and permission validation
  5. Real-time Streaming - Live response streaming as assistant calls complete
  6. Error Handling - Robust error management with retry logic and graceful degradation

Functions

assistant_calling_agent(question, assistant_ids, call_strategy, …)

Main orchestration function that coordinates calls to multiple assistants.

Parameters:

  • question (str): The question/request to send to target assistants
  • assistant_ids (List[str], optional): List of assistant IDs to call
  • call_strategy (str, default='parallel'): 'parallel' or 'sequential' calling strategy
  • include_context (bool, default=True): Whether to share chat history and context
  • max_iterations (int, default=1): Maximum recursion depth for assistant calls
  • context_data (Dict[str, Any], optional): Additional context data to pass to assistants
  • chat_history (List[Dict[str, Any]], optional): Current conversation history to share
  • selected_items (List[Dict[str, Any]], optional): Selected items from document browser
  • currentUser (Dict[str, Any], optional): Current user context for authentication
  • toolConfigs (Dict[str, Any], optional): Tool configurations including access permissions
  • trace (optional): Langfuse trace for observability
  • parent_observation_id (optional): Parent observation for nested tracing
  • callback (optional): Streaming callback for user updates

Returns:

  • str: Combined response from target assistants or error message

Example:

result = await assistant_calling_agent(
    question="What's the weather like today?",
    assistant_ids=["weather-bot", "news-bot"],
    call_strategy="parallel",
    include_context=True,
    currentUser=user_context,
    callback=streaming_callback
)

Workflow:

  1. Validates input parameters and enforces recursion limits
  2. Validates access to target assistants
  3. Prepares call context with shared data
  4. Executes assistant calls based on strategy
  5. Formats and returns combined response
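
The first workflow step can be sketched roughly as follows. This is a minimal illustration, not the module's actual code: the helper name validate_call_request, its return shape, and the choice to clamp (rather than reject) an over-large max_iterations are all assumptions.

```python
from typing import Tuple

VALID_STRATEGIES = ("parallel", "sequential")


def validate_call_request(
    question: str,
    call_strategy: str,
    max_iterations: int,
    recursion_limit: int,
) -> Tuple[bool, str, int]:
    """Hypothetical helper: return (ok, message, effective_max_iterations)."""
    if not question or not question.strip():
        return False, "A non-empty question is required.", 0
    if call_strategy not in VALID_STRATEGIES:
        return False, f"Unknown call_strategy: {call_strategy!r}", 0
    if max_iterations < 1:
        # Depth budget is used up, so no further assistant calls are made.
        return False, "Recursion limit reached.", 0
    # Assumption: requests above the user's limit are clamped, not rejected.
    return True, "ok", min(max_iterations, recursion_limit)
```

Returning a message instead of raising mirrors the documented return type, where the agent returns either a combined response or an error message as a string.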

_get_recursion_limit(currentUser, toolConfigs)

Determines recursion limit based on user tags and permissions.

Parameters:

  • currentUser (Dict[str, Any]): Current user context containing tags
  • toolConfigs (Dict[str, Any]): Tool configurations (currently unused)

Returns:

  • int: Maximum recursion depth allowed for this user

Tag-based Limits:

  • admin-tools tag: 50 (capped for safety)
  • beta-testers tag: 10
  • Default users: 3

Example:

# Admin user
user = {"tags": ["admin-tools"]}
limit = _get_recursion_limit(user, {})  # Returns 50

# Beta tester
user = {"tags": ["beta-testers"]}
limit = _get_recursion_limit(user, {})  # Returns 10

# Regular user
user = {"tags": []}
limit = _get_recursion_limit(user, {})  # Returns 3
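
A minimal sketch of how the tag lookup could work, using the limits listed above. The function name recursion_limit_for and the precedence rule (admin-tools wins when a user carries both tags) are assumptions made for illustration.

```python
from typing import Any, Dict, Optional

# Limits taken from the table above.
ADMIN_LIMIT = 50
BETA_LIMIT = 10
DEFAULT_LIMIT = 3


def recursion_limit_for(current_user: Optional[Dict[str, Any]]) -> int:
    """Hypothetical stand-in for _get_recursion_limit()."""
    tags = set((current_user or {}).get("tags") or [])
    if "admin-tools" in tags:  # assumption: the highest tier takes precedence
        return ADMIN_LIMIT
    if "beta-testers" in tags:
        return BETA_LIMIT
    return DEFAULT_LIMIT
```

Treating a missing user or a missing tags key as a default user keeps the helper safe to call before authentication details are available.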

_validate_assistants_access(assistant_ids, currentUser, trace, …)

Validates user access to target assistants and returns accessible ones.

Parameters:

  • assistant_ids (List[str]): List of assistant IDs to validate
  • currentUser (Dict[str, Any]): Current user context for access control
  • trace (optional): Langfuse trace for observability
  • parent_observation_id (optional): Parent observation ID
  • callback (optional): Streaming callback for progress updates

Returns:

  • List[Dict[str, Any]]: List of accessible assistant configurations

Example:

accessible = await _validate_assistants_access(
    assistant_ids=["assistant-1", "assistant-2"],
    currentUser=user_context,
    trace=langfuse_trace
)
# Returns: [{'id': 'assistant-1', 'name': 'Assistant 1', 'accessible': True}, ...]

Note: The current implementation is a placeholder that assumes all provided assistants are accessible. A full implementation would:

  1. Query Firestore to check if assistants exist
  2. Apply access control logic from firebase.ts
  3. Filter based on user permissions and assistant visibility settings
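
The three steps above could look roughly like the sketch below. It uses an in-memory dictionary as a stand-in for Firestore, and the record fields visibility and allowed_tags are hypothetical; the real access rules live in firebase.ts and are not reproduced here.

```python
from typing import Any, Dict, List


def filter_accessible_assistants(
    assistant_ids: List[str],
    current_user: Dict[str, Any],
    registry: Dict[str, Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Hypothetical filter; `registry` stands in for a Firestore lookup."""
    user_tags = set(current_user.get("tags") or [])
    accessible = []
    for assistant_id in assistant_ids:
        record = registry.get(assistant_id)
        if record is None:
            continue  # step 1: skip assistants that do not exist
        # Steps 2-3 (assumed rule): public assistants are open to everyone,
        # others require at least one matching tag.
        is_public = record.get("visibility") == "public"
        allowed_tags = set(record.get("allowed_tags") or [])
        if is_public or (user_tags & allowed_tags):
            accessible.append(
                {
                    "id": assistant_id,
                    "name": record.get("name", assistant_id),
                    "accessible": True,
                }
            )
    return accessible
```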

_prepare_call_context(question, context_data, chat_history, …)

Prepares the context data structure for assistant calls.

Parameters:

  • question (str): User input question
  • context_data (Dict[str, Any]): Additional context data
  • chat_history (List[Dict[str, Any]]): Conversation history
  • selected_items (List[Dict[str, Any]]): Selected document items
  • currentUser (Dict[str, Any]): User context
  • max_iterations (int): Recursion depth limit
  • trace (optional): Langfuse trace for observability
  • parent_observation_id (optional): Parent observation ID

Returns:

  • Dict[str, Any]: Prepared call context for assistant execution

Context Structure:

{
    'user_input': question,
    'context_data': context_data,
    'chat_history': chat_history,
    'selected_items': selected_items,
    'currentUser': currentUser,
    'max_iterations': max_iterations - 1,  # Decremented for recursion
    'metadata': {
        'caller': 'assistant_calling_agent',
        'timestamp': current_time,
        'recursion_depth': max_iterations
    }
}

Example:

context = await _prepare_call_context(
    question="Summarize this document",
    context_data={"document_id": "doc123"},
    chat_history=conversation_history,
    selected_items=[{"type": "pdf", "url": "..."}],
    currentUser=user_info,
    max_iterations=3
)
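
For reference, a synchronous sketch that builds the structure shown above. The helper name build_call_context, the empty-container defaults, and the ISO 8601 UTC timestamp format are assumptions; the real function is async and also records tracing spans.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


def build_call_context(
    question: str,
    context_data: Optional[Dict[str, Any]] = None,
    chat_history: Optional[List[Dict[str, Any]]] = None,
    selected_items: Optional[List[Dict[str, Any]]] = None,
    current_user: Optional[Dict[str, Any]] = None,
    max_iterations: int = 1,
) -> Dict[str, Any]:
    """Hypothetical builder mirroring the documented context structure."""
    return {
        "user_input": question,
        "context_data": context_data or {},
        "chat_history": chat_history or [],
        "selected_items": selected_items or [],
        "currentUser": current_user or {},
        # The callee gets one level less, so nested calls eventually stop.
        "max_iterations": max_iterations - 1,
        "metadata": {
            "caller": "assistant_calling_agent",
            "timestamp": datetime.now(timezone.utc).isoformat(),  # assumed format
            "recursion_depth": max_iterations,
        },
    }
```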

_call_assistants_parallel(assistants, call_context, trace, …)

Executes calls to multiple assistants in parallel using AsyncTaskRunner.

Parameters:

  • assistants (List[Dict[str, Any]]): List of accessible assistant configurations
  • call_context (Dict[str, Any]): Prepared context for assistant calls
  • trace (optional): Langfuse trace for observability
  • parent_observation_id (optional): Parent observation ID
  • callback (optional): Streaming callback for progress updates

Returns:

  • Tuple[Dict[str, str], Dict[str, Dict[str, str]]]:
    • Responses dictionary (task_name -> response)
    • Task-to-assistant mapping for display names

Features:

  • Event Loop Safety: Handles closed/missing event loops gracefully
  • Retry Logic: Built-in exponential backoff retry mechanism
  • Real-time Updates: Streams progress via heartbeat messages
  • Error Isolation: Failed assistant calls don’t affect others
  • Unique Task Names: Each assistant gets a unique task identifier

Example:

responses, task_mapping = await _call_assistants_parallel(
    assistants=[
        {'id': 'weather-bot', 'name': 'Weather Assistant'},
        {'id': 'news-bot', 'name': 'News Assistant'}
    ],
    call_context=prepared_context,
    callback=progress_callback
)

# responses = {
#     'assistant_weather-bot': 'Today is sunny with 75°F...',
#     'assistant_news-bot': 'Breaking news: ...'
# }
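
The error-isolation behaviour can be illustrated with plain asyncio. This sketch deliberately uses asyncio.gather rather than AsyncTaskRunner, whose API is not shown in this document; the call_one parameter and the "Error: ..." response text are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple


async def call_parallel_sketch(
    assistants: List[Dict[str, Any]],
    call_one: Callable[[Dict[str, Any]], Awaitable[str]],
) -> Tuple[Dict[str, str], Dict[str, Dict[str, str]]]:
    """Run one call per assistant; a failure in one does not cancel the rest."""
    task_names = [f"assistant_{a['id']}" for a in assistants]
    # return_exceptions=True hands exceptions back as results instead of raising.
    results = await asyncio.gather(
        *(call_one(a) for a in assistants), return_exceptions=True
    )
    responses: Dict[str, str] = {}
    task_mapping: Dict[str, Dict[str, str]] = {}
    for name, assistant, result in zip(task_names, assistants, results):
        task_mapping[name] = {
            "id": assistant["id"],
            "name": assistant.get("name", assistant["id"]),
        }
        if isinstance(result, BaseException):
            responses[name] = f"Error: {result}"
        else:
            responses[name] = result
    return responses, task_mapping
```

The second return value lets the caller show a display name next to each response even though responses are keyed by task name.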

Progress Updates:

  • heartbeat: Assistant is working (shows elapsed time)
  • task_complete: Assistant finished successfully
  • task_error: Assistant encountered an error
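
One way to produce these three update types around a single call is sketched below. The event dictionary fields (type, task, elapsed_s, error) and the helper name run_with_heartbeat are assumptions; only the update names come from the list above.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Optional


async def run_with_heartbeat(
    name: str,
    work: Awaitable[str],
    callback: Callable[[Dict[str, Any]], None],
    interval_s: float = 1.0,
) -> Optional[str]:
    """Await `work`, emitting a heartbeat every `interval_s` until it finishes."""
    task = asyncio.ensure_future(work)
    started = time.monotonic()
    while True:
        done, _pending = await asyncio.wait({task}, timeout=interval_s)
        if done:
            break
        elapsed = round(time.monotonic() - started, 2)
        callback({"type": "heartbeat", "task": name, "elapsed_s": elapsed})
    try:
        result = task.result()
    except Exception as exc:
        callback({"type": "task_error", "task": name, "error": str(exc)})
        return None
    callback({"type": "task_complete", "task": name})
    return result
```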

Integration

Tool Interface

The module provides assistant_calling_agent_tool() for integration with the tool orchestrator system.

Input Format:

[
    {
        'question': 'user question',
        'assistant_ids': ['assistant-1', 'assistant-2'],
        'call_strategy': 'parallel',  # or 'sequential'
        'include_context': True,
        'max_iterations': 3
    }
]
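
A sketch of how a caller might normalise this input before invoking the agent. The helper name parse_tool_input is hypothetical, and the fallback values simply reuse the defaults documented for assistant_calling_agent(); the actual tool wrapper may behave differently.

```python
from typing import Any, Dict, List


def parse_tool_input(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Hypothetical normaliser: fill in documented defaults, reject bad entries."""
    parsed = []
    for item in items:
        question = item.get("question")
        if not isinstance(question, str) or not question.strip():
            raise ValueError("each request needs a non-empty 'question'")
        strategy = item.get("call_strategy", "parallel")
        if strategy not in ("parallel", "sequential"):
            raise ValueError(f"unknown call_strategy: {strategy!r}")
        parsed.append(
            {
                "question": question,
                "assistant_ids": list(item.get("assistant_ids") or []),
                "call_strategy": strategy,
                "include_context": bool(item.get("include_context", True)),
                "max_iterations": int(item.get("max_iterations", 1)),
            }
        )
    return parsed
```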

VAC Service Integration

The agent integrates with the VAC (Virtual Assistant Calling) service through:

  • Direct calls to vac_stream() for individual assistant execution
  • Shared context including emissaryConfig for assistant configuration
  • Streaming callback wrapper for progress updates

Error Handling

The agent handles errors at several layers:

  1. Input Validation: Checks for required parameters and valid formats
  2. Access Control: Validates user permissions before making calls
  3. Recursion Limits: Enforces user-specific depth limits
  4. Network Resilience: Retry logic with exponential backoff
  5. Graceful Degradation: Continues with successful assistants if others fail
  6. Event Loop Management: Handles async/await context safely

Performance Considerations

Parallel vs Sequential

Parallel Strategy (default):

  • Pros: Faster overall execution, better user experience
  • Cons: Higher resource usage, potential rate limiting
  • Best for: Independent queries, quick responses

Sequential Strategy:

  • Pros: Lower resource usage, context building between calls
  • Cons: Slower execution, user waits longer
  • Best for: Dependent queries, when context matters
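
To make "context building between calls" concrete, here is a minimal sequential sketch. The prior_responses context key and the call_one parameter are hypothetical; the document does not specify how earlier answers are passed to later assistants.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def call_sequential_sketch(
    assistants: List[Dict[str, Any]],
    call_context: Dict[str, Any],
    call_one: Callable[[Dict[str, Any], Dict[str, Any]], Awaitable[str]],
) -> Dict[str, str]:
    """Call assistants one at a time, passing earlier answers to later calls."""
    responses: Dict[str, str] = {}
    prior: List[Dict[str, str]] = []
    for assistant in assistants:
        # Fresh dict per call so one assistant cannot mutate another's context.
        context = {**call_context, "prior_responses": list(prior)}
        try:
            answer = await call_one(assistant, context)
        except Exception as exc:
            answer = f"Error: {exc}"  # keep going with the remaining assistants
        responses[f"assistant_{assistant['id']}"] = answer
        prior.append({"assistant_id": assistant["id"], "response": answer})
    return responses
```

Total latency here is the sum of the individual calls, which is the cost paid for letting each assistant see what the previous ones said.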

Resource Management

  • Event Loop Safety: Automatically handles closed/missing event loops
  • Memory Efficient: Streams responses rather than buffering everything
  • Timeout Protection: 120-second timeout per assistant call
  • Retry Budget: Single retry attempt with exponential backoff
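
The timeout and retry budget can be combined as in the sketch below, which uses asyncio.wait_for from the standard library. The 120-second timeout and single retry come from the list above; the helper name, the make_call parameter, and the 1-second base delay are assumptions.

```python
import asyncio
from typing import Awaitable, Callable


async def call_with_timeout_and_retry(
    make_call: Callable[[], Awaitable[str]],
    timeout_s: float = 120.0,
    retries: int = 1,
    base_delay_s: float = 1.0,  # assumed; the real delay is not documented
) -> str:
    """Run make_call() with a per-attempt timeout and a bounded retry budget."""
    attempt = 0
    while True:
        try:
            return await asyncio.wait_for(make_call(), timeout=timeout_s)
        except Exception:
            if attempt >= retries:
                raise  # budget exhausted: surface the last error to the caller
            # Exponential backoff: base, 2*base, 4*base, ...
            await asyncio.sleep(base_delay_s * (2 ** attempt))
            attempt += 1
```

make_call is a factory rather than an awaitable because a coroutine object can only be awaited once, so each attempt needs a fresh one.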

Observability

The agent provides comprehensive tracing through Langfuse:

  • Span Hierarchy: Main span with child spans for each operation
  • Metadata Tracking: User email, assistant IDs, strategy, timing
  • Error Logging: Detailed error traces with stack traces
  • Performance Metrics: Response times and success rates

Related Components

  • vac_service.py: Core assistant execution service
  • assistant_utils.py: Assistant configuration and management
  • my_utils.py: Utility functions for thinking display and error handling
  • Tool Orchestrator: Integration layer for tool-based assistant calling
  • Firebase: User authentication and assistant storage

Security

  • User Validation: All calls require valid user context
  • Permission Checks: Assistant access validated before execution
  • Recursion Limits: Prevents infinite loops and resource exhaustion
  • Input Sanitization: Safe handling of user input and context data
  • Error Isolation: Failed assistant calls don’t expose sensitive information