Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config-dev.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
nats:
host: 'nats://realtime-messaging-nats:4222'
host: 'nats://realtime-messaging-nats:4222'
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

type Config struct {
Nats struct {
Host string `yaml:"host"`
Host string `yaml:"host"`
}
}

Expand Down Expand Up @@ -47,4 +47,4 @@ func getStage() string {
}

return stage
}
}
4 changes: 2 additions & 2 deletions publisher/cmd/publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ func main() {
log.Fatalf("failed to connect to NATS server: %v", err)
}

env := &handlers.Env{
deps := &handlers.Dependencies{
PublisherClient: publisherClient,
}

http.HandleFunc("/publish", env.PublisherHandler)
http.HandleFunc("/publish", deps.PublisherHandler)
log.Println("publisher service running on :3000")
err = http.ListenAndServe(":3000", nil)
log.Fatalf("failed to start publisher service: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package handlers

import "github.com/iulian509/realtime-messaging/publisher/internal/mq"

type Env struct {
type Dependencies struct {
PublisherClient *mq.Publisher
}
78 changes: 64 additions & 14 deletions publisher/internal/handlers/publisher.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,91 @@
package handlers

import (
"context"
"log"
"net/http"
"time"

"github.com/gorilla/websocket"
"github.com/iulian509/realtime-messaging/publisher/internal/mq"
)

const (
pingPeriod = 30 * time.Second
pongWait = 60 * time.Second
writeWait = 30 * time.Second
)

var upgrader = websocket.Upgrader{}

func (env *Env) PublisherHandler(w http.ResponseWriter, r *http.Request) {
func (deps *Dependencies) PublisherHandler(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket upgrade failed: %v", err)
return
}
defer conn.Close()

log.Println("client connected to WebSocket")
log.Println("client connected to Publisher WebSocket")

conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(appData string) error {
conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go pingConnection(ctx, cancel, conn)

processMessages(ctx, cancel, conn, deps.PublisherClient)
}

func pingConnection(ctx context.Context, cancel context.CancelFunc, conn *websocket.Conn) {
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()

for {
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err) {
log.Printf("WebSocket connection closed unexpectedly: %v", err)
} else {
log.Printf("WebSocket read error: %v", err)
select {
case <-ticker.C:
err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait))
if err != nil {
log.Println("Ping error:", err)
cancel()
conn.Close()
return
}
case <-ctx.Done():
return
}
log.Printf("received message: %s", message)
}
}

func processMessages(ctx context.Context, cancel context.CancelFunc, conn *websocket.Conn, publisherClient *mq.Publisher) {
for {
select {
case <-ctx.Done():
return
default:
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err) {
log.Printf("WebSocket connection closed unexpectedly: %v", err)
} else {
log.Printf("WebSocket read error: %v", err)
}
cancel()
return
}
log.Printf("received message: %s", message)

err = env.PublisherClient.PublishMessage(message)
if err != nil {
log.Printf("error publishing message: %v", err)
} else {
log.Printf("published message: %s", message)
err = publisherClient.PublishMessage(message)
if err != nil {
log.Printf("error publishing message: %v", err)
} else {
log.Printf("published message: %s", message)
}
}
}
}
14 changes: 7 additions & 7 deletions publisher/internal/handlers/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ func TestPublisherHandler(t *testing.T) {
// Set up mocks
mockedConn := new(MockNATSConnection)
publisher := &mq.Publisher{Conn: mockedConn}
env := &Env{PublisherClient: publisher}
deps := &Dependencies{PublisherClient: publisher}

// Set up test handler and server
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
env.PublisherHandler(w, r)
deps.PublisherHandler(w, r)
})
server := httptest.NewServer(handler)
defer server.Close()
Expand All @@ -41,8 +41,8 @@ func TestPublisherHandler(t *testing.T) {
defer ws.Close()

const (
publish = "Publish"
subject = "subject"
publish = "Publish"
subject = "subject"
testMessage = "test message"
)

Expand All @@ -51,9 +51,9 @@ func TestPublisherHandler(t *testing.T) {

err = ws.WriteMessage(websocket.TextMessage, []byte(testMessage))
assert.NoError(t, err)

// Wait for the handler to process the message
time.Sleep(100*time.Millisecond)
time.Sleep(100 * time.Millisecond)

mockedConn.AssertExpectations(t)
})
Expand All @@ -64,7 +64,7 @@ func TestPublisherHandler(t *testing.T) {
assert.NoError(t, err, "message writing should succeed even if NATS publish fails")

// Wait for the handler to process the message
time.Sleep(100*time.Millisecond)
time.Sleep(100 * time.Millisecond)

mockedConn.AssertExpectations(t)
})
Expand Down
2 changes: 1 addition & 1 deletion publisher/internal/mq/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewPublisher(url string) (*Publisher, error) {
return &Publisher{Conn: nc}, nil
}

func (p *Publisher) PublishMessage(message []byte) error{
func (p *Publisher) PublishMessage(message []byte) error {
err := p.Conn.Publish("subject", message)
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions publisher/internal/mq/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func TestPublishMessage(t *testing.T) {
publisher := &Publisher{Conn: mockedConn}

const (
publish = "Publish"
subject = "subject"
publish = "Publish"
subject = "subject"
testMessage = "test message"
)

Expand All @@ -38,11 +38,11 @@ func TestPublishMessage(t *testing.T) {
assert.NoError(t, err)
mockedConn.AssertExpectations(t)
})
t.Run("failing publish", func(t *testing.T){
t.Run("failing publish", func(t *testing.T) {
mockedConn.On(publish, subject, []byte(testMessage)).Return(assert.AnError).Once()

err := publisher.PublishMessage([]byte(testMessage))
assert.Error(t, err)
mockedConn.AssertExpectations(t)
})
}
}