Skip to content

Commit fbc5c60

Browse files
committed
Merge 'mateusz/pid-monitor' into old-pulls
Closes chrisboulton/php-resque#334
2 parents 28d46d5 + c38f047 commit fbc5c60

File tree

7 files changed

+164
-23
lines changed

7 files changed

+164
-23
lines changed

HOWITWORKS.md

+22-19
Original file line numberDiff line numberDiff line change
@@ -106,51 +106,54 @@ How do the workers process the queues?
106106
8. `Resque_Job->fail()` returns control to the worker (still in
107107
`Resque_Worker::work()`) without a value
108108
- Job
109-
1. The job calls `Resque_Worker->perform()` with the `Resque_Job` as
109+
1. `Resque_Job_PID` is created, registering the PID of the actual process
110+
doing the job.
111+
2. The job calls `Resque_Worker->perform()` with the `Resque_Job` as
110112
its only argument.
111-
2. `Resque_Worker->perform()` sets up a `try...catch` block so it can
113+
3. `Resque_Worker->perform()` sets up a `try...catch` block so it can
112114
properly handle exceptions by marking jobs as failed (by calling
113115
`Resque_Job->fail()`, as above)
114-
3. Inside the `try...catch`, `Resque_Worker->perform()` triggers an
116+
4. Inside the `try...catch`, `Resque_Worker->perform()` triggers an
115117
`afterFork` event
116-
4. Still inside the `try...catch`, `Resque_Worker->perform()` calls
118+
5. Still inside the `try...catch`, `Resque_Worker->perform()` calls
117119
`Resque_Job->perform()` with no arguments
118-
5. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no
120+
6. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no
119121
arguments
120-
6. If `Resque_Job->getInstance()` has already been called, it returns
122+
7. If `Resque_Job->getInstance()` has already been called, it returns
121123
the existing instance; otherwise:
122-
7. `Resque_Job->getInstance()` checks that the job's class (type)
124+
8. `Resque_Job->getInstance()` checks that the job's class (type)
123125
exists and has a `perform()` method; if not, in either case, it
124126
throws an exception which will be caught by
125127
`Resque_Worker->perform()`
126-
8. `Resque_Job->getInstance()` creates an instance of the job's class,
128+
9. `Resque_Job->getInstance()` creates an instance of the job's class,
127129
and initializes it with a reference to the `Resque_Job` itself, the
128130
job's arguments (which it gets by calling
129131
`Resque_Job->getArguments()`, which in turn simply returns the value
130132
of `args[0]`, or an empty array if no arguments were passed), and
131133
the queue name
132-
9. `Resque_Job->getInstance()` returns control, along with the job
134+
10. `Resque_Job->getInstance()` returns control, along with the job
133135
class instance, to `Resque_Job->perform()`
134-
10. `Resque_Job->perform()` sets up its own `try...catch` block to
136+
11. `Resque_Job->perform()` sets up its own `try...catch` block to
135137
handle `Resque_Job_DontPerform` exceptions; any other exceptions are
136138
passed up to `Resque_Worker->perform()`
137-
11. `Resque_Job->perform()` triggers a `beforePerform` event
138-
12. `Resque_Job->perform()` calls `setUp()` on the instance, if it
139+
12. `Resque_Job->perform()` triggers a `beforePerform` event
140+
13. `Resque_Job->perform()` calls `setUp()` on the instance, if it
139141
exists
140-
13. `Resque_Job->perform()` calls `perform()` on the instance
141-
14. `Resque_Job->perform()` calls `tearDown()` on the instance, if it
142+
14. `Resque_Job->perform()` calls `perform()` on the instance
143+
15. `Resque_Job->perform()` calls `tearDown()` on the instance, if it
142144
exists
143-
15. `Resque_Job->perform()` triggers an `afterPerform` event
144-
16. The `try...catch` block ends, suppressing `Resque_Job_DontPerform`
145+
16. `Resque_Job->perform()` triggers an `afterPerform` event
146+
17. The `try...catch` block ends, suppressing `Resque_Job_DontPerform`
145147
exceptions by returning control, and the value `FALSE`, to
146148
`Resque_Worker->perform()`; any other situation returns the value
147149
`TRUE` along with control, instead
148-
17. The `try...catch` block in `Resque_Worker->perform()` ends
149-
18. `Resque_Worker->perform()` updates the job status from `RUNNING` to
150+
18. The `try...catch` block in `Resque_Worker->perform()` ends
151+
19. `Resque_Worker->perform()` updates the job status from `RUNNING` to
150152
`COMPLETE`, then returns control, with no value, to the worker
151153
(again still in `Resque_Worker::work()`)
152-
19. `Resque_Worker::work()` calls `exit(0)` to terminate the job process
154+
20. `Resque_Job_PID()` is removed, the forked process will terminate soon
153155
cleanly
156+
21. `Resque_Worker::work()` calls `exit(0)` to terminate the job process
154157
- SPECIAL CASE: Non-forking OS (Windows)
155158
1. Same as the job above, except it doesn't call `exit(0)` when done
156159
7. `Resque_Worker::work()` calls `Resque_Worker->doneWorking()` with no

README.md

+13
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,19 @@ Statuses are available for up to 24 hours after a job has completed or failed,
181181
and are then automatically expired. A status can also forcefully be expired by
182182
calling the `stop()` method on a status class.
183183

