-
Notifications
You must be signed in to change notification settings - Fork 6
arbitrary shard counts #177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 32 commits
6078a49
4b1eb9b
de6633b
d797c52
1b20ad0
fbc4af4
9f5cd62
b51520a
85c6a97
04046e5
796bef6
6e98448
ccdb29d
939d6b2
4ed7e50
5fb2b88
9a9535a
4c716c9
cf59b2d
4813e58
d2d52f2
286f51f
0b7b2d4
34e07db
2f26e6f
970b6ad
bed6b1d
6822113
949ec25
e1dc330
6bd8449
40b222c
affc6bb
9012f6b
7bab5d2
7290d27
2fa9239
873b2ce
a75d610
fc1f735
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,15 +59,15 @@ jobs: | |
| cache: ${{ github.ref == 'refs/heads/main' }} # only update the cache in main. | ||
|
|
||
| - name: Run go build | ||
| run: go build ./... | ||
| run: make bins | ||
|
|
||
| - name: Run go unittest | ||
| run: make test | ||
|
|
||
| - name: Install helm | ||
| uses: azure/[email protected] | ||
| with: | ||
| version: v3.17.3 | ||
| version: v3.19.4 | ||
|
|
||
| - name: Install helm-unittest | ||
| run: helm plugin install https://github.com/helm-unittest/helm-unittest.git | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,7 +11,7 @@ GO_GET_TOOL = go get -tool -modfile=$(TOOLS_MOD_FILE) | |
|
|
||
| # Disable cgo by default. | ||
| CGO_ENABLED ?= 0 | ||
| TEST_ARG ?= -race -timeout=5m -tags test_dep | ||
| TEST_ARG ?= -race -timeout=15m -tags test_dep -count=1 | ||
|
||
| BENCH_ARG ?= -benchtime=5000x | ||
|
|
||
| ALL_SRC := $(shell find . -name "*.go") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -63,12 +63,17 @@ func buildCLIOptions() *cli.App { | |
| return app | ||
| } | ||
|
|
||
| func startPProfHTTPServer(logger log.Logger, c config.ProfilingConfig) { | ||
| func startPProfHTTPServer(logger log.Logger, c config.ProfilingConfig, proxyInstance *proxy.Proxy) { | ||
| addr := c.PProfHTTPAddress | ||
| if len(addr) == 0 { | ||
| return | ||
| } | ||
|
|
||
| // Add debug endpoint handler | ||
| http.HandleFunc("/debug/connections", func(w http.ResponseWriter, r *http.Request) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nice stuff, but I'd prefer having a separate PR for this. |
||
| proxy.HandleDebugInfo(w, r, proxyInstance, logger) | ||
| }) | ||
|
|
||
| go func() { | ||
| logger.Info("Start pprof http server", tag.NewStringTag("address", addr)) | ||
| if err := http.ListenAndServe(addr, nil); err != nil { | ||
|
|
@@ -101,7 +106,7 @@ func startProxy(c *cli.Context) error { | |
| } | ||
|
|
||
| cfg := proxyParams.ConfigProvider.GetS2SProxyConfig() | ||
| startPProfHTTPServer(proxyParams.Logger, cfg.ProfilingConfig) | ||
| startPProfHTTPServer(proxyParams.Logger, cfg.ProfilingConfig, proxyParams.Proxy) | ||
|
|
||
| if err := proxyParams.Proxy.Start(); err != nil { | ||
| return err | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| package common | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| "google.golang.org/grpc/metadata" | ||
| ) | ||
|
|
||
| const ( | ||
| // Intra-proxy identification and tracing headers | ||
| IntraProxyHeaderKey = "x-s2s-intra-proxy" | ||
| IntraProxyHeaderValue = "1" | ||
| IntraProxyOriginProxyIDHeader = "x-s2s-origin-proxy-id" | ||
| IntraProxyHopCountHeader = "x-s2s-hop-count" | ||
| IntraProxyTraceIDHeader = "x-s2s-trace-id" | ||
| ) | ||
|
|
||
| // IsIntraProxy checks incoming context metadata for intra-proxy marker. | ||
| func IsIntraProxy(ctx context.Context) bool { | ||
| if md, ok := metadata.FromIncomingContext(ctx); ok { | ||
| if vals := md.Get(IntraProxyHeaderKey); len(vals) > 0 && vals[0] == IntraProxyHeaderValue { | ||
| return true | ||
| } | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| // WithIntraProxyHeaders returns a new outgoing context with intra-proxy headers set. | ||
| func WithIntraProxyHeaders(ctx context.Context, headers map[string]string) context.Context { | ||
| md, _ := metadata.FromOutgoingContext(ctx) | ||
| md = md.Copy() | ||
| md.Set(IntraProxyHeaderKey, IntraProxyHeaderValue) | ||
| for k, v := range headers { | ||
| md.Set(k, v) | ||
| } | ||
| return metadata.NewOutgoingContext(ctx, md) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| inbound: | ||
| name: "a-inbound-server" | ||
| server: | ||
| type: "mux" | ||
| mux: "muxed" | ||
| client: | ||
| tcp: | ||
| serverAddress: "localhost:7233" | ||
| outbound: | ||
| name: "a-outbound-server" | ||
| server: | ||
| tcp: | ||
| listenAddress: "0.0.0.0:6133" | ||
| client: | ||
| type: "mux" | ||
| mux: "muxed" | ||
| mux: | ||
| - name: "muxed" | ||
| mode: "client" | ||
| client: | ||
| serverAddress: "localhost:7003" | ||
| profiling: | ||
| pprofAddress: "localhost:6060" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| inbound: | ||
| name: "a-inbound-server" | ||
| server: | ||
| type: "mux" | ||
| mux: "muxed" | ||
| client: | ||
| tcp: | ||
| serverAddress: "localhost:7233" | ||
| outbound: | ||
| name: "a-outbound-server" | ||
| server: | ||
| tcp: | ||
| listenAddress: "0.0.0.0:6233" | ||
| client: | ||
| type: "mux" | ||
| mux: "muxed" | ||
| mux: | ||
| - name: "muxed" | ||
| mode: "client" | ||
| client: | ||
| serverAddress: "localhost:7003" | ||
| profiling: | ||
| pprofAddress: "localhost:6061" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| inbound: | ||
| name: "b-inbound-server" | ||
| server: | ||
| type: "mux" | ||
| mux: "muxed" | ||
| client: | ||
| tcp: | ||
| serverAddress: "localhost:8233" | ||
| outbound: | ||
| name: "b-outbound-server" | ||
| server: | ||
| tcp: | ||
| listenAddress: "0.0.0.0:6333" | ||
| client: | ||
| type: "mux" | ||
| mux: "muxed" | ||
| mux: | ||
| - name: "muxed" | ||
| mode: "server" | ||
| server: | ||
| listenAddress: "0.0.0.0:6334" | ||
| # shardCount: | ||
| # mode: "lcm" | ||
| # localShardCount: 3 | ||
| # remoteShardCount: 2 | ||
|
Comment on lines
+22
to
+25
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we remove the commented-out code? We can create a separate config if needed. |
||
| shardCount: | ||
| mode: "routing" | ||
| localShardCount: 3 | ||
| remoteShardCount: 2 | ||
| profiling: | ||
| pprofAddress: "localhost:6070" | ||
| memberlist: | ||
| enabled: true | ||
| nodeName: "proxy-node-b-1" | ||
| bindAddr: "127.0.0.1" | ||
| bindPort: 6335 | ||
| # joinAddrs: | ||
| # - "localhost:6435" | ||
| proxyAddresses: | ||
| "proxy-node-b-1": "localhost:6333" | ||
| "proxy-node-b-2": "localhost:6433" | ||
| # # TCP-only configuration for restricted networks | ||
| tcpOnly: true # Use TCP transport only, disable UDP | ||
| # disableTCPPings: true # Disable TCP pings for faster convergence | ||
| # probeTimeoutMs: 1000 # Longer timeout for network latency | ||
| # probeIntervalMs: 2000 # Less frequent probes to reduce network noise | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| inbound: | ||
| name: "b-inbound-server" | ||
| server: | ||
| type: "mux" | ||
| mux: "muxed" | ||
| client: | ||
| tcp: | ||
| serverAddress: "localhost:8233" | ||
| outbound: | ||
| name: "b-outbound-server" | ||
| server: | ||
| tcp: | ||
| listenAddress: "0.0.0.0:6433" | ||
| client: | ||
| type: "mux" | ||
| mux: "muxed" | ||
| mux: | ||
| - name: "muxed" | ||
| mode: "server" | ||
| server: | ||
| listenAddress: "0.0.0.0:6434" | ||
| # shardCount: | ||
| # mode: "lcm" | ||
| # localShardCount: 3 | ||
| # remoteShardCount: 2 | ||
| shardCount: | ||
| mode: "routing" | ||
| localShardCount: 3 | ||
| remoteShardCount: 2 | ||
| profiling: | ||
| pprofAddress: "localhost:6071" | ||
| memberlist: | ||
| enabled: true | ||
| nodeName: "proxy-node-b-2" | ||
| bindAddr: "127.0.0.1" | ||
| bindPort: 6435 | ||
| joinAddrs: | ||
| - "localhost:6335" | ||
| proxyAddresses: | ||
| "proxy-node-b-1": "localhost:6333" | ||
| "proxy-node-b-2": "localhost:6433" | ||
| # # TCP-only configuration for restricted networks | ||
| # tcpOnly: true # Use TCP transport only, disable UDP | ||
| # disableTCPPings: true # Disable TCP pings for faster convergence | ||
| # probeTimeoutMs: 1000 # Longer timeout for network latency | ||
| # probeIntervalMs: 2000 # Less frequent probes to reduce network noise |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,10 +20,14 @@ history.ReplicationEnableUpdateWithNewTaskMerge: | |
| history.enableWorkflowExecutionTimeoutTimer: | ||
| - value: true | ||
| history.EnableReplicationTaskTieredProcessing: | ||
| - value: true | ||
| - value: false | ||
| history.persistenceMaxQPS: | ||
| - value: 100000 | ||
| constraints: {} | ||
| frontend.persistenceMaxQPS: | ||
| - value: 100000 | ||
| constraints: {} | ||
| constraints: {} | ||
| history.shardUpdateMinInterval: | ||
| - value: 1s | ||
| history.ReplicationStreamSendEmptyTaskDuration: | ||
|
Comment on lines
+30
to
+32
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What are these configs for?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is the sender-side keep-alive used when there are no replication tasks. It keeps the replication stream alive and makes sure the receiver gets the latest watermark.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is that a new mandatory configuration that clients will need to enable?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Clients do not need to explicitly enable it. The default value in OSS is 1 minute. |
||
| - value: 10s | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this for? Can we make this change in a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to fix the helm error that I didn't see in main branch: