Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/channel based autoconnect #8

Merged
merged 33 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
24add74
wip
grooviegermanikus Jan 18, 2024
fa05a1b
add connection state
grooviegermanikus Jan 18, 2024
c9144ee
compiles
grooviegermanikus Jan 18, 2024
bbba7b1
send works now
grooviegermanikus Jan 18, 2024
e093a58
check tonic error
grooviegermanikus Jan 18, 2024
def0853
more cases on connect
grooviegermanikus Jan 18, 2024
ca55a8b
handle connect problems
grooviegermanikus Jan 18, 2024
33cb8cb
split impl
grooviegermanikus Jan 18, 2024
21f2ef3
wip
grooviegermanikus Jan 19, 2024
dc53a50
switch to mpsc
grooviegermanikus Jan 19, 2024
cf0c83b
compile
grooviegermanikus Jan 19, 2024
9dec39d
test/example for autoconnect
grooviegermanikus Jan 19, 2024
b9875cb
cleanup
grooviegermanikus Jan 19, 2024
0200e9d
map fatal errors
grooviegermanikus Jan 19, 2024
11b24bc
testcase lagging
grooviegermanikus Jan 19, 2024
80d5c97
channel plugger mspc->broadcast
grooviegermanikus Jan 19, 2024
afefb18
cleanup
grooviegermanikus Jan 19, 2024
46f5687
introduce T
grooviegermanikus Jan 19, 2024
87a4bbc
use channel_plugger in test
grooviegermanikus Jan 19, 2024
6e15250
minor cleanup
grooviegermanikus Jan 22, 2024
b3808da
example
grooviegermanikus Jan 22, 2024
1d0b789
move Message type
grooviegermanikus Jan 22, 2024
550b607
update v1.12.0+solana.1.17.15
grooviegermanikus Jan 22, 2024
542f116
Cargo.lock
grooviegermanikus Jan 22, 2024
1b9926f
slots
grooviegermanikus Jan 23, 2024
f22a0f2
remove exclamation mark from logs
grooviegermanikus Jan 23, 2024
ec43c29
handle timeout
grooviegermanikus Jan 26, 2024
db03404
handle timeout for stream version
grooviegermanikus Jan 26, 2024
68221ce
clippy+fmt
grooviegermanikus Jan 26, 2024
1ddd4bd
clippy+fmt
grooviegermanikus Jan 26, 2024
5c568a2
remove grpc_subscription.rs
grooviegermanikus Jan 26, 2024
048ad2b
clippy+fmt
grooviegermanikus Jan 31, 2024
e608758
Merge remote-tracking branch 'origin/main' into feature/channel-based…
grooviegermanikus Jan 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 102 additions & 87 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "geyser-grpc-connector"
version = "0.7.2+yellowstone.1.11"
version = "0.10.1+yellowstone.1.12"
edition = "2021"

description = "Multiplexing and Reconnection on Yellowstone gRPC Geyser client streaming"
Expand All @@ -9,13 +9,12 @@ authors = ["GroovieGermanikus <[email protected]>"]
repository = "https://github.com/blockworks-foundation/geyser-grpc-connector"

[dependencies]
# v1.11.0+solana.1.16.17
yellowstone-grpc-proto = "1.11.0"
# v1.12.0+solana.1.16.17
yellowstone-grpc-client = "1.12.0"
yellowstone-grpc-client = { version = "1.13.0+solana.1.17.15", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
yellowstone-grpc-proto = { version = "1.12.0+solana.1.17.15", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }


# required for CommitmentConfig
solana-sdk = "~1.16.17"
solana-sdk = "~1.17.15"

url = "2.5.0"
async-stream = "0.3.5"
Expand All @@ -33,3 +32,4 @@ bincode = "1.3.3"

[dev-dependencies]
tracing-subscriber = "0.3.16"
solana-logger = "1"
138 changes: 138 additions & 0 deletions examples/stream_blocks_autoconnect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use futures::StreamExt;

Check warning on line 1 in examples/stream_blocks_autoconnect.rs

View workflow job for this annotation

GitHub Actions / test

unused import: `futures::StreamExt`

Check warning on line 1 in examples/stream_blocks_autoconnect.rs

View workflow job for this annotation

GitHub Actions / test

unused import: `futures::StreamExt`

Check warning on line 1 in examples/stream_blocks_autoconnect.rs

View workflow job for this annotation

GitHub Actions / test

unused import: `futures::StreamExt`

Check warning on line 1 in examples/stream_blocks_autoconnect.rs

View workflow job for this annotation

GitHub Actions / test

unused import: `futures::StreamExt`
use log::info;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;

use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use tokio::time::{sleep, Duration};
use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prost::Message as _;

pub struct BlockMini {
pub blocksize: usize,
pub slot: Slot,
pub commitment_config: CommitmentConfig,
}

struct BlockMiniExtractor(CommitmentConfig);

impl FromYellowstoneExtractor for BlockMiniExtractor {
type Target = BlockMini;
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> {
match update.update_oneof {
Some(UpdateOneof::Block(update_block_message)) => {
let blocksize = update_block_message.encoded_len();
let slot = update_block_message.slot;
let mini = BlockMini {
blocksize,
slot,
commitment_config: self.0,
};
Some((slot, mini))
}
Some(UpdateOneof::BlockMeta(update_blockmeta_message)) => {
let blocksize = update_blockmeta_message.encoded_len();
let slot = update_blockmeta_message.slot;
let mini = BlockMini {
blocksize,
slot,
commitment_config: self.0,
};
Some((slot, mini))
}
_ => None,
}
}
}

#[allow(dead_code)]
enum TestCases {
Basic,
SlowReceiverStartup,
TemporaryLaggingReceiver,
CloseAfterReceiving,
AbortTaskFromOutside,
}
const TEST_CASE: TestCases = TestCases::Basic;

#[tokio::main]
pub async fn main() {
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
tracing_subscriber::fmt::init();
// console_subscriber::init();

let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();

info!(
"Using grpc source on {} ({})",
grpc_addr_green,
grpc_x_token_green.is_some()
);

let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};

let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());

