Skip to content

Commit

Permalink
Implement event subscriptions.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmalloc committed Apr 8, 2024
1 parent d6fe322 commit c1cd5b1
Show file tree
Hide file tree
Showing 8 changed files with 917 additions and 160 deletions.
171 changes: 171 additions & 0 deletions internal/eventstream/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package eventstream

import (
"context"
"fmt"
"time"

"github.com/dogmatiq/enginekit/protobuf/envelopepb"
"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/persistencekit/journal"
"github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal"
"github.com/dogmatiq/veracity/internal/messaging"
"github.com/dogmatiq/veracity/internal/signaling"
)

// A Subscriber is sent events from a stream, by way of a [Reader].
type Subscriber struct {
// StreamID is the ID of the stream from which events are read.
StreamID *uuidpb.UUID

// Offset is the offset of the next event to read.
//
// It must not be read or modified while the subscription is active. It is
// incremented as events are sent to the subscriber.
Offset Offset

// Filter is a predicate function that returns true if the subscriber should
// receive the event in the given envelope.
//
// It is used to avoid filling the subscriber's channel with events they are
// not interested in. It is called by the event stream worker in its own
// goroutine, and hence must not block.
Filter func(*envelopepb.Envelope) bool

// Events is the channel to which the subscriber's events are sent.
Events chan<- Event

canceled signaling.Event
}

// A Reader reads ordered events from a stream.
type Reader struct {
Journals journal.BinaryStore
SubscribeQueue *messaging.RequestQueue[*Subscriber]
UnsubscribeQueue *messaging.RequestQueue[*Subscriber]
}

// Read reads events from a stream and sends them to the given subscriber.
//
// It starts by reading events directly from the stream's journal records. Once
// it has "caught up" to the end of the journal it receives events in
// "real-time" from the supervisor of that stream.
//
// If the subscriber's channel becomes full, it reverts to reading from the
// journal until it catches up again.
func (r *Reader) Read(ctx context.Context, sub *Subscriber) error {
for {
if err := r.readHistorical(ctx, sub); err != nil {
return err
}

if err := r.readContemporary(ctx, sub); err != nil {
return err
}
}
}

func (r *Reader) readHistorical(ctx context.Context, sub *Subscriber) error {
j, err := eventstreamjournal.Open(ctx, r.Journals, sub.StreamID)
if err != nil {
return err
}
defer j.Close()

searchBegin, searchEnd, err := j.Bounds(ctx)
if err != nil {
return err
}

return journal.RangeFromSearchResult(
ctx,
j,
searchBegin, searchEnd,
eventstreamjournal.SearchByOffset(uint64(sub.Offset)),
func(
ctx context.Context,
pos journal.Position,
rec *eventstreamjournal.Record,
) (bool, error) {
begin := Offset(rec.StreamOffsetBefore)
end := Offset(rec.StreamOffsetAfter)

if begin == end {
// no events in this record
return true, nil
}

if sub.Offset < begin || sub.Offset >= end {
return false, fmt.Errorf(
"event stream integrity error at journal position %d: expected event at offset %d, but found offset range [%d, %d)",
pos,
sub.Offset,
begin,
end,
)
}

index := sub.Offset - begin

for _, env := range rec.GetEventsAppended().Events[index:] {
if !sub.Filter(env) {
sub.Offset++
continue
}

select {
case <-ctx.Done():
return false, ctx.Err()
case sub.Events <- Event{sub.StreamID, sub.Offset, env}:
sub.Offset++
}
}

return true, nil
},
)
}

func (r *Reader) readContemporary(ctx context.Context, sub *Subscriber) error {
// TODO: remote read

if err := r.subscribe(ctx, sub); err != nil {
return err
}
defer r.unsubscribe(ctx, sub)

select {
case <-ctx.Done():
return ctx.Err()
case <-sub.canceled.Signaled():
return nil
}
}

func (r *Reader) subscribe(ctx context.Context, sub *Subscriber) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second) // TODO: make configurable
cancel()

if err := r.SubscribeQueue.Do(ctx, sub); err != nil {
return fmt.Errorf("cannot subscribe to event stream: %w", err)
}

return nil
}

func (r *Reader) unsubscribe(ctx context.Context, sub *Subscriber) error {
ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))
defer cancel()

