"""
Wrapper methods for easy access to common operations, making them both less
complex and less verbose for one off or simple use cases.
"""
from rabbitpy import amqp_queue
from rabbitpy import connection
from rabbitpy import exchange
from rabbitpy import message
def consume(uri=None, queue_name=None, no_ack=False, prefetch=None,
            priority=None):
    """Consume messages from the queue as a generator:

    .. code:: python

        for message in rabbitpy.consume('amqp://localhost/%2F', 'my_queue'):
            message.ack()

    Because this is a generator function, nothing runs until iteration
    starts: the queue name check, the connection and the channel are all
    deferred to the first ``next()`` call.

    :param str uri: AMQP connection URI
    :param str queue_name: The name of the queue to consume from
    :param bool no_ack: Do not require acknowledgements
    :param int prefetch: Set a prefetch count for the channel
    :param int priority: Set the consumer priority
    :rtype: :py:class:`Iterator`
    :raises: :py:class:`ValueError` if ``queue_name`` is empty or ``None``

    """
    if not queue_name:
        raise ValueError('You must specify a queue name to consume from')
    # Both context managers stay open for as long as the caller keeps
    # iterating; they exit when the generator finishes or is closed.
    with connection.Connection(uri) as conn:
        with conn.channel() as channel:
            queue = amqp_queue.Queue(channel, queue_name)
            for msg in queue.consume(no_ack, prefetch, priority):
                yield msg
def get(uri=None, queue_name=None):
    """Get a message from RabbitMQ, auto-acknowledging with RabbitMQ if one
    is returned.

    Invoke directly as ``rabbitpy.get()``

    :param str uri: AMQP URI to connect to
    :param str queue_name: The queue name to get the message from
    :rtype: :py:class:`rabbitpy.message.Message` or None
    :raises: :py:class:`ValueError` if ``queue_name`` is empty or ``None``

    """
    if not queue_name:
        raise ValueError('You must specify a queue name to get a message from')
    with connection.Connection(uri) as conn:
        with conn.channel() as channel:
            queue = amqp_queue.Queue(channel, queue_name)
            # NOTE(review): the positional False is presumably the
            # "acknowledge" flag of Queue.get, i.e. no manual ack is
            # expected -- confirm against amqp_queue.Queue.get.
            return queue.get(False)
def publish(uri=None, exchange_name=None, routing_key=None,
            body=None, properties=None, confirm=False):
    """Publish a message to RabbitMQ. This should only be used for one-off
    publishing, as you will suffer a performance penalty if you use it
    repeatedly instead of creating a connection and channel and publishing
    on that.

    Missing values fall back to empty ones: ``exchange_name`` and
    ``routing_key`` default to ``''``, ``body`` to ``''`` and ``properties``
    to an empty dict.

    :param str uri: AMQP URI to connect to
    :param str exchange_name: The exchange to publish to
    :param str routing_key: The routing_key to publish with
    :param body: The message body
    :type body: str or unicode or bytes or dict or list
    :param dict properties: Dict representation of Basic.Properties
    :param bool confirm: Confirm this delivery with Publisher Confirms
    :rtype: bool or None
    :returns: The result of the publish call when ``confirm`` is set,
        otherwise ``None``

    """
    if exchange_name is None:
        exchange_name = ''
    with connection.Connection(uri) as conn:
        with conn.channel() as channel:
            msg = message.Message(channel, body or '', properties or {})
            if confirm:
                # Confirms must be enabled on the channel before publishing
                # so the publish result can be handed back to the caller.
                channel.enable_publisher_confirms()
                return msg.publish(exchange_name, routing_key or '')
            # Without confirms the publish result is deliberately discarded.
            msg.publish(exchange_name, routing_key or '')
            return None
