Flowgraph Extension

Scott Johnston edited this page Aug 30, 2018 · 45 revisions

The flowgraph model of computation can be extended over sockets with a ready-send protocol. A simple one works as follows: one end writes a newline-terminated string to a two-way socket and the other end reads it; the reader then writes back a single newline character ("\n") as an acknowledgement, which the original writer reads.
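
The exchange described above can be sketched without any flowgraph types. The following is a minimal illustration, not code from fgbase: the names sendLine, recvLine, newRW and exchange are hypothetical, and net.Pipe stands in for a real network connection. It assumes the values themselves contain no newline.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net"
)

// newRW wraps a two-way connection in a buffered reader/writer pair.
func newRW(c io.ReadWriter) *bufio.ReadWriter {
	return bufio.NewReadWriter(bufio.NewReader(c), bufio.NewWriter(c))
}

// sendLine (hypothetical helper) writes one newline-terminated value,
// then blocks until the peer acknowledges it with a single '\n'.
func sendLine(rw *bufio.ReadWriter, s string) error {
	if _, err := rw.WriteString(s + "\n"); err != nil {
		return err
	}
	if err := rw.Flush(); err != nil {
		return err
	}
	_, err := rw.ReadString('\n') // wait for the ack
	return err
}

// recvLine (hypothetical helper) reads one newline-terminated value,
// then acknowledges it with a single '\n'.
func recvLine(rw *bufio.ReadWriter) (string, error) {
	s, err := rw.ReadString('\n')
	if err != nil {
		return "", err
	}
	if _, err := rw.WriteString("\n"); err != nil {
		return "", err
	}
	return s[:len(s)-1], rw.Flush()
}

// exchange sends vals over an in-memory connection and returns what the
// receiving end saw. Values are assumed not to contain '\n'.
func exchange(vals []string) ([]string, error) {
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()

	errc := make(chan error, 1)
	go func() {
		w := newRW(a)
		for _, v := range vals {
			if err := sendLine(w, v); err != nil {
				errc <- err
				return
			}
		}
		errc <- nil
	}()

	r := newRW(b)
	var got []string
	for range vals {
		s, err := recvLine(r)
		if err != nil {
			return got, err
		}
		got = append(got, s)
	}
	return got, <-errc
}

func main() {
	got, err := exchange([]string{"1", "2", "3"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got)
}
```

Because the sender waits for the ack before sending the next value, at most one value is in flight at a time, which is what makes this the non-buffered form of the protocol.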

FuncSrc is a flowgraph primitive for the downstream end of any such two-way connection that implements io.ReadWriter (net.Conn does, for example):

// FuncSrc reads a data value and writes a '\n' acknowledgement.
func FuncSrc(x Edge, rw io.ReadWriter) Node {
        node := MakeNode("src", nil, []*Edge{&x}, nil, srcFire)
        reader := bufio.NewReader(rw)
        writer := bufio.NewWriter(rw)
        node.Aux = bufio.NewReadWriter(reader, writer)
        return node
}

The resulting *bufio.ReadWriter is stored in the node's Aux field (an empty interface) and retrieved in the firing func of type NodeFire:

func srcFire(n *Node) {
        x := n.Dsts[0]
        rw := n.Aux.(*bufio.ReadWriter)

        // read data string
        xv, err := rw.ReadString('\n')
        if err != nil {
                n.Errorf("%v", err)
                close(x.Data)
                x.Data = nil
                return
        }
        x.DstPut(xv)

        // write ack
        _, err = rw.WriteString("\n")
        if err != nil {
                n.Errorf("%v", err)
                close(x.Data)
                x.Data = nil
                return
        }
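        // flush the ack so it actually reaches the peer
        if err = rw.Flush(); err != nil {
                n.Errorf("%v", err)
                close(x.Data)
                x.Data = nil
        }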
}

In other flowgraph primitives all inputs (data and acks) are read before the NodeFire func is called, and all outputs are written afterwards. In FuncSrc the firing func itself begins by reading one more data input (from the socket) and ends by writing the ack for it.

FuncDst preserves this order of read before write in the dstFire func of type NodeFire:

func dstFire(n *Node) {
        a := n.Srcs[0]
        s := n.Aux.(*irw)
        rw := s.RW
        var err error

        // read ack
        a.Flow = true
        if s.Initialized  {
                _, err = rw.ReadString('\n')
                if err != nil { 
                        n.Errorf("%v", err)
                        close(a.Ack)
                        a.Ack = nil
                        return
                }
        } else {
                s.Initialized = true
        }
	
        // write data
        _, err = fmt.Fprintf(rw, "%v\n", a.Val)
        if err != nil {
                n.Errorf("%v", err)
                close(a.Ack)
                a.Ack = nil
                return
        }
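        // flush the data so it actually reaches the peer
        if err = rw.Flush(); err != nil {
                n.Errorf("%v", err)
                close(a.Ack)
                a.Ack = nil
        }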
}

However, since that means an ack would be read before any data has been written, the initial read needs to be skipped. This is accomplished by bundling the bufio.ReadWriter stored in Node.Aux with an Initialized flag that is set to true on the first firing:

type irw struct {
        Initialized bool
        RW *bufio.ReadWriter
}
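
The effect of the Initialized flag can be seen in isolation with a small stand-alone sketch. This is an illustration under stated assumptions rather than the fgbase code: ackFirstSender, newAckFirstSender and sendAll are hypothetical names, the struct only mirrors the shape of irw, and net.Pipe stands in for the network connection.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net"
)

// ackFirstSender is a hypothetical stand-in for the irw bundle: it keeps
// the read-before-write order but skips the read on the very first send.
type ackFirstSender struct {
	initialized bool
	rw          *bufio.ReadWriter
}

func newAckFirstSender(c io.ReadWriter) *ackFirstSender {
	return &ackFirstSender{
		rw: bufio.NewReadWriter(bufio.NewReader(c), bufio.NewWriter(c)),
	}
}

// Send waits for the ack of the previous value (if there was one),
// then writes v as a newline-terminated string.
func (s *ackFirstSender) Send(v interface{}) error {
	if s.initialized {
		if _, err := s.rw.ReadString('\n'); err != nil {
			return err
		}
	} else {
		s.initialized = true // nothing to acknowledge yet
	}
	if _, err := fmt.Fprintf(s.rw, "%v\n", v); err != nil {
		return err
	}
	return s.rw.Flush()
}

// sendAll pushes vals through an in-memory connection to a receiver that
// reads each value and then acknowledges it, and returns what was received.
func sendAll(vals []interface{}) ([]string, error) {
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()

	out := make(chan string, len(vals))
	go func() {
		rw := bufio.NewReadWriter(bufio.NewReader(b), bufio.NewWriter(b))
		for range vals {
			s, err := rw.ReadString('\n')
			if err != nil {
				return
			}
			out <- s[:len(s)-1]
			if _, err := rw.WriteString("\n"); err != nil {
				return
			}
			// The final ack is never read by the sender; closing the
			// pipe in sendAll unblocks this Flush with an error.
			if err := rw.Flush(); err != nil {
				return
			}
		}
	}()

	snd := newAckFirstSender(a)
	for _, v := range vals {
		if err := snd.Send(v); err != nil {
			return nil, err
		}
	}
	got := make([]string, 0, len(vals))
	for range vals {
		got = append(got, <-out)
	}
	return got, nil
}

func main() {
	got, err := sendAll([]interface{}{1, "two", 3.5})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got)
}
```

Without the flag, the first Send would wait for an ack that the receiver has no reason to write yet, and both ends would block.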

Each read over the network connection blocks until the goroutine on the other end performs the matching write, so the data/ack handshake keeps the two ends in step. Whichever of FuncSrc or FuncDst has to wait, the normal read/fire/write cycle is preserved for each. And because each runs in its own goroutine, waiting on a two-way socket does not block other goroutines (beyond their own waits for data or acks to flow).

The test bench for FuncSrc creates a listener and accepts a remote connection:

func main() {

        nodeid := flag.Int("nodeid", 0, "base for node ids")
        flag.Parse()
        fgbase.NodeID = int64(*nodeid)

        fgbase.TraceLevel = fgbase.V

        ln, err := net.Listen("tcp", "localhost:37777")
        if err != nil {
                fgbase.StderrLog.Printf("%v\n", err)
                return
        }
        conn, err := ln.Accept()
        if err != nil {
                fgbase.StderrLog.Printf("%v\n", err)
                return
        }

        e, n := fgbase.MakeGraph(1, 2)

        n[0] = fgbase.FuncSrc(e[0], conn)
        n[1] = tbo(e[0])

        fgbase.RunAll(n, 2*time.Second)

}

The test bench for FuncDst reaches out to establish a remote connection:

func main() {

        nodeid := flag.Int("nodeid", 0, "base for node ids")
        flag.Parse()
        fgbase.NodeID = int64(*nodeid)

        fgbase.TraceLevel = fgbase.V

        time.Sleep(1*time.Second)
        conn, err := net.Dial("tcp", "localhost:37777")
        if err != nil {
                fgbase.StderrLog.Printf("%v\n", err)
                return
        }

        e, n := fgbase.MakeGraph(1, 2)

        n[0] = tbi(e[0])
        n[1] = fgbase.FuncDst(e[0], conn)

        fgbase.RunAll(n, 2*time.Second)

}

For the full example see the tbsrcdst Makefile target in http://github.com/vectaport/fgbase_test.

The above describes a non-buffered remote connection. To implement a buffered remote connection it became necessary to extend Edge with Src() and Dst() methods that create dangling edges hooked up to a "hostname:portnum" socket using buffered data and ack channels. Src() listens for incoming connections on the host port; Dst() connects to a remote host port. json.Marshal and json.Unmarshal are used to transmit empty-interface values over the net.
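
As a rough idea of what sending an empty-interface value as JSON can look like, here is a minimal sketch. It is not the Src()/Dst() implementation: writeValue, readValue and roundTrip are hypothetical helpers, the one-JSON-document-per-line framing is an assumption, and net.Pipe stands in for the socket.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"net"
)

// writeValue (hypothetical helper) marshals v to JSON and sends it as
// one newline-terminated line. Line framing is an assumption of this sketch.
func writeValue(w *bufio.Writer, v interface{}) error {
	b, err := json.Marshal(v)
	if err != nil {
		return err
	}
	if _, err := w.Write(append(b, '\n')); err != nil {
		return err
	}
	return w.Flush()
}

// readValue (hypothetical helper) reads one line and unmarshals it into
// an empty interface.
func readValue(r *bufio.Reader) (interface{}, error) {
	line, err := r.ReadBytes('\n')
	if err != nil {
		return nil, err
	}
	var v interface{}
	if err := json.Unmarshal(line, &v); err != nil {
		return nil, err
	}
	return v, nil
}

// roundTrip sends v across an in-memory connection and returns the
// value decoded on the other end.
func roundTrip(v interface{}) (interface{}, error) {
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()

	errc := make(chan error, 1)
	go func() { errc <- writeValue(bufio.NewWriter(a), v) }()

	got, err := readValue(bufio.NewReader(b))
	if err != nil {
		return nil, err
	}
	return got, <-errc
}

func main() {
	got, err := roundTrip([]interface{}{1, "two", true})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%v\n", got)
}
```

One consequence of decoding into an empty interface is that the receiver does not get the sender's Go types back: encoding/json decodes numbers as float64, objects as map[string]interface{} and arrays as []interface{}.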

For an example see the tbsrcdst2 Makefile target in http://github.com/vectaport/fgbase_test.
