diff --git a/cmd/send-all-events-to-relay/main.go b/cmd/send-all-events-to-relay/main.go index 78366db..3bf7c69 100644 --- a/cmd/send-all-events-to-relay/main.go +++ b/cmd/send-all-events-to-relay/main.go @@ -148,7 +148,7 @@ func (u *EventUploader) worker(ctx context.Context) { func (u *EventUploader) sendEvent(ctx context.Context, event domain.Event) error { for { if err := u.eventSender.SendEvent(ctx, u.address, event); err != nil { - if errors.Is(err, relays.BackPressureError) { + if errors.Is(err, relays.ErrEventReplaced) { u.eventsRelayReplaced.Add(1) u.allEvents.Add(1) } else { diff --git a/service/domain/relays/relay_connection.go b/service/domain/relays/relay_connection.go index e0a1623..ff1649d 100644 --- a/service/domain/relays/relay_connection.go +++ b/service/domain/relays/relay_connection.go @@ -70,7 +70,6 @@ type RelayConnection struct { eventsToSendMutex sync.Mutex newEventsCh chan domain.Event rateLimitNoticeBackoffManager *RateLimitNoticeBackoffManager - cancelBackPressure context.CancelFunc } func NewRelayConnection( @@ -90,7 +89,6 @@ func NewRelayConnection( eventsToSend: make(map[domain.EventId]*eventToSend), newEventsCh: make(chan domain.Event), rateLimitNoticeBackoffManager: rateLimitNoticeBackoffManager, - cancelBackPressure: nil, } } @@ -108,10 +106,6 @@ func (r *RelayConnection) Run(ctx context.Context) { if r.Address().HostWithoutPort() == "relay.nos.social" { // We control relay.nos.social, so we don't backoff here backoff = 0 - } else if errors.Is(err, BackPressureError) { - // Only calling r.cancelBackPressure() can resolve the backpressure - r.WaitUntilNoBackPressure(ctx) - backoff = 0 } select { @@ -123,20 +117,6 @@ func (r *RelayConnection) Run(ctx context.Context) { } } -func (r *RelayConnection) WaitUntilNoBackPressure(ctx context.Context) { - var resolvedBackPressureCtx context.Context - var cancelBackPressure context.CancelFunc - - r.setStateWithFn(RelayConnectionStateBackPressured, func() { - resolvedBackPressureCtx, cancelBackPressure = context.WithCancel(ctx) - r.cancelBackPressure = cancelBackPressure - }) - - <-resolvedBackPressureCtx.Done() - - r.setState(RelayConnectionStateDisconnected) -} - func (r *RelayConnection) State() RelayConnectionState { r.stateMutex.Lock() defer r.stateMutex.Unlock() @@ -356,8 +336,8 @@ func (r *RelayConnection) run(ctx context.Context) error { } if r.state == RelayConnectionStateBackPressured { - // Load shedding under backpressure - continue + // Load shedding under backpressure, we just disconnect + return BackPressureError } if err := r.handleMessage(messageBytes); err != nil { @@ -613,14 +593,6 @@ func (r *RelayConnection) setState(state RelayConnectionState) { r.state = state } -func (r *RelayConnection) setStateWithFn(state RelayConnectionState, fn func()) { - r.stateMutex.Lock() - defer r.stateMutex.Unlock() - - r.state = state - fn() -} - func (r *RelayConnection) manageSubs(ctx context.Context, conn Connection) error { defer conn.Close() diff --git a/service/domain/relays/relay_connections.go b/service/domain/relays/relay_connections.go index 84423ad..d1c3d82 100644 --- a/service/domain/relays/relay_connections.go +++ b/service/domain/relays/relay_connections.go @@ -76,7 +76,7 @@ func (d *RelayConnections) SendEvent(ctx context.Context, relayAddress domain.Re func (d *RelayConnections) NotifyBackPressure() { for _, connection := range d.connections { - if connection.Address().HostWithoutPort() != "relay.nos.social" { + if connection.Address().HostWithoutPort() != "relay.nos.social" && connection.State() == RelayConnectionStateConnected { connection.setState(RelayConnectionStateBackPressured) } } @@ -84,9 +84,8 @@ func (d *RelayConnections) NotifyBackPressure() { func (d *RelayConnections) ResolveBackPressure() { for _, connection := range d.connections { - if connection.cancelBackPressure != nil { - connection.cancelBackPressure() - connection.cancelBackPressure = nil + if connection.Address().HostWithoutPort() != "relay.nos.social" && connection.State() == RelayConnectionStateBackPressured { + connection.setState(RelayConnectionStateConnected) } } } @@ -115,6 +114,7 @@ func (d *RelayConnections) storeMetrics() { } d.metrics.ReportRelayConnectionsState(m) } + func (r *RelayConnections) getRateLimitNoticeBackoffManager(relayAddress domain.RelayAddress) *RateLimitNoticeBackoffManager { rateLimitNoticeBackoffManager, exists := r.rateLimitNoticeBackoffManagers[relayAddress.HostWithoutPort()] if !exists { diff --git a/service/ports/sqlitepubsub/event_saved.go b/service/ports/sqlitepubsub/event_saved.go index faa9600..06e2350 100644 --- a/service/ports/sqlitepubsub/event_saved.go +++ b/service/ports/sqlitepubsub/event_saved.go @@ -57,7 +57,7 @@ func (s *EventSavedEventSubscriber) Run(ctx context.Context) error { fmt.Sprintf("Queue size %d > %d. Sending backpressure signal to slow down", queueSize, backPressureThreshold), ) s.handler.NotifyBackPressure() - } else if queueSize < backPressureThreshold/2 { + } else { s.handler.ResolveBackPressure() }