Skip to content
This repository was archived by the owner on Nov 27, 2023. It is now read-only.

Commit 50c68ce

Browse files
authored
Merge pull request #221 from rumpl/feat-context-metadata
Use the context from the metadata if it exists
2 parents 9257d0f + a999f34 commit 50c68ce

File tree

4 files changed

+275
-107
lines changed

4 files changed

+275
-107
lines changed

server/contextserverstream.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package server
2+
3+
import (
4+
"context"
5+
6+
"google.golang.org/grpc"
7+
"google.golang.org/grpc/metadata"
8+
)
9+
10+
// A gRPC server stream will only let you get its context but
11+
// there is no way to set a new (augmented context) to the next
12+
// handler (like we do for a unary request). We need to wrap the grpc.ServerSteam
13+
// to be able to set a new context that will be sent to the next stream interceptor.
14+
type contextServerStream struct {
15+
ss grpc.ServerStream
16+
ctx context.Context
17+
}
18+
19+
func (css *contextServerStream) SetHeader(md metadata.MD) error {
20+
return css.ss.SetHeader(md)
21+
}
22+
23+
func (css *contextServerStream) SendHeader(md metadata.MD) error {
24+
return css.ss.SendHeader(md)
25+
}
26+
27+
func (css *contextServerStream) SetTrailer(md metadata.MD) {
28+
css.ss.SetTrailer(md)
29+
}
30+
31+
func (css *contextServerStream) Context() context.Context {
32+
return css.ctx
33+
}
34+
35+
func (css *contextServerStream) SendMsg(m interface{}) error {
36+
return css.ss.SendMsg(m)
37+
}
38+
39+
func (css *contextServerStream) RecvMsg(m interface{}) error {
40+
return css.ss.RecvMsg(m)
41+
}

server/interceptor.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"errors"
6+
"strings"
7+
8+
"google.golang.org/grpc"
9+
"google.golang.org/grpc/metadata"
10+
11+
"github.com/docker/api/client"
12+
"github.com/docker/api/config"
13+
apicontext "github.com/docker/api/context"
14+
"github.com/docker/api/context/store"
15+
"github.com/docker/api/server/proxy"
16+
)
17+
18+
// key is the key where the current docker context is stored in the metadata
19+
// of a gRPC request
20+
const key = "context_key"
21+
22+
// unaryServerInterceptor configures the context and sends it to the next handler
23+
func unaryServerInterceptor(clictx context.Context) grpc.UnaryServerInterceptor {
24+
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
25+
currentContext, err := getIncomingContext(ctx)
26+
if err != nil {
27+
currentContext, err = getConfigContext(clictx)
28+
if err != nil {
29+
return nil, err
30+
}
31+
}
32+
configuredCtx, err := configureContext(clictx, currentContext, info.FullMethod)
33+
if err != nil {
34+
return nil, err
35+
}
36+
37+
return handler(configuredCtx, req)
38+
}
39+
}
40+
41+
// streamServerInterceptor configures the context and sends it to the next handler
42+
func streamServerInterceptor(clictx context.Context) grpc.StreamServerInterceptor {
43+
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
44+
currentContext, err := getIncomingContext(ss.Context())
45+
if err != nil {
46+
currentContext, err = getConfigContext(clictx)
47+
if err != nil {
48+
return err
49+
}
50+
}
51+
ctx, err := configureContext(clictx, currentContext, info.FullMethod)
52+
if err != nil {
53+
return err
54+
}
55+
56+
return handler(srv, &contextServerStream{
57+
ss: ss,
58+
ctx: ctx,
59+
})
60+
}
61+
}
62+
63+
// Returns the current context from the configuration file
64+
func getConfigContext(ctx context.Context) (string, error) {
65+
configDir := config.Dir(ctx)
66+
configFile, err := config.LoadFile(configDir)
67+
if err != nil {
68+
return "", err
69+
}
70+
return configFile.CurrentContext, nil
71+
}
72+
73+
// Returns the context set by the caller if any, error otherwise
74+
func getIncomingContext(ctx context.Context) (string, error) {
75+
if md, ok := metadata.FromIncomingContext(ctx); ok {
76+
if key, ok := md[key]; ok {
77+
return key[0], nil
78+
}
79+
}
80+
81+
return "", errors.New("not found")
82+
}
83+
84+
// configureContext populates the request context with objects the client
85+
// needs: the context store and the api client
86+
func configureContext(ctx context.Context, currentContext string, method string) (context.Context, error) {
87+
configDir := config.Dir(ctx)
88+
89+
ctx = apicontext.WithCurrentContext(ctx, currentContext)
90+
91+
// The contexts service doesn't need the client
92+
if !strings.Contains(method, "/com.docker.api.protos.context.v1.Contexts") {
93+
c, err := client.New(ctx)
94+
if err != nil {
95+
return nil, err
96+
}
97+
98+
ctx, err = proxy.WithClient(ctx, c)
99+
if err != nil {
100+
return nil, err
101+
}
102+
}
103+
104+
s, err := store.New(store.WithRoot(configDir))
105+
if err != nil {
106+
return nil, err
107+
}
108+
ctx = store.WithContextStore(ctx, s)
109+
110+
return ctx, nil
111+
}

