from collections import deque

from PyQt5.QtCore import QMutex

from .component import Component


class Consumer(Component):
    """Component that queues consumables and runs ``work`` on them one at a time.

    Items are added with :meth:`add_consumable` and taken off the queue in
    :meth:`_get`. A ``QMutex`` guards every access to the queue.

    Args:
        work: Callable invoked with a single consumable; its return value is
            reported under the ``"result"`` key.
        work_fifo: If true, the oldest queued item is processed first;
            otherwise the most recently added item is processed first.
        drop_fifo: Selects which item is discarded when the queue is full:
            the oldest if true, the most recently added otherwise.
        work_maxlen: Maximum queue length, or ``None`` for an unbounded queue.
        config, name, lazy, paused, threaded: Forwarded unchanged to
            ``Component.__init__``.
        holdoff: Forwarded to ``Component.__init__`` as ``period``.
    """

    def __init__(self, work, work_fifo=True, drop_fifo=True, work_maxlen=None,
                 config=None, name=None, holdoff=0.1, lazy=True, paused=False,
                 threaded=True):
        super().__init__(config=config, name=name, period=holdoff, lazy=lazy,
                         paused=paused, threaded=threaded)

        self.work = work
        self.work_fifo = work_fifo
        self.drop_fifo = drop_fifo

        self.work_queue = deque(maxlen=work_maxlen)
        self.lock = QMutex()

    def add_consumable(self, consumable):
        """Append ``consumable`` to the work queue, evicting one item if full.

        When the queue is already at ``maxlen``, one queued item is discarded
        first (which one depends on ``drop_fifo``) and a debug message is
        logged.
        """
        # should be called from another thread
        self.lock.lock()
        try:
            # check work queue is not full
            maxlen = self.work_queue.maxlen
            if maxlen is not None and len(self.work_queue) >= maxlen:
                if self.drop_fifo:
                    self.work_queue.popleft()
                else:
                    self.work_queue.pop()
                self.log.debug("skipped consumable")

            # add consumable to work queue
            self.work_queue.append(consumable)
        finally:
            # Release even if eviction or logging raised; otherwise the next
            # caller on either thread would block on the mutex forever.
            self.lock.unlock()

    def _get(self):
        """Process at most one queued consumable and hand the result to the base.

        If the queue is empty, ``super()._get(emit=False)`` is called.
        Otherwise one item is removed (order depends on ``work_fifo``),
        ``work`` is called on it, and ``super()._get`` receives a one-element
        list holding a dict with the ``"consumed"`` item and its ``"result"``.

        NOTE(review): presumably called periodically by ``Component`` --
        confirm against the base class.
        """
        self.lock.lock()
        try:
            has_work = bool(self.work_queue)
            if has_work:
                if self.work_fifo:
                    consumed = self.work_queue.popleft()
                else:
                    consumed = self.work_queue.pop()
        finally:
            self.lock.unlock()

        if not has_work:
            super()._get(emit=False)
            return

        # The lock is released before working so that add_consumable() is
        # not blocked while ``work`` runs.
        self.log.debug("working...")
        result = self.work(consumed)
        self.log.debug("done working")

        super()._get([{
            "consumed": consumed,
            "result": result,
        }])