// Cancel the unsubscribe context when the subscription is canceled,
// regardless of the reason.
//
// This handles the situation where the subscription is canceled because the
// worker shutdown (and hence wont service the unsubscribe request).
go func() {
<-sub.canceled.Signaled()
cancel()
}()

return r.UnsubscribeQueue.Do(ctx, sub)
}
70 changes: 59 additions & 11 deletions internal/eventstream/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ import (
"github.com/dogmatiq/veracity/internal/signaling"
)

// errShuttingDown is sent in response to append requests that are not serviced
// errShuttingDown is sent in response to requests that are not serviced
// because of an error within the event stream supervisor or a worker.
var errShuttingDown = errors.New("event stream sub-system is shutting down")

// A Supervisor coordinates event stream workers.
type Supervisor struct {
Journals journal.BinaryStore
AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse]
Logger *slog.Logger
Journals journal.BinaryStore
AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse]
SubscribeQueue messaging.RequestQueue[*Subscriber]
UnsubscribeQueue messaging.RequestQueue[*Subscriber]
Logger *slog.Logger

shutdown signaling.Latch
workers uuidpb.Map[*worker]
Expand Down Expand Up @@ -64,12 +66,18 @@ func (s *Supervisor) idleState(ctx context.Context) fsm.Action {
return fsm.StayInCurrentState()

case ex := <-s.AppendQueue.Recv():
return fsm.With(ex).EnterState(s.forwardAppendState)
return fsm.With(ex).EnterState(s.appendState)

case req := <-s.SubscribeQueue.Recv():
return fsm.With(req).EnterState(s.subscribeState)

case req := <-s.UnsubscribeQueue.Recv():
return fsm.With(req).EnterState(s.unsubscribeState)
}
}

// forwardAppendState forwards an append request to the appropriate worker.
func (s *Supervisor) forwardAppendState(
// appendState forwards an append request to the appropriate worker.
func (s *Supervisor) appendState(
ctx context.Context,
ex messaging.Exchange[AppendRequest, AppendResponse],
) fsm.Action {
Expand All @@ -79,20 +87,59 @@ func (s *Supervisor) forwardAppendState(
return fsm.Fail(err)
}

return forwardToWorker(ctx, s, w.AppendQueue.Send(), ex)
}

// subscribeState forwards a subscribe request to the appropriate worker.
func (s *Supervisor) subscribeState(
ctx context.Context,
req messaging.Request[*Subscriber],
) fsm.Action {
w, err := s.workerByStreamID(ctx, req.Request.StreamID)
if err != nil {
req.Err(errShuttingDown)
return fsm.Fail(err)
}

return forwardToWorker(ctx, s, w.SubscribeQueue.Send(), req)
}

// unsubscribeState forwards an unsubscribe request to the appropriate worker.
func (s *Supervisor) unsubscribeState(
ctx context.Context,
req messaging.Request[*Subscriber],
) fsm.Action {
w, ok := s.workers.TryGet(req.Request.StreamID)
if !ok {
req.Ok()
return fsm.EnterState(s.idleState)
}

return forwardToWorker(ctx, s, w.UnsubscribeQueue.Send(), req)
}

func forwardToWorker[
T interface{ Err(error) },
](
ctx context.Context,
s *Supervisor,
q chan<- T,
v T,
) fsm.Action {
select {
case <-ctx.Done():
ex.Err(errShuttingDown)
v.Err(errShuttingDown)
return fsm.Stop()

case res := <-s.workerStopped:
s.workers.Delete(res.StreamID)
if res.Err != nil {
ex.Err(errShuttingDown)
v.Err(errShuttingDown)
return fsm.Fail(res.Err)
}
return fsm.StayInCurrentState()

case w.AppendQueue.Send() <- ex:
case q <- v:
return fsm.EnterState(s.idleState)
}
}
Expand Down Expand Up @@ -142,7 +189,8 @@ func (s *Supervisor) startWorkerForStreamID(
}

w := &worker{
Journal: j,
StreamID: streamID,
Journal: j,
Logger: s.Logger.With(
slog.String("stream_id", streamID.AsString()),
),
Expand Down
Loading

0 comments on commit c1cd5b1

Please sign in to comment.