Skip to content
This repository was archived by the owner on Jan 21, 2020. It is now read-only.

Commit 6516ace

Browse files
author
David Chung
authored
More metadata implementations + minor bug fixes (#408)
Signed-off-by: David Chung <[email protected]>
1 parent d7c7305 commit 6516ace

File tree

8 files changed

+194
-49
lines changed

8 files changed

+194
-49
lines changed

cmd/group/main.go

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import (
99
"github.com/docker/infrakit/pkg/discovery"
1010
"github.com/docker/infrakit/pkg/plugin"
1111
"github.com/docker/infrakit/pkg/plugin/group"
12+
metadata_plugin "github.com/docker/infrakit/pkg/plugin/metadata"
1213
flavor_client "github.com/docker/infrakit/pkg/rpc/flavor"
1314
group_server "github.com/docker/infrakit/pkg/rpc/group"
1415
instance_client "github.com/docker/infrakit/pkg/rpc/instance"
16+
metadata_rpc "github.com/docker/infrakit/pkg/rpc/metadata"
1517
"github.com/docker/infrakit/pkg/spi/flavor"
1618
"github.com/docker/infrakit/pkg/spi/instance"
1719
"github.com/spf13/cobra"
@@ -52,8 +54,62 @@ func main() {
5254
return flavor_client.NewClient(n, endpoint.Address)
5355
}
5456

55-
cli.RunPlugin(*name, group_server.PluginServer(
56-
group.NewGroupPlugin(instancePluginLookup, flavorPluginLookup, *pollInterval, *maxParallelNum)))
57+
groupPlugin := group.NewGroupPlugin(instancePluginLookup, flavorPluginLookup, *pollInterval, *maxParallelNum)
58+
59+
// Start a poller to load the snapshot and make that available as metadata
60+
updateSnapshot := make(chan func(map[string]interface{}))
61+
stopSnapshot := make(chan struct{})
62+
go func() {
63+
tick := time.Tick(1 * time.Second)
64+
tick30 := time.Tick(30 * time.Second)
65+
for {
66+
select {
67+
case <-tick:
68+
// load the specs for the groups
69+
snapshot := map[string]interface{}{}
70+
if specs, err := groupPlugin.InspectGroups(); err == nil {
71+
for _, spec := range specs {
72+
snapshot[string(spec.ID)] = spec
73+
}
74+
} else {
75+
snapshot["err"] = err
76+
}
77+
78+
updateSnapshot <- func(view map[string]interface{}) {
79+
metadata_plugin.Put([]string{"specs"}, snapshot, view)
80+
}
81+
82+
case <-tick30:
83+
snapshot := map[string]interface{}{}
84+
// describe the groups and expose info as metadata
85+
if specs, err := groupPlugin.InspectGroups(); err == nil {
86+
for _, spec := range specs {
87+
if description, err := groupPlugin.DescribeGroup(spec.ID); err == nil {
88+
snapshot[string(spec.ID)] = description
89+
} else {
90+
snapshot[string(spec.ID)] = err
91+
}
92+
}
93+
} else {
94+
snapshot["err"] = err
95+
}
96+
97+
updateSnapshot <- func(view map[string]interface{}) {
98+
metadata_plugin.Put([]string{"groups"}, snapshot, view)
99+
}
100+
101+
case <-stopSnapshot:
102+
log.Infoln("Snapshot updater stopped")
103+
return
104+
}
105+
}
106+
}()
107+
108+
cli.RunPlugin(*name,
109+
metadata_rpc.PluginServer(metadata_plugin.NewPluginFromChannel(updateSnapshot)),
110+
group_server.PluginServer(groupPlugin))
111+
112+
close(stopSnapshot)
57113

58114
return nil
59115
}

cmd/manager/main.go

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@ package main
33
import (
44
"os"
55
"path/filepath"
6+
"time"
67

78
log "github.com/Sirupsen/logrus"
89
"github.com/docker/infrakit/pkg/cli"
910
"github.com/docker/infrakit/pkg/discovery"
1011
"github.com/docker/infrakit/pkg/leader"
1112
"github.com/docker/infrakit/pkg/manager"
13+
metadata_plugin "github.com/docker/infrakit/pkg/plugin/metadata"
1214
group_rpc "github.com/docker/infrakit/pkg/rpc/group"
1315
manager_rpc "github.com/docker/infrakit/pkg/rpc/manager"
16+
metadata_rpc "github.com/docker/infrakit/pkg/rpc/metadata"
1417
"github.com/docker/infrakit/pkg/store"
1518
"github.com/docker/infrakit/pkg/util/docker"
1619
"github.com/spf13/cobra"
@@ -78,9 +81,47 @@ func runMain(cfg config) error {
7881
return err
7982
}
8083

81-
cli.RunPlugin(cfg.id, group_rpc.PluginServer(mgr), manager_rpc.PluginServer(mgr))
84+
// Start a poller to load the snapshot and make that available as metadata
85+
updateSnapshot := make(chan func(map[string]interface{}))
86+
stopSnapshot := make(chan struct{})
87+
go func() {
88+
tick := time.Tick(1 * time.Second)
89+
for {
90+
select {
91+
case <-tick:
92+
snapshot := map[string]interface{}{}
93+
94+
// update leadership
95+
if isLeader, err := mgr.IsLeader(); err == nil {
96+
updateSnapshot <- func(view map[string]interface{}) {
97+
metadata_plugin.Put([]string{"leader"}, isLeader, view)
98+
}
99+
} else {
100+
log.Warningln("Cannot check leader for metadata:", err)
101+
}
102+
103+
// update config
104+
if err := cfg.snapshot.Load(&snapshot); err == nil {
105+
updateSnapshot <- func(view map[string]interface{}) {
106+
metadata_plugin.Put([]string{"configs"}, snapshot, view)
107+
}
108+
} else {
109+
log.Warningln("Cannot load snapshot for metadata:", err)
110+
}
111+
112+
case <-stopSnapshot:
113+
log.Infoln("Snapshot updater stopped")
114+
return
115+
}
116+
}
117+
}()
118+
119+
cli.RunPlugin(cfg.id,
120+
metadata_rpc.PluginServer(metadata_plugin.NewPluginFromChannel(updateSnapshot)),
121+
group_rpc.PluginServer(mgr), manager_rpc.PluginServer(mgr))
82122

83123
mgr.Stop()
124+
close(stopSnapshot)
84125
log.Infoln("Manager stopped")
85126

86127
return err

examples/flavor/swarm/flavor.go

Lines changed: 48 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -64,52 +64,64 @@ func DockerClient(spec Spec) (client.APIClient, error) {
6464
type baseFlavor struct {
6565
getDockerClient func(Spec) (client.APIClient, error)
6666
initScript *template.Template
67+
metadataPlugin metadata.Plugin
68+
}
69+
70+
// Runs a poller that periodically samples the swarm status and node info.
71+
func (s *baseFlavor) runMetadataSnapshot(stopSnapshot <-chan struct{}) chan func(map[string]interface{}) {
72+
// Start a poller to load the snapshot and make that available as metadata
73+
updateSnapshot := make(chan func(map[string]interface{}))
74+
go func() {
75+
tick := time.Tick(1 * time.Second)
76+
for {
77+
select {
78+
case <-tick:
79+
80+
snapshot := map[string]interface{}{}
81+
if docker, err := s.getDockerClient(Spec{
82+
Docker: ConnectInfo{
83+
Host: "unix:///var/run/docker.sock", // defaults to local socket
84+
},
85+
}); err != nil {
86+
snapshot["local"] = map[string]interface{}{"error": err}
87+
} else {
88+
if status, node, err := swarmState(docker); err != nil {
89+
snapshot["local"] = map[string]interface{}{"error": err}
90+
} else {
91+
snapshot["local"] = map[string]interface{}{
92+
"status": status,
93+
"node": node,
94+
}
95+
}
96+
}
97+
98+
updateSnapshot <- func(view map[string]interface{}) {
99+
metadata_plugin.Put([]string{"groups"}, snapshot, view)
100+
}
101+
102+
case <-stopSnapshot:
103+
log.Infoln("Snapshot updater stopped")
104+
return
105+
}
106+
}
107+
}()
108+
return updateSnapshot
67109
}
68110

69111
// List implements the metadata.Plugin SPI's List method
70112
func (s *baseFlavor) List(path metadata.Path) ([]string, error) {
71-
docker, err := s.getDockerClient(Spec{
72-
Docker: ConnectInfo{
73-
Host: "unix:///var/run/docker.sock", // defaults to local socket
74-
},
75-
})
76-
if err != nil {
77-
return nil, err
113+
if s.metadataPlugin != nil {
114+
return s.metadataPlugin.List(path)
78115
}
79-
status, node, err := swarmState(docker)
80-
if err != nil {
81-
return nil, err
82-
}
83-
data := map[string]interface{}{
84-
"local": map[string]interface{}{
85-
"status": status,
86-
"node": node,
87-
},
88-
}
89-
return metadata_plugin.List(path, data), nil
116+
return nil, nil
90117
}
91118

92119
// Get implements the metadata.Plugin SPI's List method
93120
func (s *baseFlavor) Get(path metadata.Path) (*types.Any, error) {
94-
docker, err := s.getDockerClient(Spec{
95-
Docker: ConnectInfo{
96-
Host: "unix:///var/run/docker.sock", // defaults to local socket
97-
},
98-
})
99-
if err != nil {
100-
return nil, err
101-
}
102-
status, node, err := swarmState(docker)
103-
if err != nil {
104-
return nil, err
105-
}
106-
data := map[string]interface{}{
107-
"local": map[string]interface{}{
108-
"status": status,
109-
"node": node,
110-
},
121+
if s.metadataPlugin != nil {
122+
return s.metadataPlugin.Get(path)
111123
}
112-
return metadata_plugin.GetValue(path, data)
124+
return nil, nil
113125
}
114126

115127
// Funcs implements the template.FunctionExporter interface that allows the RPC server to expose help on the

examples/flavor/swarm/flavor_test.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@ func templ(tpl string) *template.Template {
2929
func TestValidate(t *testing.T) {
3030
ctrl := gomock.NewController(t)
3131
defer ctrl.Finish()
32+
managerStop := make(chan struct{})
33+
workerStop := make(chan struct{})
3234

3335
managerFlavor := NewManagerFlavor(func(Spec) (docker_client.APIClient, error) {
3436
return mock_client.NewMockAPIClient(ctrl), nil
35-
}, templ(DefaultManagerInitScriptTemplate))
37+
}, templ(DefaultManagerInitScriptTemplate), managerStop)
3638
workerFlavor := NewWorkerFlavor(func(Spec) (docker_client.APIClient, error) {
3739
return mock_client.NewMockAPIClient(ctrl), nil
38-
}, templ(DefaultWorkerInitScriptTemplate))
40+
}, templ(DefaultWorkerInitScriptTemplate), workerStop)
3941

4042
require.NoError(t, workerFlavor.Validate(
4143
types.AnyString(`{"Docker" : {"Host":"unix:///var/run/docker.sock"}}`),
@@ -75,17 +77,22 @@ func TestValidate(t *testing.T) {
7577
group_types.AllocationMethod{LogicalIDs: []instance.LogicalID{"127.0.0.1"}})
7678
require.Error(t, err)
7779
require.Equal(t, "Invalid attachment Type 'keyboard', only ebs is supported", err.Error())
80+
81+
close(managerStop)
82+
close(workerStop)
7883
}
7984

8085
func TestWorker(t *testing.T) {
8186
ctrl := gomock.NewController(t)
8287
defer ctrl.Finish()
8388

89+
workerStop := make(chan struct{})
90+
8491
client := mock_client.NewMockAPIClient(ctrl)
8592

8693
flavorImpl := NewWorkerFlavor(func(Spec) (docker_client.APIClient, error) {
8794
return client, nil
88-
}, templ(DefaultWorkerInitScriptTemplate))
95+
}, templ(DefaultWorkerInitScriptTemplate), workerStop)
8996

9097
swarmInfo := swarm.Swarm{
9198
ClusterInfo: swarm.ClusterInfo{ID: "ClusterUUID"},
@@ -137,6 +144,8 @@ func TestWorker(t *testing.T) {
137144
instance.Description{Tags: map[string]string{associationTag: associationID}})
138145
require.NoError(t, err)
139146
require.Equal(t, flavor.Healthy, health)
147+
148+
close(workerStop)
140149
}
141150

142151
const nodeID = "my-node-id"
@@ -147,11 +156,13 @@ func TestManager(t *testing.T) {
147156
ctrl := gomock.NewController(t)
148157
defer ctrl.Finish()
149158

159+
managerStop := make(chan struct{})
160+
150161
client := mock_client.NewMockAPIClient(ctrl)
151162

152163
flavorImpl := NewManagerFlavor(func(Spec) (docker_client.APIClient, error) {
153164
return client, nil
154-
}, templ(DefaultManagerInitScriptTemplate))
165+
}, templ(DefaultManagerInitScriptTemplate), managerStop)
155166

156167
swarmInfo := swarm.Swarm{
157168
ClusterInfo: swarm.ClusterInfo{ID: "ClusterUUID"},
@@ -214,4 +225,6 @@ func TestManager(t *testing.T) {
214225
instance.Description{Tags: map[string]string{associationTag: associationID}})
215226
require.NoError(t, err)
216227
require.Equal(t, flavor.Healthy, health)
228+
229+
close(managerStop)
217230
}

examples/flavor/swarm/main.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@ func main() {
5151
return err
5252
}
5353

54-
managerFlavor := NewManagerFlavor(DockerClient, mt)
55-
workerFlavor := NewWorkerFlavor(DockerClient, wt)
54+
managerStop := make(chan struct{})
55+
workerStop := make(chan struct{})
56+
managerFlavor := NewManagerFlavor(DockerClient, mt, managerStop)
57+
workerFlavor := NewWorkerFlavor(DockerClient, wt, workerStop)
5658

5759
cli.RunPlugin(*name,
5860

@@ -73,6 +75,9 @@ func main() {
7375
"manager": managerFlavor,
7476
"worker": workerFlavor,
7577
}))
78+
79+
close(managerStop)
80+
close(workerStop)
7681
return nil
7782
}
7883

