Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds FilteredNotifier #547

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions webhook/filtered_notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package webhook

import (
"context"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)

type FilteredNotifierParams struct {
// Events will be used to filter out webhook events. One might want only a subset of events
// If Events is nil or zero-sized, all events will be sent
Events []string
Logger logger.Logger
}

type FilteredNotifier struct {
logger logger.Logger
events []string // TODO do we really need a map[string]struct{}
queuedNotifier QueuedNotifier
}

func NewFilteredNotifier(notifier QueuedNotifier, params FilteredNotifierParams) *FilteredNotifier {
if params.Logger == nil {
params.Logger = logger.GetLogger()
}
return &FilteredNotifier{
logger: params.Logger,
events: params.Events,
queuedNotifier: notifier,
}
}

func (notifier *FilteredNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error {
if len(notifier.events) == 0 {
return notifier.queuedNotifier.QueueNotify(ctx, event)
}

for _, ev := range notifier.events {
if ev == event.Event {
return notifier.queuedNotifier.QueueNotify(ctx, event)
}
}

notifier.logger.Debugw("ignoring event: %s", event.Event)
return nil
}

func (notifier *FilteredNotifier) Stop(force bool) {
notifier.queuedNotifier.Stop(force)
}
40 changes: 31 additions & 9 deletions webhook/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,41 +24,63 @@ import (

type QueuedNotifier interface {
QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error
Stop(force bool)
}

type DefaultNotifier struct {
urlNotifiers []*URLNotifier
queuedNotifiers []QueuedNotifier
}

func NewDefaultNotifier(apiKey, apiSecret string, urls []string) QueuedNotifier {
n := &DefaultNotifier{}
for _, url := range urls {
u := NewURLNotifier(URLNotifierParams{
u := NewURLNotifierWrapper(URLNotifierParams{
URL: url,
Logger: logger.GetLogger().WithComponent("webhook"),
APIKey: apiKey,
APISecret: apiSecret,
})
n.urlNotifiers = append(n.urlNotifiers, u)
n.queuedNotifiers = append(n.queuedNotifiers, u)
}
return n
}

// NewDefaultNotifierWithFilter takes an events eventsFilter that is shared across all urls.
// if eventsFilter is nil, then all events will be sent. If not only the events specified
// by eventsFilter will be sent and any other event will be ignored with a debug log
// TODO maybe add eventsFilter per url?! but it's not my use case.
func NewDefaultNotifierWithFilter(apiKey, apiSecret string, urls []string, eventsFilter []string) QueuedNotifier {
n := &DefaultNotifier{}
for _, url := range urls {
u := NewFilteredNotifier(NewURLNotifierWrapper(URLNotifierParams{
URL: url,
Logger: logger.GetLogger().WithComponent("webhook"),
APIKey: apiKey,
APISecret: apiSecret,
}), FilteredNotifierParams{
Events: eventsFilter,
Logger: logger.GetLogger().WithComponent("webhook"),
})
n.queuedNotifiers = append(n.queuedNotifiers, u)
}
return n
}

func (n *DefaultNotifier) Stop(force bool) {
wg := sync.WaitGroup{}
for _, u := range n.urlNotifiers {
for _, u := range n.queuedNotifiers {
wg.Add(1)
go func(u *URLNotifier) {
go func(qn QueuedNotifier) {
defer wg.Done()
u.Stop(force)
qn.Stop(force)
}(u)
}
wg.Wait()
}

func (n *DefaultNotifier) QueueNotify(_ context.Context, event *livekit.WebhookEvent) error {
for _, u := range n.urlNotifiers {
if err := u.QueueNotify(event); err != nil {
func (n *DefaultNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error {
for _, u := range n.queuedNotifiers {
if err := u.QueueNotify(ctx, event); err != nil {
return err
}
}
Expand Down
32 changes: 32 additions & 0 deletions webhook/url_notifier_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package webhook

import (
"context"
"github.com/livekit/protocol/livekit"
)

type URLNotifierWrapper struct {
*URLNotifier
}

func NewURLNotifierWrapper(params URLNotifierParams) *URLNotifierWrapper {
return &URLNotifierWrapper{URLNotifier: NewURLNotifier(params)}
}

func (n *URLNotifierWrapper) QueueNotify(_ context.Context, event *livekit.WebhookEvent) error {
return n.URLNotifier.QueueNotify(event)
}
96 changes: 96 additions & 0 deletions webhook/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package webhook

import (
"context"
"github.com/livekit/protocol/logger"
"net"
"net/http"
"sync"
Expand Down Expand Up @@ -143,6 +144,80 @@ func TestURLNotifierLifecycle(t *testing.T) {
})
}

func TestFilteredNotifier(t *testing.T) {
var (
maxItr = 10
events = []string{
EventRoomStarted,
EventRoomFinished,
EventParticipantJoined,
EventParticipantLeft,
EventTrackPublished,
EventTrackUnpublished,
EventEgressStarted,
EventEgressUpdated,
EventEgressEnded,
EventIngressStarted,
EventIngressEnded,
}
nm = notifierMock{numCalled: &atomic.Int32{}}
)

t.Run("empty filter should send all events", func(t *testing.T) {
filteredNotifier := newTestFilteredNotifier(&nm, nil)
defer filteredNotifier.Stop(true)

for i := 0; i < maxItr; i++ {
for _, event := range events {
_ = filteredNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: event})
}
}

require.Equal(t, int32(maxItr*len(events)), nm.numCalled.Load())
})

t.Run("one filter", func(t *testing.T) {
events := events[:1]
filteredNotifier := newTestFilteredNotifier(&nm, events)
defer filteredNotifier.Stop(true)

for i := 0; i < maxItr; i++ {
for _, event := range events {
_ = filteredNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: event})
}
}

require.Equal(t, int32(maxItr*len(events)), nm.numCalled.Load())
})

t.Run("some filter", func(t *testing.T) {
events := events[2:5]
filteredNotifier := newTestFilteredNotifier(&nm, events)
defer filteredNotifier.Stop(true)

for i := 0; i < maxItr; i++ {
for _, event := range events {
_ = filteredNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: event})
}
}

require.Equal(t, int32(maxItr*len(events)), nm.numCalled.Load())
})

t.Run("all filter should send all events", func(t *testing.T) {
filteredNotifier := newTestFilteredNotifier(&nm, events)
defer filteredNotifier.Stop(true)

for i := 0; i < maxItr; i++ {
for _, event := range events {
_ = filteredNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: event})
}
}

require.Equal(t, int32(maxItr*len(events)), nm.numCalled.Load())
})
}

func newTestNotifier() *URLNotifier {
return NewURLNotifier(URLNotifierParams{
QueueSize: 20,
Expand All @@ -152,6 +227,27 @@ func newTestNotifier() *URLNotifier {
})
}

type notifierMock struct {
numCalled *atomic.Int32
}

func (n notifierMock) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error {
n.numCalled.Inc()
return nil
}

func (n notifierMock) Stop(force bool) {
n.numCalled.Store(0)
}

func newTestFilteredNotifier(notifier *notifierMock, events []string) *FilteredNotifier {

return NewFilteredNotifier(notifier, FilteredNotifierParams{
Events: events,
Logger: logger.GetLogger(),
})
}

type testServer struct {
handler func(r *http.Request)
server *http.Server
Expand Down
Loading