Conversation

@hai719 hai719 commented Dec 8, 2025

What was changed

Support replication between Temporal servers with arbitrary shard counts.

Why?

Checklist

  1. Closes

  2. How was this tested:

  3. Any docs updates needed?

@hai719 hai719 requested a review from temporal-nick December 8, 2025 17:29
reportStreamValue: reportStreamValue,
shardCountConfig: shardCountConfig,
lcmParameters: lcmParameters,
clusterConnection: clusterConnection,
Contributor

ClusterConnection is a huge context to pass around (MuxManager, inbound and outbound servers and clients, now ShardManager and intraProxyManager and Send/Ack channels). Can this code be written without this back-reference?

Contributor Author

Refactored the structure and removed the use of ClusterConnection

@@ -0,0 +1,43 @@
worker_processes 1;
Contributor

Are we using nginx in this change? I don't see it being started in main.go

Contributor Author

It's only for local testing of the multi-proxy case. I can remove it.

uses: azure/[email protected]
with:
version: v3.17.3
version: v3.19.4
Collaborator

What is this for? Can we do this change in a separate PR?

Contributor Author

This is to fix a Helm error that I didn't see on the main branch:

Error: plugin is installed but unusable: failed to load plugin at "/home/runner/.local/share/helm/plugins/helm-unittest.git/plugin.yaml": error unmarshaling JSON: while decoding JSON: json: unknown field "platformHooks"
Error: Process completed with exit code 1.

}

// Add debug endpoint handler
http.HandleFunc("/debug/connections", func(w http.ResponseWriter, r *http.Request) {
Collaborator

Nice stuff, but I'd prefer a separate PR for this.
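
For context, a handler like the one excerpted above can stay very small. Below is a minimal sketch; the connectionSnapshot type and listConnections helper are hypothetical placeholders, not code from this PR.

```go
package proxydebug

import (
	"encoding/json"
	"net/http"
)

// connectionSnapshot is a hypothetical view of one tracked connection.
type connectionSnapshot struct {
	RemoteAddr string `json:"remoteAddr"`
	State      string `json:"state"`
}

// listConnections is a placeholder for reading the proxy's live connection state.
func listConnections() []connectionSnapshot { return nil }

// RegisterDebugHandlers wires the /debug/connections endpoint onto a mux and
// returns the current connection list as JSON.
func RegisterDebugHandlers(mux *http.ServeMux) {
	mux.HandleFunc("/debug/connections", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(listConnections())
	})
}
```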

Comment on lines +22 to +25
# shardCount:
# mode: "lcm"
# localShardCount: 3
# remoteShardCount: 2
Collaborator

Can we remove the commented-out code? We can create a separate config if needed.

Comment on lines +30 to +32
history.shardUpdateMinInterval:
- value: 1s
history.ReplicationStreamSendEmptyTaskDuration:
Collaborator

What are these configs for?

Contributor Author

This is the sender-side keepalive for when there are no replication tasks. It keeps the replication stream alive and makes sure the receiver gets the latest watermark.

Contributor

Is that a new mandatory configuration that clients will need to enable?

Contributor Author

Clients do not need to explicitly enable it. The default value in OSS is 1 minute.
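
As a rough illustration of the mechanism described above (not the actual Temporal implementation, which is governed by history.ReplicationStreamSendEmptyTaskDuration), a sender-side keepalive can be a ticker that emits a watermark-only message when no replication task has been sent within the configured duration. The sendTask/sendWatermarkOnly callbacks and replicationTask type below are hypothetical.

```go
package keepalive

import (
	"context"
	"time"
)

// replicationTask is a stand-in for the real replication task type.
type replicationTask struct{}

// keepAliveSendLoop sends queued tasks and, when nothing has been sent within
// emptyTaskDuration, emits a watermark-only message so the receiver still
// observes the latest watermark.
func keepAliveSendLoop(
	ctx context.Context,
	emptyTaskDuration time.Duration, // 1 minute by default in OSS
	tasks <-chan *replicationTask,
	sendTask func(*replicationTask) error,
	sendWatermarkOnly func() error,
) error {
	timer := time.NewTimer(emptyTaskDuration)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case task := <-tasks:
			if err := sendTask(task); err != nil {
				return err
			}
			timer.Reset(emptyTaskDuration)
		case <-timer.C:
			// No replication tasks in the window: keep the stream alive.
			if err := sendWatermarkOnly(); err != nil {
				return err
			}
			timer.Reset(emptyTaskDuration)
		}
	}
}
```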

) error {

i.logger.Debug("InterceptStream", tag.NewAnyTag("method", info.FullMethod))
// Skip translation for intra-proxy streams
Collaborator

why?

Contributor Author

Translation already happens at the inbound/outbound servers. The intra-proxy stream still goes through the inbound/outbound servers, so translation is skipped here to avoid translating twice.
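
A sketch of the skip described here: the streamInterceptor receiver, isIntraProxyStream predicate, and wrapWithTranslation helper are hypothetical names; only the gRPC stream-interceptor shape (google.golang.org/grpc) and the logger/tag calls from the excerpt above are real.

```go
func (i *streamInterceptor) InterceptStream(
	srv interface{},
	ss grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) error {
	i.logger.Debug("InterceptStream", tag.NewAnyTag("method", info.FullMethod))
	if isIntraProxyStream(ss.Context(), info.FullMethod) {
		// The inbound/outbound servers already translate this stream once;
		// skip here so it isn't translated twice.
		return handler(srv, ss)
	}
	return handler(srv, i.wrapWithTranslation(ss))
}
```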

@hai719 hai719 marked this pull request as ready for review December 26, 2025 18:08
@hai719 hai719 requested a review from a team as a code owner December 26, 2025 18:08
Comment on lines 372 to 378
if s.proxyB != nil {
s.proxyB.Stop()
}
} else {
if s.loadBalancerA != nil {
s.loadBalancerA.Stop()
}
Contributor

Does this need to be if-else if we're already nil-checking everything?

Contributor Author

updated.
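
For reference, the flattened shape the review asks for (independent nil checks, no else branch) reads roughly like this, using the fields visible in the excerpt above:

```go
// Each component is stopped independently; no if-else chain needed.
if s.proxyB != nil {
	s.proxyB.Stop()
}
if s.loadBalancerA != nil {
	s.loadBalancerA.Stop()
}
```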

Upstream *Upstream
}

TCPProxy struct {
Contributor

Could you add some comments describing what this proxy is for?

Contributor Author

added.
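
The added comment presumably reads something like the sketch below; the description is inferred from the nginx/load-balancer discussion earlier in this review and is only illustrative, not the PR's actual wording.

```go
// TCPProxy is a simple layer-4 forwarder used by the local multi-proxy test
// setup: it accepts TCP connections and pipes bytes to the configured Upstream.
// (Illustrative doc comment only.)
TCPProxy struct {
	Upstream *Upstream
}
```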

for i, task := range attr.Messages.ReplicationTasks {
msg = append(msg, fmt.Sprintf("[%d]: %v", i, task.SourceTaskId))
}
f.logger.Info(fmt.Sprintf("forwarding ReplicationMessages: exclusive %v, tasks: %v", attr.Messages.ExclusiveHighWatermark, strings.Join(msg, ", ")))
Contributor

This looks like a large data dump. How often will this log? Do we want to put it behind a ticker?

Contributor Author

moved to debug level.

Comment on lines 345 to 360
forwarder := newStreamForwarder(
s.adminClient,
targetStreamServer,
streamServer,
targetMetadata,
sourceClusterShardID,
targetClusterShardID,
s.metricLabelValues,
logger,
)
err = forwarder.Run()
if err != nil {
return err
}
// Do not try to transfer EOF from the source here. Just returning "nil" is sufficient to terminate the stream
// to the client.
return nil
Contributor

This is the other half of "if s.shardCountConfig.Mode == config.ShardCountLCM" above. Let's put the body of that if statement into a method to match streamRouting and streamIntraProxyRouting, and then we can consolidate the dispatch into a single obvious place.

Contributor Author

Updated and moved these into another file.
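
The consolidated dispatch presumably ends up looking something like this sketch. config.ShardCountLCM and the method names come from the review discussion; config.ShardCountRouting, the intra-proxy predicate, and the exact signatures are assumptions.

```go
func (s *adminServiceProxyServer) StreamWorkflowReplicationMessages(
	streamServer adminservice.AdminService_StreamWorkflowReplicationMessagesServer,
) error {
	switch {
	case s.shardCountConfig.Mode == config.ShardCountLCM:
		return s.streamLCM(streamServer)
	case s.isIntraProxyStream(streamServer.Context()): // hypothetical predicate
		return s.streamIntraProxyRouting(streamServer)
	case s.shardCountConfig.Mode == config.ShardCountRouting: // assumed mode name
		return s.streamRouting(streamServer)
	default:
		return s.streamDirect(streamServer)
	}
}
```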

return nil
}

func (s *adminServiceProxyServer) streamIntraProxyRouting(
Contributor

For a future PR: The stream routing logic has outgrown this adminservice file, which is supposed to be the adminservice handler. We should put streamIntraProxyRouting, streamRouting, streamLCM, and streamDirect together into a separate file.

resp, err := server.DescribeCluster(ctx, req)
s.NoError(err)
s.Equal(c.expResp, resp)
s.Equal(c.expResp.FailoverVersionIncrement, resp.FailoverVersionIncrement)
Contributor

Leave a comment that reminds us what parts of the response aren't supposed to match

Contributor Author

updated with proto.Equal()
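
For reference, proto.Equal (from google.golang.org/protobuf/proto) compares the messages by field value, so proto-internal state doesn't trigger false mismatches. A sketch of the updated assertion, based on the test excerpt above:

```go
resp, err := server.DescribeCluster(ctx, req)
s.NoError(err)
// Compare as protos rather than deep-equal structs; internal proto state
// (e.g. unknown fields, size caches) is ignored by proto.Equal.
s.True(proto.Equal(c.expResp, resp))
```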

targetShardCount int32
logger log.Logger

clusterConnection *ClusterConnection
Contributor

Looks like this is only used to get a reference to the shardManager. Can we just put that here?

Contributor Author

updated.
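
The narrowed set of fields presumably ends up along these lines (sketch only; the exact field name is assumed):

```go
targetShardCount int32
logger           log.Logger

// Hold only what is actually needed instead of the whole ClusterConnection.
shardManager ShardManager
```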

"github.com/temporalio/s2s-proxy/transport/mux"
"github.com/temporalio/s2s-proxy/transport/mux/session"
)

Contributor

A comment here describing that this is the handler for the proxy debug endpoint would be helpful

Contributor Author

added.

Comment on lines +201 to +203
// lastWatermark tracks the last watermark received from source shard for late-registering target shards
lastWatermarkMu sync.RWMutex
lastWatermark *replicationv1.WorkflowReplicationMessages
Contributor

Can we replace this with atomic.Pointer? It should be both higher-performance and simpler to use

Contributor Author

updated.

Contributor Author

Reverted the lastWatermark atomic.Pointer change due to a test failure.
We can redo the change after the OSS ReplicationStreamSendEmptyTaskDuration setting is available.
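
For the record, the two variants discussed here look roughly like the sketch below. The types follow the excerpt above; the struct and method names are illustrative, and the imports are sync, sync/atomic, and the replicationv1 alias already used in that file.

```go
// Mutex-guarded form (kept for now):
type watermarkStateMutex struct {
	lastWatermarkMu sync.RWMutex
	lastWatermark   *replicationv1.WorkflowReplicationMessages
}

// atomic.Pointer form (reverted pending the OSS keepalive change):
type watermarkStateAtomic struct {
	lastWatermark atomic.Pointer[replicationv1.WorkflowReplicationMessages]
}

func (w *watermarkStateAtomic) set(m *replicationv1.WorkflowReplicationMessages) {
	w.lastWatermark.Store(m)
}

func (w *watermarkStateAtomic) get() *replicationv1.WorkflowReplicationMessages {
	return w.lastWatermark.Load()
}
```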

proxy/proxy.go Outdated
Comment on lines 27 to 36
RoutedAck struct {
TargetShard history.ClusterShardID
Req *adminservice.StreamWorkflowReplicationMessagesRequest
}

// RoutedMessage wraps a replication response with originating client shard info
RoutedMessage struct {
SourceShard history.ClusterShardID
Resp *adminservice.StreamWorkflowReplicationMessagesResponse
}
Contributor

These seem orphaned. Do they belong in another file?

Contributor Author

moved.

Comment on lines 401 to 407
shutdownChan := channel.NewShutdownOnce()
go func() {
if err := sender.Run(streamServer, shutdownChan); err != nil {
logger.Error("intraProxyStreamSender.Run error", tag.Error(err))
}
}()
<-shutdownChan.Channel()
Contributor

Two problems:

  1. This shutdownChan doesn't escape anywhere, so this is equivalent to just calling sender.Run()
  2. A given clusterConnection and all its resources may be closed unilaterally from a config endpoint. Instead of creating a new shutdown channel, pass in the lifetime context when this handler is built during ClusterConnection and wait on that.

I think passing the lifetime through to sender.Run in place of shutdownChan should accomplish the need for shutdown in that function.

Contributor Author

The replication stream has a different lifetime; it is not the same as the clusterConnection's.

Contributor
@temporal-nick temporal-nick Jan 2, 2026

Right. But when the clusterConnection is terminated, the replication streams should close. There's a hierarchy of lifetimes:

Proxy -> []ClusterConnection -> MuxManager -> Inbound GRPC Servers -> []inbound AdminService Streams
                                           -> Outbound GRPC server -> []outbound AdminService Streams
                             -> StreamManager

Contributor Author

Wired the lifetime through.
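
A sketch of what wiring the lifetime can look like, assuming a connection-scoped context owned by the ClusterConnection and a Run method that accepts a context; the function name and signatures are assumptions based on this discussion.

```go
func runIntraProxyStream(
	lifetimeCtx context.Context, // cancelled when the ClusterConnection is torn down
	streamServer adminservice.AdminService_StreamWorkflowReplicationMessagesServer,
	sender *intraProxyStreamSender,
	logger log.Logger,
) error {
	// The stream's own context ends when the client closes the stream; the
	// lifetime context ends when the whole connection is closed. Either one
	// should stop the sender.
	ctx, cancel := context.WithCancel(streamServer.Context())
	defer cancel()
	go func() {
		select {
		case <-lifetimeCtx.Done():
			cancel()
		case <-ctx.Done():
		}
	}()
	if err := sender.Run(ctx, streamServer); err != nil {
		logger.Error("intraProxyStreamSender.Run error", tag.Error(err))
		return err
	}
	return nil
}
```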

Comment on lines 443 to 454
shutdownChan := channel.NewShutdownOnce()
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
proxyStreamSender.Run(streamServer, shutdownChan)
}()
go func() {
defer wg.Done()
proxyStreamReceiver.Run(shutdownChan)
}()
wg.Wait()
Contributor

Same point as above: Pass in the lifetime context instead of creating a new shutdownChan here.

Contributor Author

same as above.

Comment on lines +137 to +153
getLCMParameters := func(shardCountConfig config.ShardCountConfig, inverse bool) LCMParameters {
if shardCountConfig.Mode != config.ShardCountLCM {
return LCMParameters{}
}
lcm := common.LCM(shardCountConfig.LocalShardCount, shardCountConfig.RemoteShardCount)
if inverse {
return LCMParameters{
LCM: lcm,
TargetShardCount: shardCountConfig.LocalShardCount,
}
}
return LCMParameters{
LCM: lcm,
TargetShardCount: shardCountConfig.RemoteShardCount,
}
}
getRoutingParameters := func(shardCountConfig config.ShardCountConfig, inverse bool, directionLabel string) RoutingParameters {
Contributor

Do these need to be functions? It seems like you could just construct LCMParameters and RoutingParameters here.

Contributor

Oh, I see they're called twice, but they each embed an if-else which has custom code for each branch. I'd recommend compressing this into a simple struct definition embedded into the serverConfiguration structs below.

Contributor Author

These functions can later be extracted (like the xxTranslations ones) into a separate file to keep this function clean.
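
For readers new to the LCM mode: with local and remote shard counts N and M, LCM(N, M) gives a common shard space that both sides divide evenly, which is presumably what makes the N-to-M stream fan-out/fan-in well defined. A minimal sketch of the arithmetic, assuming common.LCM is the usual gcd-based helper:

```go
func gcd(a, b int32) int32 {
	for b != 0 {
		a, b = b, a%b
	}
	return a
}

// lcm(3, 2) == 6: each of the 3 local shards covers 2 virtual shards and each
// of the 2 remote shards covers 3, matching the commented sample config above
// (localShardCount: 3, remoteShardCount: 2).
func lcm(a, b int32) int32 {
	return a / gcd(a, b) * b
}
```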

) {
// Terminate any previous local receiver for this shard
if r.shardManager != nil {
r.shardManager.TerminatePreviousLocalReceiver(r.sourceShardID, r.logger)
Contributor

For future PR: Is it possible to avoid the need for this using context lifetimes and/or locking?

}

// ShardManager manages distributed shard ownership across proxy instances
ShardManager interface {
Contributor

Still reading through this. My initial impression is that this is a huge interface. Do we use all of these methods? Is there a higher-level interface we could expose that would remove ~50% of these?

Contributor Author

updated.

Makefile Outdated
# Disable cgo by default.
CGO_ENABLED ?= 0
TEST_ARG ?= -race -timeout=5m -tags test_dep
TEST_ARG ?= -race -timeout=15m -tags test_dep -count=1
Contributor

Do the tests really take 15 minutes to run? I think we need to change or separate them if this is the case.

Contributor Author

The tests took 11m to finish when the replication tests were run in sequence for easier debugging. Updated the tests to run in parallel.
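
A sketch of the parallelization using the standard testing package (the repo's test suites may use a different mechanism; the scenario names and helper are illustrative):

```go
func TestReplicationScenarios(t *testing.T) {
	scenarios := []string{"one_to_one", "lcm_3_to_2", "multi_proxy"}
	for _, name := range scenarios {
		name := name // capture loop variable
		t.Run(name, func(t *testing.T) {
			t.Parallel() // run scenarios concurrently instead of sequentially
			runReplicationScenario(t, name) // hypothetical helper
		})
	}
}
```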

Contributor
@temporal-nick temporal-nick left a comment

Current version is clean enough to continue iterating on in the main branch. Approving!

@temporal-nick temporal-nick merged commit 924155c into temporalio:main Jan 3, 2026
5 checks passed