rabbitmq - The best way to create php rabbit worker -


assume have rabbit queue filled data (e.g. user provides action need analyse later). 30 till 50 new items added each second. need create worker through queue , perform tasks on data. can this:

class worker {      public function run()     {         $queue = new queue('exchange', 'queue');         while (true)         {             $queue->processqueue();         }     } } 

and run worker.php on server , seems working.

but wonder, if infinite loop add load rabbit instance, if there no data proceed? maybe better way smth

class worker {     const idle = 5;      private $start = 0;      public function run()     {         $this->start = time();          $queue = new queue('exchange', 'queue');         while (true)         {             $queue->processqueue();              //don't allow worker working lot             if (time() - $this->start >= 60 * 60 - self::idle)             {                 break;             }              sleep(self::idle);         }          $queue->close();     } } 

so worker not pull data rabbit continuously sleep while. , after 1 hour of work stop working , instance of worker invoked crontab job or smth else?

in order manage workers rabbitmq use following library:

https://github.com/php-amqplib/php-amqplib

then create class define how workers should works (contains rabbitmq logic), give me that:

use phpamqplib\connection\amqpstreamconnection; use phpamqplib\message\amqpmessage;  abstract class queueamqpconsumer {         protected $connection;      protected $debug;      protected $queuename;      protected $exchange;      public function __construct(amqpstreamconnection $amqpconnection, $queuename, $exchange = null)     {         $this->connection = $amqpconnection;         $this->queuename = $queuename;         $this->exchange = $exchange;     }      public function run($debug = false)     {         $this->debug = $debug;         $channel = $this->connection->channel();         if ($this->exchange !== null) {             $channel->exchange_declare($this->exchange, "topic", false, true, false);         }          $channel->queue_declare($this->queuename, false, true, false, false);         if ($this->exchange !== null) {             $channel->queue_bind($this->queuename, $this->exchange);         }          $channel->basic_qos(null, 1, null);         $channel->basic_consume($this->queuename, '', false, false, false, false, [$this, 'callback']);          while (count($channel->callbacks)) {             $channel->wait();         }          $channel->close();         $this->connection->close();     }       final function callback(amqpmessage $message)     {         $result = $this->process($message);          if (false === $result) {             $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, true);         } else {             $message->delivery_info['channel']->basic_ack($message->   delivery_info['delivery_tag']);         }     }      /**      * @param amqpmessage $message      *      * @return bool      */     abstract protected function process(amqpmessage $message); } 

this class allows setup queue, exchange (topic in case), qos (you can custom these parameters, example) etc..

then loop on callback. here callback abstract method process(...) implemented on different workers need process queue. responsability of "loop/listening" on channel : $channel->wait();

then create child class need process messages in queue:

class myworker extends queueamqpconsumer {     protected function process(amqpmessage $message)     {           // .... process message here     } } 

so worker listening queue time , process messages @ moment arrived in queue. if process(...) return else false, message acknowledged.

you have launch class that:

$consumer = new myworker(....);     $consumer->run(); 

Comments

Popular posts from this blog

java - Jasper subreport showing only one entry from the JSON data source when embedded in the Title band -

serialization - Convert Any type in scala to Array[Byte] and back -

SonarQube Plugin for Jenkins does not find SonarQube Scanner executable -