Source code for rabbitpy.channel

"""
Implement the base channel construct that is used by rabbitpy objects
such as :py:class:`Exchange <rabbitpy.Exchange>` or
:py:class:`Exchange <rabbitpy.Queue>`. It is responsible for coordinating
the communication between the IO thread and the higher-level objects.

"""
import logging
try:
    import queue as Queue
except ImportError:
    import Queue

from pamqp import specification as spec
from pamqp import PYTHON3

from rabbitpy import base
from rabbitpy import exceptions
from rabbitpy import message

LOGGER = logging.getLogger(__name__)

BASIC_DELIVER = 'Basic.Deliver'
CONTENT_BODY = 'ContentBody'
CONTENT_HEADER = 'ContentHeader'


[docs]class Channel(base.AMQPChannel): """The Channel object is the communications object used by Exchanges, Messages, Queues, and Transactions. It is created by invoking the :py:meth:`rabbitpy.Connection.channel() <rabbitpy.connection.Connection.channel>` method. It can act as a context manager, allowing for quick shorthand use: .. code:: python with connection.channel(): # Do something To create a new channel, invoke py:meth:`rabbitpy.connection.Connection.channel` To improve performance, pass blocking_read to True. Note that doing so prevents ``KeyboardInterrupt``/CTRL-C from exiting the Python interpreter. :param int channel_id: The channel # to use for this instance :param dict server_capabilities: Features the server supports :param events rabbitpy.Events: Event management object :param queue.Queue exception_queue: Exception queue :param queue.Queue read_queue: Queue to read pending frames from :param queue.Queue write_queue: Queue to write pending AMQP objs to :param int maximum_frame_size: The max frame size for msg bodies :param socket write_trigger: Write to this socket to break IO waiting :param bool blocking_read: Use blocking Queue.get to improve performance :raises: rabbitpy.exceptions.RemoteClosedChannelException :raises: rabbitpy.exceptions.AMQPException """ STATES = base.AMQPChannel.STATES STATES[0x04] = 'Remotely Closed' def __init__(self, channel_id, server_capabilities, events, exception_queue, read_queue, write_queue, maximum_frame_size, write_trigger, blocking_read=False): """Create a new instance of the Channel class""" super(Channel, self).__init__(exception_queue, write_trigger, blocking_read) self._channel_id = channel_id self._consumers = {} self._consuming = False self._events = events self._maximum_frame_size = maximum_frame_size self._publisher_confirms = False self._read_queue = read_queue self._write_queue = write_queue self._server_capabilities = server_capabilities def __enter__(self): """For use as a context manager, return a handle to this object instance. :rtype: Channel """ return self def __exit__(self, exc_type, exc_val, unused_exc_tb): """When leaving the context, examine why the context is leaving, if it's an exception or what. """ if exc_type and exc_val: LOGGER.debug('Exiting due to exception: %r', exc_val) self._set_state(self.CLOSED) raise if self.open: self.close()
[docs] def close(self): """Close the channel, cancelling any active consumers, purging the read queue, while looking to see if a Basic.Nack should be sent, sending it if so. """ if self.closed: LOGGER.debug('Channel %i close invoked when already closed', self._channel_id) return self._set_state(self.CLOSING) # Empty the queue and nack the max id (and all previous) if self._consumers: delivery_tag = 0 discard_counter = 0 ack_tags = [] for queue_obj, no_ack in self._consumers.values(): self._cancel_consumer(queue_obj) if not no_ack: LOGGER.debug('Channel %i will nack messages for %s', self._channel_id, queue_obj.consumer_tag) ack_tags.append(queue_obj.consumer_tag) # If there are any ack tags, get the last msg to nack if ack_tags: while not self._read_queue.empty(): frame_value = self._get_from_read_queue() if not frame_value: break if (frame_value.name == BASIC_DELIVER and frame_value.consumer_tag in ack_tags): if delivery_tag < frame_value.delivery_tag: delivery_tag = frame_value.delivery_tag discard_counter += 1 if delivery_tag: self._multi_nack(delivery_tag) super(Channel, self).close()
[docs] def enable_publisher_confirms(self): """Turn on Publisher Confirms. If confirms are turned on, the Message.publish command will return a bool indicating if a message has been successfully published. """ if not self._supports_publisher_confirms: raise exceptions.NotSupportedError('Confirm.Select') self.rpc(spec.Confirm.Select()) self._publisher_confirms = True
@property def id(self): """Return the channel id :rtype: int """ return self._channel_id @property def maximum_frame_size(self): return self._maximum_frame_size
[docs] def open(self): """Open the channel, invoked directly upon creation by the Connection """ self._set_state(self.OPENING) self.write_frame(self._build_open_frame()) self._wait_on_frame(spec.Channel.OpenOk) self._set_state(self.OPEN) LOGGER.debug('Channel #%i open', self._channel_id)
[docs] def prefetch_count(self, value, all_channels=False): """Set a prefetch count for the channel (or all channels on the same connection). :param int value: The prefetch count to set :param bool all_channels: Set the prefetch count on all channels on the same connection """ self.rpc(spec.Basic.Qos(prefetch_count=value, global_=all_channels))
[docs] def prefetch_size(self, value, all_channels=False): """Set a prefetch size in bytes for the channel (or all channels on the same connection). :param int value: The prefetch size to set :param bool all_channels: Set the prefetch size on all channels on the same connection """ if value is None: return self.rpc(spec.Basic.Qos(prefetch_size=value, global_=all_channels))
@property def publisher_confirms(self): """Returns True if publisher confirms are enabled. :rtype: bool """ return self._publisher_confirms
[docs] def recover(self, requeue=False): """Recover all unacknowledged messages that are associated with this channel. :param bool requeue: Requeue the message """ self.rpc(spec.Basic.Recover(requeue=requeue))
@staticmethod def _build_open_frame(): """Build and return a channel open frame :rtype: pamqp.spec.Channel.Open """ return spec.Channel.Open() def _cancel_consumer(self, obj, consumer_tag=None, nowait=False): """Cancel the consuming of a queue. :param rabbitpy.amqp_queue.Queue obj: The queue to cancel """ consumer_tag = consumer_tag or obj.consumer_tag self._interrupt_wait_on_frame() if consumer_tag in self._consumers: del self._consumers[consumer_tag] self.write_frame(spec.Basic.Cancel(consumer_tag=consumer_tag)) if not nowait and not self.closed: self._wait_on_frame(spec.Basic.CancelOk) def _check_for_rpc_request(self, value): """Inspect a frame to see if it's a RPC request from RabbitMQ. :param spec.Frame value: """ LOGGER.debug('Checking for RPC request: %r', value) super(Channel, self)._check_for_rpc_request(value) if isinstance(value, spec.Basic.Return): raise exceptions.MessageReturnedException(value.reply_code, value.reply_text, value.exchange, value.routing_key) elif isinstance(value, spec.Basic.Cancel): self._waiting = False if value.consumer_tag in self._consumers: del self._consumers[value.consumer_tag] raise exceptions.RemoteCancellationException(value.consumer_tag) def _consume(self, obj, no_ack, priority=None): """Register a Queue object as a consumer, issuing Basic.Consume. :param rabbitpy.amqp_queue.Queue obj: The queue to consume :param bool no_ack: no_ack mode :param int priority: Consumer priority :raises: ValueError """ args = dict() if priority is not None: if not self._supports_consumer_priorities: raise exceptions.NotSupportedError('consumer_priorities') if not isinstance(priority, int): raise ValueError('Consumer priority must be an int') args['x-priority'] = priority self.rpc(spec.Basic.Consume(queue=obj.name, consumer_tag=obj.consumer_tag, no_ack=no_ack, arguments=args)) self._consumers[obj.consumer_tag] = (obj, no_ack) def _consume_message(self): """Get a message from the stack, blocking while doing so. If a consumer is cancelled out-of-band, we will receive a Basic.CancelOk instead. :rtype: rabbitpy.message.Message """ if not self._consumers: raise exceptions.NotConsumingError frame_value = self._wait_on_frame([spec.Basic.Deliver]) if frame_value: return self._wait_for_content_frames(frame_value) return None def _create_message(self, method_frame, header_frame, body): """Create a message instance with the channel it was received on and the dictionary of message parts. Will return None if no message can be created. :param pamqp.specification.Frame method_frame: The method frame value :param header_frame: Header frame value :type header_frame: pamqp.header.ContentHeader or None :param body: The message body :type body: str or None :rtype: rabbitpy.message.Message or None """ if not method_frame: LOGGER.warning('Received empty method_frame, returning None') return None if not header_frame: LOGGER.debug('Malformed header frame: %r', header_frame) props = header_frame.properties.to_dict() if header_frame else dict() msg = message.Message(self, body, props) msg.method = method_frame msg.name = method_frame.name return msg def _get_from_read_queue(self): """Fetch a frame from the read queue and return it, otherwise return None :rtype: pamqp.specification.Frame """ try: frame_value = self._read_queue.get(False) except Queue.Empty: return None try: self._read_queue.task_done() except ValueError: pass return frame_value def _get_message(self): """Try and get a delivered message from the connection's message stack. :rtype: rabbitpy.message.Message or None """ frame_value = self._wait_on_frame([spec.Basic.GetOk, spec.Basic.GetEmpty]) if isinstance(frame_value, spec.Basic.GetEmpty): return None return self._wait_for_content_frames(frame_value) def _multi_nack(self, delivery_tag, requeue=True): """Send a multiple negative acknowledgement, re-queueing the items :param int delivery_tag: The delivery tag for this channel :param bool requeue: Requeue the messages """ if not self._supports_basic_nack: raise exceptions.NotSupportedError('Basic.Nack') if self._is_debugging: LOGGER.debug('Sending Basic.Nack with requeue') self.rpc(spec.Basic.Nack(delivery_tag=delivery_tag, multiple=True, requeue=requeue)) def _reject_inbound_message(self, method_frame): """Used internally to reject a message when it's been received during a state that it should not have been. :param pamqp.specification.Basic.Deliver method_frame: The method frame """ self.rpc(spec.Basic.Reject(delivery_tag=method_frame.delivery_tag, requeue=True)) @property def _supports_basic_nack(self): """Indicates if the server supports Basic.Nack :rtype: bool """ return self._server_capabilities.get(b'basic.nack', False) @property def _supports_consumer_cancel_notify(self): """Indicates if the server supports sending consumer cancellation notifications :rtype: bool """ return self._server_capabilities.get(b'consumer_cancel_notify', False) @property def _supports_consumer_priorities(self): """Indicates if the server supports consumer priorities :rtype: bool """ return self._server_capabilities.get(b'consumer_priorities', False) @property def _supports_per_consumer_qos(self): """Indicates if the server supports per consumer qos :rtype: bool """ return self._server_capabilities.get(b'per_consumer_qos', False) @property def _supports_publisher_confirms(self): """Indicates if the server supports publisher confirmations :rtype: bool """ return self._server_capabilities.get(b'publisher_confirms', False) def _wait_for_content_frames(self, method_frame): """Used by both Channel._get_message and Channel._consume_message for getting a message parts off the queue and returning the fully constructed message. :param method_frame: The method frame for the message :type method_frame: Basic.Deliver or Basic.Get or Basic.Return :rtype: rabbitpy.Message """ if self.closing or self.closed: return None consuming = isinstance(method_frame, spec.Basic.Deliver) if consuming and not self._consumers: return None if self._is_debugging: LOGGER.debug('Waiting on content frames for %s: %r', method_frame.name, method_frame.delivery_tag) header_value = self._wait_on_frame(CONTENT_HEADER) if not header_value: self._reject_inbound_message(method_frame) return None self._check_for_rpc_request(header_value) body_value = bytes() if PYTHON3 else str() while len(body_value) < header_value.body_size: body_part = self._wait_on_frame(CONTENT_BODY) self._check_for_rpc_request(body_part) if not body_part: break body_value += body_part.value if len(body_value) == header_value.body_size: break if self.closing or self.closed: return None if consuming and not self._consumers: self._reject_inbound_message(method_frame) return None return self._create_message(method_frame, header_value, body_value)