Skip to content

Commit

Permalink
Merge pull request activepieces#378 from activepieces/remove-node-redis
Browse files Browse the repository at this point in the history
  • Loading branch information
abuaboud authored Jan 25, 2023
2 parents 2ae3da8 + 97aac22 commit deb7546
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 307 deletions.
345 changes: 62 additions & 283 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"@google-cloud/vision": "^3.0.1",
"@mailchimp/mailchimp_marketing": "^3.0.80",
"@materia-ui/ngx-monaco-editor": "^6.0.0",
"@microfleet/ioredis-lock": "^5.1.0",
"@ngrx/effects": "^15.1.0",
"@ngrx/store": "^15.1.0",
"@ngrx/store-devtools": "^15.1.0",
Expand Down Expand Up @@ -64,8 +65,6 @@
"qs": "^6.11.0",
"quill": "^1.3.7",
"quill-mention": "^3.1.0",
"redis": "^4.5.1",
"redis-lock": "^1.0.0",
"rxjs": "~7.5.0",
"string-replace-async": "^3.0.2",
"tslib": "^2.3.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { databaseConnection } from "../database/database-connection";
import { buildPaginator } from "../helper/pagination/build-paginator";
import { paginationHelper } from "../helper/pagination/pagination-utils";
import { AppConnectionEntity } from "./app-connection-entity";
import axios, { AxiosError } from "axios";
import axios from "axios";
import { createRedisLock } from "../database/redis-connection";

const appConnectionRepo = databaseConnection.getRepository(AppConnectionEntity);

