Skip to content

Commit bbcdc95

Browse files
committed
Merge pull request #60 from cbusbey/initiator_fixes
fixes for #58 #59
2 parents 21ab3d0 + bea1d3f commit bbcdc95

File tree

6 files changed

+106
-70
lines changed

6 files changed

+106
-70
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ lint:
2323
golint .
2424

2525
test:
26-
go test -v . ./datadictionary
26+
go test -v -cover . ./datadictionary
2727

2828
_build_all:
2929
go build -v ./...

acceptor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (a *Acceptor) Stop() {
5656
_ = recover() // suppress sending on closed channel error
5757
}()
5858
for _, channel := range a.quitChans {
59-
channel <- true
59+
close(channel)
6060
}
6161
}
6262

connection.go

Lines changed: 34 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,45 +2,43 @@ package quickfix
22

33
import (
44
"bufio"
5+
"io"
56
"net"
7+
"time"
68
)
79

810
//Picks up session from net.Conn Initiator
9-
func handleInitiatorConnection(netConn net.Conn, log Log, sessID SessionID, quit chan bool) {
10-
defer func() {
11-
if err := recover(); err != nil {
12-
log.OnEventf("Connection Terminated: %v", err)
13-
}
14-
15-
netConn.Close()
16-
}()
17-
11+
func handleInitiatorConnection(address string, log Log, sessID SessionID, quit chan bool) {
12+
reconnectInterval := 30 * time.Second
1813
session := activate(sessID)
1914
if session == nil {
2015
log.OnEventf("Session not found for SessionID: %v", sessID)
2116
return
2217
}
23-
defer func() {
24-
deactivate(sessID)
25-
}()
2618

27-
var msgOut chan []byte
28-
var err error
29-
if msgOut, err = session.initiate(); err != nil {
30-
log.OnEventf("Session cannot initiate: %v", err)
31-
return
32-
}
19+
defer deactivate(sessID)
3320

34-
reader := bufio.NewReader(netConn)
35-
parser := newParser(reader)
21+
for {
22+
msgIn := make(chan fixIn)
23+
msgOut := make(chan []byte)
3624

37-
msgIn := make(chan fixIn)
38-
go writeLoop(netConn, msgOut)
39-
go func() {
40-
readLoop(parser, msgIn)
41-
}()
25+
netConn, err := net.Dial("tcp", address)
26+
if err != nil {
27+
goto reconnect
28+
}
4229

43-
session.run(msgIn, quit)
30+
go readLoop(newParser(bufio.NewReader(netConn)), msgIn)
31+
go func() {
32+
writeLoop(netConn, msgOut)
33+
netConn.Close()
34+
}()
35+
session.initiate(msgIn, msgOut, quit)
36+
37+
reconnect:
38+
log.OnEventf("%v Reconnecting in %v", sessID, reconnectInterval)
39+
time.Sleep(reconnectInterval)
40+
continue
41+
}
4442
}
4543

4644
//Picks up session from net.Conn Acceptor
@@ -96,32 +94,25 @@ func handleAcceptorConnection(netConn net.Conn, qualifiedSessionIDs map[SessionI
9694
deactivate(qualifiedSessID)
9795
}()
9896

99-
var msgOut chan []byte
100-
if msgOut, err = session.accept(); err != nil {
101-
log.OnEventf("Session cannot accept: %v", err)
102-
return
103-
}
104-
10597
msgIn := make(chan fixIn)
106-
go writeLoop(netConn, msgOut)
98+
msgOut := make(chan []byte)
99+
107100
go func() {
108101
msgIn <- fixIn{msgBytes, parser.lastRead}
109102
readLoop(parser, msgIn)
110103
}()
111104

112-
session.run(msgIn, quit)
105+
go session.accept(msgIn, msgOut, quit)
106+
writeLoop(netConn, msgOut)
113107
}
114108

115-
func writeLoop(connection net.Conn, messageOut chan []byte) {
116-
defer func() {
117-
close(messageOut)
118-
}()
119-
120-
var msg []byte
109+
func writeLoop(connection io.Writer, messageOut chan []byte) {
121110
for {
122-
if msg = <-messageOut; msg == nil {
111+
msg, ok := <-messageOut
112+
if !ok {
123113
return
124114
}
115+
125116
connection.Write(msg)
126117
}
127118
}
@@ -133,17 +124,9 @@ func readLoop(parser *parser, msgIn chan fixIn) {
133124

134125
for {
135126
msg, err := parser.ReadMessage()
136-
137127
if err != nil {
138-
switch err.(type) {
139-
//ignore message parser errors
140-
case parseError:
141-
continue
142-
default:
143-
return
144-
}
145-
} else {
146-
msgIn <- fixIn{msg, parser.lastRead}
128+
return
147129
}
130+
msgIn <- fixIn{msg, parser.lastRead}
148131
}
149132
}

connection_internal_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package quickfix
2+
3+
import (
4+
"bytes"
5+
"strings"
6+
"testing"
7+
)
8+
9+
func TestWriteLoop(t *testing.T) {
10+
writer := bytes.NewBufferString("")
11+
msgOut := make(chan []byte)
12+
13+
go func() {
14+
msgOut <- []byte("test msg 1 ")
15+
msgOut <- []byte("test msg 2 ")
16+
msgOut <- []byte("test msg 3")
17+
close(msgOut)
18+
}()
19+
writeLoop(writer, msgOut)
20+
21+
expected := "test msg 1 test msg 2 test msg 3"
22+
23+
if writer.String() != expected {
24+
t.Errorf("expected %v got %v", expected, writer.String())
25+
}
26+
}
27+
28+
func TestReadLoop(t *testing.T) {
29+
msgIn := make(chan fixIn)
30+
stream := "hello8=FIX.4.09=5blah10=103garbage8=FIX.4.09=4foo10=103"
31+
32+
parser := newParser(strings.NewReader(stream))
33+
go readLoop(parser, msgIn)
34+
35+
var tests = []struct {
36+
expectedMsg string
37+
channelClosed bool
38+
}{
39+
{expectedMsg: "8=FIX.4.09=5blah10=103"},
40+
{expectedMsg: "8=FIX.4.09=4foo10=103"},
41+
{channelClosed: true},
42+
}
43+
44+
for _, test := range tests {
45+
msg, ok := <-msgIn
46+
switch {
47+
case !ok && !test.channelClosed:
48+
t.Error("Channel unexpectedly closed")
49+
fallthrough
50+
case !ok && test.channelClosed:
51+
continue
52+
}
53+
54+
if string(msg.bytes) != test.expectedMsg {
55+
t.Errorf("Expected %v got %v", test.expectedMsg, string(msg.bytes))
56+
}
57+
}
58+
}

initiator.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package quickfix
33
import (
44
"fmt"
55
"github.com/quickfixgo/quickfix/config"
6-
"net"
76
)
87

98
//Initiator initiates connections and processes messages for all sessions.
@@ -31,13 +30,9 @@ func (i *Initiator) Start() error {
3130
return fmt.Errorf("error on SocketConnectPort: %v", err)
3231
}
3332

34-
conn, err := net.Dial("tcp", fmt.Sprintf("%v:%v", socketConnectHost, socketConnectPort))
35-
if err != nil {
36-
return err
37-
}
38-
3933
i.quitChans[sessionID] = make(chan bool)
40-
go handleInitiatorConnection(conn, i.globalLog, sessionID, i.quitChans[sessionID])
34+
address := fmt.Sprintf("%v:%v", socketConnectHost, socketConnectPort)
35+
go handleInitiatorConnection(address, i.globalLog, sessionID, i.quitChans[sessionID])
4136
}
4237

4338
return nil
@@ -49,7 +44,7 @@ func (i *Initiator) Stop() {
4944
_ = recover() // suppress sending on closed channel error
5045
}()
5146
for _, channel := range i.quitChans {
52-
channel <- true
47+
close(channel)
5348
}
5449
}
5550

session.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,21 +107,21 @@ func createSession(sessionID SessionID, storeFactory MessageStoreFactory, settin
107107
return nil
108108
}
109109

110-
func (s *Session) initiate() (chan []byte, error) {
110+
//kicks off session as an initiator
111+
func (s *Session) initiate(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
111112
s.currentState = logonState{}
112-
s.messageOut = make(chan []byte)
113113
s.messageStash = make(map[int]Message)
114114
s.initiateLogon = true
115115

116-
return s.messageOut, nil
116+
s.run(msgIn, msgOut, quit)
117117
}
118118

119-
func (s *Session) accept() (chan []byte, error) {
119+
//kicks off session as an acceptor
120+
func (s *Session) accept(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
120121
s.currentState = logonState{}
121-
s.messageOut = make(chan []byte)
122122
s.messageStash = make(map[int]Message)
123123

124-
return s.messageOut, nil
124+
s.run(msgIn, msgOut, quit)
125125
}
126126

127127
func (s *Session) onDisconnect() {
@@ -467,10 +467,10 @@ type fixIn struct {
467467
receiveTime time.Time
468468
}
469469

470-
func (s *Session) run(msgIn chan fixIn, quit chan bool) {
470+
func (s *Session) run(msgIn chan fixIn, msgOut chan []byte, quit chan bool) {
471+
s.messageOut = msgOut
471472
defer func() {
472-
close(quit)
473-
s.messageOut <- nil
473+
close(s.messageOut)
474474
s.onDisconnect()
475475
}()
476476

0 commit comments

Comments
 (0)