diff --git a/TODO.org b/TODO.org index 5902ac0..a8b8294 100644 --- a/TODO.org +++ b/TODO.org @@ -91,9 +91,7 @@ SG-Proto Orgfile ** Unorganized *** Organize TODOs - *** Cloud (self?) deploy service - *** Better testing of REST resource security. *** More auth.Login tests *** Ledger? @@ -122,7 +120,6 @@ SG-Proto Orgfile *** Bad usernames cannot be looked up for expired Sessions **** This is just a reverse lookup. **** Can't find example of why this is a bug. -*** If database is closed, can't clean up rivers *** No auth timeout / river / notifs closure *** User set type *** Think about true vs false users in groups -- use slice in API? @@ -213,7 +210,8 @@ SG-Proto Orgfile *** Thoroughly test ws package *** client package uses custom HTTP client instead of global -* v0.0.2 + +* v0.0.3 ** TODO More auth.Login tests ** TODO Test HandleDeleteToken (URL encoding, etc.) ** TODO Don't check whether token is valid in REST since this is in mw. @@ -248,6 +246,16 @@ SG-Proto Orgfile ** TODO Migration tests ** TODO Don't deliver AppVeyor binary unless the branch is merged ** TODO Figure out travisCI build artifacts / releases +** TODO Update changelog v0.0.1 +** TODO Update CONTRIBUTING.md etc + +* v0.0.2 +** DONE Add GET /admin/tickets?count= + CLOSED: [2017-04-10 Mon 08:39] +** DONE Make stream/river errExists clearer + CLOSED: [2017-04-10 Mon 08:30] +** DONE If database is closed, can't clean up rivers + CLOSED: [2017-05-08 Mon 14:58] * v0.0.1 ** DONE Bugs @@ -620,4 +628,5 @@ SG-Proto Orgfile CLOSED: [2017-03-16 Thu 09:59] *** DONE Unit tests CLOSED: [2017-03-16 Thu 09:59] -** TODO Deploy +** DONE Deploy + CLOSED: [2017-04-30 Sun 12:16] diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..6684c60 --- /dev/null +++ b/client/client.go @@ -0,0 +1,188 @@ +package client + +import ( + "crypto/sha256" + "crypto/tls" + "encoding/base64" + "errors" + "fmt" + "net/http" + "net/url" + + "github.com/synapse-garden/sg-proto/auth" + "github.com/synapse-garden/sg-proto/incept" + "github.com/synapse-garden/sg-proto/rest" + "github.com/synapse-garden/sg-proto/stream" + "github.com/synapse-garden/sg-proto/users" + + ws "golang.org/x/net/websocket" +) + +// Client is a client for an SG backend. +type Client struct { + State State + APIKey string + Backend *url.URL +} + +// Info sets the given info based on the backend's /source. +func (c *Client) Info(i *rest.SourceInfo) error { + return DecodeGet(i, c.Backend.String()+"/source") +} + +// GetTickets returns GET /admin/tickets?count=n +func (c *Client) GetTickets(into *[]incept.Ticket, n int) error { + if c.APIKey == "" { + return errors.New("client must have a valid admin API key") + } + + str := c.Backend.String() + "/admin/tickets" + if n > 1 { + str = fmt.Sprintf("%s?count=%d", str, n) + } + return DecodeGet(into, str, AdminHeader(c.APIKey)) +} + +// CreateTickets creates incept tickets using the given API key. +func (c *Client) CreateTickets(into *[]incept.Ticket, n int) error { + if c.APIKey == "" { + return errors.New("client must have a valid admin API key") + } + + str := c.Backend.String() + "/admin/tickets" + if n > 1 { + str = fmt.Sprintf("%s?count=%d", str, n) + } + return DecodePost(into, str, nil, AdminHeader(c.APIKey)) +} + +// CreateLogin creates a user with the given name and password (which is +// hashed before sending) and unmarshals the response into the given *Login. +func (c *Client) CreateLogin( + l *auth.Login, + ticket, name, pw string, +) error { + b := sha256.Sum256([]byte(pw)) + return DecodePost(l, + c.Backend.String()+"/incept/"+ticket, + &auth.Login{ + User: users.User{Name: name}, + PWHash: b[:], + }, + ) +} + +// VerifyAdmin checks that the Client's APIKey is a valid Admin key. +func (c *Client) VerifyAdmin(key string) error { + var ok bool + return DecodeGet(&ok, c.Backend.String()+"/admin/verify", + AdminHeader(key)) +} + +// Login uses the given *auth.Login's Token to get a Session. +func (c *Client) Login(l *auth.Login) error { + s := c.State.Session + return DecodePost(s, c.Backend.String()+"/tokens", l) +} + +// Logout deletes the given *auth.Login's Token. +func (c *Client) Logout() error { + if sesh := c.State.Session; sesh == nil { + return errors.New("nil session") + } else if t := sesh.Token; t == nil { + return errors.New("nil session token") + } else { + return Delete(c.Backend.String()+"/tokens", + AuthHeader(auth.BearerType, t)) + } +} + +// GetProfile gets the User for the given Session. +func (c *Client) GetProfile(u *users.User) error { + s := c.State.Session + return DecodeGet(u, c.Backend.String()+"/profile", + AuthHeader(auth.BearerType, s.Token)) +} + +// DeleteProfile deletes the Session owner's profile. +func (c *Client) DeleteProfile() error { + s := c.State.Session + return Delete(c.Backend.String()+"/profile", + AuthHeader(auth.BearerType, s.Token)) +} + +// CreateStream creates a new Stream belonging to the Session owner. +func (c *Client) CreateStream(str *stream.Stream) error { + if sesh := c.State.Session; sesh == nil { + return errors.New("nil session") + } else if t := sesh.Token; t == nil { + return errors.New("nil session token") + } else { + return DecodePost(str, + c.Backend.String()+"/streams", + str, + AuthHeader(auth.BearerType, t), + ) + } +} + +// GetStream gets a Stream by ID. +func (c *Client) GetStream(str *stream.Stream, id string) error { + if sesh := c.State.Session; sesh == nil { + return errors.New("nil session") + } else if t := sesh.Token; t == nil { + return errors.New("nil session token") + } else { + return DecodeGet(str, + c.Backend.String()+"/streams/"+id, + AuthHeader(auth.BearerType, t), + ) + } +} + +// AllStreams gets the User's owned Streams. +func (c *Client) AllStreams(strs *[]*stream.Stream, filters ...Param) error { + if sesh := c.State.Session; sesh == nil { + return errors.New("nil session") + } else if t := sesh.Token; t == nil { + return errors.New("nil session token") + } else { + return DecodeGet(strs, fmt.Sprintf( + "%s/streams%s", + c.Backend, + ApplyParams(filters...)), + AuthHeader(auth.BearerType, t), + ) + } +} + +// GetStreamWS opens and returns a *golang.org/x/net/websocket.Conn. +func (c *Client) GetStreamWS(id string) (*ws.Conn, error) { + s := c.State.Session + if c.State.Session == nil { + return nil, fmt.Errorf("cannot get stream with a nil Session") + } + backend := *c.Backend + switch backend.Scheme { + case "http": + backend.Scheme = "ws" + case "https": + backend.Scheme = "wss" + } + + backend.Path += "/streams/" + id + "/start" + var conf *tls.Config + if t, ok := customClient.Transport.(*http.Transport); ok { + conf = t.TLSClientConfig + } + + wsToken := base64.RawURLEncoding.EncodeToString(s.Token) + + return ws.DialConfig(&ws.Config{ + Location: &backend, + Origin: &url.URL{}, + TlsConfig: conf, + Version: ws.ProtocolVersionHybi13, + Protocol: []string{"Bearer+" + wsToken}, + }) +} diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 0000000..2316ee6 --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,145 @@ +package client_test + +import ( + "crypto/sha256" + "time" + + uuid "github.com/satori/go.uuid" + "github.com/synapse-garden/sg-proto/auth" + "github.com/synapse-garden/sg-proto/incept" + "github.com/synapse-garden/sg-proto/rest" + "github.com/synapse-garden/sg-proto/users" + + . "gopkg.in/check.v1" +) + +func (s *ClientSuite) TestInfo(c *C) { + info := new(rest.SourceInfo) + c.Assert(s.cli.Info(info), IsNil) + c.Check(info, DeepEquals, &src) +} + +func (s *ClientSuite) TestCreateTickets(c *C) { + ak := s.cli.APIKey + s.cli.APIKey = "" + + ts := make([]incept.Ticket, 0) + c.Check(s.cli.CreateTickets(&ts, 5), ErrorMatches, `client must have a valid admin API key`) + c.Check(ts, DeepEquals, []incept.Ticket{}) + + s.cli.APIKey = ak + c.Assert(s.cli.CreateTickets(&ts, 5), IsNil) + c.Check(len(ts), Equals, 5) + for _, t := range ts { + c.Check(len(t), Equals, 16) + uu, err := uuid.FromBytes(t.Bytes()) + c.Check(err, IsNil) + c.Check(uuid.Equal(uu, uuid.UUID(t)), Equals, true) + // It's a valid UUID + } +} + +func (s *ClientSuite) TestCreateLogin(c *C) { + var ts []incept.Ticket + c.Assert(s.cli.CreateTickets(&ts, 1), IsNil) + c.Check(len(ts), Equals, 1) + + l := new(auth.Login) + ticket := ts[0].String() + c.Assert(s.cli.CreateLogin(l, ticket, "bodie", "hello"), IsNil) + c.Check(l, DeepEquals, &auth.Login{ + User: users.User{ + Name: "bodie", + Coin: 0, + }, + }) + + // Does it work? + bs := sha256.Sum256([]byte("hello")) + c.Assert(s.cli.Login(&auth.Login{ + User: users.User{Name: "bodie"}, + PWHash: bs[:], + }), IsNil) + sesh := s.cli.State.Session + c.Check(sesh.Expiration, Not(Equals), time.Time{}) + c.Check(sesh.Token, NotNil) +} + +func (s *ClientSuite) TestLogin(c *C) { + var ts []incept.Ticket + c.Assert(s.cli.CreateTickets(&ts, 1), IsNil) + c.Check(len(ts), Equals, 1) + + l := new(auth.Login) + ticket := ts[0].String() + c.Assert(s.cli.CreateLogin(l, ticket, "bodie", "hello"), IsNil) + c.Check(l, DeepEquals, &auth.Login{ + User: users.User{ + Name: "bodie", + Coin: 0, + }, + }) + + bs := sha256.Sum256([]byte("hello")) + c.Assert(s.cli.Login(&auth.Login{ + User: users.User{Name: "bodie"}, + PWHash: bs[:], + }), IsNil) + sesh := s.cli.State.Session + c.Check(sesh.Expiration, Not(Equals), time.Time{}) +} + +func (s *ClientSuite) TestGetProfile(c *C) { + var ts []incept.Ticket + c.Assert(s.cli.CreateTickets(&ts, 1), IsNil) + c.Check(len(ts), Equals, 1) + + l := new(auth.Login) + ticket := ts[0].String() + c.Assert(s.cli.CreateLogin(l, ticket, "bodie", "hello"), IsNil) + c.Check(l, DeepEquals, &auth.Login{ + User: users.User{ + Name: "bodie", + Coin: 0, + }, + }) + + bs := sha256.Sum256([]byte("hello")) + c.Assert(s.cli.Login(&auth.Login{ + User: users.User{Name: "bodie"}, + PWHash: bs[:], + }), IsNil) + + u := new(users.User) + c.Assert(s.cli.GetProfile(u), IsNil) + c.Check(*u, DeepEquals, l.User) +} + +func (s *ClientSuite) TestDeleteProfile(c *C) { + var ts []incept.Ticket + c.Assert(s.cli.CreateTickets(&ts, 1), IsNil) + c.Check(len(ts), Equals, 1) + + l := new(auth.Login) + ticket := ts[0].String() + c.Assert(s.cli.CreateLogin(l, ticket, "bodie", "hello"), IsNil) + c.Check(l, DeepEquals, &auth.Login{ + User: users.User{ + Name: "bodie", + Coin: 0, + }, + }) + + bs := sha256.Sum256([]byte("hello")) + c.Assert(s.cli.Login(&auth.Login{ + User: users.User{Name: "bodie"}, + PWHash: bs[:], + }), IsNil) + + u := new(users.User) + c.Assert(s.cli.GetProfile(u), IsNil) + c.Check(*u, DeepEquals, l.User) + + c.Assert(s.cli.DeleteProfile(), IsNil) + c.Check(s.db.View(users.CheckNotExist(u.Name)), IsNil) +} diff --git a/client/common_test.go b/client/common_test.go new file mode 100644 index 0000000..59b2d6e --- /dev/null +++ b/client/common_test.go @@ -0,0 +1,71 @@ +package client_test + +import ( + "bytes" + "encoding/base64" + htt "net/http/httptest" + "net/url" + "os" + "testing" + + "github.com/synapse-garden/sg-proto/auth" + "github.com/synapse-garden/sg-proto/client" + "github.com/synapse-garden/sg-proto/rest" + sgt "github.com/synapse-garden/sg-proto/testing" + + "github.com/boltdb/bolt" + uuid "github.com/satori/go.uuid" + . "gopkg.in/check.v1" +) + +func Test(t *testing.T) { TestingT(t) } + +type ClientSuite struct { + db *bolt.DB + tmpDir string + + srv *htt.Server + cli *client.Client + + rbuf, wbuf *bytes.Buffer +} + +var _ = Suite(&ClientSuite{}) + +var src = rest.SourceInfo{ + License: "Affero GPL V3", + LicensedTo: "SynapseGarden 2017", + Location: "https://github.com/synapse-garden/sg-proto", +} + +func (s *ClientSuite) SetUpTest(c *C) { + db, tmpDir, err := sgt.TempDB("synapse-test") + c.Assert(err, IsNil) + key := uuid.NewV4() + r, cleanups, err := rest.Bind(db, src, auth.Token(key.Bytes())) + defer cleanups.Cleanup() + c.Assert(err, IsNil) + srv := htt.NewServer(r) + u, err := url.Parse(srv.URL) + c.Assert(err, IsNil) + rbuf, wbuf := new(bytes.Buffer), new(bytes.Buffer) + cli := &client.Client{ + Backend: u, + APIKey: base64.StdEncoding.EncodeToString(key.Bytes()), + State: client.MakeState(rbuf, wbuf), + } + + s.tmpDir = tmpDir + s.db, s.srv, s.cli = db, srv, cli +} + +func (s *ClientSuite) TearDownTest(c *C) { + if db := s.db; db != nil { + c.Assert(sgt.Cleanup(db), IsNil) + c.Assert(os.Remove(s.tmpDir), IsNil) + } + + if srv := s.srv; srv != nil { + srv.Close() + } +} diff --git a/client/state.go b/client/state.go new file mode 100644 index 0000000..8641bc0 --- /dev/null +++ b/client/state.go @@ -0,0 +1,52 @@ +package client + +import ( + "bufio" + "io" + + "github.com/synapse-garden/sg-proto/auth" + "github.com/synapse-garden/sg-proto/users" +) + +// State models a Client's side effects, including buffered output. +type State struct { + Session *auth.Session + User *users.User + + *bufio.Writer + *bufio.Scanner +} + +var ( + _ = io.ReaderFrom(State{}) + _ = io.Writer(State{}) +) + +// MakeState returns a new State made from the given Reader and Writer. +func MakeState(w io.Writer, r io.Reader) State { + return State{ + Session: new(auth.Session), + Writer: bufio.NewWriter(w), + Scanner: bufio.NewScanner(r), + } +} + +// ReadFrom implements ReaderFrom on State using its underlying Writer, +// which is flushed after reading. +func (s State) ReadFrom(r io.Reader) (int64, error) { + n, err := s.Writer.ReadFrom(r) + if err != nil { + return n, err + } + return n, s.Writer.Flush() +} + +// Write implements io.Writer on State using its underlying Writer, +// which is flushed after writing. +func (s State) Write(bs []byte) (int, error) { + n, err := s.Writer.Write(bs) + if err != nil { + return n, err + } + return int(n), s.Writer.Flush() +} diff --git a/client/stream.go b/client/stream.go new file mode 100644 index 0000000..4d374f9 --- /dev/null +++ b/client/stream.go @@ -0,0 +1,34 @@ +package client + +import ( + "github.com/synapse-garden/sg-proto/stream" + "github.com/synapse-garden/sg-proto/users" + + "github.com/pkg/errors" +) + +// NewStream POSTs a new Stream to the given Client, with the given +// Name, owned by "from", with "from" and "to" both having read and +// write access. It returns the new Stream or any error. +func NewStream(c *Client, name, from, to string) (*stream.Stream, error) { + // Create Stream if not exist + str := &stream.Stream{ + Group: users.Group{ + Owner: from, + Readers: map[string]bool{ + from: true, + to: true, + }, + Writers: map[string]bool{ + from: true, + to: true, + }, + }, + + Name: name, + } + if err := c.CreateStream(str); err != nil { + return nil, errors.Wrap(err, "failed to create Stream") + } + return str, nil +} diff --git a/client/util.go b/client/util.go new file mode 100644 index 0000000..b66c71b --- /dev/null +++ b/client/util.go @@ -0,0 +1,206 @@ +package client + +import ( + "bytes" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + + "github.com/synapse-garden/sg-proto/auth" + + "github.com/pkg/errors" +) + +var customClient = &http.Client{} + +// SetCustomCert sets the internal HTTP Client's TLS config to accept +// the passed certificate bytes. It is not safe to do this concurrently +// with HTTP requests in this package. Any error will reset the +// internal Client back to its defaults. +func SetCustomCert(cert []byte) error { + roots := x509.NewCertPool() + if !roots.AppendCertsFromPEM(cert) { + return errors.New("failed to append cert") + } + + customClient.Transport = &http.Transport{ + TLSClientConfig: &tls.Config{RootCAs: roots}, + } + + return nil +} + +// DecodeDelete makes an HTTP DELETE request to the given resource under +// the given RequestTransforms. +func DecodeDelete(resource string, xfs ...RequestTransform) error { + req, err := http.NewRequest(http.MethodDelete, resource, nil) + if err != nil { + return errors.Wrap(err, "failed to create HTTP request") + } + + if res, err := customClient.Do(transform(req, xfs...)); err != nil { + return errors.Wrap(err, "failed to make HTTP request") + } else if stat := res.StatusCode; stat != http.StatusOK { + err := errors.Errorf("HTTP request failed with status %d (%s)", + stat, http.StatusText(stat), + ) + defer res.Body.Close() + bs, e := ioutil.ReadAll(res.Body) + if e != nil { + return errors.Wrapf(err, "failed to read error body after bad request: %s", e.Error()) + } + + return errors.Wrap(err, fmt.Sprintf("%#q", bs)) + } else { + return res.Body.Close() + } +} + +// DecodePost makes an HTTP POST to the given resource using a JSON +// marshaled request body from 'body', and applying xfs to the request. +func DecodePost( + v interface{}, + resource string, + body interface{}, + xfs ...RequestTransform, +) error { + var bs []byte + if body != nil { + var err error + bs, err = json.Marshal(body) + if err != nil { + return errors.Wrap(err, "failed to marshal JSON body") + } + } + req, err := http.NewRequest(http.MethodPost, resource, bytes.NewBuffer(bs)) + if err != nil { + return errors.Wrap(err, "failed to create HTTP request") + } + + if res, err := customClient.Do(transform(req, xfs...)); err != nil { + return errors.Wrap(err, "failed to make HTTP request") + } else if stat := res.StatusCode; stat != http.StatusOK { + err := errors.Errorf("HTTP request failed with status %d (%s)", + stat, http.StatusText(stat), + ) + defer res.Body.Close() + bs, e := ioutil.ReadAll(res.Body) + if e != nil { + return errors.Wrapf(err, "failed to read error body after bad request: %s", e.Error()) + } + + return errors.Wrap(err, fmt.Sprintf("%#q", bs)) + } else { + defer res.Body.Close() + return json.NewDecoder(res.Body).Decode(v) + } +} + +// DecodeGet unmarshals the given resource (after applying xfs) into v +// using an HTTP GET. +func DecodeGet( + v interface{}, + resource string, + xfs ...RequestTransform, +) error { + req, err := http.NewRequest(http.MethodGet, resource, nil) + if err != nil { + return errors.Wrap(err, "failed to create HTTP request") + } + if res, err := customClient.Do(transform(req, xfs...)); err != nil { + return errors.Wrap(err, "failed to make HTTP request") + } else if stat := res.StatusCode; stat != http.StatusOK { + err := errors.Errorf("HTTP request failed with status %d (%s)", + stat, http.StatusText(stat), + ) + defer res.Body.Close() + bs, e := ioutil.ReadAll(res.Body) + if e != nil { + return errors.Wrapf(err, "failed to read error body after bad request: %s", e.Error()) + } + + return errors.Wrap(err, fmt.Sprintf("%#q", bs)) + } else { + defer res.Body.Close() + return json.NewDecoder(res.Body).Decode(v) + } +} + +// Delete makes an HTTP DELETE request on the given resource using xfs. +func Delete(resource string, xfs ...RequestTransform) error { + req, err := http.NewRequest(http.MethodDelete, resource, nil) + if err != nil { + return errors.Wrap(err, "failed to create HTTP request") + } + if res, err := customClient.Do(transform(req, xfs...)); err != nil { + return errors.Wrap(err, "failed to make HTTP request") + } else if stat := res.StatusCode; stat != http.StatusOK { + err := errors.Errorf("HTTP request failed with status %d (%s)", + stat, http.StatusText(stat), + ) + defer res.Body.Close() + bs, e := ioutil.ReadAll(res.Body) + if e != nil { + return errors.Wrapf(err, "failed to read error body after bad request: %s", e.Error()) + } + + return errors.Wrap(err, fmt.Sprintf("%#q", bs)) + } else { + return res.Body.Close() + } +} + +// Param is a URL param builder. Use ApplyParams to get the parameterized +// URL. +type Param fmt.Stringer + +// Filter is a Param for filtering by keyword. +type Filter string + +// String implements Param on Filter. +func (f Filter) String() string { + return "filter=" + string(f) +} + +// ApplyParams applies the given Params to generate a sequence of URL parameters. +func ApplyParams(ps ...Param) string { + if len(ps) == 0 { + return "" + } + str := bytes.NewBufferString("?") + for _, p := range ps { + str.WriteString("&" + p.String()) + } + return str.String() +} + +// A RequestTransform can be applied to transform an *http.Request. +type RequestTransform func(*http.Request) *http.Request + +func transform(req *http.Request, xfs ...RequestTransform) *http.Request { + for _, xf := range xfs { + req = xf(req) + } + return req +} + +// AuthHeader adds an Authorization header of the given TokenType +// (auth.BearerType or auth.RefreshType.) +func AuthHeader(t auth.TokenType, tk auth.Token) RequestTransform { + return func(req *http.Request) *http.Request { + req.Header.Add("Authorization", t.String()+" "+tk.String()) + return req + } +} + +// AdminHeader adds an Authorization header of type Admin using the +// given API key. +func AdminHeader(apiKey string) RequestTransform { + return func(req *http.Request) *http.Request { + req.Header.Add("Authorization", "Admin "+apiKey) + return req + } +} diff --git a/cmd/cmd.go b/cmd/cmd.go new file mode 100644 index 0000000..06bddce --- /dev/null +++ b/cmd/cmd.go @@ -0,0 +1,531 @@ +package cmd + +import ( + "crypto/sha256" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "strconv" + "strings" + "time" + + "github.com/synapse-garden/sg-proto/auth" + "github.com/synapse-garden/sg-proto/client" + "github.com/synapse-garden/sg-proto/incept" + "github.com/synapse-garden/sg-proto/rest" + "github.com/synapse-garden/sg-proto/stream" + "github.com/synapse-garden/sg-proto/users" + + "github.com/pkg/errors" + uuid "github.com/satori/go.uuid" + xws "golang.org/x/net/websocket" +) + +const help = `SG Help: + - help -- Show this help message + - quit -- Exit SG + - login -- Log into your account + - incept -- Create a new user + - profile -- Check profile + - profile delete -- Delete profile + - stream -- Open a stream to username + - admin key -- Log in as admin +` + +const adminHelp = help + ` + - admin tickets [n] -- Create [n or 1] tickets + - admin tickets get [n] -- Get the first [n or 10] tickets +` + +type Command func(*client.Client) error + +func Confirm(text string, comm Command) Command { + return GetInput( + fmt.Sprintf("really %s? (y/n)", text), + func(s string) Command { + switch s { + case "y", "Y", "yes": + return comm + case "n", "N", "no": + return OutputString("canceled\n") + default: + // TODO: improve UX without recursing + // Confirm(...)(c) + return OutputStringf("Invalid option "+ + "%#q; input a valid option.\n", s) + } + }) +} + +func GetInput(prompt string, comm func(string) Command) Command { + return func(c *client.Client) error { + if err := OutputStringf("%s: ", prompt)(c); err != nil { + return err + } + s := c.State + if !s.Scan() { + return OutputString("failed to scan input")(c) + } + return comm(s.Text())(c) + } +} + +func OutputFrom(r io.Reader) Command { + return func(c *client.Client) error { + _, err := io.Copy(c.State, r) + return err + } +} + +func OutputString(vals ...interface{}) Command { + return func(c *client.Client) error { + _, err := fmt.Fprint(c.State, vals...) + return err + } +} + +func OutputStringf(f string, vals ...interface{}) Command { + return func(c *client.Client) error { + _, err := fmt.Fprintf(c.State, f, vals...) + return err + } +} + +func OutputJSON(f string, val interface{}) Command { + return func(c *client.Client) error { + if _, err := fmt.Fprintln(c.State, f); err != nil { + return err + } + return json.NewEncoder(c.State).Encode(val) + } +} + +func OutputError(err error, f string) Command { + return func(c *client.Client) error { + _, err := fmt.Fprint(c.State, + errors.Wrap(err, f).Error()+"\n") + return err + } +} + +func OutputErrorf(err error, f string, vals ...interface{}) Command { + return func(c *client.Client) error { + _, err := fmt.Fprintf(c.State, + errors.Wrapf(err, f, vals...).Error()) + return err + } +} + +func OutputHelp(topics ...string) Command { + return func(c *client.Client) error { + h := help + if c.APIKey != "" { + h = adminHelp + } + if len(topics) == 0 { + return OutputStringf(h)(c) + } + switch topics[0] { + case "login": + return OutputStringf(h)(c) + case "stream", "streams": + return OutputStringf("usage: login \n")(c) + default: + return OutputStringf("unknown command %#q\n%s", topics[0], h)(c) + } + } +} + +func Info(c *client.Client) error { + info := new(rest.SourceInfo) + if err := c.Info(info); err != nil { + return errors.Wrap(err, "failed to get backend /source info") + } + return OutputStringf( + "SG is online:\nLicensed to %s under %s.\nSource "+ + "location: %s\n\n", + info.LicensedTo, + info.License, + info.Location, + )(c) +} + +func Incept(tkt, name, pw string) Command { + return func(c *client.Client) error { + if _, err := uuid.FromString(tkt); err != nil { + return OutputErrorf(err, "invalid ticket %#q", tkt)(c) + } + l := new(auth.Login) + if err := c.CreateLogin(l, tkt, name, pw); err != nil { + return OutputError(err, "failed to create user")(c) + } + + return OutputJSON("New user created: ", l.User)(c) + } +} + +func Login(user, pw string) Command { + return func(c *client.Client) error { + pwbs := sha256.Sum256([]byte(pw)) + l := &auth.Login{ + User: users.User{Name: user}, + PWHash: pwbs[:], + } + if err := c.Login(l); err != nil { + return OutputError(err, "failed to create user")(c) + } + return OutputStringf("user %s logged in\n", l.Name)(c) + } +} + +func LoggedIn(c *client.Client) error { + switch { + case c.State.Session.Token == nil: + return errors.New("no session") + case time.Now().After(c.State.Session.Expiration): + return errors.New("session timed out") + } + return nil +} + +func Logout(c *client.Client) error { + if err := LoggedIn(c); err != nil { + return OutputError(err, "not logged in")(c) + } else if err := c.Logout(); err != nil { + return OutputError(err, "failed to log out")(c) + } + c.State.Session = new(auth.Session) + return OutputString("logged out successfully")(c) +} + +func DeleteProfile(c *client.Client) error { + if err := LoggedIn(c); err != nil { + return OutputError(err, "not logged in")(c) + } else if err = c.DeleteProfile(); err != nil { + return OutputError(err, "failed to delete profile")(c) + } + c.State.Session = new(auth.Session) + return OutputString("profile deleted successfully")(c) +} + +func Profile(c *client.Client) error { + if err := LoggedIn(c); err != nil { + return OutputError(err, "not logged in")(c) + } + user := new(users.User) + if err := c.GetProfile(user); err != nil { + return OutputError(err, "failed to get profile")(c) + } + + return OutputJSON("User: ", user)(c) +} + +func getStreamByName(c *client.Client, name string) (*stream.Stream, error) { + strs := new([]*stream.Stream) + // Find or create a stream for the user. + if err := c.AllStreams(strs); err != nil { + return nil, err + } + for _, s := range *strs { + if s.Name == name { + return s, nil + } + } + return nil, errors.Errorf("stream %#q not found", name) +} + +func getStreamByID(c *client.Client, uu uuid.UUID) (*stream.Stream, error) { + str := new(stream.Stream) + return str, c.GetStream(str, uu.String()) +} + +// getStream gets a Stream which has the given name and participants. +// If no such Stream exists, it POSTs and returns a new one for the user. +func getStream( + c *client.Client, + name, from, to string, +) (*stream.Stream, error) { + strs := new([]*stream.Stream) + // Find or create a stream for the user. + if err := c.AllStreams(strs); err != nil { + return nil, err + } + for _, s := range *strs { + if s.Name != name { + continue + } + + rsTo := s.Readers[to] + fromOwner := s.Owner == from + wsTo, wsFrom := s.Writers[to], s.Writers[from] + if (wsFrom || fromOwner) && (rsTo || wsTo) { + return s, nil + } + } + + return client.NewStream(c, name, from, to) +} + +func Stream(which string, to ...string) Command { + return func(c *client.Client) error { + if err := LoggedIn(c); err != nil { + return OutputError(err, "not logged in")(c) + } + user := c.State.User + if user == nil { + user = new(users.User) + if err := c.GetProfile(user); err != nil { + return OutputError(err, "failed to get profile")(c) + } + c.State.User = user + } + + uu := uuid.FromStringOrNil(which) + var str *stream.Stream + var err error + + switch { + case strings.HasPrefix(which, "#"): + // User wants a stream by Name + str, err = getStreamByName(c, which) + case !uuid.Equal(uu, uuid.Nil): + // User wants a stream by ID + str, err = getStreamByID(c, uu) + default: + // The passed value was not a UUID or a + // #chatname, so it was a read / write user's + // name. + // + // User wants to find or create a stream. + str, err = getStream(c, + "#chat", + user.Name, which, + ) + } + + if err != nil { + return OutputError(err, "failed to get Stream")(c) + } + if err := OutputStringf("joining stream %#q\n", str.ID)(c); err != nil { + return OutputError(err, "failed to output")(c) + } + + conn, err := c.GetStreamWS(str.ID) + if err != nil { + return OutputError(err, "failed to open websocket")(c) + } + + s := c.State + if err := OutputStringf("streaming on %s\n", str.Name)(c); err != nil { + return err + } + + errs := make(chan error, 10) + go func() { + for s.Scan() { + err = xws.JSON.Send(conn, &stream.Message{ + Content: s.Text(), + }) + if err != nil { + select { + case errs <- errors.Wrap(err, "send"): + default: + } + + return + } + } + }() + go func() { + msg := new(stream.Message) + enc := json.NewEncoder(s) + for { + if err := xws.JSON.Receive(conn, msg); err != nil { + select { + case errs <- errors.Wrap(err, "recieve"): + default: + } + + return + } + if err = enc.Encode(msg); err != nil { + select { + case errs <- errors.Wrap(err, "encode"): + default: + } + + return + } + } + }() + defer close(errs) + return <-errs + } +} + +func Quit(_ *client.Client) error { + return ErrQuit +} + +var ErrQuit = errors.New("goodbye") + +// GetCommand returns a Command function, including help messages. +func GetCommand(args ...string) Command { + switch { + case len(args) < 1: + return OutputHelp() + } + + cmd, opts := args[0], args[1:] + switch cmd { + case "h", "help": + return OutputHelp(opts...) + case "quit", "q", "x", "exit", "bye": + return Quit + case "incept": + if len(opts) != 3 { + return OutputHelp("incept") + } + return Incept(opts[0], opts[1], opts[2]) + case "login": + switch len(opts) { + case 1: + return GetInput("enter password", func(s string) Command { + return Login(opts[0], s) + }) + case 2: + return Login(opts[0], opts[1]) + default: + return OutputHelp("login") + } + case "logout": + return Logout + case "admin": + return AdminCommand(opts...) + case "do": + return OutputHelp() + case "profile": + switch len(opts) { + case 0: + return Profile + case 1: + return Confirm("delete profile", DeleteProfile) + default: + return OutputHelp(cmd) + } + case "stream", "streams": + if len(opts) != 1 { + return OutputHelp(cmd) + } + return Stream(opts[0]) + default: + return OutputHelp(cmd) + } +} + +// GetTickets creates n new tickets using the given Admin key. +func GetTickets(n int) Command { + return func(c *client.Client) error { + tkts := new([]incept.Ticket) + if err := c.GetTickets(tkts, n); err != nil { + return OutputErrorf(err, "failed to get %d ticket(s)", n)(c) + } + + return OutputJSON("got tickets:\n", tkts)(c) + } +} + +// Tickets creates n new tickets using the given Admin key. +func Tickets(n int) Command { + return func(c *client.Client) error { + tkts := new([]incept.Ticket) + if err := c.CreateTickets(tkts, n); err != nil { + return OutputErrorf(err, "failed to create %d ticket(s)", n)(c) + } + + return OutputJSON("created tickets:\n", tkts)(c) + } +} + +func SetAPI(key string) Command { + return func(c *client.Client) error { + if bs, err := base64.StdEncoding.DecodeString(key); err != nil { + return OutputErrorf(err, "invalid admin key %#q", key)(c) + } else if _, err := uuid.FromBytes(bs); err != nil { + return OutputErrorf(err, "invalid admin key %#q", key)(c) + } + + if err := c.VerifyAdmin(key); err != nil { + return OutputErrorf(err, "invalid admin key %#q", key)(c) + } + + c.APIKey = key + return OutputString("API key verified")(c) + } +} + +// AdminCommand gets an Admin sub-command. +func AdminCommand(args ...string) Command { + switch { + case len(args) < 1: + return OutputHelp("admin") + } + + cmd, opts := args[0], args[1:] + switch cmd { + // case "profile", "profiles": + // switch len(opts) { + // case 0: + // case + case "key", "api-key", "api": + if len(opts) != 1 { + return OutputHelp(cmd) + } + return SetAPI(opts[0]) + case "ticket", "tickets": + switch len(opts) { + case 0: + return Tickets(1) + case 1: + if opts[0] == "get" { + return GetTickets(10) + } + + n, err := strconv.Atoi(opts[0]) + switch { + case err != nil: + return OutputStringf( + "must provide integer 'n': %s", + err.Error(), + ) + case n < 1: + return OutputStringf("must provide positive 'n'") + + } + return Tickets(n) + + case 2: + if opts[0] != "get" { + return OutputHelp(cmd) + } + + n, err := strconv.Atoi(opts[1]) + switch { + case err != nil: + return OutputStringf( + "must provide integer 'n': %s", + err.Error(), + ) + case n < 1: + return OutputStringf("must provide positive 'n'") + + } + return GetTickets(n) + } + fallthrough + + default: + return OutputHelp(cmd) + } +} diff --git a/convo/convo_test.go b/convo/convo_test.go index ce0db88..9e85d12 100644 --- a/convo/convo_test.go +++ b/convo/convo_test.go @@ -36,7 +36,7 @@ func (s *ConvoSuite) SetUpTest(c *C) { func (s *ConvoSuite) TearDownTest(c *C) { if db := s.db; db != nil { - c.Assert(sgt.CleanupDB(db), IsNil) + c.Assert(sgt.Cleanup(db), IsNil) c.Assert(os.Remove(s.tmpDir), IsNil) } } diff --git a/incept/incept.go b/incept/incept.go index a3aebf1..df6db11 100644 --- a/incept/incept.go +++ b/incept/incept.go @@ -3,6 +3,7 @@ package incept import ( "encoding/json" "fmt" + "io" "github.com/synapse-garden/sg-proto/auth" "github.com/synapse-garden/sg-proto/store" @@ -58,6 +59,30 @@ func NewTicket(t Ticket) func(*bolt.Tx) error { return store.Put(TicketBucket, t.Bytes(), nil) } +func GetTickets(n int) func(*bolt.Tx) ([]Ticket, error) { + return func(tx *bolt.Tx) (tkts []Ticket, e error) { + e = store.ForEach(TicketBucket, func(k, _ []byte) error { + if len(tkts) >= n { + return io.EOF + } + + uu, err := uuid.FromBytes(k) + if err != nil { + return err + } + + tkts = append(tkts, Ticket(uu)) + return nil + })(tx) + + if e == io.EOF { + e = nil + } + + return + } +} + func CheckTicketExist(key Ticket) func(*bolt.Tx) error { return func(tx *bolt.Tx) error { err := store.CheckExists(TicketBucket, key.Bytes())(tx) diff --git a/incept/incept_test.go b/incept/incept_test.go index 69b593d..1439870 100644 --- a/incept/incept_test.go +++ b/incept/incept_test.go @@ -35,7 +35,7 @@ func (s *InceptSuite) SetUpTest(c *C) { func (s *InceptSuite) TearDownTest(c *C) { if db := s.db; db != nil { - c.Assert(sgt.CleanupDB(db), IsNil) + c.Assert(sgt.Cleanup(db), IsNil) c.Assert(os.Remove(s.tmpDir), IsNil) } } diff --git a/notif/notif_test.go b/notif/notif_test.go index e7f4a7f..81bc04f 100644 --- a/notif/notif_test.go +++ b/notif/notif_test.go @@ -39,7 +39,7 @@ func (s *NotifSuite) SetUpTest(c *C) { func (s *NotifSuite) TearDownTest(c *C) { if db := s.db; db != nil { - c.Assert(sgt.CleanupDB(db), IsNil) + c.Assert(sgt.Cleanup(db), IsNil) c.Assert(os.Remove(s.tmpDir), IsNil) } } diff --git a/rest/admin.go b/rest/admin.go index dadce80..186614b 100644 --- a/rest/admin.go +++ b/rest/admin.go @@ -36,25 +36,17 @@ type Admin struct { } // Bind implements API.Bind on Admin. -func (a *Admin) Bind(r *htr.Router) error { +func (a *Admin) Bind(r *htr.Router) (Cleanup, error) { db := a.DB if db == nil { - return errors.New("Admin DB handle must not be nil") - } - - err := db.Update(func(tx *bolt.Tx) (e error) { - a.Pub, e = river.NewPub(AdminNotifs, NotifStream, tx) - return - }) - if err != nil { - return err + return nil, errors.New("nil Admin DB handle") } if a.Token != nil { // User wants to create a new token. err := db.Update(admin.NewToken(a.Token)) if err != nil { - return err + return nil, err } } else if err := db.View(admin.CheckExists); err != nil { switch err.(type) { @@ -64,14 +56,23 @@ func (a *Admin) Bind(r *htr.Router) error { base64.StdEncoding.EncodeToString(newToken)) err = db.Update(admin.NewToken(newToken)) if err != nil { - return err + return nil, err } default: - return errors.Wrap(err, "failed to check for existing admin key") + return nil, errors.Wrap(err, "failed to check for existing admin key") } } + err := db.Update(func(tx *bolt.Tx) (e error) { + a.Pub, e = river.NewPub(AdminNotifs, NotifStream, tx) + return + }) + if err != nil { + return nil, err + } + r.GET("/admin/verify", mw.AuthAdmin(a.Verify, db)) + r.GET("/admin/tickets", mw.AuthAdmin(a.GetTickets, db)) r.POST("/admin/tickets", mw.AuthAdmin(a.NewTicket, db)) r.GET("/admin/profiles", mw.AuthAdmin(a.GetAllProfiles, db)) // PATCH /admin/profiles/bodie?addCoin=1000 (or -1000) @@ -81,7 +82,17 @@ func (a *Admin) Bind(r *htr.Router) error { r.DELETE("/admin/tickets/:ticket", mw.AuthAdmin(a.DeleteTicket, db)) r.DELETE("/admin/users/:user_id", mw.AuthAdmin(a.DeleteUser, db)) - return nil + return a.Cleanup, nil +} + +// Cleanup closes the Admin's Pub river and deletes it from the DB. +func (a Admin) Cleanup() error { + if err := a.Pub.Close(); err != nil { + return err + } + return a.Update(func(tx *bolt.Tx) error { + return river.DeletePub(AdminNotifs, NotifStream, tx) + }) } func (Admin) Verify(w http.ResponseWriter, r *http.Request, _ htr.Params) { @@ -90,6 +101,48 @@ func (Admin) Verify(w http.ResponseWriter, r *http.Request, _ htr.Params) { } } +func (a Admin) GetTickets(w http.ResponseWriter, r *http.Request, ps htr.Params) { + var ( + countStr = r.FormValue("count") + count = 20 + err error + ) + + if len(countStr) != 0 { + count, err = strconv.Atoi(countStr) + switch { + case err != nil: + http.Error(w, errors.Wrapf(err, fmt.Sprintf( + `invalid "count" value %#q`, countStr, + )).Error(), http.StatusBadRequest) + return + case count < 1: + http.Error(w, `invalid "count" value < 1`, http.StatusBadRequest) + return + } + } + + var tkts []incept.Ticket + err = a.View(func(tx *bolt.Tx) (e error) { + tkts, e = incept.GetTickets(count)(tx) + return + }) + + if err != nil { + http.Error(w, errors.Wrap( + err, "failed to get tickets", + ).Error(), http.StatusInternalServerError) + return + } + + if err := json.NewEncoder(w).Encode(tkts); err != nil { + http.Error(w, errors.Wrap( + err, "failed to write response", + ).Error(), http.StatusInternalServerError) + return + } +} + func (a Admin) NewTicket(w http.ResponseWriter, r *http.Request, _ htr.Params) { var ( countStr = r.FormValue("count") diff --git a/rest/admin_test.go b/rest/admin_test.go index 31b7937..93cc442 100644 --- a/rest/admin_test.go +++ b/rest/admin_test.go @@ -14,11 +14,9 @@ import ( "github.com/synapse-garden/sg-proto/rest" "github.com/synapse-garden/sg-proto/store" "github.com/synapse-garden/sg-proto/stream" - "github.com/synapse-garden/sg-proto/stream/river" sgt "github.com/synapse-garden/sg-proto/testing" "github.com/synapse-garden/sg-proto/users" - "github.com/boltdb/bolt" "github.com/davecgh/go-spew/spew" htr "github.com/julienschmidt/httprouter" uuid "github.com/satori/go.uuid" @@ -32,7 +30,7 @@ func prepAdminAPI(c *C, r *htr.Router, api *rest.Admin, users ...string, -) (*htt.Server, map[string]auth.Token) { +) (*htt.Server, rest.Cleanup, map[string]auth.Token) { tokens := make(map[string]auth.Token) for _, user := range users { @@ -43,39 +41,37 @@ func prepAdminAPI(c *C, tokens[user] = sesh.Token } - c.Assert(api.Bind(r), IsNil) - c.Assert(rest.Token{api.DB}.Bind(r), IsNil) - c.Assert(rest.Profile{api.DB}.Bind(r), IsNil) + cc, err := api.Bind(r) + defer func() { + if c.Failed() { + cc() + } + }() + c.Assert(err, IsNil) + _, err = rest.Token{api.DB}.Bind(r) + c.Assert(err, IsNil) + _, err = rest.Profile{api.DB}.Bind(r) + c.Assert(err, IsNil) // Make a testing server to run it. - return htt.NewServer(r), tokens -} - -func cleanupAdminAPI(c *C, api *rest.Admin) { - c.Assert(api.Pub.Close(), IsNil) - c.Assert(api.Update(func(tx *bolt.Tx) error { - return river.DeletePub(rest.AdminNotifs, rest.NotifStream, tx) - }), IsNil) + return htt.NewServer(r), cc, tokens } func (s *RESTSuite) TestAdminNilDB(c *C) { - c.Assert( - new(rest.Admin).Bind(htr.New()), - ErrorMatches, - "Admin DB handle must not be nil", - ) + _, err := new(rest.Admin).Bind(nil) + c.Assert(err, ErrorMatches, "nil Admin DB handle") } func (s *RESTSuite) TestAdminGetAllProfiles(c *C) { var ( - tokenUUID = uuid.NewV4() - adminKey = auth.Token(tokenUUID[:]) - api = &rest.Admin{Token: adminKey, DB: s.db} - r = htr.New() - srv, _ = prepAdminAPI(c, r, api, "bob", "bodie") + tokenUUID = uuid.NewV4() + adminKey = auth.Token(tokenUUID[:]) + api = &rest.Admin{Token: adminKey, DB: s.db} + r = htr.New() + srv, cc, _ = prepAdminAPI(c, r, api, "bob", "bodie") ) defer srv.Close() - defer cleanupAdminAPI(c, api) + defer cc() uu := uuid.NewV4() tok := auth.Token(uu[:]) @@ -119,14 +115,14 @@ func (s *RESTSuite) TestAdminGetAllProfiles(c *C) { func (s *RESTSuite) TestAdminNewLoginErrors(c *C) { var ( - tokenUUID = uuid.NewV4() - adminKey = auth.Token(tokenUUID[:]) - api = &rest.Admin{Token: adminKey, DB: s.db} - r = htr.New() - srv, tokens = prepAdminAPI(c, r, api, "bob", "bodie") + tokenUUID = uuid.NewV4() + adminKey = auth.Token(tokenUUID[:]) + api = &rest.Admin{Token: adminKey, DB: s.db} + r = htr.New() + srv, cc, tokens = prepAdminAPI(c, r, api, "bob", "bodie") ) defer srv.Close() - defer cleanupAdminAPI(c, api) + defer cc() uu := uuid.NewV4() tok := auth.Token(uu[:]) @@ -203,17 +199,17 @@ func (s *RESTSuite) TestAdminNewLoginErrors(c *C) { func (s *RESTSuite) TestAdminNewLoginWorks(c *C) { var ( - tokenUUID = uuid.NewV4() - adminKey = auth.Token(tokenUUID[:]) - api = &rest.Admin{Token: adminKey, DB: s.db} - r = htr.New() - srv, _ = prepAdminAPI(c, r, api) + tokenUUID = uuid.NewV4() + adminKey = auth.Token(tokenUUID[:]) + api = &rest.Admin{Token: adminKey, DB: s.db} + r = htr.New() + srv, cc, _ = prepAdminAPI(c, r, api) intoUser = new(users.User) intoSession = new(auth.Session) ) defer srv.Close() - defer cleanupAdminAPI(c, api) + defer cc() newLogin := &auth.Login{ User: users.User{Name: "bodo"}, @@ -254,14 +250,16 @@ func (s *RESTSuite) TestAdminNewLoginWorks(c *C) { func (s *RESTSuite) TestAdminPatchProfile(c *C) { var ( - tokenUUID = uuid.NewV4() - adminKey = auth.Token(tokenUUID[:]) - api = &rest.Admin{Token: adminKey, DB: s.db} - r = htr.New() - srv, tokens = prepAdminAPI(c, r, api, "bob", "bodie") - notifErr = rest.Notif{DB: s.db}.Bind(r) + tokenUUID = uuid.NewV4() + adminKey = auth.Token(tokenUUID[:]) + api = &rest.Admin{Token: adminKey, DB: s.db} + r = htr.New() + srv, cc, tokens = prepAdminAPI(c, r, api, "bob", "bodie") + _, notifErr = rest.Notif{DB: s.db}.Bind(r) ) defer srv.Close() + defer cc() + c.Assert(notifErr, IsNil) // Get websocket connection for "bodie". @@ -281,8 +279,6 @@ func (s *RESTSuite) TestAdminPatchProfile(c *C) { defer func() { c.Assert(connBodie.Close(), IsNil) c.Assert(connBob.Close(), IsNil) - - cleanupAdminAPI(c, api) }() uu := uuid.NewV4() @@ -441,20 +437,95 @@ func (s *RESTSuite) TestAdminPatchProfile(c *C) { } } +func (s *RESTSuite) TestAdminGetTickets(c *C) { + var ( + tokenUUID = uuid.NewV4() + adminKey = auth.Token(tokenUUID[:]) + api = &rest.Admin{Token: adminKey, DB: s.db} + r = htr.New() + cc, err = api.Bind(r) + ) + + c.Assert(err, IsNil) + defer cc() + + c.Log("GET /admin/tickets returns empty array") + + c.Assert(sgt.ExpectResponse(r, + "/admin/tickets", "GET", nil, + new([]incept.Ticket), new([]incept.Ticket), + 200, + sgt.Admin(adminKey), + ), IsNil) + + var ( + t1 = incept.Ticket(uuid.NewV4()) + t2 = incept.Ticket(uuid.NewV4()) + expect = []incept.Ticket{t1, t2} + ) + if t2.String() < t1.String() { + expect = []incept.Ticket{t2, t1} + } + + c.Assert(s.db.Update(incept.NewTickets(t1, t2)), IsNil) + + for i, test := range []struct { + should string + + verb, path string + header http.Header + expectStatus int + into, expectResp interface{} + + expectNotifs map[*ws.Conn][]*store.ResourceBox + expectHeaders []http.Header + }{{ + should: "reject wrong HTTP method", + verb: "DELETE", path: "/admin/tickets", + header: sgt.Admin(adminKey), + expectStatus: http.StatusMethodNotAllowed, + into: new(string), + expectResp: "Method Not Allowed\n", + expectHeaders: []http.Header{ + sgt.FailHeader, + sgt.Options("GET", "POST", "OPTIONS"), + }, + }, { + should: "return expected tickets", + verb: "GET", path: "/admin/tickets", + header: sgt.Admin(adminKey), + expectStatus: http.StatusOK, + into: new([]incept.Ticket), + expectResp: &expect, + }} { + c.Logf("test %d: %s on %s should %s", i, + test.verb, test.path, + test.should, + ) + c.Assert(sgt.ExpectResponse(r, + test.path, test.verb, nil, + test.into, test.expectResp, + test.expectStatus, + test.header, + test.expectHeaders..., + ), IsNil) + } +} + func (s *RESTSuite) TestAdminDeleteUser(c *C) { var ( - tokenUUID = uuid.NewV4() - adminKey = auth.Token(tokenUUID[:]) - api = &rest.Admin{Token: adminKey, DB: s.db} - r = htr.New() - srv, tokens = prepAdminAPI(c, r, api, "bob", "bodie") - convoAPI = &rest.Convo{DB: s.db} - convoErr = convoAPI.Bind(r) - notifErr = rest.Notif{DB: s.db}.Bind(r) + tokenUUID = uuid.NewV4() + adminKey = auth.Token(tokenUUID[:]) + api = &rest.Admin{Token: adminKey, DB: s.db} + r = htr.New() + srv, cc, tokens = prepAdminAPI(c, r, api, "bob", "bodie") + convoAPI = &rest.Convo{DB: s.db} + cc2, convoErr = convoAPI.Bind(r) + _, notifErr = rest.Notif{DB: s.db}.Bind(r) ) defer srv.Close() - defer cleanupConvoAPI(c, *convoAPI) - defer cleanupAdminAPI(c, api) + defer cc() + defer cc2() c.Assert(convoErr, IsNil) c.Assert(notifErr, IsNil) @@ -555,7 +626,8 @@ func (s *RESTSuite) TestAdminDeleteUser(c *C) { c.Assert(s.db.Update(incept.NewTickets(tick)), IsNil) // Bind the Incept API for testing. - c.Assert(rest.Incept{DB: api.DB}.Bind(r), IsNil) + _, err = rest.Incept{DB: api.DB}.Bind(r) + c.Assert(err, IsNil) // bodie's login is disabled; new bodie user cannot be created; // bodie's sessions are cleared. diff --git a/rest/convo.go b/rest/convo.go index d6c7c16..58896a4 100644 --- a/rest/convo.go +++ b/rest/convo.go @@ -32,10 +32,10 @@ type Convo struct { } // Bind implements API.Bind on Convo. -func (c *Convo) Bind(r *htr.Router) error { +func (c *Convo) Bind(r *htr.Router) (Cleanup, error) { db := c.DB if db == nil { - return errors.New("Convo DB handle must not be nil") + return nil, errors.New("nil Convo DB handle") } err := db.Update(func(tx *bolt.Tx) (e error) { @@ -43,7 +43,7 @@ func (c *Convo) Bind(r *htr.Router) error { return }) if err != nil { - return err + return nil, err } r.GET("/convos/:convo_id/start", mw.AuthWSUser( @@ -81,7 +81,18 @@ func (c *Convo) Bind(r *htr.Router) error { db, mw.CtxSetUserID, )) - return nil + return c.Cleanup, nil +} + +// Cleanup closes the Convo's Pub river and deletes it from the DB. +func (c Convo) Cleanup() error { + if err := c.Pub.Close(); err != nil { + return err + } + + return c.Update(func(tx *bolt.Tx) error { + return river.DeletePub(ConvoNotifs, NotifStream, tx) + }) } // Connect is a Handle which opens and binds a WebSocket session to a diff --git a/rest/convo_test.go b/rest/convo_test.go index 46bb64a..ce67ca9 100644 --- a/rest/convo_test.go +++ b/rest/convo_test.go @@ -12,22 +12,22 @@ import ( "github.com/synapse-garden/sg-proto/convo" "github.com/synapse-garden/sg-proto/rest" "github.com/synapse-garden/sg-proto/stream" - "github.com/synapse-garden/sg-proto/stream/river" sgt "github.com/synapse-garden/sg-proto/testing" "github.com/synapse-garden/sg-proto/users" - "github.com/boltdb/bolt" "github.com/julienschmidt/httprouter" uuid "github.com/satori/go.uuid" ws "golang.org/x/net/websocket" . "gopkg.in/check.v1" ) +var _ = rest.API(new(rest.Convo)) + func prepConvoAPI(c *C, r *httprouter.Router, api *rest.Convo, names ...string, -) (*htt.Server, map[string]auth.Token) { +) (*htt.Server, rest.Cleanup, map[string]auth.Token) { tokens := make(map[string]auth.Token) for _, user := range names { @@ -38,24 +38,19 @@ func prepConvoAPI(c *C, tokens[user] = sesh.Token } - c.Assert(api.Bind(r), IsNil) + cc, err := api.Bind(r) + c.Assert(err, IsNil) // Make a testing server to run it. - return htt.NewServer(r), tokens -} - -func cleanupConvoAPI(c *C, api rest.Convo) { - c.Assert(api.Pub.Close(), IsNil) - c.Assert(api.Update(func(tx *bolt.Tx) error { - return river.DeletePub(rest.ConvoNotifs, rest.NotifStream, tx) - }), IsNil) + return htt.NewServer(r), cc, tokens } func (s *RESTSuite) TestConvoCreate(c *C) { r := httprouter.New() api := rest.Convo{DB: s.db} - srv, tokens := prepConvoAPI(c, r, &api, "bodie", "bob", "jim") + srv, cc, tokens := prepConvoAPI(c, r, &api, "bodie", "bob", "jim") defer srv.Close() + defer cc() conv := &convo.Convo{Group: users.Group{ Owner: "bodie", @@ -101,15 +96,14 @@ func (s *RESTSuite) TestConvoCreate(c *C) { http.StatusUnauthorized, sgt.Bearer(tokens["jim"]), ), IsNil) - - cleanupConvoAPI(c, api) } func (s *RESTSuite) TestConvoPut(c *C) { r := httprouter.New() api := rest.Convo{DB: s.db} - srv, tokens := prepConvoAPI(c, r, &api, "bodie", "bob", "jim") + srv, cc, tokens := prepConvoAPI(c, r, &api, "bodie", "bob", "jim") defer srv.Close() + defer cc() conv := &convo.Convo{Group: users.Group{ Owner: "bodie", @@ -222,15 +216,14 @@ func (s *RESTSuite) TestConvoPut(c *C) { c.Assert(connBodie.Close(), IsNil) c.Assert(connJim.Close(), IsNil) - - cleanupConvoAPI(c, api) } func (s *RESTSuite) TestConvoDelete(c *C) { r := httprouter.New() api := rest.Convo{DB: s.db} - srv, tokens := prepConvoAPI(c, r, &api, "bodie", "bob", "jim") + srv, cc, tokens := prepConvoAPI(c, r, &api, "bodie", "bob", "jim") defer srv.Close() + defer cc() conv := &convo.Convo{Group: users.Group{ Owner: "bodie", @@ -294,8 +287,6 @@ func (s *RESTSuite) TestConvoDelete(c *C) { http.StatusNotFound, sgt.Bearer(tokens["bodie"]), ), IsNil) - - cleanupConvoAPI(c, api) } // GET messages @@ -310,9 +301,9 @@ func (s *RESTSuite) TestConvoHangupWhileDelete(c *C) { // delete when the websocket is also being hung up. r := httprouter.New() api := rest.Convo{DB: s.db} - srv, tokens := prepConvoAPI(c, r, &api, "bodie") + srv, cc, tokens := prepConvoAPI(c, r, &api, "bodie") defer srv.Close() - defer cleanupConvoAPI(c, api) + defer cc() conv := &convo.Convo{Group: users.Group{ Owner: "bodie", diff --git a/rest/incept.go b/rest/incept.go index b703a88..2151792 100644 --- a/rest/incept.go +++ b/rest/incept.go @@ -21,12 +21,12 @@ type Incept struct { } // Bind implements API.Bind on Incept. -func (i Incept) Bind(r *httprouter.Router) error { +func (i Incept) Bind(r *httprouter.Router) (Cleanup, error) { if i.DB == nil { - return errors.New("Incept DB handle must not be nil") + return nil, errors.New("nil Incept DB handle") } r.POST("/incept/:key", i.Incept) - return nil + return nil, nil } func (i Incept) Incept(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { diff --git a/rest/incept_test.go b/rest/incept_test.go index 0157643..eb0484b 100644 --- a/rest/incept_test.go +++ b/rest/incept_test.go @@ -20,6 +20,8 @@ import ( . "gopkg.in/check.v1" ) +var _ = rest.API(rest.Incept{}) + func (s *RESTSuite) TestIncept(c *C) { correctURL := "/incept/" @@ -35,8 +37,10 @@ func (s *RESTSuite) TestIncept(c *C) { c.Assert(err, IsNil) correctBody := string(correctBodyBs) - tkts := make([]string, len(s.tickets)) - for i, t := range s.tickets { + ts := prepareTickets(c, s.db) + + tkts := make([]string, len(ts)) + for i, t := range ts { tkts[i] = t.String() } c.Logf("correct tickets:\n %+v", strings.Join(tkts, "\n ")) @@ -144,7 +148,8 @@ func (s *RESTSuite) TestIncept(c *C) { c.Logf(" Body: %#q", test.body) r := httprouter.New() - c.Assert(rest.Incept{DB: s.db}.Bind(r), IsNil) + _, err := rest.Incept{DB: s.db}.Bind(r) + c.Assert(err, IsNil) rdr := bytes.NewBufferString(test.body) req := htt.NewRequest(test.method, test.url, rdr) w := htt.NewRecorder() diff --git a/rest/middleware/auth_test.go b/rest/middleware/auth_test.go index fb69e35..cf799f1 100644 --- a/rest/middleware/auth_test.go +++ b/rest/middleware/auth_test.go @@ -48,7 +48,7 @@ func (s *MiddlewareSuite) SetUpTest(c *C) { func (s *MiddlewareSuite) TearDownTest(c *C) { if db := s.db; db != nil { - c.Assert(sgt.CleanupDB(db), IsNil) + c.Assert(sgt.Cleanup(db), IsNil) c.Assert(os.Remove(s.tmpDir), IsNil) } } diff --git a/rest/notif.go b/rest/notif.go index 7dfe9b9..230e6bb 100644 --- a/rest/notif.go +++ b/rest/notif.go @@ -27,13 +27,13 @@ type Notif struct { } // Bind implements API.Bind on Notif. -func (n Notif) Bind(r *htr.Router) error { +func (n Notif) Bind(r *htr.Router) (Cleanup, error) { if n.DB == nil { - return errors.New("Notif DB handle must not be nil") + return nil, errors.New("nil Notif DB handle") } // When a client wants to connect to notifs, use stream.NewSub. r.GET("/notifs", mw.AuthWSUser(n.Connect, n.DB, mw.CtxSetUserID)) - return nil + return nil, nil } // Connect binds a subscriber River and serves it over a Websocket. diff --git a/rest/notif_test.go b/rest/notif_test.go index 2a20d2c..044f210 100644 --- a/rest/notif_test.go +++ b/rest/notif_test.go @@ -37,8 +37,11 @@ func (s *RESTSuite) TestConnectNotifs(c *C) { User: users.User{Name: "bob"}, PWHash: []byte("12345"), } - c.Assert(incept.Incept(s.tickets[0], user1, s.db), IsNil) - c.Assert(incept.Incept(s.tickets[1], user2, s.db), IsNil) + + tkts := prepareTickets(c, s.db) + + c.Assert(incept.Incept(tkts[0], user1, s.db), IsNil) + c.Assert(incept.Incept(tkts[1], user2, s.db), IsNil) // Get a session token for each. sesh1, sesh2 := new(auth.Session), new(auth.Session) @@ -68,7 +71,8 @@ func (s *RESTSuite) TestConnectNotifs(c *C) { }), IsNil) r := httprouter.New() - c.Assert(rest.Notif{DB: s.db}.Bind(r), IsNil) + _, err := rest.Notif{DB: s.db}.Bind(r) + c.Assert(err, IsNil) // Make a testing server to run it. srv := httptest.NewServer(r) defer srv.Close() diff --git a/rest/profile.go b/rest/profile.go index 3a91e3e..7f6c835 100644 --- a/rest/profile.go +++ b/rest/profile.go @@ -26,15 +26,15 @@ type Profile struct { } // Bind implements API.Bind on Profile. -func (p Profile) Bind(r *htr.Router) error { +func (p Profile) Bind(r *htr.Router) (Cleanup, error) { db := p.DB if db == nil { - return errors.New("Profile DB handle must not be nil") + return nil, errors.New("nil Profile DB handle") } r.GET("/profile", mw.AuthUser(p.Get, db, mw.CtxSetUserID)) r.DELETE("/profile", mw.AuthUser(p.Delete, db, mw.CtxSetUserID)) - return nil + return nil, nil } // Get fetches the user's Profile by userID. diff --git a/rest/profile_test.go b/rest/profile_test.go index 071d2dc..1f82071 100644 --- a/rest/profile_test.go +++ b/rest/profile_test.go @@ -23,11 +23,13 @@ import ( . "gopkg.in/check.v1" ) +var _ = rest.API(rest.Profile{}) + func prepProfileAPI(c *C, r *htr.Router, api rest.Profile, users ...string, -) (*htt.Server, *rest.Convo, map[string]auth.Token) { +) (*htt.Server, rest.Cleanup, *rest.Convo, map[string]auth.Token) { tokens := make(map[string]auth.Token) for _, user := range users { @@ -38,32 +40,40 @@ func prepProfileAPI(c *C, tokens[user] = sesh.Token } - conv := &rest.Convo{DB: api.DB} + _, err := rest.Token{DB: api.DB}.Bind(r) + c.Assert(err, IsNil) - c.Assert(rest.Token{DB: api.DB}.Bind(r), IsNil) - c.Assert(conv.Bind(r), IsNil) - c.Assert(rest.Incept{DB: api.DB}.Bind(r), IsNil) - c.Assert(api.Bind(r), IsNil) + _, err = rest.Incept{DB: api.DB}.Bind(r) + c.Assert(err, IsNil) + + _, err = api.Bind(r) + c.Assert(err, IsNil) + + conv := &rest.Convo{DB: api.DB} + cc, err := conv.Bind(r) + c.Assert(err, IsNil) // Make a testing server to run it. - return htt.NewServer(r), conv, tokens + return htt.NewServer(r), cc, conv, tokens } func (s *RESTSuite) TestProfileBind(c *C) { + _, err := rest.Profile{}.Bind(nil) + c.Check(err, ErrorMatches, "nil Profile DB handle") r := htr.New() - c.Check(rest.Profile{}.Bind(r), ErrorMatches, ".*not be nil") - c.Check(rest.Profile{DB: s.db}.Bind(r), IsNil) + _, err = rest.Profile{DB: s.db}.Bind(r) + c.Check(err, IsNil) } func (s *RESTSuite) TestProfileGet(c *C) { var ( - api = rest.Profile{DB: s.db} - r = htr.New() - srv, conv, tokens = prepProfileAPI(c, r, api, "bob", "bodie") + api = rest.Profile{DB: s.db} + r = htr.New() + srv, cc, _, tokens = prepProfileAPI(c, r, api, "bob", "bodie") ) defer srv.Close() - defer cleanupConvoAPI(c, *conv) + defer cc() uu := uuid.NewV4() someToken := auth.Token(uu[:]) @@ -116,12 +126,12 @@ func (s *RESTSuite) TestProfileDelete(c *C) { // This test is supposed to show that after a user's profile is // deleted, that user's tokens are removed, and login disabled. var ( - api = rest.Profile{DB: s.db} - r = htr.New() - srv, conv, tokens = prepProfileAPI(c, r, api, "bob", "bodie") + api = rest.Profile{DB: s.db} + r = htr.New() + srv, cc, _, tokens = prepProfileAPI(c, r, api, "bob", "bodie") ) defer srv.Close() - defer cleanupConvoAPI(c, *conv) + defer cc() uu := uuid.NewV4() someToken := auth.Token(uu[:]) @@ -205,15 +215,17 @@ func (s *RESTSuite) TestProfileDeleteHangups(c *C) { // This test is supposed to create a convo, connect to it, and // show that when the profile is deleted, the user is hung up. var ( - api = rest.Profile{DB: s.db} - r = htr.New() - srv, conv, tokens = prepProfileAPI(c, r, api, "bob", "bodie") + api = rest.Profile{DB: s.db} + r = htr.New() + srv, cc, _, tokens = prepProfileAPI(c, r, api, "bob", "bodie") + notifs = rest.Notif{DB: s.db} + _, err = notifs.Bind(r) ) + defer srv.Close() - defer cleanupConvoAPI(c, *conv) + defer cc() - notifs := rest.Notif{DB: s.db} - c.Assert(notifs.Bind(r), IsNil) + c.Assert(err, IsNil) toPOST := &convo.Convo{Group: users.Group{ Owner: "bodie", @@ -293,10 +305,10 @@ func (s *RESTSuite) TestProfileDeleteHangups(c *C) { func (s *RESTSuite) TestProfileOptions(c *C) { var ( - api = rest.Profile{DB: s.db} - r = htr.New() - err = api.Bind(r) - srv = htt.NewServer(r) + api = rest.Profile{DB: s.db} + r = htr.New() + _, err = api.Bind(r) + srv = htt.NewServer(r) ) defer srv.Close() diff --git a/rest/rest.go b/rest/rest.go index 142ec48..9c134ad 100644 --- a/rest/rest.go +++ b/rest/rest.go @@ -14,6 +14,7 @@ import ( "github.com/boltdb/bolt" "github.com/julienschmidt/httprouter" + "github.com/pkg/errors" ) // Needed endpoints: @@ -43,9 +44,31 @@ import ( // - POST /todo/:id/complete => Get bounty if before due // API is a transform on an httprouter.Router, passing a DB for passing -// on to httprouter.Handles. +// on to httprouter.Handles. If a non-nil Cleanup is returned, it +// may be called to deallocate any resources the API acquired during +// Bind. type API interface { - Bind(*httprouter.Router) error + Bind(*httprouter.Router) (Cleanup, error) +} + +// Cleanup is a deferred cleanup function meant to deallocate any +// resources which an API acquires, such as sockets addresses. It may +// optionally be returned by API.Bind. +type Cleanup func() error + +// Cleanups gathers a slice of Cleanup in order to simplify cleanup. +type Cleanups []Cleanup + +// Cleanup runs Cleanup on each Cleanup in the Cleanups. It returns the +// first error encountered and stops. +func (c Cleanups) Cleanup() error { + for i, cc := range c { + if err := cc(); err != nil { + return errors.Wrapf(err, "cleanup %d failed", i) + } + } + + return nil } // Bind binds the API on the given DB. It sets up REST endpoints as needed. @@ -53,7 +76,7 @@ func Bind( db *bolt.DB, source SourceInfo, apiKey auth.Token, -) (*httprouter.Router, error) { +) (*httprouter.Router, Cleanups, error) { if err := db.Update(store.Wrap( store.Prep( admin.AdminBucket, @@ -73,9 +96,11 @@ func Bind( auth.ClearSessions, river.ClearRivers, )); err != nil { - return nil, err + return nil, nil, err } + var cleanups []Cleanup + htr := httprouter.New() for _, api := range []API{ source, @@ -91,10 +116,14 @@ func Bind( // Connect Notif last so Pubs are already registered. Notif{DB: db}, } { - if err := api.Bind(htr); err != nil { - return nil, err + c, err := api.Bind(htr) + switch { + case err != nil: + return nil, cleanups, err + case c != nil: + cleanups = append(cleanups, c) } } - return htr, nil + return htr, cleanups, nil } diff --git a/rest/rest_test.go b/rest/rest_test.go index 8122fac..cf4c8a3 100644 --- a/rest/rest_test.go +++ b/rest/rest_test.go @@ -26,9 +26,8 @@ import ( func Test(t *testing.T) { TestingT(t) } type RESTSuite struct { - db *bolt.DB - tmpDir string - tickets []incept.Ticket + db *bolt.DB + tmpDir string } var _ = Suite(new(RESTSuite)) @@ -58,21 +57,23 @@ func (s *RESTSuite) SetUpTest(c *C) { river.ClearRivers, )), IsNil) s.db, s.tmpDir = db, tmpDir +} +func prepareTickets(c *C, db *bolt.DB) []incept.Ticket { tkts := make([]incept.Ticket, 3) for i := range tkts { tkts[i] = incept.Ticket(uuid.NewV4()) } - c.Assert(s.db.Update(incept.NewTickets(tkts...)), IsNil) + c.Assert(db.Update(incept.NewTickets(tkts...)), IsNil) - s.tickets = tkts + return tkts } func (s *RESTSuite) TearDownTest(c *C) { runtime.Gosched() time.Sleep(sgt.CleanupWait) if db := s.db; db != nil { - c.Assert(sgt.CleanupDB(db), IsNil) + c.Assert(sgt.Cleanup(db), IsNil) c.Assert(os.Remove(s.tmpDir), IsNil) } } diff --git a/rest/source.go b/rest/source.go index 168966f..59fb7fe 100644 --- a/rest/source.go +++ b/rest/source.go @@ -19,10 +19,10 @@ type SourceInfo struct { } // Bind implements API.Bind on SourceInfo. -func (s SourceInfo) Bind(r *htr.Router) error { +func (s SourceInfo) Bind(r *htr.Router) (Cleanup, error) { bs, err := json.MarshalIndent(s, "", " ") if err != nil { - return err + return nil, err } r.GET("/source", func( w http.ResponseWriter, @@ -31,5 +31,5 @@ func (s SourceInfo) Bind(r *htr.Router) error { ) { w.Write(bs) }) - return nil + return nil, nil } diff --git a/rest/source_test.go b/rest/source_test.go new file mode 100644 index 0000000..e6fcb60 --- /dev/null +++ b/rest/source_test.go @@ -0,0 +1,5 @@ +package rest_test + +import "github.com/synapse-garden/sg-proto/rest" + +var _ = rest.API(rest.SourceInfo{}) diff --git a/rest/stream.go b/rest/stream.go index e8880be..d0c4720 100644 --- a/rest/stream.go +++ b/rest/stream.go @@ -32,10 +32,10 @@ type Stream struct { } // Bind implements API.Bind on Stream. -func (s *Stream) Bind(r *htr.Router) error { +func (s *Stream) Bind(r *htr.Router) (Cleanup, error) { db := s.DB if db == nil { - return errors.New("Stream DB handle must not be nil") + return nil, errors.New("nil Stream DB handle") } err := db.Update(func(tx *bolt.Tx) (e error) { @@ -43,7 +43,7 @@ func (s *Stream) Bind(r *htr.Router) error { return }) if err != nil { - return err + return nil, err } // vx.y.0: @@ -110,7 +110,18 @@ func (s *Stream) Bind(r *htr.Router) error { db, mw.CtxSetUserID, )) - return nil + return s.Cleanup, nil +} + +// Cleanup closes the Stream's notification Pub river. +func (s Stream) Cleanup() error { + if err := s.Pub.Close(); err != nil { + return err + } + + return s.Update(func(tx *bolt.Tx) error { + return river.DeletePub(StreamNotifs, NotifStream, tx) + }) } // Connect is a Handle which opens and binds a WebSocket session to a diff --git a/rest/stream_test.go b/rest/stream_test.go index 97192bc..dc549d0 100644 --- a/rest/stream_test.go +++ b/rest/stream_test.go @@ -20,6 +20,8 @@ import ( . "gopkg.in/check.v1" ) +var _ = rest.API(new(rest.Stream)) + func (s *RESTSuite) TestStream(c *C) { user1, err := sgt.MakeLogin("bodie", "hello", s.db) c.Assert(err, IsNil) @@ -35,11 +37,13 @@ func (s *RESTSuite) TestStream(c *C) { r := httprouter.New() api := &rest.Stream{DB: s.db} - c.Assert(api.Bind(r), IsNil) + cc, err := api.Bind(r) + c.Assert(err, IsNil) + defer cc() + // Make a testing server to run it. srv := htt.NewServer(r) defer srv.Close() - defer c.Assert(api.Pub.Close(), IsNil) id := uuid.NewV4() diff --git a/rest/task.go b/rest/task.go index 4b9c49b..2449947 100644 --- a/rest/task.go +++ b/rest/task.go @@ -32,9 +32,9 @@ type Task struct { util.Timer } -func (t *Task) Bind(r *htr.Router) error { +func (t *Task) Bind(r *htr.Router) (Cleanup, error) { if t.DB == nil { - return errors.New("Bind called with nil DB handle") + return nil, errors.New("nil Task DB handle") } err := t.Update(func(tx *bolt.Tx) (e error) { @@ -42,7 +42,7 @@ func (t *Task) Bind(r *htr.Router) error { return }) if err != nil { - return err + return nil, err } r.GET("/tasks", mw.AuthUser( @@ -75,10 +75,21 @@ func (t *Task) Bind(r *htr.Router) error { mw.CtxSetUserID, )) - return nil + return t.Cleanup, nil +} + +// Cleanup closes the Task's Pub river and deletes it from the DB. +func (t Task) Cleanup() error { + if err := t.Pub.Close(); err != nil { + return err + } + + return t.Update(func(tx *bolt.Tx) error { + return river.DeletePub(TaskNotifs, NotifStream, tx) + }) } -func (t *Task) GetAll(w http.ResponseWriter, r *http.Request, _ htr.Params) { +func (t Task) GetAll(w http.ResponseWriter, r *http.Request, _ htr.Params) { vals, err := url.ParseQuery(r.URL.RawQuery) if err != nil { http.Error(w, errors.Wrap( @@ -147,7 +158,7 @@ func (t *Task) GetAll(w http.ResponseWriter, r *http.Request, _ htr.Params) { json.NewEncoder(w).Encode(ts) } -func (t *Task) Create(w http.ResponseWriter, r *http.Request, _ htr.Params) { +func (t Task) Create(w http.ResponseWriter, r *http.Request, _ htr.Params) { userID := mw.CtxGetUserID(r) tsk := new(task.Task) @@ -234,7 +245,7 @@ func (t *Task) Create(w http.ResponseWriter, r *http.Request, _ htr.Params) { json.NewEncoder(w).Encode(tsk) } -func (t *Task) Get(w http.ResponseWriter, r *http.Request, ps htr.Params) { +func (t Task) Get(w http.ResponseWriter, r *http.Request, ps htr.Params) { userID := mw.CtxGetUserID(r) tIDString, err := uuid.FromString(ps.ByName("id")) if err != nil { @@ -264,7 +275,7 @@ func (t *Task) Get(w http.ResponseWriter, r *http.Request, ps htr.Params) { json.NewEncoder(w).Encode(tsk) } -func (t *Task) Delete(w http.ResponseWriter, r *http.Request, ps htr.Params) { +func (t Task) Delete(w http.ResponseWriter, r *http.Request, ps htr.Params) { userID := mw.CtxGetUserID(r) tIDString := ps.ByName("id") tUUID, err := uuid.FromString(tIDString) @@ -313,7 +324,7 @@ func (t *Task) Delete(w http.ResponseWriter, r *http.Request, ps htr.Params) { } } -func (t *Task) Put(w http.ResponseWriter, r *http.Request, ps htr.Params) { +func (t Task) Put(w http.ResponseWriter, r *http.Request, ps htr.Params) { userID := mw.CtxGetUserID(r) tIDString, err := uuid.FromString(ps.ByName("id")) if err != nil { diff --git a/rest/task_test.go b/rest/task_test.go index 6554988..96af15f 100644 --- a/rest/task_test.go +++ b/rest/task_test.go @@ -12,12 +12,10 @@ import ( "github.com/synapse-garden/sg-proto/auth" "github.com/synapse-garden/sg-proto/rest" "github.com/synapse-garden/sg-proto/store" - "github.com/synapse-garden/sg-proto/stream/river" "github.com/synapse-garden/sg-proto/task" sgt "github.com/synapse-garden/sg-proto/testing" "github.com/synapse-garden/sg-proto/users" - "github.com/boltdb/bolt" "github.com/davecgh/go-spew/spew" htr "github.com/julienschmidt/httprouter" uuid "github.com/satori/go.uuid" @@ -31,7 +29,7 @@ func prepTaskAPI(c *C, r *htr.Router, api *rest.Task, names ...string, -) (*htt.Server, map[string]auth.Token) { +) (*htt.Server, rest.Cleanup, map[string]auth.Token) { tokens := make(map[string]auth.Token) for _, user := range names { @@ -42,25 +40,16 @@ func prepTaskAPI(c *C, tokens[user] = sesh.Token } - c.Assert(api.Bind(r), IsNil) + cc, err := api.Bind(r) + c.Assert(err, IsNil) // Make a testing server to run it. - return htt.NewServer(r), tokens -} - -func cleanupTaskAPI(c *C, api *rest.Task) { - c.Assert(api.Pub.Close(), IsNil) - c.Assert(api.Update(func(tx *bolt.Tx) error { - return river.DeletePub(rest.TaskNotifs, rest.NotifStream, tx) - }), IsNil) + return htt.NewServer(r), cc, tokens } func (s *RESTSuite) TestTaskBind(c *C) { - c.Assert( - new(rest.Task).Bind(htr.New()), - ErrorMatches, - "Bind called with nil DB handle", - ) + _, err := new(rest.Task).Bind(nil) + c.Assert(err, ErrorMatches, "nil Task DB handle") var ( r = htr.New() @@ -69,13 +58,14 @@ func (s *RESTSuite) TestTaskBind(c *C) { someWhen = now.Add(2 * time.Hour) beforeNow = now.Add(-1 * time.Hour) - notifErr = rest.Notif{DB: s.db}.Bind(r) - api = &rest.Task{DB: s.db, Timer: sgt.Timer(now)} + _, notifErr = rest.Notif{DB: s.db}.Bind(r) + api = &rest.Task{DB: s.db, Timer: sgt.Timer(now)} - srv, tokens = prepTaskAPI(c, r, api, "bodie", "bob") + srv, cc, tokens = prepTaskAPI(c, r, api, "bodie", "bob") ) defer srv.Close() + defer cc() c.Assert(notifErr, IsNil) // Get websocket connection for "bodie". @@ -963,6 +953,4 @@ func (s *RESTSuite) TestTaskBind(c *C) { c.Assert(connBodie.Close(), IsNil) c.Assert(connBob.Close(), IsNil) - - cleanupTaskAPI(c, api) } diff --git a/rest/token.go b/rest/token.go index d09621f..92cd55c 100644 --- a/rest/token.go +++ b/rest/token.go @@ -18,14 +18,14 @@ import ( type Token struct{ *bolt.DB } // Bind implements API.Bind on Token. -func (t Token) Bind(r *htr.Router) error { +func (t Token) Bind(r *htr.Router) (Cleanup, error) { if t.DB == nil { - return errors.New("Token DB handle must not be nil") + return nil, errors.New("nil Token DB handle") } r.POST("/tokens", t.Create) r.DELETE("/tokens", mw.AuthUser(t.Delete, t.DB, mw.CtxSetToken)) - return nil + return nil, nil } func (t Token) Create(w http.ResponseWriter, r *http.Request, _ htr.Params) { diff --git a/rest/token_test.go b/rest/token_test.go index 54592fb..1047192 100644 --- a/rest/token_test.go +++ b/rest/token_test.go @@ -1,13 +1,13 @@ package rest_test import ( - // "github.com/synapse-garden/sg-proto/incept" - // "github.com/synapse-garden/sg-proto/users" + "github.com/synapse-garden/sg-proto/rest" - // "github.com/boltdb/bolt" . "gopkg.in/check.v1" ) +var _ = rest.API(rest.Token{}) + func (s *RESTSuite) TestToken(c *C) { // If a user's account does not exist, he cannot create a token. // If a user's account is disabled, he cannot create a token. diff --git a/sg/serve.go b/sg/serve.go index 5cbd853..100e4b0 100644 --- a/sg/serve.go +++ b/sg/serve.go @@ -16,7 +16,8 @@ func serveInsecure( addr, port string, source rest.SourceInfo, ) { - router, err := rest.Bind(db, source, apiKey) + router, cleanups, err := rest.Bind(db, source, apiKey) + defer cleanups.Cleanup() if err != nil { log.Fatalf("failed to bind on DB: %s", err.Error()) } @@ -31,7 +32,8 @@ func serveSecure( addr, port, cert, key string, source rest.SourceInfo, ) { - router, err := rest.Bind(db, source, apiKey) + router, cleanups, err := rest.Bind(db, source, apiKey) + defer cleanups.Cleanup() if err != nil { log.Fatalf("failed to bind on DB: %s", err.Error()) } @@ -50,7 +52,8 @@ func devServeInsecure( addr, port string, source rest.SourceInfo, ) { - router, err := rest.Bind(db, source, apiKey) + router, cleanups, err := rest.Bind(db, source, apiKey) + defer cleanups.Cleanup() if err != nil { log.Fatalf("failed to bind on DB: %s", err.Error()) } @@ -66,7 +69,8 @@ func devServeSecure( addr, port, cert, key string, source rest.SourceInfo, ) { - router, err := rest.Bind(db, source, apiKey) + router, cleanups, err := rest.Bind(db, source, apiKey) + defer cleanups.Cleanup() if err != nil { log.Fatalf("failed to bind on DB: %s", err.Error()) } diff --git a/store/store_test.go b/store/store_test.go index 328e2e2..c27503d 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -29,7 +29,7 @@ func (s *StoreSuite) SetUpTest(c *C) { } func (s *StoreSuite) TearDownTest(c *C) { - c.Assert(testing.CleanupDB(s.DB), IsNil) + c.Assert(testing.Cleanup(s.DB), IsNil) c.Assert(os.Remove(s.tmpDir), IsNil) } diff --git a/stream/common_test.go b/stream/common_test.go index 7cfbbfe..43a1d6d 100644 --- a/stream/common_test.go +++ b/stream/common_test.go @@ -33,7 +33,7 @@ func (s *StreamSuite) SetUpTest(c *C) { func (s *StreamSuite) TearDownTest(c *C) { if db := s.db; db != nil { - c.Assert(sgt.CleanupDB(db), IsNil) + c.Assert(sgt.Cleanup(db), IsNil) c.Assert(os.Remove(s.tmpDir), IsNil) } } diff --git a/stream/river/common_test.go b/stream/river/common_test.go index d2fb927..72715af 100644 --- a/stream/river/common_test.go +++ b/stream/river/common_test.go @@ -33,7 +33,7 @@ func (s *RiverSuite) SetUpTest(c *C) { func (s *RiverSuite) TearDownTest(c *C) { if db := s.db; db != nil { - c.Assert(sgt.CleanupDB(db), IsNil) + c.Assert(sgt.Cleanup(db), IsNil) c.Assert(os.Remove(s.tmpDir), IsNil) } } diff --git a/stream/river/pub.go b/stream/river/pub.go index 564f72f..560bf02 100644 --- a/stream/river/pub.go +++ b/stream/river/pub.go @@ -2,6 +2,7 @@ package river import ( "bytes" + "fmt" "github.com/synapse-garden/sg-proto/store" @@ -47,17 +48,22 @@ func NewPub(id, streamID string, tx *bolt.Tx) (r Pub, e error) { bID := []byte(id) if k, _ := b.Cursor().Seek(bID); bytes.Equal(k, bID) { - return nil, errExists(id) + return nil, errExists(fmt.Sprintf( + "pub %s/%s", streamID, id, + )) } if err = b.Put(bID, nil); err != nil { return nil, errors.Wrap(err, "failed to write river to DB") } - err = sock.Listen("inproc://" + streamID + "/" + id) + + addr := "inproc://" + streamID + "/" + id + + err = sock.Listen(addr) switch { case err == mangos.ErrAddrInUse: - return nil, errExists(id) + return nil, errExists("pub " + addr) case err != nil: return nil, errors.Wrap(err, "failed to start listening") diff --git a/stream/river/pub_test.go b/stream/river/pub_test.go index e3b272d..6ed58d1 100644 --- a/stream/river/pub_test.go +++ b/stream/river/pub_test.go @@ -47,7 +47,7 @@ func (s *RiverSuite) TestNewPub(c *C) { c.Assert(s.db.Update(func(tx *bolt.Tx) (e error) { _, e = river.NewPub("p1", "goodbye", tx) return - }), ErrorMatches, "river `p1` already exists") + }), ErrorMatches, "river `pub goodbye/p1` already exists") checkRivers(c, s.db, "goodbye", "p1", "p2") diff --git a/stream/river/river.go b/stream/river/river.go index 7330c40..4d96e6a 100644 --- a/stream/river/river.go +++ b/stream/river/river.go @@ -2,6 +2,7 @@ package river import ( "bytes" + "fmt" "github.com/boltdb/bolt" ) @@ -27,7 +28,7 @@ func CheckRiverNotExists(id, streamID string) func(*bolt.Tx) error { return nil } - return errExists(id) + return errExists(fmt.Sprintf("%s/%s", streamID, id)) } } diff --git a/stream/river/river_test.go b/stream/river/river_test.go index 646aa3b..3c89d12 100644 --- a/stream/river/river_test.go +++ b/stream/river/river_test.go @@ -196,7 +196,7 @@ func (s *RiverSuite) TestCheckRiverNotExists(c *C) { defer func() { c.Assert(r.Close(), IsNil) }() err = s.db.View(river.CheckRiverNotExists("hello", "goodbye")) - c.Check(err, DeepEquals, river.MakeRiverExistsErr("hello")) + c.Check(err, DeepEquals, river.MakeRiverExistsErr("goodbye/hello")) } func (s *RiverSuite) TestClearRivers(c *C) { diff --git a/synapse/.gitignore b/synapse/.gitignore new file mode 100644 index 0000000..1984bfd --- /dev/null +++ b/synapse/.gitignore @@ -0,0 +1,3 @@ +synapse* +!synapse.go + diff --git a/synapse/main.go b/synapse/main.go new file mode 100644 index 0000000..88e3ddf --- /dev/null +++ b/synapse/main.go @@ -0,0 +1,59 @@ +package main + +import ( + "bufio" + "flag" + "io/ioutil" + "log" + "net/url" + "os" + "strings" + + "github.com/synapse-garden/sg-proto/auth" + "github.com/synapse-garden/sg-proto/client" +) + +var ( + host = flag.String("host", "127.0.0.1", "the host to connect to") + port = flag.String("port", ":8080", "the port to connect on") + acceptCert = flag.String("acceptCert", "", "a custom cert to accept") + + secure = flag.Bool("secure", true, "whether to connect securely") +) + +func main() { + flag.Parse() + + if c := *acceptCert; c != "" { + if !strings.HasSuffix(c, "pem") { + log.Fatalf("custom cert must be a .pem file") + } + if certBytes, err := ioutil.ReadFile(c); err != nil { + log.Fatalf("failed to read custom cert: %s", err.Error()) + } else if err = client.SetCustomCert(certBytes); err != nil { + log.Fatalf("failed to set custom cert: %s", err.Error()) + } + } + + scheme := "http" + if *secure { + scheme = "https" + } + + hostURL, err := url.Parse(scheme + "://" + *host + *port) + if err != nil { + log.Fatalf("error parsing host URL: %s", err.Error()) + } + + cli := &client.Client{ + Backend: hostURL, + State: client.State{ + Session: new(auth.Session), + Writer: bufio.NewWriter(os.Stdout), + Scanner: bufio.NewScanner(os.Stdin), + }, + } + if err := RunWindow(cli); err != nil { + log.Fatalf("Synapse crashed: %s", err.Error()) + } +} diff --git a/synapse/run.go b/synapse/run.go new file mode 100644 index 0000000..222fe91 --- /dev/null +++ b/synapse/run.go @@ -0,0 +1,25 @@ +package main + +import ( + "strings" + + "github.com/synapse-garden/sg-proto/client" + "github.com/synapse-garden/sg-proto/cmd" +) + +// RunWindow runs the SG client terminal. +func RunWindow(cli *client.Client) error { + if err := cmd.Info(cli); err != nil { + return err + } + + for s := cli.State; s.Scan(); { + com := cmd.GetCommand(strings.Split(s.Text(), " ")...) + if err := com(cli); err == cmd.ErrQuit { + return cmd.OutputString("Bye!")(cli) + } else if err != nil { + return err + } + } + return nil +} diff --git a/task/common_test.go b/task/common_test.go index 76fba74..62fa5f2 100644 --- a/task/common_test.go +++ b/task/common_test.go @@ -39,6 +39,6 @@ func (s *TaskSuite) SetUpTest(c *C) { } func (s *TaskSuite) TearDownTest(c *C) { - c.Assert(sgt.CleanupDB(s.DB), IsNil) + c.Assert(sgt.Cleanup(s.DB), IsNil) c.Assert(os.Remove(s.tmpDir), IsNil) } diff --git a/testing/db.go b/testing/db.go index 2bbd700..904f302 100644 --- a/testing/db.go +++ b/testing/db.go @@ -5,6 +5,7 @@ import ( "os" "path/filepath" + "github.com/synapse-garden/sg-proto/rest" "github.com/synapse-garden/sg-proto/store" "github.com/boltdb/bolt" @@ -29,14 +30,22 @@ func TempDB(name string) (*bolt.DB, string, error) { return db, d, nil } -func CleanupDB(db *bolt.DB) error { +func Cleanup(db *bolt.DB, ccs ...rest.Cleanup) error { + if err := rest.Cleanups(ccs).Cleanup(); err != nil { + return errors.Wrap(err, "failed to clean up APIs") + } + path := db.Path() if err := db.Close(); err != nil { - return err + return errors.Wrap(err, "failed to close DB") } + if err := os.Remove(path); err != nil { - return err + return errors.Wrapf(err, + "failed to remove DB file %s", path, + ) } + return nil } diff --git a/text/text_test.go b/text/text_test.go index 2b7b5a4..b0523a7 100644 --- a/text/text_test.go +++ b/text/text_test.go @@ -37,7 +37,7 @@ func (s *TextSuite) SetUpTest(c *C) { } func (s *TextSuite) TearDownTest(c *C) { - c.Assert(sgt.CleanupDB(s.DB), IsNil) + c.Assert(sgt.Cleanup(s.DB), IsNil) c.Assert(os.Remove(s.tmpDir), IsNil) } diff --git a/users/users_test.go b/users/users_test.go index 981383a..6879f3e 100644 --- a/users/users_test.go +++ b/users/users_test.go @@ -33,7 +33,7 @@ func (s *UsersSuite) SetUpTest(c *C) { } func (s *UsersSuite) TearDownTest(c *C) { - c.Assert(testing.CleanupDB(s.DB), IsNil) + c.Assert(testing.Cleanup(s.DB), IsNil) c.Assert(os.Remove(s.tmpDir), IsNil) } diff --git a/vendor/vendor.json b/vendor/vendor.json index 018226e..1b4e0e1 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -79,6 +79,56 @@ "revisionTime": "2016-09-12T15:30:41Z", "tree": true }, + { + { + "checksumSHA1": "ztypj7pDqHM1CVbBfHcPRp1n+bI=", + "path": "github.com/droundy/goopt", + "revision": "e5c9ab183a102ef3ad2363b04483784258984157", + "revisionTime": "2015-10-12T15:57:52Z", + "tree": true + }, + { + "checksumSHA1": "iJdDp5m6MIjMPKCOxFMvl7jEJ/Y=", + "path": "github.com/go-mangos/mangos", + "revision": "c3916ba23b3aa721fd13b4dec916b2d15f4a2520", + "revisionTime": "2016-07-19T02:05:16Z", + "tree": true + }, + { + "checksumSHA1": "bCABisMlBcpDa8h9npMeYZVcHHk=", + "path": "github.com/go-mangos/mangos/protocol/bus", + "revision": "c3916ba23b3aa721fd13b4dec916b2d15f4a2520", + "revisionTime": "2016-07-19T02:05:16Z", + "tree": true + }, + { + "checksumSHA1": "MmHghN8jA3dtjd5O17EKE5I+ODE=", + "path": "github.com/go-mangos/mangos/protocol/pub", + "revision": "c3916ba23b3aa721fd13b4dec916b2d15f4a2520", + "revisionTime": "2016-07-19T02:05:16Z", + "tree": true + }, + { + "checksumSHA1": "Jq6TpwsJA3GZI9ZciK/cmG1dYrM=", + "path": "github.com/go-mangos/mangos/protocol/sub", + "revision": "c3916ba23b3aa721fd13b4dec916b2d15f4a2520", + "revisionTime": "2016-07-19T02:05:16Z", + "tree": true + }, + { + "checksumSHA1": "7NRp/7iQJtQ8HcjwXGtAQzva/b8=", + "path": "github.com/go-mangos/mangos/transport/inproc", + "revision": "c3916ba23b3aa721fd13b4dec916b2d15f4a2520", + "revisionTime": "2016-07-19T02:05:16Z", + "tree": true + }, + { + "checksumSHA1": "VKgJ7n+9zWTjTv696ilbvLOkDpA=", + "path": "github.com/gorilla/websocket", + "revision": "2d1e4548da234d9cb742cc3628556fef86aafbac", + "revisionTime": "2016-09-12T15:30:41Z", + "tree": true + }, { "checksumSHA1": "Drsz3oT4NmM2tVQCRk+occxVvPA=", "path": "github.com/julienschmidt/httprouter", @@ -114,6 +164,13 @@ "revisionTime": "2016-09-12T21:59:12Z", "tree": true }, + { + "checksumSHA1": "lz2tjPljCQa8GsdeTPUj9VdkL1A=", + "path": "golang.org/x/net/context", + "revision": "71a035914f99bb58fe82eac0f1289f10963d876c", + "revisionTime": "2016-09-12T21:59:12Z", + "tree": true + }, { "checksumSHA1": "likOl7O0BJntqagR050kkOBuY4o=", "path": "golang.org/x/net/websocket",