Skip to Content
GuidesStreaming Examples

Streaming Examples

Real-time responses using Server-Sent Events (SSE).

Basic Streaming

Enable streaming for real-time token delivery:

import { wrapFetchWithPayment } from 'x402-fetch'; import { privateKeyToAccount } from 'viem/accounts'; const account = privateKeyToAccount(process.env.PRIVATE_KEY as `0x${string}`); const fetchWithPayment = wrapFetchWithPayment(fetch, account); async function basicStreaming() { const response = await fetchWithPayment('https://api.x-router.ai/v1/chat/completions', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ messages: [ { role: 'user', content: 'Tell me a short story about AI.' } ], model: 'anthropic/claude-3.5-sonnet', max_tokens: 200, stream: true // Enable streaming }) }); // Parse SSE stream const reader = response.body?.getReader(); if (!reader) throw new Error('No response body'); const decoder = new TextDecoder(); let fullContent = ''; while (true) { const { done, value } = await reader.read(); if (done) break; const chunk = decoder.decode(value); const lines = chunk.split('\n'); for (const line of lines) { if (line.startsWith('data: ')) { const data = line.slice(6); if (data === '[DONE]') { console.log('\n[Stream complete]'); break; } try { const json = JSON.parse(data); const content = json.choices?.[0]?.delta?.content || ''; if (content) { process.stdout.write(content); // Print in real-time fullContent += content; } } catch (e) { // Skip invalid JSON } } } } return fullContent; } basicStreaming();

Streaming with Callback

Process each token with a callback function:

async function streamWithCallback( messages: any[], onToken: (token: string) => void, onComplete: (fullText: string) => void ) { const response = await fetchWithPayment('https://api.x-router.ai/v1/chat/completions', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ messages, model: 'anthropic/claude-3.5-sonnet', max_tokens: 300, stream: true }) }); const reader = response.body?.getReader(); if (!reader) throw new Error('No response body'); const decoder = new TextDecoder(); let fullContent = ''; while (true) { const { done, value } = await reader.read(); if (done) break; const chunk = decoder.decode(value); const lines = chunk.split('\n'); for (const line of lines) { if (line.startsWith('data: ')) { const data = line.slice(6); if (data === '[DONE]') { onComplete(fullContent); return fullContent; } try { const json = JSON.parse(data); const content = json.choices?.[0]?.delta?.content || ''; if (content) { onToken(content); fullContent += content; } } catch (e) {} } } } } // Usage await streamWithCallback( [{ role: 'user', content: 'Explain recursion' }], (token) => process.stdout.write(token), (full) => console.log(`\n\nComplete! Total length: ${full.length}`) );

React Streaming Component

Use streaming in a React application:

'use client'; import { useState } from 'react'; export default function StreamingChat() { const [messages, setMessages] = useState<{ role: string; content: string }[]>([]); const [input, setInput] = useState(''); const [streaming, setStreaming] = useState(false); async function sendMessage() { if (!input.trim()) return; const userMessage = { role: 'user', content: input }; setMessages(prev => [...prev, userMessage]); setInput(''); setStreaming(true); try { const response = await fetch('/api/chat', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ messages: [...messages, userMessage], stream: true }) }); const reader = response.body?.getReader(); if (!reader) throw new Error('No response body'); const decoder = new TextDecoder(); let assistantMessage = { role: 'assistant', content: '' }; setMessages(prev => [...prev, assistantMessage]); while (true) { const { done, value } = await reader.read(); if (done) break; const chunk = decoder.decode(value); const lines = chunk.split('\n'); for (const line of lines) { if (line.startsWith('data: ')) { const data = line.slice(6); if (data === '[DONE]') break; try { const json = JSON.parse(data); const content = json.choices?.[0]?.delta?.content || ''; if (content) { assistantMessage.content += content; setMessages(prev => { const newMessages = [...prev]; newMessages[newMessages.length - 1] = { ...assistantMessage }; return newMessages; }); } } catch (e) {} } } } } catch (error) { console.error('Streaming error:', error); } finally { setStreaming(false); } } return ( <div className="chat-container"> <div className="messages"> {messages.map((msg, i) => ( <div key={i} className={`message ${msg.role}`}> <strong>{msg.role}:</strong> {msg.content} </div> ))} </div> <div className="input-area"> <input value={input} onChange={(e) => setInput(e.target.value)} onKeyPress={(e) => e.key === 'Enter' && sendMessage()} disabled={streaming} placeholder="Type a message..." /> <button onClick={sendMessage} disabled={streaming}> {streaming ? 'Sending...' : 'Send'} </button> </div> </div> ); }

