Tool Orchestrator System
Overview
The Tool Orchestrator System is the central component responsible for coordinating and executing AI tools within the backend. It serves as the bridge between the AI model’s tool selections and the actual execution of those tools, managing everything from configuration merging to parallel execution and context enrichment.
Located in backend/tools/tool_orchestrator.py, this system orchestrates complex workflows involving multiple tools, handles streaming responses, and provides comprehensive error management.
Core Functionality
Primary Functions
create_context()
The main orchestration function that takes AI tool selections and executes them to create enriched context for final responses.
async def create_context(
    new_question: str,
    contents: List[Any],
    trace,
    first_responder_response: str,
    callback: BufferStreamingStdOutCallbackHandlerAsync,
    first_response_tools: List[Dict[str, Any]],
    toolConfigs: Dict[str, Any],
    currentUser: Dict[str, Any],
    parent_observation_id=None,
    instructions: str = "",
    documents: Iterable = [],
    selectedItems: Iterable = []
) -> Dict[str, Any]
Parameters:
- new_question: User’s current question or input
- contents: Existing content/context from previous processing
- trace: Langfuse trace object for observability tracking
- first_responder_response: AI’s initial response before tool execution
- callback: Streaming callback for real-time updates
- first_response_tools: Tools selected by the AI for execution
- toolConfigs: Configuration settings for each tool type
- currentUser: Current user context for permissions
- parent_observation_id: Optional parent observation ID for nested Langfuse spans
- instructions: Additional instructions for tool execution
- documents: Uploaded documents to include in context
- selectedItems: User-selected items for context
Returns: Dictionary containing:
- answers: Results from individual tool executions
- gs_uris: Google Cloud Storage URIs found in content
- total_context_tokens: Token count for context
- response_so_far: Accumulated response content
- references: Reference materials from tool executions
- instructions: Processed instructions
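For illustration, a hypothetical return value (all values are placeholders):

context = {
    'answers': {'vertex_search': '...tool output...'},
    'gs_uris': ['gs://example-bucket/report.pdf'],
    'total_context_tokens': 1532,
    'response_so_far': '...accumulated response text...',
    'references': {'vertex_search': '...source references...'},
    'instructions': ''
}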
merge_tool_configs()
Combines AI-selected tools with user-configured tool settings to create unified configurations.
def merge_tool_configs(tools: List, tool_configs: Dict[str, Any]) -> Dict[str, List]
Example:
tools = [
    {'name': 'google_search', 'config': [{'parameter': 'query', 'value': 'weather today'}]},
    {'name': 'vertex_search', 'config': [{'parameter': 'datastore_id', 'value': 'main'}]}
]
tool_configs = {
    'google_search': {'max_results': 5, 'safe_search': True},
    'vertex_search': {'datastore_id': 'backup', 'max_chunks': 10}
}

result = merge_tool_configs(tools, tool_configs)
# Returns:
# {
#     'google_search': [{'query': 'weather today', 'max_results': 5, 'safe_search': True}],
#     'vertex_search': [{'datastore_id': 'main', 'max_chunks': 10}]
# }
Special Parameter Handling:
- assistantIds: Automatically converts strings to lists and handles comma-separated values (see the sketch after this list)
- Duplicate Parameters: Creates separate tool instances when the same parameter appears multiple times
- Parameter Precedence: Tool-specific configs override user default configs
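A minimal sketch of the assistantIds normalization described above; the helper name is hypothetical, and the real logic lives inside merge_tool_configs():

def normalize_assistant_ids(value):
    # Hypothetical helper: turn a comma-separated string into a list,
    # mirroring the assistantIds handling described above
    if isinstance(value, str):
        return [v.strip() for v in value.split(',') if v.strip()]
    return list(value)

normalize_assistant_ids('research-assistant,analysis-assistant')
# ['research-assistant', 'analysis-assistant']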
Supported Tools
The orchestrator supports the following tool types:
Search & Retrieval Tools
google_search_retrieval
- Function: google_search_retrieval()
- Purpose: Performs web searches using Google Search API
- Configuration: Query parameters, result limits, safe search settings
vertex_search
- Function: vertex_search()
- Purpose: Searches Vertex AI Search datastores for semantic content retrieval
- Configuration: Datastore ID, query filters, chunk limits
url_processing
- Function: url_processing()
- Purpose: Processes and extracts content from URLs
- Configuration: URL parameters, processing options
File & Document Tools
file-browser
- Function: extract_from_files()
- Purpose: Extracts content from uploaded files and cloud storage
- Configuration: Bucket URLs, file paths, extraction options
document_search_agent
- Function: document_search_agent_tool()
- Purpose: Intelligent document search and analysis
- Configuration: Search parameters, document filters
Execution Tools
code_execution
- Function: code_execution()
- Purpose: Executes code in a sandboxed environment
- Configuration: Runtime settings, security constraints
- Special Handling: Direct streaming due to potentially large outputs
History Tools
add_chat_histories
- Function: add_chat_histories()
- Purpose: Retrieves relevant chat history for context
- Configuration: User preferences, history scope
Agent Tools
aitana-agent
- Function: assistant_calling_agent_tool()
- Purpose: Orchestrates calls to other AI assistants for specialized tasks
- Configuration: Assistant IDs, call strategy (parallel/sequential), context inclusion
- Special Features: Supports multiple assistant instances and strategic execution patterns
Execution Flow
1. Configuration Preparation
# Merge AI tool selections with user configurations
merged_tools = merge_tool_configs(first_response_tools, tool_configs=toolConfigs)

# Extract document content
doc_contents = await add_document_content(
    documents=documents,
    selectedItems=selectedItems,
    toolConfigs=toolConfigs,
    callback=callback
)
2. Task Runner Setup
# AsyncTaskRunner comes from sunholo; the retry helpers come from tenacity
from sunholo.invoke import AsyncTaskRunner
from tenacity import retry_if_exception_type, stop_after_attempt, wait_random_exponential

runner = AsyncTaskRunner(
    retry_enabled=True,
    retry_kwargs={
        'wait': wait_random_exponential(multiplier=1, max=20),
        'stop': stop_after_attempt(2),
        'retry': retry_if_exception_type(Exception),
    }
)
3. Tool Execution
The orchestrator executes tools in parallel using the AsyncTaskRunner:
# Add tasks to runner
for tool_name in tools_to_use:
    func, specific_args = task_mapping[tool_name]
    full_args = {**common_args, **specific_args}
    runner.add_task(func, **full_args)

# Process results as they complete
async for message in runner.run_async_as_completed():
    if message['type'] == 'task_complete':
        # Handle successful completion
        task_complete_dict = await task_complete(message, ...)
    elif message['type'] == 'task_error':
        # Handle errors
        task_complete_dict = await task_error(message, ...)
4. Result Processing
Each tool result is processed through the task_complete() function in tools/taskrunner.py (a simplified sketch follows this list):
- Streaming: Results can be streamed in real-time to the user
- Smart Processing: AI models can further process tool results
- Context Integration: Results are integrated into the overall context
- Error Handling: Comprehensive error management and user feedback
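A simplified, hypothetical sketch of the shape of such a handler; the 'result' key and the callback method name are assumptions, and the real implementation in tools/taskrunner.py also covers smart processing and streaming control:

async def task_complete_sketch(message, callback, answers, references):
    # Hypothetical sketch only: fold a finished tool's output into the context
    func_name = message['func_name']
    result = message['result']  # key assumed by analogy with task_error's 'error'
    answers[func_name] = result
    # Stream the result to the user; method name assumed from the
    # LangChain-style streaming callback interface
    await callback.on_llm_new_token(str(result))
    return {'answers': answers, 'references': references}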
Configuration Options
Tool-Specific Configuration
Each tool can have specific configuration options:
toolConfigs = {
    'vertex_search': {
        'datastore_id': 'main-datastore',
        'max_chunks': 10
    },
    'google_search': {
        'max_results': 5,
        'safe_search': True
    },
    'file-browser': {
        'bucketUrl': 'gs://my-bucket'
    }
}
Streaming Control
Control streaming behavior for individual tools:
toolConfigs = {
    'some_tool': {
        'no-stream': True  # Disable streaming for this tool
    },
    'suppress-tool-streaming': {
        'no-stream': True  # Disable streaming globally
    }
}
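A hedged sketch of how these flags might be checked (the helper name streaming_enabled is hypothetical):

def streaming_enabled(tool_name, toolConfigs):
    # Hypothetical helper: the global 'suppress-tool-streaming' entry or a
    # per-tool 'no-stream' flag turns streaming off for that tool
    if toolConfigs.get('suppress-tool-streaming', {}).get('no-stream'):
        return False
    return not toolConfigs.get(tool_name, {}).get('no-stream', False)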
Advanced Model Selection
Choose which AI model processes tool results:
toolConfigs = {
    'advanced_models': {
        'smart_tool': 'anthropic'  # or 'gemini'
    }
}
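A one-line sketch of the lookup this implies (the 'gemini' fallback is an assumption):

# Hypothetical: choose which model post-processes 'smart_tool' results
model_choice = toolConfigs.get('advanced_models', {}).get('smart_tool', 'gemini')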
Error Handling
The orchestrator provides comprehensive error handling:
Retry Logic
- Exponential backoff: wait_random_exponential(multiplier=1, max=20)
- Retry attempts: Up to 2 attempts per tool
- Exception types: Retries on general exceptions (see the sketch after this list)
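For illustration, the same policy expressed directly with tenacity's AsyncRetrying; this is a sketch of the policy's behavior, not how the AsyncTaskRunner applies it internally:

from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_attempt, wait_random_exponential

async def call_with_retry(func, **kwargs):
    # Sketch: retry a single async call under the same wait/stop/retry policy
    async for attempt in AsyncRetrying(
        wait=wait_random_exponential(multiplier=1, max=20),
        stop=stop_after_attempt(2),
        retry=retry_if_exception_type(Exception),
    ):
        with attempt:
            return await func(**kwargs)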
Error Processing
async def task_error(message, callback, references, answers, contents, response_so_far):
    func_name = message['func_name']
    error = message['error']
    # Format a user-visible error line for the failed tool
    msg = f"\n❌ {func_name} Error: {error}\n"
    response_so_far += msg
    # Stream the error to the user and record it against the tool's references
    await nice_errors(msg, callback)
    references[func_name] = msg
Graceful Degradation
- Failed tools don’t stop other tools from executing
- Error messages are incorporated into the final response
- User receives clear feedback about tool failures
Observability
Langfuse Integration
span = langfuse.span(
    trace_id=trace.id,
    name="Context",
    metadata={
        "merged_tools": merged_tools,
        "toolConfigs": toolConfigs,
        "first_response_tools": first_response_tools
    },
    input={
        "new_question": new_question,
        "contents": contents,
        "instructions": instructions
    }
)
Real-time Updates
await check_and_display_thinking(f"Found tools: {str(list(merged_tools.keys()))}", callback)
await check_and_display_thinking(f"Calling tools: {' '.join(tools_to_use)}", callback)
Usage Examples
Basic Tool Execution
# AI selects tools
first_response_tools = [
    {
        'name': 'vertex_search',
        'config': [{'parameter': 'query', 'value': 'renewable energy'}]
    }
]

# User configuration
toolConfigs = {
    'vertex_search': {'datastore_id': 'energy-docs'}
}

# Execute orchestration
context = await create_context(
    new_question="What are the latest renewable energy trends?",
    contents=[],
    trace=trace_obj,
    first_responder_response="I'll search for renewable energy information",
    callback=callback,
    first_response_tools=first_response_tools,
    toolConfigs=toolConfigs,
    currentUser={"id": "user123"}
)
Multi-Tool Workflow
# Multiple tools selected by AI
first_response_tools = [
    {
        'name': 'google_search_retrieval',
        'config': [{'parameter': 'query', 'value': 'latest AI research 2024'}]
    },
    {
        'name': 'vertex_search',
        'config': [{'parameter': 'query', 'value': 'machine learning advances'}]
    },
    {
        'name': 'file-browser',
        'config': [{'parameter': 'path', 'value': '/research-papers/'}]
    }
]

# All tools execute in parallel
context = await create_context(
    new_question="What are the latest developments in AI?",
    contents=[],
    trace=trace_obj,
    first_responder_response="I'll gather information from multiple sources",
    callback=callback,
    first_response_tools=first_response_tools,
    toolConfigs=toolConfigs,
    currentUser=currentUser
)
Assistant Agent Workflow
# AI assistant orchestration
first_response_tools = [
    {
        'name': 'aitana-agent',
        'config': [
            {'parameter': 'assistantIds', 'value': 'research-assistant,analysis-assistant'},
            {'parameter': 'call_strategy', 'value': 'parallel'},
            {'parameter': 'include_context', 'value': True}
        ]
    }
]

# Configuration with assistant settings
toolConfigs = {
    'aitana-agent': {
        'default_strategy': 'sequential',
        'timeout': 120
    }
}

context = await create_context(
    new_question="Analyze this research paper and provide recommendations",
    contents=[],
    trace=trace_obj,
    first_responder_response="I'll coordinate with specialized assistants",
    callback=callback,
    first_response_tools=first_response_tools,
    toolConfigs=toolConfigs,
    currentUser=currentUser
)
Architecture Integration
Workflow Position
User Request → AI Model (Tool Selection) → Tool Orchestrator → Individual Tools → Context Enrichment → Final AI Response
Key Dependencies
- AsyncTaskRunner: Parallel execution framework
- Langfuse: Observability and tracing
- BufferStreamingStdOutCallbackHandlerAsync: Real-time streaming
- Individual Tool Modules: Actual tool implementations
- Task Runner: Result processing and streaming
Performance Considerations
Parallel Execution
- All selected tools run concurrently via AsyncTaskRunner
- No blocking between tools unless dependencies exist
- Results processed as they complete
Memory Management
- Streaming responses prevent memory buildup
- Content limitation controls context size
- Garbage collection of completed tasks
Error Isolation
- Individual tool failures don’t affect other tools
- Comprehensive error tracking and reporting
- Graceful degradation strategies
Security Features
Tool Isolation
- Each tool runs in its own execution context
- No shared state between tools except intended outputs
- Sandboxed execution for code tools
Configuration Validation
- Tool configurations validated before execution
- User permissions checked for tool access
- Secure handling of sensitive data (API keys, etc.)
Content Filtering
- Input/output content can be filtered
- Size limits prevent abuse
- Malicious content detection
Best Practices
Configuration Design
- Keep tool configurations modular and reusable
- Use descriptive parameter names
- Provide sensible defaults for optional parameters
Error Handling
- Always provide meaningful error messages
- Include context about what the tool was attempting
- Log errors for debugging while sanitizing user-facing messages
Performance Optimization
- Use streaming for large outputs
- Implement appropriate timeouts (see the sketch after this list)
- Consider tool execution costs in selection logic
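As an example of a timeout guard, an individual tool call could be bounded with asyncio (a sketch; the orchestrator's actual timeout handling may differ):

import asyncio

async def run_with_timeout(tool_coro, seconds=120):
    # Sketch: cancel a tool call that exceeds its time budget
    try:
        return await asyncio.wait_for(tool_coro, timeout=seconds)
    except asyncio.TimeoutError:
        # In practice, surface a clear timeout error to the user instead
        return None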
Monitoring
- Track tool execution times
- Monitor error rates by tool type
- Observe resource usage patterns
Related Components
- tools/taskrunner.py: Handles individual tool completion and error processing
- tools/: Directory containing all individual tool implementations
- models/smarts.py: AI model integration for processing tool results
- sunholo.invoke.AsyncTaskRunner: Parallel execution framework
- agents/: Specialized agent tools for complex workflows
Troubleshooting
Common Issues
- Tool Configuration Errors
  - Verify toolConfigs is not None
  - Check parameter names match expected values
  - Ensure required configurations are provided
- Execution Timeouts
  - Review retry configuration
  - Check individual tool performance
  - Consider breaking down complex operations
- Memory Issues
  - Enable streaming for large outputs
  - Review content limitation settings
  - Monitor task runner memory usage
- Authentication Failures
  - Verify API credentials for external tools
  - Check user permissions for tool access
  - Review service account configurations
For additional debugging, enable detailed logging and examine Langfuse traces for execution flow analysis.