"""Data structs for use within the hermes ecosystem."""
import logging
import json
import time
import sys
from functools import reduce
log = logging.getLogger(__name__)
[docs]class Message:
"""
Basic Struct class for data sent via an :class:`hermes.Envelope`.
Provides basic and dynamic load and dump functions to easily load
data to and from it.
If you have complex data types, consider extending this class, as it requires less overhead
than, for example, dictionaries, by using __slots__.
The class's timestamp attribute (ts) denotes the time of which the data was received.
"""
__slots__ = ['dtype', 'ts']
def __init__(self, ts=None):
"""
Initialize a :class:`hermes.Message` instance.
ts is the timestamp which should be used as reference when calculating
the age of the :class:`hermes.Message` instance.
:param ts: timestamp at which the message was created.
"""
self.ts = time.time() if not ts else ts
self.dtype = self._class_to_string()
[docs] @classmethod
def load(cls, data):
"""
Load data into a new data struct.
:param data: iterable, as transported by :class:`hermes.Envelope`
:return: :class:`hermes.Message`
"""
instance = cls()
for value, attr in zip(data, cls._slots()):
setattr(instance, attr, value)
return instance
[docs] def serialize(self, encoding='UTF-8'):
"""Serialize this data struct to :class:`bytes`.
:param encoding: Encoding to use in str.encode()
:return: data of this struct as :class:`bytes`
"""
data = [getattr(self, attr) for attr in self._slots()]
return json.dumps(data).encode(encoding)
@classmethod
def _slots(cls):
"""Get the class attributes as defined in its __slots__ attribute.
This includes all parents' __slots__ values.
Returns it in order of inheritance (base class __slots__ first).
:rtype: List
"""
slot_attrs = []
class_slots = [parent_class for parent_class in reversed(cls.__mro__[:-1])]
for parent_cls in class_slots:
for attr in parent_cls.__slots__:
slot_attrs.append(attr)
return slot_attrs
def _class_to_string(self):
"""Convert this class name into a :class:`str`."""
return self.__class__.__qualname__
# pylint: disable=too-many-format-args
def __repr__(self):
"""Construct a basic string-represenation of this class instance."""
attributes_as_strings = '('
for attr in self._slots():
attributes_as_strings += '{0}={1}, '.format(attr, getattr(self, attr))
attributes_as_strings = attributes_as_strings[:-2] + ')'
s = "{0}{1}".format(self._class_to_string(), attributes_as_strings)
return s
[docs]class Envelope:
"""Transport Object for data being sent between hermes components via ZMQ.
It is encouraged to use :class:`hermes.Message` as data for more complex data objects, but
all JSON-serializable built-in data types are supported.
They track topic and origin of the data they transport, as well as the
timestamp it was last updated at. Updates occur automatically whenever
:meth:`hermes.Envelope.serialize` is called.
This timestamp can be used to detect Slow-Subscriber-Syndrome by :class:`hermes.Receiver` and
to initiate the suicidal snail pattern.
"""
__slots__ = ['topic', 'origin', 'data', 'ts']
expected_message_type = Message
def __init__(self, topic_tree, origin, data, ts=None):
"""Initialize an :class:`hermes.Envelope` instance.
:param topic_tree: topic this data belongs to
:param origin: the sender of this message (Publisher)
:param data: data struct transported by this instance
:param ts: timestamp of this message, defaults to current unix ts if
None
"""
self.topic = topic_tree
self.origin = origin
self.data = data
self.ts = ts or time.time()
def __repr__(self):
"""Construct a basic string-represenation of this class instance."""
return ("Envelope(topic=%r, origin=%r, data=%r, ts=%r)" %
(self.topic, self.origin, self.data, self.ts))
[docs] @classmethod
def load_from_frames(cls, frames, encoding=None):
"""
Load json to a new :class:`hermes.Envelope` instance.
Automatically converts to string if the passed object is
a :class:`bytes.encode()` object.
:param frames: Frames, as received by :meth:`zmq.socket.recv_multipart`
:param encoding: The encoding to use for :meth:`bytes.encode()`; default UTF-8
:return: :class:`hermes.Envelope` instance
"""
encoding = encoding if encoding else 'utf-8'
topic, origin, data, ts = [json.loads(x.decode(encoding)) for x in frames]
data_dtype = data[0]
try:
data = cls.expected_message_type.load(data_dtype)
except AttributeError:
pass
return Envelope(topic, origin, data, ts)
[docs] def convert_to_frames(self, encoding=None):
"""
Encode the :class:`hermes.Envelope` attributes as a list of json-serialized strings.
:param encoding: the encoding to us for :meth:`str.encode()`, default UTF-8
:return: list of :class:`bytes`
"""
encoding = encoding if encoding else 'utf-8'
self.update_ts()
topic = json.dumps(self.topic).encode(encoding)
origin = json.dumps(self.origin).encode(encoding)
ts = json.dumps(self.ts).encode(encoding)
try:
data = self.data.serialize(encoding)
except AttributeError:
data = json.dumps(self.data).encode(encoding)
return topic, origin, data, ts
[docs] def update_ts(self):
"""Update the :class:`hermes.Envelope` timestamp."""
self.ts = time.time()