import pathlib import sys from itertools import cycle import cv2 import imutils import numpy as np from PyQt5.QtCore import QMutex, Qt, QThread, pyqtSignal from PyQt5.QtGui import QImage, QPixmap from PyQt5.QtWidgets import (QDialog, QFormLayout, QLabel, QMessageBox, QPushButton, QSizePolicy, QSlider) if "--sim-camera" in sys.argv: from components.dummies.cv2 import DummyVideoCapture as VideoCapture else: from cv2 import VideoCapture from datetime import datetime from .component import Component class UVCCamera(Component): _edits_new_frame = pyqtSignal(object) def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True, registers=None): super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded) self.lock = QMutex() self.simulate = "--sim-camera" in sys.argv self._last_frame = None self.camera = None def config_changed(self): if self.camera is None: self.camera = VideoCapture(0) if self.camera is None: raise AssertionError(f"camera not detected: 0") if self.simulate: self.camera.sim_imgs = cycle(sorted(pathlib.Path("data/simulation_images/").glob("*.png"))) self.resolution = { "w": int(self.round_4(self.config[self.name]["horizontal_resolution"])), "h": int(self.round_4(self.config[self.name]["vertical_resolution"])), } self._period = int(self.config[self.name].get("frame_time_ms", 300)) / 1000 exposure_time = self.config[self.name].get("exposure_time", None) self.exposure_time = int(exposure_time) if exposure_time is not None else None focus = self.config[self.name].get("focus", None) self.focus = int(focus) if focus is not None else None self.roi = { "x": int(self.round_4(self.config[self.name].get("horizontal_crop_offset", 0))), "w": int(self.round_4(self.config[self.name].get("horizontal_crop_resolution", self.resolution["w"]))), "y": int(self.round_4(self.config[self.name].get("vertical_crop_offset", 0))), "h": int(self.round_4(self.config[self.name].get("vertical_crop_resolution", self.resolution["h"]))), "r": int(self.config[self.name].get("rotate_90_clockwise_times", 0)), } self.auto_white_balance = self.config[self.name].get("auto_white_balance", "").lower() in {"on", "1", "y", "yes", "true", "enable", "enabled", } self.balance = { "r": float(self.config[self.name].get("balance_red", 1)), "g": float(self.config[self.name].get("balance_green", 1)), "b": float(self.config[self.name].get("balance_blue", 1)), } self.lock.lock() self.camera.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter.fourcc("M", "J", "P", "G")) self.camera.set(cv2.CAP_PROP_FPS, 15) self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution["w"]) self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution["h"]) #self.camera.set(cv2.CAP_PROP_AUTOFOCUS, 1) if self.exposure_time is not None: self.camera.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0) self.camera.set(cv2.CAP_PROP_EXPOSURE, self.exposure_time) else: self.camera.set(cv2.CAP_PROP_AUTO_EXPOSURE, 3) if self.focus is not None: self.camera.set(cv2.CAP_PROP_AUTOFOCUS, 0) self.camera.set(cv2.CAP_PROP_FOCUS, self.focus) else: self.camera.set(cv2.CAP_PROP_AUTOFOCUS, 1) self.camera.set(cv2.CAP_PROP_GAIN, 1.0) if self.auto_white_balance: # ONCE self.camera.set(cv2.CAP_PROP_AUTO_WB, 1) QThread.msleep(3000) self.balance["r"] = self.camera.get(cv2.CAP_PROP_WHITE_BALANCE_RED_V) self.balance["g"] = 1.0 # self.camera.get(cv2.) self.balance["b"] = self.camera.get(cv2.CAP_PROP_WHITE_BALANCE_BLUE_U) self.camera.set(cv2.CAP_PROP_AUTO_WB, 0) else: self.camera.set(cv2.CAP_PROP_WHITE_BALANCE_RED_V, self.balance["r"]) # self.camera.set(cv2., self.balance["g"]) self.camera.set(cv2.CAP_PROP_WHITE_BALANCE_BLUE_U, self.balance["b"]) self.gamma_lut = self.camera.get(cv2.CAP_PROP_GAMMA) self.contrast_lut = self.camera.get(cv2.CAP_PROP_CONTRAST) # self.color_correction = self.camera.get(cv2.) self.edits = None self.lock.unlock() self.edits_enabled = "--camera-edits" in sys.argv if self.edits_enabled: self.init_edits() @staticmethod def round_4(x): return 4 * round(int(x) / 4) @Component.reconfig_on_error def _get(self): # print("UVC CAMERA", str(int(QThread.currentThreadId())), flush=True) frame = None self.lock.lock() ret, frame = self.camera.read() if ret and not self.simulate: frame = frame[self.roi["y"]:self.roi["y"] + self.roi["h"], self.roi["x"]:self.roi["x"] + self.roi["w"]] frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame = np.rot90(frame, self.roi["r"]) self.lock.unlock() if not ret: self.log.error("failed to get frame") elif self.edits_enabled: frame = self.edit(frame, self.edits) self._edits_new_frame.emit([frame]) super()._get([frame]) def __del__(self, event=None): if self.camera is not None: self.camera.release() def init_edits(self): self.edits_enabled = True self.edits_dialog = EditsDialog(self.roi) self.edits_dialog.edits_changed.connect(self.set_edits) self._edits_new_frame.connect(self.edits_dialog.save_and_show_edits_new_frame) self.edits_dialog.edits_pause.connect(self.toggle_paused) def toggle_paused(self): if self.running: self.pause() else: self.resume() def set_edits(self, edits=None): self.edits = edits def edit(self, img, edits=None): if edits is None: return img # BRIGHTNESS AND CONTRAST contrast = float(edits.get("contrast", 0)) brightness = float(edits.get("brightness", 0)) if not (contrast == 0 and brightness == 0): img = imutils.adjust_brightness_contrast(img, brightness=brightness, contrast=contrast / 255 * 500) # ROTATE AND SCALE rotation = -float(edits.get("rotation", 0)) scale = float(edits.get("scale", 1)) if not (rotation == 0 and scale == 1): img = imutils.rotate(img, rotation, scale=scale) # TRANSLATE translation_x = float(edits.get("translation_x", 0)) translation_y = float(edits.get("translation_y", 0)) if not (translation_x == 0 and translation_y == 0): img = imutils.translate(img, translation_x, translation_y) return img class EditsDialog(QDialog): edits_changed = pyqtSignal(object) edits_pause = pyqtSignal() def __init__(self, roi): super().__init__() self.frame = None self.edits = { "brightness": 0, "contrast": 0, "rotation": 0, "scale": 1, "translation_x": 0, "translation_y": 0, } self.edits_specs = { "brightness": [[-255, 255], 1, QSlider(Qt.Horizontal), QLabel()], "contrast": [[-255, 255], 1, QSlider(Qt.Horizontal), QLabel()], "rotation": [[-180, 180], 1, QSlider(Qt.Horizontal), QLabel()], "scale": [[0, 5], 100, QSlider(Qt.Horizontal), QLabel()], "translation_x": [[-roi["w"], roi["w"]], 1, QSlider(Qt.Horizontal), QLabel()], "translation_y": [[-roi["h"], roi["h"]], 1, QSlider(Qt.Horizontal), QLabel()], } layout = QFormLayout() self.edits_frame_l = QLabel() self.edits_frame_l.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) layout.addRow(self.edits_frame_l) self.edits_pause_b = QPushButton("toggle paused") layout.addRow(self.edits_pause_b) self.edits_pause_b.clicked.connect(self.edits_pause) for edit, limits in self.edits_specs.items(): limit, multiplier, slider, label = limits slider.setRange(limit[0] * multiplier, limit[1] * multiplier) slider.setSingleStep(1) slider.setValue(self.edits[edit] * multiplier) label.setText(f"{edit}: {self.edits[edit]}") layout.addRow(label, slider) slider.valueChanged.connect(self.update_edits) self.edits_save_frame_b = QPushButton("save frame") layout.addRow(self.edits_save_frame_b) self.edits_save_frame_b.clicked.connect(self.edits_save_frame) self.setLayout(layout) self.update_edits() self.show() def update_edits(self): for edit, limits in self.edits_specs.items(): limit, multiplier, slider, label = limits self.edits[edit] = slider.value() / multiplier label.setText(f"{edit}: {self.edits[edit]}") self.edits_changed.emit(self.edits) def save_and_show_edits_new_frame(self, frame): self.frame = frame[0] if self.frame is not None: self.edits_frame_l.setPixmap( QPixmap.fromImage( QImage( self.frame.data, self.frame.shape[1], # width self.frame.shape[0], # height self.frame.shape[2] * self.frame.shape[1], # width * channels QImage.Format_RGB888 ) ).scaled( max(self.edits_frame_l.width(), 640), max(self.edits_frame_l.height(), 480), Qt.KeepAspectRatio, Qt.SmoothTransformation ) ) def edits_save_frame(self): if self.frame is None: return out_path = f"./{datetime.now().isoformat()}.png" self.log.info(f"saving frame: {out_path!r}") img = cv2.cvtColor(self.frame, cv2.COLOR_RGB2BGR) cv2.imwrite(out_path, img) return out_path