Skip to content

Commit c908056

Browse files
committed
update example and tests, drop connectionAdapter
1 parent 30fceb8 commit c908056

File tree

13 files changed

+188
-88
lines changed

13 files changed

+188
-88
lines changed

.github/copilot-instructions.md

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -311,9 +311,50 @@ func (opt *Options) init() {
311311

312312
```go
313313
type NotificationProcessor interface {
314-
ProcessPushNotification(ctx context.Context, data []byte) error
315-
RegisterHandler(notificationType string, handler NotificationHandler) error
316-
Close() error
314+
RegisterHandler(pushNotificationName string, handler interface{}, protected bool) error
315+
UnregisterHandler(pushNotificationName string) error
316+
GetHandler(pushNotificationName string) interface{}
317+
}
318+
319+
type NotificationHandler interface {
320+
HandlePushNotification(ctx context.Context, handlerCtx NotificationHandlerContext, notification []interface{}) error
321+
}
322+
```
323+
324+
### Notification Hooks
325+
326+
```go
327+
type NotificationHook interface {
328+
PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool)
329+
PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error)
330+
}
331+
332+
// NotificationHandlerContext provides context for notification processing
333+
type NotificationHandlerContext struct {
334+
Client interface{} // Redis client instance
335+
Pool interface{} // Connection pool
336+
Conn interface{} // Specific connection (*pool.Conn)
337+
IsBlocking bool // Whether notification was on blocking connection
338+
}
339+
```
340+
341+
### Hook Implementation Pattern
342+
343+
```go
344+
func (h *CustomHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) {
345+
// Access connection information
346+
if conn, ok := notificationCtx.Conn.(*pool.Conn); ok {
347+
connID := conn.GetID()
348+
// Process with connection context
349+
}
350+
return notification, true // Continue processing
351+
}
352+
353+
func (h *CustomHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) {
354+
// Handle processing result
355+
if result != nil {
356+
// Log or handle error
357+
}
317358
}
318359
```
319360

adapters.go

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"time"
88

99
"github.com/redis/go-redis/v9/internal/interfaces"
10-
"github.com/redis/go-redis/v9/internal/pool"
1110
"github.com/redis/go-redis/v9/push"
1211
)
1312

@@ -88,43 +87,6 @@ func (oa *optionsAdapter) NewDialer() func(context.Context) (net.Conn, error) {
8887
}
8988
}
9089

