"""
The Message class represents a message that is sent or received and contains
methods for publishing the message, or in the case that the message was
delivered by RabbitMQ, acknowledging it, rejecting it or negatively
acknowledging it.
"""
import datetime
import json
import logging
import math
import time
import pprint
import uuid
from pamqp import body
from pamqp import header
from pamqp import specification
from rabbitpy import base
from rabbitpy import exceptions
from rabbitpy import utils
LOGGER = logging.getLogger(__name__)
# Python 2.6 does not have a memoryview object, create dummy for isinstance
try:
_PY_VERSION_CHECK = memoryview(b'foo')
except NameError:
# pylint: disable=too-few-public-methods, redefined-builtin
# pylint: disable=invalid-name, missing-docstring
class memoryview(object):
pass
class Properties(specification.Basic.Properties):
"""Proxy class for :py:class:`pamqp.specification.Basic.Properties`"""
pass
[docs]class Message(base.AMQPClass):
"""Created by both rabbitpy internally when a message is delivered or
returned from RabbitMQ and by implementing applications, the Message class
is used to publish a message to and access and respond to a message from
RabbitMQ.
When specifying properties for a message, pass in a dict of key value items
that match the AMQP Basic.Properties specification with a small caveat.
Due to an overlap in the AMQP specification and the Python keyword
:code:`type`, the :code:`type` property is referred to as
:code:`message_type`.
The following is a list of the available properties:
* app_id
* content_type
* content_encoding
* correlation_id
* delivery_mode
* expiration
* headers
* message_id
* message_type
* priority
* reply_to
* timestamp
* user_id
**Automated features**
When passing in the body value, if it is a dict or list, it will
automatically be JSON serialized and the content type ``application/json``
will be set on the message properties.
When publishing a message to RabbitMQ, if the opinionated value is ``True``
and no ``message_id`` value was passed in as a property, a UUID will be
generated and specified as a property of the message.
Additionally, if opinionated is ``True`` and the ``timestamp`` property
is not specified when passing in ``properties``, the current Unix epoch
value will be set in the message properties.
.. note:: As of 0.21.0 ``auto_id`` is deprecated in favor of
``opinionated`` and it will be removed in a future version. As of
0.22.0 ``opinionated`` is defaulted to ``False``.
:param channel: The channel object for the message object to act upon
:type channel: :py:class:`rabbitpy.channel.Channel`
:param body_value: The message body
:type body_value: str|bytes|unicode|memoryview|dict|json
:param dict properties: A dictionary of message properties
:param bool auto_id: Add a message id if no properties were passed in.
:param bool opinionated: Automatically populate properties if True
:raises KeyError: Raised when an invalid property is passed in
"""
method = None
name = 'Message'
def __init__(self, channel, body_value, properties=None, auto_id=False,
opinionated=False):
"""Create a new instance of the Message object."""
super(Message, self).__init__(channel, 'Message')
# Always have a dict of properties set
self.properties = properties or {}
# Assign the body value
if isinstance(body_value, memoryview):
self.body = bytes(body_value)
else:
# pylint: disable=redefined-variable-type
self.body = self._auto_serialize(body_value)
# Add a message id if auto_id is not turned off and it is not set
if (opinionated or auto_id) and 'message_id' not in self.properties:
if auto_id:
raise DeprecationWarning('Use opinionated instead of auto_id')
self._add_auto_message_id()
if opinionated:
if 'timestamp' not in self.properties:
self._add_timestamp()
# Enforce datetime timestamps
if 'timestamp' in self.properties:
self.properties['timestamp'] = \
self._as_datetime(self.properties['timestamp'])
# Don't let invalid property keys in
if self._invalid_properties:
msg = 'Invalid property: %s' % self._invalid_properties[0]
raise KeyError(msg)
@property
def delivery_tag(self):
"""Return the delivery tag for a message that was delivered or gotten
from RabbitMQ.
:rtype: int or None
"""
return self.method.delivery_tag if self.method else None
@property
def redelivered(self):
"""Indicates if this message may have been delivered before (but not
acknowledged)"
:rtype: bool or None
"""
return self.method.redelivered if self.method else None
@property
def routing_key(self):
"""Return the routing_key for a message that was delivered or gotten
from RabbitMQ.
:rtype: int or None
"""
return self.method.routing_key if self.method else None
@property
def exchange(self):
"""Return the source exchange for a message that was delivered or
gotten from RabbitMQ.
:rtype: string or None
"""
return self.method.exchange if self.method else None
[docs] def ack(self, all_previous=False):
"""Acknowledge receipt of the message to RabbitMQ. Will raise an
ActionException if the message was not received from a broker.
:raises: ActionException
"""
if not self.method:
raise exceptions.ActionException('Can not ack non-received '
'message')
basic_ack = specification.Basic.Ack(self.method.delivery_tag,
multiple=all_previous)
self.channel.write_frame(basic_ack)
[docs] def json(self):
"""Deserialize the message body if it is JSON, returning the value.
:rtype: any
"""
try:
return json.loads(self.body)
except TypeError: # pragma: no cover
return json.loads(self.body.decode('utf-8'))
[docs] def nack(self, requeue=False, all_previous=False):
"""Negatively acknowledge receipt of the message to RabbitMQ. Will
raise an ActionException if the message was not received from a broker.
:param bool requeue: Requeue the message
:param bool all_previous: Nack all previous unacked messages up to and
including this one
:raises: ActionException
"""
if not self.method:
raise exceptions.ActionException('Can not nack non-received '
'message')
basic_nack = specification.Basic.Nack(self.method.delivery_tag,
requeue=requeue,
multiple=all_previous)
self.channel.write_frame(basic_nack)
[docs] def pprint(self, properties=False): # pragma: no cover
"""Print a formatted representation of the message.
:param bool properties: Include properties in the representation
"""
print('Exchange: %s\n' % self.method.exchange)
print('Routing Key: %s\n' % self.method.routing_key)
if properties:
print('Properties:\n')
pprint.pprint(self.properties)
print('\nBody:\n')
pprint.pprint(self.body)
[docs] def publish(self, exchange, routing_key='', mandatory=False,
immediate=False):
"""Publish the message to the exchange with the specified routing
key.
In Python 2 if the message is a ``unicode`` value it will be converted
to a ``str`` using ``str.encode('UTF-8')``. If you do not want the
auto-conversion to take place, set the body to a ``str`` or ``bytes``
value prior to publishing.
In Python 3 if the message is a ``str`` value it will be converted to
a ``bytes`` value using ``bytes(value.encode('UTF-8'))``. If you do
not want the auto-conversion to take place, set the body to a
``bytes`` value prior to publishing.
:param exchange: The exchange to publish the message to
:type exchange: str or :class:`rabbitpy.Exchange`
:param str routing_key: The routing key to use
:param bool mandatory: Requires the message is published
:param bool immediate: Request immediate delivery
:return: bool or None
:raises: rabbitpy.exceptions.MessageReturnedException
"""
if isinstance(exchange, base.AMQPClass):
exchange = exchange.name
# Coerce the body to the proper type
payload = utils.maybe_utf8_encode(self.body)
frames = [specification.Basic.Publish(exchange=exchange,
routing_key=routing_key or '',
mandatory=mandatory,
immediate=immediate),
header.ContentHeader(body_size=len(payload),
properties=self._properties)]
# Calculate how many body frames are needed
pieces = int(math.ceil(len(payload) /
float(self.channel.maximum_frame_size)))
# Send the message
for offset in range(0, pieces):
start = self.channel.maximum_frame_size * offset
end = start + self.channel.maximum_frame_size
if end > len(payload):
end = len(payload)
frames.append(body.ContentBody(payload[start:end]))
# Write the frames out
self.channel.write_frames(frames)
# If publisher confirmations are enabled, wait for the response
if self.channel.publisher_confirms:
response = self.channel.wait_for_confirmation()
if isinstance(response, specification.Basic.Ack):
return True
elif isinstance(response, specification.Basic.Nack):
return False
else:
raise exceptions.UnexpectedResponseError(response)
[docs] def reject(self, requeue=False):
"""Reject receipt of the message to RabbitMQ. Will raise
an ActionException if the message was not received from a broker.
:param bool requeue: Requeue the message
:raises: ActionException
"""
if not self.method:
raise exceptions.ActionException('Can not reject non-received '
'message')
basic_reject = specification.Basic.Reject(self.method.delivery_tag,
requeue=requeue)
self.channel.write_frame(basic_reject)
def _add_auto_message_id(self):
"""Set the message_id property to a new UUID."""
self.properties['message_id'] = str(uuid.uuid4())
def _add_timestamp(self):
"""Add the timestamp to the properties"""
self.properties['timestamp'] = datetime.datetime.utcnow()
@staticmethod
def _as_datetime(value):
"""Return the passed in value as a ``datetime.datetime`` value.
:param value: The value to convert or pass through
:type value: datetime.datetime
:type value: time.struct_time
:type value: int
:type value: float
:type value: str
:type value: bytes
:type value: unicode
:rtype: datetime.datetime
:raises: TypeError
"""
if value is None:
return None
if isinstance(value, datetime.datetime):
return value
if isinstance(value, time.struct_time):
return datetime.datetime(*value[:6])
if utils.is_string(value):
value = int(value)
if isinstance(value, float) or isinstance(value, int):
return datetime.datetime.fromtimestamp(value)
raise TypeError('Could not cast a %s value to a datetime.datetime' %
type(value))
def _auto_serialize(self, body_value):
"""Automatically serialize the body as JSON if it is a dict or list.
:param mixed body_value: The message body passed into the constructor
:return: bytes|str
"""
if isinstance(body_value, dict) or isinstance(body_value, list):
self.properties['content_type'] = 'application/json'
return json.dumps(body_value, ensure_ascii=False)
return body_value
def _coerce_properties(self):
"""Force properties to be set to the correct data type"""
for key, value in self.properties.items():
_type = specification.Basic.Properties.type(key)
if self.properties[key] is None:
continue
if _type == 'shortstr':
if not utils.is_string(value):
LOGGER.warning('Coercing property %s to bytes', key)
value = str(value)
self.properties[key] = utils.maybe_utf8_encode(value)
elif _type == 'octet' and not isinstance(value, int):
LOGGER.warning('Coercing property %s to int', key)
try:
self.properties[key] = int(value)
except TypeError as error:
LOGGER.warning('Could not coerce %s: %s', key, error)
elif _type == 'table' and not isinstance(value, dict):
LOGGER.warning('Resetting invalid value for %s to None', key)
self.properties[key] = {}
if key == 'timestamp':
self.properties[key] = self._as_datetime(value)
@property
def _invalid_properties(self):
"""Return a list of invalid properties that currently exist in the the
properties that are set.
:rtype: list
"""
return [key for key in self.properties
if key not in specification.Basic.Properties.attributes()]
@property
def _properties(self):
"""Return a new Basic.Properties object representing the message
properties.
:rtype: pamqp.specification.Basic.Properties
"""
self._prune_invalid_properties()
self._coerce_properties()
return specification.Basic.Properties(**self.properties)
def _prune_invalid_properties(self):
"""Remove invalid properties from the message properties."""
for key in self._invalid_properties:
LOGGER.warning('Removing invalid property "%s"', key)
del self.properties[key]