Skip to content

Commit

Permalink
Merge branch 'v2' of https://github.com/gogf/clickhouse-go into v2-co…
Browse files Browse the repository at this point in the history
…mpatible
  • Loading branch information
DGuang21 committed Jun 10, 2022
2 parents 37f5f21 + a5c9ef7 commit 45ae208
Show file tree
Hide file tree
Showing 21 changed files with 436 additions and 112 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ jobs:
- 1.17
- 1.18
clickhouse:
- 19.11
- 20.1
- 21.11
- 22.1
- 21.8
- 22.3
- 22.4
- 22.5
- latest

services:
clickhouse:
image: yandex/clickhouse-server:${{ matrix.clickhouse }}
image: clickhouse/clickhouse-server:${{ matrix.clickhouse }}
ports:
- 9000:9000
options: --ulimit nofile=262144:262144
Expand All @@ -49,5 +49,5 @@ jobs:
- name: Run tests
run: |
go test -v .
go test -v ./tests
go test -v ./tests/...
go test -v ./lib/...
15 changes: 7 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ There are two version of this driver, v1 and v2, available as separate branches.

Users should use v2 which is production ready and [significantly faster than v1](#benchmark).

## Supported ClickHouse Versions

The driver is tested against the currently [supported versions](https://github.com/ClickHouse/ClickHouse/blob/master/SECURITY.md) of ClickHouse

## Key features

* Uses native ClickHouse TCP client-server protocol
Expand Down Expand Up @@ -144,20 +148,15 @@ This minimal tls.Config is normally all that is necessary to connect to the secu

If additional TLS parameters are necessary the application code should set the desired fields in the tls.Config struct. That can include specific cipher suites, forcing a particular TLS version (like 1.2 or 1.3), adding an internal CA certificate chain, adding a client certificate (and private key) if required by the ClickHouse server, and most of the other options that come with a more specialized security setup.

## Alternatives
## Third-party alternatives

* Database drivers
* Database drivers:
* [mailru/go-clickhouse](https://github.com/mailru/go-clickhouse) (uses the HTTP protocol)
* [uptrace/go-clickhouse](https://github.com/uptrace/go-clickhouse) (uses the native TCP protocol with `database/sql`-like API)
* drivers with columnar interface :
* Drivers with columnar interface:
* [vahid-sohrabloo/chconn](https://github.com/vahid-sohrabloo/chconn)
* [go-faster/ch](https://github.com/go-faster/ch)

* Insert collectors:
* [KittenHouse](https://github.com/YuriyNasretdinov/kittenhouse)
* [nikepan/clickhouse-bulk](https://github.com/nikepan/clickhouse-bulk)

### Useful projects

* [clickhouse-backup](https://github.com/AlexAkulov/clickhouse-backup)
* [go-graphite](https://github.com/go-graphite)
78 changes: 60 additions & 18 deletions bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ func bindPositional(tz *time.Location, query string, args ...interface{}) (_ str
return "", nil
}
}
params[i] = format(tz, v)
params[i], err = format(tz, v)
if err != nil {
return "", err
}
}
i := 0
query = bindPositionalRe.ReplaceAllStringFunc(query, func(n string) string {
Expand Down Expand Up @@ -116,7 +119,11 @@ func bindNumeric(tz *time.Location, query string, args ...interface{}) (_ string
return "", nil
}
}
params[fmt.Sprintf("$%d", i+1)] = format(tz, v)
val, err := format(tz, v)
if err != nil {
return "", err
}
params[fmt.Sprintf("$%d", i+1)] = val
}
query = bindNumericRe.ReplaceAllStringFunc(query, func(n string) string {
if _, found := params[n]; !found {
Expand Down Expand Up @@ -147,7 +154,11 @@ func bindNamed(tz *time.Location, query string, args ...interface{}) (_ string,
return "", err
}
}
params["@"+v.Name] = format(tz, value)
val, err := format(tz, value)
if err != nil {
return "", err
}
params["@"+v.Name] = val
}
}
query = bindNamedRe.ReplaceAllStringFunc(query, func(n string) string {
Expand All @@ -163,49 +174,80 @@ func bindNamed(tz *time.Location, query string, args ...interface{}) (_ string,
return query, nil
}

func format(tz *time.Location, v interface{}) string {
func format(tz *time.Location, v interface{}) (string, error) {
quote := func(v string) string {
return "'" + strings.NewReplacer(`\`, `\\`, `'`, `\'`).Replace(v) + "'"
}
switch v := v.(type) {
case nil:
return "NULL"
return "NULL", nil
case string:
return quote(v)
return quote(v), nil
case time.Time:
switch v.Location().String() {
case "Local":
return fmt.Sprintf("toDateTime(%d)", v.Unix())
return fmt.Sprintf("toDateTime(%d)", v.Unix()), nil
case tz.String():
return v.Format("toDateTime('2006-01-02 15:04:05')")
return v.Format("toDateTime('2006-01-02 15:04:05')"), nil
}
return v.Format("toDateTime('2006-01-02 15:04:05', '" + v.Location().String() + "')")
return v.Format("toDateTime('2006-01-02 15:04:05', '" + v.Location().String() + "')"), nil
case []interface{}: // tuple
elements := make([]string, 0, len(v))
for _, e := range v {
elements = append(elements, format(tz, e))
val, err := format(tz, e)
if err != nil {
return "", err
}
elements = append(elements, val)
}
return "(" + strings.Join(elements, ", ") + ")"
return "(" + strings.Join(elements, ", ") + ")", nil
case [][]interface{}:
items := make([]string, 0, len(v))
for _, t := range v {
items = append(items, format(tz, t))
val, err := format(tz, t)
if err != nil {
return "", err
}
items = append(items, val)
}
return strings.Join(items, ", ")
return strings.Join(items, ", "), nil
case fmt.Stringer:
return quote(v.String())
return quote(v.String()), nil
}
switch v := reflect.ValueOf(v); v.Kind() {
case reflect.String:
return quote(v.String())
return quote(v.String()), nil
case reflect.Slice:
values := make([]string, 0, v.Len())
for i := 0; i < v.Len(); i++ {
values = append(values, format(tz, v.Index(i).Interface()))
val, err := format(tz, v.Index(i).Interface())
if err != nil {
return "", err
}
values = append(values, val)
}
return strings.Join(values, ", ")
return strings.Join(values, ", "), nil
case reflect.Map: // map
values := make([]string, 0, len(v.MapKeys()))
for _, key := range v.MapKeys() {
name := fmt.Sprint(key.Interface())
if key.Kind() == reflect.String {
name = fmt.Sprintf("'%s'", name)
}
val, err := format(tz, v.MapIndex(key).Interface())
if err != nil {
return "", err
}
if v.MapIndex(key).Kind() == reflect.Slice {
// assume slices in maps are arrays
val = fmt.Sprintf("[%s]", val)
}
values = append(values, fmt.Sprintf("%s : %s", name, val))
}
return "{" + strings.Join(values, ", ") + "}", nil

}
return fmt.Sprint(v)
return fmt.Sprint(v), nil
}

func rebind(in []std_driver.NamedValue) []interface{} {
Expand Down
21 changes: 14 additions & 7 deletions bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,10 @@ func TestFormatTime(t *testing.T) {
tz, err = time.LoadLocation("Europe/London")
)
if assert.NoError(t, err) {
if assert.Equal(t, "toDateTime('2022-01-12 15:00:00')", format(t1.Location(), t1)) {
assert.Equal(t, "toDateTime('2022-01-12 15:00:00', 'UTC')", format(tz, t1))
val, _ := format(t1.Location(), t1)
if assert.Equal(t, "toDateTime('2022-01-12 15:00:00')", val) {
val, _ = format(tz, t1)
assert.Equal(t, "toDateTime('2022-01-12 15:00:00', 'UTC')", val)
}
}
}
Expand All @@ -197,19 +199,24 @@ func TestStringBasedType(t *testing.T) {
SupperString string
SupperSupperString string
)
require.Equal(t, "'a'", format(time.UTC, SupperString("a")))
require.Equal(t, "'a'", format(time.UTC, SupperSupperString("a")))
require.Equal(t, "'a', 'b', 'c'", format(time.UTC, []SupperSupperString{"a", "b", "c"}))
val, _ := format(time.UTC, SupperString("a"))
require.Equal(t, "'a'", val)
val, _ = format(time.UTC, SupperSupperString("a"))
require.Equal(t, "'a'", val)
val, _ = format(time.UTC, []SupperSupperString{"a", "b", "c"})
require.Equal(t, "'a', 'b', 'c'", val)
}

func TestFormatTuple(t *testing.T) {
assert.Equal(t, "('A', 1)", format(time.UTC, []interface{}{"A", 1}))
val, _ := format(time.UTC, []interface{}{"A", 1})
assert.Equal(t, "('A', 1)", val)
{
tuples := [][]interface{}{
[]interface{}{"A", 1},
[]interface{}{"B", 2},
}
assert.Equal(t, "('A', 1), ('B', 2)", format(time.UTC, tuples))
val, _ = format(time.UTC, tuples)
assert.Equal(t, "('A', 1), ('B', 2)", val)
}
}

Expand Down
14 changes: 11 additions & 3 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (
ErrAcquireConnTimeout = errors.New("clickhouse: acquire conn timeout. you can increase the number of max open conn or the dial timeout")
ErrUnsupportedServerRevision = errors.New("clickhouse: unsupported server revision")
ErrBindMixedParamsFormats = errors.New("clickhouse [bind]: mixed named, numeric or positional parameters")
ErrAcquireConnNoAddress = errors.New("clickhouse: no valid address supplied")
)

type OpError struct {
Expand Down Expand Up @@ -187,14 +188,21 @@ func (ch *clickhouse) Stats() driver.Stats {

func (ch *clickhouse) dial(ctx context.Context) (conn *connect, err error) {
connID := int(atomic.AddInt64(&ch.connID, 1))
for num := range ch.opt.Addr {
if ch.opt.ConnOpenStrategy == ConnOpenRoundRobin {
num = int(connID) % len(ch.opt.Addr)
for i := range ch.opt.Addr {
var num int
switch ch.opt.ConnOpenStrategy {
case ConnOpenInOrder:
num = i
case ConnOpenRoundRobin:
num = (int(connID) + i) % len(ch.opt.Addr)
}
if conn, err = dial(ctx, ch.opt.Addr[num], connID, ch.opt); err == nil {
return conn, nil
}
}
if err == nil {
err = ErrAcquireConnNoAddress
}
return nil, err
}

Expand Down
10 changes: 7 additions & 3 deletions clickhouse_std.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,13 @@ func (o *stdConnOpener) Connect(ctx context.Context) (_ driver.Conn, err error)
conn *connect
connID = int(atomic.AddInt64(&globalConnID, 1))
)
for num := range o.opt.Addr {
if o.opt.ConnOpenStrategy == ConnOpenRoundRobin {
num = int(connID) % len(o.opt.Addr)
for i := range o.opt.Addr {
var num int
switch o.opt.ConnOpenStrategy {
case ConnOpenInOrder:
num = i
case ConnOpenRoundRobin:
num = (int(connID) + i) % len(o.opt.Addr)
}
if conn, err = dial(ctx, o.opt.Addr[num], connID, o.opt); err == nil {
return &stdDriver{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/pierrec/lz4/v4 v4.1.14
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/shopspring/decimal v1.3.1
github.com/stretchr/testify v1.7.1
github.com/stretchr/testify v1.7.2
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opentelemetry.io/otel/trace v1.7.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//o=
Expand Down Expand Up @@ -98,5 +99,6 @@ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
4 changes: 2 additions & 2 deletions lib/column/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,15 @@ func (col *Decimal) AppendRow(v interface{}) error {
func (col *Decimal) Decode(decoder *binary.Decoder, rows int) error {
switch col.nobits {
case 32:
var base UInt32
var base Int32
if err := base.Decode(decoder, rows); err != nil {
return err
}
for _, v := range base {
col.values = append(col.values, decimal.New(int64(v), int32(-col.scale)))
}
case 64:
var base UInt64
var base Int64
if err := base.Decode(decoder, rows); err != nil {
return err
}
Expand Down
20 changes: 6 additions & 14 deletions lib/column/ipv6.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ func (col *IPv6) Append(v interface{}) (nulls []uint8, err error) {
nulls = make([]uint8, len(v))
for _, v := range v {
if len(v) != net.IPv6len {
return nil, &Error{
ColumnType: string(col.Type()),
Err: fmt.Errorf("invalid size. expected %d got %d", net.IPv6len, len(v)),
}
v = v.To16()
}
col.data = append(col.data, v[:]...)
}
Expand All @@ -84,13 +81,11 @@ func (col *IPv6) Append(v interface{}) (nulls []uint8, err error) {
for i, v := range v {
switch {
case v != nil:
if len(*v) != net.IPv6len {
return nil, &Error{
ColumnType: string(col.Type()),
Err: fmt.Errorf("invalid size. expected %d got %d", net.IPv6len, len(*v)),
}
}
//copy so we don't modify original value
tmp := *v
if len(tmp) != net.IPv6len {
tmp = tmp.To16()
}
col.data = append(col.data, tmp[:]...)
default:
col.data, nulls[i] = append(col.data, make([]byte, net.IPv6len)...), 1
Expand Down Expand Up @@ -128,10 +123,7 @@ func (col *IPv6) AppendRow(v interface{}) error {
}
}
if len(ip) != net.IPv6len {
return &Error{
ColumnType: string(col.Type()),
Err: fmt.Errorf("invalid size. expected %d got %d", net.IPv6len, len(ip)),
}
ip = ip.To16()
}
col.data = append(col.data, ip[:]...)
return nil
Expand Down
3 changes: 3 additions & 0 deletions lib/column/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package column

import (
"encoding"
"fmt"
"reflect"

Expand Down Expand Up @@ -54,6 +55,8 @@ func (col *String) ScanRow(dest interface{}, row int) error {
case **string:
*d = new(string)
**d = v[row]
case encoding.BinaryUnmarshaler:
return d.UnmarshalBinary(binary.Str2Bytes(v[row]))
default:
return &ColumnConverterError{
Op: "ScanRow",
Expand Down
Loading

0 comments on commit 45ae208

Please sign in to comment.