Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Add unit test for websocket controller #6762

Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ generate-mocks: install-mock-generators
mockery --name 'Storage' --dir=module/executiondatasync/tracker --case=underscore --output="module/executiondatasync/tracker/mock" --outpkg="mocktracker"
mockery --name 'ScriptExecutor' --dir=module/execution --case=underscore --output="module/execution/mock" --outpkg="mock"
mockery --name 'StorageSnapshot' --dir=fvm/storage/snapshot --case=underscore --output="fvm/storage/snapshot/mock" --outpkg="mock"
mockery --name 'WebsocketConnection' --dir=engine/access/rest/websockets --case=underscore --output="engine/access/rest/websockets/mock" --outpkg="mock"

#temporarily make insecure/ a non-module to allow mockery to create mocks
mv insecure/go.mod insecure/go2.mod
Expand Down
42 changes: 42 additions & 0 deletions engine/access/rest/websockets/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package websockets

import (
"github.com/gorilla/websocket"
)

// We wrap gorilla's websocket connection with interface
// to be able to mock it in order to test the types dependent on it

type WebsocketConnection interface {
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
ReadJSON(v interface{}) error
WriteJSON(v interface{}) error
Close() error
}

type GorillaWebsocketConnection struct {
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
conn *websocket.Conn
}

func NewGorillaWebsocketConnection(conn *websocket.Conn) *GorillaWebsocketConnection {
return &GorillaWebsocketConnection{
conn: conn,
}
}

var _ WebsocketConnection = (*GorillaWebsocketConnection)(nil)

func (m *GorillaWebsocketConnection) ReadJSON(v interface{}) error {
return m.conn.ReadJSON(v)
}

func (m *GorillaWebsocketConnection) WriteJSON(v interface{}) error {
return m.conn.WriteJSON(v)
}

func (m *GorillaWebsocketConnection) SetCloseHandler(handler func(code int, text string) error) {
m.conn.SetCloseHandler(handler)
}