server/interceptor_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"io/ioutil"
6+
"os"
7+
"path"
8+
"testing"
9+
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
"github.com/stretchr/testify/suite"
13+
"google.golang.org/grpc"
14+
"google.golang.org/grpc/metadata"
15+
16+
"github.com/docker/api/config"
17+
apicontext "github.com/docker/api/context"
18+
)
19+
20+
type interceptorSuite struct {
21+
suite.Suite
22+
dir string
23+
ctx context.Context
24+
}
25+
26+
func (is *interceptorSuite) BeforeTest(suiteName, testName string) {
27+
dir, err := ioutil.TempDir("", "example")
28+
require.Nil(is.T(), err)
29+
30+
ctx := context.Background()
31+
ctx = config.WithDir(ctx, dir)
32+
err = ioutil.WriteFile(path.Join(dir, "config.json"), []byte(`{"currentContext": "default"}`), 0644)
33+
require.Nil(is.T(), err)
34+
35+
is.dir = dir
36+
is.ctx = ctx
37+
}
38+
39+
func (is *interceptorSuite) AfterTest(suiteName, tesName string) {
40+
err := os.RemoveAll(is.dir)
41+
require.Nil(is.T(), err)
42+
}
43+
44+
func (is *interceptorSuite) TestUnaryGetCurrentContext() {
45+
interceptor := unaryServerInterceptor(is.ctx)
46+
47+
currentContext := is.callUnary(context.Background(), interceptor)
48+
49+
assert.Equal(is.T(), "default", currentContext)
50+
}
51+
52+
func (is *interceptorSuite) TestUnaryContextFromMetadata() {
53+
contextName := "test"
54+
55+
interceptor := unaryServerInterceptor(is.ctx)
56+
reqCtx := context.Background()
57+
reqCtx = metadata.NewIncomingContext(reqCtx, metadata.MD{
58+
(key): []string{contextName},
59+
})
60+
61+
currentContext := is.callUnary(reqCtx, interceptor)
62+
63+
assert.Equal(is.T(), contextName, currentContext)
64+
}
65+
66+
func (is *interceptorSuite) TestStreamGetCurrentContext() {
67+
interceptor := streamServerInterceptor(is.ctx)
68+
69+
currentContext := is.callStream(context.Background(), interceptor)
70+
71+
assert.Equal(is.T(), "default", currentContext)
72+
}
73+
74+
func (is *interceptorSuite) TestStreamContextFromMetadata() {
75+
contextName := "test"
76+
77+
interceptor := streamServerInterceptor(is.ctx)
78+
reqCtx := context.Background()
79+
reqCtx = metadata.NewIncomingContext(reqCtx, metadata.MD{
80+
(key): []string{contextName},
81+
})
82+
83+
currentContext := is.callStream(reqCtx, interceptor)
84+
85+
assert.Equal(is.T(), contextName, currentContext)
86+
}
87+
88+
func (is *interceptorSuite) callStream(ctx context.Context, interceptor grpc.StreamServerInterceptor) string {
89+
currentContext := ""
90+
err := interceptor(nil, &contextServerStream{
91+
ctx: ctx,
92+
}, &grpc.StreamServerInfo{
93+
FullMethod: "/com.docker.api.protos.context.v1.Contexts/test",
94+
}, func(srv interface{}, stream grpc.ServerStream) error {
95+
currentContext = apicontext.CurrentContext(stream.Context())
96+
return nil
97+
})
98+
99+
require.Nil(is.T(), err)
100+
101+
return currentContext
102+
}
103+
104+
func (is *interceptorSuite) callUnary(ctx context.Context, interceptor grpc.UnaryServerInterceptor) string {
105+
currentContext := ""
106+
resp, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{
107+
FullMethod: "/com.docker.api.protos.context.v1.Contexts/test",
108+
}, func(ctx context.Context, req interface{}) (interface{}, error) {
109+
currentContext = apicontext.CurrentContext(ctx)
110+
return nil, nil
111+
})
112+
113+
require.Nil(is.T(), err)
114+
require.Nil(is.T(), resp)
115+
116+
return currentContext
117+
}
118+
119+
func TestInterceptor(t *testing.T) {
120+
suite.Run(t, new(interceptorSuite))
121+
}

0 commit comments

Comments
 (0)