From 5ddb14147b4f6e3cdd936a1f475ca2c00a78755b Mon Sep 17 00:00:00 2001 From: Marien Fressinaud Date: Mon, 8 Feb 2021 14:15:11 +0100 Subject: [PATCH] Add a job worker system to docker-compose --- .github/workflows/ci.yml | 2 +- cli | 29 ++++++++++++++++------- docker/Dockerfile.php | 2 +- docker/docker-compose.yml | 14 +++++++++++ lib/Minz | 2 +- src/Routes.php | 1 + src/cli/JobsWorker.php | 45 ++++++++++++++++++++++++++++++++++++ src/cli/System.php | 1 + tests/cli/JobsWorkerTest.php | 31 +++++++++++++++++++++++++ 9 files changed, 116 insertions(+), 11 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5096d520..4444e7dd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,7 +47,7 @@ jobs: with: php-version: ${{ matrix.php-versions }} coverage: xdebug - extensions: intl, gettext, gd, pdo, pdo_pgsql + extensions: intl, gettext, pcntl, gd, pdo, pdo_pgsql ini-values: browscap=${{ github.workspace }}/docker/lite_php_browscap.ini - name: Setup locales diff --git a/cli b/cli index 342eb64f..cbbf643c 100755 --- a/cli +++ b/cli @@ -55,15 +55,28 @@ try { $application = new \flusio\cli\Application(); $response = $application->run($request); -// Display the content -$output = $response->render(); -if ($output) { - echo $output . "\n"; -} +if ($response instanceof Generator) { + // This is used by the JobsWorker#watch method in order to provide a + // long-running service. + foreach ($response as $response_part) { + $output = $response_part->render(); + if ($output) { + echo $output . "\n"; + } + } -$code = $response->code(); -if ($code >= 200 && $code < 300) { exit(0); } else { - exit(1); + // Display the content + $output = $response->render(); + if ($output) { + echo $output . "\n"; + } + + $code = $response->code(); + if ($code >= 200 && $code < 300) { + exit(0); + } else { + exit(1); + } } diff --git a/docker/Dockerfile.php b/docker/Dockerfile.php index 2f409277..a62654d6 100644 --- a/docker/Dockerfile.php +++ b/docker/Dockerfile.php @@ -16,7 +16,7 @@ && pecl install xdebug \ && docker-php-ext-configure intl \ && docker-php-ext-configure gd --with-webp --with-jpeg --with-freetype \ - && docker-php-ext-install -j$(nproc) intl gettext zip pdo pdo_pgsql gd \ + && docker-php-ext-install -j$(nproc) intl gettext pcntl zip pdo pdo_pgsql gd \ && docker-php-ext-enable xdebug \ && echo "xdebug.mode=coverage" >> /usr/local/etc/php/conf.d/docker-php-ext-xdebug.ini; diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 62e8bb90..24093367 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -19,6 +19,20 @@ services: links: - database + job_worker: + image: flusio_php:dev + build: + context: . + dockerfile: Dockerfile.php + restart: unless-stopped + command: php ./cli --request /jobs/watch + volumes: + - ..:/var/www/html:z + - composer:/tmp + user: $USER + links: + - database + database: image: postgres:12-alpine restart: unless-stopped diff --git a/lib/Minz b/lib/Minz index ecce82ae..22953539 160000 --- a/lib/Minz +++ b/lib/Minz @@ -1 +1 @@ -Subproject commit ecce82ae5e2fea7c72cbc696d232e5a88b20edd9 +Subproject commit 229535390c97fc0d37230ab0ccfe5181d6113a16 diff --git a/src/Routes.php b/src/Routes.php index 389d13a4..e5d1c4ee 100644 --- a/src/Routes.php +++ b/src/Routes.php @@ -134,6 +134,7 @@ public static function loadCli($router) $router->addRoute('cli', '/links/refresh', 'cli/Links#refresh'); $router->addRoute('cli', '/jobs/run', 'cli/JobsWorker#run'); + $router->addRoute('cli', '/jobs/watch', 'cli/JobsWorker#watch'); self::load($router); } diff --git a/src/cli/JobsWorker.php b/src/cli/JobsWorker.php index db88ecfd..87bb7cf1 100644 --- a/src/cli/JobsWorker.php +++ b/src/cli/JobsWorker.php @@ -46,4 +46,49 @@ public function run($request) return Response::text(200, "job#{$db_job['id']}: done"); } + + /** + * Start a job worker which call `run()` in a loop. This action should be + * called via a systemd service, or as any other kind of "init" service. + * + * Responses are yield during the lifetime of the action. + * + * @response 204 If no job to run + * @response 500 If we took a job but we can't lock it + * @response 200 + */ + public function watch($request) + { + \pcntl_async_signals(true); + \pcntl_signal(SIGTERM, [$this, 'stopWatch']); + \pcntl_signal(SIGINT, [$this, 'stopWatch']); + \pcntl_signal(SIGALRM, [$this, 'stopWatch']); // used for tests + $this->exit_watch = false; + + yield Response::text(200, '[Job worker started]'); + + while (true) { + yield $this->run($request); + + if (!$this->exit_watch) { + sleep(5); + } + + // exit_watch can be set to true during sleep(), so don't merge the + // two conditions with a "else"! + if ($this->exit_watch) { + break; + } + } + + yield Response::text(200, '[Job worker stopped]'); + } + + /** + * Handler to catch signals and stop the worker. + */ + private function stopWatch() + { + $this->exit_watch = true; + } } diff --git a/src/cli/System.php b/src/cli/System.php index 305a8539..460c0acb 100644 --- a/src/cli/System.php +++ b/src/cli/System.php @@ -20,6 +20,7 @@ public function usage() $usage .= ' / Show this help' . "\n"; $usage .= ' /database/status Return the status of the DB connection' . "\n"; $usage .= ' /jobs/run Execute one waiting job' . "\n"; + $usage .= ' /jobs/watch Wait and execute jobs' . "\n"; $usage .= ' /links/refresh Refresh the oldest links (only illustration images)' . "\n"; $usage .= ' [-pnumber=NUMBER] where NUMBER is the number of links to refresh (default is 10)' . "\n"; $usage .= ' /subscriptions/sync Synchronize the overdue subscriptions (or nearly overdue)' . "\n"; diff --git a/tests/cli/JobsWorkerTest.php b/tests/cli/JobsWorkerTest.php index e66b656b..bf703b58 100644 --- a/tests/cli/JobsWorkerTest.php +++ b/tests/cli/JobsWorkerTest.php @@ -125,4 +125,35 @@ public function testRunKeepsFailingJobs() $this->assertStringContainsString('I failed you :(', $db_job['last_error']); $this->assertSame($now->getTimestamp(), $failed_at->getTimestamp()); } + + public function testWatchRendersCorrectly() + { + $job_dao = new models\dao\Job(); + $token = $this->create('token'); + $user_id = $this->create('user', [ + 'validation_token' => $token, + ]); + $job_id = $this->create('job', [ + 'perform_at' => \Minz\Time::ago(1, 'second')->format(\Minz\Model::DATETIME_FORMAT), + 'locked_at' => null, + 'handler' => json_encode([ + 'job_class' => 'flusio\jobs\Mailer', + 'job_args' => ['Users', 'sendAccountValidationEmail', $user_id], + ]), + ]); + + \pcntl_alarm(3); // the worker will get a SIGALRM signal and stop in 3s + $response_generator = $this->appRun('cli', '/jobs/watch'); + + $response = $response_generator->current(); + $this->assertResponse($response, 200, '[Job worker started]'); + $response_generator->next(); + $response = $response_generator->current(); + $this->assertResponse($response, 200, "job#{$job_id}: done"); + $response_generator->next(); + $response = $response_generator->current(); + $this->assertResponse($response, 200, '[Job worker stopped]'); + $this->assertEmailsCount(1); + $this->assertSame(0, $job_dao->count()); + } }