Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] initial stab at streaming analysis of telemetry #506

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/paddock/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ openai = "*"
pydub = "*"
django-bootstrap5 = "*"
django-bootstrap-icons = "*"
pathway = "*"

[dev-packages]
black = "*"
Expand Down
911 changes: 730 additions & 181 deletions components/paddock/Pipfile.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from collections import deque

from telemetry.pitcrew.history import History
from telemetry.pitcrew.logging import LoggingMixin
from components.paddock.telemetry.pitcrew.logging_mixin import LoggingMixin

from .response import Response, ResponseInstant
from .session import Session
Expand Down
2 changes: 1 addition & 1 deletion components/paddock/telemetry/pitcrew/coach.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from telemetry.models import Coach as DbCoach
from telemetry.models import TrackGuide
from telemetry.pitcrew.logging import LoggingMixin
from telemetry.pitcrew.logging_mixin import LoggingMixin

from .history import History
from .message import (
Expand Down
2 changes: 1 addition & 1 deletion components/paddock/telemetry/pitcrew/coach_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import django.utils.timezone

from telemetry.models import Coach, SessionType
from telemetry.pitcrew.logging import LoggingMixin
from components.paddock.telemetry.pitcrew.logging_mixin import LoggingMixin

from .application.debug_application import DebugApplication
from .application.response import ResponseInstant
Expand Down
2 changes: 1 addition & 1 deletion components/paddock/telemetry/pitcrew/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from telemetry.analyzer import Analyzer
from telemetry.fast_lap_analyzer import FastLapAnalyzer
from telemetry.models import Coach, Driver, FastLap, Game
from telemetry.pitcrew.logging import LoggingMixin
from telemetry.pitcrew.logging_mixin import LoggingMixin
from telemetry.pitcrew.segment import Segment
from telemetry.racing_stats import RacingStats

Expand Down
2 changes: 2 additions & 0 deletions components/paddock/telemetry/pitcrew/output.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
sum,time,diff
136,1702980135662,1
32 changes: 32 additions & 0 deletions components/paddock/telemetry/pitcrew/output_stream.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
sum,time,diff
1,1702979416630,1
1,1702979418130,-1
3,1702979418130,1
3,1702979418204,-1
6,1702979418204,1
6,1702979419626,-1
10,1702979419626,1
10,1702979421130,-1
15,1702979421130,1
15,1702979421228,-1
21,1702979421228,1
21,1702979422626,-1
28,1702979422626,1
28,1702979424130,-1
36,1702979424130,1
36,1702979424258,-1
45,1702979424258,1
45,1702979425630,-1
55,1702979425630,1
55,1702979427128,-1
66,1702979427128,1
66,1702979427286,-1
78,1702979427286,1
78,1702979428630,-1
91,1702979428630,1
91,1702979430130,-1
105,1702979430130,1
105,1702979430312,-1
120,1702979430312,1
120,1702979431630,-1
136,1702979431630,1
2 changes: 1 addition & 1 deletion components/paddock/telemetry/pitcrew/session.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import django.utils.timezone

from telemetry.models import Game
from telemetry.pitcrew.logging import LoggingMixin
from telemetry.pitcrew.logging_mixin import LoggingMixin


class Lap:
Expand Down
39 changes: 39 additions & 0 deletions components/paddock/telemetry/pitcrew/session_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import pathway as pw
import logging

class MqttClient():

def __init__(self, subject) -> None:
self._subject = subject

def notify(self, topic, payload) -> None:
self._subject.next_json(payload)

def run(self) -> None:
# start the mqtt client and subscribe to the topic
logging.info("Starting client")
self._subject.run()

class TelemetrySubject(pw.io.python.ConnectorSubject):
def __init__(self) -> None:
super().__init__()

def set_client(self, client) -> None:
self._mqtt_client = client

def notify(self, topic, payload) -> None:
self.next_json(payload)

def run(self) -> None:
logging.info("TelemetrySubject running")
self._mqtt_client.run()

def on_stop(self) -> None:
pass
# self._mqtt_client.disconnect()


class InputSchema(pw.Schema):
key: int = pw.column_definition(primary_key=True)
text: str

14 changes: 14 additions & 0 deletions components/paddock/telemetry/pitcrew/stream_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import pathway as pw

class InputSchema(pw.Schema):
value: int


t = pw.io.csv.read(
'./sum_input_data/',
schema=InputSchema,
mode="streaming"
)
t = t.reduce(sum=pw.reducers.sum(t.value))
pw.io.csv.write(t, "output_stream.csv")
pw.run()
195 changes: 195 additions & 0 deletions components/paddock/telemetry/tests/test_session_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
from pprint import pprint

import django.utils.timezone
import logging
from django.db import IntegrityError
from django.test import TestCase

from telemetry.models import Car, Driver, Game
from telemetry.models import Session as SessionModel
from telemetry.models import SessionType, Track
from telemetry.pitcrew.firehose import Firehose
from telemetry.pitcrew.session import Lap, Session
from telemetry.pitcrew.session_stream import MqttClient, TelemetrySubject, InputSchema
from telemetry.pitcrew.mqtt import Mqtt

from .utils import get_session_df

import pathway as pw

class TestSessionStreaming(TestCase):
def _test_session_firehose(self, session_id, measurement="fast_laps", bucket="fast_laps"):
session_df = get_session_df(session_id, measurement=measurement, bucket=bucket)

# create a subject
subject = TelemetrySubject()
# client = MqttClient(subject)
client = Mqtt(subject, "crewchief/#")
subject.set_client(client)

table = pw.io.python.read(
subject,
schema=InputSchema
)
pw.io.csv.write(table, "output.csv")

# pw.debug.compute_and_print(table)

# client.notify("topic", {"key": 1, "text": "hello"})

pw.debug.compute_and_print(table)
# pw.run()

# mqtt_client = Mqtt(client, "crewchief/#")
# mqtt_client.run()
# logging.info("Starting MQTT client")
# pw.debug.compute_and_print(table)


# session = Session(666)
# for index, row in session_df.iterrows():
# # convert row to dict
# row = row.to_dict()
# now = row["_time"]
# firehose.notify(row["topic"], row, now)
# if index == 0:
# session = firehose.sessions[row["topic"]]

# pprint(session.laps)
# return session

# def _test_session(self, session_id, measurement="fast_laps", bucket="fast_laps"):
# session_df = get_session_df(session_id, measurement=measurement, bucket=bucket)

# # Create an instance of the Session class
# test_session = Session(666)

# for index, row in session_df.iterrows():
# # convert row to dict
# row = row.to_dict()
# now = row["_time"]
# test_session.signal(row, now)

# pprint(test_session.laps)
# return test_session

def _assert_laps(self, test_session, expected_laps):
# Iterate over the expected_laps dictionary and compare to the test_session.laps
for lap_number, expected_lap in expected_laps.items():
lap = test_session.laps[lap_number]

self.assertEqual(lap.number, expected_lap.number)
self.assertEqual(lap.time, expected_lap.time)
self.assertEqual(lap.valid, expected_lap.valid)
self.assertEqual(lap.finished, expected_lap.finished)
self.assertAlmostEqual(int(lap.length), int(expected_lap.length), places=0)

if lap.time != -1:
# the difference between lap.end and lap.start should be equal to lap.time
time_delta = lap.end - lap.start
self.assertAlmostEqual(time_delta.total_seconds(), expected_lap.time, places=0)

def test_iracing(self):
# measurement = "fast_laps"
# bucket = "fast_laps"
# start = "-10y"
session_id = "1681021274"

expected_laps = {
1: Lap(1, time=100.818, valid=True, length=4409),
2: Lap(2, time=-1, valid=False, length=4408),
3: Lap(3, time=101.0466, valid=True, length=4408),
4: Lap(4, time=100.823, valid=True, length=4408),
5: Lap(5, time=99.4026, valid=True, length=4410),
6: Lap(6, time=107.9166, valid=False, length=4410),
7: Lap(7, time=99.0674, valid=False, length=4410),
8: Lap(8, time=-1, valid=False, length=4409),
9: Lap(9, time=101.7361, valid=False, length=4409),
10: Lap(10, time=-1, valid=False, length=1890),
}
for lap_number, expected_lap in expected_laps.items():
expected_lap.finished = True
expected_laps[10].finished = False
session = self._test_session_firehose(session_id)
# self._assert_laps(session, expected_laps)

# def test_ac(self):
# # measurement = "fast_laps"
# # bucket = "fast_laps"
# # start = "-10y"
# session_id = "1673613558"

# # For this session the following laps are valid:
# expected_laps = {
# 2: Lap(2, time=63.79, valid=False, length=2338.449),
# 3: Lap(3, time=62.4660034, valid=True, length=2341.24487),
# 4: Lap(4, time=64.097, valid=False, length=2339.07666),
# 5: Lap(5, time=61.833, valid=True, length=2340.87329),
# 6: Lap(6, time=70.983, valid=False, length=2342.41357),
# 7: Lap(7, time=61.465, valid=True, length=2337.27051),
# 8: Lap(8, time=67.749, valid=True, length=2337.886),
# 9: Lap(9, time=61.703, valid=True, length=2338.9856),
# 10: Lap(10, time=73.402, valid=False, length=2341.15625),
# 11: Lap(11, time=85.821, valid=False, length=2340.723),
# # 12: Lap(12, time=169.38, valid=False, length=2338.358), # during this lap the game was paused
# # 12: Lap(12, time=174.319331, valid=False, length=2338.358),
# 13: Lap(13, time=64.435, valid=False, length=2337.65625),
# 14: Lap(14, time=62.7619972, valid=False, length=2341.5625),
# 15: Lap(15, time=77.178, valid=False, length=2341.84863),
# 16: Lap(16, time=79.708, valid=False, length=2342.23486),
# 17: Lap(17, time=-1.0, valid=True, length=33.61092),
# }
# for lap_number, expected_lap in expected_laps.items():
# expected_lap.finished = True
# expected_laps[17].finished = False

# session = self._test_session(session_id)
# self._assert_laps(session, expected_laps)

# def test_car_class(self):
# session_id = "1692140843"

# session = self._test_session_firehose(session_id)

# self.assertEqual(session.car_class, "ARC_CAMERO")

# def test_telemetry_invalid(self):
# # 2390.0: LapTimePrevious: None -> None
# # 2390.0: CurrentLapIsValid: None -> None
# # 2390.0: PreviousLapWasValid: None -> None
# # these values are always None in that session
# session_id = "1672395579"
# session = self._test_session(session_id)
# self.assertEqual(session.laps, {})

# def test_telemetry_missing_fields(self):
# # last lap CurrentLapIsValid is None
# session_id = "1680321341"
# session = self._test_session(session_id)

# expected_laps = {
# 11: Lap(11, time=-1, length=82, valid=True, finished=False),
# }
# self._assert_laps(session, expected_laps)

# def test_duplicate_lap(self):
# # create 2 laps with the same number
# game = Game.objects.create(name="test_game")
# track = Track.objects.create(name="test_track", game=game)
# car = Car.objects.create(name="test_car", game=game)
# driver = Driver.objects.create(name="test_driver")
# session_type = SessionType.objects.create(type="test_session_type")
# session = SessionModel.objects.create(session_id=666, driver=driver, game=game, session_type=session_type)

# now = django.utils.timezone.now()

# session.laps.create(number=1, car=car, track=track, start=now)
# try:
# session.laps.create(number=2, car=car, track=track, start=now)
# except IntegrityError as e:
# self.assertEqual(
# e.args[0],
# "UNIQUE constraint failed: telemetry_lap.session_id, telemetry_lap.start",
# )
# except Exception as e:
# self.fail(f"Unexpected exception: {e}")