Source code for slacm.actor

'''
Created on Sep 18, 2020

@author: esdev
'''

import os,sys,signal
import logging
import zmq
import traceback
from multiprocessing import Process
import time

from slacm.instance import Instance

[docs] class Actor(object): ''' Class for application actors. ''' OK = 0 ERR = -1 SETUP = 1 FINALIZE = 2 START = 3 STOP = 4 def __init__(self, parent, model): ''' :param parent: parent app :param model: actor model object ''' self.logger = logging.getLogger(__name__) self.parent = parent self.model = model self.name = model.name self.parentContext = self.parent.context self.childContext = None self.logger.info(f'Actor.__init__:{self.name}') self.instances = {} self.disco = self.parent.get_disco() self.netInfo = self.parent.get_netInfo() self.locals = self.model.locals self.params = self.parent.get_actor_params(self.name)
[docs] def getApp(self): ''' :return parent: the parent app ''' return self.parent
[docs] def get_disco(self): ''' :return disco: the discovery object ''' return self.disco
[docs] def get_netInfo(self): ''' :return netInfo: the network interface information ''' return self.netInfo
[docs] def is_local(self,message): ''' Return True if the message is a 'host-local' for the actor. ''' return message in self.locals
[docs] def get_comp_params(self,comp): ''' Return the parameters of a component of this actor ''' return self.parent.get_comp_params(self.name,comp)
[docs] def setup(self): ''' Execute the 'setup' operation for the actor. Launches a subprocess that runs the components. ''' self.logger.info("Actor.setup") self.command = self.parentContext.socket(zmq.PAIR) self.parentPort = self.command.bind_to_random_port("tcp://" + self.netInfo.localHost) self.child = Process(target=self.main) # ,args=(self,)) self.child.daemon = True self.child.start() time.sleep(0.001) self.command.send_pyobj(Actor.SETUP) _ack = self.command.recv_pyobj()
[docs] def finalize(self): ''' Execute the 'finalize' operation for the actor ''' self.logger.info("Actor.finalize") self.command.send_pyobj(Actor.FINALIZE) _ack = self.command.recv_pyobj()
[docs] def run(self): ''' Start running the actor (i.e. its components) ''' self.logger.info("Actor.run") self.command.send_pyobj(Actor.START) _ack = self.command.recv_pyobj()
[docs] def terminate(self): ''' Terminate the actor (i.e. ist components) ''' self.logger.info("Actor.terminate") self.command.send_pyobj(Actor.STOP)
# _ack = self.command.recv_pyobj()
[docs] def join(self): ''' Execute a 'join' operation on the child subprocess. ''' self.logger.info("Actor.join") self.child.join()
[docs] def main(self): ''' Main method of the subprocess. Creates the component instances, then launches a message handler for commands coming from the parent process. ''' self.childContext = zmq.Context() self.control = self.childContext.socket(zmq.PAIR) self.control.connect("tcp://" + self.parent.netInfo.localHost + ":" + str(self.parentPort)) time.sleep(0.001) signal.signal(signal.SIGTERM,signal.SIG_IGN) signal.signal(signal.SIGINT,signal.SIG_IGN) try: for instance in self.model.instances: self.instances[instance.name] = Instance(self,instance) except Exception as e: traceback.print_exc() return while True: msg = self.control.recv_pyobj() self.logger.info("main: cmd: %s" % str(msg)) if msg == Actor.SETUP: for (_name, instance) in self.instances.items(): instance.setup() self.control.send_pyobj(Actor.OK) elif msg == Actor.FINALIZE: for (_name, instance) in self.instances.items(): instance.finalize() self.control.send_pyobj(Actor.OK) elif msg == Actor.START: for (_name, instance) in self.instances.items(): instance.start() self.control.send_pyobj(Actor.OK) elif msg == Actor.STOP: for (_name, instance) in self.instances.items(): instance.stop() self.control.send_pyobj(Actor.OK) break