How to use FastAPI with Materialize for real-time data processing

Introduction

This is a self-contained demo of FastAPI and Materialize.

This demo project contains the following components:

(Diagram of the demo components)

Running the demo

Clone the repository:

git clone 

Change into the FastAPI demo project directory:

cd mz-fastapi-demo

Pull all Docker images:

docker-compose pull

Build the project:

docker-compose build

Finally, run all containers:

docker-compose up

Create the Materialize sources and views

Once the demo is running, you can create the Materialize sources and views.

Let’s start by creating a Redpanda/Kafka source:

CREATE SOURCE sensors
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'sensors'
FORMAT BYTES;
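Because the source uses FORMAT BYTES, Materialize keeps each Kafka message as raw bytes and leaves decoding to the views. The following minimal sketch shows the kind of payload the data generator could publish. It assumes each message is a UTF-8 encoded JSON object with the keys that the sensors_data view extracts, and that timestamp is in seconds, as the timestamp*1000 in the views suggests. The function name make_sensor_message and the value ranges are hypothetical, and the demo's actual generator may differ. The sketch only builds the bytes; publishing them to the sensors topic would need a Kafka client.

```python
import json
import random
import time


def make_sensor_message(sensor_id: int) -> bytes:
    """Build one hypothetical sensor reading as UTF-8 encoded JSON bytes.

    The keys match what the sensors_data view extracts; the value
    ranges are made up for illustration.
    """
    reading = {
        "id": sensor_id,
        "pm25": round(random.uniform(0, 100), 2),
        "pm10": round(random.uniform(0, 150), 2),
        "geo_lat": round(random.uniform(-90, 90), 6),
        "geo_lon": round(random.uniform(-180, 180), 6),
        # Seconds since the Unix epoch (assumed); the views multiply
        # this by 1000 to compare it with mz_logical_timestamp().
        "timestamp": time.time(),
    }
    return json.dumps(reading).encode("utf-8")
```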

Then create a non-materialized view, which you can think of as essentially an alias that we will use to create our materialized views. Non-materialized views do not store the results of their query:

CREATE VIEW sensors_data AS
SELECT
    *
FROM (
    SELECT
        (data->>'id')::int AS id,
        (data->>'pm25')::double AS pm25,
        (data->>'pm10')::double AS pm10,
        (data->>'geo_lat')::double AS geo_lat,
        (data->>'geo_lon')::double AS geo_lon,
        (data->>'timestamp')::double AS timestamp
    FROM (
        SELECT CAST(data AS jsonb) AS data
        FROM (
            SELECT convert_from(data, 'utf8') AS data
            FROM sensors
        )
    )
);
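Each nested SELECT in the view peels off one layer of encoding: bytes to text, text to JSON, then each JSON field to a typed column. The following Python sketch performs the same steps on a single message, purely to illustrate what the SQL does; it is not part of the demo, and the function names are hypothetical. Like ->>, it yields a null value (None) for a missing key or a JSON null rather than failing.

```python
import json
from typing import Any, Callable, Dict, Optional


def _field(data: Dict[str, Any], key: str, cast: Callable[[str], Any]) -> Optional[Any]:
    """Emulate (data->>'key')::type for a single key."""
    value = data.get(key)
    # ->> yields NULL for a missing key or a JSON null
    if value is None:
        return None
    # ->> returns the value as text; JSON strings come back unquoted
    text = value if isinstance(value, str) else json.dumps(value)
    # ::int / ::double then cast that text to the target type
    return cast(text)


def decode_sensor_message(raw: bytes) -> Dict[str, Optional[Any]]:
    """Decode one message the same way the sensors_data view does."""
    text = raw.decode("utf-8")  # convert_from(data, 'utf8')
    data = json.loads(text)     # CAST(data AS jsonb)
    return {
        "id": _field(data, "id", int),
        "pm25": _field(data, "pm25", float),
        "pm10": _field(data, "pm10", float),
        "geo_lat": _field(data, "geo_lat", float),
        "geo_lon": _field(data, "geo_lon", float),
        "timestamp": _field(data, "timestamp", float),
    }
```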

After that, create a materialized view that will hold all records in the last 10 minutes:

CREATE MATERIALIZED VIEW sensors_view AS
SELECT
    *
FROM sensors_data
WHERE
    -- 10 minutes = 600,000 ms
    mz_logical_timestamp() < (timestamp*1000 + 600000)::numeric;

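Note that we are using the mz_logical_timestamp() function rather than now(). In Materialize, now() does not represent the current system time the way it does in most systems; it represents the time, with time zone, at which the query was executed, and it cannot be used when creating views. mz_logical_timestamp() instead returns the logical time at which the view is being evaluated, which keeps advancing, so each record drops out of the view once the filter no longer holds. For more information, see the Materialize documentation.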
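To see why a continuously advancing logical time matters, the following minimal Python sketch evaluates the same predicate outside Materialize. It assumes, as the timestamp*1000 in the views implies, that record timestamps are in seconds and mz_logical_timestamp() is in milliseconds. The function names are hypothetical, and the 10-minute window (600,000 ms) is used for illustration. Because the right-hand side is fixed per record while the left-hand side keeps growing, every record has a definite expiry time after which the filter excludes it, which is roughly what lets Materialize drop old rows from the view without re-running a query.

```python
def is_visible(record_ts_s: float, logical_time_ms: float, window_ms: float) -> bool:
    """Evaluate the temporal filter for one record.

    Mirrors: mz_logical_timestamp() < (timestamp*1000 + window_ms),
    with the record timestamp in seconds and logical time in ms.
    """
    return logical_time_ms < record_ts_s * 1000 + window_ms


def expiry_ms(record_ts_s: float, window_ms: float) -> float:
    """First logical time (ms) at which the record no longer passes the filter."""
    return record_ts_s * 1000 + window_ms


# Walk logical time forward and watch one record leave a 10-minute window.
TEN_MINUTES_MS = 600_000
record_ts = 1_650_000_000  # hypothetical record timestamp, in seconds
for offset_s in (0, 300, 599, 600, 900):
    now_ms = (record_ts + offset_s) * 1000
    print(f"+{offset_s:>3}s visible={is_visible(record_ts, now_ms, TEN_MINUTES_MS)}")
```

Running it prints visible=True for offsets of 0, 300 and 599 seconds and visible=False from 600 seconds on, because the comparison is strict.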

Next, let’s create a materialized view that only includes data from the last second, so we can watch new data flow in and use it for our Server-Sent Events (SSE) demo later on:

CREATE MATERIALIZED VIEW sensors_view_1s AS
SELECT
    *
FROM sensors_data
WHERE
    -- 1 second = 1,000 ms
    mz_logical_timestamp() < (timestamp*1000 + 1000)::numeric;
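The SSE demo streams rows from this short-window view to the browser over a long-lived HTTP response with the text/event-stream content type. In that format, each event is one or more "field: value" lines terminated by a blank line. The following minimal sketch shows only that framing, assuming each row is sent as a single JSON-encoded data line; the helper name sse_events is hypothetical, and how the demo actually reads rows from sensors_view_1s is not shown here.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def sse_events(rows: Iterable[Dict[str, Any]]) -> Iterator[str]:
    """Frame each row as one Server-Sent Events message.

    Each row becomes a single "data:" line holding its JSON encoding,
    followed by the blank line that ends an SSE event.
    """
    for row in rows:
        yield f"data: {json.dumps(row)}\n\n"
```

In a FastAPI app, a generator like this could be passed to fastapi.responses.StreamingResponse with media_type="text/event-stream"; on the browser side, the EventSource API parses each event and exposes the JSON string as event.data.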

With that, our materialized views are ready, and we can visit the FastAPI demo project in the browser!

FastAPI Demo

Now open the FastAPI demo app in your browser:

  • Endpoint for all records in the last 10 minutes:

  • SSE Endpoint streaming the latest records as they are generated using :

Example response:

Materialize Cloud

To run the demo in the cloud, you need the following:

  • A publicly accessible Redpanda/Kafka instance that Materialize Cloud can connect to.
  • A Materialize Cloud account. You can sign up for a free account to get started with Materialize Cloud.

If you already have that set up, make the following changes to the demo project:

  • When creating the source, replace redpanda:9092 with the address of your Redpanda/Kafka instance:
CREATE SOURCE sensors
FROM KAFKA BROKER 'your_redpanda_instance:9092' TOPIC 'sensors'
FORMAT BYTES;
  • Change the DATABASE_URL environment variable to your Materialize Cloud database URL and uncomment the certificate-specific environment variables in the docker-compose.yml file.
  • Download the Materialize instance certificate files from your Materialize Cloud dashboard.
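Since Materialize speaks the PostgreSQL wire protocol, the app can turn DATABASE_URL into ordinary libpq-style connection parameters. The following minimal sketch shows one way to do that with the standard library. The certificate variable names (MZ_SSL_CERT, MZ_SSL_KEY, MZ_SSL_ROOT_CERT) are hypothetical placeholders for whatever names the demo's docker-compose.yml actually defines, and the sketch leaves TLS mode settings out. The fallbacks assume Materialize's default SQL port (6875) and default user and database (materialize).

```python
import os
from typing import Dict, Optional
from urllib.parse import urlsplit


def connection_params(env: Optional[Dict[str, str]] = None) -> Dict[str, object]:
    """Build libpq-style keyword arguments from DATABASE_URL.

    Certificate paths are read from hypothetical environment variables
    and only included when set.
    """
    env = dict(os.environ) if env is None else env
    url = urlsplit(env["DATABASE_URL"])
    params: Dict[str, object] = {
        "host": url.hostname,
        "port": url.port or 6875,  # Materialize's default SQL port
        "dbname": url.path.lstrip("/") or "materialize",
        "user": url.username or "materialize",
    }
    if url.password:
        params["password"] = url.password
    # Hypothetical variable names for the Materialize Cloud certificate files.
    cert_vars = {
        "sslcert": "MZ_SSL_CERT",
        "sslkey": "MZ_SSL_KEY",
        "sslrootcert": "MZ_SSL_ROOT_CERT",
    }
    for param, var in cert_vars.items():
        if env.get(var):
            params[param] = env[var]
    return params
```

The resulting dictionary could be passed to a PostgreSQL driver, for example psycopg2.connect(**connection_params()).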

Stop the demo

To stop the demo and remove its volumes, run the following command:

docker-compose down -v

You can also stop only the data generation container:

docker-compose stop datagen


Bobby Iliev

I am a professional System Administrator with a demonstrated history of working in the internet industry. I am a Linux lover