examples/flavor/swarm/manager.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,19 @@ import (
66
log "github.com/Sirupsen/logrus"
77
"github.com/docker/docker/client"
88
group_types "github.com/docker/infrakit/pkg/plugin/group/types"
9+
"github.com/docker/infrakit/pkg/plugin/metadata"
910
"github.com/docker/infrakit/pkg/spi/instance"
1011
"github.com/docker/infrakit/pkg/template"
1112
"github.com/docker/infrakit/pkg/types"
1213
)
1314

1415
// NewManagerFlavor creates a flavor.Plugin that creates manager and worker nodes connected in a swarm.
15-
func NewManagerFlavor(connect func(Spec) (client.APIClient, error), templ *template.Template) *ManagerFlavor {
16-
return &ManagerFlavor{&baseFlavor{initScript: templ, getDockerClient: connect}}
16+
func NewManagerFlavor(connect func(Spec) (client.APIClient, error), templ *template.Template,
17+
stop <-chan struct{}) *ManagerFlavor {
18+
19+
base := &baseFlavor{initScript: templ, getDockerClient: connect}
20+
base.metadataPlugin = metadata.NewPluginFromChannel(base.runMetadataSnapshot(stop))
21+
return &ManagerFlavor{baseFlavor: base}
1722
}
1823

1924
// ManagerFlavor is the flavor for swarm managers

examples/flavor/swarm/worker.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,20 @@ import (
88
"github.com/docker/docker/api/types/filters"
99
"github.com/docker/docker/client"
1010
group_types "github.com/docker/infrakit/pkg/plugin/group/types"
11+
"github.com/docker/infrakit/pkg/plugin/metadata"
1112
"github.com/docker/infrakit/pkg/spi/instance"
1213
"github.com/docker/infrakit/pkg/template"
1314
"github.com/docker/infrakit/pkg/types"
1415
"golang.org/x/net/context"
1516
)
1617

1718
// NewWorkerFlavor creates a flavor.Plugin that creates manager and worker nodes connected in a swarm.
18-
func NewWorkerFlavor(connect func(Spec) (client.APIClient, error), templ *template.Template) *WorkerFlavor {
19-
return &WorkerFlavor{&baseFlavor{initScript: templ, getDockerClient: connect}}
19+
func NewWorkerFlavor(connect func(Spec) (client.APIClient, error), templ *template.Template,
20+
stop <-chan struct{}) *WorkerFlavor {
21+
22+
base := &baseFlavor{initScript: templ, getDockerClient: connect}
23+
base.metadataPlugin = metadata.NewPluginFromChannel(base.runMetadataSnapshot(stop))
24+
return &WorkerFlavor{baseFlavor: base}
2025
}
2126

2227
// WorkerFlavor implements the flavor and metadata plugins

0 commit comments

Comments
 (0)