Skip to content

Commit a2dccfd

Browse files
authored
IAS xsuaa hybrid improvement (#385)
* xsuaa ias hybrid improvement * prepare new version * fix locking * fix tests
1 parent 6aeb263 commit a2dccfd

File tree

12 files changed

+202
-262
lines changed

12 files changed

+202
-262
lines changed

package-lock.json

Lines changed: 166 additions & 155 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@cap-js-community/event-queue",
3-
"version": "1.11.0-beta.5",
3+
"version": "1.11.0",
44
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",
55
"main": "src/index.js",
66
"types": "src/index.d.ts",
@@ -47,24 +47,24 @@
4747
"node": ">=18"
4848
},
4949
"dependencies": {
50-
"@sap/xssec": "^4.6.0",
51-
"cron-parser": "^5.3.1",
50+
"@sap/xssec": "^4.10.0",
51+
"cron-parser": "^5.4.0",
5252
"redis": "^4.7.0",
5353
"verror": "^1.10.1",
5454
"yaml": "^2.7.1"
5555
},
5656
"devDependencies": {
5757
"@cap-js/cds-test": "^0.4.0",
58-
"@cap-js/hana": "^2.2.0",
59-
"@cap-js/sqlite": "^2.0.1",
60-
"@sap/cds": "^9.3.1",
61-
"@sap/cds-dk": "^9.3.1",
58+
"@cap-js/hana": "^2.3.3",
59+
"@cap-js/sqlite": "^2.0.3",
60+
"@sap/cds": "^9.4.1",
61+
"@sap/cds-dk": "^9.4.1",
6262
"eslint": "^8.57.0",
6363
"eslint-config-prettier": "^9.1.0",
6464
"eslint-plugin-jest": "^28.6.0",
6565
"eslint-plugin-node": "^11.1.0",
6666
"express": "^4.21.2",
67-
"hdb": "^2.25.1",
67+
"hdb": "^2.26.1",
6868
"jest": "^29.7.0",
6969
"prettier": "^2.8.8",
7070
"sqlite3": "^5.1.7",

src/redis/redisPub.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ const broadcastEvent = async (tenantId, events, forceBroadcast = false) => {
7777
for (let i = 0; i < TRIES_FOR_PUBLISH_PERIODIC_EVENT; i++) {
7878
const result = eventConfig.multiInstanceProcessing
7979
? false
80-
: await distributedLock.checkLockExistsAndReturnValue(context, [type, subType].join("##"));
80+
: await distributedLock.checkLockExists(context, [type, subType].join("##"));
8181
if (result) {
8282
logger.debug("skip publish redis event as no lock is available", {
8383
type,

src/runner/runner.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ const _acquireRunId = async (context) => {
405405
overrideValue: true,
406406
});
407407
} else {
408-
runId = await distributedLock.checkLockExistsAndReturnValue(context, EVENT_QUEUE_RUN_ID, {
408+
runId = await distributedLock.getValue(context, EVENT_QUEUE_RUN_ID, {
409409
tenantScoped: false,
410410
});
411411
}
@@ -422,7 +422,7 @@ const _calculateOffsetForFirstRun = async () => {
422422
try {
423423
await trace(dummyContext, "calculateOffsetForFirstRun", async () => {
424424
if (eventQueueConfig.redisEnabled) {
425-
let lastRunTs = await distributedLock.checkLockExistsAndReturnValue(dummyContext, EVENT_QUEUE_RUN_TS, {
425+
let lastRunTs = await distributedLock.getValue(dummyContext, EVENT_QUEUE_RUN_TS, {
426426
tenantScoped: false,
427427
});
428428
if (!lastRunTs) {
@@ -434,7 +434,7 @@ const _calculateOffsetForFirstRun = async () => {
434434
if (couldSetValue) {
435435
lastRunTs = ts;
436436
} else {
437-
lastRunTs = await distributedLock.checkLockExistsAndReturnValue(dummyContext, EVENT_QUEUE_RUN_TS, {
437+
lastRunTs = await distributedLock.getValue(dummyContext, EVENT_QUEUE_RUN_TS, {
438438
tenantScoped: false,
439439
});
440440
}

src/shared/cdsHelper.js

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -165,38 +165,7 @@ const getAllTenantIds = async () => {
165165
}, []);
166166
};
167167

168-
const TENANT_COLUMNS = ["subscribedSubdomain", "createdAt", "modifiedAt"];
169-
170-
const getAllTenantWithMetadata = async () => {
171-
const response = await _getAllTenantBase();
172-
if (!response) {
173-
return null;
174-
}
175-
176-
return response.reduce(async (result, row) => {
177-
const tenantId = row.subscribedTenantId ?? row.tenant;
178-
result = await result;
179-
if (await common.isTenantIdValidCb(TenantIdCheckTypes.eventProcessing, tenantId)) {
180-
const data = Object.entries(row).reduce(
181-
(result, [key, value]) => {
182-
if (TENANT_COLUMNS.includes(key)) {
183-
result[key] = value;
184-
} else {
185-
result.metadata[key] = value;
186-
}
187-
return result;
188-
},
189-
{ metadata: {} }
190-
);
191-
data.metadata = JSON.stringify(data.metadata);
192-
result.push(data);
193-
}
194-
return result;
195-
}, []);
196-
};
197-
198168
module.exports = {
199169
executeInNewTransaction,
200170
getAllTenantIds,
201-
getAllTenantWithMetadata,
202171
};

src/shared/common.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ const hashStringTo32Bit = (value) => crypto.createHash("sha256").update(String(v
9292
const _getNewAuthContext = async (tenantId) => {
9393
try {
9494
if (!_getNewAuthContext._xsuaaService) {
95-
_getNewAuthContext._xsuaaService = new xssec.XsuaaService(cds.requires.auth.credentials);
95+
_getNewAuthContext._xsuaaService = new xssec.XsuaaService(cds.requires["xsuaa-eventQueue"]?.credentials);
9696
}
9797
const authService = _getNewAuthContext._xsuaaService;
9898
const token = await authService.fetchClientCredentialsToken({ zid: tenantId });

src/shared/distributedLock.js

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,21 @@ const releaseLock = async (context, key, { tenantScoped = true } = {}) => {
6060
}
6161
};
6262

63-
const checkLockExistsAndReturnValue = async (context, key, { tenantScoped = true } = {}) => {
63+
const checkLockExists = async (context, key, { tenantScoped = true } = {}) => {
6464
const fullKey = _generateKey(context, tenantScoped, key);
6565
if (config.redisEnabled) {
66-
return await _checkLockExistsRedis(context, fullKey);
66+
return !!(await _getLockValueRedis(context, fullKey));
6767
} else {
68-
return await _checkLockExistsDb(context, fullKey);
68+
return !!(await _getLockValueDb(context, fullKey));
69+
}
70+
};
71+
72+
const getValue = async (context, key, { tenantScoped = true } = {}) => {
73+
const fullKey = _generateKey(context, tenantScoped, key);
74+
if (config.redisEnabled) {
75+
return await _getLockValueRedis(context, fullKey);
76+
} else {
77+
return await _getLockValueDb(context, fullKey);
6978
}
7079
};
7180

@@ -106,12 +115,12 @@ const _renewLockRedis = async (context, fullKey, expiryTime, { value = "true" }
106115
return result === REDIS_COMMAND_OK;
107116
};
108117

109-
const _checkLockExistsRedis = async (context, fullKey) => {
118+
const _getLockValueRedis = async (context, fullKey) => {
110119
const client = await redis.createMainClientAndConnect(config.redisOptions);
111-
return await client.exists(fullKey);
120+
return await client.get(fullKey);
112121
};
113122

114-
const _checkLockExistsDb = async (context, fullKey) => {
123+
const _getLockValueDb = async (context, fullKey) => {
115124
let result;
116125
await cdsHelper.executeInNewTransaction(context, "distributedLock-checkExists", async (tx) => {
117126
result = await tx.run(SELECT.one.from(config.tableNameEventLock).where("code =", fullKey));
@@ -259,7 +268,8 @@ const shutdownHandler = async () => {
259268
module.exports = {
260269
acquireLock,
261270
releaseLock,
262-
checkLockExistsAndReturnValue,
271+
checkLockExists,
272+
getValue,
263273
setValueWithExpire,
264274
shutdownHandler,
265275
renewLock,

srv/service/admin-service.cds

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,4 @@ service EventQueueAdminService {
4242
@mandatory
4343
subType: String) returns Boolean;
4444
}
45-
46-
@readonly
47-
@cds.persistence.skip
48-
entity Tenant {
49-
Key ID: String;
50-
subdomain: String;
51-
metadata: String;
52-
}
5345
}

srv/service/admin-service.js

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
"use strict";
22

33
const cds = require("@sap/cds");
4-
const cdsHelper = require("../../src/shared/cdsHelper");
54
const { EventProcessingStatus } = require("../../src");
65
const config = require("../../src/config");
76
const distributedLock = require("../../src/shared/distributedLock");
87
const redisPub = require("../../src/redis/redisPub");
98

109
module.exports = class AdminService extends cds.ApplicationService {
1110
async init() {
12-
const { Event: EventService, Tenant, Lock: LockService } = this.entities();
11+
const { Event: EventService, Lock: LockService } = this.entities();
1312
const { Event: EventDb } = cds.db.entities("sap.eventqueue");
1413
const { landscape, space } = this.getLandscapeAndSpace();
1514

@@ -18,9 +17,6 @@ module.exports = class AdminService extends cds.ApplicationService {
1817
req.reject(403, "Admin service is disabled by configuration");
1918
}
2019

21-
if (req.target.name === Tenant.name) {
22-
return;
23-
}
2420
const headers = Object.assign({}, req.headers, req.req?.headers);
2521
const tenant = headers["z-id"] ?? req.data.tenant;
2622

@@ -61,11 +57,6 @@ module.exports = class AdminService extends cds.ApplicationService {
6157
}));
6258
});
6359

64-
this.on("READ", Tenant, async () => {
65-
const tenants = await cdsHelper.getAllTenantWithMetadata();
66-
return tenants ?? [];
67-
});
68-
6960
this.on("setStatusAndAttempts", async (req) => {
7061
const tenant = req.headers["z-id"];
7162
cds.log("eventQueue").info("Restarting processing for event queue");

test-integration/runner.test.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -495,12 +495,12 @@ describe("runner", () => {
495495
});
496496

497497
it("acquireRunId should set ts", async () => {
498-
let runTs = await distributedLock.checkLockExistsAndReturnValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
498+
let runTs = await distributedLock.getValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
499499
tenantScoped: false,
500500
});
501501
expect(runTs).toBeFalsy();
502502
await runner.__._acquireRunId();
503-
runTs = await distributedLock.checkLockExistsAndReturnValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
503+
runTs = await distributedLock.getValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
504504
tenantScoped: false,
505505
});
506506
expect(runTs).toBeDefined();
@@ -512,7 +512,7 @@ describe("runner", () => {
512512
jest.useFakeTimers();
513513
const systemTime = Date.now();
514514
jest.setSystemTime(systemTime);
515-
const runTs = await distributedLock.checkLockExistsAndReturnValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
515+
const runTs = await distributedLock.getValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
516516
tenantScoped: false,
517517
});
518518
const expectedTs = new Date(runTs).getTime() + configInstance.runInterval - systemTime;
@@ -528,7 +528,7 @@ describe("runner", () => {
528528
jest.useFakeTimers();
529529
const systemTime = Date.now();
530530
jest.setSystemTime(systemTime);
531-
const runTs = await distributedLock.checkLockExistsAndReturnValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
531+
const runTs = await distributedLock.getValue({}, runner.__.EVENT_QUEUE_RUN_TS, {
532532
tenantScoped: false,
533533
});
534534
const expectedTs = new Date(runTs).getTime() + configInstance.runInterval - systemTime;

0 commit comments

Comments
 (0)