st-ten-1/src/components/component.py

227 lines
7.1 KiB
Python
Raw Normal View History

2022-06-01 16:37:19 +00:00
import logging
from lib.helpers import timing
from PyQt5.QtCore import QObject, QSemaphore, Qt, QTimer, pyqtSignal
class Component(QObject):
out = pyqtSignal(list)
_pause = pyqtSignal()
_resume = pyqtSignal()
_set_sources = pyqtSignal(dict)
_set_period = pyqtSignal(dict)
def __init__(
self,
config=None,
name=None,
period=None, # period to call _get
lazy=True, # whether or not accumulate periodic _get calls if falling behind
paused=False,
threaded=True,
):
super().__init__()
self.config = config
self.name = name if name is not None else str(id(self))
self._threaded = threaded
self._period = period
self._single_shot = lazy
self._paused = paused
self._started = False
self._running = False
self.sources = {}
if self._threaded:
self._lock = QSemaphore(1)
self._lock.acquire(max(self._lock.available(), 1))
self._timer = None
self.log = logging.getLogger(f"{self.__class__.__name__} ({self.name})")
if not self._threaded:
self.start()
def _config_changed(self):
self.log.info("reconfigure")
self.config_changed()
self.log.debug(f"config: {self.config}")
def config_changed(self):
pass
def start(self):
self._pause.connect(self._do_pause)
self._resume.connect(self._do_resume)
self._set_sources.connect(self._do_set_sources)
self._set_period.connect(self._do_set_period)
self.config.updated.connect(self._config_changed)
self._config_changed()
self._init_periodic()
self._started = True
if not self._paused:
self._do_resume()
elif self._threaded:
self._lock.release()
self.log.info("started")
@property
def started(self):
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
started = self._started
if self._threaded:
self._lock.release()
return started
@property
def running(self):
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
running = self._running
if self._threaded:
self._lock.release()
return running
def wait_ready(self, timeout=5):
if self._threaded:
timeout = round(timeout * 1000)
if self._lock.tryAcquire(max(self._lock.available(), 1), timeout):
self._lock.release()
else:
self._lock.release()
raise RuntimeError(f"{self.name} was not ready before timeout of {timeout}ms")
def pause(self):
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
if self._running is False:
if self._threaded:
self._lock.release()
return
if self._threaded:
self._pause.emit()
self.wait_ready()
else:
self._do_pause()
def resume(self):
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
if self._running is True:
if self._threaded:
self._lock.release()
return
if self._threaded:
self._resume.emit()
self.wait_ready()
else:
self._do_resume()
def set_sources(self, sources=None): # sources should be {"source_name": signal_to_connect}
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
self._set_sources.emit(sources)
self.wait_ready()
else:
self._do_set_sources(sources)
def _init_periodic(self):
if self._period is not None:
if self._timer is None:
self._timer = QTimer()
self._timer.setTimerType(Qt.PreciseTimer)
self._timer.setSingleShot(self._single_shot)
self._timer.setInterval(round(self._period * 1000))
self.log.debug(f"init periodic: period: {self._period}, single shot: {self._single_shot}")
else:
self.log.debug("no init periodic")
def set_period(self, period=None, lazy=True):
if self._threaded:
self._lock.acquire(max(self._lock.available(), 1))
self._set_sources.emit({"period": period, "lazy": lazy})
self.wait_ready()
else:
self._do_set_period({"period": period, "lazy": lazy})
def _start_periodic(self):
if self._timer is not None:
self._timer.timeout.connect(self._get)
self._timer.start()
self.log.debug(f"started periodic: {list(self.sources)}")
else:
self.log.debug("no started periodic")
def _stop_periodic(self):
if self._timer is not None:
self._timer.stop()
try:
self._timer.timeout.disconnect()
except TypeError:
pass
self.log.debug(f"stopped periodic: {list(self.sources)}")
else:
self.log.debug("no stopped periodic")
def _connect_sources(self):
if self.sources is not None:
for source in self.sources.values():
source.connect(self._get)
self.log.debug(f"connected sources: {list(self.sources)}")
else:
self.log.debug("no connected sources")
def _disconnect_sources(self):
if self.sources is not None:
for source in self.sources.values():
try:
source.disconnect()
except TypeError:
pass
self.log.debug(f"disconnected sources: {list(self.sources)}")
else:
self.log.debug("no disconnected sources")
def _do_resume(self):
self._start_periodic()
self._connect_sources()
self._running = True
self.log.info("resumed")
if self._threaded:
self._lock.release()
def _do_pause(self):
self._stop_periodic()
self._disconnect_sources()
self._running = False
self.log.info("paused")
if self._threaded:
self._lock.release()
def _do_set_sources(self, sources):
if self._running:
self._disconnect_sources()
self.sources = sources
if self._running:
self._connect_sources()
self.log.info("set sources")
if self._threaded:
self._lock.release()
def _do_set_period(self, spec):
self._period = spec.get("period", None)
self._single_shot = spec.get("lazy", True)
self._init_periodic()
self.log.info("set period")
if self._threaded:
self._lock.release()
def _get(self, data=None):
if data is None:
data = [None]
got = [{"time": timing(), self.name: d} for d in data]
self.out.emit(got)
self.log.debug(f"_get: {got}")
if self._single_shot:
self._timer.start()
def set(self, val):
self.log.debug(f"set: {val}")