Source code for rabbitpy.amqp

"""
AMQP Adapter

"""
from pamqp import specification as spec

from rabbitpy import base
from rabbitpy import message
from rabbitpy import exceptions
from rabbitpy import utils


# pylint: disable=too-many-public-methods
class AMQP(base.ChannelWriter):
    """The AMQP Adapter provides a more generic, non-opinionated interface to
    RabbitMQ by providing methods that map to the AMQP API.

    Methods that issue a synchronous request through ``self._rpc`` return
    the value ``_rpc`` produces, so callers can inspect the reply. Methods
    that only write a frame (``basic_ack``, ``basic_nack``,
    ``basic_reject``) return ``None``.

    NOTE(review): ``_rpc`` is inherited from ``base.ChannelWriter`` and is
    expected to return the broker's reply frame (e.g. Queue.Declare-Ok);
    confirm against that class. If it returns ``None`` these methods behave
    exactly as before.

    :param rabbitmq.channel.Channel channel: The channel to use

    """

    def __init__(self, channel):
        super(AMQP, self).__init__(channel)
        # Default consumer tag, built from the channel id and this object's
        # id(); used by basic_consume when the caller supplies no tag.
        self.consumer_tag = 'rabbitpy.%s.%s' % (self.channel.id, id(self))
        # True between a successful Basic.Consume and the matching cancel.
        self._consuming = False

    def basic_ack(self, delivery_tag=0, multiple=False):
        """Acknowledge one or more messages

        This method acknowledges one or more messages delivered via the
        Deliver or Get-Ok methods. The client can ask to confirm a single
        message or a set of messages up to and including a specific message.

        :param delivery_tag: Server-assigned delivery tag
        :type delivery_tag: int|long
        :param bool multiple: Acknowledge multiple messages

        """
        self._write_frame(spec.Basic.Ack(delivery_tag, multiple))

    def basic_consume(self, queue='', consumer_tag='', no_local=False,
                      no_ack=False, exclusive=False, nowait=False,
                      arguments=None):
        """Start a queue consumer

        This method asks the server to start a "consumer", which is a
        transient request for messages from a specific queue. Consumers last
        as long as the channel they were declared on, or until the client
        cancels them.

        This method will act as a generator, returning messages as they are
        delivered from the server. Nothing is sent to the server until the
        generator is first iterated. The consumer is cancelled when the
        generator finishes, is closed, or is interrupted by an exception
        while still consuming.

        Example use:

        .. code:: python

            for message in basic_consume(queue_name):
                print(message.body)
                message.ack()

        :param str queue: The queue name to consume from
        :param str consumer_tag: The consumer tag; when empty, the adapter's
            own ``consumer_tag`` attribute is used
        :param bool no_local: Do not deliver own messages
        :param bool no_ack: No acknowledgement needed
        :param bool exclusive: Request exclusive access
        :param bool nowait: Do not send a reply method
        :param dict arguments: Arguments for declaration

        """
        if not consumer_tag:
            consumer_tag = self.consumer_tag
        # Record this adapter and its no_ack flag under the tag in the
        # channel's consumer table before issuing Basic.Consume.
        # pylint: disable=protected-access
        self.channel._consumers[consumer_tag] = (self, no_ack)
        self._rpc(spec.Basic.Consume(0, queue, consumer_tag, no_local, no_ack,
                                     exclusive, nowait, arguments))
        self._consuming = True
        try:
            while self._consuming:
                # pylint: disable=protected-access
                msg = self.channel._consume_message()
                if not msg:
                    # A falsy result ends consumption. Cancellation happens
                    # exactly once, in the finally clause, so a failing
                    # cancel is never attempted a second time.
                    break
                yield msg
        finally:
            # Skipped when basic_cancel was already called elsewhere
            # (it resets _consuming to False).
            if self._consuming:
                self.basic_cancel(consumer_tag)

    def basic_cancel(self, consumer_tag='', nowait=False):
        """End a queue consumer

        This method cancels a consumer. This does not affect already
        delivered messages, but it does mean the server will not send any
        more messages for that consumer. The client may receive an arbitrary
        number of messages in between sending the cancel method and
        receiving the cancel-ok reply.

        :param str consumer_tag: Consumer tag
        :param bool nowait: Do not send a reply method
        :raises rabbitpy.exceptions.NotConsumingError: if this adapter is
            not currently consuming (except on PyPy, where the call is
            silently ignored instead)

        """
        if not self._consuming:
            # On PyPy a cancel without an active consumer is a no-op rather
            # than an error; every other interpreter gets the exception.
            if utils.PYPY:
                return
            raise exceptions.NotConsumingError()
        # pylint: disable=protected-access
        self.channel._cancel_consumer(self, consumer_tag, nowait)
        self._consuming = False

    def basic_get(self, queue='', no_ack=False):
        """Direct access to a queue

        This method provides a direct access to the messages in a queue
        using a synchronous dialogue that is designed for specific types of
        application where synchronous functionality is more important than
        performance.

        This method only issues the Basic.Get request; it does not itself
        build a message object from the reply.

        :param str queue: The queue name
        :param bool no_ack: No acknowledgement needed
        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Basic.Get(0, queue, no_ack))

    def basic_nack(self, delivery_tag=0, multiple=False, requeue=True):
        """Reject one or more incoming messages.

        This method allows a client to reject one or more incoming messages.
        It can be used to interrupt and cancel large incoming messages, or
        return untreatable messages to their original queue. This method is
        also used by the server to inform publishers on channels in confirm
        mode of unhandled messages. If a publisher receives this method, it
        probably needs to republish the offending messages.

        :param delivery_tag: Server-assigned delivery tag
        :type delivery_tag: int|long
        :param bool multiple: Reject multiple messages
        :param bool requeue: Requeue the message

        """
        self._write_frame(spec.Basic.Nack(delivery_tag, multiple, requeue))

    def basic_publish(self, exchange='', routing_key='', body='',
                      properties=None, mandatory=False, immediate=False):
        """Publish a message

        This method publishes a message to a specific exchange. The message
        will be routed to queues as defined by the exchange configuration
        and distributed to any active consumers when the transaction, if
        any, is committed.

        :param str exchange: The exchange name
        :param str routing_key: Message routing key
        :param body: The message body
        :type body: str|bytes
        :param dict properties: AMQP message properties
        :param bool mandatory: Indicate mandatory routing
        :param bool immediate: Request immediate delivery
        :return: bool or None

        """
        # A missing/empty properties value is replaced by a fresh dict so
        # no mutable object is shared between calls.
        msg = message.Message(self.channel, body, properties or {},
                              False, False)
        return msg.publish(exchange, routing_key, mandatory, immediate)

    def basic_qos(self, prefetch_size=0, prefetch_count=0, global_flag=False):
        """Specify quality of service

        This method requests a specific quality of service. The QoS can be
        specified for the current channel or for all channels on the
        connection. The particular properties and semantics of a qos method
        always depend on the content class semantics. Though the qos method
        could in principle apply to both peers, it is currently meaningful
        only for the server.

        :param prefetch_size: Prefetch window in octets
        :type prefetch_size: int|long
        :param int prefetch_count: Prefetch window in messages
        :param bool global_flag: Apply to entire connection
        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Basic.Qos(prefetch_size, prefetch_count,
                                        global_flag))

    def basic_reject(self, delivery_tag=0, requeue=True):
        """Reject an incoming message

        This method allows a client to reject a message. It can be used to
        interrupt and cancel large incoming messages, or return untreatable
        messages to their original queue.

        :param delivery_tag: Server-assigned delivery tag
        :type delivery_tag: int|long
        :param bool requeue: Requeue the message

        """
        self._write_frame(spec.Basic.Reject(delivery_tag, requeue))

    def basic_recover(self, requeue=False):
        """Redeliver unacknowledged messages

        This method asks the server to redeliver all unacknowledged messages
        on a specified channel. Zero or more messages may be redelivered.
        This method replaces the asynchronous Recover.

        :param bool requeue: Requeue the message
        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Basic.Recover(requeue))

    def confirm_select(self):
        """This method sets the channel to use publisher acknowledgements.
        The client can only use this method on a non-transactional channel.

        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Confirm.Select())

    def exchange_declare(self, exchange='', exchange_type='direct',
                         passive=False, durable=False, auto_delete=False,
                         internal=False, nowait=False, arguments=None):
        """Verify exchange exists, create if needed

        This method creates an exchange if it does not already exist, and if
        the exchange exists, verifies that it is of the correct and expected
        class.

        :param str exchange: The exchange name
        :param str exchange_type: Exchange type
        :param bool passive: Do not create exchange
        :param bool durable: Request a durable exchange
        :param bool auto_delete: Automatically delete when not in use
        :param bool internal: Deprecated
        :param bool nowait: Do not send a reply method
        :param dict arguments: Arguments for declaration
        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Exchange.Declare(0, exchange, exchange_type,
                                               passive, durable, auto_delete,
                                               internal, nowait, arguments))

    def exchange_delete(self, exchange='', if_unused=False, nowait=False):
        """Delete an exchange

        This method deletes an exchange. When an exchange is deleted all
        queue bindings on the exchange are cancelled.

        :param str exchange: The exchange name
        :param bool if_unused: Delete only if unused
        :param bool nowait: Do not send a reply method
        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Exchange.Delete(0, exchange, if_unused, nowait))

    def exchange_bind(self, destination='', source='', routing_key='',
                      nowait=False, arguments=None):
        """Bind exchange to an exchange.

        This method binds an exchange to an exchange.

        :param str destination: The destination exchange name
        :param str source: The source exchange name
        :param str routing_key: The routing key to bind with
        :param bool nowait: Do not send a reply method
        :param dict arguments: Optional arguments
        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Exchange.Bind(0, destination, source,
                                            routing_key, nowait, arguments))

    def exchange_unbind(self, destination='', source='', routing_key='',
                        nowait=False, arguments=None):
        """Unbind an exchange from an exchange.

        This method unbinds an exchange from an exchange.

        :param str destination: The destination exchange name
        :param str source: The source exchange name
        :param str routing_key: The routing key to bind with
        :param bool nowait: Do not send a reply method
        :param dict arguments: Optional arguments
        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Exchange.Unbind(0, destination, source,
                                              routing_key, nowait, arguments))

    def queue_bind(self, queue='', exchange='', routing_key='', nowait=False,
                   arguments=None):
        """Bind queue to an exchange

        This method binds a queue to an exchange. Until a queue is bound it
        will not receive any messages. In a classic messaging model,
        store-and-forward queues are bound to a direct exchange and
        subscription queues are bound to a topic exchange.

        :param str queue: The queue name
        :param str exchange: Name of the exchange to bind to
        :param str routing_key: Message routing key
        :param bool nowait: Do not send a reply method
        :param dict arguments: Arguments for binding
        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Queue.Bind(0, queue, exchange, routing_key,
                                         nowait, arguments))

    def queue_declare(self, queue='', passive=False, durable=False,
                      exclusive=False, auto_delete=False, nowait=False,
                      arguments=None):
        """Declare queue, create if needed

        This method creates or checks a queue. When creating a new queue the
        client can specify various properties that control the durability of
        the queue and its contents, and the level of sharing for the queue.

        :param str queue: The queue name
        :param bool passive: Do not create queue
        :param bool durable: Request a durable queue
        :param bool exclusive: Request an exclusive queue
        :param bool auto_delete: Auto-delete queue when unused
        :param bool nowait: Do not send a reply method
        :param dict arguments: Arguments for declaration
        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Queue.Declare(0, queue, passive, durable,
                                            exclusive, auto_delete, nowait,
                                            arguments))

    def queue_delete(self, queue='', if_unused=False, if_empty=False,
                     nowait=False):
        """Delete a queue

        This method deletes a queue. When a queue is deleted any pending
        messages are sent to a dead-letter queue if this is defined in the
        server configuration, and all consumers on the queue are cancelled.

        :param str queue: The queue name
        :param bool if_unused: Delete only if unused
        :param bool if_empty: Delete only if empty
        :param bool nowait: Do not send a reply method
        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Queue.Delete(0, queue, if_unused, if_empty,
                                           nowait))

    def queue_purge(self, queue='', nowait=False):
        """Purge a queue

        This method removes all messages from a queue which are not awaiting
        acknowledgment.

        :param str queue: The queue name
        :param bool nowait: Do not send a reply method
        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Queue.Purge(0, queue, nowait))

    def queue_unbind(self, queue='', exchange='', routing_key='',
                     arguments=None):
        """Unbind a queue from an exchange

        This method unbinds a queue from an exchange.

        :param str queue: The queue name
        :param str exchange: The exchange name
        :param str routing_key: Routing key of binding
        :param dict arguments: Arguments of binding
        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Queue.Unbind(0, queue, exchange, routing_key,
                                           arguments))

    def tx_select(self):
        """Select standard transaction mode

        This method sets the channel to use standard transactions. The
        client must use this method at least once on a channel before using
        the Commit or Rollback methods.

        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Tx.Select())

    def tx_commit(self):
        """Commit the current transaction

        This method commits all message publications and acknowledgments
        performed in the current transaction. A new transaction starts
        immediately after a commit.

        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Tx.Commit())

    def tx_rollback(self):
        """Abandon the current transaction

        This method abandons all message publications and acknowledgments
        performed in the current transaction. A new transaction starts
        immediately after a rollback. Note that unacked messages will not be
        automatically redelivered by rollback; if that is required an
        explicit recover call should be issued.

        :return: The value returned by ``_rpc`` for this request

        """
        return self._rpc(spec.Tx.Rollback())