Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 24 additions & 62 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

const path = require('path');
const net = require('net');
const readline = require('readline');
const {EventEmitter} = require('events');
const _ = require('lodash');

class LightningClient {
class LightningClient extends EventEmitter {
constructor(rpcPath) {
if (!path.isAbsolute(rpcPath)) {
throw new Error('The rpcPath must be an absolute path');
Expand All @@ -14,6 +16,7 @@ class LightningClient {

console.log(`Connecting to ${rpcPath}`);

super();
this.rpcPath = rpcPath;
this.reconnectWait = 0.5;
this.reconnectTimeout = null;
Expand Down Expand Up @@ -41,55 +44,16 @@ class LightningClient {
});
});

this.waitingFor = {};

this.client.on('data', data => {
_.each(LightningClient.splitJSON(data.toString()), str => {
let dataObject = {};
try {
dataObject = JSON.parse(str);
} catch (err) {
return;
}

if (!_.isFunction(_self.waitingFor[dataObject.id])) {
return;
}

_self.waitingFor[dataObject.id].call(_self, dataObject);
delete _self.waitingFor[dataObject.id];
});
});
}

static splitJSON(str) {
const parts = [];

let openCount = 0;
let lastSplit = 0;

for (let i = 0; i < str.length; i++) {
if (i > 0 && str.charCodeAt(i - 1) === 115) { // 115 => backslash, ignore this character
continue;
readline.createInterface({input: this.client}).on('line', str => {
let data;
try {
data = JSON.parse(str);
} catch (err) {
return _self.emit('error', 'Invalid JSON: ' + str);
}

if (str[i] === '{') {
openCount++;
} else if (str[i] === '}') {
openCount--;

if (openCount === 0) {
const start = lastSplit;
const end = i + 1 === str.length ? undefined : i + 1;

parts.push(str.slice(start, end));

lastSplit = end;
}
}
}

return parts.length === 0 ? [str] : parts;
_self.emit('res:' + data.id, data);
});
}

increaseWaitTime() {
Expand Down Expand Up @@ -131,22 +95,20 @@ class LightningClient {

// Wait for the client to connect
return this.clientConnectionPromise
.then(() => {
.then(() => new Promise((resolve, reject) => {
// Wait for a response
return new Promise((resolve, reject) => {
this.waitingFor[callInt] = response => {
if (_.isNil(response.error)) {
resolve(response.result);
return;
}

reject(new Error(response.error));
};

// Send the command
_self.client.write(JSON.stringify(sendObj));
this.once('res:' + callInt, response => {
if (_.isNil(response.error)) {
resolve(response.result);
return;
}

reject(new Error(response.error));
});
});

// Send the command
_self.client.write(JSON.stringify(sendObj));
}));
}

devBlockheight() {
Expand Down