Skip to content

Commit

Permalink
Merge pull request activepieces#2758 from activepieces/fix/for-loop-t…
Browse files Browse the repository at this point in the history
…runcation

fix: for loop truncation
  • Loading branch information
khaledmashaly authored Sep 20, 2023
2 parents c723c6b + ae5220a commit 415faed
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 31 deletions.
61 changes: 45 additions & 16 deletions packages/engine/src/lib/helper/logging-utils.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import { ActionType, ExecutionOutput, FlowVersion, LoopOnItemsStepOutput, StepOutput, applyFunctionToValues, isString } from "@activepieces/shared";
import { ActionType, ExecutionOutput, LoopOnItemsStepOutput, MAX_LOG_SIZE, StepOutput, applyFunctionToValues } from "@activepieces/shared";
import sizeof from "object-sizeof";
import { compressMemoryFileString, handleAPFile, isApFilePath, isMemoryFilePath } from "../services/files.service";
import { isMemoryFilePath } from "../services/files.service";

const TRIM_SIZE_BYTE = 512 * 1024;
const TRUNCATION_TEXT_PLACEHOLDER = '(truncated)'

export const loggerUtils = {
async trimExecution(executionState: ExecutionOutput) {
const steps = executionState.executionState.steps;
export const loggingUtils = {
async trimExecution(executionOutput: ExecutionOutput) {
const steps = executionOutput.executionState.steps;
for (const stepName in steps) {
const stepOutput = steps[stepName];
steps[stepName] = await trimStepOuput(stepOutput);
steps[stepName] = await trimStepOutput(stepOutput);
}
return executionState;
return executionOutput;
}
}

async function trimStepOuput(stepOutput: StepOutput): Promise<StepOutput> {
async function trimStepOutput(stepOutput: StepOutput): Promise<StepOutput> {
const modified: StepOutput = JSON.parse(JSON.stringify(stepOutput));
modified.input = await applyFunctionToValues(modified.input, trim);
switch (modified.type) {
Expand All @@ -39,13 +39,42 @@ async function trimStepOuput(stepOutput: StepOutput): Promise<StepOutput> {
return modified;
}

const trim = async (obj: any) => {
const trim = async (obj: unknown): Promise<unknown> => {
if (isMemoryFilePath(obj)) {
return await compressMemoryFileString(obj);
return TRUNCATION_TEXT_PLACEHOLDER
}
const size = sizeof(obj);
if (size > TRIM_SIZE_BYTE) {
return '(truncated)';

if (objectExceedMaxSize(obj) && isObject(obj)) {
const objectEntries = Object.entries(obj).sort(bySizeDesc)
let i = 0

while (i < objectEntries.length && objectEntriesExceedMaxSize(objectEntries)) {
const key = objectEntries[i][0]
obj[key] = TRUNCATION_TEXT_PLACEHOLDER
i += 1
}
}

if (!objectExceedMaxSize(obj)) {
return obj
}
return obj;
};

return TRUNCATION_TEXT_PLACEHOLDER
}

const objectEntriesExceedMaxSize = (objectEntries: [string, unknown][]): boolean => {
const obj = Object.fromEntries(objectEntries)
return objectExceedMaxSize(obj)
}

const objectExceedMaxSize = (obj: unknown): boolean => {
return sizeof(obj) > MAX_LOG_SIZE
}

const isObject = (obj: unknown): obj is Record<string, unknown> => {
return typeof obj === 'object' && !Array.isArray(obj) && obj !== null
}

const bySizeDesc = (a: [string, unknown], b: [string, unknown]): number => {
return sizeof(b[1]) - sizeof(a[1])
}
17 changes: 4 additions & 13 deletions packages/engine/src/lib/services/files.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export function createFilesService({ stepName, type, flowId }: { stepName: strin
case 'local':
return writeLocalFile({ stepName, fileName, data });
case 'memory':
return writeMemoryFile({ stepName, fileName, data });
return writeMemoryFile({ fileName, data });
}
}
}
Expand All @@ -28,17 +28,8 @@ export function isMemoryFilePath(dbPath: unknown): boolean {
if (!isString(dbPath)) {
return false;
}
return dbPath.startsWith(MEMORY_PREFIX_URL);
}

export async function compressMemoryFileString(path: string) {
try {
const file = await handleAPFile(path);
return MEMORY_PREFIX_URL + file.filename
} catch (e) {
console.error(e);
return path;
}
return dbPath.startsWith(MEMORY_PREFIX_URL);
}

export function isApFilePath(dbPath: unknown): boolean {
Expand All @@ -60,7 +51,7 @@ export async function handleAPFile(path: string) {
}
}

async function writeMemoryFile({ stepName, fileName, data }: { stepName: string, fileName: string, data: Buffer }): Promise<string> {
async function writeMemoryFile({ fileName, data }: { fileName: string, data: Buffer }): Promise<string> {
try {
const base64Data = data.toString('base64');
const base64String = JSON.stringify({ fileName, data: base64Data });
Expand Down Expand Up @@ -139,4 +130,4 @@ async function readLocalFile(absolutePath: string): Promise<ApFile> {
const filename = absolutePath.split('/').pop()!;
const extension = filename.split('.').pop()!;
return new ApFile(filename, buffer, extension);
}
}
4 changes: 2 additions & 2 deletions packages/engine/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import { triggerHelper } from './lib/helper/trigger-helper';
import { Piece } from '@activepieces/pieces-framework';
import { VariableService } from './lib/services/variable-service';
import { testExecution } from './lib/helper/test-execution-context';
import { loggerUtils } from './lib/helper/logging-utils';
import { loggingUtils } from './lib/helper/logging-utils';

const initFlowExecutor = (input: ExecuteFlowOperation): FlowExecutor => {
const { flowVersion } = input
Expand Down Expand Up @@ -105,7 +105,7 @@ const executeFlow = async (input?: ExecuteFlowOperation): Promise<void> => {

writeOutput({
status: EngineResponseStatus.OK,
response: await loggerUtils.trimExecution(output)
response: await loggingUtils.trimExecution(output)
})
} catch (e) {
console.error(e);
Expand Down
37 changes: 37 additions & 0 deletions packages/engine/test/helper/logging-utils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import {
ActionType,
ExecutionOutput,
ExecutionOutputStatus,
ExecutionState,
StepOutputStatus,
} from '@activepieces/shared'
import { loggingUtils } from '../../src/lib/helper/logging-utils'

describe('Logging Utils', () => {
it('Should not truncate whole step if its log size exceeds limit', async () => {
// arrange
const mockStepOutput = {
type: ActionType.CODE,
status: StepOutputStatus.SUCCEEDED,
input: {
a: 'a'.repeat(2197100),
},
}

const mockExecutionState = new ExecutionState()
mockExecutionState.insertStep(mockStepOutput, 'mockStep', [])

const mockExecutionOutput: ExecutionOutput = {
status: ExecutionOutputStatus.SUCCEEDED,
executionState: mockExecutionState,
duration: 10,
tasks: 10,
}

// act
const result = await loggingUtils.trimExecution(mockExecutionOutput)

// assert
expect(result.executionState.steps['mockStep'].input).toHaveProperty<string>('a', '(truncated)')
})
})

0 comments on commit 415faed

Please sign in to comment.