Source code for slacm.timer

'''
Created on Sep 18, 2020

@author: esdev
'''
import threading
import zmq
import time
import logging
import struct
from enum import Enum,auto

from slacm.port import Port
from slacm.exceptions import InvalidOperation

class TimerThread(threading.Thread):
    '''
    Thread for a timer port.

    The thread owns one end of an inproc PAIR socket (bound in run()). It
    receives Command codes on that socket and sends back the time stamp
    (time.time()) of every firing. A timer whose parent period is 0 is
    sporadic (one-shot, driven by the delay), otherwise it is periodic.

    Periods and delays are kept in seconds; they are converted to the
    millisecond timeout of the poller only when the timer is (re)armed.
    '''

    class Command(Enum):
        # Timer command codes
        TERMINATE = auto()
        ACTIVATE = auto()
        DEACTIVATE = auto()
        START = auto()
        CANCEL = auto()
        HALT = auto()

    def __init__(self, parent):
        '''
        Build the thread for the timer port 'parent'.

        Reads parent.instName (used as thread name and endpoint suffix),
        parent.context (the zmq context) and parent.period (milliseconds,
        0 means sporadic).
        '''
        threading.Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        self.name = parent.instName
        self.logger.info("TimerThread.__init__(%s)", self.name)
        self.context = parent.context
        if parent.period == 0:
            self.period = None
            self.periodic = False
        else:
            self.period = parent.period * 0.001     # millisec -> sec
            self.periodic = True
        self.delay = None
        self._ready = threading.Event()     # Timer thread ready to accept commands
        self._running = False               # Timer ('counter') is running
        # The attributes below are created here (not only in run()) so that
        # fire() is harmless when called before the thread has started.
        self.lock = threading.RLock()
        self.socket = None
        self.active = False
        self.last = None

    def ready(self):
        '''
        Return the threading.Event that is set once the thread has bound
        its socket and can accept commands.
        '''
        return self._ready

    def cmdError(self, where, cmd):
        '''
        Log a command that could not be handled; 'where' names the context.
        '''
        self.logger.error("Timer %s:%s: cmd = %r", self.name, where, cmd)

    def waitFor(self, timeout=None):
        '''
        Wait for a command for at most 'timeout' milliseconds.

        Returns the received object, or None if the poll timed out.
        A timeout of None waits indefinitely.
        '''
        res = self.poller.poll(timeout)
        if not res:
            return None
        (s, _m) = res[0]
        with self.lock:
            return s.recv_pyobj()

    def _msUntilNextFiring(self, now=None):
        '''
        Milliseconds left until one period after the last firing, never
        negative. 'now' defaults to time.time().
        '''
        if now is None:
            now = time.time()
        remaining = self.last + self.period - now
        # A negative value must not reach the poller: it would be taken as
        # 'no timeout' and the timer would never fire again.
        return max(0, int(remaining * 1000))

    def run(self):
        '''
        Thread body: bind the command socket, then serve commands and
        timeouts until TERMINATE is received.
        '''
        self.logger.info("TimerThread.run(%s)", self.name)
        self.socket = self.context.socket(zmq.PAIR)
        self.socket.bind('inproc://timer_' + self.name)
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
        self._ready.set()               # Ready to accept commands
        self.timeout = None             # Poll timeout in msec; None = wait for a command
        self.active = False
        self._running = False
        self.skip = False               # Periodic only: suppress the next firing
        self.last = None                # Time stamp of the last firing
        Command = TimerThread.Command
        while True:
            msg = self.waitFor(self.timeout)
            if msg == Command.TERMINATE:
                break                   # Terminated
            elif msg is None:           # Timeout
                if not self.active:     # Wait if not active
                    self.timeout = None
                    continue
                self.last = time.time()
                if self._running:
                    if self.periodic and self.skip:
                        self.skip = False
                    else:
                        with self.lock:
                            self.socket.send_pyobj(self.last)
                if self.periodic:       # Periodic: again
                    self.timeout = int(self.period * 1000)
                else:                   # Sporadic: wait for next command
                    self._running = False
                    self.timeout = None
            elif msg == Command.ACTIVATE:
                self.active = True
                self.last = None
            elif msg == Command.DEACTIVATE:
                self.active = False
                self.timeout = None
                self.last = None
            elif msg == Command.START:
                if not self.active:
                    self.cmdError('not active', msg)
                elif self.periodic:
                    self.timeout = int(self.period * 1000)
                    self._running = True
                elif self.delay is None:
                    # A sporadic timer cannot be armed without a delay;
                    # report it instead of crashing the thread.
                    self.cmdError('no delay set', msg)
                else:
                    self.timeout = int(self.delay * 1000)
                    self._running = True
            elif msg == Command.CANCEL:
                if self.periodic:
                    if self.last is not None:   # Skip next firing
                        self.timeout = self._msUntilNextFiring()
                        self.skip = True
                else:
                    self._running = False
                    self.timeout = None
            elif msg == Command.HALT:
                self.timeout = None
                self._running = False
            else:
                self.cmdError('loop', msg)
        self.logger.info("TimerThread.done")

    def fire(self):
        '''
        Fire timer from the 'outside'.

        Sends the current time stamp as if the timer had expired. Does
        nothing unless the timer is active and its socket exists.
        '''
        if not self.active or self.socket is None:
            return
        with self.lock:
            self.last = time.time()
            self.socket.send_pyobj(self.last)

    @staticmethod
    def _checkPositiveFloat(value, what):
        '''
        Raise AssertionError unless 'value' is a float greater than zero.
        Raised explicitly so the check is not stripped under 'python -O'.
        '''
        if not (type(value) is float and value > 0.0):
            raise AssertionError("%s must be a positive float: %r" % (what, value))

    def getPeriod(self):
        '''
        Read out the period (seconds); None for a sporadic timer.
        '''
        return self.period

    def setPeriod(self, _period):
        '''
        Set the period (seconds) - will be changed after the next firing.
        Period must be a positive float, otherwise AssertionError is raised.
        '''
        self._checkPositiveFloat(_period, 'period')
        self.period = _period

    def getDelay(self):
        '''
        Get the current delay (for sporadic timer); None if never set.
        '''
        return self.delay

    def setDelay(self, _delay):
        '''
        Set the current delay in seconds (for sporadic timer).
        Delay must be a positive float, otherwise AssertionError is raised.
        '''
        self._checkPositiveFloat(_delay, 'delay')
        self.delay = _delay

    def running(self):
        '''
        Returns True if the timer is running
        '''
        return self._running
