Streaming Examples
Real-time responses using Server-Sent Events (SSE).
Basic Streaming
Enable streaming for real-time token delivery:
import { wrapFetchWithPayment } from 'x402-fetch';
import { privateKeyToAccount } from 'viem/accounts';
// Signer account derived from PRIVATE_KEY — assumes a 0x-prefixed hex key is
// present in the environment; the `as` cast does not validate it. TODO confirm.
const account = privateKeyToAccount(process.env.PRIVATE_KEY as `0x${string}`);
// fetch wrapper from x402-fetch — presumably retries requests after settling
// HTTP 402 payment challenges with `account`; verify against the x402-fetch docs.
const fetchWithPayment = wrapFetchWithPayment(fetch, account);
/**
 * Streams a chat completion and prints each content delta as it arrives.
 *
 * Robust SSE parsing:
 * - decodes with { stream: true } so multi-byte UTF-8 sequences split across
 *   network chunks are not corrupted;
 * - buffers the trailing partial line of each chunk, since one SSE event can
 *   span two reads;
 * - returns on [DONE] (a bare `break` there only exits the inner for-loop).
 *
 * @returns the full concatenated assistant response text
 */
async function basicStreaming(): Promise<string> {
  const response = await fetchWithPayment('https://api.x-router.ai/v1/chat/completions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      messages: [
        { role: 'user', content: 'Tell me a short story about AI.' }
      ],
      model: 'anthropic/claude-3.5-sonnet',
      max_tokens: 200,
      stream: true // Enable streaming
    })
  });
  // Parse SSE stream
  const reader = response.body?.getReader();
  if (!reader) throw new Error('No response body');
  const decoder = new TextDecoder();
  let buffer = '';      // partial SSE line carried over between chunks
  let fullContent = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // { stream: true } holds back incomplete multi-byte sequences
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() ?? ''; // last element may be an incomplete line
    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = line.slice(6);
      if (data === '[DONE]') {
        console.log('\n[Stream complete]');
        return fullContent; // exits both loops, unlike a bare `break`
      }
      try {
        const json = JSON.parse(data);
        const content = json.choices?.[0]?.delta?.content || '';
        if (content) {
          process.stdout.write(content); // Print in real-time
          fullContent += content;
        }
      } catch (e) {
        // Skip keep-alive / non-JSON lines
      }
    }
  }
  return fullContent;
}
basicStreaming();

Streaming with Callback
Process each token with a callback function:
/**
 * Streams a chat completion, invoking `onToken` for each content delta and
 * `onComplete` exactly once with the accumulated text.
 *
 * Fixes over the naive version: decodes with { stream: true } (split
 * multi-byte chars), buffers partial SSE lines across chunks, and still
 * fires `onComplete` (and returns the text) if the stream ends without an
 * explicit [DONE] sentinel — previously the function returned undefined.
 *
 * @param messages   chat messages to send
 * @param onToken    called with each streamed content fragment
 * @param onComplete called once with the full concatenated text
 * @returns the full concatenated text
 */
async function streamWithCallback(
  messages: any[],
  onToken: (token: string) => void,
  onComplete: (fullText: string) => void
): Promise<string> {
  const response = await fetchWithPayment('https://api.x-router.ai/v1/chat/completions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      messages,
      model: 'anthropic/claude-3.5-sonnet',
      max_tokens: 300,
      stream: true
    })
  });
  const reader = response.body?.getReader();
  if (!reader) throw new Error('No response body');
  const decoder = new TextDecoder();
  let buffer = '';      // partial SSE line carried over between chunks
  let fullContent = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true }); // keep split UTF-8 intact
    const lines = buffer.split('\n');
    buffer = lines.pop() ?? ''; // keep trailing partial line for the next chunk
    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = line.slice(6);
      if (data === '[DONE]') {
        onComplete(fullContent);
        return fullContent;
      }
      try {
        const json = JSON.parse(data);
        const content = json.choices?.[0]?.delta?.content || '';
        if (content) {
          onToken(content);
          fullContent += content;
        }
      } catch (e) {
        // Skip keep-alive / non-JSON lines
      }
    }
  }
  // Stream ended without [DONE]; still notify the caller.
  onComplete(fullContent);
  return fullContent;
}
// Usage
// Stream tokens to stdout, then report the total length once complete.
await streamWithCallback(
  [{ role: 'user', content: 'Explain recursion' }],
  (token) => process.stdout.write(token),
  (full) => console.log(`\n\nComplete! Total length: ${full.length}`)
);