184+
### Obtaining job PID ###
185+
186+
You can obtain the PID of the actual process doing the work through `Resque_Job_PID`. On a forking OS this will be the
187+
PID of the forked process.
188+
189+
CAUTION: on a non-forking OS, the PID returned will be of the worker itself.
190+
191+
```php
192+
echo Resque_Job_PID::get($token);
193+
```
194+
195+
Function returns `0` if the `perform` hasn't started yet, or if it has already ended.
196+
184197
## Workers
185198

186199
Workers work in the exact same way as the Ruby workers. For complete

lib/Resque/Job/PID.php

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?php
2+
/**
3+
* PID tracker for the forked worker job.
4+
*
5+
* @package Resque/Job
6+
* @author Chris Boulton <[email protected]>
7+
* @license http://www.opensource.org/licenses/mit-license.php
8+
*/
9+
class Resque_Job_PID
10+
{
11+
/**
12+
* Create a new PID tracker item for the supplied job ID.
13+
*
14+
* @param string $id The ID of the job to track the PID of.
15+
*/
16+
public static function create($id)
17+
{
18+
Resque::redis()->set('job:' . $id . ':pid', (string)getmypid());
19+
}
20+
21+
/**
22+
* Fetch the PID for the process actually executing the job.
23+
*
24+
* @param string $id The ID of the job to get the PID of.
25+
*
26+
* @return int PID of the process doing the job (on non-forking OS, PID of the worker, otherwise forked PID).
27+
*/
28+
public static function get($id)
29+
{
30+
return (int)Resque::redis()->get('job:' . $id . ':pid');
31+
}
32+
33+
/**
34+
* Remove the PID tracker for the job.
35+
*
36+
* @param string $id The ID of the job to remove the tracker from.
37+
*/
38+
public static function del($id)
39+
{
40+
Resque::redis()->del('job:' . $id . ':pid');
41+
}
42+
}
43+

lib/Resque/Worker.php

+17
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,17 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
220220
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
221221
$this->updateProcLine($status);
222222
$this->logger->log(Psr\Log\LogLevel::INFO, $status);
223+
224+
if(!empty($job->payload['id'])) {
225+
Resque_Job_PID::create($job->payload['id']);
226+
}
227+
223228
$this->perform($job);
229+
230+
if(!empty($job->payload['id'])) {
231+
Resque_Job_PID::del($job->payload['id']);
232+
}
233+
224234
if ($this->child === 0) {
225235
exit(0);
226236
}
@@ -437,6 +447,13 @@ public function shutdownNow()
437447
$this->killChild();
438448
}
439449

450+
/**
451+
* @return int Child process PID.
452+
*/
453+
public function getChildPID() {
454+
return $this->child;
455+
}
456+
440457
/**
441458
* Kill a forked child job immediately. The job it is processing will not
442459
* be completed.

test/Resque/Tests/JobPIDTest.php

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?php
2+
/**
3+
* Resque_Job_PID tests.
4+
*
5+
* @package Resque/Tests
6+
* @author Chris Boulton <[email protected]>
7+
* @license http://www.opensource.org/licenses/mit-license.php
8+
*/
9+
class Resque_Tests_JobPIDTest extends Resque_Tests_TestCase
10+
{
11+
/**
12+
* @var \Resque_Worker
13+
*/
14+
protected $worker;
15+
16+
public function setUp()
17+
{
18+
parent::setUp();
19+
20+
// Register a worker to test with
21+
$this->worker = new Resque_Worker('jobs');
22+
$this->worker->setLogger(new Resque_Log());
23+
}
24+
25+
public function testQueuedJobDoesNotReturnPID()
26+
{
27+
$token = Resque::enqueue('jobs', 'Test_Job', null, true);
28+
$this->assertEquals(0, Resque_Job_PID::get($token));
29+
}
30+
31+
public function testRunningJobReturnsPID()
32+
{
33+
// Cannot use InProgress_Job on non-forking OS.
34+
if(!function_exists('pcntl_fork')) return;
35+
36+
$token = Resque::enqueue('jobs', 'InProgress_Job', null, true);
37+
$this->worker->work(0);
38+
$this->assertNotEquals(0, Resque_Job_PID::get($token));
39+
}
40+
41+
public function testFinishedJobDoesNotReturnPID()
42+
{
43+
$token = Resque::enqueue('jobs', 'Test_Job', null, true);
44+
$this->worker->work(0);
45+
$this->assertEquals(0, Resque_Job_PID::get($token));
46+
}
47+
}

test/Resque/Tests/JobStatusTest.php

+4-4
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88
*/
99
class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase
1010
{
11-
/**
12-
* @var \Resque_Worker
13-
*/
14-
protected $worker;
11+
/**
12+
* @var \Resque_Worker
13+
*/
14+
protected $worker;
1515

1616
public function setUp()
1717
{

test/bootstrap.php

+18
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,24 @@ public function perform()
109109
}
110110
}
111111

112+
/**
113+
* This job exits the forked worker process, which simulates the job being (forever) in progress,
114+
* so that we can verify the state of the system for "running jobs". Does not work on a non-forking OS.
115+
*
116+
* CAUTION Use this test job only with Worker::work, i.e. only when you actually trigger the fork in tests.
117+
*/
118+
class InProgress_Job
119+
{
120+
public function perform()
121+
{
122+
if(!function_exists('pcntl_fork')) {
123+
// We can't lose the worker on a non-forking OS.
124+
throw new Failing_Job_Exception('Do not use InProgress_Job for tests on non-forking OS!');
125+
}
126+
exit(0);
127+
}
128+
}
129+
112130
class Test_Job_Without_Perform_Method
113131
{
114132

0 commit comments

Comments
 (0)