Using real-time streaming for RAG and agent interactions
R2R provides streaming capabilities for its retrieval services, including RAG responses and agent interactions. Streaming emits events in real time as information is retrieved and processed, which improves the user experience in applications that benefit from immediate feedback.
Streaming Events
When using streaming in R2R, various event types are emitted during the retrieval and generation process:
| Event Type | Description |
|---|---|
| `SearchResultsEvent` | Contains initial search results from documents |
| `MessageEvent` | Streams partial tokens of the response as they're generated |
| `CitationEvent` | Indicates when a citation is added to the response |
| `ThinkingEvent` | Contains the model's step-by-step reasoning (for agents) |
| `ToolCallEvent` | Indicates when the model calls a tool (for agents) |
| `ToolResultEvent` | Contains results from tool calls (for agents) |
| `FinalAnswerEvent` | Contains the complete generated answer with citations |
Streaming RAG
Basic Streaming RAG
To use streaming with basic RAG functionality:
```python
from r2r import (
    CitationEvent,
    FinalAnswerEvent,
    MessageEvent,
    SearchResultsEvent,
    R2RClient,
)

client = R2RClient("http://localhost:7272")

result_stream = client.retrieval.rag(
    query="What is DeepSeek R1?",
    search_settings={"limit": 25},
    rag_generation_config={"stream": True},
)

for event in result_stream:
    if isinstance(event, SearchResultsEvent):
        print("Search results:", event.data)
    elif isinstance(event, MessageEvent):
        print("Partial message:", event.data.delta)
    elif isinstance(event, CitationEvent):
        print("New citation detected:", event.data)
    elif isinstance(event, FinalAnswerEvent):
        print("Final answer:", event.data.generated_answer)
        print("Citations:", event.data.citations)
```
```shell
curl -X POST https://api.intelligence.io.solutions/api/r2r/v3/retrieval/rag \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -H "Authorization: Bearer $IOINTELLIGENCE_API_KEY" \
  -d '{
    "query": "What is DeepSeek R1?",
    "search_settings": {"limit": 25},
    "rag_generation_config": {"stream": true}
  }'
```
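The endpoint streams Server-Sent Events. If you consume the raw HTTP response rather than the Python SDK, you need to split the stream into events yourself. A minimal sketch, assuming each event arrives as a single `data: {...}` line whose JSON carries an `event` field naming the event type (the framing shown in the citation payload later on this page):

```python
import json

def parse_sse_events(raw: str) -> list[dict]:
    """Split a text/event-stream payload into parsed JSON event dicts.

    Assumes one `data: {...}` line per event; events are separated by
    blank lines, per the SSE framing rules.
    """
    events = []
    for block in raw.strip().split("\n\n"):
        for line in block.splitlines():
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events

# Example: two events as they might appear on the wire
stream = (
    'data: {"event": "message", "data": {"delta": "DeepSeek"}}\n\n'
    'data: {"event": "citation", "data": {"id": "abc123"}}\n\n'
)
parsed = parse_sse_events(stream)
```

`parse_sse_events` and the sample payloads are illustrative, not part of the SDK; a production consumer should also handle events split across network chunks.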
Streaming RAG with Web Search
To include web search in your streaming RAG:
```python
result_stream = client.retrieval.rag(
    query="What are the latest developments with DeepSeek R1?",
    rag_generation_config={"stream": True},
    include_web_search=True,
)

for event in result_stream:
    # Process events as shown in the previous example
    pass
```
Streaming Agent
R2R's streaming agent mode supports complex interactions that draw on both document-based knowledge and web resources.
Basic Streaming Agent
```python
from r2r import (
    ThinkingEvent,
    ToolCallEvent,
    ToolResultEvent,
    CitationEvent,
    MessageEvent,
    FinalAnswerEvent,
)

agent_stream = client.retrieval.agent(
    query="What does DeepSeek R1 imply for the future of AI?",
    generation_config={"stream": True},
    mode="research",
)

for event in agent_stream:
    if isinstance(event, ThinkingEvent):
        print(f"🧠 Thinking: {event.data.delta.content[0].payload.value}")
    elif isinstance(event, ToolCallEvent):
        print(f"🔧 Tool call: {event.data.name}({event.data.arguments})")
    elif isinstance(event, ToolResultEvent):
        print(f"📊 Tool result: {event.data.content[:60]}...")
    elif isinstance(event, CitationEvent):
        print(f"📑 Citation: {event.data}")
    elif isinstance(event, MessageEvent):
        print(f"💬 Message: {event.data.delta.content[0].payload.value}")
    elif isinstance(event, FinalAnswerEvent):
        print(f"✅ Final answer: {event.data.generated_answer[:100]}...")
        print(f"   Citations: {len(event.data.citations)} sources referenced")
```
Advanced Research Agent with Tools
R2R’s agent mode can leverage multiple tools to perform in-depth research:
```python
agent_stream = client.retrieval.agent(
    query="Analyze DeepSeek R1's performance compared to other models",
    generation_config={
        "model": "anthropic/claude-3-7-sonnet-20250219",
        "extended_thinking": True,
        "thinking_budget": 4096,
        "temperature": 1,
        "max_tokens_to_sample": 16000,
        "stream": True,
    },
    mode="research",
    rag_tools=["web_search", "web_scrape"],
)

# Process events as shown in the previous example
```
Streaming Citations
R2R’s streaming citations provide detailed attribution information that links specific parts of the response to source documents:
```json
{
  "event": "citation",
  "data": {
    "id": "abc123",
    "object": "citation",
    "raw_index": 1,
    "index": 1,
    "start_index": 305,
    "end_index": 308,
    "source_type": "chunk",
    "source_id": "e760bb76-1c6e-52eb-910d-0ce5b567011b",
    "document_id": "e43864f5-a36f-548e-aacd-6f8d48b30c7f",
    "source_title": "DeepSeek_R1.pdf"
  }
}
```
Each citation includes:

- `id`: Unique identifier for the citation
- `index`: The display index (e.g., [1], [2])
- `start_index` and `end_index`: Character positions in the response
- `source_type`: The type of source (chunk, graph, web)
- `source_id`: ID of the specific chunk/node
- `document_id`: ID of the parent document
- `source_title`: Title of the source document
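Collected `CitationEvent` payloads can be turned into a human-readable source list for display alongside the answer. A minimal sketch (the `format_sources` helper and its deduplication-by-display-index approach are ours, not part of the SDK):

```python
def format_sources(citations: list[dict]) -> str:
    """Build a "[n] title" bibliography from citation payloads.

    Deduplicates by display index, keeping the first title seen and
    falling back to source_id when no title is available.
    """
    seen = {}
    for c in citations:
        seen.setdefault(c["index"], c.get("source_title") or c["source_id"])
    return "\n".join(f"[{i}] {title}" for i, title in sorted(seen.items()))

# Hypothetical payloads: the same source cited twice, plus an untitled one
citations = [
    {"index": 1, "source_title": "DeepSeek_R1.pdf", "source_id": "e760bb76"},
    {"index": 1, "source_title": "DeepSeek_R1.pdf", "source_id": "e760bb76"},
    {"index": 2, "source_title": None, "source_id": "9fa0e2c1"},
]
print(format_sources(citations))
```

The `start_index`/`end_index` fields can similarly drive inline highlighting, since they give the character span of each marker in the generated text.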
Implementing Streaming UI
To create a responsive UI with streaming RAG, consider these patterns:
Frontend Implementation
```jsx
import { useState, useEffect } from 'react';

function RAGComponent() {
  const [messages, setMessages] = useState([]);
  const [currentMessage, setCurrentMessage] = useState('');
  const [citations, setCitations] = useState([]);
  const [isLoading, setIsLoading] = useState(false);

  const handleSubmit = async (query) => {
    setIsLoading(true);
    setCurrentMessage('');
    setCitations([]);

    try {
      const response = await fetch('/api/rag', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ query, stream: true })
      });

      const reader = response.body.getReader();
      const decoder = new TextDecoder();

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const events = chunk.split('\n\n').filter(Boolean);

        for (const eventText of events) {
          if (!eventText.startsWith('data: ')) continue;
          const eventData = JSON.parse(eventText.slice(6));

          switch (eventData.event) {
            case 'message':
              setCurrentMessage(prev => prev + eventData.data.delta.content[0].payload.value);
              break;
            case 'citation':
              setCitations(prev => [...prev, eventData.data]);
              break;
            case 'final_answer':
              setMessages(prev => [...prev, {
                role: 'assistant',
                content: eventData.data.generated_answer,
                citations: eventData.data.citations
              }]);
              break;
          }
        }
      }
    } catch (error) {
      console.error('Error with streaming RAG:', error);
    } finally {
      setIsLoading(false);
    }
  };

  return (
    <div className="rag-container">
      {/* UI implementation */}
      {isLoading && <div className="typing-indicator">{currentMessage}</div>}
      {/* Display messages and citations */}
    </div>
  );
}
```
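The `/api/rag` route the component calls is not part of R2R; one way to implement it is a thin server-side proxy that re-emits events in the `data: {...}` SSE framing the frontend parses. A framework-agnostic sketch (the `to_sse` helper is an assumption, not an R2R API):

```python
import json

def to_sse(events):
    """Yield event dicts as SSE-framed strings for a streaming response.

    `events` can be any iterable of JSON-serializable dicts, e.g. ones
    built from an R2R stream; plug the generator into your web
    framework's streaming response type with a text/event-stream
    content type.
    """
    for event in events:
        yield f"data: {json.dumps(event)}\n\n"

chunks = list(to_sse([{"event": "message", "data": {"delta": "Hi"}}]))
```

Each yielded string is one complete SSE event, terminated by the blank line the browser-side parser splits on.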