@@ -37,13 +37,19 @@ import (
37
37
//
38
38
// WARNING: This is an experimental API that is designed to work with HTTP senders ONLY.
39
39
type LineSenderPool struct {
40
+ // options
40
41
maxSenders int
41
- conf string
42
42
43
- closed bool
43
+ // presence of a non-empty conf takes precedence over opts
44
+ conf string
45
+ opts []LineSenderOption
44
46
47
+ // senders are stored here
45
48
senders []LineSender
46
- mu * sync.Mutex
49
+
50
+ // plumbing fields
51
+ closed bool
52
+ mu * sync.Mutex
47
53
}
48
54
49
55
// LineSenderPoolOption defines line sender pool config option.
@@ -74,6 +80,37 @@ func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, e
74
80
return pool , nil
75
81
}
76
82
83
+ // PoolFromOptions instantiates a new LineSenderPool using programmatic options.
84
+ // Any sender acquired from this pool will be initialized with the same options
85
+ // that were passed into the opts argument.
86
+ //
87
+ // Unlike [PoolFromConf], PoolFromOptions does not have the ability to customize
88
+ // the returned LineSenderPool. In this case, to add options (such as [WithMaxSenders]),
89
+ // you need manually apply these options after calling this method.
90
+ //
91
+ // // Create a PoolFromOptions with LineSender options
92
+ // p, err := PoolFromOptions(
93
+ // WithHttp(),
94
+ // WithAutoFlushRows(1000000),
95
+ // )
96
+ //
97
+ // if err != nil {
98
+ // panic(err)
99
+ // }
100
+ //
101
+ // // Add Pool-level options manually
102
+ // WithMaxSenders(32)(p)
103
+ func PoolFromOptions (opts ... LineSenderOption ) (* LineSenderPool , error ) {
104
+ pool := & LineSenderPool {
105
+ maxSenders : 64 ,
106
+ opts : opts ,
107
+ senders : []LineSender {},
108
+ mu : & sync.Mutex {},
109
+ }
110
+
111
+ return pool , nil
112
+ }
113
+
77
114
// WithMaxSenders sets the maximum number of senders in the pool.
78
115
// The default maximum number of senders is 64.
79
116
func WithMaxSenders (count int ) LineSenderPoolOption {
@@ -99,7 +136,19 @@ func (p *LineSenderPool) Acquire(ctx context.Context) (LineSender, error) {
99
136
return s , nil
100
137
}
101
138
102
- return LineSenderFromConf (ctx , p .conf )
139
+ if p .conf != "" {
140
+ return LineSenderFromConf (ctx , p .conf )
141
+ } else {
142
+ conf := newLineSenderConfig (httpSenderType )
143
+ for _ , opt := range p .opts {
144
+ opt (conf )
145
+ if conf .senderType == tcpSenderType {
146
+ return nil , errors .New ("tcp/s not supported for pooled senders, use http/s only" )
147
+ }
148
+ }
149
+ return newHttpLineSender (conf )
150
+ }
151
+
103
152
}
104
153
105
154
// Release flushes the LineSender and returns it back to the pool. If the pool
0 commit comments