How to use server-sent events (SSE) with FastAPI?

Introduction

Server-sent events (SSE) is a way for a server to push data to the browser over a single long-lived HTTP connection, without the client having to reload the page or poll for updates. This makes it a good fit for streaming data and for building real-time applications in a variety of scenarios.

Prerequisites

In order to follow this tutorial, you will need to have Python and pip installed on your machine.

Installing FastAPI

To install FastAPI and all of its dependencies, you can use the following command:

pip install "fastapi[all]"

Installing sse-starlette

Once you’ve installed FastAPI, you can install the sse-starlette extension to add support for SSE to your FastAPI project:

pip install sse-starlette

The asyncio module used later in this tutorial is part of the Python standard library, so there is no need to install it separately with pip.

Creating a simple hello world endpoint

With FastAPI installed, you can create a simple hello world endpoint in a file called main.py to get started.

import asyncio
import uvicorn
from fastapi import FastAPI, Request

app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World"}

Running the uvicorn server

To run the server, you can use the following command:

uvicorn main:app --reload

Once the server is running, visit http://127.0.0.1:8000 in your browser and you will see the following response:

{
    "message": "Hello World"
}
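If you prefer to check the endpoint programmatically, the sketch below sends the same request with httpx, which is pulled in by the fastapi[all] extra; the URL assumes uvicorn's default host and port:

import httpx

# Assumes the server above is running on uvicorn's default address
response = httpx.get("http://127.0.0.1:8000/")
print(response.json())  # {'message': 'Hello World'}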

Adding SSE support to your FastAPI project

Next, let’s extend the main.py file to add SSE support. Import EventSourceResponse from sse-starlette and add a /stream endpoint:

from sse_starlette.sse import EventSourceResponse

STREAM_DELAY = 1  # second
RETRY_TIMEOUT = 15000  # millisecond


@app.get('/stream')
async def message_stream(request: Request):
    def new_messages():
        # Add logic here to check for new messages
        return True

    async def event_generator():
        while True:
            # If the client closes the connection, stop sending events
            if await request.is_disconnected():
                break

            # Check for new messages and return them to the client if any
            if new_messages():
                yield {
                    "event": "new_message",
                    "id": "message_id",
                    "retry": RETRY_TIMEOUT,
                    "data": "message_content"
                }

            await asyncio.sleep(STREAM_DELAY)

    return EventSourceResponse(event_generator())
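To see the stream in action, you can consume the /stream endpoint from a small client. The sketch below uses httpx as an assumption; any HTTP client that supports streaming responses, or the browser's built-in EventSource API, works just as well:

import httpx

# Connect to the /stream endpoint and print events as they arrive.
# Assumes the server above is running on uvicorn's default address.
with httpx.stream("GET", "http://127.0.0.1:8000/stream", timeout=None) as response:
    for line in response.iter_lines():
        if line:  # skip the blank lines that separate SSE events
            print(line)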

FastAPI with streaming data and Materialize

To learn more about streaming data, let’s take a look at how FastAPI can be used together with Materialize, based on a demo project that combines the two.

What is Materialize?

Materialize is a streaming database. It ingests data from different sources like Kafka, PostgreSQL, and S3 buckets, lets you write views that aggregate (materialize) that data, and lets you query those views using pure SQL with very low latency.

Streaming data with Materialize

For the demo project, we are using the [TAIL](https://materialize.com/docs/sql/tail/#conceptual-framework) statement. TAIL streams updates from a source, table, or view as they occur, which lets you consume the data as it is being updated and makes it a perfect fit for the SSE example.
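The endpoint below relies on a SQLAlchemy engine and a polling delay that are not shown in the snippet itself. A minimal sketch of that setup, assuming a local Materialize instance on its default port and the psycopg2 driver installed, could look like this:

from sqlalchemy import create_engine

# Hypothetical connection settings: Materialize speaks the PostgreSQL wire
# protocol, so a regular PostgreSQL connection URL works. Adjust the host,
# port, user, and database to match your own instance.
engine = create_engine("postgresql://materialize@localhost:6875/materialize")

MESSAGE_STREAM_DELAY = 1  # second between checks for new data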

@app.get('/stream')
async def message_stream(request: Request):
    def new_messages():
        # Check if there is any data in the view
        results = engine.execute('SELECT count(*) FROM sensors_view_1s')
        if results.fetchone()[0] == 0:
            return None
        else:
            return True

    async def event_generator():
        while True:
            # If the client closes the connection, stop sending events
            if await request.is_disconnected():
                break

            # Check for new messages and return them to the client if any
            if new_messages():
                connection = engine.raw_connection()
                with connection.cursor() as cur:
                    cur.execute("DECLARE c CURSOR FOR TAIL sensors_view_1s")
                    cur.execute("FETCH ALL c")
                    for row in cur:
                        yield row
                connection.close()

            await asyncio.sleep(MESSAGE_STREAM_DELAY)

    return EventSourceResponse(event_generator())

Conclusion

To learn more about FastAPI, check out the FastAPI documentation.
