Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
}

a.sessionAddr.Store(sessID, netConn.RemoteAddr())
msgIn := make(chan fixIn)
msgIn := make(chan fixIn, session.InChanCapacity)
msgOut := make(chan []byte)

if err := session.connect(msgIn, msgOut); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,16 @@ const (
// Valid Values:
// - Any positive integer
MaxLatency string = "MaxLatency"

// InChanCapacity sets the maximum number of messages that can be buffered in the channel for incoming FIX messages.
//
// Required: No
//
// Default: 1
//
// Valid Values:
// - A positive integer, or zero for an unbuffered channel
InChanCapacity string = "InChanCapacity"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
netConn = tlsConn
}

msgIn = make(chan fixIn)
msgIn = make(chan fixIn, session.InChanCapacity)
msgOut = make(chan []byte)
if err := session.connect(msgIn, msgOut); err != nil {
session.log.OnEventf("Failed to initiate: %v", err)
Expand Down
1 change: 1 addition & 0 deletions internal/session_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type SessionSettings struct {
TimeZone *time.Location
ResetSeqTime time.Time
EnableResetSeqTime bool
InChanCapacity int

// Required on logon for FIX.T.1 messages.
DefaultApplVerID string
Expand Down
18 changes: 18 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,21 @@ func (s *session) checkBeginString(msg *Message) MessageRejectError {
return nil
}

func (s *session) drainMessageIn() {
s.log.OnEventf("Draining %d messages from inbound channel...", len(s.messageIn))
for {
select {
case fixInc, ok := <-s.messageIn:
if !ok {
return
}
s.Incoming(s, fixInc)
default:
return
}
}
}

func (s *session) doReject(msg *Message, rej MessageRejectError) error {
reply := msg.reverseRoute()

Expand Down Expand Up @@ -824,6 +839,9 @@ func (s *session) onDisconnect() {
s.messageOut = nil
}

// s.messageIn is buffered so we need to drain it before disconnection
s.drainMessageIn()

s.messageIn = nil
}

Expand Down
12 changes: 12 additions & 0 deletions session_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,18 @@ func (f sessionFactory) newSession(
s.DisableMessagePersist = !persistMessages
}

if settings.HasSetting(config.InChanCapacity) {
if s.InChanCapacity, err = settings.IntSetting(config.InChanCapacity); err != nil {
return
} else if s.InChanCapacity < 0 {
err = IncorrectFormatForSetting{Setting: config.InChanCapacity, Value: []byte(strconv.Itoa(s.InChanCapacity))}
return
}
} else {
// Default to 1 buffered message per channel
s.InChanCapacity = 1
}

if f.BuildInitiators {
if err = f.buildInitiatorSettings(s, settings); err != nil {
return
Expand Down
Loading