-
Notifications
You must be signed in to change notification settings - Fork 21
Add Subprocess Job Execution Support #178
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 2.x
Are you sure you want to change the base?
Changes from all commits
3ce6055
3ccfd9a
487b954
aa87aee
8d7a9ec
106562b
1320cf6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,222 @@ | ||
| <?php | ||
| declare(strict_types=1); | ||
|
|
||
| /** | ||
| * CakePHP(tm) : Rapid Development Framework (https://cakephp.org) | ||
| * Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org/) | ||
| * | ||
| * Licensed under The MIT License | ||
| * For full copyright and license information, please see the LICENSE.txt | ||
| * Redistributions of files must retain the above copyright notice. | ||
| * | ||
| * @copyright Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org/) | ||
| * @link https://cakephp.org CakePHP(tm) Project | ||
| * @since 2.2.0 | ||
| * @license https://opensource.org/licenses/MIT MIT License | ||
| */ | ||
| namespace Cake\Queue\Command; | ||
|
|
||
| use Cake\Command\Command; | ||
| use Cake\Console\Arguments; | ||
| use Cake\Console\ConsoleIo; | ||
| use Cake\Core\ContainerInterface; | ||
| use Cake\Log\Engine\ConsoleLog; | ||
| use Cake\Log\Log; | ||
| use Cake\Queue\Job\Message; | ||
| use Cake\Queue\Queue\Processor; | ||
| use Enqueue\Null\NullConnectionFactory; | ||
| use Enqueue\Null\NullMessage; | ||
| use Interop\Queue\Message as QueueMessage; | ||
| use Interop\Queue\Processor as InteropProcessor; | ||
| use JsonException; | ||
| use Psr\Log\LoggerInterface; | ||
| use Psr\Log\NullLogger; | ||
| use RuntimeException; | ||
| use Throwable; | ||
|
|
||
| /** | ||
| * Subprocess job runner command. | ||
| * Executes a single job in an isolated subprocess. | ||
| */ | ||
| class SubprocessJobRunnerCommand extends Command | ||
| { | ||
| /** | ||
| * @param \Cake\Core\ContainerInterface|null $container DI container instance | ||
| */ | ||
| public function __construct( | ||
| protected readonly ?ContainerInterface $container = null, | ||
| ) { | ||
| } | ||
|
|
||
| /** | ||
| * Get the command name. | ||
| * | ||
| * @return string | ||
| */ | ||
| public static function defaultName(): string | ||
| { | ||
| return 'queue subprocess_runner'; | ||
| } | ||
|
|
||
| /** | ||
| * Execute a single job from STDIN and output result to STDOUT. | ||
| * | ||
| * @param \Cake\Console\Arguments $args Arguments | ||
| * @param \Cake\Console\ConsoleIo $io ConsoleIo | ||
| * @return int | ||
| */ | ||
| public function execute(Arguments $args, ConsoleIo $io): int | ||
| { | ||
| $input = $this->readInput($io); | ||
|
|
||
| if ($input === '') { | ||
| $this->outputResult($io, [ | ||
| 'success' => false, | ||
| 'error' => 'No input received', | ||
| ]); | ||
|
|
||
| return self::CODE_ERROR; | ||
| } | ||
|
|
||
| try { | ||
| $data = json_decode($input, true, 512, JSON_THROW_ON_ERROR); | ||
| } catch (JsonException $jsonException) { | ||
| $this->outputResult($io, [ | ||
| 'success' => false, | ||
| 'error' => 'Invalid JSON input: ' . $jsonException->getMessage(), | ||
| ]); | ||
|
|
||
| return self::CODE_ERROR; | ||
| } | ||
|
|
||
| try { | ||
| $result = $this->executeJob($data); | ||
| $this->outputResult($io, [ | ||
| 'success' => true, | ||
| 'result' => $result, | ||
| ]); | ||
|
|
||
| return self::CODE_SUCCESS; | ||
| } catch (Throwable $throwable) { | ||
| $this->outputResult($io, [ | ||
| 'success' => false, | ||
| 'result' => InteropProcessor::REQUEUE, | ||
| 'exception' => [ | ||
| 'class' => get_class($throwable), | ||
| 'message' => $throwable->getMessage(), | ||
| 'code' => $throwable->getCode(), | ||
| 'file' => $throwable->getFile(), | ||
| 'line' => $throwable->getLine(), | ||
| 'trace' => $throwable->getTraceAsString(), | ||
| ], | ||
| ]); | ||
|
|
||
| return self::CODE_SUCCESS; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Read input from STDIN or ConsoleIo | ||
| * | ||
| * @param \Cake\Console\ConsoleIo $io ConsoleIo | ||
| * @return string | ||
| */ | ||
| protected function readInput(ConsoleIo $io): string | ||
| { | ||
| $input = ''; | ||
| while (!feof(STDIN)) { | ||
| $chunk = fread(STDIN, 8192); | ||
| if ($chunk === false) { | ||
| break; | ||
| } | ||
|
|
||
| $input .= $chunk; | ||
| } | ||
|
Comment on lines
+127
to
+134
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we read input from |
||
|
|
||
| return $input; | ||
| } | ||
|
|
||
| /** | ||
| * Execute the job with the provided data. | ||
| * | ||
| * @param array<string, mixed> $data Job data | ||
| * @return string | ||
| */ | ||
| protected function executeJob(array $data): string | ||
| { | ||
| $connectionFactory = new NullConnectionFactory(); | ||
| $context = $connectionFactory->createContext(); | ||
|
|
||
| $messageClass = $data['messageClass'] ?? NullMessage::class; | ||
|
|
||
| // Validate message class for security | ||
| if (!class_exists($messageClass) || !is_subclass_of($messageClass, QueueMessage::class)) { | ||
| throw new RuntimeException(sprintf('Invalid message class: %s', $messageClass)); | ||
| } | ||
|
|
||
| $messageBody = json_encode($data['body']); | ||
|
|
||
| /** @var \Interop\Queue\Message $queueMessage */ | ||
| $queueMessage = new $messageClass($messageBody); | ||
|
|
||
| if (isset($data['properties']) && is_array($data['properties'])) { | ||
| foreach ($data['properties'] as $key => $value) { | ||
| $queueMessage->setProperty($key, $value); | ||
| } | ||
| } | ||
|
|
||
| $logger = $this->configureLogging($data); | ||
|
|
||
| $message = new Message($queueMessage, $context, $this->container); | ||
| $processor = new Processor($logger, $this->container); | ||
|
|
||
| $result = $processor->processMessage($message); | ||
|
|
||
| // Result is string|object (with __toString) | ||
| /** @phpstan-ignore cast.string */ | ||
| return is_string($result) ? $result : (string)$result; | ||
| } | ||
|
|
||
| /** | ||
| * Configure logging to use STDERR to prevent job logs from contaminating STDOUT. | ||
| * Reconfigures all CakePHP loggers to write to STDERR with no additional formatting. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why? Won't that break logging outputs to files/syslog log drain services? |
||
| * | ||
| * @param array<string, mixed> $data Job data | ||
| * @return \Psr\Log\LoggerInterface | ||
| */ | ||
| protected function configureLogging(array $data): LoggerInterface | ||
| { | ||
| // Drop all existing loggers to prevent duplicate logging | ||
| foreach (Log::configured() as $loggerName) { | ||
| Log::drop($loggerName); | ||
| } | ||
|
|
||
| // Configure a single stderr logger | ||
| Log::setConfig('default', [ | ||
| 'className' => ConsoleLog::class, | ||
| 'stream' => 'php://stderr', | ||
| ]); | ||
|
|
||
| $logger = Log::engine('default'); | ||
| if (!$logger instanceof LoggerInterface) { | ||
| $logger = new NullLogger(); | ||
| } | ||
|
|
||
| return $logger; | ||
| } | ||
|
|
||
| /** | ||
| * Output result as JSON to STDOUT. | ||
| * | ||
| * @param \Cake\Console\ConsoleIo $io ConsoleIo | ||
| * @param array<string, mixed> $result Result data | ||
| * @return void | ||
| */ | ||
| protected function outputResult(ConsoleIo $io, array $result): void | ||
| { | ||
| $json = json_encode($result); | ||
| if ($json !== false) { | ||
| $io->out($json); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's 512? Can we use the union of the flag constants?