AMQP Adapter

While the core rabbitpy API strives to provide an easy to use, Pythonic interface for RabbitMQ, some developers may prefer a less opinionated AMQP interface. The rabbitpy.AMQP adapter provides a more traditional AMQP client library API seen in libraries like pika.

New in version 0.26.

Example

The following example will connect to RabbitMQ and use the rabbitpy.AMQP adapter to consume and acknowledge messages.

import rabbitpy

with rabbitpy.Connection() as conn:
    with conn.channel() as channel:
        amqp = rabbitpy.AMQP(channel)

        for message in amqp.basic_consume('queue-name'):
            print(message)

API Documentation

class rabbitpy.AMQP(channel)[source]

The AMQP Adapter provides a more generic, non-opinionated interface to RabbitMQ by providing methods that map to the AMQP API.

Parameters:channel (rabbitmq.channel.Channel) – The channel to use
basic_ack(delivery_tag=0, multiple=False)[source]

Acknowledge one or more messages

This method acknowledges one or more messages delivered via the Deliver or Get-Ok methods. The client can ask to confirm a single message or a set of messages up to and including a specific message.

Parameters:
  • delivery_tag (int|long) – Server-assigned delivery tag
  • multiple (bool) – Acknowledge multiple messages
basic_cancel(consumer_tag='', nowait=False)[source]

End a queue consumer

This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel- ok reply.

Parameters:
  • consumer_tag (str) – Consumer tag
  • nowait (bool) – Do not send a reply method
basic_consume(queue='', consumer_tag='', no_local=False, no_ack=False, exclusive=False, nowait=False, arguments=None)[source]

Start a queue consumer

This method asks the server to start a “consumer”, which is a transient request for messages from a specific queue. Consumers last as long as the channel they were declared on, or until the client cancels them.

This method will act as an generator, returning messages as they are delivered from the server.

Example use:

for message in basic_consume(queue_name):
    print message.body
    message.ack()
Parameters:
  • queue (str) – The queue name to consume from
  • consumer_tag (str) – The consumer tag
  • no_local (bool) – Do not deliver own messages
  • no_ack (bool) – No acknowledgement needed
  • exclusive (bool) – Request exclusive access
  • nowait (bool) – Do not send a reply method
  • arguments (dict) – Arguments for declaration
basic_get(queue='', no_ack=False)[source]

Direct access to a queue

This method provides a direct access to the messages in a queue using a synchronous dialogue that is designed for specific types of application where synchronous functionality is more important than performance.

Parameters:
  • queue (str) – The queue name
  • no_ack (bool) – No acknowledgement needed
basic_nack(delivery_tag=0, multiple=False, requeue=True)[source]

Reject one or more incoming messages.

This method allows a client to reject one or more incoming messages. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue. This method is also used by the server to inform publishers on channels in confirm mode of unhandled messages. If a publisher receives this method, it probably needs to republish the offending messages.

Parameters:
  • delivery_tag (int|long) – Server-assigned delivery tag
  • multiple (bool) – Reject multiple messages
  • requeue (bool) – Requeue the message
basic_publish(exchange='', routing_key='', body='', properties=None, mandatory=False, immediate=False)[source]

Publish a message

This method publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.

Parameters:
  • exchange (str) – The exchange name
  • routing_key (str) – Message routing key
  • body (str|bytes) – The message body
  • properties (dict) – AMQP message properties
  • mandatory (bool) – Indicate mandatory routing
  • immediate (bool) – Request immediate delivery
Returns:

bool or None

basic_qos(prefetch_size=0, prefetch_count=0, global_flag=False)[source]

Specify quality of service

This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The particular properties and semantics of a qos method always depend on the content class semantics. Though the qos method could in principle apply to both peers, it is currently meaningful only for the server.

Parameters:
  • prefetch_size (int|long) – Prefetch window in octets
  • prefetch_count (int) – Prefetch window in messages
  • global_flag (bool) – Apply to entire connection
basic_recover(requeue=False)[source]

Redeliver unacknowledged messages