def create_queue(uri=None, queue_name='', durable=True, auto_delete=False,
                 max_length=None, message_ttl=None, expires=None,
                 dead_letter_exchange=None, dead_letter_routing_key=None,
                 arguments=None):
    """Create a queue with RabbitMQ. This should only be used for one-off
    operations. If a queue name is omitted, the name will be automatically
    generated by RabbitMQ.

    Nothing is returned, so a broker-generated queue name is not made
    available to the caller.

    :param str uri: AMQP URI to connect to
    :param str queue_name: The queue name to create
    :param durable: Indicates if the queue should survive a RabbitMQ restart
    :type durable: bool
    :param bool auto_delete: Automatically delete when all consumers disconnect
    :param int max_length: Maximum queue length
    :param int message_ttl: Time-to-live of a message in milliseconds
    :param expires: Milliseconds until a queue is removed after becoming idle
    :type expires: int
    :param dead_letter_exchange: Dead letter exchange for rejected messages
    :type dead_letter_exchange: str
    :param dead_letter_routing_key: Routing key for dead lettered messages
    :type dead_letter_routing_key: str
    :param dict arguments: Custom arguments for the queue
    :raises: :py:class:`ValueError`
    :raises: :py:class:`rabbitpy.RemoteClosedException`

    """
    # NOTE(review): ValueError is not raised in this function itself; it is
    # presumably raised by amqp_queue.Queue for invalid arguments -- confirm.
    with connection.Connection(uri) as conn:
        with conn.channel() as channel:
            queue = amqp_queue.Queue(
                channel, queue_name,
                durable=durable,
                auto_delete=auto_delete,
                max_length=max_length,
                message_ttl=message_ttl,
                expires=expires,
                dead_letter_exchange=dead_letter_exchange,
                dead_letter_routing_key=dead_letter_routing_key,
                arguments=arguments)
            queue.declare()
def delete_queue(uri=None, queue_name=None):
    """Delete a queue from RabbitMQ. This should only be used for one-off
    operations.

    :param str uri: AMQP URI to connect to
    :param str queue_name: The queue name to delete
    :rtype: None
    :raises: :py:class:`ValueError` if ``queue_name`` is empty or ``None``
    :raises: :py:class:`rabbitpy.RemoteClosedException`

    """
    if not queue_name:
        # The message previously said "to consume from", copied from
        # consume(); it now describes the operation that actually failed.
        raise ValueError('You must specify a queue name to delete')
    with connection.Connection(uri) as conn:
        with conn.channel() as channel:
            queue = amqp_queue.Queue(channel, queue_name)
            queue.delete()
def create_direct_exchange(uri=None, exchange_name=None, durable=True):
    """Create a direct exchange with RabbitMQ. This should only be used for
    one-off operations.

    :param str uri: AMQP URI to connect to
    :param str exchange_name: The exchange name to create
    :param bool durable: Exchange should survive server restarts
    :raises: :py:class:`ValueError` if ``exchange_name`` is empty or ``None``
    :raises: :py:class:`rabbitpy.RemoteClosedException`

    """
    if not exchange_name:
        raise ValueError('You must specify a exchange name to create')
    with connection.Connection(uri) as conn:
        with conn.channel() as channel:
            direct = exchange.DirectExchange(channel, exchange_name,
                                             durable=durable)
            direct.declare()
def create_fanout_exchange(uri=None, exchange_name=None, durable=True):
    """Create a fanout exchange with RabbitMQ. This should only be used for
    one-off operations.

    :param str uri: AMQP URI to connect to
    :param str exchange_name: The exchange name to create
    :param bool durable: Exchange should survive server restarts
    :raises: :py:class:`ValueError` if ``exchange_name`` is empty or ``None``
    :raises: :py:class:`rabbitpy.RemoteClosedException`

    """
    if not exchange_name:
        raise ValueError('You must specify a exchange name to create')
    with connection.Connection(uri) as conn:
        with conn.channel() as channel:
            fanout = exchange.FanoutExchange(channel, exchange_name,
                                             durable=durable)
            fanout.declare()
def create_topic_exchange(uri=None, exchange_name=None, durable=True):
    """Create a topic exchange with RabbitMQ. This should only be used for
    one-off operations.

    :param str uri: AMQP URI to connect to
    :param str exchange_name: The exchange name to create
    :param bool durable: Exchange should survive server restarts
    :raises: :py:class:`ValueError` if ``exchange_name`` is empty or ``None``
    :raises: :py:class:`rabbitpy.RemoteClosedException`

    """
    if not exchange_name:
        raise ValueError('You must specify a exchange name to create')
    with connection.Connection(uri) as conn:
        with conn.channel() as channel:
            topic = exchange.TopicExchange(channel, exchange_name,
                                           durable=durable)
            topic.declare()
def delete_exchange(uri=None, exchange_name=None):
    """Delete an exchange from RabbitMQ. This should only be used for one-off
    operations.

    :param str uri: AMQP URI to connect to
    :param str exchange_name: The exchange name to delete
    :raises: :py:class:`ValueError` if ``exchange_name`` is empty or ``None``
    :raises: :py:class:`rabbitpy.RemoteClosedException`

    """
    if not exchange_name:
        raise ValueError('You must specify a exchange name to delete')
    with connection.Connection(uri) as conn:
        with conn.channel() as channel:
            # The generic Exchange class is used here, not a typed subclass.
            target = exchange.Exchange(channel, exchange_name)
            target.delete()