Skip to content

Commit 2eb0b33

Browse files
author
Anton Kotenko
committed
Component as service -- pass tests except one time execs
1 parent 8f8fb57 commit 2eb0b33

36 files changed

+4981
-7540
lines changed

.eslintrc.json

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
{
2+
"root": true,
3+
"parserOptions": {
4+
"ecmaVersion": 2018,
5+
"sourceType": "script",
6+
"ecmaFeatures": {
7+
"impliedStrict": true
8+
}
9+
},
10+
"plugins": [
11+
"eslint-plugin-mocha"
12+
],
13+
"extends": [
14+
"standard"
15+
],
16+
"rules": {
17+
"mocha/no-exclusive-tests": "error",
18+
"mocha/no-skipped-tests": "error",
19+
"node/no-deprecated-api": "warn",
20+
"semi": [
21+
"error",
22+
"always"
23+
],
24+
"indent": [
25+
"error",
26+
4
27+
],
28+
"max-len": [
29+
"warn",
30+
120,
31+
4,
32+
{
33+
"ignoreUrls": true,
34+
"ignoreTemplateLiterals": true
35+
}
36+
],
37+
"space-before-function-paren": [
38+
"warn",
39+
{
40+
"anonymous": "always",
41+
"named": "never",
42+
"asyncArrow": "always"
43+
}
44+
]
45+
}
46+
}

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ coverage
22
node_modules
33
.DS_Store
44
.idea
5+
.*.sw[op]

lib/AmqpConnWrapper.js

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
const amqplib = require('amqplib');
2+
// FIXME TESTS
3+
class AmqpConnWrapper {
4+
constructor(amqpUri, logger) {
5+
this._logger = logger;
6+
this._amqpUri = amqpUri;
7+
}
8+
async start() {
9+
// FIXME safety for double calls
10+
// TODO handle channel errors. Recoonect????
11+
this._amqp = await amqplib.connect(this._amqpUri);
12+
// FIXME
13+
//this.amqp.on('error', this._logger.criticalerrorandExit);
14+
//this.amqp.on('close', this._logger.criticalerrorandExit);
15+
this._logger.debug('Connected to amqp');
16+
17+
this._subscribeChannel = await this._amqp.createChannel();
18+
// FIXME
19+
//this.subscribeChannel.on('error', this._logger.criticalErrorAndExit);
20+
this._logger.debug('Opened subscribe channel');
21+
22+
this._publishChannel = await this._amqp.createConfirmChannel();
23+
// FIXME
24+
//this.publishChannel.on('error', this._logger.criticalErrorAndExit);
25+
this._logger.debug('Opened publish channel');
26+
}
27+
async stop() {
28+
// FIXME safety for double calls
29+
this._logger.trace('Close AMQP connections');
30+
try {
31+
await this._subscribeChannel.close();
32+
} catch (alreadyClosed) {
33+
this._logger.debug('Subscribe channel is closed already');
34+
}
35+
try {
36+
await this._publishChannel.close();
37+
} catch (alreadyClosed) {
38+
this._logger.debug('Publish channel is closed already');
39+
}
40+
try {
41+
await this._amqp.close();
42+
} catch (alreadyClosed) {
43+
this._logger.debug('AMQP connection is closed already');
44+
}
45+
this._logger.debug('Successfully closed AMQP connections');
46+
}
47+
getPublishChannel() {
48+
return this._publishChannel;
49+
}
50+
getSubscribeChannel() {
51+
return this._subscribeChannel;
52+
}
53+
}
54+
module.exports = AmqpConnWrapper;

