diff --git a/cgo-nmsg/zmq_test.go b/cgo-nmsg/zmq_test.go new file mode 100644 index 0000000..f981c94 --- /dev/null +++ b/cgo-nmsg/zmq_test.go @@ -0,0 +1,284 @@ +package nmsg_test + +import ( + "errors" + "github.com/farsightsec/go-nmsg" + cnmsg "github.com/farsightsec/go-nmsg/cgo-nmsg" + "io" + "log" + "strconv" + "testing" +) + +type tester func(*testing.T, io.Reader, io.Writer) +type testerCgo func(*testing.T, cnmsg.Input, cnmsg.Output) +type testerMixed func(*testing.T, cnmsg.Input, cnmsg.Output, nmsg.Input, nmsg.Output) + +func compare(a, b []byte) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +func PayloadIsEqual(c *nmsg.NmsgPayload, d *nmsg.NmsgPayload) bool { + if *c.Vid != *d.Vid || *c.Msgtype != *d.Msgtype { + return false + } + + return compare(c.Payload, d.Payload) +} + +func MessageIsEqual(c *cnmsg.Message, d *cnmsg.Message) bool { + ct, cv := c.GetMsgtype() + dt, dv := d.GetMsgtype() + if ct != dt || cv != dv { + return false + } + + cp, err := c.GetBytesField("payload", 0) + if err != nil { + return false + } + + dp, err := d.GetBytesField("payload", 0) + if err != nil { + return false + } + + return compare(cp, dp) +} + +func getCgoMessage(t *testing.T, size int) *cnmsg.Message { + mod := cnmsg.MessageModLookupByName("base", "encode") + if mod == nil { + log.Fatal("module not found") + } + msg := cnmsg.NewMessage(mod) + if err := msg.SetEnumField("type", 0, "TEXT"); err != nil { + log.Fatal(err) + } + + payload := make([]byte, size) + for i := range payload { + payload[i] = '0' + } + + if err := msg.SetBytesField("payload", 0, payload); err != nil { + log.Fatal(err) + } + + return msg +} + +func doWriteCgo(t *testing.T, s chan bool, o cnmsg.Output) error { + for { + select { + case _ = <-s: + return nil + default: + msg := getCgoMessage(t, 500) + err := o.Write(msg) + if err != nil { + return err + } + } + } +} + +func doReadCGo(s chan bool, i cnmsg.Input) (*cnmsg.Message, error) { + var rmsg *cnmsg.Message + var err error + for { + rmsg, err = i.Read() + if err != nil { + if cnmsg.ErrorRetry(err) == false { + return nil, err + } + } else if rmsg == nil { + return nil, errors.New("receive nil message") + } else { + break + } + } + s <- true + return rmsg, nil +} + +func doWriteNmsg(s chan bool, msg *nmsg.NmsgPayload, o nmsg.Output) error { + for { + select { + case _ = <-s: + return nil + default: + err := o.Send(msg) + if err != nil { + return err + } + } + } +} + +func doReadNmsg(s chan bool, i nmsg.Input) (*nmsg.NmsgPayload, error) { + var rmsg *nmsg.NmsgPayload + var err error + for { + rmsg, err = i.Recv() + if err != nil { + return nil, err + } else if rmsg == nil { + return nil, errors.New("receive nil message") + } else { + break + } + } + s <- true + return rmsg, nil +} + +func doTestCgoDo(t *testing.T, i cnmsg.Input, o cnmsg.Output) { + signal := make(chan bool) + + go func() { + err := doWriteCgo(t, signal, o) + if err != nil { + t.Fatal(err.Error()) + } + }() + + rmsg, err := doReadCGo(signal, i) + + if err != nil { + t.Fatal(err.Error()) + } + + msg_ref := getCgoMessage(t, 500) + if MessageIsEqual(rmsg, msg_ref) == false { + log.Fatal("messages do not match") + } +} + +func doTestForCGo(t *testing.T, ep string, tp string, fn testerCgo) { + writer, err := cnmsg.NewZMQOutput(ep+",accept,"+tp, 2000) + + if err != nil { + t.Error(err.Error()) + return + } + + reader, err := cnmsg.NewZMQInput(ep + ",connect," + tp) + + if err != nil { + t.Error(err.Error()) + return + } + + fn(t, reader, writer) +} + +func doTestMixedDo(t *testing.T, ci cnmsg.Input, co cnmsg.Output, ni nmsg.Input, no nmsg.Output) { + // Write to co, read for ni, write to no, read from ci + signal1 := make(chan bool) + signal2 := make(chan bool) + + go func() { + err := doWriteCgo(t, signal1, co) + if err != nil { + log.Fatal(err.Error()) + } + }() + pin, err := doReadNmsg(signal1, ni) + if err != nil { + log.Fatal(err.Error()) + } + go func() { + err := doWriteNmsg(signal2, pin, no) + if err != nil { + log.Fatal(err.Error()) + } + }() + rmsg, err := doReadCGo(signal2, ci) + if err != nil { + log.Fatal(err.Error()) + } + + msg_ref := getCgoMessage(t, 500) + + if MessageIsEqual(rmsg, msg_ref) == false { + log.Fatal("messages do not match") + } +} + +func doTestForMixed(t *testing.T, ep string, num int, tp string, fn testerMixed) { + ep1 := ep + strconv.Itoa(num) + ep2 := ep + strconv.Itoa(num+1) + + co, err := cnmsg.NewZMQOutput(ep1+",accept,"+tp, 1000) + + if err != nil { + t.Error(err.Error() + " " + ep1) + return + } + + nw, err := nmsg.NewZMQWriter(ep2 + ",accept," + tp) + if err != nil { + t.Error(err.Error() + " " + ep2) + return + } + + nr, err := nmsg.NewZMQReader(ep1 + ",connect," + tp) + if err != nil { + t.Error(err.Error() + " " + ep1) + return + } + + ci, err := cnmsg.NewZMQInput(ep2 + ",connect," + tp) + + if err != nil { + t.Error(err.Error() + " " + ep2) + return + } + + ni := nmsg.NewInput(nr, 1000) + no := nmsg.UnbufferedOutput(nw) + + fn(t, ci, co, ni, no) +} + +func TestZMQ_CGo_Local(t *testing.T) { + doTestForCGo(t, "tcp://127.0.0.1:6555", "pushpull", doTestCgoDo) + doTestForCGo(t, "tcp://127.0.0.1:6557", "pubsub", doTestCgoDo) +} + +func TestZMQ_CGo_Inproc(t *testing.T) { + doTestForCGo(t, "inproc://TestZMQInproc10", "pushpull", doTestCgoDo) + doTestForCGo(t, "inproc://TestZMQInproc30", "pubsub", doTestCgoDo) +} + +func TestZMQ_CGo_IPC(t *testing.T) { + doTestForCGo(t, "ipc:///tmp/TestZMQIpc10", "pushpull", doTestCgoDo) + doTestForCGo(t, "ipc:///tmp/TestZMQIpc20", "pubsub", doTestCgoDo) +} + +# This test is currently failing with a segfault +func TestZmq_Mixed_Local(t *testing.T) { + doTestForMixed(t, "tcp://127.0.0.1:", 7555, "pushpull", doTestMixedDo) + doTestForMixed(t, "tcp://127.0.0.1:", 7557, "pubsub", doTestMixedDo) +} + +//// Inproc cgo-nmsg side fill writer buffer and hangs +//func TestZmq_Mixed_Inproc(t *testing.T) { +// doTestForMixed(t, "inproc://TestZMQInproc", 100, "pushpull", doTestMixedDo) +// doTestForMixed(t, "inproc://TestZMQInproc", 200, "pubsub", doTestMixedDo) +//} + +# This test is currently failing with a segfault +func TestZmq_Mixed_IPC(t *testing.T) { + doTestForMixed(t, "ipc:///tmp/TestZMQIpc", 100, "pushpull", doTestMixedDo) + doTestForMixed(t, "ipc:///tmp/TestZMQIpc", 200, "pubsub", doTestMixedDo) +} diff --git a/debian/changelog b/debian/changelog index a71a083..6fae9b2 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,11 @@ +go-nmsg (0.3.1-1) debian-fsi; urgency=medium + + * Update dependencies for package building. + * Skip building cgo-nmsg. + * Move cgo-nmsg tests into that directory so they are also skipped. + + -- Farsight Security Inc Thu, 29 May 2025 13:05:44 -0400 + go-nmsg (0.3.0-1) debian-fsi; urgency=medium * Add support for dnsobs message type. diff --git a/debian/control b/debian/control index 094d890..b2a210e 100644 --- a/debian/control +++ b/debian/control @@ -6,8 +6,9 @@ Build-Depends: debhelper-compat (= 13), dh-golang, golang-any, - golang-google-protobuf-dev, golang-github-dnstap-go-dnstap-dev, + golang-github-pebbe-zmq4-dev, + golang-google-protobuf-dev, golang-gopkg-yaml.v2-dev, libnmsg-dev (>= 1.1.0~), pkg-config, @@ -19,6 +20,8 @@ XS-Go-Import-Path: github.com/farsightsec/go-nmsg Package: golang-github-farsightsec-go-nmsg-dev Architecture: any Depends: ${shlibs:Depends}, ${misc:Depends}, + golang-github-dnstap-go-dnstap-dev, + golang-github-pebbe-zmq4-dev, golang-google-protobuf-dev, Description: Pure Golang NMSG Library go-nmsg is a pure go implementation of the NMSG container and payload @@ -33,20 +36,7 @@ Package: golang-github-farsightsec-go-nmsg-base-dev Architecture: any Depends: ${shlibs:Depends}, ${misc:Depends}, golang-github-farsightsec-go-nmsg-dev (= ${binary:Version}), - golang-github-dnstap-go-dnstap-dev, - golang-google-protobuf-dev, Description: NMSG vendor base encoding modules for Golang This package provides generated Go code from Protocol Buffers Version 2 specifications for NMSG vendor base encoding modules: Dns, DnsQR, Email, Encode, Http, IPConn, Linkpair, LogLine, Ncap, Packet, Pkt, and Xml. - -Package: golang-github-farsightsec-go-nmsg-cgo-nmsg-dev -Architecture: any -Depends: ${shlibs:Depends}, ${misc:Depends}, - pkg-config, - libnmsg-dev, - libxs-dev -Description: Golang bindings to the C libnmsg library - The NMSG network message encapsulation library format is an efficient - encoding of typed, structured data into payloads which are packed into - containers which can be transmitted over the network or stored to disk. diff --git a/debian/golang-github-farsightsec-go-nmsg-cgo-nmsg-dev.install b/debian/golang-github-farsightsec-go-nmsg-cgo-nmsg-dev.install deleted file mode 100644 index 5f5694d..0000000 --- a/debian/golang-github-farsightsec-go-nmsg-cgo-nmsg-dev.install +++ /dev/null @@ -1 +0,0 @@ -usr/share/gocode/src/github.com/farsightsec/go-nmsg/cgo-nmsg/*.go diff --git a/debian/rules b/debian/rules index 0c46032..55fff1f 100755 --- a/debian/rules +++ b/debian/rules @@ -1,4 +1,6 @@ #!/usr/bin/make -f +export DH_GOLANG_EXCLUDES := cgo-nmsg + %: dh $@ --with=golang --builddirectory=_build --buildsystem=golang diff --git a/go.mod b/go.mod index d5376ca..f3b0973 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/dnstap/golang-dnstap v0.4.0 - github.com/pebbe/zmq4 v1.2.10 + github.com/pebbe/zmq4 v1.4.0 google.golang.org/protobuf v1.25.0 gopkg.in/yaml.v2 v2.4.0 ) diff --git a/go.sum b/go.sum index 3990d06..d6f3a9f 100644 --- a/go.sum +++ b/go.sum @@ -26,8 +26,8 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/miekg/dns v1.1.31 h1:sJFOl9BgwbYAWOGEwr61FU28pqsBNdpRBnhGXtO06Oo= github.com/miekg/dns v1.1.31/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= -github.com/pebbe/zmq4 v1.2.10 h1:wQkqRZ3CZeABIeidr3e8uQZMMH5YAykA/WN0L5zkd1c= -github.com/pebbe/zmq4 v1.2.10/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48= +github.com/pebbe/zmq4 v1.4.0 h1:gO5P92Ayl8GXpPZdYcD62Cwbq0slSBVVQRIXwGSJ6eQ= +github.com/pebbe/zmq4 v1.4.0/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8= diff --git a/zmq_test.go b/zmq_test.go index 2852803..cf19316 100644 --- a/zmq_test.go +++ b/zmq_test.go @@ -3,16 +3,11 @@ package nmsg_test import ( "errors" "github.com/farsightsec/go-nmsg" - cnmsg "github.com/farsightsec/go-nmsg/cgo-nmsg" "io" - "log" - "strconv" "testing" ) type tester func(*testing.T, io.Reader, io.Writer) -type testerCgo func(*testing.T, cnmsg.Input, cnmsg.Output) -type testerMixed func(*testing.T, cnmsg.Input, cnmsg.Output, nmsg.Input, nmsg.Output) func PayloadIsEqual(c *nmsg.NmsgPayload, d *nmsg.NmsgPayload) bool { if *c.Vid != *d.Vid || *c.Msgtype != *d.Msgtype { @@ -22,82 +17,6 @@ func PayloadIsEqual(c *nmsg.NmsgPayload, d *nmsg.NmsgPayload) bool { return compare(c.Payload, d.Payload) } -func MessageIsEqual(c *cnmsg.Message, d *cnmsg.Message) bool { - ct, cv := c.GetMsgtype() - dt, dv := d.GetMsgtype() - if ct != dt || cv != dv { - return false - } - - cp, err := c.GetBytesField("payload", 0) - if err != nil { - return false - } - - dp, err := d.GetBytesField("payload", 0) - if err != nil { - return false - } - - return compare(cp, dp) -} - -func getCgoMessage(t *testing.T, size int) *cnmsg.Message { - mod := cnmsg.MessageModLookupByName("base", "encode") - if mod == nil { - log.Fatal("module not found") - } - msg := cnmsg.NewMessage(mod) - if err := msg.SetEnumField("type", 0, "TEXT"); err != nil { - log.Fatal(err) - } - - payload := make([]byte, size) - for i := range payload { - payload[i] = '0' - } - - if err := msg.SetBytesField("payload", 0, payload); err != nil { - log.Fatal(err) - } - - return msg -} - -func doWriteCgo(t *testing.T, s chan bool, o cnmsg.Output) error { - for { - select { - case _ = <-s: - return nil - default: - msg := getCgoMessage(t, 500) - err := o.Write(msg) - if err != nil { - return err - } - } - } -} - -func doReadCGo(s chan bool, i cnmsg.Input) (*cnmsg.Message, error) { - var rmsg *cnmsg.Message - var err error - for { - rmsg, err = i.Read() - if err != nil { - if cnmsg.ErrorRetry(err) == false { - return nil, err - } - } else if rmsg == nil { - return nil, errors.New("receive nil message") - } else { - break - } - } - s <- true - return rmsg, nil -} - func doWriteNmsg(s chan bool, msg *nmsg.NmsgPayload, o nmsg.Output) error { for { select { @@ -157,28 +76,6 @@ func doTestDo(t *testing.T, i nmsg.Input, o nmsg.Output) { } } -func doTestCgoDo(t *testing.T, i cnmsg.Input, o cnmsg.Output) { - signal := make(chan bool) - - go func() { - err := doWriteCgo(t, signal, o) - if err != nil { - t.Fatal(err.Error()) - } - }() - - rmsg, err := doReadCGo(signal, i) - - if err != nil { - t.Fatal(err.Error()) - } - - msg_ref := getCgoMessage(t, 500) - if MessageIsEqual(rmsg, msg_ref) == false { - log.Fatal("messages do not match") - } -} - func doTestUnbuffered(t *testing.T, r io.Reader, w io.Writer) { input := nmsg.NewInput(r, 1000) output := nmsg.UnbufferedOutput(w) @@ -211,93 +108,6 @@ func doTestFor(t *testing.T, ep string, tp string, fn tester) { fn(t, reader, writer) } -func doTestForCGo(t *testing.T, ep string, tp string, fn testerCgo) { - writer, err := cnmsg.NewZMQOutput(ep+",accept,"+tp, 2000) - - if err != nil { - t.Error(err.Error()) - return - } - - reader, err := cnmsg.NewZMQInput(ep + ",connect," + tp) - - if err != nil { - t.Error(err.Error()) - return - } - - fn(t, reader, writer) -} - -func doTestMixedDo(t *testing.T, ci cnmsg.Input, co cnmsg.Output, ni nmsg.Input, no nmsg.Output) { - // Write to co, read for ni, write to no, read from ci - signal1 := make(chan bool) - signal2 := make(chan bool) - - go func() { - err := doWriteCgo(t, signal1, co) - if err != nil { - log.Fatal(err.Error()) - } - }() - pin, err := doReadNmsg(signal1, ni) - if err != nil { - log.Fatal(err.Error()) - } - go func() { - err := doWriteNmsg(signal2, pin, no) - if err != nil { - log.Fatal(err.Error()) - } - }() - rmsg, err := doReadCGo(signal2, ci) - if err != nil { - log.Fatal(err.Error()) - } - - msg_ref := getCgoMessage(t, 500) - - if MessageIsEqual(rmsg, msg_ref) == false { - log.Fatal("messages do not match") - } -} - -func doTestForMixed(t *testing.T, ep string, num int, tp string, fn testerMixed) { - ep1 := ep + strconv.Itoa(num) - ep2 := ep + strconv.Itoa(num+1) - - co, err := cnmsg.NewZMQOutput(ep1+",accept,"+tp, 1000) - - if err != nil { - t.Error(err.Error() + " " + ep1) - return - } - - nw, err := nmsg.NewZMQWriter(ep2 + ",accept," + tp) - if err != nil { - t.Error(err.Error() + " " + ep2) - return - } - - nr, err := nmsg.NewZMQReader(ep1 + ",connect," + tp) - if err != nil { - t.Error(err.Error() + " " + ep1) - return - } - - ci, err := cnmsg.NewZMQInput(ep2 + ",connect," + tp) - - if err != nil { - t.Error(err.Error() + " " + ep2) - return - } - - ni := nmsg.NewInput(nr, 1000) - no := nmsg.UnbufferedOutput(nw) - - fn(t, ci, co, ni, no) -} - func TestZMQLocal(t *testing.T) { doTestFor(t, "tcp://127.0.0.1:5555", "pushpull", doTestUnbuffered) doTestFor(t, "tcp://127.0.0.1:5556", "pushpull", doTestBuffered) @@ -318,34 +128,3 @@ func TestZMQIpc(t *testing.T) { doTestFor(t, "ipc:///tmp/TestZMQIpc2", "pubsub", doTestUnbuffered) doTestFor(t, "ipc:///tmp/TestZMQIpc2", "pubsub", doTestBuffered) } - -func TestZMQ_CGo_Local(t *testing.T) { - doTestForCGo(t, "tcp://127.0.0.1:6555", "pushpull", doTestCgoDo) - doTestForCGo(t, "tcp://127.0.0.1:6557", "pubsub", doTestCgoDo) -} - -func TestZMQ_CGo_Inproc(t *testing.T) { - doTestForCGo(t, "inproc://TestZMQInproc10", "pushpull", doTestCgoDo) - doTestForCGo(t, "inproc://TestZMQInproc30", "pubsub", doTestCgoDo) -} - -func TestZMQ_CGo_IPC(t *testing.T) { - doTestForCGo(t, "ipc:///tmp/TestZMQIpc10", "pushpull", doTestCgoDo) - doTestForCGo(t, "ipc:///tmp/TestZMQIpc20", "pubsub", doTestCgoDo) -} - -func TestZmq_Mixed_Local(t *testing.T) { - doTestForMixed(t, "tcp://127.0.0.1:", 7555, "pushpull", doTestMixedDo) - doTestForMixed(t, "tcp://127.0.0.1:", 7557, "pubsub", doTestMixedDo) -} - -//// Inproc cgo-nmsg side fill writer buffer and hangs -//func TestZmq_Mixed_Inproc(t *testing.T) { -// doTestForMixed(t, "inproc://TestZMQInproc", 100, "pushpull", doTestMixedDo) -// doTestForMixed(t, "inproc://TestZMQInproc", 200, "pubsub", doTestMixedDo) -//} - -func TestZmq_Mixed_IPC(t *testing.T) { - doTestForMixed(t, "ipc:///tmp/TestZMQIpc", 100, "pushpull", doTestMixedDo) - doTestForMixed(t, "ipc:///tmp/TestZMQIpc", 200, "pubsub", doTestMixedDo) -}