The car processing dataflow reads car events from the cars and licenses topics and performs a series of operations:
- Identify speeding vehicles (filter-map).
- Divide the events based on location (split).
- Count the cars based on color (tumbling windows).
- Identify stolen license plates (ref state).
The following diagram is a visual representation generated by sdf:
The dataset is hosted by the InfinyOn Synthetic data generator at demo-data.infinyon.com.
- The licenses are a list of static license plate numbers, so we'll only read them once:
  [
    { "maker": "Toyota", "model": "Camry", "license": "0VTI452" },
    { "maker": "Ford", "model": "Fusion", "license": "7YHM485" },
    ...
  ]
- The cars are a continuous stream of events, which we'll read using the http-source connector:
  {
    "maker": "Fiat",
    "model": "124 Spider",
    "category": "Convertible",
    "color": "beige",
    "license": "7WPK493",
    "fuel": "gasoline",
    "location": "Saratoga",
    "mph": 43,
    "timestamp": "2024-03-14T16:12:10.493Z"
  }
The connector sends events to the cars topic.
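The sketch below makes the two record shapes concrete by typing and parsing them. It is written in TypeScript, the language of the repo's visualizer demo. The field names come from the samples above. The interface and function names (LicenseRecord, CarEvent, parseCarEvent, parseLicenses) are hypothetical. The validation is only a light runtime check, not a full schema.

```typescript
// Field names come from the sample records above; the interface and
// function names are hypothetical.
interface LicenseRecord {
  maker: string;
  model: string;
  license: string;
}

interface CarEvent {
  maker: string;
  model: string;
  category: string;
  color: string;
  license: string;
  fuel: string;
  location: string;
  mph: number;
  timestamp: string; // ISO-8601, e.g. "2024-03-14T16:12:10.493Z"
}

// Parse one record from the cars topic, checking a few fields that the
// speeding, split, and stolen-plate steps rely on.
function parseCarEvent(raw: string): CarEvent {
  const value = JSON.parse(raw) as CarEvent;
  if (
    typeof value.mph !== "number" ||
    typeof value.license !== "string" ||
    typeof value.location !== "string"
  ) {
    throw new Error(`unexpected car event: ${raw}`);
  }
  return value;
}

// Parse the licenses payload, which the sample above shows as a JSON array.
function parseLicenses(raw: string): LicenseRecord[] {
  const value: unknown = JSON.parse(raw);
  if (!Array.isArray(value)) {
    throw new Error("expected a JSON array of license records");
  }
  return value as LicenseRecord[];
}

const car = parseCarEvent(
  '{"maker":"Fiat","model":"124 Spider","category":"Convertible","color":"beige",' +
    '"license":"7WPK493","fuel":"gasoline","location":"Saratoga","mph":43,' +
    '"timestamp":"2024-03-14T16:12:10.493Z"}',
);
console.log(car.maker, car.mph); // Fiat 43
```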
Take a look at the dataflow.yaml to get an idea of what we're doing.
Make sure to install SDF and start a cluster.
Use the sdf command line tool to run the dataflow:
sdf run --ui
- Use --ui to generate the graphical representation and run the Studio.
- Using sdf run as opposed to sdf deploy runs the dataflow with an ephemeral worker, which is cleaned up on exit.
Note: It is important to run the dataflow first, as it creates the topics for you.
Read license plates from the data generator and add them to the licenses topic:
curl -s https://demo-data.infinyon.com/api/car/licenses | fluvio produce licenses
To see the licenses, run fluvio consume licenses -Bd -O json.
In a new terminal, change directory to ./connectors, download the connector binary, and start the connector:
cd ./connectors
cdk hub download infinyon/http-source@0.4.3
cdk deploy start --ipkg infinyon-http-source-0.4.3.ipkg -c car-connector.yaml
To see the events, run fluvio consume cars.
For additional context, check out the connectors documentation.
In the sdf terminal, check out the state maintained by the dataflow:
show state
We are interested in the following:
Namespace                                  Keys  Type
save-license-plates/license-plates/state   100   u32
count-by-color/count-by-color/state        5     u32
count-by-color/count-by-color/watermark    1     timestamp
The license-plates state stores the license plates read from the licenses topic. It is used to check for stolen plates later in the dataflow.
show state save-license-plates/license-plates/state
Key      maker  model
0FDR715  Buick  Encore
0FJV738  Ford   Explorer
0FQP572  Tesla  Model S
0GFX265  Volvo  V90
...
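Conceptually, this state is a key/value map from plate number to maker and model. The TypeScript sketch below builds such a map from license records like the sample earlier. It only illustrates that shape and is not how SDF stores state internally. buildLicensePlateState and PlateInfo are hypothetical names.

```typescript
// Record shape from the licenses sample; interface/function names are hypothetical.
interface LicenseRecord {
  maker: string;
  model: string;
  license: string;
}

interface PlateInfo {
  maker: string;
  model: string;
}

// Key each plate by its license number and keep maker/model as the value,
// mirroring the Key / maker / model columns printed by `show state`.
function buildLicensePlateState(records: LicenseRecord[]): Map<string, PlateInfo> {
  const state = new Map<string, PlateInfo>();
  records.forEach(({ license, maker, model }) => {
    // If a plate appears twice, the later record wins in this sketch.
    state.set(license, { maker, model });
  });
  return state;
}

const plates = buildLicensePlateState([
  { maker: "Toyota", model: "Camry", license: "0VTI452" },
  { maker: "Ford", model: "Fusion", license: "7YHM485" },
]);
console.log(plates.get("7YHM485")?.model); // Fusion
```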
Let's say we want to analyze the license plates to look up makers, models, colors, etc. For that, we can now use the SQL interface.
To enter the SQL interface, run:
>> sql
Let's start by showing the tables:
show tables
There are two tables, but in this example we are interested in the license_plates table.
Let's run a simple select:
select * from license_plates
Now we can look up the models Toyota has:
select * from license_plates where maker = 'Toyota'
Pretty cool... feel free to execute additional commands.
When you are done, exit the SQL interface with .exit.
Similarly, the count-by-color state stores the count of cars by color.
Let's show the raw state:
show state count-by-color/count-by-color/state
In this view, you are looking at the state in real time, while the data is being collected.
Key     Window                                      Value
beige   2024-04-16T01:38:00Z:2024-04-16T01:38:30Z   6
black   2024-04-16T01:38:00Z:2024-04-16T01:38:30Z   4
green   2024-04-16T01:38:00Z:2024-04-16T01:38:30Z   3
grey    2024-04-16T01:38:00Z:2024-04-16T01:38:30Z   4
orange  2024-04-16T01:38:00Z:2024-04-16T01:38:30Z   2
perl    2024-04-16T01:38:00Z:2024-04-16T01:38:30Z   4
red     2024-04-16T01:38:00Z:2024-04-16T01:38:30Z   2
white   2024-04-16T01:38:00Z:2024-04-16T01:38:30Z   4
yellow  2024-04-16T01:38:00Z:2024-04-16T01:38:30Z   3
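The next TypeScript sketch shows the idea behind the Window column. Each event is bucketed into the 30-second tumbling window that contains its timestamp, and cars are counted per color inside that window. The 30-second size matches the window labels above. The "<start>:<end>" label format is copied from this output. The function names are hypothetical. This is not SDF's window API; it only illustrates the bucketing.

```typescript
// 30-second tumbling windows, matching the window labels shown above.
const WINDOW_MS = 30_000;

// Hypothetical minimal event shape: only the fields this step needs.
interface ColorEvent {
  color: string;
  timestamp: string; // ISO-8601
}

// Start (epoch ms) of the tumbling window containing tsMs.
function windowStart(tsMs: number): number {
  return Math.floor(tsMs / WINDOW_MS) * WINDOW_MS;
}

// Format a boundary without milliseconds, e.g. "2024-04-16T01:38:00Z".
// Boundaries are multiples of 30 s, so the millisecond part is always ".000".
function formatBoundary(ms: number): string {
  return new Date(ms).toISOString().replace(".000Z", "Z");
}

// "<start>:<end>" label in the style of the show state output.
function windowLabel(startMs: number): string {
  return `${formatBoundary(startMs)}:${formatBoundary(startMs + WINDOW_MS)}`;
}

// Count events per window and per color: label -> (color -> count).
function countByColor(events: ColorEvent[]): Map<string, Map<string, number>> {
  const windows = new Map<string, Map<string, number>>();
  events.forEach((e) => {
    const label = windowLabel(windowStart(Date.parse(e.timestamp)));
    let counts = windows.get(label);
    if (counts === undefined) {
      counts = new Map<string, number>();
      windows.set(label, counts);
    }
    counts.set(e.color, (counts.get(e.color) ?? 0) + 1);
  });
  return windows;
}

const windows = countByColor([
  { color: "beige", timestamp: "2024-04-16T01:38:05Z" },
  { color: "beige", timestamp: "2024-04-16T01:38:29.999Z" },
  { color: "black", timestamp: "2024-04-16T01:38:31Z" },
]);
windows.forEach((counts, label) => {
  counts.forEach((n, color) => console.log(color, label, n));
});
```

The two beige events fall into the 01:38:00 to 01:38:30 window. The black event at 01:38:31 opens the next window.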
Once the state is flushed, it is kept in memory until the next state flush. That allows us to experiment with the last stable state. Let's open the SQL interface and run a query:
>> sql
select * from count_by_color
Now we can experiment with other SQL operations, such as sorting, filtering, etc:
select * from count_by_color order by count desc
When you are done, exit the SQL interface with .exit.
The watermark tracks the cutoff timestamp for tumbling windows.
show state count-by-color/count-by-color/watermark
Key      Value                      Window
default  2024-03-15T13:44:30.043Z   *
This is used to determine when to flush window state for the tumbling window that counts the cars by color, flushing state to the 'car-colors' topic.
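The role of the watermark can be illustrated with a simplified TypeScript model. This model makes several assumptions:

- The watermark is the largest event timestamp seen so far.
- A 30-second window is flushed once its end is at or before the watermark.
- Late events for an already-closed window are dropped.

SDF's actual watermark policy may differ. The TumblingColorCounter class and FlushedWindow type are hypothetical names.

```typescript
// 30-second tumbling windows, as in the window labels above.
const WINDOW_MS = 30_000;

interface FlushedWindow {
  start: string; // window start, ISO-8601
  counts: Map<string, number>; // color -> count
}

// Simplified model: the watermark is the largest event time seen so far, and
// a window is flushed once its end is at or before the watermark.
class TumblingColorCounter {
  private watermark = Number.NEGATIVE_INFINITY;
  private open = new Map<number, Map<string, number>>(); // window start (ms) -> counts

  // Count one event and return any windows the advanced watermark closes.
  add(color: string, timestamp: string): FlushedWindow[] {
    const ts = Date.parse(timestamp);
    const start = Math.floor(ts / WINDOW_MS) * WINDOW_MS;
    if (start + WINDOW_MS <= this.watermark) {
      return []; // window already closed: this sketch drops late events
    }
    let counts = this.open.get(start);
    if (counts === undefined) {
      counts = new Map<string, number>();
      this.open.set(start, counts);
    }
    counts.set(color, (counts.get(color) ?? 0) + 1);
    this.watermark = Math.max(this.watermark, ts);
    return this.flush();
  }

  // Remove and return every open window whose end is at or before the watermark.
  private flush(): FlushedWindow[] {
    const ready: Array<[number, Map<string, number>]> = [];
    this.open.forEach((counts, start) => {
      if (start + WINDOW_MS <= this.watermark) {
        ready.push([start, counts]);
      }
    });
    ready.sort((a, b) => a[0] - b[0]);
    ready.forEach(([start]) => this.open.delete(start));
    return ready.map(([start, counts]) => ({ start: new Date(start).toISOString(), counts }));
  }
}

const counter = new TumblingColorCounter();
counter.add("red", "2024-04-16T01:38:05Z");
counter.add("blue", "2024-04-16T01:38:10Z");
// An event in the next window moves the watermark past 01:38:30 and closes the first window.
const closed = counter.add("red", "2024-04-16T01:38:31Z");
console.log(closed.length, closed[0].start); // 1 2024-04-16T01:38:00.000Z
```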
Each car event triggers an HTTP call-out to gather additional information about the car maker. Consume from the makers topic to see the results:
fluvio consume makers -Bd -O json
The maker's continent and country are returned:
{"continent":"Europe","country":"Germany","maker":"Audi"}
{"continent":"Europe","country":"United Kingdom","maker":"McLaren"}
{"continent":"Europe","country":"Italy","maker":"Fiat"}
...
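The call-out can be modeled as one async lookup per car event. In this hypothetical TypeScript sketch, an in-memory table stands in for the HTTP request. The table is seeded only with the three sample rows above, because the real service's URL and response format are not shown here. MakerLookup, sampleLookup, and enrichMaker are illustrative names. Returning null for unknown makers is an assumption of the sketch.

```typescript
// Shape of the records on the makers topic, per the sample output above.
interface MakerInfo {
  continent: string;
  country: string;
  maker: string;
}

// Hypothetical signature for the call-out; the real service is not shown here.
type MakerLookup = (maker: string) => Promise<MakerInfo | undefined>;

// Hypothetical in-memory stand-in for the HTTP service, seeded with the sample rows.
const SAMPLE_MAKERS = new Map<string, MakerInfo>([
  ["Audi", { continent: "Europe", country: "Germany", maker: "Audi" }],
  ["McLaren", { continent: "Europe", country: "United Kingdom", maker: "McLaren" }],
  ["Fiat", { continent: "Europe", country: "Italy", maker: "Fiat" }],
]);

const sampleLookup: MakerLookup = async (maker) => SAMPLE_MAKERS.get(maker);

// One lookup per car event; in this sketch, unknown makers yield null.
async function enrichMaker(
  car: { maker: string },
  lookup: MakerLookup,
): Promise<MakerInfo | null> {
  const info = await lookup(car.maker);
  return info ?? null;
}

enrichMaker({ maker: "Fiat" }, sampleLookup).then((info) => {
  console.log(JSON.stringify(info)); // {"continent":"Europe","country":"Italy","maker":"Fiat"}
});
```

Injecting the lookup keeps the enrichment logic testable without a network. A real call-out would replace sampleLookup with an HTTP client.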
Consume from speeding to see all cars driving faster than 45 mph:
fluvio consume speeding -O json
Consuming records from 'speeding'
{
  "color": "beige",
  "license": "6ZVM807",
  "location": "Sunnyvale",
  "maker": "Lincoln",
  "model": "Navigator",
  "mph": 63,
  "timestamp": "2024-04-20T17:07:09.851Z"
}
...
Hit Ctrl-C to exit.
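The speeding step is a filter-map: events at or below the limit are dropped, and the rest are projected to the fields shown in the output above, which omits category and fuel. Below is a minimal TypeScript sketch of that logic. The 45 mph threshold comes from the text. The names toSpeeding, speedingCars, and SPEED_LIMIT_MPH are hypothetical, and this is not the dataflow's actual operator code.

```typescript
// Full car event, per the cars sample earlier.
interface CarEvent {
  maker: string;
  model: string;
  category: string;
  color: string;
  license: string;
  fuel: string;
  location: string;
  mph: number;
  timestamp: string;
}

// Fields kept in the speeding output shown above (category and fuel are dropped).
interface SpeedingCar {
  color: string;
  license: string;
  location: string;
  maker: string;
  model: string;
  mph: number;
  timestamp: string;
}

const SPEED_LIMIT_MPH = 45; // "faster than 45 mph"

// filter-map: return null to drop the event, or the projected record to keep it.
function toSpeeding(car: CarEvent): SpeedingCar | null {
  if (car.mph <= SPEED_LIMIT_MPH) {
    return null;
  }
  const { color, license, location, maker, model, mph, timestamp } = car;
  return { color, license, location, maker, model, mph, timestamp };
}

// Apply the filter-map to a batch of events.
function speedingCars(cars: CarEvent[]): SpeedingCar[] {
  return cars.map(toSpeeding).filter((c): c is SpeedingCar => c !== null);
}
```

Combining the filter and the projection in one function mirrors the filter-map operator named at the top: a single pass decides both whether to keep an event and what to emit.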
The dataflow splits the traffic into two locations, sunnyvale and saratoga. Consume from sunnyvale to see the cars:
fluvio consume sunnyvale
Consuming records from 'sunnyvale'
{"car":"Tesla Model Y","color":"yellow","location":"Sunnyvale"}
{"car":"Smart Fortwo","color":"yellow","location":"Sunnyvale"}
{"car":"Honda Accord","color":"orange","location":"Sunnyvale"}
...
Hit Ctrl-C to exit.
Consume from saratoga to see the cars:
fluvio consume saratoga
Consuming records from 'saratoga'
{"car":"Subaru BRZ","color":"green","location":"Saratoga"}
{"car":"Mazda Mazda3","color":"beige","location":"Saratoga"}
{"car":"Chevrolet Blazer","color":"beige","location":"Saratoga"}
...
Hit Ctrl-C to exit.
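The split can be sketched as a routing function. Each event is mapped to a compact {car, color, location} record, where "car" joins the maker and model as in the output above, and is routed to the topic that matches its location. In this TypeScript sketch, routeByLocation and the type names are hypothetical. Dropping events from any other location is an assumption.

```typescript
// Minimal input shape: only the fields the split needs (hypothetical).
interface CarLocation {
  maker: string;
  model: string;
  color: string;
  location: string;
}

// Record shape printed by the sunnyvale and saratoga topics above.
interface LocationRecord {
  car: string; // "<maker> <model>"
  color: string;
  location: string;
}

type LocationTopic = "sunnyvale" | "saratoga";

// Route one event to its location topic, or return null if it matches neither.
function routeByLocation(
  car: CarLocation,
): { topic: LocationTopic; record: LocationRecord } | null {
  const record: LocationRecord = {
    car: `${car.maker} ${car.model}`,
    color: car.color,
    location: car.location,
  };
  switch (car.location) {
    case "Sunnyvale":
      return { topic: "sunnyvale", record };
    case "Saratoga":
      return { topic: "saratoga", record };
    default:
      return null; // assumption: other locations are not forwarded in this sketch
  }
}

const routed = routeByLocation({ maker: "Subaru", model: "BRZ", color: "green", location: "Saratoga" });
console.log(routed?.topic, JSON.stringify(routed?.record));
// saratoga {"car":"Subaru BRZ","color":"green","location":"Saratoga"}
```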
The dataflow counts all cars seen at the Sunnyvale location by color. As mentioned above, the counts are flushed to the car-colors topic every 30 seconds.
fluvio consume car-colors -B -O json
Consuming records from 'car-colors'
[
  {
    "color": "perl",
    "count": 3
  },
  {
    "color": "green",
    "count": 2
  },
  ...
Hit Ctrl-C to exit.
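Each car-colors record is an array of {color, count} objects for one flushed window. The TypeScript sketch below builds such an array from one window's events, counting only Sunnyvale cars as the text describes. carColorsRecord is a hypothetical name. The sketch emits colors in first-seen order; the output above does not say how the real dataflow orders the array.

```typescript
// One entry of the car-colors array shown above.
interface ColorCount {
  color: string;
  count: number;
}

// Build the array for one window's events, counting only Sunnyvale cars.
function carColorsRecord(events: Array<{ color: string; location: string }>): ColorCount[] {
  const counts = new Map<string, number>();
  events.forEach((e) => {
    if (e.location !== "Sunnyvale") {
      return; // only the Sunnyvale branch is counted
    }
    counts.set(e.color, (counts.get(e.color) ?? 0) + 1);
  });
  // Map preserves insertion order, so colors appear in first-seen order here.
  const record: ColorCount[] = [];
  counts.forEach((count, color) => record.push({ color, count }));
  return record;
}

const record = carColorsRecord([
  { color: "perl", location: "Sunnyvale" },
  { color: "green", location: "Sunnyvale" },
  { color: "perl", location: "Sunnyvale" },
  { color: "red", location: "Saratoga" },
]);
console.log(JSON.stringify(record)); // [{"color":"perl","count":2},{"color":"green","count":1}]
```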
The dataflow also detects anomalies by checking whether a license plate is seen on two different cars. To demonstrate this functionality, we are ingesting a data set that sends an anomaly every 10 records (~5 sec).
fluvio consume violations -B -O json
Consuming records from 'violations'
{
  "license": "1KFR357",
  "owner": "Maserati Quattroporte",
  "violator": "Chevrolet Equinox"
}
{
  "license": "0FQP572",
  "owner": "Land Rover Discovery",
  "violator": "Tesla Model S"
}
{
  "license": "2RFQ375",
  "owner": "Audi Q7",
  "violator": "Infiniti Q50"
}
Hit Ctrl-C to exit.
For simplicity, the code assumes that the first car is always the owner and the second is the violator.
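Under that first-seen-is-owner rule, the check can be sketched in TypeScript as follows. The first "maker model" seen for a plate is remembered as the owner. A later event with the same plate but a different car produces a {license, owner, violator} record like those above. PlateTracker and its optional seed (for example, plates already known from the licenses topic) are hypothetical and not taken from the dataflow.

```typescript
// Shape of the records on the violations topic, per the output above.
interface Violation {
  license: string;
  owner: string;
  violator: string;
}

// Remembers the first car ("maker model") seen with each plate and reports any
// later, different car carrying the same plate.
class PlateTracker {
  private readonly owners: Map<string, string>;

  // Optional seed of [license, "maker model"] pairs (hypothetical).
  constructor(seed: Array<[string, string]> = []) {
    this.owners = new Map(seed);
  }

  check(car: { license: string; maker: string; model: string }): Violation | null {
    const name = `${car.maker} ${car.model}`;
    const owner = this.owners.get(car.license);
    if (owner === undefined) {
      this.owners.set(car.license, name); // first sighting becomes the owner
      return null;
    }
    if (owner === name) {
      return null; // same car seen again: not a violation
    }
    return { license: car.license, owner, violator: name };
  }
}

const tracker = new PlateTracker();
tracker.check({ license: "1KFR357", maker: "Maserati", model: "Quattroporte" });
console.log(JSON.stringify(tracker.check({ license: "1KFR357", maker: "Chevrolet", model: "Equinox" })));
// {"license":"1KFR357","owner":"Maserati Quattroporte","violator":"Chevrolet Equinox"}
```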
Congratulations! You've successfully built and run a dataflow!
Note that your connector will continue to run in the background until you shut it down:
cdk deploy shutdown --name car-connector
Exit the sdf terminal and clean up. The --force flag removes the topics:
sdf clean --force
To run the above in a container, use the included Dockerfile. The container installs all the required files and packages, and the included shell script starts a Fluvio cluster along with the connectors the car dataflow needs. You can change the consume command in run.sh to view different topics. Note that the dataflow takes a long time to run.
docker build -t carproc .
docker run carproc
📝 Running pre-flight checks
🎉 All checks passed!
🎯 Successfully installed Local Fluvio cluster
📝 Running cluster status checks with profile local
... checking package
Log file: /workspace/connectors/license-connector.log
Connector runs with process id: 54
Started connector `license-connector`
... checking package
Log file: /workspace/connectors/car-connector.log
Connector runs with process id: 68
Started connector `car-connector`
Waiting for SDF to Start...
Welcome to SDF
Running SDF
{"color":"orange","license":"6WLM817","location":"Saratoga","maker":"Mercedes-Benz","model":"GT AMG","mph":63,"timestamp":"2024-08-14T19:22:49.722Z"}
{"color":"beige","license":"6FKL683","location":"Sunnyvale","maker":"Mini","model":"Cooper","mph":52,"timestamp":"2024-08-14T19:22:51.222Z"}
{"color":"perl","license":"1KFR357","location":"Saratoga","maker":"Chevrolet","model":"Equinox","mph":50,"timestamp":"2024-08-14T19:22:52.222Z"}
(...)
Note: Docker tends to use a lot of disk space for volumes. If disk usage grows too large, run the following command to clean out stopped containers, unused images, and unused volumes:
docker system prune -a --volumes -f
The repo also includes a TypeScript demo built with Express and Fluvio's npm package. The demo dumps all the topics associated with the dataflow, and the Express server updates the page as the topics are populated.
To run the demo, the connectors and dataflow must be running.
- After starting the cluster, connectors, and dataflow, navigate to the visualizer:
cd visualizer
- Install all the packages.
npm install
- Start the Express server with Node:
node app.js
- Open the web application in a browser; it should be at localhost:3000.