-
-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathstore.go
209 lines (181 loc) · 5.2 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package traefikkop
import (
"fmt"
"reflect"
"strings"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"gopkg.in/redis.v5"
)
// TraefikStore is the storage backend abstraction returned by NewRedisStore
// and implemented by RedisStore.
type TraefikStore interface {
// Store writes the given dynamic configuration to the backend.
Store(conf dynamic.Configuration) error
// Ping checks that the backend can be reached.
Ping() error
// KeepConfAlive pushes the last known configuration again if the backend
// appears to have lost it.
KeepConfAlive() error
}
// collectKeys returns the keys of the map m rendered as strings, in no
// particular order (map iteration order is unspecified).
//
// m is inspected via reflection so that maps with different value types can
// share this helper. A nil interface, or any value that is not a map, yields
// nil instead of panicking inside reflect. Keys are rendered with
// reflect.Value.String, so only string-keyed maps produce meaningful names.
func collectKeys(m interface{}) []string {
	v := reflect.ValueOf(m)
	// reflect.Value.MapKeys panics on the zero Value (nil interface) and on
	// any kind other than Map, so reject those up front.
	if !v.IsValid() || v.Kind() != reflect.Map {
		return nil
	}

	mk := v.MapKeys()
	keys := make([]string, len(mk))
	for i, k := range mk {
		keys[i] = k.String()
	}
	return keys
}
// RedisStore is the TraefikStore implementation that keeps the configuration
// in a Redis database.
type RedisStore struct {
// Hostname is embedded in the names of the bookkeeping keys built by sk.
Hostname string
// Addr, Pass and DB are the connection settings that NewRedisStore hands to
// the Redis client.
Addr string
Pass string
DB int
// client is the Redis connection used for every command.
client *redis.Client
// lastConfig is the copy of the configuration recorded by Store so that
// KeepConfAlive can push it again; nil until Store has recorded one.
lastConfig *dynamic.Configuration
}
// NewRedisStore builds a RedisStore for the Redis server at addr and returns
// it as a TraefikStore.
//
// hostname is used to namespace the bookkeeping keys, pass is the Redis
// password and db the database index. The call only constructs the client;
// use Ping to verify that the server is reachable.
func NewRedisStore(hostname string, addr string, pass string, db int) TraefikStore {
	logrus.Infof("creating new redis store at %s for hostname %s", addr, hostname)

	opts := &redis.Options{
		Addr:     addr,
		Password: pass,
		DB:       db,
	}

	return &RedisStore{
		Hostname: hostname,
		Addr:     addr,
		Pass:     pass,
		DB:       db,
		client:   redis.NewClient(opts),
	}
}
// Ping sends a PING to the Redis server and returns the error reported by the
// client, or nil when the command succeeded.
func (s *RedisStore) Ping() error {
	status := s.client.Ping()
	return status.Err()
}
// sk builds the "set key" under which the names of our services, routers or
// middlewares are tracked for section b.
// e.g., traefik_http_routers@myhost
func (s RedisStore) sk(b string) string {
	const layout = "traefik_%s@%s"
	return fmt.Sprintf(layout, b, s.Hostname)
}
// Store writes conf to Redis.
//
// The steps are:
//  1. for every section (http/tcp/udp routers, services, middlewares) remove
//     the keys of entries that are no longer part of conf;
//  2. flatten conf with ConfigToKV and SET every resulting key;
//  3. promote the freshly staged name sets via swapKeys;
//  4. refresh the "kop_last_update" sentinel key that NeedsUpdate looks for.
//
// Failures while removing old keys are logged and do not abort the update,
// because that cleanup is best effort. Failures while writing are returned to
// the caller wrapped with context, so an unreachable Redis is no longer
// reported as success. The configuration is remembered in lastConfig as soon
// as it has been converted, before any write, so KeepConfAlive can push it
// again once Redis is reachable.
func (s *RedisStore) Store(conf dynamic.Configuration) error {
	sections := []struct {
		items interface{}
		name  string
	}{
		{conf.HTTP.Middlewares, "http_middlewares"},
		{conf.HTTP.Routers, "http_routers"},
		{conf.HTTP.Services, "http_services"},
		{conf.TCP.Middlewares, "tcp_middlewares"},
		{conf.TCP.Routers, "tcp_routers"},
		{conf.TCP.Services, "tcp_services"},
		{conf.UDP.Routers, "udp_routers"},
		{conf.UDP.Services, "udp_services"},
	}

	for _, sec := range sections {
		if err := s.removeOldKeys(sec.items, sec.name); err != nil {
			// Best-effort cleanup: keep going so the new config still lands.
			logrus.Warnf("removing old %s keys failed: %s", sec.name, err)
		}
	}

	kv, err := ConfigToKV(conf)
	if err != nil {
		return err
	}

	// Store a copy of the configuration in case redis restarts. conf is
	// already this function's own (shallow) copy of the caller's value.
	s.lastConfig = &conf

	for k, v := range kv {
		logrus.Debugf("writing %s = %s", k, v)
		if err := s.client.Set(k, v, 0).Err(); err != nil {
			return errors.Wrapf(err, "writing key %s", k)
		}
	}

	for _, sec := range sections {
		if err := s.swapKeys(s.sk(sec.name)); err != nil {
			return errors.Wrapf(err, "swapping key set for %s", sec.name)
		}
	}

	// Update sentinel key with current timestamp
	if err := s.client.Set(s.sk("kop_last_update"), time.Now().Unix(), 0).Err(); err != nil {
		return errors.Wrap(err, "updating sentinel key")
	}
	return nil
}
// NeedsUpdate reports whether Redis is missing the "kop_last_update" sentinel
// key that Store writes, which is taken as the sign that a full configuration
// refresh is required.
//
// If the existence check itself fails, a warning is logged and the result is
// still derived from the value the client returned.
func (s *RedisStore) NeedsUpdate() bool {
	sentinel := s.sk("kop_last_update")

	present, err := s.client.Exists(sentinel).Result()
	if err != nil {
		logrus.Warnf("Failed to check Redis status: %s", err)
	}
	return !present
}
// KeepConfAlive pushes the last known configuration to Redis again when
// NeedsUpdate reports that it is missing. It does nothing, and returns nil,
// when no configuration has been recorded yet or when no update is needed;
// otherwise it returns the result of Store.
func (s *RedisStore) KeepConfAlive() error {
	// Short-circuit keeps NeedsUpdate from querying Redis before there is
	// anything to push.
	if s.lastConfig == nil || !s.NeedsUpdate() {
		return nil
	}

	logrus.Warnln("Redis seems to have restarted and needs to be updated. Pushing last known configuration")
	return s.Store(*s.lastConfig)
}
// swapKeys promotes the staged set "<setkey>_new" to setkey by renaming it.
//
// When the staged set does not exist (Redis answers "no such key"), nothing
// was staged for this section, so the current set is removed instead. Any
// other rename error, and any error from that removal, is returned wrapped.
func (s *RedisStore) swapKeys(setkey string) error {
	// store router name list by renaming
	err := s.client.Rename(setkey+"_new", setkey).Err()
	if err == nil {
		return nil
	}
	if !strings.Contains(err.Error(), "no such key") {
		return errors.Wrap(err, "rename failed")
	}

	// Nothing staged: drop the old name set so it does not list stale entries.
	if err := s.client.Unlink(setkey).Err(); err != nil {
		return errors.Wrap(err, "unlink failed")
	}
	return nil
}
// k builds the config key path for entry b of section sk: the section name is
// prefixed with "traefik_", every "_" becomes "/", and a trailing "@docker"
// is stripped from the entry name.
// e.g., traefik/http/routers/nginx
func (s RedisStore) k(sk, b string) string {
	prefix := strings.ReplaceAll("traefik_"+sk, "_", "/")
	name := strings.TrimSuffix(b, "@docker")
	return fmt.Sprintf("%s/%s", prefix, name)
}
// removeKeys deletes every Redis key stored below the config path of each
// entry named in keys.
//
// setkey is the section name (e.g. "http_routers") that k turns into the key
// path prefix. For each entry the keys matching "<path>/*" are looked up and
// unlinked. Entries without any matching key are skipped: UNLINK requires at
// least one key, so calling it with none would fail and abort the loop before
// the remaining entries were processed.
//
// It returns the first lookup or unlink error, wrapped.
func (s *RedisStore) removeKeys(setkey string, keys []string) error {
	if len(keys) == 0 {
		return nil
	}
	if logrus.IsLevelEnabled(logrus.DebugLevel) {
		logrus.Debugf("removing keys from %s: %s", setkey, strings.Join(keys, ","))
	}
	for _, removeKey := range keys {
		keyPath := s.k(setkey, removeKey) + "/*"
		logrus.Debugf("removing keys matching %s", keyPath)
		res, err := s.client.Keys(keyPath).Result()
		if err != nil {
			return errors.Wrap(err, "fetch failed")
		}
		if len(res) == 0 {
			// Nothing stored for this entry; move on to the next one.
			continue
		}
		if err := s.client.Unlink(res...).Err(); err != nil {
			return errors.Wrap(err, "unlink failed")
		}
	}
	return nil
}
// removeOldKeys removes the Redis keys of entries that were tracked for
// section setname but are no longer present in the map m.
//
// The names currently in m are staged in the temporary set "<setkey>_new"
// (promoted later by swapKeys); names in the tracked set but not in the
// staged set are removed via removeKeys. When m is empty, every tracked name
// is removed and nothing is staged.
//
// A temporary set left behind by an earlier, interrupted run is cleared first
// so its members cannot leak into the new set.
func (s *RedisStore) removeOldKeys(m interface{}, setname string) error {
	setkey := s.sk(setname)
	tmpkey := setkey + "_new"

	// Unlinking a key that does not exist is not an error.
	if err := s.client.Unlink(tmpkey).Err(); err != nil {
		return errors.Wrap(err, "unlink failed")
	}

	newkeys := collectKeys(m)
	if len(newkeys) == 0 {
		// Nothing configured any more: everything we tracked is stale.
		res, err := s.client.SMembers(setkey).Result()
		if err != nil {
			return errors.Wrap(err, "fetch failed")
		}
		return s.removeKeys(setname, res)
	}

	// store new keys in temp set
	if err := s.client.SAdd(tmpkey, mkslice(newkeys)...).Err(); err != nil {
		return errors.Wrap(err, "add failed")
	}

	// diff the existing keys with the new ones
	res, err := s.client.SDiff(setkey, tmpkey).Result()
	if err != nil {
		return errors.Wrap(err, "diff failed")
	}
	return s.removeKeys(setname, res)
}
// mkslice copies a string slice into a new []interface{} of the same length,
// as needed to pass the strings to a variadic interface{} parameter.
func mkslice(old []string) []interface{} {
	boxed := make([]interface{}, 0, len(old))
	for _, s := range old {
		boxed = append(boxed, s)
	}
	return boxed
}