-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathserver.go
183 lines (151 loc) · 4.7 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
// Copyright 2024 SeatGeek, Inc.
//
// Licensed under the terms of the Apache-2.0 license. See LICENSE file in project root for terms.
package mailroom
import (
"context"
"fmt"
"log/slog"
"net/http"
"time"
"github.com/gorilla/mux"
"github.com/seatgeek/mailroom/pkg/common"
"github.com/seatgeek/mailroom/pkg/handler"
"github.com/seatgeek/mailroom/pkg/notifier"
"github.com/seatgeek/mailroom/pkg/server"
"github.com/seatgeek/mailroom/pkg/user"
)
// Server is the heart of the mailroom application.
// It listens for incoming webhooks, parses them, generates notifications, and dispatches them to users.
//
// Construct one with New and start it with Run.
type Server struct {
// listenAddr is the "host:port" address the HTTP server binds to.
listenAddr string
// handlers are mounted under "/event/<handler key>" when the server starts.
handlers []handler.Handler
// notifier is built by New from userStore and transports.
notifier notifier.Notifier
// transports are handed to the notifier; their keys are also passed to the preferences handler.
transports []notifier.Transport
// userStore is used by both the notifier and the user preferences endpoints.
userStore user.Store
// router is the mux.Router on which every route is registered.
router *mux.Router
}
// Opt is a functional option that configures a Server.
// New calls every given Opt on the server, in order, before it builds the notifier.
type Opt func(s *Server)
// New builds a Server, starting from the defaults (listening on
// "0.0.0.0:8000" with a fresh mux.Router) and then applying the given options
// in order. The notifier is created last so that it sees the user store and
// transports configured by those options.
func New(opts ...Opt) *Server {
	srv := new(Server)
	srv.listenAddr = "0.0.0.0:8000"
	srv.router = mux.NewRouter()

	for i := range opts {
		opts[i](srv)
	}

	srv.notifier = notifier.New(srv.userStore, srv.transports...)

	return srv
}
// WithListenAddr returns an Opt that overrides the address the server listens
// on. The address is given in the form "host:port".
func WithListenAddr(addr string) Opt {
	apply := func(srv *Server) {
		srv.listenAddr = addr
	}

	return apply
}
// WithHandlers returns an Opt that appends the given handler.Handler instances
// to the ones already registered on the server.
func WithHandlers(handlers ...handler.Handler) Opt {
	apply := func(srv *Server) {
		srv.handlers = append(srv.handlers, handlers...)
	}

	return apply
}
// WithTransports returns an Opt that appends the given notifier.Transport
// instances to the ones already registered on the server.
func WithTransports(transports ...notifier.Transport) Opt {
	apply := func(srv *Server) {
		srv.transports = append(srv.transports, transports...)
	}

	return apply
}
// WithUserStore returns an Opt that sets the user.Store the server uses,
// replacing any store set by an earlier option.
func WithUserStore(us user.Store) Opt {
	apply := func(srv *Server) {
		srv.userStore = us
	}

	return apply
}
// WithRouter returns an Opt that replaces the server's default mux.Router with
// the given one. The server registers its routes on this router.
func WithRouter(router *mux.Router) Opt {
	apply := func(srv *Server) {
		srv.router = router
	}

	return apply
}
// validate runs Validate on every handler, every transport and the user store,
// skipping any component that does not implement common.Validator.
// It stops at the first failure and returns that error wrapped with a message
// identifying the component.
func (s *Server) validate(ctx context.Context) error {
	for _, h := range s.handlers {
		validator, canValidate := h.(common.Validator)
		if !canValidate {
			continue
		}

		if err := validator.Validate(ctx); err != nil {
			return fmt.Errorf("parser %s failed to validate: %w", h.Key(), err)
		}
	}

	for _, transport := range s.transports {
		validator, canValidate := transport.(common.Validator)
		if !canValidate {
			continue
		}

		if err := validator.Validate(ctx); err != nil {
			return fmt.Errorf("transport %s failed to validate: %w", transport.Key(), err)
		}
	}

	validator, canValidate := s.userStore.(common.Validator)
	if !canValidate {
		return nil
	}

	if err := validator.Validate(ctx); err != nil {
		return fmt.Errorf("user store failed to validate: %w", err)
	}

	return nil
}
// Run validates the server's configuration and then serves HTTP, blocking
// until the server has shut down.
// The HTTP server itself runs in a goroutine; canceling the given context
// triggers a graceful shutdown attempt.
// A validation failure is returned before anything starts listening.
func (s *Server) Run(ctx context.Context) error {
	err := s.validate(ctx)
	if err != nil {
		return fmt.Errorf("server validation failed: %w", err)
	}

	return s.serveHttp(ctx)
}
// serveHttp registers all routes on the server's router, starts an HTTP server
// on s.listenAddr in a background goroutine, and blocks until either ctx is
// canceled or the HTTP server stops on its own.
//
// When ctx is canceled, the HTTP server gets up to 5 seconds to shut down
// gracefully; nil is returned if that succeeds, otherwise the wrapped shutdown
// error. When the HTTP server exits by itself, the error returned by
// ListenAndServe is passed through unchanged.
func (s *Server) serveHttp(ctx context.Context) error {
	hsm := s.router

	// Health check endpoint: always answers 200.
	hsm.HandleFunc("/healthz", func(writer http.ResponseWriter, _ *http.Request) {
		writer.WriteHeader(http.StatusOK)
		// Nothing useful can be done if writing the body fails.
		_, _ = writer.Write([]byte("^_^\n"))
	})

	// Mount all handlers
	for _, src := range s.handlers {
		endpoint := "/event/" + src.Key()
		slog.Debug("mounting handler", "endpoint", endpoint)
		hsm.HandleFunc(endpoint, server.CreateEventHandler(src, s.notifier))
	}

	// Expose routes for managing user preferences
	prefs := user.NewPreferencesHandler(s.userStore, s.handlers, transportKeys(s.transports))
	hsm.HandleFunc("/users/{key}/preferences", prefs.GetPreferences).Methods("GET")
	hsm.HandleFunc("/users/{key}/preferences", prefs.UpdatePreferences).Methods("PUT")
	hsm.HandleFunc("/configuration", prefs.ListOptions).Methods("GET")

	hs := &http.Server{
		Addr:              s.listenAddr,
		Handler:           hsm,
		ReadHeaderTimeout: 2 * time.Second,
	}

	// Run the server in a Goroutine.
	// The channel is buffered so the goroutine can always hand off the result
	// of ListenAndServe and exit. With an unbuffered channel it would block
	// forever on the send after a context-triggered shutdown, because nobody
	// receives from the channel on that path.
	httpExited := make(chan error, 1)
	go func() {
		defer close(httpExited)
		slog.Info("http server listening on " + s.listenAddr)
		httpExited <- hs.ListenAndServe()
	}()

	select {
	// Wait for the context to be canceled
	case <-ctx.Done():
		slog.Info("shutting down http server gracefully")

		// ctx is already canceled here, so the shutdown deadline has to be
		// derived from a fresh background context.
		shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancelShutdown()

		if err := hs.Shutdown(shutdownCtx); err != nil { //nolint:contextcheck
			return fmt.Errorf("failed to gracefully shutdown http server: %w", err)
		}

		return nil
	// Or wait for the server to exit on its own (with some error)
	case err := <-httpExited:
		return err
	}
}
// transportKeys returns the key of each given transport, in the same order.
func transportKeys(transports []notifier.Transport) []common.TransportKey {
	out := make([]common.TransportKey, 0, len(transports))
	for _, transport := range transports {
		out = append(out, transport.Key())
	}

	return out
}