From cb0797bf475ca70159296797ca5d6dfab8d432c3 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 27 Nov 2023 11:13:10 +1300 Subject: [PATCH 01/17] feat: Add gRPC example with `tonic` --- block-streamer/Cargo.lock | 300 +++++++++- block-streamer/Cargo.toml | 11 + block-streamer/build.rs | 6 + block-streamer/data/route_guide_db.json | 702 ++++++++++++++++++++++++ block-streamer/proto/route_guide.proto | 106 ++++ block-streamer/src/client.rs | 124 +++++ block-streamer/src/data.rs | 34 ++ block-streamer/src/main.rs | 103 ++-- block-streamer/src/server.rs | 188 +++++++ 9 files changed, 1534 insertions(+), 40 deletions(-) create mode 100644 block-streamer/build.rs create mode 100644 block-streamer/data/route_guide_db.json create mode 100644 block-streamer/proto/route_guide.proto create mode 100644 block-streamer/src/client.rs create mode 100644 block-streamer/src/data.rs create mode 100644 block-streamer/src/server.rs diff --git a/block-streamer/Cargo.lock b/block-streamer/Cargo.lock index b96492ead..bb9f03abe 100644 --- a/block-streamer/Cargo.lock +++ b/block-streamer/Cargo.lock @@ -836,6 +836,51 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "base16ct" version = "0.1.1" @@ -879,6 +924,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" + [[package]] name = "blake2" version = "0.9.2" @@ -913,6 +964,7 @@ name = "block-streamer" version = "0.1.0" dependencies = [ "anyhow", + "async-stream", "async-trait", "aws-config 1.0.0", "aws-sdk-s3 0.39.1", @@ -924,12 +976,16 @@ dependencies = [ "futures", "mockall", "near-lake-framework", + "prost", + "rand 0.8.5", "redis", "serde", "serde_json", "tokio", "tokio-stream", "tokio-util", + "tonic", + "tonic-build", "tracing", "tracing-subscriber", "wildmatch", @@ -1472,6 +1528,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f258a7194e7f7c2a7837a8913aeab7fd8c383457034fa20ce4dd3dcb813e8eb8" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "fastrand" version = "1.9.0" @@ -1506,6 +1572,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "float-cmp" version = "0.9.0" @@ -1748,6 +1820,15 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "home" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5444c27eef6923071f7ebcc33e3444508466a76f7a2b93da00ed6e19f30c1ddb" +dependencies = [ + "windows-sys", +] + [[package]] name = "http" version = "0.2.11" @@ -1837,6 +1918,18 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "iana-time-zone" version = "0.1.58" @@ -1949,6 +2042,12 @@ version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" +[[package]] +name = "linux-raw-sys" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829" + [[package]] name = "log" version = "0.4.20" @@ -1964,6 +2063,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -1980,6 +2085,12 @@ version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "mio" version = "0.8.9" @@ -2018,6 +2129,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "near-account-id" version = "0.17.0" @@ -2335,6 +2452,16 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "petgraph" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" +dependencies = [ + "fixedbitset", + "indexmap 2.1.0", +] + [[package]] name = "pin-project" version = "1.1.3" @@ -2419,6 +2546,16 @@ dependencies = [ "termtree", ] +[[package]] +name = "prettyplease" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" +dependencies = [ + "proc-macro2", + "syn 2.0.39", +] + [[package]] name = "primitive-types" version = "0.10.1" @@ -2447,6 +2584,60 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" +dependencies = [ + "bytes", + "heck", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.39", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.39", +] + +[[package]] +name = "prost-types" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e" +dependencies = [ + "prost", +] + [[package]] name = "quote" version = "1.0.33" @@ -2549,6 +2740,15 @@ dependencies = [ "url", ] +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "reed-solomon-erasure" version = "4.0.2" @@ -2651,6 +2851,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.38.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc99bc2d4f1fed22595588a013687477aedf3cdcfb26558c559edb67b4d9b22e" +dependencies = [ + "bitflags 2.4.1", + "errno", + "libc", + "linux-raw-sys", + "windows-sys", +] + [[package]] name = "rustls" version = "0.20.9" @@ -2776,7 +2989,7 @@ version = "2.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", @@ -3090,6 +3303,25 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + +[[package]] +name = "tempfile" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" +dependencies = [ + "cfg-if", + "fastrand 2.0.1", + "redox_syscall", + "rustix", + "windows-sys", +] + [[package]] name = "termtree" version = "0.4.1" @@ -3188,6 +3420,16 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.1.0" @@ -3254,6 +3496,46 @@ dependencies = [ "serde", ] +[[package]] +name = "tonic" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn 2.0.39", +] + [[package]] name = "tower" version = "0.4.13" @@ -3262,9 +3544,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand 0.8.5", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -3543,6 +3829,18 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix", +] + [[package]] name = "wildmatch" version = "2.1.1" diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index 2335b81f5..93d9801d3 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -3,6 +3,10 @@ name = "block-streamer" version = "0.1.0" edition = "2021" +[[bin]] +name = "routeguide-client" +path = "src/client.rs" + [dependencies] anyhow = "1.0.57" async-trait = "0.1.74" @@ -15,6 +19,7 @@ borsh = "0.10.2" chrono = "0.4.25" futures = "0.3.5" mockall = "0.11.4" +prost = "0.12.3" redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] } serde = { version = "1", features = ["derive"] } serde_json = "1.0.55" @@ -23,6 +28,12 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } tokio = { version = "1.28.0", features = ["rt-multi-thread"]} tokio-util = "0.7.10" tokio-stream = "0.1.14" +tonic = "0.10.2" wildmatch = "2.1.1" near-lake-framework = "0.7.1" +async-stream = "0.3.5" +rand = "0.8.5" + +[build-dependencies] +tonic-build = "0.10" diff --git a/block-streamer/build.rs b/block-streamer/build.rs new file mode 100644 index 000000000..20f973625 --- /dev/null +++ b/block-streamer/build.rs @@ -0,0 +1,6 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/route_guide.proto") + .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); + + Ok(()) +} diff --git a/block-streamer/data/route_guide_db.json b/block-streamer/data/route_guide_db.json new file mode 100644 index 000000000..22e93e313 --- /dev/null +++ b/block-streamer/data/route_guide_db.json @@ -0,0 +1,702 @@ +[ + { + "location": { + "latitude": 407838351, + "longitude": -746143763 + }, + "name": "Patriots Path, Mendham, NJ 07945, USA" + }, + { + "location": { + "latitude": 408122808, + "longitude": -743999179 + }, + "name": "101 New Jersey 10, Whippany, NJ 07981, USA" + }, + { + "location": { + "latitude": 413628156, + "longitude": -749015468 + }, + "name": "U.S. 6, Shohola, PA 18458, USA" + }, + { + "location": { + "latitude": 419999544, + "longitude": -740371136 + }, + "name": "5 Conners Road, Kingston, NY 12401, USA" + }, + { + "location": { + "latitude": 414008389, + "longitude": -743951297 + }, + "name": "Mid Hudson Psychiatric Center, New Hampton, NY 10958, USA" + }, + { + "location": { + "latitude": 419611318, + "longitude": -746524769 + }, + "name": "287 Flugertown Road, Livingston Manor, NY 12758, USA" + }, + { + "location": { + "latitude": 406109563, + "longitude": -742186778 + }, + "name": "4001 Tremley Point Road, Linden, NJ 07036, USA" + }, + { + "location": { + "latitude": 416802456, + "longitude": -742370183 + }, + "name": "352 South Mountain Road, Wallkill, NY 12589, USA" + }, + { + "location": { + "latitude": 412950425, + "longitude": -741077389 + }, + "name": "Bailey Turn Road, Harriman, NY 10926, USA" + }, + { + "location": { + "latitude": 412144655, + "longitude": -743949739 + }, + "name": "193-199 Wawayanda Road, Hewitt, NJ 07421, USA" + }, + { + "location": { + "latitude": 415736605, + "longitude": -742847522 + }, + "name": "406-496 Ward Avenue, Pine Bush, NY 12566, USA" + }, + { + "location": { + "latitude": 413843930, + "longitude": -740501726 + }, + "name": "162 Merrill Road, Highland Mills, NY 10930, USA" + }, + { + "location": { + "latitude": 410873075, + "longitude": -744459023 + }, + "name": "Clinton Road, West Milford, NJ 07480, USA" + }, + { + "location": { + "latitude": 412346009, + "longitude": -744026814 + }, + "name": "16 Old Brook Lane, Warwick, NY 10990, USA" + }, + { + "location": { + "latitude": 402948455, + "longitude": -747903913 + }, + "name": "3 Drake Lane, Pennington, NJ 08534, USA" + }, + { + "location": { + "latitude": 406337092, + "longitude": -740122226 + }, + "name": "6324 8th Avenue, Brooklyn, NY 11220, USA" + }, + { + "location": { + "latitude": 406421967, + "longitude": -747727624 + }, + "name": "1 Merck Access Road, Whitehouse Station, NJ 08889, USA" + }, + { + "location": { + "latitude": 416318082, + "longitude": -749677716 + }, + "name": "78-98 Schalck Road, Narrowsburg, NY 12764, USA" + }, + { + "location": { + "latitude": 415301720, + "longitude": -748416257 + }, + "name": "282 Lakeview Drive Road, Highland Lake, NY 12743, USA" + }, + { + "location": { + "latitude": 402647019, + "longitude": -747071791 + }, + "name": "330 Evelyn Avenue, Hamilton Township, NJ 08619, USA" + }, + { + "location": { + "latitude": 412567807, + "longitude": -741058078 + }, + "name": "New York State Reference Route 987E, Southfields, NY 10975, USA" + }, + { + "location": { + "latitude": 416855156, + "longitude": -744420597 + }, + "name": "103-271 Tempaloni Road, Ellenville, NY 12428, USA" + }, + { + "location": { + "latitude": 404663628, + "longitude": -744820157 + }, + "name": "1300 Airport Road, North Brunswick Township, NJ 08902, USA" + }, + { + "location": { + "latitude": 407113723, + "longitude": -749746483 + }, + "name": "" + }, + { + "location": { + "latitude": 402133926, + "longitude": -743613249 + }, + "name": "" + }, + { + "location": { + "latitude": 400273442, + "longitude": -741220915 + }, + "name": "" + }, + { + "location": { + "latitude": 411236786, + "longitude": -744070769 + }, + "name": "" + }, + { + "location": { + "latitude": 411633782, + "longitude": -746784970 + }, + "name": "211-225 Plains Road, Augusta, NJ 07822, USA" + }, + { + "location": { + "latitude": 415830701, + "longitude": -742952812 + }, + "name": "" + }, + { + "location": { + "latitude": 413447164, + "longitude": -748712898 + }, + "name": "165 Pedersen Ridge Road, Milford, PA 18337, USA" + }, + { + "location": { + "latitude": 405047245, + "longitude": -749800722 + }, + "name": "100-122 Locktown Road, Frenchtown, NJ 08825, USA" + }, + { + "location": { + "latitude": 418858923, + "longitude": -746156790 + }, + "name": "" + }, + { + "location": { + "latitude": 417951888, + "longitude": -748484944 + }, + "name": "650-652 Willi Hill Road, Swan Lake, NY 12783, USA" + }, + { + "location": { + "latitude": 407033786, + "longitude": -743977337 + }, + "name": "26 East 3rd Street, New Providence, NJ 07974, USA" + }, + { + "location": { + "latitude": 417548014, + "longitude": -740075041 + }, + "name": "" + }, + { + "location": { + "latitude": 410395868, + "longitude": -744972325 + }, + "name": "" + }, + { + "location": { + "latitude": 404615353, + "longitude": -745129803 + }, + "name": "" + }, + { + "location": { + "latitude": 406589790, + "longitude": -743560121 + }, + "name": "611 Lawrence Avenue, Westfield, NJ 07090, USA" + }, + { + "location": { + "latitude": 414653148, + "longitude": -740477477 + }, + "name": "18 Lannis Avenue, New Windsor, NY 12553, USA" + }, + { + "location": { + "latitude": 405957808, + "longitude": -743255336 + }, + "name": "82-104 Amherst Avenue, Colonia, NJ 07067, USA" + }, + { + "location": { + "latitude": 411733589, + "longitude": -741648093 + }, + "name": "170 Seven Lakes Drive, Sloatsburg, NY 10974, USA" + }, + { + "location": { + "latitude": 412676291, + "longitude": -742606606 + }, + "name": "1270 Lakes Road, Monroe, NY 10950, USA" + }, + { + "location": { + "latitude": 409224445, + "longitude": -748286738 + }, + "name": "509-535 Alphano Road, Great Meadows, NJ 07838, USA" + }, + { + "location": { + "latitude": 406523420, + "longitude": -742135517 + }, + "name": "652 Garden Street, Elizabeth, NJ 07202, USA" + }, + { + "location": { + "latitude": 401827388, + "longitude": -740294537 + }, + "name": "349 Sea Spray Court, Neptune City, NJ 07753, USA" + }, + { + "location": { + "latitude": 410564152, + "longitude": -743685054 + }, + "name": "13-17 Stanley Street, West Milford, NJ 07480, USA" + }, + { + "location": { + "latitude": 408472324, + "longitude": -740726046 + }, + "name": "47 Industrial Avenue, Teterboro, NJ 07608, USA" + }, + { + "location": { + "latitude": 412452168, + "longitude": -740214052 + }, + "name": "5 White Oak Lane, Stony Point, NY 10980, USA" + }, + { + "location": { + "latitude": 409146138, + "longitude": -746188906 + }, + "name": "Berkshire Valley Management Area Trail, Jefferson, NJ, USA" + }, + { + "location": { + "latitude": 404701380, + "longitude": -744781745 + }, + "name": "1007 Jersey Avenue, New Brunswick, NJ 08901, USA" + }, + { + "location": { + "latitude": 409642566, + "longitude": -746017679 + }, + "name": "6 East Emerald Isle Drive, Lake Hopatcong, NJ 07849, USA" + }, + { + "location": { + "latitude": 408031728, + "longitude": -748645385 + }, + "name": "1358-1474 New Jersey 57, Port Murray, NJ 07865, USA" + }, + { + "location": { + "latitude": 413700272, + "longitude": -742135189 + }, + "name": "367 Prospect Road, Chester, NY 10918, USA" + }, + { + "location": { + "latitude": 404310607, + "longitude": -740282632 + }, + "name": "10 Simon Lake Drive, Atlantic Highlands, NJ 07716, USA" + }, + { + "location": { + "latitude": 409319800, + "longitude": -746201391 + }, + "name": "11 Ward Street, Mount Arlington, NJ 07856, USA" + }, + { + "location": { + "latitude": 406685311, + "longitude": -742108603 + }, + "name": "300-398 Jefferson Avenue, Elizabeth, NJ 07201, USA" + }, + { + "location": { + "latitude": 419018117, + "longitude": -749142781 + }, + "name": "43 Dreher Road, Roscoe, NY 12776, USA" + }, + { + "location": { + "latitude": 412856162, + "longitude": -745148837 + }, + "name": "Swan Street, Pine Island, NY 10969, USA" + }, + { + "location": { + "latitude": 416560744, + "longitude": -746721964 + }, + "name": "66 Pleasantview Avenue, Monticello, NY 12701, USA" + }, + { + "location": { + "latitude": 405314270, + "longitude": -749836354 + }, + "name": "" + }, + { + "location": { + "latitude": 414219548, + "longitude": -743327440 + }, + "name": "" + }, + { + "location": { + "latitude": 415534177, + "longitude": -742900616 + }, + "name": "565 Winding Hills Road, Montgomery, NY 12549, USA" + }, + { + "location": { + "latitude": 406898530, + "longitude": -749127080 + }, + "name": "231 Rocky Run Road, Glen Gardner, NJ 08826, USA" + }, + { + "location": { + "latitude": 407586880, + "longitude": -741670168 + }, + "name": "100 Mount Pleasant Avenue, Newark, NJ 07104, USA" + }, + { + "location": { + "latitude": 400106455, + "longitude": -742870190 + }, + "name": "517-521 Huntington Drive, Manchester Township, NJ 08759, USA" + }, + { + "location": { + "latitude": 400066188, + "longitude": -746793294 + }, + "name": "" + }, + { + "location": { + "latitude": 418803880, + "longitude": -744102673 + }, + "name": "40 Mountain Road, Napanoch, NY 12458, USA" + }, + { + "location": { + "latitude": 414204288, + "longitude": -747895140 + }, + "name": "" + }, + { + "location": { + "latitude": 414777405, + "longitude": -740615601 + }, + "name": "" + }, + { + "location": { + "latitude": 415464475, + "longitude": -747175374 + }, + "name": "48 North Road, Forestburgh, NY 12777, USA" + }, + { + "location": { + "latitude": 404062378, + "longitude": -746376177 + }, + "name": "" + }, + { + "location": { + "latitude": 405688272, + "longitude": -749285130 + }, + "name": "" + }, + { + "location": { + "latitude": 400342070, + "longitude": -748788996 + }, + "name": "" + }, + { + "location": { + "latitude": 401809022, + "longitude": -744157964 + }, + "name": "" + }, + { + "location": { + "latitude": 404226644, + "longitude": -740517141 + }, + "name": "9 Thompson Avenue, Leonardo, NJ 07737, USA" + }, + { + "location": { + "latitude": 410322033, + "longitude": -747871659 + }, + "name": "" + }, + { + "location": { + "latitude": 407100674, + "longitude": -747742727 + }, + "name": "" + }, + { + "location": { + "latitude": 418811433, + "longitude": -741718005 + }, + "name": "213 Bush Road, Stone Ridge, NY 12484, USA" + }, + { + "location": { + "latitude": 415034302, + "longitude": -743850945 + }, + "name": "" + }, + { + "location": { + "latitude": 411349992, + "longitude": -743694161 + }, + "name": "" + }, + { + "location": { + "latitude": 404839914, + "longitude": -744759616 + }, + "name": "1-17 Bergen Court, New Brunswick, NJ 08901, USA" + }, + { + "location": { + "latitude": 414638017, + "longitude": -745957854 + }, + "name": "35 Oakland Valley Road, Cuddebackville, NY 12729, USA" + }, + { + "location": { + "latitude": 412127800, + "longitude": -740173578 + }, + "name": "" + }, + { + "location": { + "latitude": 401263460, + "longitude": -747964303 + }, + "name": "" + }, + { + "location": { + "latitude": 412843391, + "longitude": -749086026 + }, + "name": "" + }, + { + "location": { + "latitude": 418512773, + "longitude": -743067823 + }, + "name": "" + }, + { + "location": { + "latitude": 404318328, + "longitude": -740835638 + }, + "name": "42-102 Main Street, Belford, NJ 07718, USA" + }, + { + "location": { + "latitude": 419020746, + "longitude": -741172328 + }, + "name": "" + }, + { + "location": { + "latitude": 404080723, + "longitude": -746119569 + }, + "name": "" + }, + { + "location": { + "latitude": 401012643, + "longitude": -744035134 + }, + "name": "" + }, + { + "location": { + "latitude": 404306372, + "longitude": -741079661 + }, + "name": "" + }, + { + "location": { + "latitude": 403966326, + "longitude": -748519297 + }, + "name": "" + }, + { + "location": { + "latitude": 405002031, + "longitude": -748407866 + }, + "name": "" + }, + { + "location": { + "latitude": 409532885, + "longitude": -742200683 + }, + "name": "" + }, + { + "location": { + "latitude": 416851321, + "longitude": -742674555 + }, + "name": "" + }, + { + "location": { + "latitude": 406411633, + "longitude": -741722051 + }, + "name": "3387 Richmond Terrace, Staten Island, NY 10303, USA" + }, + { + "location": { + "latitude": 413069058, + "longitude": -744597778 + }, + "name": "261 Van Sickle Road, Goshen, NY 10924, USA" + }, + { + "location": { + "latitude": 418465462, + "longitude": -746859398 + }, + "name": "" + }, + { + "location": { + "latitude": 411733222, + "longitude": -744228360 + }, + "name": "" + }, + { + "location": { + "latitude": 410248224, + "longitude": -747127767 + }, + "name": "3 Hasta Way, Newton, NJ 07860, USA" + } +] diff --git a/block-streamer/proto/route_guide.proto b/block-streamer/proto/route_guide.proto new file mode 100644 index 000000000..224f22024 --- /dev/null +++ b/block-streamer/proto/route_guide.proto @@ -0,0 +1,106 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package routeguide; + +// Interface exported by the server. +service RouteGuide { + // A simple RPC. + // + // Obtains the feature at a given position. + // + // A feature with an empty name is returned if there's no feature at the given + // position. + rpc GetFeature(Point) returns (Feature) {} + + // A server-to-client streaming RPC. + // + // Obtains the Features available within the given Rectangle. Results are + // streamed rather than returned at once (e.g. in a response message with a + // repeated field), as the rectangle may cover a large area and contain a + // huge number of features. + rpc ListFeatures(Rectangle) returns (stream Feature) {} + + // A client-to-server streaming RPC. + // + // Accepts a stream of Points on a route being traversed, returning a + // RouteSummary when traversal is completed. + rpc RecordRoute(stream Point) returns (RouteSummary) {} + + // A Bidirectional streaming RPC. + // + // Accepts a stream of RouteNotes sent while a route is being traversed, + // while receiving other RouteNotes (e.g. from other users). + rpc RouteChat(stream RouteNote) returns (stream RouteNote) {} +} + +// Points are represented as latitude-longitude pairs in the E7 representation +// (degrees multiplied by 10**7 and rounded to the nearest integer). +// Latitudes should be in the range +/- 90 degrees and longitude should be in +// the range +/- 180 degrees (inclusive). +message Point { + int32 latitude = 1; + int32 longitude = 2; +} + +// A latitude-longitude rectangle, represented as two diagonally opposite +// points "lo" and "hi". +message Rectangle { + // One corner of the rectangle. + Point lo = 1; + + // The other corner of the rectangle. + Point hi = 2; +} + +// A feature names something at a given point. +// +// If a feature could not be named, the name is empty. +message Feature { + // The name of the feature. + string name = 1; + + // The point where the feature is detected. + Point location = 2; +} + +// A RouteNote is a message sent while at a given point. +message RouteNote { + // The location from which the message is sent. + Point location = 1; + + // The message to be sent. + string message = 2; +} + +// A RouteSummary is received in response to a RecordRoute rpc. +// +// It contains the number of individual points received, the number of +// detected features, and the total distance covered as the cumulative sum of +// the distance between each point. +message RouteSummary { + // The number of points received. + int32 point_count = 1; + + // The number of known features passed while traversing the route. + int32 feature_count = 2; + + // The distance covered in metres. + int32 distance = 3; + + // The duration of the traversal in seconds. + int32 elapsed_time = 4; +} diff --git a/block-streamer/src/client.rs b/block-streamer/src/client.rs new file mode 100644 index 000000000..662dfccad --- /dev/null +++ b/block-streamer/src/client.rs @@ -0,0 +1,124 @@ +use std::error::Error; +use std::time::Duration; + +use rand::rngs::ThreadRng; +use rand::Rng; +use tokio::time; +use tonic::transport::Channel; +use tonic::Request; + +use routeguide::route_guide_client::RouteGuideClient; +use routeguide::{Point, Rectangle, RouteNote}; + +pub mod routeguide { + tonic::include_proto!("routeguide"); +} + +async fn print_features(client: &mut RouteGuideClient) -> Result<(), Box> { + let rectangle = Rectangle { + lo: Some(Point { + latitude: 400_000_000, + longitude: -750_000_000, + }), + hi: Some(Point { + latitude: 420_000_000, + longitude: -730_000_000, + }), + }; + + let mut stream = client + .list_features(Request::new(rectangle)) + .await? + .into_inner(); + + while let Some(feature) = stream.message().await? { + println!("NOTE = {:?}", feature); + } + + Ok(()) +} + +async fn run_record_route(client: &mut RouteGuideClient) -> Result<(), Box> { + let mut rng = rand::thread_rng(); + let point_count: i32 = rng.gen_range(2..100); + + let mut points = vec![]; + for _ in 0..=point_count { + points.push(random_point(&mut rng)) + } + + println!("Traversing {} points", points.len()); + let request = Request::new(tokio_stream::iter(points)); + + match client.record_route(request).await { + Ok(response) => println!("SUMMARY: {:?}", response.into_inner()), + Err(e) => println!("something went wrong: {:?}", e), + } + + Ok(()) +} + +async fn run_route_chat(client: &mut RouteGuideClient) -> Result<(), Box> { + let start = time::Instant::now(); + + let outbound = async_stream::stream! { + let mut interval = time::interval(Duration::from_secs(1)); + + loop { + let time = interval.tick().await; + let elapsed = time.duration_since(start); + let note = RouteNote { + location: Some(Point { + latitude: 409146138 + elapsed.as_secs() as i32, + longitude: -746188906, + }), + message: format!("at {:?}", elapsed), + }; + + yield note; + } + }; + + let response = client.route_chat(Request::new(outbound)).await?; + let mut inbound = response.into_inner(); + + while let Some(note) = inbound.message().await? { + println!("NOTE = {:?}", note); + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = RouteGuideClient::connect("http://[::1]:10000").await?; + + println!("*** SIMPLE RPC ***"); + let response = client + .get_feature(Request::new(Point { + latitude: 409_146_138, + longitude: -746_188_906, + })) + .await?; + println!("RESPONSE = {:?}", response); + + println!("\n*** SERVER STREAMING ***"); + print_features(&mut client).await?; + + println!("\n*** CLIENT STREAMING ***"); + run_record_route(&mut client).await?; + + println!("\n*** BIDIRECTIONAL STREAMING ***"); + run_route_chat(&mut client).await?; + + Ok(()) +} + +fn random_point(rng: &mut ThreadRng) -> Point { + let latitude = (rng.gen_range(0..180) - 90) * 10_000_000; + let longitude = (rng.gen_range(0..360) - 180) * 10_000_000; + Point { + latitude, + longitude, + } +} diff --git a/block-streamer/src/data.rs b/block-streamer/src/data.rs new file mode 100644 index 000000000..bbff6c672 --- /dev/null +++ b/block-streamer/src/data.rs @@ -0,0 +1,34 @@ +use serde::Deserialize; +use std::fs::File; + +#[derive(Debug, Deserialize)] +struct Feature { + location: Location, + name: String, +} + +#[derive(Debug, Deserialize)] +struct Location { + latitude: i32, + longitude: i32, +} + +#[allow(dead_code)] +pub fn load() -> Vec { + let data_dir = std::path::PathBuf::from_iter([std::env!("CARGO_MANIFEST_DIR"), "data"]); + let file = File::open(data_dir.join("route_guide_db.json")).expect("failed to open data file"); + + let decoded: Vec = + serde_json::from_reader(&file).expect("failed to deserialize features"); + + decoded + .into_iter() + .map(|feature| crate::routeguide::Feature { + name: feature.name, + location: Some(crate::routeguide::Point { + longitude: feature.location.longitude, + latitude: feature.location.latitude, + }), + }) + .collect() +} diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index c6efd3b51..222d65ccf 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -1,15 +1,26 @@ use tracing_subscriber::prelude::*; -use crate::indexer_config::IndexerConfig; -use crate::rules::types::indexer_rule_match::ChainId; -use crate::rules::{IndexerRule, IndexerRuleKind, MatchingRule, Status}; +// use crate::indexer_config::IndexerConfig; +// use crate::rules::types::indexer_rule_match::ChainId; +// use crate::rules::{IndexerRule, IndexerRuleKind, MatchingRule, Status}; + +use std::sync::Arc; + +use routeguide::route_guide_server::RouteGuideServer; +use tonic::transport::Server; mod block_stream; +mod data; mod delta_lake_client; mod indexer_config; mod redis; mod rules; mod s3_client; +mod server; + +pub mod routeguide { + tonic::include_proto!("routeguide"); +} pub(crate) const LOG_TARGET: &str = "block_streamer"; @@ -22,47 +33,61 @@ async fn main() -> anyhow::Result<()> { tracing::info!("Starting {}", crate::LOG_TARGET); - let redis_connection_manager = redis::connect("redis://127.0.0.1").await?; - - let aws_config = aws_config::from_env().load().await; - let s3_client = crate::s3_client::S3Client::new(&aws_config); + let addr = "[::1]:10000".parse().unwrap(); - let delta_lake_client = crate::delta_lake_client::DeltaLakeClient::new(s3_client); + println!("RouteGuideServer listening on: {}", addr); - let contract = "queryapi.dataplatform.near"; - let matching_rule = MatchingRule::ActionAny { - affected_account_id: contract.to_string(), - status: Status::Any, + let route_guide = crate::server::RouteGuideService { + features: Arc::new(data::load()), }; - let filter_rule = IndexerRule { - indexer_rule_kind: IndexerRuleKind::Action, - matching_rule, - id: None, - name: None, - }; - let indexer = IndexerConfig { - account_id: "buildnear.testnet".to_string().parse().unwrap(), - function_name: "index_stuff".to_string(), - code: "".to_string(), - start_block_height: Some(85376002), - schema: None, - provisioned: false, - indexer_rule: filter_rule, - }; - - let mut streamer = block_stream::BlockStream::new(); - streamer.start( - 106000000, - indexer, - redis_connection_manager, - delta_lake_client, - ChainId::Mainnet, - )?; + let svc = RouteGuideServer::new(route_guide); - streamer.take_handle().unwrap().await??; - - println!("done"); + Server::builder().add_service(svc).serve(addr).await?; Ok(()) + + // let redis_connection_manager = redis::connect("redis://127.0.0.1").await?; + // + // let aws_config = aws_config::from_env().load().await; + // let s3_client = crate::s3_client::S3Client::new(&aws_config); + // + // let delta_lake_client = crate::delta_lake_client::DeltaLakeClient::new(s3_client); + // + // let contract = "queryapi.dataplatform.near"; + // let matching_rule = MatchingRule::ActionAny { + // affected_account_id: contract.to_string(), + // status: Status::Any, + // }; + // let filter_rule = IndexerRule { + // indexer_rule_kind: IndexerRuleKind::Action, + // matching_rule, + // id: None, + // name: None, + // }; + // let indexer = IndexerConfig { + // account_id: "buildnear.testnet".to_string().parse().unwrap(), + // function_name: "index_stuff".to_string(), + // code: "".to_string(), + // start_block_height: Some(85376002), + // schema: None, + // provisioned: false, + // indexer_rule: filter_rule, + // }; + // + // let mut streamer = block_stream::BlockStream::new(); + // + // streamer.start( + // 106000000, + // indexer, + // redis_connection_manager, + // delta_lake_client, + // ChainId::Mainnet, + // )?; + // + // streamer.take_handle().unwrap().await??; + // + // println!("done"); + // + // Ok(()) } diff --git a/block-streamer/src/server.rs b/block-streamer/src/server.rs new file mode 100644 index 000000000..4b5c1e2e7 --- /dev/null +++ b/block-streamer/src/server.rs @@ -0,0 +1,188 @@ +use std::collections::HashMap; +use std::hash::{Hash, Hasher}; +use std::pin::Pin; +use std::sync::Arc; +use std::time::Instant; + +use tokio::sync::mpsc; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; +use tonic::{Request, Response, Status}; + +use crate::routeguide::route_guide_server::RouteGuide; +use crate::routeguide::{Feature, Point, Rectangle, RouteNote, RouteSummary}; + +#[derive(Debug)] +pub struct RouteGuideService { + pub features: Arc>, +} + +impl RouteGuideService { + pub fn new() -> Self { + Self { + features: Arc::new(crate::data::load()), + } + } +} + +#[tonic::async_trait] +impl RouteGuide for RouteGuideService { + async fn get_feature(&self, request: Request) -> Result, Status> { + println!("GetFeature = {:?}", request); + + for feature in &self.features[..] { + if feature.location.as_ref() == Some(request.get_ref()) { + return Ok(Response::new(feature.clone())); + } + } + + Ok(Response::new(Feature::default())) + } + + type ListFeaturesStream = ReceiverStream>; + + async fn list_features( + &self, + request: Request, + ) -> Result, Status> { + println!("ListFeatures = {:?}", request); + + let (tx, rx) = mpsc::channel(4); + let features = self.features.clone(); + + tokio::spawn(async move { + for feature in &features[..] { + if in_range(feature.location.as_ref().unwrap(), request.get_ref()) { + println!(" => send {:?}", feature); + tx.send(Ok(feature.clone())).await.unwrap(); + } + } + + println!(" /// done sending"); + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } + + async fn record_route( + &self, + request: Request>, + ) -> Result, Status> { + println!("RecordRoute"); + + let mut stream = request.into_inner(); + + let mut summary = RouteSummary::default(); + let mut last_point = None; + let now = Instant::now(); + + while let Some(point) = stream.next().await { + let point = point?; + + println!(" ==> Point = {:?}", point); + + // Increment the point count + summary.point_count += 1; + + // Find features + for feature in &self.features[..] { + if feature.location.as_ref() == Some(&point) { + summary.feature_count += 1; + } + } + + // Calculate the distance + if let Some(ref last_point) = last_point { + summary.distance += calc_distance(last_point, &point); + } + + last_point = Some(point); + } + + summary.elapsed_time = now.elapsed().as_secs() as i32; + + Ok(Response::new(summary)) + } + + type RouteChatStream = Pin> + Send + 'static>>; + + async fn route_chat( + &self, + request: Request>, + ) -> Result, Status> { + println!("RouteChat"); + + let mut notes = HashMap::new(); + let mut stream = request.into_inner(); + + let output = async_stream::try_stream! { + while let Some(note) = stream.next().await { + let note = note?; + + let location = note.location.clone().unwrap(); + + let location_notes = notes.entry(location).or_insert(vec![]); + location_notes.push(note); + + for note in location_notes { + yield note.clone(); + } + } + }; + + Ok(Response::new(Box::pin(output) as Self::RouteChatStream)) + } +} + +impl Hash for Point { + fn hash(&self, state: &mut H) + where + H: Hasher, + { + self.latitude.hash(state); + self.longitude.hash(state); + } +} + +impl Eq for Point {} + +fn in_range(point: &Point, rect: &Rectangle) -> bool { + use std::cmp; + + let lo = rect.lo.as_ref().unwrap(); + let hi = rect.hi.as_ref().unwrap(); + + let left = cmp::min(lo.longitude, hi.longitude); + let right = cmp::max(lo.longitude, hi.longitude); + let top = cmp::max(lo.latitude, hi.latitude); + let bottom = cmp::min(lo.latitude, hi.latitude); + + point.longitude >= left + && point.longitude <= right + && point.latitude >= bottom + && point.latitude <= top +} + +/// Calculates the distance between two points using the "haversine" formula. +/// This code was taken from http://www.movable-type.co.uk/scripts/latlong.html. +fn calc_distance(p1: &Point, p2: &Point) -> i32 { + const CORD_FACTOR: f64 = 1e7; + const R: f64 = 6_371_000.0; // meters + + let lat1 = p1.latitude as f64 / CORD_FACTOR; + let lat2 = p2.latitude as f64 / CORD_FACTOR; + let lng1 = p1.longitude as f64 / CORD_FACTOR; + let lng2 = p2.longitude as f64 / CORD_FACTOR; + + let lat_rad1 = lat1.to_radians(); + let lat_rad2 = lat2.to_radians(); + + let delta_lat = (lat2 - lat1).to_radians(); + let delta_lng = (lng2 - lng1).to_radians(); + + let a = (delta_lat / 2f64).sin() * (delta_lat / 2f64).sin() + + (lat_rad1).cos() * (lat_rad2).cos() * (delta_lng / 2f64).sin() * (delta_lng / 2f64).sin(); + + let c = 2f64 * a.sqrt().atan2((1f64 - a).sqrt()); + + (R * c) as i32 +} From 08db0e0d15d84c652094e31ea7e37d41f1850eef Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 28 Nov 2023 13:56:01 +1300 Subject: [PATCH 02/17] refactor: Encapsulate all server logic to own folder --- block-streamer/src/data.rs | 6 ++--- block-streamer/src/main.rs | 25 +++---------------- block-streamer/src/server/mod.rs | 23 +++++++++++++++++ .../route_guide_service.rs} | 4 +-- 4 files changed, 31 insertions(+), 27 deletions(-) create mode 100644 block-streamer/src/server/mod.rs rename block-streamer/src/{server.rs => server/route_guide_service.rs} (97%) diff --git a/block-streamer/src/data.rs b/block-streamer/src/data.rs index bbff6c672..86ee0969b 100644 --- a/block-streamer/src/data.rs +++ b/block-streamer/src/data.rs @@ -14,7 +14,7 @@ struct Location { } #[allow(dead_code)] -pub fn load() -> Vec { +pub fn load() -> Vec { let data_dir = std::path::PathBuf::from_iter([std::env!("CARGO_MANIFEST_DIR"), "data"]); let file = File::open(data_dir.join("route_guide_db.json")).expect("failed to open data file"); @@ -23,9 +23,9 @@ pub fn load() -> Vec { decoded .into_iter() - .map(|feature| crate::routeguide::Feature { + .map(|feature| crate::server::routeguide::Feature { name: feature.name, - location: Some(crate::routeguide::Point { + location: Some(crate::server::routeguide::Point { longitude: feature.location.longitude, latitude: feature.location.latitude, }), diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index 222d65ccf..089f1278d 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -4,11 +4,6 @@ use tracing_subscriber::prelude::*; // use crate::rules::types::indexer_rule_match::ChainId; // use crate::rules::{IndexerRule, IndexerRuleKind, MatchingRule, Status}; -use std::sync::Arc; - -use routeguide::route_guide_server::RouteGuideServer; -use tonic::transport::Server; - mod block_stream; mod data; mod delta_lake_client; @@ -18,12 +13,6 @@ mod rules; mod s3_client; mod server; -pub mod routeguide { - tonic::include_proto!("routeguide"); -} - -pub(crate) const LOG_TARGET: &str = "block_streamer"; - #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::registry() @@ -31,19 +20,11 @@ async fn main() -> anyhow::Result<()> { .with(tracing_subscriber::EnvFilter::from_default_env()) .init(); - tracing::info!("Starting {}", crate::LOG_TARGET); - - let addr = "[::1]:10000".parse().unwrap(); - - println!("RouteGuideServer listening on: {}", addr); - - let route_guide = crate::server::RouteGuideService { - features: Arc::new(data::load()), - }; + std::env::var("CHAIN_ID").expect("expected`CHAIN_ID` to be set in the environment"); - let svc = RouteGuideServer::new(route_guide); + tracing::info!("Starting Block Streamer Service..."); - Server::builder().add_service(svc).serve(addr).await?; + server::init().await?; Ok(()) diff --git a/block-streamer/src/server/mod.rs b/block-streamer/src/server/mod.rs new file mode 100644 index 000000000..a94b832ca --- /dev/null +++ b/block-streamer/src/server/mod.rs @@ -0,0 +1,23 @@ +use tonic::transport::Server; + +mod route_guide_service; + +pub mod routeguide { + tonic::include_proto!("routeguide"); +} + +pub async fn init() -> anyhow::Result<()> { + let addr = "[::1]:10000".parse().unwrap(); + + println!("RouteGuideServer listening on: {}", addr); + + let route_guide_service = route_guide_service::RouteGuideService::new(); + let route_guide_server = + routeguide::route_guide_server::RouteGuideServer::new(route_guide_service); + + Server::builder() + .add_service(route_guide_server) + .serve(addr) + .await + .map_err(|err| err.into()) +} diff --git a/block-streamer/src/server.rs b/block-streamer/src/server/route_guide_service.rs similarity index 97% rename from block-streamer/src/server.rs rename to block-streamer/src/server/route_guide_service.rs index 4b5c1e2e7..dda352f89 100644 --- a/block-streamer/src/server.rs +++ b/block-streamer/src/server/route_guide_service.rs @@ -8,8 +8,8 @@ use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{Request, Response, Status}; -use crate::routeguide::route_guide_server::RouteGuide; -use crate::routeguide::{Feature, Point, Rectangle, RouteNote, RouteSummary}; +use crate::server::routeguide::route_guide_server::RouteGuide; +use crate::server::routeguide::{Feature, Point, Rectangle, RouteNote, RouteSummary}; #[derive(Debug)] pub struct RouteGuideService { From ece25cbb2d3bb6cc5ef4af4e31b69bf0ca7dcaeb Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 28 Nov 2023 14:49:39 +1300 Subject: [PATCH 03/17] feat: Add stubbed blockstreamer rpc --- block-streamer/build.rs | 1 + block-streamer/proto/block_streamer.proto | 60 +++++++++++++++++++ block-streamer/src/block_stream.rs | 2 + .../src/server/block_streamer_service.rs | 33 ++++++++++ block-streamer/src/server/mod.rs | 14 ++++- 5 files changed, 107 insertions(+), 3 deletions(-) create mode 100644 block-streamer/proto/block_streamer.proto create mode 100644 block-streamer/src/server/block_streamer_service.rs diff --git a/block-streamer/build.rs b/block-streamer/build.rs index 20f973625..a7a4eb570 100644 --- a/block-streamer/build.rs +++ b/block-streamer/build.rs @@ -1,6 +1,7 @@ fn main() -> Result<(), Box> { tonic_build::compile_protos("proto/route_guide.proto") .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); + tonic_build::compile_protos("proto/block_streamer.proto")?; Ok(()) } diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto new file mode 100644 index 000000000..70ad73da7 --- /dev/null +++ b/block-streamer/proto/block_streamer.proto @@ -0,0 +1,60 @@ +syntax = "proto3"; + +package blockstreamer; + +// The BlockStreamer service provides RPCs to manage BlockStream instances. +service BlockStreamer { + // Starts a new BlockStream instance. + rpc StartStream (StartStreamRequest) returns (StartStreamResponse); + + // Stops an existing BlockStream instance. + rpc StopStream (StopStreamRequest) returns (StopStreamResponse); + + // Lists all current BlockStream instances. + rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse); +} + +// Request message for starting a BlockStream. +message StartStreamRequest { + // Parameters required to start a BlockStream. + int64 start_block_height = 1; + string indexer_config = 2; +} + +// Response message for starting a BlockStream. +message StartStreamResponse { + // ID or handle of the started BlockStream. + string stream_id = 1; +} + +// Request message for stopping a BlockStream. +message StopStreamRequest { + // ID or handle of the BlockStream to stop. + string stream_id = 1; +} + +// Response message for stopping a BlockStream. +message StopStreamResponse { + // Confirmation message or status. + string status = 1; +} + +// Request message for listing BlockStreams. +message ListStreamsRequest { + // Optional filters or parameters for listing streams. +} + +// Response message for listing BlockStreams. +message ListStreamsResponse { + // List of active BlockStreams. + repeated StreamInfo streams = 1; +} + +// Information about a single BlockStream instance. +message StreamInfo { + string stream_id = 1; + int64 start_block_height = 2; + string indexer_name = 3; + string chain_id = 4; + string status = 5; +} diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 39bb2b4e6..af8670444 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -12,6 +12,8 @@ pub struct Task { pub struct BlockStream { task: Option, + // indexer_config: IndexerConfig, + // chain_id: ChainId, } impl BlockStream { diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs new file mode 100644 index 000000000..2a2f9c17a --- /dev/null +++ b/block-streamer/src/server/block_streamer_service.rs @@ -0,0 +1,33 @@ +use tonic::{Request, Response, Status}; + +use crate::server::blockstreamer; + +#[derive(Debug)] +pub struct BlockStreamerService {} + +#[tonic::async_trait] +impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService { + async fn start_stream( + &self, + request: Request, + ) -> Result, Status> { + println!("StartStream = {:?}", request); + Ok(Response::new(blockstreamer::StartStreamResponse::default())) + } + + async fn stop_stream( + &self, + request: Request, + ) -> Result, Status> { + println!("StopStream = {:?}", request); + Ok(Response::new(blockstreamer::StopStreamResponse::default())) + } + + async fn list_streams( + &self, + request: Request, + ) -> Result, Status> { + println!("ListStreams = {:?}", request); + Ok(Response::new(blockstreamer::ListStreamsResponse::default())) + } +} diff --git a/block-streamer/src/server/mod.rs b/block-streamer/src/server/mod.rs index a94b832ca..557081bf8 100644 --- a/block-streamer/src/server/mod.rs +++ b/block-streamer/src/server/mod.rs @@ -1,11 +1,14 @@ -use tonic::transport::Server; - +mod block_streamer_service; mod route_guide_service; pub mod routeguide { tonic::include_proto!("routeguide"); } +pub mod blockstreamer { + tonic::include_proto!("blockstreamer"); +} + pub async fn init() -> anyhow::Result<()> { let addr = "[::1]:10000".parse().unwrap(); @@ -15,8 +18,13 @@ pub async fn init() -> anyhow::Result<()> { let route_guide_server = routeguide::route_guide_server::RouteGuideServer::new(route_guide_service); - Server::builder() + let block_streamer_service = block_streamer_service::BlockStreamerService {}; + let block_streamer_server = + blockstreamer::block_streamer_server::BlockStreamerServer::new(block_streamer_service); + + tonic::transport::Server::builder() .add_service(route_guide_server) + .add_service(block_streamer_server) .serve(addr) .await .map_err(|err| err.into()) From fa4ad122709d149395125ab3da3670b2382d8ebf Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 29 Nov 2023 07:39:33 +1300 Subject: [PATCH 04/17] feat: Update blockstream start request proto --- block-streamer/proto/block_streamer.proto | 45 ++++++++++++++++--- block-streamer/src/client.rs | 38 +++++++++++----- .../src/server/block_streamer_service.rs | 36 ++++++++++++++- 3 files changed, 100 insertions(+), 19 deletions(-) diff --git a/block-streamer/proto/block_streamer.proto b/block-streamer/proto/block_streamer.proto index 70ad73da7..6ad4b35f0 100644 --- a/block-streamer/proto/block_streamer.proto +++ b/block-streamer/proto/block_streamer.proto @@ -4,21 +4,54 @@ package blockstreamer; // The BlockStreamer service provides RPCs to manage BlockStream instances. service BlockStreamer { - // Starts a new BlockStream instance. + // Starts a new BlockStream process. rpc StartStream (StartStreamRequest) returns (StartStreamResponse); - // Stops an existing BlockStream instance. + // Stops an existing BlockStream process. rpc StopStream (StopStreamRequest) returns (StopStreamResponse); - // Lists all current BlockStream instances. + // Lists all current BlockStream processes. rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse); } // Request message for starting a BlockStream. message StartStreamRequest { - // Parameters required to start a BlockStream. - int64 start_block_height = 1; - string indexer_config = 2; + // Which block height to start from. + uint64 start_block_height = 1; + // The account ID which the indexer is defined under + string account_id = 2; + // The name of the indexer + string function_name = 3; + // The filter rule to apply to incoming blocks + oneof rule { + ActionAnyRule action_any_rule = 4; + ActionFunctionCallRule action_function_call_rule = 5; + } +} + +// Match any action against the specified account +message ActionAnyRule { + // The account ID pattern to match against + string affected_account_id = 1; + // The status of the action to match against + Status status = 2; +} + +// Match a specific function call against the specified account +message ActionFunctionCallRule { + // The account ID pattern to match against + string affected_account_id = 1; + // The function name to match against + string function_name = 2; + // The status of the action to match against + Status status = 3; +} + +enum Status { + STATUS_UNSPECIFIED = 0; + STATUS_SUCCESS = 1; + STATUS_FAILURE = 2; + STATUS_ANY = 3; } // Response message for starting a BlockStream. diff --git a/block-streamer/src/client.rs b/block-streamer/src/client.rs index 662dfccad..8974d2c62 100644 --- a/block-streamer/src/client.rs +++ b/block-streamer/src/client.rs @@ -10,10 +10,18 @@ use tonic::Request; use routeguide::route_guide_client::RouteGuideClient; use routeguide::{Point, Rectangle, RouteNote}; +use blockstreamer::block_streamer_client::BlockStreamerClient; +// use blockstreamer::{IndexerRule, MatchingRule, StartStreamRequest}; +use blockstreamer::*; + pub mod routeguide { tonic::include_proto!("routeguide"); } +pub mod blockstreamer { + tonic::include_proto!("blockstreamer"); +} + async fn print_features(client: &mut RouteGuideClient) -> Result<(), Box> { let rectangle = Rectangle { lo: Some(Point { @@ -91,25 +99,31 @@ async fn run_route_chat(client: &mut RouteGuideClient) -> Result<(), Bo #[tokio::main] async fn main() -> Result<(), Box> { - let mut client = RouteGuideClient::connect("http://[::1]:10000").await?; + // let mut client = RouteGuideClient::connect("http://[::1]:10000").await?; + let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; println!("*** SIMPLE RPC ***"); let response = client - .get_feature(Request::new(Point { - latitude: 409_146_138, - longitude: -746_188_906, + .start_stream(Request::new(StartStreamRequest { + start_block_height: 10101010, + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule { + affected_account_id: "token.sweat".to_string(), + status: Status::Success.into(), + })), })) .await?; println!("RESPONSE = {:?}", response); - println!("\n*** SERVER STREAMING ***"); - print_features(&mut client).await?; - - println!("\n*** CLIENT STREAMING ***"); - run_record_route(&mut client).await?; - - println!("\n*** BIDIRECTIONAL STREAMING ***"); - run_route_chat(&mut client).await?; + // println!("\n*** SERVER STREAMING ***"); + // print_features(&mut client).await?; + // + // println!("\n*** CLIENT STREAMING ***"); + // run_record_route(&mut client).await?; + // + // println!("\n*** BIDIRECTIONAL STREAMING ***"); + // run_route_chat(&mut client).await?; Ok(()) } diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 2a2f9c17a..92b972356 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -1,5 +1,10 @@ use tonic::{Request, Response, Status}; +use crate::indexer_config::IndexerConfig; +use crate::rules::types::indexer_rule_match::ChainId; +use crate::rules::{IndexerRule, IndexerRuleKind, MatchingRule}; + +use crate::block_stream; use crate::server::blockstreamer; #[derive(Debug)] @@ -11,7 +16,36 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic &self, request: Request, ) -> Result, Status> { - println!("StartStream = {:?}", request); + let request = request.into_inner(); + let matching_rule = match request.rule.unwrap() { + start_stream_request::Rule::ActionAnyRule(action_any_rule) => { + let affected_account_id = action_any_rule.affected_account_id; + let status = match action_any_rule.status { + _ => crate::rules::Status::Success, + }; + MatchingRule::ActionAny { + affected_account_id, + status, + } + } + _ => unimplemented!(), + }; + let filter_rule = IndexerRule { + indexer_rule_kind: IndexerRuleKind::Action, + matching_rule, + id: None, + name: None, + }; + let indexer = IndexerConfig { + account_id: request.account_id.parse().expect("Invalid account id"), + function_name: request.function_name, + code: "".to_string(), + start_block_height: Some(request.start_block_height), + schema: None, + provisioned: false, + indexer_rule: filter_rule, + }; + println!("StopStream = {:?}", indexer); Ok(Response::new(blockstreamer::StartStreamResponse::default())) } From bf66cb75557c649358524aec8fa0494d1af78ede Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 29 Nov 2023 07:40:17 +1300 Subject: [PATCH 05/17] feat: Inject necessary deps to `BlockStreamerService` --- block-streamer/src/main.rs | 10 ++++++++-- .../src/server/block_streamer_service.rs | 20 +++++++++++++++++-- block-streamer/src/server/mod.rs | 10 ++++++++-- 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index 089f1278d..29eeea714 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -24,11 +24,17 @@ async fn main() -> anyhow::Result<()> { tracing::info!("Starting Block Streamer Service..."); - server::init().await?; + let redis_connection_manager = redis::connect("redis://127.0.0.1").await?; + + let aws_config = aws_config::from_env().load().await; + let s3_client = crate::s3_client::S3Client::new(&aws_config); + + let delta_lake_client = crate::delta_lake_client::DeltaLakeClient::new(s3_client); + + server::init(redis_connection_manager, delta_lake_client).await?; Ok(()) - // let redis_connection_manager = redis::connect("redis://127.0.0.1").await?; // // let aws_config = aws_config::from_env().load().await; // let s3_client = crate::s3_client::S3Client::new(&aws_config); diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 92b972356..e2621a27f 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -7,8 +7,24 @@ use crate::rules::{IndexerRule, IndexerRuleKind, MatchingRule}; use crate::block_stream; use crate::server::blockstreamer; -#[derive(Debug)] -pub struct BlockStreamerService {} +use blockstreamer::*; + +pub struct BlockStreamerService { + redis_connection_manager: crate::redis::ConnectionManager, + delta_lake_client: crate::delta_lake_client::DeltaLakeClient, +} + +impl BlockStreamerService { + pub fn new( + redis_connection_manager: crate::redis::ConnectionManager, + delta_lake_client: crate::delta_lake_client::DeltaLakeClient, + ) -> Self { + Self { + redis_connection_manager, + delta_lake_client, + } + } +} #[tonic::async_trait] impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService { diff --git a/block-streamer/src/server/mod.rs b/block-streamer/src/server/mod.rs index 557081bf8..526d3a8bc 100644 --- a/block-streamer/src/server/mod.rs +++ b/block-streamer/src/server/mod.rs @@ -9,7 +9,10 @@ pub mod blockstreamer { tonic::include_proto!("blockstreamer"); } -pub async fn init() -> anyhow::Result<()> { +pub async fn init( + redis_connection_manager: crate::redis::ConnectionManager, + delta_lake_client: crate::delta_lake_client::DeltaLakeClient, +) -> anyhow::Result<()> { let addr = "[::1]:10000".parse().unwrap(); println!("RouteGuideServer listening on: {}", addr); @@ -18,7 +21,10 @@ pub async fn init() -> anyhow::Result<()> { let route_guide_server = routeguide::route_guide_server::RouteGuideServer::new(route_guide_service); - let block_streamer_service = block_streamer_service::BlockStreamerService {}; + let block_streamer_service = block_streamer_service::BlockStreamerService::new( + redis_connection_manager, + delta_lake_client, + ); let block_streamer_server = blockstreamer::block_streamer_server::BlockStreamerServer::new(block_streamer_service); From c964e7014609f9193569ae27ce5bf6008dfd23f3 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 29 Nov 2023 07:57:54 +1300 Subject: [PATCH 06/17] feat: Expose gRPC client types in lib --- block-streamer/src/lib.rs | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 block-streamer/src/lib.rs diff --git a/block-streamer/src/lib.rs b/block-streamer/src/lib.rs new file mode 100644 index 000000000..a91817f41 --- /dev/null +++ b/block-streamer/src/lib.rs @@ -0,0 +1,5 @@ +mod blockstreamer { + tonic::include_proto!("blockstreamer"); +} + +pub use blockstreamer::*; From 4ceb47c3a59dc368a53dc228401e61bd5642298c Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Wed, 29 Nov 2023 08:01:50 +1300 Subject: [PATCH 07/17] feat: Replace client binary with dedicated example --- block-streamer/Cargo.toml | 4 - block-streamer/examples/start_stream.rs | 25 +++++ block-streamer/src/client.rs | 138 ------------------------ 3 files changed, 25 insertions(+), 142 deletions(-) create mode 100644 block-streamer/examples/start_stream.rs delete mode 100644 block-streamer/src/client.rs diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index 93d9801d3..44214658d 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -3,10 +3,6 @@ name = "block-streamer" version = "0.1.0" edition = "2021" -[[bin]] -name = "routeguide-client" -path = "src/client.rs" - [dependencies] anyhow = "1.0.57" async-trait = "0.1.74" diff --git a/block-streamer/examples/start_stream.rs b/block-streamer/examples/start_stream.rs new file mode 100644 index 000000000..973bfa62e --- /dev/null +++ b/block-streamer/examples/start_stream.rs @@ -0,0 +1,25 @@ +use tonic::Request; + +use block_streamer::block_streamer_client::BlockStreamerClient; +use block_streamer::{start_stream_request::Rule, ActionAnyRule, StartStreamRequest, Status}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; + + let response = client + .start_stream(Request::new(StartStreamRequest { + start_block_height: 10101010, + account_id: "morgs.near".to_string(), + function_name: "test".to_string(), + rule: Some(Rule::ActionAnyRule(ActionAnyRule { + affected_account_id: "token.sweat".to_string(), + status: Status::Success.into(), + })), + })) + .await?; + + println!("RESPONSE = {:?}", response); + + Ok(()) +} diff --git a/block-streamer/src/client.rs b/block-streamer/src/client.rs deleted file mode 100644 index 8974d2c62..000000000 --- a/block-streamer/src/client.rs +++ /dev/null @@ -1,138 +0,0 @@ -use std::error::Error; -use std::time::Duration; - -use rand::rngs::ThreadRng; -use rand::Rng; -use tokio::time; -use tonic::transport::Channel; -use tonic::Request; - -use routeguide::route_guide_client::RouteGuideClient; -use routeguide::{Point, Rectangle, RouteNote}; - -use blockstreamer::block_streamer_client::BlockStreamerClient; -// use blockstreamer::{IndexerRule, MatchingRule, StartStreamRequest}; -use blockstreamer::*; - -pub mod routeguide { - tonic::include_proto!("routeguide"); -} - -pub mod blockstreamer { - tonic::include_proto!("blockstreamer"); -} - -async fn print_features(client: &mut RouteGuideClient) -> Result<(), Box> { - let rectangle = Rectangle { - lo: Some(Point { - latitude: 400_000_000, - longitude: -750_000_000, - }), - hi: Some(Point { - latitude: 420_000_000, - longitude: -730_000_000, - }), - }; - - let mut stream = client - .list_features(Request::new(rectangle)) - .await? - .into_inner(); - - while let Some(feature) = stream.message().await? { - println!("NOTE = {:?}", feature); - } - - Ok(()) -} - -async fn run_record_route(client: &mut RouteGuideClient) -> Result<(), Box> { - let mut rng = rand::thread_rng(); - let point_count: i32 = rng.gen_range(2..100); - - let mut points = vec![]; - for _ in 0..=point_count { - points.push(random_point(&mut rng)) - } - - println!("Traversing {} points", points.len()); - let request = Request::new(tokio_stream::iter(points)); - - match client.record_route(request).await { - Ok(response) => println!("SUMMARY: {:?}", response.into_inner()), - Err(e) => println!("something went wrong: {:?}", e), - } - - Ok(()) -} - -async fn run_route_chat(client: &mut RouteGuideClient) -> Result<(), Box> { - let start = time::Instant::now(); - - let outbound = async_stream::stream! { - let mut interval = time::interval(Duration::from_secs(1)); - - loop { - let time = interval.tick().await; - let elapsed = time.duration_since(start); - let note = RouteNote { - location: Some(Point { - latitude: 409146138 + elapsed.as_secs() as i32, - longitude: -746188906, - }), - message: format!("at {:?}", elapsed), - }; - - yield note; - } - }; - - let response = client.route_chat(Request::new(outbound)).await?; - let mut inbound = response.into_inner(); - - while let Some(note) = inbound.message().await? { - println!("NOTE = {:?}", note); - } - - Ok(()) -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - // let mut client = RouteGuideClient::connect("http://[::1]:10000").await?; - let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; - - println!("*** SIMPLE RPC ***"); - let response = client - .start_stream(Request::new(StartStreamRequest { - start_block_height: 10101010, - account_id: "morgs.near".to_string(), - function_name: "test".to_string(), - rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule { - affected_account_id: "token.sweat".to_string(), - status: Status::Success.into(), - })), - })) - .await?; - println!("RESPONSE = {:?}", response); - - // println!("\n*** SERVER STREAMING ***"); - // print_features(&mut client).await?; - // - // println!("\n*** CLIENT STREAMING ***"); - // run_record_route(&mut client).await?; - // - // println!("\n*** BIDIRECTIONAL STREAMING ***"); - // run_route_chat(&mut client).await?; - - Ok(()) -} - -fn random_point(rng: &mut ThreadRng) -> Point { - let latitude = (rng.gen_range(0..180) - 90) * 10_000_000; - let longitude = (rng.gen_range(0..360) - 180) * 10_000_000; - Point { - latitude, - longitude, - } -} From 5cc91cc486621a48d0d02af13485c9b1328b1b8b Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Sun, 3 Dec 2023 14:03:13 +1300 Subject: [PATCH 08/17] feat: Start block stream via RPC --- block-streamer/examples/start_stream.rs | 4 +- block-streamer/src/block_stream.rs | 25 +++++++----- block-streamer/src/delta_lake_client.rs | 3 +- block-streamer/src/indexer_config.rs | 11 +++--- .../src/server/block_streamer_service.rs | 39 ++++++++++++++----- 5 files changed, 55 insertions(+), 27 deletions(-) diff --git a/block-streamer/examples/start_stream.rs b/block-streamer/examples/start_stream.rs index 973bfa62e..108f4e799 100644 --- a/block-streamer/examples/start_stream.rs +++ b/block-streamer/examples/start_stream.rs @@ -9,11 +9,11 @@ async fn main() -> Result<(), Box> { let response = client .start_stream(Request::new(StartStreamRequest { - start_block_height: 10101010, + start_block_height: 106700000, account_id: "morgs.near".to_string(), function_name: "test".to_string(), rule: Some(Rule::ActionAnyRule(ActionAnyRule { - affected_account_id: "token.sweat".to_string(), + affected_account_id: "social.near".to_string(), status: Status::Success.into(), })), })) diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index af8670444..0c8e1f7f3 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -12,22 +12,24 @@ pub struct Task { pub struct BlockStream { task: Option, - // indexer_config: IndexerConfig, - // chain_id: ChainId, + indexer_config: IndexerConfig, + chain_id: ChainId, } impl BlockStream { - pub fn new() -> Self { - Self { task: None } + pub fn new(indexer_config: IndexerConfig, chain_id: ChainId) -> Self { + Self { + task: None, + indexer_config, + chain_id, + } } pub fn start( &mut self, start_block_height: near_indexer_primitives::types::BlockHeight, - indexer: IndexerConfig, redis_connection_manager: crate::redis::ConnectionManager, delta_lake_client: crate::delta_lake_client::DeltaLakeClient, - chain_id: ChainId, ) -> anyhow::Result<()> { if self.task.is_some() { return Err(anyhow::anyhow!("BlockStreamer has already been started",)); @@ -36,19 +38,22 @@ impl BlockStream { let cancellation_token = tokio_util::sync::CancellationToken::new(); let cancellation_token_clone = cancellation_token.clone(); + let indexer_config = self.indexer_config.clone(); + let chain_id = self.chain_id.clone(); + let handle = tokio::spawn(async move { tokio::select! { _ = cancellation_token_clone.cancelled() => { tracing::info!( "Cancelling existing block stream task for indexer: {}", - indexer.get_full_name(), + indexer_config.get_full_name(), ); Ok(()) }, result = start_block_stream( start_block_height, - indexer.clone(), + &indexer_config, &redis_connection_manager, &delta_lake_client, &chain_id, @@ -56,7 +61,7 @@ impl BlockStream { result.map_err(|err| { tracing::error!( "Block stream task for indexer: {} stopped due to error: {:?}", - indexer.get_full_name(), + indexer_config.get_full_name(), err, ); err @@ -93,7 +98,7 @@ impl BlockStream { pub(crate) async fn start_block_stream( start_block_height: near_indexer_primitives::types::BlockHeight, - indexer: IndexerConfig, + indexer: &IndexerConfig, redis_connection_manager: &crate::redis::ConnectionManager, delta_lake_client: &crate::delta_lake_client::DeltaLakeClient, chain_id: &ChainId, diff --git a/block-streamer/src/delta_lake_client.rs b/block-streamer/src/delta_lake_client.rs index 26fd2b144..bff283a35 100644 --- a/block-streamer/src/delta_lake_client.rs +++ b/block-streamer/src/delta_lake_client.rs @@ -31,6 +31,7 @@ pub struct IndexFile { pub actions: Vec, } +#[derive(Clone)] pub struct DeltaLakeClient where T: crate::s3_client::S3ClientTrait, @@ -46,7 +47,7 @@ where pub fn new(s3_client: T) -> Self { DeltaLakeClient { s3_client, - // hardcode to mainnet for + // hardcode to mainnet for now chain_id: ChainId::Mainnet, } } diff --git a/block-streamer/src/indexer_config.rs b/block-streamer/src/indexer_config.rs index 06328d69d..9d055044e 100644 --- a/block-streamer/src/indexer_config.rs +++ b/block-streamer/src/indexer_config.rs @@ -1,6 +1,7 @@ -use crate::rules::IndexerRule; use near_lake_framework::near_indexer_primitives::types::AccountId; +use crate::rules::IndexerRule; + #[derive( borsh::BorshSerialize, borsh::BorshDeserialize, @@ -12,10 +13,10 @@ use near_lake_framework::near_indexer_primitives::types::AccountId; pub struct IndexerConfig { pub account_id: AccountId, pub function_name: String, - pub code: String, - pub start_block_height: Option, - pub schema: Option, - pub provisioned: bool, + // pub code: String, + // pub start_block_height: Option, + // pub schema: Option, + // pub provisioned: bool, pub indexer_rule: IndexerRule, } diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index e2621a27f..4df1497b7 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -12,6 +12,7 @@ use blockstreamer::*; pub struct BlockStreamerService { redis_connection_manager: crate::redis::ConnectionManager, delta_lake_client: crate::delta_lake_client::DeltaLakeClient, + chain_id: ChainId, } impl BlockStreamerService { @@ -22,6 +23,7 @@ impl BlockStreamerService { Self { redis_connection_manager, delta_lake_client, + chain_id: ChainId::Mainnet, } } } @@ -33,35 +35,54 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic request: Request, ) -> Result, Status> { let request = request.into_inner(); + let matching_rule = match request.rule.unwrap() { start_stream_request::Rule::ActionAnyRule(action_any_rule) => { let affected_account_id = action_any_rule.affected_account_id; let status = match action_any_rule.status { - _ => crate::rules::Status::Success, + 1 => crate::rules::Status::Success, + 2 => crate::rules::Status::Fail, + 3 => crate::rules::Status::Any, + _ => return Err(Status::invalid_argument("Invalid status")), }; + MatchingRule::ActionAny { affected_account_id, status, } } - _ => unimplemented!(), + _ => unimplemented!("Rules other than ActionAny are not supported yet"), }; let filter_rule = IndexerRule { + // TODO: Remove kind as it is unused indexer_rule_kind: IndexerRuleKind::Action, matching_rule, id: None, name: None, }; - let indexer = IndexerConfig { - account_id: request.account_id.parse().expect("Invalid account id"), + let indexer_config = IndexerConfig { + account_id: request.account_id.parse().unwrap(), function_name: request.function_name, - code: "".to_string(), - start_block_height: Some(request.start_block_height), - schema: None, - provisioned: false, indexer_rule: filter_rule, }; - println!("StopStream = {:?}", indexer); + + let mut block_stream = + block_stream::BlockStream::new(indexer_config.clone(), self.chain_id.clone()); + + match block_stream.start( + request.start_block_height, + self.redis_connection_manager.clone(), + self.delta_lake_client.clone(), + ) { + Ok(_) => {} + Err(_) => { + return Err(Status::already_exists(format!( + "Block stream for {} already exists", + indexer_config.get_full_name() + ))) + } + } + Ok(Response::new(blockstreamer::StartStreamResponse::default())) } From 7a2847d30e9d45655e1612e552ceca678aad6a12 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Sun, 3 Dec 2023 14:54:41 +1300 Subject: [PATCH 09/17] feat: List streams via RPC --- block-streamer/examples/list_streams.rs | 17 +++++++++++ block-streamer/src/block_stream.rs | 4 +-- .../src/server/block_streamer_service.rs | 29 +++++++++++++++++-- 3 files changed, 45 insertions(+), 5 deletions(-) create mode 100644 block-streamer/examples/list_streams.rs diff --git a/block-streamer/examples/list_streams.rs b/block-streamer/examples/list_streams.rs new file mode 100644 index 000000000..16cde7d0f --- /dev/null +++ b/block-streamer/examples/list_streams.rs @@ -0,0 +1,17 @@ +use tonic::Request; + +use block_streamer::block_streamer_client::BlockStreamerClient; +use block_streamer::{start_stream_request::Rule, ActionAnyRule, ListStreamsRequest, Status}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; + + let response = client + .list_streams(Request::new(ListStreamsRequest {})) + .await?; + + println!("RESPONSE = {:#?}", response); + + Ok(()) +} diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index 0c8e1f7f3..ef242ec76 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -12,8 +12,8 @@ pub struct Task { pub struct BlockStream { task: Option, - indexer_config: IndexerConfig, - chain_id: ChainId, + pub indexer_config: IndexerConfig, + pub chain_id: ChainId, } impl BlockStream { diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 4df1497b7..857df5c39 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -1,3 +1,6 @@ +use std::collections::HashMap; +use std::sync::Mutex; + use tonic::{Request, Response, Status}; use crate::indexer_config::IndexerConfig; @@ -13,6 +16,7 @@ pub struct BlockStreamerService { redis_connection_manager: crate::redis::ConnectionManager, delta_lake_client: crate::delta_lake_client::DeltaLakeClient, chain_id: ChainId, + block_streams: Mutex>, } impl BlockStreamerService { @@ -24,6 +28,7 @@ impl BlockStreamerService { redis_connection_manager, delta_lake_client, chain_id: ChainId::Mainnet, + block_streams: Mutex::new(HashMap::new()), } } } @@ -83,6 +88,9 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic } } + let mut lock = self.block_streams.lock().unwrap(); + lock.insert(indexer_config.get_full_name(), block_stream); + Ok(Response::new(blockstreamer::StartStreamResponse::default())) } @@ -96,9 +104,24 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic async fn list_streams( &self, - request: Request, + _request: Request, ) -> Result, Status> { - println!("ListStreams = {:?}", request); - Ok(Response::new(blockstreamer::ListStreamsResponse::default())) + let lock = self.block_streams.lock().unwrap(); + let block_streams: Vec = lock + .values() + .map(|block_stream| StreamInfo { + stream_id: "id".to_string(), + chain_id: self.chain_id.to_string(), + indexer_name: block_stream.indexer_config.get_full_name(), + start_block_height: 0, + status: "OK".to_string(), + }) + .collect(); + + let response = blockstreamer::ListStreamsResponse { + streams: block_streams, + }; + + Ok(Response::new(response)) } } From 47f97a5ba43d7991b50acde7142635fcfca2cb3b Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Sun, 3 Dec 2023 15:26:25 +1300 Subject: [PATCH 10/17] feat: Assign each stream a consistent ID --- block-streamer/src/indexer_config.rs | 8 ++++++++ block-streamer/src/server/block_streamer_service.rs | 6 ++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/block-streamer/src/indexer_config.rs b/block-streamer/src/indexer_config.rs index 9d055044e..eba4485b0 100644 --- a/block-streamer/src/indexer_config.rs +++ b/block-streamer/src/indexer_config.rs @@ -1,4 +1,6 @@ use near_lake_framework::near_indexer_primitives::types::AccountId; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; use crate::rules::IndexerRule; @@ -24,4 +26,10 @@ impl IndexerConfig { pub fn get_full_name(&self) -> String { format!("{}/{}", self.account_id, self.function_name) } + + pub fn get_hash_id(&self) -> String { + let mut hasher = DefaultHasher::new(); + self.get_full_name().hash(&mut hasher); + hasher.finish().to_string() + } } diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 857df5c39..a5e031d58 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -91,7 +91,9 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic let mut lock = self.block_streams.lock().unwrap(); lock.insert(indexer_config.get_full_name(), block_stream); - Ok(Response::new(blockstreamer::StartStreamResponse::default())) + Ok(Response::new(blockstreamer::StartStreamResponse { + stream_id: indexer_config.get_hash_id(), + })) } async fn stop_stream( @@ -110,7 +112,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic let block_streams: Vec = lock .values() .map(|block_stream| StreamInfo { - stream_id: "id".to_string(), + stream_id: block_stream.indexer_config.get_hash_id(), chain_id: self.chain_id.to_string(), indexer_name: block_stream.indexer_config.get_full_name(), start_block_height: 0, From 629034f9873d9922cedddb47d81b72cececfb8d3 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Sun, 3 Dec 2023 20:17:58 +1300 Subject: [PATCH 11/17] feat: Stop block stream via RPC --- block-streamer/examples/stop_stream.rs | 20 +++++++++++++++++++ block-streamer/src/block_stream.rs | 2 +- .../src/server/block_streamer_service.rs | 13 ++++++++++-- 3 files changed, 32 insertions(+), 3 deletions(-) create mode 100644 block-streamer/examples/stop_stream.rs diff --git a/block-streamer/examples/stop_stream.rs b/block-streamer/examples/stop_stream.rs new file mode 100644 index 000000000..e451f2ae3 --- /dev/null +++ b/block-streamer/examples/stop_stream.rs @@ -0,0 +1,20 @@ +use tonic::Request; + +use block_streamer::block_streamer_client::BlockStreamerClient; +use block_streamer::StopStreamRequest; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; + + let response = client + .stop_stream(Request::new(StopStreamRequest { + // ID for indexer morgs.near/test + stream_id: "16210176318434468568".to_string(), + })) + .await?; + + println!("RESPONSE = {:?}", response); + + Ok(()) +} diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index ef242ec76..3942888d7 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -45,7 +45,7 @@ impl BlockStream { tokio::select! { _ = cancellation_token_clone.cancelled() => { tracing::info!( - "Cancelling existing block stream task for indexer: {}", + "Cancelling block stream task for indexer: {}", indexer_config.get_full_name(), ); diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index a5e031d58..8d785b537 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -89,7 +89,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic } let mut lock = self.block_streams.lock().unwrap(); - lock.insert(indexer_config.get_full_name(), block_stream); + lock.insert(indexer_config.get_hash_id(), block_stream); Ok(Response::new(blockstreamer::StartStreamResponse { stream_id: indexer_config.get_hash_id(), @@ -100,7 +100,16 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic &self, request: Request, ) -> Result, Status> { - println!("StopStream = {:?}", request); + let request = request.into_inner(); + + let stream_id = request.stream_id; + + let mut block_stream = { + let mut lock = self.block_streams.lock().unwrap(); + lock.remove(&stream_id).unwrap() + }; + block_stream.cancel().await.unwrap(); + Ok(Response::new(blockstreamer::StopStreamResponse::default())) } From 954c5b5cfb5232c1c6f79e65cd94e12e845de509 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 4 Dec 2023 09:42:28 +1300 Subject: [PATCH 12/17] refactor: Remove `unwrap()`s from `start_stream` --- .../src/server/block_streamer_service.rs | 73 +++++++++++++------ 1 file changed, 49 insertions(+), 24 deletions(-) diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 8d785b537..1fdced2f0 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::sync::Mutex; +use near_lake_framework::near_indexer_primitives; use tonic::{Request, Response, Status}; use crate::indexer_config::IndexerConfig; @@ -12,6 +13,19 @@ use crate::server::blockstreamer; use blockstreamer::*; +impl TryFrom for crate::rules::Status { + type Error = (); + + fn try_from(status: i32) -> Result { + match status { + 0 => Ok(crate::rules::Status::Success), + 1 => Ok(crate::rules::Status::Fail), + 2 => Ok(crate::rules::Status::Any), + _ => Err(()), + } + } +} + pub struct BlockStreamerService { redis_connection_manager: crate::redis::ConnectionManager, delta_lake_client: crate::delta_lake_client::DeltaLakeClient, @@ -41,22 +55,27 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic ) -> Result, Status> { let request = request.into_inner(); - let matching_rule = match request.rule.unwrap() { + let rule = request + .rule + .ok_or(Status::invalid_argument("Rule must be provided"))?; + + let matching_rule = match rule { start_stream_request::Rule::ActionAnyRule(action_any_rule) => { let affected_account_id = action_any_rule.affected_account_id; - let status = match action_any_rule.status { - 1 => crate::rules::Status::Success, - 2 => crate::rules::Status::Fail, - 3 => crate::rules::Status::Any, - _ => return Err(Status::invalid_argument("Invalid status")), - }; + let status = action_any_rule.status.try_into().map_err(|_| { + Status::invalid_argument("Invalid status value for ActionAnyRule") + })?; MatchingRule::ActionAny { affected_account_id, status, } } - _ => unimplemented!("Rules other than ActionAny are not supported yet"), + _ => { + return Err(Status::unimplemented( + "Rules other than ActionAny are not supported yet", + )) + } }; let filter_rule = IndexerRule { // TODO: Remove kind as it is unused @@ -65,8 +84,16 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic id: None, name: None, }; + + let account_id = near_indexer_primitives::types::AccountId::try_from(request.account_id) + .map_err(|err| { + Status::invalid_argument(format!( + "Invalid account_id value for StartStreamRequest: {}", + err + )) + })?; let indexer_config = IndexerConfig { - account_id: request.account_id.parse().unwrap(), + account_id, function_name: request.function_name, indexer_rule: filter_rule, }; @@ -74,21 +101,18 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic let mut block_stream = block_stream::BlockStream::new(indexer_config.clone(), self.chain_id.clone()); - match block_stream.start( - request.start_block_height, - self.redis_connection_manager.clone(), - self.delta_lake_client.clone(), - ) { - Ok(_) => {} - Err(_) => { - return Err(Status::already_exists(format!( - "Block stream for {} already exists", - indexer_config.get_full_name() - ))) - } - } - - let mut lock = self.block_streams.lock().unwrap(); + block_stream + .start( + request.start_block_height, + self.redis_connection_manager.clone(), + self.delta_lake_client.clone(), + ) + .map_err(|_| Status::already_exists("Block stream already exists"))?; + + let mut lock = self + .block_streams + .lock() + .map_err(|err| Status::internal(format!("Failed to lock block_streams: {}", err)))?; lock.insert(indexer_config.get_hash_id(), block_stream); Ok(Response::new(blockstreamer::StartStreamResponse { @@ -126,6 +150,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic indexer_name: block_stream.indexer_config.get_full_name(), start_block_height: 0, status: "OK".to_string(), + // last_indexed_block }) .collect(); From 7e5789e6cc763f231ce2b60ecbebed0fa113bc1c Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 4 Dec 2023 09:57:00 +1300 Subject: [PATCH 13/17] refactor: Remove `unwrap()`s from `stop_stream` --- .../src/server/block_streamer_service.rs | 31 +++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 1fdced2f0..4e9cbe67c 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -112,7 +112,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic let mut lock = self .block_streams .lock() - .map_err(|err| Status::internal(format!("Failed to lock block_streams: {}", err)))?; + .map_err(|err| Status::internal(format!("Failed to acquire lock: {}", err)))?; lock.insert(indexer_config.get_hash_id(), block_stream); Ok(Response::new(blockstreamer::StartStreamResponse { @@ -128,13 +128,32 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic let stream_id = request.stream_id; - let mut block_stream = { - let mut lock = self.block_streams.lock().unwrap(); - lock.remove(&stream_id).unwrap() + let exising_block_stream = { + let mut lock = self + .block_streams + .lock() + .map_err(|err| Status::internal(format!("Failed to acquire lock: {}", err)))?; + lock.remove(&stream_id) }; - block_stream.cancel().await.unwrap(); - Ok(Response::new(blockstreamer::StopStreamResponse::default())) + match exising_block_stream { + None => { + return Err(Status::not_found(format!( + "Block stream with id {} not found", + stream_id + ))) + } + Some(mut block_stream) => { + block_stream + .cancel() + .await + .map_err(|_| Status::internal("Failed to cancel block stream"))?; + } + } + + Ok(Response::new(blockstreamer::StopStreamResponse { + status: "ok".to_string(), + })) } async fn list_streams( From fec98be959ca751a2a7b88cd4818d080eb872b8f Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 4 Dec 2023 10:02:01 +1300 Subject: [PATCH 14/17] chore: Remove example route guide RPC service --- block-streamer/build.rs | 2 - block-streamer/data/route_guide_db.json | 702 ------------------ block-streamer/proto/route_guide.proto | 106 --- block-streamer/src/data.rs | 34 - block-streamer/src/main.rs | 1 - block-streamer/src/server/mod.rs | 16 +- .../src/server/route_guide_service.rs | 188 ----- 7 files changed, 4 insertions(+), 1045 deletions(-) delete mode 100644 block-streamer/data/route_guide_db.json delete mode 100644 block-streamer/proto/route_guide.proto delete mode 100644 block-streamer/src/data.rs delete mode 100644 block-streamer/src/server/route_guide_service.rs diff --git a/block-streamer/build.rs b/block-streamer/build.rs index a7a4eb570..4683c5353 100644 --- a/block-streamer/build.rs +++ b/block-streamer/build.rs @@ -1,6 +1,4 @@ fn main() -> Result<(), Box> { - tonic_build::compile_protos("proto/route_guide.proto") - .unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); tonic_build::compile_protos("proto/block_streamer.proto")?; Ok(()) diff --git a/block-streamer/data/route_guide_db.json b/block-streamer/data/route_guide_db.json deleted file mode 100644 index 22e93e313..000000000 --- a/block-streamer/data/route_guide_db.json +++ /dev/null @@ -1,702 +0,0 @@ -[ - { - "location": { - "latitude": 407838351, - "longitude": -746143763 - }, - "name": "Patriots Path, Mendham, NJ 07945, USA" - }, - { - "location": { - "latitude": 408122808, - "longitude": -743999179 - }, - "name": "101 New Jersey 10, Whippany, NJ 07981, USA" - }, - { - "location": { - "latitude": 413628156, - "longitude": -749015468 - }, - "name": "U.S. 6, Shohola, PA 18458, USA" - }, - { - "location": { - "latitude": 419999544, - "longitude": -740371136 - }, - "name": "5 Conners Road, Kingston, NY 12401, USA" - }, - { - "location": { - "latitude": 414008389, - "longitude": -743951297 - }, - "name": "Mid Hudson Psychiatric Center, New Hampton, NY 10958, USA" - }, - { - "location": { - "latitude": 419611318, - "longitude": -746524769 - }, - "name": "287 Flugertown Road, Livingston Manor, NY 12758, USA" - }, - { - "location": { - "latitude": 406109563, - "longitude": -742186778 - }, - "name": "4001 Tremley Point Road, Linden, NJ 07036, USA" - }, - { - "location": { - "latitude": 416802456, - "longitude": -742370183 - }, - "name": "352 South Mountain Road, Wallkill, NY 12589, USA" - }, - { - "location": { - "latitude": 412950425, - "longitude": -741077389 - }, - "name": "Bailey Turn Road, Harriman, NY 10926, USA" - }, - { - "location": { - "latitude": 412144655, - "longitude": -743949739 - }, - "name": "193-199 Wawayanda Road, Hewitt, NJ 07421, USA" - }, - { - "location": { - "latitude": 415736605, - "longitude": -742847522 - }, - "name": "406-496 Ward Avenue, Pine Bush, NY 12566, USA" - }, - { - "location": { - "latitude": 413843930, - "longitude": -740501726 - }, - "name": "162 Merrill Road, Highland Mills, NY 10930, USA" - }, - { - "location": { - "latitude": 410873075, - "longitude": -744459023 - }, - "name": "Clinton Road, West Milford, NJ 07480, USA" - }, - { - "location": { - "latitude": 412346009, - "longitude": -744026814 - }, - "name": "16 Old Brook Lane, Warwick, NY 10990, USA" - }, - { - "location": { - "latitude": 402948455, - "longitude": -747903913 - }, - "name": "3 Drake Lane, Pennington, NJ 08534, USA" - }, - { - "location": { - "latitude": 406337092, - "longitude": -740122226 - }, - "name": "6324 8th Avenue, Brooklyn, NY 11220, USA" - }, - { - "location": { - "latitude": 406421967, - "longitude": -747727624 - }, - "name": "1 Merck Access Road, Whitehouse Station, NJ 08889, USA" - }, - { - "location": { - "latitude": 416318082, - "longitude": -749677716 - }, - "name": "78-98 Schalck Road, Narrowsburg, NY 12764, USA" - }, - { - "location": { - "latitude": 415301720, - "longitude": -748416257 - }, - "name": "282 Lakeview Drive Road, Highland Lake, NY 12743, USA" - }, - { - "location": { - "latitude": 402647019, - "longitude": -747071791 - }, - "name": "330 Evelyn Avenue, Hamilton Township, NJ 08619, USA" - }, - { - "location": { - "latitude": 412567807, - "longitude": -741058078 - }, - "name": "New York State Reference Route 987E, Southfields, NY 10975, USA" - }, - { - "location": { - "latitude": 416855156, - "longitude": -744420597 - }, - "name": "103-271 Tempaloni Road, Ellenville, NY 12428, USA" - }, - { - "location": { - "latitude": 404663628, - "longitude": -744820157 - }, - "name": "1300 Airport Road, North Brunswick Township, NJ 08902, USA" - }, - { - "location": { - "latitude": 407113723, - "longitude": -749746483 - }, - "name": "" - }, - { - "location": { - "latitude": 402133926, - "longitude": -743613249 - }, - "name": "" - }, - { - "location": { - "latitude": 400273442, - "longitude": -741220915 - }, - "name": "" - }, - { - "location": { - "latitude": 411236786, - "longitude": -744070769 - }, - "name": "" - }, - { - "location": { - "latitude": 411633782, - "longitude": -746784970 - }, - "name": "211-225 Plains Road, Augusta, NJ 07822, USA" - }, - { - "location": { - "latitude": 415830701, - "longitude": -742952812 - }, - "name": "" - }, - { - "location": { - "latitude": 413447164, - "longitude": -748712898 - }, - "name": "165 Pedersen Ridge Road, Milford, PA 18337, USA" - }, - { - "location": { - "latitude": 405047245, - "longitude": -749800722 - }, - "name": "100-122 Locktown Road, Frenchtown, NJ 08825, USA" - }, - { - "location": { - "latitude": 418858923, - "longitude": -746156790 - }, - "name": "" - }, - { - "location": { - "latitude": 417951888, - "longitude": -748484944 - }, - "name": "650-652 Willi Hill Road, Swan Lake, NY 12783, USA" - }, - { - "location": { - "latitude": 407033786, - "longitude": -743977337 - }, - "name": "26 East 3rd Street, New Providence, NJ 07974, USA" - }, - { - "location": { - "latitude": 417548014, - "longitude": -740075041 - }, - "name": "" - }, - { - "location": { - "latitude": 410395868, - "longitude": -744972325 - }, - "name": "" - }, - { - "location": { - "latitude": 404615353, - "longitude": -745129803 - }, - "name": "" - }, - { - "location": { - "latitude": 406589790, - "longitude": -743560121 - }, - "name": "611 Lawrence Avenue, Westfield, NJ 07090, USA" - }, - { - "location": { - "latitude": 414653148, - "longitude": -740477477 - }, - "name": "18 Lannis Avenue, New Windsor, NY 12553, USA" - }, - { - "location": { - "latitude": 405957808, - "longitude": -743255336 - }, - "name": "82-104 Amherst Avenue, Colonia, NJ 07067, USA" - }, - { - "location": { - "latitude": 411733589, - "longitude": -741648093 - }, - "name": "170 Seven Lakes Drive, Sloatsburg, NY 10974, USA" - }, - { - "location": { - "latitude": 412676291, - "longitude": -742606606 - }, - "name": "1270 Lakes Road, Monroe, NY 10950, USA" - }, - { - "location": { - "latitude": 409224445, - "longitude": -748286738 - }, - "name": "509-535 Alphano Road, Great Meadows, NJ 07838, USA" - }, - { - "location": { - "latitude": 406523420, - "longitude": -742135517 - }, - "name": "652 Garden Street, Elizabeth, NJ 07202, USA" - }, - { - "location": { - "latitude": 401827388, - "longitude": -740294537 - }, - "name": "349 Sea Spray Court, Neptune City, NJ 07753, USA" - }, - { - "location": { - "latitude": 410564152, - "longitude": -743685054 - }, - "name": "13-17 Stanley Street, West Milford, NJ 07480, USA" - }, - { - "location": { - "latitude": 408472324, - "longitude": -740726046 - }, - "name": "47 Industrial Avenue, Teterboro, NJ 07608, USA" - }, - { - "location": { - "latitude": 412452168, - "longitude": -740214052 - }, - "name": "5 White Oak Lane, Stony Point, NY 10980, USA" - }, - { - "location": { - "latitude": 409146138, - "longitude": -746188906 - }, - "name": "Berkshire Valley Management Area Trail, Jefferson, NJ, USA" - }, - { - "location": { - "latitude": 404701380, - "longitude": -744781745 - }, - "name": "1007 Jersey Avenue, New Brunswick, NJ 08901, USA" - }, - { - "location": { - "latitude": 409642566, - "longitude": -746017679 - }, - "name": "6 East Emerald Isle Drive, Lake Hopatcong, NJ 07849, USA" - }, - { - "location": { - "latitude": 408031728, - "longitude": -748645385 - }, - "name": "1358-1474 New Jersey 57, Port Murray, NJ 07865, USA" - }, - { - "location": { - "latitude": 413700272, - "longitude": -742135189 - }, - "name": "367 Prospect Road, Chester, NY 10918, USA" - }, - { - "location": { - "latitude": 404310607, - "longitude": -740282632 - }, - "name": "10 Simon Lake Drive, Atlantic Highlands, NJ 07716, USA" - }, - { - "location": { - "latitude": 409319800, - "longitude": -746201391 - }, - "name": "11 Ward Street, Mount Arlington, NJ 07856, USA" - }, - { - "location": { - "latitude": 406685311, - "longitude": -742108603 - }, - "name": "300-398 Jefferson Avenue, Elizabeth, NJ 07201, USA" - }, - { - "location": { - "latitude": 419018117, - "longitude": -749142781 - }, - "name": "43 Dreher Road, Roscoe, NY 12776, USA" - }, - { - "location": { - "latitude": 412856162, - "longitude": -745148837 - }, - "name": "Swan Street, Pine Island, NY 10969, USA" - }, - { - "location": { - "latitude": 416560744, - "longitude": -746721964 - }, - "name": "66 Pleasantview Avenue, Monticello, NY 12701, USA" - }, - { - "location": { - "latitude": 405314270, - "longitude": -749836354 - }, - "name": "" - }, - { - "location": { - "latitude": 414219548, - "longitude": -743327440 - }, - "name": "" - }, - { - "location": { - "latitude": 415534177, - "longitude": -742900616 - }, - "name": "565 Winding Hills Road, Montgomery, NY 12549, USA" - }, - { - "location": { - "latitude": 406898530, - "longitude": -749127080 - }, - "name": "231 Rocky Run Road, Glen Gardner, NJ 08826, USA" - }, - { - "location": { - "latitude": 407586880, - "longitude": -741670168 - }, - "name": "100 Mount Pleasant Avenue, Newark, NJ 07104, USA" - }, - { - "location": { - "latitude": 400106455, - "longitude": -742870190 - }, - "name": "517-521 Huntington Drive, Manchester Township, NJ 08759, USA" - }, - { - "location": { - "latitude": 400066188, - "longitude": -746793294 - }, - "name": "" - }, - { - "location": { - "latitude": 418803880, - "longitude": -744102673 - }, - "name": "40 Mountain Road, Napanoch, NY 12458, USA" - }, - { - "location": { - "latitude": 414204288, - "longitude": -747895140 - }, - "name": "" - }, - { - "location": { - "latitude": 414777405, - "longitude": -740615601 - }, - "name": "" - }, - { - "location": { - "latitude": 415464475, - "longitude": -747175374 - }, - "name": "48 North Road, Forestburgh, NY 12777, USA" - }, - { - "location": { - "latitude": 404062378, - "longitude": -746376177 - }, - "name": "" - }, - { - "location": { - "latitude": 405688272, - "longitude": -749285130 - }, - "name": "" - }, - { - "location": { - "latitude": 400342070, - "longitude": -748788996 - }, - "name": "" - }, - { - "location": { - "latitude": 401809022, - "longitude": -744157964 - }, - "name": "" - }, - { - "location": { - "latitude": 404226644, - "longitude": -740517141 - }, - "name": "9 Thompson Avenue, Leonardo, NJ 07737, USA" - }, - { - "location": { - "latitude": 410322033, - "longitude": -747871659 - }, - "name": "" - }, - { - "location": { - "latitude": 407100674, - "longitude": -747742727 - }, - "name": "" - }, - { - "location": { - "latitude": 418811433, - "longitude": -741718005 - }, - "name": "213 Bush Road, Stone Ridge, NY 12484, USA" - }, - { - "location": { - "latitude": 415034302, - "longitude": -743850945 - }, - "name": "" - }, - { - "location": { - "latitude": 411349992, - "longitude": -743694161 - }, - "name": "" - }, - { - "location": { - "latitude": 404839914, - "longitude": -744759616 - }, - "name": "1-17 Bergen Court, New Brunswick, NJ 08901, USA" - }, - { - "location": { - "latitude": 414638017, - "longitude": -745957854 - }, - "name": "35 Oakland Valley Road, Cuddebackville, NY 12729, USA" - }, - { - "location": { - "latitude": 412127800, - "longitude": -740173578 - }, - "name": "" - }, - { - "location": { - "latitude": 401263460, - "longitude": -747964303 - }, - "name": "" - }, - { - "location": { - "latitude": 412843391, - "longitude": -749086026 - }, - "name": "" - }, - { - "location": { - "latitude": 418512773, - "longitude": -743067823 - }, - "name": "" - }, - { - "location": { - "latitude": 404318328, - "longitude": -740835638 - }, - "name": "42-102 Main Street, Belford, NJ 07718, USA" - }, - { - "location": { - "latitude": 419020746, - "longitude": -741172328 - }, - "name": "" - }, - { - "location": { - "latitude": 404080723, - "longitude": -746119569 - }, - "name": "" - }, - { - "location": { - "latitude": 401012643, - "longitude": -744035134 - }, - "name": "" - }, - { - "location": { - "latitude": 404306372, - "longitude": -741079661 - }, - "name": "" - }, - { - "location": { - "latitude": 403966326, - "longitude": -748519297 - }, - "name": "" - }, - { - "location": { - "latitude": 405002031, - "longitude": -748407866 - }, - "name": "" - }, - { - "location": { - "latitude": 409532885, - "longitude": -742200683 - }, - "name": "" - }, - { - "location": { - "latitude": 416851321, - "longitude": -742674555 - }, - "name": "" - }, - { - "location": { - "latitude": 406411633, - "longitude": -741722051 - }, - "name": "3387 Richmond Terrace, Staten Island, NY 10303, USA" - }, - { - "location": { - "latitude": 413069058, - "longitude": -744597778 - }, - "name": "261 Van Sickle Road, Goshen, NY 10924, USA" - }, - { - "location": { - "latitude": 418465462, - "longitude": -746859398 - }, - "name": "" - }, - { - "location": { - "latitude": 411733222, - "longitude": -744228360 - }, - "name": "" - }, - { - "location": { - "latitude": 410248224, - "longitude": -747127767 - }, - "name": "3 Hasta Way, Newton, NJ 07860, USA" - } -] diff --git a/block-streamer/proto/route_guide.proto b/block-streamer/proto/route_guide.proto deleted file mode 100644 index 224f22024..000000000 --- a/block-streamer/proto/route_guide.proto +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2015 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; - -package routeguide; - -// Interface exported by the server. -service RouteGuide { - // A simple RPC. - // - // Obtains the feature at a given position. - // - // A feature with an empty name is returned if there's no feature at the given - // position. - rpc GetFeature(Point) returns (Feature) {} - - // A server-to-client streaming RPC. - // - // Obtains the Features available within the given Rectangle. Results are - // streamed rather than returned at once (e.g. in a response message with a - // repeated field), as the rectangle may cover a large area and contain a - // huge number of features. - rpc ListFeatures(Rectangle) returns (stream Feature) {} - - // A client-to-server streaming RPC. - // - // Accepts a stream of Points on a route being traversed, returning a - // RouteSummary when traversal is completed. - rpc RecordRoute(stream Point) returns (RouteSummary) {} - - // A Bidirectional streaming RPC. - // - // Accepts a stream of RouteNotes sent while a route is being traversed, - // while receiving other RouteNotes (e.g. from other users). - rpc RouteChat(stream RouteNote) returns (stream RouteNote) {} -} - -// Points are represented as latitude-longitude pairs in the E7 representation -// (degrees multiplied by 10**7 and rounded to the nearest integer). -// Latitudes should be in the range +/- 90 degrees and longitude should be in -// the range +/- 180 degrees (inclusive). -message Point { - int32 latitude = 1; - int32 longitude = 2; -} - -// A latitude-longitude rectangle, represented as two diagonally opposite -// points "lo" and "hi". -message Rectangle { - // One corner of the rectangle. - Point lo = 1; - - // The other corner of the rectangle. - Point hi = 2; -} - -// A feature names something at a given point. -// -// If a feature could not be named, the name is empty. -message Feature { - // The name of the feature. - string name = 1; - - // The point where the feature is detected. - Point location = 2; -} - -// A RouteNote is a message sent while at a given point. -message RouteNote { - // The location from which the message is sent. - Point location = 1; - - // The message to be sent. - string message = 2; -} - -// A RouteSummary is received in response to a RecordRoute rpc. -// -// It contains the number of individual points received, the number of -// detected features, and the total distance covered as the cumulative sum of -// the distance between each point. -message RouteSummary { - // The number of points received. - int32 point_count = 1; - - // The number of known features passed while traversing the route. - int32 feature_count = 2; - - // The distance covered in metres. - int32 distance = 3; - - // The duration of the traversal in seconds. - int32 elapsed_time = 4; -} diff --git a/block-streamer/src/data.rs b/block-streamer/src/data.rs deleted file mode 100644 index 86ee0969b..000000000 --- a/block-streamer/src/data.rs +++ /dev/null @@ -1,34 +0,0 @@ -use serde::Deserialize; -use std::fs::File; - -#[derive(Debug, Deserialize)] -struct Feature { - location: Location, - name: String, -} - -#[derive(Debug, Deserialize)] -struct Location { - latitude: i32, - longitude: i32, -} - -#[allow(dead_code)] -pub fn load() -> Vec { - let data_dir = std::path::PathBuf::from_iter([std::env!("CARGO_MANIFEST_DIR"), "data"]); - let file = File::open(data_dir.join("route_guide_db.json")).expect("failed to open data file"); - - let decoded: Vec = - serde_json::from_reader(&file).expect("failed to deserialize features"); - - decoded - .into_iter() - .map(|feature| crate::server::routeguide::Feature { - name: feature.name, - location: Some(crate::server::routeguide::Point { - longitude: feature.location.longitude, - latitude: feature.location.latitude, - }), - }) - .collect() -} diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index 29eeea714..aa32b5e9e 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -5,7 +5,6 @@ use tracing_subscriber::prelude::*; // use crate::rules::{IndexerRule, IndexerRuleKind, MatchingRule, Status}; mod block_stream; -mod data; mod delta_lake_client; mod indexer_config; mod redis; diff --git a/block-streamer/src/server/mod.rs b/block-streamer/src/server/mod.rs index 526d3a8bc..544482dda 100644 --- a/block-streamer/src/server/mod.rs +++ b/block-streamer/src/server/mod.rs @@ -1,9 +1,4 @@ mod block_streamer_service; -mod route_guide_service; - -pub mod routeguide { - tonic::include_proto!("routeguide"); -} pub mod blockstreamer { tonic::include_proto!("blockstreamer"); @@ -13,13 +8,11 @@ pub async fn init( redis_connection_manager: crate::redis::ConnectionManager, delta_lake_client: crate::delta_lake_client::DeltaLakeClient, ) -> anyhow::Result<()> { - let addr = "[::1]:10000".parse().unwrap(); - - println!("RouteGuideServer listening on: {}", addr); + let addr = "[::1]:10000" + .parse() + .expect("Failed to parse RPC socket address"); - let route_guide_service = route_guide_service::RouteGuideService::new(); - let route_guide_server = - routeguide::route_guide_server::RouteGuideServer::new(route_guide_service); + tracing::info!("Starting RPC server at {}", addr); let block_streamer_service = block_streamer_service::BlockStreamerService::new( redis_connection_manager, @@ -29,7 +22,6 @@ pub async fn init( blockstreamer::block_streamer_server::BlockStreamerServer::new(block_streamer_service); tonic::transport::Server::builder() - .add_service(route_guide_server) .add_service(block_streamer_server) .serve(addr) .await diff --git a/block-streamer/src/server/route_guide_service.rs b/block-streamer/src/server/route_guide_service.rs deleted file mode 100644 index dda352f89..000000000 --- a/block-streamer/src/server/route_guide_service.rs +++ /dev/null @@ -1,188 +0,0 @@ -use std::collections::HashMap; -use std::hash::{Hash, Hasher}; -use std::pin::Pin; -use std::sync::Arc; -use std::time::Instant; - -use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; -use tonic::{Request, Response, Status}; - -use crate::server::routeguide::route_guide_server::RouteGuide; -use crate::server::routeguide::{Feature, Point, Rectangle, RouteNote, RouteSummary}; - -#[derive(Debug)] -pub struct RouteGuideService { - pub features: Arc>, -} - -impl RouteGuideService { - pub fn new() -> Self { - Self { - features: Arc::new(crate::data::load()), - } - } -} - -#[tonic::async_trait] -impl RouteGuide for RouteGuideService { - async fn get_feature(&self, request: Request) -> Result, Status> { - println!("GetFeature = {:?}", request); - - for feature in &self.features[..] { - if feature.location.as_ref() == Some(request.get_ref()) { - return Ok(Response::new(feature.clone())); - } - } - - Ok(Response::new(Feature::default())) - } - - type ListFeaturesStream = ReceiverStream>; - - async fn list_features( - &self, - request: Request, - ) -> Result, Status> { - println!("ListFeatures = {:?}", request); - - let (tx, rx) = mpsc::channel(4); - let features = self.features.clone(); - - tokio::spawn(async move { - for feature in &features[..] { - if in_range(feature.location.as_ref().unwrap(), request.get_ref()) { - println!(" => send {:?}", feature); - tx.send(Ok(feature.clone())).await.unwrap(); - } - } - - println!(" /// done sending"); - }); - - Ok(Response::new(ReceiverStream::new(rx))) - } - - async fn record_route( - &self, - request: Request>, - ) -> Result, Status> { - println!("RecordRoute"); - - let mut stream = request.into_inner(); - - let mut summary = RouteSummary::default(); - let mut last_point = None; - let now = Instant::now(); - - while let Some(point) = stream.next().await { - let point = point?; - - println!(" ==> Point = {:?}", point); - - // Increment the point count - summary.point_count += 1; - - // Find features - for feature in &self.features[..] { - if feature.location.as_ref() == Some(&point) { - summary.feature_count += 1; - } - } - - // Calculate the distance - if let Some(ref last_point) = last_point { - summary.distance += calc_distance(last_point, &point); - } - - last_point = Some(point); - } - - summary.elapsed_time = now.elapsed().as_secs() as i32; - - Ok(Response::new(summary)) - } - - type RouteChatStream = Pin> + Send + 'static>>; - - async fn route_chat( - &self, - request: Request>, - ) -> Result, Status> { - println!("RouteChat"); - - let mut notes = HashMap::new(); - let mut stream = request.into_inner(); - - let output = async_stream::try_stream! { - while let Some(note) = stream.next().await { - let note = note?; - - let location = note.location.clone().unwrap(); - - let location_notes = notes.entry(location).or_insert(vec![]); - location_notes.push(note); - - for note in location_notes { - yield note.clone(); - } - } - }; - - Ok(Response::new(Box::pin(output) as Self::RouteChatStream)) - } -} - -impl Hash for Point { - fn hash(&self, state: &mut H) - where - H: Hasher, - { - self.latitude.hash(state); - self.longitude.hash(state); - } -} - -impl Eq for Point {} - -fn in_range(point: &Point, rect: &Rectangle) -> bool { - use std::cmp; - - let lo = rect.lo.as_ref().unwrap(); - let hi = rect.hi.as_ref().unwrap(); - - let left = cmp::min(lo.longitude, hi.longitude); - let right = cmp::max(lo.longitude, hi.longitude); - let top = cmp::max(lo.latitude, hi.latitude); - let bottom = cmp::min(lo.latitude, hi.latitude); - - point.longitude >= left - && point.longitude <= right - && point.latitude >= bottom - && point.latitude <= top -} - -/// Calculates the distance between two points using the "haversine" formula. -/// This code was taken from http://www.movable-type.co.uk/scripts/latlong.html. -fn calc_distance(p1: &Point, p2: &Point) -> i32 { - const CORD_FACTOR: f64 = 1e7; - const R: f64 = 6_371_000.0; // meters - - let lat1 = p1.latitude as f64 / CORD_FACTOR; - let lat2 = p2.latitude as f64 / CORD_FACTOR; - let lng1 = p1.longitude as f64 / CORD_FACTOR; - let lng2 = p2.longitude as f64 / CORD_FACTOR; - - let lat_rad1 = lat1.to_radians(); - let lat_rad2 = lat2.to_radians(); - - let delta_lat = (lat2 - lat1).to_radians(); - let delta_lng = (lng2 - lng1).to_radians(); - - let a = (delta_lat / 2f64).sin() * (delta_lat / 2f64).sin() - + (lat_rad1).cos() * (lat_rad2).cos() * (delta_lng / 2f64).sin() * (delta_lng / 2f64).sin(); - - let c = 2f64 * a.sqrt().atan2((1f64 - a).sqrt()); - - (R * c) as i32 -} From dff49952e4058b8afc7ba4f62de153093f0c71ff Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 4 Dec 2023 10:09:16 +1300 Subject: [PATCH 15/17] chore: Remove unused code --- block-streamer/src/main.rs | 49 -------------------------------------- 1 file changed, 49 deletions(-) diff --git a/block-streamer/src/main.rs b/block-streamer/src/main.rs index aa32b5e9e..50e87ac06 100644 --- a/block-streamer/src/main.rs +++ b/block-streamer/src/main.rs @@ -1,9 +1,5 @@ use tracing_subscriber::prelude::*; -// use crate::indexer_config::IndexerConfig; -// use crate::rules::types::indexer_rule_match::ChainId; -// use crate::rules::{IndexerRule, IndexerRuleKind, MatchingRule, Status}; - mod block_stream; mod delta_lake_client; mod indexer_config; @@ -19,8 +15,6 @@ async fn main() -> anyhow::Result<()> { .with(tracing_subscriber::EnvFilter::from_default_env()) .init(); - std::env::var("CHAIN_ID").expect("expected`CHAIN_ID` to be set in the environment"); - tracing::info!("Starting Block Streamer Service..."); let redis_connection_manager = redis::connect("redis://127.0.0.1").await?; @@ -33,47 +27,4 @@ async fn main() -> anyhow::Result<()> { server::init(redis_connection_manager, delta_lake_client).await?; Ok(()) - - // - // let aws_config = aws_config::from_env().load().await; - // let s3_client = crate::s3_client::S3Client::new(&aws_config); - // - // let delta_lake_client = crate::delta_lake_client::DeltaLakeClient::new(s3_client); - // - // let contract = "queryapi.dataplatform.near"; - // let matching_rule = MatchingRule::ActionAny { - // affected_account_id: contract.to_string(), - // status: Status::Any, - // }; - // let filter_rule = IndexerRule { - // indexer_rule_kind: IndexerRuleKind::Action, - // matching_rule, - // id: None, - // name: None, - // }; - // let indexer = IndexerConfig { - // account_id: "buildnear.testnet".to_string().parse().unwrap(), - // function_name: "index_stuff".to_string(), - // code: "".to_string(), - // start_block_height: Some(85376002), - // schema: None, - // provisioned: false, - // indexer_rule: filter_rule, - // }; - // - // let mut streamer = block_stream::BlockStream::new(); - // - // streamer.start( - // 106000000, - // indexer, - // redis_connection_manager, - // delta_lake_client, - // ChainId::Mainnet, - // )?; - // - // streamer.take_handle().unwrap().await??; - // - // println!("done"); - // - // Ok(()) } From a7b9f3618251855b75aa255b11356a3f5230835b Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 4 Dec 2023 11:08:54 +1300 Subject: [PATCH 16/17] chore: Remove unused crates --- block-streamer/Cargo.lock | 2 -- block-streamer/Cargo.toml | 2 -- 2 files changed, 4 deletions(-) diff --git a/block-streamer/Cargo.lock b/block-streamer/Cargo.lock index bb9f03abe..0035120f2 100644 --- a/block-streamer/Cargo.lock +++ b/block-streamer/Cargo.lock @@ -964,7 +964,6 @@ name = "block-streamer" version = "0.1.0" dependencies = [ "anyhow", - "async-stream", "async-trait", "aws-config 1.0.0", "aws-sdk-s3 0.39.1", @@ -977,7 +976,6 @@ dependencies = [ "mockall", "near-lake-framework", "prost", - "rand 0.8.5", "redis", "serde", "serde_json", diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index 44214658d..73db949bb 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -28,8 +28,6 @@ tonic = "0.10.2" wildmatch = "2.1.1" near-lake-framework = "0.7.1" -async-stream = "0.3.5" -rand = "0.8.5" [build-dependencies] tonic-build = "0.10" From 2d570f01e4fe8b491514e10d04a189cb101a321f Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Mon, 4 Dec 2023 13:56:50 +1300 Subject: [PATCH 17/17] fix: Prevent duplicate block streams from starting --- .../src/server/block_streamer_service.rs | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/block-streamer/src/server/block_streamer_service.rs b/block-streamer/src/server/block_streamer_service.rs index 4e9cbe67c..79fd1b541 100644 --- a/block-streamer/src/server/block_streamer_service.rs +++ b/block-streamer/src/server/block_streamer_service.rs @@ -45,6 +45,14 @@ impl BlockStreamerService { block_streams: Mutex::new(HashMap::new()), } } + + fn get_block_streams_lock( + &self, + ) -> Result>, Status> { + self.block_streams + .lock() + .map_err(|err| Status::internal(format!("Failed to acquire lock: {}", err))) + } } #[tonic::async_trait] @@ -98,6 +106,12 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic indexer_rule: filter_rule, }; + let lock = self.get_block_streams_lock()?; + match lock.get(&indexer_config.get_hash_id()) { + Some(_) => return Err(Status::already_exists("Block stream already exists")), + None => drop(lock), + } + let mut block_stream = block_stream::BlockStream::new(indexer_config.clone(), self.chain_id.clone()); @@ -107,12 +121,9 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic self.redis_connection_manager.clone(), self.delta_lake_client.clone(), ) - .map_err(|_| Status::already_exists("Block stream already exists"))?; + .map_err(|_| Status::internal("Failed to start block stream"))?; - let mut lock = self - .block_streams - .lock() - .map_err(|err| Status::internal(format!("Failed to acquire lock: {}", err)))?; + let mut lock = self.get_block_streams_lock()?; lock.insert(indexer_config.get_hash_id(), block_stream); Ok(Response::new(blockstreamer::StartStreamResponse { @@ -129,10 +140,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic let stream_id = request.stream_id; let exising_block_stream = { - let mut lock = self - .block_streams - .lock() - .map_err(|err| Status::internal(format!("Failed to acquire lock: {}", err)))?; + let mut lock = self.get_block_streams_lock()?; lock.remove(&stream_id) };