Tool Orchestrator System

Overview

The Tool Orchestrator System is the central component responsible for coordinating and executing AI tools within the backend. It serves as the bridge between the AI model’s tool selections and the actual execution of those tools, managing everything from configuration merging to parallel execution and context enrichment.

Located in backend/tools/tool_orchestrator.py, this system orchestrates complex workflows involving multiple tools, handles streaming responses, and provides comprehensive error management.

Core Functionality

Primary Functions

create_context()

The main orchestration function that takes AI tool selections and executes them to create enriched context for final responses.

async def create_context(
    new_question: str,
    contents: List[Any],
    trace,
    first_responder_response: str,
    callback: BufferStreamingStdOutCallbackHandlerAsync,
    first_response_tools: List[Dict[str, Any]],
    toolConfigs: Dict[str, Any],
    currentUser: Dict[str, Any],
    parent_observation_id=None,
    instructions: str = "",
    documents: Iterable = [],
    selectedItems: Iterable = []
) -> Dict[str, Any]

Parameters:

  • new_question: User’s current question or input
  • contents: Existing content/context from previous processing
  • trace: Langfuse trace object for observability tracking
  • first_responder_response: AI’s initial response before tool execution
  • callback: Streaming callback for real-time updates
  • first_response_tools: Tools selected by the AI for execution
  • toolConfigs: Configuration settings for each tool type
  • currentUser: Current user context for permissions
  • parent_observation_id: Optional parent observation ID for nested Langfuse spans
  • instructions: Additional instructions for tool execution
  • documents: Uploaded documents to include in context
  • selectedItems: User-selected items for context

Returns: Dictionary containing (an illustrative example follows the list):

  • answers: Results from individual tool executions
  • gs_uris: Google Cloud Storage URIs found in content
  • total_context_tokens: Token count for context
  • response_so_far: Accumulated response content
  • references: Reference materials from tool executions
  • instructions: Processed instructions
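
For illustration, a returned dictionary might look like the following (all values are hypothetical):

context = {
    'answers': {'vertex_search': 'Top findings about renewable energy...'},
    'gs_uris': ['gs://my-bucket/report.pdf'],
    'total_context_tokens': 1842,
    'response_so_far': "I'll search for renewable energy information\n...",
    'references': {'vertex_search': 'energy-docs datastore, chunk 3'},
    'instructions': ''
}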

merge_tool_configs()

Combines AI-selected tools with user-configured tool settings to create unified configurations.

def merge_tool_configs(tools: List, tool_configs: Dict[str, Any]) -> Dict[str, List]

Example:

tools = [
    {'name': 'google_search', 'config': [{'parameter': 'query', 'value': 'weather today'}]},
    {'name': 'vertex_search', 'config': [{'parameter': 'datastore_id', 'value': 'main'}]}
]
tool_configs = {
    'google_search': {'max_results': 5, 'safe_search': True},
    'vertex_search': {'datastore_id': 'backup', 'max_chunks': 10}
}
result = merge_tool_configs(tools, tool_configs)
# Returns:
# {
#     'google_search': [{'query': 'weather today', 'max_results': 5, 'safe_search': True}],
#     'vertex_search': [{'datastore_id': 'main', 'max_chunks': 10}]
# }

Special Parameter Handling:

  • assistantIds: Automatically converts strings to lists and splits comma-separated values (see the sketch after this list)
  • Duplicate Parameters: Creates separate tool instances when the same parameter appears multiple times
  • Parameter Precedence: AI-selected (per-call) parameter values override user-level defaults, as the vertex_search datastore_id in the example above shows
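
A minimal sketch of that special handling; the helper names are illustrative, not the actual implementation:

def normalize_assistant_ids(value):
    # 'a,b' -> ['a', 'b']; a plain string becomes a one-element list; lists pass through.
    if isinstance(value, str):
        return [part.strip() for part in value.split(',') if part.strip()]
    return list(value)

def split_duplicate_params(config_entries):
    # Start a new tool instance whenever the same parameter repeats, so two
    # 'query' entries yield two separate instances instead of overwriting.
    instances = [{}]
    for entry in config_entries:
        key, value = entry['parameter'], entry['value']
        if key in instances[-1]:
            instances.append({})
        instances[-1][key] = value
    return instances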

Supported Tools

The orchestrator supports the following tool types:

Search & Retrieval Tools

google_search_retrieval

  • Function: google_search_retrieval()
  • Purpose: Performs web searches using Google Search API
  • Configuration: Query parameters, result limits, safe search settings

vertex_search

  • Function: vertex_search()
  • Purpose: Searches Vertex AI datastores for semantic content retrieval
  • Configuration: Datastore ID, query filters, chunk limits

url_processing

  • Function: url_processing()
  • Purpose: Processes and extracts content from URLs
  • Configuration: URL parameters, processing options

File & Document Tools

file-browser

  • Function: extract_from_files()
  • Purpose: Extracts content from uploaded files and cloud storage
  • Configuration: Bucket URLs, file paths, extraction options

document_search_agent

  • Function: document_search_agent_tool()
  • Purpose: Intelligent document search and analysis
  • Configuration: Search parameters, document filters

Execution Tools

code_execution

  • Function: code_execution()
  • Purpose: Executes code in a sandboxed environment
  • Configuration: Runtime settings, security constraints
  • Special Handling: Direct streaming due to potentially large outputs (sketched below)
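
A rough sketch of that direct streaming, assuming the callback exposes an async on_llm_new_token method (as LangChain-style streaming handlers do):

async def stream_code_output(chunks, callback):
    # Forward each chunk to the client as it arrives instead of buffering
    # the full output in memory.
    for chunk in chunks:
        await callback.on_llm_new_token(chunk)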

History Tools

add_chat_histories

  • Function: add_chat_histories()
  • Purpose: Retrieves relevant chat history for context
  • Configuration: User preferences, history scope

Agent Tools

aitana-agent

  • Function: assistant_calling_agent_tool()
  • Purpose: Orchestrates calls to other AI assistants for specialized tasks
  • Configuration: Assistant IDs, call strategy (parallel/sequential), context inclusion
  • Special Features: Supports multiple assistant instances and strategic execution patterns

Execution Flow

1. Configuration Preparation

# Merge AI tool selections with user configurations
merged_tools = merge_tool_configs(first_response_tools, tool_configs=toolConfigs)

# Extract document content
doc_contents = await add_document_content(
    documents=documents,
    selectedItems=selectedItems,
    toolConfigs=toolConfigs,
    callback=callback
)

2. Task Runner Setup

# retry helpers come from the tenacity library
from tenacity import retry_if_exception_type, stop_after_attempt, wait_random_exponential

runner = AsyncTaskRunner(
    retry_enabled=True,
    retry_kwargs={
        'wait': wait_random_exponential(multiplier=1, max=20),
        'stop': stop_after_attempt(2),
        'retry': retry_if_exception_type(Exception),
    }
)

3. Tool Execution

The orchestrator executes tools in parallel using the AsyncTaskRunner:

# Add tasks to runner
for tool_name in tools_to_use:
    func, specific_args = task_mapping[tool_name]
    full_args = {**common_args, **specific_args}
    runner.add_task(func, **full_args)

# Process results as they complete
async for message in runner.run_async_as_completed():
    if message['type'] == 'task_complete':
        # Handle successful completion
        task_complete_dict = await task_complete(message, ...)
    elif message['type'] == 'task_error':
        # Handle errors
        task_complete_dict = await task_error(message, ...)

4. Result Processing

Each tool result is processed through the task_complete() function in tools/taskrunner.py; a simplified sketch follows the list below:

  • Streaming: Results can be streamed in real-time to the user
  • Smart Processing: AI models can further process tool results
  • Context Integration: Results are integrated into the overall context
  • Error Handling: Comprehensive error management and user feedback
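
A simplified sketch of what such a handler might return. The real task_complete() in tools/taskrunner.py also streams and can post-process results with an AI model; the names here are illustrative:

async def handle_result(message, answers, references):
    # Record the tool's output under its function name and hand the
    # accumulated state back to the orchestrator.
    func_name = message['func_name']
    answers[func_name] = message['result']
    references.setdefault(func_name, [])
    return {'answers': answers, 'references': references}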

Configuration Options

Tool-Specific Configuration

Each tool can have specific configuration options:

toolConfigs = {
    'vertex_search': {
        'datastore_id': 'main-datastore',
        'max_chunks': 10
    },
    'google_search': {
        'max_results': 5,
        'safe_search': True
    },
    'file-browser': {
        'bucketUrl': 'gs://my-bucket'
    }
}

Streaming Control

Control streaming behavior for individual tools:

toolConfigs = {
    'some_tool': {
        'no-stream': True  # Disable streaming for this tool
    },
    'suppress-tool-streaming': {
        'no-stream': True  # Disable streaming globally
    }
}
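
One plausible way these flags could be consulted (a sketch, not the actual implementation):

def streaming_disabled(tool_name, toolConfigs):
    # Either the per-tool flag or the global suppress switch turns streaming off.
    if toolConfigs.get(tool_name, {}).get('no-stream'):
        return True
    return bool(toolConfigs.get('suppress-tool-streaming', {}).get('no-stream'))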

Advanced Model Selection

Choose which AI model processes tool results:

toolConfigs = {
    'advanced_models': {
        'smart_tool': 'anthropic'  # or 'gemini'
    }
}
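
Reading that selection might reduce to a single lookup (a sketch; 'gemini' as an assumed default):

def pick_advanced_model(toolConfigs, tool_name='smart_tool'):
    # Fall back to 'gemini' when no override is configured (assumed default).
    return toolConfigs.get('advanced_models', {}).get(tool_name, 'gemini')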

Error Handling

The orchestrator provides comprehensive error handling:

Retry Logic

  • Exponential backoff: wait_random_exponential(multiplier=1, max=20)
  • Retry attempts: Up to 2 attempts per tool
  • Exception types: Retries on general exceptions

Error Processing

async def task_error(message, callback, references, answers, contents, response_so_far):
    func_name = message['func_name']
    error = message['error']
    
    msg = f"\n{func_name} Error: {error}\n"
    response_so_far += msg
    await nice_errors(msg, callback)
    references[func_name] = msg
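    # Excerpt: like task_complete(), the full function returns a dict that the orchestrator consumes.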

Graceful Degradation

  • Failed tools don’t stop other tools from executing
  • Error messages are incorporated into the final response
  • User receives clear feedback about tool failures

Observability

Langfuse Integration

span = langfuse.span(
    trace_id=trace.id,
    name="Context",
    metadata={
        "merged_tools": merged_tools, 
        "toolConfigs": toolConfigs, 
        "first_response_tools": first_response_tools
    },
    input={
        "new_question": new_question, 
        "contents": contents, 
        "instructions": instructions
    }
)

Real-time Updates

await check_and_display_thinking(f"Found tools: {str(list(merged_tools.keys()))}", callback)
await check_and_display_thinking(f"Calling tools: {' '.join(tools_to_use)}", callback)

Usage Examples

Basic Tool Execution

# AI selects tools
first_response_tools = [
    {
        'name': 'vertex_search',
        'config': [{'parameter': 'query', 'value': 'renewable energy'}]
    }
]

# User configuration
toolConfigs = {
    'vertex_search': {'datastore_id': 'energy-docs'}
}

# Execute orchestration
context = await create_context(
    new_question="What are the latest renewable energy trends?",
    contents=[],
    trace=trace_obj,
    first_responder_response="I'll search for renewable energy information",
    callback=callback,
    first_response_tools=first_response_tools,
    toolConfigs=toolConfigs,
    currentUser={"id": "user123"}
)

Multi-Tool Workflow

# Multiple tools selected by AI
first_response_tools = [
    {
        'name': 'google_search_retrieval',
        'config': [{'parameter': 'query', 'value': 'latest AI research 2024'}]
    },
    {
        'name': 'vertex_search',
        'config': [{'parameter': 'query', 'value': 'machine learning advances'}]
    },
    {
        'name': 'file-browser',
        'config': [{'parameter': 'path', 'value': '/research-papers/'}]
    }
]

# All tools execute in parallel
context = await create_context(
    new_question="What are the latest developments in AI?",
    contents=[],
    trace=trace_obj,
    first_responder_response="I'll gather information from multiple sources",
    callback=callback,
    first_response_tools=first_response_tools,
    toolConfigs=toolConfigs,
    currentUser=currentUser
)

Assistant Agent Workflow

# AI assistant orchestration
first_response_tools = [
    {
        'name': 'aitana-agent',
        'config': [
            {'parameter': 'assistantIds', 'value': 'research-assistant,analysis-assistant'},
            {'parameter': 'call_strategy', 'value': 'parallel'},
            {'parameter': 'include_context', 'value': True}
        ]
    }
]

# Configuration with assistant settings
toolConfigs = {
    'aitana-agent': {
        'default_strategy': 'sequential',
        'timeout': 120
    }
}

context = await create_context(
    new_question="Analyze this research paper and provide recommendations",
    contents=[],
    trace=trace_obj,
    first_responder_response="I'll coordinate with specialized assistants",
    callback=callback,
    first_response_tools=first_response_tools,
    toolConfigs=toolConfigs,
    currentUser=currentUser
)

Architecture Integration

Workflow Position

User Request → AI Model (Tool Selection) → Tool Orchestrator → Individual Tools → Context Enrichment → Final AI Response

Key Dependencies

  • AsyncTaskRunner: Parallel execution framework
  • Langfuse: Observability and tracing
  • BufferStreamingStdOutCallbackHandlerAsync: Real-time streaming
  • Individual Tool Modules: Actual tool implementations
  • Task Runner: Result processing and streaming

Performance Considerations

Parallel Execution

  • All selected tools run concurrently via AsyncTaskRunner
  • No blocking between tools unless dependencies exist
  • Results are processed as they complete (see the sketch below)
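
The completion-ordered pattern resembles asyncio's as_completed scheduling; a generic sketch, not the AsyncTaskRunner source:

import asyncio

async def run_tools_concurrently(coroutines):
    # Schedule every tool at once and handle results in completion order;
    # one failure does not cancel the remaining tasks.
    tasks = [asyncio.create_task(coro) for coro in coroutines]
    for finished in asyncio.as_completed(tasks):
        try:
            result = await finished
            print('task_complete:', result)
        except Exception as err:
            print('task_error:', err)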

Memory Management

  • Streaming responses prevent memory buildup
  • Content limitation controls context size
  • Garbage collection of completed tasks

Error Isolation

  • Individual tool failures don’t affect other tools
  • Comprehensive error tracking and reporting
  • Graceful degradation strategies

Security Features

Tool Isolation

  • Each tool runs in its own execution context
  • No shared state between tools except intended outputs
  • Sandboxed execution for code tools

Configuration Validation

  • Tool configurations validated before execution
  • User permissions checked for tool access
  • Secure handling of sensitive data (API keys, etc.)

Content Filtering

  • Input/output content can be filtered
  • Size limits prevent abuse (a truncation sketch follows this list)
  • Malicious content detection
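
As an example, a size guard might clamp oversized tool output before it enters the context (the limit below is hypothetical):

MAX_CONTEXT_CHARS = 100_000  # hypothetical limit, not the system's actual setting

def clamp_content(text: str, limit: int = MAX_CONTEXT_CHARS) -> str:
    # Keep the context bounded and mark the cut so users know output was truncated.
    if len(text) <= limit:
        return text
    return text[:limit] + '\n[...output truncated...]'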

Best Practices

Configuration Design

  • Keep tool configurations modular and reusable
  • Use descriptive parameter names
  • Provide sensible defaults for optional parameters

Error Handling

  • Always provide meaningful error messages
  • Include context about what the tool was attempting
  • Log errors for debugging while sanitizing user-facing messages

Performance Optimization

  • Use streaming for large outputs
  • Implement appropriate timeouts
  • Consider tool execution costs in selection logic

Monitoring

  • Track tool execution times
  • Monitor error rates by tool type
  • Observe resource usage patterns

Related Components

  • tools/taskrunner.py: Handles individual tool completion and error processing
  • tools/: Directory containing all individual tool implementations
  • models/smarts.py: AI model integration for processing tool results
  • sunholo.invoke.AsyncTaskRunner: Parallel execution framework
  • agents/: Specialized agent tools for complex workflows

Troubleshooting

Common Issues

  1. Tool Configuration Errors
    • Verify toolConfigs is not None
    • Check parameter names match expected values
    • Ensure required configurations are provided
  2. Execution Timeouts
    • Review retry configuration
    • Check individual tool performance
    • Consider breaking down complex operations
  3. Memory Issues
    • Enable streaming for large outputs
    • Review content limitation settings
    • Monitor task runner memory usage
  4. Authentication Failures
    • Verify API credentials for external tools
    • Check user permissions for tool access
    • Review service account configurations

For additional debugging, enable detailed logging and examine Langfuse traces for execution flow analysis.
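
A generic way to turn up logging while debugging (the logger name below is an assumption, not confirmed from the source):

import logging

logging.basicConfig(level=logging.DEBUG)
# Assumed module logger name; adjust to whatever the orchestrator actually uses.
logging.getLogger('tools.tool_orchestrator').setLevel(logging.DEBUG)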