Expand All @@ -16,6 +17,9 @@ export const appConnectionService = {
})
},
async getOne(projectId: ProjectId, name: string): Promise<AppConnection | null> {
// We should make sure this is accessed only once, as a race condition could occur where the token needs to be refreshed and it gets accessed at the same time,
// which could result in the wrong request saving incorrect data.
const refreshLock = await createRedisLock(`${projectId}_${name}`);
const appConnection = await appConnectionRepo.findOneBy({
projectId: projectId,
name: name
Expand All @@ -25,6 +29,7 @@ export const appConnectionService = {
}
const refreshedAppConnection = await refresh(appConnection);
await appConnectionRepo.update(refreshedAppConnection.id, refreshedAppConnection);
refreshLock.release();
return refreshedAppConnection;
},
async delete(id: AppConnectionId): Promise<void> {
Expand Down Expand Up @@ -63,7 +68,7 @@ async function refresh(connection: AppConnection): Promise<AppConnection> {
for (const key in Object.keys(connection.value)) {
let connectionValue = connection.value[key];
if (typeof connectionValue === 'object' && connectionValue.hasOwnProperty('type')) {
let type: AppConnectionType = connectionValue.type;
const type: AppConnectionType = connectionValue.type;
switch (type) {
case AppConnectionType.CLOUD_OAUTH2:
connectionValue = await refreshCloud(connection.appName, connectionValue as CloudOAuth2ConnectionValue);
Expand All @@ -76,6 +81,7 @@ async function refresh(connection: AppConnection): Promise<AppConnection> {
}
}
}
break;
default:
break;
}
Expand Down Expand Up @@ -137,7 +143,7 @@ async function refreshWithCredentials(appConnection: OAuth2ConnectionValueWithAp

export function formatOAuth2Response(response: Record<string, any>) {
const secondsSinceEpoch = Math.round(Date.now() / 1000);
let formattedResponse: BaseOAuth2ConnectionValue = {
const formattedResponse: BaseOAuth2ConnectionValue = {
access_token: response["access_token"],
expires_in: response["expires_in"],
claimed_at: secondsSinceEpoch,
Expand Down
2 changes: 0 additions & 2 deletions packages/backend/src/app/database/database-module.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import { FastifyPluginAsync } from "fastify";
import { databaseConnection } from "./database-connection";
import { redisLockClient } from "./redis-connection";

export const databaseModule: FastifyPluginAsync = async (_app, _opts) => {
await databaseConnection.initialize();
await databaseConnection.runMigrations();
await redisLockClient.connect();
};
21 changes: 13 additions & 8 deletions packages/backend/src/app/database/redis-connection.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
import { createClient } from "redis";
import Redis from "ioredis";
import createRedisLock from "redis-lock";
import { system } from "../helper/system/system";
import { SystemProp } from "../helper/system/system-prop";
import { createLock } from "@microfleet/ioredis-lock";

const username = system.get(SystemProp.REDIS_USER);
const password = system.get(SystemProp.REDIS_PASSWORD);
const useSsl = system.get(SystemProp.REDIS_USE_SSL)??false;
const host = system.getOrThrow(SystemProp.REDIS_HOST);
const serializedPort = system.getOrThrow(SystemProp.REDIS_PORT);
const port = Number.parseInt(serializedPort, 10);

export const redisLockClient = createClient({
url: `redis://${host}:${port}`,
});

export const redisLock = createRedisLock(redisLockClient, 5000);
redisLockClient.on("error", (err: unknown) => console.log("Redis Client Error", err));

export const createRedisClient = (): Redis => {
return new Redis({
Expand All @@ -24,5 +18,16 @@ export const createRedisClient = (): Redis => {
username: username,
password: password,
maxRetriesPerRequest: null,
tls: useSsl ? {} : undefined,
});
};

const redisConection = createRedisClient();

export const createRedisLock = (key: string) => {
return createLock(redisConection, {
timeout: 2 * 60 * 1000,
retries: 3,
delay: 100,
}).acquire(key);
};
6 changes: 3 additions & 3 deletions packages/backend/src/app/flows/flow-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
import { flowVersionService } from "./flow-version/flow-version.service";
import { paginationHelper } from "../helper/pagination/pagination-utils";
import { buildPaginator } from "../helper/pagination/build-paginator";
import { redisLock } from "../database/redis-connection";
import { createRedisLock } from "../database/redis-connection";
import { ActivepiecesError, ErrorCode } from "@activepieces/shared";

const flowRepo = databaseConnection.getRepository(FlowEntity);
Expand Down Expand Up @@ -100,13 +100,13 @@ export const flowService = {
};
},
async update(flowId: FlowId, request: FlowOperationRequest): Promise<Flow | null> {
const flowLock = await redisLock(flowId);
const flowLock = await createRedisLock(flowId);
let lastVersion = (await flowVersionService.getFlowVersion(flowId, undefined))!;
if (lastVersion.state === FlowVersionState.LOCKED) {
lastVersion = await flowVersionService.createVersion(flowId, lastVersion);
}
await flowVersionService.applyOperation(lastVersion, request);
await flowLock();
await flowLock.release();
return await this.getOne(flowId, undefined);
},
async delete(flowId: FlowId): Promise<void> {
Expand Down
1 change: 1 addition & 0 deletions packages/backend/src/app/helper/system/system-prop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export enum SystemProp {
REDIS_PORT = "REDIS_PORT",
REDIS_USER = "REDIS_USER",
REDIS_PASSWORD = "REDIS_PASSWORD",
REDIS_USE_SSL = "REDIS_USE_SSL",

WARNING_TEXT_BODY = "WARNING_TEXT_BODY",
WARNING_TEXT_HEADER = "WARNING_TEXT_HEADER",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { triggerUtils } from "../../helper/trigger-utils";
import { ONE_TIME_JOB_QUEUE, REPEATABLE_JOB_QUEUE } from "./flow-queue";
import { flowWorker } from "./flow-worker";
import { OneTimeJobData, RepeatableJobData } from "./job-data";
import { logger } from "packages/backend/src/main";
import { collectionVersionService } from "../../collections/collection-version/collection-version.service";
import { logger } from "packages/backend/src/main";

const oneTimeJobConsumer = new Worker<OneTimeJobData, unknown, ApId>(
ONE_TIME_JOB_QUEUE,
Expand Down
8 changes: 3 additions & 5 deletions packages/backend/src/app/workers/flow-worker/flow-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ import {
Action,
ActionType,
CodeActionSettings,
Collection,
CollectionVersion,
EngineOperationType,
ExecutionOutputStatus,
File,
FlowVersion,
Expand All @@ -15,21 +13,21 @@ import {
import { Sandbox, sandboxManager } from "../sandbox";
import { flowVersionService } from "../../flows/flow-version/flow-version.service";
import { collectionVersionService } from "../../collections/collection-version/collection-version.service";
import { redisLock } from "../../database/redis-connection";
import { fileService } from "../../file/file.service";
import { codeBuilder } from "../code-worker/code-builder";
import { flowRunService } from "../../flow-run/flow-run-service";
import { OneTimeJobData } from "./job-data";
import { collectionService } from "../../collections/collection.service";
import { engineHelper } from "../../helper/engine-helper";
import { createRedisLock } from "../../database/redis-connection";

async function executeFlow(jobData: OneTimeJobData): Promise<void> {
const flowVersion = (await flowVersionService.getOne(jobData.flowVersionId))!;
const collectionVersion = (await collectionVersionService.getOne(jobData.collectionVersionId))!;
const collection = await collectionService.getOneOrThrow(collectionVersion.collectionId);

const sandbox = sandboxManager.obtainSandbox();
const flowLock = await redisLock(flowVersion.id);
const flowLock = await createRedisLock(flowVersion.id);
console.log(`[${jobData.runId}] Executing flow ${flowVersion.id} in sandbox ${sandbox.boxId}`);
try {
await sandbox.cleanAndInit();
Expand Down Expand Up @@ -57,7 +55,7 @@ async function executeFlow(jobData: OneTimeJobData): Promise<void> {
await flowRunService.finish(jobData.runId, ExecutionOutputStatus.INTERNAL_ERROR, null);
} finally {
sandboxManager.returnSandbox(sandbox.boxId);
await flowLock();
await flowLock.release();
}
console.log(`[${jobData.runId}] Finished executing flow ${flowVersion.id} in sandbox ${sandbox.boxId}`);
}
Expand Down

0 comments on commit deb7546

Please sign in to comment.