Skip to content

Commit

Permalink
Expose MessageState to Cython to Eliminate Data Copying
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed Oct 16, 2024
1 parent 2f0385d commit be80445
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 89 deletions.
8 changes: 4 additions & 4 deletions opendbc/can/common.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <map>
#include <set>
#include <string>
#include <utility>
#include <unordered_map>
Expand Down Expand Up @@ -72,19 +73,18 @@ class CANParser {
bool can_valid = false;
bool bus_timeout = false;
uint64_t first_nanos = 0;
uint64_t last_nanos = 0;
uint64_t last_nonempty_nanos = 0;
uint64_t bus_timeout_threshold = 0;
uint64_t can_invalid_cnt = CAN_INVALID_CNT;

CANParser(int abus, const std::string& dbc_name,
const std::vector<std::pair<uint32_t, int>> &messages);
CANParser(int abus, const std::string& dbc_name, bool ignore_checksum, bool ignore_counter);
void update(const std::vector<CanData> &can_data, std::vector<SignalValue> &vals);
void query_latest(std::vector<SignalValue> &vals, uint64_t last_ts = 0);
std::set<uint32_t> update(const std::vector<CanData> &can_data);
MessageState *messageState(uint32_t address) { return &message_states.at(address); }

protected:
void UpdateCans(const CanData &can);
void UpdateCans(const CanData &can, std::set<uint32_t> &updated_addresses);
void UpdateValid(uint64_t nanos);
};

Expand Down
17 changes: 9 additions & 8 deletions opendbc/can/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from libc.stdint cimport uint8_t, uint32_t, uint64_t
from libcpp cimport bool
from libcpp.pair cimport pair
from libcpp.set cimport set
from libcpp.string cimport string
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map
Expand Down Expand Up @@ -53,13 +54,6 @@ cdef extern from "common_dbc.h":
unordered_map[uint32_t, const Msg*] addr_to_msg
unordered_map[string, const Msg*] name_to_msg

cdef struct SignalValue:
uint32_t address
uint64_t ts_nanos
string name
double value
vector[double] all_values

cdef struct SignalPackValue:
string name
double value
Expand All @@ -68,6 +62,12 @@ cdef extern from "common_dbc.h":
cdef extern from "common.h":
cdef const DBC* dbc_lookup(const string) except +

cdef cppclass MessageState:
vector[Signal] parse_sigs;
vector[double] vals;
vector[vector[double]] all_vals;
uint64_t last_seen_nanos;

cdef struct CanFrame:
long src
uint32_t address
Expand All @@ -81,7 +81,8 @@ cdef extern from "common.h":
bool can_valid
bool bus_timeout
CANParser(int, string, vector[pair[uint32_t, int]]) except +
void update(vector[CanData]&, vector[SignalValue]&) except +
set[uint32_t] update(vector[CanData]&) except +
MessageState *messageState(uint32_t address)

cdef cppclass CANPacker:
CANPacker(string)
Expand Down
8 changes: 0 additions & 8 deletions opendbc/can/common_dbc.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,6 @@ struct SignalPackValue {
double value;
};

struct SignalValue {
uint32_t address;
uint64_t ts_nanos;
std::string name;
double value; // latest value
std::vector<double> all_values; // all values from this cycle
};

