Skip to content

Commit a5c2ae6

Browse files
author
Anton Kotenko
committed
First working draft
1 parent 2eb0b33 commit a5c2ae6

20 files changed

+533
-440
lines changed

lib/App.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ class App {
4848
} catch (e) {
4949
this._safeLogError(e, 'Failed to stop gracefully');
5050
process.exit(this.constructor.EXIT_CODES.FAILED_TO_STOP);
51+
} finally {
52+
// FIXME unsubscribe more preciesly
53+
process.removeAllListeners('SIGTERM');
54+
process.removeAllListeners('SIGINT');
55+
process.removeAllListeners('uncaughtException');
56+
process.removeAllListeners('unhandledRejection');
5157
}
5258
}
5359

lib/MultiApp.js

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
const assert = require('assert');
2+
const uuid = require('uuid');
3+
const App = require('./App.js');
4+
5+
class MultiApp extends App {
6+
async _start() {
7+
const { ContainerLogger } = require('./logging.js'); // FIXME proper logger
8+
const AmqpConnWrapper = require('./AmqpConnWrapper.js');
9+
const { AmqpCommunicationLayer } = require('./amqp.js');
10+
const { MultisailorConfig } = require('./settings.js');
11+
const RestApiClient = require('elasticio-rest-node');
12+
13+
this._sailors = {};
14+
this._config = MultisailorConfig.fromEnv();
15+
this._logger = new ContainerLogger(this._config);
16+
this._amqpConn = new AmqpConnWrapper(this._config.AMQP_URI, this._logger);
17+
await this._amqpConn.start();
18+
this._apiClient = RestApiClient( // eslint-disable-line
19+
this._config.API_USERNAME,
20+
this._config.API_KEY,
21+
{
22+
retryCount: this._config.API_REQUEST_RETRY_ATTEMPTS,
23+
retryDelay: this._config.API_REQUEST_RETRY_DELAY
24+
}
25+
);
26+
this._sharedCommunicationLayer = new AmqpCommunicationLayer(this._amqpConn, this._config, this._logger);
27+
await this._sharedCommunicationLayer.listenQueue(this._dispatch.bind(this));
28+
}
29+
async _stop() {
30+
// FIXME stop all sailors
31+
if (this._sharedCommunicationLayer) {
32+
await this._sharedCommunicationLayer.unlistenQueue();
33+
}
34+
if (this._amqpConn) {
35+
await this._amqpConn.stop();
36+
this._amqpConn = null;
37+
}
38+
}
39+
async _handleFatalError() {
40+
}
41+
_logError() {
42+
}
43+
44+
async _dispatch(payload, msg) {
45+
try {
46+
const sailor = await this._getSailor(msg);
47+
await sailor.processMessage(payload, msg);
48+
} catch (e) {
49+
console.error(e.stack);
50+
this._logger.error(e, 'Can not handle message');
51+
}
52+
}
53+
_parseRouingKey(rk) {
54+
// $workspaceId.$taskId/$taskType.$stepId.$queue_suffix
55+
const parts = rk.split('.');
56+
// FIXME handle incorrect routing key; log error, skip and reject message to make it disappear in queue
57+
assert(parts.length === 4);
58+
// Traffic to service component should be forwarded with input routing key
59+
// because input routing key contains step id of CURRENT_STEP
60+
// (As opposite messages routing key contains previous step id);
61+
assert(parts[3] === 'input');
62+
return {
63+
flowId: parts[1].split('/')[0], // FIXME add asserts
64+
stepId: parts[2]
65+
};
66+
}
67+
async _getSailor(msg) {
68+
console.log('msg', msg);
69+
const { ComponentLogger } = require('./logging.js'); // FIXME proper logger
70+
const Sailor = require('./sailor.js');
71+
const { AmqpCommunicationLayer } = require('./amqp.js');
72+
73+
let sailor;
74+
const { flowId, stepId } = this._parseRouingKey(msg.fields.routingKey);
75+
console.log('flow+step', flowId, stepId);
76+
try {
77+
this._sailors[flowId] = this._sailors[flowId] || {};
78+
if (this._sailors[flowId][stepId]) {
79+
return this._sailors[flowId][stepId];
80+
}
81+
const [config, stepData] = await this._restoreExecContext(flowId, stepId, msg);
82+
const logger = new ComponentLogger(config); // FIXME defaults. Prooperly create it
83+
const communicationLayer = new AmqpCommunicationLayer(this._amqpConn, config, logger);
84+
sailor = new Sailor(
85+
communicationLayer,
86+
config,
87+
logger
88+
);
89+
sailor.stepData = stepData; // FIXME fucking ugly
90+
await sailor.prepare();
91+
/**
92+
* TODO skip atm. Generally it's possible to fetch data from api.
93+
* to handle if startup is requried.
94+
* Startup is requried if no hooks data and sailor supports startup hook
95+
* if (STARTUP_REQUIERD)
96+
* await sailor.startup();
97+
* }
98+
*/
99+
console.log('doing init');
100+
await sailor.init();
101+
// NOTICE do not call sailor.run here.
102+
// message dispatching is done in different ways in MultiSailor and SingleSailor modes
103+
return this._sailors[flowId][stepId] = sailor;
104+
} catch (e) {
105+
console.log('got error', e);
106+
// FIXME hell knows if this works
107+
await sailor && sailor.reportError(e);
108+
// FIXME try to log error in context of current message
109+
throw e;
110+
}
111+
}
112+
113+
async _restoreExecContext(flowId, stepId, msg) {
114+
// FIXME every step should be injected with it's own user + pass
115+
// retrieveStep should provide user + pass
116+
const manadatoryValues = Object.assign({}, this._config);
117+
manadatoryValues.FLOW_ID = flowId;
118+
manadatoryValues.STEP_ID = stepId,
119+
manadatoryValues.EXEC_ID = msg.properties.headers.execId;
120+
manadatoryValues.USER_ID = msg.properties.headers.userId;
121+
const stepData = await this._apiClient.tasks.retrieveStep(manadatoryValues.FLOW_ID, manadatoryValues.STEP_ID);
122+
123+
// NOTICE we generate one "pseudo-container" per flow + step;
124+
manadatoryValues.CONTAINER_ID = uuid.v4();
125+
126+
127+
manadatoryValues.WORKSPACE_ID = stepData.workspace_id;// FIXME assert
128+
manadatoryValues.COMP_ID = stepData.comp_id; // FIXME assert
129+
manadatoryValues.FUNCTION = stepData.function; // FIXME ASSERT
130+
131+
132+
manadatoryValues.COMP_NAME = stepData.comp_name; // FIXME assert
133+
manadatoryValues.EXEC_TYPE = 'flow-step'; // FIXME should match admiral ENV_VARS_CREATOR
134+
manadatoryValues.FLOW_VERSION = stepData.flow_version; // FIXME assert
135+
manadatoryValues.TENANT_ID = stepData.tenant_id; // FIXME assert
136+
manadatoryValues.CONTRACT_ID = stepData.contract_id; // FIXME assert
137+
//manadatoryValues.TASK_USER_EMAIL = stepData.XXXX
138+
//manadatoryValues.EXECUTION_RESULT_ID = 'ZZZZ'// FIXME skip ATM one time execs
139+
140+
//used by communication layer only
141+
//FIXME this knowledge is currently shared between this code and admiral.
142+
//have no idea how to fix it normally.
143+
//Probably sailor to monorepo, and reuse code between admiral and sailor.
144+
//Other option: step info endpoint should return this data
145+
manadatoryValues.PUBLISH_MESSAGES_TO = `${manadatoryValues.WORKSPACE_ID}_org`;
146+
manadatoryValues.DATA_ROUTING_KEY = `${manadatoryValues.WORKSPACE_ID}.${manadatoryValues.FLOW_ID}/ordinary.${manadatoryValues.STEP_ID}.message`;
147+
manadatoryValues.ERROR_ROUTING_KEY = `${manadatoryValues.WORKSPACE_ID}.${manadatoryValues.FLOW_ID}/ordinary.${manadatoryValues.STEP_ID}.error`;
148+
manadatoryValues.REBOUND_ROUTING_KEY = `${manadatoryValues.WORKSPACE_ID}.${manadatoryValues.FLOW_ID}/ordinary.${manadatoryValues.STEP_ID}.rebound`;
149+
manadatoryValues.SNAPSHOT_ROUTING_KEY = `${manadatoryValues.WORKSPACE_ID}.${manadatoryValues.FLOW_ID}/ordinary.${manadatoryValues.STEP_ID}.snapshot`;
150+
151+
return [manadatoryValues, stepData];
152+
}
153+
}
154+
module.exports = MultiApp;

