-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DPLT-1077 Process historical messages via Redis Streams #181
Changes from all commits
4457256
ed699c2
b79c418
b4956dc
187a234
059eafc
7285e7d
213e252
e5a6b31
1f83147
fd58ccc
f416e23
75b8d8b
def5d5f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,23 +25,37 @@ pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20; | |
pub fn spawn_historical_message_thread( | ||
block_height: BlockHeight, | ||
new_indexer_function: &IndexerFunction, | ||
redis_connection_manager: &storage::ConnectionManager, | ||
) -> Option<JoinHandle<i64>> { | ||
let redis_connection_manager = redis_connection_manager.clone(); | ||
new_indexer_function.start_block_height.map(|_| { | ||
let new_indexer_function_copy = new_indexer_function.clone(); | ||
tokio::spawn(process_historical_messages_or_handle_error( | ||
block_height, | ||
new_indexer_function_copy, | ||
Opts::parse(), | ||
)) | ||
tokio::spawn(async move { | ||
process_historical_messages_or_handle_error( | ||
block_height, | ||
new_indexer_function_copy, | ||
Opts::parse(), | ||
&redis_connection_manager, | ||
) | ||
.await | ||
}) | ||
}) | ||
} | ||
|
||
pub(crate) async fn process_historical_messages_or_handle_error( | ||
block_height: BlockHeight, | ||
indexer_function: IndexerFunction, | ||
opts: Opts, | ||
redis_connection_manager: &storage::ConnectionManager, | ||
) -> i64 { | ||
match process_historical_messages(block_height, indexer_function, opts).await { | ||
match process_historical_messages( | ||
block_height, | ||
indexer_function, | ||
opts, | ||
redis_connection_manager, | ||
) | ||
.await | ||
{ | ||
Ok(block_difference) => block_difference, | ||
Err(err) => { | ||
// todo: when Coordinator can send log messages to Runner, send this error to Runner | ||
|
@@ -58,6 +72,7 @@ pub(crate) async fn process_historical_messages( | |
block_height: BlockHeight, | ||
indexer_function: IndexerFunction, | ||
opts: Opts, | ||
redis_connection_manager: &storage::ConnectionManager, | ||
) -> anyhow::Result<i64> { | ||
let start_block = indexer_function.start_block_height.unwrap(); | ||
let block_difference: i64 = (block_height - start_block) as i64; | ||
|
@@ -124,7 +139,30 @@ pub(crate) async fn process_historical_messages( | |
blocks_from_index.append(&mut blocks_between_indexed_and_current_block); | ||
|
||
let first_block_in_index = *blocks_from_index.first().unwrap_or(&start_block); | ||
|
||
if !blocks_from_index.is_empty() { | ||
storage::sadd( | ||
redis_connection_manager, | ||
storage::STREAMS_SET_KEY, | ||
storage::generate_historical_stream_key(&indexer_function.get_full_name()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we have the ability to distinguish between different historical processes? Won't using the indexer function's name cause the old historical processes's with the same name to be overwritten? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, and that's intentional. We don't want to have multiple concurrent historical processes for a given indexer. I'll be working on stopping existing processes soon, which is why I didn't bother creating unique streams. |
||
) | ||
.await?; | ||
storage::set( | ||
redis_connection_manager, | ||
storage::generate_historical_storage_key(&indexer_function.get_full_name()), | ||
serde_json::to_string(&indexer_function)?, | ||
) | ||
.await?; | ||
} | ||
|
||
for current_block in blocks_from_index { | ||
storage::xadd( | ||
redis_connection_manager, | ||
storage::generate_historical_stream_key(&indexer_function.get_full_name()), | ||
&[("block_height", current_block)], | ||
) | ||
.await?; | ||
|
||
send_execution_message( | ||
block_height, | ||
first_block_in_index, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,18 +2,26 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs}; | |
|
||
const STORAGE: &str = "storage_alertexer"; | ||
|
||
pub const INDEXER_SET_KEY: &str = "indexers"; | ||
pub const STREAMS_SET_KEY: &str = "streams"; | ||
|
||
pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client { | ||
redis::Client::open(redis_connection_str).expect("can create redis client") | ||
} | ||
|
||
pub fn generate_storage_key(name: &str) -> String { | ||
format!("{}:storage", name) | ||
pub fn generate_real_time_stream_key(prefix: &str) -> String { | ||
format!("{}:real_time:stream", prefix) | ||
} | ||
|
||
pub fn generate_stream_key(name: &str) -> String { | ||
format!("{}:stream", name) | ||
pub fn generate_real_time_storage_key(prefix: &str) -> String { | ||
format!("{}:real_time:stream:storage", prefix) | ||
} | ||
|
||
pub fn generate_historical_stream_key(prefix: &str) -> String { | ||
format!("{}:historical:stream", prefix) | ||
} | ||
|
||
pub fn generate_historical_storage_key(prefix: &str) -> String { | ||
format!("{}:historical:stream:storage", prefix) | ||
} | ||
|
||
pub async fn connect(redis_connection_str: &str) -> anyhow::Result<ConnectionManager> { | ||
|
@@ -84,14 +92,6 @@ pub async fn xadd( | |
) -> anyhow::Result<()> { | ||
tracing::debug!(target: STORAGE, "XADD: {:?}, {:?}", stream_key, fields); | ||
|
||
// TODO: Remove stream cap when we finally start processing it | ||
redis::cmd("XTRIM") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We now delete the stream messages after they are processed - so this can be removed |
||
.arg(&stream_key) | ||
.arg("MAXLEN") | ||
.arg(100) | ||
.query_async(&mut redis_connection_manager.clone()) | ||
.await?; | ||
|
||
let mut cmd = redis::cmd("XADD"); | ||
cmd.arg(stream_key).arg("*"); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to ownership of
redis_connection_manager
to the thread