-
Notifications
You must be signed in to change notification settings - Fork 163
Fixes #246 - moved event stream processing to engine api #315
Conversation
This is also related #246 |
@vdemeester I noticed that after starting to work on this PR. I noticed your lib takes into account types; do you think I should incorporate that into this PR? I initially was trying to keep it close to how core docker has it. The only thing I was thinking that should be different if we did add type/action handlers is to use a struct for keys over strings to make sure we don't have a collisions in the future. Another option is to just add a wildcard action instead of adding types. Then you can use the event options to set |
Consider some of the suggestions in #89 (comment) for an API. I'm not sure if a callback-based API is the correct approach if you have to listen for errors on a channel anyways. This pattern is employed here. |
9b27ea3
to
1194e3f
Compare
case event := <-eventsq: | ||
processer := handler.Get(event.Action) | ||
if processer != nil { | ||
processer(event) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep going back and forth here on whether or not to do this in another goroutine. Thoughts? I like not because then you keep to sequential processing which can be an important thing with events.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That won't provide much. Serialization is cheap.
@stevvooe updated from your feedback 👍 |
b93f7e7
to
e0b0080
Compare
// handlers for processing. It's up to the caller to stop the processing by canceling | ||
// the context. The error channel will be closed once the end of the input stream has been | ||
// reached. If an error is received all processing will be stopped. | ||
func (cli *Client) EventsWithHandler(ctx context.Context, options types.EventsOptions, handler eventtypes.EventHandler) <-chan error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd rather see something like this:
Events(ctx context.Context, options types.EventsOptions) (<-chan eventtypes.Message, <-chan error)
Then, we can have a wrapper built with a handler on top of this.
The issue with the handler approach is that it requires too much boilerplate to get something useful. A handler system can be built on the above, but it is hard to readapt the handler system to channel based system, if that is required instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You put Events
as the name. Can I replace the legacy method with this new one? I figured we needed backwards compatibility so I went with another method name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can I replace the legacy method with this new one? I figured we needed backwards compatibility so I went with another method name.
All the users of the engine-api
vendor the dependency, so please feel free to change.
As it is, the method is very hard to use, so replacing the definition is fine. We'll stabilize these eventually, but we need to have an exported API worth stabilizing and we're not there yet.
ffc2373
to
bc698e0
Compare
@stevvooe i think this is what you had in mind 👍 |
@jhorwit2 Love the new Do you mind splitting the handler parts into a second PR? |
Mostly, I'd like to get the Events signature merged... 👍 👍 |
bc698e0
to
a8a4f69
Compare
Sounds good! Just pushed w/o the handler code @stevvooe. |
@jhorwit2 Thanks for the awesome contribution! LGTM Presumably, wait until have 1.12 GA until we merge this but it is up to engine-api maintainers. |
@jhorwit2 Looks very good, thanks 😻 But let's wait the 1.12 release before merging this 👼 |
@jhorwit2 would you like to make a PR in docker/docker with these changes to see how it behaves ? 👼 |
@vdemeester yea sure! ill cherry pick the commit in and open a pr with vendor changes. |
func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) { | ||
|
||
messages := make(chan events.Message, 1) | ||
errs := make(chan error, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stevvooe You mentioned in previous comment to make this a buffered channel in case the error wasn't consumed, but when you do this you force the consumer to check the event channel after an error has been received because both the event and error channel can have data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
messages
shouldn't be buffered, only errq
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Derp... good catch.
9258520
to
50949ed
Compare
LGTM PTAL @tonistiigi @aaronlehmann |
buffer := new(bytes.Buffer) | ||
|
||
for _, e := range eventsCase.events { | ||
b, _ := json.Marshal(e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it really worth it here to not check the error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, it's probably more efficient to use a json.Encoder:
buffer := new(bytes.Buffer)
enc := json.NewEncoder(buffer)
for _, e := eventsCase.events {
if err := enc.Encode(e); err != nil {
// do something
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just realized this is the test! still, would be good to check the error.
Isn't it bit awkward to return 2 channels? Also when using select on these 2 channels it is easy to skip last messages because you could get the error first. Why not return a object instead with method for getting the events or error. Or channel returning union of these fields. |
Signed-off-by: Joshua Horwitz <[email protected]>
50949ed
to
863e816
Compare
@tonistiigi because the message channel isn't buffered that won't happen |
Yeah, an object with a Next() method that returns |
@tamird That would disable the ability to select on a channel while consuming. You could always build a wrapper that does the |
That's true. |
select { | ||
case <-ctx.Done(): | ||
return | ||
default: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pattern makes sense, but I would end this select
here to reduce unnecessary indentation.
LGTM |
LGTM. Thanks! |
So what's the plan with this since engine-api is now deprecated? |
@jhorwit2 Just combine it with moby/moby#25853 and copy this code to |
anyway let's close this now as it is all in moby/moby#25853 |
Signed-off-by: Joshua Horwitz [email protected]
Move the events logic to the engine-api while keeping backwards compatibility with the original
Events
method. Porting this logic to the engine-api makes it easier to write libraries utilizing the events code. I was having to use both docker core & engine-api while writing a library. Docker core has some dependencies that aren't needed by external libraries.