Merged
Commits (40)
6078a49
add debug endpoint; add test configs
hai719 Jul 29, 2025
4b1eb9b
add shard manager
hai719 Jul 31, 2025
de6633b
proxy routing
hai719 Aug 7, 2025
d797c52
route only in target proxy; remap taskID and track ack in proxy.
hai719 Sep 3, 2025
1b20ad0
update
hai719 Sep 4, 2025
fbc4af4
retry when shard not available
hai719 Sep 5, 2025
9f5cd62
add tags to log
hai719 Sep 5, 2025
b51520a
add intra proxy streams
hai719 Sep 23, 2025
85c6a97
fix incorrect stream pair
hai719 Sep 26, 2025
04046e5
add log for debugging
hai719 Sep 26, 2025
796bef6
fix issues
hai719 Oct 13, 2025
6e98448
fix streams
hai719 Oct 13, 2025
ccdb29d
add ring_max_size to debug
hai719 Oct 13, 2025
939d6b2
fix panic; fix memberlist join issue
hai719 Oct 16, 2025
4ed7e50
fix panic
hai719 Oct 17, 2025
5fb2b88
update regarding cluster_connection
hai719 Dec 8, 2025
9a9535a
Merge branch 'main' into hai719/routing
hai719 Dec 10, 2025
4c716c9
fix for cluster_conn; fix routing
hai719 Dec 16, 2025
cf59b2d
fix test error: handle case when no replication task after shard regi…
hai719 Dec 17, 2025
4813e58
Refactor: move channel management to ShardManager
hai719 Dec 17, 2025
d2d52f2
Merge branch 'main' into hai719/routing
hai719 Dec 17, 2025
286f51f
update helm
hai719 Dec 17, 2025
0b7b2d4
remove clusterConnection from adminServiceProxyServer
hai719 Dec 17, 2025
34e07db
fix unit test
hai719 Dec 17, 2025
2f26e6f
fix test error
hai719 Dec 17, 2025
970b6ad
fix intra proxy streams; add connection debug info.
hai719 Dec 20, 2025
bed6b1d
add tcp_proxy for test. add intra_proxy test file.
hai719 Dec 20, 2025
6822113
update tests
hai719 Dec 21, 2025
949ec25
handle late-registered remote shard
hai719 Dec 23, 2025
e1dc330
use make bins to avoid testcore related error
hai719 Dec 23, 2025
6bd8449
fix test error; refactor test
hai719 Dec 24, 2025
40b222c
fix test error
hai719 Dec 24, 2025
affc6bb
address comments: update logs level (info->debug); add comments
hai719 Jan 2, 2026
9012f6b
refactor and put stream handling into separated file
hai719 Jan 2, 2026
7bab5d2
address comments
hai719 Jan 2, 2026
7290d27
move structs to better place
hai719 Jan 2, 2026
2fa9239
revert lastWatermark atomic.Pointer change.
hai719 Jan 2, 2026
873b2ce
close stream when cluster connection is closed
hai719 Jan 3, 2026
a75d610
clean up shardManager interface
hai719 Jan 3, 2026
fc1f735
run test in parallel to reduce time
hai719 Jan 3, 2026
229 changes: 229 additions & 0 deletions MEMBERLIST_TROUBLESHOOTING.md
@@ -0,0 +1,229 @@
# Memberlist Network Troubleshooting

This guide helps resolve network connectivity issues with memberlist in the s2s-proxy.

## Common Issues

### UDP Ping Failures

**Symptoms:**
```
[DEBUG] memberlist: Failed UDP ping: proxy-node-a-2 (timeout reached)
[WARN] memberlist: Was able to connect to proxy-node-a-2 over TCP but UDP probes failed, network may be misconfigured
```

**Causes:**
- UDP traffic blocked by firewalls
- Running in containers without UDP port mapping
- Network security policies blocking UDP
- NAT/proxy configurations

**Solutions:**

#### 1. Use TCP-Only Mode (Recommended)

Update your configuration to use TCP-only transport:

```yaml
memberlist:
  enabled: true
  enableForwarding: true
  nodeName: "proxy-node-1"
  bindAddr: "0.0.0.0"
  bindPort: 7946
  joinAddrs:
    - "proxy-node-2:7946"
    - "proxy-node-3:7946"
  # TCP-only configuration
  tcpOnly: true           # Disable UDP entirely
  disableTCPPings: true   # Improve performance in TCP-only mode
  probeTimeoutMs: 1000    # Adjust for network latency
  probeIntervalMs: 2000   # Reduce probe frequency
```

#### 2. Open UDP Ports

If you want to keep UDP enabled:

**Docker/Kubernetes:**
```bash
# Expose UDP port in Docker
docker run -p 7946:7946/udp -p 7946:7946/tcp ...
```

```yaml
# Kubernetes service
apiVersion: v1
kind: Service
spec:
  ports:
    - name: memberlist-tcp
      port: 7946
      protocol: TCP
    - name: memberlist-udp
      port: 7946
      protocol: UDP
```

**Firewall:**
```bash
# Linux iptables
iptables -A INPUT -p udp --dport 7946 -j ACCEPT
iptables -A INPUT -p tcp --dport 7946 -j ACCEPT

# AWS Security Groups - allow UDP/TCP 7946
```

#### 3. Adjust Bind Address

For container environments, use specific bind addresses:

```yaml
memberlist:
  bindAddr: "0.0.0.0"   # Listen on all interfaces
  # OR
  bindAddr: "10.0.0.1"  # Specific container IP
```

## Configuration Options

### Network Timing

```yaml
memberlist:
  probeTimeoutMs: 500    # Time to wait for ping response (default: 500ms)
  probeIntervalMs: 1000  # Time between health probes (default: 1s)
```

**Adjust based on network conditions:**
- **Fast networks**: Lower values (500ms timeout, 1s interval)
- **Slow/high-latency networks**: Higher values (1000ms timeout, 2s interval)
- **Unreliable networks**: Much higher values (2000ms timeout, 5s interval)
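
These millisecond values map onto hashicorp/memberlist's `ProbeTimeout` and `ProbeInterval` settings. Below is a minimal Go sketch of how the proxy might translate them; the `MemberlistConfig` struct and its field names are assumptions for illustration, not the PR's actual types, and TCP-only mode would additionally require a custom `Transport` (not shown).

```go
package memberlistcfg

import (
    "time"

    "github.com/hashicorp/memberlist"
)

// MemberlistConfig mirrors the YAML keys used in this guide (hypothetical struct).
type MemberlistConfig struct {
    NodeName        string
    BindAddr        string
    BindPort        int
    ProbeTimeoutMs  int
    ProbeIntervalMs int
    DisableTCPPings bool
}

// buildConfig converts the proxy-level settings into a memberlist.Config.
func buildConfig(c MemberlistConfig) *memberlist.Config {
    mc := memberlist.DefaultLANConfig()
    mc.Name = c.NodeName
    mc.BindAddr = c.BindAddr
    mc.BindPort = c.BindPort
    // Timing knobs from the YAML above.
    mc.ProbeTimeout = time.Duration(c.ProbeTimeoutMs) * time.Millisecond
    mc.ProbeInterval = time.Duration(c.ProbeIntervalMs) * time.Millisecond
    // Skipping TCP fallback pings avoids redundant checks in TCP-only deployments.
    mc.DisableTcpPings = c.DisableTCPPings
    return mc
}
```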

### Transport Modes

#### Local Network Mode (Default)
```yaml
memberlist:
  tcpOnly: false  # Uses both UDP and TCP
```
- Best for local networks
- Fastest failure detection
- Requires UDP connectivity

#### TCP-Only Mode
```yaml
memberlist:
  tcpOnly: true          # TCP transport only
  disableTCPPings: true  # Optimize for TCP-only
```
- Works in restricted networks
- Slightly slower failure detection
- More reliable in containerized environments

## Testing Connectivity

### 1. Test TCP Connectivity
```bash
# Test if TCP port is reachable
telnet proxy-node-2 7946
nc -zv proxy-node-2 7946
```

### 2. Test UDP Connectivity
```bash
# Test UDP port (if not using tcpOnly)
nc -u -zv proxy-node-2 7946
```

### 3. Monitor Memberlist Logs
Enable debug logging to see detailed memberlist behavior:
```bash
# Set log level to debug
export LOG_LEVEL=debug
./s2s-proxy start --config your-config.yaml
```

### 4. Check Debug Endpoint
Query the debug endpoint to see cluster status:
```bash
curl http://localhost:6060/debug/connections | jq .shard_info
```

## Example Configurations

### Docker Compose
```yaml
version: '3.8'
services:
  proxy1:
    image: s2s-proxy
    ports:
      - "7946:7946/tcp"
      - "7946:7946/udp"   # Only if not using tcpOnly
    environment:
      - CONFIG_PATH=/config/proxy.yaml
```

### Kubernetes
```yaml
apiVersion: apps/v1
kind: Deployment
spec:
  template:
    spec:
      containers:
        - name: s2s-proxy
          ports:
            - containerPort: 7946
              protocol: TCP
            - containerPort: 7946
              protocol: UDP  # Only if not using tcpOnly
```

## Performance Impact

**UDP + TCP Mode:**
- Fastest failure detection (~1-2 seconds)
- Best for stable networks
- Requires UDP connectivity

**TCP-Only Mode:**
- Slightly slower failure detection (~2-5 seconds)
- More reliable in restricted environments
- Works everywhere TCP works

## Recommended Settings by Environment

### Local Development
```yaml
memberlist:
  tcpOnly: false
  probeTimeoutMs: 500
  probeIntervalMs: 1000
```

### Docker/Containers
```yaml
memberlist:
  tcpOnly: true
  disableTCPPings: true
  probeTimeoutMs: 1000
  probeIntervalMs: 2000
```

### Kubernetes
```yaml
memberlist:
  tcpOnly: true
  disableTCPPings: true
  probeTimeoutMs: 1500
  probeIntervalMs: 3000
```

### High-Latency/Unreliable Networks
```yaml
memberlist:
  tcpOnly: true
  disableTCPPings: true
  probeTimeoutMs: 2000
  probeIntervalMs: 5000
```
96 changes: 96 additions & 0 deletions PROXY_FORWARDING.md
@@ -0,0 +1,96 @@
# Proxy-to-Proxy Forwarding

This document describes the proxy-to-proxy forwarding functionality that enables distributed shard management across multiple s2s-proxy instances.

## Overview

The proxy-to-proxy forwarding mechanism allows multiple proxy instances to work together as a cluster, with each instance owning a subset of shards. When a replication stream request reaches a proxy that does not own the target shard, that proxy automatically forwards the request to the instance that does.

## Architecture

```
Client → Proxy A (Inbound) → Proxy B (Inbound) → Target Server
             (Forward)           (Owner)
```

## How It Works

1. **Shard Ownership**: Using consistent hashing via HashiCorp memberlist, each proxy instance is assigned ownership of specific shards (see the sketch after this list)
2. **Ownership Check**: When a `StreamWorkflowReplicationMessages` request arrives on an **inbound connection** with **forwarding enabled**, the proxy checks if it owns the required shard
3. **Forwarding**: If another proxy owns the shard, the request is forwarded to that proxy (only for inbound connections with forwarding enabled)
4. **Bidirectional Streaming**: The forwarding proxy acts as a transparent relay, forwarding both requests and responses
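
The sketch below makes step 1 concrete with a generic consistent-hash ring over member names. It is illustrative only: the actual proxy derives membership from memberlist and may hash shards and nodes differently.

```go
package shardring

import (
    "fmt"
    "hash/fnv"
    "sort"
)

// ownerOf returns the member whose position on the hash ring is the first one
// at or after the shard's hash, wrapping around at the end of the ring.
// Illustrative only; not the proxy's actual implementation.
func ownerOf(shardID int32, members []string) string {
    if len(members) == 0 {
        return ""
    }
    type point struct {
        pos  uint32
        name string
    }
    ring := make([]point, 0, len(members))
    for _, m := range members {
        ring = append(ring, point{pos: hash32(m), name: m})
    }
    sort.Slice(ring, func(i, j int) bool { return ring[i].pos < ring[j].pos })

    h := hash32(fmt.Sprintf("shard-%d", shardID))
    for _, p := range ring {
        if p.pos >= h {
            return p.name
        }
    }
    return ring[0].name // wrap around to the first member
}

func hash32(s string) uint32 {
    f := fnv.New32a()
    _, _ = f.Write([]byte(s))
    return f.Sum32()
}
```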

## Key Components

### Shard Manager
- **Interface**: `ShardManager` with methods for shard ownership and proxy address resolution
- **Implementation**: Uses memberlist for cluster membership and consistent hashing for shard distribution
- **Methods** (sketched below):
- `IsLocalShard(shardID)` - Check if this proxy owns a shard
- `GetShardOwner(shardID)` - Get the node name that owns a shard
- `GetProxyAddress(nodeName)` - Get the service address for a proxy node
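
A minimal Go sketch of the interface as described above; the parameter and return types are assumptions inferred from the method list, not copied from the PR.

```go
// ShardManager resolves shard ownership across proxy instances (sketch).
type ShardManager interface {
    // IsLocalShard reports whether this proxy instance owns the shard.
    IsLocalShard(shardID int32) bool
    // GetShardOwner returns the node name that owns the shard.
    GetShardOwner(shardID int32) (string, error)
    // GetProxyAddress returns the service address for a proxy node.
    GetProxyAddress(nodeName string) (string, error)
}
```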

### Forwarding Logic
- **Location**: `StreamWorkflowReplicationMessages` in `adminservice.go`
- **Conditions**: Forwards only when:
- **Inbound connection** (`s.IsInbound == true`)
- **Memberlist enabled** (`memberlist.enabled == true`)
- **Forwarding enabled** (`memberlist.enableForwarding == true`)
- **Checks**: Two shard ownership checks (only for inbound):
1. `clientShardID` - the incoming shard from the client
2. `serverShardID` - the target shard (after LCM remapping if applicable)
- **Forwarding Function**: `forwardToProxy()` handles the bidirectional streaming (a decision sketch follows)
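
Putting the conditions together, the decision might look roughly like the sketch below, reusing the `ShardManager` interface sketched earlier. Only `IsInbound` and the method names come from this document; the struct fields and helper name are assumptions, and the real logic in `adminservice.go` may differ.

```go
// Sketch types: field names other than IsInbound are assumptions.
type adminServiceProxyServer struct {
    IsInbound         bool
    memberlistEnabled bool
    forwardingEnabled bool
    shardManager      ShardManager
}

// shouldForward reports whether the stream for these shards must be relayed
// to another proxy instance, and if so, which node owns the shard.
func (s *adminServiceProxyServer) shouldForward(clientShardID, serverShardID int32) (string, bool) {
    // Only inbound connections with memberlist and forwarding enabled participate.
    if !s.IsInbound || !s.memberlistEnabled || !s.forwardingEnabled {
        return "", false
    }
    // Check both the incoming shard and the (possibly LCM-remapped) target shard.
    for _, shardID := range []int32{clientShardID, serverShardID} {
        if s.shardManager.IsLocalShard(shardID) {
            continue
        }
        owner, err := s.shardManager.GetShardOwner(shardID)
        if err != nil {
            return "", false // fall back to local handling if the lookup fails
        }
        return owner, true
    }
    return "", false
}
```

When this returns a remote owner, the caller would resolve its address via `GetProxyAddress` and hand both directions of the stream to `forwardToProxy()`.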

### Configuration

```yaml
memberlist:
  enabled: true
  # Enable proxy-to-proxy forwarding
  enableForwarding: true
  nodeName: "proxy-node-1"
  bindAddr: "0.0.0.0"
  bindPort: 7946
  joinAddrs:
    - "proxy-node-2:7946"
    - "proxy-node-3:7946"
  shardStrategy: "consistent"
  proxyAddresses:
    "proxy-node-1": "localhost:7001"
    "proxy-node-2": "proxy-node-2:7001"
    "proxy-node-3": "proxy-node-3:7001"
```

## Metrics

The following Prometheus metrics track forwarding operations:

- `shard_distribution` - Number of shards handled by each proxy instance
- `shard_forwarding_total` - Total forwarding operations (labels: from_node, to_node, result)
- `memberlist_cluster_size` - Number of nodes in the memberlist cluster
- `memberlist_events_total` - Memberlist events (join/leave)
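
A hedged sketch of how these series might be declared with the Prometheus Go client; the metric names and the `shard_forwarding_total` labels come from the list above, while the remaining label sets and variable names are assumptions.

```go
// Sketch only: label sets other than shard_forwarding_total's are assumptions.
package metrics

import "github.com/prometheus/client_golang/prometheus"

var (
    shardDistribution = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "shard_distribution",
            Help: "Number of shards handled by each proxy instance.",
        },
        []string{"node"},
    )
    shardForwardingTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "shard_forwarding_total",
            Help: "Total forwarding operations.",
        },
        []string{"from_node", "to_node", "result"},
    )
    memberlistClusterSize = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "memberlist_cluster_size",
        Help: "Number of nodes in the memberlist cluster.",
    })
)

func init() {
    prometheus.MustRegister(shardDistribution, shardForwardingTotal, memberlistClusterSize)
}
```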

## Benefits

1. **Horizontal Scaling**: Add more proxy instances to handle more shards
2. **High Availability**: Automatic shard redistribution when proxies fail
3. **Load Distribution**: Shards are evenly distributed across proxy instances
4. **Transparent**: Clients don't need to know about shard ownership
5. **Configurable**: Can enable cluster coordination without forwarding via `enableForwarding: false`
6. **Backward Compatible**: Works with existing setups when memberlist is disabled

## Limitations

- Forwarding adds one additional network hop for non-local shards
- Requires careful configuration of proxy addresses for inter-proxy communication
- Uses insecure gRPC connections for proxy-to-proxy communication (can be enhanced with TLS)
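
On the last point, here is a sketch of how the intra-proxy dial could be hardened with standard gRPC TLS credentials; the function name and the idea of passing a `*tls.Config` are assumptions, not the PR's current code.

```go
package intraproxy

import (
    "crypto/tls"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/credentials/insecure"
)

// dialPeerProxy connects to another proxy instance (sketch). With a nil
// tls.Config it matches today's insecure behavior; passing one switches
// the dial to TLS.
func dialPeerProxy(addr string, tlsCfg *tls.Config) (*grpc.ClientConn, error) {
    creds := insecure.NewCredentials()
    if tlsCfg != nil {
        creds = credentials.NewTLS(tlsCfg)
    }
    return grpc.Dial(addr, grpc.WithTransportCredentials(creds))
}
```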

## Example Deployment

For a 3-proxy cluster handling Temporal replication (the shard numbers below are illustrative; the actual assignment depends on the hashing strategy):

1. **proxy-node-1**: Handles shards 0, 3, 6, 9, ...
2. **proxy-node-2**: Handles shards 1, 4, 7, 10, ...
3. **proxy-node-3**: Handles shards 2, 5, 8, 11, ...

When a replication stream for shard 7 arrives at proxy-node-1, proxy-node-1 automatically forwards it to proxy-node-2.
9 changes: 7 additions & 2 deletions cmd/proxy/main.go
@@ -63,12 +63,17 @@ func buildCLIOptions() *cli.App {
     return app
 }
 
-func startPProfHTTPServer(logger log.Logger, c config.ProfilingConfig) {
+func startPProfHTTPServer(logger log.Logger, c config.ProfilingConfig, proxyInstance *proxy.Proxy) {
     addr := c.PProfHTTPAddress
     if len(addr) == 0 {
         return
     }
 
+    // Add debug endpoint handler
+    http.HandleFunc("/debug/connections", func(w http.ResponseWriter, r *http.Request) {
+        proxy.HandleDebugInfo(w, r, proxyInstance, logger)
+    })
+
     go func() {
         logger.Info("Start pprof http server", tag.NewStringTag("address", addr))
         if err := http.ListenAndServe(addr, nil); err != nil {

Collaborator comment on the new `/debug/connections` handler: Nice stuff but prefer having a separate PR for this
@@ -101,7 +106,7 @@ func startProxy(c *cli.Context) error {
     }
 
     cfg := proxyParams.ConfigProvider.GetS2SProxyConfig()
-    startPProfHTTPServer(proxyParams.Logger, cfg.ProfilingConfig)
+    startPProfHTTPServer(proxyParams.Logger, cfg.ProfilingConfig, proxyParams.Proxy)
 
     if err := proxyParams.Proxy.Start(); err != nil {
         return err
37 changes: 37 additions & 0 deletions common/intra_headers.go
@@ -0,0 +1,37 @@
package common

import (
    "context"

    "google.golang.org/grpc/metadata"
)

const (
    // Intra-proxy identification and tracing headers
    IntraProxyHeaderKey           = "x-s2s-intra-proxy"
    IntraProxyHeaderValue         = "1"
    IntraProxyOriginProxyIDHeader = "x-s2s-origin-proxy-id"
    IntraProxyHopCountHeader      = "x-s2s-hop-count"
    IntraProxyTraceIDHeader       = "x-s2s-trace-id"
)

// IsIntraProxy checks incoming context metadata for intra-proxy marker.
func IsIntraProxy(ctx context.Context) bool {
    if md, ok := metadata.FromIncomingContext(ctx); ok {
        if vals := md.Get(IntraProxyHeaderKey); len(vals) > 0 && vals[0] == IntraProxyHeaderValue {
            return true
        }
    }
    return false
}

// WithIntraProxyHeaders returns a new outgoing context with intra-proxy headers set.
func WithIntraProxyHeaders(ctx context.Context, headers map[string]string) context.Context {
    md, _ := metadata.FromOutgoingContext(ctx)
    md = md.Copy()
    md.Set(IntraProxyHeaderKey, IntraProxyHeaderValue)
    for k, v := range headers {
        md.Set(k, v)
    }
    return metadata.NewOutgoingContext(ctx, md)
}
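
A short usage sketch for these helpers from the forwarding path; the import path, node name, and hop-count value are illustrative assumptions.

```go
package example

import (
    "context"

    // Import path assumed for illustration.
    "github.com/temporalio/s2s-proxy/common"
)

// markForForwarding tags an outgoing context as an intra-proxy call so the
// receiving proxy can recognize it.
func markForForwarding(ctx context.Context, originNode string) context.Context {
    return common.WithIntraProxyHeaders(ctx, map[string]string{
        common.IntraProxyOriginProxyIDHeader: originNode,
        common.IntraProxyHopCountHeader:      "1",
    })
}

// mayForward is a sketch of the receiving side: a request that already came
// from another proxy should not be forwarded again, which avoids loops.
func mayForward(ctx context.Context) bool {
    return !common.IsIntraProxy(ctx)
}
```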