lib/App.js

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
class App {
2+
/**
3+
* Start application
4+
* Implemented as SAFE in terms of exception
5+
*/
6+
async start() {
7+
process.on('uncaughtException', (e) => {
8+
try {
9+
this._safeLogError(e, 'Uncaught exception');
10+
this._safeFatalErrorHook(e);
11+
} catch (error) {
12+
console.error('Error handling uncaught exception', error);
13+
} finally {
14+
process.exit(this.constructor.EXIT_CODES.UNCAUGHT_EXCEPTION);
15+
}
16+
});
17+
process.on('unhandledRejection', (e) => {
18+
try {
19+
this._safeLogError(e, 'Uncaught rejection');
20+
this._safeFatalErrorHook(e);
21+
} catch (error) {
22+
console.error('Error handling uncaught rejection', error);
23+
} finally {
24+
process.exit(this.constructor.EXIT_CODES.UNCAUGHT_REJECTION);
25+
}
26+
});
27+
process.on('SIGTERM', this.stop.bind(this));
28+
process.on('SIGINT', this.stop.bind(this));
29+
30+
try {
31+
await this._start();
32+
} catch (e) {
33+
this._safeLogError(e, 'Failed to start');
34+
this._safeFatalErrorHook(e);
35+
process.exit(this.constructor.EXIT_CODES.FAILED_TO_START);
36+
}
37+
}
38+
39+
40+
/**
41+
* Gracefully stop application.
42+
* part of standard shutdown procedure
43+
* Implemented as SAFE in terms of exception
44+
*/
45+
async stop() {
46+
try {
47+
await this._stop();
48+
} catch (e) {
49+
this._safeLogError(e, 'Failed to stop gracefully');
50+
process.exit(this.constructor.EXIT_CODES.FAILED_TO_STOP);
51+
}
52+
}
53+
54+
/**
55+
* Start procedure implementation.
56+
* Not safe in terms of exceptions. May throw.
57+
* In case of error, exception will be caught, logged
58+
* and process will be destroyed in "cruel" way
59+
*/
60+
_start() {
61+
throw new Error('implement me');
62+
}
63+
64+
/**
65+
* Graceful stop procedure implementation.
66+
* Not safe in terms of exceptions. May throw.
67+
* In case of error, exception will be caught, logged
68+
* and process will be destroyed in "cruel" way
69+
*/
70+
_stop() {
71+
throw new Error('implement me');
72+
}
73+
74+
_logError() {
75+
throw new Error('implement me');
76+
}
77+
78+
async _handleFatalError() {
79+
throw new Error('implement me');
80+
}
81+
82+
_safeLogError() {
83+
console.error(...arguments);
84+
try {
85+
this._logError(...arguments);
86+
} catch (e) {
87+
console.error('Uncaught rejection', e);
88+
}
89+
}
90+
91+
_safeFatalErrorHook() {
92+
try {
93+
this._handleFatalError(...arguments);
94+
} catch (e) {
95+
this._safeLogError(e, 'Uncaught rejection');
96+
}
97+
}
98+
99+
static get EXIT_CODES() {
100+
return {
101+
SUCCESS: 0,
102+
FAILED_TO_START: 1,
103+
FAILED_TO_STOP: 2,
104+
UNCAUGHT_EXCEPTION: 3,
105+
UNCAUGHT_REJECTION: 4
106+
};
107+
}
108+
}
109+
module.exports = App;

