From 91c00564cfb4a97dc37f9c5814a663c33418ea3f Mon Sep 17 00:00:00 2001 From: James Harris Date: Tue, 2 Apr 2024 08:00:40 +1000 Subject: [PATCH 1/4] Implement event subscriptions. --- internal/eventstream/reader.go | 171 +++++++++++++++ internal/eventstream/supervisor.go | 70 +++++- internal/eventstream/worker.go | 267 ++++++++++------------- internal/eventstream/workerappend.go | 226 +++++++++++++++++++ internal/eventstream/workercache.go | 80 +++++++ internal/eventstream/workeridle.go | 60 +++++ internal/eventstream/workersubscriber.go | 139 ++++++++++++ internal/messaging/request.go | 64 ++++++ 8 files changed, 917 insertions(+), 160 deletions(-) create mode 100644 internal/eventstream/reader.go create mode 100644 internal/eventstream/workerappend.go create mode 100644 internal/eventstream/workercache.go create mode 100644 internal/eventstream/workeridle.go create mode 100644 internal/eventstream/workersubscriber.go create mode 100644 internal/messaging/request.go diff --git a/internal/eventstream/reader.go b/internal/eventstream/reader.go new file mode 100644 index 00000000..2d04baa2 --- /dev/null +++ b/internal/eventstream/reader.go @@ -0,0 +1,171 @@ +package eventstream + +import ( + "context" + "fmt" + "time" + + "github.com/dogmatiq/enginekit/protobuf/envelopepb" + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "github.com/dogmatiq/persistencekit/journal" + "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" + "github.com/dogmatiq/veracity/internal/messaging" + "github.com/dogmatiq/veracity/internal/signaling" +) + +// A Subscriber is sent events from a stream, by way of a [Reader]. +type Subscriber struct { + // StreamID is the ID of the stream from which events are read. + StreamID *uuidpb.UUID + + // Offset is the offset of the next event to read. + // + // It must not be read or modified while the subscription is active. It is + // incremented as events are sent to the subscriber. + Offset Offset + + // Filter is a predicate function that returns true if the subscriber should + // receive the event in the given envelope. + // + // It is used to avoid filling the subscriber's channel with events they are + // not interested in. It is called by the event stream worker in its own + // goroutine, and hence must not block. + Filter func(*envelopepb.Envelope) bool + + // Events is the channel to which the subscriber's events are sent. + Events chan<- Event + + canceled signaling.Event +} + +// A Reader reads ordered events from a stream. +type Reader struct { + Journals journal.BinaryStore + SubscribeQueue *messaging.RequestQueue[*Subscriber] + UnsubscribeQueue *messaging.RequestQueue[*Subscriber] +} + +// Read reads events from a stream and sends them to the given subscriber. +// +// It starts by reading events directly from the stream's journal records. Once +// it has "caught up" to the end of the journal it receives events in +// "real-time" from the supervisor of that stream. +// +// If the subscriber's channel becomes full, it reverts to reading from the +// journal until it catches up again. 
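+//
+// A minimal usage sketch follows. It is illustrative only: the channel
+// capacity and the pass-all filter are arbitrary choices, streamID is assumed
+// to be the *uuidpb.UUID of an existing stream, and reader is assumed to be an
+// already-configured *Reader.
+//
+//	events := make(chan Event, 100)
+//	sub := &Subscriber{
+//		StreamID: streamID,
+//		Filter:   func(*envelopepb.Envelope) bool { return true },
+//		Events:   events,
+//	}
+//	if err := reader.Read(ctx, sub); err != nil {
+//		// handle the error
+//	}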
+func (r *Reader) Read(ctx context.Context, sub *Subscriber) error { + for { + if err := r.readHistorical(ctx, sub); err != nil { + return err + } + + if err := r.readContemporary(ctx, sub); err != nil { + return err + } + } +} + +func (r *Reader) readHistorical(ctx context.Context, sub *Subscriber) error { + j, err := eventstreamjournal.Open(ctx, r.Journals, sub.StreamID) + if err != nil { + return err + } + defer j.Close() + + searchBegin, searchEnd, err := j.Bounds(ctx) + if err != nil { + return err + } + + return journal.RangeFromSearchResult( + ctx, + j, + searchBegin, searchEnd, + eventstreamjournal.SearchByOffset(uint64(sub.Offset)), + func( + ctx context.Context, + pos journal.Position, + rec *eventstreamjournal.Record, + ) (bool, error) { + begin := Offset(rec.StreamOffsetBefore) + end := Offset(rec.StreamOffsetAfter) + + if begin == end { + // no events in this record + return true, nil + } + + if sub.Offset < begin || sub.Offset >= end { + return false, fmt.Errorf( + "event stream integrity error at journal position %d: expected event at offset %d, but found offset range [%d, %d)", + pos, + sub.Offset, + begin, + end, + ) + } + + index := sub.Offset - begin + + for _, env := range rec.GetEventsAppended().Events[index:] { + if !sub.Filter(env) { + sub.Offset++ + continue + } + + select { + case <-ctx.Done(): + return false, ctx.Err() + case sub.Events <- Event{sub.StreamID, sub.Offset, env}: + sub.Offset++ + } + } + + return true, nil + }, + ) +} + +func (r *Reader) readContemporary(ctx context.Context, sub *Subscriber) error { + // TODO: remote read + + if err := r.subscribe(ctx, sub); err != nil { + return err + } + defer r.unsubscribe(ctx, sub) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-sub.canceled.Signaled(): + return nil + } +} + +func (r *Reader) subscribe(ctx context.Context, sub *Subscriber) error { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) // TODO: make configurable + cancel() + + if err := r.SubscribeQueue.Do(ctx, sub); err != nil { + return fmt.Errorf("cannot subscribe to event stream: %w", err) + } + + return nil +} + +func (r *Reader) unsubscribe(ctx context.Context, sub *Subscriber) error { + ctx, cancel := context.WithCancel(context.WithoutCancel(ctx)) + defer cancel() + + // Cancel the unsubscribe context when the subscription is canceled, + // regardless of the reason. + // + // This handles the situation where the subscription is canceled because the + // worker shutdown (and hence wont service the unsubscribe request). + go func() { + <-sub.canceled.Signaled() + cancel() + }() + + return r.UnsubscribeQueue.Do(ctx, sub) +} diff --git a/internal/eventstream/supervisor.go b/internal/eventstream/supervisor.go index dd5e0fc9..d514435f 100644 --- a/internal/eventstream/supervisor.go +++ b/internal/eventstream/supervisor.go @@ -13,15 +13,17 @@ import ( "github.com/dogmatiq/veracity/internal/signaling" ) -// errShuttingDown is sent in response to append requests that are not serviced +// errShuttingDown is sent in response to requests that are not serviced // because of an error within the event stream supervisor or a worker. var errShuttingDown = errors.New("event stream sub-system is shutting down") // A Supervisor coordinates event stream workers. 
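+//
+// An illustrative sketch of appending events via the supervisor's queue,
+// assuming the caller already holds the stream ID and a packed envelope:
+//
+//	res, err := supervisor.AppendQueue.Do(ctx, AppendRequest{
+//		StreamID: streamID,
+//		Events:   []*envelopepb.Envelope{env},
+//	})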
type Supervisor struct { - Journals journal.BinaryStore - AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse] - Logger *slog.Logger + Journals journal.BinaryStore + AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse] + SubscribeQueue messaging.RequestQueue[*Subscriber] + UnsubscribeQueue messaging.RequestQueue[*Subscriber] + Logger *slog.Logger shutdown signaling.Latch workers uuidpb.Map[*worker] @@ -64,12 +66,18 @@ func (s *Supervisor) idleState(ctx context.Context) fsm.Action { return fsm.StayInCurrentState() case ex := <-s.AppendQueue.Recv(): - return fsm.With(ex).EnterState(s.forwardAppendState) + return fsm.With(ex).EnterState(s.appendState) + + case req := <-s.SubscribeQueue.Recv(): + return fsm.With(req).EnterState(s.subscribeState) + + case req := <-s.UnsubscribeQueue.Recv(): + return fsm.With(req).EnterState(s.unsubscribeState) } } -// forwardAppendState forwards an append request to the appropriate worker. -func (s *Supervisor) forwardAppendState( +// appendState forwards an append request to the appropriate worker. +func (s *Supervisor) appendState( ctx context.Context, ex messaging.Exchange[AppendRequest, AppendResponse], ) fsm.Action { @@ -79,20 +87,59 @@ func (s *Supervisor) forwardAppendState( return fsm.Fail(err) } + return forwardToWorker(ctx, s, w.AppendQueue.Send(), ex) +} + +// subscribeState forwards a subscribe request to the appropriate worker. +func (s *Supervisor) subscribeState( + ctx context.Context, + req messaging.Request[*Subscriber], +) fsm.Action { + w, err := s.workerByStreamID(ctx, req.Request.StreamID) + if err != nil { + req.Err(errShuttingDown) + return fsm.Fail(err) + } + + return forwardToWorker(ctx, s, w.SubscribeQueue.Send(), req) +} + +// unsubscribeState forwards an unsubscribe request to the appropriate worker. +func (s *Supervisor) unsubscribeState( + ctx context.Context, + req messaging.Request[*Subscriber], +) fsm.Action { + w, ok := s.workers.TryGet(req.Request.StreamID) + if !ok { + req.Ok() + return fsm.EnterState(s.idleState) + } + + return forwardToWorker(ctx, s, w.UnsubscribeQueue.Send(), req) +} + +func forwardToWorker[ + T interface{ Err(error) }, +]( + ctx context.Context, + s *Supervisor, + q chan<- T, + v T, +) fsm.Action { select { case <-ctx.Done(): - ex.Err(errShuttingDown) + v.Err(errShuttingDown) return fsm.Stop() case res := <-s.workerStopped: s.workers.Delete(res.StreamID) if res.Err != nil { - ex.Err(errShuttingDown) + v.Err(errShuttingDown) return fsm.Fail(res.Err) } return fsm.StayInCurrentState() - case w.AppendQueue.Send() <- ex: + case q <- v: return fsm.EnterState(s.idleState) } } @@ -142,7 +189,8 @@ func (s *Supervisor) startWorkerForStreamID( } w := &worker{ - Journal: j, + StreamID: streamID, + Journal: j, Logger: s.Logger.With( slog.String("stream_id", streamID.AsString()), ), diff --git a/internal/eventstream/worker.go b/internal/eventstream/worker.go index 3d2dab45..b8c51f60 100644 --- a/internal/eventstream/worker.go +++ b/internal/eventstream/worker.go @@ -5,44 +5,63 @@ import ( "log/slog" "time" + "github.com/dogmatiq/enginekit/protobuf/uuidpb" "github.com/dogmatiq/persistencekit/journal" "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" - "github.com/dogmatiq/veracity/internal/fsm" "github.com/dogmatiq/veracity/internal/messaging" "github.com/dogmatiq/veracity/internal/signaling" ) -const defaultIdleTimeout = 5 * time.Minute - // A worker manages the state of an event stream. 
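+//
+// Workers are started on demand by the [Supervisor] and stop themselves once
+// they have been idle for some time and have no subscribers.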
type worker struct { + // StreamID is the ID of the event stream that the worker manages. + StreamID *uuidpb.UUID + // Journal stores the event stream's state. Journal journal.Journal[*eventstreamjournal.Record] // AppendQueue is a queue of requests to append events to the stream. AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse] + // SubscribeQueue is a queue of requests to subscribe to the stream. + SubscribeQueue messaging.RequestQueue[*Subscriber] + + // UnsubscribeQueue is a queue of requests to unsubscribe from the stream. + UnsubscribeQueue messaging.RequestQueue[*Subscriber] + // Shutdown signals the worker to stop when it next becomes idle. Shutdown signaling.Latch - // IdleTimeout is the maximum amount of time the worker will sit idle before - // shutting down. If it is non-positive, defaultIdleTimeout is used. - IdleTimeout time.Duration - // Logger is the target for log messages about the stream. Logger *slog.Logger - pos journal.Position - off Offset + nextPos journal.Position + nextOffset Offset + recentEvents []Event + idleTimer *time.Timer + subscribers map[*Subscriber]struct{} } // Run starts the worker. // -// It processes requests until ctx is canceled, r.Shutdown is latched, or -// an error occurrs. +// It processes requests until ctx is canceled, an error occurs, the worker is +// shutdown by the supervisor, or the idle timeout expires. func (w *worker) Run(ctx context.Context) (err error) { - w.Logger.DebugContext(ctx, "event stream worker started") - defer w.Logger.DebugContext(ctx, "event stream worker stopped") + defer func() { + if err != nil { + w.Logger.Debug( + "event stream worker stopped due to an error", + slog.String("error", err.Error()), + slog.Uint64("next_journal_position", uint64(w.nextPos)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("subscriber_count", len(w.subscribers)), + ) + } + + for sub := range w.subscribers { + sub.canceled.Signal() + } + }() pos, rec, ok, err := journal.LastRecord(ctx, w.Journal) if err != nil { @@ -50,165 +69,115 @@ func (w *worker) Run(ctx context.Context) (err error) { } if ok { - w.pos = pos + 1 - w.off = Offset(rec.StreamOffsetAfter) + w.nextPos = pos + 1 + w.nextOffset = Offset(rec.StreamOffsetAfter) + + w.Logger.Debug( + "event stream journal has existing records", + slog.Uint64("next_journal_position", uint64(w.nextPos)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + ) + } else { + w.Logger.Debug("event stream journal is empty") } - return fsm.Start(ctx, w.idleState) -} + w.resetIdleTimer() + defer w.idleTimer.Stop() -// idleState waits for a request or the shutdown signal. -func (w *worker) idleState(ctx context.Context) fsm.Action { - duration := w.IdleTimeout - if duration <= 0 { - duration = defaultIdleTimeout + for { + ok, err := w.tick(ctx) + if !ok || err != nil { + return err + } } +} - timeout := time.NewTimer(duration) - defer timeout.Stop() - +// tick handles a single event stream operation. +func (w *worker) tick(ctx context.Context) (bool, error) { select { case <-ctx.Done(): - return fsm.Stop() - - case <-w.Shutdown.Signaled(): - return fsm.Stop() - - case <-timeout.C: - return fsm.Stop() + return false, ctx.Err() case ex := <-w.AppendQueue.Recv(): - return fsm.With(ex).EnterState(w.handleAppendState) - } -} + res, err := w.handleAppend(ctx, ex.Request) + if err != nil { + ex.Err(errShuttingDown) + } else { + ex.Ok(res) + } + return true, err -// handleAppendState appends events to the stream. 
-func (w *worker) handleAppendState( - ctx context.Context, - ex messaging.Exchange[AppendRequest, AppendResponse], -) fsm.Action { - n := len(ex.Request.Events) - if n == 0 { - panic("cannot record zero events") - } + case ex := <-w.SubscribeQueue.Recv(): + w.handleSubscribe(ex.Request) + ex.Ok() + return true, nil - res, err := w.appendEvents(ctx, ex.Request) - if err != nil { - ex.Err(err) - return fsm.Fail(err) - } + case ex := <-w.UnsubscribeQueue.Recv(): + w.handleUnsubscribe(ex.Request) + ex.Ok() + return true, nil - ex.Ok(res) + case <-w.idleTimer.C: + return w.handleIdle(ctx) - if res.AppendedByPriorAttempt { - return fsm.EnterState(w.idleState) + case <-w.Shutdown.Signaled(): + w.Logger.Debug( + "event stream worker stopped by supervisor", + slog.Uint64("next_journal_position", uint64(w.nextPos)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("subscriber_count", len(w.subscribers)), + ) + return false, nil } - - return fsm.EnterState(w.idleState) } -// appendEvents writes the events in req to the journal if they have not been -// written already. It returns the offset of the first event. -func (w *worker) appendEvents( - ctx context.Context, - req AppendRequest, -) (AppendResponse, error) { - if w.mightBeDuplicates(req) { - if rec, err := w.findAppendRecord(ctx, req); err == nil { - for i, e := range req.Events { - w.Logger.WarnContext( - ctx, - "ignored event that has already been appended to the stream", - slog.Uint64("stream_offset", uint64(rec.StreamOffsetBefore)+uint64(i)), - slog.String("message_id", e.MessageId.AsString()), - slog.String("description", e.Description), - ) +// catchUpWithJournal reads the journal to catch up with any records that have +// been appended by other nodes. +// +// It is called whenever the worker has some indication that it may be out of +// date, such as when there is an OCC conflict. It is also called periodically +// by otherwise idle workers. +func (w *worker) catchUpWithJournal(ctx context.Context) error { + recordCount := 0 + eventCount := 0 + + if err := w.Journal.Range( + ctx, + w.nextPos, + func( + ctx context.Context, + pos journal.Position, + rec *eventstreamjournal.Record, + ) (ok bool, err error) { + recordCount++ + + events := rec.GetEventsAppended().GetEvents() + if len(events) != 0 { + if eventCount == 0 { + w.Logger.Warn("event stream contains events that were not appended by this worker") + } + w.publishEvents(Offset(rec.StreamOffsetBefore), events) + eventCount += len(events) } - return AppendResponse{ - BeginOffset: Offset(rec.StreamOffsetBefore), - EndOffset: Offset(rec.StreamOffsetAfter), - AppendedByPriorAttempt: true, - }, nil - } else if err != journal.ErrNotFound { - return AppendResponse{}, err - } - } - - before := w.off - after := w.off + Offset(len(req.Events)) + w.nextPos = pos + 1 + w.nextOffset = Offset(rec.StreamOffsetAfter) - if err := w.Journal.Append( - ctx, - w.pos, - eventstreamjournal. - NewRecordBuilder(). - WithStreamOffsetBefore(uint64(before)). - WithStreamOffsetAfter(uint64(after)). - WithEventsAppended(&eventstreamjournal.EventsAppended{ - Events: req.Events, - }). 
- Build(), + return true, nil + }, ); err != nil { - return AppendResponse{}, err + return err } - for i, e := range req.Events { - w.Logger.InfoContext( - ctx, - "appended event to the stream", - slog.Uint64("stream_offset", uint64(before)+uint64(i)), - slog.String("message_id", e.MessageId.AsString()), - slog.String("description", e.Description), + if recordCount != 0 { + w.Logger.Debug( + "caught up to the end of the event stream journal", + slog.Int("record_count", recordCount), + slog.Int("event_count", eventCount), + slog.Uint64("next_journal_position", uint64(w.nextPos)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), ) } - w.pos++ - w.off = after - - return AppendResponse{ - BeginOffset: before, - EndOffset: after, - AppendedByPriorAttempt: false, - }, nil -} - -// mightBeDuplicates returns true if it's possible that the events in req have -// already been appended to the stream. -func (w *worker) mightBeDuplicates(req AppendRequest) bool { - // The events can't be duplicates if the lowest possible offset that - // they could have been appended is the current end of the stream. - return req.LowestPossibleOffset < w.off -} - -// findAppendRecord searches the journal to find the record that contains the -// append operation for the given events. -// -// TODO: This is a brute-force approach that searches the journal directly -// (though efficiently). We could improve upon this approach by keeping some -// in-memory state of recent event IDs (either explicitly, or via a bloom -// filter, for example). -func (w *worker) findAppendRecord( - ctx context.Context, - req AppendRequest, -) (*eventstreamjournal.Record, error) { - return journal.ScanFromSearchResult( - ctx, - w.Journal, - 0, - w.pos, - eventstreamjournal.SearchByOffset(uint64(req.LowestPossibleOffset)), - func( - ctx context.Context, - _ journal.Position, - rec *eventstreamjournal.Record, - ) (*eventstreamjournal.Record, bool, error) { - if op := rec.GetEventsAppended(); op != nil { - targetID := req.Events[0].MessageId - candidateID := op.Events[0].MessageId - return rec, candidateID.Equal(targetID), nil - } - return nil, false, nil - }, - ) + return nil } diff --git a/internal/eventstream/workerappend.go b/internal/eventstream/workerappend.go new file mode 100644 index 00000000..ad0d4158 --- /dev/null +++ b/internal/eventstream/workerappend.go @@ -0,0 +1,226 @@ +package eventstream + +import ( + "context" + "log/slog" + + "github.com/dogmatiq/enginekit/protobuf/envelopepb" + "github.com/dogmatiq/persistencekit/journal" + "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" +) + +func (w *worker) handleAppend( + ctx context.Context, + req AppendRequest, +) (AppendResponse, error) { + if !req.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + if len(req.Events) == 0 { + // We panic rather than just failing the exchange because we never want + // empty requests to occupy space in the worker's queue. The sender + // should simply not send empty requests. 
+ panic("received append request with no events") + } + + defer w.resetIdleTimer() + + if req.LowestPossibleOffset > w.nextOffset { + if err := w.catchUpWithJournal(ctx); err != nil { + return AppendResponse{}, err + } + } + + for { + res, err := w.findPriorAppend(ctx, req) + if err != nil { + return AppendResponse{}, err + } + + if res.AppendedByPriorAttempt { + for index, event := range req.Events { + w.Logger.Info( + "discarded duplicate event", + slog.Uint64("stream_offset", uint64(res.BeginOffset)+uint64(index)), + slog.String("message_id", event.MessageId.AsString()), + slog.String("description", event.Description), + ) + } + return res, nil + } + + res, err = w.writeEventsToJournal(ctx, req) + if err == nil { + w.publishEvents(res.BeginOffset, req.Events) + return res, nil + } + + if err != journal.ErrConflict { + return AppendResponse{}, err + } + + if err := w.catchUpWithJournal(ctx); err != nil { + return AppendResponse{}, err + } + } +} + +// findPriorAppend returns an [AppendResponse] if the given [AppendRequest] has +// already been handled. +func (w *worker) findPriorAppend( + ctx context.Context, + req AppendRequest, +) (AppendResponse, error) { + // If the lowest possible offset is ahead of the next offset the request is + // malformed. Either theres a bug in Veracity, or the journal has suffered + // catastrophic data loss. + if req.LowestPossibleOffset > w.nextOffset { + panic("lowest possible offset is greater than the next offset") + } + + // If the lowest possible offset is equal to the next offset, no events + // have been recorded since the the request was created, and hence there + // can be no prior append attempt. + if req.LowestPossibleOffset == w.nextOffset { + return AppendResponse{}, nil + } + + // If the lowest possible offset is in the cache, we can check for + // duplicates without using the journal. We search using the last event in + // the request as it's the most likely to still be in the cache. + lowestPossibleOffset := req.LowestPossibleOffset + Offset(len(req.Events)) + + if cacheIndex := w.findInCache(lowestPossibleOffset); cacheIndex != -1 { + lastMessageIndex := len(req.Events) - 1 + lastMessageID := req.Events[lastMessageIndex].MessageId + + for _, event := range w.recentEvents[cacheIndex:] { + if event.Envelope.MessageId.Equal(lastMessageID) { + return AppendResponse{ + // We know the offset of the last message in the request, so + // we can compute the offset of the first message, even if + // it's no longer in the cache. + BeginOffset: event.Offset - Offset(lastMessageIndex), + EndOffset: event.Offset + 1, + AppendedByPriorAttempt: true, + }, nil + } + } + } + + // Finally, we search the journal for the record containing the events. 
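+	//
+	// The predicate below uses the first message ID of each candidate record
+	// as a fingerprint for the whole batch: if it equals the first message ID
+	// of this request, that record is treated as the prior append of these
+	// events. A journal.ErrNotFound result is discarded, yielding an empty
+	// response that indicates no prior append was found.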
+ rec, err := journal.ScanFromSearchResult( + ctx, + w.Journal, + 0, + w.nextPos, + eventstreamjournal.SearchByOffset(uint64(req.LowestPossibleOffset)), + func( + _ context.Context, + _ journal.Position, + rec *eventstreamjournal.Record, + ) (*eventstreamjournal.Record, bool, error) { + if op := rec.GetEventsAppended(); op != nil { + targetID := req.Events[0].MessageId + candidateID := op.Events[0].MessageId + return rec, candidateID.Equal(targetID), nil + } + return nil, false, nil + }, + ) + if err != nil { + return AppendResponse{}, journal.IgnoreNotFound(err) + } + + return AppendResponse{ + BeginOffset: Offset(rec.StreamOffsetBefore), + EndOffset: Offset(rec.StreamOffsetAfter), + AppendedByPriorAttempt: true, + }, nil +} + +func (w *worker) writeEventsToJournal( + ctx context.Context, + req AppendRequest, +) (AppendResponse, error) { + before := w.nextOffset + after := w.nextOffset + Offset(len(req.Events)) + + if err := w.Journal.Append( + ctx, + w.nextPos, + eventstreamjournal. + NewRecordBuilder(). + WithStreamOffsetBefore(uint64(before)). + WithStreamOffsetAfter(uint64(after)). + WithEventsAppended(&eventstreamjournal.EventsAppended{ + Events: req.Events, + }). + Build(), + ); err != nil { + return AppendResponse{}, err + } + + for index, event := range req.Events { + w.Logger.Info( + "appended event to the stream", + slog.Uint64("journal_position", uint64(w.nextPos)), + slog.Uint64("stream_offset", uint64(before)+uint64(index)), + slog.String("message_id", event.MessageId.AsString()), + slog.String("description", event.Description), + ) + } + + w.nextPos++ + w.nextOffset = after + + return AppendResponse{ + BeginOffset: before, + EndOffset: after, + AppendedByPriorAttempt: false, + }, nil +} + +// publishEvents publishes the events to both the recent event cache and any +// interested subscribers. +func (w *worker) publishEvents( + offset Offset, + events []*envelopepb.Envelope, +) { + w.growCache(len(events)) + + for _, env := range events { + event := Event{w.StreamID, offset, env} + offset++ + + w.appendEventToCache(event) + + if len(w.subscribers) == 0 { + continue + } + + var delivered, filtered, canceled int + + for sub := range w.subscribers { + switch w.deliverEventToSubscriber(event, sub) { + case eventDelivered: + delivered++ + case eventFiltered: + filtered++ + case subscriptionCanceled: + canceled++ + } + } + + w.Logger.Debug( + "event published to subscribers", + slog.Uint64("stream_offset", uint64(event.Offset)), + slog.String("message_id", env.MessageId.AsString()), + slog.String("description", env.Description), + slog.Int("delivered_count", delivered), + slog.Int("filtered_count", filtered), + slog.Int("canceled_count", canceled), + ) + } +} diff --git a/internal/eventstream/workercache.go b/internal/eventstream/workercache.go new file mode 100644 index 00000000..072ecb5e --- /dev/null +++ b/internal/eventstream/workercache.go @@ -0,0 +1,80 @@ +package eventstream + +import ( + "slices" + "time" +) + +const ( + // maxCacheAge is the maximum age of an event that will be retained in the + // cache of recent events. + maxCacheAge = 1 * time.Minute + + // maxCacheCapacity is the maximum number of events that will be retained in + // the cache of recent events. + maxCacheCapacity = 1000 +) + +// findInCache returns the index of the event with the given offset in the cache +// of recent events, or -1 if the event is not in the cache. 
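+//
+// For example, if nextOffset is 120 and the cache holds the 20 most recent
+// events, the cache covers offsets [100, 120): findInCache(105) returns 5,
+// while findInCache(95) and findInCache(120) both return -1.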
+func (w *worker) findInCache(offset Offset) int { + begin := w.nextOffset - Offset(len(w.recentEvents)) + + if begin <= offset && offset < w.nextOffset { + return int(offset - begin) + } + + return -1 +} + +// growCache grows the cache capacity to fit an additional n events. It removes +// old events if necessary. +// +// It returns the number of events that may be added to the cache. +func (w *worker) growCache(n int) int { + begin := 0 + end := len(w.recentEvents) + + if end > maxCacheCapacity { + panic("cache is over capacity, always use appendToCache() to add events") + } + + if n >= maxCacheCapacity { + // We've requested the entire cache, so just clear it entirely. + end = 0 + n = maxCacheCapacity + } else { + // Otherwise, first remove any events that are older than the cache TTL. + for index, event := range w.recentEvents[begin:end] { + createdAt := event.Envelope.CreatedAt.AsTime() + + if time.Since(createdAt) < maxCacheAge { + begin += index + break + } + } + + // Then, if we still don't have enough space, remove the oldest events. + capacity := end - begin + n + if capacity > maxCacheCapacity { + begin += capacity - maxCacheCapacity + } + } + + // Note, the slice indices are computed without modifying the slice so that + // we only perform a single copy operation. + copy(w.recentEvents, w.recentEvents[begin:end]) + + w.recentEvents = w.recentEvents[:end-begin] + w.recentEvents = slices.Grow(w.recentEvents, n) + + return n +} + +// appendEventToCache appends the given event to the cache of recent events. +func (w *worker) appendEventToCache(event Event) { + if len(w.recentEvents) == cap(w.recentEvents) { + panic("cache is at capacity, call purgeCache() before appending") + } + w.recentEvents = append(w.recentEvents, event) +} diff --git a/internal/eventstream/workeridle.go b/internal/eventstream/workeridle.go new file mode 100644 index 00000000..49e20550 --- /dev/null +++ b/internal/eventstream/workeridle.go @@ -0,0 +1,60 @@ +package eventstream + +import ( + "context" + "log/slog" + "time" +) + +const ( + // shutdownTimeout is the amount of time a worker WITH NO SUBSCRIBERS will + // wait after appending events before shutting down. + shutdownTimeout = 5 * time.Minute + + // catchUpTimeout is the amount of time a worker WITH SUBSCRIBERS will wait + // after appending events before "catching up" with any journal records that + // have been appended by other nodes. + catchUpTimeout = 10 * time.Second +) + +// resetIdleTimer starts or resets the idle timer. +func (w *worker) resetIdleTimer() { + timeout := shutdownTimeout + if len(w.subscribers) > 0 { + timeout = catchUpTimeout + } + + if w.idleTimer == nil { + w.idleTimer = time.NewTimer(timeout) + } else { + if !w.idleTimer.Stop() { + <-w.idleTimer.C + } + w.idleTimer.Reset(timeout) + } +} + +// handleIdle is called when the worker has not appended any new events for some +// period of time. +// +// If there are no subscribers, it returns false, indicating that the worker +// should shutdown. Otherwise, it reads the journal to see if there are new +// events to deliver to the subscribers. 
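+//
+// The boolean result follows the same convention as tick: returning false with
+// a nil error causes the worker's Run loop to exit cleanly.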
+func (w *worker) handleIdle(ctx context.Context) (bool, error) { + if len(w.subscribers) == 0 { + w.Logger.Debug( + "event stream worker stopped due to inactivity", + slog.Uint64("next_journal_position", uint64(w.nextPos)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + ) + return false, nil + } + + if err := w.catchUpWithJournal(ctx); err != nil { + return false, err + } + + w.resetIdleTimer() + + return true, nil +} diff --git a/internal/eventstream/workersubscriber.go b/internal/eventstream/workersubscriber.go new file mode 100644 index 00000000..eb8a2325 --- /dev/null +++ b/internal/eventstream/workersubscriber.go @@ -0,0 +1,139 @@ +package eventstream + +import ( + "fmt" + "log/slog" +) + +// handleSubscribe adds sub to the subscriber list. +// +// It delivers any cached events that the subscriber has not yet seen. If the +// subscriber's requested event is older than the events in the cache the +// subscription is canceled immediately. +func (w *worker) handleSubscribe(sub *Subscriber) { + if !sub.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + if w.subscribers == nil { + w.subscribers = map[*Subscriber]struct{}{} + } + w.subscribers[sub] = struct{}{} + + w.Logger.Debug( + "subscription activated", + slog.String("channel_address", fmt.Sprint(sub.Events)), + slog.Int("channel_capacity", cap(sub.Events)), + slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), + slog.Int("subscriber_count", len(w.subscribers)), + slog.Uint64("requested_stream_offset", uint64(sub.Offset)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + ) + + if sub.Offset >= w.nextOffset { + return + } + + index := w.findInCache(sub.Offset) + + if index == -1 { + sub.canceled.Signal() + w.Logger.Warn( + "subscription canceled immediately due request for historical events", + slog.String("channel_address", fmt.Sprint(sub.Events)), + slog.Int("channel_capacity", cap(sub.Events)), + slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), + slog.Int("subscriber_count", len(w.subscribers)), + slog.Int("cached_event_count", len(w.recentEvents)), + slog.Uint64("requested_stream_offset", uint64(sub.Offset)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + ) + return + } + + for _, event := range w.recentEvents[index:] { + if w.deliverEventToSubscriber(event, sub) == subscriptionCanceled { + return + } + } +} + +// handleUnsubscribe removes sub from the subscriber list. +func (w *worker) handleUnsubscribe(sub *Subscriber) { + if !sub.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + before := len(w.subscribers) + delete(w.subscribers, sub) + after := len(w.subscribers) + + if before > after { + sub.canceled.Signal() + + w.Logger.Debug( + "subscription canceled by subscriber", + slog.String("channel_address", fmt.Sprint(sub.Events)), + slog.Int("channel_capacity", cap(sub.Events)), + slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), + slog.Int("subscriber_count", after), + ) + } +} + +// deliverResult is an enumeration of the possible outcomes of delivering an +// event to a subscriber. +type deliverResult int + +const ( + // eventDelivered means that the event was sent to the subscriber's events + // channel, which may or may not be buffered. + eventDelivered deliverResult = iota + + // eventFiltered means that the event was filtered by the subscriber's + // filter function, and did not need to be delivered. 
+ eventFiltered + + // subscriptionCanceled means that an attempt was made to send the event to + // the subscriber's event channel, but the channel buffer was full (or + // unbuffered and not ready to read), and so the subscription was canceled. + subscriptionCanceled +) + +// deliverEventToSubscriber attempts to deliver an event to a subscriber's event +// channel. +func (w *worker) deliverEventToSubscriber(event Event, sub *Subscriber) deliverResult { + if event.Offset > sub.Offset { + panic("event is out of order") + } + + if event.Offset < sub.Offset { + return eventFiltered + } + + if !sub.Filter(event.Envelope) { + sub.Offset++ + return eventFiltered + } + + select { + case sub.Events <- event: + sub.Offset++ + return eventDelivered + + default: + delete(w.subscribers, sub) + sub.canceled.Signal() + + w.Logger.Warn( + "subscription canceled because the subscriber can not keep up with the event stream", + slog.String("channel_address", fmt.Sprint(sub.Events)), + slog.Int("channel_capacity", cap(sub.Events)), + slog.Int("channel_headroom", 0), + slog.Int("subscriber_count", len(w.subscribers)), + slog.Uint64("stream_offset", uint64(event.Offset)), + ) + + return subscriptionCanceled + } +} diff --git a/internal/messaging/request.go b/internal/messaging/request.go new file mode 100644 index 00000000..d5ff6a23 --- /dev/null +++ b/internal/messaging/request.go @@ -0,0 +1,64 @@ +package messaging + +import ( + "context" + "sync" +) + +// Request encapsulates a request. +type Request[Req any] struct { + Context context.Context + Request Req + Error chan<- error +} + +// Ok sends a successful response. +func (e Request[Req]) Ok() { + e.Error <- nil +} + +// Err sends an error response. +func (e Request[Req]) Err(err error) { + e.Error <- err +} + +// RequestQueue is a queue of requests. +type RequestQueue[Req any] struct { + init sync.Once + queue chan Request[Req] +} + +// Recv returns a channel that, when read, dequeues the next request. +func (q *RequestQueue[Req]) Recv() <-chan Request[Req] { + return q.getQueue() +} + +// Send returns a channel that, when written, enqueues an request. +func (q *RequestQueue[Req]) Send() chan<- Request[Req] { + return q.getQueue() +} + +// Do performs a synchronous request. +func (q *RequestQueue[Req]) Do(ctx context.Context, req Req) error { + response := make(chan error, 1) + + select { + case <-ctx.Done(): + return ctx.Err() + case q.Send() <- Request[Req]{ctx, req, response}: + } + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-response: + return err + } +} + +func (q *RequestQueue[Req]) getQueue() chan Request[Req] { + q.init.Do(func() { + q.queue = make(chan Request[Req]) + }) + return q.queue +} From 24a1a46cd88941f55ea97b251069e8deb6b2875e Mon Sep 17 00:00:00 2001 From: James Harris Date: Tue, 9 Apr 2024 07:22:46 +1000 Subject: [PATCH 2/4] Ensure that the supervisor recovers from optimistic concurrency conflicts. 
--- go.sum | 74 ++++++ internal/eventstream/append.go | 178 ++++++++++++++ internal/eventstream/append_test.go | 105 ++++++-- .../eventstream/{workercache.go => cache.go} | 17 +- .../eventstream/{workeridle.go => idle.go} | 0 internal/eventstream/reader.go | 179 ++++++++++++++ internal/eventstream/worker.go | 2 +- internal/eventstream/workerappend.go | 226 ------------------ internal/eventstream/workersubscriber.go | 139 ----------- 9 files changed, 529 insertions(+), 391 deletions(-) rename internal/eventstream/{workercache.go => cache.go} (85%) rename internal/eventstream/{workeridle.go => idle.go} (100%) delete mode 100644 internal/eventstream/workerappend.go delete mode 100644 internal/eventstream/workersubscriber.go diff --git a/go.sum b/go.sum index c3575eeb..bf0534d5 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,45 @@ +cloud.google.com/go/compute v1.24.0 h1:phWcR2eWzRJaL/kOiJwfFsPs4BaKq1j6vnpZrc1YlVg= +cloud.google.com/go/compute v1.24.0/go.mod h1:kw1/T+h/+tK2LJK0wiPPx1intgdAM3j/g3hFDlscY40= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= +cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= +github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= +github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2/config v1.27.10 h1:PS+65jThT0T/snC5WjyfHHyUgG+eBoupSDV+f838cro= +github.com/aws/aws-sdk-go-v2/config v1.27.10/go.mod h1:BePM7Vo4OBpHreKRUMuDXX+/+JWP38FLkzl5m27/Jjs= +github.com/aws/aws-sdk-go-v2/credentials v1.17.10 h1:qDZ3EA2lv1KangvQB6y258OssCHD0xvaGiEDkG4X/10= +github.com/aws/aws-sdk-go-v2/credentials v1.17.10/go.mod h1:6t3sucOaYDwDssHQa0ojH1RpmVmF5/jArkye1b2FKMI= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 h1:FVJ0r5XTHSmIHJV6KuDmdYhEpvlHpiSd38RQWhut5J4= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1/go.mod h1:zusuAeqezXzAB24LGuzuekqMAEgWkVYukBec3kr3jUg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.31.1 h1:dZXY07Dm59TxAjJcUfNMJHLDI/gLMxTRZefn2jFAVsw= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.31.1/go.mod h1:lVLqEtX+ezgtfalyJs7Peb0uv9dEpAQP5yuq2O26R44= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.6 h1:6tayEze2Y+hiL3kdnEUxSPsP+pJsUfwLSFspFl1ru9Q= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.9.6/go.mod h1:qVNb/9IOVsLCZh0x2lnagrBwQ9fxajUpXS7OZfIsKn0= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 h1:ogRAwT1/gxJBcSWDMZlgyFUM962F51A5CRhDLbxLdmo= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7/go.mod 
h1:YCsIZhXfRPLFFCl5xxY+1T9RKzOKjCut+28JSX2DnAk= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.4 h1:WzFol5Cd+yDxPAdnzTA5LmpHYSWinhmSj4rQChV0ee8= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.4/go.mod h1:qGzynb/msuZIE8I75DVRCUXw3o3ZyBmUvMwQ2t/BrGM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 h1:Jux+gDDyi1Lruk+KHF91tK2KCuY61kzoCpvtvJJBtOE= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4/go.mod h1:mUYPBhaF2lGiukDEjJX2BLRRKTmoUSitGDUgM4tRxak= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 h1:cwIxeBttqPN3qkaAjcEcsh8NYr8n2HZPkcKgPAi1phU= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.6/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw= +github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= +github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= +github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g= +github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= +github.com/dave/jennifer v1.7.0 h1:uRbSBH9UTS64yXbh4FrMHfgfY762RD+C7bUPKODpSJE= +github.com/dave/jennifer v1.7.0/go.mod h1:nXbxhEmQfOZhWml3D1cDK5M1FLnMSozpbFN/m3RmGZc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dogmatiq/configkit v0.13.0 h1:pV2Pz0iBUBnRfOm6tbWVRXvuh2bWHBScOh8KfVp/N68= @@ -42,6 +82,12 @@ github.com/dogmatiq/sqltest v0.3.0 h1:DCwyLWfVk/ZHsqq5Itq3H/Lqsh/CIQ6nIRwI4YLywF github.com/dogmatiq/sqltest v0.3.0/go.mod h1:a8Da8NhU4m3lq5Sybhiv+ZQowSnGHWTIJHFNInVtffg= github.com/dogmatiq/testkit v0.13.11 h1:ikXg/Cxq58tzHL27JKCkVqUUElJCHcso7N/ymd3Wins= github.com/dogmatiq/testkit v0.13.11/go.mod h1:GDAEnCkfb8Chmbe+Dc4DfRaaCCV7eqOza61shb1hlE0= +github.com/emicklei/dot v1.6.1 h1:ujpDlBkkwgWUY+qPId5IwapRW/xEoligRSYjioR6DFI= +github.com/emicklei/dot v1.6.1/go.mod h1:DeV7GvQtIw4h2u73RKBkkFdvVAz0D9fzeJrgPW6gy/s= +github.com/envoyproxy/go-control-plane v0.12.0 h1:4X+VP1GHd1Mhj6IB5mMeGbLCleqxjletLK6K0rbxyZI= +github.com/envoyproxy/go-control-plane v0.12.0/go.mod h1:ZBTaoJ23lqITozF0M6G4/IragXCQKCnYbmlmtHvwRG0= +github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -56,6 +102,8 @@ github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gG github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/golang/glog v1.2.0 
h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68= +github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= @@ -63,6 +111,8 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -72,6 +122,7 @@ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= @@ -91,6 +142,8 @@ github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA= github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= github.com/jmalloc/gomegax v0.0.0-20200507221434-64fca4c0e03a h1:Gk7Gkwl1KUJII/FiAjvBjRgEz/lpvTV8kNYp+9jdpuk= github.com/jmalloc/gomegax v0.0.0-20200507221434-64fca4c0e03a/go.mod h1:TZpc8ObQEKqTuy1/VXpPRfcMU80QFDU4zK3nchXts/k= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -118,6 +171,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.10.4/go.mod h1:g/HbgYopi++010VEqkFgJHKC09uJiW9UkXvMUuKHUCQ= github.com/onsi/gomega v1.32.0 h1:JRYU78fJ1LPxlckP6Txi/EYqJvjtMrDC04/MM5XRHPk= github.com/onsi/gomega v1.32.0/go.mod h1:a4x4gW6Pz2yK1MAmvluYme5lvYTn61afQ2ETw/8n4Lg= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rivo/uniseg 
v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -129,6 +184,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ= +go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= go.opentelemetry.io/otel v1.25.0 h1:gldB5FfhRl7OJQbUHt/8s0a7cE8fbsPAtdpRaApKy4k= go.opentelemetry.io/otel v1.25.0/go.mod h1:Wa2ds5NOXEMkCmUou1WA7ZBfLTHWIsp034OVD7AO+Vg= go.opentelemetry.io/otel/metric v1.25.0 h1:LUKbS7ArpFL/I2jJHdJcqMGxkRdxpPHE0VU/D4NuEwA= @@ -143,12 +200,16 @@ golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/exp v0.0.0-20230810033253-352e893a4cad h1:g0bG7Z4uG+OgH2QDODnjp6ggkk1bJDsINcuWmJN1iJU= golang.org/x/exp v0.0.0-20230810033253-352e893a4cad/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic= +golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ= +golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= @@ -163,6 +224,8 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= +golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -172,6 +235,14 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors 
v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= +google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= +google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY= +google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo= +google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de h1:jFNzHPIeuzhdRwVhbZdiym9q0ory/xY3sA+v2wPg8I0= +google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:5iCWqnniDlqZHrd3neWVTOwvh/v6s3232omMecelax8= google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY= google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= @@ -187,12 +258,15 @@ google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw= diff --git a/internal/eventstream/append.go b/internal/eventstream/append.go index 9344f0ca..045b7b8c 100644 --- a/internal/eventstream/append.go +++ b/internal/eventstream/append.go @@ -1,8 +1,13 @@ package eventstream import ( + "context" + "log/slog" + "github.com/dogmatiq/enginekit/protobuf/envelopepb" "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "github.com/dogmatiq/persistencekit/journal" + "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" ) // AppendRequest is a request to append events to an event stream. @@ -31,3 +36,176 @@ type AppendResponse struct { // [AppendRequest] and hence deduplicated. 
AppendedByPriorAttempt bool } + +func (w *worker) handleAppend( + ctx context.Context, + req AppendRequest, +) (AppendResponse, error) { + if !req.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + if len(req.Events) == 0 { + // We panic rather than just failing the exchange because we never want + // empty requests to occupy space in the worker's queue. The sender + // should simply not send empty requests. + panic("received append request with no events") + } + + defer w.resetIdleTimer() + + if req.LowestPossibleOffset > w.nextOffset { + if err := w.catchUpWithJournal(ctx); err != nil { + return AppendResponse{}, err + } + } + + for { + res, err := w.findPriorAppend(ctx, req) + if err != nil { + return AppendResponse{}, err + } + + if res.AppendedByPriorAttempt { + for index, event := range req.Events { + w.Logger.Info( + "discarded duplicate event", + slog.Uint64("stream_offset", uint64(res.BeginOffset)+uint64(index)), + slog.String("message_id", event.MessageId.AsString()), + slog.String("description", event.Description), + ) + } + return res, nil + } + + res, err = w.writeEventsToJournal(ctx, req) + if err == nil { + w.publishEvents(res.BeginOffset, req.Events) + return res, nil + } + + if err != journal.ErrConflict { + return AppendResponse{}, err + } + + if err := w.catchUpWithJournal(ctx); err != nil { + return AppendResponse{}, err + } + } +} + +// findPriorAppend returns an [AppendResponse] if the given [AppendRequest] has +// already been handled. +func (w *worker) findPriorAppend( + ctx context.Context, + req AppendRequest, +) (AppendResponse, error) { + // If the lowest possible offset is ahead of the next offset the request is + // malformed. Either theres a bug in Veracity, or the journal has suffered + // catastrophic data loss. + if req.LowestPossibleOffset > w.nextOffset { + panic("lowest possible offset is greater than the next offset") + } + + // If the lowest possible offset is equal to the next offset, no events + // have been recorded since the the request was created, and hence there + // can be no prior append attempt. + if req.LowestPossibleOffset == w.nextOffset { + return AppendResponse{}, nil + } + + // If the lowest possible offset is in the cache, we can check for + // duplicates without using the journal. We search using the last event in + // the request as it's the most likely to still be in the cache. + lowestPossibleOffset := req.LowestPossibleOffset + Offset(len(req.Events)) + + if cacheIndex := w.findInCache(lowestPossibleOffset); cacheIndex != -1 { + lastMessageIndex := len(req.Events) - 1 + lastMessageID := req.Events[lastMessageIndex].MessageId + + for _, event := range w.recentEvents[cacheIndex:] { + if event.Envelope.MessageId.Equal(lastMessageID) { + return AppendResponse{ + // We know the offset of the last message in the request, so + // we can compute the offset of the first message, even if + // it's no longer in the cache. + BeginOffset: event.Offset - Offset(lastMessageIndex), + EndOffset: event.Offset + 1, + AppendedByPriorAttempt: true, + }, nil + } + } + } + + // Finally, we search the journal for the record containing the events. 
+ rec, err := journal.ScanFromSearchResult( + ctx, + w.Journal, + 0, + w.nextPos, + eventstreamjournal.SearchByOffset(uint64(req.LowestPossibleOffset)), + func( + _ context.Context, + _ journal.Position, + rec *eventstreamjournal.Record, + ) (*eventstreamjournal.Record, bool, error) { + if op := rec.GetEventsAppended(); op != nil { + targetID := req.Events[0].MessageId + candidateID := op.Events[0].MessageId + return rec, candidateID.Equal(targetID), nil + } + return nil, false, nil + }, + ) + if err != nil { + return AppendResponse{}, journal.IgnoreNotFound(err) + } + + return AppendResponse{ + BeginOffset: Offset(rec.StreamOffsetBefore), + EndOffset: Offset(rec.StreamOffsetAfter), + AppendedByPriorAttempt: true, + }, nil +} + +func (w *worker) writeEventsToJournal( + ctx context.Context, + req AppendRequest, +) (AppendResponse, error) { + before := w.nextOffset + after := w.nextOffset + Offset(len(req.Events)) + + if err := w.Journal.Append( + ctx, + w.nextPos, + eventstreamjournal. + NewRecordBuilder(). + WithStreamOffsetBefore(uint64(before)). + WithStreamOffsetAfter(uint64(after)). + WithEventsAppended(&eventstreamjournal.EventsAppended{ + Events: req.Events, + }). + Build(), + ); err != nil { + return AppendResponse{}, err + } + + for index, event := range req.Events { + w.Logger.Info( + "appended event to the stream", + slog.Uint64("journal_position", uint64(w.nextPos)), + slog.Uint64("stream_offset", uint64(before)+uint64(index)), + slog.String("message_id", event.MessageId.AsString()), + slog.String("description", event.Description), + ) + } + + w.nextPos++ + w.nextOffset = after + + return AppendResponse{ + BeginOffset: before, + EndOffset: after, + AppendedByPriorAttempt: false, + }, nil +} diff --git a/internal/eventstream/append_test.go b/internal/eventstream/append_test.go index 4a791440..0e19c28e 100644 --- a/internal/eventstream/append_test.go +++ b/internal/eventstream/append_test.go @@ -26,6 +26,7 @@ func TestAppend(t *testing.T) { Journals *memoryjournal.BinaryStore Supervisor *Supervisor Packer *envelope.Packer + Barrier chan struct{} } setup := func(t test.TestingT) (deps dependencies) { @@ -41,6 +42,8 @@ func TestAppend(t *testing.T) { Marshaler: Marshaler, } + deps.Barrier = make(chan struct{}) + return deps } @@ -51,26 +54,26 @@ func TestAppend(t *testing.T) { cases := []struct { Desc string - InduceFailure func(*dependencies) + InduceFailure func(context.Context, *testing.T, *dependencies) }{ { Desc: "no faults", - InduceFailure: func(*dependencies) { - }, }, { Desc: "failure to open journal", - InduceFailure: func(deps *dependencies) { + InduceFailure: func(_ context.Context, t *testing.T, deps *dependencies) { test.FailOnJournalOpen( deps.Journals, eventstreamjournal.Name(streamID), errors.New(""), ) + t.Log("configured journal store to fail when opening the journal") + close(deps.Barrier) }, }, { Desc: "failure before appending to journal", - InduceFailure: func(deps *dependencies) { + InduceFailure: func(_ context.Context, t *testing.T, deps *dependencies) { test.FailBeforeJournalAppend( deps.Journals, eventstreamjournal.Name(streamID), @@ -79,11 +82,13 @@ func TestAppend(t *testing.T) { }, errors.New(""), ) + t.Log("configured journal store to fail before appending a record") + close(deps.Barrier) }, }, { Desc: "failure after appending to journal", - InduceFailure: func(deps *dependencies) { + InduceFailure: func(_ context.Context, t *testing.T, deps *dependencies) { test.FailAfterJournalAppend( deps.Journals, eventstreamjournal.Name(streamID), @@ -92,6 +97,56 
@@ func TestAppend(t *testing.T) { }, errors.New(""), ) + t.Log("configured journal store to fail after appending a record") + close(deps.Barrier) + }, + }, + { + Desc: "optimistic concurrency conflict", + InduceFailure: func(ctx context.Context, t *testing.T, deps *dependencies) { + go func() { + if _, err := deps.Supervisor.AppendQueue.Do( + ctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{ + deps.Packer.Pack(MessageX1), + }, + }, + ); err != nil { + t.Error(err) + return + } + + t.Log("confirmed that the supervisor-under-test is running") + + s := &Supervisor{ + Journals: deps.Journals, + Logger: spruce.NewLogger(t), + } + + defer test. + RunInBackground(t, "conflict-generating-supervisor", s.Run). + UntilStopped(). + Stop() + + if _, err := s.AppendQueue.Do( + ctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{ + deps.Packer.Pack(MessageX2), + }, + }, + ); err != nil { + t.Error(err) + return + } + + t.Log("appended events using a different supervisor to induce a journal conflict") + + close(deps.Barrier) + }() }, }, } @@ -101,10 +156,10 @@ func TestAppend(t *testing.T) { tctx := test.WithContext(t) deps := setup(tctx) - t.Log("append some initial events to the stream") + t.Log("seeding the event stream with some initial events") supervisor := test. - RunInBackground(t, "supervisor", deps.Supervisor.Run). + RunInBackground(t, "event-seeding-supervisor", deps.Supervisor.Run). UntilStopped() res, err := deps.Supervisor.AppendQueue.Do( @@ -124,14 +179,35 @@ func TestAppend(t *testing.T) { supervisor.StopAndWait() - t.Log("induce a failure") + // Open a journal that was can use for verifying results + // _before_ inducing any failure. + j, err := eventstreamjournal.Open(tctx, deps.Journals, streamID) + if err != nil { + t.Fatal(err) + } + defer j.Close() - c.InduceFailure(&deps) + if c.InduceFailure != nil { + c.InduceFailure(tctx, t, &deps) + } else { + close(deps.Barrier) + } supervisor = test. - RunInBackground(t, "supervisor", deps.Supervisor.Run). + RunInBackground(t, "supervisor-under-test", deps.Supervisor.Run). RepeatedlyUntilStopped() + <-deps.Barrier + + // Read the journal bounds as they exist before the test + // commences. + begin, end, err := j.Bounds(tctx) + if err != nil { + t.Fatal(err) + } + + t.Logf("proceeding with test, journal bounds are [%d, %d)", begin, end) + event := deps.Packer.Pack(MessageE1) req := AppendRequest{ @@ -159,16 +235,11 @@ func TestAppend(t *testing.T) { t.Log("ensure that the event was appended to the stream exactly once") - j, err := eventstreamjournal.Open(tctx, deps.Journals, streamID) - if err != nil { - t.Fatal(err) - } - var events []*envelopepb.Envelope if err := j.Range( tctx, - 1, + end, // only read the records appended during the test func( ctx context.Context, _ journal.Position, diff --git a/internal/eventstream/workercache.go b/internal/eventstream/cache.go similarity index 85% rename from internal/eventstream/workercache.go rename to internal/eventstream/cache.go index 072ecb5e..a3064c2d 100644 --- a/internal/eventstream/workercache.go +++ b/internal/eventstream/cache.go @@ -30,7 +30,7 @@ func (w *worker) findInCache(offset Offset) int { // growCache grows the cache capacity to fit an additional n events. It removes // old events if necessary. // -// It returns the number of events that may be added to the cache. +// It returns the difference between n and the actual capacity of the cache. 
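+//
+// Illustrative example (the real maxCacheCapacity is defined elsewhere in
+// this package; 1000 is assumed here purely for the arithmetic): with a
+// capacity of 1000, growCache(1500) clears the cache and returns 500,
+// telling the caller to skip the 500 oldest of the new events, whereas
+// growCache(10) returns 0, meaning all 10 new events may be cached.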
func (w *worker) growCache(n int) int { begin := 0 end := len(w.recentEvents) @@ -39,10 +39,12 @@ func (w *worker) growCache(n int) int { panic("cache is over capacity, always use appendToCache() to add events") } - if n >= maxCacheCapacity { + capacity := n + + if capacity >= maxCacheCapacity { // We've requested the entire cache, so just clear it entirely. end = 0 - n = maxCacheCapacity + capacity = maxCacheCapacity } else { // Otherwise, first remove any events that are older than the cache TTL. for index, event := range w.recentEvents[begin:end] { @@ -55,9 +57,8 @@ func (w *worker) growCache(n int) int { } // Then, if we still don't have enough space, remove the oldest events. - capacity := end - begin + n - if capacity > maxCacheCapacity { - begin += capacity - maxCacheCapacity + if c := end - begin + capacity; c > maxCacheCapacity { + begin += c - maxCacheCapacity } } @@ -66,9 +67,9 @@ func (w *worker) growCache(n int) int { copy(w.recentEvents, w.recentEvents[begin:end]) w.recentEvents = w.recentEvents[:end-begin] - w.recentEvents = slices.Grow(w.recentEvents, n) + w.recentEvents = slices.Grow(w.recentEvents, capacity) - return n + return n - capacity } // appendEventToCache appends the given event to the cache of recent events. diff --git a/internal/eventstream/workeridle.go b/internal/eventstream/idle.go similarity index 100% rename from internal/eventstream/workeridle.go rename to internal/eventstream/idle.go diff --git a/internal/eventstream/reader.go b/internal/eventstream/reader.go index 2d04baa2..17f3873a 100644 --- a/internal/eventstream/reader.go +++ b/internal/eventstream/reader.go @@ -3,6 +3,7 @@ package eventstream import ( "context" "fmt" + "log/slog" "time" "github.com/dogmatiq/enginekit/protobuf/envelopepb" @@ -169,3 +170,181 @@ func (r *Reader) unsubscribe(ctx context.Context, sub *Subscriber) error { return r.UnsubscribeQueue.Do(ctx, sub) } + +// handleSubscribe adds sub to the subscriber list. +// +// It delivers any cached events that the subscriber has not yet seen. If the +// subscriber's requested event is older than the events in the cache the +// subscription is canceled immediately. 
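+//
+// For example (offsets are illustrative only): if the cache currently holds
+// offsets [100, 110) and the subscriber asks for offset 95, the subscription
+// is canceled and the subscriber must fall back to reading the journal; if it
+// asks for offset 105, the cached events at offsets 105 through 109 are
+// delivered immediately (channel capacity permitting) and the subscription
+// then receives new events as they are appended.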
+func (w *worker) handleSubscribe(sub *Subscriber) { + if !sub.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + if w.subscribers == nil { + w.subscribers = map[*Subscriber]struct{}{} + } + w.subscribers[sub] = struct{}{} + + w.Logger.Debug( + "subscription activated", + slog.String("channel_address", fmt.Sprint(sub.Events)), + slog.Int("channel_capacity", cap(sub.Events)), + slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), + slog.Int("subscriber_count", len(w.subscribers)), + slog.Uint64("requested_stream_offset", uint64(sub.Offset)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + ) + + if sub.Offset >= w.nextOffset { + return + } + + index := w.findInCache(sub.Offset) + + if index == -1 { + sub.canceled.Signal() + w.Logger.Warn( + "subscription canceled immediately due request for historical events", + slog.String("channel_address", fmt.Sprint(sub.Events)), + slog.Int("channel_capacity", cap(sub.Events)), + slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), + slog.Int("subscriber_count", len(w.subscribers)), + slog.Int("cached_event_count", len(w.recentEvents)), + slog.Uint64("requested_stream_offset", uint64(sub.Offset)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + ) + return + } + + for _, event := range w.recentEvents[index:] { + if w.deliverEventToSubscriber(event, sub) == subscriptionCanceled { + return + } + } +} + +// handleUnsubscribe removes sub from the subscriber list. +func (w *worker) handleUnsubscribe(sub *Subscriber) { + if !sub.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + before := len(w.subscribers) + delete(w.subscribers, sub) + after := len(w.subscribers) + + if before > after { + sub.canceled.Signal() + + w.Logger.Debug( + "subscription canceled by subscriber", + slog.String("channel_address", fmt.Sprint(sub.Events)), + slog.Int("channel_capacity", cap(sub.Events)), + slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), + slog.Int("subscriber_count", after), + ) + } +} + +// deliverResult is an enumeration of the possible outcomes of delivering an +// event to a subscriber. +type deliverResult int + +const ( + // eventDelivered means that the event was sent to the subscriber's events + // channel, which may or may not be buffered. + eventDelivered deliverResult = iota + + // eventFiltered means that the event was filtered by the subscriber's + // filter function, and did not need to be delivered. + eventFiltered + + // subscriptionCanceled means that an attempt was made to send the event to + // the subscriber's event channel, but the channel buffer was full (or + // unbuffered and not ready to read), and so the subscription was canceled. + subscriptionCanceled +) + +// deliverEventToSubscriber attempts to deliver an event to a subscriber's event +// channel. 
+func (w *worker) deliverEventToSubscriber(event Event, sub *Subscriber) deliverResult { + if event.Offset > sub.Offset { + panic("event is out of order") + } + + if event.Offset < sub.Offset { + return eventFiltered + } + + if !sub.Filter(event.Envelope) { + sub.Offset++ + return eventFiltered + } + + select { + case sub.Events <- event: + sub.Offset++ + return eventDelivered + + default: + delete(w.subscribers, sub) + sub.canceled.Signal() + + w.Logger.Warn( + "subscription canceled because the subscriber can not keep up with the event stream", + slog.String("channel_address", fmt.Sprint(sub.Events)), + slog.Int("channel_capacity", cap(sub.Events)), + slog.Int("channel_headroom", 0), + slog.Int("subscriber_count", len(w.subscribers)), + slog.Uint64("stream_offset", uint64(event.Offset)), + ) + + return subscriptionCanceled + } +} + +// publishEvents publishes the events to both the recent event cache and any +// interested subscribers. +func (w *worker) publishEvents( + offset Offset, + events []*envelopepb.Envelope, +) { + skip := w.growCache(len(events)) + + for i, env := range events { + event := Event{w.StreamID, offset, env} + offset++ + + if i >= skip { + w.appendEventToCache(event) + } + + if len(w.subscribers) == 0 { + continue + } + + var delivered, filtered, canceled int + + for sub := range w.subscribers { + switch w.deliverEventToSubscriber(event, sub) { + case eventDelivered: + delivered++ + case eventFiltered: + filtered++ + case subscriptionCanceled: + canceled++ + } + } + + w.Logger.Debug( + "event published to subscribers", + slog.Uint64("stream_offset", uint64(event.Offset)), + slog.String("message_id", env.MessageId.AsString()), + slog.String("description", env.Description), + slog.Int("delivered_count", delivered), + slog.Int("filtered_count", filtered), + slog.Int("canceled_count", canceled), + ) + } +} diff --git a/internal/eventstream/worker.go b/internal/eventstream/worker.go index b8c51f60..4d6c66c6 100644 --- a/internal/eventstream/worker.go +++ b/internal/eventstream/worker.go @@ -48,7 +48,7 @@ type worker struct { // shutdown by the supervisor, or the idle timeout expires. func (w *worker) Run(ctx context.Context) (err error) { defer func() { - if err != nil { + if err != nil && err != context.Canceled { w.Logger.Debug( "event stream worker stopped due to an error", slog.String("error", err.Error()), diff --git a/internal/eventstream/workerappend.go b/internal/eventstream/workerappend.go deleted file mode 100644 index ad0d4158..00000000 --- a/internal/eventstream/workerappend.go +++ /dev/null @@ -1,226 +0,0 @@ -package eventstream - -import ( - "context" - "log/slog" - - "github.com/dogmatiq/enginekit/protobuf/envelopepb" - "github.com/dogmatiq/persistencekit/journal" - "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" -) - -func (w *worker) handleAppend( - ctx context.Context, - req AppendRequest, -) (AppendResponse, error) { - if !req.StreamID.Equal(w.StreamID) { - panic("received request for a different stream ID") - } - - if len(req.Events) == 0 { - // We panic rather than just failing the exchange because we never want - // empty requests to occupy space in the worker's queue. The sender - // should simply not send empty requests. 
- panic("received append request with no events") - } - - defer w.resetIdleTimer() - - if req.LowestPossibleOffset > w.nextOffset { - if err := w.catchUpWithJournal(ctx); err != nil { - return AppendResponse{}, err - } - } - - for { - res, err := w.findPriorAppend(ctx, req) - if err != nil { - return AppendResponse{}, err - } - - if res.AppendedByPriorAttempt { - for index, event := range req.Events { - w.Logger.Info( - "discarded duplicate event", - slog.Uint64("stream_offset", uint64(res.BeginOffset)+uint64(index)), - slog.String("message_id", event.MessageId.AsString()), - slog.String("description", event.Description), - ) - } - return res, nil - } - - res, err = w.writeEventsToJournal(ctx, req) - if err == nil { - w.publishEvents(res.BeginOffset, req.Events) - return res, nil - } - - if err != journal.ErrConflict { - return AppendResponse{}, err - } - - if err := w.catchUpWithJournal(ctx); err != nil { - return AppendResponse{}, err - } - } -} - -// findPriorAppend returns an [AppendResponse] if the given [AppendRequest] has -// already been handled. -func (w *worker) findPriorAppend( - ctx context.Context, - req AppendRequest, -) (AppendResponse, error) { - // If the lowest possible offset is ahead of the next offset the request is - // malformed. Either theres a bug in Veracity, or the journal has suffered - // catastrophic data loss. - if req.LowestPossibleOffset > w.nextOffset { - panic("lowest possible offset is greater than the next offset") - } - - // If the lowest possible offset is equal to the next offset, no events - // have been recorded since the the request was created, and hence there - // can be no prior append attempt. - if req.LowestPossibleOffset == w.nextOffset { - return AppendResponse{}, nil - } - - // If the lowest possible offset is in the cache, we can check for - // duplicates without using the journal. We search using the last event in - // the request as it's the most likely to still be in the cache. - lowestPossibleOffset := req.LowestPossibleOffset + Offset(len(req.Events)) - - if cacheIndex := w.findInCache(lowestPossibleOffset); cacheIndex != -1 { - lastMessageIndex := len(req.Events) - 1 - lastMessageID := req.Events[lastMessageIndex].MessageId - - for _, event := range w.recentEvents[cacheIndex:] { - if event.Envelope.MessageId.Equal(lastMessageID) { - return AppendResponse{ - // We know the offset of the last message in the request, so - // we can compute the offset of the first message, even if - // it's no longer in the cache. - BeginOffset: event.Offset - Offset(lastMessageIndex), - EndOffset: event.Offset + 1, - AppendedByPriorAttempt: true, - }, nil - } - } - } - - // Finally, we search the journal for the record containing the events. 
- rec, err := journal.ScanFromSearchResult( - ctx, - w.Journal, - 0, - w.nextPos, - eventstreamjournal.SearchByOffset(uint64(req.LowestPossibleOffset)), - func( - _ context.Context, - _ journal.Position, - rec *eventstreamjournal.Record, - ) (*eventstreamjournal.Record, bool, error) { - if op := rec.GetEventsAppended(); op != nil { - targetID := req.Events[0].MessageId - candidateID := op.Events[0].MessageId - return rec, candidateID.Equal(targetID), nil - } - return nil, false, nil - }, - ) - if err != nil { - return AppendResponse{}, journal.IgnoreNotFound(err) - } - - return AppendResponse{ - BeginOffset: Offset(rec.StreamOffsetBefore), - EndOffset: Offset(rec.StreamOffsetAfter), - AppendedByPriorAttempt: true, - }, nil -} - -func (w *worker) writeEventsToJournal( - ctx context.Context, - req AppendRequest, -) (AppendResponse, error) { - before := w.nextOffset - after := w.nextOffset + Offset(len(req.Events)) - - if err := w.Journal.Append( - ctx, - w.nextPos, - eventstreamjournal. - NewRecordBuilder(). - WithStreamOffsetBefore(uint64(before)). - WithStreamOffsetAfter(uint64(after)). - WithEventsAppended(&eventstreamjournal.EventsAppended{ - Events: req.Events, - }). - Build(), - ); err != nil { - return AppendResponse{}, err - } - - for index, event := range req.Events { - w.Logger.Info( - "appended event to the stream", - slog.Uint64("journal_position", uint64(w.nextPos)), - slog.Uint64("stream_offset", uint64(before)+uint64(index)), - slog.String("message_id", event.MessageId.AsString()), - slog.String("description", event.Description), - ) - } - - w.nextPos++ - w.nextOffset = after - - return AppendResponse{ - BeginOffset: before, - EndOffset: after, - AppendedByPriorAttempt: false, - }, nil -} - -// publishEvents publishes the events to both the recent event cache and any -// interested subscribers. -func (w *worker) publishEvents( - offset Offset, - events []*envelopepb.Envelope, -) { - w.growCache(len(events)) - - for _, env := range events { - event := Event{w.StreamID, offset, env} - offset++ - - w.appendEventToCache(event) - - if len(w.subscribers) == 0 { - continue - } - - var delivered, filtered, canceled int - - for sub := range w.subscribers { - switch w.deliverEventToSubscriber(event, sub) { - case eventDelivered: - delivered++ - case eventFiltered: - filtered++ - case subscriptionCanceled: - canceled++ - } - } - - w.Logger.Debug( - "event published to subscribers", - slog.Uint64("stream_offset", uint64(event.Offset)), - slog.String("message_id", env.MessageId.AsString()), - slog.String("description", env.Description), - slog.Int("delivered_count", delivered), - slog.Int("filtered_count", filtered), - slog.Int("canceled_count", canceled), - ) - } -} diff --git a/internal/eventstream/workersubscriber.go b/internal/eventstream/workersubscriber.go deleted file mode 100644 index eb8a2325..00000000 --- a/internal/eventstream/workersubscriber.go +++ /dev/null @@ -1,139 +0,0 @@ -package eventstream - -import ( - "fmt" - "log/slog" -) - -// handleSubscribe adds sub to the subscriber list. -// -// It delivers any cached events that the subscriber has not yet seen. If the -// subscriber's requested event is older than the events in the cache the -// subscription is canceled immediately. 
-func (w *worker) handleSubscribe(sub *Subscriber) { - if !sub.StreamID.Equal(w.StreamID) { - panic("received request for a different stream ID") - } - - if w.subscribers == nil { - w.subscribers = map[*Subscriber]struct{}{} - } - w.subscribers[sub] = struct{}{} - - w.Logger.Debug( - "subscription activated", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Uint64("requested_stream_offset", uint64(sub.Offset)), - slog.Uint64("next_stream_offset", uint64(w.nextOffset)), - ) - - if sub.Offset >= w.nextOffset { - return - } - - index := w.findInCache(sub.Offset) - - if index == -1 { - sub.canceled.Signal() - w.Logger.Warn( - "subscription canceled immediately due request for historical events", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Int("cached_event_count", len(w.recentEvents)), - slog.Uint64("requested_stream_offset", uint64(sub.Offset)), - slog.Uint64("next_stream_offset", uint64(w.nextOffset)), - ) - return - } - - for _, event := range w.recentEvents[index:] { - if w.deliverEventToSubscriber(event, sub) == subscriptionCanceled { - return - } - } -} - -// handleUnsubscribe removes sub from the subscriber list. -func (w *worker) handleUnsubscribe(sub *Subscriber) { - if !sub.StreamID.Equal(w.StreamID) { - panic("received request for a different stream ID") - } - - before := len(w.subscribers) - delete(w.subscribers, sub) - after := len(w.subscribers) - - if before > after { - sub.canceled.Signal() - - w.Logger.Debug( - "subscription canceled by subscriber", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", after), - ) - } -} - -// deliverResult is an enumeration of the possible outcomes of delivering an -// event to a subscriber. -type deliverResult int - -const ( - // eventDelivered means that the event was sent to the subscriber's events - // channel, which may or may not be buffered. - eventDelivered deliverResult = iota - - // eventFiltered means that the event was filtered by the subscriber's - // filter function, and did not need to be delivered. - eventFiltered - - // subscriptionCanceled means that an attempt was made to send the event to - // the subscriber's event channel, but the channel buffer was full (or - // unbuffered and not ready to read), and so the subscription was canceled. - subscriptionCanceled -) - -// deliverEventToSubscriber attempts to deliver an event to a subscriber's event -// channel. 
-func (w *worker) deliverEventToSubscriber(event Event, sub *Subscriber) deliverResult { - if event.Offset > sub.Offset { - panic("event is out of order") - } - - if event.Offset < sub.Offset { - return eventFiltered - } - - if !sub.Filter(event.Envelope) { - sub.Offset++ - return eventFiltered - } - - select { - case sub.Events <- event: - sub.Offset++ - return eventDelivered - - default: - delete(w.subscribers, sub) - sub.canceled.Signal() - - w.Logger.Warn( - "subscription canceled because the subscriber can not keep up with the event stream", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", 0), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Uint64("stream_offset", uint64(event.Offset)), - ) - - return subscriptionCanceled - } -} From cbd3d60a255d4fbd2113132135eac77a5617937a Mon Sep 17 00:00:00 2001 From: James Harris Date: Thu, 11 Apr 2024 07:52:18 +1000 Subject: [PATCH 3/4] Use `errors.Is()` to check for context cancelation. --- internal/test/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/test/run.go b/internal/test/run.go index 5a4ffaff..1eac0e96 100644 --- a/internal/test/run.go +++ b/internal/test/run.go @@ -216,7 +216,7 @@ func (r TaskRunner) run( task.err = fn(ctx) - if task.err == context.Canceled && ctx.Err() == context.Canceled { + if errors.Is(task.err, context.Canceled) && ctx.Err() == context.Canceled { task.err = context.Cause(ctx) } From 2f807b3a5018d6d2779bea4f1b8a5ed0c09d5250 Mon Sep 17 00:00:00 2001 From: James Harris Date: Tue, 16 Jul 2024 07:52:51 +1000 Subject: [PATCH 4/4] WIP [ci skip] --- go.mod | 1 - go.sum | 4 +- internal/eventstream/append.go | 55 ++-- internal/eventstream/append_test.go | 8 +- internal/eventstream/idle.go | 25 +- internal/eventstream/reader.go | 474 ++++++++++++---------------- internal/eventstream/reader_test.go | 390 +++++++++++++++++++++++ internal/eventstream/subscriber.go | 251 +++++++++++++++ internal/eventstream/supervisor.go | 20 +- internal/eventstream/worker.go | 44 ++- internal/messaging/request.go | 64 ---- 11 files changed, 929 insertions(+), 407 deletions(-) create mode 100644 internal/eventstream/reader_test.go create mode 100644 internal/eventstream/subscriber.go delete mode 100644 internal/messaging/request.go diff --git a/go.mod b/go.mod index 87901a36..df6c58a8 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,6 @@ require ( require ( github.com/dogmatiq/cosyne v0.2.0 // indirect - github.com/dogmatiq/dapper v0.5.3 // indirect github.com/dogmatiq/dyad v1.0.0 // indirect github.com/dogmatiq/iago v0.4.0 // indirect github.com/dogmatiq/interopspec v0.5.3 // indirect diff --git a/go.sum b/go.sum index bf0534d5..63a0eb8d 100644 --- a/go.sum +++ b/go.sum @@ -46,8 +46,8 @@ github.com/dogmatiq/configkit v0.13.0 h1:pV2Pz0iBUBnRfOm6tbWVRXvuh2bWHBScOh8KfVp github.com/dogmatiq/configkit v0.13.0/go.mod h1:9Sx3e0G9o/wPvRfhpKcS7+3bhYHmOyRBqKdXRZdDx7M= github.com/dogmatiq/cosyne v0.2.0 h1:tO957BpS4I9kqSw31ds6Ef4CXvV8zPAqWzbXKElsGWg= github.com/dogmatiq/cosyne v0.2.0/go.mod h1:dD8EZjbRX7FFw9t6P7l1nwoZbA7YxtOCfl9ZZAHPucU= -github.com/dogmatiq/dapper v0.5.3 h1:DZkitO0TiokaiZt+9J7UNnagW2ezSYmJUlDTXLWGf8g= -github.com/dogmatiq/dapper v0.5.3/go.mod h1:lrBXvNri2wXkk1T0muaTUqd5lVDwIBRKeOzVRU46XI0= +github.com/dogmatiq/dapper v0.5.2 h1:/pjwTEa/tLosrxuahsGa/LzOcbpnXZE+sQNv8YSr7ZI= +github.com/dogmatiq/dapper v0.5.2/go.mod h1:lrBXvNri2wXkk1T0muaTUqd5lVDwIBRKeOzVRU46XI0= github.com/dogmatiq/discoverkit v0.1.2 
h1:NFgFe151bINH3/mNrIS6w0fiEWToSVwIHrjCDiEHw/Y= github.com/dogmatiq/discoverkit v0.1.2/go.mod h1:mUFlbosF4i5papOkUa+OfTLv09AU/1cAU7GvN0Qd+VI= github.com/dogmatiq/dogma v0.13.0 h1:MKk9MHErGKD53Y+43I4fcoPZMQjX0N2DUZEc4rLp+Hk= diff --git a/internal/eventstream/append.go b/internal/eventstream/append.go index 045b7b8c..c329fb25 100644 --- a/internal/eventstream/append.go +++ b/internal/eventstream/append.go @@ -52,6 +52,8 @@ func (w *worker) handleAppend( panic("received append request with no events") } + // Reset the idle timer _after_ whatever work is done so it's duration is + // not "eaten up" by the work. defer w.resetIdleTimer() if req.LowestPossibleOffset > w.nextOffset { @@ -78,9 +80,9 @@ func (w *worker) handleAppend( return res, nil } - res, err = w.writeEventsToJournal(ctx, req) + pos, rec, err := w.writeEventsToJournal(ctx, req) if err == nil { - w.publishEvents(res.BeginOffset, req.Events) + w.publishEvents(pos, rec) return res, nil } @@ -171,41 +173,32 @@ func (w *worker) findPriorAppend( func (w *worker) writeEventsToJournal( ctx context.Context, req AppendRequest, -) (AppendResponse, error) { - before := w.nextOffset - after := w.nextOffset + Offset(len(req.Events)) - - if err := w.Journal.Append( - ctx, - w.nextPos, - eventstreamjournal. - NewRecordBuilder(). - WithStreamOffsetBefore(uint64(before)). - WithStreamOffsetAfter(uint64(after)). - WithEventsAppended(&eventstreamjournal.EventsAppended{ - Events: req.Events, - }). - Build(), - ); err != nil { - return AppendResponse{}, err +) (journal.Position, *eventstreamjournal.Record, error) { + pos := w.nextPos + rec := eventstreamjournal. + NewRecordBuilder(). + WithStreamOffsetBefore(uint64(w.nextOffset)). + WithStreamOffsetAfter(uint64(w.nextOffset) + uint64(len(req.Events))). + WithEventsAppended(&eventstreamjournal.EventsAppended{Events: req.Events}). + Build() + + if err := w.Journal.Append(ctx, pos, rec); err != nil { + return 0, nil, err } - for index, event := range req.Events { + w.nextPos++ + + for _, event := range req.Events { w.Logger.Info( - "appended event to the stream", - slog.Uint64("journal_position", uint64(w.nextPos)), - slog.Uint64("stream_offset", uint64(before)+uint64(index)), + "event appended to stream", + slog.Uint64("journal_position", uint64(pos)), + slog.Uint64("stream_offset", uint64(w.nextOffset)), slog.String("message_id", event.MessageId.AsString()), slog.String("description", event.Description), ) - } - w.nextPos++ - w.nextOffset = after + w.nextOffset++ + } - return AppendResponse{ - BeginOffset: before, - EndOffset: after, - AppendedByPriorAttempt: false, - }, nil + return pos, rec, nil } diff --git a/internal/eventstream/append_test.go b/internal/eventstream/append_test.go index 0e19c28e..2aa00ef0 100644 --- a/internal/eventstream/append_test.go +++ b/internal/eventstream/append_test.go @@ -105,7 +105,7 @@ func TestAppend(t *testing.T) { Desc: "optimistic concurrency conflict", InduceFailure: func(ctx context.Context, t *testing.T, deps *dependencies) { go func() { - if _, err := deps.Supervisor.AppendQueue.Do( + if _, err := deps.Supervisor.Append.Do( ctx, AppendRequest{ StreamID: streamID, @@ -130,7 +130,7 @@ func TestAppend(t *testing.T) { UntilStopped(). Stop() - if _, err := s.AppendQueue.Do( + if _, err := s.Append.Do( ctx, AppendRequest{ StreamID: streamID, @@ -162,7 +162,7 @@ func TestAppend(t *testing.T) { RunInBackground(t, "event-seeding-supervisor", deps.Supervisor.Run). 
UntilStopped() - res, err := deps.Supervisor.AppendQueue.Do( + res, err := deps.Supervisor.Append.Do( tctx, AppendRequest{ StreamID: streamID, @@ -220,7 +220,7 @@ func TestAppend(t *testing.T) { t.Logf("append an event, attempt #%d", attempt) attempt++ - _, err := deps.Supervisor.AppendQueue.Do(tctx, req) + _, err := deps.Supervisor.Append.Do(tctx, req) if err == nil { break } diff --git a/internal/eventstream/idle.go b/internal/eventstream/idle.go index 49e20550..d82fe4b5 100644 --- a/internal/eventstream/idle.go +++ b/internal/eventstream/idle.go @@ -14,10 +14,10 @@ const ( // catchUpTimeout is the amount of time a worker WITH SUBSCRIBERS will wait // after appending events before "catching up" with any journal records that // have been appended by other nodes. - catchUpTimeout = 10 * time.Second + catchUpTimeout = 1 * time.Millisecond ) -// resetIdleTimer starts or resets the idle timer. +// resetIdleTimer (re)starts the idle timer. func (w *worker) resetIdleTimer() { timeout := shutdownTimeout if len(w.subscribers) > 0 { @@ -26,12 +26,14 @@ func (w *worker) resetIdleTimer() { if w.idleTimer == nil { w.idleTimer = time.NewTimer(timeout) - } else { - if !w.idleTimer.Stop() { - <-w.idleTimer.C - } - w.idleTimer.Reset(timeout) + return } + + if !w.idleTimer.Stop() { + <-w.idleTimer.C + } + + w.idleTimer.Reset(timeout) } // handleIdle is called when the worker has not appended any new events for some @@ -43,13 +45,20 @@ func (w *worker) resetIdleTimer() { func (w *worker) handleIdle(ctx context.Context) (bool, error) { if len(w.subscribers) == 0 { w.Logger.Debug( - "event stream worker stopped due to inactivity", + "event stream worker is idle, shutting down", slog.Uint64("next_journal_position", uint64(w.nextPos)), slog.Uint64("next_stream_offset", uint64(w.nextOffset)), ) return false, nil } + w.Logger.Debug( + "event stream worker is idle with subscribers, polling journal", + slog.Uint64("next_journal_position", uint64(w.nextPos)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("subscriber_count", len(w.subscribers)), + ) + if err := w.catchUpWithJournal(ctx); err != nil { return false, err } diff --git a/internal/eventstream/reader.go b/internal/eventstream/reader.go index 17f3873a..cea58ee1 100644 --- a/internal/eventstream/reader.go +++ b/internal/eventstream/reader.go @@ -2,349 +2,295 @@ package eventstream import ( "context" + "errors" "fmt" "log/slog" "time" - "github.com/dogmatiq/enginekit/protobuf/envelopepb" "github.com/dogmatiq/enginekit/protobuf/uuidpb" "github.com/dogmatiq/persistencekit/journal" "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" "github.com/dogmatiq/veracity/internal/messaging" - "github.com/dogmatiq/veracity/internal/signaling" ) -// A Subscriber is sent events from a stream, by way of a [Reader]. -type Subscriber struct { - // StreamID is the ID of the stream from which events are read. - StreamID *uuidpb.UUID - - // Offset is the offset of the next event to read. - // - // It must not be read or modified while the subscription is active. It is - // incremented as events are sent to the subscriber. - Offset Offset - - // Filter is a predicate function that returns true if the subscriber should - // receive the event in the given envelope. - // - // It is used to avoid filling the subscriber's channel with events they are - // not interested in. It is called by the event stream worker in its own - // goroutine, and hence must not block. 
- Filter func(*envelopepb.Envelope) bool - - // Events is the channel to which the subscriber's events are sent. - Events chan<- Event - - canceled signaling.Event -} +// defaultSubscribeTimeout is the default maximum time to wait for a +// subscription to be acknowledged by the worker before reverting to reading +// from the journal. +const defaultSubscribeTimeout = 3 * time.Second // A Reader reads ordered events from a stream. type Reader struct { Journals journal.BinaryStore - SubscribeQueue *messaging.RequestQueue[*Subscriber] - UnsubscribeQueue *messaging.RequestQueue[*Subscriber] + SubscribeQueue *messaging.ExchangeQueue[*Subscriber, messaging.None] + UnsubscribeQueue *messaging.ExchangeQueue[*Subscriber, messaging.None] + SubscribeTimeout time.Duration + Logger *slog.Logger } -// Read reads events from a stream and sends them to the given subscriber. -// -// It starts by reading events directly from the stream's journal records. Once -// it has "caught up" to the end of the journal it receives events in -// "real-time" from the supervisor of that stream. +// Read sends events from a stream to the given subscriber's events channel. // -// If the subscriber's channel becomes full, it reverts to reading from the -// journal until it catches up again. +// It first attempts to "sync" with the local worker to receive contemporary +// events in "real-time". If the subscriber's requested offset is too old to be +// obtained from the worker, or if the events channel becomes full, the reader +// obtains events directly from the journal until the last record is reached, +// then the process repeats. func (r *Reader) Read(ctx context.Context, sub *Subscriber) error { + if sub.id == nil { + sub.id = uuidpb.Generate() + } + for { - if err := r.readHistorical(ctx, sub); err != nil { + if err := r.readContemporary(ctx, sub); err != nil { return err } - if err := r.readContemporary(ctx, sub); err != nil { + if err := r.readHistorical(ctx, sub); err != nil { return err } } } -func (r *Reader) readHistorical(ctx context.Context, sub *Subscriber) error { - j, err := eventstreamjournal.Open(ctx, r.Journals, sub.StreamID) - if err != nil { - return err - } - defer j.Close() - - searchBegin, searchEnd, err := j.Bounds(ctx) - if err != nil { - return err - } - - return journal.RangeFromSearchResult( - ctx, - j, - searchBegin, searchEnd, - eventstreamjournal.SearchByOffset(uint64(sub.Offset)), - func( - ctx context.Context, - pos journal.Position, - rec *eventstreamjournal.Record, - ) (bool, error) { - begin := Offset(rec.StreamOffsetBefore) - end := Offset(rec.StreamOffsetAfter) - - if begin == end { - // no events in this record - return true, nil - } - - if sub.Offset < begin || sub.Offset >= end { - return false, fmt.Errorf( - "event stream integrity error at journal position %d: expected event at offset %d, but found offset range [%d, %d)", - pos, - sub.Offset, - begin, - end, - ) - } - - index := sub.Offset - begin - - for _, env := range rec.GetEventsAppended().Events[index:] { - if !sub.Filter(env) { - sub.Offset++ - continue - } - - select { - case <-ctx.Done(): - return false, ctx.Err() - case sub.Events <- Event{sub.StreamID, sub.Offset, env}: - sub.Offset++ - } - } - - return true, nil - }, - ) -} - func (r *Reader) readContemporary(ctx context.Context, sub *Subscriber) error { // TODO: remote read + r.Logger.Debug( + "subscribing to receive contemporary events from local event stream worker", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", 
sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + if err := r.subscribe(ctx, sub); err != nil { + // If the subscription request times out, but the parent context isn't + // canceled we revert to reading from the journal (by returning nil). + if errors.Is(err, context.DeadlineExceeded) && ctx.Err() == nil { + r.Logger.Warn( + "timed-out waiting for local event stream worker to acknowledge subscription", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + return nil + } + return err } - defer r.unsubscribe(ctx, sub) select { case <-ctx.Done(): + r.unsubscribe(sub) return ctx.Err() case <-sub.canceled.Signaled(): + r.Logger.Debug( + "subscription canceled by local event stream worker", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) return nil } } func (r *Reader) subscribe(ctx context.Context, sub *Subscriber) error { - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) // TODO: make configurable - cancel() - - if err := r.SubscribeQueue.Do(ctx, sub); err != nil { - return fmt.Errorf("cannot subscribe to event stream: %w", err) + // Impose our own subscription timeout. This handles the case that the + // supervisor/worker is not running or cannot service our subscription + // request in a timely manner, in which case we will revert to reading from + // the journal. + timeout := r.SubscribeTimeout + if timeout <= 0 { + timeout = defaultSubscribeTimeout } - return nil -} - -func (r *Reader) unsubscribe(ctx context.Context, sub *Subscriber) error { - ctx, cancel := context.WithCancel(context.WithoutCancel(ctx)) + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - // Cancel the unsubscribe context when the subscription is canceled, - // regardless of the reason. - // - // This handles the situation where the subscription is canceled because the - // worker shutdown (and hence wont service the unsubscribe request). - go func() { - <-sub.canceled.Signaled() - cancel() - }() - - return r.UnsubscribeQueue.Do(ctx, sub) -} + req, done := r.SubscribeQueue.New(sub) -// handleSubscribe adds sub to the subscriber list. -// -// It delivers any cached events that the subscriber has not yet seen. If the -// subscriber's requested event is older than the events in the cache the -// subscription is canceled immediately. 
-func (w *worker) handleSubscribe(sub *Subscriber) { - if !sub.StreamID.Equal(w.StreamID) { - panic("received request for a different stream ID") + select { + case <-ctx.Done(): + return ctx.Err() + case r.SubscribeQueue.Send() <- req: } - if w.subscribers == nil { - w.subscribers = map[*Subscriber]struct{}{} - } - w.subscribers[sub] = struct{}{} - - w.Logger.Debug( - "subscription activated", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Uint64("requested_stream_offset", uint64(sub.Offset)), - slog.Uint64("next_stream_offset", uint64(w.nextOffset)), - ) + // We don't want to use the context timeout waiting for the response, + // because then we wont know if the subscription was actually accepted. The + // worker ALWAYS sends a response to any subscription request it receives + // from the queue. + res := <-done - if sub.Offset >= w.nextOffset { - return + if _, err := res.Get(); err != nil { + return fmt.Errorf("cannot subscribe to event stream: %w", err) } - index := w.findInCache(sub.Offset) - - if index == -1 { - sub.canceled.Signal() - w.Logger.Warn( - "subscription canceled immediately due request for historical events", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Int("cached_event_count", len(w.recentEvents)), - slog.Uint64("requested_stream_offset", uint64(sub.Offset)), - slog.Uint64("next_stream_offset", uint64(w.nextOffset)), - ) - return - } + return nil +} - for _, event := range w.recentEvents[index:] { - if w.deliverEventToSubscriber(event, sub) == subscriptionCanceled { - return - } - } +func (r *Reader) unsubscribe(sub *Subscriber) { + // TODO: use a latch to indicate unsubscribing? + req, _ := r.UnsubscribeQueue.New(sub) + r.UnsubscribeQueue.Send() <- req + <-sub.canceled.Signaled() } -// handleUnsubscribe removes sub from the subscriber list. -func (w *worker) handleUnsubscribe(sub *Subscriber) { - if !sub.StreamID.Equal(w.StreamID) { - panic("received request for a different stream ID") +func (r *Reader) readHistorical(ctx context.Context, sub *Subscriber) error { + j, err := eventstreamjournal.Open(ctx, r.Journals, sub.StreamID) + if err != nil { + return fmt.Errorf("unable to open journal: %w", err) } + defer j.Close() - before := len(w.subscribers) - delete(w.subscribers, sub) - after := len(w.subscribers) + var records, delivered, filtered int - if before > after { - sub.canceled.Signal() + fn := func( + ctx context.Context, + pos journal.Position, + rec *eventstreamjournal.Record, + ) (bool, error) { + records++ - w.Logger.Debug( - "subscription canceled by subscriber", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", after), - ) - } -} + begin := Offset(rec.StreamOffsetBefore) + end := Offset(rec.StreamOffsetAfter) -// deliverResult is an enumeration of the possible outcomes of delivering an -// event to a subscriber. -type deliverResult int + if begin == end { + // This condition is not producible at the moment, but is present to + // provide forward compatibility with future journal record types. 
+ sub.beginPos = pos + 1 + return true, nil + } -const ( - // eventDelivered means that the event was sent to the subscriber's events - // channel, which may or may not be buffered. - eventDelivered deliverResult = iota + if sub.Offset < begin || sub.Offset >= end { + return false, fmt.Errorf( + "event stream integrity error at journal position %d: expected event at offset %d, but found offset range [%d, %d)", + pos, + sub.Offset, + begin, + end, + ) + } - // eventFiltered means that the event was filtered by the subscriber's - // filter function, and did not need to be delivered. - eventFiltered + sub.beginPos = pos + sub.beginPosIsDefinitive = true - // subscriptionCanceled means that an attempt was made to send the event to - // the subscriber's event channel, but the channel buffer was full (or - // unbuffered and not ready to read), and so the subscription was canceled. - subscriptionCanceled -) + index := sub.Offset - begin + + for _, env := range rec.GetEventsAppended().Events[index:] { + if sub.Filter != nil && !sub.Filter(env) { + sub.Offset++ + filtered++ + continue + } -// deliverEventToSubscriber attempts to deliver an event to a subscriber's event -// channel. -func (w *worker) deliverEventToSubscriber(event Event, sub *Subscriber) deliverResult { - if event.Offset > sub.Offset { - panic("event is out of order") + select { + case <-ctx.Done(): + return false, ctx.Err() + case sub.Events <- Event{sub.StreamID, sub.Offset, env}: + sub.Offset++ + delivered++ + } + } + + sub.beginPos++ + + return true, nil } - if event.Offset < sub.Offset { - return eventFiltered + iter := r.searchHistorical + if sub.beginPosIsDefinitive { + iter = r.rangeHistorical } - if !sub.Filter(event.Envelope) { - sub.Offset++ - return eventFiltered + if err := iter(ctx, sub, j, fn); err != nil { + return err } - select { - case sub.Events <- event: - sub.Offset++ - return eventDelivered - - default: - delete(w.subscribers, sub) - sub.canceled.Signal() - - w.Logger.Warn( - "subscription canceled because the subscriber can not keep up with the event stream", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", 0), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Uint64("stream_offset", uint64(event.Offset)), - ) + r.Logger.Debug( + "finished reading historical events from journal", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + slog.Int("journal_record_count", records), + slog.Int("events_delivered_count", delivered), + slog.Int("events_filtered_count", filtered), + ) - return subscriptionCanceled - } + return nil } -// publishEvents publishes the events to both the recent event cache and any -// interested subscribers. -func (w *worker) publishEvents( - offset Offset, - events []*envelopepb.Envelope, -) { - skip := w.growCache(len(events)) - - for i, env := range events { - event := Event{w.StreamID, offset, env} - offset++ +// rangeHistorical delivers all (relevent) events to the subscriber, starting +// with the events in the record at position sub.beginPos. 
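+//
+// It is used in preference to searchHistorical when sub.beginPosIsDefinitive
+// is true, i.e. when an earlier pass over the journal (or the worker, via
+// publishEvents) has already established the exact journal position at which
+// delivery should resume, making a binary search by offset unnecessary.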
+func (r *Reader) rangeHistorical( + ctx context.Context, + sub *Subscriber, + j journal.Journal[*eventstreamjournal.Record], + fn journal.RangeFunc[*eventstreamjournal.Record], +) error { + r.Logger.Debug( + "ranging over historical events in journal", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + slog.Uint64("journal_begin_position", uint64(sub.beginPos)), + ) - if i >= skip { - w.appendEventToCache(event) + if err := j.Range(ctx, sub.beginPos, fn); err != nil { + if errors.Is(err, journal.ErrNotFound) { + // If we're ranging over a journal record that does not exist it + // means we've delivered all events but were unable to subscribe to + // receive contemporary events from the worker. + return nil } + return fmt.Errorf("unable to range over journal: %w", err) + } - if len(w.subscribers) == 0 { - continue - } + return nil +} + +// searchHistorical performs a binary search to find the journal record that +// contains the next event to deliver to sub, then delivers that event an all +// subsequent events until the end of the journal. +func (r *Reader) searchHistorical( + ctx context.Context, + sub *Subscriber, + j journal.Journal[*eventstreamjournal.Record], + fn journal.RangeFunc[*eventstreamjournal.Record], +) error { + begin, end, err := j.Bounds(ctx) + if err != nil { + return fmt.Errorf("unable to read journal bounds: %w", err) + } - var delivered, filtered, canceled int + begin = max(begin, sub.beginPos) - for sub := range w.subscribers { - switch w.deliverEventToSubscriber(event, sub) { - case eventDelivered: - delivered++ - case eventFiltered: - filtered++ - case subscriptionCanceled: - canceled++ - } - } + r.Logger.Debug( + "searching for historical events in journal", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + slog.Uint64("journal_begin_position", uint64(begin)), + slog.Uint64("journal_end_position", uint64(end)), + ) - w.Logger.Debug( - "event published to subscribers", - slog.Uint64("stream_offset", uint64(event.Offset)), - slog.String("message_id", env.MessageId.AsString()), - slog.String("description", env.Description), - slog.Int("delivered_count", delivered), - slog.Int("filtered_count", filtered), - slog.Int("canceled_count", canceled), - ) + if err := journal.RangeFromSearchResult( + ctx, + j, + begin, end, + eventstreamjournal.SearchByOffset(uint64(sub.Offset)), + fn, + ); err != nil { + if errors.Is(err, journal.ErrNotFound) { + // If the event is not in the journal then we don't want to + // re-search these same records in the future. + sub.beginPos = end + return nil + } + return fmt.Errorf("unable to search journal: %w", err) } + + return nil } diff --git a/internal/eventstream/reader_test.go b/internal/eventstream/reader_test.go new file mode 100644 index 00000000..02073926 --- /dev/null +++ b/internal/eventstream/reader_test.go @@ -0,0 +1,390 @@ +package eventstream_test + +import ( + "context" + "testing" + "time" + + . "github.com/dogmatiq/dogma/fixtures" + "github.com/dogmatiq/enginekit/protobuf/envelopepb" + "github.com/dogmatiq/enginekit/protobuf/identitypb" + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + . 
"github.com/dogmatiq/marshalkit/fixtures" + "github.com/dogmatiq/persistencekit/driver/memory/memoryjournal" + "github.com/dogmatiq/spruce" + "github.com/dogmatiq/veracity/internal/envelope" + . "github.com/dogmatiq/veracity/internal/eventstream" + "github.com/dogmatiq/veracity/internal/test" +) + +func TestReader(t *testing.T) { + t.Parallel() + + type dependencies struct { + Journals *memoryjournal.BinaryStore + Supervisor *Supervisor + Packer *envelope.Packer + Reader *Reader + } + + setup := func(t test.TestingT) (deps dependencies) { + deps.Journals = &memoryjournal.BinaryStore{} + + logger := spruce.NewLogger(t) + + deps.Supervisor = &Supervisor{ + Journals: deps.Journals, + Logger: logger, + } + + deps.Packer = &envelope.Packer{ + Application: identitypb.New("", uuidpb.Generate()), + Marshaler: Marshaler, + } + + deps.Reader = &Reader{ + Journals: deps.Journals, + SubscribeQueue: &deps.Supervisor.SubscribeQueue, + UnsubscribeQueue: &deps.Supervisor.UnsubscribeQueue, + Logger: logger, + } + + return deps + } + + t.Run("it reads historical events from the journal", func(t *testing.T) { + t.Parallel() + + streamID := uuidpb.Generate() + tctx := test.WithContext(t) + deps := setup(tctx) + + supervisor := test. + RunInBackground(t, "supervisor", deps.Supervisor.Run). + UntilStopped() + + env1 := deps.Packer.Pack(MessageE1) + env2 := deps.Packer.Pack(MessageE2) + env3 := deps.Packer.Pack(MessageE3) + + // Journal record with a single event. + if _, err := deps.Supervisor.AppendQueue.Do( + tctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{ + env1, + }, + }, + ); err != nil { + t.Fatal(err) + } + + // Journal record with multiple events. + if _, err := deps.Supervisor.AppendQueue.Do( + tctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{ + env2, + env3, + }, + }, + ); err != nil { + t.Fatal(err) + } + + // Stop the supervisor now to verify that it's not necessary to read + // historical events. We set the timeout to the minimum value possible + // to reduce the test run-time. + supervisor.StopAndWait() + deps.Reader.SubscribeTimeout = 1 + + events := make(chan Event) + sub := &Subscriber{ + StreamID: streamID, + Events: events, + } + + test. + RunInBackground(t, "reader", func(ctx context.Context) error { + return deps.Reader.Read(ctx, sub) + }). + UntilTestEnds() + + for offset, env := range []*envelopepb.Envelope{env1, env2, env3} { + test.ExpectChannelToReceive( + tctx, + events, + Event{ + StreamID: streamID, + Offset: Offset(offset), + Envelope: env, + }, + ) + } + }) + + t.Run("it reads contemporary events via a supervisor subscription", func(t *testing.T) { + t.Parallel() + + streamID := uuidpb.Generate() + tctx := test.WithContext(t) + deps := setup(tctx) + + test. + RunInBackground(t, "supervisor", deps.Supervisor.Run). + UntilTestEnds() + + // Give the reader a different journal store so we know it isn't reading + // the getting events from the journal. + deps.Reader.Journals = &memoryjournal.BinaryStore{} + + events := make(chan Event, 100) + sub := &Subscriber{ + StreamID: streamID, + Events: events, + } + + test. + RunInBackground(t, "reader", func(ctx context.Context) error { + return deps.Reader.Read(ctx, sub) + }). + UntilTestEnds() + + env1 := deps.Packer.Pack(MessageE1) + env2 := deps.Packer.Pack(MessageE2) + env3 := deps.Packer.Pack(MessageE3) + + go func() { + // Journal record with a single event. 
+ if _, err := deps.Supervisor.AppendQueue.Do( + tctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{ + env1, + }, + }, + ); err != nil { + t.Error(err) + return + } + + // Journal record with multiple events. + if _, err := deps.Supervisor.AppendQueue.Do( + tctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{ + env2, + env3, + }, + }, + ); err != nil { + t.Error(err) + return + } + }() + + for offset, env := range []*envelopepb.Envelope{env1, env2, env3} { + test.ExpectChannelToReceive( + tctx, + events, + Event{ + StreamID: streamID, + Offset: Offset(offset), + Envelope: env, + }, + ) + } + }) + + t.Run("it reverts to the journal if it cannot keep up with the event stream", func(t *testing.T) { + t.Parallel() + + streamID := uuidpb.Generate() + tctx := test.WithContext(t) + deps := setup(tctx) + + test. + RunInBackground(t, "supervisor", deps.Supervisor.Run). + UntilTestEnds() + + events := make(chan Event) // note: unbuffered + sub := &Subscriber{ + StreamID: streamID, + Events: events, + } + + test. + RunInBackground(t, "reader", func(ctx context.Context) error { + return deps.Reader.Read(ctx, sub) + }). + UntilTestEnds() + + envelopes := []*envelopepb.Envelope{ + deps.Packer.Pack(MessageE1), + deps.Packer.Pack(MessageE2), + deps.Packer.Pack(MessageE3), + } + + go func() { + for _, env := range envelopes { + _, err := deps.Supervisor.AppendQueue.Do( + tctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{env}, + }, + ) + if err != nil { + t.Error(err) + return + } + } + }() + + for offset, env := range envelopes { + time.Sleep(500 * time.Microsecond) // make the consumer slow + test.ExpectChannelToReceive( + tctx, + events, + Event{ + StreamID: streamID, + Offset: Offset(offset), + Envelope: env, + }, + ) + } + }) + + t.Run("it does not duplicate or mis-order events when there are competing supervisors", func(t *testing.T) { + t.Parallel() + + streamID := uuidpb.Generate() + tctx := test.WithContext(t) + deps := setup(tctx) + + // emulate a supervisor running on another node + remoteSupervisor := &Supervisor{ + Journals: deps.Journals, + Logger: deps.Supervisor.Logger.With("supervisor", "remote"), + } + + deps.Supervisor.Logger = deps.Supervisor.Logger.With("supervisor", "local") + + test. + RunInBackground(t, "local-supervisor", deps.Supervisor.Run). + UntilTestEnds() + + test. + RunInBackground(t, "remote-supervisor", remoteSupervisor.Run). + UntilTestEnds() + + // use a small buffer, allowing it to revert to reading from the + // journal as an added sanity check. + events := make(chan Event, 5) + sub := &Subscriber{ + StreamID: streamID, + Events: events, + } + + test. + RunInBackground(t, "reader", func(ctx context.Context) error { + return deps.Reader.Read(ctx, sub) + }). + UntilTestEnds() + + const eventsPerSupervisor = 1 + + appendLoop := func(ctx context.Context, s *Supervisor, source string) error { + handlerID := uuidpb.Generate() + + for n := range eventsPerSupervisor { + env := deps.Packer.Pack( + MessageE{Value: n}, + envelope.WithHandler( + // Abuse the "source handler" field of the envelope to + // discriminate between events produced by our local and + // remote supervisors. + identitypb.New(source, handlerID), + ), + ) + + if _, err := s.AppendQueue.Do( + ctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{env}, + }, + ); err != nil { + t.Logf("failed to append event #%d: %s", n, err) + return err + } + } + + return nil + } + + test. 
+ RunInBackground(t, "local-append-loop", func(ctx context.Context) error { + return appendLoop(ctx, deps.Supervisor, "local") + }). + BeforeTestEnds() + + test. + RunInBackground(t, "remote-append-loop", func(ctx context.Context) error { + return appendLoop(ctx, remoteSupervisor, "remote") + }). + BeforeTestEnds() + + nextLocal := 0 + nextRemote := 0 + + for offset := Offset(0); offset < eventsPerSupervisor*2; offset++ { + t.Logf("waiting for event at offset %d", offset) + + select { + case <-tctx.Done(): + t.Fatal(tctx.Err()) + + case e := <-events: + if e.Offset != offset { + t.Fatalf("unexpected offset: got %d, want %d", e.Offset, offset) + } + + next := &nextLocal + if e.Envelope.SourceHandler.Name == "remote" { + next = &nextRemote + } + + m, err := deps.Packer.Unpack(e.Envelope) + if err != nil { + t.Fatalf("unable to unpack event: %s", err) + } + + got := m.(MessageE) + want := MessageE{Value: float64(*next)} + + test.Expect( + t, + "unexpected message from "+e.Envelope.SourceHandler.Name+" supervisor", + got, + want, + ) + + *next++ + t.Logf("received expected event %q from %q supervisor", e.Envelope.Description, e.Envelope.SourceHandler.Name) + } + } + + if nextLocal != eventsPerSupervisor { + t.Errorf("unexpected number of events from local supervisor: got %d, want %d", nextLocal, eventsPerSupervisor) + } + + if nextRemote != eventsPerSupervisor { + t.Errorf("unexpected number of events from remote supervisor: got %d, want %d", nextRemote, eventsPerSupervisor) + } + }) +} diff --git a/internal/eventstream/subscriber.go b/internal/eventstream/subscriber.go new file mode 100644 index 00000000..034111f3 --- /dev/null +++ b/internal/eventstream/subscriber.go @@ -0,0 +1,251 @@ +package eventstream + +import ( + "log/slog" + + "github.com/dogmatiq/enginekit/protobuf/envelopepb" + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "github.com/dogmatiq/persistencekit/journal" + "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" + "github.com/dogmatiq/veracity/internal/signaling" +) + +// A Subscriber is sent events from a stream, by way of a [Reader]. +type Subscriber struct { + // StreamID is the ID of the stream from which events are read. + StreamID *uuidpb.UUID + + // Offset is the offset of the next event to read. + // + // It must not be read or modified while the subscription is active. It is + // incremented as events are sent to the subscriber. + Offset Offset + + // Filter is a predicate function that returns true if the subscriber should + // receive the event in the given envelope. + // + // It is used to avoid filling the subscriber's channel with events they are + // not interested in. It is called by the event stream worker in its own + // goroutine, and hence must not block. + Filter func(*envelopepb.Envelope) bool + + // Events is the channel to which the subscriber's events are sent. + Events chan<- Event + + // canceled indicates that the [Supervisor] has canceled the subscription. + canceled signaling.Event + + // id is a unique identifier for the subscriber. + id *uuidpb.UUID + + // beginPos is the journal position to begin ranging or search for the next + // event to deliver to the subscriber. + beginPos journal.Position + + // beginPosIsDefinitive is true if beginPos "definitive", meaning that it + // represents the exact position of the record containing the next event to + // deliver to the subscriber. + beginPosIsDefinitive bool +} + +// handleSubscribe adds sub to the subscriber list. 
+// +// It delivers any cached events that the subscriber has not yet seen. If the +// subscriber's requested event is older than the events in the cache the +// subscription is canceled immediately. +func (w *worker) handleSubscribe(sub *Subscriber) { + if !sub.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + if w.subscribers == nil { + w.subscribers = map[*Subscriber]struct{}{} + } + w.subscribers[sub] = struct{}{} + defer w.resetIdleTimer() + + if sub.Offset >= w.nextOffset { + w.Logger.Debug( + "subscription activated, waiting for new events", + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("subscriber_count", len(w.subscribers)), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + return + } + + index := w.findInCache(sub.Offset) + + if index == -1 { + delete(w.subscribers, sub) + w.Logger.Warn( + "subscription not activated due to request for uncached historical events", + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("cached_event_count", len(w.recentEvents)), + slog.Int("subscriber_count", len(w.subscribers)), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + sub.canceled.Signal() + return + } + + w.Logger.Debug( + "subscription activated, delivering cached events", + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("subscriber_count", len(w.subscribers)), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + + for _, event := range w.recentEvents[index:] { + if w.deliverEventToSubscriber(event, sub) == subscriptionCanceled { + return + } + } +} + +// handleUnsubscribe removes sub from the subscriber list. +func (w *worker) handleUnsubscribe(sub *Subscriber) { + if !sub.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + before := len(w.subscribers) + delete(w.subscribers, sub) + after := len(w.subscribers) + + if before > after { + w.Logger.Debug( + "subscription canceled by subscriber", + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("subscriber_count", after), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + sub.canceled.Signal() + w.resetIdleTimer() + } +} + +// deliverResult is an enumeration of the possible outcomes of delivering an +// event to a subscriber. +type deliverResult int + +const ( + // eventDelivered means that the event was sent to the subscriber's events + // channel, which may or may not be buffered. + eventDelivered deliverResult = iota + + // eventFiltered means that the event was filtered by the subscriber's + // filter function, and did not need to be delivered. + eventFiltered + + // subscriptionCanceled means that an attempt was made to send the event to + // the subscriber's event channel, but the channel buffer was full (or + // unbuffered and not ready to read), and so the subscription was canceled. + subscriptionCanceled +) + +// deliverEventToSubscriber attempts to deliver an event to a subscriber's event +// channel. 
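The cache checks in handleSubscribe reduce to simple offset arithmetic. The sketch below restates that decision as a standalone helper, assuming the cache holds the most recent events contiguously, ending just before the worker's next offset; the classify name and the string results are purely illustrative.

func classify(requested, next Offset, cached int) string {
	oldest := Offset(0)
	if Offset(cached) < next {
		oldest = next - Offset(cached)
	}

	switch {
	case requested >= next:
		// Nothing to deliver yet: the subscription stays active and waits
		// for new events.
		return "wait"
	case requested >= oldest:
		// Serviceable from the recent-event cache.
		return "deliver from cache"
	default:
		// Older than anything cached: cancel so the reader replays the
		// journal before resubscribing.
		return "cancel"
	}
}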
+func (w *worker) deliverEventToSubscriber(event Event, sub *Subscriber) deliverResult { + if event.Offset > sub.Offset { + panic("event is out of order") + } + + if event.Offset < sub.Offset { + return eventFiltered + } + + if sub.Filter != nil && !sub.Filter(event.Envelope) { + sub.Offset++ + return eventFiltered + } + + select { + case sub.Events <- event: + sub.Offset++ + return eventDelivered + + default: + delete(w.subscribers, sub) + w.Logger.Warn( + "subscription canceled because the subscriber cannot keep up with the event stream", + slog.Int("subscriber_count", len(w.subscribers)), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + sub.canceled.Signal() + w.resetIdleTimer() + + return subscriptionCanceled + } +} + +// publishEvents publishes the events to both the recent event cache and any +// interested subscribers. +func (w *worker) publishEvents(pos journal.Position, rec *eventstreamjournal.Record) { + op := rec.GetEventsAppended() + if op == nil { + return + } + + skip := w.growCache(len(op.Events)) + + // Update the subscriber's position to refer to the record containing the + // events we're about to deliver. + for sub := range w.subscribers { + sub.beginPos = pos + sub.beginPosIsDefinitive = true + } + + offset := Offset(rec.StreamOffsetBefore) + for i, env := range op.Events { + event := Event{w.StreamID, offset, env} + offset++ + + if i >= skip { + w.appendEventToCache(event) + } + + if len(w.subscribers) == 0 { + continue + } + + var delivered, filtered, canceled int + + for sub := range w.subscribers { + switch w.deliverEventToSubscriber(event, sub) { + case eventDelivered: + delivered++ + case eventFiltered: + filtered++ + case subscriptionCanceled: + canceled++ + } + } + + w.Logger.Debug( + "event published to subscribers", + slog.Uint64("stream_offset", uint64(event.Offset)), + slog.String("message_id", env.MessageId.AsString()), + slog.String("description", env.Description), + slog.Int("subscriber_delivered_count", delivered), + slog.Int("subscriber_filtered_count", filtered), + slog.Int("subscriber_canceled_count", canceled), + ) + } + + // Any remaining (i.e. uncanceled) subscribers should now look for the + // following journal record. + for sub := range w.subscribers { + sub.beginPos++ + } +} diff --git a/internal/eventstream/supervisor.go b/internal/eventstream/supervisor.go index d514435f..22490fd2 100644 --- a/internal/eventstream/supervisor.go +++ b/internal/eventstream/supervisor.go @@ -21,8 +21,8 @@ var errShuttingDown = errors.New("event stream sub-system is shutting down") type Supervisor struct { Journals journal.BinaryStore AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse] - SubscribeQueue messaging.RequestQueue[*Subscriber] - UnsubscribeQueue messaging.RequestQueue[*Subscriber] + SubscribeQueue messaging.ExchangeQueue[*Subscriber, messaging.None] + UnsubscribeQueue messaging.ExchangeQueue[*Subscriber, messaging.None] Logger *slog.Logger shutdown signaling.Latch @@ -93,29 +93,29 @@ func (s *Supervisor) appendState( // subscribeState forwards a subscribe request to the appropriate worker. 
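The delivery rule above never lets a slow subscriber block the worker: the send is non-blocking, and a full (or unready unbuffered) channel costs the subscriber its subscription. A reduced, generic sketch of that pattern follows; the tryDeliver name and signature are stand-ins, not part of the patch.

func tryDeliver[T any](ch chan<- T, v T) bool {
	select {
	case ch <- v:
		// Delivered without blocking the sender.
		return true
	default:
		// The channel cannot accept the value right now; the caller cancels
		// the subscription and the reader falls back to the journal.
		return false
	}
}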
func (s *Supervisor) subscribeState( ctx context.Context, - req messaging.Request[*Subscriber], + ex messaging.Exchange[*Subscriber, messaging.None], ) fsm.Action { - w, err := s.workerByStreamID(ctx, req.Request.StreamID) + w, err := s.workerByStreamID(ctx, ex.Request.StreamID) if err != nil { - req.Err(errShuttingDown) + ex.Err(errShuttingDown) return fsm.Fail(err) } - return forwardToWorker(ctx, s, w.SubscribeQueue.Send(), req) + return forwardToWorker(ctx, s, w.SubscribeQueue.Send(), ex) } // unsubscribeState forwards an unsubscribe request to the appropriate worker. func (s *Supervisor) unsubscribeState( ctx context.Context, - req messaging.Request[*Subscriber], + ex messaging.Exchange[*Subscriber, messaging.None], ) fsm.Action { - w, ok := s.workers.TryGet(req.Request.StreamID) + w, ok := s.workers.TryGet(ex.Request.StreamID) if !ok { - req.Ok() + ex.Zero() return fsm.EnterState(s.idleState) } - return forwardToWorker(ctx, s, w.UnsubscribeQueue.Send(), req) + return forwardToWorker(ctx, s, w.UnsubscribeQueue.Send(), ex) } func forwardToWorker[ diff --git a/internal/eventstream/worker.go b/internal/eventstream/worker.go index 4d6c66c6..9a513baa 100644 --- a/internal/eventstream/worker.go +++ b/internal/eventstream/worker.go @@ -2,6 +2,7 @@ package eventstream import ( "context" + "fmt" "log/slog" "time" @@ -24,10 +25,10 @@ type worker struct { AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse] // SubscribeQueue is a queue of requests to subscribe to the stream. - SubscribeQueue messaging.RequestQueue[*Subscriber] + SubscribeQueue messaging.ExchangeQueue[*Subscriber, messaging.None] // UnsubscribeQueue is a queue of requests to unsubscribe from the stream. - UnsubscribeQueue messaging.RequestQueue[*Subscriber] + UnsubscribeQueue messaging.ExchangeQueue[*Subscriber, messaging.None] // Shutdown signals the worker to stop when it next becomes idle. 
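From the calling side, switching the subscribe and unsubscribe queues to ExchangeQueue[*Subscriber, messaging.None] means the reply carries no payload: only the error matters, and Zero() presumably answers with the zero value of None. A hedged sketch of a caller, assuming Do keeps the (response, error) shape already used with AppendQueue above; the unsubscribe helper name is an assumption.

func unsubscribe(ctx context.Context, s *Supervisor, sub *Subscriber) error {
	// The messaging.None response is discarded; an error typically means ctx
	// was canceled or the event stream sub-system is shutting down.
	if _, err := s.UnsubscribeQueue.Do(ctx, sub); err != nil {
		return fmt.Errorf("cannot unsubscribe from event stream: %w", err)
	}
	return nil
}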
Shutdown signaling.Latch @@ -65,22 +66,20 @@ func (w *worker) Run(ctx context.Context) (err error) { pos, rec, ok, err := journal.LastRecord(ctx, w.Journal) if err != nil { - return err + return fmt.Errorf("unable to find most recent journal record: %w", err) } if ok { w.nextPos = pos + 1 w.nextOffset = Offset(rec.StreamOffsetAfter) - - w.Logger.Debug( - "event stream journal has existing records", - slog.Uint64("next_journal_position", uint64(w.nextPos)), - slog.Uint64("next_stream_offset", uint64(w.nextOffset)), - ) - } else { - w.Logger.Debug("event stream journal is empty") } + w.Logger.Debug( + "event stream worker started", + slog.Uint64("next_journal_position", uint64(w.nextPos)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + ) + w.resetIdleTimer() defer w.idleTimer.Stop() @@ -109,12 +108,12 @@ func (w *worker) tick(ctx context.Context) (bool, error) { case ex := <-w.SubscribeQueue.Recv(): w.handleSubscribe(ex.Request) - ex.Ok() + ex.Zero() return true, nil case ex := <-w.UnsubscribeQueue.Recv(): w.handleUnsubscribe(ex.Request) - ex.Ok() + ex.Zero() return true, nil case <-w.idleTimer.C: @@ -151,13 +150,12 @@ func (w *worker) catchUpWithJournal(ctx context.Context) error { ) (ok bool, err error) { recordCount++ - events := rec.GetEventsAppended().GetEvents() - if len(events) != 0 { + if n := int(rec.StreamOffsetAfter - rec.StreamOffsetBefore); n != 0 { if eventCount == 0 { - w.Logger.Warn("event stream contains events that were not appended by this worker") + w.Logger.Warn("event stream journal contains records with undelivered events") } - w.publishEvents(Offset(rec.StreamOffsetBefore), events) - eventCount += len(events) + w.publishEvents(pos, rec) + eventCount += n } w.nextPos = pos + 1 @@ -165,17 +163,17 @@ func (w *worker) catchUpWithJournal(ctx context.Context) error { return true, nil }, - ); err != nil { - return err + ); journal.IgnoreNotFound(err) != nil { + return fmt.Errorf("unable to range over journal: %w", err) } if recordCount != 0 { w.Logger.Debug( - "caught up to the end of the event stream journal", - slog.Int("record_count", recordCount), - slog.Int("event_count", eventCount), + "processed latest records from event stream journal", slog.Uint64("next_journal_position", uint64(w.nextPos)), slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("journal_record_count", recordCount), + slog.Int("event_count", eventCount), ) } diff --git a/internal/messaging/request.go b/internal/messaging/request.go deleted file mode 100644 index d5ff6a23..00000000 --- a/internal/messaging/request.go +++ /dev/null @@ -1,64 +0,0 @@ -package messaging - -import ( - "context" - "sync" -) - -// Request encapsulates a request. -type Request[Req any] struct { - Context context.Context - Request Req - Error chan<- error -} - -// Ok sends a successful response. -func (e Request[Req]) Ok() { - e.Error <- nil -} - -// Err sends an error response. -func (e Request[Req]) Err(err error) { - e.Error <- err -} - -// RequestQueue is a queue of requests. -type RequestQueue[Req any] struct { - init sync.Once - queue chan Request[Req] -} - -// Recv returns a channel that, when read, dequeues the next request. -func (q *RequestQueue[Req]) Recv() <-chan Request[Req] { - return q.getQueue() -} - -// Send returns a channel that, when written, enqueues an request. -func (q *RequestQueue[Req]) Send() chan<- Request[Req] { - return q.getQueue() -} - -// Do performs a synchronous request. 
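The bootstrap in Run derives both resume points from the most recent journal record, and the catch-up loop now counts events from a record's offset range. The helpers below restate that arithmetic for illustration only; their names are assumptions.

// resumePoints mirrors the logic in Run: an empty journal starts from zero,
// otherwise the worker continues immediately after the last record.
func resumePoints(pos journal.Position, rec *eventstreamjournal.Record, ok bool) (journal.Position, Offset) {
	if !ok {
		return 0, 0
	}
	return pos + 1, Offset(rec.StreamOffsetAfter)
}

// eventCountOf mirrors the catch-up loop: the number of events attributed to a
// record is the width of its stream offset range.
func eventCountOf(rec *eventstreamjournal.Record) int {
	return int(rec.StreamOffsetAfter - rec.StreamOffsetBefore)
}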
-func (q *RequestQueue[Req]) Do(ctx context.Context, req Req) error { - response := make(chan error, 1) - - select { - case <-ctx.Done(): - return ctx.Err() - case q.Send() <- Request[Req]{ctx, req, response}: - } - - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-response: - return err - } -} - -func (q *RequestQueue[Req]) getQueue() chan Request[Req] { - q.init.Do(func() { - q.queue = make(chan Request[Req]) - }) - return q.queue -}
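With request.go removed, former RequestQueue.Do call sites can be expressed against ExchangeQueue by using messaging.None as the response type. The adapter below is a migration sketch, not part of the patch; it assumes ExchangeQueue.Do has the (response, error) shape used elsewhere in this diff.

// doRequest adapts a fire-and-acknowledge exchange to the old error-only shape.
func doRequest[Req any](ctx context.Context, q *messaging.ExchangeQueue[Req, messaging.None], req Req) error {
	_, err := q.Do(ctx, req)
	return err
}

// Usage (names assumed):
//
//	err := doRequest(ctx, &supervisor.SubscribeQueue, sub)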