v0.2.0

FastAPI SSE Events

Add real-time "refresh-less" updates to your FastAPI REST API using Server-Sent Events (SSE) and Redis Pub/Sub. Perfect for collaborative tools and dashboards.

Simple Integration
Type Safe
Redis Backed
Horizontally Scalable

Overview

FastAPI SSE Events enables near real-time notifications for REST-based applications without the complexity of WebSockets. Clients subscribe to Server-Sent Event streams for specific topics, receive lightweight notifications when data changes, then refresh data via existing REST endpoints.

Perfect For

  • Collaborative CRM systems where sales reps see updates from teammates
  • Project management tools with real-time task updates
  • Live dashboards showing metrics and notifications
  • Multi-user editing interfaces requiring change notifications

Why SSE?

The Problem

Traditional REST APIs require a manual refresh to see updates from other users. This creates a poor collaboration experience: users either hit refresh constantly, or polling adds unnecessary server load.

Why Not WebSockets?

WebSockets are powerful but come with complexity:

  • Require separate infrastructure and connection management
  • More difficult to debug and monitor
  • Overkill for one-way notifications
  • Don't work well with existing REST patterns
  • Harder to secure with standard auth mechanisms

The SSE Solution

HTTP Based

Works over plain HTTP with your existing infrastructure - no separate protocol, port, or connection upgrade needed.

Native Browser Support

Built-in EventSource API in all modern browsers with automatic reconnection.

One-Way Communication

Perfect for notifications - server pushes updates, client remains stateless.

REST as Source of Truth

SSE notifies, REST endpoints provide data - clean separation of concerns.

Installation

bash
pip install fastapi-sse-events

Requirements

  • Python 3.10+
  • FastAPI 0.104.0+
  • Redis 5.0+ (running locally or remotely)

Redis Setup

You'll need a Redis server. Quick options:

Docker
docker run -d -p 6379:6379 redis:alpine
Local Install
redis-server

Quick Start

Recommended Flow

Use the simplified decorator-based API with SSEApp, @publish_event, and @subscribe_to_events. This is the current, recommended approach.

Get SSE running in your FastAPI app with just a few lines of code:

1. Create Your FastAPI App

app.py
from fastapi import Request
from fastapi_sse_events import SSEApp, publish_event, subscribe_to_events

app = SSEApp(
    title="My API",
    redis_url="redis://localhost:6379"
)

@app.post("/tasks")
@publish_event(topic="tasks", event="task:created")
async def create_task(request: Request, task: dict):
    # ... save task to database ...
    return {"id": 1, **task}  # auto-published to SSE clients

@app.get("/events")
@subscribe_to_events()
async def events(request: Request):
    pass

2. Connect from Browser

client.js
// Subscribe to topic from query param
const eventSource = new EventSource('/events?topic=tasks');

eventSource.addEventListener('task:created', async (event) => {
    const data = JSON.parse(event.data);
    console.log('Task event:', data);

    // Notify-then-fetch pattern (recommended)
    const response = await fetch(`/tasks/${data.id}`);
    const task = await response.json();
    renderTask(task);
});

eventSource.addEventListener('open', () => {
    console.log('✅ Connected to SSE');
});

That's It!

You now have real-time notifications. Clients connect to /events?topic=... and receive instant updates when decorated endpoints return.

Simplified API (Recommended)

Recommended Approach

The simplified decorator-based API makes adding real-time SSE events incredibly easy by eliminating 75%+ of boilerplate code. It provides auto-configuration, automatic event publishing via decorators, and automatic SSE streaming.

Naming Conventions

Current Names (Recommended): @publish_event, @subscribe_to_events

Legacy Aliases (Still Supported): @sse_event, @sse_endpoint

Both naming conventions work interchangeably. For new projects, use the current names.

Auto-Configuration

One-line app setup with automatic broker, Redis, and CORS configuration.

@publish_event Decorator

Automatically publish endpoint responses as SSE events - no manual calls needed.

@subscribe_to_events Decorator

Create SSE streaming endpoints with one decorator - handles everything automatically.

80% Less Code

Reduce boilerplate from ~50 lines to ~10 lines. Focus on your business logic.

Getting Started (3 Steps)

Step 1: Create SSEApp

Replace FastAPI() with SSEApp() for automatic configuration:

app.py
from fastapi import Request
from fastapi_sse_events import SSEApp, sse_event, sse_endpoint

# One-line setup with auto-configuration!
app = SSEApp(
    title="My API",
    redis_url="redis://localhost:6379"  # or use REDIS_URL env var
)

# That's it! No manual broker setup, no lifecycle management needed!

Step 2: Use @publish_event Decorator

Automatically publish events when endpoints return:

Python
@app.post("/comments")
@publish_event(topic="comments", event="comment_created")
async def create_comment(request: Request, comment: dict):
    # Your business logic here
    new_comment = {"id": 1, "content": comment["content"]}
    
    # Just return it - automatically published to SSE clients!
    return new_comment

Step 3: Use @subscribe_to_events Decorator

Create SSE streaming endpoints with zero boilerplate:

Python
@app.get("/events")
@subscribe_to_events()
async def events(request: Request):
    pass  # Decorator handles all streaming logic!

Complete!

You now have real-time SSE with minimal code. Open the API docs at http://localhost:8000/docs to test it!

SSEApp - Auto-configured FastAPI

SSEApp extends FastAPI and automatically configures everything needed for SSE:

Python
from fastapi_sse_events import SSEApp

app = SSEApp(
    title="My API",
    version="1.0.0",
    
    # Redis connection (choose one)
    redis_url="redis://localhost:6379",       # Option 1: URL
    # redis_host="localhost",                  # Option 2: Host/Port
    # redis_port=6379,
    # redis_db=0,
    
    # Optional settings
    topic_prefix="myapp:",                    # Prefix all topics
    enable_cors=True,                         # Auto-enable CORS
    cors_origins=["*"],                       # Allowed origins
)

What SSEApp Does Automatically

  • ✅ Creates and configures EventBroker
  • ✅ Sets up Redis connection with Pub/Sub
  • ✅ Manages broker lifecycle (connect on startup, disconnect on shutdown)
  • ✅ Configures CORS middleware (optional)
  • ✅ Makes broker available at app.broker and request.app.state.broker

Environment Variables

SSEApp reads from environment variables if parameters not provided:

  • REDIS_URL - Redis connection URL
  • REDIS_HOST - Redis host (default: localhost)
  • REDIS_PORT - Redis port (default: 6379)
  • REDIS_DB - Redis database (default: 0)
  • TOPIC_PREFIX - Topic prefix (default: "")
  • CORS_ORIGINS - Comma-separated origins (default: *)

@publish_event - Auto-Publish Decorator

The @publish_event decorator automatically publishes your endpoint's response as an SSE event. No more manual broker.publish() calls!
Legacy Alias: @sse_event (still supported for backwards compatibility)

Python
from fastapi_sse_events import publish_event

# Explicit topic and event name
@app.post("/comments")
@publish_event(topic="comments", event="comment_created")
async def create_comment(request: Request, comment: dict):
    return {"id": 1, "content": comment["content"]}

# Auto-infer topic from route path
@app.post("/threads/{thread_id}/comments")
@publish_event()  # Auto-infers topic: "threads.comments"
async def add_comment(request: Request, thread_id: int, comment: dict):
    return {"id": 1, "thread_id": thread_id, **comment}

# Custom data extraction
@app.post("/users")
@publish_event(
    topic="users",
    extract_data=lambda resp: {"user_id": resp["id"]}  # Only publish ID
)
async def create_user(request: Request, user: dict):
    full_user = {"id": 1, "password": "secret", **user}
    return full_user  # Only {"user_id": 1} published to SSE

Decorator Parameters

Parameter Type Description
topic str | None Topic to publish to. Auto-inferred from route if None.
event str | None Event name. Uses function name if None.
extract_data Callable | None Function to transform response before publishing.
auto_topic bool Auto-generate topic from route path (default: True).

Auto-Topic Inference

When topic is not provided, the decorator infers it from the route path:

  • /comments → comments
  • /threads/123/comments → threads.comments
  • /api/v1/users/456 → users (filters out "api", "v1", and IDs)
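The filtering rule can be sketched as follows - an illustrative reimplementation that matches the examples above, not the library's actual code:

```python
import re

def infer_topic(path: str) -> str:
    """Illustrative: derive a topic from a route path by dropping
    API/version prefixes, numeric IDs, and path parameters."""
    kept = []
    for segment in path.strip("/").split("/"):
        if segment == "api" or re.fullmatch(r"v\d+", segment):
            continue                      # drop "api" and version prefixes
        if re.fullmatch(r"\d+", segment) or segment.startswith("{"):
            continue                      # drop numeric IDs and path params
        kept.append(segment)
    return ".".join(kept)

print(infer_topic("/comments"))               # comments
print(infer_topic("/threads/123/comments"))   # threads.comments
print(infer_topic("/api/v1/users/456"))       # users
```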

Important

You must add a request: Request parameter to your endpoint for the decorator to work. The decorator needs access to the app state to publish events.

@subscribe_to_events - Auto-Streaming Decorator

The @subscribe_to_events decorator creates SSE streaming endpoints with zero boilerplate. It handles subscription, event streaming, heartbeat, and connection management automatically.
Legacy Alias: @sse_endpoint (still supported for backwards compatibility)

Python
from fastapi_sse_events import subscribe_to_events

# Simple SSE endpoint with static topics
@app.get("/events")
@subscribe_to_events(topics=["comments", "users"])
async def events(request: Request):
    pass  # Decorator handles everything!

# Dynamic topics from query params
@app.get("/events/dynamic")
@subscribe_to_events()  # Topics from ?topic=comments,users
async def events_dynamic(request: Request):
    pass

# With authorization
async def check_auth(request: Request, topic: str) -> bool:
    user = await get_current_user(request)
    return user.can_access(topic)

@app.get("/events/secure")
@subscribe_to_events(topics=["comments"], authorize=check_auth)
async def events_secure(request: Request):
    pass

# Custom heartbeat
@app.get("/events/fast")
@subscribe_to_events(topics=["live-data"], heartbeat=10)  # 10-second heartbeat
async def events_fast(request: Request):
    pass

Decorator Parameters

Parameter Type Description
topics list[str] | None Topics to subscribe to. Uses query param if None.
authorize Callable | None Authorization function (request, topic) → bool.
heartbeat int Heartbeat interval in seconds (default: 30, 0 to disable).
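When topics is None, the topic list comes from the query string. Based on the ?topic=comments,users example above, the comma-separated parsing can be sketched as (assumed behavior, not the library's actual code):

```python
def topics_from_query(topic_param: str) -> list[str]:
    """Illustrative: split a comma-separated ?topic= value into a
    list of topics, dropping empty entries."""
    return [t.strip() for t in topic_param.split(",") if t.strip()]

print(topics_from_query("comments,users"))      # ['comments', 'users']
print(topics_from_query("comment:123, tasks"))  # ['comment:123', 'tasks']
```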

Before vs After Comparison

❌ Before (Manual API)

~50 lines
from fastapi import FastAPI
from fastapi_sse_events import (
    EventBroker, RealtimeConfig,
    RedisBackend, mount_sse
)

app = FastAPI()

# Manual configuration
config = RealtimeConfig(
    redis_url="redis://localhost:6379"
)
redis_backend = RedisBackend(config)
broker = EventBroker(config, redis_backend)

# Manual lifecycle
@app.on_event("startup")
async def startup():
    await redis_backend.connect()

@app.on_event("shutdown")
async def shutdown():
    await redis_backend.disconnect()

# Manual publishing
@app.post("/comments")
async def create_comment(comment: dict):
    new_comment = {"id": 1, **comment}
    
    # Manual broker call
    await broker.publish(
        topic="comments",
        event="comment_created",
        data=new_comment
    )
    
    return new_comment

# Manual SSE endpoint
mount_sse(app, broker)

✅ After (Simplified API)

~10 lines
from fastapi import Request
from fastapi_sse_events import (
    SSEApp, sse_event, sse_endpoint
)

# One-line setup!
app = SSEApp(
    redis_url="redis://localhost:6379"
)

# Auto-publishing
@app.post("/comments")
@sse_event(topic="comments")
async def create_comment(
    request: Request,
    comment: dict
):
    new_comment = {"id": 1, **comment}
    return new_comment  # Auto-published!

# Auto-streaming
@app.get("/events")
@sse_endpoint(topics=["comments"])
async def events(request: Request):
    pass  # All automatic!

80% Reduction in Boilerplate

The simplified API reduces setup from ~50 lines to ~10 lines, letting you focus on your business logic instead of configuration.

Migration from Manual API

Migrating to the simplified API is straightforward and can be done gradually:

Step 1: Replace FastAPI with SSEApp

Before → After
# Before
app = FastAPI()
config = RealtimeConfig(redis_url="...")
# ... manual setup ...

# After
app = SSEApp(redis_url="...")

Step 2: Add Decorators

Before → After
# Before
@app.post("/comments")
async def create_comment(comment: dict):
    new_comment = save(comment)
    await broker.publish("comments", "created", new_comment)
    return new_comment

# After
@app.post("/comments")
@sse_event(topic="comments", event="created")
async def create_comment(request: Request, comment: dict):
    new_comment = save(comment)
    return new_comment  # Auto-published!

Step 3: Simplify SSE Endpoints

Before → After
# Before
mount_sse(app, broker, authorize_fn)
# Or manual endpoint creation...

# After
@app.get("/events")
@sse_endpoint(topics=["comments"], authorize=authorize_fn)
async def events(request: Request):
    pass

Fully Backwards Compatible

The simplified API is fully backwards compatible. You can mix both APIs in the same application: use decorators for new endpoints while keeping manual broker.publish() calls in existing code. Access the broker directly via app.broker or request.app.state.broker.

Complete Example

Here's a complete CRM comments system using the simplified API:

app_simple.py
"""Simplified CRM Comments with SSE"""
from fastapi import Request, HTTPException
from pydantic import BaseModel
from fastapi_sse_events import SSEApp, sse_event, sse_endpoint

# One-line app setup
app = SSEApp(
    title="CRM Comments",
    redis_url="redis://localhost:6379",
    enable_cors=True
)

# In-memory storage
comments_db = {}
comment_id = 0

class CommentCreate(BaseModel):
    thread_id: int
    author: str
    content: str

# CRUD endpoints with auto-publishing
@app.post("/comments")
@sse_event(topic="comments", event="comment_created")
async def create_comment(request: Request, comment: CommentCreate):
    global comment_id
    comment_id += 1
    
    new_comment = {
        "id": comment_id,
        "thread_id": comment.thread_id,
        "author": comment.author,
        "content": comment.content
    }
    
    comments_db[comment_id] = new_comment
    return new_comment  # Auto-published to SSE!

@app.put("/comments/{comment_id}")
@sse_event(topic="comments", event="comment_updated")
async def update_comment(
    request: Request,
    comment_id: int,
    content: str
):
    if comment_id not in comments_db:
        raise HTTPException(404, "Not found")
    
    comments_db[comment_id]["content"] = content
    return comments_db[comment_id]  # Auto-published!

@app.delete("/comments/{comment_id}")
@sse_event(topic="comments", event="comment_deleted")
async def delete_comment(request: Request, comment_id: int):
    if comment_id not in comments_db:
        raise HTTPException(404, "Not found")
    
    comments_db.pop(comment_id)
    return {"id": comment_id}  # Auto-published!

@app.get("/comments")
async def list_comments():
    return list(comments_db.values())

# SSE endpoint with auto-streaming
@app.get("/events")
@sse_endpoint(topics=["comments"])
async def events(request: Request):
    pass  # All automatic!

# Run with: uvicorn app_simple:app --reload

Try It!

The complete example is available in examples/crm_comments/app_simple.py. Run it with: cd examples/crm_comments && ./start_simple.sh

FAQ

Can I still use the manual API?

Yes! The simplified API is built on top of the manual API and both work together. You can access the broker directly via app.broker.publish(...) whenever needed.

Do I need the Request parameter?

Yes, for the @sse_event and @sse_endpoint decorators. The decorators need access to the app state where the broker is stored. Add request: Request as a parameter (the first parameter after self for class methods).

Can I use dynamic topics?

Yes! You can still use manual app.broker.publish() for dynamic topics like f"thread:{thread_id}", or use the decorator for general topics and manual publish for specific ones.

What about authorization?

Use the authorize parameter in @sse_endpoint: @sse_endpoint(authorize=my_auth_function). The function receives (request, topic) and returns True/False.

Does it work with dependency injection?

Yes! FastAPI dependencies work normally. Just make sure request: Request is one of your parameters.

Architecture

FastAPI SSE Events uses a simple but powerful architecture:

Architecture Flow
┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│   Browser   │  SSE    │  FastAPI    │  Pub    │    Redis    │
│   Client    │◄────────│   Instance  │◄────────│   Pub/Sub   │
└─────────────┘         └─────────────┘         └─────────────┘
      │                       │                        ▲
      │                       │                        │
      │  POST /api/task       │  Publish Event         │
      └──────────────────────►│───────────────────────►│
                              │                        │
                              │   Subscribe Topics     │
                              └───────────────────────►│

Flow Breakdown

  1. Client connects to /events?topic=task:123
  2. FastAPI subscribes to Redis channels matching those topics
  3. Another client makes a POST request that modifies data
  4. Your endpoint publishes event to Redis: task:created
  5. Redis broadcasts to all subscribed FastAPI instances
  6. FastAPI forwards event to connected SSE clients
  7. Browser receives notification and refreshes data via REST

Horizontal Scaling

Because Redis Pub/Sub is used as the message broker, you can run multiple FastAPI instances behind a load balancer. An event published by any instance reaches all connected clients across all instances.
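The fan-out can be illustrated with an in-process stand-in for Redis Pub/Sub - a toy model, not the library's broker. Subscribers on any instance receive every message published to their topic, regardless of which instance published it:

```python
from collections import defaultdict

class FakePubSub:
    """Toy in-process model of Redis Pub/Sub fan-out."""
    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)

    def publish(self, topic, message):
        # Every subscriber to the topic receives the message
        for callback in self.subscribers[topic]:
            callback(message)

broker = FakePubSub()
received = []
# Two FastAPI "instances" subscribed to the same topic
broker.subscribe("tasks", lambda msg: received.append(("instance-a", msg)))
broker.subscribe("tasks", lambda msg: received.append(("instance-b", msg)))

broker.publish("tasks", {"event": "task:created", "id": 1})
print(received)  # both instances saw the event
```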

Features

Simple Integration

Add SSE with 3 lines of code. Minimal changes to existing endpoints. Works alongside your REST API.

Server-Sent Events

Lightweight, one-way communication (server → client). Built on standard HTTP with automatic reconnection.

Redis Pub/Sub

Horizontal scaling across multiple API instances. Battle-tested message broker with low latency.

Authorization Hooks

Secure topic subscriptions with custom auth logic. Integrate with your existing authentication system.

Heartbeat Support

Automatic connection keepalive with configurable intervals. Prevents proxy timeouts.

Topic-based Routing

Fine-grained subscription control per resource. Subscribe to exactly what you need.

Type Safe

Full type hints and mypy compliance. Catch errors at development time, not runtime.

Well Tested

Comprehensive test suite with 80%+ coverage. Production-ready and battle-tested.

How It Works

The "Notify Then Fetch" Pattern

FastAPI SSE Events implements a clean separation between notifications and data:

  1. SSE notifies that something changed
  2. Client fetches fresh data via REST
  3. REST API remains the source of truth

SSE vs Full Data Push

We recommend sending lightweight notifications (IDs, types) via SSE rather than full objects. This keeps your REST API as the canonical data source and reduces bandwidth. Clients can intelligently fetch only changed resources.
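A rough sense of the difference, using illustrative payloads rather than measured figures:

```python
import json

# Full object push vs. the recommended lightweight notification
full_comment = {
    "id": 123,
    "author": "alice",
    "content": "A fairly long comment body that would otherwise "
               "travel over the event stream... " * 5,
}
notification = {"id": 123, "type": "comment:updated"}

print(len(json.dumps(full_comment)))   # hundreds of bytes per event
print(len(json.dumps(notification)))   # a few dozen bytes per event
```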

Example Flow

sequence-diagram
User A                  Server                  User B
  │                         │                      │
  │  1. POST /comments      │                      │
  │────────────────────────►│                      │
  │                         │                      │
  │                         │  2. SSE Event        │
  │                         │────────────────────►│
  │                         │   "comment:created"  │
  │                         │                      │
  │                         │  3. GET /comments    │
  │                         │◄────────────────────│
  │                         │                      │
  │                         │  4. Fresh Data       │
  │                         │────────────────────►│

Basic Integration

Step 1: Create SSEApp

Python
from fastapi import Request
from fastapi_sse_events import SSEApp, publish_event, subscribe_to_events

app = SSEApp(
    title="My API",
    redis_url="redis://localhost:6379"
)

Step 2: Publish Events with Decorators

Python
@app.post("/comments")
@publish_event(topic="comments", event="comment:created")
async def create_comment(request: Request, comment: dict):
    # Save to database
    saved_comment = {"id": 1, **comment}
    return saved_comment  # auto-published

Step 3: Create SSE Endpoint + Client Subscription

Python
@app.get("/events")
@subscribe_to_events()  # topics from ?topic=...
async def events(request: Request):
    pass
JavaScript
// Connect to SSE endpoint
const sse = new EventSource('/events?topic=comments');

// Listen for specific events
sse.addEventListener('comment:created', (event) => {
  const data = JSON.parse(event.data);
  console.log('New comment:', data);

  // Refresh data from REST API
  fetch(`/comments/${data.id}`).then(r => r.json()).then(renderComment);
});

// Connection status
sse.addEventListener('open', () => {
  console.log('Connected to SSE');
});

sse.addEventListener('error', (error) => {
  console.error('SSE error:', error);
  // EventSource auto-reconnects
});

Publishing Events

There are two primary approaches to publishing events: using decorators (recommended) or direct publish.

Method 1: Decorator-Based Publishing (Recommended)

The simplest approach - just add @publish_event() to any endpoint and the response is automatically published:

Python
from fastapi_sse_events import publish_event

@app.post("/tickets/{ticket_id}")
@publish_event(topic="tickets", event="ticket:updated")
async def update_ticket(request: Request, ticket_id: int, status: str):
    await db.update(ticket_id, status=status)
    return {"id": ticket_id, "status": status}  # Auto-published!

Method 2: Direct Publish (Advanced)

For fine-grained control, publish manually via the broker, available as app.broker (or request.app.state.broker inside endpoints):

Python
await app.broker.publish(
    topic="resource:123",      # Topic (routing key)
    event="resource:updated",  # Event type
    data={                     # Payload
        "id": "123",
        "changed_fields": ["status"]
    }
)

Event Publishing API

Parameter Type Description
topic str Topic subscribers are listening to (e.g., "task:123")
event str Event type identifier (e.g., "task:updated")
data dict | EventData Event payload (serialized to JSON)
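For reference, the text/event-stream frame that ultimately reaches a subscriber for a published event looks like this. The wire format follows the SSE specification; the helper itself is illustrative:

```python
import json

def sse_frame(event: str, data: dict) -> str:
    """Illustrative: render an event + JSON payload as a
    text/event-stream frame (blank line terminates the frame)."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"

print(sse_frame("task:updated", {"id": "123", "changed_fields": ["status"]}))
```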

Common Publishing Patterns

Python
# Resource Created
await app.state.event_broker.publish(
    topic="tasks",
    event="task:created",
    data={"id": task_id, "title": "New Task"}
)

# Resource Updated
await app.state.event_broker.publish(
    topic=f"task:{task_id}",
    event="task:updated",
    data={"id": task_id, "status": "completed"}
)

# Resource Deleted
await app.state.event_broker.publish(
    topic=f"task:{task_id}",
    event="task:deleted",
    data={"id": task_id}
)

# Broadcast to All
await app.state.event_broker.publish(
    topic="global",
    event="system:maintenance",
    data={"message": "Scheduled maintenance in 5 minutes"}
)

Authorization

Protect SSE subscriptions by providing an authorization callback that validates whether a user can subscribe to specific topics. Pass this to the SSEApp constructor:

Python
from fastapi import Request
from fastapi_sse_events import SSEApp

async def authorize_subscription(request: Request, topic: str) -> bool:
    """
    Authorization callback - return True to allow, False to deny.
    Called for each topic subscription attempt.
    """
    # Get current user (implement based on your auth method)
    user = await get_current_user(request)
    if not user:
        return False
    
    # Allow user to subscribe to their own topic
    if topic.startswith("user:"):
        user_id = topic.split(":")[1]
        return str(user.id) == user_id
    
    # Check workspace membership
    if topic.startswith("workspace:"):
        workspace_id = topic.split(":")[1]
        return await user_has_workspace_access(user.id, workspace_id)
    
    # Broadcast topics - anyone can subscribe
    if topic in ["announcements", "system"]:
        return True
    
    # Deny by default
    return False

# Create app with authorization
app = SSEApp(
    redis_url="redis://localhost:6379/0",
    authorize=authorize_subscription  # Pass authorization callback
)

Security Best Practices

  • ✅ Always validate user authentication before checking permissions
  • ✅ Use granular topic patterns (e.g., "workspace:123") that include resource IDs
  • ✅ Log authorization failures for security monitoring
  • ✅ Consider rate limiting SSE connections per user
  • ❌ Don't allow wildcard subscriptions without explicit auth
  • ❌ Never trust client-supplied topic names
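One way to apply the "never trust client-supplied topic names" rule inside an authorize callback is to validate topics against a whitelist of patterns before any permission check. The pattern list here is hypothetical:

```python
import re

# Hypothetical whitelist of topic shapes this app actually uses
ALLOWED_TOPIC_PATTERNS = [
    r"comments",
    r"comment:\d+",
    r"workspace:[\w-]+",
    r"user:\d+",
]

def is_allowed_topic(topic: str) -> bool:
    """Reject any topic that doesn't match a known pattern."""
    return any(re.fullmatch(p, topic) for p in ALLOWED_TOPIC_PATTERNS)

print(is_allowed_topic("comment:123"))        # True
print(is_allowed_topic("workspace:acme"))     # True
print(is_allowed_topic("admin:*"))            # False - unknown shape
```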

Topic Patterns

Topics are simple string identifiers that route events to subscribers. Use consistent naming conventions across your application.

Simple Topic Strings

Just use strings directly - no special helpers required:

Python
# Simple topics
await app.broker.publish(
    topic="comments",  # Broadcast to all subscribers
    event="comment:created",
    data={"id": 1, "text": "Hello"}
)

# With resource IDs for granularity
await app.broker.publish(
    topic="comment:123",  # Specific comment
    event="updated",
    data={"text": "Edited"}
)

# Workspace-specific
await app.broker.publish(
    topic="workspace:acme-corp",
    event="task:created",
    data={"id": 456, "title": "New Task"}
)

Topic Naming Conventions

Simple Broadcast

Topic for broadcasting to everyone: comments, tasks, announcements

Resource-Specific

Include resource IDs for security: comment:123, task:456, user:john

Namespace Hierarchy

Use colons to group related topics: workspace:wsp1, org:org1:team:dev

System Topics

Reserved topics for app-wide events: broadcast, global, system

Using TopicBuilder (Optional)

For consistency, you can optionally use the TopicBuilder helper:

Python
from fastapi_sse_events import TopicBuilder

# Built-in patterns (all optional)
TopicBuilder.comment("c123")        # → "comment:c123"
TopicBuilder.task("t456")           # → "task:t456"
TopicBuilder.workspace("ws789")     # → "workspace:ws789"
TopicBuilder.user("john")           # → "user:john"

# Custom topics
TopicBuilder.custom("invoice", "inv001")  # → "invoice:inv001"

Client Subscriptions

JavaScript
// Subscribe to broadcast topic
new EventSource('/events?topic=comments')

// Subscribe to specific resource
new EventSource('/events?topic=comment:123')

// Subscribe to workspace
new EventSource('/events?topic=workspace:acme-corp')

// Subscribe to multiple topics (comma-separated)
new EventSource('/events?topic=comment:123,workspace:acme-corp')

Configuration

The SSEApp class provides simple one-line configuration with sensible defaults:

Python
from fastapi_sse_events import SSEApp
import os

# Minimal setup
app = SSEApp(redis_url="redis://localhost:6379/0")

# Full configuration
app = SSEApp(
    title="My API",
    redis_url=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    heartbeat_seconds=30,              # SSE keepalive interval (5-60s)
    authorize=authorize_subscription,   # Authorization callback (optional)
)

Configuration Parameters

Parameter Type Default Description
title str "FastAPI" App title (optional)
redis_url str ⚠️ Required Redis connection URL (or set via REDIS_URL env var)
heartbeat_seconds int 30 SSE keepalive interval (5-60)
authorize callable None Authorization callback function

Environment Variables

Load configuration from environment variables:

.env
# Development
REDIS_URL=redis://localhost:6379/0
SSE_HEARTBEAT_SECONDS=30

# Production (with authentication)
REDIS_URL=redis://:password@redis-prod.example.com:6379/0
SSE_HEARTBEAT_SECONDS=60
Python
from dotenv import load_dotenv
import os
from fastapi_sse_events import SSEApp

load_dotenv()  # Load .env file

app = SSEApp(
    redis_url=os.getenv("REDIS_URL"),
    heartbeat_seconds=int(os.getenv("SSE_HEARTBEAT_SECONDS", "30")),
)

JavaScript Client

Use the native EventSource API in the browser for simple, efficient real-time updates:

JavaScript
// Create SSE connection
const eventSource = new EventSource('/events?topic=comments');

// Listen for connection events
eventSource.addEventListener('open', () => {
  console.log('✅ Connected to SSE');
  updateStatus('Connected');
});

eventSource.addEventListener('error', (error) => {
  console.error('❌ SSE Error:', error);
  updateStatus('Disconnected');
  // EventSource automatically reconnects
});

// Listen for specific events
eventSource.addEventListener('comment:created', (event) => {
  const data = JSON.parse(event.data);
  console.log('New comment:', data);
  
  // Fetch and display the new comment
  fetch(`/comments/${data.id}`)
    .then(r => r.json())
    .then(comment => addCommentToDOM(comment));
});

eventSource.addEventListener('comment:updated', (event) => {
  const data = JSON.parse(event.data);
  updateCommentInDOM(data.id, data);
});

eventSource.addEventListener('heartbeat', () => {
  console.log('💓 Heartbeat');
});

// Subscribe to multiple topics
const topics = ['comments', 'notifications'];
const eventSource2 = new EventSource(
  `/events?topic=${topics.join(',')}`
);

// Cleanup
function closeConnection() {
  eventSource.close();
  eventSource2.close();
}
window.addEventListener('beforeunload', closeConnection);

Automatic Reconnection

EventSource automatically reconnects if the connection drops. You don't need to implement retry logic - just handle the open event to know when reconnection succeeds.

React Client

Create a reusable React hook for managing SSE connections:

useSSE.tsx
import { useEffect, useState, useCallback } from 'react';

interface UseSSEOptions {
  topic: string;
  events?: string[];  // named event types to forward to onEvent
  onEvent?: (eventType: string, data: any) => void;
}

export function useSSE({ topic, events = [], onEvent }: UseSSEOptions) {
  const [connected, setConnected] = useState(false);
  const [error, setError] = useState<Error | null>(null);

  useEffect(() => {
    const eventSource = new EventSource(`/events?topic=${topic}`);

    eventSource.addEventListener('open', () => {
      setConnected(true);
      setError(null);
    });

    eventSource.addEventListener('error', () => {
      setConnected(false);
      setError(new Error('SSE connection error'));
    });

    // Named SSE events do not fire `onmessage`, so register a
    // listener for each event type the caller is interested in
    const handler = (event: MessageEvent) => {
      const data = JSON.parse(event.data);
      onEvent?.(event.type, data);
    };
    events.forEach(name => eventSource.addEventListener(name, handler));

    return () => eventSource.close();
    // join() keeps the dependency stable across renders
  }, [topic, events.join(','), onEvent]);

  return { connected, error };
}

// Usage example
export function CommentThread({ threadId }: { threadId: number }) {
  const [comments, setComments] = useState([]);

  const handleEvent = useCallback((eventType: string, data: any) => {
    if (['comment:created', 'comment:updated'].includes(eventType)) {
      // Refresh comments from the API
      fetchComments();
    }
  }, []);

  const { connected } = useSSE({
    topic: `comment_thread:${threadId}`,
    events: ['comment:created', 'comment:updated'],
    onEvent: handleEvent,
  });

  useEffect(() => {
    fetchComments();
  }, [threadId]);

  const fetchComments = async () => {
    const res = await fetch(`/comments?thread_id=${threadId}`);
    setComments(await res.json());
  };

  return (
    <div>
      <span>{connected ? '🟢 Live' : '🔴 Offline'}</span>
      {comments.map(c => (
        <p key={c.id}>{c.content}</p>
      ))}
    </div>
  );
}

Python Client

Connect to SSE from Python using httpx or the sseclient library:

Python (httpx)
import httpx
import json

async def listen_to_sse():
    """Listen to SSE events using httpx."""
    # timeout=None keeps the long-lived stream open
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "GET",
            "http://localhost:8000/events?topic=comments",
            headers={"Accept": "text/event-stream"}
        ) as response:
            event_type = "message"  # default event type per the SSE spec
            async for line in response.aiter_lines():
                if line.startswith("event:"):
                    event_type = line[len("event:"):].strip()
                elif line.startswith("data:"):
                    data_str = line[len("data:"):].strip()
                    try:
                        data = json.loads(data_str)
                        print(f"Event {event_type}: {data}")
                    except json.JSONDecodeError:
                        pass
Python (sseclient)
from sseclient import SSEClient
import json

def listen_to_events():
    """Simple SSE listening using the sseclient package."""
    url = "http://localhost:8000/events?topic=comments"
    
    for event in SSEClient(url):
        if event.event != "heartbeat":
            data = json.loads(event.data)
            print(f"Event: {event.event}")
            print(f"Data: {data}")

# Run it
listen_to_events()
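The manual line handling in the httpx example can be factored into a reusable helper. This is a sketch (the `parse_sse_lines` function is not part of the library) that groups raw SSE lines into (event type, data) pairs:

```python
import json
from typing import Iterable, Iterator

def parse_sse_lines(lines: Iterable[str]) -> Iterator[tuple[str, dict]]:
    """Group raw SSE lines into (event_type, data) pairs.

    Follows basic SSE framing: an `event:` line sets the type for the
    following `data:` line, and a blank line terminates the event.
    """
    event_type = "message"  # default type per the SSE spec
    for line in lines:
        if line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            try:
                yield event_type, json.loads(line[len("data:"):].strip())
            except json.JSONDecodeError:
                pass  # ignore non-JSON payloads
        elif line == "":
            event_type = "message"  # reset for the next event

# Example: parsing a captured stream
raw = [
    "event: comment:created",
    'data: {"id": 7}',
    "",
    'data: {"ping": true}',
]
print(list(parse_sse_lines(raw)))
# → [('comment:created', {'id': 7}), ('message', {'ping': True})]
```

The same generator works over `response.aiter_lines()` output collected from httpx, or over any other line source.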

Production Setup

Redis Configuration

For production, use a managed Redis service or configure Redis with persistence:

redis.conf
# Memory management
maxmemory 2gb
maxmemory-policy allkeys-lru

# Persistence disabled (pub/sub messages are never persisted anyway)
save ""
appendonly no

# Network
bind 0.0.0.0
protected-mode yes
requirepass your-secure-password

# Pub/Sub specific
timeout 0
tcp-keepalive 300

Environment Variables

.env
# Redis
SSE_REDIS_URL=redis://:password@redis-host:6379/0

# SSE Config
SSE_HEARTBEAT_SECONDS=30
SSE_PATH=/sse
SSE_TOPIC_PREFIX=production

# FastAPI
APP_ENV=production
LOG_LEVEL=info
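These variables can be read into a small settings object at application startup. A minimal sketch using only the standard library (`SSESettings` and `load_settings` are hypothetical helpers, not part of the package; defaults mirror the documented ones):

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class SSESettings:
    """Hypothetical holder for the SSE_* environment variables above."""
    redis_url: str
    heartbeat_seconds: int
    path: str
    topic_prefix: str

def load_settings() -> SSESettings:
    # Fall back to the documented defaults when a variable is unset
    return SSESettings(
        redis_url=os.getenv("SSE_REDIS_URL", "redis://localhost:6379/0"),
        heartbeat_seconds=int(os.getenv("SSE_HEARTBEAT_SECONDS", "30")),
        path=os.getenv("SSE_PATH", "/sse"),
        topic_prefix=os.getenv("SSE_TOPIC_PREFIX", ""),
    )
```

Centralizing the lookups keeps the defaults in one place and makes misconfigured deployments fail early and visibly.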

Uvicorn Production Settings

bash
uvicorn app:app \
  --host 0.0.0.0 \
  --port 8000 \
  --workers 4 \
  --loop uvloop \
  --log-level info \
  --proxy-headers \
  --forwarded-allow-ips='*'

Nginx Configuration

Configure Nginx to proxy SSE connections with proper timeouts:

nginx.conf
upstream fastapi_backend {
    server 127.0.0.1:8000;
    server 127.0.0.1:8001;
    server 127.0.0.1:8002;
    keepalive 32;
}

server {
    listen 80;
    server_name api.example.com;

    # Standard API endpoints
    location / {
        proxy_pass http://fastapi_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }

    # SSE endpoint - special configuration
    location /sse {
        proxy_pass http://fastapi_backend;
        proxy_http_version 1.1;
        
        # SSE specific headers
        proxy_set_header Connection '';
        proxy_set_header Cache-Control 'no-cache';
        
        # Disable buffering
        proxy_buffering off;
        proxy_cache off;
        
        # Long timeouts for SSE
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
        
        # Forward headers
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        
        # Chunked transfer encoding
        chunked_transfer_encoding on;
    }
}

Critical SSE Settings

  • proxy_buffering off - Prevents Nginx from buffering SSE responses
  • proxy_read_timeout 86400s - Long timeout for persistent connections
  • proxy_http_version 1.1 - Required for persistent connections
  • chunked_transfer_encoding on - Enables streaming

Docker Deployment

docker-compose.yml
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD}
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped

  api:
    build: .
    command: uvicorn app:app --host 0.0.0.0 --port 8000
    environment:
      SSE_REDIS_URL: redis://:${REDIS_PASSWORD}@redis:6379/0
      SSE_HEARTBEAT_SECONDS: 30
    expose:
      - "8000"  # no fixed host port: replicas would conflict; route via a reverse proxy
    depends_on:
      - redis
    restart: unless-stopped
    deploy:
      replicas: 3
      
volumes:
  redis_data:
Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Run
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

Performance Tips

Horizontal Scaling

Run multiple FastAPI instances behind a load balancer. Redis Pub/Sub ensures events reach all instances.

Granular Topics

Use specific topics (e.g., task:123) instead of broad wildcards to reduce unnecessary events.

Lightweight Payloads

Send IDs and types via SSE, not full objects. Let clients fetch complete data via REST.
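For example, a notification can carry only identifiers while the REST endpoint serves the full record (the field names below are illustrative):

```python
import json

# Heavy: embedding the full object in every SSE event
full_event = {
    "event": "task:updated",
    "task": {
        "id": 123,
        "title": "Prepare Q3 report",
        "description": "Long description repeated to every subscriber...",
        "assignees": [1, 2, 3],
        "history": ["created", "assigned", "edited"],
    },
}

# Light: just enough for the client to decide whether to re-fetch
light_event = {"event": "task:updated", "id": 123}

print(len(json.dumps(full_event)), len(json.dumps(light_event)))
# The light payload stays a fixed, small size no matter how large the task grows
```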

Tune Heartbeat

Balance connection keepalive (30-60s) with bandwidth. Higher intervals reduce traffic.

Redis Memory

Pub/Sub uses minimal memory. Configure maxmemory-policy appropriately if using Redis for caching too.

Monitor Connections

Track active SSE connections and Redis pub/sub channels. Set alerts for anomalies.

Connection Limits

| Component        | Limit              | Notes                                     |
|------------------|--------------------|-------------------------------------------|
| Redis Pub/Sub    | Unlimited channels | Memory is per connection, not per channel |
| FastAPI instance | ~10K connections   | Depends on system resources               |
| Nginx            | Configurable       | Set worker_connections                    |
| Browser          | 6 per domain       | HTTP/1.1 limitation                       |

API Reference

SSEApp (Recommended)

FastAPI subclass with built-in SSE support and automatic setup:

Python
class SSEApp(FastAPI):
    """FastAPI with SSE pre-configured."""
    
    def __init__(
        self,
        title: str = "API",
        redis_url: str = "redis://localhost:6379/0",
        heartbeat_seconds: int = 30,
        authorize: Optional[AuthorizeFn] = None,
        **kwargs
    )
| Parameter         | Type                | Default                  | Description                     |
|-------------------|---------------------|--------------------------|---------------------------------|
| title             | str                 | "API"                    | Application title               |
| redis_url         | str                 | redis://localhost:6379/0 | Redis connection URL            |
| heartbeat_seconds | int                 | 30                       | SSE keepalive interval (5-60)   |
| authorize         | AuthorizeFn \| None | None                     | Optional authorization callback |

@publish_event()

Automatically publish endpoint response as SSE event:

Python
@publish_event(
    topic: str,
    event: str
)
| Parameter | Type | Description           |
|-----------|------|-----------------------|
| topic     | str  | Topic to publish to   |
| event     | str  | Event type identifier |
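On the wire, each published event ultimately becomes a text frame in the SSE format: an `event:` line, a `data:` line, and a blank line. A sketch of that serialization, independent of the library's internals (`format_sse_frame` is illustrative, not a library function):

```python
import json

def format_sse_frame(event: str, data: dict) -> str:
    """Serialize one event in Server-Sent Events wire format."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"

frame = format_sse_frame("comment:created", {"id": 7, "thread_id": 3})
print(frame)
# event: comment:created
# data: {"id": 7, "thread_id": 3}
```

The trailing blank line is what delimits events for the browser's EventSource parser.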

@subscribe_to_events()

Convert endpoint to SSE stream. Use query parameter: ?topic=...

Python
@subscribe_to_events()
async def events(request: Request):
    pass

EventBroker.publish()

Manually publish event (use decorators when possible):

Python
async def publish(
    self,
    topic: str,
    event: str,
    data: dict | EventData
) -> None

TopicBuilder (Optional)

Helper for consistent topic naming (not required - plain strings work):

Python
class TopicBuilder:
    @staticmethod
    def comment(comment_id: str) -> str       # "comment:id"
    
    @staticmethod
    def task(task_id: str) -> str             # "task:id"
    
    @staticmethod
    def workspace(workspace_id: str) -> str   # "workspace:id"
    
    @staticmethod
    def user(user_id: str) -> str             # "user:id"
    
    @staticmethod
    def custom(resource: str, id: str) -> str # "resource:id"
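Based on the signatures and outputs above, the builder is essentially a set of string formatters. A minimal sketch of an equivalent implementation:

```python
class TopicBuilder:
    """Consistent "resource:id" topic names (sketch matching the API above)."""

    @staticmethod
    def comment(comment_id: str) -> str:
        return f"comment:{comment_id}"

    @staticmethod
    def task(task_id: str) -> str:
        return f"task:{task_id}"

    @staticmethod
    def workspace(workspace_id: str) -> str:
        return f"workspace:{workspace_id}"

    @staticmethod
    def user(user_id: str) -> str:
        return f"user:{user_id}"

    @staticmethod
    def custom(resource: str, id: str) -> str:
        return f"{resource}:{id}"

print(TopicBuilder.task("123"))  # → task:123
```

Funneling topic names through one helper prevents the publisher and subscriber from silently drifting apart on naming.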

Type Definitions

Python
# Authorization callback type
AuthorizeFn = Callable[[Request, str], Awaitable[bool]]

# Example authorization
async def authorize(request: Request, topic: str) -> bool:
    """Return True to allow subscription, False to deny."""
    user = await get_current_user(request)
    return user is not None

# Event data type
EventData = dict[str, Any]  # Any JSON-serializable dict
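A more granular authorization callback usually parses the topic and checks it against the user's permissions. The permission check itself is plain logic that can live outside the callback and be tested in isolation (the policy and helper names below are illustrative):

```python
def can_subscribe(allowed_workspaces: set[str], topic: str) -> bool:
    """Pure permission check: workspace topics require membership,
    anything else is denied by default (illustrative policy)."""
    resource, _, resource_id = topic.partition(":")
    if resource == "workspace":
        return resource_id in allowed_workspaces
    return False

# Wiring it into the authorize callback (sketch; get_current_user is
# assumed to exist in your application):
#
# async def authorize(request: Request, topic: str) -> bool:
#     user = await get_current_user(request)
#     return user is not None and can_subscribe(user.workspace_ids, topic)

print(can_subscribe({"42"}, "workspace:42"))  # → True
print(can_subscribe({"42"}, "workspace:99"))  # → False
```

Denying by default keeps an unrecognized topic scheme from accidentally leaking events.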

Advanced: Manual Setup

For complex use cases, manually configure the broker:

Python
from fastapi import FastAPI
from fastapi_sse_events import mount_sse, RealtimeConfig

app = FastAPI()

config = RealtimeConfig(
    redis_url="redis://localhost:6379/0",
    heartbeat_seconds=30,
)

broker = mount_sse(app, config, authorize=auth_fn)

# Now use: app.state.event_broker

Examples

Quick Start Example (Recommended)

The easiest way to get started is with the Quick Start example at examples/quickstart/. This demonstrates the recommended simplified API approach with minimal boilerplate.

What's Included

  • app.py - Simplified API using SSEApp + decorators
  • Task management API with real-time updates
  • Embedded browser client (built into app)
  • ~60 lines of code (75% less boilerplate!)
bash
cd examples/quickstart
pip install -r requirements.txt  # Or: pip install fastapi uvicorn fastapi-sse-events redis
redis-server                      # Start Redis (or: docker run -d -p 6379:6379 redis:7-alpine)
uvicorn app:app --reload         # Start FastAPI

Open http://localhost:8000 in your browser to see real-time task updates!


CRM Comments Example (Advanced)

For a more complex example with authorization and advanced patterns, see examples/crm_comments/. This demonstrates a multi-user comment system with role-based access control.

Available Versions

  • app.py - Traditional approach with manual broker
  • app_simple.py - Simplified approach (basic decorators)
  • app_simple_new.py - Simplified approach (current decorators)
  • client.html - Beautiful web UI with EventSource
bash
cd examples/crm_comments
./start.sh

# Or manually:
pip install -r requirements.txt
uvicorn app:app --reload

Open http://localhost:8000/client.html in multiple browser windows to see real-time updates!


Production Scale Example

For deployment at scale, see examples/production_scale/. This includes Docker, Nginx reverse proxy, load testing, and performance benchmarks.

Production Features

  • Multi-container Docker setup (FastAPI + Redis)
  • Nginx reverse proxy configuration
  • Load testing script (supporting 100K+ concurrent connections)
  • Prometheus monitoring setup
  • Performance tuning guidelines
bash
cd examples/production_scale
./start.sh          # Starts all services via docker-compose

# Run load tests
pip install -r load_test_requirements.txt
python load_test.py

See the README.md in each example directory for detailed information.

Troubleshooting

Connection Issues

SSE not connecting

  • Check Redis is running: redis-cli ping
  • Verify SSE path in client matches server config
  • Check browser console for CORS errors
  • Ensure no firewall blocking port 6379

Connection drops frequently

  • Reduce heartbeat_seconds (try 15-20s)
  • Check proxy/load balancer timeout settings
  • Verify Nginx buffering is disabled for SSE path
  • Monitor network stability

Event Issues

Events not received

  • Verify client subscribed to correct topic
  • Check server is publishing to matching topic
  • Confirm topic_prefix matches on both sides
  • Test Redis pub/sub: redis-cli subscribe topic:123

Events only work on one server

  • Verify all FastAPI instances use same Redis
  • Check Redis connection string includes correct host
  • Ensure no firewall between servers and Redis

Performance Issues

High memory usage

  • Check number of active SSE connections
  • Verify event payloads are small
  • Monitor Redis memory with redis-cli info memory
  • Consider connection limits per user

Debug Mode

JavaScript
// Enable verbose logging
const eventSource = new EventSource('/sse?topic=debug');

eventSource.onmessage = (event) => {
  console.log('Raw SSE:', event);
};

// Monitor all event types
['open', 'error', 'message'].forEach(type => {
  eventSource.addEventListener(type, (e) => {
    console.log(`[${type}]`, e);
  });
});