Source code for rabbitpy.amqp_queue

"""
The rabbitpy.amqp_queue module contains two classes :py:class:`Queue` and
:py:class:`Consumer`. The :py:class:`Queue` class is an object that is used
create and work with queues on a RabbitMQ server.

To consume messages you can iterate over the Queue object itself if the
defaults for the :py:meth:`Queue.__iter__() <Queue.__iter__>` method work
for your needs:

.. code:: python

    with conn.channel() as channel:
        for message in rabbitpy.Queue(channel, 'example'):
            print('Message: %r' % message)
            message.ack()

or by the :py:meth:`Queue.consume() <Queue.consume>` method
if you would like to specify `no_ack`, `prefetch_count`, or `priority`:

.. code:: python

    with conn.channel() as channel:
        queue = rabbitpy.Queue(channel, 'example')
        for message in queue.consume:
            print('Message: %r' % message)
            message.ack()

"""
import logging
import warnings

from pamqp import specification

from rabbitpy import base
from rabbitpy import exceptions
from rabbitpy import utils

LOGGER = logging.getLogger(__name__)


[docs]class Queue(base.AMQPClass): """Create and manage RabbitMQ queues. :param channel: The channel object to communicate on :type channel: :py:class:`~rabbitpy.Channel` :param str name: The name of the queue :param exclusive: Queue can only be used by this channel and will auto-delete once the channel is closed. :type exclusive: bool :param durable: Indicates if the queue should survive a RabbitMQ is restart :type durable: bool :param bool auto_delete: Automatically delete when all consumers disconnect :param int max_length: Maximum queue length :param int message_ttl: Time-to-live of a message in milliseconds :param expires: Milliseconds until a queue is removed after becoming idle :type expires: int :param dead_letter_exchange: Dead letter exchange for rejected messages :type dead_letter_exchange: str :param dead_letter_routing_key: Routing key for dead lettered messages :type dead_letter_routing_key: str :param dict arguments: Custom arguments for the queue :raises: :py:exc:`~rabbitpy.exceptions.RemoteClosedChannelException` :raises: :py:exc:`~rabbitpy.exceptions.RemoteCancellationException` """ arguments = dict() auto_delete = False dead_letter_exchange = None dead_letter_routing_key = None durable = False exclusive = False expires = None max_length = None message_ttl = None # pylint: disable=too-many-arguments
[docs] def __init__(self, channel, name='', durable=False, exclusive=False, auto_delete=False, max_length=None, message_ttl=None, expires=None, dead_letter_exchange=None, dead_letter_routing_key=None, arguments=None): """Create a new Queue object instance. Only the :py:class:`rabbitpy.Channel` object is required. .. warning:: You should only use a single :py:class:`~rabbitpy.Queue` instance per channel when consuming or getting messages. Failure to do so can have unintended consequences. """ super(Queue, self).__init__(channel, name) # Defaults self.consumer_tag = 'rabbitpy.%s.%s' % (self.channel.id, id(self)) self.consuming = False # Assign Arguments self.durable = durable self.exclusive = exclusive self.auto_delete = auto_delete self.arguments = arguments or {} self.max_length = max_length self.message_ttl = message_ttl self.expires = expires self.dead_letter_exchange = dead_letter_exchange self.dead_letter_routing_key = dead_letter_routing_key
[docs] def __iter__(self): """Quick way to consume messages using defaults of ``no_ack=False``, prefetch and priority not set. .. warning:: You should only use a single :py:class:`~rabbitpy.Queue` instance per channel when consuming messages. Failure to do so can have unintended consequences. :yields: :class:`~rabbitpy.Message` """ return self.consume()
[docs] def __len__(self): """Return the pending number of messages in the queue by doing a passive Queue declare. :rtype: int """ response = self._rpc(self._declare(True)) return response.message_count
[docs] def __setattr__(self, name, value): """Validate the data types for specific attributes when setting them, otherwise fall throw to the parent __setattr__ :param str name: The attribute to set :param mixed value: The value to set :raises: ValueError """ if value is not None: if (name in ['auto_delete', 'durable', 'exclusive'] and not isinstance(value, bool)): raise ValueError('%s must be True or False' % name) if (name in ['max_length', 'message_ttl', 'expires'] and not isinstance(value, int)): raise ValueError('%s must be an int' % name) if (name in ['dead_letter_exchange', 'dead_letter_routing_key'] and not utils.is_string(value)): raise ValueError('%s must be a str, bytes or unicode' % name) if name == 'arguments' and not isinstance(value, dict): raise ValueError('arguments must be a dict') # Set the value super(Queue, self).__setattr__(name, value)
[docs] def bind(self, source, routing_key=None, arguments=None): """Bind the queue to the specified exchange or routing key. :type source: str or :py:class:`rabbitpy.exchange.Exchange` exchange :param source: The exchange to bind to :param str routing_key: The routing key to use :param dict arguments: Optional arguments for for RabbitMQ :return: bool """ if hasattr(source, 'name'): source = source.name frame = specification.Queue.Bind(queue=self.name, exchange=source, routing_key=routing_key or '', arguments=arguments) response = self._rpc(frame) return isinstance(response, specification.Queue.BindOk)
[docs] def consume(self, no_ack=False, prefetch=None, priority=None): """Consume messages from the queue as a :py:class:`generator`: .. code:: python for message in queue.consume(): message.ack() You can use this method instead of the queue object as an iterator if you need to alter the prefect count, set the consumer priority or consume in no_ack mode. .. versionadded:: 0.26 .. warning:: You should only use a single :py:class:`~rabbitpy.Queue` instance per channel when consuming messages. Failure to do so can have unintended consequences. :param bool no_ack: Do not require acknowledgements :param int prefetch: Set a prefetch count for the channel :param int priority: Consumer priority :rtype: :py:class:`generator` :raises: :exc:`~rabbitpy.exceptions.RemoteCancellationException` """ self._consume(no_ack, prefetch, priority) try: while self.consuming: # pylint: disable=protected-access message = self.channel._consume_message() if message: yield message else: if self.consuming: self.stop_consuming() break finally: if self.consuming: self.stop_consuming()
[docs] def consume_messages(self, no_ack=False, prefetch=None, priority=None): """Consume messages from the queue as a generator. .. warning:: This method is deprecated in favor of :py:meth:`Queue.consume` and will be removed in future releases. .. deprecated:: 0.26 You can use this message instead of the queue object as an iterator if you need to alter the prefect count, set the consumer priority or consume in no_ack mode. :param bool no_ack: Do not require acknowledgements :param int prefetch: Set a prefetch count for the channel :param int priority: Consumer priority :rtype: :py:class:`Generator` :raises: :exc:`~rabbitpy.exceptions.RemoteCancellationException` """ warnings.warn('This method is deprecated in favor Queue.consume', DeprecationWarning) return self.consume(no_ack, prefetch, priority)
# pylint: disable=no-self-use, unused-argument
[docs] def consumer(self, no_ack=False, prefetch=None, priority=None): """Method for returning the contextmanager for consuming messages. You should not use this directly. .. warning:: This method is deprecated and will be removed in a future release. .. deprecated:: 0.26 :param bool no_ack: Do not require acknowledgements :param int prefetch: Set a prefetch count for the channel :param int priority: Consumer priority :return: None """ raise DeprecationWarning()
[docs] def declare(self, passive=False): """Declare the queue on the RabbitMQ channel passed into the constructor, returning the current message count for the queue and its consumer count as a tuple. :param bool passive: Passive declare to retrieve message count and consumer count information :return: Message count, Consumer count :rtype: tuple(int, int) """ response = self._rpc(self._declare(passive)) if not self.name: self.name = response.queue return response.message_count, response.consumer_count
[docs] def delete(self, if_unused=False, if_empty=False): """Delete the queue :param bool if_unused: Delete only if unused :param bool if_empty: Delete only if empty """ self._rpc(specification.Queue.Delete(queue=self.name, if_unused=if_unused, if_empty=if_empty))
[docs] def get(self, acknowledge=True): """Request a single message from RabbitMQ using the Basic.Get AMQP command. .. warning:: You should only use a single :py:class:`~rabbitpy.Queue` instance per channel when getting messages. Failure to do so can have unintended consequences. :param bool acknowledge: Let RabbitMQ know if you will manually acknowledge or negatively acknowledge the message after each get. :rtype: :class:`~rabbitpy.Message` or None """ self._write_frame(specification.Basic.Get(queue=self.name, no_ack=not acknowledge)) return self.channel._get_message() # pylint: disable=protected-access
[docs] def ha_declare(self, nodes=None): """Declare a the queue as highly available, passing in a list of nodes the queue should live on. If no nodes are passed, the queue will be declared across all nodes in the cluster. :param list nodes: A list of nodes to declare. If left empty, queue will be declared on all cluster nodes. :return: Message count, Consumer count :rtype: tuple(int, int) """ if nodes: self.arguments['x-ha-policy'] = 'nodes' self.arguments['x-ha-nodes'] = nodes else: self.arguments['x-ha-policy'] = 'all' if 'x-ha-nodes' in self.arguments: del self.arguments['x-ha-nodes'] return self.declare()
[docs] def purge(self): """Purge the queue of all of its messages.""" self._rpc(specification.Queue.Purge())
[docs] def stop_consuming(self): """Stop consuming messages. This is usually invoked if you want to cancel your consumer from outside the context manager or generator. If you invoke this, there is a possibility that the generator method will return None instead of a :py:class:`rabbitpy.Message`. """ if utils.PYPY and not self.consuming: return if not self.consuming: raise exceptions.NotConsumingError() self.channel._cancel_consumer(self) # pylint: disable=protected-access self.consuming = False
[docs] def unbind(self, source, routing_key=None): """Unbind queue from the specified exchange where it is bound the routing key. If routing key is None, use the queue name. :type source: str or :py:class:`rabbitpy.exchange.Exchange` exchange :param source: The exchange to unbind from :param str routing_key: The routing key that binds them """ if hasattr(source, 'name'): source = source.name routing_key = routing_key or self.name self._rpc(specification.Queue.Unbind(queue=self.name, exchange=source, routing_key=routing_key))
def _consume(self, no_ack=False, prefetch=None, priority=None): """Return a :py:class:_Consumer instance as a contextmanager, properly shutting down the consumer when the generator is exited. :param bool no_ack: Do not require acknowledgements :param int prefetch: Set a prefetch count for the channel :param int priority: Consumer priority :return: _Consumer """ if prefetch: self.channel.prefetch_count(prefetch, False) # pylint: disable=protected-access self.channel._consume(self, no_ack, priority) self.consuming = True def _declare(self, passive=False): """Return a specification.Queue.Declare class pre-composed for the rpc method since this can be called multiple times. :param bool passive: Passive declare to retrieve message count and consumer count information :rtype: pamqp.specification.Queue.Declare """ arguments = dict(self.arguments) if self.expires: arguments['x-expires'] = self.expires if self.message_ttl: arguments['x-message-ttl'] = self.message_ttl if self.max_length: arguments['x-max-length'] = self.max_length if self.dead_letter_exchange: arguments['x-dead-letter-exchange'] = self.dead_letter_exchange if self.dead_letter_routing_key: arguments['x-dead-letter-routing-key'] = \ self.dead_letter_routing_key LOGGER.debug('Declaring Queue %s, durable=%s, passive=%s, ' 'exclusive=%s, auto_delete=%s, arguments=%r', self.name, self.durable, passive, self.exclusive, self.auto_delete, arguments) return specification.Queue.Declare(queue=self.name, durable=self.durable, passive=passive, exclusive=self.exclusive, auto_delete=self.auto_delete, arguments=arguments)