![]() Server : Apache System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64 User : corals ( 1002) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /home/corals/mautic.corals.io/app/bundles/WebhookBundle/Command/ |
<?php namespace Mautic\WebhookBundle\Command; use Mautic\CoreBundle\Helper\CoreParametersHelper; use Mautic\WebhookBundle\Model\WebhookModel; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; /** * CLI Command to process queued webhook payloads. */ class ProcessWebhookQueuesCommand extends Command { public const COMMAND_NAME = 'mautic:webhooks:process'; public function __construct( private CoreParametersHelper $coreParametersHelper, private WebhookModel $webhookModel ) { parent::__construct(); } protected function configure() { $this->setName(self::COMMAND_NAME) ->addOption( '--webhook-id', '-i', InputOption::VALUE_OPTIONAL, 'Process payload for a specific webhook. If not specified, all webhooks will be processed.', null ) ->addOption( '--min-id', null, InputOption::VALUE_OPTIONAL, 'Sets the minimum webhook queue ID to process (so called range mode).', null ) ->addOption( '--max-id', null, InputOption::VALUE_OPTIONAL, 'Sets the maximum webhook queue ID to process (so called range mode).', null ); } protected function execute(InputInterface $input, OutputInterface $output): int { // check to make sure we are in queue mode if ($this->coreParametersHelper->get('queue_mode') != $this->webhookModel::COMMAND_PROCESS) { $output->writeLn('Webhook Bundle is in immediate process mode. To use the command function change to command mode.'); return Command::SUCCESS; } $id = $input->getOption('webhook-id'); $minId = (int) $input->getOption('min-id'); $maxId = (int) $input->getOption('max-id'); if ($id) { $webhook = $this->webhookModel->getEntity($id); $webhooks = (null !== $webhook && $webhook->isPublished()) ? [$id => $webhook] : []; $queueRangeMode = $minId && $maxId; } else { // make sure we only get published webhook entities $webhooks = $this->webhookModel->getEntities( [ 'filter' => [ 'force' => [ [ 'column' => 'e.isPublished', 'expr' => 'eq', 'value' => 1, ], ], ], ] ); } if (!count($webhooks)) { $output->writeln('<error>No published webhooks found. Try again later.</error>'); return Command::FAILURE; } $output->writeLn('<info>Processing Webhooks</info>'); try { if ($queueRangeMode) { $webhookLimit = $this->webhookModel->getWebhookLimit(); if (1 > $webhookLimit) { throw new \InvalidArgumentException('`webhook limit` parameter must be greater than zero.'); } for (; $minId <= $maxId; $minId += $webhookLimit) { $this->webhookModel ->setMinQueueId($minId) ->setMaxQueueId(min($minId + $webhookLimit - 1, $maxId)); $this->webhookModel->processWebhook(current($webhooks)); } } else { $this->webhookModel->processWebhooks($webhooks); } } catch (\Exception $e) { $output->writeLn('<error>'.$e->getMessage().'</error>'); $output->writeLn('<error>'.$e->getTraceAsString().'</error>'); return Command::FAILURE; } $output->writeLn('<info>Webhook Processing Complete</info>'); return Command::SUCCESS; } protected static $defaultDescription = 'Process queued webhook payloads'; }