Skip to content

Commit

Permalink
Add a job worker system to docker-compose
Browse files Browse the repository at this point in the history
  • Loading branch information
marienfressinaud committed Feb 8, 2021
1 parent 7f745a5 commit 5ddb141
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
with:
php-version: ${{ matrix.php-versions }}
coverage: xdebug
extensions: intl, gettext, gd, pdo, pdo_pgsql
extensions: intl, gettext, pcntl, gd, pdo, pdo_pgsql
ini-values: browscap=${{ github.workspace }}/docker/lite_php_browscap.ini

- name: Setup locales
Expand Down
29 changes: 21 additions & 8 deletions cli
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,28 @@ try {
$application = new \flusio\cli\Application();
$response = $application->run($request);

// Display the content
$output = $response->render();
if ($output) {
echo $output . "\n";
}
if ($response instanceof Generator) {
// This is used by the JobsWorker#watch method in order to provide a
// long-running service.
foreach ($response as $response_part) {
$output = $response_part->render();
if ($output) {
echo $output . "\n";
}
}

$code = $response->code();
if ($code >= 200 && $code < 300) {
exit(0);
} else {
exit(1);
// Display the content
$output = $response->render();
if ($output) {
echo $output . "\n";
}

$code = $response->code();
if ($code >= 200 && $code < 300) {
exit(0);
} else {
exit(1);
}
}
2 changes: 1 addition & 1 deletion docker/Dockerfile.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
&& pecl install xdebug \
&& docker-php-ext-configure intl \
&& docker-php-ext-configure gd --with-webp --with-jpeg --with-freetype \
&& docker-php-ext-install -j$(nproc) intl gettext zip pdo pdo_pgsql gd \
&& docker-php-ext-install -j$(nproc) intl gettext pcntl zip pdo pdo_pgsql gd \
&& docker-php-ext-enable xdebug \
&& echo "xdebug.mode=coverage" >> /usr/local/etc/php/conf.d/docker-php-ext-xdebug.ini;

Expand Down
14 changes: 14 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ services:
links:
- database

job_worker:
image: flusio_php:dev
build:
context: .
dockerfile: Dockerfile.php
restart: unless-stopped
command: php ./cli --request /jobs/watch
volumes:
- ..:/var/www/html:z
- composer:/tmp
user: $USER
links:
- database

database:
image: postgres:12-alpine
restart: unless-stopped
Expand Down
1 change: 1 addition & 0 deletions src/Routes.php
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public static function loadCli($router)
$router->addRoute('cli', '/links/refresh', 'cli/Links#refresh');

$router->addRoute('cli', '/jobs/run', 'cli/JobsWorker#run');
$router->addRoute('cli', '/jobs/watch', 'cli/JobsWorker#watch');

self::load($router);
}
Expand Down
45 changes: 45 additions & 0 deletions src/cli/JobsWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,49 @@ public function run($request)

return Response::text(200, "job#{$db_job['id']}: done");
}

/**
* Start a job worker which call `run()` in a loop. This action should be
* called via a systemd service, or as any other kind of "init" service.
*
* Responses are yield during the lifetime of the action.
*
* @response 204 If no job to run
* @response 500 If we took a job but we can't lock it
* @response 200
*/
public function watch($request)
{
\pcntl_async_signals(true);
\pcntl_signal(SIGTERM, [$this, 'stopWatch']);
\pcntl_signal(SIGINT, [$this, 'stopWatch']);
\pcntl_signal(SIGALRM, [$this, 'stopWatch']); // used for tests
$this->exit_watch = false;

yield Response::text(200, '[Job worker started]');

while (true) {
yield $this->run($request);

if (!$this->exit_watch) {
sleep(5);
}

// exit_watch can be set to true during sleep(), so don't merge the
// two conditions with a "else"!
if ($this->exit_watch) {
break;
}
}

yield Response::text(200, '[Job worker stopped]');
}

/**
* Handler to catch signals and stop the worker.
*/
private function stopWatch()
{
$this->exit_watch = true;
}
}
1 change: 1 addition & 0 deletions src/cli/System.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public function usage()
$usage .= ' / Show this help' . "\n";
$usage .= ' /database/status Return the status of the DB connection' . "\n";
$usage .= ' /jobs/run Execute one waiting job' . "\n";
$usage .= ' /jobs/watch Wait and execute jobs' . "\n";
$usage .= ' /links/refresh Refresh the oldest links (only illustration images)' . "\n";
$usage .= ' [-pnumber=NUMBER] where NUMBER is the number of links to refresh (default is 10)' . "\n";
$usage .= ' /subscriptions/sync Synchronize the overdue subscriptions (or nearly overdue)' . "\n";
Expand Down
31 changes: 31 additions & 0 deletions tests/cli/JobsWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,35 @@ public function testRunKeepsFailingJobs()
$this->assertStringContainsString('I failed you :(', $db_job['last_error']);
$this->assertSame($now->getTimestamp(), $failed_at->getTimestamp());
}

public function testWatchRendersCorrectly()
{
$job_dao = new models\dao\Job();
$token = $this->create('token');
$user_id = $this->create('user', [
'validation_token' => $token,
]);
$job_id = $this->create('job', [
'perform_at' => \Minz\Time::ago(1, 'second')->format(\Minz\Model::DATETIME_FORMAT),
'locked_at' => null,
'handler' => json_encode([
'job_class' => 'flusio\jobs\Mailer',
'job_args' => ['Users', 'sendAccountValidationEmail', $user_id],
]),
]);

\pcntl_alarm(3); // the worker will get a SIGALRM signal and stop in 3s
$response_generator = $this->appRun('cli', '/jobs/watch');

$response = $response_generator->current();
$this->assertResponse($response, 200, '[Job worker started]');
$response_generator->next();
$response = $response_generator->current();
$this->assertResponse($response, 200, "job#{$job_id}: done");
$response_generator->next();
$response = $response_generator->current();
$this->assertResponse($response, 200, '[Job worker stopped]');
$this->assertEmailsCount(1);
$this->assertSame(0, $job_dao->count());
}
}

0 comments on commit 5ddb141

Please sign in to comment.