The Helsinki transit dataflow uses MQTT to ingest data from the Helsinki transit API and then processes it to provide insights into the transit system. The purpose of the dataflow is to demonstrate how to use the SQL interface to analyze the data.
The following diagram is a visual representation of the dataflow generated by sdf:
The dataflow has two services:
- clean-events removes events with incomplete data.
- generate-vehicle-stats generates the average speed per vehicle every 5 seconds.
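To make the role of clean-events concrete, here is a minimal Rust sketch of "remove events with incomplete data". It is not the dataflow's actual code: the RawEvent and CleanEvent types and their fields (vehicle, speed, lat, long) are hypothetical stand-ins for the real schema defined in dataflow.yaml, and "incomplete" is assumed to mean "any field is missing".

```rust
/// Hypothetical raw transit event; any field may be missing.
/// (Illustrative only; the real schema is defined in dataflow.yaml.)
#[derive(Debug, Clone, PartialEq)]
pub struct RawEvent {
    pub vehicle: Option<u32>,
    pub speed: Option<f64>,
    pub lat: Option<f64>,
    pub long: Option<f64>,
}

/// Hypothetical complete event: every field is present.
#[derive(Debug, Clone, PartialEq)]
pub struct CleanEvent {
    pub vehicle: u32,
    pub speed: f64,
    pub lat: f64,
    pub long: f64,
}

/// Returns `Some` only when every field is present; the `?` operator
/// short-circuits to `None` on the first missing field.
pub fn clean_event(raw: &RawEvent) -> Option<CleanEvent> {
    Some(CleanEvent {
        vehicle: raw.vehicle?,
        speed: raw.speed?,
        lat: raw.lat?,
        long: raw.long?,
    })
}

/// Keeps only the complete events from a batch, dropping the rest.
pub fn clean_events(raw: &[RawEvent]) -> Vec<CleanEvent> {
    raw.iter().filter_map(clean_event).collect()
}
```

Modeling optional fields as Option and converting to a type without Option means downstream code (such as the vehicle statistics step) never has to re-check for missing values.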
Take a look at the dataflow.yaml to get an idea of what we're doing.
Make sure to install SDF and start a cluster.
Use the sdf command line tool to run the dataflow:
sdf run --ui
- Use --ui to generate the graphical representation and run the Studio.
- Using sdf run as opposed to sdf deploy runs the dataflow with an ephemeral worker, which is cleaned up on exit.
In a new terminal, change directory to ./connectors, download the connector binary, and start the connector:
cd ./connectors
fluvio hub smartmodule download infinyon/[email protected]
cdk hub download infinyon/[email protected]
cdk deploy start --ipkg infinyon-mqtt-source-0.2.9.ipkg -c mqtt-helsinki.yaml
To see the events, run fluvio consume helsinki.
For additional context, check out connectors.
In the sdf terminal, check out the state maintained by the dataflow:
show state
We are interested in the following state:
show state generate-vehicle-stats/vehicle-stat/state
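As a rough picture of what this per-vehicle state accumulates, the sketch below computes an average speed per vehicle over 5-second windows. It assumes tumbling (non-overlapping) windows, millisecond event timestamps, and a plain HashMap in place of the state SDF manages for you; SpeedSample, window_start, and average_speed_per_window are hypothetical names, not part of the SDF API.

```rust
use std::collections::HashMap;

/// Window length for this sketch: the dataflow emits stats every 5 seconds.
pub const WINDOW_MS: u64 = 5_000;

/// Hypothetical cleaned sample: vehicle id, speed, and event time in milliseconds.
#[derive(Debug, Clone, Copy)]
pub struct SpeedSample {
    pub vehicle: u32,
    pub speed: f64,
    pub timestamp_ms: u64,
}

/// Start of the tumbling window containing `timestamp_ms`.
pub fn window_start(timestamp_ms: u64) -> u64 {
    timestamp_ms - timestamp_ms % WINDOW_MS
}

/// Average speed keyed by (window start, vehicle).
pub fn average_speed_per_window(samples: &[SpeedSample]) -> HashMap<(u64, u32), f64> {
    // Keyed state: (window start, vehicle) -> (sum of speeds, sample count).
    let mut acc: HashMap<(u64, u32), (f64, u32)> = HashMap::new();
    for s in samples {
        let entry = acc
            .entry((window_start(s.timestamp_ms), s.vehicle))
            .or_insert((0.0, 0));
        entry.0 += s.speed;
        entry.1 += 1;
    }
    // Every entry has count >= 1, so the division is always defined.
    acc.into_iter()
        .map(|(key, (sum, count))| (key, sum / f64::from(count)))
        .collect()
}
```

Storing a running sum and count rather than the raw samples keeps the state small: each vehicle needs only two numbers per open window.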
Let's say we want to analyze the top 5 vehicles by average speed. We can use the SQL interface to query the state.
To enter the SQL interface, run:
>> sql
Let's start by showing the tables:
show tables
There are two tables, but in this example we are interested in the vehicle_stat table.
Let's run a simple select:
select * from vehicle_stat
Now we can look up the top 5 vehicles by speed:
select * from vehicle_stat order by speed desc limit 5
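For comparison, the query above corresponds roughly to sorting the rows by speed in descending order and keeping the first five. A minimal Rust sketch, assuming a hypothetical VehicleStat row type with vehicle and speed columns (the real vehicle_stat columns may differ):

```rust
/// Hypothetical row of the vehicle_stat table.
#[derive(Debug, Clone, PartialEq)]
pub struct VehicleStat {
    pub vehicle: u32,
    pub speed: f64,
}

/// Rough equivalent of `select * from vehicle_stat order by speed desc limit n`.
pub fn top_by_speed(rows: &[VehicleStat], n: usize) -> Vec<VehicleStat> {
    let mut sorted = rows.to_vec();
    // Descending by speed; `total_cmp` is a total order on f64,
    // so a NaN speed cannot cause a panic the way `partial_cmp().unwrap()` would.
    sorted.sort_by(|a, b| b.speed.total_cmp(&a.speed));
    // Keep at most `n` rows (fewer if the table is smaller).
    sorted.truncate(n);
    sorted
}
```

Because sort_by is stable, vehicles with equal speed keep their input order; SQL makes no such guarantee without an additional order by column.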
When you are done, exit the SQL interface with .exit.
In our dataflow example, we are producing the average speed for all vehicles to the average-speed topic. To see it, run:
fluvio consume average-speed -O json
If we decide to change the dataflow to give us the top 5 vehicles by speed, all we need to do is update the SQL statement in the collect_vehicle_stats function:
let vs = sql("select * from vehicle_stat order by speed desc limit 5")?;
Then run the dataflow again using sdf run. Checking the output again, you will notice that it has changed: it now returns only the top 5 vehicles by speed:
fluvio consume average-speed -O json
🎉 Congratulations! You have just learned how to use the SQL interface to analyze the data and improve your dataflow based on your analysis.
Exit the sdf terminal and clean up. The --force flag removes the topics:
sdf clean --force
Stop the connector:
cdk deploy shutdown --name helsinki-mqtt