Source code for hermes.proxy
"""Basic XPub/XSub Proxy Interface for a cluster."""
# pylint: disable=too-few-public-methods
# Import Built-Ins
import logging
from threading import Thread
# Import Third-Party
import zmq
# Import Homebrew
# Init Logging Facilities
log = logging.getLogger(__name__)
[docs]class PostOffice(Thread):
"""
Class to forward subscriptions from publishers to subscribers.
Uses :const:`zmq.XSUB` & :const:`zmq.XPUB` ZMQ sockets to act as intermediary. Subscribe to
these using the respective PUB or SUB socket by binding to the same address as
XPUB or XSUB device.
"""
def __init__(self, proxy_in, proxy_out, debug_addr=None):
"""
Initialize a :class:`hermes.PostOffice` instance.
The addresses used when instantiating these are also the ones your
publihser and receiver nodes should bind to.
:param proxy_in: ZMQ Address, including port - facing towards cluster nodes
:param proxy_out: ZMQ address, including port - facing away from cluster nodes
:param debug_addr: ZMQ address, including port
"""
self.xsub_url = proxy_in
self.xpub_url = proxy_out
self._debug_addr = debug_addr
self.ctx = zmq.Context()
super(PostOffice, self).__init__()
@property
def debug_addr(self):
"""Return debug socket's address."""
return self._debug_addr
@property
def running(self):
"""Check if the thread is still alive and running."""
return self.is_alive()
[docs] def stop(self, timeout=None):
"""Stop the thread.
:param timeout: timeout in seconds to wait for join
"""
self.ctx.term()
self.join(timeout)
[docs] def run(self):
"""
Serve XPub-XSub Sockets.
Relays Publisher Socket data to Subscribers, and allows subscribers
to sub to that data. Offers the benefit of having a single static
address to connect to a cluster.
:return: :class:`None`
"""
ctx = self.ctx
log.info("Setting up XPUB ZMQ socket..")
xpub = ctx.socket(zmq.XPUB)
log.info("Binding XPUB socket facing subscribers to %s..", self.xpub_url)
xpub.bind(self.xpub_url)
log.info("Setting up XSUB ZMQ socket..")
xsub = ctx.socket(zmq.XSUB)
log.info("Binding XSUB socket facing publishers to %s..", self.xsub_url)
xsub.bind(self.xsub_url)
# Set up a debug socket, if address is given.
if self.debug_addr:
debug_pub = ctx.socket(zmq.PUB)
debug_pub.bind(self.debug_addr)
else:
debug_pub = None
log.info("Launching poll loop..")
try:
zmq.proxy(xpub, xsub, debug_pub)
except zmq.error.ContextTerminated:
xpub.close()
xsub.close()
debug_pub.close()
log.info("Closed sockets, Proxy terminated")