From 2fc94e06c03d8f4fca7ac616c262242730832cf1 Mon Sep 17 00:00:00 2001 From: Iulian Vasile Baba Date: Sat, 5 Apr 2025 08:35:44 +0200 Subject: [PATCH] Add heartbeat to check client connection --- config/config-dev.yml | 2 +- config/config.go | 4 +- publisher/cmd/publisher/main.go | 4 +- .../handlers/{env.go => dependencies.go} | 2 +- publisher/internal/handlers/publisher.go | 78 +++++++++++++++---- publisher/internal/handlers/publisher_test.go | 14 ++-- publisher/internal/mq/publisher.go | 2 +- publisher/internal/mq/publisher_test.go | 8 +- 8 files changed, 82 insertions(+), 32 deletions(-) rename publisher/internal/handlers/{env.go => dependencies.go} (82%) diff --git a/config/config-dev.yml b/config/config-dev.yml index 4513bba..7d48752 100644 --- a/config/config-dev.yml +++ b/config/config-dev.yml @@ -1,2 +1,2 @@ nats: - host: 'nats://realtime-messaging-nats:4222' \ No newline at end of file + host: 'nats://realtime-messaging-nats:4222' diff --git a/config/config.go b/config/config.go index 7fb86d7..192423f 100644 --- a/config/config.go +++ b/config/config.go @@ -10,7 +10,7 @@ import ( type Config struct { Nats struct { - Host string `yaml:"host"` + Host string `yaml:"host"` } } @@ -47,4 +47,4 @@ func getStage() string { } return stage -} \ No newline at end of file +} diff --git a/publisher/cmd/publisher/main.go b/publisher/cmd/publisher/main.go index 15808db..2b47054 100644 --- a/publisher/cmd/publisher/main.go +++ b/publisher/cmd/publisher/main.go @@ -20,11 +20,11 @@ func main() { log.Fatalf("failed to connect to NATS server: %v", err) } - env := &handlers.Env{ + deps := &handlers.Dependencies{ PublisherClient: publisherClient, } - http.HandleFunc("/publish", env.PublisherHandler) + http.HandleFunc("/publish", deps.PublisherHandler) log.Println("publisher service running on :3000") err = http.ListenAndServe(":3000", nil) log.Fatalf("failed to start publisher service: %v", err) diff --git a/publisher/internal/handlers/env.go b/publisher/internal/handlers/dependencies.go similarity index 82% rename from publisher/internal/handlers/env.go rename to publisher/internal/handlers/dependencies.go index df55ad7..54a2d34 100644 --- a/publisher/internal/handlers/env.go +++ b/publisher/internal/handlers/dependencies.go @@ -2,6 +2,6 @@ package handlers import "github.com/iulian509/realtime-messaging/publisher/internal/mq" -type Env struct { +type Dependencies struct { PublisherClient *mq.Publisher } diff --git a/publisher/internal/handlers/publisher.go b/publisher/internal/handlers/publisher.go index f78e34b..b8a8738 100644 --- a/publisher/internal/handlers/publisher.go +++ b/publisher/internal/handlers/publisher.go @@ -1,15 +1,24 @@ package handlers import ( + "context" "log" "net/http" + "time" "github.com/gorilla/websocket" + "github.com/iulian509/realtime-messaging/publisher/internal/mq" +) + +const ( + pingPeriod = 30 * time.Second + pongWait = 60 * time.Second + writeWait = 30 * time.Second ) var upgrader = websocket.Upgrader{} -func (env *Env) PublisherHandler(w http.ResponseWriter, r *http.Request) { +func (deps *Dependencies) PublisherHandler(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("WebSocket upgrade failed: %v", err) @@ -17,25 +26,66 @@ func (env *Env) PublisherHandler(w http.ResponseWriter, r *http.Request) { } defer conn.Close() - log.Println("client connected to WebSocket") + log.Println("client connected to Publisher WebSocket") + + conn.SetReadDeadline(time.Now().Add(pongWait)) + conn.SetPongHandler(func(appData string) error { + conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go pingConnection(ctx, cancel, conn) + + processMessages(ctx, cancel, conn, deps.PublisherClient) +} + +func pingConnection(ctx context.Context, cancel context.CancelFunc, conn *websocket.Conn) { + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() for { - _, message, err := conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err) { - log.Printf("WebSocket connection closed unexpectedly: %v", err) - } else { - log.Printf("WebSocket read error: %v", err) + select { + case <-ticker.C: + err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)) + if err != nil { + log.Println("Ping error:", err) + cancel() + conn.Close() + return } + case <-ctx.Done(): return } - log.Printf("received message: %s", message) + } +} + +func processMessages(ctx context.Context, cancel context.CancelFunc, conn *websocket.Conn, publisherClient *mq.Publisher) { + for { + select { + case <-ctx.Done(): + return + default: + _, message, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err) { + log.Printf("WebSocket connection closed unexpectedly: %v", err) + } else { + log.Printf("WebSocket read error: %v", err) + } + cancel() + return + } + log.Printf("received message: %s", message) - err = env.PublisherClient.PublishMessage(message) - if err != nil { - log.Printf("error publishing message: %v", err) - } else { - log.Printf("published message: %s", message) + err = publisherClient.PublishMessage(message) + if err != nil { + log.Printf("error publishing message: %v", err) + } else { + log.Printf("published message: %s", message) + } } } } diff --git a/publisher/internal/handlers/publisher_test.go b/publisher/internal/handlers/publisher_test.go index 60570ee..d83bde6 100644 --- a/publisher/internal/handlers/publisher_test.go +++ b/publisher/internal/handlers/publisher_test.go @@ -25,11 +25,11 @@ func TestPublisherHandler(t *testing.T) { // Set up mocks mockedConn := new(MockNATSConnection) publisher := &mq.Publisher{Conn: mockedConn} - env := &Env{PublisherClient: publisher} + deps := &Dependencies{PublisherClient: publisher} // Set up test handler and server handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - env.PublisherHandler(w, r) + deps.PublisherHandler(w, r) }) server := httptest.NewServer(handler) defer server.Close() @@ -41,8 +41,8 @@ func TestPublisherHandler(t *testing.T) { defer ws.Close() const ( - publish = "Publish" - subject = "subject" + publish = "Publish" + subject = "subject" testMessage = "test message" ) @@ -51,9 +51,9 @@ func TestPublisherHandler(t *testing.T) { err = ws.WriteMessage(websocket.TextMessage, []byte(testMessage)) assert.NoError(t, err) - + // Wait for the handler to process the message - time.Sleep(100*time.Millisecond) + time.Sleep(100 * time.Millisecond) mockedConn.AssertExpectations(t) }) @@ -64,7 +64,7 @@ func TestPublisherHandler(t *testing.T) { assert.NoError(t, err, "message writing should succeed even if NATS publish fails") // Wait for the handler to process the message - time.Sleep(100*time.Millisecond) + time.Sleep(100 * time.Millisecond) mockedConn.AssertExpectations(t) }) diff --git a/publisher/internal/mq/publisher.go b/publisher/internal/mq/publisher.go index 79353f7..f9a3aad 100644 --- a/publisher/internal/mq/publisher.go +++ b/publisher/internal/mq/publisher.go @@ -21,7 +21,7 @@ func NewPublisher(url string) (*Publisher, error) { return &Publisher{Conn: nc}, nil } -func (p *Publisher) PublishMessage(message []byte) error{ +func (p *Publisher) PublishMessage(message []byte) error { err := p.Conn.Publish("subject", message) if err != nil { return err diff --git a/publisher/internal/mq/publisher_test.go b/publisher/internal/mq/publisher_test.go index 418b30e..54e8663 100644 --- a/publisher/internal/mq/publisher_test.go +++ b/publisher/internal/mq/publisher_test.go @@ -26,8 +26,8 @@ func TestPublishMessage(t *testing.T) { publisher := &Publisher{Conn: mockedConn} const ( - publish = "Publish" - subject = "subject" + publish = "Publish" + subject = "subject" testMessage = "test message" ) @@ -38,11 +38,11 @@ func TestPublishMessage(t *testing.T) { assert.NoError(t, err) mockedConn.AssertExpectations(t) }) - t.Run("failing publish", func(t *testing.T){ + t.Run("failing publish", func(t *testing.T) { mockedConn.On(publish, subject, []byte(testMessage)).Return(assert.AnError).Once() err := publisher.PublishMessage([]byte(testMessage)) assert.Error(t, err) mockedConn.AssertExpectations(t) }) -} \ No newline at end of file +}