Streaming Retrieval API

Using real-time streaming for RAG and agent interactions

R2R provides powerful streaming capabilities for its retrieval services, including RAG responses and agent interactions. These streaming features allow for real-time updates as information is retrieved and processed, enhancing user experience for applications that benefit from immediate feedback.

Streaming Events

When using streaming in R2R, various event types are emitted during the retrieval and generation process:

Event TypeDescription
SearchResultsEventContains initial search results from documents
MessageEventStreams partial tokens of the response as they’re generated
CitationEventIndicates when a citation is added to the response
ThinkingEventContains the model’s step-by-step reasoning (for agents)
ToolCallEventIndicates when the model calls a tool (for agents)
ToolResultEventContains results from tool calls (for agents)
FinalAnswerEventContains the complete generated answer with citations

Streaming RAG

Basic Streaming RAG

To use streaming with basic RAG functionality:

from r2r import (
    CitationEvent,
    FinalAnswerEvent,
    MessageEvent,
    SearchResultsEvent,
    R2RClient,
)

client = R2RClient("http://localhost:7272")

result_stream = client.retrieval.rag(
    query="What is DeepSeek R1?",
    search_settings={"limit": 25},
    rag_generation_config={"stream": True},
)

for event in result_stream:
    if isinstance(event, SearchResultsEvent):
        print("Search results:", event.data)
    elif isinstance(event, MessageEvent):
        print("Partial message:", event.data.delta)
    elif isinstance(event, CitationEvent):
        print("New citation detected:", event.data)
    elif isinstance(event, FinalAnswerEvent):
        print("Final answer:", event.data.generated_answer)
        print("Citations:", event.data.citations)
curl -X POST https://api.intelligence.io.solutions/api/r2r/v3/retrieval/rag \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d '{
    "query": "What is DeepSeek R1?",
    "search_settings": {"limit": 25},
    "rag_generation_config": {"stream": true}
  }' \
  -H "Authorization: Bearer $IOINTELLIGENCE_API_KEY"

Streaming RAG with Web Search

To include web search in your streaming RAG:

result_stream = client.retrieval.rag(
    query="What are the latest developments with DeepSeek R1?", 
    rag_generation_config={"stream": True},
    include_web_search=True
)

for event in result_stream:
    # Process events as shown in previous example
    pass

Streaming Agent

R2R provides a powerful streaming agent mode that supports complex interactions with both document-based knowledge and web resources.

Basic Streaming Agent

from r2r import (
    ThinkingEvent,
    ToolCallEvent,
    ToolResultEvent,
    CitationEvent,
    MessageEvent,
    FinalAnswerEvent,
)

agent_stream = client.retrieval.agent(
    query="What does DeepSeek R1 imply for the future of AI?",
    generation_config={"stream": True},
    mode="research"
)

for event in agent_stream:
    if isinstance(event, ThinkingEvent):
        print(f"🧠 Thinking: {event.data.delta.content[0].payload.value}")
    elif isinstance(event, ToolCallEvent):
        print(f"🔧 Tool call: {event.data.name}({event.data.arguments})")
    elif isinstance(event, ToolResultEvent):
        print(f"📊 Tool result: {event.data.content[:60]}...")
    elif isinstance(event, CitationEvent):
        print(f"📑 Citation: {event.data}")
    elif isinstance(event, MessageEvent):
        print(f"💬 Message: {event.data.delta.content[0].payload.value}")
    elif isinstance(event, FinalAnswerEvent):
        print(f"✅ Final answer: {event.data.generated_answer[:100]}...")
        print(f"   Citations: {len(event.data.citations)} sources referenced")

Advanced Research Agent with Tools

R2R’s agent mode can leverage multiple tools to perform in-depth research:

agent_stream = client.retrieval.agent(
    query="Analyze DeepSeek R1's performance compared to other models",
    generation_config={
        "model": "anthropic/claude-3-7-sonnet-20250219",
        "extended_thinking": True,
        "thinking_budget": 4096,
        "temperature": 1,
        "max_tokens_to_sample": 16000,
        "stream": True
    },
    mode="research",
    rag_tools=["web_search", "web_scrape"]
)

# Process events as shown in previous example

Streaming Citations

R2R’s streaming citations provide detailed attribution information that links specific parts of the response to source documents:

{
  "event": "citation",
  "data": {
    "id": "abc123",
    "object": "citation",
    "raw_index": 1,
    "index": 1,
    "start_index": 305,
    "end_index": 308,
    "source_type": "chunk",
    "source_id": "e760bb76-1c6e-52eb-910d-0ce5b567011b",
    "document_id": "e43864f5-a36f-548e-aacd-6f8d48b30c7f",
    "source_title": "DeepSeek_R1.pdf"
  }
}

Each citation includes:

  • id: Unique identifier for the citation
  • index: The display index (e.g., [1], [2])
  • start_index and end_index: Character positions in the response
  • source_type: The type of source (chunk, graph, web)
  • source_id: ID of the specific chunk/node
  • document_id: ID of the parent document
  • source_title: Title of the source document

Implementing Streaming UI

To create a responsive UI with streaming RAG, consider these patterns:

Frontend Implementation

import { useState, useEffect } from 'react';

function RAGComponent() {
  const [messages, setMessages] = useState([]);
  const [currentMessage, setCurrentMessage] = useState('');
  const [citations, setCitations] = useState([]);
  const [isLoading, setIsLoading] = useState(false);

  const handleSubmit = async (query) => {
    setIsLoading(true);
    setCurrentMessage('');
    setCitations([]);
    
    try {
      const response = await fetch('/api/rag', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          query,
          stream: true
        })
      });
      
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        
        const chunk = decoder.decode(value);
        const events = chunk.split('\n\n').filter(Boolean);
        
        for (const eventText of events) {
          if (!eventText.startsWith('data: ')) continue;
          
          const eventData = JSON.parse(eventText.slice(6));
          
          switch (eventData.event) {
            case 'message':
              setCurrentMessage(prev => prev + eventData.data.delta.content[0].payload.value);
              break;
            case 'citation':
              setCitations(prev => [...prev, eventData.data]);
              break;
            case 'final_answer':
              setMessages(prev => [...prev, {
                role: 'assistant',
                content: eventData.data.generated_answer,
                citations: eventData.data.citations
              }]);
              break;
          }
        }
      }
    } catch (error) {
      console.error('Error with streaming RAG:', error);
    } finally {
      setIsLoading(false);
    }
  };

  return (
    <div className="rag-container">
      {/* UI implementation */}
      {isLoading && <div className="typing-indicator">{currentMessage}</div>}
      {/* Display messages and citations */}
    </div>
  );
}