Source code for slacm.discovery

'''
Created on Sep 26, 2020

@author: esdev
'''

import logging
import threading
import asyncio
import zmq
import zmq.asyncio
import time
import traceback
import sys
from slacm.utils import find_free_port
from slacm.config import Config

from kademlia.network import Server

SLAM_DS='tcp://127.0.0.1:'

[docs] class RootServer(threading.Thread): ''' Root discovery server thread - runs in the 'root' app. ''' def __init__(self,port,interface): threading.Thread.__init__(self) self.logger = logging.getLogger(__name__) self.logger.info('RootServer(%d,%s)',port,interface) self.port = port self.interface = interface
[docs] def run(self): self.loop = asyncio.new_event_loop() self.loop.set_debug(True) self.server = Server() self.logger.info('RootServer.run.listen()') self.loop.run_until_complete(self.server.listen(self.port,self.interface)) self.logger.info('RootServer.run_forever()') try: self.loop.run_forever() except: traceback.print_exc() self.logger.info("root terminated")
[docs] def stop(self): for cb in [self.server.stop, self.loop.stop]: self.loop.call_soon_threadsafe(cb)
[docs] class PeerServer(threading.Thread): ''' Peer discovery server thread - each host (root and peer alike) runs a copy of this. Peer nodes connect via the root node's RootServer. ''' SLAM_DS_GET='g' SLAM_DS_SET='s' SLAM_DS_HALT='h' def __init__(self,rootHostPort,peerPort,localPort): threading.Thread.__init__(self) self.logger = logging.getLogger(__name__) self.logger.info('PeerServer(root=%r,peer=%r,local=%r)',rootHostPort,peerPort,localPort) self.daemon = True self.rootServer = rootHostPort self.peerPort = peerPort self.localPort = localPort self.server = None self.ctx = zmq.asyncio.Context.instance()
[docs] async def peer(self): self.ctrl = self.ctx.socket(zmq.REP) self.ctrl.bind(SLAM_DS+str(self.localPort)) self.server = Server() self.logger.info('PeerServer.peer.listen()') await self.server.listen(self.peerPort) self.logger.info('PeerServer.peer.bootstrap()') await self.server.bootstrap([self.rootServer]) stop = False self.logger.info('PeerServer.run()') while not stop: msg = await self.ctrl.recv_pyobj() if msg[0] == PeerServer.SLAM_DS_GET: rsp = await self.server.get(msg[1]) elif msg[0] == PeerServer.SLAM_DS_SET: rsp = await self.server.set(msg[1],msg[2]) elif msg[0] == PeerServer.SLAM_DS_HALT: rsp = 'done' stop = True else: rsp = '???' await self.ctrl.send_pyobj(rsp) self.server.stop()
[docs] def run(self): asyncio.run(self.peer()) self.logger.info("peer terminated")
[docs] class DiscoveryService(object): ''' Discovery service class - instantiated by the App (on the root and peer nodes alike). Acts as the interface of the App to the discovery service. ''' def __init__(self,context,interface,root_port=None): self.logger = logging.getLogger(__name__) self.interface = interface self.root_port = root_port self.ctx = context if self.root_port is None: self.root_port = find_free_port() self.root_server = RootServer(port=self.root_port,interface=self.interface) self.root_server.start() time.sleep(1) else: self.root_server = None self.peer_port = find_free_port() self.local_port = find_free_port() self.peer_server = PeerServer((self.interface,self.root_port),self.peer_port,self.local_port) self.peer_server.start() self.command = self.ctx.socket(zmq.REQ) self.command.connect(SLAM_DS+str(self.local_port))
[docs] def get_local_port(self): return self.local_port
[docs] def root(self): if self.root_server: return self.interface,self.root_port else: return None
[docs] def call(self,msg): self.logger.info('send: %s' % str(msg)) self.command.send_pyobj(msg) rsp = self.command.recv_pyobj() self.logger.info('recv: %s' % str(rsp)) return rsp
[docs] def get(self,key): return self.call((PeerServer.SLAM_DS_GET,key))
[docs] def set(self,key,value): return self.call((PeerServer.SLAM_DS_SET,key,value))
[docs] def stop(self): res = self.call((PeerServer.SLAM_DS_HALT,)) self.peer_server.join() if self.root_server: self.root_server.stop() self.root_server.join() return res
[docs] class DiscoveryClient(object): ''' Discovery service client - each component has a copy of this. Acts as the interface of the Component to the discovery service. ''' def __init__(self,context,service): self.logger = logging.getLogger(__name__) self.ctx = context self.local_port = service.get_local_port() self.command = self.ctx.socket(zmq.REQ) self.command.connect(SLAM_DS+str(self.local_port))
[docs] def call(self,msg): self.logger.info('client send: %s' % str(msg)) self.command.send_pyobj(msg) rsp = self.command.recv_pyobj() self.logger.info('client recv: %s' % str(rsp)) return rsp
[docs] def get(self,key): ''' Lookup the value belonging to the key in the discovery service. Wait until the lookup is successful. ''' ans = None tout = 3.0 if Config.DISC_TIMEOUT <= 0 else (Config.DISC_TIMEOUT / 1000.0) tslp = 1.0 if Config.RECV_TIMEOUT <= 0 else (Config.RECV_TIMEOUT / 1000.0) while True: ans = self.call((PeerServer.SLAM_DS_GET,key)) if ans: break time.sleep(tslp) tout -= tslp if tout <= 0: break return ans
[docs] def set(self,key,value): ''' Set the value for the key in the discovery service. ''' return self.call((PeerServer.SLAM_DS_SET,key,value))
if __name__ == '__main__': '''Test''' logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) logger.addHandler(logging.StreamHandler()) ctx = zmq.Context.instance() disco = DiscoveryService(ctx,'127.0.0.1') rsp = disco.get('KEY') print("disco.get('KEY') -> %s" % rsp) rsp = disco.set('KEY','VALUE') print("disco.set('KEY','VALUE') -> %s" % rsp) rsp = disco.get('KEY') print("disco.get('KEY') -> %s" % rsp) disco.stop() #