This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method replaces the asynchronous Recover.

Parameters:requeue (bool) – Requeue the message
basic_reject(delivery_tag=0, requeue=True)[source]

Reject an incoming message

This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.

Parameters:
  • delivery_tag (int|long) – Server-assigned delivery tag
  • requeue (bool) – Requeue the message
confirm_select()[source]

This method sets the channel to use publisher acknowledgements. The client can only use this method on a non-transactional channel.

exchange_bind(destination='', source='', routing_key='', nowait=False, arguments=None)[source]

Bind exchange to an exchange.

This method binds an exchange to an exchange.

Parameters:
  • destination (str) – The destination exchange name
  • source (str) – The source exchange name
  • routing_key (str) – The routing key to bind with
  • nowait (bool) – Do not send a reply method
  • arguments (dict) – Optional arguments
exchange_declare(exchange='', exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, nowait=False, arguments=None)[source]

Verify exchange exists, create if needed

This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.

Parameters:
  • exchange (str) – The exchange name
  • exchange_type (str) – Exchange type
  • passive (bool) – Do not create exchange
  • durable (bool) – Request a durable exchange
  • auto_delete (bool) – Automatically delete when not in use
  • internal (bool) – Deprecated
  • nowait (bool) – Do not send a reply method
  • arguments (dict) – Arguments for declaration
exchange_delete(exchange='', if_unused=False, nowait=False)[source]

Delete an exchange

This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are cancelled.

Parameters:
  • exchange (str) – The exchange name
  • if_unused (bool) – Delete only if unused
  • nowait (bool) – Do not send a reply method
exchange_unbind(destination='', source='', routing_key='', nowait=False, arguments=None)[source]

Unbind an exchange from an exchange.

This method unbinds an exchange from an exchange.

Parameters:
  • destination (str) – The destination exchange name
  • source (str) – The source exchange name
  • routing_key (str) – The routing key to bind with
  • nowait (bool) – Do not send a reply method
  • arguments (dict) – Optional arguments
queue_bind(queue='', exchange='', routing_key='', nowait=False, arguments=None)[source]

Bind queue to an exchange

This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and- forward queues are bound to a direct exchange and subscription queues are bound to a topic exchange.

Parameters:
  • queue (str) – The queue name
  • exchange (str) – Name of the exchange to bind to
  • routing_key (str) – Message routing key
  • nowait (bool) – Do not send a reply method
  • arguments (dict) – Arguments for binding
queue_declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=False, nowait=False, arguments=None)[source]

Declare queue, create if needed

This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.

Parameters:
  • queue (str) – The queue name
  • passive (bool) – Do not create queue
  • durable (bool) – Request a durable queue
  • exclusive (bool) – Request an exclusive queue
  • auto_delete (bool) – Auto-delete queue when unused
  • nowait (bool) – Do not send a reply method
  • arguments (dict) – Arguments for declaration
queue_delete(queue='', if_unused=False, if_empty=False, nowait=False)[source]

Delete a queue

This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled.

Parameters:
  • queue (str) – The queue name
  • if_unused (bool) – Delete only if unused
  • if_empty (bool) – Delete only if empty
  • nowait (bool) – Do not send a reply method
queue_purge(queue='', nowait=False)[source]

Purge a queue

This method removes all messages from a queue which are not awaiting acknowledgment.

Parameters:
  • queue (str) – The queue name
  • nowait (bool) – Do not send a reply method
queue_unbind(queue='', exchange='', routing_key='', arguments=None)[source]

Unbind a queue from an exchange

This method unbinds a queue from an exchange.

Parameters:
  • queue (str) – The queue name
  • exchange (str) – The exchange name
  • routing_key (str) – Routing key of binding
  • arguments (dict) – Arguments of binding
tx_commit()[source]

Commit the current transaction

This method commits all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a commit.

tx_rollback()[source]

Abandon the current transaction

This method abandons all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a rollback. Note that unacked messages will not be automatically redelivered by rollback; if that is required an explicit recover call should be issued.

tx_select()[source]

Select standard transaction mode

This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.