import os import sys import traceback from configparser import ConfigParser from pathlib import Path import cv2 import numpy import numpy as np import tensorflow as tf from lib.helpers.object_detection.utils import label_map_util from PyQt5.QtCore import (QFileSystemWatcher, QMutex, QThread, QTimer, pyqtSignal) from PyQt5.QtGui import QColor from .component import Component from .consumer import Consumer if "--no-edgetpu" not in sys.argv: if "--no-tflite" not in sys.argv: from pycoral.utils.edgetpu import make_interpreter else: def make_interpreter(*args, **kwargs): raise ValueError("\"--no-edgetpu\" in sys.argv") if "--no-tflite" not in sys.argv: from pycoral.adapters import detect from tflite_runtime.interpreter import Interpreter else: def Interpreter(*args, **kwargs): raise ValueError("\"--no-tflite\" in sys.argv") # os.environ["CUDA_VISIBLE_DEVICES"] = "-1" # # # Patch the location of gfile # tf.gfile = tf.io.gfile class Vision(Component): status_signal = pyqtSignal(dict) loading_model_signal = pyqtSignal(dict) def __init__(self, config=None, name=None, period=None, lazy=True, paused=False, threaded=True): super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded) self.lock = QMutex() self.simulate = "--sim-vision" in sys.argv def start(self): self.model = None self.consumer = Consumer(work=self.check_features, work_fifo=True, drop_fifo=True, work_maxlen=1, name="vision_consumer", paused=False) self.consumer_thread = QThread() self.consumer_thread.setTerminationEnabled(True) self.consumer.moveToThread(self.consumer_thread) self.consumer_thread.started.connect(self.consumer.start) self.consumer_thread.start() self.consumer.wait_ready() self.consumer.out.connect(self.process_consumed) super().start() def config_changed(self): # OBJECT DETECTION self.detection_threshold = float(self.config[self.name]["detection_threshold"]) # recipe self.zones = None self.labels = None # LOAD RECIPE self.recipes_dir = Path(self.config[self.name].get("recipes_dir", "./config/vision/recipes")) self.set_recipe(None) self.recipe_watcher = QFileSystemWatcher([]) self.recipe_watcher.fileChanged.connect(self._set_recipe) # LOAD MODEL self.models_dir = Path(self.config[self.name].get("models_dir", "./data/neural_networks")) self.allowed_modes = dict.fromkeys([ "edgetpu", "tflite_cpu", "normal", ]) if "--no-edgetpu" in sys.argv: self.allowed_modes.pop("edgetpu", None) if "--no-tflite" in sys.argv: self.allowed_modes.pop("edgetpu", None) self.allowed_modes.pop("tflite_cpu", None) self.load_model(self.config[self.name].get("neural_network", None)) # LOAD LABELS label_map = label_map_util.load_labelmap("./config/vision/labels/labels.pbtxt") self.num_classes = len(label_map.item) categories = label_map_util.convert_label_map_to_categories(label_map, max_num_classes=self.num_classes, use_display_name=True) self.category_index = label_map_util.create_category_index(categories) self.classes_map = {c["name"]: k for k, c in self.category_index.items()} def clear_recipe(self): self.recipe = None self.points = {} self.zones = {} self.labels = {} def _set_recipe(self, recipe_path): recipe_path = str(recipe_path) self.log.info(f"changing recipe to {recipe_path!r}") watched = self.recipe_watcher.files() if recipe_path == "" and len(watched) == 0: # skip bad watcher signals return if len(watched) > 0: self.recipe_watcher.removePaths(watched) self.recipe_path = recipe_path try: if not os.path.isfile(self.recipe_path): raise AssertionError(f"Recipe file {self.recipe_path!r} could not be found.") config = ConfigParser(inline_comment_prefixes="#") read = config.read(self.recipe_path) if len(read) != 1 or self.recipe_path not in read: raise AssertionError("Recipe could not be read.") os.path.splitext(os.path.basename(read[0]))[0] self.points = self.parse_points(config.get["shapes"]) self.zones = self.parse_zones(config.get["zones"]) self.labels = self.parse_labels(config.get("labels", None)) self.recipe_watcher.addPath(str(self.recipe_path)) except Exception: self.log.exception(traceback.format_exc()) self.log.exception(f"Error reading {self.recipe_path!r}:") self.clear_recipe() self.status_signal.emit({ "recipe": self.recipe, "points": self.points, "zones": self.zones, "labels": self.labels, }) def set_recipe(self, recipe=None): if recipe is None: self.clear_recipe() else: self._set_recipe(self.recipes_dir / str(recipe)) def parse_points(self, config=None): if config is None: raise AssertionError(f"Recipe file {self.recipe_path!r} does not contain the 'shapes' section.") config = {} points = {} for point_name, point_spec in config.items(): try: center, size, fill_color, border_color, border_thickness, shape = point_spec.split(" ") center = list(map(float, center.split(","))) center[1] = -center[1] size = list(map(float, size.split(","))) if len(size) == 1: size = [size[0], size[0]] fill_color = QColor(fill_color.replace("0x", "#")) border_color = QColor(border_color.replace("0x", "#")) border_thickness = float(border_thickness) shape = shape.lower() points[point_name] = { "center": center, "size": size, "fill_color": fill_color, "border_color": border_color, "border_thickness": border_thickness, "shape": shape, } except Exception: self.log.exception(traceback.format_exc()) self.log.exception(f"point {point_name!r} in recipe file {self.recipe_path!r} could not be parsed. spec: {point_spec!r}") return points def parse_zones(self, config=None): if config is None: raise AssertionError(f"Recipe file {self.recipe_path!r} does not contain the 'zones' section.") config = {} zones = {} for zone_name, zone_spec in config.items(): zone_name = zone_name.upper() try: center, margin, d_class = zone_spec.split(" ") center = list(map(float, center.split(","))) center[1] = -center[1] if margin == "none": margin = None elif "," in margin: margin = list(map(float, margin.split(","))) else: margin = float(margin) zones[zone_name] = { "center": center, "margin": margin, "class": self.category_index[self.classes_map[d_class]], } except Exception: self.log.exception(traceback.format_exc()) self.log.exception(f"region {zone_name!r} in recipe file {self.recipe_path!r} could not be parsed. spec: {zone_spec!r}") return zones def parse_labels(self, config=None): if config is None: config = {} labels = {} for label_name, label_spec in config.items(): try: location, font_size, fill_color, border_color, border_thickness, text = label_spec.split(" ", 5) location = list(map(float, location.split(","))) location[1] = -location[1] font_size = float(font_size) fill_color = QColor(fill_color.replace("0x", "#")) border_color = QColor(border_color.replace("0x", "#")) border_thickness = float(border_thickness) text = text.replace("\\n", "\n").replace("\\t", "\t") labels[label_name] = { "location": location, "font_size": font_size, "fill_color": fill_color, "border_color": border_color, "border_thickness": border_thickness, "text": text, } except Exception: self.log.exception(traceback.format_exc()) self.log.exception(f"label {label_name!r} in recipe file {self.recipe_path!r} could not be parsed. spec: {label_spec!r}") return labels def zone_center(self, zone): return (int((zone["xmax"] + zone["xmin"]) / 2), int((zone["ymax"] + zone["ymin"]) / 2)) def get_center(self, rect): return [(rect[0] + rect[2]) / 2, (rect[1] + rect[3]) / 2] def get_distance(self, p1, p2): return pow((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2, 1 / 2) def load_model(self, model=None): # print("VISION CONSUMER", str(int(QThread.currentThreadId())), flush=True) self.log.info(f"requested neural network: {model!r}") if model is None or model.lower() in [ "", "any", "last", "latest", "newest", "none", ]: model_name = sorted([d for d in os.listdir(self.models_dir) if os.path.isdir(self.models_dir / d)], reverse=True)[0] self.log.info(f"loading neural network: {model_name!r}") self.loading_model_signal.emit({"status": "loading"}) self.lock.lock() tf_mode = None # reset tflite variables interpreter = None if tf_mode is None and "edgetpu" in self.allowed_modes: try: # create tflite edgetpu interpreter interpreter = make_interpreter(str(self.models_dir / model_name / f"{model_name}_edgetpu.tflite")) tf_mode = "edgetpu" except Exception: self.log.exception(traceback.format_exc()) if tf_mode is None and "tflite_cpu" in self.allowed_modes: try: # create tflite cpu interpreter interpreter = Interpreter(str(self.models_dir / model_name / f"{model_name}.tflite")) tf_mode = "tflite_cpu" except Exception: self.log.exception(traceback.format_exc()) # reset tensorflow variables model = None if tf_mode is None and "normal" in self.allowed_modes: try: # create tensorflow model model = tf.saved_model.load(str(self.models_dir / model_name)) tf_mode = "normal" except Exception: self.log.exception(traceback.format_exc()) self.lock.unlock() if tf_mode is None: raise RuntimeError("failed initialize any neural network model") self.tf_mode = tf_mode self.model_name = model_name if interpreter is not None: # if there is a new tflite interpreter initialize it and the related values self.tflite_input_details = interpreter.get_input_details() self.tflite_output_details = interpreter.get_output_details() self.inf_index = self.tflite_input_details[0]["index"] self.inf_shape = self.tflite_input_details[0]["shape"] # interpreter.resize_tensor_input(self.inf_index, self.inf_shape) interpreter.allocate_tensors() interpreter.invoke() # warmup else: self.tflite_input_details = None self.tflite_output_details = None self.inf_index = None self.inf_shape = None self.interpreter = interpreter # if there is a new model to be used, remove previous model if present if model is not None and self.model is not None: tf.keras.backend.clear_session() self.model = model self.log.info(f"initialized model {self.model!r} with mode {self.tf_mode!r}") self.loading_model_signal.emit({"status": "done"}) def check_features(self, image, lock=True): if image.shape != self.inf_shape[1:3]: image = cv2.resize(image, self.inf_shape[1:3], interpolation=cv2.INTER_LINEAR) tensor = np.expand_dims(np.asarray(image), axis=0) # Run inference if lock: self.lock.lock() if self.tf_mode in {"edgetpu", "tflite_cpu"}: self.interpreter.set_tensor(self.inf_index, tensor) self.interpreter.invoke() objs = detect.get_objects(self.interpreter, self.detection_threshold, (1, 1)) if lock: self.lock.unlock() boxes = [(obj.bbox.ymin / self.inf_shape[1], obj.bbox.xmin / self.inf_shape[2], obj.bbox.ymax / self.inf_shape[1], obj.bbox.xmax / self.inf_shape[2]) for obj in objs] classes = [obj.id + 1 for obj in objs] scores = [obj.score for obj in objs] detections = { "detection_boxes": [boxes], "detection_classes": [classes], "detection_scores": [scores], } else: detections = self.model(tensor) if lock: self.lock.unlock() # WARNING: results other than the ones related to tensor[-1] will be discarded parsed_detections = [] for d_box, d_class, d_score in zip( # , d_mask in zip( detections["detection_boxes"][-1], detections["detection_classes"][-1], detections["detection_scores"][-1], # detections["detection_masks"][-1], ): if d_score < self.detection_threshold: continue box = list(d_box) # box = d_box.numpy().tolist() center = self.get_center(box) detection = { "class": self.category_index[int(d_class)], "score": float(d_score), # "score": d_score.numpy().tolist(), # "mask": d_mask.numpy().tolist(), "box": box, "center": center, } parsed_detections.append(detection) return {"result": parsed_detections, "shape": tensor[-1].shape, "ok": False, "tensor": tensor[-1]} def _get(self, data): # print("VISION", str(int(QThread.currentThreadId())), flush=True) self.consumer.add_consumable(data[-1][list(self.sources)[0]]) super()._get(emit=False) def process_consumed(self, data=None): # print("VISION", str(int(QThread.currentThreadId())), flush=True) if data is None: return data = data[-1][self.consumer.name] if data is not None: super()._get([{ "frame": data["consumed"], "vision": data["result"], }])