Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64
User : corals ( 1002)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
Directory :  /home/corals/mautic.corals.io/vendor/php-amqplib/rabbitmq-bundle/RabbitMq/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : /home/corals/mautic.corals.io/vendor/php-amqplib/rabbitmq-bundle/RabbitMq/MultipleConsumer.php
<?php

namespace OldSound\RabbitMqBundle\RabbitMq;

use OldSound\RabbitMqBundle\Provider\QueuesProviderInterface;
use OldSound\RabbitMqBundle\RabbitMq\Exception\QueueNotFoundException;
use PhpAmqpLib\Message\AMQPMessage;

class MultipleConsumer extends Consumer
{
    protected $queues = [];

    /**
     * Queues provider
     *
     * @var QueuesProviderInterface|null
     */
    protected $queuesProvider = null;

    /**
     * Context the consumer runs in
     *
     * @var string
     */
    protected $context = null;

    /**
     * QueuesProvider setter
     *
     * @param QueuesProviderInterface $queuesProvider
     *
     * @return self
     */
    public function setQueuesProvider(QueuesProviderInterface $queuesProvider)
    {
        $this->queuesProvider = $queuesProvider;
        return $this;
    }

    public function getQueueConsumerTag($queue)
    {
        return sprintf('%s-%s', $this->getConsumerTag(), $queue);
    }

    public function setQueues(array $queues)
    {
        $this->queues = $queues;
    }

    public function setContext($context)
    {
        $this->context = $context;
    }

    protected function setupConsumer()
    {
        $this->mergeQueues();

        if ($this->autoSetupFabric) {
            $this->setupFabric();
        }

        foreach ($this->queues as $name => $options) {
            //PHP 5.3 Compliant
            $currentObject = $this;

            $this->getChannel()->basic_consume($name, $this->getQueueConsumerTag($name), false, $this->consumerOptions['no_ack'], false, false, function (AMQPMessage $msg) use ($currentObject, $name) {
                $currentObject->processQueueMessage($name, $msg);
            });
        }
    }

    protected function queueDeclare()
    {
        foreach ($this->queues as $name => $options) {
            [$queueName, , ] = $this->getChannel()->queue_declare(
                $name,
                $options['passive'],
                $options['durable'],
                $options['exclusive'],
                $options['auto_delete'],
                $options['nowait'],
                $options['arguments'],
                $options['ticket']
            );

            if (isset($options['routing_keys']) && count($options['routing_keys']) > 0) {
                foreach ($options['routing_keys'] as $routingKey) {
                    $this->queueBind($queueName, $this->exchangeOptions['name'], $routingKey, $options['arguments'] ?? []);
                }
            } else {
                $this->queueBind($queueName, $this->exchangeOptions['name'], $this->routingKey, $options['arguments'] ?? []);
            }
        }

        $this->queueDeclared = true;
    }

    public function processQueueMessage($queueName, AMQPMessage $msg)
    {
        if (!isset($this->queues[$queueName])) {
            throw new QueueNotFoundException();
        }

        $this->processMessageQueueCallback($msg, $queueName, $this->queues[$queueName]['callback']);
    }

    public function stopConsuming()
    {
        foreach ($this->queues as $name => $options) {
            $this->getChannel()->basic_cancel($this->getQueueConsumerTag($name), false, true);
        }
    }

    /**
     * Merges static and provided queues into one array
     */
    protected function mergeQueues()
    {
        if ($this->queuesProvider) {
            $this->queues = array_merge(
                $this->queues,
                $this->queuesProvider->getQueues()
            );
        }
    }
}

Spamworldpro Mini