Multi-threaded Use NotesΒΆ

To ensure that the network communication module at the core of rabbitpy is thread safe, the class is a daemonic Python thread that uses a combination of threading.Event, Queue.Queue, and a local cross-platform implementation of a read-write socket pair in rabbitpy.IO.write_trigger.

While ensuring that the core socket IO and dispatching of AMQP frames across threads goes a long way to make sure that multi-threaded applications can safely use rabbitpy, it does not protect against cross-thread channel utilization.

Due to the way that channels events are managed, it is recommend that you restrict the use of a channel to an individual thread. By not sharing channels across threads, you will ensure that you do not accidentally create issues with channel state in the AMQP protocol. As an asynchronous RPC style protocol, when you issue commands, such as a queue declaration, or are publishing a message, there are expectations in the conversation on a channel about the order of events and frames sent and received.

The following example uses the main Python thread to connect to RabbitMQ and then spawns a thread for publishing and a thread for consuming.

import rabbitpy
import threading

EXCHANGE = 'threading_example'
QUEUE = 'threading_queue'
ROUTING_KEY = 'test'

def consumer(connection):
    """Consume MESSAGE_COUNT messages on the connection and then exit.

    :param rabbitpy.Connection connection: The connection to consume on

    received = 0
    with as channel:
        for message in rabbitpy.Queue(channel, QUEUE).consume_messages():
            print message.body
            received += 1
            if received == MESSAGE_COUNT:

def publisher(connection):
    """Pubilsh up to MESSAGE_COUNT messages on connection
    on an individual thread.

    :param rabbitpy.Connection connection: The connection to publish on

    with as channel:
        for index in range(0, MESSAGE_COUNT):
            message = rabbitpy.Message(channel, 'Message #%i' % index)
            message.publish(EXCHANGE, ROUTING_KEY)

# Connect to RabbitMQ
with rabbitpy.Connection() as connection:

    # Open the channel, declare and bind the exchange and queue
    with as channel:

        # Declare the exchange
        exchange = rabbitpy.Exchange(channel, EXCHANGE)

        # Declare the queue
        queue = rabbitpy.Queue(channel, QUEUE)

        # Bind the queue to the exchange
        queue.bind(EXCHANGE, ROUTING_KEY)

    # Pass in the kwargs
    kwargs = {'connection': connection}

    # Start the consumer thread
    consumer_thread = threading.Thread(target=consumer, kwargs=kwargs)

    # Start the pubisher thread
    publisher_thread = threading.Thread(target=publisher, kwargs=kwargs)

    # Join the consumer thread, waiting for it to consume all MESSAGE_COUNT messages