FastAPI SSE Events
Add real-time "refresh-less" updates to your FastAPI REST API using Server-Sent Events (SSE) and Redis Pub/Sub. Perfect for collaborative tools and dashboards.
Overview
FastAPI SSE Events enables near real-time notifications for REST-based applications without the complexity of WebSockets. Clients subscribe to Server-Sent Event streams for specific topics, receive lightweight notifications when data changes, then refresh data via existing REST endpoints.
Perfect For
- Collaborative CRM systems where sales reps see updates from teammates
- Project management tools with real-time task updates
- Live dashboards showing metrics and notifications
- Multi-user editing interfaces requiring change notifications
Why SSE?
The Problem
Traditional REST APIs require a manual refresh to see updates from other users, which makes for a poor collaboration experience and inefficient workflows. Users either hit refresh constantly, or the client polls and adds unnecessary server load.
Why Not WebSockets?
WebSockets are powerful but come with complexity:
- Require separate infrastructure and connection management
- More difficult to debug and monitor
- Overkill for one-way notifications
- Don't work well with existing REST patterns
- Harder to secure with standard auth mechanisms
The SSE Solution
HTTP Based
Works over plain HTTP with existing infrastructure. No protocol upgrade or separate routing layer is needed; reverse proxies only have to leave the stream unbuffered.
Native Browser Support
Built-in EventSource API in all modern browsers with automatic reconnection.
One-Way Communication
Perfect for notifications - server pushes updates, client remains stateless.
REST as Source of Truth
SSE notifies, REST endpoints provide data - clean separation of concerns.
Installation
pip install fastapi-sse-events
Requirements
- Python 3.10+
- FastAPI 0.104.0+
- Redis 5.0+ (running locally or remotely)
Redis Setup
You'll need a Redis server. Quick options:
# Option 1: run Redis in Docker
docker run -d -p 6379:6379 redis:alpine
# Option 2: start a locally installed Redis
redis-server
Quick Start
Recommended Flow
Use the simplified decorator-based API with SSEApp, @publish_event, and @subscribe_to_events. This is the current, recommended approach.
Get SSE running in your FastAPI app with just a few lines of code:
1. Create Your FastAPI App
from fastapi import Request
from fastapi_sse_events import SSEApp, publish_event, subscribe_to_events

app = SSEApp(
    title="My API",
    redis_url="redis://localhost:6379"
)

@app.post("/tasks")
@publish_event(topic="tasks", event="task:created")
async def create_task(request: Request, task: dict):
    # ... save task to database ...
    return {"id": 1, **task}  # auto-published to SSE clients

@app.get("/events")
@subscribe_to_events()
async def events(request: Request):
    pass
2. Connect from Browser
// Subscribe to topic from query param
const eventSource = new EventSource('/events?topic=tasks');

eventSource.addEventListener('task:created', async (event) => {
  const data = JSON.parse(event.data);
  console.log('Task event:', data);
  // Notify-then-fetch pattern (recommended)
  const response = await fetch(`/tasks/${data.id}`);
  const task = await response.json();
  renderTask(task);
});

eventSource.addEventListener('open', () => {
  console.log('✅ Connected to SSE');
});
That's It!
You now have real-time notifications. Clients connect to /events?topic=... and receive instant updates when decorated endpoints return.
Simplified API (Recommended)
The simplified decorator-based API makes adding real-time SSE events incredibly easy by eliminating 75%+ of boilerplate code. It provides auto-configuration, automatic event publishing via decorators, and automatic SSE streaming.
Naming Conventions
Current Names (Recommended): @publish_event, @subscribe_to_events
Legacy Aliases (Still Supported): @sse_event, @sse_endpoint
Both naming conventions work interchangeably. For new projects, use the current names.
Auto-Configuration
One-line app setup with automatic broker, Redis, and CORS configuration.
@publish_event Decorator
Automatically publish endpoint responses as SSE events - no manual calls needed.
@subscribe_to_events Decorator
Create SSE streaming endpoints with one decorator - handles everything automatically.
75% Less Code
Reduce boilerplate from ~50 lines to ~10 lines. Focus on your business logic.
Getting Started (3 Steps)
Step 1: Create SSEApp
Replace FastAPI() with SSEApp() for automatic configuration:
from fastapi import Request
from fastapi_sse_events import SSEApp, publish_event, subscribe_to_events

# One-line setup with auto-configuration!
app = SSEApp(
    title="My API",
    redis_url="redis://localhost:6379"  # or use REDIS_URL env var
)
# That's it! No manual broker setup, no lifecycle management needed!
Step 2: Use @publish_event Decorator
Automatically publish events when endpoints return:
@app.post("/comments")
@publish_event(topic="comments", event="comment_created")
async def create_comment(request: Request, comment: dict):
    # Your business logic here
    new_comment = {"id": 1, "content": comment["content"]}
    # Just return it - automatically published to SSE clients!
    return new_comment
Step 3: Use @subscribe_to_events Decorator
Create SSE streaming endpoints with zero boilerplate:
@app.get("/events")
@subscribe_to_events()
async def events(request: Request):
    pass  # Decorator handles all streaming logic!
Complete!
You now have real-time SSE with minimal code. Open the API docs at http://localhost:8000/docs to test it!
SSEApp - Auto-configured FastAPI
SSEApp extends FastAPI and automatically configures everything needed for SSE:
from fastapi_sse_events import SSEApp

app = SSEApp(
    title="My API",
    version="1.0.0",
    # Redis connection (choose one)
    redis_url="redis://localhost:6379",  # Option 1: URL
    # redis_host="localhost",            # Option 2: Host/Port
    # redis_port=6379,
    # redis_db=0,
    # Optional settings
    topic_prefix="myapp:",  # Prefix all topics
    enable_cors=True,       # Auto-enable CORS
    cors_origins=["*"],     # Allowed origins
)
What SSEApp Does Automatically
- ✅ Creates and configures EventBroker
- ✅ Sets up Redis connection with Pub/Sub
- ✅ Manages broker lifecycle (connect on startup, disconnect on shutdown)
- ✅ Configures CORS middleware (optional)
- ✅ Makes broker available at app.broker and request.app.state.broker
Environment Variables
SSEApp reads from environment variables if parameters are not provided:
- REDIS_URL - Redis connection URL
- REDIS_HOST - Redis host (default: localhost)
- REDIS_PORT - Redis port (default: 6379)
- REDIS_DB - Redis database (default: 0)
- TOPIC_PREFIX - Topic prefix (default: "")
- CORS_ORIGINS - Comma-separated origins (default: *)
@publish_event - Auto-Publish Decorator
The @publish_event decorator automatically publishes your endpoint's response as an SSE event. No more manual broker.publish() calls!
Legacy Alias: @sse_event (still supported for backwards compatibility)
from fastapi_sse_events import publish_event

# Explicit topic and event name
@app.post("/comments")
@publish_event(topic="comments", event="comment_created")
async def create_comment(request: Request, comment: dict):
    return {"id": 1, "content": comment["content"]}

# Auto-infer topic from route path
@app.post("/threads/{thread_id}/comments")
@publish_event()  # Auto-infers topic: "threads.comments"
async def add_comment(request: Request, thread_id: int, comment: dict):
    return {"id": 1, "thread_id": thread_id, **comment}

# Custom data extraction
@app.post("/users")
@publish_event(
    topic="users",
    extract_data=lambda resp: {"user_id": resp["id"]}  # Only publish ID
)
async def create_user(request: Request, user: dict):
    full_user = {"id": 1, "password": "secret", **user}
    return full_user  # Only {"user_id": 1} published to SSE
Decorator Parameters
| Parameter | Type | Description |
|---|---|---|
| topic | str \| None | Topic to publish to. Auto-inferred from route if None. |
| event | str \| None | Event name. Uses function name if None. |
| extract_data | Callable \| None | Function to transform response before publishing. |
| auto_topic | bool | Auto-generate topic from route path (default: True). |
Auto-Topic Inference
When topic is not provided, the decorator infers it from the route path:
- /comments → comments
- /threads/123/comments → threads.comments
- /api/v1/users/456 → users (filters out "api", "v1", IDs)
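The inference rules above can be approximated in a few lines. This is a sketch under assumptions: `infer_topic` is a hypothetical function, and the exact filtering rules the library applies (which prefixes it ignores, how it recognises IDs) are not documented here, so the rules below are chosen only to reproduce the three examples.

```python
import re

# Segments treated as noise; this set is an assumption for illustration.
IGNORED_SEGMENTS = {"api"}
VERSION_PATTERN = re.compile(r"^v\d+$")


def infer_topic(path: str) -> str:
    """Derive a dotted topic from a URL path, skipping prefixes, versions, and IDs."""
    parts = []
    for segment in path.strip("/").split("/"):
        if not segment or segment in IGNORED_SEGMENTS:
            continue
        if VERSION_PATTERN.match(segment):
            continue  # version markers such as "v1"
        if segment.isdigit() or (segment.startswith("{") and segment.endswith("}")):
            continue  # numeric IDs and path parameters like {thread_id}
        parts.append(segment)
    return ".".join(parts)
```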
Important
You must add a request: Request parameter to your endpoint for the decorator to work. The decorator needs access to the app state to publish events.
@subscribe_to_events - Auto-Streaming Decorator
The @subscribe_to_events decorator creates SSE streaming endpoints with zero boilerplate.
It handles subscription, event streaming, heartbeat, and connection management automatically.
Legacy Alias: @sse_endpoint (still supported for backwards compatibility)
from fastapi_sse_events import sse_endpoint

# Simple SSE endpoint with static topics
@app.get("/events")
@sse_endpoint(topics=["comments", "users"])
async def events(request: Request):
    pass  # Decorator handles everything!

# Dynamic topics from query params
@app.get("/events/dynamic")
@sse_endpoint()  # Topics from ?topic=comments,users
async def events_dynamic(request: Request):
    pass

# With authorization
async def check_auth(request: Request, topic: str) -> bool:
    user = await get_current_user(request)
    return user.can_access(topic)

@app.get("/events/secure")
@sse_endpoint(topics=["comments"], authorize=check_auth)
async def events_secure(request: Request):
    pass

# Custom heartbeat
@app.get("/events/fast")
@sse_endpoint(topics=["live-data"], heartbeat=10)  # 10 second heartbeat
async def events_fast(request: Request):
    pass
Decorator Parameters
| Parameter | Type | Description |
|---|---|---|
| topics | list[str] \| None | Topics to subscribe to. Uses query param if None. |
| authorize | Callable \| None | Authorization function (request, topic) → bool. |
| heartbeat | int | Heartbeat interval in seconds (default: 30, 0 to disable). |
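The heartbeat parameter can be understood as "send a keepalive whenever the stream has been idle for this long". The following asyncio sketch illustrates that idea only; `stream_with_heartbeat` is a hypothetical generator, the `None` sentinel and the empty `{}` heartbeat payload are assumptions, and the library's actual streaming loop may differ.

```python
import asyncio
from collections.abc import AsyncIterator


async def stream_with_heartbeat(
    queue: asyncio.Queue, heartbeat: float
) -> AsyncIterator[str]:
    """Yield queued SSE frames; emit a heartbeat frame when the queue stays idle."""
    timeout = heartbeat if heartbeat > 0 else None  # 0 disables heartbeats
    while True:
        try:
            frame = await asyncio.wait_for(queue.get(), timeout=timeout)
        except asyncio.TimeoutError:
            yield "event: heartbeat\ndata: {}\n\n"
            continue
        if frame is None:  # sentinel used in this sketch to end the stream
            return
        yield frame


async def demo() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()

    async def producer() -> None:
        await asyncio.sleep(0.1)  # stay idle long enough for a heartbeat
        await queue.put('event: comment_created\ndata: {"id": 1}\n\n')
        await queue.put(None)

    task = asyncio.create_task(producer())
    frames = [frame async for frame in stream_with_heartbeat(queue, heartbeat=0.02)]
    await task
    return frames


frames = asyncio.run(demo())
```

In the demo the producer stays silent for longer than the heartbeat interval, so heartbeat frames precede the real event in `frames`.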
Before vs After Comparison
❌ Before (Manual API)
from fastapi import FastAPI
from fastapi_sse_events import (
    EventBroker, RealtimeConfig,
    RedisBackend, mount_sse
)

app = FastAPI()

# Manual configuration
config = RealtimeConfig(
    redis_url="redis://localhost:6379"
)
redis_backend = RedisBackend(config)
broker = EventBroker(config, redis_backend)

# Manual lifecycle
@app.on_event("startup")
async def startup():
    await redis_backend.connect()

@app.on_event("shutdown")
async def shutdown():
    await redis_backend.disconnect()

# Manual publishing
@app.post("/comments")
async def create_comment(comment: dict):
    new_comment = {"id": 1, **comment}
    # Manual broker call
    await broker.publish(
        topic="comments",
        event="comment_created",
        data=new_comment
    )
    return new_comment

# Manual SSE endpoint
mount_sse(app, broker)
✅ After (Simplified API)
from fastapi import Request
from fastapi_sse_events import (
    SSEApp, sse_event, sse_endpoint
)

# One-line setup!
app = SSEApp(
    redis_url="redis://localhost:6379"
)

# Auto-publishing
@app.post("/comments")
@sse_event(topic="comments")
async def create_comment(
    request: Request,
    comment: dict
):
    new_comment = {"id": 1, **comment}
    return new_comment  # Auto-published!

# Auto-streaming
@app.get("/events")
@sse_endpoint(topics=["comments"])
async def events(request: Request):
    pass  # All automatic!
80% Reduction in Boilerplate
The simplified API reduces setup from ~50 lines to ~10 lines, letting you focus on your business logic instead of configuration.
Migration from Manual API
Migrating to the simplified API is straightforward and can be done gradually:
Step 1: Replace FastAPI with SSEApp
# Before
app = FastAPI()
config = RealtimeConfig(redis_url="...")
# ... manual setup ...
# After
app = SSEApp(redis_url="...")
Step 2: Add Decorators
# Before
@app.post("/comments")
async def create_comment(comment: dict):
    new_comment = save(comment)
    await broker.publish("comments", "created", new_comment)
    return new_comment

# After
@app.post("/comments")
@sse_event(topic="comments", event="created")
async def create_comment(request: Request, comment: dict):
    new_comment = save(comment)
    return new_comment  # Auto-published!
Step 3: Simplify SSE Endpoints
# Before
mount_sse(app, broker, authorize_fn)
# Or manual endpoint creation...

# After
@app.get("/events")
@sse_endpoint(topics=["comments"], authorize=authorize_fn)
async def events(request: Request):
    pass
Fully Backwards Compatible
The simplified API is fully backwards compatible. You can mix both APIs in the same application: use decorators for new endpoints while keeping manual broker.publish() calls in existing code. Access the broker directly via app.broker or request.app.state.broker.
Complete Example
Here's a complete CRM comments system using the simplified API:
"""Simplified CRM Comments with SSE"""
from fastapi import Request, HTTPException
from pydantic import BaseModel
from fastapi_sse_events import SSEApp, sse_event, sse_endpoint

# One-line app setup
app = SSEApp(
    title="CRM Comments",
    redis_url="redis://localhost:6379",
    enable_cors=True
)

# In-memory storage
comments_db = {}
comment_id = 0

class CommentCreate(BaseModel):
    thread_id: int
    author: str
    content: str

# CRUD endpoints with auto-publishing
@app.post("/comments")
@sse_event(topic="comments", event="comment_created")
async def create_comment(request: Request, comment: CommentCreate):
    global comment_id
    comment_id += 1
    new_comment = {
        "id": comment_id,
        "thread_id": comment.thread_id,
        "author": comment.author,
        "content": comment.content
    }
    comments_db[comment_id] = new_comment
    return new_comment  # Auto-published to SSE!

@app.put("/comments/{comment_id}")
@sse_event(topic="comments", event="comment_updated")
async def update_comment(
    request: Request,
    comment_id: int,
    content: str
):
    if comment_id not in comments_db:
        raise HTTPException(404, "Not found")
    comments_db[comment_id]["content"] = content
    return comments_db[comment_id]  # Auto-published!

@app.delete("/comments/{comment_id}")
@sse_event(topic="comments", event="comment_deleted")
async def delete_comment(request: Request, comment_id: int):
    if comment_id not in comments_db:
        raise HTTPException(404, "Not found")
    deleted = comments_db.pop(comment_id)
    return {"id": comment_id}  # Auto-published!

@app.get("/comments")
async def list_comments():
    return list(comments_db.values())

# SSE endpoint with auto-streaming
@app.get("/events")
@sse_endpoint(topics=["comments"])
async def events(request: Request):
    pass  # All automatic!

# Run with: uvicorn app_simple:app --reload
Try It!
The complete example is available in examples/crm_comments/app_simple.py.
Run it with: cd examples/crm_comments && ./start_simple.sh
FAQ
Can I still use the manual API?
Yes! The simplified API is built on top of the manual API and both work together.
You can access the broker directly via app.broker.publish(...) whenever needed.
Do I need the Request parameter?
Yes, for the @sse_event and @sse_endpoint decorators. The decorators need access to the app state where the broker is stored. Add request: Request as the first parameter (after self for class methods).
Can I use dynamic topics?
Yes! You can still use manual app.broker.publish() for dynamic topics like f"thread:{thread_id}", or use the decorator for general topics and manual publish for specific ones.
What about authorization?
Use the authorize parameter in @sse_endpoint: @sse_endpoint(authorize=my_auth_function). The function receives (request, topic) and returns True/False.
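As an illustration of the (request, topic) → bool contract, here is a minimal sketch of a cookie-based authorization function. Everything in it is an assumption for demonstration: `FakeRequest` stands in for `fastapi.Request` so the example runs on its own, and `SESSION_SCOPES` is a hypothetical lookup that a real application would replace with its own auth system. A cookie is used because the browser's native EventSource cannot attach custom headers such as Authorization.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeRequest:
    """Minimal stand-in for fastapi.Request, carrying only what this sketch needs."""

    cookies: dict[str, str] = field(default_factory=dict)


# Hypothetical session id -> topics (or topic prefixes) the session may subscribe to.
SESSION_SCOPES = {
    "sess-alice": ("comments", "workspace:acme-corp"),
    "sess-bob": ("comments",),
}


async def authorize(request: FakeRequest, topic: str) -> bool:
    """Return True if the session may subscribe to the topic, False otherwise."""
    allowed = SESSION_SCOPES.get(request.cookies.get("session", ""), ())
    # Allow an exact match or any child topic, e.g. "workspace:acme-corp:tasks".
    return any(topic == scope or topic.startswith(scope + ":") for scope in allowed)
```

Unknown or missing sessions fall through to an empty scope list, so the default is to deny.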
Does it work with dependency injection?
Yes! FastAPI dependencies work normally. Just make sure request: Request is one of your parameters.
Architecture
FastAPI SSE Events uses a simple but powerful architecture:
┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│   Browser   │   SSE   │   FastAPI   │   Pub   │    Redis    │
│   Client    │◄────────│  Instance   │◄────────│   Pub/Sub   │
└─────────────┘         └─────────────┘         └─────────────┘
       │                       │                        ▲
       │                       │                        │
       │  POST /api/task       │  Publish Event         │
       └──────────────────────►│───────────────────────►│
                               │                        │
                               │  Subscribe Topics      │
                               └───────────────────────►│
Flow Breakdown
- Client connects to /sse?topics=task:123
- FastAPI subscribes to Redis channels matching those topics
- Another client makes a POST request that modifies data
- Your endpoint publishes event to Redis: task:created
- Redis broadcasts to all subscribed FastAPI instances
- FastAPI forwards event to connected SSE clients
- Browser receives notification and refreshes data via REST
Horizontal Scaling
Because Redis Pub/Sub is used as the message broker, you can run multiple FastAPI instances behind a load balancer. An event published by any instance reaches all connected clients across all instances.
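The fan-out can be simulated without Redis to show why the instance that handles the write does not need to hold the subscriber's connection. This is an in-memory sketch only: `InMemoryPubSub` and `AppInstance` are hypothetical stand-ins for Redis Pub/Sub and a running API process, and plain lists stand in for open SSE streams.

```python
from collections import defaultdict


class InMemoryPubSub:
    """Stand-in for Redis Pub/Sub: every subscriber of a channel gets each message."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list["AppInstance"]] = defaultdict(list)

    def subscribe(self, channel: str, instance: "AppInstance") -> None:
        if instance not in self._subscribers[channel]:
            self._subscribers[channel].append(instance)

    def publish(self, channel: str, message: dict) -> None:
        for instance in self._subscribers[channel]:
            instance.deliver(channel, message)


class AppInstance:
    """One API process holding its own set of connected SSE clients."""

    def __init__(self, name: str, bus: InMemoryPubSub) -> None:
        self.name = name
        self.bus = bus
        self.clients: dict[str, list[list[dict]]] = defaultdict(list)

    def connect_client(self, topic: str) -> list[dict]:
        inbox: list[dict] = []  # stands in for an open SSE stream
        self.clients[topic].append(inbox)
        self.bus.subscribe(topic, self)
        return inbox

    def deliver(self, topic: str, message: dict) -> None:
        for inbox in self.clients[topic]:
            inbox.append(message)


bus = InMemoryPubSub()
instance_a, instance_b = AppInstance("a", bus), AppInstance("b", bus)
alice = instance_a.connect_client("tasks")
bob = instance_b.connect_client("tasks")

# A request handled by instance A publishes; clients on both instances receive it.
instance_a.bus.publish("tasks", {"event": "task:created", "id": 1})
```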
Features
Simple Integration
Add SSE with 3 lines of code. Minimal changes to existing endpoints. Works alongside your REST API.
Server-Sent Events
Lightweight, one-way communication (server → client). Built on standard HTTP with automatic reconnection.
Redis Pub/Sub
Horizontal scaling across multiple API instances. Battle-tested message broker with low latency.
Authorization Hooks
Secure topic subscriptions with custom auth logic. Integrate with your existing authentication system.
Heartbeat Support
Automatic connection keepalive with configurable intervals. Prevents proxy timeouts.
Topic-based Routing
Fine-grained subscription control per resource. Subscribe to exactly what you need.
Type Safe
Full type hints and mypy compliance. Catch errors at development time, not runtime.
Well Tested
Comprehensive test suite with 80%+ coverage. Production-ready and battle-tested.
How It Works
The "Notify Then Fetch" Pattern
FastAPI SSE Events implements a clean separation between notifications and data:
- SSE notifies that something changed
- Client fetches fresh data via REST
- REST API remains the source of truth
SSE vs Full Data Push
We recommend sending lightweight notifications (IDs, types) via SSE rather than full objects. This keeps your REST API as the canonical data source and reduces bandwidth. Clients can intelligently fetch only changed resources.
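A short sketch of what a lightweight notification looks like on the wire. The `event:` and `data:` fields and the blank-line terminator are the standard text/event-stream format; `to_notification` and `format_sse` are hypothetical helpers for illustration, and the choice of `id` plus `type` as the payload is one possible convention.

```python
import json


def to_notification(resource: dict, resource_type: str) -> dict:
    """Reduce a full record to the fields a client needs to decide what to refetch."""
    return {"id": resource["id"], "type": resource_type}


def format_sse(event: str, data: dict) -> str:
    """Encode one event in the text/event-stream wire format."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


comment = {"id": 42, "thread_id": 7, "author": "ada", "content": "A long comment body..."}
frame = format_sse("comment:created", to_notification(comment, "comment"))
```

The frame carries only the identifier and type; the comment body stays behind the REST endpoint, where the usual access checks apply.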
Example Flow
User A                   Server                User B
  │                         │                     │
  │  1. POST /comments      │                     │
  │────────────────────────►│                     │
  │                         │                     │
  │                         │  2. SSE Event       │
  │                         │────────────────────►│
  │                         │  "comment:created"  │
  │                         │                     │
  │                         │  3. GET /comments   │
  │                         │◄────────────────────│
  │                         │                     │
  │                         │  4. Fresh Data      │
  │                         │────────────────────►│
Basic Integration
Step 1: Create SSEApp
from fastapi import Request
from fastapi_sse_events import SSEApp, publish_event, subscribe_to_events

app = SSEApp(
    title="My API",
    redis_url="redis://localhost:6379"
)
Step 2: Publish Events with Decorators
@app.post("/comments")
@publish_event(topic="comments", event="comment:created")
async def create_comment(request: Request, comment: dict):
    # Save to database
    saved_comment = {"id": 1, **comment}
    return saved_comment  # auto-published
Step 3: Create SSE Endpoint + Client Subscription
@app.get("/events")
@subscribe_to_events()  # topics from ?topic=...
async def events(request: Request):
    pass

// Connect to SSE endpoint
const sse = new EventSource('/events?topic=comments');

// Listen for specific events
sse.addEventListener('comment:created', (event) => {
  const data = JSON.parse(event.data);
  console.log('New comment:', data);
  // Refresh data from REST API
  fetch(`/comments/${data.id}`).then(r => r.json()).then(renderComment);
});

// Connection status
sse.addEventListener('open', () => {
  console.log('Connected to SSE');
});
sse.addEventListener('error', (error) => {
  console.error('SSE error:', error);
  // EventSource auto-reconnects
});
Publishing Events
There are two primary approaches to publishing events: using decorators (recommended) or direct publish.
Method 1: Decorator-Based Publishing (Recommended)
The simplest approach - just add @publish_event() to any endpoint and the response is automatically published:
from fastapi_sse_events import publish_event

@app.post("/tickets/{ticket_id}")
@publish_event(topic="tickets", event="ticket:updated")
async def update_ticket(request: Request, ticket_id: int, status: str):
    await db.update(ticket_id, status=status)
    return {"id": ticket_id, "status": status}  # Auto-published!
Method 2: Direct Publish (Advanced)
For fine-grained control, publish manually using app.state.event_broker:
await app.state.event_broker.publish(
    topic="resource:123",        # Topic (routing key)
    event="resource:updated",    # Event type
    data={                       # Payload
        "id": "123",
        "changed_fields": ["status"]
    }
)
Event Publishing API
| Parameter | Type | Description |
|---|---|---|
| topic | str | Topic subscribers are listening to (e.g., "task:123") |
| event | str | Event type identifier (e.g., "task:updated") |
| data | dict \| EventData | Event payload (serialized to JSON) |
Common Publishing Patterns
# Resource Created
await app.state.event_broker.publish(
    topic="tasks",
    event="task:created",
    data={"id": task_id, "title": "New Task"}
)

# Resource Updated
await app.state.event_broker.publish(
    topic=f"task:{task_id}",
    event="task:updated",
    data={"id": task_id, "status": "completed"}
)

# Resource Deleted
await app.state.event_broker.publish(
    topic=f"task:{task_id}",
    event="task:deleted",
    data={"id": task_id}
)

# Broadcast to All
await app.state.event_broker.publish(
    topic="global",
    event="system:maintenance",
    data={"message": "Scheduled maintenance in 5 minutes"}
)
Topic Patterns
Topics are simple string identifiers that route events to subscribers. Use consistent naming conventions across your application.
Simple Topic Strings
Just use strings directly - no special helpers required:
# Simple topics
await app.state.event_broker.publish(
    topic="comments",  # Broadcast to all subscribers
    event="comment:created",
    data={"id": 1, "text": "Hello"}
)

# With resource IDs for granularity
await app.state.event_broker.publish(
    topic="comment:123",  # Specific comment
    event="updated",
    data={"text": "Edited"}
)

# Workspace-specific
await app.state.event_broker.publish(
    topic="workspace:acme-corp",
    event="task:created",
    data={"id": 456, "title": "New Task"}
)
Topic Naming Conventions
Simple Broadcast
Topic for broadcasting to everyone: comments, tasks, announcements
Resource-Specific
Include resource IDs for security: comment:123, task:456, user:john
Namespace Hierarchy
Use colons to group related topics: workspace:wsp1, org:org1:team:dev
System Topics
Reserved topics for app-wide events: broadcast, global, system
Using TopicBuilder (Optional)
For consistency, you can optionally use the TopicBuilder helper:
from fastapi_sse_events import TopicBuilder
# Built-in patterns (all optional)
TopicBuilder.comment("c123") # → "comment:c123"
TopicBuilder.task("t456") # → "task:t456"
TopicBuilder.workspace("ws789") # → "workspace:ws789"
TopicBuilder.user("john") # → "user:john"
# Custom topics
TopicBuilder.custom("invoice", "inv001") # → "invoice:inv001"
Client Subscriptions
// Subscribe to broadcast topic
new EventSource('/events?topic=comments')
// Subscribe to specific resource
new EventSource('/events?topic=comment:123')
// Subscribe to workspace
new EventSource('/events?topic=workspace:acme-corp')
// Subscribe to multiple topics (comma-separated)
new EventSource('/events?topic=comment:123,workspace:acme-corp')
Configuration
The SSEApp class provides simple one-line configuration with sensible defaults:
from fastapi_sse_events import SSEApp
import os

# Minimal setup
app = SSEApp(redis_url="redis://localhost:6379/0")

# Full configuration
app = SSEApp(
    title="My API",
    redis_url=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    heartbeat_seconds=30,               # SSE keepalive interval (5-60s)
    authorize=authorize_subscription,   # Authorization callback (optional)
)
Configuration Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| title | str | "FastAPI" | App title (optional) |
| redis_url | str | ⚠️ Required | Redis connection URL |
| heartbeat_seconds | int | 30 | SSE keepalive interval (5-60) |
| authorize | callable | None | Authorization callback function |
Environment Variables
Load configuration from environment variables:
# Development
REDIS_URL=redis://localhost:6379/0
SSE_HEARTBEAT_SECONDS=30
# Production (with authentication)
REDIS_URL=redis://:password@redis-prod.example.com:6379/0
SSE_HEARTBEAT_SECONDS=60
from dotenv import load_dotenv
import os
from fastapi_sse_events import SSEApp

load_dotenv()  # Load .env file

app = SSEApp(
    redis_url=os.getenv("REDIS_URL"),
    heartbeat_seconds=int(os.getenv("SSE_HEARTBEAT_SECONDS", "30")),
)
JavaScript Client
Use the native EventSource API in the browser for simple, efficient real-time updates:
// Create SSE connection
const eventSource = new EventSource('/events?topic=comments');

// Listen for connection events
eventSource.addEventListener('open', () => {
  console.log('✅ Connected to SSE');
  updateStatus('Connected');
});
eventSource.addEventListener('error', (error) => {
  console.error('❌ SSE Error:', error);
  updateStatus('Disconnected');
  // EventSource automatically reconnects
});

// Listen for specific events
eventSource.addEventListener('comment:created', (event) => {
  const data = JSON.parse(event.data);
  console.log('New comment:', data);
  // Fetch and display the new comment
  fetch(`/comments/${data.id}`)
    .then(r => r.json())
    .then(comment => addCommentToDOM(comment));
});
eventSource.addEventListener('comment:updated', (event) => {
  const data = JSON.parse(event.data);
  updateCommentInDOM(data.id, data);
});
eventSource.addEventListener('heartbeat', () => {
  console.log('💓 Heartbeat');
});

// Subscribe to multiple topics
const topics = ['comments', 'notifications'];
const eventSource2 = new EventSource(
  `/events?topic=${topics.join(',')}`
);

// Cleanup
function closeConnection() {
  eventSource.close();
  eventSource2.close();
}
window.addEventListener('beforeunload', closeConnection);
Automatic Reconnection
EventSource automatically reconnects if the connection drops. You don't need to implement retry logic - just handle the open event to know when reconnection succeeds.
React Client
Create a reusable React hook for managing SSE connections:
import { useEffect, useState, useCallback } from 'react';

interface UseSSEOptions {
  topic: string;
  // Named event types to forward. EventSource delivers a named event only to
  // listeners registered for that name; onmessage sees unnamed events only.
  eventTypes: string[];
  onEvent?: (eventType: string, data: any) => void;
}

export function useSSE({ topic, eventTypes, onEvent }: UseSSEOptions) {
  const [connected, setConnected] = useState(false);
  const [error, setError] = useState<Error | null>(null);
  // Join into a string so a new array identity does not re-run the effect
  const eventTypesKey = eventTypes.join(',');
  useEffect(() => {
    const eventSource = new EventSource(`/events?topic=${topic}`);
    eventSource.addEventListener('open', () => {
      setConnected(true);
      setError(null);
    });
    eventSource.addEventListener('error', () => {
      setConnected(false);
      setError(new Error('SSE connection error'));
    });
    // Forward each named event to the callback
    const forward = (event: MessageEvent) => {
      onEvent?.(event.type, JSON.parse(event.data));
    };
    eventTypesKey
      .split(',')
      .filter(Boolean)
      .forEach((type) => eventSource.addEventListener(type, forward));
    return () => eventSource.close();
  }, [topic, eventTypesKey, onEvent]);
  return { connected, error };
}

// Usage example
const COMMENT_EVENTS = ['comment:created', 'comment:updated'];

export function CommentThread({ threadId }: { threadId: number }) {
  const [comments, setComments] = useState<{ id: number; content: string }[]>([]);
  const fetchComments = useCallback(async () => {
    const res = await fetch(`/comments?thread_id=${threadId}`);
    setComments(await res.json());
  }, [threadId]);
  // Any subscribed comment event triggers a refresh from the API
  const handleEvent = useCallback(() => {
    fetchComments();
  }, [fetchComments]);
  const { connected } = useSSE({
    topic: `comment_thread:${threadId}`,
    eventTypes: COMMENT_EVENTS,
    onEvent: handleEvent,
  });
  useEffect(() => {
    fetchComments();
  }, [fetchComments]);
  // Element names are illustrative
  return (
    <div>
      <span>{connected ? '🟢 Live' : '🔴 Offline'}</span>
      {comments.map(c => (
        <p key={c.id}>{c.content}</p>
      ))}
    </div>
  );
}
Python Client
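Connect to SSE from Python using httpx or the sseclient-py library:
import asyncio
import json
import httpx

async def listen_to_sse():
    """Listen to SSE events using httpx."""
    event_type = "message"  # SSE default when no "event:" line precedes the data
    # timeout=None: the stream is idle for longer than httpx's default read timeout
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "GET",
            "http://localhost:8000/events?topic=comments",
            headers={"Accept": "text/event-stream"}
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("event:"):
                    event_type = line[6:].strip()  # len("event:") == 6
                elif line.startswith("data:"):
                    data_str = line[5:].strip()
                    try:
                        data = json.loads(data_str)
                        print(f"Event {event_type}: {data}")
                    except json.JSONDecodeError:
                        pass  # skip non-JSON payloads

# Run it
asyncio.run(listen_to_sse())

# Alternative: synchronous client with sseclient-py (pip install sseclient-py requests)
import json
import requests
import sseclient

def listen_to_events():
    """Simple SSE listening using sseclient-py."""
    url = "http://localhost:8000/events?topic=comments"
    response = requests.get(url, stream=True, headers={"Accept": "text/event-stream"})
    # sseclient-py wraps a streaming response and exposes events via .events()
    for event in sseclient.SSEClient(response).events():
        if event.event != "heartbeat":
            data = json.loads(event.data)
            print(f"Event: {event.event}")
            print(f"Data: {data}")

# Run it
listen_to_events()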
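For readers who want to see what such clients do underneath, here is a small sketch of parsing the text/event-stream format by hand. `parse_sse` is a hypothetical helper and covers only the `event` and `data` fields plus comment lines; fields such as `id` and `retry` are ignored for brevity.

```python
from collections.abc import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event_type, data) pairs from text/event-stream lines."""
    event_type, data_lines = "message", []
    for line in lines:
        line = line.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                yield event_type, "\n".join(data_lines)
            event_type, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keepalive
        else:
            field, _, value = line.partition(":")
            value = value.removeprefix(" ")  # one optional space after the colon
            if field == "event":
                event_type = value
            elif field == "data":
                data_lines.append(value)


stream = [
    "event: comment_created",
    'data: {"id": 1}',
    "",
    ": keepalive",
    "data: line one",
    "data: line two",
    "",
]
events = list(parse_sse(stream))
```

Multiple `data:` lines belong to the same event and are joined with newlines, and an event without an `event:` line falls back to the default type `message`.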
Production Setup
Redis Configuration
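For production, use a managed Redis service or tune a self-hosted Redis for Pub/Sub. Pub/Sub messages are never written to disk, so persistence can stay disabled when Redis serves only as the event bus: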
# Memory management
maxmemory 2gb
maxmemory-policy allkeys-lru
# Persistence (optional for pub/sub)
save ""
appendonly no
# Network
bind 0.0.0.0
protected-mode yes
requirepass your-secure-password
# Pub/Sub specific
timeout 0
tcp-keepalive 300
Environment Variables
# Redis
SSE_REDIS_URL=redis://:password@redis-host:6379/0
# SSE Config
SSE_HEARTBEAT_SECONDS=30
SSE_PATH=/sse
SSE_TOPIC_PREFIX=production
# FastAPI
APP_ENV=production
LOG_LEVEL=info
Uvicorn Production Settings
uvicorn app:app \
  --host 0.0.0.0 \
  --port 8000 \
  --workers 4 \
  --loop uvloop \
  --log-level info \
  --proxy-headers \
  --forwarded-allow-ips='*'
Nginx Configuration
Configure Nginx to proxy SSE connections with proper timeouts:
upstream fastapi_backend {
    server 127.0.0.1:8000;
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
    keepalive 32;
}

server {
    listen 80;
    server_name api.example.com;

    # Standard API endpoints
    location / {
        proxy_pass http://fastapi_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }

    # SSE endpoint - special configuration
    location /sse {
        proxy_pass http://fastapi_backend;
        proxy_http_version 1.1;

        # SSE specific headers
        proxy_set_header Connection '';
        proxy_set_header Cache-Control 'no-cache';

        # Disable buffering
        proxy_buffering off;
        proxy_cache off;

        # Long timeouts for SSE
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;

        # Forward headers
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # Chunked transfer encoding
        chunked_transfer_encoding on;
    }
}
Critical SSE Settings
- proxy_buffering off - Prevents Nginx from buffering SSE responses
- proxy_read_timeout 86400s - Long timeout for persistent connections
- proxy_http_version 1.1 - Required for persistent connections
- chunked_transfer_encoding on - Enables streaming
Docker Deployment
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD}
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped
  api:
    build: .
    command: uvicorn app:app --host 0.0.0.0 --port 8000
    environment:
      SSE_REDIS_URL: redis://:${REDIS_PASSWORD}@redis:6379/0
      SSE_HEARTBEAT_SECONDS: 30
    ports:
      - "8000:8000"
    depends_on:
      - redis
    restart: unless-stopped
    deploy:
      replicas: 3
volumes:
  redis_data:
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Run
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
Performance Tips
Horizontal Scaling
Run multiple FastAPI instances behind a load balancer. Redis Pub/Sub ensures events reach all instances.
Granular Topics
Use specific topics (e.g., task:123) instead of broad wildcards to reduce unnecessary events.
Lightweight Payloads
Send IDs and types via SSE, not full objects. Let clients fetch complete data via REST.
Tune Heartbeat
Balance connection keepalive (30-60s) with bandwidth. Higher intervals reduce traffic.
Redis Memory
Pub/Sub uses minimal memory. Configure maxmemory-policy appropriately if using Redis for caching too.
Monitor Connections
Track active SSE connections and Redis pub/sub channels. Set alerts for anomalies.
Connection Limits
| Component | Limit | Notes |
|---|---|---|
| Redis Pub/Sub | Unlimited channels | Memory is per connection, not per channel |
| FastAPI Instance | ~10K connections | Depends on system resources |
| Nginx | Configurable | Set worker_connections |
| Browser | 6 per domain | HTTP/1.1 limitation |
API Reference
SSEApp (Recommended)
FastAPI subclass with built-in SSE support and automatic setup:
class SSEApp(FastAPI):
    """FastAPI with SSE pre-configured."""
    def __init__(
        self,
        title: str = "API",
        redis_url: str = "redis://localhost:6379/0",
        heartbeat_seconds: int = 30,
        authorize: Optional[AuthorizeFn] = None,
        **kwargs
    )
| Parameter | Type | Default | Description |
|---|---|---|---|
| title | str | "API" | Application title |
| redis_url | str | redis://localhost:6379/0 | Redis connection URL |
| heartbeat_seconds | int | 30 | SSE keepalive interval in seconds (5-60) |
| authorize | AuthorizeFn \| None | None | Optional authorization callback |
@publish_event()
Automatically publish endpoint response as SSE event:
@publish_event(
    topic: str,
    event: str
)
| Parameter | Type | Description |
|---|---|---|
| topic | str | Topic to publish to |
| event | str | Event type identifier |
@subscribe_to_events()
Converts an endpoint into an SSE stream. Clients choose the topic with the query parameter ?topic=...
@subscribe_to_events()
async def events(request: Request):
    pass
EventBroker.publish()
Manually publish event (use decorators when possible):
async def publish(
    self,
    topic: str,
    event: str,
    data: dict | EventData
) -> None
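A minimal sketch of calling publish() after a write might look like the following. RecordingBroker is a stand-in that only mirrors the signature shown above so the example runs without Redis; complete_task and the "task.updated" event name are hypothetical.

```python
import asyncio
from typing import Any


class RecordingBroker:
    """Stand-in with the same publish() signature; not the real broker."""

    def __init__(self) -> None:
        self.published: list[tuple[str, str, dict[str, Any]]] = []

    async def publish(self, topic: str, event: str, data: dict[str, Any]) -> None:
        self.published.append((topic, event, data))


async def complete_task(broker: Any, task_id: str) -> dict[str, Any]:
    # Hypothetical business logic: apply the change first...
    task = {"id": task_id, "status": "done"}
    # ...then notify subscribers of this task's topic with a small payload.
    await broker.publish(
        topic=f"task:{task_id}", event="task.updated", data={"id": task_id}
    )
    return task


broker = RecordingBroker()
result = asyncio.run(complete_task(broker, "123"))
```

In a real application the broker would presumably be the one created by the library rather than this stand-in.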
TopicBuilder (Optional)
Helper for consistent topic naming (not required - plain strings work):
class TopicBuilder:
    @staticmethod
    def comment(comment_id: str) -> str  # "comment:id"
    @staticmethod
    def task(task_id: str) -> str  # "task:id"
    @staticmethod
    def workspace(workspace_id: str) -> str  # "workspace:id"
    @staticmethod
    def user(user_id: str) -> str  # "user:id"
    @staticmethod
    def custom(resource: str, id: str) -> str  # "resource:id"
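To illustrate the naming scheme in the comments above, here is a sketch of an equivalent helper. TopicBuilderSketch is a hypothetical re-implementation based only on the documented "resource:id" format; the library's actual class may differ.

```python
class TopicBuilderSketch:
    """Illustrative only: builds topics in the documented resource:id form."""

    @staticmethod
    def custom(resource: str, id: str) -> str:
        return f"{resource}:{id}"

    @staticmethod
    def task(task_id: str) -> str:
        return TopicBuilderSketch.custom("task", task_id)

    @staticmethod
    def workspace(workspace_id: str) -> str:
        return TopicBuilderSketch.custom("workspace", workspace_id)


task_topic = TopicBuilderSketch.task("123")
invoice_topic = TopicBuilderSketch.custom("invoice", "42")
```

Because the result is a plain string, a publisher using the helper and a client passing a hand-written topic agree as long as both follow the same format.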
Type Definitions
# Authorization callback type
AuthorizeFn = Callable[[Request, str], Awaitable[bool]]
# Example authorization
async def authorize(request: Request, topic: str) -> bool:
    """Return True to allow subscription, False to deny."""
    user = await get_current_user(request)
    return user is not None
# Event data type
EventData = dict[str, Any] # Any JSON-serializable dict
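The example above only checks that a user exists. A callback can also inspect the topic, as in this sketch. It assumes a session cookie and an in-memory SESSION_WORKSPACES lookup, both hypothetical; the request is typed as Any so the block runs without FastAPI installed, and it only relies on the cookies mapping that a Starlette Request exposes.

```python
import asyncio
from types import SimpleNamespace
from typing import Any

# Hypothetical lookup: session token -> workspace IDs the user may watch.
SESSION_WORKSPACES = {"session-alice": {"w1"}, "session-bob": {"w2"}}


async def authorize_by_workspace(request: Any, topic: str) -> bool:
    """Allow only workspace topics that belong to the caller's session."""
    allowed = SESSION_WORKSPACES.get(request.cookies.get("session", ""))
    if allowed is None:
        return False  # unknown or missing session
    resource, _, resource_id = topic.partition(":")
    return resource == "workspace" and resource_id in allowed


# Stand-in for a real request object, for illustration only.
alice = SimpleNamespace(cookies={"session": "session-alice"})
own = asyncio.run(authorize_by_workspace(alice, "workspace:w1"))
other = asyncio.run(authorize_by_workspace(alice, "workspace:w2"))
```

Denying by default for unknown sessions and unrecognized topic shapes keeps newly added topic types private until they are explicitly allowed.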
Advanced: Manual Setup
For complex use cases, manually configure the broker:
from fastapi import FastAPI
from fastapi_sse_events import mount_sse, RealtimeConfig
app = FastAPI()
config = RealtimeConfig(
    redis_url="redis://localhost:6379/0",
    heartbeat_seconds=30,
)
broker = mount_sse(app, config, authorize=auth_fn)
# Now use: app.state.event_broker
Examples
Quick Start Example (Recommended)
The easiest way to get started is with the Quick Start example at examples/quickstart/.
This demonstrates the recommended simplified API approach with minimal boilerplate.
What's Included
- app.py - Simplified API using SSEApp + decorators
- Task management API with real-time updates
- Embedded browser client (built into app)
- ~60 lines of code (75% less boilerplate!)
cd examples/quickstart
pip install -r requirements.txt # Or: pip install fastapi uvicorn fastapi-sse-events redis
redis-server # Start Redis (or: docker run -d -p 6379:6379 redis:7-alpine)
uvicorn app:app --reload # Start FastAPI
Open http://localhost:8000 in your browser to see real-time task updates!
CRM Comments Example (Advanced)
For a more complex example with authorization and advanced patterns, see examples/crm_comments/.
This demonstrates a multi-user comment system with role-based access control.
Available Versions
- app.py - Traditional approach with manual broker
- app_simple.py - Simplified approach (basic decorators)
- app_simple_new.py - Simplified approach (current decorators)
- client.html - Beautiful web UI with EventSource
cd examples/crm_comments
./start.sh
# Or manually:
pip install -r requirements.txt
uvicorn app:app --reload
Open http://localhost:8000/client.html in multiple browser windows to see real-time updates!
Production Scale Example
For deployment at scale, see examples/production_scale/.
This includes Docker, Nginx reverse proxy, load testing, and performance benchmarks.
Production Features
- Multi-container Docker setup (FastAPI + Redis)
- Nginx reverse proxy configuration
- Load testing script (supporting 100K+ concurrent connections)
- Prometheus monitoring setup
- Performance tuning guidelines
cd examples/production_scale
./start.sh # Starts all services via docker-compose
# Run load tests
pip install -r load_test_requirements.txt
python load_test.py
See the README.md in each example directory for detailed information.
Troubleshooting
Connection Issues
SSE not connecting
- Check Redis is running: redis-cli ping
- Verify SSE path in client matches server config
- Check browser console for CORS errors
- Ensure no firewall is blocking port 6379
Connection drops frequently
- Reduce heartbeat_seconds (try 15-20s)
- Check proxy/load balancer timeout settings
- Verify Nginx buffering is disabled for SSE path
- Monitor network stability
Event Issues
Events not received
- Verify client subscribed to correct topic
- Check server is publishing to matching topic
- Confirm topic_prefix matches on both sides
- Test Redis pub/sub: redis-cli subscribe topic:123
Events only work on one server
- Verify all FastAPI instances use same Redis
- Check Redis connection string includes correct host
- Ensure no firewall between servers and Redis
Performance Issues
High memory usage
- Check number of active SSE connections
- Verify event payloads are small
- Monitor Redis memory with redis-cli info memory
- Consider connection limits per user
Debug Mode
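// Enable verbose logging
const eventSource = new EventSource('/sse?topics=debug');
// onmessage only fires for events sent without an explicit event type
eventSource.onmessage = (event) => {
  console.log('Raw SSE:', event);
};
// Monitor connection state and default messages; each named event type
// needs its own addEventListener call
['open', 'error', 'message'].forEach(type => {
  eventSource.addEventListener(type, (e) => {
    console.log(`[${type}]`, e);
  });
});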
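When debugging outside the browser (for example with output captured from curl -N), it can help to decode the raw stream by hand. The sketch below parses the standard SSE wire format: event: and data: fields, blank-line delimiters, and comment lines starting with a colon. The sample stream is invented, and whether this library sends heartbeats as comment lines is an assumption.

```python
def parse_sse(raw: str) -> list[dict[str, str]]:
    """Parse raw SSE text into a list of {"event", "data"} dicts."""
    events: list[dict[str, str]] = []
    event_type, data_lines = "message", []
    # SSE allows CRLF, CR, or LF line endings; normalize to LF first.
    for line in raw.replace("\r\n", "\n").replace("\r", "\n").split("\n"):
        if line == "":
            # A blank line dispatches the event, but only if it carried data.
            if data_lines:
                events.append({"event": event_type, "data": "\n".join(data_lines)})
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, commonly used as a keepalive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if field == "event":
            event_type = value or "message"
        elif field == "data":
            data_lines.append(value)
    return events


# Invented sample: one comment line followed by one named event.
sample = ': heartbeat\n\nevent: task.updated\ndata: {"id": 123}\n\n'
parsed = parse_sse(sample)
```

Events that arrive with a named type, such as task.updated here, do not trigger onmessage in the browser, which is a frequent cause of "events not received" reports.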