Skip to content

Commit

Permalink
Merge pull request #6661 from TheThingsNetwork/feature/console-ws-eve…
Browse files Browse the repository at this point in the history
…nts-os

Console internal events API
  • Loading branch information
adriansmares authored Oct 30, 2023
2 parents 424bb46 + addcfe5 commit 29e889b
Show file tree
Hide file tree
Showing 20 changed files with 2,094 additions and 94 deletions.
84 changes: 60 additions & 24 deletions config/messages.json
Original file line number Diff line number Diff line change
Expand Up @@ -3509,6 +3509,42 @@
"file": "shared.go"
}
},
"error:pkg/console/internal/events/protocol:message_type": {
"translations": {
"en": "invalid message type `{type}`"
},
"description": {
"package": "pkg/console/internal/events/protocol",
"file": "protocol.go"
}
},
"error:pkg/console/internal/events/subscriptions:already_subscribed": {
"translations": {
"en": "already subscribed with ID `{id}`"
},
"description": {
"package": "pkg/console/internal/events/subscriptions",
"file": "subscriptions.go"
}
},
"error:pkg/console/internal/events/subscriptions:no_identifiers": {
"translations": {
"en": "no identifiers"
},
"description": {
"package": "pkg/console/internal/events/subscriptions",
"file": "subscriptions.go"
}
},
"error:pkg/console/internal/events/subscriptions:not_subscribed": {
"translations": {
"en": "not subscribed with ID `{id}`"
},
"description": {
"package": "pkg/console/internal/events/subscriptions",
"file": "subscriptions.go"
}
},
"error:pkg/crypto/cryptoservices:no_app_key": {
"translations": {
"en": "no AppKey specified"
Expand Down Expand Up @@ -4382,67 +4418,67 @@
"file": "conversion.go"
}
},
"error:pkg/events/grpc:invalid_regexp": {
"error:pkg/events/grpc:no_identifiers": {
"translations": {
"en": "invalid regexp"
"en": "no identifiers"
},
"description": {
"package": "pkg/events/grpc",
"file": "grpc.go"
}
},
"error:pkg/events/grpc:no_identifiers": {
"error:pkg/events/grpc:storage_disabled": {
"translations": {
"en": "no identifiers"
"en": "events storage is not not enabled"
},
"description": {
"package": "pkg/events/grpc",
"file": "grpc.go"
}
},
"error:pkg/events/grpc:no_matching_events": {
"error:pkg/events/redis:channel_closed": {
"translations": {
"en": "no matching events for regexp `{regexp}`"
"en": "channel closed"
},
"description": {
"package": "pkg/events/grpc",
"file": "grpc.go"
"package": "pkg/events/redis",
"file": "redis.go"
}
},
"error:pkg/events/grpc:storage_disabled": {
"error:pkg/events/redis:unknown_encoding": {
"translations": {
"en": "events storage is not not enabled"
"en": "unknown encoding"
},
"description": {
"package": "pkg/events/grpc",
"file": "grpc.go"
"package": "pkg/events/redis",
"file": "codec.go"
}
},
"error:pkg/events/grpc:unknown_event_name": {
"error:pkg/events:invalid_regexp": {
"translations": {
"en": "unknown event `{name}`"
"en": "invalid regexp"
},
"description": {
"package": "pkg/events/grpc",
"file": "grpc.go"
"package": "pkg/events",
"file": "pattern.go"
}
},
"error:pkg/events/redis:channel_closed": {
"error:pkg/events:no_matching_events": {
"translations": {
"en": "channel closed"
"en": "no matching events for regexp `{regexp}`"
},
"description": {
"package": "pkg/events/redis",
"file": "redis.go"
"package": "pkg/events",
"file": "pattern.go"
}
},
"error:pkg/events/redis:unknown_encoding": {
"error:pkg/events:unknown_event_name": {
"translations": {
"en": "unknown encoding"
"en": "unknown event `{name}`"
},
"description": {
"package": "pkg/events/redis",
"file": "codec.go"
"package": "pkg/events",
"file": "pattern.go"
}
},
"error:pkg/fetch:fetch_file": {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ require (
gopkg.in/mail.v2 v2.3.1
gopkg.in/square/go-jose.v2 v2.6.0
gopkg.in/yaml.v2 v2.4.0
nhooyr.io/websocket v1.8.10
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,8 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
mellium.im/sasl v0.3.1 h1:wE0LW6g7U83vhvxjC1IY8DnXM+EU095yeo8XClvCdfo=
mellium.im/sasl v0.3.1/go.mod h1:xm59PUYpZHhgQ9ZqoJ5QaCqzWMi8IeS49dhp6plPCzw=
nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q=
nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
6 changes: 2 additions & 4 deletions pkg/auth/rights/auth_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,14 @@ func AuthInfo(ctx context.Context) (authInfo *ttnpb.AuthInfoResponse, err error)

var errUnauthenticated = errors.DefineUnauthenticated("unauthenticated", "unauthenticated")

// RequireAuthentication confirms if the authentication information within a context contains any rights, if so,
// RequireAuthenticated confirms if the authentication information within a context contains any rights, if so,
// the request is considered to be authenticated.
func RequireAuthentication(ctx context.Context) error {
log.FromContext(ctx).Debug("Authenticate request")
func RequireAuthenticated(ctx context.Context) error {
authInfo, err := AuthInfo(ctx)
if err != nil {
log.FromContext(ctx).WithError(err).Debug("Failed to validate authentication information")
return errUnauthenticated.WithCause(err)
}

if authInfo.GetAccessMethod() == nil && len(authInfo.GetUniversalRights().GetRights()) == 0 {
return errUnauthenticated.New()
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/gorilla/csrf"
"github.com/gorilla/mux"
"go.thethings.network/lorawan-stack/v3/pkg/component"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events"
"go.thethings.network/lorawan-stack/v3/pkg/web"
"go.thethings.network/lorawan-stack/v3/pkg/web/oauthclient"
"go.thethings.network/lorawan-stack/v3/pkg/webhandlers"
Expand Down Expand Up @@ -58,6 +59,7 @@ func New(c *component.Component, config Config) (*Console, error) {
}

c.RegisterWeb(console)
c.RegisterWeb(events.New(c))

return console, nil
}
Expand Down
135 changes: 135 additions & 0 deletions pkg/console/internal/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright © 2023 The Things Network Foundation, The Things Industries B.V.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package events contains the internal events APi for the Console.
package events

import (
"context"
"net/http"
"sync"

"github.com/gorilla/mux"
"go.thethings.network/lorawan-stack/v3/pkg/auth/rights"
"go.thethings.network/lorawan-stack/v3/pkg/config"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/eventsmux"
"go.thethings.network/lorawan-stack/v3/pkg/console/internal/events/subscriptions"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/log"
"go.thethings.network/lorawan-stack/v3/pkg/ratelimit"
"go.thethings.network/lorawan-stack/v3/pkg/task"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
"go.thethings.network/lorawan-stack/v3/pkg/web"
"go.thethings.network/lorawan-stack/v3/pkg/webhandlers"
"go.thethings.network/lorawan-stack/v3/pkg/webmiddleware"
"nhooyr.io/websocket"
)

// Component is the interface of the component to the events API handler.
type Component interface {
task.Starter
Context() context.Context
RateLimiter() ratelimit.Interface
GetBaseConfig(context.Context) config.ServiceBase
}

type eventsHandler struct {
component Component
subscriber events.Subscriber
definedNames map[string]struct{}
}

var _ web.Registerer = (*eventsHandler)(nil)

func (h *eventsHandler) RegisterRoutes(server *web.Server) {
router := server.APIRouter().PathPrefix(ttnpb.HTTPAPIPrefix + "/console/internal/events/").Subrouter()
router.Use(
mux.MiddlewareFunc(webmiddleware.Namespace("console/internal/events")),
ratelimit.HTTPMiddleware(h.component.RateLimiter(), "http:console:internal:events"),
mux.MiddlewareFunc(webmiddleware.Metadata("Authorization")),
)
router.Path("/").HandlerFunc(h.handleEvents).Methods(http.MethodGet)
}

func (h *eventsHandler) handleEvents(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := log.FromContext(ctx)

if err := rights.RequireAuthenticated(ctx); err != nil {
webhandlers.Error(w, r, err)
return
}

conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
InsecureSkipVerify: true, // CORS is not enabled for APIs.
CompressionMode: websocket.CompressionContextTakeover,
})
if err != nil {
logger.WithError(err).Debug("Failed to accept WebSocket")
return
}
defer conn.Close(websocket.StatusNormalClosure, "main task closed")

ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)

var wg sync.WaitGroup
defer wg.Wait()

m := eventsmux.New(func(ctx context.Context, cancel func(error)) subscriptions.Interface {
return subscriptions.New(ctx, cancel, h.subscriber, h.definedNames, h.component)
})
for name, f := range map[string]func(context.Context) error{
"console_events_mux": makeMuxTask(m, cancel),
"console_events_read": makeReadTask(conn, m, cancel),
"console_events_write": makeWriteTask(conn, m, cancel),
} {
wg.Add(1)
h.component.StartTask(&task.Config{
Context: ctx,
ID: name,
Func: f,
Done: wg.Done,
Restart: task.RestartNever,
Backoff: task.DefaultBackoffConfig,
})
}
}

// Option configures the events API handler.
type Option func(*eventsHandler)

// WithSubscriber configures the Subscriber to use for events.
func WithSubscriber(subscriber events.Subscriber) Option {
return func(h *eventsHandler) {
h.subscriber = subscriber
}
}

// New returns an events API handler for the Console.
func New(c Component, opts ...Option) web.Registerer {
definedNames := make(map[string]struct{})
for _, def := range events.All().Definitions() {
definedNames[def.Name()] = struct{}{}
}
h := &eventsHandler{
component: c,
subscriber: events.DefaultPubSub(),
definedNames: definedNames,
}
for _, opt := range opts {
opt(h)
}
return h
}
Loading

0 comments on commit 29e889b

Please sign in to comment.