info!("Write Block stream..");

let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
);
let mut message_channel =
spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel);

tokio::spawn(async move {
if let TestCases::SlowReceiverStartup = TEST_CASE {
sleep(Duration::from_secs(5)).await;
}

let mut message_count = 0;
while let Ok(message) = message_channel.recv().await {
if let TestCases::AbortTaskFromOutside = TEST_CASE {
if message_count > 5 {
info!("(testcase) aborting task from outside");
jh_geyser_task.abort();
}
}
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
message_count += 1;
info!("got update - {} bytes", subscriber_update.encoded_len());

if let TestCases::CloseAfterReceiving = TEST_CASE {
info!("(testcase) closing stream after receiving");
return;
}
}
Message::Connecting(attempt) => {
warn!("Connection attempt: {}", attempt);
}
}

if let TestCases::TemporaryLaggingReceiver = TEST_CASE {
if message_count % 3 == 1 {
info!("(testcase) lagging a bit");
sleep(Duration::from_millis(1500)).await;
}
}
}
warn!("Stream aborted");
});

// "infinite" sleep
sleep(Duration::from_secs(2000)).await;
}
6 changes: 3 additions & 3 deletions examples/stream_blocks_mainnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ use solana_sdk::signature::Signature;
use solana_sdk::transaction::TransactionError;
use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock;

use geyser_grpc_connector::grpc_subscription_autoreconnect::{
create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
Expand Down Expand Up @@ -130,6 +129,7 @@ pub async fn main() {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};

let green_config =
Expand Down
76 changes: 60 additions & 16 deletions examples/stream_blocks_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@
use std::env;
use std::pin::pin;

use geyser_grpc_connector::grpc_subscription_autoreconnect::{
create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use tokio::time::{sleep, Duration};
use tracing::warn;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prost::Message;
use yellowstone_grpc_proto::prost::Message as _;

fn start_example_blockmini_consumer(

Check warning on line 17 in examples/stream_blocks_single.rs

View workflow job for this annotation

GitHub Actions / test

function `start_example_blockmini_consumer` is never used

Check warning on line 17 in examples/stream_blocks_single.rs

View workflow job for this annotation

GitHub Actions / test

function `start_example_blockmini_consumer` is never used

Check warning on line 17 in examples/stream_blocks_single.rs

View workflow job for this annotation

GitHub Actions / test

function `start_example_blockmini_consumer` is never used

Check warning on line 17 in examples/stream_blocks_single.rs

View workflow job for this annotation

GitHub Actions / test

function `start_example_blockmini_consumer` is never used
multiplex_stream: impl Stream<Item = BlockMini> + Send + 'static,
) {
tokio::spawn(async move {
Expand Down Expand Up @@ -86,23 +84,69 @@
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};

let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
let config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());

info!("Write Block stream..");

let green_stream = create_geyser_reconnecting_stream(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
// GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
config.clone(),
GeyserFilter(CommitmentConfig::finalized()).blocks_and_txs(),
);
let multiplex_stream = create_multiplexed_stream(
vec![green_stream],
BlockMiniExtractor(CommitmentConfig::confirmed()),

let blue_stream = create_geyser_reconnecting_stream(
config.clone(),
GeyserFilter(CommitmentConfig::processed()).blocks_and_txs(),
);
start_example_blockmini_consumer(multiplex_stream);

tokio::spawn(async move {
let mut green_stream = pin!(green_stream);
while let Some(message) = green_stream.next().await {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
let mapped = map_block_update(*subscriber_update);
if let Some(slot) = mapped {
info!("got update (green)!!! slot: {}", slot);
}
}
Message::Connecting(attempt) => {
warn!("Connection attempt: {}", attempt);
}
}
}
warn!("Stream aborted");
});

tokio::spawn(async move {
let mut blue_stream = pin!(blue_stream);
while let Some(message) = blue_stream.next().await {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
let mapped = map_block_update(*subscriber_update);
if let Some(slot) = mapped {
info!("got update (blue)!!! slot: {}", slot);
}
}
Message::Connecting(attempt) => {
warn!("Connection attempt: {}", attempt);
}
}
}
warn!("Stream aborted");
});

// "infinite" sleep
sleep(Duration::from_secs(1800)).await;
}

fn map_block_update(update: SubscribeUpdate) -> Option<Slot> {
match update.update_oneof {
Some(UpdateOneof::Block(update_block_message)) => {
let slot = update_block_message.slot;
Some(slot)
}
_ => None,
}
}
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "1.70"
channel = "1.73.0"
Loading
Loading