242 lines
10 KiB
Python
242 lines
10 KiB
Python
import logging
import pathlib
import sys
from datetime import datetime
from itertools import cycle

import cv2
import imutils
import numpy as np
from PyQt5.QtCore import QMutex, Qt, QThread, pyqtSignal
from PyQt5.QtGui import QImage, QPixmap
from PyQt5.QtWidgets import (QDialog, QFormLayout, QLabel, QMessageBox,
                             QPushButton, QSizePolicy, QSlider)

from .component import Component

# The capture backend is chosen once, at import time, from the command line.
if "--sim-camera" in sys.argv:
    from components.dummies.cv2 import DummyVideoCapture as VideoCapture
else:
    from cv2 import VideoCapture
|
|
|
|
|
|
class UVCCamera(Component):
    """Camera component that grabs frames from capture device 0 through OpenCV.

    When ``--sim-camera`` is on the command line the module-level
    ``VideoCapture`` name is a dummy capture and frames are passed through
    without cropping, colour conversion or rotation.  When ``--camera-edits``
    is on the command line an :class:`EditsDialog` is opened and every
    captured frame is run through :meth:`edit` before being published.

    Settings are read from ``self.config[self.name]`` in
    :meth:`config_changed`:

    * ``horizontal_resolution`` / ``vertical_resolution`` (required)
    * ``frame_time_ms`` (default 300)
    * ``exposure_time`` / ``focus`` (optional; automatic mode when absent)
    * ``horizontal_crop_offset`` / ``horizontal_crop_resolution`` /
      ``vertical_crop_offset`` / ``vertical_crop_resolution``
    * ``rotate_90_clockwise_times``
    * ``auto_white_balance``, ``balance_red`` / ``balance_green`` /
      ``balance_blue``
    """

    # Carries ``[frame]`` (a one-element list) to the edits dialog.
    _edits_new_frame = pyqtSignal(object)

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True, registers=None):
        """Create the component; the capture device is opened later, in ``config_changed``.

        All arguments except ``registers`` are forwarded unchanged to
        ``Component.__init__``.  ``registers`` is accepted but not used here.
        """
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
        # Serialises every access to ``self.camera`` (configuration vs. frame reads).
        self.lock = QMutex()
        self.simulate = "--sim-camera" in sys.argv
        self._last_frame = None
        self.camera = None

    def config_changed(self):
        """(Re)read the configuration and push it to the capture device.

        Opens the capture on the first call, caches the parsed settings on
        ``self`` and applies them to the device while holding ``self.lock``.
        If one-shot auto white balance is enabled this blocks for 3 seconds.

        Raises:
            AssertionError: if ``VideoCapture(0)`` returns ``None``.
            KeyError: if a required resolution key is missing (raised by the
                config lookup itself).
        """
        if self.camera is None:
            self.camera = VideoCapture(0)
            if self.camera is None:
                raise AssertionError("camera not detected: 0")
            if self.simulate:
                self.camera.sim_imgs = cycle(sorted(pathlib.Path("data/simulation_images/").glob("*.png")))
            elif not self.camera.isOpened():
                # cv2.VideoCapture() always returns an object, so the ``None``
                # test above cannot detect a missing device; report it here.
                self.log.error("camera not detected: 0")

        section = self.config[self.name]

        self.resolution = {
            "w": int(self.round_4(section["horizontal_resolution"])),
            "h": int(self.round_4(section["vertical_resolution"])),
        }
        # NOTE(review): presumably the polling period (seconds) used by Component - confirm.
        self._period = int(section.get("frame_time_ms", 300)) / 1000

        exposure_time = section.get("exposure_time", None)
        self.exposure_time = int(exposure_time) if exposure_time is not None else None
        focus = section.get("focus", None)
        self.focus = int(focus) if focus is not None else None

        # Crop rectangle (x, y, w, h) plus the number of quarter turns ("r").
        # NOTE(review): the key says "clockwise" but the value is handed to
        # np.rot90, whose positive direction is counter-clockwise - confirm.
        self.roi = {
            "x": int(self.round_4(section.get("horizontal_crop_offset", 0))),
            "w": int(self.round_4(section.get("horizontal_crop_resolution", self.resolution["w"]))),
            "y": int(self.round_4(section.get("vertical_crop_offset", 0))),
            "h": int(self.round_4(section.get("vertical_crop_resolution", self.resolution["h"]))),
            "r": int(section.get("rotate_90_clockwise_times", 0)),
        }

        truthy = {"on", "1", "y", "yes", "true", "enable", "enabled"}
        self.auto_white_balance = section.get("auto_white_balance", "").lower() in truthy
        self.balance = {
            "r": float(section.get("balance_red", 1)),
            "g": float(section.get("balance_green", 1)),
            "b": float(section.get("balance_blue", 1)),
        }

        self.lock.lock()
        try:
            # try/finally: QMutex is not recursive by default, so leaving it
            # locked after an exception would block every later frame read.
            camera = self.camera
            camera.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter.fourcc("M", "J", "P", "G"))
            camera.set(cv2.CAP_PROP_FPS, 15)
            camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution["w"])
            camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution["h"])

            # NOTE(review): the auto-exposure mode values (0 / 3) are backend
            # specific - confirm they match the capture backend in use.
            if self.exposure_time is not None:
                camera.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0)
                camera.set(cv2.CAP_PROP_EXPOSURE, self.exposure_time)
            else:
                camera.set(cv2.CAP_PROP_AUTO_EXPOSURE, 3)

            if self.focus is not None:
                camera.set(cv2.CAP_PROP_AUTOFOCUS, 0)
                camera.set(cv2.CAP_PROP_FOCUS, self.focus)
            else:
                camera.set(cv2.CAP_PROP_AUTOFOCUS, 1)

            camera.set(cv2.CAP_PROP_GAIN, 1.0)

            if self.auto_white_balance:
                # One-shot: enable auto WB, wait, read the gains back, then
                # switch auto WB off again.
                camera.set(cv2.CAP_PROP_AUTO_WB, 1)
                QThread.msleep(3000)
                self.balance["r"] = camera.get(cv2.CAP_PROP_WHITE_BALANCE_RED_V)
                self.balance["g"] = 1.0  # no green gain property is read from the device
                self.balance["b"] = camera.get(cv2.CAP_PROP_WHITE_BALANCE_BLUE_U)
                camera.set(cv2.CAP_PROP_AUTO_WB, 0)
            else:
                camera.set(cv2.CAP_PROP_WHITE_BALANCE_RED_V, self.balance["r"])
                camera.set(cv2.CAP_PROP_WHITE_BALANCE_BLUE_U, self.balance["b"])

            self.gamma_lut = camera.get(cv2.CAP_PROP_GAMMA)
            self.contrast_lut = camera.get(cv2.CAP_PROP_CONTRAST)
            self.edits = None
        finally:
            self.lock.unlock()

        self.edits_enabled = "--camera-edits" in sys.argv
        if self.edits_enabled:
            self.init_edits()

    @staticmethod
    def round_4(x):
        """Return ``int(x)`` rounded to a multiple of 4.

        Ties follow Python's ``round`` (half to even), e.g. 6 -> 8 and 10 -> 8.
        """
        return 4 * round(int(x) / 4)

    @Component.reconfig_on_error
    def _get(self):
        """Read one frame, post-process it and hand ``[frame]`` to ``Component._get``.

        For a real camera the frame is cropped to ``self.roi``, converted from
        BGR to RGB and rotated by ``roi["r"]`` quarter turns.  Simulated
        frames are passed through untouched.  A failed read is logged and the
        frame returned by ``read()`` is still forwarded.
        """
        self.lock.lock()
        try:
            # The mutex must be released even if reading or converting
            # raises; otherwise the next config_changed()/_get() call blocks
            # forever on the same non-recursive mutex.
            ret, frame = self.camera.read()
            if ret and not self.simulate:
                roi = self.roi
                frame = frame[roi["y"]:roi["y"] + roi["h"], roi["x"]:roi["x"] + roi["w"]]
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame = np.rot90(frame, roi["r"])
        finally:
            self.lock.unlock()

        if not ret:
            self.log.error("failed to get frame")
        elif self.edits_enabled:
            frame = self.edit(frame, self.edits)
            self._edits_new_frame.emit([frame])
        super()._get([frame])

    def __del__(self, event=None):
        """Release the capture device, if one was ever opened."""
        # getattr: __del__ also runs when __init__ failed before ``camera`` was set.
        camera = getattr(self, "camera", None)
        if camera is not None:
            camera.release()

    def init_edits(self):
        """Open the edits dialog and wire its signals to this component.

        Called from ``config_changed`` whenever edits are enabled, so a dialog
        left over from an earlier configuration is disconnected and closed
        first instead of piling up with a duplicate frame connection.
        """
        self.edits_enabled = True

        previous = getattr(self, "edits_dialog", None)
        if previous is not None:
            self._edits_new_frame.disconnect(previous.save_and_show_edits_new_frame)
            previous.close()

        self.edits_dialog = EditsDialog(self.roi)
        self.edits_dialog.edits_changed.connect(self.set_edits)
        self._edits_new_frame.connect(self.edits_dialog.save_and_show_edits_new_frame)
        self.edits_dialog.edits_pause.connect(self.toggle_paused)

    def toggle_paused(self):
        """Pause the component when it is running, resume it otherwise."""
        if self.running:
            self.pause()
        else:
            self.resume()

    def set_edits(self, edits=None):
        """Store the edit parameters applied to subsequent frames (``None`` disables them)."""
        self.edits = edits

    def edit(self, img, edits=None):
        """Apply brightness/contrast, rotation/scale and translation to ``img``.

        Args:
            img: image array as produced by ``_get``.
            edits: mapping with optional keys ``brightness``, ``contrast``,
                ``rotation``, ``scale``, ``translation_x``, ``translation_y``;
                ``None`` returns ``img`` unchanged.

        Returns:
            The edited image.  Steps whose parameters are at their neutral
            values are skipped.
        """
        if edits is None:
            return img

        # Brightness and contrast; the contrast value is rescaled by 500 / 255
        # before being passed to imutils.
        contrast = float(edits.get("contrast", 0))
        brightness = float(edits.get("brightness", 0))
        if contrast != 0 or brightness != 0:
            img = imutils.adjust_brightness_contrast(img, brightness=brightness, contrast=contrast / 255 * 500)

        # Rotation (sign flipped relative to the stored value) and scale.
        rotation = -float(edits.get("rotation", 0))
        scale = float(edits.get("scale", 1))
        if rotation != 0 or scale != 1:
            img = imutils.rotate(img, rotation, scale=scale)

        # Translation in pixels.
        translation_x = float(edits.get("translation_x", 0))
        translation_y = float(edits.get("translation_y", 0))
        if translation_x != 0 or translation_y != 0:
            img = imutils.translate(img, translation_x, translation_y)

        return img
