-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: solver exponential backoff #450
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -2,76 +2,152 @@ package http | |||||
|
||||||
import ( | ||||||
"context" | ||||||
"errors" | ||||||
"fmt" | ||||||
"math" | ||||||
"sync" | ||||||
"syscall" | ||||||
"time" | ||||||
|
||||||
"github.com/gorilla/websocket" | ||||||
"github.com/rs/zerolog/log" | ||||||
) | ||||||
|
||||||
const ( | ||||||
initialDelay = 1.0 | ||||||
maxAttempts = 10 | ||||||
exponential = 1.2 | ||||||
) | ||||||
|
||||||
// ConnectWebSocket establishes a new WebSocket connection | ||||||
func ConnectWebSocket( | ||||||
url string, | ||||||
ctx context.Context, | ||||||
) chan []byte { | ||||||
connectFactory := func() *websocket.Conn { | ||||||
for { | ||||||
func ConnectWebSocket(ctx context.Context, url string) (chan []byte, error) { | ||||||
connectFactory := func() (*websocket.Conn, error) { | ||||||
currentBackoff := 0.0 | ||||||
for attempt := 0; attempt < maxAttempts; attempt++ { | ||||||
select { | ||||||
case <-ctx.Done(): | ||||||
return nil, fmt.Errorf("websocket connection canceled: %w", ctx.Err()) | ||||||
default: | ||||||
} | ||||||
|
||||||
log.Debug().Msgf("WebSocket connection connecting: %s", url) | ||||||
conn, _, err := websocket.DefaultDialer.Dial(url, nil) | ||||||
if err != nil { | ||||||
log.Error().Msgf("WebSocket connection failed: %s\nReconnecting in 2 seconds...", err) | ||||||
time.Sleep(2 * time.Second) | ||||||
continue | ||||||
switch { | ||||||
case websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure): | ||||||
log.Info().Msg("Solver service closed connection") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. At the moment, we only use this helper function to connect to the solver. But we might re-use it in other contexts, so we may want to reword these errors to something like "WebSocket server closed connection". There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Also, we may want to log errors where we will attempt again at the |
||||||
return nil, fmt.Errorf("solver connection closed: %w", err) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will returning here cause us to give up on our connection attempts? |
||||||
case errors.Is(err, syscall.ECONNREFUSED): | ||||||
log.Info().Msg("Solver service appears to be down") | ||||||
return nil, fmt.Errorf("solver service unavailable: %w", err) | ||||||
default: | ||||||
log.Error().Msgf("WebSocket connection failed: %s\nReconnecting in %.3f seconds...", err, currentBackoff) | ||||||
timer := time.NewTimer(time.Duration(currentBackoff * float64(time.Second))) | ||||||
|
||||||
select { | ||||||
case <-ctx.Done(): | ||||||
if !timer.Stop() { | ||||||
<-timer.C | ||||||
} | ||||||
return nil, fmt.Errorf("websocket connection canceled during backoff: %w", ctx.Err()) | ||||||
case <-timer.C: | ||||||
} | ||||||
|
||||||
currentBackoff += initialDelay * math.Pow(exponential, float64(attempt)) | ||||||
continue | ||||||
} | ||||||
} | ||||||
|
||||||
conn.SetPongHandler(nil) | ||||||
return conn | ||||||
return conn, nil | ||||||
} | ||||||
return nil, fmt.Errorf("maximum connection attempts (%d) reached", maxAttempts) | ||||||
} | ||||||
|
||||||
pingInterval := time.NewTicker(time.Second * 5) | ||||||
connLk := &sync.Mutex{} | ||||||
responseCh := make(chan []byte) | ||||||
errCh := make(chan error) | ||||||
responseCh := make(chan []byte, 100) | ||||||
errCh := make(chan error, 1) | ||||||
|
||||||
readMessage := func(conn *websocket.Conn) { | ||||||
defer func() { | ||||||
conn.Close() | ||||||
close(responseCh) | ||||||
}() | ||||||
for { | ||||||
messageType, p, err := conn.ReadMessage() | ||||||
if err != nil { | ||||||
errCh <- err | ||||||
select { | ||||||
case <-ctx.Done(): | ||||||
log.Info().Msg("Exiting readMessage loop due to context cancellation.") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
This detail is a bit internal; I would suggest dropping the log level to |
||||||
return | ||||||
} | ||||||
if messageType == websocket.TextMessage { | ||||||
log.Debug(). | ||||||
Str("action", "ws READ"). | ||||||
Str("payload", string(p)). | ||||||
Msgf("") | ||||||
responseCh <- p | ||||||
default: | ||||||
messageType, p, err := conn.ReadMessage() | ||||||
if err != nil { | ||||||
errCh <- err | ||||||
return | ||||||
} | ||||||
if messageType == websocket.TextMessage { | ||||||
log.Debug(). | ||||||
Str("action", "ws READ"). | ||||||
Str("payload", string(p)). | ||||||
Msg("") | ||||||
responseCh <- p | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
conn := connectFactory() | ||||||
conn, err := connectFactory() | ||||||
if err != nil { | ||||||
log.Err(err).Msg("Error in WebSocket connection.") | ||||||
return nil, err | ||||||
} | ||||||
|
||||||
go readMessage(conn) | ||||||
|
||||||
go func() { | ||||||
defer func() { | ||||||
pingInterval.Stop() | ||||||
connLk.Lock() | ||||||
if conn != nil { | ||||||
conn.Close() | ||||||
} | ||||||
connLk.Unlock() | ||||||
}() | ||||||
for { | ||||||
select { | ||||||
case <-ctx.Done(): | ||||||
log.Info().Msg("Ping loop exiting due to context cancellation.") | ||||||
return | ||||||
case <-pingInterval.C: | ||||||
connLk.Lock() | ||||||
log.Trace().Msg("send ping message") | ||||||
if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { | ||||||
log.Err(err).Msg("sending ping message") | ||||||
connLk.Unlock() | ||||||
continue | ||||||
if conn != nil { | ||||||
if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { | ||||||
log.Err(err).Msg("Error sending ping message.") | ||||||
select { | ||||||
case errCh <- err: | ||||||
default: | ||||||
} | ||||||
} | ||||||
} | ||||||
connLk.Unlock() | ||||||
case err := <-errCh: | ||||||
log.Err(err).Msg("websocket error") | ||||||
log.Err(err).Msg("WebSocket error detected.") | ||||||
connLk.Lock() | ||||||
conn = connectFactory() | ||||||
if conn != nil { | ||||||
conn.Close() | ||||||
} | ||||||
newConn, err := connectFactory() | ||||||
if err != nil { | ||||||
log.Err(err).Msg("Failed to reconnect WebSocket.") | ||||||
connLk.Unlock() | ||||||
return | ||||||
} | ||||||
conn = newConn | ||||||
connLk.Unlock() | ||||||
go readMessage(conn) | ||||||
go readMessage(newConn) | ||||||
} | ||||||
} | ||||||
}() | ||||||
return responseCh | ||||||
|
||||||
return responseCh, nil | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Could we rename this to `exponent`?