Server-Side Streaming (Next.js API Route)

Handle streaming in a Next.js API route:

// app/api/chat/route.ts import { wrapFetchWithPayment } from 'x402-fetch'; import { privateKeyToAccount } from 'viem/accounts'; import { NextRequest } from 'next/server'; const account = privateKeyToAccount(process.env.PRIVATE_KEY as `0x${string}`); const fetchWithPayment = wrapFetchWithPayment(fetch, account); export async function POST(request: NextRequest) { const { messages, stream = false } = await request.json(); const response = await fetchWithPayment('https://api.x-router.ai/v1/chat/completions', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ messages, model: 'anthropic/claude-3.5-sonnet', max_tokens: 500, stream }) }); if (stream) { // Pass through the streaming response return new Response(response.body, { headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', }, }); } else { // Return JSON response const data = await response.json(); return Response.json(data); } }

Streaming with Progress Indicator

Show progress while streaming:

async function streamWithProgress(messages: any[]) { console.log('Starting stream...'); const response = await fetchWithPayment('https://api.x-router.ai/v1/chat/completions', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ messages, model: 'anthropic/claude-3.5-sonnet', max_tokens: 500, stream: true }) }); const reader = response.body?.getReader(); if (!reader) throw new Error('No response body'); const decoder = new TextDecoder(); let fullContent = ''; let tokenCount = 0; process.stdout.write('\n'); while (true) { const { done, value } = await reader.read(); if (done) break; const chunk = decoder.decode(value); const lines = chunk.split('\n'); for (const line of lines) { if (line.startsWith('data: ')) { const data = line.slice(6); if (data === '[DONE]') { console.log(`\n\nComplete! Received ${tokenCount} tokens`); return fullContent; } try { const json = JSON.parse(data); const content = json.choices?.[0]?.delta?.content || ''; if (content) { process.stdout.write(content); fullContent += content; tokenCount++; // Show progress every 10 tokens if (tokenCount % 10 === 0) { process.stdout.write(` [${tokenCount}]`); } } } catch (e) {} } } } return fullContent; }

Handling Stream Errors

Robust error handling for streams:

async function safeStreaming(messages: any[]) { let reader: ReadableStreamDefaultReader | null = null; try { const response = await fetchWithPayment('https://api.x-router.ai/v1/chat/completions', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ messages, model: 'anthropic/claude-3.5-sonnet', max_tokens: 300, stream: true }) }); if (!response.ok) { throw new Error(`HTTP ${response.status}: ${await response.text()}`); } reader = response.body?.getReader(); if (!reader) throw new Error('No response body'); const decoder = new TextDecoder(); let fullContent = ''; let timeout: NodeJS.Timeout; // Set stream timeout const streamTimeout = new Promise((_, reject) => { timeout = setTimeout(() => reject(new Error('Stream timeout')), 30000); }); const readStream = async () => { while (true) { const { done, value } = await reader!.read(); if (done) break; const chunk = decoder.decode(value); const lines = chunk.split('\n'); for (const line of lines) { if (line.startsWith('data: ')) { const data = line.slice(6); if (data === '[DONE]') { clearTimeout(timeout); return fullContent; } try { const json = JSON.parse(data); const content = json.choices?.[0]?.delta?.content || ''; if (content) { fullContent += content; process.stdout.write(content); } } catch (e) {} } } } clearTimeout(timeout); return fullContent; }; return await Promise.race([readStream(), streamTimeout]); } catch (error) { console.error('Stream error:', error.message); if (reader) { await reader.cancel(); } throw error; } }

Key Points

Payment Before Streaming

Payment is required and processed before the stream begins. Once streaming starts, you won’t be charged again.

Stream Format

Streams use Server-Sent Events (SSE) format:

  • Each line starts with data:
  • Each data line contains a JSON object
  • Stream ends with data: [DONE]

Error Handling

Always handle stream errors and cleanup:

  • Cancel readers on error
  • Implement timeouts
  • Handle network interruptions

Best Practices

  • Use streaming for long responses (>100 tokens)
  • Show progress indicators to users
  • Implement proper cleanup
  • Test timeout handling
Last updated on