Source code for hermes.publisher
"""Publisher component for use in a Node class."""
# Import Built-Ins
import logging
from queue import Queue
from threading import Thread, Event
# Import Third-Party
import zmq
# Import home-grown
# Init Logging Facilities
log = logging.getLogger(__name__)
[docs]class Publisher(Thread):
"""
Allows publishing data to subscribers.
The publishing is realized with ZMQ's Publisher sockets, and supports publishing
to multiple subscribers.
The :meth:`hermes.Publisher.run` method continuously checks for data on the internal q,
which is fed by the :meth:`hermes.Publisher.publish` method.
"""
def __init__(self, pub_addr, name, ctx=None):
"""
Initialize Instance.
:param pub_addr: Address this instance should bind to
:param name: Name to give this :class:`hermes.Publisher` instance.
"""
self.pub_addr = pub_addr
self._running = Event()
self.sock = None
self.q = Queue()
self.ctx = ctx or zmq.Context().instance()
super(Publisher, self).__init__(name=name)
[docs] def publish(self, envelope):
"""
Publish the given data to all current subscribers.
:param envelope: :class:`hermes.Envelope` instance
:return: None
"""
if self.sock:
self.q.put(envelope)
return True
return False
[docs] def stop(self, timeout=None):
"""
Stop the :class:`hermes.Publisher` instance.
:param timeout: time in seconds until :exc:`TimeOutError` is raised
:return: :class:`None`
"""
log.info("Stopping Publisher instance..")
self.join(timeout=timeout)
log.info("..done.")
[docs] def join(self, timeout=None):
"""
Join the :class:`hermes.Publisher` instance and shut it down.
Clears the :attr:`hermes.Publisher._running` flag to gracefully terminate the run loop.
:param timeout: timeout in seconds to wait for :meth:`hermes.Publisher.join` to finish
:return: :class:`None`
"""
log.debug("Clearing _running state..")
self._running.clear()
log.debug("Closing socket..")
try:
self.sock.close()
except AttributeError:
log.debug("Socket was already closed!")
pass
super(Publisher, self).join(timeout)
[docs] def run(self):
"""
Custumized run loop to publish data.
Sets up a ZMQ publisher socket and sends data as soon as it is available
on the internal Queue at :attr:`hermes.Publisher.q`.
:return: :class:`None`
"""
self._running.set()
ctx = zmq.Context()
self.sock = ctx.socket(zmq.PUB)
log.info("Connecting Publisher to zmq.XSUB Socket at %s.." % self.pub_addr)
self.sock.connect(self.pub_addr)
log.info("Success! Executing publisher loop..")
while self._running.is_set():
if not self.q.empty():
cts_msg = self.q.get(block=False)
frames = cts_msg.convert_to_frames()
log.debug("Sending %r ..", cts_msg)
try:
self.sock.send_multipart(frames)
except zmq.error.ZMQError as e:
log.error("ZMQError while sending data (%s), "
"stopping Publisher", e)
break
else:
continue
ctx.destroy()
self.sock = None
log.info("Loop terminated.")