Skip to content

Commit 1532472

Browse files
sklarsapuzpuzpuz
andauthored
feat(client): add sender pool (#47)
Co-authored-by: Andrei Pechkurov <[email protected]>
1 parent 9205645 commit 1532472

File tree

3 files changed

+430
-0
lines changed

3 files changed

+430
-0
lines changed

README.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,59 @@ To connect via TCP, set the configuration string to:
7575
// ...
7676
```
7777

78+
## Pooled Line Senders
79+
80+
**Warning: Experimental feature designed for use with HTTP senders ONLY**
81+
82+
Version 3 of the client introduces a `LineSenderPool`, which provides a mechanism
83+
to cache previously-used `LineSender`s in memory so they can be reused without
84+
having to allocate and instantiate new senders.
85+
86+
A LineSenderPool is thread-safe and can be used to concurrently Acquire and Release senders
87+
across multiple goroutines.
88+
89+
Since `LineSender`s must be used in a single-threaded context, a typical pattern is to Acquire
90+
a sender from a `LineSenderPool` at the beginning of a goroutine and use a deferred
91+
execution block to Release the sender at the end of the goroutine.
92+
93+
Here is an example of the `LineSenderPool` Acquire, Release, and Close semantics:
94+
95+
```go
96+
package main
97+
98+
import (
99+
"context"
100+
101+
qdb "github.com/questdb/go-questdb-client/v3"
102+
)
103+
104+
func main() {
105+
ctx := context.TODO()
106+
107+
pool := qdb.PoolFromConf("http::addr=localhost:9000")
108+
defer func() {
109+
err := pool.Close(ctx)
110+
if err != nil {
111+
panic(err)
112+
}
113+
}()
114+
115+
sender, err := pool.Acquire(ctx)
116+
if err != nil {
117+
panic(err)
118+
}
119+
120+
sender.Table("prices").
121+
Symbol("ticker", "AAPL").
122+
Float64Column("price", 123.45).
123+
AtNow(ctx)
124+
125+
if err := pool.Release(ctx, sender); err != nil {
126+
panic(err)
127+
}
128+
}
129+
```
130+
78131
## Migration from v2
79132

80133
v2 code example:

sender_pool.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*******************************************************************************
2+
* ___ _ ____ ____
3+
* / _ \ _ _ ___ ___| |_| _ \| __ )
4+
* | | | | | | |/ _ \/ __| __| | | | _ \
5+
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
6+
* \__\_\\__,_|\___||___/\__|____/|____/
7+
*
8+
* Copyright (c) 2014-2019 Appsicle
9+
* Copyright (c) 2019-2022 QuestDB
10+
*
11+
* Licensed under the Apache License, Version 2.0 (the "License");
12+
* you may not use this file except in compliance with the License.
13+
* You may obtain a copy of the License at
14+
*
15+
* http://www.apache.org/licenses/LICENSE-2.0
16+
*
17+
* Unless required by applicable law or agreed to in writing, software
18+
* distributed under the License is distributed on an "AS IS" BASIS,
19+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20+
* See the License for the specific language governing permissions and
21+
* limitations under the License.
22+
*
23+
******************************************************************************/
24+
25+
package questdb
26+
27+
import (
28+
"context"
29+
"errors"
30+
"fmt"
31+
"strings"
32+
"sync"
33+
)
34+
35+
// LineSenderPool wraps a mutex-protected slice of [LineSender]. It allows a goroutine to
36+
// Acquire a sender from the pool and Release it back to the pool when it's done being used.
37+
//
38+
// WARNING: This is an experimental API that is designed to work with HTTP senders ONLY.
39+
type LineSenderPool struct {
40+
maxSenders int
41+
conf string
42+
43+
closed bool
44+
45+
senders []LineSender
46+
mu *sync.Mutex
47+
}
48+
49+
// LineSenderPoolOption defines line sender pool config option.
50+
type LineSenderPoolOption func(*LineSenderPool)
51+
52+
// PoolFromConf instantiates a new LineSenderPool with a QuestDB configuration string.
53+
// Any sender acquired from this pool will be initialized with the same configuration
54+
// string that was passed into the conf argument.
55+
//
56+
// The default maximum number of senders is 64, but can be customized by using the
57+
// [WithMaxSenders] option.
58+
func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, error) {
59+
if strings.HasPrefix(conf, "tcp") {
60+
return nil, errors.New("tcp/s not supported for pooled senders, use http/s only")
61+
}
62+
63+
pool := &LineSenderPool{
64+
maxSenders: 64,
65+
conf: conf,
66+
senders: []LineSender{},
67+
mu: &sync.Mutex{},
68+
}
69+
70+
for _, opt := range opts {
71+
opt(pool)
72+
}
73+
74+
return pool, nil
75+
}
76+
77+
// WithMaxSenders sets the maximum number of senders in the pool.
78+
// The default maximum number of senders is 64.
79+
func WithMaxSenders(count int) LineSenderPoolOption {
80+
return func(lsp *LineSenderPool) {
81+
lsp.maxSenders = count
82+
}
83+
}
84+
85+
// Acquire obtains a LineSender from the pool. If the pool is empty, a new
86+
// LineSender will be instantiated using the pool's config string.
87+
func (p *LineSenderPool) Acquire(ctx context.Context) (LineSender, error) {
88+
p.mu.Lock()
89+
defer p.mu.Unlock()
90+
91+
if p.closed {
92+
return nil, fmt.Errorf("cannot Acquire a LineSender from a closed LineSenderPool")
93+
}
94+
95+
if len(p.senders) > 0 {
96+
// Pop sender off the slice and return it
97+
s := p.senders[len(p.senders)-1]
98+
p.senders = p.senders[0 : len(p.senders)-1]
99+
return s, nil
100+
}
101+
102+
return LineSenderFromConf(ctx, p.conf)
103+
}
104+
105+
// Release flushes the LineSender and returns it back to the pool. If the pool
106+
// is full, the sender is closed and discarded. In cases where the sender's
107+
// flush fails, it is not added back to the pool.
108+
func (p *LineSenderPool) Release(ctx context.Context, s LineSender) error {
109+
// If there is an error on flush, do not add the sender back to the pool
110+
if err := s.Flush(ctx); err != nil {
111+
return err
112+
}
113+
114+
p.mu.Lock()
115+
defer p.mu.Unlock()
116+
117+
for i := range p.senders {
118+
if p.senders[i] == s {
119+
return fmt.Errorf("LineSender %p has already been released back to the pool", s)
120+
}
121+
}
122+
123+
if p.closed || len(p.senders) >= p.maxSenders {
124+
return s.Close(ctx)
125+
}
126+
127+
p.senders = append(p.senders, s)
128+
129+
return nil
130+
}
131+
132+
// Close sets the pool's status to "closed" and closes all cached LineSenders.
133+
// When LineSenders are released back into a closed pool, they will be closed and discarded.
134+
func (p *LineSenderPool) Close(ctx context.Context) error {
135+
p.mu.Lock()
136+
defer p.mu.Unlock()
137+
138+
p.closed = true
139+
140+
var senderErrors []error
141+
142+
for _, s := range p.senders {
143+
senderErr := s.Close(ctx)
144+
if senderErr != nil {
145+
senderErrors = append(senderErrors, senderErr)
146+
147+
}
148+
}
149+
150+
if len(senderErrors) == 0 {
151+
return nil
152+
}
153+
154+
err := fmt.Errorf("error closing one or more LineSenders in the pool")
155+
for _, senderErr := range senderErrors {
156+
err = fmt.Errorf("%s %w", err, senderErr)
157+
}
158+
159+
return err
160+
}
161+
162+
// IsClosed will return true if the pool is closed. Once a pool is closed,
163+
// you will not be able to Acquire any new LineSenders from it. When
164+
// LineSenders are released back into a closed pool, they will be closed and
165+
// discarded.
166+
func (p *LineSenderPool) IsClosed() bool {
167+
p.mu.Lock()
168+
defer p.mu.Unlock()
169+
170+
return p.closed
171+
}
172+
173+
// Len returns the numbers of cached LineSenders in the pool.
174+
func (p *LineSenderPool) Len() int {
175+
p.mu.Lock()
176+
defer p.mu.Unlock()
177+
178+
return len(p.senders)
179+
}

0 commit comments

Comments
 (0)