Source of file AbstractEndpoint.php
Size: 8,353 Bytes - Last Modified: 2021-01-12T22:04:13+00:00
C:/Users/MAKS/Code/_PROJECTS/amqp-agent/src/RPC/AbstractEndpoint.php
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
Covered by 16 test(s):
103
Covered by 16 test(s):
104
Covered by 16 test(s):
105106107108109110111
Covered by 10 test(s):
112
Covered by 10 test(s):
113114115116117118119120121122123
Covered by 11 test(s):
124
Covered by 11 test(s):
125
Covered by 11 test(s):
126127128
Covered by 11 test(s):
129
Covered by 1 test(s):
130131132
Covered by 11 test(s):
133134
Covered by 11 test(s):
135
Covered by 11 test(s):
136137
Covered by 11 test(s):
138
Covered by 11 test(s):
139140
Covered by 11 test(s):
141142143144145146147148149
Covered by 12 test(s):
150
Covered by 7 test(s):
151152
Covered by 7 test(s):
153
Covered by 7 test(s):
154155
Covered by 7 test(s):
156
Covered by 7 test(s):
157158
Covered by 12 test(s):
159160161162163164165166
Covered by 16 test(s):
167
Covered by 16 test(s):
168
Covered by 16 test(s):
169
Covered by 16 test(s):
170
Covered by 10 test(s):
171172173
Covered by 16 test(s):
174175176177178179180181182
Covered by 1 test(s):
183184185186187188189190191192193
Covered by 3 test(s):
194
Covered by 3 test(s):
195
Covered by 1 test(s):
196
Covered by 1 test(s):
197198
Covered by 3 test(s):
199200
Covered by 2 test(s):
201
Covered by 2 test(s):
202
Covered by 2 test(s):
203
Covered by 2 test(s):
204
Covered by 2 test(s):
205
Covered by 2 test(s):
206207208
Covered by 2 test(s):
209
Covered by 2 test(s):
210
Covered by 2 test(s):
211
Covered by 2 test(s):
212213214
Covered by 2 test(s):
215216
Covered by 2 test(s):
217
Covered by 2 test(s):
218
Covered by 2 test(s):
219
Covered by 2 test(s):
220
Covered by 2 test(s):
221
Covered by 2 test(s):
222
Covered by 2 test(s):
223224
Covered by 2 test(s):
225
Covered by 2 test(s):
226
Covered by 2 test(s):
227228229
Covered by 2 test(s):
230231
Covered by 2 test(s):
232
Covered by 2 test(s):
233
Covered by 2 test(s):
234235236237
Covered by 2 test(s):
238
Covered by 2 test(s):
239240241
Covered by 2 test(s):
242243
Covered by 2 test(s):
244245
Covered by 2 test(s):
246
Covered by 1 test(s):
247248
Covered by 1 test(s):
249
Covered by 1 test(s):
250251252
Covered by 2 test(s):
253
Covered by 1 test(s):
254
Covered by 1 test(s):
255256257258259260261262263264265266267268269270271272273274275276277278
Covered by 6 test(s):
279
Covered by 6 test(s):
280
Covered by 6 test(s):
281
Covered by 6 test(s):
282
Covered by 6 test(s):
283284285
Covered by 6 test(s):
286287
Covered by 3 test(s):
288289290291292293294295296297
<?php

/**
 * @author Marwan Al-Soltany <MarwanAlsoltany@gmail.com>
 * @copyright Marwan Al-Soltany 2020
 * For the full copyright and license information, please view
 * the LICENSE file that was distributed with this source code.
 */

declare(strict_types=1);

namespace MAKS\AmqpAgent\RPC;

use Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use MAKS\AmqpAgent\RPC\AbstractEndpointInterface;
use MAKS\AmqpAgent\Helper\EventTrait;
use MAKS\AmqpAgent\Exception\MagicMethodsExceptionsTrait;
use MAKS\AmqpAgent\Exception\RPCEndpointException;
use MAKS\AmqpAgent\Config\RPCEndpointParameters as Parameters;

/**
 * An abstract class implementing the basic functionality of an endpoint.
 *
 * It owns the connection/channel pair, exposes connect/disconnect/ping,
 * and lets callers hook into life-cycle events via `on()`.
 * Concrete endpoints only have to implement `callback()`.
 *
 * @since 2.0.0
 * @api
 */
