Skip to content

Commit 3205b92

Browse files
port the share infrastructure over to the subordinate framework (#789)
1 parent 5c4cf9b commit 3205b92

7 files changed

+125
-112
lines changed

agent/share.go

+38-77
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
package agent
22

33
import (
4-
"bytes"
5-
"encoding/json"
64
"errors"
75
"github.com/michaelquigley/pfxlog"
86
"github.com/openziti/zrok/agent/proctree"
7+
"github.com/openziti/zrok/cmd/zrok/subordinate"
98
"github.com/openziti/zrok/sdk/golang/sdk"
10-
"github.com/sirupsen/logrus"
11-
"strings"
129
"time"
1310
)
1411

@@ -28,11 +25,8 @@ type share struct {
2825
closed bool
2926
accessGrants []string
3027

31-
process *proctree.Child
32-
readBuffer bytes.Buffer
33-
booted bool
34-
bootComplete chan struct{}
35-
bootErr error
28+
process *proctree.Child
29+
sub *subordinate.MessageHandler
3630

3731
agent *Agent
3832
}
@@ -44,79 +38,46 @@ func (s *share) monitor() {
4438
s.agent.rmShare <- s
4539
}
4640

47-
func (s *share) tail(data []byte) {
48-
defer func() {
49-
if r := recover(); r != nil {
50-
logrus.Errorf("recovering: %v", r)
41+
func (s *share) bootHandler(msgType string, msg subordinate.Message) error {
42+
switch msgType {
43+
case subordinate.BootMessage:
44+
if v, found := msg["token"]; found {
45+
if str, ok := v.(string); ok {
46+
s.token = str
47+
}
48+
}
49+
if v, found := msg["backend_mode"]; found {
50+
if str, ok := v.(string); ok {
51+
s.backendMode = sdk.BackendMode(str)
52+
}
5153
}
52-
}()
53-
s.readBuffer.Write(data)
54-
if line, err := s.readBuffer.ReadString('\n'); err == nil {
55-
line = strings.Trim(line, "\n")
56-
if !s.booted {
57-
if strings.HasPrefix(line, "{") {
58-
in := make(map[string]interface{})
59-
if err := json.Unmarshal([]byte(line), &in); err == nil {
60-
if v, found := in["message"]; found {
61-
if str, ok := v.(string); ok {
62-
if str == "boot" {
63-
if v, found := in["token"]; found {
64-
if str, ok := v.(string); ok {
65-
s.token = str
66-
}
67-
}
68-
if v, found := in["backend_mode"]; found {
69-
if str, ok := v.(string); ok {
70-
s.backendMode = sdk.BackendMode(str)
71-
}
72-
}
73-
if v, found := in["share_mode"]; found {
74-
if str, ok := v.(string); ok {
75-
s.shareMode = sdk.ShareMode(str)
76-
}
77-
}
78-
if v, found := in["frontend_endpoints"]; found {
79-
if vArr, ok := v.([]interface{}); ok {
80-
for _, v := range vArr {
81-
if str, ok := v.(string); ok {
82-
s.frontendEndpoints = append(s.frontendEndpoints, str)
83-
}
84-
}
85-
}
86-
}
87-
if v, found := in["target"]; found {
88-
if str, ok := v.(string); ok {
89-
s.target = str
90-
}
91-
}
92-
s.booted = true
93-
} else {
94-
s.bootErr = errors.New(line)
95-
}
96-
} else {
97-
s.bootErr = errors.New(line)
98-
}
99-
} else {
100-
s.bootErr = errors.New(line)
54+
if v, found := msg["share_mode"]; found {
55+
if str, ok := v.(string); ok {
56+
s.shareMode = sdk.ShareMode(str)
57+
}
58+
}
59+
if v, found := msg["frontend_endpoints"]; found {
60+
if vArr, ok := v.([]interface{}); ok {
61+
for _, v := range vArr {
62+
if str, ok := v.(string); ok {
63+
s.frontendEndpoints = append(s.frontendEndpoints, str)
10164
}
102-
} else {
103-
s.bootErr = errors.New(line)
10465
}
105-
close(s.bootComplete)
106-
} else {
107-
logrus.Warn(line)
10866
}
109-
} else {
110-
if strings.HasPrefix(line, "{") {
111-
in := make(map[string]interface{})
112-
if err := json.Unmarshal([]byte(line), &in); err == nil {
113-
pfxlog.ChannelLogger(s.token).Info(in)
114-
}
115-
} else {
116-
pfxlog.ChannelLogger(s.token).Info(strings.Trim(line, "\n"))
67+
}
68+
if v, found := msg["target"]; found {
69+
if str, ok := v.(string); ok {
70+
s.target = str
71+
}
72+
}
73+
74+
case subordinate.ErrorMessage:
75+
if v, found := msg[subordinate.ErrorMessage]; found {
76+
if str, ok := v.(string); ok {
77+
return errors.New(str)
11778
}
11879
}
119-
} else {
120-
s.readBuffer.WriteString(line)
12180
}
81+
82+
return nil
12283
}

agent/sharePrivate.go

+26-10
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package agent
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"github.com/openziti/zrok/agent/agentGrpc"
78
"github.com/openziti/zrok/agent/proctree"
9+
"github.com/openziti/zrok/cmd/zrok/subordinate"
810
"github.com/openziti/zrok/environment"
911
"github.com/openziti/zrok/sdk/golang/sdk"
1012
"github.com/sirupsen/logrus"
@@ -23,10 +25,20 @@ func (i *agentGrpcImpl) SharePrivate(_ context.Context, req *agentGrpc.SharePriv
2325

2426
shrCmd := []string{os.Args[0], "share", "private", "--subordinate", "-b", req.BackendMode}
2527
shr := &share{
26-
shareMode: sdk.PrivateShareMode,
27-
backendMode: sdk.BackendMode(req.BackendMode),
28-
bootComplete: make(chan struct{}),
29-
agent: i.agent,
28+
shareMode: sdk.PrivateShareMode,
29+
backendMode: sdk.BackendMode(req.BackendMode),
30+
sub: subordinate.NewMessageHandler(),
31+
agent: i.agent,
32+
}
33+
shr.sub.MessageHandler = func(msg subordinate.Message) {
34+
logrus.Info(msg)
35+
}
36+
var bootErr error
37+
shr.sub.BootHandler = func(msgType string, msg subordinate.Message) {
38+
bootErr = shr.bootHandler(msgType, msg)
39+
}
40+
shr.sub.MalformedHandler = func(msg subordinate.Message) {
41+
logrus.Error(msg)
3042
}
3143

3244
if req.Insecure {
@@ -49,18 +61,22 @@ func (i *agentGrpcImpl) SharePrivate(_ context.Context, req *agentGrpc.SharePriv
4961

5062
logrus.Infof("executing '%v'", shrCmd)
5163

52-
shr.process, err = proctree.StartChild(shr.tail, shrCmd...)
64+
shr.process, err = proctree.StartChild(shr.sub.Tail, shrCmd...)
5365
if err != nil {
5466
return nil, err
5567
}
5668

57-
go shr.monitor()
58-
<-shr.bootComplete
69+
<-shr.sub.BootComplete
5970

60-
if shr.bootErr == nil {
71+
if bootErr == nil {
72+
go shr.monitor()
6173
i.agent.addShare <- shr
6274
return &agentGrpc.SharePrivateResponse{Token: shr.token}, nil
75+
76+
} else {
77+
if err := proctree.WaitChild(shr.process); err != nil {
78+
logrus.Errorf("error joining: %v", err)
79+
}
80+
return nil, fmt.Errorf("unable to start share: %v", bootErr)
6381
}
64-
65-
return nil, shr.bootErr
6682
}

agent/sharePublic.go

+26-10
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package agent
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"github.com/openziti/zrok/agent/agentGrpc"
78
"github.com/openziti/zrok/agent/proctree"
9+
"github.com/openziti/zrok/cmd/zrok/subordinate"
810
"github.com/openziti/zrok/environment"
911
"github.com/openziti/zrok/sdk/golang/sdk"
1012
"github.com/sirupsen/logrus"
@@ -23,10 +25,20 @@ func (i *agentGrpcImpl) SharePublic(_ context.Context, req *agentGrpc.SharePubli
2325

2426
shrCmd := []string{os.Args[0], "share", "public", "--subordinate", "-b", req.BackendMode}
2527
shr := &share{
26-
shareMode: sdk.PublicShareMode,
27-
backendMode: sdk.BackendMode(req.BackendMode),
28-
bootComplete: make(chan struct{}),
29-
agent: i.agent,
28+
shareMode: sdk.PublicShareMode,
29+
backendMode: sdk.BackendMode(req.BackendMode),
30+
sub: subordinate.NewMessageHandler(),
31+
agent: i.agent,
32+
}
33+
shr.sub.MessageHandler = func(msg subordinate.Message) {
34+
logrus.Info(msg)
35+
}
36+
var bootErr error
37+
shr.sub.BootHandler = func(msgType string, msg subordinate.Message) {
38+
bootErr = shr.bootHandler(msgType, msg)
39+
}
40+
shr.sub.MalformedHandler = func(msg subordinate.Message) {
41+
logrus.Error(msg)
3042
}
3143

3244
for _, basicAuth := range req.BasicAuth {
@@ -73,21 +85,25 @@ func (i *agentGrpcImpl) SharePublic(_ context.Context, req *agentGrpc.SharePubli
7385

7486
logrus.Infof("executing '%v'", shrCmd)
7587

76-
shr.process, err = proctree.StartChild(shr.tail, shrCmd...)
88+
shr.process, err = proctree.StartChild(shr.sub.Tail, shrCmd...)
7789
if err != nil {
7890
return nil, err
7991
}
8092

81-
go shr.monitor()
82-
<-shr.bootComplete
93+
<-shr.sub.BootComplete
8394

84-
if shr.bootErr == nil {
95+
if bootErr == nil {
96+
go shr.monitor()
8597
i.agent.addShare <- shr
8698
return &agentGrpc.SharePublicResponse{
8799
Token: shr.token,
88100
FrontendEndpoints: shr.frontendEndpoints,
89101
}, nil
90-
}
91102

92-
return nil, shr.bootErr
103+
} else {
104+
if err := proctree.WaitChild(shr.process); err != nil {
105+
logrus.Errorf("error joining: %v", err)
106+
}
107+
return nil, fmt.Errorf("unable to start share: %v", bootErr)
108+
}
93109
}

agent/shareReserved.go

+26-9
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@ package agent
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"github.com/openziti/zrok/agent/agentGrpc"
78
"github.com/openziti/zrok/agent/proctree"
9+
"github.com/openziti/zrok/cmd/zrok/subordinate"
810
"github.com/openziti/zrok/environment"
11+
"github.com/sirupsen/logrus"
912
"os"
1013
)
1114

@@ -21,9 +24,19 @@ func (i *agentGrpcImpl) ShareReserved(_ context.Context, req *agentGrpc.ShareRes
2124

2225
shrCmd := []string{os.Args[0], "share", "reserved", "--subordinate"}
2326
shr := &share{
24-
reserved: true,
25-
bootComplete: make(chan struct{}),
26-
agent: i.agent,
27+
reserved: true,
28+
sub: subordinate.NewMessageHandler(),
29+
agent: i.agent,
30+
}
31+
shr.sub.MessageHandler = func(msg subordinate.Message) {
32+
logrus.Info(msg)
33+
}
34+
var bootErr error
35+
shr.sub.BootHandler = func(msgType string, msg subordinate.Message) {
36+
bootErr = shr.bootHandler(msgType, msg)
37+
}
38+
shr.sub.MalformedHandler = func(msg subordinate.Message) {
39+
logrus.Error(msg)
2740
}
2841

2942
if req.OverrideEndpoint != "" {
@@ -38,15 +51,15 @@ func (i *agentGrpcImpl) ShareReserved(_ context.Context, req *agentGrpc.ShareRes
3851
shrCmd = append(shrCmd, req.Token)
3952
shr.token = req.Token
4053

41-
shr.process, err = proctree.StartChild(shr.tail, shrCmd...)
54+
shr.process, err = proctree.StartChild(shr.sub.Tail, shrCmd...)
4255
if err != nil {
4356
return nil, err
4457
}
4558

46-
go shr.monitor()
47-
<-shr.bootComplete
59+
<-shr.sub.BootComplete
4860

49-
if shr.bootErr == nil {
61+
if bootErr == nil {
62+
go shr.monitor()
5063
i.agent.addShare <- shr
5164
return &agentGrpc.ShareReservedResponse{
5265
Token: shr.token,
@@ -55,7 +68,11 @@ func (i *agentGrpcImpl) ShareReserved(_ context.Context, req *agentGrpc.ShareRes
5568
FrontendEndpoints: shr.frontendEndpoints,
5669
Target: shr.target,
5770
}, nil
58-
}
5971

60-
return nil, shr.bootErr
72+
} else {
73+
if err := proctree.WaitChild(shr.process); err != nil {
74+
logrus.Errorf("error joining: %v", err)
75+
}
76+
return nil, fmt.Errorf("unable to start share: %v", bootErr)
77+
}
6178
}

cmd/zrok/sharePrivate.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
tea "github.com/charmbracelet/bubbletea"
88
"github.com/openziti/zrok/agent/agentClient"
99
"github.com/openziti/zrok/agent/agentGrpc"
10+
"github.com/openziti/zrok/cmd/zrok/subordinate"
1011
"github.com/openziti/zrok/endpoints"
1112
"github.com/openziti/zrok/endpoints/drive"
1213
"github.com/openziti/zrok/endpoints/proxy"
@@ -371,7 +372,7 @@ func (cmd *sharePrivateCommand) shareLocal(args []string, root env_core.Root) {
371372

372373
if cmd.subordinate {
373374
data := make(map[string]interface{})
374-
data["message"] = "boot"
375+
data[subordinate.MessageKey] = subordinate.BootMessage
375376
data["token"] = shr.Token
376377
data["frontend_endpoints"] = shr.FrontendEndpoints
377378
jsonData, err := json.Marshal(data)
@@ -395,7 +396,7 @@ func (cmd *sharePrivateCommand) shareLocal(args []string, root env_core.Root) {
395396
select {
396397
case req := <-requests:
397398
data := make(map[string]interface{})
398-
data["message"] = "access"
399+
data[subordinate.MessageKey] = "access"
399400
data["remote_address"] = req.RemoteAddr
400401
data["method"] = req.Method
401402
data["path"] = req.Path

cmd/zrok/sharePublic.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/gobwas/glob"
99
"github.com/openziti/zrok/agent/agentClient"
1010
"github.com/openziti/zrok/agent/agentGrpc"
11+
"github.com/openziti/zrok/cmd/zrok/subordinate"
1112
"github.com/openziti/zrok/endpoints"
1213
"github.com/openziti/zrok/endpoints/drive"
1314
"github.com/openziti/zrok/endpoints/proxy"
@@ -273,7 +274,7 @@ func (cmd *sharePublicCommand) shareLocal(args []string, root env_core.Root) {
273274

274275
if cmd.subordinate {
275276
data := make(map[string]interface{})
276-
data["message"] = "boot"
277+
data[subordinate.MessageKey] = subordinate.BootMessage
277278
data["token"] = shr.Token
278279
data["frontend_endpoints"] = shr.FrontendEndpoints
279280
jsonData, err := json.Marshal(data)
@@ -297,7 +298,7 @@ func (cmd *sharePublicCommand) shareLocal(args []string, root env_core.Root) {
297298
select {
298299
case req := <-requests:
299300
data := make(map[string]interface{})
300-
data["message"] = "access"
301+
data[subordinate.MessageKey] = "access"
301302
data["remote_address"] = req.RemoteAddr
302303
data["method"] = req.Method
303304
data["path"] = req.Path

0 commit comments

Comments
 (0)