st-ten-1/src/components/vision.py
matteo porta 304fd9d664 wip
2022-08-02 18:15:30 +02:00

792 lines
35 KiB
Python

import logging
import os
import sys
import traceback
from configparser import ConfigParser
from pathlib import Path
import cv2
import numpy as np
from PyQt5.QtCore import (QFileSystemWatcher, QLineF, QMutex, QPointF, QRectF,
Qt, QThread, pyqtSignal)
from PyQt5.QtGui import (QBrush, QColor, QFont, QImage, QPainter, QPainterPath,
QPen, QPixmap)
from PyQt5.QtWidgets import QApplication
from .component import Component
from .consumer import Consumer
# "--no-gpu" on the command line hides every CUDA device by setting
# CUDA_VISIBLE_DEVICES to "-1"; this sits ahead of the TensorFlow import below.
if sys.argv.count("--no-gpu") > 0:
    os.environ.update({"CUDA_VISIBLE_DEVICES": "-1"})
import tensorflow as tf
from lib.helpers.object_detection.utils import label_map_util
# Resolve `make_interpreter`: the real pycoral factory when it can be imported
# and "--no-edgetpu" was not given, otherwise a stand-in that raises ValueError.
if "--no-edgetpu" in sys.argv:
    def make_interpreter(*args, **kwargs):
        """Stand-in used when Edge TPU support is disabled from the command line."""
        raise ValueError("\"--no-edgetpu\" in sys.argv")
else:
    try:
        from pycoral.utils.edgetpu import make_interpreter
    except ImportError:  # ModuleNotFoundError is a subclass of ImportError
        logging.exception(traceback.format_exc())

        def make_interpreter(*args, **kwargs):
            """Stand-in used when the pycoral import failed."""
            raise ValueError("the 'pycoral' module is not available")
# Resolve `Interpreter`: TensorFlow's bundled TFLite interpreter unless
# "--no-tflite" was given, otherwise a stand-in that raises ValueError.
if "--no-tflite" in sys.argv:
    def Interpreter(*args, **kwargs):
        """Stand-in used when TFLite support is disabled from the command line."""
        raise ValueError("\"--no-tflite\" in sys.argv")
else:
    try:
        Interpreter = tf.lite.Interpreter
        # from tflite_runtime.interpreter import Interpreter
    except ImportError:  # ModuleNotFoundError is a subclass of ImportError
        logging.exception(traceback.format_exc())

        def Interpreter(*args, **kwargs):
            """Stand-in used when resolving the TFLite interpreter failed."""
            raise ValueError("the 'tflite-runtime' module is not available")
# Global verdict override read by Vision.process_detections: None leaves the
# computed verdict untouched, False ("--fail-vision") forces it to fail.
vision_override = False if "--fail-vision" in sys.argv else None
# # Patch the location of gfile
# tf.gfile = tf.io.gfile
class Vision(Component):
"""everything is expected to have its shape with height (y) first, then width (x)"""
status_signal = pyqtSignal(dict)
loading_model_signal = pyqtSignal(dict)
def __init__(self, config=None, name=None, period=None, lazy=True, paused=False, threaded=True):
    """Forward all arguments to the parent component and set up local state.

    ``self.simulate`` is True when "--sim-vision" is on the command line;
    ``self.lock`` is the mutex used around model loading and inference.
    """
    super().__init__(
        config=config,
        name=name,
        period=period,
        lazy=lazy,
        paused=paused,
        threaded=threaded,
    )
    self.simulate = "--sim-vision" in sys.argv
    self.lock = QMutex()
def start(self):
    """Create the vision and render consumers on their own threads, then start the component.

    Each consumer is moved to a dedicated QThread, started through that
    thread's ``started`` signal and waited on with ``wait_ready()`` before
    the next step. The consumers' ``out`` signals are connected last, right
    before delegating to the parent ``start()``.
    """
    debugger_workaround = "--debugger-workaround" in sys.argv

    def pump_events_with_pause():
        # Process pending Qt events, sleep for one second, process events again.
        QApplication.processEvents()
        QThread.msleep(1000)
        QApplication.processEvents()

    self.model = None

    # VISION THREAD
    self.vision_consumer = Consumer(
        work=self.vision_consumer_work,
        work_fifo=True,
        drop_fifo=True,
        work_maxlen=1,
        name="vision_consumer",
        paused=False,
    )
    self.vision_consumer_thread = QThread()
    self.vision_consumer_thread.setTerminationEnabled(True)
    self.vision_consumer.moveToThread(self.vision_consumer_thread)
    self.vision_consumer_thread.started.connect(self.vision_consumer.start)
    self.vision_consumer_thread.start()
    if debugger_workaround:
        pump_events_with_pause()
    self.vision_consumer.wait_ready()

    # RENDER THREAD
    self.render_consumer = Consumer(
        work=self.render_consumer_work,
        work_fifo=True,
        drop_fifo=True,
        work_maxlen=1,
        name="render_consumer",
        paused=False,
    )
    self.render_consumer_thread = QThread()
    self.render_consumer_thread.setTerminationEnabled(True)
    self.render_consumer.moveToThread(self.render_consumer_thread)
    self.render_consumer_thread.started.connect(self.render_consumer.start)
    self.render_consumer_thread.start()
    if debugger_workaround:
        pump_events_with_pause()
    self.render_consumer.wait_ready()

    # CONNECT CONSUMERS
    self.vision_consumer.out.connect(self.process_vision_consumed)
    self.render_consumer.out.connect(self.process_render_consumed)
    super().start()
