-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DPLT-1076 create node worker to process real time streams #159
DPLT-1076 create node worker to process real time streams #159
Conversation
1e4313d
to
fcb0bcd
Compare
79cfae0
to
aeaeaef
Compare
storage::sadd( | ||
context.redis_connection_manager, | ||
storage::INDEXER_SET_KEY, | ||
indexer_function.get_full_name(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Storing the indexer name alone to make it easier to build the many other keys related to this indexer, i.e. :storage
and :stream
|
||
const results = await client.xRead( | ||
{ key: generateStreamKey(indexerName), id }, | ||
// can't use blocking calls as running single threaded |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might not be true, I'll investigate more later
Merging as I'd like to test the related infra. Feel free to comment retrospectively. |
Looks good! |
This PR creates a Node app for processing the real-time Redis streams.
I've created a new top-level directory called
runner/
to house this code. It borrows most of the code fromindexer-js-queue-handler
, but I have:I've added separate commits for copying the file over, and adding the above modifications if you are interested/concerned about the changes. The core logic is in
runner/src/index.ts
, most other code has remained the same aside from the above.To process each stream,
runner
will continuously poll theindexers
set to get the list of current indexers, and spin out a promise which polls its respective stream. As Node is single-threaded, this code isn't truely concurrent, and all JS execution will be executed sequentially. However, I/O will be offloaded to the system allowing for some concurrency, and hopefully a system which is still somewhat performant. I'll address concurrency in future PRs.