Skip to content

Commit 70ef1f9

Browse files
m110thejoeejoee
authored andcommitted
SSE htmx example (ThreeDotsLabs#435)
* SSE htmx example
1 parent 8685d31 commit 70ef1f9

File tree

18 files changed

+2097
-49
lines changed

18 files changed

+2097
-49
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
FROM golang:1.22 AS builder
2+
3+
COPY . /src
4+
WORKDIR /src/
5+
6+
RUN CGO_ENABLED=0 go build -ldflags="-s -w" -trimpath -o /main .
7+
8+
FROM alpine
9+
RUN apk add --no-cache ca-certificates
10+
COPY --from=builder /main /main
11+
CMD ["/main"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
version: '3.7'
2+
services:
3+
server:
4+
build:
5+
context: docker
6+
volumes:
7+
- ./:/src
8+
- go_pkg:/go/pkg
9+
- go_cache:/go-cache
10+
working_dir: /src
11+
ports:
12+
- '8080:8080'
13+
environment:
14+
- PORT=8080
15+
- DATABASE_URL=postgres://postgres:postgres@postgres:5432/postgres?sslmode=disable
16+
- PUBSUB_PROJECT_ID=local
17+
- PUBSUB_EMULATOR_HOST=pubsub:8681
18+
restart: unless-stopped
19+
networks:
20+
- sse
21+
22+
postgres:
23+
image: postgres:15
24+
restart: unless-stopped
25+
environment:
26+
- POSTGRES_PASSWORD=postgres
27+
ports:
28+
- 5432:5432
29+
networks:
30+
- sse
31+
32+
pubsub:
33+
image: messagebird/gcloud-pubsub-emulator:latest
34+
restart: unless-stopped
35+
ports:
36+
- '8681:8681'
37+
networks:
38+
- sse
39+
40+
networks:
41+
sse:
42+
43+
volumes:
44+
go_pkg:
45+
go_cache:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
FROM golang:1.22
2+
3+
RUN go install github.com/cespare/reflex@latest
4+
RUN go install github.com/a-h/templ/cmd/templ@latest
5+
6+
COPY reflex.conf /
7+
8+
ENTRYPOINT ["/go/bin/reflex", "-c", "/reflex.conf"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-r '(\.go$|go\.mod$)' -s go run .
2+
-r '\.templ$' templ generate
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"cloud.google.com/go/pubsub"
9+
"github.com/ThreeDotsLabs/watermill"
10+
"github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud"
11+
"github.com/ThreeDotsLabs/watermill-http/v2/pkg/http"
12+
"github.com/ThreeDotsLabs/watermill/components/cqrs"
13+
"github.com/ThreeDotsLabs/watermill/message"
14+
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
15+
)
16+
17+
type PostViewed struct {
18+
PostID int `json:"post_id"`
19+
}
20+
21+
type PostReactionAdded struct {
22+
PostID int `json:"post_id"`
23+
ReactionID string `json:"reaction_id"`
24+
}
25+
26+
type PostStatsUpdated struct {
27+
PostID int `json:"post_id"`
28+
ViewsUpdated bool `json:"views_updated"`
29+
ReactionUpdated *string `json:"reaction_updated"`
30+
}
31+
32+
type Routers struct {
33+
EventsRouter *message.Router
34+
SSERouter http.SSERouter
35+
EventBus *cqrs.EventBus
36+
}
37+
38+
func NewRouters(cfg config, repo *Repository) (Routers, error) {
39+
logger := watermill.NewStdLogger(false, false)
40+
41+
publisher, err := googlecloud.NewPublisher(
42+
googlecloud.PublisherConfig{
43+
ProjectID: cfg.PubSubProjectID,
44+
},
45+
logger,
46+
)
47+
if err != nil {
48+
return Routers{}, err
49+
}
50+
51+
eventBus, err := cqrs.NewEventBusWithConfig(
52+
publisher,
53+
cqrs.EventBusConfig{
54+
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
55+
return params.EventName, nil
56+
},
57+
Marshaler: cqrs.JSONMarshaler{},
58+
Logger: logger,
59+
},
60+
)
61+
if err != nil {
62+
return Routers{}, err
63+
}
64+
65+
eventsRouter, err := message.NewRouter(message.RouterConfig{}, logger)
66+
if err != nil {
67+
return Routers{}, err
68+
}
69+
70+
eventsRouter.AddMiddleware(middleware.Recoverer)
71+
72+
eventProcessor, err := cqrs.NewEventProcessorWithConfig(
73+
eventsRouter,
74+
cqrs.EventProcessorConfig{
75+
GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) {
76+
return params.EventName, nil
77+
},
78+
SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
79+
return googlecloud.NewSubscriber(
80+
googlecloud.SubscriberConfig{
81+
ProjectID: cfg.PubSubProjectID,
82+
GenerateSubscriptionName: func(topic string) string {
83+
return fmt.Sprintf("%v_%v", topic, params.HandlerName)
84+
},
85+
},
86+
logger,
87+
)
88+
},
89+
Marshaler: cqrs.JSONMarshaler{},
90+
Logger: logger,
91+
},
92+
)
93+
if err != nil {
94+
return Routers{}, err
95+
}
96+
97+
err = eventProcessor.AddHandlers(
98+
cqrs.NewEventHandler(
99+
"UpdateViews",
100+
func(ctx context.Context, event *PostViewed) error {
101+
err = repo.UpdatePost(ctx, event.PostID, func(post *Post) {
102+
post.Views++
103+
})
104+
if err != nil {
105+
return err
106+
}
107+
108+
statsUpdated := PostStatsUpdated{
109+
PostID: event.PostID,
110+
ViewsUpdated: true,
111+
}
112+
113+
return eventBus.Publish(ctx, statsUpdated)
114+
},
115+
),
116+
cqrs.NewEventHandler(
117+
"UpdateReactions",
118+
func(ctx context.Context, event *PostReactionAdded) error {
119+
err := repo.UpdatePost(ctx, event.PostID, func(post *Post) {
120+
post.Reactions[event.ReactionID]++
121+
})
122+
if err != nil {
123+
return err
124+
}
125+
126+
statsUpdated := PostStatsUpdated{
127+
PostID: event.PostID,
128+
ReactionUpdated: &event.ReactionID,
129+
}
130+
131+
return eventBus.Publish(ctx, statsUpdated)
132+
},
133+
),
134+
)
135+
if err != nil {
136+
return Routers{}, err
137+
}
138+
139+
sseSubscriber, err := googlecloud.NewSubscriber(
140+
googlecloud.SubscriberConfig{
141+
ProjectID: cfg.PubSubProjectID,
142+
GenerateSubscriptionName: func(topic string) string {
143+
return fmt.Sprintf("%v_%v", topic, watermill.NewShortUUID())
144+
},
145+
SubscriptionConfig: pubsub.SubscriptionConfig{
146+
ExpirationPolicy: time.Hour * 24,
147+
},
148+
},
149+
logger,
150+
)
151+
if err != nil {
152+
return Routers{}, err
153+
}
154+
155+
sseRouter, err := http.NewSSERouter(
156+
http.SSERouterConfig{
157+
UpstreamSubscriber: sseSubscriber,
158+
Marshaler: http.StringSSEMarshaler{},
159+
},
160+
logger,
161+
)
162+
if err != nil {
163+
return Routers{}, err
164+
}
165+
166+
return Routers{
167+
EventsRouter: eventsRouter,
168+
SSERouter: sseRouter,
169+
EventBus: eventBus,
170+
}, nil
171+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
module main
2+
3+
go 1.22.0
4+
5+
require (
6+
cloud.google.com/go/pubsub v1.36.1
7+
github.com/ThreeDotsLabs/watermill v1.3.5
8+
github.com/ThreeDotsLabs/watermill-googlecloud v1.2.0
9+
github.com/ThreeDotsLabs/watermill-http/v2 v2.2.0
10+
github.com/a-h/templ v0.2.663
11+
github.com/kelseyhightower/envconfig v1.4.0
12+
github.com/labstack/echo/v4 v4.11.4
13+
github.com/lib/pq v1.10.9
14+
)
15+
16+
require (
17+
cloud.google.com/go v0.112.1 // indirect
18+
cloud.google.com/go/compute v1.24.0 // indirect
19+
cloud.google.com/go/compute/metadata v0.2.3 // indirect
20+
cloud.google.com/go/iam v1.1.7 // indirect
21+
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
22+
github.com/felixge/httpsnoop v1.0.4 // indirect
23+
github.com/go-chi/chi v4.0.2+incompatible // indirect
24+
github.com/go-chi/render v1.0.1 // indirect
25+
github.com/go-logr/logr v1.4.1 // indirect
26+
github.com/go-logr/stdr v1.2.2 // indirect
27+
github.com/gogo/protobuf v1.3.2 // indirect
28+
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
29+
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
30+
github.com/golang/protobuf v1.5.4 // indirect
31+
github.com/google/s2a-go v0.1.7 // indirect
32+
github.com/google/uuid v1.6.0 // indirect
33+
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
34+
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
35+
github.com/hashicorp/errwrap v1.1.0 // indirect
36+
github.com/hashicorp/go-multierror v1.1.1 // indirect
37+
github.com/labstack/gommon v0.4.2 // indirect
38+
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
39+
github.com/mattn/go-colorable v0.1.13 // indirect
40+
github.com/mattn/go-isatty v0.0.20 // indirect
41+
github.com/oklog/ulid v1.3.1 // indirect
42+
github.com/pkg/errors v0.9.1 // indirect
43+
github.com/sony/gobreaker v0.5.0 // indirect
44+
github.com/valyala/bytebufferpool v1.0.0 // indirect
45+
github.com/valyala/fasttemplate v1.2.2 // indirect
46+
go.opencensus.io v0.24.0 // indirect
47+
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
48+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
49+
go.opentelemetry.io/otel v1.24.0 // indirect
50+
go.opentelemetry.io/otel/metric v1.24.0 // indirect
51+
go.opentelemetry.io/otel/trace v1.24.0 // indirect
52+
golang.org/x/crypto v0.21.0 // indirect
53+
golang.org/x/net v0.22.0 // indirect
54+
golang.org/x/oauth2 v0.18.0 // indirect
55+
golang.org/x/sync v0.6.0 // indirect
56+
golang.org/x/sys v0.18.0 // indirect
57+
golang.org/x/text v0.14.0 // indirect
58+
golang.org/x/time v0.5.0 // indirect
59+
google.golang.org/api v0.170.0 // indirect
60+
google.golang.org/appengine v1.6.8 // indirect
61+
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect
62+
google.golang.org/genproto/googleapis/api v0.0.0-20240314234333-6e1732d8331c // indirect
63+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240311132316-a219d84964c2 // indirect
64+
google.golang.org/grpc v1.62.1 // indirect
65+
google.golang.org/protobuf v1.33.0 // indirect
66+
)

0 commit comments

Comments
 (0)