class TimerPort(Port):
    '''
    Timer port

    Front end of a TimerThread: commands are sent to the thread over an
    inproc PAIR socket, and the time stamps of the firings are received
    on the same socket.
    '''

    def __init__(self, parent, name, spec):
        '''
        Constructor for a Timer port.

        The period is taken from spec.period; the timer thread is only
        created later, in setup().
        '''
        super().__init__(parent,name,spec)
        self.logger = logging.getLogger(__name__)
        self.logger.info('TimerPort.__init__(%s)',name)
        self.instName = self.parent.name + '.' + self.name
        self.context = parent.context
        self.period = self.spec.period
        self.thread = None

    def setup(self,owner,_disco):
        '''
        Start the timer thread and connect to its inproc endpoint.
        '''
        self.owner = owner
        self.thread = TimerThread(self)
        self.thread.start()
        # Wait for the thread to signal that its socket is bound, rather
        # than sleeping for a fixed time. The wait is bounded so that a
        # thread that failed to start cannot block the caller forever.
        if not self.thread.ready().wait(timeout=1.0):
            self.logger.warning('TimerPort.setup(%s): timer thread not ready',self.instName)
        assert self.instName == self.thread.name
        self.socket = self.context.socket(zmq.PAIR)
        self.socket.connect('inproc://timer_' + self.instName)

    def finalize(self,_disco):
        '''
        Nothing to finalize for a timer port.
        '''
        pass

    def reset(self):
        '''
        Nothing to reset for a timer port.
        '''
        pass

    def activate(self):
        '''
        Activate the timer port
        '''
        if self.thread is not None:
            self.socket.send_pyobj(TimerThread.Command.ACTIVATE)
            if self.thread.getPeriod():     # Periodic timer: starts right away
                self.socket.send_pyobj(TimerThread.Command.START)

    def deactivate(self):
        '''
        Deactivate the timer port
        '''
        if self.thread is not None:
            self.socket.send_pyobj(TimerThread.Command.DEACTIVATE)

    def terminate(self):
        '''
        Terminate the timer: ask the thread to exit and wait for it.
        '''
        if self.thread is not None:
            self.logger.info("terminating")
            self.socket.send_pyobj(TimerThread.Command.TERMINATE)
            self.thread.join()
            self.logger.info("terminated")

    def getPeriod(self):
        '''
        Read the period of the periodic timer; None if there is no thread yet.
        '''
        if self.thread is None:
            return None
        return self.thread.getPeriod()

    def setPeriod(self,_period):
        '''
        Set the period - will be changed after the next firing.
        Period must be a positive float, otherwise InvalidOperation is raised.
        '''
        if not (type(_period) is float and _period > 0.0):
            raise InvalidOperation(f"invalid argument: setPeriod({_period})")
        if self.thread is not None:
            self.thread.setPeriod(_period)

    def getDelay(self):
        '''
        Get the current delay (for sporadic timer); None if there is no thread yet.
        '''
        if self.thread is None:
            return None
        return self.thread.getDelay()

    def setDelay(self,_delay):
        '''
        Set the current delay (for sporadic timer).
        Delay must be a positive float, otherwise InvalidOperation is raised.
        '''
        if not (type(_delay) is float and _delay > 0.0):
            raise InvalidOperation(f"invalid argument: setDelay({_delay})")
        if self.thread is not None:
            self.thread.setDelay(_delay)

    def launch(self):
        '''
        Launch (start) the sporadic timer
        '''
        if self.thread is not None:
            self.socket.send_pyobj(TimerThread.Command.START)

    def running(self):
        '''
        Returns True if the timer is running; None if there is no thread yet.
        '''
        if self.thread is None:
            return None
        return self.thread.running()

    def cancel(self):
        '''
        Cancel the sporadic timer
        '''
        if self.thread is not None:
            self.socket.send_pyobj(TimerThread.Command.CANCEL)

    def halt(self):
        '''
        Halt the timer
        '''
        if self.thread is not None:
            self.socket.send_pyobj(TimerThread.Command.HALT)

    def fire(self):
        '''
        Force a timer firing
        '''
        if self.thread is not None:
            self.thread.fire()

    def getSocket(self):
        '''
        Return the socket connected to the timer thread.
        '''
        return self.socket

    def inSocket(self):
        '''
        Always True for a timer port.
        '''
        return True

    def recv_pyobj(self):
        '''
        Receive the next object (the time stamp of a firing) from the timer thread.
        '''
        return self.socket.recv_pyobj()

    def recv(self):
        '''
        Receive the next time stamp and return it packed as a bytearray
        holding one native 32-bit float.
        '''
        value = self.socket.recv_pyobj()
        # NOTE(review): the thread sends time.time(); a 32-bit float cannot
        # represent present-day epoch seconds to better than about two
        # minutes. Format "f" is kept because receivers depend on the 4-byte
        # layout - confirm whether "d" would be acceptable.
        return bytearray(struct.pack("f", value))

    def send_pyobj(self,msg):
        '''
        Not supported: always raises InvalidOperation.
        '''
        raise InvalidOperation("attempt to send_pyobj() through a timer port")

    def send(self,msg):
        '''
        Not supported: always raises InvalidOperation.
        '''
        raise InvalidOperation("attempt to send() through a timer port")