# Source code for hermes.node
"""Node Class to pull together receivers and publishers.
Functions as the smallest available unit that can be communicated with in a cluster.
It offers slots for a Publisher and Receiver object. Each of these must implement at least a
start() and stop() function, as well as a recv() (Receiver) and publish() (Publisher) method.
The passed objects are therefore not limited to :class:`hermes.Publisher`
and :class:`hermes.Receiver` objects.
When left unmodified, the Node will simply pass data from the receiver to the publisher.
:class:`hermes.Node` supports the `with` statement: on entering the block it starts every
facility registered on the instance (see :attr:`hermes.Node.facilities`), and on leaving the
block it stops them again.
"""
# Import Built-Ins
import logging
# Import Third-Party
# Import Home-grown
from hermes.structs import Envelope
log = logging.getLogger(__name__)


class Node:
    """
    Basic Node Class.

    Bundles an optional receiver and an optional publisher (the node's
    "facilities") and provides a basic interface for starting and stopping
    them together, either explicitly via :meth:`start` / :meth:`stop` or by
    using the instance as a context manager.

    Extend this as necessary.
    """

    # pylint: disable=too-few-public-methods

    def __init__(self, name, receiver=None, publisher=None):
        """
        Initialize the instance.

        :param name: name of the :class:`hermes.Node` instance.
        :param receiver: :class:`hermes.Receiver` instance, or None.
        :param publisher: :class:`hermes.Publisher` instance, or None.
        """
        self.publisher = publisher
        self.receiver = receiver
        # Snapshot of both slots taken at construction time; empty slots stay
        # in the list as None and are skipped wherever the list is iterated.
        self._facilities = [self.receiver, self.publisher]
        self.name = name
        self._running = False

    @property
    def facilities(self):
        """Return the names of facilities registered with this :class:`hermes.Node` instance.

        Empty slots (no receiver and/or no publisher given) are skipped. A
        facility that does not expose a ``name`` attribute is listed by its
        class name instead.

        :return: list of facility names
        """
        return [
            getattr(facility, 'name', type(facility).__name__)
            for facility in self._facilities
            if facility
        ]

    def start(self):
        """Start the :class:`hermes.Node` instance and its facilities."""
        log.info("Starting node..")
        self._start_facilities()
        log.info("..done.")

    def stop(self):
        """Stop the :class:`hermes.Node` instance and its facilities."""
        log.info("Stopping node..")
        self._stop_facilities()
        log.info("..done.")

    def __enter__(self):
        """Start facilities upon entering with-block.

        :return: the node itself, so ``with Node(...) as node:`` binds the instance.
        """
        self._start_facilities()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop facilities upon leaving with-block.

        Returns None, so an exception raised inside the with-block is not
        suppressed.
        """
        self._stop_facilities()

    def _start_facilities(self):
        """
        Start the Facilities available.

        Sets :attr:`hermes.Node._running` to True, then iterates over
        :attr:`hermes.Node._facilities` and starts all facilities that
        evaluate to True.

        All facilities must support a start() method.

        :raises Exception: whatever a facility's start() raised; the error is
            logged and re-raised, and the remaining facilities are not started.
        """
        self._running = True
        log.debug("Starting facilities (%r total)", len(self._facilities))
        for facility in self._facilities:
            if not facility:
                continue
            try:
                facility.start()
            except Exception as e:
                log.exception(e)
                log.error("Could not start all facilities!")
                raise
        log.debug("All facilities started successfully.")

    def _stop_facilities(self):
        """
        Stop the available facilities.

        Sets :attr:`hermes.Node._running` to False, then iterates over
        :attr:`hermes.Node._facilities` and stops all facilities that
        evaluate to True.

        All facilities must support a stop() method. A failure while stopping
        one facility is logged, but the remaining facilities are still
        stopped so that a single error cannot leave the others running.

        :raises Exception: the first error raised by a facility's stop(),
            re-raised once every facility has been attempted.
        """
        self._running = False
        log.debug("Stopping facilities (%r total)", len(self._facilities))
        first_error = None
        for facility in self._facilities:
            if not facility:
                continue
            try:
                facility.stop()
            except Exception as e:
                log.exception(e)
                if first_error is None:
                    first_error = e
        if first_error is not None:
            log.error("Could not stop all facilities!")
            raise first_error
        log.debug("All facilities stopped successfully.")

    def publish(self, channel, data):
        """
        Publish the given data to channel, if a :class:`hermes.Publisher` instance is available.

        The topic is generated from channel and :attr:`hermes.Node.name`.

        :param channel: topic tree
        :param data: Data Struct or string
        :return: :class:`None`
        :raises NotImplementedError: if no publisher is set, or the publisher
            has no publish() method.
        """
        # Resolve the method before calling it: only a missing publisher or a
        # missing publish() maps to NotImplementedError. An AttributeError
        # raised inside the publisher itself must reach the caller unchanged.
        publish = getattr(self.publisher, 'publish', None)
        if publish is None:
            raise NotImplementedError("No publisher available to publish with.")
        envelope = Envelope(channel + '/' + self.name, self.name, data)
        publish(envelope)

    def recv(self, block=False, timeout=None):
        """Receive data from the :class:`hermes.Receiver` instance, if available.

        :param block: passed through to the receiver's recv()
        :param timeout: passed through to the receiver's recv()
        :return: whatever the receiver's recv() returns
        :raises NotImplementedError: if no receiver is set, or the receiver
            has no recv() method.
        """
        # Same reasoning as in publish(): do not mask AttributeErrors that
        # originate inside the receiver's own recv().
        recv = getattr(self.receiver, 'recv', None)
        if recv is None:
            raise NotImplementedError("No receiver available to receive from.")
        return recv(block, timeout)

    def run(self):
        """Execute the main loop, which can be extended as necessary.

        If not extended, the following loop will be executed while
        :attr:`hermes.Node._running` is True:

        1. call :meth:`hermes.Node.recv` and check if there's a message
        2. if a message was received:
           call :meth:`hermes.Node.publish` and send message.
        3. Repeat.

        Received messages are re-published on the ``'RAW'`` channel.
        """
        # NOTE(review): recv() is called with its defaults (block=False), so
        # whether this loop idles or spins depends on the receiver - confirm.
        while self._running:
            msg = self.recv()
            if msg:
                self.publish('RAW', msg)