Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify backpressure handling #91

Merged
merged 2 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/send-all-events-to-relay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (u *EventUploader) worker(ctx context.Context) {
func (u *EventUploader) sendEvent(ctx context.Context, event domain.Event) error {
for {
if err := u.eventSender.SendEvent(ctx, u.address, event); err != nil {
if errors.Is(err, relays.BackPressureError) {
if errors.Is(err, relays.ErrEventReplaced) {
u.eventsRelayReplaced.Add(1)
u.allEvents.Add(1)
} else {
Expand Down
32 changes: 2 additions & 30 deletions service/domain/relays/relay_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type RelayConnection struct {
eventsToSendMutex sync.Mutex
newEventsCh chan domain.Event
rateLimitNoticeBackoffManager *RateLimitNoticeBackoffManager
cancelBackPressure context.CancelFunc
}

func NewRelayConnection(
Expand All @@ -90,7 +89,6 @@ func NewRelayConnection(
eventsToSend: make(map[domain.EventId]*eventToSend),
newEventsCh: make(chan domain.Event),
rateLimitNoticeBackoffManager: rateLimitNoticeBackoffManager,
cancelBackPressure: nil,
}
}

Expand All @@ -108,10 +106,6 @@ func (r *RelayConnection) Run(ctx context.Context) {
if r.Address().HostWithoutPort() == "relay.nos.social" {
// We control relay.nos.social, so we don't backoff here
backoff = 0
} else if errors.Is(err, BackPressureError) {
// Only calling r.cancelBackPressure() can resolve the backpressure
r.WaitUntilNoBackPressure(ctx)
backoff = 0
}

select {
Expand All @@ -123,20 +117,6 @@ func (r *RelayConnection) Run(ctx context.Context) {
}
}

func (r *RelayConnection) WaitUntilNoBackPressure(ctx context.Context) {
var resolvedBackPressureCtx context.Context
var cancelBackPressure context.CancelFunc

r.setStateWithFn(RelayConnectionStateBackPressured, func() {
resolvedBackPressureCtx, cancelBackPressure = context.WithCancel(ctx)
r.cancelBackPressure = cancelBackPressure
})

<-resolvedBackPressureCtx.Done()

r.setState(RelayConnectionStateDisconnected)
}

func (r *RelayConnection) State() RelayConnectionState {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()
Expand Down Expand Up @@ -356,8 +336,8 @@ func (r *RelayConnection) run(ctx context.Context) error {
}

if r.state == RelayConnectionStateBackPressured {
// Load shedding under backpressure
continue
// Load shedding under backpressure, we just disconnect
return BackPressureError
}

if err := r.handleMessage(messageBytes); err != nil {
Expand Down Expand Up @@ -613,14 +593,6 @@ func (r *RelayConnection) setState(state RelayConnectionState) {
r.state = state
}

func (r *RelayConnection) setStateWithFn(state RelayConnectionState, fn func()) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

r.state = state
fn()
}

func (r *RelayConnection) manageSubs(ctx context.Context, conn Connection) error {
defer conn.Close()

Expand Down
8 changes: 4 additions & 4 deletions service/domain/relays/relay_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,16 @@ func (d *RelayConnections) SendEvent(ctx context.Context, relayAddress domain.Re

func (d *RelayConnections) NotifyBackPressure() {
for _, connection := range d.connections {
if connection.Address().HostWithoutPort() != "relay.nos.social" {
if connection.Address().HostWithoutPort() != "relay.nos.social" && connection.State() == RelayConnectionStateConnected {
connection.setState(RelayConnectionStateBackPressured)
}
}
}

func (d *RelayConnections) ResolveBackPressure() {
for _, connection := range d.connections {
if connection.cancelBackPressure != nil {
connection.cancelBackPressure()
connection.cancelBackPressure = nil
if connection.Address().HostWithoutPort() != "relay.nos.social" && connection.State() == RelayConnectionStateBackPressured {
connection.setState(RelayConnectionStateConnected)
}
}
}
Expand Down Expand Up @@ -115,6 +114,7 @@ func (d *RelayConnections) storeMetrics() {
}
d.metrics.ReportRelayConnectionsState(m)
}

func (r *RelayConnections) getRateLimitNoticeBackoffManager(relayAddress domain.RelayAddress) *RateLimitNoticeBackoffManager {
rateLimitNoticeBackoffManager, exists := r.rateLimitNoticeBackoffManagers[relayAddress.HostWithoutPort()]
if !exists {
Expand Down
2 changes: 1 addition & 1 deletion service/ports/sqlitepubsub/event_saved.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *EventSavedEventSubscriber) Run(ctx context.Context) error {
fmt.Sprintf("Queue size %d > %d. Sending backpressure signal to slow down", queueSize, backPressureThreshold),
)
s.handler.NotifyBackPressure()
} else if queueSize < backPressureThreshold/2 {
} else {
s.handler.ResolveBackPressure()
}

Expand Down
Loading