Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Implement inactivity tracker #6817

Merged
Merged
Show file tree
Hide file tree
Changes from 84 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
40f0c2c
Implement limit tracker
Guitarheroua Dec 4, 2024
2534eaf
Added inactivity timeout floag for access and observer nodes
UlyanaAndrukhiv Dec 12, 2024
09627f6
Merged with krok/new-websockets
UlyanaAndrukhiv Dec 12, 2024
2f2bc98
Merge branch 'krok/new-websockets' of github.com:The-K-R-O-K/flow-go …
AndriiDiachuk Dec 13, 2024
7179cb8
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into krok/new…
AndriiDiachuk Dec 13, 2024
8fd7faf
Merge branch 'krok/new-websockets' of github.com:The-K-R-O-K/flow-go …
UlyanaAndrukhiv Dec 13, 2024
279ee14
Merge branch 'AndriiDiachuk/6588-events-data-provider' of github.com:…
AndriiDiachuk Dec 13, 2024
d6fe8f5
Added inactivity tracker impl
UlyanaAndrukhiv Dec 13, 2024
d86d90e
Merge branch 'krok/new-websockets' of github.com:The-K-R-O-K/flow-go …
UlyanaAndrukhiv Dec 13, 2024
341195c
Merge branch 'krok/new-websockets' of github.com:The-K-R-O-K/flow-go …
Guitarheroua Dec 13, 2024
364b35e
added comments and error message
Guitarheroua Dec 13, 2024
09f1705
remove comments
illia-malachyn Dec 13, 2024
f1849c2
Merge branch 'illia-malachyn/6642-ws-controller-error-handling' into …
illia-malachyn Dec 13, 2024
16f5734
Merge branch 'illia-malachyn/6642-ws-controller-error-handling' into …
illia-malachyn Dec 13, 2024
8d6649d
Merge branch 'illia-malachyn/6642-ws-controller-error-handling' into …
illia-malachyn Dec 13, 2024
6bb1143
Merge branch 'illia-malachyn/6642-ws-controller-error-handling' into …
illia-malachyn Dec 13, 2024
0b20427
Merged with illia-malachyn/6642-ws-controller-error-handling
UlyanaAndrukhiv Dec 16, 2024
eaa649f
Merge branch 'illia-malachyn/6642-ws-controller-error-handling' of gi…
UlyanaAndrukhiv Dec 16, 2024
bbafe48
Added init tests for inactivity tracker
UlyanaAndrukhiv Dec 16, 2024
5fa4e47
Merge branch 'AndriiSlisarchuk/6640-responce-limit-tracker' into krok…
Guitarheroua Dec 17, 2024
76ba6b2
Fixed test
UlyanaAndrukhiv Dec 17, 2024
53e3461
Merged with krok/new-websockets
UlyanaAndrukhiv Dec 17, 2024
bc38455
Merged with master
UlyanaAndrukhiv Dec 18, 2024
25a3b5c
Updated inactivity tracker for websockets according to comments
UlyanaAndrukhiv Dec 19, 2024
d388d79
Merged with illia-malachyn/6642-ws-controller-error-handling
UlyanaAndrukhiv Dec 19, 2024
72f1e1b
Updated according to comments
UlyanaAndrukhiv Dec 19, 2024
fc086df
Merge branch 'master' into UlianaAndrukhiv/6799-inactivity-tracker
UlyanaAndrukhiv Dec 23, 2024
9a91eb1
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into UlianaAn…
UlyanaAndrukhiv Dec 26, 2024
20c3e06
Updated acccording to comments
UlyanaAndrukhiv Dec 26, 2024
b808c0c
Merged with master
UlyanaAndrukhiv Dec 30, 2024
bbc23ea
Updated comment according to suggestion, updated log message
UlyanaAndrukhiv Dec 30, 2024
4204718
Added missed godoc for websocket DefaultInactivityTimeout, removed ra…
UlyanaAndrukhiv Dec 30, 2024
807e589
Updated comments and log according to suggestions
UlyanaAndrukhiv Jan 2, 2025
946e02d
Merge branch 'master' into UlianaAndrukhiv/6799-inactivity-tracker
UlyanaAndrukhiv Jan 2, 2025
a2b56c2
Removed comment and improved the readability of the code
UlyanaAndrukhiv Jan 2, 2025
1c4f2a7
Merge branch 'master' into UlianaAndrukhiv/6799-inactivity-tracker
UlyanaAndrukhiv Jan 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,11 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"registerdb-pruning-threshold",
defaultConfig.registerDBPruneThreshold,
fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold))