abstract class AbstractEndpoint implements AbstractEndpointInterface
{
    use MagicMethodsExceptionsTrait;
    use EventTrait;

    /**
     * The connection options of the RPC endpoint.
     * @var array
     */
    protected $connectionOptions;

    /**
     * The queue name of the RPC endpoint.
     * @var string
     */
    protected $queueName;

    /**
     * Whether the endpoint is connected to RabbitMQ server or not.
     * Refreshed on every call to `isConnected()`; unset (null) until then.
     * @var bool
     */
    protected $connected;

    /**
     * The endpoint connection. Only assigned by `connect()`.
     * @var AMQPStreamConnection
     */
    protected $connection;

    /**
     * The endpoint channel. Only assigned by `connect()`.
     * @var AMQPChannel
     */
    protected $channel;

    /**
     * The request body.
     * @var string
     */
    protected $requestBody;

    /**
     * Requests conveyor.
     * @var string
     */
    protected $requestQueue;

    /**
     * The response body.
     * @var string
     */
    protected $responseBody;

    /**
     * Responses conveyor.
     * @var string
     */
    protected $responseQueue;

    /**
     * Correlation ID of the last request/response.
     * @var string
     */
    protected $correlationId;


    /**
     * Class constructor.
     * @param array|null $connectionOptions [optional] The overrides for the default connection options of the RPC endpoint.
     * @param string|null $queueName [optional] The override for the default queue name of the RPC endpoint.
     *      `null` and the empty string select the default queue name.
     */
    public function __construct(?array $connectionOptions = [], ?string $queueName = null)
    {
        // The signature allows null, so normalise it the same way connect() does
        // instead of forwarding null to the patcher.
        $this->connectionOptions = Parameters::patch($connectionOptions ?? [], 'RPC_CONNECTION_OPTIONS');

        // Explicit comparison instead of empty(): a queue literally named '0'
        // must not be replaced by the default name.
        $this->queueName = ($queueName === null || $queueName === '')
            ? Parameters::RPC_QUEUE_NAME
            : $queueName;
    }

    /**
     * Closes the connection with RabbitMQ server before destroying the object.
     */
    public function __destruct()
    {
        $this->disconnect();
    }


    /**
     * Opens a connection with RabbitMQ server.
     * Fires `connection.after.open` and `channel.after.open` once the respective object exists.
     * @param array|null $connectionOptions [optional] The overrides for the default connection options of the RPC endpoint.
     * @return self
     * @throws RPCEndpointException If the endpoint is already connected.
     */
    public function connect(?array $connectionOptions = [])
    {
        // Guard first: a rejected call must not alter the options
        // that describe the connection which is currently open.
        if ($this->isConnected()) {
            throw new RPCEndpointException('Endpoint is already connected!');
        }

        $this->connectionOptions = Parameters::patchWith(
            $connectionOptions ?? [],
            $this->connectionOptions
        );

        // The options are passed positionally to the connection constructor,
        // so their order in the array is significant.
        $parameters = array_values($this->connectionOptions);

        $this->connection = new AMQPStreamConnection(...$parameters);
        $this->trigger('connection.after.open', [$this->connection]);

        $this->channel = $this->connection->channel();
        $this->trigger('channel.after.open', [$this->channel]);

        return $this;
    }

    /**
     * Closes the connection with RabbitMQ server.
     * Does nothing when the endpoint is not connected.
     * Fires `channel.before.close` and `connection.before.close` right before each close.
     * @return void
     */
    public function disconnect(): void
    {
        if (!$this->isConnected()) {
            return;
        }

        // Keep the flag a real boolean, as documented on the property.
        $this->connected = false;

        $this->trigger('channel.before.close', [$this->channel]);
        $this->channel->close();

        $this->trigger('connection.before.close', [$this->connection]);
        $this->connection->close();
    }

    /**
     * Returns whether the endpoint is connected or not.
     * The endpoint counts as connected only if both the connection and
     * the channel exist and both report being open.
     * @return bool
     */
    public function isConnected(): bool
    {
        $this->connected = (
            isset($this->connection) &&
            isset($this->channel) &&
            $this->connection->isConnected() &&
            $this->channel->is_open()
        );

        return $this->connected;
    }

