Source of file ServerEndpoint.php

Size: 5,114 Bytes - Last Modified: 2021-01-12T22:04:13+00:00

C:/Users/MAKS/Code/_PROJECTS/amqp-agent/src/RPC/ServerEndpoint.php

<?php

/**
 * @author Marwan Al-Soltany <MarwanAlsoltany@gmail.com>
 * @copyright Marwan Al-Soltany 2020
 * For the full copyright and license information, please view
 * the LICENSE file that was distributed with this source code.
 */

declare(strict_types=1);

namespace MAKS\AmqpAgent\RPC;

use PhpAmqpLib\Message\AMQPMessage;
use MAKS\AmqpAgent\Helper\ClassProxy;
use MAKS\AmqpAgent\RPC\AbstractEndpoint;
use MAKS\AmqpAgent\RPC\ServerEndpointInterface;
use MAKS\AmqpAgent\Exception\RPCEndpointException;

/**
 * A class specialized in responding to requests. It implements only the methods needed for a server.
 *
 * Example:
 * ```
 * $serverEndpoint = new ServerEndpoint();
 * $serverEndpoint->on('some.event', function () { ... });
 * $serverEndpoint->connect();
 * $serverEndpoint->respond('Namespace\SomeClass::someMethod', 'queue.name');
 * $serverEndpoint->disconnect();
 * ```
 *
 * @since 2.0.0
 * @api
 */
class ServerEndpoint extends AbstractEndpoint implements ServerEndpointInterface
{
    /**
     * The callback to use when processing the requests.
     * @var callable
     */
    protected $callback;


    /**
     * Listens for requests arriving on the passed queue and processes them with the passed callback.
     * @param callable|null $callback [optional] The callback to process the request. This callback will be passed an `AMQPMessage` and must return a string.
     * @param string|null $queueName [optional] The name of the queue to listen on.
     * @return string The body of the last processed request.
     * @throws RPCEndpointException If the server is not connected yet or if the passed callback didn't return a string.
     */
    public function respond(?callable $callback = null, ?string $queueName = null): string
    {
        $this->callback = $callback ?? [$this, 'callback'];
        $this->queueName = $queueName ?? $this->queueName;

        if ($this->isConnected()) {
            $this->requestQueue = $this->queueName;

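            $this->channel->queue_declare(
                $this->requestQueue,
                false, // passive
                false, // durable
                false, // exclusive
                false  // auto_delete
            );

            // Deliver at most one unacknowledged message to this consumer at a time.
            $this->channel->basic_qos(
                null, // prefetch_size
                1,    // prefetch_count
                null  // global
            );

            $this->channel->basic_consume(
                $this->requestQueue,
                null,  // consumer_tag
                false, // no_local
                false, // no_ack (messages are acknowledged manually in onRequest())
                false, // exclusive
                false, // nowait
                function ($message) {
                    ClassProxy::call($this, 'onRequest', $message);
                }
            );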

            while ($this->channel->is_consuming()) {
                $this->channel->wait();
            }

            return $this->requestBody;
        }

        throw new RPCEndpointException('Server is not connected yet!');
    }

    /**
     * Listens for requests arriving on the passed queue and processes them with the passed callback.
     * Alias for `self::respond()`.
     * @param callable|null $callback [optional] The callback to process the request. This callback will be passed an `AMQPMessage` and must return a string.
     * @param string|null $queueName [optional] The name of the queue to listen on.
     * @return string The body of the last processed request.
     * @throws RPCEndpointException If the server is not connected yet or if the passed callback didn't return a string.
     */
    public function serve(?callable $callback = null, ?string $queueName = null): string
    {
        return $this->respond($callback, $queueName);
    }

    /**
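     * Processes the request with the callback and replies to the client.
     * @param AMQPMessage $request The request message received from the client.
     * @return void
     * @throws RPCEndpointException If the processing callback does not return a string.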
     */
    protected function onRequest(AMQPMessage $request): void
    {
        $this->trigger('request.on.get', [$request]);

        $this->requestBody = $request->body;
        $this->responseBody = call_user_func($this->callback, $request);
        $this->responseQueue = (string)$request->get('reply_to');
        $this->correlationId = (string)$request->get('correlation_id');

        if (!is_string($this->responseBody)) {
            throw new RPCEndpointException(
                sprintf(
                    'The passed processing callback must return a string, instead it returned (data-type: %s)!',
                    gettype($this->responseBody)
                )
            );
        }

        $message = new AMQPMessage($this->responseBody);
        $message->set('correlation_id', $this->correlationId);
        $message->set('timestamp', time());

        $this->trigger('response.before.send', [$message]);

        $request->getChannel()->basic_publish(
            $message,
            null,
            $this->responseQueue
        );

        $request->ack();

        $this->trigger('response.after.send', [$message]);
    }

    /**
     * Returns the request body unchanged. This is the default callback; it is used only if no callback is passed to `self::respond()`.
     * @param AMQPMessage $message
     * @return string
     */
    protected function callback(AMQPMessage $message): string
    {
        return $message->body;
    }
}