Source of file Publisher.php
Size: 15,026 Bytes - Last Modified: 2021-03-15T15:06:16+00:00
C:/Users/MAKS/Code/_PROJECTS/amqp-agent/src/Worker/Publisher.php
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
Covered by 13 test(s):
91
Covered by 13 test(s):
92
Covered by 13 test(s):
93
Covered by 13 test(s):
9495
Covered by 13 test(s):
96
Covered by 13 test(s):
979899100101102103104105106107108
Covered by 8 test(s):
109
Covered by 8 test(s):
110
Covered by 4 test(s):
111112113
Covered by 8 test(s):
114115116
Covered by 8 test(s):
117
Covered by 8 test(s):
118
Covered by 8 test(s):
119
Covered by 8 test(s):
120
Covered by 8 test(s):
121
Covered by 8 test(s):
122
Covered by 8 test(s):
123
Covered by 8 test(s):
124
Covered by 8 test(s):
125
Covered by 8 test(s):
126127128129130131
Covered by 8 test(s):
132
Covered by 4 test(s):
133134135
Covered by 8 test(s):
136137138139140141142143144145146147
Covered by 7 test(s):
148
Covered by 7 test(s):
149
Covered by 4 test(s):
150151152
Covered by 7 test(s):
153154155
Covered by 7 test(s):
156
Covered by 7 test(s):
157
Covered by 7 test(s):
158
Covered by 7 test(s):
159
Covered by 7 test(s):
160
Covered by 7 test(s):
161
Covered by 7 test(s):
162163164165166167
Covered by 7 test(s):
168
Covered by 4 test(s):
169170171
Covered by 7 test(s):
172173174175176177178179180181182
Covered by 5 test(s):
183
Covered by 5 test(s):
184
Covered by 4 test(s):
185186187
Covered by 5 test(s):
188
Covered by 5 test(s):
189190191
Covered by 5 test(s):
192
Covered by 5 test(s):
193
Covered by 5 test(s):
194195196
Covered by 5 test(s):
197
Covered by 4 test(s):
198199200
Covered by 5 test(s):
201202203204205206207208209210211212213
Covered by 5 test(s):
214
Covered by 5 test(s):
215
Covered by 3 test(s):
216217218
Covered by 5 test(s):
219220
Covered by 5 test(s):
221222
Covered by 5 test(s):
223224
Covered by 5 test(s):
225
Covered by 1 test(s):
226
Covered by 5 test(s):
227
Covered by 3 test(s):
228
Covered by 4 test(s):
229
Covered by 2 test(s):
230231
Covered by 2 test(s):
232
Covered by 2 test(s):
233
Covered by 2 test(s):
234
Covered by 2 test(s):
235
Covered by 2 test(s):
236
Covered by 2 test(s):
237238239240241242
Covered by 3 test(s):
243
Covered by 3 test(s):
244
Covered by 3 test(s):
245
Covered by 3 test(s):
246
Covered by 3 test(s):
247
Covered by 3 test(s):
248
Covered by 3 test(s):
249250251252
Covered by 3 test(s):
253254
Covered by 3 test(s):
255256257
Covered by 3 test(s):
258
Covered by 2 test(s):
259260261
Covered by 3 test(s):
262263264265266267268269270271272273274275
Covered by 2 test(s):
276
Covered by 2 test(s):
277
Covered by 1 test(s):
278279280
Covered by 2 test(s):
281282
Covered by 2 test(s):
283284
Covered by 2 test(s):
285
Covered by 2 test(s):
286
Covered by 2 test(s):
287288
Covered by 2 test(s):
289290
Covered by 2 test(s):
291
Covered by 2 test(s):
292
Covered by 1 test(s):
293294
Covered by 1 test(s):
295296297
Covered by 1 test(s):
298
Covered by 1 test(s):
299
Covered by 1 test(s):
300
Covered by 1 test(s):
301
Covered by 1 test(s):
302303
Covered by 1 test(s):
304305306307308
Covered by 2 test(s):
309
Covered by 2 test(s):
310
Covered by 2 test(s):
311
Covered by 2 test(s):
312
Covered by 2 test(s):
313
Covered by 2 test(s):
314
Covered by 2 test(s):
315316317
Covered by 2 test(s):
318319
Covered by 2 test(s):
320321322323324325326327328329330331332333334335336337
Covered by 1 test(s):
338339340
Covered by 1 test(s):
341342
Covered by 1 test(s):
343344345
Covered by 1 test(s):
346
Covered by 1 test(s):
347348349
Covered by 1 test(s):
350351352353354355356357358
Covered by 3 test(s):
359
Covered by 3 test(s):
360
Covered by 3 test(s):
361
Covered by 3 test(s):
362363
Covered by 3 test(s):
364365366367368369370371372373374375
Covered by 2 test(s):
376
Covered by 2 test(s):
377
Covered by 2 test(s):
378379
Covered by 1 test(s):
380
Covered by 1 test(s):
381
Covered by 1 test(s):
382383
Covered by 1 test(s):
384385
| <?php /** * @author Marwan Al-Soltany <MarwanAlsoltany@gmail.com> * @copyright Marwan Al-Soltany 2020 * For the full copyright and license information, please view * the LICENSE file that was distributed with this source code. */ declare(strict_types=1); namespace MAKS\AmqpAgent\Worker; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Exception\AMQPInvalidArgumentException; use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Exception\AMQPConnectionBlockedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPChannelClosedException; use MAKS\AmqpAgent\Worker\AbstractWorker; use MAKS\AmqpAgent\Worker\PublisherInterface; use MAKS\AmqpAgent\Worker\WorkerFacilitationInterface; use MAKS\AmqpAgent\Exception\AmqpAgentException as Exception; use MAKS\AmqpAgent\Config\PublisherParameters as Parameters; /** * A class specialized in publishing. Implementing only the methods needed for a publisher. * * Example: * ``` * $publisher = new Publisher(); * $publisher->connect(); * $publisher->queue(); * $publisher->exchange(); * $publisher->bind(); * $publisher->publish('Some message!'); * $publisher->disconnect(); * ``` * * @since 1.0.0 * @api */ class Publisher extends AbstractWorker implements PublisherInterface, WorkerFacilitationInterface { /** * The default exchange options that the worker should use when no overrides are provided. * @var array */ protected $exchangeOptions; /** * The default bind options that the worker should use when no overrides are provided. * @var array */ protected $bindOptions; /** * The default message options that the worker should use when no overrides are provided. * @var array */ protected $messageOptions; /** * The default publish options that the worker should use when no overrides are provided. * @var array */ protected $publishOptions; /** * Publisher object constructor. * @param array $connectionOptions [optional] The overrides for the default connection options of the worker. * @param array $channelOptions [optional] The overrides for the default channel options of the worker. * @param array $queueOptions [optional] The overrides for the default queue options of the worker. * @param array $exchangeOptions [optional] The overrides for the default exchange options of the worker. * @param array $bindOptions [optional] The overrides for the default bind options of the worker. * @param array $messageOptions [optional] The overrides for the default message options of the worker. * @param array $publishOptions [optional] The overrides for the default publish options of the worker. */ public function __construct( array $connectionOptions = [], array $channelOptions = [], array $queueOptions = [], array $exchangeOptions = [], array $bindOptions = [], array $messageOptions = [], array $publishOptions = [] ) { $this->exchangeOptions = Parameters::patch($exchangeOptions, 'EXCHANGE_OPTIONS'); $this->bindOptions = Parameters::patch($bindOptions, 'BIND_OPTIONS'); $this->messageOptions = Parameters::patch($messageOptions, 'MESSAGE_OPTIONS'); $this->publishOptions = Parameters::patch($publishOptions, 'PUBLISH_OPTIONS'); parent::__construct($connectionOptions, $channelOptions, $queueOptions); } /** * Declares an exchange on the default channel of the worker's connection to RabbitMQ server. * @param array|null $parameters [optional] The overrides for the default exchange options of the worker. * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel. * @return self * @throws AMQPTimeoutException */ public function exchange(?array $parameters = null, ?AMQPChannel $_channel = null) { $changes = null; if ($parameters) { $changes = $this->mutateClassMember('exchangeOptions', $parameters); } $channel = $_channel ?: $this->channel; try { $channel->exchange_declare( $this->exchangeOptions['exchange'], $this->exchangeOptions['type'], $this->exchangeOptions['passive'], $this->exchangeOptions['durable'], $this->exchangeOptions['auto_delete'], $this->exchangeOptions['internal'], $this->exchangeOptions['nowait'], $this->exchangeOptions['arguments'], $this->exchangeOptions['ticket'] ); } catch (AMQPTimeoutException $error) { // @codeCoverageIgnore Exception::rethrow($error); // @codeCoverageIgnore } if ($changes) { $this->mutateClassMember('exchangeOptions', $changes); } return $this; } /** * Binds the default queue to the default exchange on the default channel of the worker's connection to RabbitMQ server. * @param array|null $parameters [optional] The overrides for the default bind options of the worker. * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel. * @return self * @throws AMQPTimeoutException */ public function bind(?array $parameters = null, ?AMQPChannel $_channel = null) { $changes = null; if ($parameters) { $changes = $this->mutateClassMember('bindOptions', $parameters); } $channel = $_channel ?: $this->channel; try { $channel->queue_bind( $this->bindOptions['queue'], $this->bindOptions['exchange'], $this->bindOptions['routing_key'], $this->bindOptions['nowait'], $this->bindOptions['arguments'], $this->bindOptions['ticket'] ); } catch (AMQPTimeoutException $error) { // @codeCoverageIgnore Exception::rethrow($error); // @codeCoverageIgnore } if ($changes) { $this->mutateClassMember('bindOptions', $changes); } return $this; } /** * Returns an AMQPMessage object. * @param string $body The body of the message. * @param array|null $properties [optional] The overrides for the default properties of the default message options of the worker. * @return AMQPMessage */ public function message(string $body, ?array $properties = null): AMQPMessage { $changes = null; if ($properties) { $changes = $this->mutateClassSubMember('messageOptions', 'properties', $properties); } if ($body) { $this->messageOptions['body'] = $body; } $message = new AMQPMessage( $this->messageOptions['body'], $this->messageOptions['properties'] ); if ($changes) { $this->mutateClassSubMember('messageOptions', 'properties', $changes); } return $message; } /** * Publishes a message to the default exchange on the default channel of the worker's connection to RabbitMQ server. * @param string|array|AMQPMessage $payload A string of the body of the message or an array of body and properties for the message or a AMQPMessage object. * @param array|null $parameters [optional] The overrides for the default publish options of the worker. * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel. * @return self * @throws Exception|AMQPChannelClosedException|AMQPConnectionClosedException|AMQPConnectionBlockedException */ public function publish($payload, ?array $parameters = null, ?AMQPChannel $_channel = null) { $changes = null; if ($parameters) { $changes = $this->mutateClassMember('publishOptions', $parameters); } $channel = $_channel ?: $this->channel; $originalMessage = $this->publishOptions['msg']; $message = $payload ?: $originalMessage; if ($message instanceof AMQPMessage) { $this->publishOptions['msg'] = $message; } elseif (is_array($message) && isset($message['body']) && isset($message['properties'])) { $this->publishOptions['msg'] = $this->message($message['body'], $message['properties']); } elseif (is_string($message)) { $this->publishOptions['msg'] = $this->message($message); } else { throw new Exception( sprintf( 'Payload must be a string, an array like %s, or an instance of "%s". The given parameter (data-type: %s) was none of them.', '["body" => "Message body!", "properties" ["key" => "value"]]', AMQPMessage::class, is_object($payload) ? get_class($payload) : gettype($payload) ) ); } try { $channel->basic_publish( $this->publishOptions['msg'], $this->publishOptions['exchange'], $this->publishOptions['routing_key'], $this->publishOptions['mandatory'], $this->publishOptions['immediate'], $this->publishOptions['ticket'] ); } catch (AMQPChannelClosedException | AMQPConnectionClosedException | AMQPConnectionBlockedException $error) { // @codeCoverageIgnore Exception::rethrow($error); // @codeCoverageIgnore } finally { // reverting messageOptions back to its state. $this->publishOptions['msg'] = $originalMessage; } if ($changes) { $this->mutateClassMember('publishOptions', $changes); } return $this; } /** * Publishes a batch of messages to the default exchange on the default channel of the worker's connection to RabbitMQ server. * @param string[]|array[]|AMQPMessage[] $messages An array of bodies of the messages or an array of arrays of body and properties for the messages or an array of AMQPMessage objects. * @param int $batchSize [optional] The number of messages that should be published per batch. * @param array|null $parameters [optional] The overrides for the default publish options of the worker. * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel. * @return self * @throws Exception|AMQPChannelClosedException|AMQPConnectionClosedException|AMQPConnectionBlockedException */ public function publishBatch(array $messages, int $batchSize = 2500, ?array $parameters = null, ?AMQPChannel $_channel = null) { $changes = null; if ($parameters) { $changes = $this->mutateClassMember('publishOptions', $parameters); } $channel = $_channel ?: $this->channel; $originalMessage = $this->publishOptions['msg']; $count = count($messages); for ($i = 0; $i < $count; $i++) { $payload = $messages[$i]; $message = $payload ?: $originalMessage; if ($message instanceof AMQPMessage) { $this->publishOptions['msg'] = $message; } elseif (is_array($message) && isset($message['body']) && isset($message['properties'])) { $this->publishOptions['msg'] = $this->message($message['body'], $message['properties']); } elseif (is_string($message)) { $this->publishOptions['msg'] = $this->message($message); } else { throw new Exception( sprintf( 'Messages array elements must be either a string, an array like %s, or an instance of "%s". Element in index "%d" (data-type: %s) was none of them.', '["body" => "Message body!", "properties" ["key" => "value"]]', AMQPMessage::class, $i, is_object($payload) ? get_class($payload) : gettype($payload) ) ); } $channel->batch_basic_publish( $this->publishOptions['msg'], $this->publishOptions['exchange'], $this->publishOptions['routing_key'], $this->publishOptions['mandatory'], $this->publishOptions['immediate'], $this->publishOptions['ticket'] ); if ($i % $batchSize == 0) { try { $channel->publish_batch(); // @codeCoverageIgnoreStart } catch (AMQPConnectionBlockedException $e) { $tries = -1; do { sleep(1); $tries++; } while ($this->connection->isBlocked() && $tries >= 60); $channel->publish_batch(); } catch (AMQPChannelClosedException | AMQPConnectionClosedException | AMQPConnectionBlockedException $error) { Exception::rethrow($error); // @codeCoverageIgnoreEnd } } } try { $channel->publish_batch(); } catch (AMQPChannelClosedException | AMQPConnectionClosedException | AMQPConnectionBlockedException $error) { // @codeCoverageIgnore Exception::rethrow($error); // @codeCoverageIgnore } finally { // reverting messageOptions back to its state. $this->publishOptions['msg'] = $originalMessage; } if ($changes) { $this->mutateClassMember('publishOptions', $changes); } return $this; } /** * Executes `self::connect()`, `self::queue()`, `self::exchange`, and `self::bind()` respectively. * @return self */ public function prepare() { $this->connect(); $this->queue(); $this->exchange(); $this->bind(); return $this; } /** * Executes `self::connect()`, `self::queue()`, `self::exchange`, `self::bind()`, `self::publish()`, and `self::disconnect()` respectively. * @param string[]|array[]|AMQPMessage[] $messages An array of strings, arrays, or AMQPMessage objects (same as `self::publishBatch()`). * @return void * @throws Exception */ public function work($messages): void { try { $this->prepare(); foreach ($messages as $message) { $this->publish($message); } $this->disconnect(); } catch (Exception $error) { Exception::rethrow($error, null, false); } } } |