|
|
|
|
|
|
class EditsDialog(QDialog):
    """Dialog with a frame preview and one slider per image edit parameter.

    Signals:
        edits_changed(object): emitted with the current ``edits`` dict every
            time a slider moves (and once during construction).
        edits_pause(): emitted when the "toggle paused" button is clicked.
    """

    edits_changed = pyqtSignal(object)
    edits_pause = pyqtSignal()

    # QDialog has no ``log`` attribute of its own, so the dialog carries a
    # standard-library logger for ``edits_save_frame``.
    log = logging.getLogger(__name__)

    def __init__(self, roi):
        """Build the widgets, initialise the edits from the sliders and show the dialog.

        Args:
            roi: mapping with ``"w"`` and ``"h"`` entries; they bound the
                translation sliders to ``[-w, w]`` and ``[-h, h]``.
        """
        super().__init__()
        # Last frame received through ``save_and_show_edits_new_frame``.
        self.frame = None
        self.edits = {
            "brightness": 0,
            "contrast": 0,
            "rotation": 0,
            "scale": 1,
            "translation_x": 0,
            "translation_y": 0,
        }
        # name -> [[min, max], multiplier, slider, label].  Sliders hold
        # integers, so the real value is ``slider.value() / multiplier``.
        self.edits_specs = {
            "brightness": [[-255, 255], 1, QSlider(Qt.Horizontal), QLabel()],
            "contrast": [[-255, 255], 1, QSlider(Qt.Horizontal), QLabel()],
            "rotation": [[-180, 180], 1, QSlider(Qt.Horizontal), QLabel()],
            "scale": [[0, 5], 100, QSlider(Qt.Horizontal), QLabel()],
            "translation_x": [[-roi["w"], roi["w"]], 1, QSlider(Qt.Horizontal), QLabel()],
            "translation_y": [[-roi["h"], roi["h"]], 1, QSlider(Qt.Horizontal), QLabel()],
        }

        layout = QFormLayout()

        self.edits_frame_l = QLabel()
        self.edits_frame_l.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
        layout.addRow(self.edits_frame_l)

        self.edits_pause_b = QPushButton("toggle paused")
        layout.addRow(self.edits_pause_b)
        self.edits_pause_b.clicked.connect(self.edits_pause)

        for edit, (limit, multiplier, slider, label) in self.edits_specs.items():
            slider.setRange(limit[0] * multiplier, limit[1] * multiplier)
            slider.setSingleStep(1)
            slider.setValue(self.edits[edit] * multiplier)
            label.setText(f"{edit}: {self.edits[edit]}")
            layout.addRow(label, slider)
            slider.valueChanged.connect(self.update_edits)

        self.edits_save_frame_b = QPushButton("save frame")
        layout.addRow(self.edits_save_frame_b)
        self.edits_save_frame_b.clicked.connect(self.edits_save_frame)

        self.setLayout(layout)
        self.update_edits()
        self.show()

    def update_edits(self):
        """Refresh ``self.edits`` and the labels from the sliders, then emit ``edits_changed``."""
        for edit, (_limit, multiplier, slider, label) in self.edits_specs.items():
            self.edits[edit] = slider.value() / multiplier
            label.setText(f"{edit}: {self.edits[edit]}")
        self.edits_changed.emit(self.edits)

    def save_and_show_edits_new_frame(self, frame):
        """Remember ``frame[0]`` and display it in the preview label.

        Args:
            frame: one-element sequence holding an ``(height, width, 3)`` RGB
                array (as required by ``QImage.Format_RGB888``) or ``None``.
        """
        self.frame = frame[0]
        if self.frame is None:
            return

        # np.rot90 and slicing produce non-contiguous views, which the
        # buffer-based QImage constructor cannot consume; keep a contiguous
        # copy (a no-op when the array is already contiguous).
        self.frame = np.ascontiguousarray(self.frame)
        height, width = self.frame.shape[:2]
        image = QImage(
            self.frame.data,
            width,
            height,
            self.frame.strides[0],  # bytes per row
            QImage.Format_RGB888,
        )
        pixmap = QPixmap.fromImage(image).scaled(
            max(self.edits_frame_l.width(), 640),
            max(self.edits_frame_l.height(), 480),
            Qt.KeepAspectRatio,
            Qt.SmoothTransformation,
        )
        self.edits_frame_l.setPixmap(pixmap)

    def edits_save_frame(self):
        """Write the last received frame to ``./<ISO timestamp>.png``.

        Returns:
            The output path, or ``None`` when no frame has been received yet
            or the image could not be written.
        """
        if self.frame is None:
            return None

        out_path = f"./{datetime.now().isoformat()}.png"
        self.log.info("saving frame: %r", out_path)
        img = cv2.cvtColor(self.frame, cv2.COLOR_RGB2BGR)
        # cv2.imwrite reports failure through its return value, not an exception.
        if not cv2.imwrite(out_path, img):
            self.log.error("failed to save frame: %r", out_path)
            return None
        return out_path
|