Skip to content

Commit 10b23db

Browse files
Merge pull request #56 from kaleido-io/readahead
Update event queue to support readahead to FireFly core
2 parents adeab1f + 9bb8118 commit 10b23db

File tree

13 files changed

+815
-576
lines changed

13 files changed

+815
-576
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,9 @@ This will make it possible for the organizations to establish MTLS communication
137137
|message-delivered| Emitted to the sender when a message has been delivered | recipient, message, requestId (optional)
138138
|message-failed | Emitted to the sender when a message could not be delivered| recipient, message, requestId (optional)
139139

140-
- After receiving a websocket message, a commit must be sent in order to receive the next one:
140+
- After receiving a websocket message, an ack must be sent ("commit" is a synonym for "ack"):
141141
```
142-
{ "action": "commit" }
142+
{ "action": "ack", "id": "<ID_FROM_EVENT>" }
143143
```
144144
- Messages arrive in the same order they were sent
145145
- Up to 1,000 messages will be queued

package-lock.json

Lines changed: 438 additions & 498 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,40 +20,40 @@
2020
"author": "",
2121
"license": "Apache-2.0",
2222
"dependencies": {
23-
"ajv": "^8.8.2",
24-
"axios": "^0.24.0",
25-
"busboy": "^0.3.1",
26-
"express": "^4.17.2",
23+
"ajv": "^8.11.0",
24+
"axios": "^0.26.1",
25+
"busboy": "^1.5.0",
26+
"express": "^4.17.3",
2727
"form-data": "^4.0.0",
28-
"jsrsasign": "^10.5.1",
28+
"jsrsasign": "^10.5.14",
2929
"swagger-ui-express": "^4.3.0",
3030
"uuid": "^8.3.2",
31-
"ws": "^8.4.0",
31+
"ws": "^8.5.0",
3232
"yamljs": "^0.3.0"
3333
},
3434
"devDependencies": {
3535
"@types/bunyan": "^1.8.8",
36-
"@types/busboy": "^0.3.1",
36+
"@types/busboy": "^1.5.0",
3737
"@types/chai": "^4.3.0",
3838
"@types/express": "^4.17.13",
39-
"@types/jsrsasign": "^9.0.3",
40-
"@types/mocha": "^9.0.0",
41-
"@types/node": "^17.0.8",
39+
"@types/jsrsasign": "^10.2.1",
40+
"@types/mocha": "^9.1.0",
41+
"@types/node": "^17.0.23",
4242
"@types/swagger-ui-express": "^4.1.3",
43-
"@types/uuid": "^8.3.3",
44-
"@types/ws": "^8.2.2",
43+
"@types/uuid": "^8.3.4",
44+
"@types/ws": "^8.5.3",
4545
"@types/yamljs": "^0.2.31",
46-
"chai": "^4.3.4",
47-
"mocha": "^9.1.3",
46+
"chai": "^4.3.6",
47+
"mocha": "^9.2.2",
4848
"moment": "^2.29.1",
4949
"nyc": "^15.1.0",
5050
"rimraf": "^3.0.2",
51-
"sinon": "^12.0.1",
51+
"sinon": "^13.0.1",
5252
"sinon-chai": "^3.7.0",
5353
"source-map-support": "^0.5.21",
54-
"ts-node": "^10.4.0",
54+
"ts-node": "^10.7.0",
5555
"ts-sinon": "^2.0.2",
56-
"typescript": "^4.5.4"
56+
"typescript": "^4.6.3"
5757
},
5858
"nyc": {
5959
"extension": [

src/app.ts

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@ import path from 'path';
2121
import swaggerUi from 'swagger-ui-express';
2222
import WebSocket from 'ws';
2323
import YAML from 'yamljs';
24-
import { eventEmitter as blobsEventEmitter } from './handlers/blobs';
2524
import * as eventsHandler from './handlers/events';
26-
import { eventEmitter as messagesEventEmitter } from './handlers/messages';
2725
import { genTLSContext, init as initCert, loadPeerCAs } from './lib/cert';
2826
import { config, init as initConfig } from './lib/config';
27+
import { IAckEvent } from './lib/interfaces';
2928
import { Logger } from './lib/logger';
3029
import RequestError, { errorHandler } from './lib/request-error';
3130
import * as utils from './lib/utils';
3231
import { router as apiRouter, setRefreshCACerts } from './routers/api';
33-
import { eventEmitter as p2pEventEmitter, router as p2pRouter } from './routers/p2p';
32+
import { router as p2pRouter } from './routers/p2p';
33+
import { init as initEvents } from './handlers/events';
3434

3535
const log = new Logger("app.ts");
3636

@@ -51,6 +51,7 @@ setRefreshCACerts(refreshCACerts)
5151
export const start = async () => {
5252
await initConfig();
5353
await initCert();
54+
await initEvents(config);
5455

5556
const apiApp = express();
5657
apiServer = http.createServer(apiApp);
@@ -68,11 +69,7 @@ export const start = async () => {
6869
}
6970
});
7071

71-
p2pEventEmitter.addListener('event', event => eventsHandler.queueEvent(event));
72-
blobsEventEmitter.addListener('event', event => eventsHandler.queueEvent(event));
73-
messagesEventEmitter.addListener('event', event => eventsHandler.queueEvent(event));
74-
75-
eventsHandler.eventEmitter.addListener('event', event => {
72+
eventsHandler.getEmitter().addListener('event', event => {
7673
log.info(`Event emitted ${event.type}/${event.id}`)
7774
if (delegatedWebSocket !== undefined) {
7875
delegatedWebSocket.send(JSON.stringify(event));
@@ -82,21 +79,16 @@ export const start = async () => {
8279
const assignWebSocketDelegate = (webSocket: WebSocket) => {
8380
log.info('New WebSocket delegate assigned');
8481
delegatedWebSocket = webSocket;
85-
const event = eventsHandler.getCurrentEvent();
8682
webSocket.on('message', async message => {
8783
try {
8884
const messageContent = JSON.parse(message.toLocaleString());
89-
if (messageContent.action === 'commit') {
90-
log.info(`Event comitted ${event?`${event.type}/${event.id}`:`[no event in flight]`}`)
91-
eventsHandler.handleCommit();
85+
if (messageContent.action === 'ack' || messageContent.action == 'commit') {
86+
eventsHandler.handleAck(messageContent as IAckEvent);
9287
}
9388
} catch (err) {
9489
log.error(`Failed to process websocket message ${err}`);
9590
}
9691
});
97-
if (event !== undefined) {
98-
webSocket.send(JSON.stringify(event));
99-
}
10092
webSocket.on('close', () => {
10193
log.info('WebSocket delegate disconnected');
10294
const nextDelegatedWebSocket = wss.clients.values().next().value;
@@ -106,6 +98,8 @@ export const start = async () => {
10698
delegatedWebSocket = undefined;
10799
}
108100
});
101+
// Anything that's in-flight needs to be sent again
102+
eventsHandler.reDispatchInFlight();
109103
};
110104

111105
wss.on('connection', (webSocket: WebSocket) => {

src/handlers/blobs.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
// limitations under the License.
1616

1717
import crypto from 'crypto';
18-
import EventEmitter from 'events';
1918
import FormData from 'form-data';
2019
import { createReadStream, createWriteStream, promises as fs } from 'fs';
2120
import https from 'https';
@@ -27,12 +26,12 @@ import { BlobTask, IBlobDeliveredEvent, IBlobFailedEvent, IFile, IMetadata } fro
2726
import { Logger } from '../lib/logger';
2827
import RequestError from '../lib/request-error';
2928
import * as utils from '../lib/utils';
29+
import { queueEvent } from './events';
3030

3131
const log = new Logger("handlers/blobs.ts")
3232

3333
let blobQueue: BlobTask[] = [];
3434
let sending = false;
35-
export const eventEmitter = new EventEmitter();
3635

3736
export const retreiveBlob = async (filePath: string) => {
3837
const resolvedFilePath = path.join(utils.constants.DATA_DIRECTORY, utils.constants.BLOBS_SUBDIRECTORY, filePath);
@@ -100,7 +99,7 @@ export const deliverBlob = async ({ blobPath, recipient, recipientURL, requestId
10099
timeout: utils.constants.REST_API_CALL_BLOB_REQUEST_TIMEOUT,
101100
httpsAgent
102101
});
103-
eventEmitter.emit('event', {
102+
await queueEvent({
104103
id: uuidV4(),
105104
type: 'blob-delivered',
106105
path: blobPath,
@@ -109,7 +108,7 @@ export const deliverBlob = async ({ blobPath, recipient, recipientURL, requestId
109108
} as IBlobDeliveredEvent);
110109
log.trace(`Blob delivered`);
111110
} catch (err: any) {
112-
eventEmitter.emit('event', {
111+
await queueEvent({
113112
id: uuidV4(),
114113
type: 'blob-failed',
115114
path: blobPath,

src/handlers/events.ts

Lines changed: 102 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,39 +15,120 @@
1515
// limitations under the License.
1616

1717
import EventEmitter from "events";
18-
import { OutboundEvent } from "../lib/interfaces";
18+
import { IAckEvent, IConfig, OutboundEvent } from "../lib/interfaces";
1919
import { Logger } from "../lib/logger";
2020
import * as utils from '../lib/utils';
2121

2222
const log = new Logger("handlers/events.ts")
2323

24-
let eventQueue: OutboundEvent[] = [];
25-
export const eventEmitter = new EventEmitter();
24+
let maxInflight = utils.constants.DEFAULT_MAX_INFLIGHT;
25+
let maxEventQueueSize = utils.constants.MAX_EVENT_QUEUE_SIZE;
26+
let eventQueue: OutboundEvent[];
27+
let inFlight: OutboundEvent[];
2628

27-
export const queueEvent = (socketEvent: OutboundEvent) => {
28-
if(eventQueue.length < utils.constants.MAX_EVENT_QUEUE_SIZE) {
29-
eventQueue.push(socketEvent);
30-
if(eventQueue.length === 1) {
31-
eventEmitter.emit('event', eventQueue[0]);
29+
let eventEmitter: EventEmitter;
30+
let unblockPromise: Promise<void> | undefined;
31+
let unblock: (() => void) | undefined;
32+
33+
export const init = async (config: IConfig) => {
34+
eventEmitter = new EventEmitter();
35+
eventQueue = [];
36+
inFlight = [];
37+
unblockPromise = undefined;
38+
unblock = undefined;
39+
if (config.events?.maxInflight !== undefined) {
40+
maxInflight = config.events.maxInflight;
41+
}
42+
if (config.events?.queueSize !== undefined) {
43+
maxEventQueueSize = config.events.queueSize;
44+
}
45+
}
46+
47+
const dispatchNext = () => {
48+
if (inFlight.length < maxInflight) {
49+
const event = eventQueue.shift();
50+
if (event) {
51+
inFlight.push(event)
52+
log.debug(`${event.id}: dispatched`);
53+
eventEmitter.emit('event', event);
3254
}
33-
} else {
34-
log.warn('Max queue size reached');
3555
}
36-
};
3756

38-
export const handleCommit = () => {
39-
eventQueue.shift();
40-
if(eventQueue.length > 0) {
41-
eventEmitter.emit('event', eventQueue[0]);
57+
if (eventQueue.length < maxEventQueueSize && unblock) {
58+
unblock();
59+
unblockPromise = undefined;
60+
unblock = undefined;
61+
log.info(`Event queue unblocked (length=${eventQueue.length})`);
4262
}
4363
}
4464

45-
export const getCurrentEvent = () => {
46-
if(eventQueue.length > 0) {
47-
return eventQueue[0];
65+
export const queueEvent = async (socketEvent: OutboundEvent) => {
66+
67+
let currentUnblockPromise = unblockPromise;
68+
if (currentUnblockPromise) {
69+
let blockedTime = Date.now();
70+
log.warn(`${socketEvent.id}: delaying receive due to full event queue (length=${eventQueue.length})`);
71+
await currentUnblockPromise;
72+
log.info(`${socketEvent.id}: unblocked receive after ${Date.now()-blockedTime}ms`);
4873
}
49-
};
5074

51-
export const getQueueSize = () => {
52-
return eventQueue.length;
75+
eventQueue.push(socketEvent);
76+
if (eventQueue.length >= maxEventQueueSize && !unblockPromise) {
77+
log.warn(`Event queue became full (length=${eventQueue.length})`);
78+
unblockPromise = new Promise(resolve => {
79+
unblock = resolve;
80+
})
81+
}
82+
83+
dispatchNext();
5384
};
85+
86+
export const reDispatchInFlight = () => {
87+
for (const event of inFlight) {
88+
eventEmitter.emit('event', event)
89+
}
90+
}
91+
92+
export const handleAck = (ack: IAckEvent) => {
93+
94+
// Check we have something in-flight
95+
if (inFlight.length <= 0) {
96+
log.error(`Ack for ${ack.id} while no events in-flight`);
97+
return
98+
}
99+
100+
// If no ID supplied (back-level API) we
101+
if (ack.id === undefined) {
102+
log.warn(`FireFly core is back-level and did not supply an event ID`);
103+
ack.id = inFlight[0].id;
104+
}
105+
106+
// Remove from our in-flight map
107+
let event;
108+
for (let i = 0; i < inFlight.length; i++) {
109+
const candidate = inFlight[i]
110+
if (ack.id === candidate.id) {
111+
event = candidate;
112+
inFlight.splice(i, 1);
113+
break;
114+
}
115+
}
116+
if (!event) {
117+
log.warn(`Ack received for ${ack.id} which is not in-flight`);
118+
return
119+
}
120+
log.debug(`${ack.id}: acknowledged`);
121+
122+
dispatchNext();
123+
}
124+
125+
export const getEmitter = () => {
126+
return eventEmitter;
127+
}
128+
129+
export const getStats = () => {
130+
return {
131+
messageQueueSize: eventQueue.length,
132+
inFlightCount: inFlight.length,
133+
}
134+
}

src/handlers/messages.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,19 @@
1414
// See the License for the specific language governing permissions and
1515
// limitations under the License.
1616

17-
import EventEmitter from 'events';
1817
import FormData from 'form-data';
1918
import https from 'https';
2019
import { v4 as uuidV4 } from 'uuid';
2120
import { ca, cert, key } from '../lib/cert';
2221
import { IMessageDeliveredEvent, IMessageFailedEvent, MessageTask } from '../lib/interfaces';
2322
import { Logger } from '../lib/logger';
2423
import * as utils from '../lib/utils';
24+
import { queueEvent } from './events';
2525

2626
const log = new Logger('handlers/messages.ts')
2727

2828
let messageQueue: MessageTask[] = [];
2929
let sending = false;
30-
export const eventEmitter = new EventEmitter();
3130

3231
export const sendMessage = async (message: string, recipient: string, recipientURL: string, requestId: string | undefined) => {
3332
if (sending) {
@@ -55,7 +54,7 @@ export const deliverMessage = async ({ message, recipient, recipientURL, request
5554
headers: formData.getHeaders(),
5655
httpsAgent
5756
});
58-
eventEmitter.emit('event', {
57+
await queueEvent({
5958
id: uuidV4(),
6059
type: 'message-delivered',
6160
message,
@@ -64,7 +63,7 @@ export const deliverMessage = async ({ message, recipient, recipientURL, request
6463
} as IMessageDeliveredEvent);
6564
log.trace(`Message delivered`);
6665
} catch(err: any) {
67-
eventEmitter.emit('event', {
66+
await queueEvent({
6867
id: uuidV4(),
6968
type: 'message-failed',
7069
message,

src/lib/config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ const loadConfig = async () => {
5151
throw err;
5252
}
5353
}
54+
config = data as IConfig;
5455
if(validateConfig(data)) {
55-
config = data as IConfig;
5656
for(const peer of config.peers) {
5757
if(peer.endpoint.endsWith('/')) {
5858
peer.endpoint = peer.endpoint.slice(-0, -1);

0 commit comments

Comments
 (0)