Streaming Context System
Real-time streaming communication system for AI assistants with Firebase persistence and advanced session management.
Overview
The StreamingContext provides a comprehensive solution for real-time streaming conversations with AI assistants. It handles message streaming, Firebase persistence, session management, artifact integration, and tool confirmations while maintaining state consistency across the application.
Core Features
- Real-time Streaming: Bidirectional communication with AI assistants
- Firebase Integration: Automatic message persistence and synchronization
- Session Management: Trace IDs, session IDs, and abort control
- Artifact Support: Real-time artifact creation during streaming
- Tool Confirmation: Enhanced workflow for tool usage approval
- State Synchronization: Coordinated state between local streaming and Firestore
- Auto Introduction: Intelligent assistant introduction on empty conversations
Usage
Basic Setup
import { StreamingProvider, useStreaming } from '@/contexts/StreamingContext'
// Configure assistant
const assistantConfig = {
assistantId: 'assistant-123',
name: 'AI Assistant',
avatar: '/avatar.png',
tools: ['artifacts', 'search', 'documents']
}
// Wrap your chat interface
<StreamingProvider
assistantConfig={assistantConfig}
currentUser={user}
>
<ChatInterface />
</StreamingProvider>
Using the Hook
function ChatInterface() {
const {
streamingMessage,
effectiveThinkingContent,
isStreaming,
error,
sessionId,
traceId,
sendMessage,
sendToolConfirmationMessage,
stopStreaming,
clearError
} = useStreaming()
const handleSendMessage = async () => {
await sendMessage("Hello, can you help me?")
}
const handleStopStreaming = () => {
stopStreaming()
}
return (
<div>
{isStreaming && <p>Assistant is typing...</p>}
{streamingMessage && (
<div>{streamingMessage.content}</div>
)}
{effectiveThinkingContent && (
<div className="thinking">{effectiveThinkingContent}</div>
)}
<button onClick={handleSendMessage} disabled={isStreaming}>
Send Message
</button>
<button onClick={handleStopStreaming} disabled={!isStreaming}>
Stop
</button>
</div>
)
}
API Reference
StreamingProvider Props
| Prop | Type | Description | Required |
|---|---|---|---|
| children | ReactNode | Child components | Yes |
| assistantConfig | StreamingServiceConfig | Assistant configuration | Yes |
| currentUser | User \| null | Firebase authenticated user | Yes |
StreamingServiceConfig
interface StreamingServiceConfig {
assistantId: string // Unique assistant identifier
name: string // Display name
avatar?: string // Avatar URL
tools?: string[] // Available tools array
// ... other configuration options
}
StreamingContextType
State Properties
| Property | Type | Description |
|---|---|---|
| streamingMessage | LocalChatMessage \| null | Current streaming message |
| effectiveThinkingContent | string | Combined thinking content |
| isStreaming | boolean | Streaming status |
| error | string \| null | Current error message |
| sessionId | string | Current session identifier |
| traceId | string \| null | Current trace identifier |
Core Actions
| Method | Signature | Description |
|---|---|---|
| sendMessage | (text: string) => Promise | Send user message and start streaming |
| sendToolConfirmationMessage | (userMessage, tools, traceId, signal) => Promise | Send tool confirmation response |
| stopStreaming | () => void | Abort current streaming operation |
| clearError | () => void | Clear current error state |
Message Flow
Standard Message Flow
// 1. User sends message
await sendMessage("What's the weather like?")
// 2. Context creates trace ID and user message
const traceId = sessionManager.createNewTraceId()
const messageData = EnhancedFirebaseService.createUserMessagePayload(text, user, traceId)
// 3. Save to Firestore (parallel with streaming)
const savePromise = EnhancedFirebaseService.saveMessage(assistantId, messageData)
// 4. Start streaming with abort control
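const controller = new AbortController()
// Pass the signal through the context argument so the stream can be cancelled
await StreamingService.sendMessage(text, config, { ...context, signal: controller.signal }, callbacks)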
// 5. Handle streaming callbacks
onStreamChunk: (chunk) => dispatch({ type: 'UPDATE_STREAMING_CONTENT', chunk })
onArtifactContent: (artifacts) => artifacts.forEach(artifact => addArtifact(artifact))
onStreamEnd: (finalContent) => dispatch({ type: 'END_STREAMING', finalContent })
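The callbacks in step 5 dispatch actions into a reducer that the snippet does not show. A minimal sketch of such a reducer follows, assuming chunks arrive as deltas that are appended and that END_STREAMING replaces the buffer with the final content. The StreamState shape, the START_STREAMING action, and the name streamReducer are hypothetical and chosen for illustration; the context's real reducer may track more fields.

```typescript
// Hypothetical state shape; the real context likely stores a full message object.
interface StreamState {
  isStreaming: boolean
  content: string
  error: string | null
}

// Action names other than START_STREAMING are taken from the flow above.
type StreamAction =
  | { type: 'START_STREAMING' } // assumed action, not named in the source
  | { type: 'UPDATE_STREAMING_CONTENT'; chunk: string }
  | { type: 'END_STREAMING'; finalContent: string }
  | { type: 'STREAMING_ERROR'; error: string }

const initialStreamState: StreamState = { isStreaming: false, content: '', error: null }

function streamReducer(state: StreamState, action: StreamAction): StreamState {
  switch (action.type) {
    case 'START_STREAMING':
      return { isStreaming: true, content: '', error: null }
    case 'UPDATE_STREAMING_CONTENT':
      // Assumption: each chunk is a delta appended to the buffer
      return { ...state, content: state.content + action.chunk }
    case 'END_STREAMING':
      return { ...state, isStreaming: false, content: action.finalContent }
    case 'STREAMING_ERROR':
      return { ...state, isStreaming: false, error: action.error }
  }
}

// Replay a short stream through the reducer
const replayActions: StreamAction[] = [
  { type: 'START_STREAMING' },
  { type: 'UPDATE_STREAMING_CONTENT', chunk: 'Hello' },
  { type: 'UPDATE_STREAMING_CONTENT', chunk: ', wor' },
  { type: 'END_STREAMING', finalContent: 'Hello, world' }
]
const replayedState = replayActions.reduce(streamReducer, initialStreamState)
```

Keeping the transitions in a pure function like this makes them testable without mounting the provider.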
Tool Confirmation Flow
// 1. User confirms tools
await sendToolConfirmationMessage(
"Use these tools to help me",
confirmedTools,
existingTraceId,
abortSignal
)
// 2. Context reuses existing trace ID
// 3. Streams with tool confirmation payload
// 4. Handles artifacts and responses normally
State Management
Message State Coordination
The context coordinates between local streaming state and Firestore messages:
// Clear streaming message when Firestore updates
useEffect(() => {
if (shouldClearStreamingOnNewMessage({
...state,
firestoreMessages: messages
})) {
dispatch({ type: 'CLEAR_STREAMING_MESSAGE' })
}
}, [messages, state.isStreaming, state.streamingMessage])
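The body of shouldClearStreamingOnNewMessage is not shown here. One plausible rule, sketched below, is to drop the local streaming message once streaming has finished and Firestore contains an assistant message with the same trace ID. The CoordMessage shape, the matching rule, and the name shouldClearSketch are assumptions for illustration and may differ from the actual helper.

```typescript
// Hypothetical message shape used only for this sketch
interface CoordMessage {
  role: 'user' | 'assistant'
  traceId: string | null
}

interface CoordinationInput {
  isStreaming: boolean
  streamingMessage: CoordMessage | null
  firestoreMessages: CoordMessage[]
}

// Returns true when the local streaming copy is safe to discard.
function shouldClearSketch(input: CoordinationInput): boolean {
  const { isStreaming, streamingMessage, firestoreMessages } = input
  // Never clear mid-stream, and nothing to clear without a local message
  if (isStreaming || streamingMessage === null) return false
  const localTrace = streamingMessage.traceId
  if (localTrace === null) return false
  // Assumption: the persisted assistant reply carries the same trace ID
  return firestoreMessages.some(m => m.role === 'assistant' && m.traceId === localTrace)
}
```

Matching on trace ID rather than on message count avoids clearing the streaming message when only the user's own message has been persisted.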
Thinking Content Management
Thinking content is preserved during interruptions:
const stopStreaming = useCallback(() => {
if (abortControllerRef.current && state.isStreaming) {
// Preserve thinking content before stopping
dispatch({
type: 'PRESERVE_THINKING_CONTENT',
content: state.streamingThinkingContent
})
abortControllerRef.current.abort()
}
}, [state.isStreaming, state.streamingThinkingContent])
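How the preserved content feeds into effectiveThinkingContent is not spelled out above. The sketch below assumes the state holds a live buffer and a preserved copy, and that the effective value prefers the live buffer and falls back to the preserved one. The preservedThinkingContent field, the RESET_STREAMING_THINKING action, and both function names are hypothetical.

```typescript
// Hypothetical slice of the streaming state
interface ThinkingState {
  streamingThinkingContent: string
  preservedThinkingContent: string // assumed field name
}

type ThinkingAction =
  | { type: 'PRESERVE_THINKING_CONTENT'; content: string }
  | { type: 'RESET_STREAMING_THINKING' } // assumed action, not named in the source

function thinkingReducer(state: ThinkingState, action: ThinkingAction): ThinkingState {
  switch (action.type) {
    case 'PRESERVE_THINKING_CONTENT':
      // Copy the live buffer aside before the stream is aborted
      return { ...state, preservedThinkingContent: action.content }
    case 'RESET_STREAMING_THINKING':
      return { ...state, streamingThinkingContent: '' }
  }
}

// Prefer live thinking content; fall back to what was preserved on interruption.
function effectiveThinkingSketch(state: ThinkingState): string {
  return state.streamingThinkingContent || state.preservedThinkingContent
}

// Simulate an interruption: preserve first, then clear the live buffer
const beforeStop: ThinkingState = { streamingThinkingContent: 'step 1', preservedThinkingContent: '' }
const afterPreserve = thinkingReducer(beforeStop, { type: 'PRESERVE_THINKING_CONTENT', content: beforeStop.streamingThinkingContent })
const afterStop = thinkingReducer(afterPreserve, { type: 'RESET_STREAMING_THINKING' })
```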
Advanced Features
Session Management
// Session and trace management
const sessionManagerRef = useRef(new SessionManager())
const sessionManager = sessionManagerRef.current
// Get current session info
const sessionId = sessionManager.getSessionId()
const traceId = sessionManager.getTraceId()
// Create new trace for each message
const newTraceId = sessionManager.createNewTraceId()
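The SessionManager class itself is not shown. A minimal sketch that satisfies the three calls used above could look like the following, assuming one session ID per instance and a trace ID that is replaced on every createNewTraceId call. The ID format and the makeSketchId helper are invented for illustration.

```typescript
// Monotonic counter so IDs created in the same millisecond still differ
let sketchIdSeq = 0

// Hypothetical ID generator; the real implementation may use UUIDs instead.
function makeSketchId(prefix: string): string {
  sketchIdSeq += 1
  return `${prefix}-${Date.now().toString(36)}-${sketchIdSeq}`
}

class SessionManagerSketch {
  // One session ID for the lifetime of the instance
  private readonly sessionId: string = makeSketchId('session')
  // No trace exists until the first message is sent
  private traceId: string | null = null

  getSessionId(): string {
    return this.sessionId
  }

  getTraceId(): string | null {
    return this.traceId
  }

  // Replaces the current trace ID and returns the new one
  createNewTraceId(): string {
    this.traceId = makeSketchId('trace')
    return this.traceId
  }
}
```

Holding the instance in a ref, as the snippet above does, keeps the session ID stable across re-renders.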
Artifact Integration
Real-time artifact handling during streaming:
const { getArtifactsForApi, addArtifact } = useArtifacts()
// Include existing artifacts in API calls
const artifactContent = getArtifactsForApi(assistantConfig.assistantId, assistantConfig.tools)
// Handle new artifacts from streaming
onArtifactContent: (artifacts: ArtifactData[]) => {
console.log('Received artifacts during streaming:', artifacts)
artifacts.forEach(artifact => addArtifact(artifact))
}
Auto Introduction
Intelligent assistant introduction for new conversations:
useEffect(() => {
if (
messagesLoaded && // Wait for Firestore to load
messages.length === 0 && // No existing messages
currentUser?.email && // User is authenticated
!state.isStreaming && // Not currently streaming
assistantConfig.assistantId // Assistant is configured
) {
const timer = setTimeout(() => {
sendMessage("Hello! Please introduce yourself and tell me what you can help with.")
}, 500)
return () => clearTimeout(timer)
}
}, [messagesLoaded, messages.length, currentUser?.email, state.isStreaming, assistantConfig.assistantId, sendMessage])
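The five conditions in the effect can be pulled into a pure predicate so they can be unit-tested without rendering the provider. This is only a sketch of that refactoring; the AutoIntroInput type and the name shouldAutoIntroduce are hypothetical.

```typescript
// Inputs mirror the values the effect above reads
interface AutoIntroInput {
  messagesLoaded: boolean
  messageCount: number
  userEmail: string | null | undefined
  isStreaming: boolean
  assistantId: string
}

// True only when every condition from the effect holds.
function shouldAutoIntroduce(input: AutoIntroInput): boolean {
  return (
    input.messagesLoaded &&        // Firestore has finished loading
    input.messageCount === 0 &&    // conversation is empty
    Boolean(input.userEmail) &&    // user is authenticated
    !input.isStreaming &&          // no stream in flight
    Boolean(input.assistantId)     // assistant is configured
  )
}

// Baseline input for which the introduction should fire
const readyForIntro: AutoIntroInput = {
  messagesLoaded: true,
  messageCount: 0,
  userEmail: 'user@example.com',
  isStreaming: false,
  assistantId: 'assistant-123'
}
```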
Error Handling
Comprehensive Error Management
// Stream errors
onStreamError: (err) => {
dispatch({
type: 'STREAMING_ERROR',
error: err instanceof Error ? err.message : 'Streaming error'
})
}
// Network errors
try {
await StreamingService.sendMessage(text, config, context, callbacks)
} catch (error) {
console.error('Error sending message:', error)
toast({
title: "Error",
description: "Failed to send message",
variant: "destructive"
})
}
// User cancellation
stopStreaming() // Sets error: 'Streaming stopped by user'
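The three cases above can share one normalizer that turns any thrown value into the string stored in the error state. The sketch below assumes the underlying transport rejects with an error named AbortError on cancellation, as fetch does. The function name describeStreamError is hypothetical.

```typescript
// Message the source says is set on user cancellation
const USER_STOP_MESSAGE = 'Streaming stopped by user'
// Fallback used by the onStreamError handler above
const GENERIC_STREAM_ERROR = 'Streaming error'

// Maps an unknown thrown value to a user-facing error string.
function describeStreamError(err: unknown): string {
  if (err instanceof Error) {
    // Assumption: cancellation surfaces as an Error whose name is 'AbortError'
    return err.name === 'AbortError' ? USER_STOP_MESSAGE : err.message
  }
  // Non-Error values (strings, objects) get the generic message
  return GENERIC_STREAM_ERROR
}
```

Routing every catch block through one function keeps the wording of error messages consistent between the reducer and the toast.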
Abort Control
Robust abort handling for streaming operations:
// Clean up existing controller
if (abortControllerRef.current) {
abortControllerRef.current.abort()
}
// Create new controller for each operation
abortControllerRef.current = new AbortController()
const signal = abortControllerRef.current.signal
// Pass signal to streaming service
await StreamingService.sendMessage(text, config, { signal }, callbacks)
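The abort-then-replace pattern above can be wrapped in a small helper so that each new operation cancels the previous one in a single call. This is a sketch built on the standard AbortController API; the AbortSlot class is a hypothetical name and is not part of the context.

```typescript
// Holds at most one live AbortController at a time.
class AbortSlot {
  private current: AbortController | null = null

  // Cancel any previous operation and return a fresh signal for the next one
  begin(): AbortSignal {
    this.current?.abort()
    this.current = new AbortController()
    return this.current.signal
  }

  // Abort the active operation; returns false if there was nothing to stop
  stop(): boolean {
    if (this.current === null || this.current.signal.aborted) return false
    this.current.abort()
    return true
  }
}

// Starting a second operation aborts the first
const abortSlot = new AbortSlot()
const firstSignal = abortSlot.begin()
const secondSignal = abortSlot.begin()
```

In a React component the slot would live in a ref, mirroring abortControllerRef in the snippet above.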
Integration Examples
With Message State Context
function StreamingChatInterface() {
const { messages, selectedItems, messagesLoaded } = useMessageState()
const {
streamingMessage,
isStreaming,
sendMessage
} = useStreaming()
// Combine Firestore messages with streaming message
const allMessages = useMemo(() => {
return streamingMessage
? [...messages, streamingMessage]
: messages
}, [messages, streamingMessage])
return (
<div>
{allMessages.map(message => (
<MessageComponent key={message.id || 'streaming'} message={message} />
))}
{isStreaming && <TypingIndicator />}
</div>
)
}
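The merge inside useMemo can also be written as a pure function. The sketch below adds one guard that the component above does not have: if the streaming message already appears in the persisted list under the same id, it is not appended a second time. The ListedMessage shape and both function names are assumptions for illustration.

```typescript
// Hypothetical minimal message shape
interface ListedMessage {
  id?: string
  content: string
}

// Appends the streaming message unless it is already persisted under the same id.
function combineMessages(persisted: ListedMessage[], streaming: ListedMessage | null): ListedMessage[] {
  if (streaming === null) return persisted
  const streamingId = streaming.id
  if (streamingId !== undefined && persisted.some(m => m.id === streamingId)) {
    // Assumed guard: avoid rendering the same message twice
    return persisted
  }
  return [...persisted, streaming]
}

// Same fallback key the component uses for a message without an id
function messageKey(message: ListedMessage): string {
  return message.id || 'streaming'
}
```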
With Tool Confirmation
function ToolConfirmationFlow() {
const { sendToolConfirmationMessage } = useStreaming()
const handleConfirmTools = async (
originalMessage: string,
confirmedTools: FirstImpressionTool[],
traceId: string
) => {
const controller = new AbortController()
await sendToolConfirmationMessage(
originalMessage,
confirmedTools,
traceId,
controller.signal
)
}
return <ToolConfirmationDialog onConfirm={handleConfirmTools} />
}
With Firebase Service
// The context automatically integrates with Firebase
const messageData = EnhancedFirebaseService.createUserMessagePayload(text, currentUser, traceId)
const savePromise = EnhancedFirebaseService.saveMessage(assistantConfig.assistantId, messageData)
// Wait for save and update usage stats
await savePromise
await EnhancedFirebaseService.updateUsageStats(assistantConfig.assistantId)
Performance Considerations
Efficient State Updates
// Memoized context value to prevent unnecessary re-renders
const contextValue = useMemo((): StreamingContextType => ({
streamingMessage: state.streamingMessage,
effectiveThinkingContent: getEffectiveThinkingContent(state),
isStreaming: state.isStreaming,
// ... other properties
}), [
state.streamingMessage,
state.isStreaming,
state.error,
// ... dependencies
])
Memory Management
- Abort Controllers: Properly cleaned up to prevent memory leaks
- Session References: useRef for persistent session management
- Effect Cleanup: Timers and subscriptions are properly cleared
Callback Optimization
All callbacks are memoized using useCallback to prevent recreation:
const sendMessage = useCallback(async (text: string) => {
// Implementation
}, [state.isStreaming, currentUser, assistantConfig, messages, selectedItems])
Best Practices
- Always check streaming state: Prevent multiple concurrent streams
- Handle abort signals: Respect user cancellation
- Preserve thinking content: Don't lose AI reasoning on interruption
- Coordinate with Firestore: Wait for message persistence
- Error boundaries: Wrap streaming components in error boundaries
- Loading states: Provide clear feedback during streaming
- Session consistency: Maintain session/trace ID continuity
Troubleshooting
Common Issues
Streaming doesn't start
- Check if isStreaming is already true
- Verify user authentication and assistant configuration
- Ensure network connectivity
Messages not persisting
- Check Firebase rules and authentication
- Verify assistantId is valid
- Monitor network requests in dev tools
Artifacts not appearing
- Ensure ArtifactContext is available
- Check if the 'artifacts' tool is enabled
- Verify artifact data format
State inconsistencies
- Check message state coordination logic
- Verify useEffect dependencies
- Monitor reducer/context state changes
Debug Helpers
// Log streaming state
console.log('Streaming state:', {
isStreaming,
messageContent: streamingMessage?.content?.length,
thinkingContent: effectiveThinkingContent?.length,
error,
sessionId,
traceId
})
// Monitor abort controller
console.log('Abort controller:', abortControllerRef.current?.signal.aborted)
// Track Firebase sync
console.log('Messages loaded:', messagesLoaded, 'Count:', messages.length)
Related Components
- MessageStateContext - Message state management
- ArtifactContext - Artifact integration
- EnhancedFirebaseService - Firebase persistence
- StreamingService - Core streaming implementation
- ChatInterface - UI integration
Architecture
StreamingProvider
├── State Management (useReducer)
├── Session Management (SessionManager)
├── Abort Control (AbortController)
├── Firebase Integration (EnhancedFirebaseService)
├── Artifact Integration (ArtifactContext)
└── Error Handling (Toast notifications)
useStreaming Hook
├── Message sending
├── Tool confirmation
├── Stream control
├── Error management
└── State access
The StreamingContext serves as the central hub for all real-time communication, coordinating between multiple services and contexts to provide a seamless streaming experience while maintaining data consistency and error resilience.