-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.go
168 lines (144 loc) · 5.29 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package automergendjsonsync
import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"mime"
	"net/http"
	"strings"
	"sync"

	"github.com/automerge/automerge-go"
)
// SharedDoc encapsulates a doc with a signalling mechanism that broadcasts an event when new changes have been synced
// into the doc. This event is generally used to wake up other goroutines for generating sync messages to other clients
// or servers but can also be used to drive other mechanisms like backups or transformers.
type SharedDoc struct {
// doc is the wrapped automerge document; it is set by NewSharedDoc and returned by Doc.
doc *automerge.Doc
// mutex presumably guards channels; the code that locks it is not visible in this part of the file - TODO confirm.
mutex sync.Mutex
// channels presumably holds one notification channel per active subscriber (see SubscribeToReceivedChanges) -
// TODO confirm against the subscription code.
channels []chan bool
}
// NewSharedDoc wraps doc in a freshly allocated SharedDoc. All other fields of the returned value are left at their
// zero values.
func NewSharedDoc(doc *automerge.Doc) *SharedDoc {
	shared := new(SharedDoc)
	shared.doc = doc
	return shared
}
// Doc returns the document held by this SharedDoc. The returned pointer is the same one that was passed to
// NewSharedDoc, not a copy.
func (b *SharedDoc) Doc() *automerge.Doc {
return b.doc
}
// serverOptions collects the settings that ServerOption functions apply and that ServeChanges consumes.
type serverOptions struct {
// state is the sync state used for the session. When it is nil, ServeChanges creates a new one with
// automerge.NewSyncState for the shared document.
state *automerge.SyncState
// headerEditors are called in order with the response headers, after ServeChanges has set its own headers and
// before it writes the 200 status.
headerEditors []func(rw http.Header)
// readPredicate is handed to consumeMessagesFromReader when reading the request body. Defaults to NoReadPredicate.
readPredicate ReadPredicate
// terminationCheck is handed to consumeMessagesFromReader when reading the request body. Defaults to
// NoTerminationCheck.
terminationCheck TerminationCheck
}
// ServerOption customises a ServeChanges call by mutating the serverOptions it is given.
type ServerOption func(*serverOptions)
// newServerOptions builds the option set for a ServeChanges call. It starts from the defaults (NoReadPredicate and
// NoTerminationCheck, with no sync state and no header editors) and then applies each supplied option in the order
// given, so later options win over earlier ones.
func newServerOptions(opts ...ServerOption) *serverOptions {
	resolved := new(serverOptions)
	resolved.readPredicate = NoReadPredicate
	resolved.terminationCheck = NoTerminationCheck
	for i := range opts {
		opts[i](resolved)
	}
	return resolved
}
// WithServerSyncState makes ServeChanges use the given sync state instead of creating a new one for the shared
// document.
func WithServerSyncState(state *automerge.SyncState) ServerOption {
	return func(target *serverOptions) { target.state = state }
}
// WithServerHeaderEditor registers f to be called with the response headers before ServeChanges writes the 200
// status. The option may be supplied multiple times; editors accumulate and run in the order they were added.
func WithServerHeaderEditor(f func(headers http.Header)) ServerOption {
	return func(target *serverOptions) {
		editors := append(target.headerEditors, f)
		target.headerEditors = editors
	}
}
// WithReadPredicate replaces the default NoReadPredicate with f. ServeChanges passes the predicate on to the routine
// that consumes sync messages from the request body.
func WithReadPredicate(f ReadPredicate) ServerOption {
	return func(target *serverOptions) { target.readPredicate = f }
}
// WithTerminationCheck replaces the default NoTerminationCheck with f. ServeChanges passes the check on to the
// routine that consumes sync messages from the request body.
func WithTerminationCheck(f TerminationCheck) ServerOption {
	return func(target *serverOptions) { target.terminationCheck = f }
}
// isNotSuitableContentType reports whether the header value in is unacceptable as a media type for the sync
// protocol. A value is rejected when it does not parse as a single media type, when its type differs from
// ContentType, or when it carries a charset parameter naming anything other than UTF-8.
//
// Charset names are case-insensitive, so "charset=UTF-8" is accepted just like "charset=utf-8". An absent or empty
// charset parameter is accepted as well.
func isNotSuitableContentType(in string) bool {
	mediaType, params, err := mime.ParseMediaType(in)
	if err != nil || mediaType != ContentType {
		return true
	}
	// ParseMediaType lower-cases the media type and the parameter names but leaves parameter values untouched, so
	// the charset value has to be compared case-insensitively here.
	charset := params["charset"]
	return charset != "" && !strings.EqualFold(charset, "utf-8")
}
// ServeChanges runs a bidirectional sync session over a single HTTP request: sync messages are consumed from the
// request body in a background goroutine while sync messages for the client are written to the response body.
//
// If the request carries an Accept or Content-Type header that isNotSuitableContentType rejects, the handler
// responds with 406 Not Acceptable or 415 Unsupported Media Type respectively and returns nil. Otherwise it writes a
// 200 response with the sync content type, flushes the headers, and streams until the writer stops.
//
// The returned error is nil when the session ends because the context was cancelled without the reader recording a
// failure. It is non-nil when the reader fails while the request context is still live (other than with
// http.ErrBodyReadAfterClose), when the request body ends without any message having been received, or when the
// writer fails while the context is still live.
func (b *SharedDoc) ServeChanges(rw http.ResponseWriter, req *http.Request, opts ...ServerOption) (finalErr error) {
log := Logger(req.Context())
options := newServerOptions(opts...)
// Fall back to a fresh sync state for this document when the caller did not supply one.
if options.state == nil {
options.state = automerge.NewSyncState(b.Doc())
}
// If there is an accept header, then ensure it's compatible.
// NOTE(review): isNotSuitableContentType parses the value as a single media type, so an Accept header that lists
// several types or uses a wildcard such as */* is presumably rejected here - confirm this strictness is intended.
if v := req.Header.Get("Accept"); v != "" && isNotSuitableContentType(v) {
rw.WriteHeader(http.StatusNotAcceptable)
return nil
}
// If there is a content-type header, then ensure it's what we expect
if v := req.Header.Get("Content-Type"); v != "" && isNotSuitableContentType(v) {
rw.WriteHeader(http.StatusUnsupportedMediaType)
return nil
}
ctx := req.Context()
// Because the request body is relatively expensive to produce, the client may only want to produce it when the request has been accepted.
// So it may send an Expect=100-continue header and expect us to honor it.
if req.Header.Get("Expect") == "100-continue" {
rw.WriteHeader(http.StatusContinue)
}
log.InfoContext(ctx, "sending http sync response", slog.String("proto", req.Proto), slog.String("target", fmt.Sprintf("%s %s", req.Method, req.URL)), slog.Int("status", http.StatusOK))
// Default response headers are set first so that the caller-supplied header editors below can override them.
rw.Header().Set("Content-Type", ContentTypeWithCharset)
rw.Header().Set("X-Content-Type-Options", "nosniff")
rw.Header().Set("Cache-Control", "no-store")
for _, he := range options.headerEditors {
he(rw.Header())
}
rw.WriteHeader(http.StatusOK)
// Flush the header, this should ensure the client can begin reacting to our sync messages while still producing the body content.
if v, ok := rw.(http.Flusher); ok {
v.Flush()
}
// We produce the messages in a goroutine which must be shut down on exit.
// Deferred calls run in reverse order, so on exit the context is cancelled first, then the subscription is
// released, and only then do we wait for the reader goroutine to finish.
wg := new(sync.WaitGroup)
defer wg.Wait()
sub, fin := b.SubscribeToReceivedChanges()
defer fin()
// We piggyback on the context and ensure we cancel it before waiting for the wait group.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
log.DebugContext(ctx, "starting to read messages from request body in the background")
wg.Add(1)
go func() {
defer wg.Done()
// The goroutine reports failures by assigning the named result finalErr; the deferred wg.Wait above makes sure
// any such assignment has happened before ServeChanges actually returns to its caller.
if received, err := b.consumeMessagesFromReader(ctx, options.state, req.Body, options.readPredicate, options.terminationCheck); err != nil {
// If we've finished and the request context is closed (indicating that the client disconnected), then this
// isn't really an error. For anything else, set the final error and cancel the context. The cancellation
// should stop the writer from producing messages and lead to closing the response.
if req.Context().Err() != nil {
log.DebugContext(ctx, "client context closed")
} else if errors.Is(err, http.ErrBodyReadAfterClose) {
log.DebugContext(ctx, "read after close")
} else {
finalErr = err
cancel()
}
} else if received == 0 {
// It's bad if the request reached EOF without any sync messages since our writer can't really do anything
// in response. So we set an error and cancel.
finalErr = fmt.Errorf("request closed with no messages received")
cancel()
}
}()
log.DebugContext(ctx, "writing messages to response body")
// NOTE(review): the meaning of the trailing false argument is defined by generateMessagesToWriter, which is not
// visible here - confirm before changing it.
if err := generateMessagesToWriter(ctx, options.state, sub, rw, false); err != nil {
// If we close and the request context is closed then there's no particular error unless finalErr has been set
// from the reading routine.
if ctx.Err() != nil {
return
}
// NOTE(review): finalErr is read here without synchronisation while the reader goroutine may still be running,
// and a later assignment from that goroutine would replace the joined value set by this return. This looks like
// a data race - confirm with the race detector.
return errors.Join(err, finalErr)
}
return
}