Source of file ServerEndpoint.php
Size: 5,114 Bytes - Last Modified: 2021-01-12T22:04:13+00:00
C:/Users/MAKS/Code/_PROJECTS/amqp-agent/src/RPC/ServerEndpoint.php
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
Covered by 3 test(s):
54
Covered by 3 test(s):
5556
Covered by 3 test(s):
57
Covered by 2 test(s):
5859
Covered by 2 test(s):
60
Covered by 2 test(s):
61
Covered by 2 test(s):
62
Covered by 2 test(s):
63
Covered by 2 test(s):
64
Covered by 2 test(s):
656667
Covered by 2 test(s):
68
Covered by 2 test(s):
69
Covered by 2 test(s):
70
Covered by 2 test(s):
717273
Covered by 2 test(s):
74
Covered by 2 test(s):
75
Covered by 2 test(s):
76
Covered by 2 test(s):
77
Covered by 2 test(s):
78
Covered by 2 test(s):
79
Covered by 2 test(s):
8081
Covered by 2 test(s):
82
Covered by 2 test(s):
838485
Covered by 2 test(s):
86
Covered by 2 test(s):
878889
Covered by 1 test(s):
909192
Covered by 1 test(s):
93949596979899100101102103104105
Covered by 1 test(s):
106107108109110111112113114115116
Covered by 2 test(s):
117118
Covered by 2 test(s):
119
Covered by 2 test(s):
120
Covered by 2 test(s):
121
Covered by 2 test(s):
122123
Covered by 2 test(s):
124
Covered by 1 test(s):
125
Covered by 1 test(s):
126
Covered by 1 test(s):
127
Covered by 1 test(s):
128129130131132
Covered by 1 test(s):
133
Covered by 1 test(s):
134
Covered by 1 test(s):
135136
Covered by 1 test(s):
137138
Covered by 1 test(s):
139
Covered by 1 test(s):
140
Covered by 1 test(s):
141
Covered by 1 test(s):
142143144
Covered by 1 test(s):
145146
Covered by 1 test(s):
147
Covered by 1 test(s):
148149150151152153154155156
Covered by 1 test(s):
157158159
<?php

/**
 * @author Marwan Al-Soltany <MarwanAlsoltany@gmail.com>
 * @copyright Marwan Al-Soltany 2020
 * For the full copyright and license information, please view
 * the LICENSE file that was distributed with this source code.
 */

declare(strict_types=1);

namespace MAKS\AmqpAgent\RPC;

use PhpAmqpLib\Message\AMQPMessage;
use MAKS\AmqpAgent\Helper\ClassProxy;
use MAKS\AmqpAgent\RPC\AbstractEndpoint;
use MAKS\AmqpAgent\RPC\ServerEndpointInterface;
use MAKS\AmqpAgent\Exception\RPCEndpointException;

/**
 * A class specialized in responding. Implementing only the methods needed for a server.
 *
 * Example:
 * ```
 * $serverEndpoint = new ServerEndpoint();
 * $serverEndpoint->on('some.event', function () { ... });
 * $serverEndpoint->connect();
 * $serverEndpoint->respond('Namespace\SomeClass::someMethod', 'queue.name');
 * $serverEndpoint->disconnect();
 * ```
 *
 * @since 2.0.0
 * @api
 */
class ServerEndpoint extends AbstractEndpoint implements ServerEndpointInterface
{
    /**
     * The callback to use when processing the requests.
     * @var callable
     */
    protected $callback;


