How to use FastAPI with Materialize for real-time data processing

Introduction

Diagram

Running the demo

git clone https://github.com/bobbyiliev/materialize-tutorials.git
cd materialize-tutorials/mz-fastapi-demo
docker-compose pull
docker-compose build
docker-compose up

Create the Materialize sources and views

CREATE SOURCE sensors
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'sensors'
FORMAT BYTES;

CREATE VIEW sensors_data AS
    SELECT
        *
    FROM (
        SELECT
            (data->>'id')::int AS id,
            (data->>'pm25')::double AS pm25,
            (data->>'pm10')::double AS pm10,
            (data->>'geo_lat')::double AS geo_lat,
            (data->>'geo_lon')::double AS geo_lon,
            (data->>'timestamp')::double AS timestamp
        FROM (
            SELECT CAST(data AS jsonb) AS data
            FROM (
                SELECT convert_from(data, 'utf8') AS data
                FROM sensors
            )
        )
    );

CREATE MATERIALIZED VIEW sensors_view AS
    SELECT
        *
    FROM sensors_data
    WHERE
        mz_logical_timestamp() < (timestamp*1000 + 100000)::numeric;

CREATE MATERIALIZED VIEW sensors_view_1s AS
    SELECT
        *
    FROM sensors_data
    WHERE
        mz_logical_timestamp() < (timestamp*1000 + 6000)::numeric;
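
To make the logic of these views easier to follow, here is a minimal Python sketch of what they compute. It is only an illustration, not how Materialize works internally. The helper names decode_reading and is_visible and the sample payload are hypothetical, and the sketch assumes that the timestamp field holds seconds since the Unix epoch, which is why the views multiply it by 1000 before comparing it with the millisecond value of mz_logical_timestamp().

```python
import json


def decode_reading(raw: bytes) -> dict:
    """Mirror the sensors_data view: bytes -> UTF-8 text -> JSON -> typed columns."""
    data = json.loads(raw.decode("utf-8"))
    return {
        "id": int(data["id"]),
        "pm25": float(data["pm25"]),
        "pm10": float(data["pm10"]),
        "geo_lat": float(data["geo_lat"]),
        "geo_lon": float(data["geo_lon"]),
        "timestamp": float(data["timestamp"]),
    }


def is_visible(reading: dict, now_ms: int, window_ms: int) -> bool:
    """Same predicate as the WHERE clause: now < timestamp*1000 + window."""
    return now_ms < reading["timestamp"] * 1000 + window_ms


# Hypothetical payload, shaped like the fields the view extracts.
raw = (
    b'{"id": 1, "pm25": 12.5, "pm10": 20.1, '
    b'"geo_lat": 42.7, "geo_lon": 23.3, "timestamp": 1650000000}'
)
reading = decode_reading(raw)
event_ms = int(reading["timestamp"] * 1000)

# 5 seconds after the event: still inside the 6000 ms window.
print(is_visible(reading, event_ms + 5_000, 6_000))    # True
# 7 seconds after the event: dropped from the 6000 ms window...
print(is_visible(reading, event_ms + 7_000, 6_000))    # False
# ...but still inside the 100000 ms window used by sensors_view.
print(is_visible(reading, event_ms + 7_000, 100_000))  # True
```

With the constants used above, a reading stays in sensors_view for 100 seconds after its timestamp and in sensors_view_1s for 6 seconds. After that, the filter stops matching and the row drops out of the view.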

FastAPI Demo

Materialize Cloud

CREATE SOURCE sensors
FROM KAFKA BROKER 'your_redpanda_instance:9092' TOPIC 'sensors'
FORMAT BYTES;

Stop the demo

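To stop all of the services and remove their containers and volumes:

docker-compose down -v

Alternatively, to stop only the data generator and leave the other services running:

docker-compose stop datagen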

Helpful Links

Bobby Iliev
