AMQP Adapter
While the core rabbitpy API strives to provide an easy-to-use, Pythonic interface
for RabbitMQ, some developers may prefer a less opinionated AMQP interface. The
rabbitpy.AMQP adapter provides a more traditional AMQP client library API,
similar to that of libraries like pika.
New in version 0.26.
Example
The following example will connect to RabbitMQ and use the rabbitpy.AMQP
adapter to consume and acknowledge messages.
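import rabbitpy

# Connection() with no arguments uses rabbitpy's default connection settings
with rabbitpy.Connection() as conn:
    with conn.channel() as channel:
        amqp = rabbitpy.AMQP(channel)
        # basic_consume yields messages as the server delivers them
        for message in amqp.basic_consume('queue-name'):
            print(message)
            message.ack()  # acknowledge once the message has been handled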
API Documentation
class rabbitpy.AMQP(channel)
The AMQP Adapter provides a more generic, non-opinionated interface to RabbitMQ by providing methods that map to the AMQP API.
Parameters: channel (rabbitpy.channel.Channel) – The channel to use

basic_ack(delivery_tag=0, multiple=False)
Acknowledge one or more messages
This method acknowledges one or more messages delivered via the Deliver or Get-Ok methods. The client can ask to confirm a single message or a set of messages up to and including a specific message.
Parameters: - delivery_tag (int|long) – Server-assigned delivery tag
- multiple (bool) – Acknowledge multiple messages

basic_cancel(consumer_tag='', nowait=False)
End a queue consumer
This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply.
Parameters: - consumer_tag (str) – The consumer tag
- nowait (bool) – Do not send a reply method

basic_consume(queue='', consumer_tag='', no_local=False, no_ack=False, exclusive=False, nowait=False, arguments=None)
Start a queue consumer
This method asks the server to start a “consumer”, which is a transient request for messages from a specific queue. Consumers last as long as the channel they were declared on, or until the client cancels them.
This method acts as a generator, yielding messages as they are delivered from the server.
Example use:
    for message in amqp.basic_consume(queue_name):
        print(message.body)
        message.ack()
Parameters: - queue (str) – The queue name to consume from
- consumer_tag (str) – The consumer tag
- no_local (bool) – Do not deliver own messages
- no_ack (bool) – No acknowledgement needed
- exclusive (bool) – Request exclusive access
- nowait (bool) – Do not send a reply method
- arguments (dict) – Arguments for declaration
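The generator behaviour described above can be combined with basic_cancel to process a bounded number of messages. The following is a minimal sketch, not part of rabbitpy: `consume_batch`, `handler`, `limit` and the default consumer tag `'batch-worker'` are hypothetical names, and it assumes that cancelling the consumer and then leaving the loop is an acceptable way to stop consuming.

```python
def consume_batch(amqp, queue_name, handler, limit, consumer_tag='batch-worker'):
    """Handle up to `limit` messages from `queue_name`, acknowledging each one.

    `amqp` is expected to behave like a rabbitpy.AMQP adapter. The helper name
    and the default consumer tag are illustrative only.
    """
    handled = 0
    if limit <= 0:
        return handled
    for message in amqp.basic_consume(queue_name, consumer_tag=consumer_tag):
        handler(message.body)  # application-specific work
        message.ack()          # acknowledge only after the handler returned
        handled += 1
        if handled >= limit:
            # Ask the server to stop delivering to this consumer, then leave the loop.
            amqp.basic_cancel(consumer_tag)
            break
    return handled
```

Passing an explicit consumer tag to basic_consume is what lets the same tag be handed to basic_cancel later. Messages that the server delivers between the cancel request and its reply are not handled by this sketch.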

basic_get(queue='', no_ack=False)
Direct access to a queue
This method provides direct access to the messages in a queue using a synchronous dialogue that is designed for specific types of application where synchronous functionality is more important than performance.
Parameters: - queue (str) – The queue name
- no_ack (bool) – No acknowledgement needed

basic_nack(delivery_tag=0, multiple=False, requeue=True)
Reject one or more incoming messages.
This method allows a client to reject one or more incoming messages. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue. This method is also used by the server to inform publishers on channels in confirm mode of unhandled messages. If a publisher receives this method, it probably needs to republish the offending messages.
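Parameters: - delivery_tag (int|long) – Server-assigned delivery tag
- multiple (bool) – Reject multiple messages
- requeue (bool) – Requeue the message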

basic_publish(exchange='', routing_key='', body='', properties=None, mandatory=False, immediate=False)
Publish a message
This method publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.
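Parameters: - exchange (str) – The exchange name
- routing_key (str) – Message routing key
- body (str|bytes) – The message body
- properties (dict) – AMQP message properties
- mandatory (bool) – Indicate mandatory routing
- immediate (bool) – Request immediate delivery
Returns: bool or None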
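As an illustration of the publish call, the sketch below serialises a Python object to JSON and publishes it through the adapter. `publish_json` is a hypothetical helper, and the sketch assumes that `properties` accepts a dict keyed by AMQP property names such as `content_type`; check this against the rabbitpy version in use.

```python
import json


def publish_json(amqp, exchange, routing_key, payload, mandatory=False):
    """Serialise `payload` as JSON and publish it via an AMQP-adapter-like object.

    Returns whatever basic_publish returns (documented as bool or None).
    """
    body = json.dumps(payload)
    return amqp.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=body,
        # Assumption: properties is a dict of AMQP basic properties.
        properties={'content_type': 'application/json'},
        mandatory=mandatory,
    )
```

Keyword arguments are used so that the call does not depend on the positional order of the basic_publish signature.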

basic_qos(prefetch_size=0, prefetch_count=0, global_flag=False)
Specify quality of service
This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The particular properties and semantics of a qos method always depend on the content class semantics. Though the qos method could in principle apply to both peers, it is currently meaningful only for the server.
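Parameters: - prefetch_size (int|long) – Prefetch window in octets
- prefetch_count (int) – Prefetch window in messages
- global_flag (bool) – Apply to entire connection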

basic_recover(requeue=False)
Redeliver unacknowledged messages
This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method replaces the asynchronous Recover.
Parameters: requeue (bool) – Requeue the message

basic_reject(delivery_tag=0, requeue=True)
Reject an incoming message
This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.
Parameters: - delivery_tag (int|long) – Server-assigned delivery tag
- requeue (bool) – Requeue the message
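The acknowledge and reject methods are typically paired: acknowledge a message when processing succeeds and reject it when processing fails. The sketch below shows one way to do that. `settle` and `handler` are hypothetical names, and the code assumes the delivered message object exposes its server-assigned tag as a `delivery_tag` attribute.

```python
def settle(amqp, message, handler):
    """Run `handler` on the message body, then ack or reject the message.

    Returns True if the message was acknowledged, False if it was rejected.
    """
    try:
        handler(message.body)
    except Exception:
        # requeue=False asks the server not to return the message to its queue.
        amqp.basic_reject(message.delivery_tag, requeue=False)
        return False
    amqp.basic_ack(message.delivery_tag)
    return True
```

Rejecting with `requeue=False` avoids redelivering a message that fails every time; pass `requeue=True` instead when the failure is expected to be temporary.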

confirm_select()
This method sets the channel to use publisher acknowledgements. The client can only use this method on a non-transactional channel.

exchange_bind(destination='', source='', routing_key='', nowait=False, arguments=None)
Bind exchange to an exchange.
This method binds an exchange to an exchange.
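Parameters: - destination (str) – The destination exchange name
- source (str) – The source exchange name
- routing_key (str) – The routing key to bind with
- nowait (bool) – Do not send a reply method
- arguments (dict) – Optional arguments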

exchange_declare(exchange='', exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, nowait=False, arguments=None)
Verify exchange exists, create if needed
This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.
Parameters: - exchange (str) – The exchange name
- exchange_type (str) – Exchange type
- passive (bool) – Do not create exchange
- durable (bool) – Request a durable exchange
- auto_delete (bool) – Automatically delete when not in use
- internal (bool) – Deprecated
- nowait (bool) – Do not send a reply method
- arguments (dict) – Arguments for declaration

exchange_delete(exchange='', if_unused=False, nowait=False)
Delete an exchange
This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are cancelled.
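Parameters: - exchange (str) – The exchange name
- if_unused (bool) – Delete only if unused
- nowait (bool) – Do not send a reply method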

exchange_unbind(destination='', source='', routing_key='', nowait=False, arguments=None)
Unbind an exchange from an exchange.
This method unbinds an exchange from an exchange.
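Parameters: - destination (str) – The destination exchange name
- source (str) – The source exchange name
- routing_key (str) – The routing key to unbind
- nowait (bool) – Do not send a reply method
- arguments (dict) – Optional arguments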

queue_bind(queue='', exchange='', routing_key='', nowait=False, arguments=None)
Bind queue to an exchange
This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and-forward queues are bound to a direct exchange and subscription queues are bound to a topic exchange.
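Parameters: - queue (str) – The queue to bind to the exchange
- exchange (str) – The exchange to bind to
- routing_key (str) – The routing key to bind with
- nowait (bool) – Do not send a reply method
- arguments (dict) – Optional arguments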

queue_declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=False, nowait=False, arguments=None)
Declare queue, create if needed
This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.
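Parameters: - queue (str) – The queue name
- passive (bool) – Do not create queue
- durable (bool) – Request a durable queue
- exclusive (bool) – Request an exclusive queue
- auto_delete (bool) – Delete when no longer in use
- nowait (bool) – Do not send a reply method
- arguments (dict) – Arguments for declaration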
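Declaring an exchange, declaring a queue, and binding the two are usually done together at start-up. The following sketch strings the three calls together in the store-and-forward arrangement mentioned under queue_bind (a queue bound to a direct exchange). `declare_work_queue` is a hypothetical helper, and the choice of durable entities is an assumption made for the example.

```python
def declare_work_queue(amqp, exchange, queue, routing_key):
    """Create a durable direct exchange and queue (if missing) and bind them.

    `amqp` is expected to behave like a rabbitpy.AMQP adapter.
    """
    # Creates the exchange if needed; otherwise the server verifies its type.
    amqp.exchange_declare(exchange, exchange_type='direct', durable=True)
    # Creates the queue if needed.
    amqp.queue_declare(queue, durable=True)
    # Until it is bound, the queue receives nothing from the exchange.
    amqp.queue_bind(queue, exchange, routing_key)
```

The order matters: the bind can only succeed once both the exchange and the queue exist.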

queue_delete(queue='', if_unused=False, if_empty=False, nowait=False)
Delete a queue
This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled.
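Parameters: - queue (str) – The queue name
- if_unused (bool) – Delete only if unused
- if_empty (bool) – Delete only if empty
- nowait (bool) – Do not send a reply method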

queue_purge(queue='', nowait=False)
Purge a queue
This method removes all messages from a queue which are not awaiting acknowledgment.
Parameters: - queue (str) – The queue name
- nowait (bool) – Do not send a reply method

queue_unbind(queue='', exchange='', routing_key='', arguments=None)
Unbind a queue from an exchange
This method unbinds a queue from an exchange.
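Parameters: - queue (str) – The queue to unbind from the exchange
- exchange (str) – The exchange to unbind from
- routing_key (str) – The routing key of the binding
- arguments (dict) – Optional arguments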
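The unbind and delete methods can be combined to remove a queue and its exchange, for example after a test run. This is a sketch under assumptions: `tear_down` is a hypothetical helper, and using `if_unused=True` on the exchange is a design choice for the example, intended to make the server refuse the deletion while the exchange is still in use elsewhere.

```python
def tear_down(amqp, exchange, queue, routing_key):
    """Remove one binding, its queue, and the exchange if nothing else uses it.

    `amqp` is expected to behave like a rabbitpy.AMQP adapter.
    """
    # Remove the binding first so the exchange stops routing to the queue.
    amqp.queue_unbind(queue, exchange, routing_key)
    # Delete the queue; consumers on it are cancelled by the server.
    amqp.queue_delete(queue)
    # Only delete the exchange when it is no longer in use.
    amqp.exchange_delete(exchange, if_unused=True)
```

If the exchange is still in use, the final call is expected to fail on the server side rather than silently remove it, so callers may want to handle that error.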

tx_commit()
Commit the current transaction
This method commits all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a commit.

tx_rollback()
Abandon the current transaction
This method abandons all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a rollback. Note that unacked messages will not be automatically redelivered by rollback; if that is required an explicit recover call should be issued.
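Commit and rollback are usually paired so that a group of publications either all take effect or none do. The sketch below assumes the channel has already been placed in transactional mode (in AMQP 0-9-1 this is done with the tx.select method, which is not documented in this section). `publish_all_or_nothing` is a hypothetical helper.

```python
def publish_all_or_nothing(amqp, exchange, routing_key, bodies):
    """Publish every body in `bodies`, committing only if all were sent.

    Assumes the channel behind `amqp` is already transactional.
    Returns the number of messages published and committed.
    """
    count = 0
    try:
        for body in bodies:
            amqp.basic_publish(exchange, routing_key, body)
            count += 1
    except Exception:
        # Abandon everything published since the last commit, then re-raise.
        amqp.tx_rollback()
        raise
    amqp.tx_commit()
    return count
```

This is mainly useful when the failure comes from application code, such as a generator producing `bodies`. If the error is a broken connection or closed channel, the rollback call itself may fail.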