Skip to content

Commit f96cf62

Browse files
committedJan 7, 2025·
refactor: improve worker process management and logging
- Add support for forked process PID file management - Implement proper PID file handling for child processes - Add setForked() method to track forked state - Enhance debug logging in WorkerSafeScriptsCore - Improve error handling and response management in WorkerApiCommands - Add child process timeout handling - Refactor request processing into smaller, more maintainable methods - Follow PSR-12 code style standards - Add comprehensive English comments
1 parent 38cefbd commit f96cf62

File tree

4 files changed

+347
-111
lines changed

4 files changed

+347
-111
lines changed
 

‎src/Core/Workers/Cron/WorkerSafeScriptsCore.php

+5
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ private function processStartBatch(array $batch, array &$childPids): void
263263
if ($pid === 0) {
264264
// Child process
265265
try {
266+
$this->setForked();
266267
$this->checkWorkerByType($workerConfig['type'], $workerConfig['worker']);
267268
exit(0);
268269
} catch (Throwable $e) {
@@ -291,6 +292,7 @@ private function processBatch(array $batch, array &$childPids): void
291292
if ($pid === 0) {
292293
// Child process
293294
try {
295+
$this->setForked();
294296
SystemMessages::sysLogMsg(
295297
__CLASS__,
296298
"Restarting worker: $worker",
@@ -464,8 +466,11 @@ public function checkWorkerBeanstalk(string $workerClassName): void
464466
$result = false;
465467
if ($WorkerPID !== '') {
466468
// Ping the worker via Beanstalk queue.
469+
SystemMessages::sysLogMsg(__METHOD__, "Service $workerClassName is alive. Sending ping request.", LOG_DEBUG);
467470
$queue = new BeanstalkClient($this->makePingTubeName($workerClassName));
468471
[$result] = $queue->sendRequest('ping', 5, 1);
472+
SystemMessages::sysLogMsg(__METHOD__, "Service $workerClassName answered $result", LOG_DEBUG);
473+
469474
}
470475
if (false === $result) {
471476
Processes::processPHPWorker($workerClassName);

‎src/Core/Workers/WorkerBase.php

+87-45
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,13 @@ abstract class WorkerBase extends Injectable implements WorkerInterface
8989

9090
private const string LOG_NAMESPACE_SEPARATOR = '\\';
9191

92+
/**
93+
* Flag indicating whether the worker is a forked process
94+
*
95+
* @var bool
96+
*/
97+
private bool $isForked = false;
98+
9299
/**
93100
* Maximum number of processes that can be created
94101
*
@@ -203,17 +210,52 @@ protected function setWorkerState(int $state): void
203210
*
204211
* @throws RuntimeException If unable to write PID file
205212
*/
213+
/**
214+
* Handles PID file operations for worker processes
215+
*/
206216
private function savePidFile(): void
207217
{
208218
try {
209-
$activeProcesses = Processes::getPidOfProcess(static::class);
210-
$processes = array_filter(explode(' ', $activeProcesses));
219+
$pid = getmypid();
220+
if ($pid === false) {
221+
throw new RuntimeException('Could not get process ID');
222+
}
223+
224+
$pidFile = $this->getPidFile();
225+
$pidDir = dirname($pidFile);
211226

212-
if (count($processes) === 1) {
213-
$this->saveSinglePidFile($activeProcesses);
214-
} else {
215-
$this->saveMultiplePidFiles($processes);
227+
// Ensure PID directory exists
228+
if (!is_dir($pidDir) && !mkdir($pidDir, 0755, true)) {
229+
throw new RuntimeException("Could not create PID directory: $pidDir");
216230
}
231+
232+
// For forked processes, append the PID to the filename
233+
if (isset($this->isForked) && $this->isForked === true) {
234+
$pidFile = $this->getForkedPidFile($pid);
235+
}
236+
237+
// Use exclusive file creation to avoid race conditions
238+
$handle = fopen($pidFile, 'c+');
239+
if ($handle === false) {
240+
throw new RuntimeException("Could not open PID file: $pidFile");
241+
}
242+
243+
if (!flock($handle, LOCK_EX | LOCK_NB)) {
244+
fclose($handle);
245+
throw new RuntimeException("Could not acquire lock on PID file: $pidFile");
246+
}
247+
248+
// Write PID to file
249+
if (ftruncate($handle, 0) === false ||
250+
fwrite($handle, (string)$pid) === false) {
251+
flock($handle, LOCK_UN);
252+
fclose($handle);
253+
throw new RuntimeException("Could not write to PID file: $pidFile");
254+
}
255+
256+
flock($handle, LOCK_UN);
257+
fclose($handle);
258+
217259
} catch (Throwable $e) {
218260
SystemMessages::sysLogMsg(
219261
static::class,
@@ -225,51 +267,30 @@ private function savePidFile(): void
225267
}
226268

227269
/**
228-
* Saves a single PID to file
229-
*
230-
* @param string $pid Process ID to save
231-
* @throws RuntimeException If write fails
232-
*/
233-
private function saveSinglePidFile(string $pid): void
234-
{
235-
if (!file_put_contents($this->getPidFile(), $pid)) {
236-
throw new RuntimeException('Could not write to PID file');
237-
}
238-
}
239-
240-
/**
241-
* Saves multiple PIDs to separate files
270+
* Generate the PID file path for the worker
242271
*
243-
* @param array $processes Array of process IDs
244-
* @throws RuntimeException If write fails
272+
* @return string The path to the PID file
245273
*/
246-
private function saveMultiplePidFiles(array $processes): void
274+
public function getPidFile(): string
247275
{
248-
$pidFilesDir = dirname($this->getPidFile());
249-
$baseName = (string)pathinfo($this->getPidFile(), PATHINFO_BASENAME);
250-
$pidFile = $pidFilesDir . '/' . $baseName;
251-
252-
// Delete old PID files
253-
$rm = Util::which('rm');
254-
Processes::mwExec("$rm -rf $pidFile*");
255-
256-
foreach ($processes as $index => $process) {
257-
$pidFilePath = sprintf("%s-%d%s", $pidFile, $index + 1, self::PID_FILE_SUFFIX);
258-
if (!file_put_contents($pidFilePath, $process)) {
259-
throw new RuntimeException("Could not write to PID file: $pidFilePath");
260-
}
261-
}
276+
$name = str_replace("\\", '-', static::class);
277+
return self::PID_FILE_DIR . "/$name" . self::PID_FILE_SUFFIX;
262278
}
263279

264280
/**
265-
* Generate the PID file path for the worker
281+
* Generates PID file path for forked processes
266282
*
267-
* @return string The path to the PID file
283+
* @param int $pid Process ID
284+
* @return string Full path to the PID file
268285
*/
269-
public function getPidFile(): string
286+
private function getForkedPidFile(int $pid): string
270287
{
271-
$name = str_replace("\\", '-', static::class);
272-
return self::PID_FILE_DIR . "/$name" . self::PID_FILE_SUFFIX;
288+
$basePidFile = $this->getPidFile();
289+
return sprintf(
290+
'%s.%d',
291+
$basePidFile,
292+
$pid
293+
);
273294
}
274295

275296
/**
@@ -425,14 +446,27 @@ private function logErrorShutdown(string $processTitle, float $timeElapsedSecs,
425446
}
426447

427448
/**
428-
* Cleans up PID file during shutdown
449+
* Safely cleans up PID file during shutdown
429450
*/
430451
private function cleanupPidFile(): void
431452
{
432453
try {
433-
$pidFile = $this->getPidFile();
454+
$pid = getmypid();
455+
if ($pid === false) {
456+
return;
457+
}
458+
459+
// Determine which PID file to remove
460+
$pidFile = isset($this->isForked) && $this->isForked === true
461+
? $this->getForkedPidFile($pid)
462+
: $this->getPidFile();
463+
464+
// Only remove the file if it exists and contains our PID
434465
if (file_exists($pidFile)) {
435-
unlink($pidFile);
466+
$storedPid = file_get_contents($pidFile);
467+
if ($storedPid === (string)$pid) {
468+
unlink($pidFile);
469+
}
436470
}
437471
} catch (Throwable $e) {
438472
SystemMessages::sysLogMsg(
@@ -541,6 +575,14 @@ public function makePingTubeName(string $workerClassName): string
541575
return Text::camelize("ping_$workerClassName", '\\');
542576
}
543577

578+
/**
579+
* Sets flag for forked process after pcntl_fork()
580+
*/
581+
public function setForked(): void
582+
{
583+
$this->isForked = true;
584+
}
585+
544586
/**
545587
* Destructor - ensures PID file is saved on object destruction
546588
*/

‎src/Core/Workers/WorkerPrepareAdvice.php

+1
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public function start(array $argv): void
130130
} elseif ($pid == 0) {
131131
// Child process.
132132
try {
133+
$this->setForked();
133134
$this->processAdvice($adviceType);
134135
} catch (Throwable $e) {
135136
CriticalErrorsHandler::handleExceptionWithSyslog($e);

‎src/PBXCoreREST/Workers/WorkerApiCommands.php

+254-66
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
namespace MikoPBX\PBXCoreREST\Workers;
2222

23+
use InvalidArgumentException;
24+
use JsonException;
2325
use MikoPBX\Common\Handlers\CriticalErrorsHandler;
2426
use MikoPBX\Common\Providers\BeanstalkConnectionWorkerApiProvider;
2527
use MikoPBX\Core\System\{BeanstalkClient, Configs\BeanstalkConf, Directories, Processes, SystemMessages};
@@ -28,8 +30,8 @@
2830
use MikoPBX\PBXCoreREST\Lib\PBXApiResult;
2931
use MikoPBX\PBXCoreREST\Lib\PbxExtensionsProcessor;
3032
use MikoPBX\PBXCoreREST\Lib\SystemManagementProcessor;
33+
use RuntimeException;
3134
use Throwable;
32-
3335
use function xdebug_break;
3436

3537
require_once 'Globals.php';
@@ -47,11 +49,9 @@
4749
class WorkerApiCommands extends WorkerBase
4850
{
4951
/**
50-
* The maximum parallel worker processes
51-
*
52-
* @var int
52+
* Maximum time to wait for child process (seconds)
5353
*/
54-
public int $maxProc = 4;
54+
private const int CHILD_PROCESS_TIMEOUT = 60;
5555

5656

5757
/**
@@ -92,77 +92,265 @@ public function prepareAnswer(BeanstalkClient $message): void
9292
// Fork failed
9393
throw new \RuntimeException("Failed to fork a new process.");
9494
}
95+
9596
if ($pid === 0) {
96-
$res = new PBXApiResult();
97-
$res->processor = __METHOD__;
98-
$async = false;
9997
try {
100-
$request = json_decode($message->getBody(), true, 512, JSON_THROW_ON_ERROR);
101-
$async = ($request['async'] ?? false) === true;
102-
$processor = $request['processor'];
103-
$res->processor = $processor;
104-
// Old style, we can remove it in 2025
105-
if ($processor === 'modules') {
106-
$processor = PbxExtensionsProcessor::class;
107-
}
98+
// Child process
99+
$this->setForked();
100+
$this->processRequest($message);
101+
} catch (Throwable $e) {
102+
CriticalErrorsHandler::handleExceptionWithSyslog($e);
103+
exit(1); // Exit with error
104+
}
105+
exit(0);
106+
}
108107

109-
$this->breakpointHere($request);
110-
111-
// This is the child process
112-
if (method_exists($processor, 'callback')) {
113-
cli_set_process_title(__CLASS__ . '-' . $request['action']);
114-
// Execute async job
115-
if ($async) {
116-
$res->success = true;
117-
$res->messages['info'][] = "The async job {$request['action']} starts in background, you will receive answer on {$request['asyncChannelId']} nchan channel";
118-
$encodedResult = json_encode($res->getResult());
119-
$message->reply($encodedResult);
120-
$processor::callback($request);
121-
} else {
122-
$res = $processor::callback($request);
123-
}
124-
} else {
125-
$res->success = false;
126-
$res->messages['error'][] = "Unknown processor - $processor in prepareAnswer";
127-
}
128-
} catch (Throwable $exception) {
129-
$request = [];
130-
$this->needRestart = true;
131-
// Prepare answer with pretty error description
132-
$res->messages['error'][] = CriticalErrorsHandler::handleExceptionWithSyslog($exception);
133-
} finally {
134-
if ($async === false) {
135-
$encodedResult = json_encode($res->getResult());
136-
if ($encodedResult === false) {
137-
$res->data = [];
138-
$res->messages['error'][] = 'It is impossible to encode to json current processor answer';
139-
$encodedResult = json_encode($res->getResult());
140-
}
108+
// Parent process
109+
$startTime = time();
110+
$status = 0;
141111

142-
// Check the response size and write in on file if it bigger than Beanstalk can digest
143-
if (strlen($encodedResult) > BeanstalkConf::JOB_DATA_SIZE_LIMIT) {
144-
$downloadCacheDir = Directories::getDir(Directories::WWW_DOWNLOAD_CACHE_DIR);
145-
$filenameTmp = $downloadCacheDir . '/temp-' . __FUNCTION__ . '_' . microtime() . '.data';
146-
if (file_put_contents($filenameTmp, serialize($res->getResult()))) {
147-
$encodedResult = json_encode([BeanstalkClient::RESPONSE_IN_FILE => $filenameTmp]);
148-
} else {
149-
$res->data = [];
150-
$res->messages['error'][] = 'It is impossible to write answer into file ' . $filenameTmp;
151-
$encodedResult = json_encode($res->getResult());
152-
}
112+
// Wait for child with timeout
113+
while (time() - $startTime < self::CHILD_PROCESS_TIMEOUT) {
114+
$res = pcntl_waitpid($pid, $status, WNOHANG);
115+
if ($res === -1) {
116+
throw new RuntimeException("Failed to wait for child process");
117+
}
118+
if ($res > 0) {
119+
// Child process completed
120+
if (pcntl_wifexited($status)) {
121+
$exitStatus = pcntl_wexitstatus($status);
122+
if ($exitStatus !== 0) {
123+
throw new RuntimeException("Child process failed with status: $exitStatus");
153124
}
154-
$message->reply($encodedResult);
125+
return;
155126
}
156-
if ($res->success) {
157-
$this->checkNeedReload($request);
127+
if (pcntl_wifsignaled($status)) {
128+
$signal = pcntl_wtermsig($status);
129+
throw new RuntimeException("Child process terminated by signal: $signal");
158130
}
131+
return;
132+
}
133+
usleep(100000); // Sleep 100ms
134+
}
135+
136+
// Timeout reached
137+
posix_kill($pid, SIGTERM);
138+
throw new RuntimeException("Child process timed out");
139+
}
140+
141+
/**
142+
* Process individual API request
143+
*
144+
* @param BeanstalkClient $message The message from beanstalk queue
145+
*
146+
* @throws JsonException If JSON parsing fails
147+
* @throws RuntimeException|Throwable If processor execution fails
148+
*/
149+
private function processRequest(BeanstalkClient $message): void
150+
{
151+
$res = new PBXApiResult();
152+
$res->processor = __METHOD__;
153+
try {
154+
// Parse request JSON
155+
$request = $this->parseRequestJson($message);
156+
157+
// Setup basic request parameters
158+
$async = (bool)($request['async'] ?? false);
159+
$processor = $this->resolveProcessor($request);
160+
161+
$res->processor = $processor;
162+
163+
// Old style, we can remove it in 2025
164+
if ($processor === 'modules') {
165+
$processor = PbxExtensionsProcessor::class;
166+
}
167+
168+
// Handle debug mode if needed
169+
$this->handleDebugMode($request);
170+
171+
// Process the request
172+
if (!method_exists($processor, 'callback')) {
173+
throw new RuntimeException("Unknown processor - {$processor}");
174+
}
175+
176+
cli_set_process_title(__CLASS__ . '-' . $request['action']);
177+
178+
// Execute request based on async flag
179+
if ($async) {
180+
$this->handleAsyncRequest($message, $request, $res);
181+
} else {
182+
$res = $processor::callback($request);
183+
$this->sendResponse($message, $res);
184+
}
185+
186+
// Check if reload is needed after successful execution
187+
if ($res->success) {
188+
$this->checkNeedReload($request);
189+
}
190+
191+
} catch (JsonException $e) {
192+
$this->handleError($res, "Invalid JSON in request: {$e->getMessage()}");
193+
$this->sendResponse($message, $res);
194+
} catch (InvalidArgumentException $e) {
195+
$this->handleError($res, "Invalid request parameters: {$e->getMessage()}");
196+
$this->sendResponse($message, $res);
197+
} catch (Throwable $e) {
198+
$this->handleError($res, CriticalErrorsHandler::handleExceptionWithSyslog($e));
199+
$this->sendResponse($message, $res);
200+
throw $e; // Re-throw for parent process to handle
201+
}
202+
}
203+
204+
/**
205+
* Parse and validate request JSON
206+
*
207+
* @param BeanstalkClient $message
208+
* @return array
209+
* @throws JsonException
210+
*/
211+
private function parseRequestJson(BeanstalkClient $message): array
212+
{
213+
$request = json_decode(
214+
$message->getBody(),
215+
true,
216+
512,
217+
JSON_THROW_ON_ERROR
218+
);
219+
220+
if (!is_array($request)) {
221+
throw new InvalidArgumentException('Request must be a JSON object');
222+
}
223+
224+
return $request;
225+
}
226+
227+
/**
228+
* Resolve processor class name
229+
*
230+
* @param array $request
231+
* @return string
232+
* @throws InvalidArgumentException
233+
*/
234+
private function resolveProcessor(array $request): string
235+
{
236+
$processor = $request['processor'] ?? '';
237+
238+
// Handle legacy 'modules' processor name
239+
if ($processor === 'modules') {
240+
return PbxExtensionsProcessor::class;
241+
}
242+
243+
if (empty($processor)) {
244+
throw new InvalidArgumentException('Processor name is required');
245+
}
246+
247+
return $processor;
248+
}
249+
250+
/**
251+
* Handle asynchronous request execution
252+
*
253+
* @param BeanstalkClient $message
254+
* @param array $request
255+
* @param PBXApiResult $res
256+
*/
257+
private function handleAsyncRequest(
258+
BeanstalkClient $message,
259+
array $request,
260+
PBXApiResult $res
261+
): void
262+
{
263+
$res->success = true;
264+
$res->messages['info'][] = sprintf(
265+
'The async job %s starts in background, you will receive answer on %s nchan channel',
266+
$request['action'],
267+
$request['asyncChannelId']
268+
);
269+
270+
$this->sendResponse($message, $res);
271+
$request['processor']::callback($request);
272+
}
273+
274+
/**
275+
* Send response back through beanstalk
276+
*
277+
* @param BeanstalkClient $message
278+
* @param PBXApiResult $res
279+
* @throws RuntimeException
280+
*/
281+
private function sendResponse(BeanstalkClient $message, PBXApiResult $res): void
282+
{
283+
try {
284+
$result = $res->getResult();
285+
$encodedResult = json_encode($result);
286+
287+
if ($encodedResult === false) {
288+
$res->data = [];
289+
$res->messages['error'][] = 'Failed to encode response to JSON';
290+
$encodedResult = json_encode($res->getResult());
291+
}
292+
293+
// Handle large responses
294+
if (strlen($encodedResult) > BeanstalkConf::JOB_DATA_SIZE_LIMIT) {
295+
$encodedResult = $this->handleLargeResponse($result);
159296
}
160-
exit(0); // Exit the child process
297+
298+
$message->reply($encodedResult);
299+
300+
} catch (Throwable $e) {
301+
throw new RuntimeException(
302+
"Failed to send response: {$e->getMessage()}",
303+
0,
304+
$e
305+
);
306+
}
307+
}
308+
309+
/**
310+
* Handle large response by storing in temporary file
311+
*
312+
* @param array $result
313+
* @return string JSON encoded response with file reference
314+
* @throws RuntimeException
315+
*/
316+
private function handleLargeResponse(array $result): string
317+
{
318+
$downloadCacheDir = Directories::getDir(Directories::WWW_DOWNLOAD_CACHE_DIR);
319+
320+
// Generate unique filename using uniqid()
321+
$filenameTmp = sprintf(
322+
'%s/temp-%s_%s.data',
323+
$downloadCacheDir,
324+
__FUNCTION__,
325+
uniqid('', true)
326+
);
327+
328+
// Check available disk space
329+
if (disk_free_space($downloadCacheDir) < strlen(serialize($result))) {
330+
throw new RuntimeException('Insufficient disk space for temporary file');
331+
}
332+
333+
if (!file_put_contents($filenameTmp, serialize($result))) {
334+
throw new RuntimeException("Failed to write response to temporary file");
161335
}
162-
// This is the parent process
163-
pcntl_wait($status); // Wait for the child process to complete
336+
337+
return json_encode([BeanstalkClient::RESPONSE_IN_FILE => $filenameTmp]);
338+
}
339+
340+
/**
341+
* Handle error cases
342+
*
343+
* @param PBXApiResult $res
344+
* @param string $message
345+
*/
346+
private function handleError(PBXApiResult $res, string $message): void
347+
{
348+
$res->success = false;
349+
$res->messages['error'][] = $message;
350+
$res->data = [];
164351
}
165352

353+
166354
/**
167355
* Checks if the module or worker needs to be reloaded.
168356
*
@@ -230,7 +418,7 @@ private function getNeedRestartActions(): array
230418
* ...
231419
* });
232420
*/
233-
private function breakpointHere(array $request): void
421+
private function handleDebugMode(array $request): void
234422
{
235423
if (isset($request['debug']) && $request['debug'] === true && extension_loaded('xdebug')) {
236424
if (function_exists('xdebug_connect_to_client')) {

0 commit comments

Comments
 (0)
Please sign in to comment.