From 0b54a5e5e5ac28a03b9a4a03615f78561705d0e4 Mon Sep 17 00:00:00 2001 From: Jack Westwood Date: Fri, 24 Oct 2025 15:53:05 +0100 Subject: [PATCH] ING-1307: Initial implementation of optimized routing --- go.mod | 26 +++--- go.sum | 58 ++++++++----- impl_kv_v1.go | 32 +++++++ load_balancing_policy.go | 180 +++++++++++++++++++++++++++++++++++++++ resolver.go | 107 +++++++++++++++++++++++ routingManager.go | 148 ++++++++++++++++++++++++++++++++ routingclient.go | 34 ++++++-- routingconn.go | 6 ++ 8 files changed, 550 insertions(+), 41 deletions(-) create mode 100644 load_balancing_policy.go create mode 100644 resolver.go create mode 100644 routingManager.go diff --git a/go.mod b/go.mod index 8a2c549..2732993 100644 --- a/go.mod +++ b/go.mod @@ -1,29 +1,31 @@ module github.com/couchbase/gocbcoreps -go 1.19 +go 1.24.0 + +toolchain go1.24.5 require ( github.com/couchbase/goprotostellar v1.0.2 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 - go.opentelemetry.io/otel v1.24.0 - go.opentelemetry.io/otel/metric v1.24.0 - go.opentelemetry.io/otel/trace v1.24.0 + go.opentelemetry.io/otel v1.37.0 + go.opentelemetry.io/otel/metric v1.37.0 + go.opentelemetry.io/otel/trace v1.37.0 go.uber.org/zap v1.27.0 - google.golang.org/grpc v1.62.1 + google.golang.org/grpc v1.76.0 ) require ( - github.com/golang/protobuf v1.5.4 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.22.0 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/text v0.14.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect + golang.org/x/net v0.42.0 // indirect + golang.org/x/sys v0.34.0 // indirect + golang.org/x/text v0.27.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect ) require ( - github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect - google.golang.org/protobuf v1.33.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + google.golang.org/protobuf v1.36.6 // indirect ) diff --git a/go.sum b/go.sum index d49c9f6..df68fb0 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= -github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -30,7 +30,10 @@ github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaW github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.2.0/go.mod 
h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -51,20 +54,28 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= -go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= -go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= -go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= -go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= -go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= -go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= +go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= +go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= +go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= +go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= +go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc= +go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps= +go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= +go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= 
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= @@ -89,8 +100,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= -golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= +golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -103,12 +114,12 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= +golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= +golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -122,22 +133,24 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.16.0 
h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b h1:zPKJod4w6F1+nRGDI9ubnXYhU9NSWoFAijkHkUXeTK8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= -google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= -google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A= +google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -145,5 +158,6 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/impl_kv_v1.go b/impl_kv_v1.go index 9dbe473..d2aae22 100644 --- a/impl_kv_v1.go +++ b/impl_kv_v1.go @@ -2,9 +2,11 @@ package gocbcoreps import ( "context" + "encoding/json" "github.com/couchbase/goprotostellar/genproto/kv_v1" "google.golang.org/grpc" + 
"google.golang.org/grpc/metadata" ) type routingImpl_KvV1 struct { @@ -15,69 +17,99 @@ type routingImpl_KvV1 struct { var _ kv_v1.KvServiceClient = (*routingImpl_KvV1)(nil) func (c *routingImpl_KvV1) Get(ctx context.Context, in *kv_v1.GetRequest, opts ...grpc.CallOption) (*kv_v1.GetResponse, error) { + ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key) return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().Get(ctx, in, opts...) } func (c *routingImpl_KvV1) GetAndTouch(ctx context.Context, in *kv_v1.GetAndTouchRequest, opts ...grpc.CallOption) (*kv_v1.GetAndTouchResponse, error) { + ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key) return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().GetAndTouch(ctx, in, opts...) } func (c *routingImpl_KvV1) GetAndLock(ctx context.Context, in *kv_v1.GetAndLockRequest, opts ...grpc.CallOption) (*kv_v1.GetAndLockResponse, error) { + ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key) return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().GetAndLock(ctx, in, opts...) } func (c *routingImpl_KvV1) Unlock(ctx context.Context, in *kv_v1.UnlockRequest, opts ...grpc.CallOption) (*kv_v1.UnlockResponse, error) { + ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key) return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().Unlock(ctx, in, opts...) } func (c *routingImpl_KvV1) GetAllReplicas(ctx context.Context, in *kv_v1.GetAllReplicasRequest, opts ...grpc.CallOption) (kv_v1.KvService_GetAllReplicasClient, error) { + ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key) return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().GetAllReplicas(ctx, in, opts...) } func (c *routingImpl_KvV1) Touch(ctx context.Context, in *kv_v1.TouchRequest, opts ...grpc.CallOption) (*kv_v1.TouchResponse, error) { + ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key) return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().Touch(ctx, in, opts...) } func (c *routingImpl_KvV1) Exists(ctx context.Context, in *kv_v1.ExistsRequest, opts ...grpc.CallOption) (*kv_v1.ExistsResponse, error) { + ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key) return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().Exists(ctx, in, opts...) } func (c *routingImpl_KvV1) Insert(ctx context.Context, in *kv_v1.InsertRequest, opts ...grpc.CallOption) (*kv_v1.InsertResponse, error) { + ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key) return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().Insert(ctx, in, opts...) } func (c *routingImpl_KvV1) Upsert(ctx context.Context, in *kv_v1.UpsertRequest, opts ...grpc.CallOption) (*kv_v1.UpsertResponse, error) { + ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key) return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().Upsert(ctx, in, opts...) } func (c *routingImpl_KvV1) Replace(ctx context.Context, in *kv_v1.ReplaceRequest, opts ...grpc.CallOption) (*kv_v1.ReplaceResponse, error) { + ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key) return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().Replace(ctx, in, opts...) } func (c *routingImpl_KvV1) Remove(ctx context.Context, in *kv_v1.RemoveRequest, opts ...grpc.CallOption) (*kv_v1.RemoveResponse, error) { + ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key) return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().Remove(ctx, in, opts...) 
 }
 
 func (c *routingImpl_KvV1) Increment(ctx context.Context, in *kv_v1.IncrementRequest, opts ...grpc.CallOption) (*kv_v1.IncrementResponse, error) {
+	ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key)
 	return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().Increment(ctx, in, opts...)
 }
 
 func (c *routingImpl_KvV1) Decrement(ctx context.Context, in *kv_v1.DecrementRequest, opts ...grpc.CallOption) (*kv_v1.DecrementResponse, error) {
+	ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key)
 	return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().Decrement(ctx, in, opts...)
 }
 
 func (c *routingImpl_KvV1) Append(ctx context.Context, in *kv_v1.AppendRequest, opts ...grpc.CallOption) (*kv_v1.AppendResponse, error) {
+	ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key)
 	return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().Append(ctx, in, opts...)
 }
 
 func (c *routingImpl_KvV1) Prepend(ctx context.Context, in *kv_v1.PrependRequest, opts ...grpc.CallOption) (*kv_v1.PrependResponse, error) {
+	ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key)
 	return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().Prepend(ctx, in, opts...)
 }
 
 func (c *routingImpl_KvV1) LookupIn(ctx context.Context, in *kv_v1.LookupInRequest, opts ...grpc.CallOption) (*kv_v1.LookupInResponse, error) {
+	ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key)
 	return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().LookupIn(ctx, in, opts...)
 }
 
 func (c *routingImpl_KvV1) MutateIn(ctx context.Context, in *kv_v1.MutateInRequest, opts ...grpc.CallOption) (*kv_v1.MutateInResponse, error) {
+	ctx = c.contextWithRoutingInfo(ctx, in.BucketName, in.Key)
 	return c.client.fetchConnForKey(in.BucketName, in.Key).KvV1().MutateIn(ctx, in, opts...)
 }
+
+func (c *routingImpl_KvV1) contextWithRoutingInfo(ctx context.Context, bucketName, key string) context.Context {
+	routingInfo := c.client.RoutingManager().FindFor(ctx, bucketName, key)
+
+	bytes, err := json.Marshal(routingInfo)
+	if err != nil {
+		// TODO - decide what we should do if we get here
+		return ctx
+	}
+
+	md := metadata.Pairs("routinginfo", string(bytes))
+	return metadata.NewOutgoingContext(ctx, md)
+}
diff --git a/load_balancing_policy.go b/load_balancing_policy.go
new file mode 100644
index 0000000..94ec40d
--- /dev/null
+++ b/load_balancing_policy.go
@@ -0,0 +1,180 @@
+package gocbcoreps
+
+import (
+	"encoding/json"
+	"fmt"
+	"net"
+	"sync/atomic"
+
+	"go.uber.org/zap"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/balancer/endpointsharding"
+	"google.golang.org/grpc/balancer/roundrobin"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/serviceconfig"
+)
+
+const customLBName = "custom_load_balancer"
+
+type customLBConfig struct {
+	serviceconfig.LoadBalancingConfig `json:"-"`
+}
+
+type CustomLBBuilder struct {
+	logger *zap.Logger
+}
+
+func (CustomLBBuilder) Name() string {
+	return customLBName
+}
+
+func (CustomLBBuilder) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+	lbConfig := &customLBConfig{}
+
+	if err := json.Unmarshal(s, lbConfig); err != nil {
+		return nil, fmt.Errorf(customLBName+": unable to unmarshal customLBConfig: %v", err)
+	}
+	return lbConfig, nil
+}
+
+func (clbb CustomLBBuilder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
+	cb := &customLB{
+		ClientConn: cc,
+		bOpts:      bOpts,
+		logger:     clbb.logger,
+	}
+
+	cb.Balancer = endpointsharding.NewBalancer(cb, bOpts, balancer.Get(roundrobin.Name).Build, endpointsharding.Options{})
+	return cb
+}
+
+type customLB struct {
+	// All state and operations on this balancer are either initialized at build
+	// time and read only after, or are only accessed as part of its
+	// balancer.Balancer API (UpdateState from children only comes in from
+	// balancer.Balancer calls as well, and children are called one at a time),
+	// in which calls are guaranteed to come synchronously. Thus, no extra
+	// synchronization is required in this balancer.
+	balancer.Balancer
+	balancer.ClientConn
+	bOpts  balancer.BuildOptions
+	logger *zap.Logger
+
+	cfg atomic.Pointer[customLBConfig]
+}
+
+// Called by the gRPC channel to inform the load balancing policy of:
+// - updated backend addresses
+// - updated service config (if any)
+func (clb *customLB) UpdateClientConnState(state balancer.ClientConnState) error {
+	clbCfg, ok := state.BalancerConfig.(*customLBConfig)
+	if !ok {
+		return balancer.ErrBadResolverState
+	}
+
+	clb.cfg.Store(clbCfg)
+	// A call to UpdateClientConnState should always produce a new Picker. That
+	// is guaranteed to happen since the aggregator will always call
+	// UpdateChildState in its UpdateClientConnState.
+	return clb.Balancer.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: state.ResolverState,
+	})
+}
+
+// Called by the load balancing policy to update the connectivity state of the
+// balancer and initialise a new picker. It is called when:
+// - New addresses/config received
+// - Subchannel state changes
+func (clb *customLB) UpdateState(state balancer.State) {
+	if state.ConnectivityState == connectivity.Ready {
+		childStates := endpointsharding.ChildStatesFromPicker(state.Picker)
+		var readyPickers []balancer.Picker
+		addressToPicker := make(map[string]balancer.Picker)
+		for _, childState := range childStates {
+			if childState.State.ConnectivityState == connectivity.Ready {
+				readyPickers = append(readyPickers, childState.State.Picker)
+
+				// TODO - I think just using the first address is alright here
+				// because of how we resolve things, but need to verify.
+				host, _, err := net.SplitHostPort(childState.Endpoint.Addresses[0].Addr)
+				if err != nil {
+					// TO DO - properly handle this error
+					clb.logger.Error("failed to split host address", zap.String("address", childState.Endpoint.Addresses[0].Addr))
+					continue
+				}
+
+				addressToPicker[host] = childState.State.Picker
+			}
+		}
+
+		picker := &customLoadBalancerPicker{
+			addressToPicker: addressToPicker,
+			logger:          clb.logger,
+		}
+		clb.ClientConn.UpdateState(balancer.State{
+			ConnectivityState: connectivity.Ready,
+			Picker:            picker,
+		})
+		return
+	}
+
+	// Delegate to default behavior/picker from below.
+	clb.ClientConn.UpdateState(state)
+}
+
+type customLoadBalancerPicker struct {
+	addressToPicker map[string]balancer.Picker
+
+	logger *zap.Logger
+}
+
+func (clbp *customLoadBalancerPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
+	// TODO - optimize the performance of this, any slowness here is very bad
+	// since this is called for every RPC
+	md, ok := metadata.FromOutgoingContext(info.Ctx)
+	if ok {
+		routingInfo := md["routinginfo"]
+
+		if len(routingInfo) == 0 {
+			return clbp.pickFirst().Pick(info)
+		}
+
+		var routingInfoStruct RoutingInfo
+
+		// TODO - would it be more performant to just check whether the substring of
+		// an address is within the routing information instead of unmarshaling?
+		err := json.Unmarshal([]byte(routingInfo[0]), &routingInfoStruct)
+		if err != nil {
+			// TODO - wrt performance concerns maybe we shouldn't be logging in
+			// Pick()
+			clbp.logger.Error("failed to unmarshal routing info", zap.Any("routingInfo", routingInfo[0]))
+			return clbp.pickFirst().Pick(info)
+		}
+
+		fmt.Printf("%+v\n", routingInfoStruct)
+
+		for k := range clbp.addressToPicker {
+			fmt.Println(k)
+		}
+
+		for _, addr := range routingInfoStruct.LocalServers {
+			bestPicker, ok := clbp.addressToPicker[addr]
+			if ok {
+				fmt.Println("BEST PICKER FOUND")
+				return bestPicker.Pick(info)
+			}
+		}
+	}
+
+	childPicker := clbp.pickFirst()
+	return childPicker.Pick(info)
+}
+
+// TO DO - probably want to implement round robin picking here.
+func (clbp *customLoadBalancerPicker) pickFirst() balancer.Picker {
+	for _, pkr := range clbp.addressToPicker {
+		return pkr
+	}
+	return nil
+}
diff --git a/resolver.go b/resolver.go
new file mode 100644
index 0000000..092dbae
--- /dev/null
+++ b/resolver.go
@@ -0,0 +1,107 @@
+package gocbcoreps
+
+import (
+	"fmt"
+	"net"
+	"time"
+
+	"go.uber.org/zap"
+	"google.golang.org/grpc/resolver"
+)
+
+const customServiceConfig = `{
+	"loadBalancingPolicy": "custom_load_balancer"
+}`
+
+// TODO - this should probably be named something better than this, same applies
+// above
+const customScheme = "custom"
+
+type CustomResolverBuilder struct {
+	logger *zap.Logger
+}
+
+func (c *CustomResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, rOpts resolver.BuildOptions) (resolver.Resolver, error) {
+	r := &customResolver{
+		target: target,
+		cc:     cc,
+		done:   make(chan struct{}),
+	}
+	r.start()
+	return r, nil
+}
+
+func (*CustomResolverBuilder) Scheme() string { return customScheme }
+
+type customResolver struct {
+	target resolver.Target
+	cc     resolver.ClientConn
+	done   chan struct{}
+	logger *zap.Logger
+}
+
+func (r *customResolver) start() {
+	host, port, err := net.SplitHostPort(r.target.Endpoint())
+	if err != nil {
+		// TO DO - properly handle this error
+		fmt.Println("Split Error:", err)
+		return
+	}
+
+	ips, err := net.LookupIP(host)
+	if err != nil {
+		fmt.Println("Error:", err)
+		return
+	}
+
+	var ipv4s []string
+	for _, ip := range ips {
+		if ip.To4() != nil {
+			ipv4s = append(ipv4s, ip.String())
+		}
+	}
+
+	addrs := make([]resolver.Endpoint, len(ipv4s))
+	for i, ip := range ipv4s {
+		addr := ip + ":" + port
+		fmt.Println(addr)
+
+		addrs[i] = resolver.Endpoint{
+			Addresses: []resolver.Address{{Addr: addr}},
+		}
+
+	}
+
+	// Watch for changes in the resolution of the hostname
+	go r.watch()
+
+	// Make the gRPC channel use our custom load balancing policy
+	cfg := r.cc.ParseServiceConfig(customServiceConfig)
+
+	r.cc.UpdateState(resolver.State{
+		Endpoints:     addrs,
+		ServiceConfig: cfg,
+	})
+}
+
+// TODO - make the watch mechanism smarter so we don't update the state when
+// the addresses haven't changed.
+func (r *customResolver) watch() {
+	ticker := time.NewTicker(60 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			r.start()
+		case <-r.done:
+			return
+		}
+	}
+}
+
+func (*customResolver) ResolveNow(resolver.ResolveNowOptions) {}
+
+func (r *customResolver) Close() {
+	close(r.done)
+}
diff --git a/routingManager.go b/routingManager.go
new file mode 100644
index 0000000..fef40ca
--- /dev/null
+++ b/routingManager.go
@@ -0,0 +1,148 @@
+package gocbcoreps
+
+import (
+	"context"
+	"errors"
+	"hash/crc32"
+	"net"
+	"sync"
+
+	"github.com/couchbase/goprotostellar/genproto/routing_v1"
+	"go.uber.org/zap"
+)
+
+type routingManager struct {
+	client routing_v1.RoutingServiceClient
+
+	logger *zap.Logger
+
+	// For each bucket we store an array of routingInfo, one entry per vBucket
+	bucketToRouting map[string][1024]RoutingInfo
+
+	updateLock sync.Mutex
+}
+
+type RoutingInfo struct {
+	// LocalServers is the list of servers that have the vBucket local to them
+	LocalServers []string `json:"localServers"`
+
+	// TODO add support for listing servers that have the vBucket in their server group.
+}
+
+func newRoutingManager(client routing_v1.RoutingServiceClient, logger *zap.Logger) *routingManager {
+	return &routingManager{
+		client:          client,
+		logger:          logger,
+		bucketToRouting: make(map[string][1024]RoutingInfo),
+	}
+}
+
+func (r *routingManager) FindFor(ctx context.Context, bucket, key string) RoutingInfo {
+	// We don't want to perform an RPC every time we call this so we should have some
+	// watch thread on a per-bucket basis that updates a map in our manager that we
+	// can quickly access (under a lock) here.
+	r.updateLock.Lock()
+	vbRoutingInfo, ok := r.bucketToRouting[bucket]
+	r.updateLock.Unlock()
+	if !ok {
+		go r.watchBucket(ctx, bucket)
+		// TODO - decide what the behaviour should be here, do we wait until we get the routing
+		// information or do we just let the watch thread do its thing in the background so that
+		// the next request to the bucket will have an entry. For now we do the latter.
+		return RoutingInfo{}
+	}
+
+	vbId := vBidFromKey(key)
+
+	return vbRoutingInfo[vbId]
+}
+
+func vBidFromKey(key string) uint16 {
+	crc := crc32.ChecksumIEEE([]byte(key))
+	crcMidBits := uint16(crc>>16) & ^uint16(0x8000)
+	// Stole this from VbucketByKey in gocbcorex, there we mod by the number of
+	// entries in the vBucketMap, can we just hardcode 1024 here?
+	return crcMidBits % uint16(1024)
+}
+
+func (r *routingManager) watchBucket(ctx context.Context, bucket string) error {
+	routingStream, err := r.client.WatchRouting(context.Background(), &routing_v1.WatchRoutingRequest{
+		BucketName: &bucket,
+	})
+	if err != nil {
+		return err
+	}
+
+	outputCh := make(chan [1024]RoutingInfo)
+	go func() {
+		for {
+			routingResp, err := routingStream.Recv()
+			if err != nil {
+				r.logger.Error("failed to recv updated topology", zap.Error(err))
+				break
+			}
+
+			vbIdToLocalServer, err := r.vbIdToLocalServersFromResp(*routingResp)
+			if err != nil {
+				r.logger.Error("failed to extract routing info from response", zap.Error(err))
+				break
+			}
+
+			outputCh <- vbIdToLocalServer
+		}
+	}()
+
+	// We want to leave this running so that any vBucketMap changes pushed
+	// to us on the watch routing stream can be reflected in the manager.
+	for {
+		select {
+		// TO DO - handle retries/timeouts here. May need to create a new
+		// context and pass to watchRouting because if we use the top level
+		// config passed in then this will get canceled when the top level op
+		// finishes.
+		// case <-ctx.Done():
+		// 	return nil
+		case vbIdToLocalSrv := <-outputCh:
+			r.updateLock.Lock()
+			r.bucketToRouting[bucket] = vbIdToLocalSrv
+			r.updateLock.Unlock()
+		}
+	}
+
+}
+
+// This takes a watch routing response and builds a map from vBucket ID to the
+// IP address of the CNG instance to which that vBucket is local.
+func (r *routingManager) vbIdToLocalServersFromResp(resp routing_v1.WatchRoutingResponse) ([1024]RoutingInfo, error) {
+	var vbIdToLocalServer [1024]RoutingInfo
+
+	vBucketRoutingStrategy, ok := resp.DataRouting.(*routing_v1.WatchRoutingResponse_VbucketDataRouting)
+	if !ok {
+		return vbIdToLocalServer, errors.New("unexpected data routing implementation in response")
+	}
+
+	for _, ep := range vBucketRoutingStrategy.VbucketDataRouting.Endpoints {
+		// The server will tell us the host names of nodes along with the port;
+		// we need to split the host from the port and resolve the IP addr.
+		host, _, err := net.SplitHostPort(resp.Endpoints[ep.EndpointIdx].Address)
+		if err != nil {
+			r.logger.Error("splitting host name from port", zap.String("address", resp.Endpoints[ep.EndpointIdx].Address))
+			continue
+		}
+
+		ipAdr, err := net.LookupIP(host)
+		if err != nil {
+			r.logger.Error("looking up ip for node host", zap.String("host", host))
+			continue
+		}
+
+		// TO DO - atm this assumes that each vBucket is only local to one CNG.
+		for _, vbId := range ep.LocalVbuckets {
+			vbIdToLocalServer[vbId] = RoutingInfo{
+				LocalServers: []string{ipAdr[0].String()},
+			}
+		}
+	}
+
+	return vbIdToLocalServer, nil
+}
diff --git a/routingclient.go b/routingclient.go
index 58090fb..dd203fa 100644
--- a/routingclient.go
+++ b/routingclient.go
@@ -9,6 +9,8 @@ import (
 	"go.opentelemetry.io/otel/metric"
 	"go.opentelemetry.io/otel/trace"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/resolver"
 
 	grpc_logsettable "github.com/grpc-ecosystem/go-grpc-middleware/logging/settable"
 	"go.uber.org/zap/zapgrpc"
@@ -35,10 +37,11 @@ type routingClient_Bucket struct {
 }
 
 type RoutingClient struct {
-	routing *atomicRoutingTable
-	lock    sync.Mutex
-	buckets map[string]*routingClient_Bucket
-	logger  *zap.Logger
+	routing        *atomicRoutingTable
+	routingManager *routingManager
+	lock           sync.Mutex
+	buckets        map[string]*routingClient_Bucket
+	logger         *zap.Logger
 }
 
 // Verify that RoutingClient implements Conn
@@ -85,6 +88,16 @@ func DialContext(ctx context.Context, target string, opts *DialOptions) (*Routin
 		poolSize = opts.PoolSize
 	}
 
+	// TO DO - consider if these belong elsewhere - we can always register the custom
+	// balancer/resolver, and whether they are used depends on whether we
+	// prepend the custom scheme to the address when we call DialContext
+	balancer.Register(CustomLBBuilder{
+		logger: logger,
+	})
+	resolver.Register(&CustomResolverBuilder{
+		logger: logger,
+	})
+
 	for i := uint32(0); i < poolSize; i++ {
 		conn, err := dialRoutingConn(ctx, target, &routingConnOptions{
 			RootCAs: opts.RootCAs,
@@ -107,10 +120,13 @@ func DialContext(ctx context.Context, target string, opts *DialOptions) (*Routin
 		Conns: newRoutingConnPool(conns),
 	})
 
+	routingManager := newRoutingManager(conns[0].routingV1, logger)
+
 	return &RoutingClient{
-		routing: routing,
-		buckets: make(map[string]*routingClient_Bucket),
-		logger:  logger,
+		routing:        routing,
+		routingManager: routingManager,
+		buckets:        make(map[string]*routingClient_Bucket),
+		logger:         logger,
 	}, nil
 }
@@ -228,6 +244,10 @@ func (c *RoutingClient) SearchAdminV1() admin_search_v1.SearchAdminServiceClient
 	return &routingImpl_SearchAdminV1{c}
 }
 
+func (c *RoutingClient) RoutingManager() *routingManager {
+	return c.routingManager
+}
+
 func (c *RoutingClient) Close() error {
 	table := c.routing.Load()
 	if table == nil {
diff --git a/routingconn.go b/routingconn.go
index 237b9d7..bc89800 100644
--- a/routingconn.go
+++ b/routingconn.go
@@ -5,6 +5,7 @@ import (
 	"crypto/tls"
 	"crypto/x509"
 	"errors"
+	"fmt"
 
 	"go.opentelemetry.io/otel/metric"
 	"go.opentelemetry.io/otel/propagation"
@@ -115,6 +116,11 @@ func dialRoutingConn(ctx context.Context, address string, opts *routingConnOptio
 	dialOpts = append(dialOpts, grpc.WithStatsHandler(otelgrpc.NewClientHandler(clientOpts...)))
 	dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxRecvMsgSizeCallOption{MaxRecvMsgSize: maxMsgSize}))
 
+	// TO DO - make this configurable somehow...
+	// This will make gRPC use our custom name resolution
+
+	address = fmt.Sprintf("%s:///%s", customScheme, address)
+
 	conn, err := grpc.DialContext(ctx, address, dialOpts...)
 	if err != nil {
 		return nil, err
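A quick way to sanity-check the key-to-vBucket mapping used by vBidFromKey is a small standalone program. The sketch below mirrors the CRC32 mid-bits hash from routingManager.go; the keys are arbitrary examples, and 1024 matches the vBucket count the patch hardcodes (the default Couchbase vBucket count, which the gocbcorex TODO notes should ideally come from the vBucketMap instead).

package main

import (
	"fmt"
	"hash/crc32"
)

// vbIDFromKey mirrors vBidFromKey in routingManager.go: take the CRC32-IEEE
// checksum of the key, keep the middle 15 bits, then mod by the vBucket count.
func vbIDFromKey(key string, numVBuckets uint16) uint16 {
	crc := crc32.ChecksumIEEE([]byte(key))
	crcMidBits := uint16(crc>>16) & ^uint16(0x8000)
	return crcMidBits % numVBuckets
}

func main() {
	for _, key := range []string{"airline_10", "hotel_42", "user::1234"} {
		fmt.Printf("%-12s -> vbid %d\n", key, vbIDFromKey(key, 1024))
	}
}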
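The handoff between contextWithRoutingInfo and customLoadBalancerPicker.Pick is JSON carried in outgoing gRPC metadata under the "routinginfo" key. A minimal round-trip sketch of that mechanism, using only the standard grpc metadata API (the server address is a placeholder, not taken from the patch):

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"google.golang.org/grpc/metadata"
)

// RoutingInfo mirrors the struct added in routingManager.go.
type RoutingInfo struct {
	LocalServers []string `json:"localServers"`
}

func main() {
	// What contextWithRoutingInfo does: marshal the routing info and attach it
	// to the context as outgoing metadata under "routinginfo".
	info := RoutingInfo{LocalServers: []string{"10.0.0.12"}}
	b, err := json.Marshal(info)
	if err != nil {
		panic(err)
	}
	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("routinginfo", string(b)))

	// What the picker does in Pick(): read the metadata back off the context
	// and unmarshal it to decide which child picker to delegate to.
	md, ok := metadata.FromOutgoingContext(ctx)
	if !ok || len(md["routinginfo"]) == 0 {
		fmt.Println("no routing info; fall back to default picking")
		return
	}
	var got RoutingInfo
	if err := json.Unmarshal([]byte(md["routinginfo"][0]), &got); err != nil {
		fmt.Println("bad routing info:", err)
		return
	}
	fmt.Println("prefer endpoints:", got.LocalServers)
}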
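For the TO DO on pickFirst(), one possible shape for the round-robin fallback is sketched below: snapshot the child pickers into a slice when the parent picker is built and rotate through them with an atomic counter. The type and field names here are illustrative only and not part of this patch.

package gocbcoreps

import (
	"sync/atomic"

	"google.golang.org/grpc/balancer"
)

// roundRobinFallback rotates across the ready child pickers when no routing
// metadata is available or no local endpoint matches.
type roundRobinFallback struct {
	pickers []balancer.Picker
	next    atomic.Uint64
}

func (rr *roundRobinFallback) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	if len(rr.pickers) == 0 {
		// Nothing ready yet; let gRPC queue/retry the RPC.
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}
	idx := rr.next.Add(1) % uint64(len(rr.pickers))
	return rr.pickers[idx].Pick(info)
}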
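For the TODO above customResolver.watch(), the resolver could remember the previously resolved address list and skip cc.UpdateState when nothing has changed. A small helper along these lines (illustrative only, not part of the patch):

package gocbcoreps

import "sort"

// sameAddrs reports whether two address lists contain the same entries,
// ignoring order. The watch loop could keep the last resolved list and only
// call cc.UpdateState when sameAddrs returns false.
func sameAddrs(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	a = append([]string(nil), a...)
	b = append([]string(nil), b...)
	sort.Strings(a)
	sort.Strings(b)
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}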