91-
// connectionAdapter adapts a Redis connection to interfaces.ConnectionWithRelaxedTimeout
92-
type connectionAdapter struct {
93-
conn *pool.Conn
94-
}
95-
96-
// Close closes the connection.
97-
func (ca *connectionAdapter) Close() error {
98-
return ca.conn.Close()
99-
}
100-
101-
// IsUsable returns true if the connection is safe to use for new commands.
102-
func (ca *connectionAdapter) IsUsable() bool {
103-
return ca.conn.IsUsable()
104-
}
105-
106-
// GetPoolConn returns the underlying pool connection.
107-
func (ca *connectionAdapter) GetPoolConn() *pool.Conn {
108-
return ca.conn
109-
}
110-
111-
// SetRelaxedTimeout sets relaxed timeouts for this connection during hitless upgrades.
112-
// These timeouts remain active until explicitly cleared.
113-
func (ca *connectionAdapter) SetRelaxedTimeout(readTimeout, writeTimeout time.Duration) {
114-
ca.conn.SetRelaxedTimeout(readTimeout, writeTimeout)
115-
}
116-
117-
// SetRelaxedTimeoutWithDeadline sets relaxed timeouts with an expiration deadline.
118-
// After the deadline, timeouts automatically revert to normal values.
119-
func (ca *connectionAdapter) SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout time.Duration, deadline time.Time) {
120-
ca.conn.SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout, deadline)
121-
}
122-
123-
// ClearRelaxedTimeout clears relaxed timeouts for this connection.
124-
func (ca *connectionAdapter) ClearRelaxedTimeout() {
125-
ca.conn.ClearRelaxedTimeout()
126-
}
127-
12890
// pushProcessorAdapter adapts a push.NotificationProcessor to implement interfaces.NotificationProcessor.
12991
type pushProcessorAdapter struct {
13092
processor push.NotificationProcessor

example/pubsub/main.go

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,37 @@ import (
55
"fmt"
66
"log"
77
"sync"
8+
"sync/atomic"
89
"time"
910

1011
"github.com/redis/go-redis/v9"
1112
"github.com/redis/go-redis/v9/hitless"
1213
)
1314

1415
var ctx = context.Background()
16+
var cntErrors atomic.Int64
17+
var cntSuccess atomic.Int64
18+
var startTime = time.Now()
1519

1620
// This example is not supposed to be run as is. It is just a test to see how pubsub behaves in relation to pool management.
1721
// It was used to find regressions in pool management in hitless mode.
1822
// Please don't use it as a reference for how to use pubsub.
1923
func main() {
24+
startTime = time.Now()
2025
wg := &sync.WaitGroup{}
2126
rdb := redis.NewClient(&redis.Options{
22-
Addr: ":6379",
27+
Addr: "redis-15176.aws-cluster-1213.cto.redislabs.com:15176",
2328
HitlessUpgradeConfig: &redis.HitlessUpgradeConfig{
2429
Mode: hitless.MaintNotificationsEnabled,
2530
},
2631
})
2732
_ = rdb.FlushDB(ctx).Err()
33+
hitlessManager := rdb.GetHitlessManager()
34+
if hitlessManager == nil {
35+
panic("hitless manager is nil")
36+
}
37+
loggingHook := hitless.NewLoggingHook(3)
38+
hitlessManager.AddNotificationHook(loggingHook)
2839

2940
go func() {
3041
for {
@@ -62,20 +73,23 @@ func main() {
6273
subCtx, cancelSubCtx = context.WithCancel(ctx)
6374
for i := 0; i < 10; i++ {
6475
if err := rdb.Incr(ctx, "publishers").Err(); err != nil {
65-
panic(err)
76+
fmt.Println("incr error:", err)
77+
cntErrors.Add(1)
6678
}
6779
wg.Add(1)
6880
go floodThePool(pubCtx, rdb, wg)
6981
}
7082

7183
for i := 0; i < 500; i++ {
7284
if err := rdb.Incr(ctx, "subscribers").Err(); err != nil {
73-
panic(err)
85+
fmt.Println("incr error:", err)
86+
cntErrors.Add(1)
7487
}
88+
7589
wg.Add(1)
7690
go subscribe(subCtx, rdb, "test2", i, wg)
7791
}
78-
time.Sleep(5 * time.Second)
92+
time.Sleep(120 * time.Second)
7993
fmt.Println("canceling publishers")
8094
cancelPublishers()
8195
time.Sleep(10 * time.Second)
@@ -95,6 +109,9 @@ func main() {
95109
fmt.Printf("if drained = published*subscribers: %d\n", publishedInt*subscribersInt)
96110

97111
time.Sleep(2 * time.Second)
112+
fmt.Println("errors:", cntErrors.Load())
113+
fmt.Println("success:", cntSuccess.Load())
114+
fmt.Println("time:", time.Since(startTime))
98115
}
99116

100117
func floodThePool(ctx context.Context, rdb *redis.Client, wg *sync.WaitGroup) {
@@ -107,14 +124,18 @@ func floodThePool(ctx context.Context, rdb *redis.Client, wg *sync.WaitGroup) {
107124
}
108125
err := rdb.Publish(ctx, "test2", "hello").Err()
109126
if err != nil {
110-
// noop
111-
//log.Println("publish error:", err)
127+
if err.Error() != "context canceled" {
128+
log.Println("publish error:", err)
129+
cntErrors.Add(1)
130+
}
112131
}
113132

114133
err = rdb.Incr(ctx, "published").Err()
115134
if err != nil {
116-
// noop
117-
//log.Println("incr error:", err)
135+
if err.Error() != "context canceled" {
136+
log.Println("incr error:", err)
137+
cntErrors.Add(1)
138+
}
118139
}
119140
time.Sleep(10 * time.Nanosecond)
120141
}
@@ -137,7 +158,10 @@ func subscribe(ctx context.Context, rdb *redis.Client, topic string, subscriberI
137158
case msg := <-recChan:
138159
err := rdb.Incr(ctx, "received").Err()
139160
if err != nil {
140-
log.Println("incr error:", err)
161+
if err.Error() != "context canceled" {
162+
log.Printf("%s\n", err.Error())
163+
cntErrors.Add(1)
164+
}
141165
}
142166
_ = msg // Use the message to avoid unused variable warning
143167
}

hitless/README.md

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,23 +49,62 @@ Config: &hitless.Config{
4949
- **Auto-calculated**: `10 × MaxWorkers`, capped by pool size
5050
- **Always capped**: Queue size never exceeds pool size
5151

52-
## Metrics Hook Example
52+
## Notification Hooks
5353

54-
A metrics collection hook is available in `example_hooks.go` that demonstrates how to monitor hitless upgrade operations:
54+
Notification hooks allow you to monitor and customize hitless upgrade operations. The `NotificationHook` interface provides pre and post processing hooks:
55+
56+
```go
57+
type NotificationHook interface {
58+
PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool)
59+
PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error)
60+
}
61+
```
62+
63+
### Example: Metrics Collection Hook
64+
65+
A metrics collection hook is available in `example_hooks.go`:
5566

5667
```go
5768
import "github.com/redis/go-redis/v9/hitless"
5869

5970
metricsHook := hitless.NewMetricsHook()
60-
// Use with your monitoring system
71+
manager.AddNotificationHook(metricsHook)
72+
73+
// Access metrics
74+
metrics := metricsHook.GetMetrics()
6175
```
6276

63-
The metrics hook tracks:
77+
### Example: Custom Logging Hook
78+
79+
```go
80+
type CustomHook struct{}
81+
82+
func (h *CustomHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) {
83+
// Log notification with connection details
84+
if conn, ok := notificationCtx.Conn.(*pool.Conn); ok {
85+
log.Printf("Processing %s on connection %d", notificationType, conn.GetID())
86+
}
87+
return notification, true // Continue processing
88+
}
89+
90+
func (h *CustomHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) {
91+
if result != nil {
92+
log.Printf("Failed to process %s: %v", notificationType, result)
93+
}
94+
}
95+
```
96+
97+
The notification context provides access to:
98+
- **Client**: The Redis client instance
99+
- **Pool**: The connection pool
100+
- **Conn**: The specific connection that received the notification
101+
- **IsBlocking**: Whether the notification was received on a blocking connection
102+
103+
Hooks can track:
64104
- Handoff success/failure rates
65-
- Handoff duration
66-
- Queue depth
67-
- Worker utilization
68-
- Connection lifecycle events
105+
- Processing duration
106+
- Connection-specific metrics
107+
- Custom business logic
69108

70109
## Requirements
71110

hitless/example_hooks.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ package hitless
33
import (
44
"context"
55
"time"
6+
7+
"github.com/redis/go-redis/v9/internal"
8+
"github.com/redis/go-redis/v9/internal/pool"
9+
"github.com/redis/go-redis/v9/push"
610
)
711

812
// contextKey is a custom type for context keys to avoid collisions
@@ -29,9 +33,14 @@ func NewMetricsHook() *MetricsHook {
2933
}
3034

3135
// PreHook records the start time for processing metrics.
32-
func (mh *MetricsHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
36+
func (mh *MetricsHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) {
3337
mh.NotificationCounts[notificationType]++
3438

39+
// Log connection information if available
40+
if conn, ok := notificationCtx.Conn.(*pool.Conn); ok {
41+
internal.Logger.Printf(ctx, "hitless: metrics hook processing %s notification on connection %d", notificationType, conn.GetID())
42+
}
43+
3544
// Store start time in context for duration calculation
3645
startTime := time.Now()
3746
_ = context.WithValue(ctx, startTimeKey, startTime) // Context not used further
@@ -40,7 +49,7 @@ func (mh *MetricsHook) PreHook(ctx context.Context, notificationType string, not
4049
}
4150

4251
// PostHook records processing completion and any errors.
43-
func (mh *MetricsHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
52+
func (mh *MetricsHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) {
4453
// Calculate processing duration
4554
if startTime, ok := ctx.Value(startTimeKey).(time.Time); ok {
4655
duration := time.Since(startTime)
@@ -50,6 +59,11 @@ func (mh *MetricsHook) PostHook(ctx context.Context, notificationType string, no
5059
// Record errors
5160
if result != nil {
5261
mh.ErrorCounts[notificationType]++
62+
63+
// Log error details with connection information
64+
if conn, ok := notificationCtx.Conn.(*pool.Conn); ok {
65+
internal.Logger.Printf(ctx, "hitless: metrics hook recorded error for %s notification on connection %d: %v", notificationType, conn.GetID(), result)
66+
}
5367
}
5468
}
5569

hitless/hitless_manager.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/redis/go-redis/v9/internal"
1212
"github.com/redis/go-redis/v9/internal/interfaces"
1313
"github.com/redis/go-redis/v9/internal/pool"
14+
"github.com/redis/go-redis/v9/push"
1415
)
1516

1617
// Push notification type constants for hitless upgrades
@@ -35,8 +36,8 @@ var hitlessNotificationTypes = []string{
3536
// PreHook can modify the notification and return false to skip processing
3637
// PostHook is called after successful processing
3738
type NotificationHook interface {
38-
PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool)
39-
PostHook(ctx context.Context, notificationType string, notification []interface{}, result error)
39+
PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool)
40+
PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error)
4041
}
4142

4243
// MovingOperationKey provides a unique key for tracking MOVING operations
@@ -252,14 +253,14 @@ func (hm *HitlessManager) GetState() State {
252253
}
253254

254255
// processPreHooks calls all pre-hooks and returns the modified notification and whether to continue processing.
255-
func (hm *HitlessManager) processPreHooks(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
256+
func (hm *HitlessManager) processPreHooks(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) {
256257
hm.hooksMu.RLock()
257258
defer hm.hooksMu.RUnlock()
258259

259260
currentNotification := notification
260261

261262
for _, hook := range hm.hooks {
262-
modifiedNotification, shouldContinue := hook.PreHook(ctx, notificationType, currentNotification)
263+
modifiedNotification, shouldContinue := hook.PreHook(ctx, notificationCtx, notificationType, currentNotification)
263264
if !shouldContinue {
264265
return modifiedNotification, false
265266
}
@@ -270,12 +271,12 @@ func (hm *HitlessManager) processPreHooks(ctx context.Context, notificationType
270271
}
271272

272273
// processPostHooks calls all post-hooks with the processing result.
273-
func (hm *HitlessManager) processPostHooks(ctx context.Context, notificationType string, notification []interface{}, result error) {
274+
func (hm *HitlessManager) processPostHooks(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) {
274275
hm.hooksMu.RLock()
275276
defer hm.hooksMu.RUnlock()
276277

277278
for _, hook := range hm.hooks {
278-
hook.PostHook(ctx, notificationType, notification, result)
279+
hook.PostHook(ctx, notificationCtx, notificationType, notification, result)
279280
}
280281
}
281282

0 commit comments

Comments
 (0)