
gateio: Add concurrent subscription support #1722

Open · wants to merge 23 commits into master
Conversation

@shazbert (Collaborator) commented Nov 19, 2024

PR Description

  • Move the ParallelChanOp functionality, which is epic (shout out @gbjk 🔥), and create the generic common func ProcessBatches (a usage sketch follows this list)
  • Create a specific generic processing function with a different signature for higher throughput, ProcessElementsByBatch
  • Concurrent subscribe/unsubscribe support across asset types
  • Remove the rate limiter
  • Requires gateio: update rate limit definitions #1733
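
A rough usage sketch of the higher-throughput helper, based on the ProcessElementsByBatch signature quoted later in this thread (subs and subscribeOne are hypothetical stand-ins for a real subscription slice and per-subscription worker):

// Hypothetical call: process subscriptions concurrently in blocks of 10.
err := common.ProcessElementsByBatch(10, subs, func(_ int, s *subscription.Subscription) error {
    return subscribeOne(s) // stand-in for the real per-subscription worker
})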

Type of change

Please delete options that are not relevant and add an x in [] as each item is completed.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How has this been tested

Please describe the tests that you ran to verify your changes and provide instructions so we can reproduce them. Please also list any relevant details for your test configuration, and consider improving test coverage whilst working on a certain feature or package.

  • go test ./... -race
  • golangci-lint run

Checklist

  • My code follows the style guidelines of this project
  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation and regenerated documentation via the documentation tool
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally and on GitHub Actions with my changes
  • Any dependent changes have been merged and published in downstream modules

Ryan O'Hara-Reid added 2 commits November 20, 2024 09:30
…rder_book (snapshot) from incremental spot.order_book_update include commentary
@shazbert added the "review me" label Nov 19, 2024
@shazbert requested a review from a team November 19, 2024 23:21
@shazbert self-assigned this Nov 19, 2024
@shazbert added the "blocked" label and removed the "review me" label Nov 25, 2024
@shazbert added the "review me" and "medium priority" labels and removed the "blocked" label Nov 25, 2024
@shazbert requested a review from gbjk November 25, 2024 22:25
@shazbert added the "blocked" label and removed the "review me" label Nov 29, 2024
shazbert added 2 commits December 5, 2024 15:53
… item to nil for systems that do not require rate limiting; add glorious nit
@shazbert mentioned this pull request Dec 5, 2024
@shazbert added the "nomerge" and "requires dependency" labels and removed the "blocked" label Dec 5, 2024
@shazbert changed the title from "gateio: Add concurrent subscription support and remove the need for rate limiting" to "gateio: Add concurrent subscription support" Dec 5, 2024
@gbjk (Collaborator) left a comment

Feedback on the common part only to start with.

More to come, but some of it is easy to action.

common/common.go Outdated
Comment on lines 681 to 684
// ProcessBatches takes a slice of elements and processes them in batches of `batchSize` concurrently.
// For example, if batchSize = 10 and list has 100 elements, 10 goroutines will be created to process
// 10 batches. Each batch is processed sequentially by the `process` function, and batches are processed
// in parallel.
Collaborator

I'm afraid I had to re-read this comment a few times.
I don't believe we should say:
Each batch is processed sequentially by the `process` function
Because I don't think you can guarantee the goros will execute in the order scheduled.

I'd suggest vastly simplified:

Suggested change
// ProcessBatches takes a slice of elements and processes them in batches of `batchSize` concurrently.
// For example, if batchSize = 10 and list has 100 elements, 10 goroutines will be created to process
// 10 batches. Each batch is processed sequentially by the `process` function, and batches are processed
// in parallel.
// ProcessBatches processes `S` concurrently in batches of `batchSize`

Note: I often remove types from comments, because the user sees the param names and types in the signature in godoc and all other circumstances. It doesn't help to repeat the types, and it makes it harder to read and internalise quickly.

Collaborator Author (@shazbert)

Because I don't think you can guarantee the goros will execute in the order scheduled.

They are executed sequentially in that individual goroutine.

Here's a better example:

List of elements: [E1, E2, E3, ..., E100]

Batch size: 10

Step-by-step process:
1. Divide the list into batches of 10 elements each:
   Batch 1: [E1, E2, E3, ..., E10]
   Batch 2: [E11, E12, E13, ..., E20]
   ...
   Batch 10: [E91, E92, E93, ..., E100]

2. Spawn 10 goroutines to process the batches concurrently:
   Goroutine 1 -> Processes Batch 1
   Goroutine 2 -> Processes Batch 2
   ...
   Goroutine 10 -> Processes Batch 10

3. Within each goroutine:
   - Process elements in the batch sequentially using the `process` function.
   Example for Batch 1:
       Process E1 -> Process E2 -> Process E3 -> ... -> Process E10
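
To make the shape concrete, here is a minimal sketch of what such a function could look like, assuming the Batch and CollectErrors helpers that appear in the diff later in this thread; the PR's actual ProcessBatches may differ:

// One goroutine per batch; each goroutine walks its batch sequentially via process.
func ProcessBatches[S ~[]E, E any](batchSize int, list S, process func(S) error) error {
    batches := Batch(list, batchSize) // assumed helper: splits list into chunks of batchSize
    errs := CollectErrors(len(batches))
    for _, b := range batches {
        go func(b S) {
            defer errs.Wg.Done()
            errs.C <- process(b) // the whole batch is handled inside this one goroutine
        }(b)
    }
    return errs.Collect()
}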

@gloriousCode (Collaborator) commented Dec 20, 2024

You can't guarantee process does it sequentially. You can't even guarantee that process does anything from here (edit: whoops, except return an error). It's just that you're doing it sequentially for ws subs for the moment. I agree that the comment about sequential processing should be removed.

Collaborator Author (@shazbert)

How about this as an option for the commentary?
// ProcessBatches splits S into batches of batchSize and processes each batch concurrently.

There is also an issue: when batchSize >= len(S), there is no concurrency. Should I handle that case?
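
One hypothetical way to preserve some concurrency in that case would be to clamp the batch size before splitting; a sketch only, not something in the PR:

// Hypothetical guard: avoid a single batch swallowing the whole list.
if batchSize >= len(list) && len(list) > 1 {
    batchSize = (len(list) + 1) / 2 // split into at least two batches
}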

common/common.go Outdated
// For example, if batchSize = 10 and list has 100 elements, 10 goroutines will process 10 elements concurrently
// in each batch. Each batch completes before the next batch begins.
// `process` is a function called for each individual element with its index and value.
func ProcessElementsByBatch[S ~[]E, E any](batchSize int, list S, process ElementProcessor[E]) (errs error) {
Collaborator

This is more about throttling than it is batching.
I'll think today about naming / wording, but it wasn't until the third time round that I really clocked fully that this is a concurrent throughput throttler, not a batcher.

Collaborator

I also just realised:
Is this implementation mechanics truly optimal for your use-case?
Normally you'd see things processed with a concurrency of 10.
Your implementation here may end up with a concurrency of 1 or 2 at the end of each batch, instead of maintaining 10 throughout.
Maybe this is going to be self-evident in the code that calls this.

@shazbert (Collaborator Author) commented Dec 19, 2024

Have you tested this happening? Here's a better example, like the one above:

List of elements: [E1, E2, E3, ..., E100]

Batch size: 10

Step-by-step process:
1. Divide the list into batches of 10 elements each:
   Batch 1: [E1, E2, E3, ..., E10]
   Batch 2: [E11, E12, E13, ..., E20]
   ...
   Batch 10: [E91, E92, E93, ..., E100]

2. Process the batches sequentially:
   Batch 1 -> Batch 2 -> Batch 3 -> ... -> Batch 10
   (Each batch completes fully before moving to the next batch.)

3. Within each batch:
   - Spawn 10 goroutines (one for each element in the batch).
   - Each goroutine processes a single element by calling the `process` function with its index and value.
   
   Example for Batch 1:
       Goroutine 1 -> `process(0, E1)`
       Goroutine 2 -> `process(1, E2)`
       Goroutine 3 -> `process(2, E3)`
       ...
       Goroutine 10 -> `process(9, E10)`

So concurrency with this one scales linearly with the batch size. As an example, instead of 10 above, a batch size of 100 in this case will spawn 100 goroutines and process the entire list at once. Whereas the original, I think (?), had an inverse relationship with the batch size: the higher the batch size, the lower the concurrency. A batch size of 100 will process the entire list sequentially, but a batch size of 1 would process the entire list at once.

Collaborator

Yeah, I did understand from the start.
I just don't know why you'd want to separate things into groups of 10 and then process all 10 before processing the next batch.
If 9 subscriptions complete in 0.2s each concurrently, but gateio decides to take 4 seconds on just one in that batch, because it decided now's a good time to cache invalidate, or whatever, then you're sitting there twiddling your thumbs.

I can imagine situations where this pattern is necessary, where there's some inter-relation that 10 must happen fully before the next 10. I just don't think that's the case for you here.

What would be the downside of a channel throttle that meant that you're always processing no more than 10 subscriptions?
That would make this a standard mini-worker and queue pattern.
I'm sure you know, but: Fill a channel with slice, spawn N goros which read from the channel and exit when they don't get anything, waitgroup them and it's done.

Happy to throw up a PR into your branch if it helps.

But maybe I'm being really stupid and missing something obvious about why we'd need fully completed batches.

Collaborator Author (@shazbert)

I just remembered the main reasoning: for Gateio specifically, snapshots with 100-length books start to interfere with the gorilla websocket buffer and clip data now that rate limiting of subscription requests has been removed, so it's been limited to 200-sub blocks, which spawn 200 goroutines. I will add that into the commentary 😆.

If 9 subscriptions complete in 0.2s each concurrently, but gateio decides to take 4 seconds on just one in that batch, because it decided now's a good time to cache invalidate, or whatever, then you're sitting there twiddling your thumbs.

Has this happened in testing? We can always mitigate it with a context deadline. But this seems like a bottom-quintile issue, only on startup, and not really an issue for a long-running program, as changes to subs are going to be limited to scaling up and down on exchange pair additions, removals and strategy requirements.
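
For illustration, a deadline mitigation might look something like this (hypothetical sketch; the 30s figure is arbitrary and assumes manageTemplatePayload honours ctx cancellation):

// Hypothetical: bound how long a single subscription attempt can block its batch.
subCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
err := g.manageTemplatePayload(subCtx, conn, event, s)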

Will check out your example now thanks.

common/common.go: 5 outdated review threads (resolved)
Comment on lines 587 to 592
return common.ProcessElementsByBatch(subscriptionBatchCount, subs, func(_ int, s *subscription.Subscription) error {
    if err := g.manageTemplatePayload(ctx, conn, event, s); err != nil {
        return fmt.Errorf("%s %s %s: %w", s.Channel, s.Asset, s.Pairs, err)
    }
    return nil
})
Collaborator

Can you add a comment to explain why this should be done in batches rather than just throttled?

Collaborator

Thanks for the comment.
I think your explanation doesn't justify staggered batch processing over outright throughput workers.
I've provided an example implementation covering what I think might be more suitable.

@shazbert added the "review me" label and removed the "nomerge" and "requires dependency" labels Dec 19, 2024
@shazbert requested a review from gbjk December 19, 2024 22:46
Comment on lines +651 to +668
// ElementProcessor defines the function signature for processing an individual element with its index.
type ElementProcessor[E any] func(index int, element E) error

// ProcessElementsByBatch takes a slice of elements and processes them in batches of `batchSize` concurrently.
// For example, if batchSize = 10 and list has 100 elements, 10 goroutines will process 10 elements concurrently
// in each batch. Each batch completes before the next batch begins.
// `process` is a function called for each individual element with its index and value.
func ProcessElementsByBatch[S ~[]E, E any](batchSize int, list S, process ElementProcessor[E]) error {
    var errs error
    for i, s := range Batch(list, batchSize) {
        err := CollectErrors(len(s))
        for j, e := range s {
            go func(index int, element E) { err.C <- process(index, element); err.Wg.Done() }((i*batchSize)+j, e)
        }
        errs = AppendError(errs, err.Collect())
    }
    return errs
}
Collaborator

Okay, so this is what I think is appropriate for your use case.
Note: Completely untested. It compiles and it's (probably) sane.
I've omitted i because I can't see that you need it, and can't imagine how it'd make sense given it's not guaranteed to be in sequence at all. If there was some use-case for it that I'm missing, you could use a sequence generator to get it, or something. You could also maybe do this less elegantly by using mutexes and reducing the slice, or using a producer goro.
Anyway. Enough babbling. More code:

Suggested change
// ElementProcessor defines the function signature for processing an individual element with its index.
type ElementProcessor[E any] func(index int, element E) error

// ProcessElementsByBatch takes a slice of elements and processes them in batches of `batchSize` concurrently.
// For example, if batchSize = 10 and list has 100 elements, 10 goroutines will process 10 elements concurrently
// in each batch. Each batch completes before the next batch begins.
// `process` is a function called for each individual element with its index and value.
func ProcessElementsByBatch[S ~[]E, E any](batchSize int, list S, process ElementProcessor[E]) error {
    var errs error
    for i, s := range Batch(list, batchSize) {
        err := CollectErrors(len(s))
        for j, e := range s {
            go func(index int, element E) { err.C <- process(index, element); err.Wg.Done() }((i*batchSize)+j, e)
        }
        errs = AppendError(errs, err.Collect())
    }
    return errs
}

// PipelineProcessor defines the function signature for processing an individual element
type PipelineProcessor[E any] func(element E) error

// ThrottledPipeline processes a slice concurrently with a throttled number of workers
func ThrottledPipeline[S ~[]E, E any](workers int, list S, process PipelineProcessor[E]) error {
    q := make(chan E, len(list))
    for _, s := range list {
        q <- s
    }
    err := CollectErrors(len(list))
    for range workers {
        go func() {
            for {
                select {
                case s := <-q:
                    err.C <- process(s)
                    err.Wg.Done() // Done per element, so Collect, which waits on len(list) Dones, can finish
                default:
                    return
                }
            }
        }()
    }
    return err.Collect()
}
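
If this lands roughly as-is, a call site might look like the following (hypothetical sketch mirroring the existing ProcessElementsByBatch call quoted above; the worker count of 10 is arbitrary):

return common.ThrottledPipeline(10, subs, func(s *subscription.Subscription) error {
    if err := g.manageTemplatePayload(ctx, conn, event, s); err != nil {
        return fmt.Errorf("%s %s %s: %w", s.Channel, s.Asset, s.Pairs, err)
    }
    return nil
})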

Collaborator Author (@shazbert)

There's definitely a use case for this if it works 🔥 Note: Completely untested 😆 Will save this.

Labels: medium priority, review me
3 participants