DPLT-1077 Process historical messages via Redis Streams #181

Merged
merged 14 commits into from
Aug 17, 2023

Collaborator

@morgsmccauley morgsmccauley commented Aug 16, 2023

This PR writes historical messages to a dedicated Redis Stream per indexer. Redis keys have been slightly restructured to make it easier to distinguish which streams need to be monitored, mainly:

  • Rather than storing the indexer name, e.g. morgs.near/test, we store the stream name, as an indexer can now have two streams: historical and real-time.
  • storage, which stores the indexer config, is now keyed under the stream name as a suffix, so historical and real-time each get their own storage, preventing race conditions between them.
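
The key layout described in the two points above could be sketched roughly as follows. This is only an illustration: `generate_historical_stream_key` is named in the diff, but the other helper names and all of the literal key formats here are assumptions, not the PR's actual strings.

```rust
// Hypothetical key formats; the real strings used by the PR are not shown here.

/// Stream key for an indexer's real-time messages (assumed format).
fn generate_real_time_stream_key(indexer_name: &str) -> String {
    format!("{}:real_time:stream", indexer_name)
}

/// Stream key for an indexer's historical backfill (assumed format).
fn generate_historical_stream_key(indexer_name: &str) -> String {
    format!("{}:historical:stream", indexer_name)
}

/// Storage (the indexer config) is suffixed under the stream key, so each
/// stream carries its own copy of the config (assumed format).
fn generate_storage_key(stream_key: &str) -> String {
    format!("{}:storage", stream_key)
}
```

With a layout like this, the historical and real-time streams of morgs.near/test resolve to different storage keys, so updating the real-time config cannot change the code a running backfill is using.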

The last point is a bit of a weird one, but I am trying to avoid the scenario where we change the historical indexer's code out from underneath it. This would be the case if the developer changes their indexer code while a historical process is still running. Keeping the indexer code consistent throughout a given historical backfill seems most correct.

A developer could still start another historical process while an existing one is running, which would change the indexer code mid-backfill, because historical processes for a given indexer share the same storage key. But I don't think it's worth actively preventing this, as our ideal state is to stop the existing historical backfill when another is kicked off.

Once a historical process has been kicked off, Runner will always start a process to listen to the stream; nothing tells it to stop listening. I may address this in a future PR, or just wait until we have control messages :).

@morgsmccauley morgsmccauley requested a review from a team as a code owner August 16, 2023 21:47
@morgsmccauley morgsmccauley marked this pull request as draft August 16, 2023 21:47
@morgsmccauley morgsmccauley changed the title DPLT-1077 Write historical messages to Redis Stream DPLT-1077 Process historical messages via Redis Streams Aug 17, 2023
new_indexer_function_copy,
Opts::parse(),
))
tokio::spawn(async move {
Collaborator Author

Need to move ownership of redis_connection_manager to the spawned task
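
As a rough illustration of why the `move` is needed, here is a std-only sketch that uses `std::thread::spawn` as a stand-in for `tokio::spawn`. The `ConnectionManager` struct and `spawn_writer` function are hypothetical and are not the PR's types.

```rust
use std::thread;

/// Hypothetical stand-in for a cloneable Redis connection handle.
#[derive(Clone)]
struct ConnectionManager {
    url: String,
}

fn spawn_writer(manager: &ConnectionManager, stream_key: String) -> thread::JoinHandle<String> {
    // Clone the handle so the spawned thread gets its own copy.
    let manager = manager.clone();
    // `move` transfers ownership of `manager` and `stream_key` into the closure.
    // Without it the closure would only borrow these locals, which may not
    // outlive the thread, and the compiler rejects that.
    thread::spawn(move || format!("XADD {} via {}", stream_key, manager.url))
}
```

The caller keeps its original handle, while the spawned work owns everything it touches and so can outlive the function that started it.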

@@ -84,14 +92,6 @@ pub async fn xadd(
) -> anyhow::Result<()> {
tracing::debug!(target: STORAGE, "XADD: {:?}, {:?}", stream_key, fields);

// TODO: Remove stream cap when we finally start processing it
redis::cmd("XTRIM")
Collaborator Author

We now delete the stream messages after they are processed, so this can be removed
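
To see why deleting after processing makes the cap unnecessary, consider this in-memory sketch. It does not talk to Redis: `Stream`, `drain`, and the handler signature are hypothetical stand-ins, and stopping at the first failed message is an assumption for illustration, not something this PR states.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in for a Redis Stream: entry id -> payload.
struct Stream {
    next_id: u64,
    entries: BTreeMap<u64, String>,
}

impl Stream {
    fn new() -> Self {
        Stream { next_id: 0, entries: BTreeMap::new() }
    }

    /// Analogous to XADD: append a payload and return its id.
    fn add(&mut self, payload: &str) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.entries.insert(id, payload.to_string());
        id
    }

    /// Read the oldest entry without removing it.
    fn oldest(&self) -> Option<(u64, String)> {
        self.entries.iter().next().map(|(id, p)| (*id, p.clone()))
    }

    /// Analogous to XDEL: remove a single entry by id.
    fn delete(&mut self, id: u64) {
        self.entries.remove(&id);
    }
}

/// Process messages oldest-first, deleting each one only after the handler
/// succeeds. Stops at the first failure, leaving that message in the stream.
fn drain<F>(stream: &mut Stream, mut handler: F) -> usize
where
    F: FnMut(&str) -> Result<(), String>,
{
    let mut processed = 0;
    while let Some((id, payload)) = stream.oldest() {
        if handler(&payload).is_err() {
            break;
        }
        stream.delete(id);
        processed += 1;
    }
    processed
}
```

Because every successfully handled message is removed, the stream only ever holds unprocessed work, which is what a trim-based cap was approximating.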

@@ -1,157 +1,57 @@
import { createClient } from 'redis';
Collaborator Author

All Redis-related actions have been extracted to RedisClient

@morgsmccauley morgsmccauley marked this pull request as ready for review August 17, 2023 02:56
@morgsmccauley morgsmccauley requested review from a team and removed request for a team August 17, 2023 03:10
@morgsmccauley morgsmccauley force-pushed the DPLT-1077-process-historical-messages branch from 5ccc003 to 75b8d8b Compare August 17, 2023 03:24
Contributor

@roshaans roshaans left a comment

Overall looks good to me.

storage::sadd(
redis_connection_manager,
storage::STREAMS_SET_KEY,
storage::generate_historical_stream_key(&indexer_function.get_full_name()),
Contributor

Should we have the ability to distinguish between different historical processes?

Won't using the indexer function's name cause older historical processes with the same name to be overwritten?

Collaborator Author

Yes, and that's intentional. We don't want to have multiple concurrent historical processes for a given indexer. I'll be working on stopping existing processes soon, which is why I didn't bother creating unique streams.
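
A small sketch of the behavior being discussed, using a `HashSet` as a stand-in for the Redis set behind `STREAMS_SET_KEY`. The key format and the `register_stream` helper are hypothetical.

```rust
use std::collections::HashSet;

/// Assumed key format, for illustration only.
fn generate_historical_stream_key(indexer_name: &str) -> String {
    format!("{}:historical:stream", indexer_name)
}

/// Mimics SADD on the set of monitored streams: returns true only when the
/// key was not already present.
fn register_stream(streams: &mut HashSet<String>, indexer_name: &str) -> bool {
    streams.insert(generate_historical_stream_key(indexer_name))
}
```

Since the key is derived only from the indexer's full name, kicking off a second backfill for the same indexer adds nothing new to the set, and both runs write to the same stream rather than getting one each.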

@morgsmccauley morgsmccauley merged commit 639064d into main Aug 17, 2023
6 checks passed
@morgsmccauley morgsmccauley deleted the DPLT-1077-process-historical-messages branch August 17, 2023 20:00
2 participants