Skip to content

Commit dd7f9d8

Browse files
authored
Merge pull request #624 from openziti/multipart-receive
implement support for receiving multipart edge messages
2 parents fc66b65 + 5885741 commit dd7f9d8

File tree

6 files changed

+120
-12
lines changed

6 files changed

+120
-12
lines changed

CHANGELOG.md

+7
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
# Release notes 0.23.42
2+
3+
## Issues Fixed and Dependency Updates
4+
5+
* github.com/openziti/sdk-golang: [v0.23.41 -> v0.23.42](https://github.com/openziti/sdk-golang/compare/v0.23.41...v0.23.42)
6+
* [Issue #625](https://github.com/openziti/sdk-golang/issues/625) - traffic optimization: implement support for receiving multi-part edge payloads
7+
18
# Release notes 0.23.41
29

310
## Issues Fixed and Dependency Updates

ziti/edge/conn.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,23 @@ func (ec *MsgChannel) WriteTraced(data []byte, msgUUID []byte, hdrs map[int32][]
153153
copyBuf := make([]byte, len(data))
154154
copy(copyBuf, data)
155155

156-
msg := NewDataMsg(ec.id, ec.msgIdSeq.Next(), copyBuf)
156+
seq := ec.msgIdSeq.Next()
157+
msg := NewDataMsg(ec.id, seq, copyBuf)
157158
if msgUUID != nil {
158159
msg.Headers[UUIDHeader] = msgUUID
159160
}
160161

161162
for k, v := range hdrs {
162163
msg.Headers[k] = v
163164
}
165+
166+
// indicate that we can accept multipart messages
167+
// with the first message
168+
if seq == 1 {
169+
flags, _ := msg.GetUint32Header(FlagsHeader)
170+
flags = flags | MULTIPART
171+
msg.PutUint32Header(FlagsHeader, flags)
172+
}
164173
ec.TraceMsg("write", msg)
165174
pfxlog.Logger().WithFields(GetLoggerFields(msg)).Debugf("writing %v bytes", len(copyBuf))
166175

ziti/edge/messages.go

+9
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,15 @@ const (
106106

107107
// FIN is an edge payload flag used to signal communication ends
108108
FIN = 0x1
109+
// TRACE_UUID indicates that peer will send data messages with specially constructed UUID headers
110+
TRACE_UUID = 1 << 1
111+
// MULTIPART indicates that peer can accept multipart data messages
112+
MULTIPART = 1 << 2
113+
// STREAM indicates connection with stream semantics
114+
// this allows consolidation of payloads to lower overhead
115+
STREAM = 1 << 3
116+
// MULTIPART_MSG set on data message with multiple payloads
117+
MULTIPART_MSG = 1 << 4
109118
)
110119

111120
type CryptoMethod byte

ziti/edge/network/conn.go

+42-10
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"crypto/rand"
2828
"encoding/base64"
29+
"encoding/binary"
2930
"github.com/michaelquigley/pfxlog"
3031
"github.com/openziti/channel/v3"
3132
"github.com/openziti/edge-api/rest_model"
@@ -52,9 +53,10 @@ var _ edge.Conn = &edgeConn{}
5253
type edgeConn struct {
5354
edge.MsgChannel
5455
readQ *noopSeq[*channel.Message]
55-
leftover []byte
56+
inBuffer [][]byte
5657
msgMux edge.MsgMux
5758
hosting cmap.ConcurrentMap[string, *edgeListener]
59+
flags uint32
5860
closed atomic.Bool
5961
readFIN atomic.Bool
6062
sentFIN atomic.Bool
@@ -458,10 +460,16 @@ func (conn *edgeConn) Read(p []byte) (int, error) {
458460
}
459461

460462
log.Tracef("read buffer = %d bytes", len(p))
461-
if len(conn.leftover) > 0 {
462-
log.Tracef("found %d leftover bytes", len(conn.leftover))
463-
n := copy(p, conn.leftover)
464-
conn.leftover = conn.leftover[n:]
463+
if len(conn.inBuffer) > 0 {
464+
first := conn.inBuffer[0]
465+
log.Tracef("found %d buffered bytes", len(first))
466+
n := copy(p, first)
467+
first = first[n:]
468+
if len(first) == 0 {
469+
conn.inBuffer = conn.inBuffer[1:]
470+
} else {
471+
conn.inBuffer[0] = first
472+
}
465473
return n, nil
466474
}
467475

@@ -471,7 +479,7 @@ func (conn *edgeConn) Read(p []byte) (int, error) {
471479
}
472480

473481
msg, err := conn.readQ.GetNext()
474-
if err == ErrClosed {
482+
if errors.Is(err, ErrClosed) {
475483
log.Debug("sequencer closed, closing connection")
476484
conn.closed.Store(true)
477485
return 0, io.EOF
@@ -484,6 +492,7 @@ func (conn *edgeConn) Read(p []byte) (int, error) {
484492
if flags&edge.FIN != 0 {
485493
conn.readFIN.Store(true)
486494
}
495+
conn.flags = conn.flags | (flags & (edge.STREAM | edge.MULTIPART))
487496

488497
switch msg.ContentType {
489498

@@ -499,6 +508,8 @@ func (conn *edgeConn) Read(p []byte) (int, error) {
499508
return 0, io.EOF
500509
}
501510

511+
multipart := (flags & edge.MULTIPART_MSG) != 0
512+
502513
// first data message should contain crypto header
503514
if conn.rxKey != nil {
504515
if len(d) != secretstream.StreamHeaderBytes {
@@ -519,11 +530,32 @@ func (conn *edgeConn) Read(p []byte) (int, error) {
519530
return 0, err
520531
}
521532
}
522-
n := copy(p, d)
523-
conn.leftover = d[n:]
533+
n := 0
534+
if multipart && len(d) > 0 {
535+
var parts [][]byte
536+
for len(d) > 0 {
537+
l := binary.LittleEndian.Uint16(d[0:2])
538+
d = d[2:]
539+
part := d[0:l]
540+
d = d[l:]
541+
parts = append(parts, part)
542+
}
543+
n = copy(p, parts[0])
544+
parts[0] = parts[0][n:]
545+
if len(parts[0]) == 0 {
546+
parts = parts[1:]
547+
}
548+
conn.inBuffer = append(conn.inBuffer, parts...)
549+
} else {
550+
n = copy(p, d)
551+
d = d[n:]
552+
if len(d) > 0 {
553+
conn.inBuffer = append(conn.inBuffer, d)
554+
}
555+
}
524556

525-
log.Tracef("saving %d bytes for leftover", len(conn.leftover))
526-
log.Debugf("reading %v bytes", n)
557+
log.Tracef("%d chunks in incoming buffer", len(conn.inBuffer))
558+
log.Debugf("read %v bytes", n)
527559
return n, nil
528560

529561
default:

ziti/edge/network/conn_test.go

+51
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package network
22

33
import (
44
"crypto/x509"
5+
"encoding/binary"
56
"github.com/openziti/channel/v3"
67
"github.com/openziti/foundation/v2/sequencer"
78
"github.com/openziti/sdk-golang/ziti/edge"
89
"github.com/stretchr/testify/require"
10+
"io"
911
"sync/atomic"
1012
"testing"
1113
"time"
@@ -121,6 +123,55 @@ func BenchmarkSequencer(b *testing.B) {
121123
}
122124
}
123125

126+
func TestReadMultipart(t *testing.T) {
127+
req := require.New(t)
128+
mux := edge.NewCowMapMsgMux()
129+
testChannel := &NoopTestChannel{}
130+
131+
readQ := NewNoopSequencer[*channel.Message](4)
132+
conn := &edgeConn{
133+
MsgChannel: *edge.NewEdgeMsgChannel(testChannel, 1),
134+
readQ: readQ,
135+
msgMux: mux,
136+
serviceName: "test",
137+
}
138+
139+
var stop atomic.Bool
140+
defer stop.Store(true)
141+
142+
var multipart []byte
143+
words := []string{"Hello", "World", "of", "ziti"}
144+
for _, w := range words {
145+
multipart = binary.LittleEndian.AppendUint16(multipart, uint16(len(w)))
146+
multipart = append(multipart, []byte(w)...)
147+
}
148+
msg := edge.NewDataMsg(1, uint32(0), multipart)
149+
msg.Headers.PutUint32Header(edge.FlagsHeader, uint32(edge.MULTIPART_MSG))
150+
_ = readQ.PutSequenced(msg)
151+
msg = edge.NewDataMsg(1, uint32(0), nil)
152+
msg.Headers.PutUint32Header(edge.FlagsHeader, uint32(edge.FIN))
153+
err := readQ.PutSequenced(msg)
154+
if err != nil {
155+
panic(err)
156+
}
157+
158+
var read []string
159+
for {
160+
data := make([]byte, 1024)
161+
req.NoError(conn.SetReadDeadline(time.Now().Add(1 * time.Second)))
162+
n, e := conn.Read(data)
163+
if e == io.EOF {
164+
break
165+
}
166+
167+
req.NoError(e)
168+
169+
read = append(read, string(data[:n]))
170+
}
171+
172+
req.Equal(words, read)
173+
}
174+
124175
type NoopTestChannel struct {
125176
}
126177

ziti/sdkinfo/build_info.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)