Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions lib/client/tcp-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,33 @@ TcpClient.prototype.connect = function(callback) {
self.responseBuffer += data.toString();
if (self.responseBuffer.substring(self.responseBuffer.length - 2, self.responseBuffer.length) == FS + CR) {
var ack = self.parser.parse(self.responseBuffer.substring(1, self.responseBuffer.length - 2));
self.callback(null, ack);
self.responseBuffer = "";
self.awaitingResponse = false;
if (!self.keepalive) {
self.close();
}
self.callback(null, ack);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't allow another message to be queued up right after the first one completed

}
});
callback();
});
self.client.on('close', function() {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On server side close, it would not clean up

self.responseBuffer = "";
self.awaitingResponse = false;
delete self.client;
})
self.client.on('error', function(err) {
callback(err);
})
}

TcpClient.prototype.send = function(msg, callback) {
var self = this;
self.callback = callback || self.callback;
if (self.awaitingResponse) {
self.callback(new Error("Can't send while awaiting response"));
callback(new Error("Can't send while awaiting response"));
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replaced the callback, so that the first message callback didn't receive the ack, but the second one did

return;
}
self.callback = callback || self.callback;
self.awaitingResponse = true;
try {
if (self.client) {
Expand Down
33 changes: 29 additions & 4 deletions lib/server/tcp-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function TcpServer(options, handler) {

this.handler = handler;
this.server = null;
this.socket = null;
this.sockets = [];
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added socket tracking

this.parser = options.parser || new Parser();
}

Expand All @@ -43,17 +43,24 @@ function Req(msg, raw) {
function Res(socket, ack) {
this.ack = ack;

this.socket = socket;
this.end = function() {
socket.write(VT + (this.ack).toString() + FS + CR);
}
}

TcpServer.prototype.start = function(port, encoding, options) {
TcpServer.prototype.start = function(port, encoding) {
var self = this;
options = options || {}
this.server = net.createServer(function(socket) {
var message = "";

self.sockets.push(socket);
socket.on('close', function () {
const index = self.sockets.indexOf(socket);
if (index >= 0) {
self.sockets.splice(index, 1);
}
});
socket.on('data', function(data) {
try {
message += data.toString();
Expand All @@ -79,7 +86,25 @@ TcpServer.prototype.start = function(port, encoding, options) {
this.server.listen(port);
}

TcpServer.prototype.stop = function() {
TcpServer.prototype.stop = function (closeConnectionsCallback) {
var self = this;
if (closeConnectionsCallback) {
var promises = [];
this.sockets.forEach(function (socket) {
promises.push(
new Promise(function (resolve) {
socket.end(resolve);
}),
);
});
Promise.all(promises).then(function () {
self.server.close(function () {
self.sockets = [];
closeConnectionsCallback();
});
});
return;
}
this.server.close();
}

Expand Down
150 changes: 100 additions & 50 deletions test/test-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ describe('FileServer', function() {
var fileServer;
describe('.start()', function() {
this.timeout(10000);
fs.mkdirSync('test/import');
it('should start the file server listening on a folder, and emit event on new file', function(done) {

var hl7TestMessage = fs.readFileSync('test/samples/adt.hl7').toString().replace(/\r?\n/g, "\r");
Expand All @@ -28,11 +29,7 @@ describe('FileServer', function() {
});

fileServer.start('test/import');

setTimeout(function() {
fs.writeFileSync('test/import/adt.hl7', hl7TestMessage);
}, 1000);

fs.writeFileSync('test/import/adt.hl7', hl7TestMessage);
});
});

Expand Down Expand Up @@ -61,12 +58,10 @@ describe('FileClient', function() {
fileClient.send(msg, function(err) {
assert(!err);

setTimeout(function() {
assert.equal(fs.statSync(path.join('./test/export', newMessageName)).isFile(), true);
fs.unlinkSync(path.join('./test/export', newMessageName));
fs.rmdirSync('./test/export');
done();
}, 5000)
assert.equal(fs.statSync(path.join('./test/export', newMessageName)).isFile(), true);
fs.unlinkSync(path.join('./test/export', newMessageName));
fs.rmdirSync('./test/export');
done();
});
});
});
Expand All @@ -86,62 +81,77 @@ describe('TcpServer', function() {

tcpServer.start(8686);

setTimeout(function() {
var tcpClient = server.createTcpClient({ host: '127.0.0.1', port: 8686 });
var tcpClient = server.createTcpClient({ host: '127.0.0.1', port: 8686 });

tcpClient.send(adt, function(err, ack) {
console.log(ack.log());
assert.equal(ack.header.getHeaderField(11), adt.header.getHeaderField(11))
assert.equal(ack.header.getHeaderField(12), adt.header.getHeaderField(12))
done();
});
}, 1000);
tcpClient.send(adt, function(err, ack) {
console.log(ack.log());
assert.equal(ack.header.getHeaderField(11), adt.header.getHeaderField(11))
assert.equal(ack.header.getHeaderField(12), adt.header.getHeaderField(12))
done();
});

});
it('should work correctly if message sent as 2 parts', function(done) {
it('should work if 2 messages sent in a row, first message successfully sent, second message blocked', function(done) {
var parser = new hl7.Parser();
var adt = parser.parse(fs.readFileSync('test/samples/adt.hl7').toString());

setTimeout(function() {
var rawTcpClient = net.connect({host: '127.0.0.1', port: 8686});

rawTcpClient.on('data', function(data) {
rawTcpClient.end();
done();
var tcpClient = server.createTcpClient({ host: '127.0.0.1', port: 8686 });
var promise = new Promise(function(resolve) {
tcpClient.send(adt, function(err, ack) {
resolve({err, ack});
});
});
var promise2 = new Promise(function(resolve) {
tcpClient.send(adt, function(err, ack) {
resolve({err, ack});
});
});
Promise.all([promise, promise2])
.then(function(results) {
assert(results[0].ack);
assert(results[1].err);
done();
});
})
it('should work correctly if message sent as 2 parts', function(done) {
var parser = new hl7.Parser();
var adt = parser.parse(fs.readFileSync('test/samples/adt.hl7').toString());
var rawTcpClient = net.connect({host: '127.0.0.1', port: 8686});

rawTcpClient.on('data', function(data) {
rawTcpClient.end();
done();
});

var part1 = adt.toString().substring(0, 10);
var part2 = adt.toString().substring(10, adt.toString().length);
var part1 = adt.toString().substring(0, 10);
var part2 = adt.toString().substring(10, adt.toString().length);

rawTcpClient.write(VT + part1);
setTimeout(function() {
rawTcpClient.write(part2 + FS + CR);
}, 2000)
}, 1000);
rawTcpClient.write(VT + part1);
rawTcpClient.write(part2 + FS + CR);
});
it('should keep connection open and still work', function(done) {
var parser = new hl7.Parser();
var adt = parser.parse(fs.readFileSync('test/samples/adt.hl7').toString());

setTimeout(function() {
var tcpClient = server.createTcpClient({ host: '127.0.0.1', port: 8686, keepalive: true });

tcpClient.send(adt, function(ack) {
setTimeout(function() {
setTimeout(function() {
tcpClient.send(adt, function(ack) {
tcpClient.close();
done();
});
}, 1000);
var tcpClient = server.createTcpClient({ host: '127.0.0.1', port: 8686, keepalive: true });

tcpClient.send(adt, function(err) {
if (err) {
tcpClient.close();
return done(err);
}
setTimeout(function() {
tcpClient.send(adt, function(err) {
tcpClient.close();
done(err);
});
});
}, 1000);
}, 1000);
});
});
it('should handle no server', function(done) {
var parser = new hl7.Parser();
var adt = parser.parse(fs.readFileSync('test/samples/adt.hl7').toString());
var tcpClient = server.createTcpClient({ host: '127.0.0.1', port: 9999, keepalive: true });
var tcpClient = server.createTcpClient({ host: '127.0.0.1', port: 9999, keepalive: true });
tcpClient.send(adt, function(err) {
console.log('here is the error ' + err.message);
done();
Expand All @@ -150,8 +160,48 @@ describe('TcpServer', function() {
});

describe('.stop()', function() {
it('should stop the tcp server', function() {
tcpServer.stop()
it('should stop the tcp server', function(done) {
var parser = new hl7.Parser();
var adt = parser.parse(fs.readFileSync('test/samples/adt.hl7').toString());
var tcpClient1 = server.createTcpClient({ host: '127.0.0.1', port: 8686, keepalive: true });
tcpClient1.send(adt, function(err) {
if (err)
return done(err);

tcpServer.stop();
setTimeout(function() {
var tcpClient2 = server.createTcpClient({ host: '127.0.0.1', port: 8686, keepalive: true });
tcpClient1.send(adt, function(err) { // shows that open connections are still open
if (err)
return done(err);
tcpClient2.send(adt, function(err, ack) { // shows that new connections cannot open
assert.equal(err.code, 'ECONNREFUSED');
done();
});
});
});
});
});
it('should stop the tcp server and close all connections', function(done) {
var parser = new hl7.Parser();
var adt = parser.parse(fs.readFileSync('test/samples/adt.hl7').toString());
var tcpClient1 = server.createTcpClient({ host: '127.0.0.1', port: 8686, keepalive: true });
tcpServer.start(8686);
tcpClient1.send(adt, function(err) {
if (err)
return done(err);

tcpServer.stop(function () {
setTimeout(function() { // wait for close to finish on the client side
assert(!tcpClient1.client); // on close TcpClient.client is deleted
var tcpClient2 = server.createTcpClient({ host: '127.0.0.1', port: 8686, keepalive: true });
tcpClient2.send(adt, function(err, ack) { // shows that new connections cannot open
assert.equal(err.code, 'ECONNREFUSED');
done();
});
}, 100);
});
});
});
});
});
Expand Down