Source code for rabbitpy.connection

"""
The Connection class negotiates and manages the connection state.

"""
import logging
try:
    import queue
except ImportError:
    import Queue as queue
import socket
try:
    import ssl
except ImportError:
    ssl = None
import threading
import time

from pamqp import specification

from rabbitpy import base
from rabbitpy import io
from rabbitpy import channel
from rabbitpy import channel0
from rabbitpy import events
from rabbitpy import exceptions
from rabbitpy import message
from rabbitpy import utils

LOGGER = logging.getLogger(__name__)

AMQP = 'amqp'
AMQPS = 'amqps'

if ssl:
    SSL_CERT_MAP = {'ignore': ssl.CERT_NONE,
                    'optional': ssl.CERT_OPTIONAL,
                    'required': ssl.CERT_REQUIRED}
    SSL_VERSION_MAP = dict()
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        SSL_VERSION_MAP['SSLv2'] = ssl.PROTOCOL_SSLv2
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        SSL_VERSION_MAP['SSLv3'] = ssl.PROTOCOL_SSLv3
    if hasattr(ssl, 'PROTOCOL_SSLv23'):
        SSL_VERSION_MAP['SSLv23'] = ssl.PROTOCOL_SSLv23
    if hasattr(ssl, 'PROTOCOL_TLSv1'):
        SSL_VERSION_MAP['TLSv1'] = ssl.PROTOCOL_TLSv1
else:
    SSL_CERT_MAP, SSL_VERSION_MAP = dict(), dict()


