Source code for slacm.component

'''
Created on Sep 19, 2020

@author: esdev
'''
import threading
import zmq
import time
import logging
import heapq
import itertools
import traceback
import pathlib
from ruamel.yaml import YAML
from slacm.discovery import DiscoveryClient
from slacm.exceptions import UndefinedHandler
from slacm.config import Config,HostnameFilter

[docs] class ComponentThread(threading.Thread): ''' Component executor thread. ''' def __init__(self,parent): threading.Thread.__init__(self) self.logger = logging.getLogger(__name__) self.logger.info("ComponentThread.__init__(%s)",self.name) self.parent = parent _app = self.parent.getActor().getApp() self.name = parent.name self.context = parent.context self.disco_service = parent.disco self.disco_client = None self.component = parent.component self.netInfo = self.parent.get_netInfo() self.handlers = {} self.control = None self.pq = [] self.tc = itertools.count()
[docs] def get_netInfo(self): ''' Return the network interface information. ''' return self.netInfo
[docs] def setupControl(self): ''' Set up the inner 'control' socket (for receiving commands) ''' self.control = self.context.socket(zmq.PAIR) self.control.connect('inproc://part_' + self.name + '_control')
[docs] def sendControl(self,msg): ''' Send a message via the control socket (to the parent) ''' self.logger.info('sendControl[%s]: %s', self.name,str(msg)) self.control.send_pyobj(msg)
[docs] def recvControl(self): ''' Receive a control message from the parent via the control socket. ''' msg = self.control.recv_pyobj() self.logger.info('recvControl[%s]: %s', self.name,str(msg)) return msg
[docs] def setupPoller(self): ''' Setup the poller for the component thread, initially for the control socket only. ''' self.poller = zmq.Poller() self.sock2NameMap = {} self.sock2PortMap = {} self.poller.register(self.control,zmq.POLLIN) self.sock2NameMap[self.control] = "" self.sock2PrioMap = {}
[docs] def updatePoller(self): ''' Update the poller with all the input ports. ''' for portName in self.parent.ports: portObj = self.parent.ports[portName] portSocket = portObj.getSocket() portIsInput = portObj.inSocket() if portSocket != None: self.sock2PortMap[portSocket] = portObj if portIsInput: self.poller.register(portSocket,zmq.POLLIN) self.sock2NameMap[portSocket] = portName self.sock2PrioMap[portSocket] = portObj.getIndex()
[docs] def setupDisco(self): ''' Setup the component as a client of the discovery service. ''' self.disco_client = DiscoveryClient(self.context,self.disco_service)
[docs] def setup(self): ''' Execute the 'setup' phase of component initialization. Construct all ports, and register the 'server' ports with the discovery service. ''' for (_portName,port) in self.parent.ports.items(): _res = port.setup(self,self.disco_client)
[docs] def finalize(self): ''' Execute the 'finalize' phase of component initialization. Finalize all ports, and connect the 'client' ports with the discovery service. ''' for (_portName,port) in self.parent.ports.items(): _res = port.finalize(self.disco_client)
[docs] def activate(self): ''' Activate all ports. ''' for(portName,port) in self.parent.ports.items(): _res = port.activate() self.component.activate()
[docs] def deactivate(self): ''' Deactivate all ports. ''' for(portName,port) in self.parent.ports.items(): _res = port.deactivate() self.component.deactivate()
[docs] def terminate(self): ''' Terminate all ports. ''' for(portName,port) in self.parent.ports.items(): _res = port.terminate()
[docs] def runCommand(self,msg): ''' Run a control command sent to the component thread by its parent actor. ''' stop,err = False,False try: if msg == Component.STOP: self.logger.info("stop") self.deactivate() self.terminate() stop = True elif msg == Component.START: self.logger.info("start") self.updatePoller() self.activate() elif msg == Component.SETUP: self.logger.info("setup") self.setup() elif msg == Component.FINALIZE: self.logger.info("finalize") self.finalize() else: self.logger.warning("unknown command %s" % str(msg)) err = True # Should report an error except Exception as e: traceback.print_exc() self.logger.error('runCommand(): %s',str(e)) err = True return (stop,err)
[docs] def locateHandler(self,portName): ''' Locate the handler belonging to the the name port. ''' handler = self.handlers.get(portName,None) if handler: return handler else: funcName = 'on_' + portName handler = getattr(self.component, funcName, None) if handler is None: raise UndefinedHandler(f"no handler in {self.name}: {funcName}") else: self.handlers[portName] = handler return handler
[docs] def executeHandlerFor(self,socket): ''' Execute the handler for the port corresponding to the socket. The handler is always allowed to run to completion, the operation is never preempted. ''' if socket in self.sock2PortMap: portName = self.sock2NameMap[socket] portObj = self.sock2PortMap[socket] try: func_ = self.locateHandler(portName) func_() except: traceback.print_exc() raise else: self.logger.error('Unbound port')
def __scheduler(self,sockets): ''' Simple-minded scheduler. ''' for socket in sockets: self.executeHandlerFor(socket)
[docs] def scheduler(self, sockets): ''' Priority scheduler for the component message processing. The priority order is determined by the order of component ports. The dictionary of active sockets is scanned, and the \ they are inserted into a priority queue (according to their priority value). The queue is processed (in order of \ priority). After each invocation, the inputs are polled (in a no-wait operation) and the priority queue is updated. ''' while True: for socket in sockets: if socket in self.sock2PortMap: pri = self.sock2PrioMap[socket] cnt = next(self.tc) entry = (pri,cnt,socket) heapq.heappush(self.pq,entry) sockets = {} while True: try: pri,cnt,socket = heapq.heappop(self.pq) # Execute one task self.executeHandlerFor(socket) if len(self.pq) == 0: # Empty queue, return return sockets = dict(self.poller.poll(None)) # Poll to check if something came in if sockets: if self.control in sockets: # Handle control message msg = self.recvControl() self.toStop = self.runCommand(msg) del sockets[self.control] if self.toStop: return # Return if we must stop if len(sockets): # More sockets to handle, break # break from inner loop to schedule tasks else: # Nothing came in continue # keep running inner loop except IndexError: # Queue empty, return return
[docs] def run(self): ''' Main loop of component thread. ''' self.setupControl() self.setupDisco() self.setupPoller() self.toStop = False while True: sockets = dict(self.poller.poll()) if self.control in sockets: msg = self.recvControl() (self.toStop,err) = self.runCommand(msg) self.sendControl(Component.OK if not err else Component.ERR) del sockets[self.control] if self.toStop: break if len(sockets) > 0: self.scheduler(sockets) if self.toStop: break # self.logger.info("stopping") if hasattr(self.component,'__destroy__'): destroy_ = getattr(self.instance,'__destroy__') destroy_()
# self.logger.info("stopped")
[docs] class Component(object): ''' Base class for all components. ''' OK = 0 ERR = -1 SETUP = 1 FINALIZE = 2 START = 3 STOP = 4 def __init__(self): ''' Component base class constructor. Must be called by the derived class. Runs in a parent thread (i.e. not the component thread), at which time the ports are not available yet. ''' class_ = getattr(self,'__class__') className = getattr(class_,'__name__') self.owner = class_.OWNER # This is set in the parent part (temporarily) qualName = self.owner.qualName inst_logconf = '%s-log.yaml' % qualName comp_logconf = '%s-log.yaml' % className done = False yaml = YAML() try: logging.config.dictConfig(yaml.load(pathlib.Path(inst_logconf))) done = True except Exception: try: logging.config.dictConfig(yaml.load(pathlib.Path(comp_logconf))) done = True except Exception: pass if not done: opt = Config.APP_LOGS self.logger = logging.getLogger(qualName) self.logger.setLevel(logging.INFO) self.logger.propagate=False if opt == 'std': self.loghandler = logging.StreamHandler() # stdout elif opt == 'log': logFile = "%s.log" % qualName self.loghandler = logging.FileHandler(logFile) # log file else: self.loghandler = logging.NullHandler() # (null) self.loghandler.setLevel(logging.INFO) self.loghandler.addFilter(HostnameFilter()) self.logformatter = logging.Formatter('%(levelname)s:%(asctime)s:[%(hostname)s.%(process)d.%(threadName)s]:%(name)s:%(message)s') self.loghandler.setFormatter(self.logformatter) self.logger.addHandler(self.loghandler) # self.thread = None
[docs] def activate(self): ''' Method executed before the message handlers are activated. Runs in the component thread. ''' pass
[docs] def deactivate(self): ''' Method executed when the component is stopped. Runs in the component thread. ''' pass