-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtower.go
306 lines (273 loc) · 8.97 KB
/
tower.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
package tower
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
)
// Tower bundles the messengers, logger, constructors and message context
// builders used by its methods, together with the service metadata and the
// caller depth handed to GetCaller.
type Tower struct {
	// messengers holds the registered Messenger values, keyed by Messenger.Name().
	messengers Messengers
	// logger is the engine Log and LogError delegate to.
	logger Logger
	// errorConstructor builds the ErrorBuilder returned by Wrap, Bail and friends.
	errorConstructor ErrorConstructor
	// entryConstructor builds the EntryBuilder returned by NewEntry.
	entryConstructor EntryConstructor
	// service is the metadata returned by GetService and used by Name.
	service Service
	// defaultNotifyOption is applied before per-call options in Notify and NotifyError.
	defaultNotifyOption []MessageOption
	// errorMessageContextBuilder turns an Error into a MessageContext for NotifyError.
	errorMessageContextBuilder ErrorMessageContextBuilder
	// messageContextBuilder turns an Entry into a MessageContext for Notify.
	messageContextBuilder MessageContextBuilder
	// callerDepth is the argument passed to GetCaller when a builder is constructed.
	callerDepth int
}
// SetCallerDepth replaces the depth this Tower passes to GetCaller whenever it
// constructs an ErrorBuilder or an EntryBuilder.
//
// The value is stored without synchronization.
func (t *Tower) SetCallerDepth(callerDepth int) {
	t.callerDepth = callerDepth
}
var (
_ Logger = (*Tower)(nil)
_ Messenger = (*Tower)(nil)
)
// NewTower returns a Tower for the given service, wired with the package
// defaults: an empty messenger set, a NoopLogger, the default error and entry
// constructors, the default message context builders, and a caller depth of 2.
func NewTower(service Service) *Tower {
	t := new(Tower)

	t.service = service
	t.callerDepth = 2

	t.messengers = Messengers{}
	t.logger = NoopLogger{}

	t.errorConstructor = ErrorConstructorFunc(defaultErrorGenerator)
	t.entryConstructor = EntryConstructorFunc(defaultEntryConstructor)

	t.errorMessageContextBuilder = ErrorMessageContextBuilderFunc(defaultErrorMessageContextBuilder)
	t.messageContextBuilder = MessageContextBuilderFunc(defaultMessageContextBuilder)

	return t
}
// RegisterMessenger stores the messenger under the key returned by its Name
// method.
//
// Names act as identities: registering a messenger whose name is already taken
// overwrites the earlier one. To keep several messengers of the same type,
// give each a distinct name.
func (t *Tower) RegisterMessenger(messenger Messenger) {
	key := messenger.Name()
	t.messengers[key] = messenger
}
// RemoveMessenger drops the messenger registered under name. Removing a name
// that is not registered does nothing.
func (t *Tower) RemoveMessenger(name string) {
	delete(t.messengers, name)
}
// Wrap is the instance-scoped counterpart of the exported tower.Wrap: it
// starts an ErrorBuilder for err using this Tower's error constructor.
//
// A nil err is replaced by an error whose message is "<nil>", so the
// constructor never receives a nil Err.
func (t *Tower) Wrap(err error) ErrorBuilder {
	cause := err
	if cause == nil {
		cause = errors.New("<nil>")
	}
	// GetCaller is invoked directly in this method, exactly as before, so the
	// configured callerDepth is measured from the same frame.
	constructorCtx := &ErrorConstructorContext{
		Err:    cause,
		Caller: GetCaller(t.callerDepth),
		Tower:  t,
	}
	return t.errorConstructor.ConstructError(constructorCtx)
}
// NewEntry starts an EntryBuilder carrying msg; further values may be appended
// to the returned builder.
//
// When args is non-empty, msg is used as a format string for fmt.Sprintf;
// otherwise msg is used verbatim.
func (t *Tower) NewEntry(msg string, args ...any) EntryBuilder {
	text := msg
	if len(args) != 0 {
		text = fmt.Sprintf(msg, args...)
	}
	// GetCaller stays in this frame so callerDepth keeps its meaning.
	constructorCtx := &EntryConstructorContext{
		Caller:  GetCaller(t.callerDepth),
		Tower:   t,
		Message: text,
	}
	return t.entryConstructor.ConstructEntry(constructorCtx)
}
// Bail starts an ErrorBuilder from a plain message.
//
// With no args the error is created by errors.New(msg); with args it is
// created by fmt.Errorf(msg, args...).
func (t *Tower) Bail(msg string, args ...any) ErrorBuilder {
	var cause error
	switch len(args) {
	case 0:
		cause = errors.New(msg)
	default:
		cause = fmt.Errorf(msg, args...)
	}
	// GetCaller stays in this frame so callerDepth keeps its meaning.
	constructorCtx := &ErrorConstructorContext{
		Err:    cause,
		Caller: GetCaller(t.callerDepth),
		Tower:  t,
	}
	return t.errorConstructor.ConstructError(constructorCtx)
}
// BailFreeze builds an Error from a plain message and freezes it immediately.
//
// With no args the underlying error is created by errors.New(msg); with args
// it is created by fmt.Errorf(msg, args...). The ErrorBuilder produced by this
// Tower's error constructor is then turned into an Error with Freeze.
func (t *Tower) BailFreeze(msg string, args ...any) Error {
	makeCause := func() error {
		if len(args) == 0 {
			return errors.New(msg)
		}
		return fmt.Errorf(msg, args...)
	}
	cause := makeCause()
	// GetCaller is called from the method body itself (not from the closure
	// above) so callerDepth is measured from the same frame as before.
	location := GetCaller(t.callerDepth)
	builder := t.errorConstructor.ConstructError(&ErrorConstructorContext{
		Err:    cause,
		Caller: location,
		Tower:  t,
	})
	return builder.Freeze()
}
// SetErrorConstructor replaces the ErrorConstructor used by Wrap, Bail,
// BailFreeze and WrapFreeze to build their ErrorBuilder.
//
// The value is stored as given; it is not checked for nil.
func (t *Tower) SetErrorConstructor(c ErrorConstructor) {
	t.errorConstructor = c
}
// SetErrorMessageContextBuilder replaces the builder NotifyError uses to turn
// a tower.Error into a MessageContext.
//
// The value is stored as given; it is not checked for nil.
func (t *Tower) SetErrorMessageContextBuilder(b ErrorMessageContextBuilder) {
	t.errorMessageContextBuilder = b
}
// SetMessageContextBuilder replaces the builder Notify uses to turn a
// tower.Entry into a MessageContext.
//
// The value is stored as given; it is not checked for nil.
func (t *Tower) SetMessageContextBuilder(b MessageContextBuilder) {
	t.messageContextBuilder = b
}
// SetEntryConstructor replaces the EntryConstructor NewEntry uses to build its
// EntryBuilder.
//
// The value is stored as given; it is not checked for nil.
func (t *Tower) SetEntryConstructor(c EntryConstructor) {
	t.entryConstructor = c
}
// SetDefaultNotifyOption replaces the options applied to every Notify and
// NotifyError call.
//
// On each call the defaults are applied first and the options passed at the
// call site second, so call-site options are applied last.
// Calling this with no arguments clears the defaults. The slice is stored as
// given, without copying.
func (t *Tower) SetDefaultNotifyOption(opts ...MessageOption) {
	t.defaultNotifyOption = opts
}
// WrapFreeze is shorthand for t.Wrap(err).Message(message).Freeze().
//
// It is useful for adding a simple message to an error chain in one call.
// When args is non-empty, message is used as a format string for fmt.Sprintf;
// otherwise message is used verbatim.
//
// Like Wrap, a nil err is replaced by an error whose message is "<nil>", so
// the error constructor never receives a nil Err.
func (t *Tower) WrapFreeze(err error, message string, args ...any) Error {
	// GetCaller is invoked directly in this method rather than by delegating to
	// Wrap; presumably the depth is resolved relative to the calling frame, so
	// an extra hop would report the wrong location.
	caller := GetCaller(t.callerDepth)
	if err == nil {
		err = errors.New("<nil>")
	}
	if len(args) > 0 {
		message = fmt.Sprintf(message, args...)
	}
	return t.errorConstructor.ConstructError(&ErrorConstructorContext{
		Err:    err,
		Caller: caller,
		Tower:  t,
	}).
		Message(message).
		Freeze()
}
// GetMessengers returns the result of calling Clone on the registered
// messengers, i.e. a copy rather than the Tower's own collection.
func (t Tower) GetMessengers() Messengers {
	cloned := t.messengers.Clone()
	return cloned
}
// GetMessengerByName looks up the messenger registered under name.
// It returns nil when no messenger is registered under that name.
func (t Tower) GetMessengerByName(name string) Messenger {
	found, ok := t.messengers[name]
	if !ok {
		return nil
	}
	return found
}
// GetLogger returns the Logger this Tower delegates Log and LogError to.
func (t Tower) GetLogger() Logger {
	return t.logger
}
// GetService returns the Service metadata this Tower was created with.
func (t Tower) GetService() Service {
	return t.service
}
// SetLogger replaces the Logger that Log and LogError delegate to.
//
// The field is written without any synchronization, so this method is NOT
// safe to call concurrently with other uses of the Tower.
func (t *Tower) SetLogger(log Logger) {
	t.logger = log
}
// Notify builds a MessageContext from entry and dispatches it to messengers.
//
// Options are resolved by createOption (defaults first, then parameters) and
// the message is built by the configured MessageContextBuilder. Because the
// receiver is a value, the options reference this call's copy of the Tower.
func (t Tower) Notify(ctx context.Context, entry Entry, parameters ...MessageOption) {
	options := t.createOption(parameters...)
	message := t.messageContextBuilder.BuildMessageContext(entry, options)
	t.sendNotif(ctx, message, options)
}
// NotifyError builds a MessageContext from err and dispatches it to messengers.
//
// Options are resolved by createOption (defaults first, then parameters) and
// the message is built by the configured ErrorMessageContextBuilder. Because
// the receiver is a value, the options reference this call's copy of the Tower.
func (t Tower) NotifyError(ctx context.Context, err Error, parameters ...MessageOption) {
	options := t.createOption(parameters...)
	message := t.errorMessageContextBuilder.BuildErrorMessageContext(err, options)
	t.sendNotif(ctx, message, options)
}
// createOption builds the messageOption for one notification.
//
// It starts from an option bound to this Tower, applies the Tower's default
// options, then applies the call-site parameters, in that order.
func (t *Tower) createOption(parameters ...MessageOption) *messageOption {
	built := &messageOption{tower: t}
	// Defaults go first so that call-site parameters are applied last.
	for _, group := range [...][]MessageOption{t.defaultNotifyOption, parameters} {
		for _, option := range group {
			option.apply(built)
		}
	}
	return built
}
// sendNotif dispatches msg asynchronously, one goroutine per target messenger.
//
// Target selection, in priority order:
//  1. opts.specificMessenger, when set: only that messenger is used.
//  2. opts.messengers, when non-empty: only those messengers are used.
//  3. otherwise: every messenger registered on this Tower.
//
// The context is first passed through DetachedContext (presumably so the send
// outlives cancellation of the caller's context; confirm against its
// definition). This method does not wait for the goroutines to finish.
func (t Tower) sendNotif(ctx context.Context, msg MessageContext, opts *messageOption) {
	detached := DetachedContext(ctx)
	switch {
	case opts.specificMessenger != nil:
		go opts.specificMessenger.SendMessage(detached, msg)
	case len(opts.messengers) > 0:
		for _, target := range opts.messengers {
			go target.SendMessage(detached, msg)
		}
	default:
		for _, target := range t.messengers {
			go target.SendMessage(detached, msg)
		}
	}
}
// Log forwards entry to the underlying Logger.
//
// Together with LogError it makes Tower satisfy the tower.Logger interface, so
// a Tower can itself be used as a logging engine.
func (t Tower) Log(ctx context.Context, entry Entry) {
	t.logger.Log(ctx, entry)
}
// LogError forwards err to the underlying Logger.
//
// Together with Log it makes Tower satisfy the tower.Logger interface, so a
// Tower can itself be used as a logging engine.
func (t Tower) LogError(ctx context.Context, err Error) {
	t.logger.LogError(ctx, err)
}
// Name is part of the tower.Messenger interface, which lets a Tower be
// registered as a Messenger.
//
// It returns "tower-" followed by the service's String() value (per the
// original documentation that is "service_name-service_type-service_environment";
// confirm against Service.String).
func (t Tower) Name() string {
	const prefix = "tower-"
	return prefix + t.service.String()
}
// SendMessage is part of the tower.Messenger interface, which lets a Tower be
// registered as a Messenger.
//
// It forwards msg to every messenger registered on this Tower, calling each
// one in turn on the current goroutine.
func (t Tower) SendMessage(ctx context.Context, msg MessageContext) {
	for _, registered := range t.messengers {
		registered.SendMessage(ctx, msg)
	}
}
// multierror is a list of errors reported as a single error value.
type multierror []error

