Streaming Context System

πŸš€ Real-time streaming communication system for AI assistants with Firebase persistence and advanced session management.

Overview

The StreamingContext provides a comprehensive solution for real-time streaming conversations with AI assistants. It handles message streaming, Firebase persistence, session management, artifact integration, and tool confirmations while maintaining state consistency across the application.

Core Features

  • Real-time Streaming: Bidirectional communication with AI assistants
  • Firebase Integration: Automatic message persistence and synchronization
  • Session Management: Trace IDs, session IDs, and abort control
  • Artifact Support: Real-time artifact creation during streaming
  • Tool Confirmation: Enhanced workflow for tool usage approval
  • State Synchronization: Coordinated state between local streaming and Firestore
  • Auto Introduction: Intelligent assistant introduction on empty conversations

Usage

Basic Setup

import { StreamingProvider, useStreaming } from '@/contexts/StreamingContext'

// Configure assistant
const assistantConfig = {
  assistantId: 'assistant-123',
  name: 'AI Assistant',
  avatar: '/avatar.png',
  tools: ['artifacts', 'search', 'documents']
}

// Wrap your chat interface
<StreamingProvider 
  assistantConfig={assistantConfig}
  currentUser={user}
>
  <ChatInterface />
</StreamingProvider>

Using the Hook

function ChatInterface() {
  const {
    streamingMessage,
    effectiveThinkingContent,
    isStreaming,
    error,
    sessionId,
    traceId,
    sendMessage,
    sendToolConfirmationMessage,
    stopStreaming,
    clearError
  } = useStreaming()

  const handleSendMessage = async () => {
    await sendMessage("Hello, can you help me?")
  }

  const handleStopStreaming = () => {
    stopStreaming()
  }

  return (
    <div>
      {isStreaming && <p>Assistant is typing...</p>}
      {streamingMessage && (
        <div>{streamingMessage.content}</div>
      )}
      {effectiveThinkingContent && (
        <div className="thinking">{effectiveThinkingContent}</div>
      )}
      <button onClick={handleSendMessage} disabled={isStreaming}>
        Send Message
      </button>
      <button onClick={handleStopStreaming} disabled={!isStreaming}>
        Stop
      </button>
    </div>
  )
}

API Reference

StreamingProvider Props

Prop Type Description Required
children ReactNode Child components Yes
assistantConfig StreamingServiceConfig Assistant configuration Yes
currentUser User | null Firebase authenticated user Yes

StreamingServiceConfig

interface StreamingServiceConfig {
  assistantId: string    // Unique assistant identifier
  name: string          // Display name
  avatar?: string       // Avatar URL
  tools?: string[]      // Available tools array
  // ... other configuration options
}

StreamingContextType

State Properties

Property Type Description
streamingMessage LocalChatMessage | null Current streaming message
effectiveThinkingContent string Combined thinking content
isStreaming boolean Streaming status
error string | null Current error message
sessionId string Current session identifier
traceId string | null Current trace identifier

Core Actions

Method Signature Description
sendMessage (text: string) => Promise Send user message and start streaming
sendToolConfirmationMessage (userMessage, tools, traceId, signal) => Promise Send tool confirmation response
stopStreaming () => void Abort current streaming operation
clearError () => void Clear current error state

Message Flow

Standard Message Flow

// 1. User sends message
await sendMessage("What's the weather like?")

// 2. Context creates trace ID and user message
const traceId = sessionManager.createNewTraceId()
const messageData = EnhancedFirebaseService.createUserMessagePayload(text, user, traceId)

// 3. Save to Firestore (parallel with streaming)
const savePromise = EnhancedFirebaseService.saveMessage(assistantId, messageData)

// 4. Start streaming with abort control
const controller = new AbortController()
await StreamingService.sendMessage(text, config, context, callbacks)

// 5. Handle streaming callbacks
onStreamChunk: (chunk) => dispatch({ type: 'UPDATE_STREAMING_CONTENT', chunk })
onArtifactContent: (artifacts) => artifacts.forEach(artifact => addArtifact(artifact))
onStreamEnd: (finalContent) => dispatch({ type: 'END_STREAMING', finalContent })

