diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index de3d6298..100e7006 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,4 +1,3 @@ ---- name: CI on: @@ -50,6 +49,7 @@ jobs: - stream-to-file - trace-archiver - trace-archiver-tdengine + - trace-reader - trace-to-events steps: diff --git a/Cargo.lock b/Cargo.lock index 28f7a091..f1c3c640 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -87,7 +87,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" dependencies = [ - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -97,7 +97,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" dependencies = [ "anstyle", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -295,14 +295,14 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets", + "windows-targets 0.48.5", ] [[package]] name = "clap" -version = "4.4.8" +version = "4.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2275f18819641850fa26c89acc84d465c1bf91ce57bc2748b28c420473352f64" +checksum = "41fffed7514f420abec6d183b1d3acfd9099c79c3a10a06ade4f8203f1411272" dependencies = [ "clap_builder", "clap_derive", @@ -310,9 +310,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.8" +version = "4.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07cdf1b148b25c1e1f7a42225e30a0d99a615cd4637eae7365548dd4529b95bc" +checksum = "63361bae7eef3771745f02d8d892bec2fee5f6e34af316ba556e7f97a7069ff1" dependencies = [ "anstream", "anstyle", @@ -417,11 +417,11 @@ dependencies = [ [[package]] name = "crossterm" -version = "0.27.0" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" +checksum = "a84cda67535339806297f1b331d6dd6320470d2a0fe65381e79ee9e156dd3d13" dependencies = [ - "bitflags 2.4.1", + "bitflags 1.3.2", "crossterm_winapi", "libc", "mio", @@ -548,7 +548,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.2", + "hashbrown 0.14.3", "lock_api", "once_cell", "parking_lot_core 0.9.9", @@ -717,12 +717,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f258a7194e7f7c2a7837a8913aeab7fd8c383457034fa20ce4dd3dcb813e8eb8" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.52.0", ] [[package]] @@ -912,9 +912,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.2" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" [[package]] name = "hdf5" @@ -1110,7 +1110,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", - "hashbrown 0.14.2", + "hashbrown 0.14.3", "serde", ] @@ -1137,7 +1137,7 @@ checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi", "rustix", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1166,9 +1166,9 @@ checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] name = "js-sys" -version = "0.3.65" +version = "0.3.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54c0c35952f67de54bb584e9fd912b3023117cbafc0a77d8f3dee1fb5f572fe8" +checksum = "cee9c64da59eae3b50095c18d3e74f8b73c0b86d2792824ff01bbce68ba229ca" dependencies = [ "wasm-bindgen", ] @@ -1183,10 +1183,7 @@ dependencies = [ "common", "crossterm", "env_logger", - "hdf5", - "lazy_static", "log", - "ndarray", "ratatui", "rdkafka", "streaming-types", @@ -1205,8 +1202,8 @@ dependencies = [ "prometheus-client", "serde", "serde_json", - "strum 0.24.1", - "strum_macros 0.24.3", + "strum", + "strum_macros", "tokio", "tower", ] @@ -1340,7 +1337,7 @@ dependencies = [ "libc", "log", "wasi", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1562,7 +1559,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ae859aa07428ca9a929b936690f8b12dc5f11dd8c6992a18ca93919f28bc177" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -1610,7 +1607,7 @@ dependencies = [ "libc", "redox_syscall 0.4.1", "smallvec", - "windows-targets", + "windows-targets 0.48.5", ] [[package]] @@ -1724,9 +1721,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.69" +version = "1.0.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" +checksum = "39278fbbf5fb4f646ce651690877f89d1c5811a3d4acb27700c1cb3cdb78fd3b" dependencies = [ "unicode-ident", ] @@ -1821,17 +1818,15 @@ dependencies = [ [[package]] name = "ratatui" -version = "0.23.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e2e4cd95294a85c3b4446e63ef054eea43e0205b1fd60120c16b74ff7ff96ad" +checksum = "8285baa38bdc9f879d92c0e37cb562ef38aa3aeefca22b3200186bc39242d3d5" dependencies = [ "bitflags 2.4.1", "cassowary", "crossterm", "indoc", - "itertools 0.11.0", "paste", - "strum 0.25.0", "unicode-segmentation", "unicode-width", ] @@ -2031,7 +2026,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2250,7 +2245,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2294,15 +2289,6 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" -[[package]] -name = "strum" -version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" -dependencies = [ - "strum_macros 0.25.3", -] - [[package]] name = "strum_macros" version = "0.24.3" @@ -2316,19 +2302,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "strum_macros" -version = "0.25.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" -dependencies = [ - "heck", - "proc-macro2", - "quote", - "rustversion", - "syn 2.0.39", -] - [[package]] name = "syn" version = "1.0.109" @@ -2365,9 +2338,9 @@ dependencies = [ [[package]] name = "taos" -version = "0.10.21" +version = "0.10.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c669f065c5c3da93643f53923be102000d2d3a82318b84155385c959f855238b" +checksum = "5ff1fab463291480a917e206cf81d49ffccaeebf9bb17d896ab8818f473bee8a" dependencies = [ "async-trait", "futures", @@ -2378,9 +2351,9 @@ dependencies = [ [[package]] name = "taos-error" -version = "0.10.21" +version = "0.10.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94f8569a53799e8f7e4985505d43876c1b62d3f4d2cf2b000911bb7cc38d7119" +checksum = "5515ef8637bfa357e6867c896baaa2ede704a90fdb70cb080b21a5b2219f2945" dependencies = [ "anyhow", "derive_more", @@ -2392,9 +2365,9 @@ dependencies = [ [[package]] name = "taos-query" -version = "0.10.21" +version = "0.10.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b93150dd9e60cd55f001b82c5518b1e695d411d3001ef98be613fba63f4b5ee" +checksum = "30362cbfa93f5ae49d483a7cf2fbc05e07a0884834e4599e7fb51299d960c6b4" dependencies = [ "anyhow", "async-trait", @@ -2431,9 +2404,9 @@ dependencies = [ [[package]] name = "taos-ws" -version = "0.10.21" +version = "0.10.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11bd220ff3ebe94def40a9885959a03f1645c3153c78898289a66422d75ce98a" +checksum = "ed6e6f5ae9c86aea2f3c5b03d71530820a7f1a7078ddfaa8e174962c22bde2e7" dependencies = [ "anyhow", "async-trait", @@ -2559,7 +2532,7 @@ dependencies = [ "pin-project-lite", "socket2 0.5.5", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -2676,6 +2649,22 @@ dependencies = [ "tokio", ] +[[package]] +name = "trace-reader" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "clap", + "common", + "env_logger", + "log", + "rand", + "rdkafka", + "streaming-types", + "tokio", +] + [[package]] name = "trace-to-events" version = "0.1.0" @@ -2843,9 +2832,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7daec296f25a1bae309c0cd5c29c4b260e510e6d813c286b19eaadf409d40fce" +checksum = "0ed0d4f68a3015cc185aff4db9506a015f4b96f95303897bfa23f846db54064e" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -2853,9 +2842,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e397f4664c0e4e428e8313a469aaa58310d302159845980fd23b0f22a847f217" +checksum = "1b56f625e64f3a1084ded111c4d5f477df9f8c92df113852fa5a374dbda78826" dependencies = [ "bumpalo", "log", @@ -2868,9 +2857,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2" +checksum = "0162dbf37223cd2afce98f3d0785506dcb8d266223983e4b5b525859e6e182b2" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2878,9 +2867,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907" +checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", @@ -2891,9 +2880,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.88" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d046c5d029ba91a1ed14da14dca44b68bf2f124cfbaf741c54151fdb3e0750b" +checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" [[package]] name = "winapi" @@ -2932,7 +2921,7 @@ version = "0.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", ] [[package]] @@ -2941,7 +2930,16 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.0", ] [[package]] @@ -2950,13 +2948,28 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", ] [[package]] @@ -2965,42 +2978,84 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "winnow" version = "0.5.19" diff --git a/Cargo.toml b/Cargo.toml index af09fb0c..c172cbe9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "streaming-types", "trace-archiver", "trace-archiver-tdengine", + "trace-reader", "trace-to-events", ] diff --git a/flake.nix b/flake.nix index 51b6004d..a5a4078e 100644 --- a/flake.nix +++ b/flake.nix @@ -90,6 +90,7 @@ // import ./stream-to-file {inherit pkgs naersk' version git_revision nativeBuildInputs buildInputs hdf5-joined;} // import ./trace-archiver {inherit pkgs naersk' version git_revision nativeBuildInputs buildInputs hdf5-joined;} // import ./trace-archiver-tdengine {inherit pkgs naersk' version git_revision nativeBuildInputs buildInputs;} + // import ./trace-reader {inherit pkgs naersk' version git_revision nativeBuildInputs buildInputs;} // import ./trace-to-events {inherit pkgs naersk' version git_revision nativeBuildInputs buildInputs;}; } ); diff --git a/kafka-daq-report/Cargo.toml b/kafka-daq-report/Cargo.toml index 775b5112..2e6c9ec1 100644 --- a/kafka-daq-report/Cargo.toml +++ b/kafka-daq-report/Cargo.toml @@ -11,12 +11,9 @@ streaming-types = { path = "../streaming-types" } anyhow.workspace = true chrono.workspace = true clap.workspace = true -crossterm = "0.27.0" +crossterm = "0.26.1" env_logger.workspace = true -hdf5.workspace = true -lazy_static.workspace = true log.workspace = true -ndarray.workspace = true -ratatui = "0.23.0" +ratatui = "0.22.0" rdkafka.workspace = true -tokio.workspace = true +tokio.workspace = true \ No newline at end of file diff --git a/kafka-daq-report/default.nix b/kafka-daq-report/default.nix new file mode 100644 index 00000000..f1e3b41a --- /dev/null +++ b/kafka-daq-report/default.nix @@ -0,0 +1,42 @@ +{ + pkgs, + naersk', + version, + git_revision, + nativeBuildInputs, + buildInputs, +}: rec { + kafka-daq-report = naersk'.buildPackage { + name = "kafka-daq-report"; + version = version; + + src = ./..; + cargoBuildOptions = x: x ++ ["--package" "kafka-daq-report"]; + + nativeBuildInputs = nativeBuildInputs; + buildInputs = buildInputs; + + overrideMain = p: { + GIT_REVISION = git_revision; + }; + }; + + kafka-daq-report-container-image = pkgs.dockerTools.buildImage { + name = "supermusr-kafka-daq-report"; + tag = "latest"; + created = "now"; + + copyToRoot = pkgs.buildEnv { + name = "image-root"; + paths = with pkgs; [bashInteractive coreutils]; + pathsToLink = ["/bin"]; + }; + + config = { + Entrypoint = ["${pkgs.tini}/bin/tini" "--" "${kafka-daq-report}/bin/kafka-daq-report"]; + Env = [ + "SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt" + ]; + }; + }; +} diff --git a/trace-reader/Cargo.toml b/trace-reader/Cargo.toml new file mode 100644 index 00000000..70f5471a --- /dev/null +++ b/trace-reader/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "trace-reader" +version.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +common = { path = "../common" } +streaming-types = { path = "../streaming-types" } + +anyhow.workspace = true +clap.workspace = true +rdkafka.workspace = true +tokio.workspace = true +chrono = "0.4.22" +env_logger = "0.10" +log = "0.4" +rand = "0.8.5" diff --git a/trace-reader/Readme.md b/trace-reader/Readme.md new file mode 100644 index 00000000..f08fa161 --- /dev/null +++ b/trace-reader/Readme.md @@ -0,0 +1,28 @@ +# trace-reader + +## Introduction +This program reads picscope .trace files, a binary file developed by E.M Schooneveld. + +## Command Line Interface +For command line instructions run +``` +trace-reader --help +``` + +### Options +The `number-of-trace-events` parameter is the number of traces events that are extracted from the file (either randomly or in sequence). The file defines how many channels the digitizer has, each trace event contains one trace for each channel. The number of messages dispatched is equal to the number of trace events times the number of channels. + +If `random-sample` is set then trace-events are read from the file randomly. Selection is made with replacement so duplication is possible. +If this flag is not set then trace-events are read in order. +If `number-of-trace-events` is greater than the number available then trace-events are the reader wraps around to the beginning of the file as often as necessary. + +### Example +The following reads 18 trace events (sampled randomly with replacement) from `Traces/MuSR_A41_B42_C43_D44_Apr2021_Ag_ZF_IntDeg_Slit60_short.trace` and dispatches them to the topic `Traces` on the Kafka broker located at `localhost:19092`: +``` +trace-reader --broker localhost:19092 --consumer-group trace-producer --trace-topic Traces --file-name Traces/MuSR_A41_B42_C43_D44_Apr2021_Ag_ZF_IntDeg_Slit60_short.traces --number-of-trace-events 18 --random-sample +``` + +## Terminology +- Trace: This is continous block of voltage readings from a digitizer channel. +- Trace Event: This is a collection of traces, one for each channel on the digitizer. +Note that "Event" here is meant in a different sense than the trace-to-event tool. The overlap is a result of terminology used in the .trace file. To avoid confusion, the term "Trace Event" is used here. \ No newline at end of file diff --git a/trace-reader/default.nix b/trace-reader/default.nix new file mode 100644 index 00000000..f666202b --- /dev/null +++ b/trace-reader/default.nix @@ -0,0 +1,42 @@ +{ + pkgs, + naersk', + version, + git_revision, + nativeBuildInputs, + buildInputs, +}: rec { + trace-reader = naersk'.buildPackage { + name = "trace-reader"; + version = version; + + src = ./..; + cargoBuildOptions = x: x ++ ["--package" "trace-reader"]; + + nativeBuildInputs = nativeBuildInputs; + buildInputs = buildInputs; + + overrideMain = p: { + GIT_REVISION = git_revision; + }; + }; + + trace-reader-container-image = pkgs.dockerTools.buildImage { + name = "supermusr-trace-reader"; + tag = "latest"; + created = "now"; + + copyToRoot = pkgs.buildEnv { + name = "image-root"; + paths = with pkgs; [bashInteractive coreutils]; + pathsToLink = ["/bin"]; + }; + + config = { + Entrypoint = ["${pkgs.tini}/bin/tini" "--" "${trace-reader}/bin/trace-reader"]; + Env = [ + "SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt" + ]; + }; + }; +} diff --git a/trace-reader/src/loader.rs b/trace-reader/src/loader.rs new file mode 100644 index 00000000..70c63945 --- /dev/null +++ b/trace-reader/src/loader.rs @@ -0,0 +1,298 @@ +use std::{ + fmt::Debug, + fs::File, + io::{Error, ErrorKind, Read, Seek, SeekFrom}, + mem::size_of, + path::PathBuf, + usize, +}; + +#[derive(Default, Debug)] +pub(crate) struct TraceFileHeader { + pub(crate) prog_version: String, + pub(crate) run_descript: String, + pub(crate) _resolution: i32, + pub(crate) number_of_channels: i32, + pub(crate) _channel_enabled: Vec, + pub(crate) _volts_scale_factor: Vec, + pub(crate) _channel_offset_volts: Vec, + pub(crate) _sample_time: f64, + pub(crate) number_of_samples: i32, + pub(crate) _trigger_enabled: Vec, + pub(crate) _ex_trigger_enabled: bool, + pub(crate) _trigger_level: Vec, + pub(crate) _ex_trigger_level: f64, + pub(crate) _trigger_slope: Vec, + pub(crate) _ex_trigger_slope: i32, + total_bytes: usize, +} + +impl TraceFileHeader { + pub(crate) fn load(file: &mut File) -> Result { + let mut total_bytes = usize::default(); + let prog_version = load_string(file, &mut total_bytes)?; + let run_descript = load_string(file, &mut total_bytes)?; + let _resolution = load_i32(file, &mut total_bytes)?; + let number_of_channels = load_i32(file, &mut total_bytes)?; + Ok(TraceFileHeader { + prog_version, + run_descript, + _resolution, + number_of_channels, + _channel_enabled: load_bool_vec(file, number_of_channels as usize, &mut total_bytes)?, + _volts_scale_factor: load_f64_vec(file, number_of_channels as usize, &mut total_bytes)?, + _channel_offset_volts: load_f64_vec( + file, + number_of_channels as usize, + &mut total_bytes, + )?, + _sample_time: load_f64(file, &mut total_bytes)?, + number_of_samples: load_i32(file, &mut total_bytes)?, + _trigger_enabled: load_bool_vec(file, number_of_channels as usize, &mut total_bytes)?, + _ex_trigger_enabled: load_bool(file, &mut total_bytes)?, + _trigger_level: load_f64_vec(file, number_of_channels as usize, &mut total_bytes)?, + _ex_trigger_level: load_f64(file, &mut total_bytes)?, + _trigger_slope: load_i32_vec(file, number_of_channels as usize, &mut total_bytes)?, + _ex_trigger_slope: load_i32(file, &mut total_bytes)?, + total_bytes, + }) + } +} + +impl TraceFileHeader { + fn get_total_bytes(&self) -> usize { + self.total_bytes + } + + fn get_size(&self) -> usize { + size_of::() + self.prog_version.len() + //pub prog_version : String, + size_of::() + self.run_descript.len() + //pub run_descript : String, + size_of::() + //pub resolution : i32, + size_of::() + //pub number_of_channels : i32, + size_of::()*self.number_of_channels as usize +//pub channel_enabled : Vec, + size_of::()*self.number_of_channels as usize + //pub volts_scale_factor : Vec, + size_of::()*self.number_of_channels as usize + //pub channel_offset_volts : Vec, + size_of::() + //pub sample_time : f64, + size_of::() + //pub number_of_samples : i32, + size_of::()*self.number_of_channels as usize + //pub trigger_enabled : Vec, + size_of::() + //pub ex_trigger_enabled : bool, + size_of::()*self.number_of_channels as usize + //pub trigger_level : Vec, + size_of::() + //pub ex_trigger_level : f64, + size_of::()*self.number_of_channels as usize + //pub trigger_slope : Vec, + size_of::() //pub ex_trigger_slope : i32, + } + + fn get_event_size(&self) -> usize { + TraceFileEvent::get_size( + self.number_of_channels as usize, + self.number_of_samples as usize, + ) + } + + fn get_trace_event(&self, file: &mut File) -> Result { + TraceFileEvent::load_raw( + file, + self.number_of_channels as usize, + self.number_of_samples as usize, + ) + } +} + +#[derive(Default, Debug)] +pub(crate) struct TraceFileEvent { + pub(crate) cur_trace_event: i32, + pub(crate) trace_event_runtime: f64, + pub(crate) number_saved_traces: i32, + pub(crate) saved_channels: Vec, + pub(crate) trigger_time: f64, + pub(crate) _trace: Vec>, + pub(crate) raw_trace: Vec>, + total_bytes: usize, +} + +impl TraceFileEvent { + fn get_size(num_channels: usize, num_samples: usize) -> usize { + size_of::() + //pub cur_trace_event : i32, + size_of::() + //pub trace_event_runtime : f64, + size_of::() + //pub number_saved_traces : i32, + size_of::()*num_channels + //pub saved_channels : Vec, + size_of::() + //pub trigger_time : f64, + size_of::()*num_channels*num_samples //pub raw_trace : Vec>, + } + + pub(crate) fn load(file: &mut File, num_channels: usize) -> Result { + let mut total_bytes = usize::default(); + Ok(TraceFileEvent { + cur_trace_event: load_i32(file, &mut total_bytes)?, + trace_event_runtime: load_f64(file, &mut total_bytes)?, + number_saved_traces: load_i32(file, &mut total_bytes)?, + saved_channels: load_bool_vec(file, num_channels, &mut total_bytes)?, + trigger_time: load_f64(file, &mut total_bytes)?, + total_bytes, + ..Default::default() + }) + } + + pub(crate) fn load_raw( + file: &mut File, + num_channels: usize, + num_samples: usize, + ) -> Result { + let mut total_bytes = usize::default(); + let trace_event = Self::load(file, num_channels)?; + Ok(TraceFileEvent { + cur_trace_event: trace_event.cur_trace_event, + trace_event_runtime: trace_event.trace_event_runtime, + number_saved_traces: trace_event.number_saved_traces, + saved_channels: trace_event.saved_channels, + trigger_time: trace_event.trigger_time, + raw_trace: (0..num_channels) + .map(|_| load_raw_trace(file, num_samples, &mut total_bytes)) + .collect::>()?, + total_bytes: trace_event.total_bytes + total_bytes, + ..Default::default() + }) + } +} + +#[derive(Debug)] +pub(crate) struct TraceFile { + file: File, + header: TraceFileHeader, + num_trace_events: usize, +} + +impl TraceFile { + pub(crate) fn get_trace_event(&mut self, event: usize) -> Result { + if event < self.num_trace_events { + self.file.seek(SeekFrom::Start( + (self.header.get_size() + event * self.header.get_event_size()) as u64, + ))?; + self.header.get_trace_event(&mut self.file) + } else { + Err(Error::new( + ErrorKind::InvalidInput, + "Invalid event index: {event} should be less than {num_events}", + )) + } + } + + pub(crate) fn get_number_of_trace_events(&self) -> usize { + self.num_trace_events + } + + pub(crate) fn get_num_channels(&self) -> usize { + self.header.number_of_channels as usize + } + + pub(crate) fn get_num_samples(&self) -> usize { + self.header.number_of_samples as usize + } +} + +pub(crate) fn load_trace_file(name: PathBuf) -> Result { + let mut file = File::open(name)?; + let header: TraceFileHeader = TraceFileHeader::load(&mut file)?; + let file_size = file + .metadata() + .map_err(|e| Error::new(ErrorKind::InvalidInput, e))? + .len() as usize; + let size_minus_header = file_size - header.get_total_bytes(); + let trace_event_size = header.get_event_size(); + if size_minus_header % trace_event_size != 0 { + Err(Error::new( + ErrorKind::Other, + format!("Problem: {0} != 0", size_minus_header % trace_event_size), + )) + } else { + Ok(TraceFile { + file, + header, + num_trace_events: size_minus_header / trace_event_size, + }) + } +} + +fn load_scalar( + file: &mut File, + bytes: &mut [u8], + total_bytes: &mut usize, +) -> Result<(), Error> { + let num_bytes = file.read(bytes)?; + *total_bytes += num_bytes; + if num_bytes == B { + Ok(()) + } else { + Err(Error::new( + ErrorKind::UnexpectedEof, + format!("Expected {B} bytes, got {num_bytes}."), + )) + } +} + +pub(crate) fn load_i32(file: &mut File, total_bytes: &mut usize) -> Result { + let mut bytes = i32::to_le_bytes(0); + load_scalar::<4>(file, &mut bytes, total_bytes)?; + Ok(i32::from_le_bytes(bytes)) +} + +pub(crate) fn load_f64(file: &mut File, total_bytes: &mut usize) -> Result { + let mut bytes = f64::to_le_bytes(0.); + load_scalar::<8>(file, &mut bytes, total_bytes)?; + Ok(f64::from_le_bytes(bytes)) +} + +pub(crate) fn load_bool(file: &mut File, total_bytes: &mut usize) -> Result { + let mut bytes = u8::to_le_bytes(0); + load_scalar::<1>(file, &mut bytes, total_bytes)?; + Ok(u8::from_le_bytes(bytes) != 0) +} + +pub(crate) fn load_bool_vec( + file: &mut File, + size: usize, + total_bytes: &mut usize, +) -> Result, Error> { + (0..size).map(|_| load_bool(file, total_bytes)).collect() +} + +pub(crate) fn load_f64_vec( + file: &mut File, + size: usize, + total_bytes: &mut usize, +) -> Result, Error> { + (0..size).map(|_| load_f64(file, total_bytes)).collect() +} + +pub(crate) fn load_i32_vec( + file: &mut File, + size: usize, + total_bytes: &mut usize, +) -> Result, Error> { + (0..size).map(|_| load_i32(file, total_bytes)).collect() +} + +pub(crate) fn load_string(file: &mut File, total_bytes: &mut usize) -> Result { + let size = load_i32(file, total_bytes)?; + *total_bytes += size as usize; + let mut string_bytes = Vec::::new(); + string_bytes.resize(size as usize, 0); + file.read_exact(&mut string_bytes)?; + String::from_utf8(string_bytes).map_err(|e| Error::new(ErrorKind::InvalidData, e)) +} + +pub(crate) fn load_raw_trace( + file: &mut File, + size: usize, + total_bytes: &mut usize, +) -> Result, Error> { + let mut trace_bytes = Vec::::new(); + let bytes = (u16::BITS / u8::BITS) as usize * size; + *total_bytes += bytes; + + trace_bytes.resize(bytes, 0); + file.read_exact(&mut trace_bytes)?; + Ok((0..size) + .map(|i| u16::from_be_bytes([trace_bytes[2 * i], trace_bytes[2 * i + 1]])) + .collect()) +} diff --git a/trace-reader/src/main.rs b/trace-reader/src/main.rs new file mode 100644 index 00000000..704be2e3 --- /dev/null +++ b/trace-reader/src/main.rs @@ -0,0 +1,103 @@ +use clap::Parser; +use common::{DigitizerId, FrameNumber}; +use rand::{seq::IteratorRandom, thread_rng}; +use rdkafka::producer::FutureProducer; +use std::path::PathBuf; + +mod loader; +mod processing; +use loader::load_trace_file; +use processing::dispatch_trace_file; + +#[derive(Debug, Parser)] +#[clap(author, version, about)] +struct Cli { + /// Kafka message broker, should have format `host:port`, e.g. `localhost:19092` + #[clap(long)] + broker: String, + + /// Optional Kafka username + #[clap(long)] + username: Option, + + /// Optional Kafka password + #[clap(long)] + password: Option, + + /// Name of the Kafka consumer group + #[clap(long)] + consumer_group: String, + + /// The Kafka topic that trace messages are produced to + #[clap(long)] + trace_topic: String, + + /// Relative path to the .trace file to be read + #[clap(long)] + file_name: PathBuf, + + /// The frame number to assign the message + #[clap(long, default_value = "0")] + frame_number: FrameNumber, + + /// The digitizer id to assign the message + #[clap(long, default_value = "0")] + digitizer_id: DigitizerId, + + /// The number of trace events to read. If zero, then all trace events are read + #[clap(long, default_value = "1")] + number_of_trace_events: usize, + + /// If set, then trace events are sampled randomly with replacement, if not set then trace events are read in order + #[clap(long, default_value = "false")] + random_sample: bool, +} + +#[tokio::main] +async fn main() { + env_logger::init(); + + let args = Cli::parse(); + + let client_config = + common::generate_kafka_client_config(&args.broker, &args.username, &args.password); + + let producer: FutureProducer = client_config + .create() + .expect("Kafka Producer should be created"); + + let trace_file = load_trace_file(args.file_name).expect("Trace File should load"); + let total_trace_events = trace_file.get_number_of_trace_events(); + let num_trace_events = if args.number_of_trace_events == 0 { + total_trace_events + } else { + args.number_of_trace_events + }; + + let trace_event_indices: Vec<_> = if args.random_sample { + (0..num_trace_events) + .map(|_| { + (0..num_trace_events) + .choose(&mut thread_rng()) + .unwrap_or_default() + }) + .collect() + } else { + (0..num_trace_events) + .cycle() + .take(num_trace_events) + .collect() + }; + + dispatch_trace_file( + trace_file, + trace_event_indices, + args.frame_number, + args.digitizer_id, + &producer, + &args.trace_topic, + 6000, + ) + .await + .expect("Trace File should be dispatched to Kafka"); +} diff --git a/trace-reader/src/processing.rs b/trace-reader/src/processing.rs new file mode 100644 index 00000000..230b0d70 --- /dev/null +++ b/trace-reader/src/processing.rs @@ -0,0 +1,111 @@ +//! This module allows one to simulate instances of DigitizerAnalogTraceMessage +//! using the FlatBufferBuilder. + +use super::loader::{TraceFile, TraceFileEvent}; +use anyhow::{Error, Result}; +use chrono::Utc; +use log::{debug, error}; +use rdkafka::{ + producer::{FutureProducer, FutureRecord}, + util::Timeout, +}; +use std::time::Duration; + +use common::{Channel, DigitizerId, FrameNumber, Intensity}; +use streaming_types::{ + dat1_digitizer_analog_trace_v1_generated::{ + finish_digitizer_analog_trace_message_buffer, ChannelTrace, ChannelTraceArgs, + DigitizerAnalogTraceMessage, DigitizerAnalogTraceMessageArgs, + }, + flatbuffers::{FlatBufferBuilder, WIPOffset}, + frame_metadata_v1_generated::{FrameMetadataV1, FrameMetadataV1Args, GpsTime}, +}; + +/// Reads the contents of trace_file and dispatches messages to the given Kafka topic. +pub(crate) async fn dispatch_trace_file( + mut trace_file: TraceFile, + trace_event_indices: Vec, + frame_number: FrameNumber, + digitizer_id: DigitizerId, + producer: &FutureProducer, + topic: &str, + timeout_ms: u64, +) -> Result<()> { + let mut fbb = FlatBufferBuilder::new(); + for index in trace_event_indices { + let event = trace_file.get_trace_event(index)?; + create_message( + &mut fbb, + Utc::now().into(), + frame_number, + digitizer_id, + trace_file.get_num_channels(), + trace_file.get_num_samples(), + &event, + )?; + + let future_record = FutureRecord::to(topic).payload(fbb.finished_data()).key(""); + let timeout = Timeout::After(Duration::from_millis(timeout_ms)); + match producer.send(future_record, timeout).await { + Ok(r) => debug!("Delivery: {:?}", r), + Err(e) => error!("Delivery failed: {:?}", e.0), + }; + } + Ok(()) +} + +pub(crate) fn create_channel<'a>( + fbb: &mut FlatBufferBuilder<'a>, + channel: Channel, + trace: &[Intensity], +) -> WIPOffset> { + let voltage = Some(fbb.create_vector::(trace)); + ChannelTrace::create(fbb, &ChannelTraceArgs { channel, voltage }) +} + +/// Loads a FlatBufferBuilder with a new DigitizerAnalogTraceMessage instance with a custom timestamp. +/// #Arguments +/// * `fbb` - A mutable reference to the FlatBufferBuilder to use. +/// * `time` - A `frame_metadata_v1_generated::GpsTime` instance containing the timestamp. +/// * `frame_number` - The frame number to use. +/// * `digitizer_id` - The id of the digitizer to use. +/// * `measurements_per_frame` - The number of measurements to simulate in each channel. +/// * `num_channels` - The number of channels to simulate. +/// #Returns +/// A string result, or an error. +pub(crate) fn create_message( + fbb: &mut FlatBufferBuilder<'_>, + time: GpsTime, + frame_number: u32, + digitizer_id: u8, + number_of_channels: usize, + number_of_samples: usize, + event: &TraceFileEvent, +) -> Result { + fbb.reset(); + + let metadata: FrameMetadataV1Args = FrameMetadataV1Args { + frame_number, + period_number: 0, + protons_per_pulse: 0, + running: true, + timestamp: Some(&time), + veto_flags: 0, + }; + let metadata: WIPOffset = FrameMetadataV1::create(fbb, &metadata); + + let channels: Vec<_> = (0..number_of_channels) + .map(|c| create_channel(fbb, c as u32, event.raw_trace[c].as_slice())) + .collect(); + + let message = DigitizerAnalogTraceMessageArgs { + digitizer_id, + metadata: Some(metadata), + sample_rate: 1_000_000_000, + channels: Some(fbb.create_vector_from_iter(channels.iter())), + }; + let message = DigitizerAnalogTraceMessage::create(fbb, &message); + finish_digitizer_analog_trace_message_buffer(fbb, message); + + Ok(format!("New message created for digitizer {digitizer_id}, frame number {frame_number}, and has {number_of_channels} channels, and {number_of_samples} measurements.")) +}