st-ten-1/src/components/consumer.py
2022-06-22 17:18:29 +02:00

51 lines
1.8 KiB
Python

from collections import deque
from PyQt5.QtCore import QMutex
from .component import Component
class Consumer(Component):
    """Component that buffers "consumables" and hands them to a work callable.

    Items are queued with :meth:`add_consumable` (intended to be called from
    another thread) and processed one at a time by :meth:`_get`. Access to the
    queue is serialised with a ``QMutex``.

    Args:
        work: Callable invoked as ``work(consumable)``; its return value is
            forwarded as ``"result"``.
        work_fifo: If true, ``_get`` takes the oldest queued item first;
            otherwise the newest.
        drop_fifo: If true, the oldest queued item is discarded when the
            queue is full; otherwise the newest queued item is discarded.
        work_maxlen: Maximum queue length, or ``None`` for unbounded.
        config, name, lazy, paused, threaded: Forwarded unchanged to
            ``Component.__init__``.
        holdoff: Forwarded to ``Component.__init__`` as ``period``.
    """

    def __init__(self, work, work_fifo=True, drop_fifo=True, work_maxlen=None,
                 config=None, name=None, holdoff=0.1, lazy=True, paused=False,
                 threaded=True):
        super().__init__(config=config, name=name, period=holdoff, lazy=lazy,
                         paused=paused, threaded=threaded)
        self.work = work
        self.work_fifo = work_fifo
        self.drop_fifo = drop_fifo
        self.work_queue = deque(maxlen=work_maxlen)
        self.lock = QMutex()

    def add_consumable(self, consumable):
        """Queue ``consumable`` for later processing.

        Should be called from another thread. If the queue is bounded and
        already full, one queued item is discarded first (oldest when
        ``drop_fifo`` is true, newest otherwise).
        """
        queue = self.work_queue
        self.lock.lock()
        try:
            if queue.maxlen is not None and len(queue) >= queue.maxlen:
                # A zero-capacity queue is always "full" but has nothing to
                # evict; popping it would raise IndexError.
                if queue:
                    if self.drop_fifo:
                        queue.popleft()
                    else:
                        queue.pop()
                self.log.debug("skipped consumable")
            queue.append(consumable)
        finally:
            # Always release, so an exception cannot leave the mutex held.
            self.lock.unlock()

    def _get(self):
        """Process at most one queued item and forward the outcome.

        When an item is available it is removed from the queue, passed to
        ``self.work`` and the pair is handed to ``Component._get`` as
        ``[{"consumed": ..., "result": ...}]``. When the queue is empty,
        ``Component._get`` is called with ``emit=False`` instead.
        """
        # A separate flag is used because a queued item may itself be None.
        has_work = False
        consumed = None
        self.lock.lock()
        try:
            if self.work_queue:
                if self.work_fifo:
                    consumed = self.work_queue.popleft()
                else:
                    consumed = self.work_queue.pop()
                has_work = True
        finally:
            self.lock.unlock()

        if not has_work:
            super()._get(emit=False)
            return

        # The work callable runs outside the lock so that producers are not
        # blocked while an item is being processed.
        self.log.debug("working...")
        result = self.work(consumed)
        self.log.debug("done working")
        super()._get([{
            "consumed": consumed,
            "result": result,
        }])