// Error renders the collected errors as a numbered list joined by "; ",
// e.g. "1. first; 2. second". An empty multierror renders as "".
func (m multierror) Error() string {
	var s strings.Builder
	for i, err := range m {
		// The separator goes between items, i.e. before the number of every
		// item except the first.
		if i > 0 {
			s.WriteString("; ")
		}
		s.WriteString(strconv.Itoa(i + 1))
		s.WriteString(". ")
		s.WriteString(err.Error())
	}
	return s.String()
}

// Unwrap exposes the collected errors so that errors.Is and errors.As
// (Go 1.20+) can inspect each of them.
func (m multierror) Unwrap() []error {
	return m
}
// Wait is part of the tower.Messenger interface, which lets a Tower be
// registered as a Messenger.
//
// It calls Wait(ctx) on every registered messenger concurrently and blocks
// until all of those calls have returned. How quickly it returns after ctx is
// canceled therefore depends on each messenger honoring ctx.
//
// Every failure is wrapped with the name of the messenger that produced it.
// The result is nil when no messenger failed; otherwise it is a single error
// containing all failures, in no particular order.
func (t Tower) Wait(ctx context.Context) error {
	var (
		mu       sync.Mutex // guards failures
		wg       sync.WaitGroup
		failures = make(multierror, 0, len(t.messengers))
	)
	for _, registered := range t.messengers {
		wg.Add(1)
		go func(m Messenger) {
			defer wg.Done()
			waitErr := m.Wait(ctx)
			if waitErr == nil {
				return
			}
			mu.Lock()
			defer mu.Unlock()
			failures = append(failures, fmt.Errorf("failed on waiting messages to finish from '%s': %w", m.Name(), waitErr))
		}(registered)
	}
	wg.Wait()
	if len(failures) == 0 {
		return nil
	}
	return failures
}