Source code for hermes.publisher

"""Publisher component for use in a Node class."""

# Import Built-Ins
import logging
from queue import Queue
from threading import Thread, Event

# Import Third-Party
import zmq

# Import home-grown


# Init Logging Facilities
log = logging.getLogger(__name__)


[docs]class Publisher(Thread): """ Allows publishing data to subscribers. The publishing is realized with ZMQ's Publisher sockets, and supports publishing to multiple subscribers. The :meth:`hermes.Publisher.run` method continuously checks for data on the internal q, which is fed by the :meth:`hermes.Publisher.publish` method. """ def __init__(self, target_addr, name, ctx=None, socket_type=None): """ Initialize Instance. :param pub_addr: Address this instance should bind to :param name: Name to give this :class:`hermes.Publisher` instance. """ self.target_addr = target_addr self._running = False self.sock = None self.ctx = ctx or zmq.Context().instance() self._input = self.ctx.socket(zmq.PUSH) self._output = self.ctx.socket(zmq.PULL) self.socket_type = socket_type or zmq.PUB super(Publisher, self).__init__(name=name)
[docs] def publish(self, envelope): """ Publish the given data to all current subscribers. :param envelope: :class:`hermes.Envelope` instance :return: None """ if self.sock: self._input.send_multipart(envelope.convert_to_frames()) return True return False
[docs] def stop(self, timeout=None): """ Stop the :class:`hermes.Publisher` instance. :param timeout: time in seconds until :exc:`TimeOutError` is raised :return: :class:`None` """ log.info("Stopping Publisher instance..") self.join(timeout=timeout) log.info("..done.")
[docs] def join(self, timeout=None): """ Join the :class:`hermes.Publisher` instance and shut it down. Clears the :attr:`hermes.Publisher._running` flag to gracefully terminate the run loop. :param timeout: timeout in seconds to wait for :meth:`hermes.Publisher.join` to finish :return: :class:`None` """ log.debug("Clearing _running state..") self._running = False log.debug("Closing socket..") try: self.sock.close() except AttributeError: log.debug("Socket was already closed!") super(Publisher, self).join(timeout)
[docs] def run(self): """ Customized run loop to publish data. Sets up a ZMQ publisher socket and sends data as soon as it is available on the internal Queue at :attr:`hermes.Publisher.q`. :return: :cls:`None` """ self._running = True ctx = zmq.Context() self.sock = ctx.socket(self.socket_type) log.info("Connecting Publisher to zmq.XSUB Socket at %s.." % self.pub_addr) self.sock.connect(self.target_addr) log.info("Executing publisher loop..") while self._running: try: frames = self._output.recv_multipart(zmq.NOBLOCK) except zmq.EAgain: continue log.debug("Sending %r ..", cts_msg) try: self.sock.send_multipart(frames) except zmq.error.ZMQError as e: log.error("ZMQError while sending data (%r) - stopping Publisher", e) break except AttributeError: if not self._running: log.info("Exiting publisher loop..") break raise ctx.destroy() self.sock = None log.info("Loop terminated.")