enum SignalType {
DEFAULT,
COUNTER,
Expand Down
50 changes: 14 additions & 36 deletions opendbc/can/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@
#include <stdexcept>
#include <sstream>

#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/mman.h>

#include "opendbc/can/common.h"

int64_t get_raw_value(const std::vector<uint8_t> &msg, const Signal &sig) {
Expand Down Expand Up @@ -157,24 +152,28 @@ CANParser::CANParser(int abus, const std::string& dbc_name, bool ignore_checksum
}
}

void CANParser::update(const std::vector<CanData> &can_data, std::vector<SignalValue> &vals) {
std::set<uint32_t> CANParser::update(const std::vector<CanData> &can_data) {
// Clear all_values
for (auto &state : message_states) {
for (auto &vals : state.second.all_vals) vals.clear();
}

uint64_t current_nanos = 0;
std::set<uint32_t> updated_addresses;
for (const auto &c : can_data) {
if (first_nanos == 0) {
first_nanos = c.nanos;
}
if (current_nanos == 0) {
current_nanos = c.nanos;
}
last_nanos = c.nanos;

UpdateCans(c);
UpdateValid(last_nanos);
UpdateCans(c, updated_addresses);
UpdateValid(c.nanos);
}
query_latest(vals, current_nanos);
return updated_addresses;
}

void CANParser::UpdateCans(const CanData &can) {
void CANParser::UpdateCans(const CanData &can, std::set<uint32_t> &updated_addresses) {
//DEBUG("got %zu messages\n", can.frames.size());

bool bus_empty = true;
Expand Down Expand Up @@ -202,7 +201,9 @@ void CANParser::UpdateCans(const CanData &can) {
// continue;
//}

state_it->second.parse(can.nanos, frame.dat);
if (state_it->second.parse(can.nanos, frame.dat)) {
updated_addresses.insert(state_it->first);
}
}

// update bus timeout
Expand Down Expand Up @@ -240,26 +241,3 @@ void CANParser::UpdateValid(uint64_t nanos) {
can_invalid_cnt = _valid ? 0 : (can_invalid_cnt + 1);
can_valid = (can_invalid_cnt < CAN_INVALID_CNT) && _counters_valid;
}

void CANParser::query_latest(std::vector<SignalValue> &vals, uint64_t last_ts) {
if (last_ts == 0) {
last_ts = last_nanos;
}
for (auto& kv : message_states) {
auto& state = kv.second;
if (last_ts != 0 && state.last_seen_nanos < last_ts) {
continue;
}

for (int i = 0; i < state.parse_sigs.size(); i++) {
const Signal &sig = state.parse_sigs[i];
SignalValue &v = vals.emplace_back();
v.address = state.address;
v.ts_nanos = state.last_seen_nanos;
v.name = sig.name;
v.value = state.vals[i];
v.all_values = state.all_vals[i];
state.all_vals[i].clear();
}
}
}
50 changes: 17 additions & 33 deletions opendbc/can/parser_pyx.pyx
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# distutils: language = c++
# cython: c_string_encoding=ascii, language_level=3

from cython.operator cimport dereference as deref, preincrement as preinc
from libcpp.pair cimport pair
from libcpp.string cimport string
from libcpp.vector cimport vector
from libc.stdint cimport uint32_t

from .common cimport CANParser as cpp_CANParser
from .common cimport dbc_lookup, SignalValue, DBC, CanData, CanFrame
from .common cimport dbc_lookup, Msg, DBC, CanData, CanFrame

import numbers
from collections import defaultdict
Expand Down Expand Up @@ -50,15 +49,16 @@ cdef class CANParser:
self.addresses.push_back(address)

name = m.name.decode("utf8")
self.vl[address] = {}
signal_names = [sig.name.decode("utf-8") for sig in (<Msg*>m).sigs]

self.vl[address] = {name : 0 for name in signal_names}
self.vl[name] = self.vl[address]
self.vl_all[address] = defaultdict(list)
self.vl_all[name] = self.vl_all[address]
self.ts_nanos[address] = {}
self.ts_nanos[address] = {name : 0 for name in signal_names}
self.ts_nanos[name] = self.ts_nanos[address]

self.can = new cpp_CANParser(bus, dbc_name, message_v)
self.update_strings([])

def __dealloc__(self):
if self.can:
Expand All @@ -71,13 +71,6 @@ cdef class CANParser:
for address in self.addresses:
self.vl_all[address].clear()

cur_address = -1
vl = {}
vl_all = {}
ts_nanos = {}
updated_addrs = set()

cdef vector[SignalValue] new_vals
cdef CanFrame* frame
cdef CanData* can_data
cdef vector[CanData] can_data_array
Expand All @@ -99,27 +92,18 @@ cdef class CANParser:
except TypeError:
raise RuntimeError("invalid parameter")

self.can.update(can_data_array, new_vals)

cdef vector[SignalValue].iterator it = new_vals.begin()
cdef SignalValue* cv
while it != new_vals.end():
cv = &deref(it)

# Check if the address has changed
if cv.address != cur_address:
cur_address = cv.address
vl = self.vl[cur_address]
vl_all = self.vl_all[cur_address]
ts_nanos = self.ts_nanos[cur_address]
updated_addrs.add(cur_address)

# Cast char * directly to unicode
cv_name = <unicode>cv.name
vl[cv_name] = cv.value
vl_all[cv_name] = cv.all_values
ts_nanos[cv_name] = cv.ts_nanos
preinc(it)
updated_addrs = self.can.update(can_data_array)
for addr in updated_addrs:
vl = self.vl[addr]
vl_all = self.vl_all[addr]
ts_nanos = self.ts_nanos[addr]

state = self.can.messageState(addr)
for i in range(state.parse_sigs.size()):
name = <unicode>state.parse_sigs[i].name
vl[name] = state.vals[i]
vl_all[name] = state.all_vals[i]
ts_nanos[name] = state.last_seen_nanos

return updated_addrs

Expand Down

0 comments on commit be80445

Please sign in to comment.