Source of file ClientEndpoint.php
Size: 5,226 Bytes - Last Modified: 2021-01-12T22:04:13+00:00
C:/Users/MAKS/Code/_PROJECTS/amqp-agent/src/RPC/ClientEndpoint.php
12345678910111213141516171819202122232425262728293031323334353637383940414243444546
Covered by 3 test(s):
4748
Covered by 3 test(s):
49
Covered by 3 test(s):
50
Covered by 3 test(s):
51
Covered by 3 test(s):
52
Covered by 3 test(s):
53
Covered by 3 test(s):
54
Covered by 3 test(s):
555657
Covered by 3 test(s):
58
Covered by 3 test(s):
59
Covered by 3 test(s):
60
Covered by 3 test(s):
61
Covered by 3 test(s):
62
Covered by 3 test(s):
63
Covered by 3 test(s):
6465
Covered by 2 test(s):
66
Covered by 3 test(s):
67686970
Covered by 3 test(s):
717273747576777879808182
Covered by 3 test(s):
83
Covered by 1 test(s):
848586
Covered by 2 test(s):
87
Covered by 2 test(s):
88
Covered by 2 test(s):
89
Covered by 2 test(s):
90
Covered by 2 test(s):
9192
Covered by 2 test(s):
93
Covered by 2 test(s):
94
Covered by 2 test(s):
95
Covered by 2 test(s):
9697
Covered by 2 test(s):
98
Covered by 2 test(s):
99
Covered by 2 test(s):
100
Covered by 2 test(s):
101
Covered by 2 test(s):
102
Covered by 2 test(s):
103104105
Covered by 2 test(s):
106107
Covered by 2 test(s):
108
Covered by 2 test(s):
109
Covered by 2 test(s):
110
Covered by 2 test(s):
111112113
Covered by 2 test(s):
114115
Covered by 2 test(s):
116
Covered by 2 test(s):
117118119
Covered by 1 test(s):
120121122123124125126127128129130131132
Covered by 1 test(s):
133134135136137138139140141142143
Covered by 2 test(s):
144145
Covered by 2 test(s):
146
Covered by 1 test(s):
147
Covered by 1 test(s):
148
Covered by 1 test(s):
149150151
Covered by 1 test(s):
152
Covered by 1 test(s):
153
Covered by 1 test(s):
154
Covered by 1 test(s):
155
Covered by 1 test(s):
156157158159160161162163164165166167
Covered by 1 test(s):
168169170
| <?php /** * @author Marwan Al-Soltany <MarwanAlsoltany@gmail.com> * @copyright Marwan Al-Soltany 2020 * For the full copyright and license information, please view * the LICENSE file that was distributed with this source code. */ declare(strict_types=1); namespace MAKS\AmqpAgent\RPC; use PhpAmqpLib\Message\AMQPMessage; use MAKS\AmqpAgent\Helper\ClassProxy; use MAKS\AmqpAgent\Helper\IDGenerator; use MAKS\AmqpAgent\RPC\AbstractEndpoint; use MAKS\AmqpAgent\RPC\ClientEndpointInterface; use MAKS\AmqpAgent\Exception\RPCEndpointException; /** * A class specialized in requesting. Implementing only the methods needed for a client. * * Example: * ``` * $clientEndpoint = new ClientEndpoint(); * $clientEndpoint->on('some.event', function () { ... }); * $clientEndpoint->connect(); * $clientEndpoint->request('Message Body', 'queue.name'); * $clientEndpoint->disconnect(); * ``` * * @since 2.0.0 * @api */ class ClientEndpoint extends AbstractEndpoint implements ClientEndpointInterface { /** * Opens a connection with RabbitMQ server. * @param array|null $connectionOptions * @return self * @throws RPCEndpointException */ public function connect(?array $connectionOptions = []) { parent::connect($connectionOptions); if ($this->isConnected()) { list($this->responseQueue, , ) = $this->channel->queue_declare( null, false, false, true, false ); $this->channel->basic_consume( $this->responseQueue, null, false, false, false, false, function ($message) { ClassProxy::call($this, 'onResponse', $message); } ); } return $this; } /** * Sends the passed request to the server using the passed queue. * @param string|AMQPMessage $request The request body or an `AMQPMessage` instance. * @param string|null $queueName [optional] The name of queue to send through. * @return string The response body. * @throws RPCEndpointException If the client is not connected yet or if request Correlation ID does not match the one of the response. */ public function request($request, ?string $queueName = null): string { if (!$this->isConnected()) { throw new RPCEndpointException('Client is not connected yet!'); } $this->queueName = $queueName ?? $this->queueName; $this->requestBody = $request instanceof AMQPMessage ? $request->body : (string)$request; $this->responseBody = null; $this->requestQueue = $this->queueName; $this->correlationId = IDGenerator::generateHash(); $message = $request instanceof AMQPMessage ? $request : new AMQPMessage((string)$request); $message->set('reply_to', $this->responseQueue); $message->set('correlation_id', $this->correlationId); $message->set('timestamp', time()); $this->channel->queue_declare( $this->requestQueue, false, false, false, false ); $this->trigger('request.before.send', [$message]); $this->channel->basic_publish( $message, null, $this->requestQueue ); $this->trigger('request.after.send', [$message]); while ($this->responseBody === null) { $this->channel->wait(); } return $this->responseBody; } /** * Sends the passed request to the server using the passed queue. * Alias for `self::request()`. * @param string|AMQPMessage $request The request body or an `AMQPMessage` instance. * @param string|null $queueName [optional] The name of queue to send through. * @return string The response body. * @throws RPCEndpointException If the client is not connected yet or if request Correlation ID does not match the one of the response. */ public function call($request, ?string $queueName = null): string { return $this->request($request, $queueName); } /** * Validates the response. * @param AMQPMessage $response * @return void * @throws RPCEndpointException */ protected function onResponse(AMQPMessage $response): void { $this->trigger('response.on.get', [$response]); if ($this->correlationId === $response->get('correlation_id')) { $this->responseBody = $this->callback($response); $response->ack(); return; } throw new RPCEndpointException( sprintf( 'Correlation ID of the response "%s" does not match the one of the request "%s"!', $this->correlationId, (string)$response->get('correlation_id') ) ); } /** * Returns the final response body. * @param AMQPMessage $message * @return string */ protected function callback(AMQPMessage $message): string { return $message->body; } } |