Assistant Calling Agent Module
Overview
The Assistant Calling Agent is an intelligent backend agent that handles calling other assistants and managing cross-assistant communication within the Aitana platform. This agent enables discovery of available assistants, sequential or parallel calling strategies, context sharing, and real-time streaming of responses.
Location: backend/agents/assistant_calling_agent.py
Key Features
- Discovery - Find available assistants based on user access permissions
- Execution Strategies - Sequential or parallel calling of multiple assistants
- Context Sharing - Share chat history, selected items, and file uploads between assistants
- Access Control - Tag-based recursion limits and permission validation
- Real-time Streaming - Live response streaming as assistant calls complete
- Error Handling - Robust error management with retry logic and graceful degradation
Functions
assistant_calling_agent(question, assistant_ids, call_strategy, …)
Main orchestration function that coordinates calls to multiple assistants.
Parameters:
- question (str): The question/request to send to target assistants
- assistant_ids (List[str], optional): List of assistant IDs to call
- call_strategy (str, default='parallel'): 'parallel' or 'sequential' calling strategy
- include_context (bool, default=True): Whether to share chat history and context
- max_iterations (int, default=1): Maximum recursion depth for assistant calls
- context_data (Dict[str, Any], optional): Additional context data to pass to assistants
- chat_history (List[Dict[str, Any]], optional): Current conversation history to share
- selected_items (List[Dict[str, Any]], optional): Selected items from document browser
- currentUser (Dict[str, Any], optional): Current user context for authentication
- toolConfigs (Dict[str, Any], optional): Tool configurations including access permissions
- trace (optional): Langfuse trace for observability
- parent_observation_id (optional): Parent observation for nested tracing
- callback (optional): Streaming callback for user updates
Returns:
str: Combined response from target assistants or error message
Example:
result = await assistant_calling_agent(
question="What's the weather like today?",
assistant_ids=["weather-bot", "news-bot"],
call_strategy="parallel",
include_context=True,
currentUser=user_context,
callback=streaming_callback
)
Workflow:
- Validates input parameters and enforces recursion limits
- Validates access to target assistants
- Prepares call context with shared data
- Executes assistant calls based on strategy
- Formats and returns combined response
_get_recursion_limit(currentUser, toolConfigs)
Determines recursion limit based on user tags and permissions.
Parameters:
- currentUser (Dict[str, Any]): Current user context containing tags
- toolConfigs (Dict[str, Any]): Tool configurations (currently unused)
Returns:
int: Maximum recursion depth allowed for this user
Tag-based Limits:
- admin-tools tag: 50 (capped for safety)
- beta-testers tag: 10
- Default users: 3
Example:
# Admin user
user = {"tags": ["admin-tools"]}
limit = _get_recursion_limit(user, {}) # Returns 50
# Beta tester
user = {"tags": ["beta-testers"]}
limit = _get_recursion_limit(user, {}) # Returns 10
# Regular user
user = {"tags": []}
limit = _get_recursion_limit(user, {}) # Returns 3
_validate_assistants_access(assistant_ids, currentUser, trace, …)
Validates user access to target assistants and returns accessible ones.
Parameters:
- assistant_ids (List[str]): List of assistant IDs to validate
- currentUser (Dict[str, Any]): Current user context for access control
- trace (optional): Langfuse trace for observability
- parent_observation_id (optional): Parent observation ID
- callback (optional): Streaming callback for progress updates
Returns:
List[Dict[str, Any]]: List of accessible assistant configurations
Example:
accessible = await _validate_assistants_access(
assistant_ids=["assistant-1", "assistant-2"],
currentUser=user_context,
trace=langfuse_trace
)
# Returns: [{'id': 'assistant-1', 'name': 'Assistant 1', 'accessible': True}, ...]
Note: The current implementation is a placeholder that assumes all provided assistants are accessible. A full implementation would:
- Query Firestore to check whether the assistants exist
- Apply access control logic from firebase.ts
- Filter based on user permissions and assistant visibility settings
_prepare_call_context(question, context_data, chat_history, …)
Prepares the context data structure for assistant calls.
Parameters:
- question (str): User input question
- context_data (Dict[str, Any]): Additional context data
- chat_history (List[Dict[str, Any]]): Conversation history
- selected_items (List[Dict[str, Any]]): Selected document items
- currentUser (Dict[str, Any]): User context
- max_iterations (int): Recursion depth limit
- trace (optional): Langfuse trace for observability
- parent_observation_id (optional): Parent observation ID
Returns:
Dict[str, Any]: Prepared call context for assistant execution
Context Structure:
{
'user_input': question,
'context_data': context_data,
'chat_history': chat_history,
'selected_items': selected_items,
'currentUser': currentUser,
'max_iterations': max_iterations - 1, # Decremented for recursion
'metadata': {
'caller': 'assistant_calling_agent',
'timestamp': current_time,
'recursion_depth': max_iterations
}
}
Example:
context = await _prepare_call_context(
question="Summarize this document",
context_data={"document_id": "doc123"},
chat_history=conversation_history,
selected_items=[{"type": "pdf", "url": "..."}],
currentUser=user_info,
max_iterations=3
)
_call_assistants_parallel(assistants, call_context, trace, …)
Executes calls to multiple assistants in parallel using AsyncTaskRunner.
Parameters:
- assistants (List[Dict[str, Any]]): List of accessible assistant configurations
- call_context (Dict[str, Any]): Prepared context for assistant calls
- trace (optional): Langfuse trace for observability
- parent_observation_id (optional): Parent observation ID
- callback (optional): Streaming callback for progress updates
Returns:
Tuple[Dict[str, str], Dict[str, Dict[str, str]]]:
- Responses dictionary (task_name -> response)
- Task-to-assistant mapping for display names
Features:
- Event Loop Safety: Handles closed/missing event loops gracefully
- Retry Logic: Built-in exponential backoff retry mechanism
- Real-time Updates: Streams progress via heartbeat messages
- Error Isolation: Failed assistant calls don’t affect others
- Unique Task Names: Each assistant gets a unique task identifier
Example:
responses, task_mapping = await _call_assistants_parallel(
assistants=[
{'id': 'weather-bot', 'name': 'Weather Assistant'},
{'id': 'news-bot', 'name': 'News Assistant'}
],
call_context=prepared_context,
callback=progress_callback
)
# responses = {
# 'assistant_weather-bot': 'Today is sunny with 75°F...',
# 'assistant_news-bot': 'Breaking news: ...'
# }
Progress Updates:
- heartbeat: Assistant is working (shows elapsed time)
- task_complete: Assistant finished successfully
- task_error: Assistant encountered an error
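A streaming callback consuming these update types might look like the sketch below. The update dictionary's field names (type, assistant, elapsed, error) are assumptions for illustration, not the module's actual payload schema:

```python
def progress_callback(update):
    """Sketch: format the three documented update types for display."""
    kind = update.get("type")
    if kind == "heartbeat":
        # Assistant is still working; show elapsed time.
        return f"{update['assistant']} working ({update['elapsed']:.0f}s elapsed)"
    if kind == "task_complete":
        return f"{update['assistant']} finished"
    if kind == "task_error":
        return f"{update['assistant']} failed: {update.get('error', 'unknown')}"
    return None  # ignore unrecognized update types
```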
Integration
Tool Interface
The module provides assistant_calling_agent_tool() for integration with the tool orchestrator system.
Input Format:
[
{
'question': 'user question',
'assistant_ids': ['assistant-1', 'assistant-2'],
'call_strategy': 'parallel', # or 'sequential'
'include_context': True,
'max_iterations': 3
}
]
VAC Service Integration
The agent integrates with the VAC (Virtual Assistant Calling) service through:
- Direct calls to vac_stream() for individual assistant execution
- Shared context including emissaryConfig for assistant configuration
- Streaming callback wrapper for progress updates
Error Handling
The agent implements comprehensive error handling:
- Input Validation: Checks for required parameters and valid formats
- Access Control: Validates user permissions before making calls
- Recursion Limits: Enforces user-specific depth limits
- Network Resilience: Retry logic with exponential backoff
- Graceful Degradation: Continues with successful assistants if others fail
- Event Loop Management: Handles async/await context safely
Performance Considerations
Parallel vs Sequential
Parallel Strategy (default):
- Pros: Faster overall execution, better user experience
- Cons: Higher resource usage, potential rate limiting
- Best for: Independent queries, quick responses
Sequential Strategy:
- Pros: Lower resource usage, context building between calls
- Cons: Slower execution, user waits longer
- Best for: Dependent queries, when context matters
Resource Management
- Event Loop Safety: Automatically handles closed/missing event loops
- Memory Efficient: Streams responses rather than buffering everything
- Timeout Protection: 120-second timeout per assistant call
- Retry Budget: Single retry attempt with exponential backoff
Observability
The agent provides comprehensive tracing through Langfuse:
- Span Hierarchy: Main span with child spans for each operation
- Metadata Tracking: User email, assistant IDs, strategy, timing
- Error Logging: Detailed error traces with stack traces
- Performance Metrics: Response times and success rates
Related Components
- vac_service.py: Core assistant execution service
- assistant_utils.py: Assistant configuration and management
- my_utils.py: Utility functions for thinking display and error handling
- Tool Orchestrator: Integration layer for tool-based assistant calling
- Firebase: User authentication and assistant storage
Security
- User Validation: All calls require valid user context
- Permission Checks: Assistant access validated before execution
- Recursion Limits: Prevents infinite loops and resource exhaustion
- Input Sanitization: Safe handling of user input and context data
- Error Isolation: Failed assistant calls don’t expose sensitive information