RabbitMQ - PHP AMQP broken pipe error
I am processing a huge XML document (which contains around a million entries) and subsequently importing a formatted version into the DB using RabbitMQ. Each time, after publishing around 200,000 entries, I receive a broken pipe error, and RabbitMQ is unable to recover from it.
Notice Error: fwrite(): send of 2651 bytes failed with errno=11 Resource temporarily unavailable in [/var/www/ribbon/app/console/command/lib/php_amqplib/amqp.inc, line 439]
Notice Error: fwrite(): send of 33 bytes failed with errno=104 Connection reset by peer in [/var/www/ribbon/app/console/command/lib/php_amqplib/amqp.inc, line 439]
Notice Error: fwrite(): send of 19 bytes failed with errno=32 Broken pipe in [/var/www/ribbon/app/console/command/lib/php_amqplib/amqp.inc, line 439]
This subsequently causes a node down error, and the process needs to be manually killed to recover from it.
These are my class methods:
    public function publishMessage($message) {
        if (!isset($this->conn)) {
            $this->_createNewConnectionAndChannel();
        }
        try {
            $this->ch->basic_publish(
                new AMQPMessage($message, array('content_type' => 'text/plain')),
                $this->defaults['exchange']['name'],
                $this->defaults['binding']['routing_key']
            );
        } catch (Exception $e) {
            echo "caught exception : " . $e->getMessage();
            echo "creating new connection.";
            $this->_createNewConnectionAndChannel();
            $this->publishMessage($message); // try again
        }
    }

    protected function _createNewConnectionAndChannel() {
        if (isset($this->conn)) {
            $this->conn->close();
        }
        if (isset($this->ch)) {
            $this->ch->close();
        }
        $this->conn = new AMQPConnection(
            $this->defaults['connection']['host'],
            $this->defaults['connection']['port'],
            $this->defaults['connection']['user'],
            $this->defaults['connection']['pass']
        );
        $this->ch = $this->conn->channel();
        $this->ch->access_request($this->defaults['channel']['vhost'], false, false, true, true);
        $this->ch->basic_qos(0, 20, 0); // fair dispatching
        $this->ch->queue_declare(
            $this->defaults['queue']['name'],
            $this->defaults['queue']['passive'],
            $this->defaults['queue']['durable'],
            $this->defaults['queue']['exclusive'],
            $this->defaults['queue']['auto_delete']
        );
        $this->ch->exchange_declare(
            $this->defaults['exchange']['name'],
            $this->defaults['exchange']['type'],
            $this->defaults['exchange']['passive'],
            $this->defaults['exchange']['durable'],
            $this->defaults['exchange']['auto_delete']
        );
        $this->ch->queue_bind(
            $this->defaults['queue']['name'],
            $this->defaults['exchange']['name'],
            $this->defaults['binding']['routing_key']
        );
    }
Any help would be appreciated.
Make sure you have added virtual host access for the user in RabbitMQ. I had created a new user and forgot to set its access rights for the "/" virtual host, which is used by default.
You can do that via the management panel: yourhost:15672 > Admin > click on the user > "Set permission".
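If you would rather script this than click through the panel, the management plugin also exposes an HTTP API on the same port. The following is only a rough sketch, assuming the management plugin is enabled and that you have an administrator account. The function names `buildPermissionRequest` and `grantPermissions`, and all host, user and password values, are placeholders I made up for illustration.

```php
<?php
// Hypothetical helper: builds the URL and JSON body for the management API
// request that grants a user full permissions on a virtual host.
function buildPermissionRequest($host, $vhost, $user)
{
    // The default vhost "/" must be URL-encoded as %2F in the path.
    $url = sprintf(
        'http://%s:15672/api/permissions/%s/%s',
        $host,
        rawurlencode($vhost),
        rawurlencode($user)
    );
    // ".*" matches every resource name for configure, write and read.
    $body = json_encode(array('configure' => '.*', 'write' => '.*', 'read' => '.*'));
    return array($url, $body);
}

// Hypothetical helper: sends the request with cURL and reports success.
// $adminUser / $adminPass are assumed to belong to an administrator account.
function grantPermissions($host, $vhost, $user, $adminUser, $adminPass)
{
    list($url, $body) = buildPermissionRequest($host, $vhost, $user);
    $ch = curl_init($url);
    curl_setopt_array($ch, array(
        CURLOPT_CUSTOMREQUEST  => 'PUT',
        CURLOPT_POSTFIELDS     => $body,
        CURLOPT_USERPWD        => $adminUser . ':' . $adminPass,
        CURLOPT_HTTPHEADER     => array('Content-Type: application/json'),
        CURLOPT_RETURNTRANSFER => true,
    ));
    curl_exec($ch);
    $status = curl_getinfo($ch, CURLINFO_HTTP_CODE);
    curl_close($ch);
    return $status >= 200 && $status < 300; // any 2xx status counts as success
}
```

Something like `grantPermissions('yourhost', '/', 'importer', 'admin', 'secret')` would then do the same as the "Set permission" button, with `importer`, `admin` and `secret` standing in for your own values.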
P.S. I assume the RabbitMQ service is running, the user exists, and the password is correct.
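One more thing about the code in the question, separate from permissions: the catch block in the publish method calls itself again with no limit, so if the broker stays unreachable the process can loop until it is killed. Below is a minimal sketch of a bounded retry. `publishWithRetry` is a hypothetical helper of mine, not part of php_amqplib, and the attempt count and delay are arbitrary. Also keep in mind that the fwrite() failures shown above are PHP notices, which a try/catch will not see unless they are converted to exceptions.

```php
<?php
// Hypothetical helper: runs $publish up to $maxAttempts times and calls
// $reconnect between failed attempts. Returns the number of attempts used.
function publishWithRetry($publish, $reconnect, $maxAttempts = 3, $delayMs = 0)
{
    $lastError = null;
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            call_user_func($publish);
            return $attempt;
        } catch (Exception $e) {
            $lastError = $e;
            if ($attempt < $maxAttempts) {
                usleep($delayMs * 1000);      // optional pause before retrying
                call_user_func($reconnect);   // rebuild connection and channel
            }
        }
    }
    // Give up instead of recursing forever; keep the last error as the cause.
    throw new RuntimeException(
        'publish failed after ' . $maxAttempts . ' attempts',
        0,
        $lastError
    );
}
```

Inside the class from the question, the first callable would wrap the `basic_publish` call and the second would call the reconnect method, so a dead connection produces one clear exception rather than an endless loop.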