    /**
     * Returns the connection used by the endpoint.
     * Note that the connection is only assigned by `connect()`; calling this
     * method earlier fails the declared (non-nullable) return type.
     * @return AMQPStreamConnection
     */
    public function getConnection(): AMQPStreamConnection
    {
        return $this->connection;
    }

    /**
     * The time needed for the round-trip to RabbitMQ server in milliseconds.
     * Note that if the endpoint is not connected yet, this method will establish a new connection only for checking.
     * The probe publishes one message to a throw-away queue and waits until it is consumed back.
     * @return float A two decimal points rounded float.
     * @throws RPCEndpointException Any exception raised while probing is handed to `RPCEndpointException::rethrow()`.
     */
    final public function ping(): float
    {
        $pingConnection = null;
        $pingChannel = null;
        // True only when this call created the connection and therefore has to close it.
        $isTemporaryConnection = false;

        try {
            $pingConnection = $this->connection;
            if (!isset($pingConnection) || !$pingConnection->isConnected()) {
                $parameters = array_values($this->connectionOptions);
                $pingConnection = new AMQPStreamConnection(...$parameters);
                $isTemporaryConnection = true;
            }

            $pingChannel = $pingConnection->channel();

            // Throw-away queue; arguments follow php-amqplib's
            // queue_declare(queue, passive, durable, exclusive, auto_delete).
            [$pingQueue] = $pingChannel->queue_declare(
                null,
                false,
                false,
                true,
                true
            );

            $pingChannel->basic_qos(
                null,
                1,
                null
            );

            $pingEcho = null;

            $pingChannel->basic_consume(
                $pingQueue,
                null,
                false,
                false,
                false,
                false,
                static function ($message) use (&$pingEcho) {
                    $message->ack();
                    $pingEcho = $message->body;
                }
            );

            $pingStartTime = microtime(true);

            // The message body is this method's name ('ping').
            $pingChannel->basic_publish(
                new AMQPMessage(__FUNCTION__),
                null,
                $pingQueue
            );

            // Block until the consumer callback has stored the echoed body.
            while ($pingEcho === null) {
                $pingChannel->wait();
            }

            $pingEndTime = microtime(true);

            $pingChannel->queue_delete($pingQueue);
            $pingChannel->close();

            // The endpoint's own connection stays open; only a probe connection is closed.
            if ($isTemporaryConnection) {
                $pingConnection->close();
            }

            return round(($pingEndTime - $pingStartTime) * 1000, 2);
        } catch (Exception $error) {
            // Best-effort cleanup so a failed probe does not leak its channel
            // or its temporary connection. Secondary failures are deliberately
            // ignored so the original error is the one that gets reported.
            $closables = [
                $pingChannel,
                $isTemporaryConnection ? $pingConnection : null,
            ];

            foreach ($closables as $closable) {
                if ($closable === null) {
                    continue;
                }

                try {
                    $closable->close();
                } catch (Exception $cleanupError) {
                    // Intentionally ignored, see above.
                }
            }

            RPCEndpointException::rethrow($error);
        }
    }

    /**
     * Hooking method based on events to manipulate the request/response during the endpoint/message life cycle.
     * Check out `self::$events` via `self::getEvents()` after processing at least one request/response to see all available events.
     *
     * The parameters will be passed to the callback as follows:
     *      1. `$listenedOnObject` (first segment of event name e.g. `'connection.after.open'` will be `$connection`),
     *      2. `$calledOnObject` (the object this method was called on e.g. `$endpoint`),
     *      3. `$eventName` (the event was listened on e.g. `'connection.after.open'`).
     * ```
     * $endpoint->on('connection.after.open', function ($connection, $endpoint, $event) {
     *      ...
     * });
     * ```
     * @param string $event The event to listen on.
     * @param callable $callback The callback to execute.
     * @return self
     */
    final public function on(string $event, callable $callback)
    {
        $this->bind($event, function (...$arguments) use ($event, $callback) {
            // Append the endpoint and the event name after whatever the trigger passed.
            $callback(...array_merge($arguments, [$this, $event]));
        });

        return $this;
    }

    /**
     * Hook method to manipulate the message (request/response) when extending the class.
     * @param AMQPMessage $message
     * @return string
     */
    abstract protected function callback(AMQPMessage $message): string;
}