Skip to content
This repository has been archived by the owner on Nov 23, 2019. It is now read-only.

Commit

Permalink
Moved event stream processing into engine api
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Horwitz <[email protected]>
  • Loading branch information
jhorwit2 committed Jul 21, 2016
1 parent 98348ad commit a8a4f69
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 38 deletions.
68 changes: 58 additions & 10 deletions client/events.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,70 @@
package client

import (
"io"
"encoding/json"
"net/url"
"time"

"golang.org/x/net/context"

"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/events"
"github.com/docker/engine-api/types/filters"
timetypes "github.com/docker/engine-api/types/time"
)

// Events returns a stream of events in the daemon in a ReadCloser.
// It's up to the caller to close the stream.
func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.ReadCloser, error) {
// Events returns a stream of events in the daemon. It's up to the caller to close the stream
// by cancelling the context. Once the stream has been completely read an io.EOF error will
// be sent over the error channel. If an error is sent all processing will be stopped. It's up
// to the caller to reopen the stream in the event of an error by reinvoking this method.
func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {

messages := make(chan events.Message, 1)
errs := make(chan error, 1)

go func() {
defer close(messages)
defer close(errs)

query, err := buildEventsQueryParams(cli.version, options)
if err != nil {
errs <- err
return
}

resp, err := cli.get(ctx, "/events", query, nil)
if err != nil {
errs <- err
return
}
defer resp.body.Close()

decoder := json.NewDecoder(resp.body)

for {
select {
case <-ctx.Done():
return
default:
var event events.Message
if err := decoder.Decode(&event); err != nil {
errs <- err
return
}

select {
case messages <- event:
case <-ctx.Done():
return
}
}
}
}()

return messages, errs
}

func buildEventsQueryParams(cliVersion string, options types.EventsOptions) (url.Values, error) {
query := url.Values{}
ref := time.Now()

Expand All @@ -25,24 +75,22 @@ func (cli *Client) Events(ctx context.Context, options types.EventsOptions) (io.
}
query.Set("since", ts)
}

if options.Until != "" {
ts, err := timetypes.GetTimestamp(options.Until, ref)
if err != nil {
return nil, err
}
query.Set("until", ts)
}

if options.Filters.Len() > 0 {
filterJSON, err := filters.ToParamWithVersion(cli.version, options.Filters)
filterJSON, err := filters.ToParamWithVersion(cliVersion, options.Filters)
if err != nil {
return nil, err
}
query.Set("filters", filterJSON)
}

serverResponse, err := cli.get(ctx, "/events", query, nil)
if err != nil {
return nil, err
}
return serverResponse.body, nil
return query, nil
}
93 changes: 66 additions & 27 deletions client/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package client

import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
Expand All @@ -11,6 +13,7 @@ import (
"golang.org/x/net/context"

"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/events"
"github.com/docker/engine-api/types/filters"
)

Expand All @@ -36,7 +39,8 @@ func TestEventsErrorInOptions(t *testing.T) {
client := &Client{
transport: newMockClient(nil, errorMock(http.StatusInternalServerError, "Server error")),
}
_, err := client.Events(context.Background(), e.options)
_, errs := client.Events(context.Background(), e.options)
err := <-errs
if err == nil || !strings.Contains(err.Error(), e.expectedError) {
t.Fatalf("expected a error %q, got %v", e.expectedError, err)
}
Expand All @@ -47,39 +51,36 @@ func TestEventsErrorFromServer(t *testing.T) {
client := &Client{
transport: newMockClient(nil, errorMock(http.StatusInternalServerError, "Server error")),
}
_, err := client.Events(context.Background(), types.EventsOptions{})
_, errs := client.Events(context.Background(), types.EventsOptions{})
err := <-errs
if err == nil || err.Error() != "Error response from daemon: Server error" {
t.Fatalf("expected a Server Error, got %v", err)
}
}

func TestEvents(t *testing.T) {

expectedURL := "/events"

filters := filters.NewArgs()
filters.Add("label", "label1")
filters.Add("label", "label2")
expectedFiltersJSON := `{"label":{"label1":true,"label2":true}}`
filters.Add("type", events.ContainerEventType)
expectedFiltersJSON := fmt.Sprintf(`{"type":{"%s":true}}`, events.ContainerEventType)

eventsCases := []struct {
options types.EventsOptions
events []events.Message
expectedEvents map[string]bool
expectedQueryParams map[string]string
}{
{
options: types.EventsOptions{
Since: "invalid but valid",
},
expectedQueryParams: map[string]string{
"since": "invalid but valid",
},
},
{
options: types.EventsOptions{
Until: "invalid but valid",
Filters: filters,
},
expectedQueryParams: map[string]string{
"until": "invalid but valid",
"filters": expectedFiltersJSON,
},
events: []events.Message{},
expectedEvents: make(map[string]bool),
},
{
options: types.EventsOptions{
Expand All @@ -88,6 +89,28 @@ func TestEvents(t *testing.T) {
expectedQueryParams: map[string]string{
"filters": expectedFiltersJSON,
},
events: []events.Message{
events.Message{
Type: "container",
ID: "1",
Action: "create",
},
events.Message{
Type: "container",
ID: "2",
Action: "die",
},
events.Message{
Type: "container",
ID: "3",
Action: "create",
},
},
expectedEvents: map[string]bool{
"1": true,
"2": true,
"3": true,
},
},
}

Expand All @@ -98,29 +121,45 @@ func TestEvents(t *testing.T) {
return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, req.URL)
}
query := req.URL.Query()

for key, expected := range eventsCase.expectedQueryParams {
actual := query.Get(key)
if actual != expected {
return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual)
}
}

buffer := new(bytes.Buffer)

for _, e := range eventsCase.events {
b, _ := json.Marshal(e)
buffer.Write(b)
}

return &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader([]byte("response"))),
Body: ioutil.NopCloser(buffer),
}, nil
}),
}
body, err := client.Events(context.Background(), eventsCase.options)
if err != nil {
t.Fatal(err)
}
defer body.Close()
content, err := ioutil.ReadAll(body)
if err != nil {
t.Fatal(err)
}
if string(content) != "response" {
t.Fatalf("expected response to contain 'response', got %s", string(content))

messages, errs := client.Events(context.Background(), eventsCase.options)

loop:
for {
select {
case err := <-errs:
if err != nil && err != io.EOF {
t.Fatal(err)
}

break loop
case e := <-messages:
_, ok := eventsCase.expectedEvents[e.ID]
if !ok {
t.Fatalf("event received not expected with action %s & id %s", e.Action, e.ID)
}
}
}
}
}
3 changes: 2 additions & 1 deletion client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/container"
"github.com/docker/engine-api/types/events"
"github.com/docker/engine-api/types/filters"
"github.com/docker/engine-api/types/network"
"github.com/docker/engine-api/types/registry"
Expand Down Expand Up @@ -120,7 +121,7 @@ type SwarmAPIClient interface {

// SystemAPIClient defines API client methods for the system
type SystemAPIClient interface {
Events(ctx context.Context, options types.EventsOptions) (io.ReadCloser, error)
Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error)
Info(ctx context.Context) (types.Info, error)
RegistryLogin(ctx context.Context, auth types.AuthConfig) (types.AuthResponse, error)
}
Expand Down

0 comments on commit a8a4f69

Please sign in to comment.