Tool Confirmation Flow

// 1. User confirms tools
await sendToolConfirmationMessage(
  "Use these tools to help me",
  confirmedTools,
  existingTraceId,
  abortSignal
)

// 2. Context reuses existing trace ID
// 3. Streams with tool confirmation payload
// 4. Handles artifacts and responses normally

State Management

Message State Coordination

The context coordinates between local streaming state and Firestore messages:

// Clear streaming message when Firestore updates
useEffect(() => {
  if (shouldClearStreamingOnNewMessage({ 
    ...state, 
    firestoreMessages: messages 
  })) {
    dispatch({ type: 'CLEAR_STREAMING_MESSAGE' })
  }
}, [messages, state.isStreaming, state.streamingMessage])

Thinking Content Management

Thinking content is preserved during interruptions:

const stopStreaming = useCallback(() => {
  if (abortControllerRef.current && state.isStreaming) {
    // Preserve thinking content before stopping
    dispatch({ 
      type: 'PRESERVE_THINKING_CONTENT', 
      content: state.streamingThinkingContent 
    })
    
    abortControllerRef.current.abort()
  }
}, [state.isStreaming, state.streamingThinkingContent])

Advanced Features

Session Management

// Session and trace management
const sessionManagerRef = useRef(new SessionManager())

// Get current session info
const sessionId = sessionManager.getSessionId()
const traceId = sessionManager.getTraceId()

// Create new trace for each message
const newTraceId = sessionManager.createNewTraceId()

Artifact Integration

Real-time artifact handling during streaming:

const { getArtifactsForApi, addArtifact } = useArtifacts()

// Include existing artifacts in API calls
const artifactContent = getArtifactsForApi(assistantConfig.assistantId, assistantConfig.tools)

// Handle new artifacts from streaming
onArtifactContent: (artifacts: ArtifactData[]) => {
  console.log('Received artifacts during streaming:', artifacts)
  artifacts.forEach(artifact => addArtifact(artifact))
}

Auto Introduction

Intelligent assistant introduction for new conversations:

useEffect(() => {
  if (
    messagesLoaded &&           // Wait for Firestore to load
    messages.length === 0 &&    // No existing messages
    currentUser?.email &&       // User is authenticated
    !state.isStreaming &&       // Not currently streaming
    assistantConfig.assistantId // Assistant is configured
  ) {
    const timer = setTimeout(() => {
      sendMessage("Hello! Please introduce yourself and tell me what you can help with.")
    }, 500)
    
    return () => clearTimeout(timer)
  }
}, [messagesLoaded, messages.length, currentUser?.email, state.isStreaming, assistantConfig.assistantId, sendMessage])

Error Handling

Comprehensive Error Management

// Stream errors
onStreamError: (err) => {
  dispatch({ 
    type: 'STREAMING_ERROR', 
    error: err instanceof Error ? err.message : 'Streaming error' 
  })
}

// Network errors
catch (error) {
  console.error('Error sending message:', error)
  toast({
    title: "Error",
    description: "Failed to send message",
    variant: "destructive"
  })
}

// User cancellation
stopStreaming() // Sets error: 'Streaming stopped by user'

Abort Control

Robust abort handling for streaming operations:

// Clean up existing controller
if (abortControllerRef.current) {
  abortControllerRef.current.abort()
}

// Create new controller for each operation
abortControllerRef.current = new AbortController()
const signal = abortControllerRef.current.signal

// Pass signal to streaming service
await StreamingService.sendMessage(text, config, { signal }, callbacks)

Integration Examples

With Message State Context

function StreamingChatInterface() {
  const { messages, selectedItems, messagesLoaded } = useMessageState()
  const { 
    streamingMessage, 
    isStreaming, 
    sendMessage 
  } = useStreaming()

  // Combine Firestore messages with streaming message
  const allMessages = useMemo(() => {
    return streamingMessage 
      ? [...messages, streamingMessage]
      : messages
  }, [messages, streamingMessage])

  return (
    <div>
      {allMessages.map(message => (
        <MessageComponent key={message.id || 'streaming'} message={message} />
      ))}
      {isStreaming && <TypingIndicator />}
    </div>
  )
}