lib/Multisailor.js

Lines changed: 0 additions & 99 deletions
This file was deleted.

lib/SingleApp.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class SingleApp extends App {
3131
}
3232

3333
async _stop() {
34+
// FIXME Sailor should be also stopped
3435
if (this._communicationLayer) {
3536
await this._communicationLayer.unlistenQueue();
3637
}

lib/amqp.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class BaseCommunicationLayer {
9898
* PRPCESS_AMQP_DRAIN
9999
* AMQP_PUBLISH_RETRY_DELAY
100100
* AMQP_PUBLISH_RETRY_ATTEMPTS
101-
* RABBITMQ_PREFETCH_SAILOR: 1,
101+
* RABBITMQ_PREFETCH_SAILOR
102102
*/
103103
class AmqpCommunicationLayer extends BaseCommunicationLayer {
104104
constructor(amqpConn, settings, logger) {

lib/sailor.js

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,16 @@ class Sailor {
3737
this._communicationLayer = communicationLayer;
3838
this.componentReader = new ComponentReader(this._logger);
3939
this.snapshot = {};
40-
this.stepData = {};
40+
this.stepData = null;
4141
//eslint-disable-next-line new-cap
42-
this.apiClient = RestApiClient(
42+
this._apiClient = RestApiClient(
4343
settings.API_USERNAME,
4444
settings.API_KEY,
4545
{
4646
retryCount: settings.API_REQUEST_RETRY_ATTEMPTS,
4747
retryDelay: settings.API_REQUEST_RETRY_DELAY
48-
});
48+
}
49+
);
4950
}
5051

5152
async prepare() {
@@ -55,20 +56,18 @@ class Sailor {
5556
FLOW_ID: flowId,
5657
STEP_ID: stepId
5758
},
58-
apiClient,
5959
componentReader
6060
} = this;
61-
62-
const stepData = await apiClient.tasks.retrieveStep(flowId, stepId);
63-
this._logger.debug('Received step data: %j', stepData);
64-
65-
Object.assign(this, {
66-
snapshot: stepData.snapshot || {},
67-
stepData
68-
});
69-
70-
this.stepData = stepData;
71-
61+
console.log('going to fetch flow step data', flowId, stepId);
62+
if (!this.stepData) {
63+
const stepData = await this._apiClient.tasks.retrieveStep(flowId, stepId);
64+
this.stepData = stepData;
65+
this.snapshot = stepData.snapshot || {};
66+
} else {
67+
this.snapshot = this.stepData.snapshot || {};
68+
}
69+
this._logger.debug('Received step data: %j', this.stepData);
70+
console.log('going to call init');
7271
await componentReader.init(compPath);
7372
}
7473

@@ -84,6 +83,7 @@ class Sailor {
8483
function: this.settings.FUNCTION
8584
};
8685
const props = createDefaultAmqpProperties(headers);
86+
console.log('goung to send error', err, props);
8787
this._communicationLayer.sendError(err, props);
8888
}
8989

@@ -180,7 +180,6 @@ class Sailor {
180180
}
181181

182182
async processMessage(payload, message) {
183-
console.log('call process message');
184183
//eslint-disable-next-line consistent-this
185184
const self = this;
186185
const settings = this.settings;
@@ -378,7 +377,7 @@ class Sailor {
378377
}, 'processMessage emit updateKeys');
379378

380379
try {
381-
await self.apiClient.accounts.update(cfg._account, { keys: keys });
380+
await self._apiClient.accounts.update(cfg._account, { keys: keys });
382381
logger.debug('Successfully updated keys #%s', deliveryTag);
383382
} catch (error) {
384383
logger.error('Failed to updated keys #%s', deliveryTag);

0 commit comments

Comments
 (0)