Skip to content

Commit

Permalink
fix: piece local dev
Browse files Browse the repository at this point in the history
  • Loading branch information
abuaboud committed Oct 16, 2024
1 parent e39857b commit 60e75b3
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 41 deletions.
5 changes: 2 additions & 3 deletions packages/server/api/src/app/worker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { logger, system, WorkerSystemProps } from '@activepieces/server-shared'
import { isNil, WorkerMachineType } from '@activepieces/shared'
import { FastifyInstance } from 'fastify'
import { flowWorker } from 'server-worker'
import { flowWorker, piecesBuilder } from 'server-worker'
import { accessTokenManager } from './authentication/lib/access-token-manager'
import { piecesBuilder } from './pieces/piece-metadata-service/pieces-builder'


export const setupWorker = async (app: FastifyInstance): Promise<void> => {
Expand All @@ -13,7 +12,7 @@ export const setupWorker = async (app: FastifyInstance): Promise<void> => {
app.addHook('onClose', async () => {
await flowWorker.close()
})
await piecesBuilder(app.io)
await piecesBuilder(app, app.io)
}
export async function workerPostBoot(): Promise<void> {
logger.info('Worker started')
Expand Down
3 changes: 2 additions & 1 deletion packages/server/worker/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export * from './lib/piece-manager/development/pieces-builder'
export * from './lib/engine/engine-runner'
export { engineRunner } from './lib/engine'
export * from './lib/executors/flow-job-executor'
export * from './lib/utils/webhook-utils'
export * from './lib/flow-worker'
export * from './lib/job-polling'
export * from './lib/job-polling'
15 changes: 11 additions & 4 deletions packages/server/worker/src/lib/api/server-api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ export const workerApiService = (workerToken: string) => {
return {
async heartbeat(): Promise<void> {
const request: WorkerMachineHealthcheckRequest = await heartbeat.getSystemInfo()
await client.post('/v1/worker-machines/heartbeat', request)
try {
await client.post('/v1/worker-machines/heartbeat', request)
} catch (error) {
if (ApAxiosClient.isApAxiosError(error) && error.error.code === 'ECONNREFUSED') {
return;
}
throw error;
}
},
async poll(queueName: QueueName): Promise<ApQueueJob | null> {
try {
Expand All @@ -28,14 +35,14 @@ export const workerApiService = (workerToken: string) => {
params: request,
})
return response
}
}
catch (error) {
await new Promise((resolve) => setTimeout(resolve, 2000))
return null
}
},
async resumeRun(request: ResumeRunRequest): Promise<void> {
await client.post<unknown>('/v1/workers/resume-run', request)
await client.post<unknown>('/v1/workers/resume-run', request)
},
async deleteWebhookSimulation(request: DeleteWebhookSimulationRequest): Promise<void> {
await client.post('/v1/workers/delete-webhook-simulation', request)
Expand All @@ -44,7 +51,7 @@ export const workerApiService = (workerToken: string) => {
await client.post('/v1/workers/save-payloads', request)
},
async startRuns(request: SubmitPayloadsRequest): Promise<FlowRun[]> {
return client.post<FlowRun[]>('/v1/workers/submit-payloads', request)
return client.post<FlowRun[]>('/v1/workers/submit-payloads', request)

},
async sendWebhookUpdate(request: SendWebhookUpdateRequest): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
import { spawn } from 'child_process'
import { Server } from 'http'
import { resolve } from 'path'
import { AppSystemProp, filePiecesUtils, logger, SharedSystemProp, system } from '@activepieces/server-shared'
import { ApLock, AppSystemProp, filePiecesUtils, logger, memoryLock, SharedSystemProp, system } from '@activepieces/server-shared'
import { assertNotNullOrUndefined, debounce, WebsocketClientEvent } from '@activepieces/shared'
import { Mutex } from 'async-mutex'
import chalk from 'chalk'
import chokidar from 'chokidar'
import { FastifyInstance } from 'fastify'

const mutex = new Mutex()
const packages = system.get(AppSystemProp.DEV_PIECES)?.split(',') || []
const isFilePieces = system.getOrThrow(SharedSystemProp.PIECES_SOURCE) === 'FILE'
export const PIECES_BUILDER_MUTEX_KEY = 'pieces-builder'

async function handleFileChange(piecePackageName: string, io: Server): Promise<void> {
logger.info(
chalk.blueBright.bold(
'👀 Detected changes in pieces. Waiting... 👀 ' + piecePackageName,
),
)
let lock: ApLock | undefined
try {
await mutex.acquire()
lock = await memoryLock.acquire(PIECES_BUILDER_MUTEX_KEY)

logger.info(chalk.blue.bold('🤌 Building pieces... 🤌'))
if (!/^[a-z0-9-]+$/.test(piecePackageName)) {
Expand All @@ -32,7 +33,9 @@ async function handleFileChange(piecePackageName: string, io: Server): Promise<v
logger.info(error, chalk.red.bold('Failed to run build process...'))
}
finally {
mutex.release()
if (lock) {
await lock.release()
}
logger.info(
chalk.green.bold(
'✨ Changes are ready! Please refresh the frontend to see the new updates. ✨',
Expand All @@ -59,10 +62,12 @@ async function runCommandWithLiveOutput(cmd: string): Promise<void> {
})
}

export async function piecesBuilder(io: Server): Promise<void> {
export async function piecesBuilder(app: FastifyInstance, io: Server): Promise<void> {
// Only run this script if the pieces source is file
if (!isFilePieces) return

const watchers: chokidar.FSWatcher[] = []

for (const packageName of packages) {
logger.info(chalk.blue(`Starting watch for package: ${packageName}`))

Expand All @@ -75,11 +80,28 @@ export async function piecesBuilder(io: Server): Promise<void> {
handleFileChange(piecePackageName, io).catch(logger.error)
}, 2000)

chokidar.watch(resolve(pieceDirectory), { ignored: /^\./, persistent: true }).on('all', (event, path) => {
const watcher = chokidar.watch(resolve(pieceDirectory), {
ignored: [/^\./, /node_modules/, /dist/],
persistent: true,
ignoreInitial: true,
awaitWriteFinish: {
stabilityThreshold: 2000,
pollInterval: 200
}
})
watcher.on('all', (event, path) => {
if (path.endsWith('.ts')) {
debouncedHandleFileChange()
}
})

watchers.push(watcher)
}


app.addHook('onClose', () => {
for (const watcher of watchers) {
watcher.close()
}
})
}
61 changes: 35 additions & 26 deletions packages/server/worker/src/lib/piece-manager/local-piece-manager.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { readFile, writeFile } from 'node:fs/promises'
import { join, resolve, sep } from 'node:path'
import { filePiecesUtils, logger, packageManager } from '@activepieces/server-shared'
import { ApLock, filePiecesUtils, logger, memoryLock, packageManager } from '@activepieces/server-shared'
import { assertEqual, assertNotNullOrUndefined, PackageType, PiecePackage } from '@activepieces/shared'
import { PieceManager } from './piece-manager'
import { PIECES_BUILDER_MUTEX_KEY } from './development/pieces-builder'


export class LocalPieceManager extends PieceManager {
Expand All @@ -11,34 +12,42 @@ export class LocalPieceManager extends PieceManager {
): Promise<void> {
logger.debug(params, '[linkDependencies] params')

const { projectPath, pieces } = params
const basePath = resolve(__dirname.split(`${sep}dist`)[0])
const baseLinkPath = join(
basePath,
'dist',
'packages',
'pieces',
'community',
)
let lock: ApLock | undefined
try {
lock = await memoryLock.acquire(PIECES_BUILDER_MUTEX_KEY)
const { projectPath, pieces } = params
const basePath = resolve(__dirname.split(`${sep}dist`)[0])
const baseLinkPath = join(
basePath,
'dist',
'packages',
'pieces',
'community',
)

const frameworkPackages = {
'@activepieces/pieces-common': `link:${baseLinkPath}/common`,
'@activepieces/pieces-framework': `link:${baseLinkPath}/framework`,
'@activepieces/shared': `link:${basePath}/dist/packages/shared`,
}
const frameworkPackages = {
'@activepieces/pieces-common': `link:${baseLinkPath}/common`,
'@activepieces/pieces-framework': `link:${baseLinkPath}/framework`,
'@activepieces/shared': `link:${basePath}/dist/packages/shared`,
}

await linkFrameworkPackages(projectPath, baseLinkPath, frameworkPackages)
await linkFrameworkPackages(projectPath, baseLinkPath, frameworkPackages)

for (const piece of pieces) {
assertEqual(piece.packageType, PackageType.REGISTRY, 'packageType', `Piece ${piece.pieceName} is not of type REGISTRY`)
const directoryPath = await filePiecesUtils.findDirectoryByPackageName(piece.pieceName)
assertNotNullOrUndefined(directoryPath, `directoryPath for ${piece.pieceName} is null or undefined`)
await updatePackageJson(directoryPath, frameworkPackages)
await packageManager.link({
packageName: piece.pieceName,
path: projectPath,
linkPath: directoryPath,
})
for (const piece of pieces) {
assertEqual(piece.packageType, PackageType.REGISTRY, 'packageType', `Piece ${piece.pieceName} is not of type REGISTRY`)
const directoryPath = await filePiecesUtils.findDirectoryByPackageName(piece.pieceName)
assertNotNullOrUndefined(directoryPath, `directoryPath for ${piece.pieceName} is null or undefined`)
await updatePackageJson(directoryPath, frameworkPackages)
await packageManager.link({
packageName: piece.pieceName,
path: projectPath,
linkPath: directoryPath,
})
}
} finally {
if (lock) {
await lock.release()
}
}
}
}
Expand Down

0 comments on commit 60e75b3

Please sign in to comment.