Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPLT-1077 Process historical messages via Redis Streams #181

Merged
merged 14 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 44 additions & 6 deletions indexer/queryapi_coordinator/src/historical_block_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,37 @@ pub const MAX_RPC_BLOCKS_TO_PROCESS: u8 = 20;
pub fn spawn_historical_message_thread(
block_height: BlockHeight,
new_indexer_function: &IndexerFunction,
redis_connection_manager: &storage::ConnectionManager,
) -> Option<JoinHandle<i64>> {
let redis_connection_manager = redis_connection_manager.clone();
new_indexer_function.start_block_height.map(|_| {
let new_indexer_function_copy = new_indexer_function.clone();
tokio::spawn(process_historical_messages_or_handle_error(
block_height,
new_indexer_function_copy,
Opts::parse(),
))
tokio::spawn(async move {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to move ownership of redis_connection_manager to the thread

process_historical_messages_or_handle_error(
block_height,
new_indexer_function_copy,
Opts::parse(),
&redis_connection_manager,
)
.await
})
})
}

pub(crate) async fn process_historical_messages_or_handle_error(
block_height: BlockHeight,
indexer_function: IndexerFunction,
opts: Opts,
redis_connection_manager: &storage::ConnectionManager,
) -> i64 {
match process_historical_messages(block_height, indexer_function, opts).await {
match process_historical_messages(
block_height,
indexer_function,
opts,
redis_connection_manager,
)
.await
{
Ok(block_difference) => block_difference,
Err(err) => {
// todo: when Coordinator can send log messages to Runner, send this error to Runner
Expand All @@ -58,6 +72,7 @@ pub(crate) async fn process_historical_messages(
block_height: BlockHeight,
indexer_function: IndexerFunction,
opts: Opts,
redis_connection_manager: &storage::ConnectionManager,
) -> anyhow::Result<i64> {
let start_block = indexer_function.start_block_height.unwrap();
let block_difference: i64 = (block_height - start_block) as i64;
Expand Down Expand Up @@ -124,7 +139,30 @@ pub(crate) async fn process_historical_messages(
blocks_from_index.append(&mut blocks_between_indexed_and_current_block);

let first_block_in_index = *blocks_from_index.first().unwrap_or(&start_block);

if !blocks_from_index.is_empty() {
storage::sadd(
redis_connection_manager,
storage::STREAMS_SET_KEY,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have the ability to distinguish between different historical processes?

Won't using the indexer function's name cause old historical processes with the same name to be overwritten?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and that's intentional. We don't want to have multiple concurrent historical processes for a given indexer. I'll be working on stopping existing processes soon, which is why I didn't bother creating unique streams.

)
.await?;
storage::set(
redis_connection_manager,
storage::generate_historical_storage_key(&indexer_function.get_full_name()),
serde_json::to_string(&indexer_function)?,
)
.await?;
}

for current_block in blocks_from_index {
storage::xadd(
redis_connection_manager,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
&[("block_height", current_block)],
)
.await?;

send_execution_message(
block_height,
first_block_in_index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ mod tests {

let opts = Opts::test_opts_with_aws();
let aws_config: &SdkConfig = &opts.lake_aws_sdk_config();
let redis_connection_manager = storage::connect(&opts.redis_connection_string)
.await
.unwrap();
let fake_block_height =
historical_block_processing::last_indexed_block_from_metadata(aws_config)
.await
Expand All @@ -84,6 +87,7 @@ mod tests {
fake_block_height + 1,
indexer_function,
opts,
&redis_connection_manager,
)
.await;
assert!(result.unwrap() > 0);
Expand Down
1 change: 1 addition & 0 deletions indexer/queryapi_coordinator/src/indexer_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ fn index_and_process_register_calls(
crate::historical_block_processing::spawn_historical_message_thread(
block_height,
&mut new_indexer_function,
context.redis_connection_manager,
)
{
spawned_start_from_block_threads.push(thread);
Expand Down
8 changes: 4 additions & 4 deletions indexer/queryapi_coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,19 +202,19 @@ async fn handle_streamer_message(

storage::sadd(
context.redis_connection_manager,
storage::INDEXER_SET_KEY,
indexer_function.get_full_name(),
storage::STREAMS_SET_KEY,
storage::generate_real_time_stream_key(&indexer_function.get_full_name()),
)
.await?;
storage::set(
context.redis_connection_manager,
storage::generate_storage_key(&indexer_function.get_full_name()),
storage::generate_real_time_storage_key(&indexer_function.get_full_name()),
serde_json::to_string(indexer_function)?,
)
.await?;
storage::xadd(
context.redis_connection_manager,
storage::generate_stream_key(&indexer_function.get_full_name()),
storage::generate_real_time_stream_key(&indexer_function.get_full_name()),
&[("block_height", block_height)],
)
.await?;
Expand Down
26 changes: 13 additions & 13 deletions indexer/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,26 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs};

const STORAGE: &str = "storage_alertexer";

pub const INDEXER_SET_KEY: &str = "indexers";
pub const STREAMS_SET_KEY: &str = "streams";

pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
redis::Client::open(redis_connection_str).expect("can create redis client")
}

pub fn generate_storage_key(name: &str) -> String {
format!("{}:storage", name)
pub fn generate_real_time_stream_key(prefix: &str) -> String {
format!("{}:real_time:stream", prefix)
}

pub fn generate_stream_key(name: &str) -> String {
format!("{}:stream", name)
pub fn generate_real_time_storage_key(prefix: &str) -> String {
format!("{}:real_time:stream:storage", prefix)
}

pub fn generate_historical_stream_key(prefix: &str) -> String {
format!("{}:historical:stream", prefix)
}

pub fn generate_historical_storage_key(prefix: &str) -> String {
format!("{}:historical:stream:storage", prefix)
}

pub async fn connect(redis_connection_str: &str) -> anyhow::Result<ConnectionManager> {
Expand Down Expand Up @@ -84,14 +92,6 @@ pub async fn xadd(
) -> anyhow::Result<()> {
tracing::debug!(target: STORAGE, "XADD: {:?}, {:?}", stream_key, fields);

// TODO: Remove stream cap when we finally start processing it
redis::cmd("XTRIM")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We now delete the stream messages after they are processed - so this can be removed

.arg(&stream_key)
.arg("MAXLEN")
.arg(100)
.query_async(&mut redis_connection_manager.clone())
.await?;

let mut cmd = redis::cmd("XADD");
cmd.arg(stream_key).arg("*");

Expand Down
Loading
Loading