lib/Multisailor.js

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
const assert = require('assert');
2+
const { ComponentLogger } = require('./logging.js');
3+
const { AmqpCommunicationLayer } = require('./amqp.js');
4+
const { Sailor } = require('./sailor.js');
5+
6+
7+
class MultiSailor {
8+
constructor(amqpConn, config, logger) {
9+
this._amqpConn = amqpConn;
10+
this._logger = logger; // not context aware logger
11+
this._sharedCommunicationLayer = new AmqpCommunicationLayer(this._amqpConn, this._logger, config);
12+
this._config = config;
13+
this._sailors = {};
14+
}
15+
async start() {
16+
await this._sharedCommunicationLayer.listenQueue(this._dispatch.bind(this));
17+
}
18+
async stop() {
19+
await this._sharedCommunicationLayer.unlistenQueue();
20+
}
21+
22+
async _dispatch(msg) {
23+
try {
24+
const sailor = await this._getSailor(msg);
25+
sailor.processMessage(msg);
26+
} catch (e) {
27+
this._logger.error(e, 'Can not handle message');
28+
}
29+
}
30+
31+
async _getSailor(msg) {
32+
let sailor;
33+
const { taskId: flowId, stepId } = msg.properties.headers;
34+
assert(flowId && stepId, 'Incorrect message format, no taskId/stepId');
35+
try {
36+
this._sailors[flowId] = this._sailors[flowId] || {};
37+
if (this._sailors[flowId][stepId]) {
38+
return this._sailors[flowId][stepId];
39+
}
40+
const config = this._restoreExecContext(msg);
41+
const logger = new ComponentLogger(config); // FIXME defaults. Prooperly create it
42+
const communicationLayer = new AmqpCommunicationLayer(this._amqpConn, logger, config);
43+
const sailor = new Sailor(
44+
communicationLayer,
45+
logger,
46+
config
47+
);
48+
await sailor.prepare();
49+
/**
50+
* TODO skip atm. Generally it's possible to fetch data from api.
51+
* to handle if startup is requried.
52+
* Startup is requried if no hooks data and sailor supports startup hook
53+
* if (STARTUP_REQUERD)
54+
* await sailor.startup();
55+
* }
56+
*/
57+
await sailor.init();
58+
// NOTICE do not call sailor.run here.
59+
// message dispatching is done in different ways in MultiSailor and SingleSailor modes
60+
return this._sailors[flowId][stepId] = sailor;
61+
} catch (e) {
62+
sailor && sailor.reportError(e);
63+
// FIXME try to log error in context of current message
64+
throw e;
65+
}
66+
}
67+
68+
_restoreExecContext(msg) {
69+
const manadatoryValues = Object.assign({}, this._config);
70+
manadatoryValues.FLOW_ID = msg.properties.headers.taskId;
71+
manadatoryValues.STEP_ID = msg.properties.headers.stepId;
72+
manadatoryValues.EXEC_ID = msg.properties.headers.execId;
73+
manadatoryValues.USER_ID = msg.properties.headers.userId;
74+
75+
//// used by communication layer only
76+
//manadatoryValues.PUBLISH_MESSAGES_TO
77+
//manadatoryValues.DATA_ROUTING_KEY
78+
//manadatoryValues.ERROR_ROUTING_KEY
79+
//manadatoryValues.REBOUND_ROUTING_KEY
80+
//manadatoryValues.SNAPSHOT_ROUTING_KEY
81+
82+
//manadatoryValues.CONTAINER_ID = //STUB randomize
83+
84+
85+
//manadatoryValues.WORKSPACE_ID = stepData.
86+
//manadatoryValues.COMP_ID = stepData.comp_id;
87+
//manadatoryValues.FUNCTION = stepData.function;
88+
89+
90+
//manadatoryValues.COMP_NAME = stepData.comp_name; // FIXME
91+
//manadatoryValues.EXEC_TYPE = 'flow-step'; // FIXME should match admiral ENV_VARS_CREATOR
92+
//manadatoryValues.FLOW_VERSION = stepData.flow_version; // FIXME
93+
//manadatoryValues.TENANT_ID = stepData.tenant_id
94+
//manadatoryValues.TASK_USER_EMAIL = stepData.XXXX
95+
//manadatoryValues.EXECUTION_RESULT_ID = 'ZZZZ'// FIXME skip ATM one time execs
96+
return manadatoryValues;
97+
}
98+
}
99+
module.exports = MultiSailor;

lib/SingleApp.js

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
const App = require('./App.js');
2+
3+
class SingleApp extends App {
4+
async _start() {
5+
const { ComponentLogger } = require('./logging.js');
6+
const Sailor = require('./sailor.js');
7+
const AmqpConnWrapper = require('./AmqpConnWrapper.js');
8+
const { AmqpCommunicationLayer } = require('./amqp.js');
9+
const { SingleSailorConfig } = require('./settings.js');
10+
11+
this._config = SingleSailorConfig.fromEnv();
12+
this._logger = new ComponentLogger(this._config);
13+
this._amqpConn = new AmqpConnWrapper(this._config.AMQP_URI, this._logger);
14+
this._communicationLayer = new AmqpCommunicationLayer(this._amqpConn, this._config, this._logger);
15+
this._sailor = new Sailor(this._communicationLayer, this._config, this._logger);
16+
if (this._config.HOOK_SHUTDOWN) {
17+
await this._sailor.prepare();
18+
await this._sailor.shutdown();
19+
return;
20+
} else {
21+
await this._amqpConn.start();
22+
await this._sailor.prepare();
23+
24+
if (this._config.STARTUP_REQUIRED) {
25+
await this._sailor.startup();
26+
}
27+
28+
await this._sailor.init();
29+
await this._sailor.run();
30+
}
31+
}
32+
33+
async _stop() {
34+
if (this._communicationLayer) {
35+
await this._communicationLayer.unlistenQueue();
36+
}
37+
if (this._amqpConn) {
38+
await this._amqpConn.stop();
39+
this._amqpConn = null;
40+
}
41+
}
42+
43+
async _handleFatalError(error) {
44+
if (this._sailor && !this._config.HOOK_SHUTDOWN) {
45+
await this._sailor.reportError(error);
46+
}
47+
}
48+
49+
_logError() {
50+
if (this._logger) {
51+
this._logger.error(...arguments);
52+
}
53+
}
54+
}
55+
module.exports = SingleApp;

0 commit comments

Comments
 (0)