With Tool Confirmation

function ToolConfirmationFlow() {
  const { sendToolConfirmationMessage } = useStreaming()
  
  const handleConfirmTools = async (
    originalMessage: string,
    confirmedTools: FirstImpressionTool[],
    traceId: string
  ) => {
    const controller = new AbortController()
    
    await sendToolConfirmationMessage(
      originalMessage,
      confirmedTools,
      traceId,
      controller.signal
    )
  }

  return <ToolConfirmationDialog onConfirm={handleConfirmTools} />
}

With Firebase Service

// The context automatically integrates with Firebase
const messageData = EnhancedFirebaseService.createUserMessagePayload(text, currentUser, traceId)
const savePromise = EnhancedFirebaseService.saveMessage(assistantConfig.assistantId, messageData)

// Wait for save and update usage stats
await savePromise
await EnhancedFirebaseService.updateUsageStats(assistantConfig.assistantId)

Performance Considerations

Efficient State Updates

// Memoized context value to prevent unnecessary re-renders
const contextValue = useMemo((): StreamingContextType => ({
  streamingMessage: state.streamingMessage,
  effectiveThinkingContent: getEffectiveThinkingContent(state),
  isStreaming: state.isStreaming,
  // ... other properties
}), [
  state.streamingMessage,
  state.isStreaming,
  state.error,
  // ... dependencies
])

Memory Management

  • Abort Controllers: Properly cleaned up to prevent memory leaks
  • Session References: useRef for persistent session management
  • Effect Cleanup: Timers and subscriptions are properly cleared

Callback Optimization

All callbacks are memoized using useCallback to prevent recreation:

const sendMessage = useCallback(async (text: string) => {
  // Implementation
}, [state.isStreaming, currentUser, assistantConfig, messages, selectedItems])

Best Practices

  1. Always check streaming state: Prevent multiple concurrent streams
  2. Handle abort signals: Respect user cancellation
  3. Preserve thinking content: Don’t lose AI reasoning on interruption
  4. Coordinate with Firestore: Wait for message persistence
  5. Error boundaries: Wrap streaming components in error boundaries
  6. Loading states: Provide clear feedback during streaming
  7. Session consistency: Maintain session/trace ID continuity

Troubleshooting

Common Issues

Streaming doesn’t start

  • Check if isStreaming is already true
  • Verify user authentication and assistant configuration
  • Ensure network connectivity

Messages not persisting

  • Check Firebase rules and authentication
  • Verify assistantId is valid
  • Monitor network requests in dev tools

Artifacts not appearing

  • Ensure ArtifactContext is available
  • Check if β€˜artifacts’ tool is enabled
  • Verify artifact data format

State inconsistencies

  • Check message state coordination logic
  • Verify useEffect dependencies
  • Monitor Redux/context state changes

Debug Helpers

// Log streaming state
console.log('Streaming state:', {
  isStreaming,
  messageContent: streamingMessage?.content?.length,
  thinkingContent: effectiveThinkingContent?.length,
  error,
  sessionId,
  traceId
})

// Monitor abort controller
console.log('Abort controller:', abortControllerRef.current?.signal.aborted)

// Track Firebase sync
console.log('Messages loaded:', messagesLoaded, 'Count:', messages.length)

Architecture

StreamingProvider
β”œβ”€β”€ State Management (useReducer)
β”œβ”€β”€ Session Management (SessionManager)
β”œβ”€β”€ Abort Control (AbortController)
β”œβ”€β”€ Firebase Integration (EnhancedFirebaseService)
β”œβ”€β”€ Artifact Integration (ArtifactContext)
└── Error Handling (Toast notifications)

useStreaming Hook
β”œβ”€β”€ Message sending
β”œβ”€β”€ Tool confirmation
β”œβ”€β”€ Stream control
β”œβ”€β”€ Error management
└── State access

The StreamingContext serves as the central hub for all real-time communication, coordinating between multiple services and contexts to provide a seamless streaming experience while maintaining data consistency and error resilience.