def config_changed(self):
    """Re-read this component's config section and rebuild everything derived from it.

    In order: detection threshold, recipe state and its file watcher, the
    set of allowed inference modes and the neural network, the label map
    with its derived lookup tables, and the zone matching modes.
    """
    section = self.config[self.name]

    # OBJECT DETECTION
    self.detection_threshold = float(section.get("detection_threshold", 0.5))

    # recipe
    self.zones = None
    self.labels = None

    # LOAD RECIPE
    self.recipes_dir = Path(section.get("recipes_dir", "./config/vision/recipes"))
    self.set_recipe(None)
    self.recipe_watcher = QFileSystemWatcher([])
    self.recipe_watcher.fileChanged.connect(self._set_recipe)

    # LOAD MODEL
    self.models_dir = Path(section.get("models_dir", "./data/neural_networks"))
    modes = dict.fromkeys(("edgetpu", "tflite", "normal"))
    if "--no-edgetpu" in sys.argv:
        modes.pop("edgetpu", None)
    if "--no-tflite" in sys.argv:
        # disabling tflite also removes the edgetpu mode
        for disabled in ("edgetpu", "tflite"):
            modes.pop(disabled, None)
    self.allowed_modes = modes
    self.load_model(section.get("neural_network", None))

    # LOAD LABELS
    label_map = label_map_util.load_labelmap("./config/vision/labels/labels.pbtxt")
    self.num_classes = len(label_map.item)
    categories = label_map_util.convert_label_map_to_categories(
        label_map,
        max_num_classes=self.num_classes,
        use_display_name=True,
    )
    self.category_index = label_map_util.create_category_index(categories)
    for category in self.category_index.values():
        # colors are written as "0x..." in the label map; QColor is fed "#..." elsewhere
        category["color"] = category["color"].replace("0x", "#")
    self.classes_map = {category["name"]: key for key, category in self.category_index.items()}

    self.zone_detection_filter_mode = section.get("zone_detection_filter_mode", "box_inside")
    self.zone_detection_preference_mode = section.get("zone_detection_preference_mode", "score")