React Streaming Component
Use streaming in a React application:
'use client';
import { useState } from 'react';
/**
 * Minimal streaming chat UI.
 *
 * Sends the conversation to /api/chat and appends SSE content deltas to the
 * last (assistant) message as they arrive. State is updated immutably: the
 * accumulated text lives in a local string and each update installs a fresh
 * message object, instead of mutating an object already handed to React.
 */
export default function StreamingChat() {
  const [messages, setMessages] = useState<{ role: string; content: string }[]>([]);
  const [input, setInput] = useState('');
  const [streaming, setStreaming] = useState(false);

  async function sendMessage() {
    if (!input.trim()) return;
    const userMessage = { role: 'user', content: input };
    setMessages(prev => [...prev, userMessage]);
    setInput('');
    setStreaming(true);
    try {
      const response = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          messages: [...messages, userMessage],
          stream: true
        })
      });
      // Fail fast instead of trying to SSE-parse an error body.
      if (!response.ok) throw new Error(`HTTP ${response.status}`);
      const reader = response.body?.getReader();
      if (!reader) throw new Error('No response body');
      const decoder = new TextDecoder();
      // Placeholder assistant message; replaced as tokens arrive.
      setMessages(prev => [...prev, { role: 'assistant', content: '' }]);
      let assistantContent = '';
      let buffer = ''; // partial SSE line carried over between chunks
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        // { stream: true } keeps split multi-byte characters intact
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() ?? '';
        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;
          const data = line.slice(6);
          if (data === '[DONE]') continue; // the next read() reports done
          try {
            const json = JSON.parse(data);
            const content = json.choices?.[0]?.delta?.content || '';
            if (content) {
              assistantContent += content;
              setMessages(prev => {
                const newMessages = [...prev];
                newMessages[newMessages.length - 1] = {
                  role: 'assistant',
                  content: assistantContent
                };
                return newMessages;
              });
            }
          } catch (e) {
            // skip keep-alive / non-JSON lines
          }
        }
      }
    } catch (error) {
      console.error('Streaming error:', error);
    } finally {
      setStreaming(false);
    }
  }

  return (
    <div className="chat-container">
      <div className="messages">
        {messages.map((msg, i) => (
          <div key={i} className={`message ${msg.role}`}>
            <strong>{msg.role}:</strong> {msg.content}
          </div>
        ))}
      </div>
      <div className="input-area">
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyDown={(e) => e.key === 'Enter' && sendMessage()} // onKeyPress is deprecated
          disabled={streaming}
          placeholder="Type a message..."
        />
        <button onClick={sendMessage} disabled={streaming}>
          {streaming ? 'Sending...' : 'Send'}
        </button>
      </div>
    </div>
  );
}

Server-Side Streaming (Next.js API Route)
Handle streaming in a Next.js API route:
// app/api/chat/route.ts
import { wrapFetchWithPayment } from 'x402-fetch';
import { privateKeyToAccount } from 'viem/accounts';
import { NextRequest } from 'next/server';
// Server-side signer — assumes PRIVATE_KEY is a 0x-prefixed hex key in the
// environment (never expose it client-side); the cast does not validate it.
const account = privateKeyToAccount(process.env.PRIVATE_KEY as `0x${string}`);
// fetch wrapper that presumably settles x402 payment challenges with `account`
// before retrying — verify against the x402-fetch documentation.
const fetchWithPayment = wrapFetchWithPayment(fetch, account);
/**
 * Chat proxy route: forwards the request body to X-Router, paying via x402.
 *
 * When `stream` is true the upstream SSE body is piped through unchanged;
 * otherwise the JSON result is returned. Upstream failures are surfaced as a
 * JSON error with the upstream status instead of being streamed to the client.
 */
export async function POST(request: NextRequest) {
  const { messages, stream = false } = await request.json();
  const response = await fetchWithPayment('https://api.x-router.ai/v1/chat/completions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      messages,
      model: 'anthropic/claude-3.5-sonnet',
      max_tokens: 500,
      stream
    })
  });
  // Don't hand the client an error body labelled as an event stream.
  if (!response.ok) {
    const detail = await response.text();
    return Response.json(
      { error: `Upstream error ${response.status}`, detail },
      { status: response.status }
    );
  }
  if (stream) {
    // Pass through the streaming response
    return new Response(response.body, {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
      },
    });
  }
  // Return JSON response
  const data = await response.json();
  return Response.json(data);
}

