Skip to content

Commit

Permalink
Merge pull request #91 from planetary-social/simplify-backpressure-ha…
Browse files Browse the repository at this point in the history
…ndling

Simplify backpressure handling
  • Loading branch information
dcadenas authored Jun 10, 2024
2 parents ea4e456 + 54acc65 commit 922f4af
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 36 deletions.
2 changes: 1 addition & 1 deletion cmd/send-all-events-to-relay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (u *EventUploader) worker(ctx context.Context) {
func (u *EventUploader) sendEvent(ctx context.Context, event domain.Event) error {
for {
if err := u.eventSender.SendEvent(ctx, u.address, event); err != nil {
if errors.Is(err, relays.BackPressureError) {
if errors.Is(err, relays.ErrEventReplaced) {
u.eventsRelayReplaced.Add(1)
u.allEvents.Add(1)
} else {
Expand Down
32 changes: 2 additions & 30 deletions service/domain/relays/relay_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type RelayConnection struct {
eventsToSendMutex sync.Mutex
newEventsCh chan domain.Event
rateLimitNoticeBackoffManager *RateLimitNoticeBackoffManager
cancelBackPressure context.CancelFunc
}

func NewRelayConnection(
Expand All @@ -90,7 +89,6 @@ func NewRelayConnection(
eventsToSend: make(map[domain.EventId]*eventToSend),
newEventsCh: make(chan domain.Event),
rateLimitNoticeBackoffManager: rateLimitNoticeBackoffManager,
cancelBackPressure: nil,
}
}

Expand All @@ -108,10 +106,6 @@ func (r *RelayConnection) Run(ctx context.Context) {
if r.Address().HostWithoutPort() == "relay.nos.social" {
// We control relay.nos.social, so we don't backoff here
backoff = 0
} else if errors.Is(err, BackPressureError) {
// Only calling r.cancelBackPressure() can resolve the backpressure
r.WaitUntilNoBackPressure(ctx)
backoff = 0
}

select {
Expand All @@ -123,20 +117,6 @@ func (r *RelayConnection) Run(ctx context.Context) {
}
}

func (r *RelayConnection) WaitUntilNoBackPressure(ctx context.Context) {
var resolvedBackPressureCtx context.Context
var cancelBackPressure context.CancelFunc

r.setStateWithFn(RelayConnectionStateBackPressured, func() {
resolvedBackPressureCtx, cancelBackPressure = context.WithCancel(ctx)
r.cancelBackPressure = cancelBackPressure
})

<-resolvedBackPressureCtx.Done()

r.setState(RelayConnectionStateDisconnected)
}

func (r *RelayConnection) State() RelayConnectionState {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()
Expand Down Expand Up @@ -356,8 +336,8 @@ func (r *RelayConnection) run(ctx context.Context) error {
}

if r.state == RelayConnectionStateBackPressured {
// Load shedding under backpressure
continue
// Load shedding under backpressure, we just disconnect
return BackPressureError
}

if err := r.handleMessage(messageBytes); err != nil {
Expand Down Expand Up @@ -613,14 +593,6 @@ func (r *RelayConnection) setState(state RelayConnectionState) {
r.state = state
}

func (r *RelayConnection) setStateWithFn(state RelayConnectionState, fn func()) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

r.state = state
fn()
}

func (r *RelayConnection) manageSubs(ctx context.Context, conn Connection) error {
defer conn.Close()

Expand Down
8 changes: 4 additions & 4 deletions service/domain/relays/relay_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,16 @@ func (d *RelayConnections) SendEvent(ctx context.Context, relayAddress domain.Re

func (d *RelayConnections) NotifyBackPressure() {
for _, connection := range d.connections {
if connection.Address().HostWithoutPort() != "relay.nos.social" {
if connection.Address().HostWithoutPort() != "relay.nos.social" && connection.State() == RelayConnectionStateConnected {
connection.setState(RelayConnectionStateBackPressured)
}
}
}

func (d *RelayConnections) ResolveBackPressure() {
for _, connection := range d.connections {
if connection.cancelBackPressure != nil {
connection.cancelBackPressure()
connection.cancelBackPressure = nil
if connection.Address().HostWithoutPort() != "relay.nos.social" && connection.State() == RelayConnectionStateBackPressured {
connection.setState(RelayConnectionStateConnected)
}
}
}
Expand Down Expand Up @@ -115,6 +114,7 @@ func (d *RelayConnections) storeMetrics() {
}
d.metrics.ReportRelayConnectionsState(m)
}

func (r *RelayConnections) getRateLimitNoticeBackoffManager(relayAddress domain.RelayAddress) *RateLimitNoticeBackoffManager {
rateLimitNoticeBackoffManager, exists := r.rateLimitNoticeBackoffManagers[relayAddress.HostWithoutPort()]
if !exists {
Expand Down
2 changes: 1 addition & 1 deletion service/ports/sqlitepubsub/event_saved.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *EventSavedEventSubscriber) Run(ctx context.Context) error {
fmt.Sprintf("Queue size %d > %d. Sending backpressure signal to slow down", queueSize, backPressureThreshold),
)
s.handler.NotifyBackPressure()
} else if queueSize < backPressureThreshold/2 {
} else {
s.handler.ResolveBackPressure()
}

Expand Down

0 comments on commit 922f4af

Please sign in to comment.