[docs]class Connection(base.StatefulObject): """The Connection object is responsible for negotiating a connection and managing its state. When creating a new instance of the Connection object, if no URL is passed in, it uses the default connection parameters of localhost port 5672, virtual host / with the guest/guest username/password combination. Represented as a AMQP URL the connection information is: :code:`amqp://guest:guest@localhost:5672/%2F` To use a different connection, pass in a AMQP URL that follows the standard format: :code:`[scheme]://[username]:[password]@[host]:[port]/[virtual_host]` The following example connects to the test virtual host on a RabbitMQ server running at 192.168.1.200 port 5672 as the user "www" and the password rabbitmq: :code:`amqp://admin192.168.1.200:5672/test` .. note:: You should be aware that most connection exceptions may be raised during the use of all functionality in the library. :param str url: The AMQP connection URL :raises: rabbitpy.exceptions.AMQPException :raises: rabbitpy.exceptions.ConnectionException :raises: rabbitpy.exceptions.ConnectionResetException :raises: rabbitpy.exceptions.RemoteClosedException """ CANCEL_METHOD = ['Basic.Cancel'] DEFAULT_CHANNEL_MAX = 65535 DEFAULT_HEARTBEAT_INTERVAL = 300 DEFAULT_LOCALE = 'en_US' DEFAULT_URL = 'amqp://guest:guest@localhost:5672/%2F' DEFAULT_VHOST = '%2F' GUEST = 'guest' PORTS = {'amqp': 5672, 'amqps': 5671, 'api': 15672} QUEUE_WAIT = 0.01 def __init__(self, url=None): """Create a new instance of the Connection object""" super(Connection, self).__init__() # Create a name for the connection self._name = '0x%x' % id(self) # Extract parts of connection URL for use later self._args = self._process_url(url or self.DEFAULT_URL) # General events and queues shared across threads self._events = events.Events() # A queue for the child threads to put exceptions in self._exceptions = queue.Queue() # One queue for writing frames, regardless of the channel sending them self._write_queue = queue.Queue() # Lock used when managing the channel stack self._channel_lock = threading.Lock() # Attributes for core object threads self._channel0 = None self._channels = dict() self._io = None # Used by Message for breaking up body frames self._max_frame_size = None # Connect to RabbitMQ self._connect() def __enter__(self): """For use as a context manager, return a handle to this object instance. :rtype: Connection """ return self def __exit__(self, exc_type, exc_val, exc_tb): """When leaving the context, examine why the context is leaving, if it's an exception or what. """ if exc_type and exc_val: self._set_state(self.CLOSED) raise self._set_state(self.CLOSED) self._shutdown_connection(True) @property def blocked(self): """Indicates if the connection is blocked from publishing by RabbitMQ. This flag indicates communication from RabbitMQ that the connection is blocked using the Connection.Blocked RPC notification from RabbitMQ that was added in RabbitMQ 3.2. @TODO If RabbitMQ version < 3.2, use the HTTP management API to query the value :rtype: bool """ return self._events.is_set(events.CONNECTION_BLOCKED)
[docs] def channel(self, blocking_read=False): """Create a new channel If blocking_read is True, the cross-thread Queue.get use will use blocking operations that lower resource utilization and increase throughput. However, due to how Python's blocking Queue.get is implemented, KeyboardInterrupt is not raised when CTRL-C is pressed. :param bool blocking_read: Enable for higher throughput :raises: rabbitpy.exceptions.AMQPException :raises: rabbitpy.exceptions.RemoteClosedChannelException """ with self._channel_lock: channel_id = self._get_next_channel_id() channel_frames = queue.Queue() self._channels[channel_id] = \ channel.Channel(channel_id, self.capabilities, self._events, self._exceptions, channel_frames, self._write_queue, self._max_frame_size, self._io.write_trigger, blocking_read) self._add_channel_to_io(self._channels[channel_id], channel_frames) self._channels[channel_id].open() return self._channels[channel_id]
[docs] def close(self): """Close the connection, including all open channels""" if not self.closed: self._set_state(self.CLOSING) # Shutdown the IO thread and socket self._shutdown_connection() # Set state and clear out remote name self._set_state(self.CLOSED)
@property def capabilities(self): """Return the RabbitMQ Server capabilities from the connection negotiation process. :rtype: dict """ return self._channel0.properties.get(b'capabilities', dict()) @property def server_properties(self): """Return the RabbitMQ Server properties from the connection negotiation process. :rtype: dict """ return self._channel0.properties def _add_channel_to_io(self, channel_id, channel_queue): """Add a channel and queue to the IO object. :param Queue.Queue channel_queue: Channel inbound msg queue :param rabbitpy.base.AMQPChannel: The channel to add """ LOGGER.debug('Adding channel %s to io', int(channel_id)) self._io.add_channel(channel_id, channel_queue) @property def _api_credentials(self): """Return the auth credentials as a tuple @rtype: tuple """ return self._args['username'], self._args['password'] def _close_channels(self): """Close all the channels that are currently open.""" for channel_id in self._channels: if (self._channels[channel_id].open and not self._channels[channel_id].closing): self._channels[channel_id].close() def _connect(self): """Connect to the RabbitMQ Server""" self._set_state(self.OPENING) # Create and start the IO object that reads, writes & dispatches frames self._io = self._create_io_thread() self._io.daemon = True self._io.start() # Wait for IO to connect to the socket or raise an exception while self.opening and not self._events.is_set(events.SOCKET_OPENED): if not self._exceptions.empty(): exception = self._exceptions.get() raise exception self._events.wait(events.SOCKET_OPENED) # If the socket could not be opened, return instead of waiting if self.closed: return self.close() # Create the Channel0 queue and add it to the IO thread self._channel0 = self._create_channel0() self._add_channel_to_io(self._channel0, None) self._channel0.start() # Wait for Channel0 to raise an exception or negotiate the connection while not self._channel0.open: if not self._exceptions.empty(): exception = self._exceptions.get() self._io.stop() raise exception time.sleep(0.01) # Set the maximum frame size for channel use self._max_frame_size = self._channel0.maximum_frame_size def _create_channel0(self): """Each connection should have a distinct channel0 :rtype: rabbitpy.channel0.Channel0 """ return channel0.Channel0(connection_args=self._args, events_obj=self._events, exception_queue=self._exceptions, write_queue=self._write_queue, write_trigger=self._io.write_trigger) def _create_io_thread(self): """Create the IO thread and the objects it uses for communication. :rtype: rabbitpy.io.IO """ return io.IO(name='%s-io' % self._name, kwargs={'events': self._events, 'exceptions': self._exceptions, 'connection_args': self._args, 'write_queue': self._write_queue}) def _create_message(self, channel_id, method_frame, header_frame, body): """Create a message instance with the channel it was received on and the dictionary of message parts. :param int channel_id: The channel id the message was sent on :param pamqp.specification.Frame method_frame: The method frame value :param pamqp.header.ContentHeader header_frame: The header frame value :param str body: The message body :rtype: rabbitpy.message.Message """ msg = message.Message(self._channels[channel_id], body, header_frame.properties.to_dict()) msg.method = method_frame msg.name = method_frame.name return msg def _get_next_channel_id(self): """Return the next channel id :rtype: int """ if not self._channels: return 1 if self._max_channel_id == self._channel0.maximum_channels: raise exceptions.TooManyChannelsError return self._max_channel_id + 1 @staticmethod def _get_ssl_validation(values): """Return the value mapped from the string value in the query string for the AMQP URL specifying which level of server certificate validation is required, if any. :param dict values: The dict of query values from the AMQP URI :rtype: int """ validation = (values.get('verify', [None])[0] or values.get('ssl_validation', [None])[0]) if validation is None: return None if validation not in SSL_CERT_MAP: raise ValueError('Unsupported server cert validation option: %s', validation) return SSL_CERT_MAP[validation] @staticmethod def _get_ssl_version(values): """Return the value mapped from the string value in the query string for the AMQP URL for SSL version. :param dict values: The dict of query values from the AMQP URI :rtype: int """ version = values.get('ssl_version', [None])[0] if version is None: return None if version not in SSL_VERSION_MAP: raise ValueError('Unuspported SSL version: %s' % version) return SSL_VERSION_MAP[version] @property def _max_channel_id(self): return max(list(self._channels.keys())) @staticmethod def _normalize_expectations(channel_id, expectations): """Turn a class or list of classes into a list of class names. :param expectations: List of classes or class name or class obj :type expectations: list or str or pamqp.specification.Frame :rtype: list """ if isinstance(expectations, list): output = list() for value in expectations: if isinstance(value, str): output.append('%i:%s' % (channel_id, value)) else: output.append('%i:%s' % (channel_id, value.name)) return output elif utils.is_string(expectations): return ['%i:%s' % (channel_id, expectations)] return ['%i:%s' % (channel_id, expectations.name)] def _process_url(self, url): """Parse the AMQP URL passed in and return the configuration information in a dictionary of values. The URL format is as follows: amqp[s]://username:password@host:port/virtual_host[?query string] Values in the URL such as the virtual_host should be URL encoded or quoted just as a URL would be in a web browser. The default virtual host / in RabbitMQ should be passed as %2F. Default values: - If port is omitted, port 5762 is used for AMQP and port 5671 is used for AMQPS - If username or password is omitted, the default value is guest - If the virtual host is omitted, the default value of %2F is used Query string options: - heartbeat - channel_max - frame_max - locale - cacertfile - Path to CA certificate file - certfile - Path to client certificate file - keyfile - Path to client certificate key - verify - Server certificate validation requirements (1) - ssl_version - SSL version to use (2) (1) Should be one of three values: - ignore - Ignore the cert if provided (default) - optional - Cert is validated if provided - required - Cert is required and validated (2) Should be one of four values: - SSLv2 - SSLv3 - SSLv23 - TLSv1 :param str url: The AMQP url passed in :rtype: dict :raises: ValueError """ parsed = utils.urlparse(url) # Ensure the protocol scheme is what is expected if parsed.scheme not in list(self.PORTS.keys()): raise ValueError('Unsupported protocol: %s' % parsed.scheme) # Toggle the SSL flag based upon the URL scheme use_ssl = True if parsed.scheme == 'amqps' else False # Ensure that SSL is available if SSL is requested if use_ssl and not ssl: LOGGER.warning('SSL requested but not available, disabling') use_ssl = False # Use the default ports if one is not specified port = parsed.port or (self.PORTS[AMQPS] if parsed.scheme == AMQPS else self.PORTS[AMQP]) # Set the vhost to be after the base slash if it was specified vhost = parsed.path[1:] if parsed.path else self.DEFAULT_VHOST # If the path was just the base path, set the vhost to the default if not vhost: vhost = self.DEFAULT_VHOST # Parse the query string query_values = utils.parse_qs(parsed.query) channel_max = int(query_values.get('channel_max', [None])[0] or self.DEFAULT_CHANNEL_MAX) frame_max = int(query_values.get('frame_max', [None])[0] or specification.FRAME_MAX_SIZE) heartbeat = int(query_values.get('heartbeat', [None])[0] or self.DEFAULT_HEARTBEAT_INTERVAL) # Return the configuration dictionary to use when connecting return {'host': parsed.hostname, 'port': port, 'virtual_host': utils.unquote(vhost), 'username': parsed.username or self.GUEST, 'password': parsed.password or self.GUEST, 'heartbeat': heartbeat, 'frame_max': frame_max, 'channel_max': channel_max, 'locale': query_values.get('locale', [None])[0], 'ssl': use_ssl, 'cacertfile': (query_values.get('cacertfile', [None])[0] or query_values.get('ssl_cacert', [None])[0]), 'certfile': (query_values.get('certfile', [None])[0] or query_values.get('ssl_cert', [None])[0]), 'keyfile': (query_values.get('keyfile', [None])[0] or query_values.get('ssl_key', [None])[0]), 'verify': self._get_ssl_validation(query_values), 'ssl_version': self._get_ssl_version(query_values)} def _shutdown_connection(self, force=False): """Tell Channel0 and IO to stop if they are not stopped. :param force: Force the connection to shutdown without AMQP negotiation :type force: bool """ if not force and not self._io.is_alive(): self._set_state(self.CLOSED) LOGGER.debug('Cant shutdown connection, IO is no longer alive') return # Close any open channels for chan_id in [chan_id for chan_id in self._channels if not self._channels[chan_id].closed]: if force: self._channels[chan_id]._force_close() else: self._channels[chan_id].close() # If the connection is still established, close it if (self._channel0.open and not self._events.is_set(events.CHANNEL0_CLOSED)): self._channel0.close() # Loop while Channel 0 closes LOGGER.debug('Waiting on channel0 to close') while not self._channel0.closed and self._io.is_alive(): LOGGER.debug('Waiting on channel0 to close') time.sleep(0.1) LOGGER.debug('channel0 closed') # Close the socket if (self._events.is_set(events.SOCKET_OPENED) and not self._events.is_set(events.SOCKET_CLOSED)): LOGGER.debug('Requesting IO socket close') self._events.set(events.SOCKET_CLOSE) # Break out of select waiting self._trigger_write() LOGGER.debug('Waiting on socket to close') self._events.wait(events.SOCKET_CLOSED, 0.1) while self._io.is_alive(): time.sleep(0.25) def _trigger_write(self): """Notifies the IO loop we need to write a frame by writing a byte to a local socket. """ try: self._io.write_trigger.send(b'0') except socket.error: pass