diff --git a/client/events.go b/client/events.go index f22a18e1..e83162cd 100644 --- a/client/events.go +++ b/client/events.go @@ -1,20 +1,70 @@ package client import ( - "io" + "encoding/json" "net/url" "time" "golang.org/x/net/context" "github.com/docker/engine-api/types" + "github.com/docker/engine-api/types/events" "github.com/docker/engine-api/types/filters" timetypes "github.com/docker/engine-api/types/time" ) -// Events returns a stream of events in the daemon in a ReadCloser. -// It's up to the caller to close the stream. -func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.ReadCloser, error) { +// Events returns a stream of events in the daemon. It's up to the caller to close the stream +// by cancelling the context. Once the stream has been completely read an io.EOF error will +// be sent over the error channel. If an error is sent all processing will be stopped. It's up +// to the caller to reopen the stream in the event of an error by reinvoking this method. +func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) { + + messages := make(chan events.Message, 1) + errs := make(chan error, 1) + + go func() { + defer close(messages) + defer close(errs) + + query, err := buildEventsQueryParams(cli.version, options) + if err != nil { + errs <- err + return + } + + resp, err := cli.get(ctx, "/events", query, nil) + if err != nil { + errs <- err + return + } + defer resp.body.Close() + + decoder := json.NewDecoder(resp.body) + + for { + select { + case <-ctx.Done(): + return + default: + var event events.Message + if err := decoder.Decode(&event); err != nil { + errs <- err + return + } + + select { + case messages <- event: + case <-ctx.Done(): + return + } + } + } + }() + + return messages, errs +} + +func buildEventsQueryParams(cliVersion string, options types.EventsOptions) (url.Values, error) { query := url.Values{} ref := time.Now() @@ -25,6 +75,7 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io. } query.Set("since", ts) } + if options.Until != "" { ts, err := timetypes.GetTimestamp(options.Until, ref) if err != nil { @@ -32,17 +83,14 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io. } query.Set("until", ts) } + if options.Filters.Len() > 0 { - filterJSON, err := filters.ToParamWithVersion(cli.version, options.Filters) + filterJSON, err := filters.ToParamWithVersion(cliVersion, options.Filters) if err != nil { return nil, err } query.Set("filters", filterJSON) } - serverResponse, err := cli.get(ctx, "/events", query, nil) - if err != nil { - return nil, err - } - return serverResponse.body, nil + return query, nil } diff --git a/client/events_test.go b/client/events_test.go index cb459384..b6b765f3 100644 --- a/client/events_test.go +++ b/client/events_test.go @@ -2,7 +2,9 @@ package client import ( "bytes" + "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "strings" @@ -11,6 +13,7 @@ import ( "golang.org/x/net/context" "github.com/docker/engine-api/types" + "github.com/docker/engine-api/types/events" "github.com/docker/engine-api/types/filters" ) @@ -36,7 +39,8 @@ func TestEventsErrorInOptions(t *testing.T) { client := &Client{ transport: newMockClient(nil, errorMock(http.StatusInternalServerError, "Server error")), } - _, err := client.Events(context.Background(), e.options) + _, errs := client.Events(context.Background(), e.options) + err := <-errs if err == nil || !strings.Contains(err.Error(), e.expectedError) { t.Fatalf("expected a error %q, got %v", e.expectedError, err) } @@ -47,39 +51,36 @@ func TestEventsErrorFromServer(t *testing.T) { client := &Client{ transport: newMockClient(nil, errorMock(http.StatusInternalServerError, "Server error")), } - _, err := client.Events(context.Background(), types.EventsOptions{}) + _, errs := client.Events(context.Background(), types.EventsOptions{}) + err := <-errs if err == nil || err.Error() != "Error response from daemon: Server error" { t.Fatalf("expected a Server Error, got %v", err) } } func TestEvents(t *testing.T) { + expectedURL := "/events" filters := filters.NewArgs() - filters.Add("label", "label1") - filters.Add("label", "label2") - expectedFiltersJSON := `{"label":{"label1":true,"label2":true}}` + filters.Add("type", events.ContainerEventType) + expectedFiltersJSON := fmt.Sprintf(`{"type":{"%s":true}}`, events.ContainerEventType) eventsCases := []struct { options types.EventsOptions + events []events.Message + expectedEvents map[string]bool expectedQueryParams map[string]string }{ { options: types.EventsOptions{ - Since: "invalid but valid", - }, - expectedQueryParams: map[string]string{ - "since": "invalid but valid", - }, - }, - { - options: types.EventsOptions{ - Until: "invalid but valid", + Filters: filters, }, expectedQueryParams: map[string]string{ - "until": "invalid but valid", + "filters": expectedFiltersJSON, }, + events: []events.Message{}, + expectedEvents: make(map[string]bool), }, { options: types.EventsOptions{ @@ -88,6 +89,28 @@ func TestEvents(t *testing.T) { expectedQueryParams: map[string]string{ "filters": expectedFiltersJSON, }, + events: []events.Message{ + events.Message{ + Type: "container", + ID: "1", + Action: "create", + }, + events.Message{ + Type: "container", + ID: "2", + Action: "die", + }, + events.Message{ + Type: "container", + ID: "3", + Action: "create", + }, + }, + expectedEvents: map[string]bool{ + "1": true, + "2": true, + "3": true, + }, }, } @@ -98,29 +121,45 @@ func TestEvents(t *testing.T) { return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, req.URL) } query := req.URL.Query() + for key, expected := range eventsCase.expectedQueryParams { actual := query.Get(key) if actual != expected { return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual) } } + + buffer := new(bytes.Buffer) + + for _, e := range eventsCase.events { + b, _ := json.Marshal(e) + buffer.Write(b) + } + return &http.Response{ StatusCode: http.StatusOK, - Body: ioutil.NopCloser(bytes.NewReader([]byte("response"))), + Body: ioutil.NopCloser(buffer), }, nil }), } - body, err := client.Events(context.Background(), eventsCase.options) - if err != nil { - t.Fatal(err) - } - defer body.Close() - content, err := ioutil.ReadAll(body) - if err != nil { - t.Fatal(err) - } - if string(content) != "response" { - t.Fatalf("expected response to contain 'response', got %s", string(content)) + + messages, errs := client.Events(context.Background(), eventsCase.options) + + loop: + for { + select { + case err := <-errs: + if err != nil && err != io.EOF { + t.Fatal(err) + } + + break loop + case e := <-messages: + _, ok := eventsCase.expectedEvents[e.ID] + if !ok { + t.Fatalf("event received not expected with action %s & id %s", e.Action, e.ID) + } + } } } } diff --git a/client/interface.go b/client/interface.go index 1cadaef5..56cd3083 100644 --- a/client/interface.go +++ b/client/interface.go @@ -6,6 +6,7 @@ import ( "github.com/docker/engine-api/types" "github.com/docker/engine-api/types/container" + "github.com/docker/engine-api/types/events" "github.com/docker/engine-api/types/filters" "github.com/docker/engine-api/types/network" "github.com/docker/engine-api/types/registry" @@ -120,7 +121,7 @@ type SwarmAPIClient interface { // SystemAPIClient defines API client methods for the system type SystemAPIClient interface { - Events(ctx context.Context, options types.EventsOptions) (io.ReadCloser, error) + Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) Info(ctx context.Context) (types.Info, error) RegistryLogin(ctx context.Context, auth types.AuthConfig) (types.AuthResponse, error) }