    /**
     * Listens on requests coming via the passed queue and processes them with the passed callback.
     * This call blocks for as long as the channel reports that it is consuming.
     * @param callable|null $callback [optional] The callback to process the request. This callback will be passed an `AMQPMessage` and must return a string. Defaults to `self::callback()`.
     * @param string|null $queueName [optional] The name of the queue to listen on. Defaults to the queue name already set on the endpoint.
     * @return string The body of the last processed request.
     * @throws RPCEndpointException If the server is not connected yet or if the passed callback didn't return a string.
     */
    public function respond(?callable $callback = null, ?string $queueName = null): string
    {
        // The callback and queue name are stored before the connection check, so they are set even if the call fails.
        $this->callback = $callback ?? [$this, 'callback'];
        $this->queueName = $queueName ?? $this->queueName;

        if (!$this->isConnected()) {
            throw new RPCEndpointException('Server is not connected yet!');
        }

        $this->requestQueue = $this->queueName;

        // Positional flags: passive, durable, exclusive, auto_delete (all disabled).
        $this->channel->queue_declare(
            $this->requestQueue,
            false,
            false,
            false,
            false
        );

        // Prefetch count of 1: the broker hands over one unacknowledged message at a time.
        $this->channel->basic_qos(
            null,
            1,
            null
        );

        // An empty consumer tag lets the broker generate one; php-amqplib documents
        // this argument as a string, so '' is passed rather than null.
        // Positional flags: no_local, no_ack, exclusive, nowait (all disabled), which
        // means each request has to be acknowledged explicitly (see self::onRequest()).
        $this->channel->basic_consume(
            $this->requestQueue,
            '',
            false,
            false,
            false,
            false,
            function ($message) {
                ClassProxy::call($this, 'onRequest', $message);
            }
        );

        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }

        return $this->requestBody;
    }

    /**
     * Listens on requests coming via the passed queue and processes them with the passed callback.
     * Alias for `self::respond()`.
     * @param callable|null $callback [optional] The callback to process the request. This callback will be passed an `AMQPMessage` and must return a string.
     * @param string|null $queueName [optional] The queue to listen on.
     * @return string The body of the last processed request.
     * @throws RPCEndpointException If the server is not connected yet or if the passed callback didn't return a string.
     */
    public function serve(?callable $callback = null, ?string $queueName = null): string
    {
        return $this->respond($callback, $queueName);
    }

    /**
     * Replies to the client.
     * Runs the processing callback on the request, publishes its result to the queue named
     * in the request's `reply_to` property and acknowledges the request afterwards.
     * Triggers `request.on.get`, `response.before.send` and `response.after.send`.
     * @param AMQPMessage $request The request received from the client.
     * @return void
     * @throws RPCEndpointException If the processing callback did not return a string.
     */
    protected function onRequest(AMQPMessage $request): void
    {
        $this->trigger('request.on.get', [$request]);

        $this->requestBody = $request->body;
        $response = call_user_func($this->callback, $request);
        $this->responseQueue = (string)$request->get('reply_to');
        $this->correlationId = (string)$request->get('correlation_id');

        // Validate before storing, so that the response body of the endpoint never holds a non-string value.
        if (!is_string($response)) {
            throw new RPCEndpointException(
                sprintf(
                    'The passed processing callback must return a string, instead it returned (data-type: %s)!',
                    gettype($response)
                )
            );
        }

        $this->responseBody = $response;

        $message = new AMQPMessage($this->responseBody);
        $message->set('correlation_id', $this->correlationId);
        $message->set('timestamp', time());

        $this->trigger('response.before.send', [$message]);

        // The empty exchange name is the default exchange, which routes by queue name;
        // php-amqplib documents this argument as a string, so '' is passed rather than null.
        $request->getChannel()->basic_publish(
            $message,
            '',
            $this->responseQueue
        );

        // The request is acknowledged only after the response has been published.
        $request->ack();

        $this->trigger('response.after.send', [$message]);
    }

    /**
     * Returns the final request body, i.e. echoes the request back as the response.
     * This is the default processing callback; it is not used if a callback is passed to `self::respond()`.
     * @param AMQPMessage $message The request received from the client.
     * @return string The body of the passed message.
     */
    protected function callback(AMQPMessage $message): string
    {
        return $message->body;
    }
}