Queue

The Queue class is used to work with RabbitMQ queues on an open channel. The following example shows how you can create a queue using the Queue.declare method.

import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'my-queue')
        queue.durable = True
        queue.declare()

To consume messages you can iterate over the Queue object itself if the defaults for the Queue.__iter__() method work for your needs:

with rabbitpy.Connection() as conn:
    with conn.channel() as channel:
        # Iterating over the queue consumes with the default settings
        for message in rabbitpy.Queue(channel, 'example'):
            print('Message: %r' % message)
            message.ack()

or use the Queue.consume() method if you would like to specify no_ack, prefetch, or priority:

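with rabbitpy.Connection() as conn:
    with conn.channel() as channel:
        queue = rabbitpy.Queue(channel, 'example')
        # Pass no_ack, prefetch, or priority to consume() to override the defaults
        for message in queue.consume():
            print('Message: %r' % message)
            message.ack()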

Warning

If you consume messages in PyPy, either by iterating over the Queue object or by calling Queue.consume(), you must manually invoke Queue.stop_consuming(). PyPy does not predictably clean up the generator that drives the iteration over messages. To test whether your code is running in PyPy, evaluate the boolean rabbitpy.PYPY constant.
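
The manual cleanup described in this warning can be sketched as a small wrapper. This is a minimal sketch, not part of rabbitpy: consume_with_cleanup and its handler and is_pypy parameters are hypothetical names, and the interpreter check uses the standard library's platform.python_implementation() where rabbitpy.PYPY could be used instead.

```python
import platform


def consume_with_cleanup(queue, handler, is_pypy=None):
    """Consume from a rabbitpy.Queue-like object, cancelling by hand on PyPy.

    handler(message) returns False to stop consuming. Both the function
    and its parameters are hypothetical helpers, not part of rabbitpy.
    """
    if is_pypy is None:
        # Stdlib check; rabbitpy.PYPY exposes the same information.
        is_pypy = platform.python_implementation() == 'PyPy'
    try:
        for message in queue.consume():
            keep_going = handler(message)
            message.ack()
            if keep_going is False:
                break
    finally:
        if is_pypy:
            # PyPy may not finalize the generator promptly, so cancel here.
            queue.stop_consuming()
```

Depending on the rabbitpy version, stop_consuming() may raise if the consumer has already been cancelled; the sketch does not handle that case.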

API Documentation

class rabbitpy.Queue(channel, name='', durable=False, exclusive=False, auto_delete=False, max_length=None, message_ttl=None, expires=None, dead_letter_exchange=None, dead_letter_routing_key=None, arguments=None)[source]

Create and manage RabbitMQ queues.

Parameters:
  • channel (Channel) – The channel object to communicate on
  • name (str) – The name of the queue
  • exclusive (bool) – Queue can only be used by this channel and will auto-delete once the channel is closed.
  • durable (bool) – Indicates if the queue should survive a RabbitMQ restart
  • auto_delete (bool) – Automatically delete when all consumers disconnect
  • max_length (int) – Maximum queue length
  • message_ttl (int) – Time-to-live of a message in milliseconds
  • expires (int) – Milliseconds until a queue is removed after becoming idle
  • dead_letter_exchange (str) – Dead letter exchange for rejected messages
  • dead_letter_routing_key (str) – Routing key for dead lettered messages
  • arguments (dict) – Custom arguments for the queue
Raises:
  • RemoteClosedChannelException
  • RemoteCancellationException

__init__(channel, name='', durable=False, exclusive=False, auto_delete=False, max_length=None, message_ttl=None, expires=None, dead_letter_exchange=None, dead_letter_routing_key=None, arguments=None)[source]

Create a new Queue object instance. Only the rabbitpy.Channel object is required.
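
To illustrate how the optional constructor arguments combine, here is a minimal sketch. The helper name declare_bounded_queue, the queue_cls parameter, the numeric limits, and the 'dead-letters' exchange name are all assumptions made for illustration; queue_cls is expected to be rabbitpy.Queue and is passed in so the sketch can be exercised without a broker.

```python
def declare_bounded_queue(channel, name, queue_cls):
    """Declare a durable, bounded queue with a dead letter exchange.

    queue_cls is expected to be rabbitpy.Queue. The limits and the
    'dead-letters' exchange name are illustrative, not recommendations.
    """
    queue = queue_cls(
        channel,
        name,
        durable=True,                         # survive a RabbitMQ restart
        max_length=10000,                     # maximum queue length
        message_ttl=60000,                    # milliseconds
        dead_letter_exchange='dead-letters',  # hypothetical exchange name
    )
    queue.declare()
    return queue
```

With an open channel, this would be called as declare_bounded_queue(channel, 'work', rabbitpy.Queue).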

__iter__()[source]

Quick way to consume messages using defaults of no_ack=False, prefetch and priority not set.

Yields: rabbitpy.message.Message
__len__()[source]

Return the pending number of messages in the queue by doing a passive Queue declare.

Return type: int
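
Because the length comes from a passive declare, len() can serve as a lightweight backlog check. The following is a sketch only; backlog and warn_at are hypothetical names, and any object with a __len__ method works in place of a real rabbitpy.Queue.

```python
def backlog(queue, warn_at=1000):
    """Return (pending, over_threshold) for a rabbitpy.Queue-like object.

    len(queue) performs a passive declare on every call, so the count is
    a snapshot and each call is a round trip to the broker.
    """
    pending = len(queue)
    return pending, pending >= warn_at
```
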
__setattr__(name, value)[source]

Validate the data types for specific attributes when setting them; otherwise fall through to the parent __setattr__.

Parameters:
  • name (str) – The attribute to set
  • value (mixed) – The value to set
Raises:

ValueError
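
The validate-then-fall-through pattern can be sketched with a stand-in class. This is not rabbitpy's implementation: ValidatedQueueSettings and its type table are assumptions that only mirror the constructor arguments documented above.

```python
class ValidatedQueueSettings(object):
    """Stand-in class showing type validation in __setattr__.

    NOT rabbitpy's code; the attribute names mirror the Queue constructor
    arguments and the type table is an assumption for this sketch.
    """

    _TYPES = {'durable': bool, 'exclusive': bool, 'auto_delete': bool,
              'max_length': int, 'message_ttl': int, 'expires': int}

    def __setattr__(self, name, value):
        expected = self._TYPES.get(name)
        if expected is not None and value is not None:
            # bool is a subclass of int, so reject True/False for int fields.
            wrong_bool = expected is int and isinstance(value, bool)
            if wrong_bool or not isinstance(value, expected):
                raise ValueError('%s must be %s' % (name, expected.__name__))
        # Everything else falls through to the parent implementation.
        super(ValidatedQueueSettings, self).__setattr__(name, value)
```

Assigning a string to durable on this stand-in raises ValueError, while attributes missing from the table are stored unchanged.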

bind(source, routing_key=None, arguments=None)[source]

Bind the queue to the specified exchange using the given routing key.

Parameters:
  • source (str or rabbitpy.exchange.Exchange) – The exchange to bind to
  • routing_key (str) – The routing key to use
  • arguments (dict) – Optional arguments for RabbitMQ
Returns:

bool
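
Since bind() reports success as a bool, binding one queue under several routing keys can be sketched as a loop that collects the failures. bind_routing_keys is a hypothetical helper, not part of rabbitpy, and it assumes only the bind() signature documented above.

```python
def bind_routing_keys(queue, exchange, routing_keys):
    """Bind one queue to one exchange under several routing keys.

    Returns the keys for which bind() did not return a true value.
    """
    failed = []
    for key in routing_keys:
        if not queue.bind(exchange, routing_key=key):
            failed.append(key)
    return failed
```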

consume(no_ack=False, prefetch=None, priority=None)[source]

Consume messages from the queue as a generator.

You can use this method instead of the queue object as an iterator if you need to alter the prefetch count, set the consumer priority, or consume in no_ack mode.

New in version 0.26.

Parameters:
  • no_ack (bool) – Do not require acknowledgements
  • prefetch (int) – Set a prefetch count for the channel
  • priority (int) – Consumer priority
Return type:

generator

Raises:

rabbitpy.exceptions.RemoteCancellationException
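
As a sketch of how the no_ack and prefetch arguments affect the consuming loop, the helper below handles a bounded number of messages and acknowledges only when acknowledgements are required. consume_batch, handler, and limit are hypothetical names; only the consume() keyword arguments and message.ack() come from this documentation.

```python
def consume_batch(queue, handler, limit, no_ack=False, prefetch=None):
    """Handle up to `limit` messages from a rabbitpy.Queue-like object."""
    handled = 0
    if limit <= 0:
        return handled
    for message in queue.consume(no_ack=no_ack, prefetch=prefetch):
        handler(message)
        if not no_ack:
            # In no_ack mode the broker does not expect an acknowledgement.
            message.ack()
        handled += 1
        if handled >= limit:
            break
    return handled
```

On PyPy the warning about calling Queue.stop_consuming() after leaving the loop still applies.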

consume_messages(no_ack=False, prefetch=None, priority=None)[source]

Consume messages from the queue as a generator.

Warning

This method is deprecated in favor of Queue.consume() and will be removed in future releases.

Deprecated since version 0.26.

You can use this method instead of the queue object as an iterator if you need to alter the prefetch count, set the consumer priority, or consume in no_ack mode.

Parameters:
  • no_ack (bool) – Do not require acknowledgements
  • prefetch (int) – Set a prefetch count for the channel
  • priority (int) – Consumer priority
Return type:

Generator

Raises:

rabbitpy.exceptions.RemoteCancellationException

consumer(no_ack=False, prefetch=None, priority=None)[source]

Method for returning the context manager for consuming messages. You should not use this directly.

Warning

This method is deprecated and will be removed in a future release.

Deprecated since version 0.26.

Parameters:
  • no_ack (bool) – Do not require acknowledgements
  • prefetch (int) – Set a prefetch count for the channel
  • priority (int) – Consumer priority
Returns:

None

declare(passive=False)[source]

Declare the queue on the RabbitMQ channel passed into the constructor, returning the current message count for the queue and its consumer count as a tuple.

Parameters: passive (bool) – Passive declare to retrieve message count and consumer count information
Returns: Message count, Consumer count
Return type: tuple(int, int)
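
A passive declare is a convenient way to read the two counts without changing the queue. The sketch below assumes only the declare(passive=True) call and its tuple return value described above; queue_stats is a hypothetical helper name.

```python
def queue_stats(queue):
    """Return message and consumer counts using a passive declare."""
    message_count, consumer_count = queue.declare(passive=True)
    return {'messages': message_count, 'consumers': consumer_count}
```

In AMQP, a passive declare of a queue that does not exist is a channel-level error, so callers should expect an exception in that case.
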
delete(if_unused=False, if_empty=False)[source]

Delete the queue.

Parameters:
  • if_unused (bool) – Delete only if unused
  • if_empty (bool) – Delete only if empty
get(acknowledge=True)[source]

Request a single message from RabbitMQ using the Basic.Get AMQP command.

Parameters: acknowledge (bool) – Let RabbitMQ know if you will manually acknowledge or negatively acknowledge the message after each get.
Return type: rabbitpy.message.Message or None
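
Because get() returns None when no message is available, polling a queue can be sketched as a loop that stops at the first None. drain and limit are hypothetical names; the sketch relies only on get() and message.ack() as documented here.

```python
def drain(queue, limit=100):
    """Fetch up to `limit` messages with Queue.get(), acknowledging each."""
    messages = []
    while len(messages) < limit:
        # The default acknowledge=True means each message must be acked.
        message = queue.get()
        if message is None:
            # Nothing was returned, so stop polling.
            break
        message.ack()
        messages.append(message)
    return messages
```
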
ha_declare(nodes=None)[source]

Declare the queue as highly available, passing in a list of nodes the queue should live on. If no nodes are passed, the queue will be declared across all nodes in the cluster.

Parameters: nodes (list) – A list of nodes to declare. If left empty, queue will be declared on all cluster nodes.
Returns: Message count, Consumer count
Return type: tuple(int, int)
purge()[source]

Purge the queue of all of its messages.

stop_consuming()[source]

Stop consuming messages. This is usually invoked if you want to cancel your consumer from outside the context manager or generator.

If you invoke this, there is a possibility that the generator method will return None instead of a rabbitpy.Message.
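
A consuming loop that cancels itself and tolerates a None message could look like the following. This is a sketch under assumptions: consume_until, handler, and should_stop are hypothetical names, and the only library calls used are consume(), stop_consuming(), and message.ack().

```python
def consume_until(queue, handler, should_stop):
    """Consume until should_stop() is true, tolerating a None message."""
    handled = 0
    for message in queue.consume():
        if message is None:
            # Possible after stop_consuming(); nothing to handle or ack.
            break
        handler(message)
        message.ack()
        handled += 1
        if should_stop():
            # Cancel the consumer explicitly before leaving the loop.
            queue.stop_consuming()
            break
    return handled
```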

unbind(source, routing_key=None)[source]

Unbind the queue from the specified exchange to which it is bound with the routing key. If the routing key is None, the queue name is used.

Parameters:
  • source (str or rabbitpy.exchange.Exchange) – The exchange to unbind from
  • routing_key (str) – The routing key that binds them
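
Combining bind() and unbind(), a binding can be moved from one routing key to another. The sketch below adds the new binding before removing the old one so that there is no moment without a binding; move_binding is a hypothetical helper and the ordering is a design choice of this sketch, not something rabbitpy prescribes.

```python
def move_binding(queue, exchange, old_key, new_key):
    """Rebind a queue from old_key to new_key on the same exchange.

    Returns False, leaving the old binding in place, if the new bind fails.
    """
    if not queue.bind(exchange, routing_key=new_key):
        return False
    queue.unbind(exchange, routing_key=old_key)
    return True
```
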