Assistant SSE Streaming API

Overview

The Assistant SSE (Server-Sent Events) streaming endpoint is the primary production endpoint for real-time assistant communication. It provides streaming responses with full thinking content preservation, Firebase persistence, and tool orchestration.

Endpoints

Production Endpoint (SSE)

  • URL: /vac/assistant/{assistant_id}/sse
  • Method: POST
  • Function: assistant_stream_sse() in backend/assistant_config.py
  • Status: ✅ PRODUCTION - Used by frontend

Legacy Endpoint

  • URL: /vac/assistant/{assistant_id}
  • Method: POST
  • Function: assistant_stream() in backend/assistant_config.py
  • Status: ⚠️ LEGACY - Kept for backward compatibility

Key Features

1. Real-Time SSE Streaming

The endpoint streams responses in real-time using Server-Sent Events:

  • Chunks are sent as data: {"chunk": "..."}
  • Final response includes metadata: data: {"answer": "...", "metadata": {...}}
  • Stream ends with: data: [DONE]

2. Thinking Content Preservation

CRITICAL: The endpoint accumulates all streamed content including <thinking> tags:

# Content is accumulated during streaming
accumulated_response += chunk

# When streaming completes, the accumulated content (WITH thinking tags) 
# is preserved and saved to Firestore
chunk_with_full_content['answer'] = accumulated_response

This ensures that:

  • Thinking content is visible during streaming ✅
  • Thinking content is saved to Firestore ✅
  • Thinking content is available when messages are reloaded ✅

3. Firebase Persistence

Messages are automatically saved to Firestore with:

  • User messages with proper metadata
  • Assistant responses with thinking content preserved
  • Structured output when available
  • Trace IDs for debugging

Request Format

{
  "user_input": "Your question here",
  "chat_history": [],
  "save_to_history": true,
  "user_message_already_saved": false,
  "emissaryConfig": {
    "assistantId": "assistant-id",
    "name": "Assistant Name",
    "initialInstructions": "Be helpful",
    "tools": ["tool1", "tool2"],
    "toolConfigs": {},
    "currentUser": {
      "email": "user@example.com",
      "displayName": "User Name"
    }
  }
}

Response Format

Streaming Chunks

data: {"chunk": "Here is "}
data: {"chunk": "the response "}
data: {"chunk": "<thinking>Processing request</thinking>"}
data: {"chunk": "with content."}

Final Response

data: {
  "answer": "Here is the response <thinking>Processing request</thinking> with content.",
  "metadata": {
    "trace_id": "trace-123",
    "structured_output": {},
    "tools_to_use": []
  }
}
data: [DONE]

Implementation Details

Content Accumulation Flow

  1. Streaming Phase: Content chunks are accumulated
    accumulated_response = ""
    async for chunk in stream:
        if isinstance(chunk, str):
            accumulated_response += chunk
            yield f"data: {json.dumps({'chunk': chunk})}\n\n"
    
  2. Final Response Phase: Accumulated content is preserved
    # CRITICAL: Don't overwrite accumulated_response!
    # It contains the full content WITH thinking tags
    if not accumulated_response:
        accumulated_response = chunk.get('answer', '')  # Fallback only
    
  3. Save Phase: Full content is saved to Firestore
    chunk_with_full_content = {**chunk}
    chunk_with_full_content['answer'] = accumulated_response
    await save_messages_after_streaming(..., chunk_with_full_content, ...)
    

Message Saving

The save_messages_after_streaming() function:

  1. Extracts thinking content from the full response
  2. Saves user message (if not already saved by frontend)
  3. Saves assistant message with:
    • Clean content (visible text)
    • Thinking content (preserved separately)
    • Structured output (if available)
    • Trace ID for debugging

Frontend Integration

The frontend uses this endpoint via vacChat.ts:

// Frontend calls the SSE endpoint
apiEndpoint: `/vac/assistant/${assistantId}/sse`

// Accumulates content during streaming
accumulatedContent += chunk

// Displays thinking content when available
if (message.thinkingContent) {
  // Show thinking panel
}

Migration from Legacy Endpoint

If you’re still using the legacy /vac/assistant/{assistant_id} endpoint:

  1. Update URL: Add /sse to the endpoint
  2. No other changes needed: Request/response format is compatible
  3. Benefits:
    • Real-time streaming
    • Thinking content preservation
    • Better performance

Debugging

Check Thinking Content

  1. Look for <thinking> tags in the accumulated response
  2. Verify thinkingContent field in Firestore documents
  3. Check browser console for thinking content extraction

Common Issues

Issue: Thinking content not saved

  • Cause: Accumulated content being overwritten
  • Fix: Ensure accumulated_response is preserved (implemented in latest version)

Issue: Streaming not working

  • Cause: Missing SSE headers
  • Fix: Ensure response has Content-Type: text/event-stream

Testing

Manual Test

curl -X POST http://localhost:1956/vac/assistant/{assistant_id}/sse \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d '{
    "user_input": "Test with thinking",
    "save_to_history": true,
    "emissaryConfig": {
      "assistantId": "test-assistant",
      "name": "Test Assistant",
      "currentUser": {"email": "test@example.com"}
    }
  }'

Verify Thinking Content

  1. Send a message that triggers thinking
  2. Check streaming output for <thinking> tags
  3. Reload page and verify thinking content persists
  4. Check Firestore for thinkingContent field