Streaming with Progress Indicator
Show progress while streaming:
/**
 * Streams a completion, echoing content to stdout with a running count
 * printed every 10 deltas.
 *
 * NOTE: the count tallies SSE content deltas, which approximate but do not
 * equal model tokens. Uses { stream: true } decoding and partial-line
 * buffering so events split across network chunks parse correctly.
 *
 * @returns the full concatenated assistant response text
 */
async function streamWithProgress(messages: any[]): Promise<string> {
  console.log('Starting stream...');
  const response = await fetchWithPayment('https://api.x-router.ai/v1/chat/completions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      messages,
      model: 'anthropic/claude-3.5-sonnet',
      max_tokens: 500,
      stream: true
    })
  });
  const reader = response.body?.getReader();
  if (!reader) throw new Error('No response body');
  const decoder = new TextDecoder();
  let buffer = '';      // partial SSE line carried over between chunks
  let fullContent = '';
  let tokenCount = 0;
  process.stdout.write('\n');
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true }); // keep split UTF-8 intact
    const lines = buffer.split('\n');
    buffer = lines.pop() ?? '';
    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = line.slice(6);
      if (data === '[DONE]') {
        console.log(`\n\nComplete! Received ${tokenCount} tokens`);
        return fullContent;
      }
      try {
        const json = JSON.parse(data);
        const content = json.choices?.[0]?.delta?.content || '';
        if (content) {
          process.stdout.write(content);
          fullContent += content;
          tokenCount++;
          // Show progress every 10 deltas
          if (tokenCount % 10 === 0) {
            process.stdout.write(` [${tokenCount}]`);
          }
        }
      } catch (e) {
        // skip keep-alive / non-JSON lines
      }
    }
  }
  return fullContent;
}

Handling Stream Errors
Robust error handling for streams:
/**
 * Streams a completion with defensive error handling:
 * - checks the HTTP status before reading the body;
 * - enforces a 30 s whole-stream timeout;
 * - cancels the reader on failure and always clears the timer (so the
 *   timeout rejection can never fire later as an unhandled rejection).
 *
 * Type fixes vs. the naive version: `getReader()` may yield undefined, so it
 * is coalesced to null to match the declared type; the catch variable is
 * `unknown` under strict mode and is narrowed before reading `.message`.
 *
 * @returns the full concatenated assistant response text
 */
async function safeStreaming(messages: any[]) {
  let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
  let timeout: NodeJS.Timeout | undefined;
  try {
    const response = await fetchWithPayment('https://api.x-router.ai/v1/chat/completions', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        messages,
        model: 'anthropic/claude-3.5-sonnet',
        max_tokens: 300,
        stream: true
      })
    });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${await response.text()}`);
    }
    reader = response.body?.getReader() ?? null; // coalesce undefined -> null
    if (!reader) throw new Error('No response body');
    const decoder = new TextDecoder();
    let buffer = '';      // partial SSE line carried over between chunks
    let fullContent = '';
    // Whole-stream timeout; the timer is disposed in `finally` on every path.
    const streamTimeout = new Promise<never>((_, reject) => {
      timeout = setTimeout(() => reject(new Error('Stream timeout')), 30000);
    });
    const readStream = async (): Promise<string> => {
      while (true) {
        const { done, value } = await reader!.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true }); // keep split UTF-8 intact
        const lines = buffer.split('\n');
        buffer = lines.pop() ?? '';
        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;
          const data = line.slice(6);
          if (data === '[DONE]') {
            return fullContent;
          }
          try {
            const json = JSON.parse(data);
            const content = json.choices?.[0]?.delta?.content || '';
            if (content) {
              fullContent += content;
              process.stdout.write(content);
            }
          } catch (e) {
            // skip keep-alive / non-JSON lines
          }
        }
      }
      return fullContent;
    };
    return await Promise.race([readStream(), streamTimeout]);
  } catch (error) {
    // `error` is `unknown` under strict mode — narrow before using .message
    console.error('Stream error:', error instanceof Error ? error.message : error);
    if (reader) {
      await reader.cancel();
    }
    throw error;
  } finally {
    if (timeout) clearTimeout(timeout);
  }
}

Key Points
Payment Before Streaming
Payment is required and processed before the stream begins. Once streaming starts, you won’t be charged again.
Stream Format
Streams use Server-Sent Events (SSE) format:
- Each line starts with `data: `
- Each data line contains a JSON object
- The stream ends with `data: [DONE]`
Error Handling
Always handle stream errors and cleanup:
- Cancel readers on error
- Implement timeouts
- Handle network interruptions
Best Practices
- Use streaming for long responses (>100 tokens)
- Show progress indicators to users
- Implement proper cleanup
- Test timeout handling
Last updated on