diff --git a/.travis.yml b/.travis.yml index 9bb4841..069d58b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,19 @@ language: php php: - - 5.6 - - 7.0 - - 7.1 + - '7.3' + - '7.2' + - '7.1' + - '7.0' + - '5.6' - hhvm matrix: exclude: - php: hhvm env: ENABLE_REDIS_EXT=1 + allow_failures: + - php: '7.3' + - php: '7.2' + - php: hhvm env: - ENABLE_REDIS_EXT=0 - ENABLE_REDIS_EXT=1 diff --git a/HOWITWORKS.md b/HOWITWORKS.md index 58490a9..a1bb85d 100644 --- a/HOWITWORKS.md +++ b/HOWITWORKS.md @@ -106,51 +106,54 @@ How do the workers process the queues? 8. `Resque_Job->fail()` returns control to the worker (still in `Resque_Worker::work()`) without a value - Job - 1. The job calls `Resque_Worker->perform()` with the `Resque_Job` as + 1. `Resque_Job_PID` is created, registering the PID of the actual process + doing the job. + 2. The job calls `Resque_Worker->perform()` with the `Resque_Job` as its only argument. - 2. `Resque_Worker->perform()` sets up a `try...catch` block so it can + 3. `Resque_Worker->perform()` sets up a `try...catch` block so it can properly handle exceptions by marking jobs as failed (by calling `Resque_Job->fail()`, as above) - 3. Inside the `try...catch`, `Resque_Worker->perform()` triggers an + 4. Inside the `try...catch`, `Resque_Worker->perform()` triggers an `afterFork` event - 4. Still inside the `try...catch`, `Resque_Worker->perform()` calls + 5. Still inside the `try...catch`, `Resque_Worker->perform()` calls `Resque_Job->perform()` with no arguments - 5. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no + 6. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no arguments - 6. If `Resque_Job->getInstance()` has already been called, it returns + 7. If `Resque_Job->getInstance()` has already been called, it returns the existing instance; otherwise: - 7. `Resque_Job->getInstance()` checks that the job's class (type) + 8. `Resque_Job->getInstance()` checks that the job's class (type) exists and has a `perform()` method; if not, in either case, it throws an exception which will be caught by `Resque_Worker->perform()` - 8. `Resque_Job->getInstance()` creates an instance of the job's class, + 9. `Resque_Job->getInstance()` creates an instance of the job's class, and initializes it with a reference to the `Resque_Job` itself, the job's arguments (which it gets by calling `Resque_Job->getArguments()`, which in turn simply returns the value of `args[0]`, or an empty array if no arguments were passed), and the queue name - 9. `Resque_Job->getInstance()` returns control, along with the job + 10. `Resque_Job->getInstance()` returns control, along with the job class instance, to `Resque_Job->perform()` - 10. `Resque_Job->perform()` sets up its own `try...catch` block to + 11. `Resque_Job->perform()` sets up its own `try...catch` block to handle `Resque_Job_DontPerform` exceptions; any other exceptions are passed up to `Resque_Worker->perform()` - 11. `Resque_Job->perform()` triggers a `beforePerform` event - 12. `Resque_Job->perform()` calls `setUp()` on the instance, if it + 12. `Resque_Job->perform()` triggers a `beforePerform` event + 13. `Resque_Job->perform()` calls `setUp()` on the instance, if it exists - 13. `Resque_Job->perform()` calls `perform()` on the instance - 14. `Resque_Job->perform()` calls `tearDown()` on the instance, if it + 14. `Resque_Job->perform()` calls `perform()` on the instance + 15. `Resque_Job->perform()` calls `tearDown()` on the instance, if it exists - 15. `Resque_Job->perform()` triggers an `afterPerform` event - 16. The `try...catch` block ends, suppressing `Resque_Job_DontPerform` + 16. `Resque_Job->perform()` triggers an `afterPerform` event + 17. The `try...catch` block ends, suppressing `Resque_Job_DontPerform` exceptions by returning control, and the value `FALSE`, to `Resque_Worker->perform()`; any other situation returns the value `TRUE` along with control, instead - 17. The `try...catch` block in `Resque_Worker->perform()` ends - 18. `Resque_Worker->perform()` updates the job status from `RUNNING` to + 18. The `try...catch` block in `Resque_Worker->perform()` ends + 19. `Resque_Worker->perform()` updates the job status from `RUNNING` to `COMPLETE`, then returns control, with no value, to the worker (again still in `Resque_Worker::work()`) - 19. `Resque_Worker::work()` calls `exit(0)` to terminate the job process + 20. `Resque_Job_PID()` is removed, the forked process will terminate soon cleanly + 21. `Resque_Worker::work()` calls `exit(0)` to terminate the job process - SPECIAL CASE: Non-forking OS (Windows) 1. Same as the job above, except it doesn't call `exit(0)` when done 7. `Resque_Worker::work()` calls `Resque_Worker->doneWorking()` with no diff --git a/README.md b/README.md index 53d5959..c66dfa1 100644 --- a/README.md +++ b/README.md @@ -181,6 +181,19 @@ Statuses are available for up to 24 hours after a job has completed or failed, and are then automatically expired. A status can also forcefully be expired by calling the `stop()` method on a status class. +### Obtaining job PID ### + +You can obtain the PID of the actual process doing the work through `Resque_Job_PID`. On a forking OS this will be the +PID of the forked process. + +CAUTION: on a non-forking OS, the PID returned will be of the worker itself. + +```php +echo Resque_Job_PID::get($token); +``` + +Function returns `0` if the `perform` hasn't started yet, or if it has already ended. + ## Workers Workers work in the exact same way as the Ruby workers. For complete diff --git a/composer.json b/composer.json index 629472d..58d5efa 100644 --- a/composer.json +++ b/composer.json @@ -27,12 +27,6 @@ "role": "Creator" } ], - "repositories": [ - { - "type": "vcs", - "url": "https://github.com/chrisboulton/credis" - } - ], "require": { "php": ">=5.3.0", "ext-pcntl": "*", @@ -53,5 +47,10 @@ "psr-0": { "Resque": "lib" } - } + }, + "extra": { + "branch-alias": { + "dev-master": "1.0-dev" + } + } } diff --git a/demo/resque.php b/demo/resque.php index fcfe578..05d3da6 100644 --- a/demo/resque.php +++ b/demo/resque.php @@ -4,4 +4,4 @@ require 'job.php'; require 'php_error_job.php'; -require '../bin/resque'; \ No newline at end of file +require __DIR__ . '/../bin/resque'; diff --git a/lib/Resque.php b/lib/Resque.php index d03b2ec..f33c15b 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -28,6 +28,11 @@ class Resque */ protected static $redisDatabase = 0; + /** + * @var string auth of Redis database + */ + protected static $auth; + /** * Given a host/port combination separated by a colon, set it as * the redis server that Resque will talk to. @@ -37,11 +42,13 @@ class Resque * and returns a Resque_Redis instance, or * a nested array of servers with host/port pairs. * @param int $database + * @param string $auth */ - public static function setBackend($server, $database = 0) + public static function setBackend($server, $database = 0, $auth = null) { self::$redisServer = $server; self::$redisDatabase = $database; + self::$auth = $auth; self::$redis = null; } @@ -62,6 +69,10 @@ public static function redis() self::$redis = new Resque_Redis(self::$redisServer, self::$redisDatabase); } + if (!empty(self::$auth)) { + self::$redis->auth(self::$auth); + } + return self::$redis; } @@ -141,9 +152,9 @@ public static function pop($queue) public static function dequeue($queue, $items = Array()) { if(count($items) > 0) { - return self::removeItems($queue, $items); + return self::removeItems($queue, $items); } else { - return self::removeList($queue); + return self::removeList($queue); } } @@ -213,10 +224,11 @@ public static function size($queue) * @param string $class The name of the class that contains the code to execute the job. * @param array $args Any optional arguments that should be passed when the job is executed. * @param boolean $trackStatus Set to true to be able to monitor the status of a job. + * @param string $prefix The prefix needs to be set for the status key * * @return string|boolean Job ID when the job was created, false if creation was cancelled due to beforeEnqueue */ - public static function enqueue($queue, $class, $args = null, $trackStatus = false) + public static function enqueue($queue, $class, $args = null, $trackStatus = false, $prefix = "") { $id = Resque::generateJobId(); $hookParams = array( @@ -232,7 +244,7 @@ public static function enqueue($queue, $class, $args = null, $trackStatus = fals return false; } - Resque_Job::create($queue, $class, $args, $trackStatus, $id); + Resque_Job::create($queue, $class, $args, $trackStatus, $id, $prefix); Resque_Event::trigger('afterEnqueue', $hookParams); return $id; @@ -263,6 +275,20 @@ public static function queues() return $queues; } + /** + * Retrieve all the items of a queue with Redis + * + * @return array Array of items. + */ + public static function items($queue, $start = 0, $stop = -1) + { + $list = self::redis()->lrange('queue:' . $queue, $start, $stop); + if(!is_array($list)) { + $list = array(); + } + return $list; + } + /** * Remove Items from the queue * Safely moving each item to a temporary queue before processing it @@ -317,7 +343,7 @@ private static function removeItems($queue, $items = Array()) /** * matching item - * item can be ['class'] or ['class' => 'id'] or ['class' => {:foo => 1, :bar => 2}] + * item can be ['class'] or ['class' => 'id'] or ['class' => {'foo' => 1, 'bar' => 2}] * @private * * @params string $string redis result in json @@ -330,24 +356,24 @@ private static function matchItem($string, $items) $decoded = json_decode($string, true); foreach($items as $key => $val) { - # class name only ex: item[0] = ['class'] - if (is_numeric($key)) { - if($decoded['class'] == $val) { - return true; - } - # class name with args , example: item[0] = ['class' => {'foo' => 1, 'bar' => 2}] - } elseif (is_array($val)) { - $decodedArgs = (array)$decoded['args'][0]; - if ($decoded['class'] == $key && - count($decodedArgs) > 0 && count(array_diff($decodedArgs, $val)) == 0) { - return true; + # class name only ex: item[0] = ['class'] + if (is_numeric($key)) { + if($decoded['class'] == $val) { + return true; + } + # class name with args , example: item[0] = ['class' => {'foo' => 1, 'bar' => 2}] + } elseif (is_array($val)) { + $decodedArgs = (array)$decoded['args'][0]; + if ($decoded['class'] == $key && + count($decodedArgs) > 0 && count(array_diff($decodedArgs, $val)) == 0) { + return true; + } + # class name with ID, example: item[0] = ['class' => 'id'] + } else { + if ($decoded['class'] == $key && $decoded['id'] == $val) { + return true; + } } - # class name with ID, example: item[0] = ['class' => 'id'] - } else { - if ($decoded['class'] == $key && $decoded['id'] == $val) { - return true; - } - } } return false; } diff --git a/lib/Resque/Failure.php b/lib/Resque/Failure.php index deb678f..066fc6e 100644 --- a/lib/Resque/Failure.php +++ b/lib/Resque/Failure.php @@ -28,6 +28,20 @@ public static function create($payload, Exception $exception, Resque_Worker $wor new $backend($payload, $exception, $worker, $queue); } + /** + * Create a new failed job on the backend from PHP 7 errors. + * + * @param object $payload The contents of the job that has just failed. + * @param \Error $exception The PHP 7 error generated when the job failed to run. + * @param \Resque_Worker $worker Instance of Resque_Worker that was running this job when it failed. + * @param string $queue The name of the queue that this job was fetched from. + */ + public static function createFromError($payload, Error $exception, Resque_Worker $worker, $queue) + { + $backend = self::getBackend(); + new $backend($payload, $exception, $worker, $queue); + } + /** * Return an instance of the backend for saving job failures. * diff --git a/lib/Resque/Failure/Redis.php b/lib/Resque/Failure/Redis.php index 69d6872..2d16ab0 100644 --- a/lib/Resque/Failure/Redis.php +++ b/lib/Resque/Failure/Redis.php @@ -20,7 +20,7 @@ class Resque_Failure_Redis implements Resque_Failure_Interface public function __construct($payload, $exception, $worker, $queue) { $data = new stdClass; - $data->failed_at = strftime('%a %b %d %H:%M:%S %Z %Y'); + $data->failed_at = date('c'); $data->payload = $payload; $data->exception = get_class($exception); $data->error = $exception->getMessage(); diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 8508f76..31ad439 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -53,11 +53,12 @@ public function __construct($queue, $payload) * @param array $args Any optional arguments that should be passed when the job is executed. * @param boolean $monitor Set to true to be able to monitor the status of a job. * @param string $id Unique identifier for tracking the job. Generated if not supplied. + * @param string $prefix The prefix needs to be set for the status key * * @return string * @throws \InvalidArgumentException */ - public static function create($queue, $class, $args = null, $monitor = false, $id = null) + public static function create($queue, $class, $args = null, $monitor = false, $id = null, $prefix = "") { if (is_null($id)) { $id = Resque::generateJobId(); @@ -69,14 +70,15 @@ public static function create($queue, $class, $args = null, $monitor = false, $i ); } Resque::push($queue, array( - 'class' => $class, - 'args' => array($args), - 'id' => $id, + 'class' => $class, + 'args' => array($args), + 'id' => $id, + 'prefix' => $prefix, 'queue_time' => microtime(true), )); if($monitor) { - Resque_Job_Status::create($id); + Resque_Job_Status::create($id, $prefix); } return $id; @@ -123,14 +125,14 @@ public static function reserveBlocking(array $queues, $timeout = null) * * @param int $status Status constant from Resque_Job_Status indicating the current status of a job. */ - public function updateStatus($status) + public function updateStatus($status, $result = null) { if(empty($this->payload['id'])) { return; } - $statusInstance = new Resque_Job_Status($this->payload['id']); - $statusInstance->update($status); + $statusInstance = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']); + $statusInstance->update($status, $result); } /** @@ -140,7 +142,7 @@ public function updateStatus($status) */ public function getStatus() { - $status = new Resque_Job_Status($this->payload['id']); + $status = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']); return $status->get(); } @@ -183,17 +185,18 @@ public function getInstance() */ public function perform() { + $result = true; try { Resque_Event::trigger('beforePerform', $this); $instance = $this->getInstance(); - if(method_exists($instance, 'setUp')) { + if(is_callable([$instance, 'setUp'])) { $instance->setUp(); } - $instance->perform(); + $result = $instance->perform(); - if(method_exists($instance, 'tearDown')) { + if(is_callable([$instance, 'tearDown'])) { $instance->tearDown(); } @@ -201,10 +204,10 @@ public function perform() } // beforePerform/setUp have said don't perform this job. Return. catch(Resque_Job_DontPerform $e) { - return false; + $result = false; } - return true; + return $result; } /** @@ -220,12 +223,21 @@ public function fail($exception) )); $this->updateStatus(Resque_Job_Status::STATUS_FAILED); - Resque_Failure::create( - $this->payload, - $exception, - $this->worker, - $this->queue - ); + if ($exception instanceof Error) { + Resque_Failure::createFromError( + $this->payload, + $exception, + $this->worker, + $this->queue + ); + } else { + Resque_Failure::create( + $this->payload, + $exception, + $this->worker, + $this->queue + ); + } Resque_Stat::incr('failed'); Resque_Stat::incr('failed:' . $this->worker); } @@ -236,13 +248,13 @@ public function fail($exception) */ public function recreate() { - $status = new Resque_Job_Status($this->payload['id']); + $status = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']); $monitor = false; if($status->isTracking()) { $monitor = true; } - return self::create($this->queue, $this->payload['class'], $this->getArguments(), $monitor); + return self::create($this->queue, $this->payload['class'], $this->getArguments(), $monitor, $this->payload['prefix']); } /** diff --git a/lib/Resque/Job/PID.php b/lib/Resque/Job/PID.php new file mode 100644 index 0000000..f72710d --- /dev/null +++ b/lib/Resque/Job/PID.php @@ -0,0 +1,43 @@ + + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Job_PID +{ + /** + * Create a new PID tracker item for the supplied job ID. + * + * @param string $id The ID of the job to track the PID of. + */ + public static function create($id) + { + Resque::redis()->set('job:' . $id . ':pid', (string)getmypid()); + } + + /** + * Fetch the PID for the process actually executing the job. + * + * @param string $id The ID of the job to get the PID of. + * + * @return int PID of the process doing the job (on non-forking OS, PID of the worker, otherwise forked PID). + */ + public static function get($id) + { + return (int)Resque::redis()->get('job:' . $id . ':pid'); + } + + /** + * Remove the PID tracker for the job. + * + * @param string $id The ID of the job to remove the tracker from. + */ + public static function del($id) + { + Resque::redis()->del('job:' . $id . ':pid'); + } +} + diff --git a/lib/Resque/Job/Status.php b/lib/Resque/Job/Status.php index 00fc40c..249ba8c 100644 --- a/lib/Resque/Job/Status.php +++ b/lib/Resque/Job/Status.php @@ -13,6 +13,11 @@ class Resque_Job_Status const STATUS_FAILED = 3; const STATUS_COMPLETE = 4; + /** + * @var string The prefix of the job status id. + */ + private $prefix; + /** * @var string The ID of the job this status class refers back to. */ @@ -37,9 +42,10 @@ class Resque_Job_Status * * @param string $id The ID of the job to manage the status for. */ - public function __construct($id) + public function __construct($id, $prefix = '') { $this->id = $id; + $this->prefix = empty($prefix) ? '' : "${prefix}_"; } /** @@ -48,14 +54,18 @@ public function __construct($id) * * @param string $id The ID of the job to monitor the status of. */ - public static function create($id) + public static function create($id, $prefix = "") { + $status = new self($id, $prefix); $statusPacket = array( - 'status' => self::STATUS_WAITING, + 'status' => self::STATUS_WAITING, 'updated' => time(), 'started' => time(), + 'result' => null, ); - Resque::redis()->set('job:' . $id . ':status', json_encode($statusPacket)); + Resque::redis()->set((string) $status, json_encode($statusPacket)); + + return $status; } /** @@ -84,15 +94,23 @@ public function isTracking() * * @param int The status of the job (see constants in Resque_Job_Status) */ - public function update($status) + public function update($status, $result = null) { + $status = (int) $status; + if(!$this->isTracking()) { return; } + if($status < self::STATUS_WAITING || $status > self::STATUS_COMPLETE) { + return; + } + $statusPacket = array( - 'status' => $status, + 'status' => $status, 'updated' => time(), + 'started' => $this->fetch('started'), + 'result' => $result, ); Resque::redis()->set((string)$this, json_encode($statusPacket)); @@ -105,23 +123,58 @@ public function update($status) /** * Fetch the status for the job being monitored. * - * @return mixed False if the status is not being monitored, otherwise the status as + * @return mixed False if the status is not being monitored, otherwise the status * as an integer, based on the Resque_Job_Status constants. */ public function get() { - if(!$this->isTracking()) { - return false; - } - - $statusPacket = json_decode(Resque::redis()->get((string)$this), true); - if(!$statusPacket) { - return false; - } + return $this->status(); + } - return $statusPacket['status']; + /** + * Fetch the status for the job being monitored. + * + * @return mixed False if the status is not being monitored, otherwise the status + * as an integer, based on the Resque_Job_Status constants. + */ + public function status() + { + return $this->fetch('status'); } + /** + * Fetch the last update timestamp of the job being monitored. + * + * @return mixed False if the job is not being monitored, otherwise the + * update timestamp. + */ + public function updated() + { + return $this->fetch('updated'); + } + + /** + * Fetch the start timestamp of the job being monitored. + * + * @return mixed False if the job is not being monitored, otherwise the + * start timestamp. + */ + public function started() + { + return $this->fetch('started'); + } + + /** + * Fetch the result of the job being monitored. + * + * @return mixed False if the job is not being monitored, otherwise the result + * as mixed + */ + public function result() + { + return $this->fetch('result'); + } + /** * Stop tracking the status of a job. */ @@ -137,6 +190,35 @@ public function stop() */ public function __toString() { - return 'job:' . $this->id . ':status'; + return 'job:' . $this->prefix . $this->id . ':status'; + } + + /** + * Fetch a value from the status packet for the job being monitored. + * + * @return mixed False if the status is not being monitored, otherwise the + * requested value from the status packet. + */ + protected function fetch($value = null) + { + if(!$this->isTracking()) { + return false; + } + + $statusPacket = json_decode(Resque::redis()->get((string)$this), true); + if(!$statusPacket) { + return false; + } + + if(empty($value)) { + return $statusPacket; + } else { + if(isset($statusPacket[$value])) { + return $statusPacket[$value]; + } else { + return null; + } + } + } } diff --git a/lib/Resque/Redis.php b/lib/Resque/Redis.php index 153bd40..cf05444 100644 --- a/lib/Resque/Redis.php +++ b/lib/Resque/Redis.php @@ -113,12 +113,15 @@ public static function prefix($namespace) public function __construct($server, $database = null, $client = null) { try { - if (is_array($server)) { - $this->driver = new Credis_Cluster($server); - } - else if (is_object($client)) { + if (is_object($client)) { $this->driver = $client; } + elseif (is_object($server)) { + $this->driver = $server; + } + elseif (is_array($server)) { + $this->driver = new Credis_Cluster($server); + } else { list($host, $port, $dsnDatabase, $user, $password, $options) = self::parseDsn($server); // $user is not used, only $password diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 3add46e..7935cec 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -11,6 +11,11 @@ */ class Resque_Worker { + /** + * @var string Prefix for the process name + */ + private static $processPrefix = 'resque'; + /** * @var LoggerInterface Logging object that impliments the PSR-3 LoggerInterface */ @@ -81,6 +86,15 @@ public function __construct($queues) $this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues); } + /** + * Set the process prefix of the workers to the given prefix string. + * @param string $prefix The new process prefix + */ + public static function setProcessPrefix($prefix) + { + self::$processPrefix = $prefix; + } + /** * Return all workers known to Resque as instantiated instances. * @return array @@ -152,19 +166,35 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) $this->updateProcLine('Starting'); $this->startup(); + if(function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + while(true) { if($this->shutdown) { break; } + // is redis still alive? + try { + if (Resque::redis()->ping() === false) { + throw new CredisException('redis ping() failed'); + } + } catch (CredisException $e) { + $this->logger->log(Psr\Log\LogLevel::ERROR, 'redis went away. trying to reconnect'); + Resque::$redis = null; + usleep($interval * 1000000); + continue; + } + // Attempt to find and reserve a job $job = false; if(!$this->paused) { if($blocking === true) { $this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval)); - $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with blocking timeout ' . $interval); + $this->updateProcLine('Waiting with blocking timeout ' . $interval); } else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues) . ' with interval ' . $interval); + $this->updateProcLine('Waiting with interval ' . $interval); } $job = $this->reserve($blocking, $interval); @@ -184,7 +214,7 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) $this->updateProcLine('Paused'); } else { - $this->updateProcLine('Waiting for ' . implode(',', $this->queues)); + $this->updateProcLine('Waiting'); } usleep($interval * 1000000); @@ -199,12 +229,22 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) $this->child = Resque::fork(); - // Forked and we're the child. Run the job. - if ($this->child === 0 || $this->child === false) { + // Forked and we're the child. Or PCNTL is not installed. Run the job. + if ($this->child === 0 || $this->child === false || $this->child === -1) { $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); $this->updateProcLine($status); $this->logger->log(Psr\Log\LogLevel::INFO, $status); + + if(!empty($job->payload['id'])) { + Resque_Job_PID::create($job->payload['id']); + } + $this->perform($job); + + if(!empty($job->payload['id'])) { + Resque_Job_PID::del($job->payload['id']); + } + if ($this->child === 0) { exit(0); } @@ -217,7 +257,14 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) $this->logger->log(Psr\Log\LogLevel::INFO, $status); // Wait until the child process finishes before continuing - pcntl_wait($status); + while (pcntl_wait($status, WNOHANG) === 0) { + if(function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + + // Pause for a half a second to conserve system resources + usleep(500000); + } if (pcntl_wifexited($status) !== true) { $job->fail(new Resque_Job_DirtyExitException('Job exited abnormally')); @@ -250,17 +297,23 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) */ public function perform(Resque_Job $job) { + $result = null; try { Resque_Event::trigger('afterFork', $job); - $job->perform(); + $result = $job->perform(); } catch(Exception $e) { - $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {stack}', array('job' => $job, 'stack' => $e)); + $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e)); + $job->fail($e); + return; + } + catch(Error $e) { + $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e)); $job->fail($e); return; } - $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE); + $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE, $result); $this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job)); } @@ -283,6 +336,11 @@ public function reserve($blocking = false, $timeout = null) } if($blocking === true) { + if(empty($queues)){ + $this->logger->log(Psr\Log\LogLevel::INFO, 'No queue was found, sleeping for {interval}', array('interval' => $timeout)); + usleep($timeout * 1000000); + return false; + } $job = Resque_Job::reserveBlocking($queues, $timeout); if($job) { $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); @@ -344,7 +402,7 @@ private function startup() */ private function updateProcLine($status) { - $processTitle = 'resque-' . Resque::VERSION . ': ' . $status; + $processTitle = static::$processPrefix . '-' . Resque::VERSION . ' (' . implode(',', $this->queues) . '): ' . $status; if(function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') { cli_set_process_title($processTitle); } @@ -415,6 +473,13 @@ public function shutdownNow() $this->killChild(); } + /** + * @return int Child process PID. + */ + public function getChildPID() { + return $this->child; + } + /** * Kill a forked child job immediately. The job it is processing will not * be completed. @@ -427,7 +492,7 @@ public function killChild() } $this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child)); - if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { + if(exec('ps -o pid,s -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child)); posix_kill($this->child, SIGKILL); $this->child = null; @@ -471,9 +536,18 @@ public function pruneDeadWorkers() public function workerPids() { $pids = array(); - exec('ps -A -o pid,command | grep [r]esque', $cmdOutput); - foreach($cmdOutput as $line) { - list($pids[],) = explode(' ', trim($line), 2); + if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') { + exec('WMIC path win32_process get Processid,Commandline | findstr resque | findstr /V findstr', $cmdOutput); + foreach($cmdOutput as $line) { + $line = preg_replace('/\s+/m', ' ', $line); + list(,,$pids[]) = explode(' ', trim($line), 3); + } + } + else { + exec('ps -A -o pid,args | grep [r]esque', $cmdOutput); + foreach($cmdOutput as $line) { + list($pids[],) = explode(' ', trim($line), 2); + } } return $pids; } @@ -484,7 +558,7 @@ public function workerPids() public function registerWorker() { Resque::redis()->sadd('workers', (string)$this); - Resque::redis()->set('worker:' . (string)$this . ':started', strftime('%a %b %d %H:%M:%S %Z %Y')); + Resque::redis()->set('worker:' . (string)$this . ':started', date('c')); } /** @@ -516,7 +590,7 @@ public function workingOn(Resque_Job $job) $job->updateStatus(Resque_Job_Status::STATUS_RUNNING); $data = json_encode(array( 'queue' => $job->queue, - 'run_at' => strftime('%a %b %d %H:%M:%S %Z %Y'), + 'run_at' => date('c'), 'payload' => $job->payload )); Resque::redis()->set('worker:' . $job->worker, $data); diff --git a/test/Resque/Tests/JobPIDTest.php b/test/Resque/Tests/JobPIDTest.php new file mode 100644 index 0000000..2d4a93d --- /dev/null +++ b/test/Resque/Tests/JobPIDTest.php @@ -0,0 +1,47 @@ + + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_JobPIDTest extends Resque_Tests_TestCase +{ + /** + * @var \Resque_Worker + */ + protected $worker; + + public function setUp() + { + parent::setUp(); + + // Register a worker to test with + $this->worker = new Resque_Worker('jobs'); + $this->worker->setLogger(new Resque_Log()); + } + + public function testQueuedJobDoesNotReturnPID() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, true); + $this->assertEquals(0, Resque_Job_PID::get($token)); + } + + public function testRunningJobReturnsPID() + { + // Cannot use InProgress_Job on non-forking OS. + if(!function_exists('pcntl_fork')) return; + + $token = Resque::enqueue('jobs', 'InProgress_Job', null, true); + $this->worker->work(0); + $this->assertNotEquals(0, Resque_Job_PID::get($token)); + } + + public function testFinishedJobDoesNotReturnPID() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, true); + $this->worker->work(0); + $this->assertEquals(0, Resque_Job_PID::get($token)); + } +} diff --git a/test/Resque/Tests/JobStatusTest.php b/test/Resque/Tests/JobStatusTest.php index d751c37..8be7753 100644 --- a/test/Resque/Tests/JobStatusTest.php +++ b/test/Resque/Tests/JobStatusTest.php @@ -8,10 +8,10 @@ */ class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase { - /** - * @var \Resque_Worker - */ - protected $worker; + /** + * @var \Resque_Worker + */ + protected $worker; public function setUp() { diff --git a/test/bootstrap.php b/test/bootstrap.php index e4f25fd..4d7253e 100644 --- a/test/bootstrap.php +++ b/test/bootstrap.php @@ -109,6 +109,24 @@ public function perform() } } +/** + * This job exits the forked worker process, which simulates the job being (forever) in progress, + * so that we can verify the state of the system for "running jobs". Does not work on a non-forking OS. + * + * CAUTION Use this test job only with Worker::work, i.e. only when you actually trigger the fork in tests. + */ +class InProgress_Job +{ + public function perform() + { + if(!function_exists('pcntl_fork')) { + // We can't lose the worker on a non-forking OS. + throw new Failing_Job_Exception('Do not use InProgress_Job for tests on non-forking OS!'); + } + exit(0); + } +} + class Test_Job_Without_Perform_Method {