Skip to content

Commit 5649ffb

Browse files
committed
feat(hitless): Introduce handlers for hitless upgrades
This commit includes all the work on hitless upgrades with the addition of: - Pubsub Pool - Examples - Refactor of push - Refactor of pool (using atomics for most things) - Introducing of hooks in pool
1 parent 36f9f58 commit 5649ffb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+6345
-247
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,6 @@ coverage.txt
99
**/coverage.txt
1010
.vscode
1111
tmp/*
12+
13+
# Hitless upgrade documentation (temporary)
14+
hitless/docs/

adapters.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package redis
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net"
7+
"time"
8+
9+
"github.com/redis/go-redis/v9/internal/interfaces"
10+
"github.com/redis/go-redis/v9/internal/pool"
11+
"github.com/redis/go-redis/v9/push"
12+
)
13+
14+
// ErrInvalidCommand is returned when an invalid command is passed to ExecuteCommand.
15+
var ErrInvalidCommand = errors.New("invalid command type")
16+
17+
// ErrInvalidPool is returned when the pool type is not supported.
18+
var ErrInvalidPool = errors.New("invalid pool type")
19+
20+
// newClientAdapter creates a new client adapter for regular Redis clients.
21+
func newClientAdapter(client *baseClient) interfaces.ClientInterface {
22+
return &clientAdapter{client: client}
23+
}
24+
25+
// clientAdapter adapts a Redis client to implement interfaces.ClientInterface.
26+
type clientAdapter struct {
27+
client *baseClient
28+
}
29+
30+
// GetOptions returns the client options.
31+
func (ca *clientAdapter) GetOptions() interfaces.OptionsInterface {
32+
return &optionsAdapter{options: ca.client.opt}
33+
}
34+
35+
// GetPushProcessor returns the client's push notification processor.
36+
func (ca *clientAdapter) GetPushProcessor() interfaces.NotificationProcessor {
37+
return &pushProcessorAdapter{processor: ca.client.pushProcessor}
38+
}
39+
40+
// optionsAdapter adapts Redis options to implement interfaces.OptionsInterface.
41+
type optionsAdapter struct {
42+
options *Options
43+
}
44+
45+
// GetReadTimeout returns the read timeout.
46+
func (oa *optionsAdapter) GetReadTimeout() time.Duration {
47+
return oa.options.ReadTimeout
48+
}
49+
50+
// GetWriteTimeout returns the write timeout.
51+
func (oa *optionsAdapter) GetWriteTimeout() time.Duration {
52+
return oa.options.WriteTimeout
53+
}
54+
55+
// GetNetwork returns the network type.
56+
func (oa *optionsAdapter) GetNetwork() string {
57+
return oa.options.Network
58+
}
59+
60+
// GetAddr returns the connection address.
61+
func (oa *optionsAdapter) GetAddr() string {
62+
return oa.options.Addr
63+
}
64+
65+
// IsTLSEnabled returns true if TLS is enabled.
66+
func (oa *optionsAdapter) IsTLSEnabled() bool {
67+
return oa.options.TLSConfig != nil
68+
}
69+
70+
// GetProtocol returns the protocol version.
71+
func (oa *optionsAdapter) GetProtocol() int {
72+
return oa.options.Protocol
73+
}
74+
75+
// GetPoolSize returns the connection pool size.
76+
func (oa *optionsAdapter) GetPoolSize() int {
77+
return oa.options.PoolSize
78+
}
79+
80+
// NewDialer returns a new dialer function for the connection.
81+
func (oa *optionsAdapter) NewDialer() func(context.Context) (net.Conn, error) {
82+
baseDialer := oa.options.NewDialer()
83+
return func(ctx context.Context) (net.Conn, error) {
84+
// Extract network and address from the options
85+
network := oa.options.Network
86+
addr := oa.options.Addr
87+
return baseDialer(ctx, network, addr)
88+
}
89+
}
90+
91+
// connectionAdapter adapts a Redis connection to interfaces.ConnectionWithRelaxedTimeout
92+
type connectionAdapter struct {
93+
conn *pool.Conn
94+
}
95+
96+
// Close closes the connection.
97+
func (ca *connectionAdapter) Close() error {
98+
return ca.conn.Close()
99+
}
100+
101+
// IsUsable returns true if the connection is safe to use for new commands.
102+
func (ca *connectionAdapter) IsUsable() bool {
103+
return ca.conn.IsUsable()
104+
}
105+
106+
// GetPoolConnection returns the underlying pool connection.
107+
func (ca *connectionAdapter) GetPoolConnection() *pool.Conn {
108+
return ca.conn
109+
}
110+
111+
// SetRelaxedTimeout sets relaxed timeouts for this connection during hitless upgrades.
112+
// These timeouts remain active until explicitly cleared.
113+
func (ca *connectionAdapter) SetRelaxedTimeout(readTimeout, writeTimeout time.Duration) {
114+
ca.conn.SetRelaxedTimeout(readTimeout, writeTimeout)
115+
}
116+
117+
// SetRelaxedTimeoutWithDeadline sets relaxed timeouts with an expiration deadline.
118+
// After the deadline, timeouts automatically revert to normal values.
119+
func (ca *connectionAdapter) SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout time.Duration, deadline time.Time) {
120+
ca.conn.SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout, deadline)
121+
}
122+
123+
// ClearRelaxedTimeout clears relaxed timeouts for this connection.
124+
func (ca *connectionAdapter) ClearRelaxedTimeout() {
125+
ca.conn.ClearRelaxedTimeout()
126+
}
127+
128+
// pushProcessorAdapter adapts a push.NotificationProcessor to implement interfaces.NotificationProcessor.
129+
type pushProcessorAdapter struct {
130+
processor push.NotificationProcessor
131+
}
132+
133+
// RegisterHandler registers a handler for a specific push notification name.
134+
func (ppa *pushProcessorAdapter) RegisterHandler(pushNotificationName string, handler interface{}, protected bool) error {
135+
if pushHandler, ok := handler.(push.NotificationHandler); ok {
136+
return ppa.processor.RegisterHandler(pushNotificationName, pushHandler, protected)
137+
}
138+
return errors.New("handler must implement push.NotificationHandler")
139+
}
140+
141+
// UnregisterHandler removes a handler for a specific push notification name.
142+
func (ppa *pushProcessorAdapter) UnregisterHandler(pushNotificationName string) error {
143+
return ppa.processor.UnregisterHandler(pushNotificationName)
144+
}
145+
146+
// GetHandler returns the handler for a specific push notification name.
147+
func (ppa *pushProcessorAdapter) GetHandler(pushNotificationName string) interface{} {
148+
return ppa.processor.GetHandler(pushNotificationName)
149+
}

0 commit comments

Comments
 (0)