AINative provides first-class support for token-by-token streaming from LLMs. Three transport methods are available: SSE (the default), WebSocket, and AUTO.
Server-Sent Events (SSE) is the default method. It works over plain HTTP with automatic reconnection.
```ts
const config = {
  apiUrl: 'http://localhost:3001',
  streamMethod: 'SSE' as const,
};
```

Pros:
- Simple HTTP connection
- Automatic reconnection
- Wide browser support
Cons:
- Unidirectional (server to client)
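The automatic reconnection is a property of the transport itself: the browser's built-in `EventSource` re-establishes dropped SSE connections on its own. Below is a minimal sketch at the raw-API level; the URL is illustrative, and note that `EventSource` only issues GET requests, whereas the AINative client manages the connection for you.

```ts
// Raw SSE consumption with the browser's built-in EventSource.
// If the connection drops, the browser retries automatically,
// which is what makes SSE reconnection "free".
const source = new EventSource('http://localhost:3001/ai/stream');

source.onmessage = (event) => {
  if (event.data === '[DONE]') {
    source.close(); // stop the automatic retry loop once the stream ends
    return;
  }
  console.log('message:', JSON.parse(event.data));
};

source.onerror = () => {
  // Fires on connection loss; EventSource reconnects on its own.
};
```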
WebSocket (WS) provides bidirectional streaming for advanced use cases.
```ts
const config = {
  apiUrl: 'http://localhost:3001',
  streamMethod: 'WS' as const,
};
```

Pros:
- Bidirectional communication
- Lower latency
- Better for long-running connections
Cons:
- More complex setup
- Requires WebSocket support
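For comparison, this is what bidirectional traffic looks like at the raw WebSocket level. The endpoint and message shape here are illustrative, since the AINative client manages the socket for you:

```ts
// Raw WebSocket: a single connection carries traffic in both directions.
const socket = new WebSocket('ws://localhost:3001/ai/stream');

socket.addEventListener('open', () => {
  // Client -> server: sent over the same connection the tokens arrive on
  socket.send(JSON.stringify({ action: 'chat', params: { message: 'Hello' } }));
});

socket.addEventListener('message', (event) => {
  // Server -> client: tokens arrive as they are generated
  console.log('token frame:', event.data);
});
```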
AUTO automatically selects the best method for the current environment.
```ts
const config = {
  streamMethod: 'AUTO' as const,
};
```

Use the streaming state and controls exposed by `AIAppComponent`:

```tsx
<AIAppComponent config={config}>
  {(state, app) => (
    <div>
      {state.streaming && <div>Streaming...</div>}
      <button onClick={() => app.sendMessage('Hello', { stream: true })}>
        Send
      </button>
    </div>
  )}
</AIAppComponent>
```

For lower-level control, use the `StreamingEngine` directly:

```ts
import { StreamingEngine, EventBus } from '@hari7261/ainative-client';
const eventBus = new EventBus();
const streaming = new StreamingEngine(eventBus);

// Listen for tokens
eventBus.on('stream-data', ({ payload }) => {
  console.log('Token:', payload.token);
});

// Start streaming
await streaming.start({
  url: 'http://localhost:3001/ai/stream',
  method: 'SSE',
}, {
  action: 'chat',
  params: { message: 'Hello' },
});
```

On the Node server, `StreamManager` writes the token stream to the HTTP response:

```ts
import { StreamManager } from '@hari7261/ainative-server-node';
app.post('/ai/stream', async (req, res) => {
  const stream = new StreamManager(res);
  try {
    const generator = provider.generate({
      prompt: req.body.message,
      stream: true,
    });
    for await (const token of generator) {
      stream.sendToken(token);
    }
    stream.sendDone();
  } catch (error) {
    stream.sendError(error instanceof Error ? error.message : String(error));
    stream.sendDone();
  }
});
```
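For orientation, here is a rough sketch of the kind of SSE framing a `StreamManager`-style helper plausibly writes to the raw response. It illustrates the message format documented below; it is not the library's actual implementation, and the `error` envelope type in particular is an assumption:

```ts
import type { Response } from 'express';

// Hypothetical SSE writer illustrating what StreamManager plausibly does
// under the hood. Not the library's actual implementation.
function createSseWriter(res: Response) {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // Each SSE event is a "data:" line followed by a blank line.
  const send = (message: unknown) =>
    res.write(`data: ${JSON.stringify(message)}\n\n`);

  return {
    sendToken: (token: string) => send({ type: 'token', data: token }),
    // 'error' envelope type is assumed; the documented format below
    // only shows token, metadata, and done messages.
    sendError: (error: string) => send({ type: 'error', data: error }),
    sendDone: () => {
      send({ type: 'done' });
      res.write('data: [DONE]\n\n');
      res.end();
    },
  };
}
```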
@app.post("/ai/stream")
async def stream_handler(request: ActionRequest):
generator = provider.generate(
prompt=request.params["message"],
stream=True,
)
return StreamingResponse(
stream_tokens(generator),
media_type="text/event-stream",
)SSE messages follow this format:
data: {"type":"token","data":"Hello"}
data: {"type":"token","data":" world"}
data: {"type":"metadata","data":{"model":"gpt-4"}}
data: {"type":"done"}
data: [DONE]
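As a sketch of how a client could decode one of these `data:` payloads into the events used earlier; the `parseSseData` helper and the `StreamMessage` type are illustrative, not part of the library:

```ts
type StreamMessage =
  | { type: 'token'; data: string }
  | { type: 'metadata'; data: Record<string, unknown> }
  | { type: 'done' };

// Decode one SSE "data:" payload into a typed message.
// Returns null for the terminal [DONE] sentinel.
function parseSseData(raw: string): StreamMessage | null {
  if (raw === '[DONE]') return null;
  return JSON.parse(raw) as StreamMessage;
}

// Example: route a parsed token the same way the engine's event does.
const message = parseSseData('{"type":"token","data":"Hello"}');
if (message?.type === 'token') {
  console.log('Token:', message.data);
}
```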
Handle streaming errors:

```ts
eventBus.on('stream-error', ({ payload }) => {
  console.error('Stream error:', payload.error);
});

eventBus.on('stream-end', ({ payload }) => {
  if (payload.reason === 'error') {
    // Handle error
  }
});
```

Stop streaming at any time:
```ts
app.stopStreaming();
```

The streaming engine handles backpressure automatically (see the sketch after this list):
- Buffers tokens when the UI can't keep up
- Throttles updates for performance
- Batches rapid updates
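The buffering itself is internal to the engine. As a rough sketch of the buffer-and-flush pattern described above (class and method names here are illustrative, not AINative's API):

```ts
// Illustrative token buffer: collects tokens as they arrive and flushes
// them to the UI at most once per interval (~one frame at 16 ms).
// Names are hypothetical; AINative's internals may differ.
class TokenBuffer {
  private pending: string[] = [];
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(
    private flushTo: (chunk: string) => void,
    private intervalMs = 16,
  ) {}

  push(token: string): void {
    this.pending.push(token);
    if (this.timer === null) {
      this.timer = setTimeout(() => this.flush(), this.intervalMs);
    }
  }

  private flush(): void {
    this.flushTo(this.pending.join(''));
    this.pending = [];
    this.timer = null;
  }
}
```

Every token pushed within one interval is merged into a single UI update, which is the same idea the reconciler's `batching` and `batchDelay` options expose below.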
Use batching for rapid tokens:

```ts
const reconciler = new AIReconciler(stateManager, eventBus);
reconciler.applyPatch(patch, { batching: true, batchDelay: 16 });
```

Limit re-renders by wrapping message components in `React.memo`:
```tsx
import React from 'react';

const AIMessage = React.memo(({ message }) => {
  return <div>{message.content}</div>;
});
```

Use virtual scrolling for long conversations:

```tsx
import { useVirtualizer } from '@tanstack/react-virtual';

const virtualizer = useVirtualizer({
  count: messages.length,
  getScrollElement: () => parentRef.current,
  estimateSize: () => 50,
});
```
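A sketch of how the virtual rows can then be rendered, following @tanstack/react-virtual's documented pattern; `parentRef`, `messages`, and `AIMessage` are the names used in the snippets above:

```tsx
// Only the rows currently in view are mounted; each row is absolutely
// positioned inside a spacer sized to the full list height.
<div ref={parentRef} style={{ height: 400, overflow: 'auto' }}>
  <div style={{ height: virtualizer.getTotalSize(), position: 'relative' }}>
    {virtualizer.getVirtualItems().map((item) => (
      <div
        key={item.key}
        style={{
          position: 'absolute',
          top: 0,
          left: 0,
          width: '100%',
          transform: `translateY(${item.start}px)`,
        }}
      >
        <AIMessage message={messages[item.index]} />
      </div>
    ))}
  </div>
</div>
```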
Test streaming by collecting the emitted tokens:

```ts
import { test, expect } from 'vitest';

test('streaming works', async () => {
  const tokens: string[] = [];

  eventBus.on('stream-data', ({ payload }) => {
    tokens.push(payload.token);
  });

  await streaming.start(config, data);

  expect(tokens.length).toBeGreaterThan(0);
});
```