func (m *GorillaWebsocketConnection) Close() error {
return m.conn.Close()
}
108 changes: 66 additions & 42 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
type Controller struct {
logger zerolog.Logger
config Config
conn *websocket.Conn
conn WebsocketConnection
communicationChannel chan interface{}
dataProviders *concurrentmap.Map[uuid.UUID, dp.DataProvider]
dataProviderFactory dp.DataProviderFactory
Expand All @@ -26,8 +26,8 @@ type Controller struct {
func NewWebSocketController(
logger zerolog.Logger,
config Config,
conn *websocket.Conn,
dataProviderFactory dp.DataProviderFactory,
conn WebsocketConnection,
) *Controller {
return &Controller{
logger: logger.With().Str("component", "websocket-controller").Logger(),
Expand All @@ -43,59 +43,69 @@ func NewWebSocketController(
func (c *Controller) HandleConnection(ctx context.Context) {
//TODO: configure the connection with ping-pong and deadlines
//TODO: spin up a response limit tracker routine
go c.readMessagesFromClient(ctx)
c.writeMessagesToClient(ctx)
defer c.shutdownConnection()
go c.readMessages(ctx)
c.writeMessages(ctx)
}

// writeMessagesToClient reads a messages from communication channel and passes them on to a client WebSocket connection.
// writeMessages reads a messages from communication channel and passes them on to a client WebSocket connection.
// The communication channel is filled by data providers. Besides, the response limit tracker is involved in
// write message regulation
func (c *Controller) writeMessagesToClient(ctx context.Context) {
//TODO: can it run forever? maybe we should cancel the ctx in the reader routine
func (c *Controller) writeMessages(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case msg := <-c.communicationChannel:
case msg, ok := <-c.communicationChannel:
if !ok {
return
}
c.logger.Debug().Msgf("read message from communication channel: %s", msg)

// TODO: handle 'response per second' limits

err := c.conn.WriteJSON(msg)
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) ||
websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
return
}

c.logger.Error().Err(err).Msg("error writing to connection")
return
}

c.logger.Debug().Msg("written message to client")
}
}
}

// readMessagesFromClient continuously reads messages from a client WebSocket connection,
// readMessages continuously reads messages from a client WebSocket connection,
// processes each message, and handles actions based on the message type.
func (c *Controller) readMessagesFromClient(ctx context.Context) {
defer c.shutdownConnection()

func (c *Controller) readMessages(ctx context.Context) {
for {
select {
case <-ctx.Done():
illia-malachyn marked this conversation as resolved.
Show resolved Hide resolved
c.logger.Info().Msg("context canceled, stopping read message loop")
return
default:
msg, err := c.readMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) {
return
}
c.logger.Warn().Err(err).Msg("error reading message from client")
msg, err := c.readMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) ||
websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
return
}

baseMsg, validatedMsg, err := c.parseAndValidateMessage(msg)
if err != nil {
c.logger.Debug().Err(err).Msg("error parsing and validating client message")
return
}
c.logger.Debug().Err(err).Msg("error reading message from client")
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what type of errors are we allowing here? does it make sense to just close the connection?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will be done later in PR #6798

See diff https://github.com/onflow/flow-go/pull/6798/files#diff-3dc4a79384159575e2c161dbc797a45dea7b38850629edef25b4a9a1cbc2533aR183-R196

It will notify the client, that something went wrong with the incoming message

}

if err := c.handleAction(ctx, validatedMsg); err != nil {
c.logger.Warn().Err(err).Str("action", baseMsg.Action).Msg("error handling action")
}
baseMsg, validatedMsg, err := c.parseAndValidateMessage(msg)
if err != nil {
c.logger.Debug().Err(err).Msg("error parsing and validating client message")
//TODO: write error to error channel
continue
}

if err := c.handleAction(ctx, validatedMsg); err != nil {
c.logger.Debug().Err(err).Str("action", baseMsg.Action).Msg("error handling action")
//TODO: write error to error channel
continue
}
}
}
Expand All @@ -105,6 +115,7 @@ func (c *Controller) readMessage() (json.RawMessage, error) {
if err := c.conn.ReadJSON(&message); err != nil {
return nil, fmt.Errorf("error reading JSON from client: %w", err)
}

return message, nil
}

Expand Down Expand Up @@ -169,8 +180,20 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe

c.dataProviders.Add(dp.ID(), dp)

//TODO: return OK response to client
c.communicationChannel <- msg
// firstly, we want to write OK response to client and only after that we can start providing actual data
response := models.SubscribeMessageResponse{
BaseMessageResponse: models.BaseMessageResponse{
Success: true,
},
Topic: dp.Topic(),
ID: dp.ID().String(),
}

select {
case <-ctx.Done():
return
case c.communicationChannel <- response:
}

go func() {
err := dp.Run()
Expand Down Expand Up @@ -202,20 +225,21 @@ func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.Lis
}

func (c *Controller) shutdownConnection() {
defer close(c.communicationChannel)
defer func(conn *websocket.Conn) {
defer func() {
if err := c.conn.Close(); err != nil {
c.logger.Error().Err(err).Msg("error closing connection")
c.logger.Warn().Err(err).Msg("error closing connection")
}
}(c.conn)
}()

err := c.dataProviders.ForEach(func(_ uuid.UUID, dp dp.DataProvider) error {
dp.Close()
c.logger.Debug().Msg("shutting down connection")

_ = c.dataProviders.ForEach(func(id uuid.UUID, dp dp.DataProvider) error {
err := dp.Close()
c.logger.Error().Err(err).
Str("data_provider", id.String()).
Msg("error closing data provider")
return nil
})
if err != nil {
c.logger.Error().Err(err).Msg("error closing data provider")
}

c.dataProviders.Clear()
}
Loading
Loading