flags.DurationVar(&builder.rpcConf.WebSocketConfig.InactivityTimeout,
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
"websocket-inactivity-timeout",
defaultConfig.rpcConf.WebSocketConfig.InactivityTimeout,
"specifies the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed")
}).ValidateFlags(func() error {
if builder.supportsObserver && (builder.PublicNetworkConfig.BindAddress == cmd.NotSet || builder.PublicNetworkConfig.BindAddress == "") {
return errors.New("public-network-address must be set if supports-observer is true")
Expand Down
5 changes: 5 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,11 @@ func (builder *ObserverServiceBuilder) extraFlags() {
"registerdb-pruning-threshold",
defaultConfig.registerDBPruneThreshold,
fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold))

flags.DurationVar(&builder.rpcConf.WebSocketConfig.InactivityTimeout,
"websocket-inactivity-timeout",
defaultConfig.rpcConf.WebSocketConfig.InactivityTimeout,
"specifies the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed")
}).ValidateFlags(func() error {
if builder.executionDataSyncEnabled {
if builder.executionDataConfig.FetchTimeout <= 0 {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package request
package parser

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion engine/access/rest/http/request/get_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (g *GetEvents) Parse(rawType string, rawStart string, rawEnd string, rawBlo
if rawType == "" {
return fmt.Errorf("event type must be provided")
}
var eventType EventType
var eventType parser.EventType
err = eventType.Parse(rawType)
if err != nil {
return err
Expand Down
8 changes: 7 additions & 1 deletion engine/access/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ func NewServer(serverAPI access.API,
builder.AddLegacyWebsocketsRoutes(stateStreamApi, chain, stateStreamConfig, config.MaxRequestSize)
}

dataProviderFactory := dp.NewDataProviderFactory(logger, stateStreamApi, serverAPI)
dataProviderFactory := dp.NewDataProviderFactory(
logger,
stateStreamApi,
serverAPI,
chain,
stateStreamConfig.EventFilterConfig,
stateStreamConfig.HeartbeatInterval)
builder.AddWebsocketsRoute(chain, wsConfig, config.MaxRequestSize, dataProviderFactory)

c := cors.New(cors.Options{
Expand Down
8 changes: 7 additions & 1 deletion engine/access/rest/websockets/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@ const (
// if the client is slow or unresponsive. This prevents resource exhaustion
// and allows the server to gracefully handle timeouts for delayed writes.
WriteWait = 10 * time.Second

DefaultInactivityTimeout time.Duration = 1 * time.Minute
)

type Config struct {
MaxSubscriptionsPerConnection uint64
MaxResponsesPerSecond uint64
SendMessageTimeout time.Duration
SendMessageTimeout time.Duration // TODO: Do we need this, if we have MaxResponsesPerSecond ?
MaxRequestSize int64
// InactivityTimeout specifies the duration a WebSocket connection can remain open without any active subscriptions
// before being automatically closed
InactivityTimeout time.Duration
}

func NewDefaultWebsocketConfig() Config {
Expand All @@ -44,5 +49,6 @@ func NewDefaultWebsocketConfig() Config {
MaxResponsesPerSecond: 1000,
SendMessageTimeout: 10 * time.Second,
MaxRequestSize: 1024,
InactivityTimeout: DefaultInactivityTimeout,
}
}
58 changes: 57 additions & 1 deletion engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"golang.org/x/time/rate"

"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/rs/zerolog"
Expand All @@ -16,6 +18,7 @@ import (
dp "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers"
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/utils/concurrentmap"
"github.com/onflow/flow-go/utils/concurrentticker"
)

type Controller struct {
Expand All @@ -30,6 +33,8 @@ type Controller struct {
dataProviders *concurrentmap.Map[uuid.UUID, dp.DataProvider]
dataProviderFactory dp.DataProviderFactory
dataProvidersGroup *sync.WaitGroup
limiter *rate.Limiter
inactivityTracker *concurrentticker.Ticker
}

func NewWebSocketController(
Expand All @@ -46,6 +51,8 @@ func NewWebSocketController(
dataProviders: concurrentmap.New[uuid.UUID, dp.DataProvider](),
dataProviderFactory: dataProviderFactory,
dataProvidersGroup: &sync.WaitGroup{},
limiter: rate.NewLimiter(rate.Limit(config.MaxResponsesPerSecond), 1),
inactivityTracker: concurrentticker.NewTicker(config.InactivityTimeout),
}
}

Expand All @@ -59,7 +66,7 @@ func (c *Controller) HandleConnection(ctx context.Context) {

err := c.configureKeepalive()
if err != nil {
c.logger.Error().Err(err).Msg("error configuring connection")
c.logger.Error().Err(err).Msg("error configuring keepalive connection")
return
}

Expand All @@ -74,6 +81,9 @@ func (c *Controller) HandleConnection(ctx context.Context) {
g.Go(func() error {
return c.readMessages(gCtx)
})
g.Go(func() error {
return c.monitorInactivity(ctx)
})

if err = g.Wait(); err != nil {
if errors.Is(err, websocket.ErrCloseSent) {
Expand Down Expand Up @@ -113,6 +123,40 @@ func (c *Controller) configureKeepalive() error {
return nil
}

// monitorInactivity periodically checks for inactivity on the connection.
//
// Expected behavior:
// - Terminates when all data providers are unsubscribed.
// - Resets based on activity such as adding/removing subscriptions.
//
// Parameters:
// - ctx: Context to control cancellation and timeouts.
func (c *Controller) monitorInactivity(ctx context.Context) error {
defer c.inactivityTracker.Stop()

for {
select {
case <-ctx.Done():
return nil
case <-c.inactivityTracker.C():
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
if c.dataProviders.Size() == 0 {
// Optionally send a message to the client indicating the reason for closure.
c.logger.Info().Msg("Connection inactive, closing due to timeout.")
return fmt.Errorf("no recent activity for %v", c.config.InactivityTimeout)
}
}
}
}

// checkInactivity checks if there are no active data providers
// and resets the inactivity timer. This function should be called after removing a data provider
// to update the inactivity tracker and ensure it reflects the current state.
func (c *Controller) checkInactivity() {
if c.dataProviders.Size() == 0 {
c.inactivityTracker.Reset(c.config.InactivityTimeout)
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
}
}

// keepalive sends a ping message periodically to keep the WebSocket connection alive
// and avoid timeouts.
//
Expand Down Expand Up @@ -164,6 +208,16 @@ func (c *Controller) writeMessages(ctx context.Context) error {
return fmt.Errorf("multiplexed stream closed")
}

// wait for the rate limiter to allow the next message write.
if err := c.limiter.WaitN(ctx, 1); err != nil {
return fmt.Errorf("rate limiter wait failed: %w", err)
}

// Specifies a timeout for the write operation. If the write
// isn't completed within this duration, it fails with a timeout error.
// SetWriteDeadline ensures the write operation does not block indefinitely
// if the client is slow or unresponsive. This prevents resource exhaustion
// and allows the server to gracefully handle timeouts for delayed writes.
if err := c.conn.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
return fmt.Errorf("failed to set the write deadline: %w", err)
}
Expand Down Expand Up @@ -292,6 +346,7 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe

c.dataProvidersGroup.Done()
c.dataProviders.Remove(provider.ID())
c.checkInactivity()
}()
}

Expand Down Expand Up @@ -327,6 +382,7 @@ func (c *Controller) handleUnsubscribe(ctx context.Context, msg models.Unsubscri
}

c.dataProviders.Remove(id)
c.checkInactivity()

responseOk := models.UnsubscribeMessageResponse{
BaseMessageResponse: models.BaseMessageResponse{
Expand Down
115 changes: 111 additions & 4 deletions engine/access/rest/websockets/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,19 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

streammock "github.com/onflow/flow-go/engine/access/state_stream/mock"

"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

dpmock "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers/mock"
connmock "github.com/onflow/flow-go/engine/access/rest/websockets/mock"
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
streammock "github.com/onflow/flow-go/engine/access/state_stream/mock"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/utils/unittest"
)
Expand Down Expand Up @@ -844,6 +842,36 @@ func (s *WsControllerSuite) TestControllerShutdown() {

conn.AssertExpectations(t)
})

s.T().Run("Inactivity tracking", func(t *testing.T) {
peterargue marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()

conn := connmock.NewWebsocketConnection(t)
conn.On("Close").Return(nil).Once()
conn.On("SetReadDeadline", mock.Anything).Return(nil).Once()
conn.On("SetPongHandler", mock.AnythingOfType("func(string) error")).Return(nil).Once()

factory := dpmock.NewDataProviderFactory(t)
// Mock with short inactivity timeout for testing
wsConfig := s.wsConfig

wsConfig.InactivityTimeout = 50 * time.Millisecond
controller := NewWebSocketController(s.logger, wsConfig, conn, factory)

conn.
UlyanaAndrukhiv marked this conversation as resolved.
Show resolved Hide resolved
On("ReadJSON", mock.Anything).
Return(func(interface{}) error {
// wait on read
<-time.After(wsConfig.InactivityTimeout + 10)
return websocket.ErrCloseSent
}).
Once()

controller.HandleConnection(context.Background())
time.Sleep(wsConfig.InactivityTimeout)

conn.AssertExpectations(t)
})
}

func (s *WsControllerSuite) TestKeepaliveRoutine() {
Expand Down Expand Up @@ -920,6 +948,85 @@ func (s *WsControllerSuite) TestKeepaliveRoutine() {
})
}

// TestMonitorInactivity verifies that monitorInactivity returns an error
// when the WebSocket connection has no subscriptions for the configured inactivity timeout.
func (s *WsControllerSuite) TestMonitorInactivity() {
conn := connmock.NewWebsocketConnection(s.T())
factory := dpmock.NewDataProviderFactory(s.T())

// Mock with short inactivity timeout for testing
wsConfig := s.wsConfig

wsConfig.InactivityTimeout = 50 * time.Millisecond
controller := NewWebSocketController(s.logger, wsConfig, conn, factory)

err := controller.monitorInactivity(context.Background())
s.Require().Error(err)
s.Require().Equal(err, fmt.Errorf("no recent activity for %v", wsConfig.InactivityTimeout))

conn.AssertExpectations(s.T())
}

// TestRateLimiter tests the rate-limiting functionality of the WebSocket controller.
//
// Test Steps:
// 1. Create a mock WebSocket connection with behavior for `SetWriteDeadline` and `WriteJSON`.
// 2. Configure the WebSocket controller with a rate limit of 2 responses per second.
// 3. Simulate sending messages to the `multiplexedStream` channel.
// 4. Collect timestamps of message writes to verify rate-limiting behavior.
// 5. Assert that all messages are processed and that the delay between messages respects the configured rate limit.
//
// The test ensures that:
// - The number of messages processed matches the total messages sent.
// - The delay between consecutive messages falls within the expected range based on the rate limit, with a tolerance of 5ms.
func (s *WsControllerSuite) TestRateLimiter() {
s.T().Run("Enforces response rate limit", func(t *testing.T) {
totalMessages := 5 // Number of messages to simulate.

// Step 1: Create a mock WebSocket connection.
conn := connmock.NewWebsocketConnection(t)
conn.On("SetWriteDeadline", mock.Anything).Return(nil).Times(totalMessages)

// Step 2: Configure the WebSocket controller with a rate limit.
config := NewDefaultWebsocketConfig()
config.MaxResponsesPerSecond = 2 // 2 messages per second.
controller := NewWebSocketController(s.logger, config, conn, nil)

// Step 3: Simulate sending messages to the controller's `multiplexedStream`.
go func() {
for i := 0; i < totalMessages; i++ {
controller.multiplexedStream <- map[string]interface{}{
"message": i,
}
}
close(controller.multiplexedStream)
}()

// Step 4: Collect timestamps of message writes for verification.
var timestamps []time.Time
conn.On("WriteJSON", mock.Anything).Run(func(args mock.Arguments) {
timestamps = append(timestamps, time.Now())
}).Return(nil).Times(totalMessages)

// Invoke the `writeMessages` method to process the stream.
_ = controller.writeMessages(context.Background())

// Step 5: Verify that all messages are processed.
require.Len(t, timestamps, totalMessages, "All messages should be processed")

// Calculate the expected delay between messages based on the rate limit.
expectedDelay := time.Second / time.Duration(config.MaxResponsesPerSecond)
const tolerance = 5 * time.Millisecond // Allow up to 5ms deviation.

// Step 6: Assert that the delays respect the rate limit with tolerance.
for i := 1; i < len(timestamps); i++ {
delay := timestamps[i].Sub(timestamps[i-1])
assert.GreaterOrEqual(t, delay, expectedDelay-tolerance, "Messages should respect the minimum rate limit")
assert.LessOrEqual(t, delay, expectedDelay+tolerance, "Messages should respect the maximum rate limit")
}
})
}

// newControllerMocks initializes mock WebSocket connection, data provider, and data provider factory.
// The mocked functions are expected to be called in a case when a test is expected to reach WriteJSON function.
func newControllerMocks(t *testing.T) (*connmock.WebsocketConnection, *dpmock.DataProviderFactory, *dpmock.DataProvider) {
Expand Down
Loading
Loading