From d992c3263ad893cec026548fc3729f0fd4e34668 Mon Sep 17 00:00:00 2001 From: Jason Suttles Date: Fri, 21 Mar 2025 19:03:01 -0400 Subject: [PATCH 1/2] refactor TCP client and server to manage connections and responses more effectively --- lib/client/tcp-client.js | 12 +++- lib/server/tcp-server.js | 33 +++++++-- test/test-server.js | 150 ++++++++++++++++++++++++++------------- 3 files changed, 138 insertions(+), 57 deletions(-) diff --git a/lib/client/tcp-client.js b/lib/client/tcp-client.js index da1c86b..1c4f62a 100644 --- a/lib/client/tcp-client.js +++ b/lib/client/tcp-client.js @@ -30,16 +30,21 @@ TcpClient.prototype.connect = function(callback) { self.responseBuffer += data.toString(); if (self.responseBuffer.substring(self.responseBuffer.length - 2, self.responseBuffer.length) == FS + CR) { var ack = self.parser.parse(self.responseBuffer.substring(1, self.responseBuffer.length - 2)); - self.callback(null, ack); self.responseBuffer = ""; self.awaitingResponse = false; if (!self.keepalive) { self.close(); } + self.callback(null, ack); } }); callback(); }); + self.client.on('close', function() { + self.responseBuffer = ""; + self.awaitingResponse = false; + delete self.client; + }) self.client.on('error', function(err) { callback(err); }) @@ -47,10 +52,11 @@ TcpClient.prototype.connect = function(callback) { TcpClient.prototype.send = function(msg, callback) { var self = this; - self.callback = callback || self.callback; if (self.awaitingResponse) { - self.callback(new Error("Can't send while awaiting response")); + callback(new Error("Can't send while awaiting response")); + return; } + self.callback = callback || self.callback; self.awaitingResponse = true; try { if (self.client) { diff --git a/lib/server/tcp-server.js b/lib/server/tcp-server.js index b9e709e..8e6ea6d 100644 --- a/lib/server/tcp-server.js +++ b/lib/server/tcp-server.js @@ -19,7 +19,7 @@ function TcpServer(options, handler) { this.handler = handler; this.server = null; - this.socket = null; + this.sockets = []; this.parser = options.parser || new Parser(); } @@ -43,17 +43,24 @@ function Req(msg, raw) { function Res(socket, ack) { this.ack = ack; + this.socket = socket; this.end = function() { socket.write(VT + (this.ack).toString() + FS + CR); } } -TcpServer.prototype.start = function(port, encoding, options) { +TcpServer.prototype.start = function(port, encoding) { var self = this; - options = options || {} this.server = net.createServer(function(socket) { var message = ""; + self.sockets.push(socket); + socket.on('close', function () { + const index = self.sockets.indexOf(socket); + if (index >= 0) { + self.sockets.splice(index, 1); + } + }); socket.on('data', function(data) { try { message += data.toString(); @@ -79,7 +86,25 @@ TcpServer.prototype.start = function(port, encoding, options) { this.server.listen(port); } -TcpServer.prototype.stop = function() { +TcpServer.prototype.stop = function (closeConnectionsCallback) { + var self = this; + if (closeConnectionsCallback) { + var promises = []; + this.sockets.forEach(function (socket) { + promises.push( + new Promise(function (resolve) { + socket.end(resolve); + }), + ); + }); + Promise.all(promises).then(function () { + self.server.close(function () { + self.sockets = []; + closeConnectionsCallback(); + }); + }); + return; + } this.server.close(); } diff --git a/test/test-server.js b/test/test-server.js index 89aadd1..4476bd7 100644 --- a/test/test-server.js +++ b/test/test-server.js @@ -14,6 +14,7 @@ describe('FileServer', function() { var fileServer; describe('.start()', function() { this.timeout(10000); + fs.mkdirSync('test/import'); it('should start the file server listening on a folder, and emit event on new file', function(done) { var hl7TestMessage = fs.readFileSync('test/samples/adt.hl7').toString().replace(/\r?\n/g, "\r"); @@ -28,11 +29,7 @@ describe('FileServer', function() { }); fileServer.start('test/import'); - - setTimeout(function() { - fs.writeFileSync('test/import/adt.hl7', hl7TestMessage); - }, 1000); - + fs.writeFileSync('test/import/adt.hl7', hl7TestMessage); }); }); @@ -61,12 +58,10 @@ describe('FileClient', function() { fileClient.send(msg, function(err) { assert(!err); - setTimeout(function() { - assert.equal(fs.statSync(path.join('./test/export', newMessageName)).isFile(), true); - fs.unlinkSync(path.join('./test/export', newMessageName)); - fs.rmdirSync('./test/export'); - done(); - }, 5000) + assert.equal(fs.statSync(path.join('./test/export', newMessageName)).isFile(), true); + fs.unlinkSync(path.join('./test/export', newMessageName)); + fs.rmdirSync('./test/export'); + done(); }); }); }); @@ -86,62 +81,77 @@ describe('TcpServer', function() { tcpServer.start(8686); - setTimeout(function() { - var tcpClient = server.createTcpClient({ host: '127.0.0.1', port: 8686 }); + var tcpClient = server.createTcpClient({ host: '127.0.0.1', port: 8686 }); - tcpClient.send(adt, function(err, ack) { - console.log(ack.log()); - assert.equal(ack.header.getHeaderField(11), adt.header.getHeaderField(11)) - assert.equal(ack.header.getHeaderField(12), adt.header.getHeaderField(12)) - done(); - }); - }, 1000); + tcpClient.send(adt, function(err, ack) { + console.log(ack.log()); + assert.equal(ack.header.getHeaderField(11), adt.header.getHeaderField(11)) + assert.equal(ack.header.getHeaderField(12), adt.header.getHeaderField(12)) + done(); + }); }); - it('should work correctly if message sent as 2 parts', function(done) { + it('should work if 2 messages sent in a row, first message successfully sent, second message blocked', function(done) { var parser = new hl7.Parser(); var adt = parser.parse(fs.readFileSync('test/samples/adt.hl7').toString()); - setTimeout(function() { - var rawTcpClient = net.connect({host: '127.0.0.1', port: 8686}); - - rawTcpClient.on('data', function(data) { - rawTcpClient.end(); - done(); + var tcpClient = server.createTcpClient({ host: '127.0.0.1', port: 8686 }); + var promise = new Promise(function(resolve) { + tcpClient.send(adt, function(err, ack) { + resolve({err, ack}); }); + }); + var promise2 = new Promise(function(resolve) { + tcpClient.send(adt, function(err, ack) { + resolve({err, ack}); + }); + }); + Promise.all([promise, promise2]) + .then(function(results) { + assert(results[0].ack); + assert(results[1].err); + done(); + }); + }) + it('should work correctly if message sent as 2 parts', function(done) { + var parser = new hl7.Parser(); + var adt = parser.parse(fs.readFileSync('test/samples/adt.hl7').toString()); + var rawTcpClient = net.connect({host: '127.0.0.1', port: 8686}); + + rawTcpClient.on('data', function(data) { + rawTcpClient.end(); + done(); + }); - var part1 = adt.toString().substring(0, 10); - var part2 = adt.toString().substring(10, adt.toString().length); + var part1 = adt.toString().substring(0, 10); + var part2 = adt.toString().substring(10, adt.toString().length); - rawTcpClient.write(VT + part1); - setTimeout(function() { - rawTcpClient.write(part2 + FS + CR); - }, 2000) - }, 1000); + rawTcpClient.write(VT + part1); + rawTcpClient.write(part2 + FS + CR); }); it('should keep connection open and still work', function(done) { var parser = new hl7.Parser(); var adt = parser.parse(fs.readFileSync('test/samples/adt.hl7').toString()); - setTimeout(function() { - var tcpClient = server.createTcpClient({ host: '127.0.0.1', port: 8686, keepalive: true }); - - tcpClient.send(adt, function(ack) { - setTimeout(function() { - setTimeout(function() { - tcpClient.send(adt, function(ack) { - tcpClient.close(); - done(); - }); - }, 1000); + var tcpClient = server.createTcpClient({ host: '127.0.0.1', port: 8686, keepalive: true }); + + tcpClient.send(adt, function(err) { + if (err) { + tcpClient.close(); + return done(err); + } + setTimeout(function() { + tcpClient.send(adt, function(err) { + tcpClient.close(); + done(err); }); - }); - }, 1000); + }, 1000); + }); }); it('should handle no server', function(done) { var parser = new hl7.Parser(); var adt = parser.parse(fs.readFileSync('test/samples/adt.hl7').toString()); - var tcpClient = server.createTcpClient({ host: '127.0.0.1', port: 9999, keepalive: true }); + var tcpClient = server.createTcpClient({ host: '127.0.0.1', port: 9999, keepalive: true }); tcpClient.send(adt, function(err) { console.log('here is the error ' + err.message); done(); @@ -150,8 +160,48 @@ describe('TcpServer', function() { }); describe('.stop()', function() { - it('should stop the tcp server', function() { - tcpServer.stop() + it('should stop the tcp server', function(done) { + var parser = new hl7.Parser(); + var adt = parser.parse(fs.readFileSync('test/samples/adt.hl7').toString()); + var tcpClient1 = server.createTcpClient({ host: '127.0.0.1', port: 8686, keepalive: true }); + tcpClient1.send(adt, function(err) { + if (err) + return done(err); + + tcpServer.stop(); + setTimeout(function() { + var tcpClient2 = server.createTcpClient({ host: '127.0.0.1', port: 8686, keepalive: true }); + tcpClient1.send(adt, function(err) { // shows that open connections are still open + if (err) + return done(err); + tcpClient2.send(adt, function(err, ack) { // shows that new connections cannot open + assert.equal(err.code, 'ECONNREFUSED'); + done(); + }); + }); + }); + }); + }); + it('should stop the tcp server and close all connections', function(done) { + var parser = new hl7.Parser(); + var adt = parser.parse(fs.readFileSync('test/samples/adt.hl7').toString()); + var tcpClient1 = server.createTcpClient({ host: '127.0.0.1', port: 8686, keepalive: true }); + tcpServer.start(8686); + tcpClient1.send(adt, function(err) { + if (err) + return done(err); + + tcpServer.stop(function () { + setTimeout(function() { // wait for close to finish on the client side + assert(!tcpClient1.client); // on close TcpClient.client is deleted + var tcpClient2 = server.createTcpClient({ host: '127.0.0.1', port: 8686, keepalive: true }); + tcpClient2.send(adt, function(err, ack) { // shows that new connections cannot open + assert.equal(err.code, 'ECONNREFUSED'); + done(); + }); + }, 100); + }); + }); }); }); }); From f8a087b0058347372dc2d0d880bfc4ecb5977d98 Mon Sep 17 00:00:00 2001 From: Jason Suttles Date: Fri, 21 Mar 2025 19:06:17 -0400 Subject: [PATCH 2/2] return tests to how they were --- test/test-server.js | 1 - 1 file changed, 1 deletion(-) diff --git a/test/test-server.js b/test/test-server.js index 4476bd7..c0145a0 100644 --- a/test/test-server.js +++ b/test/test-server.js @@ -14,7 +14,6 @@ describe('FileServer', function() { var fileServer; describe('.start()', function() { this.timeout(10000); - fs.mkdirSync('test/import'); it('should start the file server listening on a folder, and emit event on new file', function(done) { var hl7TestMessage = fs.readFileSync('test/samples/adt.hl7').toString().replace(/\r?\n/g, "\r");