-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DPLT-1074 Queue real-time messages on Redis Streams #157
DPLT-1074 Queue real-time messages on Redis Streams #157
Conversation
key: &str, | ||
fields: &[(&str, impl ToRedisArgs + std::fmt::Debug)], | ||
) -> anyhow::Result<()> { | ||
sadd(redis_connection_manager, STREAMS_SET_KEY, key).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a set to track the complete list of streams as it is not straight forward to query them directly
tracing::debug!(target: STORAGE, "XADD: {}, {:?}", stream_key, fields); | ||
|
||
// TODO: Remove stream cap when we finally start processing it | ||
redis::cmd("XTRIM") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we have nothing processing/removing these messages I'm adding a limit to avoid hitting the memory limit of Redis
@@ -195,6 +195,20 @@ async fn handle_streamer_message( | |||
if !indexer_function.provisioned { | |||
set_provisioned_flag(&mut indexer_registry_locked, &indexer_function); | |||
} | |||
|
|||
storage::set( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only adding a subset of the data for now. This will definitely evolve over time and we'll need to have a decent think around where each piece belongs; on the stream on in storage.
I'm specifically trying to avoid cases where the 'static' data changes and is read immediately, but it corresponds to a block height later in the stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could set it in the same places as the in memory registry: on startup and update when the indexer_registry.index_and_process_register_calls function detects a change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I'll look in to it 👍🏼
This reverts commit d2f44e8.
This PR adds a subset of the
IndexerQueueMessage
s to a Redis Stream dedicated to that indexer.