@staticmethod
def get_center(rect):
return [(rect[0] + rect[2]) / 2, (rect[1] + rect[3]) / 2]
@staticmethod
def get_size(rect):
return [rect[2] - rect[0], rect[3] - rect[1]]
@staticmethod
def get_box(center, size):
return [center[0] - size[0] / 2, center[1] - size[1] / 2, center[0] + size[0] / 2, center[1] + size[1] / 2]
@staticmethod
def get_distance(p1, p2):
return pow((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2, 1 / 2)
def clear_recipe(self):
    """Forget the current recipe: no recipe name, empty markers, zones and labels."""
    self.markers, self.zones, self.labels = {}, {}, {}
    self.recipe = None
def _set_recipe(self, recipe_path):
recipe_path = str(recipe_path)
self.log.info(f"changing recipe to {recipe_path!r}")
watched = self.recipe_watcher.files()
if recipe_path == "" and len(watched) == 0: # skip bad watcher signals
return
if len(watched) > 0:
self.recipe_watcher.removePaths(watched)
self.recipe_path = recipe_path
try:
if not os.path.isfile(self.recipe_path):
raise AssertionError(f"Recipe file {self.recipe_path!r} could not be found.")
config = ConfigParser(inline_comment_prefixes="#")
read = config.read(self.recipe_path)
if len(read) != 1 or self.recipe_path not in read:
raise AssertionError("Recipe could not be read.")
os.path.splitext(os.path.basename(read[0]))[0]
self.markers = self.parse_markers(config._sections.get("markers", None))
self.zones = self.parse_zones(config._sections.get("zones", None))
self.labels = self.parse_labels(config._sections.get("labels", None))
self.recipe_watcher.addPath(str(self.recipe_path))
except Exception:
self.log.exception(traceback.format_exc())
self.log.exception(f"Error reading {self.recipe_path!r}:")
self.clear_recipe()
self.status_signal.emit({
"recipe": self.recipe,
"markers": self.markers,
"zones": self.zones,
"labels": self.labels,
})
def set_recipe(self, recipe=None):
    """Select a recipe by file name, or clear the current one when ``recipe`` is None.

    A non-None ``recipe`` is stored as-is and then loaded from
    ``self.recipes_dir / str(recipe)``.
    """
    if recipe is None:
        self.clear_recipe()
        return
    self.recipe = recipe
    recipe_file = self.recipes_dir / str(recipe)
    self._set_recipe(recipe_file)
def parse_markers(self, config=None):
    """Parse the 'markers' recipe section into drawable marker items.

    Each value holds six space-separated fields: center, size, fill color,
    border color, border thickness and shape. Center and size are
    comma-separated numbers whose order is reversed on load; a single size
    value is used for both dimensions. Colors may use a "0x" prefix.
    Entries that fail to parse are logged and skipped.

    Args:
        config: mapping of marker name to spec string.

    Returns:
        dict mapping marker name to its item description.

    Raises:
        AssertionError: if ``config`` is None (section missing from the recipe).
    """
    if config is None:
        raise AssertionError(f"Recipe file {self.recipe_path!r} does not contain the 'markers' section.")
    markers = {}
    for marker_name, marker_spec in config.items():
        try:
            center, size, fill_color, border_color, border_thickness, shape = marker_spec.split(" ")
            center = list(reversed(list(map(float, center.split(",")))))
            size = list(reversed(list(map(float, size.split(",")))))
            if len(size) == 1:
                size = [size[0], size[0]]
            fill_color = QColor(fill_color.replace("0x", "#"))
            border_color = QColor(border_color.replace("0x", "#"))
            border_thickness = float(border_thickness)
            shape = shape.lower()
            markers[marker_name] = {
                "border_color": border_color,
                "border_thickness": border_thickness,
                "center": center,
                "fill_color": fill_color,
                "shape": shape,
                "size": size,
            }
        except Exception:
            # best effort: a malformed marker must not invalidate the others
            self.log.exception(traceback.format_exc())
            self.log.exception(f"marker {marker_name!r} in recipe file {self.recipe_path!r} could not be parsed. spec: {marker_spec!r}")
    return markers
def parse_zones(self, config=None):
    """Parse the 'zones' recipe section into zone items.

    Each value holds three space-separated fields: center, size and the
    expected class name. A comma-separated size produces a "rect" zone; a
    single number produces an "ellipse" zone with that value for both
    dimensions. Zone names are upper-cased. The class name is resolved
    through ``self.classes_map`` and ``self.category_index``. Entries that
    fail to parse are logged and skipped.

    Args:
        config: mapping of zone name to spec string.

    Returns:
        dict mapping upper-cased zone name to its item description.

    Raises:
        AssertionError: if ``config`` is None (section missing from the recipe).
    """
    if config is None:
        raise AssertionError(f"Recipe file {self.recipe_path!r} does not contain the 'zones' section.")
    zones = {}
    for zone_name, zone_spec in config.items():
        zone_name = zone_name.upper()
        try:
            center, size, d_class = zone_spec.split(" ")
            center = list(reversed(list(map(float, center.split(",")))))
            if "," in size:
                size = list(reversed(list(map(float, size.split(",")))))
                shape = "rect"
            else:
                size = [float(size)] * 2
                shape = "ellipse"
            d_class = self.category_index[self.classes_map[d_class]]
            zones[zone_name] = {
                "border_color": QColor(d_class["color"]),
                "border_thickness": 25,
                "box": self.get_box(center, size),
                "convert_negative_placement": False,
                "center": center,
                "class": d_class,
                "fill_color": QColor("#00000000"),
                "pen_line": "DashLine",
                "shape": shape,
                "size": size,
            }
        except Exception:
            # best effort: a malformed zone must not invalidate the others
            self.log.exception(traceback.format_exc())
            self.log.exception(f"region {zone_name!r} in recipe file {self.recipe_path!r} could not be parsed. spec: {zone_spec!r}")
    return zones
def parse_labels(self, config=None):
    """Parse the optional 'labels' recipe section into text items.

    Each value holds six space-separated fields: location, font size, fill
    color, border color, border thickness and the text (which may itself
    contain spaces). Literal ``\\n`` and ``\\t`` sequences in the text become
    newline and tab. Entries that fail to parse are logged and skipped.

    Args:
        config: mapping of label name to spec string, or None for "no labels".

    Returns:
        dict mapping label name to its item description.
    """
    if config is None:
        return {}
    parsed = {}
    for label_name, label_spec in config.items():
        try:
            fields = label_spec.split(" ", 5)
            location, font_size, fill_color, border_color, border_thickness, text = fields
            parsed[label_name] = {
                "border_color": QColor(border_color.replace("0x", "#")),
                "border_thickness": float(border_thickness),
                "fill_color": QColor(fill_color.replace("0x", "#")),
                "font_size": float(font_size),
                "location": [float(value) for value in location.split(",")][::-1],
                "opacity": 1,
                "shape": "text",
                "text": text.replace("\\n", "\n").replace("\\t", "\t"),
            }
        except Exception:
            self.log.exception(traceback.format_exc())
            self.log.exception(f"label {label_name!r} in recipe file {self.recipe_path!r} could not be parsed. spec: {label_spec!r}")
    return parsed
def load_model(self, model=None):
    """Load a neural network, trying the allowed back-ends in order.

    Order: simulation (when ``self.simulate``), Edge TPU TFLite, CPU TFLite,
    then a TensorFlow saved model. The first back-end that initializes wins;
    failures of individual back-ends are logged and the next one is tried.
    ``loading_model_signal`` is emitted with status "loading" before and
    "done" after a successful load.

    Args:
        model: directory name of the model inside ``self.models_dir``. None
            or one of "", "any", "last", "latest", "newest", "none"
            (case-insensitive) selects the sub-directory whose name sorts
            last.

    Raises:
        RuntimeError: if no back-end could be initialized.
        IndexError: if a model has to be picked automatically and
            ``self.models_dir`` contains no sub-directory.
    """
    # print("VISION CONSUMER", str(int(QThread.currentThreadId())), flush=True)
    self.log.info(f"requested neural network: {model!r}")
    if model is None or model.lower() in {"", "any", "last", "latest", "newest", "none"}:
        candidates = [d for d in os.listdir(self.models_dir) if os.path.isdir(self.models_dir / d)]
        model_name = sorted(candidates, reverse=True)[0]
    else:
        model_name = model
    self.log.info(f"loading neural network: {model_name!r}")
    self.loading_model_signal.emit({"status": "loading"})

    tf_mode = None
    interpreter = None
    saved_model = None
    model_dir = self.models_dir / model_name
    self.lock.lock()
    try:
        if self.simulate:
            tf_mode = "simulation"
        if tf_mode is None and "edgetpu" in self.allowed_modes:
            try:
                # create tflite edgetpu interpreter
                interpreter = make_interpreter(str(model_dir / f"{model_name}_edgetpu.tflite"))
                tf_mode = "edgetpu"
            except Exception:
                self.log.exception(traceback.format_exc())
        if tf_mode is None and "tflite" in self.allowed_modes:
            try:
                # create tflite cpu interpreter
                interpreter = Interpreter(str(model_dir / f"{model_name}.tflite"))
                tf_mode = "tflite"
            except Exception:
                self.log.exception(traceback.format_exc())
        if tf_mode is None and "normal" in self.allowed_modes:
            try:
                # create tensorflow model
                saved_model = tf.saved_model.load(str(model_dir)).signatures["serving_default"]
                tf_mode = "normal"
            except Exception:
                self.log.exception(traceback.format_exc())
    finally:
        # always release the mutex, even if something unexpected escapes above
        self.lock.unlock()

    if tf_mode is None:
        raise RuntimeError("failed initialize any neural network model")
    self.tf_mode = tf_mode
    self.model_name = model_name
    if interpreter is not None:
        # if there is a new tflite interpreter initialize it
        interpreter.allocate_tensors()
        interpreter.invoke()  # warmup
    self.interpreter = interpreter
    # if there is a new model to be used, remove previous model if present
    if saved_model is not None and self.model is not None:
        tf.keras.backend.clear_session()
    self.model = saved_model
    self.log.info(f"initialized model {self.model!r} with mode {self.tf_mode!r}")
    self.loading_model_signal.emit({"status": "done"})
def check_features(self, frame, lock=True):
    """Run object detection on ``frame`` and return the detections above threshold.

    With a TFLite interpreter loaded, the frame is resized (only if needed)
    to the interpreter's input height/width and given a leading batch axis.
    Otherwise it is resized to 256x256 and converted to a TensorFlow tensor.
    Boxes are rescaled by the frame's first two dimensions.

    Args:
        frame: image array; ``frame.shape[:2]`` is used as (height, width).
        lock: when True, ``self.lock`` is held while inference runs.

    Returns:
        list of dicts with keys "score", "box", "class", "center", "size".
    """
    if self.interpreter is not None:
        # The input shape is a numpy array; convert it to plain ints so it can be
        # compared with the frame's shape and handed to OpenCV.
        input_height, input_width = (int(v) for v in self.interpreter.get_input_details()[0]["shape"][1:3])
        if tuple(frame.shape[:2]) != (input_height, input_width):
            # cv2.resize expects dsize as (width, height)
            frame_resized = cv2.resize(frame, (input_width, input_height), interpolation=cv2.INTER_LINEAR)
        else:
            frame_resized = frame
        tensor = np.expand_dims(frame_resized, axis=0)
    else:
        frame_resized = cv2.resize(frame, (256, 256), interpolation=cv2.INTER_LINEAR)
        tensor = tf.convert_to_tensor(np.asarray(frame_resized))
        tensor = tensor[tf.newaxis, ...]

    # Run inference
    if lock:
        self.lock.lock()
    try:
        if self.simulate or self.tf_mode == "simulation":
            # fixed fake detection: class 1, full score, centered box
            detections = {
                "detection_scores": [[1.0]],
                "detection_boxes": [[[0.2, 0.2, 0.8, 0.8]]],
                "detection_classes": [[1]],
            }
        elif self.tf_mode in {"edgetpu", "tflite"}:
            i_d = self.interpreter.get_input_details()
            o_d = self.interpreter.get_output_details()
            self.interpreter.set_tensor(i_d[0]["index"], tensor)
            self.interpreter.invoke()
            # PARSE TFLITE DETECTIONS
            # The output order differs between models: when output 3 holds a
            # single value it is treated as the detection count.
            if self.interpreter.get_tensor(o_d[3]["index"]).size == 1:
                boxes = self.interpreter.get_tensor(o_d[0]["index"])[0]
                class_ids = self.interpreter.get_tensor(o_d[1]["index"])[0]
                scores = self.interpreter.get_tensor(o_d[2]["index"])[0]
            else:
                scores = self.interpreter.get_tensor(o_d[0]["index"])[0]
                boxes = self.interpreter.get_tensor(o_d[1]["index"])[0]
                class_ids = self.interpreter.get_tensor(o_d[3]["index"])[0]
            detections = {
                "detection_scores": [scores],
                "detection_boxes": [boxes],
                # class ids are shifted by one to index self.category_index
                "detection_classes": [[class_id + 1 for class_id in class_ids]],
            }
        else:
            raw = self.model(tensor)
            detections = {
                "detection_scores": raw["detection_scores"].numpy().tolist(),
                "detection_boxes": raw["detection_boxes"].numpy().tolist(),
                "detection_classes": raw["detection_classes"],
            }
    finally:
        # release the mutex even when inference raises
        if lock:
            self.lock.unlock()

    # WARNING: results other than the ones related to tensor[-1] will be discarded
    frame_scale = frame.shape[:2] * 2  # tuple repetition: (h, w, h, w)
    parsed_detections = []
    for d_score, d_box, d_class in zip(
        detections["detection_scores"][-1],
        detections["detection_boxes"][-1],
        detections["detection_classes"][-1],
    ):
        if d_score < self.detection_threshold:
            continue
        box = [i * s for i, s in zip(list(d_box), frame_scale)]  # rescale detection to frame size
        parsed_detections.append({
            "score": d_score,
            "box": box,
            "class": self.category_index[int(d_class)],
            "center": self.get_center(box),
            "size": self.get_size(box),
        })
    return parsed_detections
def detections_to_items(self, detections):
    """Turn raw detections into drawable rectangle items keyed by their index.

    Returns an empty dict when ``detections`` is None or empty. Each item is
    the detection itself plus a shared rectangle style, with the border
    colored after the detected class.
    """
    # DRAW DETECTIONS
    if detections is None or not len(detections):
        return {}
    shared_style = {
        "border_thickness": 25,
        "fill_color": QColor("#00000000"),
        "shape": "rect",
        "convert_negative_placement": False,
    }
    return {
        str(index): {
            **detection,
            **shared_style,
            "border_color": QColor(detection["class"]["color"]),
        }
        for index, detection in enumerate(detections)
    }
def process_detections(self, detections):
    """Match detections to recipe zones and check each zone's class.

    Every detection goes to the zone with the closest center among those it
    is not "outside" of, as decided by ``self.zone_detection_filter_mode``
    ("center_inside", "box_inside" or "box_touches"). When several
    detections compete for one zone, ``self.zone_detection_preference_mode``
    ("distance" or "score") picks the winner. A zone with no detection gets
    a dummy "no_detection" entry.

    Returns:
        None when no zones are defined, otherwise
        ``{"ok": <overall verdict>, "results": {zone_name: {"ok", "expected", "detection"}}}``.
        A non-None module-level ``vision_override`` replaces the overall verdict.

    Raises:
        NotImplementedError: on an unknown zone shape, filter mode or preference mode.
    """
    if self.zones is None or not len(self.zones):
        return None

    def is_outside(detection, zone, distance):
        # True when the detection must not be matched with this zone
        if zone["shape"] == "rect":
            zone_box = zone["box"]
            if self.zone_detection_filter_mode == "center_inside":
                point = detection["center"]
                return any((
                    point[0] < zone_box[0],
                    point[0] > zone_box[2],
                    point[1] < zone_box[1],
                    point[1] > zone_box[3],
                ))
            if self.zone_detection_filter_mode == "box_inside":
                box = detection["box"]
                return any((
                    box[0] < zone_box[0],
                    box[2] > zone_box[2],
                    box[1] < zone_box[1],
                    box[3] > zone_box[3],
                ))
            if self.zone_detection_filter_mode == "box_touches":
                box = detection["box"]
                return any((
                    box[2] < zone_box[0],
                    box[0] > zone_box[2],
                    box[3] < zone_box[1],
                    box[1] > zone_box[3],
                ))
            raise NotImplementedError(f"invalid zone_detection_filter_mode: {self.zone_detection_filter_mode!r}")
        if zone["shape"] == "ellipse":  # it's a circle
            radius = zone["size"][0] / 2
            if self.zone_detection_filter_mode == "center_inside":
                return distance > radius
            if self.zone_detection_filter_mode == "box_inside":
                return distance + max(detection["size"]) / 2 > radius
            if self.zone_detection_filter_mode == "box_touches":
                return distance - max(detection["size"]) / 2 > radius
            raise NotImplementedError(f"invalid zone_detection_filter_mode: {self.zone_detection_filter_mode!r}")
        raise NotImplementedError(f"invalid zone shape: {zone['shape']!r}")

    # MATCH DETECTIONS WITH RECIPE
    matches = dict.fromkeys(self.zones)
    for detection in detections:
        # closest zone center among the zones that accept the detection
        best_distance = sys.maxsize
        best_zone = None
        for zone_name, zone in self.zones.items():
            distance = self.get_distance(detection["center"], zone["center"])
            rejected = is_outside(detection, zone, distance)
            if not rejected and distance < best_distance:
                best_distance = distance
                best_zone = zone_name
        if best_zone is None:
            continue
        current = matches[best_zone]
        if current is not None:
            # keep the existing match unless the new detection is preferred
            if self.zone_detection_preference_mode == "distance":
                if best_distance >= current["zone_distance"]:
                    continue
            elif self.zone_detection_preference_mode == "score":
                if detection["score"] <= current["score"]:
                    continue
            else:
                raise NotImplementedError(f"invalid zone_detection_preference_mode: {self.zone_detection_preference_mode!r}")
        matches[best_zone] = detection.copy()
        matches[best_zone]["zone_distance"] = best_distance

    # check detections against recipe
    checked = {}
    for zone_name, detection in matches.items():
        if detection is None:
            # dummy empty detection if nothing detected
            detection = {
                "box": [0, 0, 0, 0],
                "center": [0, 0],
                "class": {
                    "id": None,
                    "name": "no_detection",
                    "color": "rgb(0,0,0)",
                },
                "score": 1,
                "size": [0, 0],
            }
        # NOTE(review): the class *id* is compared with name-like strings here;
        # confirm whether the class name was meant instead.
        if zone_name not in self.zones or self.zones[zone_name]["class"]["id"] in {"no_detection", "none"}:
            expected_class = {
                "id": None,
                "name": "no_detection",
                "color": "rgb(0,0,0)",
            }
        else:
            expected_class = self.zones[zone_name]["class"]
        checked[zone_name] = {
            "ok": detection["class"]["id"] == expected_class["id"],
            "expected": expected_class,
            "detection": detection,
        }

    global vision_override
    if vision_override is None:
        ok = all(entry["ok"] is True for entry in checked.values())
    else:
        ok = vision_override
    return {
        "ok": ok,
        "results": checked,
    }
def results_to_items(self, results, ):
    """Turn zone check results into drawable items colored green (ok) or red (not ok).

    Returns an empty dict when there are no zones or no results. Otherwise
    the result holds one frame-like "_global_result" rectangle for the
    overall verdict plus one entry per checked zone, drawn with the zone's
    own geometry.
    """
    # DRAW ZONES RESULTS
    if self.zones is None or not len(self.zones) or results is None or not len(results):
        return {}
    result_style = {
        "pen_line": "SolidLine",
        "border_thickness": 50,
        "fill_color": QColor("#00000000"),
    }
    thickness = result_style["border_thickness"]
    drawable = {
        "_global_result": {
            # negative coordinates are resolved against the painted device when rendered
            "box": [thickness, thickness, -thickness * 2, -thickness * 2],
            "shape": "rect",
            **result_style,
            "border_color": Qt.green if results["ok"] else Qt.red,
        }
    }
    for zone_name, outcome in results["results"].items():
        drawable[str(zone_name)] = {
            **self.zones[zone_name],
            **result_style,
            "border_color": Qt.green if outcome["ok"] else Qt.red,
        }
    return drawable
@staticmethod
def convert_negative_placement(p, painter):
for k in p:
if p[k] < 0:
if k.startswith("x") or k.startswith("w"):
p[k] += painter.device().width()
elif k.startswith("y") or k.startswith("h"):
p[k] += painter.device().height()
else:
raise AssertionError("could not detect variable direction")
@staticmethod
def apply_placement_offset(p, offset):
for k in p:
if k.startswith("x"):
p[k] += offset["x"]
elif k.startswith("y"):
p[k] += offset["y"]
else:
raise AssertionError("could not detect variable direction")
def render_items(self, items, offset=None, qimage=None, painter=None):
    """Draw ``items`` with ``painter``, or with a temporary painter on ``qimage``.

    Item coordinates are stored with the vertical component first and are
    swapped to x/y here. An item is positioned by "box", else by "location"
    (with optional "size"), else by "center" plus "size". Supported shapes
    are "ellipse", "cross", "line", "rect" and "text". An item that cannot
    be drawn is logged and skipped.

    Args:
        items: mapping of item name to item description.
        offset: optional ``{"x": ..., "y": ...}`` shift; defaults to no shift.
        qimage: image to paint on when no painter is given.
        painter: already active painter to draw with; never ended here.

    Raises:
        AssertionError: if both ``qimage`` and ``painter`` are None.
    """
    if offset is None:
        offset = {"x": 0, "y": 0}
    owns_painter = painter is None
    if owns_painter:
        if qimage is None:
            raise AssertionError("one of 'qimage' or 'painter' parameter must not be None")
        painter = QPainter()
        painter.begin(qimage)
    try:
        for item_name, item in items.items():
            try:
                convert_negative_placement = item.get("convert_negative_placement", True)
                if "box" in item:
                    p = {"x1": item["box"][1], "y1": item["box"][0], "x2": item["box"][3], "y2": item["box"][2], }
                    if convert_negative_placement:
                        Vision.convert_negative_placement(p, painter)
                    Vision.apply_placement_offset(p, offset)
                    p["w"], p["h"] = p["x2"] - p["x1"], p["y2"] - p["y1"]
                    p["xc"], p["yc"] = Vision.get_center([p["x1"], p["y1"], p["x2"], p["y2"]])
                elif "location" in item:
                    p = {"x1": item["location"][1], "y1": item["location"][0], }
                    if convert_negative_placement:
                        Vision.convert_negative_placement(p, painter)
                    Vision.apply_placement_offset(p, offset)
                    if "size" in item:
                        p["w"], p["h"] = item["size"][1], item["size"][0]
                        p["x2"], p["y2"] = p["x1"] + p["w"], p["y1"] + p["h"]
                        p["xc"], p["yc"] = Vision.get_center([p["x1"], p["y1"], p["x2"], p["y2"]])
                    else:
                        p["w"], p["h"] = 0, 0
                        p["x2"], p["y2"] = p["x1"], p["y1"]
                        p["xc"], p["yc"] = p["x1"], p["y1"]
                elif "center" in item and "size" in item:
                    p = {"xc": item["center"][1], "yc": item["center"][0], }
                    if convert_negative_placement:
                        Vision.convert_negative_placement(p, painter)
                    Vision.apply_placement_offset(p, offset)
                    p["w"], p["h"] = item["size"][1], item["size"][0]
                    p["x1"], p["y1"], p["x2"], p["y2"] = Vision.get_box([p["xc"], p["yc"]], [p["w"], p["h"]])
                else:
                    raise AssertionError("item has no valid positioning information")
                painter.setOpacity(item.get("opacity", 0.5))
                # style names are looked up as attributes of the Qt namespace
                painter.setBrush(QBrush(item.get("fill_color", QColor("#ffffff")), getattr(Qt, item.get("brush_pattern", "SolidPattern"))))
                painter.setPen(QPen(
                    item.get("border_color", QColor("#000000")),
                    item.get("border_thickness", 1),
                    getattr(Qt, item.get("pen_line", "SolidLine")),
                    getattr(Qt, item.get("pen_cap", "SquareCap")),
                    getattr(Qt, item.get("pen_join", "MiterJoin")),
                ))
                if item["shape"] == "ellipse":
                    painter.drawEllipse(QPointF(p["xc"], p["yc"]), p["w"] / 2, p["h"] / 2)
                elif item["shape"] == "cross":
                    painter.drawLine(QLineF(p["xc"], p["y1"], p["xc"], p["y2"]))
                    painter.drawLine(QLineF(p["x1"], p["yc"], p["x2"], p["yc"]))
                elif item["shape"] == "line":
                    painter.drawLine(QLineF(p["xc"], p["yc"], p["x2"], p["y2"]))
                elif item["shape"] == "rect":
                    painter.drawRect(QRectF(QPointF(p["x1"], p["y1"]), QPointF(p["x2"], p["y2"])))
                elif item["shape"] == "text":
                    old_render_hints = painter.renderHints()
                    painter.setRenderHints(QPainter.Antialiasing | QPainter.TextAntialiasing, True)
                    font = QFont()
                    font.setStyleHint(QFont.SansSerif, QFont.PreferDefault | QFont.PreferAntialias)
                    font.setBold(item.get("font_bold", False))
                    font.setItalic(item.get("font_italic", False))
                    font.setKerning(item.get("font_kerning", True))
                    font.setLetterSpacing(QFont.AbsoluteSpacing, item.get("font_letter_spacing", 0))
                    font.setWordSpacing(item.get("font_word_spacing", 0))
                    font.setPixelSize(round(item.get("font_size", 25)))
                    # text is drawn as a path so it gets the item's pen and brush
                    path = QPainterPath()
                    path.addText(p["x1"], p["y1"], font, item["text"])
                    painter.drawPath(path)
                    painter.setRenderHints(old_render_hints, True)
                else:
                    raise NotImplementedError(f"item {item_name!r} has an invalid shape: {item['shape']!r}")
            except Exception:
                # best effort: one broken item must not prevent the others from being drawn
                self.log.exception("".join(traceback.format_stack()))
                self.log.exception(traceback.format_exc())
                self.log.error(f"item {item_name!r} could not be drawn.")
    finally:
        # a painter that was begun here must also be ended here
        if owns_painter:
            painter.end()
def render(self, frame, detections=None, vision_results=None, mask=True, offset=None):
    """Paint markers, zones, labels, detections and results, and return the QImage.

    Args:
        frame: image array; ``shape[0]`` is the height and ``shape[1]`` the width.
        detections: detections as returned by ``check_features``, or None.
        vision_results: results as returned by ``process_detections``, or None.
        mask: when True, paint on an empty transparent image of the frame's
            size; otherwise paint on top of an RGBA copy of the frame.
        offset: optional ``{"x": ..., "y": ...}`` shift applied to every item.

    Returns:
        The painted QImage (Format_RGBA8888).
    """
    if mask is True:
        qframe = QImage(
            frame.shape[1],  # width
            frame.shape[0],  # height
            QImage.Format_RGBA8888
        )
        # a QImage created from width/height holds uninitialized data; clear it before painting
        qframe.fill(Qt.transparent)
    else:
        aframe = cv2.cvtColor(frame, cv2.COLOR_RGB2RGBA)
        # NOTE(review): this QImage wraps aframe's buffer; confirm the buffer is
        # kept alive for as long as the returned image is used.
        qframe = QImage(
            aframe.data,
            aframe.shape[1],  # width
            aframe.shape[0],  # height
            aframe.shape[2] * frame.shape[1],  # width * channels
            QImage.Format_RGBA8888
        )
    painter = QPainter()
    painter.begin(qframe)
    try:
        for items in [
            self.markers,
            self.zones,
            self.labels,
            self.detections_to_items(detections),
            self.results_to_items(vision_results)
        ]:
            if items is not None:
                try:
                    self.render_items(
                        items,
                        offset=offset,
                        painter=painter,
                    )
                except Exception:
                    self.log.exception(traceback.format_exc())
    finally:
        # end the painter even if building one of the item groups raised
        painter.end()
    return qframe
def _get(self, data=None):
# print("VISION", str(int(QThread.currentThreadId())), flush=True)
if data is None:
return
frame = data[-1][list(self.sources)[0]]
if frame is not None:
# ADD FRAMETO VISION_CONSUMER QUEUE
self.vision_consumer.add_consumable({"frame": frame})
super()._get(emit=False)
def vision_consumer_work(self, consumable=None):
    """Vision consumer task: detect on the queued frame and check the result against the zones.

    Returns None when ``consumable`` is None, otherwise a dict with the
    "detections" and the zone "results".
    """
    # VISION_CONSUMER TASK
    if consumable is None:
        return
    found = self.check_features(consumable["frame"])
    return {"detections": found, "results": self.process_detections(found)}
def process_vision_consumed(self, data=None):
    """Forward the newest vision consumer output to the render consumer.

    Reads the entry stored under the vision consumer's name in the last
    element of ``data`` and, when present, queues its frame, detections and
    results for rendering.
    """
    # print("VISION", str(int(QThread.currentThreadId())), flush=True)
    if data is None:
        return
    # ADD VISION RETURNED FROM VISION_CONSUMER TO RENDER_CONSUMER QUEUE
    produced = data[-1][self.vision_consumer.name]
    if produced is None:
        return
    outcome = produced["result"]
    self.render_consumer.add_consumable({
        "frame": produced["consumed"]["frame"],
        "detections": outcome["detections"],
        "results": outcome["results"],
    })
def render_consumer_work(self, consumable=None):
    """Render consumer task: paint the overlays onto the frame and return a QPixmap.

    Returns None when ``consumable`` is None.
    """
    # RENDER_CONSUMER TASK
    if consumable is None:
        return
    painted = self.render(
        consumable["frame"],
        detections=consumable["detections"],
        vision_results=consumable["results"],
        mask=False,
        offset=None,
    )
    return QPixmap.fromImage(painted)
def process_render_consumed(self, data=None):
    """Hand the newest render consumer output to the parent ``_get``.

    Reads the entry stored under the render consumer's name in the last
    element of ``data`` and, when present, passes frame, detections, results
    and the rendered image to the parent as a single-element list.
    """
    # print("VISION", str(int(QThread.currentThreadId())), flush=True)
    if data is None:
        return
    # EMIT VISION AND RENDER RESULTS RETURNED FROM RENDER_CONSUMER
    produced = data[-1][self.render_consumer.name]
    if produced is None:
        return
    consumed = produced["consumed"]
    super()._get([{
        "frame": consumed["frame"],
        "detections": consumed["detections"],
        "results": consumed["results"],
        "render": produced["result"],
    }])