Skip to content

Commit 20aff84

Browse files
authoredJun 2, 2023
Merge pull request #303 from ClickHouse/fix/rework-client-close
fix(client): rework client close
2 parents 14d3048 + 7c71e87 commit 20aff84

File tree

6 files changed

+47
-44
lines changed

6 files changed

+47
-44
lines changed
 

‎.golangci.yml

+3-2
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ linters-settings:
3434
- unnecessaryBlock
3535
- redundantSprint
3636
- octalLiteral
37+
- ifElseChain
3738

3839
linters:
3940
disable-all: true
4041
enable:
41-
- depguard
4242
- dogsled
4343
- errcheck
4444
- goconst
45-
# - gocritic # go1.18
45+
- gocritic
4646
- gofmt
4747
- goimports
4848
- revive
@@ -67,6 +67,7 @@ linters:
6767
# - maligned (same as prealloc)
6868
# - funlen (gocyclo is enough)
6969
# - gochecknoglobals (we know when it is ok to use globals)
70+
# - depguard (broken)
7071

7172
issues:
7273
exclude-use-default: false

‎client.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,16 @@ import (
2929
type Client struct {
3030
lg *zap.Logger
3131
conn net.Conn
32-
mux sync.Mutex
3332
buf *proto.Buffer
3433
reader *proto.Reader
3534
info proto.ClientHello
3635
server proto.ServerHello
3736
version clientVersion
3837
quotaKey string
3938

39+
mux sync.Mutex
40+
closed bool
41+
4042
// Single packet read timeout.
4143
readTimeout time.Duration
4244

@@ -80,14 +82,14 @@ var ErrClosed = errors.New("client is closed")
8082
// Close closes underlying connection and frees all resources,
8183
// rendering Client to unusable state.
8284
func (c *Client) Close() error {
83-
if c.conn == nil {
85+
c.mux.Lock()
86+
defer c.mux.Unlock()
87+
88+
if c.closed {
8489
return ErrClosed
8590
}
86-
defer func() {
87-
c.buf = nil
88-
c.reader = nil
89-
c.conn = nil
90-
}()
91+
92+
c.closed = true
9193
if err := c.conn.Close(); err != nil {
9294
return errors.Wrap(err, "conn")
9395
}
@@ -97,7 +99,9 @@ func (c *Client) Close() error {
9799

98100
// IsClosed indicates that connection is closed.
99101
func (c *Client) IsClosed() bool {
100-
return c.conn == nil
102+
c.mux.Lock()
103+
defer c.mux.Unlock()
104+
return c.closed
101105
}
102106

103107
// Exception is server-side error.
@@ -251,8 +255,6 @@ func (c *Client) packet(ctx context.Context) (proto.ServerCode, error) {
251255
}
252256

253257
func (c *Client) flushBuf(ctx context.Context, b *proto.Buffer) error {
254-
c.mux.Lock()
255-
defer c.mux.Unlock()
256258
if err := ctx.Err(); err != nil {
257259
return errors.Wrap(err, "context")
258260
}

‎otel_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ type MapPair[V comparable] struct {
1313
Values *proto.ColArr[V]
1414
}
1515

16-
func (m MapPair[V]) Row(i int) ([]string, []V) {
16+
func (m MapPair[V]) Row(i int) (keys []string, values []V) {
1717
return m.Keys.Row(i), m.Values.Row(i)
1818
}
1919

‎ping.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
//
1616
// Do not call concurrently with Do.
1717
func (c *Client) Ping(ctx context.Context) (err error) {
18-
if c.conn == nil {
18+
if c.IsClosed() {
1919
return ErrClosed
2020
}
2121
if c.otel {

‎proto/col_low_cardinality.go

+28-28
Original file line numberDiff line numberDiff line change
@@ -13,34 +13,6 @@ var (
1313
_ Column = (*ColLowCardinality[string])(nil)
1414
)
1515

16-
// DecodeState implements StateDecoder, ensuring state for index column.
17-
func (c *ColLowCardinality[T]) DecodeState(r *Reader) error {
18-
keySerialization, err := r.Int64()
19-
if err != nil {
20-
return errors.Wrap(err, "version")
21-
}
22-
if keySerialization != int64(sharedDictionariesWithAdditionalKeys) {
23-
return errors.Errorf("got version %d, expected %d",
24-
keySerialization, sharedDictionariesWithAdditionalKeys,
25-
)
26-
}
27-
if s, ok := c.index.(StateDecoder); ok {
28-
if err := s.DecodeState(r); err != nil {
29-
return errors.Wrap(err, "index state")
30-
}
31-
}
32-
return nil
33-
}
34-
35-
// EncodeState implements StateEncoder, ensuring state for index column.
36-
func (c ColLowCardinality[T]) EncodeState(b *Buffer) {
37-
// Writing key serialization version.
38-
b.PutInt64(int64(sharedDictionariesWithAdditionalKeys))
39-
if s, ok := c.index.(StateEncoder); ok {
40-
s.EncodeState(b)
41-
}
42-
}
43-
4416
//go:generate go run github.com/dmarkham/enumer -type CardinalityKey -trimprefix Key -output col_low_cardinality_enum.go
4517

4618
// CardinalityKey is integer type of ColLowCardinality.Keys column.
@@ -119,6 +91,34 @@ type ColLowCardinality[T comparable] struct {
11991
keys []int
12092
}
12193

94+
// DecodeState implements StateDecoder, ensuring state for index column.
95+
func (c *ColLowCardinality[T]) DecodeState(r *Reader) error {
96+
keySerialization, err := r.Int64()
97+
if err != nil {
98+
return errors.Wrap(err, "version")
99+
}
100+
if keySerialization != int64(sharedDictionariesWithAdditionalKeys) {
101+
return errors.Errorf("got version %d, expected %d",
102+
keySerialization, sharedDictionariesWithAdditionalKeys,
103+
)
104+
}
105+
if s, ok := c.index.(StateDecoder); ok {
106+
if err := s.DecodeState(r); err != nil {
107+
return errors.Wrap(err, "index state")
108+
}
109+
}
110+
return nil
111+
}
112+
113+
// EncodeState implements StateEncoder, ensuring state for index column.
114+
func (c ColLowCardinality[T]) EncodeState(b *Buffer) {
115+
// Writing key serialization version.
116+
b.PutInt64(int64(sharedDictionariesWithAdditionalKeys))
117+
if s, ok := c.index.(StateEncoder); ok {
118+
s.EncodeState(b)
119+
}
120+
}
121+
122122
func (c *ColLowCardinality[T]) DecodeColumn(r *Reader, rows int) error {
123123
if rows == 0 {
124124
// Skipping entirely of no rows.

‎query.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (c *Client) sendQuery(ctx context.Context, q Query) error {
7575
zap.String("query_id", q.QueryID),
7676
)
7777
}
78-
if c.conn == nil {
78+
if c.IsClosed() {
7979
return ErrClosed
8080
}
8181
c.encode(proto.Query{
@@ -537,7 +537,7 @@ func (c *Client) handlePacket(ctx context.Context, p proto.ServerCode, q Query)
537537

538538
// Do performs Query on ClickHouse server.
539539
func (c *Client) Do(ctx context.Context, q Query) (err error) {
540-
if c.conn == nil {
540+
if c.IsClosed() {
541541
return ErrClosed
542542
}
543543
if len(q.Parameters) > 0 && !proto.FeatureParameters.In(c.protocolVersion) {

0 commit comments

Comments
 (0)
Please sign in to comment.