# Source code for hermes.receiver

"""Receiver Component for usage in Node class."""

# Import Built-Ins
import logging
import time
from queue import Queue, Empty
from threading import Thread, Event

# Import Third-Party
import zmq

# Import home-grown
from hermes.structs import Envelope

# Init Logging Facilities
# Module-level logger named after this module (``__name__``), so records
# emitted here can be filtered/configured per module by the application.
log = logging.getLogger(__name__)


class Receiver(Thread):
    """Class providing a connection to one or many ZMQ Publisher(s).

    The thread connects a ``zmq.SUB`` socket to ``sub_addr``, decodes every
    multipart message into an :class:`hermes.structs.Envelope` and puts it on
    the internal queue :attr:`q`, from which consumers fetch it via
    :meth:`recv`.
    """

    # pylint: disable=too-many-instance-attributes

    # Upper bound (milliseconds) for a single wait on the socket inside the
    # run loop. It keeps the loop from spinning when no data is available
    # while still letting it notice a cleared ``_running`` flag promptly.
    POLL_INTERVAL_MS = 100

    def __init__(self, sub_addr, name, topics=None, exchanges=None):
        """
        Initialize a Receiver instance.

        :param sub_addr: Address the SUB socket connects to
        :param name: Name to give this :class:`hermes.Receiver` instance
        :param topics: Topic (single string) or iterable of topics to
            subscribe to; a falsy value subscribes to everything
        :param exchanges: Collection of envelope origins to accept; a falsy
            value accepts every origin
        """
        self.zmq_context = zmq.Context()
        self.sock = None
        self.sub_addr = sub_addr
        # Maximum tolerated age (seconds) of a message before the receiver
        # considers itself too slow and shuts down.
        self.timeout = 1
        self._topics = topics if topics else ''
        self._exchanges = exchanges if exchanges else ''
        self.q = Queue()
        self._running = Event()
        super().__init__(name=name)

    @staticmethod
    def _subscription_topics(topics):
        """Return ``topics`` as a list of individual subscription strings.

        A falsy value yields ``['']`` (the empty prefix), a single string
        yields a one-element list, and any other iterable is copied into a
        list.
        """
        if not topics:
            return ['']
        if isinstance(topics, str):
            return [topics]
        return list(topics)

    def stop(self, timeout=None):
        """
        Stop the :class:`hermes.Receiver` instance.

        Delegates to :meth:`join`, which clears the run flag and waits for
        the thread. No exception is raised when the timeout elapses; use
        :meth:`threading.Thread.is_alive` to check whether the thread ended.

        :param timeout: time in seconds to wait for the thread to finish
        :return: :class:`None`
        """
        log.info("Stopping Receiver instance..")
        self.join(timeout)
        log.info("..done.")

    def join(self, timeout=None):
        """Join the :class:`hermes.Receiver` instance.

        Clears the :attr:`hermes.Receiver._running` flag, causing a graceful
        shutdown of the run loop.

        :param timeout: timeout in seconds passed to
            :meth:`threading.Thread.join()`
        :return: :class:`None`
        """
        self._running.clear()
        super().join(timeout=timeout)

    def run(self):
        """
        Execute the custom run loop for the :class:`hermes.Receiver` class.

        It connects a ``zmq.SUB`` socket to :attr:`sub_addr`, subscribes to
        the configured topics and then, until the run flag is cleared:

        * waits up to :attr:`POLL_INTERVAL_MS` for a message,
        * decodes it via :meth:`Envelope.load_from_frames` (messages raising
          :exc:`KeyError` are logged and dropped),
        * drops envelopes whose ``origin`` is not in the configured
          exchanges (if any were given),
        * shuts the loop down if the envelope is older than
          :attr:`timeout` seconds,
        * otherwise puts the envelope on :attr:`q`.

        The socket and the instance's ZMQ context are always released on
        exit, including when the loop terminates with an exception.

        :return: :class:`None`
        """
        self._running.set()
        # Use the context created in __init__ rather than allocating a second
        # one, so no context is left behind unterminated.
        ctx = self.zmq_context
        try:
            self.sock = ctx.socket(zmq.SUB)
            # Lazy logging args: '%' formatting would break on tuple topics.
            log.info("Setting sockopts to subscribe to topics %r..",
                     self._topics)
            # SUBSCRIBE takes one prefix per call, so set it once per topic.
            for topic in self._subscription_topics(self._topics):
                self.sock.setsockopt_unicode(zmq.SUBSCRIBE, topic)
            log.info("Connecting Publisher to zmq.XPUB Socket at %s..",
                     self.sub_addr)
            self.sock.connect(self.sub_addr)
            log.info("Success! Executing receiver loop..")

            while self._running.is_set():
                # Wait (bounded) for data instead of busy-spinning on
                # non-blocking receives.
                if not self.sock.poll(timeout=self.POLL_INTERVAL_MS):
                    continue
                try:
                    frames = self.sock.recv_multipart(flags=zmq.NOBLOCK)
                except zmq.error.Again:
                    continue

                try:
                    envelope = Envelope.load_from_frames(frames)
                except KeyError as e:
                    log.exception(e)
                    log.error(frames)
                    continue

                log.debug("run(): Received %r", envelope)
                if self._exchanges and envelope.origin not in self._exchanges:
                    continue

                recv_at = time.time()
                # Convert once so the check and the log message agree and a
                # non-float ``ts`` cannot raise while building the log call.
                delay = recv_at - float(envelope.ts)
                if delay > self.timeout:
                    log.error("Reciever %s: Receiver cannot keep up with publisher "
                              "(message delay(%s) > %s)! Cannot take peer "
                              "pressure, committing suicide.",
                              self.name, delay, self.timeout)
                    self._running.clear()
                    continue

                self.q.put(envelope)
        finally:
            # Release the socket and context on every exit path.
            ctx.destroy()
            self.sock = None
        log.info("Loop terminated.")

    def recv(self, block=False, timeout=None):
        """
        Wrap around :meth:`Queue.get()`.

        Returns the popped value or :class:`None` if the
        :class:`queue.Queue` is empty (after waiting according to ``block``
        and ``timeout``).

        :param block: passed to :meth:`queue.Queue.get`; if true, wait for an
            item instead of returning immediately
        :param timeout: passed to :meth:`queue.Queue.get`; maximum seconds to
            wait when ``block`` is true
        :return: data or :class:`None`
        """
        # Call get() directly: checking empty() first is racy with other
        # consumers and would make ``block``/``timeout`` ineffective.
        try:
            return self.q.get(block, timeout)
        except Empty:
            return None