How to use FastAPI with Materialize for real-time data processing
This is a self-contained demo of FastAPI and Materialize.
This demo project contains the following components:
- FastAPI: A fast, modern, and feature-rich framework for building APIs with Python.
- Redpanda: Kafka® compatible event streaming platform written in C++.
- Materialize: A streaming database for real-time analytics.
- A mock service written in Bash that produces records to a Redpanda topic. It simulates air quality readings from IoT devices.
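To make the shape of the simulated data concrete, here is a minimal Python sketch of one reading. The demo's actual generator is a Bash script that is not shown here; the field names are taken from the columns extracted later in the sensors_data view, while the function name make_reading and the value ranges are assumptions made for illustration only.

```python
import json
import random
import time


def make_reading(device_id: int, rng: random.Random) -> str:
    """Return one simulated air quality reading as a JSON string.

    Field names follow the sensors_data view; the value ranges are
    illustrative assumptions, not the ranges used by the demo.
    """
    reading = {
        "id": device_id,
        "pm25": round(rng.uniform(0.0, 150.0), 2),
        "pm10": round(rng.uniform(0.0, 250.0), 2),
        "geo_lat": round(rng.uniform(-90.0, 90.0), 4),
        "geo_lon": round(rng.uniform(-180.0, 180.0), 4),
        # Epoch seconds; the views multiply this by 1000 to get milliseconds.
        "timestamp": int(time.time()),
    }
    return json.dumps(reading)


if __name__ == "__main__":
    print(make_reading(1, random.Random(42)))
```

Each such JSON string would be one message on the sensors topic.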
Running the demo
Clone the repository:
Access the FastAPI demo project directory:
Pull all Docker images:
docker-compose pull
Build the project:
docker-compose build
Finally, run all containers:
docker-compose up -d
Create the Materialize sources and views
Once the demo is running, you can create the Materialize sources and views.
Let’s start by creating a Redpanda/Kafka source:
CREATE SOURCE sensors
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'sensors'
FORMAT BYTES;
Then create a non-materialized view, which you can think of as an alias that our materialized views will build on. A non-materialized view does not store the results of its query:
CREATE VIEW sensors_data AS
SELECT
    (data->>'id')::int AS id,
    (data->>'pm25')::double AS pm25,
    (data->>'pm10')::double AS pm10,
    (data->>'geo_lat')::double AS geo_lat,
    (data->>'geo_lon')::double AS geo_lon,
    (data->>'timestamp')::double AS timestamp
FROM (
    SELECT CAST(data AS jsonb) AS data
    FROM (
        SELECT convert_from(data, 'utf8') AS data
        FROM sensors
    )
);
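The sensors_data view applies three steps to every message: decode the raw bytes as UTF-8, parse the text as JSON, and cast each field to a typed column. The Python sketch below mirrors those steps for a single message so the transformation is easier to follow. It is an illustration only, not part of the demo; parse_sensor_record and _cast are hypothetical helper names.

```python
import json
from typing import Any, Callable, Optional


def _cast(value: Any, to_type: Callable[[Any], Any]) -> Optional[Any]:
    """Cast a JSON value; a missing field stays None, like SQL NULL."""
    return None if value is None else to_type(value)


def parse_sensor_record(raw: bytes) -> dict:
    """Mirror the casts in sensors_data for one Kafka message."""
    text = raw.decode("utf-8")   # convert_from(data, 'utf8')
    data = json.loads(text)      # CAST(data AS jsonb)
    return {
        "id": _cast(data.get("id"), int),                # (data->>'id')::int
        "pm25": _cast(data.get("pm25"), float),          # ::double
        "pm10": _cast(data.get("pm10"), float),
        "geo_lat": _cast(data.get("geo_lat"), float),
        "geo_lon": _cast(data.get("geo_lon"), float),
        "timestamp": _cast(data.get("timestamp"), float),
    }


if __name__ == "__main__":
    message = b'{"id": 3, "pm25": 12.5, "pm10": 30, "timestamp": 1650000000}'
    print(parse_sensor_record(message))
```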
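After that, create a materialized view that holds only recent records. The offset is in milliseconds, so the value 100000 used here keeps roughly the last 100 seconds of data; a full 10-minute window would need 600000:
CREATE MATERIALIZED VIEW sensors_view AS
SELECT *
FROM sensors_data
WHERE mz_logical_timestamp() < (timestamp*1000 + 100000)::numeric;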
Note that we are using the mz_logical_timestamp() function rather than the now() function. This is because in Materialize now() doesn’t represent the system time, as it does in most systems; it represents the time with timezone when the query was executed. It cannot be used when creating views. For more information, see the Materialize documentation.
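The temporal filter compares the current logical time, in milliseconds, against each record's timestamp converted from seconds to milliseconds plus a window length. The following Python sketch restates that predicate so the units are explicit. It is only an illustration of the arithmetic; in_window is a hypothetical name, and Materialize evaluates the real filter incrementally rather than by calling a function per row.

```python
def in_window(now_ms: int, timestamp_s: float, window_ms: int = 100_000) -> bool:
    """Return True while a record is still inside the view's time window.

    Mirrors: mz_logical_timestamp() < (timestamp*1000 + window_ms)
    now_ms      -- current logical time in milliseconds
    timestamp_s -- record timestamp in epoch seconds
    window_ms   -- how long a record stays visible, in milliseconds
    """
    return now_ms < timestamp_s * 1000 + window_ms


if __name__ == "__main__":
    recorded_at_s = 1_650_000_000
    # 50 seconds after the reading: still visible with a 100000 ms window.
    print(in_window(recorded_at_s * 1000 + 50_000, recorded_at_s))
    # 100 seconds after the reading: the record has dropped out of the view.
    print(in_window(recorded_at_s * 1000 + 100_000, recorded_at_s))
```

Passing window_ms=600_000 would correspond to a 10-minute window.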
Next, let’s create a materialized view that only includes the most recent data, so we can see the dataflow and use it for our Server-Sent Events (SSE) demo later on. The 6000 millisecond offset used here keeps roughly the last 6 seconds of readings:
CREATE MATERIALIZED VIEW sensors_view_1s AS
SELECT *
FROM sensors_data
WHERE mz_logical_timestamp() < (timestamp*1000 + 6000)::numeric;
With that, our materialized views are ready.
Finally, visit the FastAPI demo app via your browser:
- Endpoint for all records in the last 10 minutes:
- SSE Endpoint streaming the latest records as they are generated using
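For readers unfamiliar with Server-Sent Events, each event sent to the browser is plain text: an optional "event:" line, a "data:" line, and a terminating blank line. The sketch below shows how a row from the view could be framed that way. How the demo app actually builds its events is not shown in this document, so format_sse is a hypothetical helper written only to illustrate the wire format.

```python
import json
from typing import Optional


def format_sse(row: dict, event: Optional[str] = None) -> str:
    """Frame one row as a Server-Sent Events message.

    json.dumps produces a single line by default, so one "data:" field
    is enough; the trailing blank line marks the end of the event.
    """
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    lines.append(f"data: {json.dumps(row)}")
    return "\n".join(lines) + "\n\n"


if __name__ == "__main__":
    print(format_sse({"id": 1, "pm25": 12.5}, event="reading"), end="")
```

A streaming endpoint would yield one such string per new row, with the response content type set to text/event-stream.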
To run the demo in the cloud, you need the following:
- A publicly accessible Redpanda/Kafka instance so that you can connect to it.
- A Materialize Cloud account. You can sign up for a free account to get started.
If you already have both set up, make the following changes to the demo project:
- When creating the source, change redpanda:9092 to the address of your Redpanda/Kafka instance:
CREATE SOURCE sensors
FROM KAFKA BROKER 'your_redpanda_instance:9092' TOPIC 'sensors'
FORMAT BYTES;
- Change the DATABASE_URL environment variable to your Materialize Cloud database URL and uncomment the certificate-specific environment variables in the docker-compose.yml file.
- Download the Materialize instance certificate files from your Materialize Cloud dashboard.
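As a rough illustration of what a certificate-based DATABASE_URL can look like, the sketch below assembles a PostgreSQL-style connection URL using the standard libpq TLS parameters (sslmode, sslcert, sslkey, sslrootcert). Everything specific in it is an assumption: the helper name build_database_url, the host name, and the certificate file names are placeholders, and the demo may pass the certificates through separate environment variables instead. Use the values and file names from your own Materialize Cloud dashboard.

```python
from urllib.parse import quote, urlencode


def build_database_url(host: str, user: str, cert_dir: str,
                       port: int = 6875, database: str = "materialize") -> str:
    """Build a PostgreSQL-style URL with libpq TLS parameters.

    The certificate file names below are placeholders, not the names
    used by Materialize Cloud or by the demo project.
    """
    params = {
        "sslmode": "verify-full",
        "sslcert": f"{cert_dir}/materialize.crt",
        "sslkey": f"{cert_dir}/materialize.key",
        "sslrootcert": f"{cert_dir}/ca.crt",
    }
    # Keep "/" readable in file paths; everything else is percent-encoded.
    query = urlencode(params, safe="/")
    return f"postgresql://{quote(user, safe='')}@{host}:{port}/{database}?{query}"


if __name__ == "__main__":
    # "example.materialize.cloud" is a made-up host used only for illustration.
    print(build_database_url("example.materialize.cloud", "materialize", "/certs"))
```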
Stop the demo
To stop the demo, run the following command:
docker-compose down -v
You can also stop only the data generation container:
docker-compose stop datagen