import glob import json import os import shutil from datetime import datetime from pathlib import Path # OpenCV is optional; only needed when saving frames try: import cv2 # noqa: F401 except Exception: cv2 = None # type: ignore import numpy as np from .component import Component class VisionSaver(Component): def __init__(self, config=None, name=None): super().__init__(config=config, name=name, threaded=False) def config_changed(self): self.location = Path(self.config[self.name]["path"]) os.makedirs(self.location, exist_ok=True) self.resize_resolution = self.config[self.name].get("resize_resolution", None) if self.resize_resolution is not None: self.resize_resolution = list(map(int, self.resize_resolution.split("x"))) self.mask_zones = self.config[self.name].get("mask_zones", None) self.minimum_disk_free_space_gb = self.config[self.name].get("minimum_disk_free_space_gb", None) if self.minimum_disk_free_space_gb is not None: self.minimum_disk_free_space_gb = float(self.minimum_disk_free_space_gb) self.time_format = self.config[self.name]["time_format"] @Component.reconfig_on_error def save(self, save_time=None, suffix=None, frame=None, vision=None, resize=None, mask=None,location=None): self.remove_older_images_if_needed() if save_time is None: save_time = datetime.now() else: if type(save_time) is float: save_time = int(save_time) if type(save_time) is int: save_time = datetime.fromtimestamp(save_time) if type(save_time) is str: save_time = datetime.now() if type(save_time) is not datetime: raise ValueError(f"save_time must be float int or datetime, not {type(save_time)}") timestamp = save_time.strftime(self.time_format) save_location = self.location if location is None else Path(location) save_dir = save_location / save_time.strftime("%Y") / save_time.strftime("%m") os.makedirs(save_dir, exist_ok=True) out_paths = [] if frame is not None: if suffix is not None: out_paths.append(save_dir / f"{timestamp}.{suffix}.png") else: out_paths.append(save_dir / f"{timestamp}.png") self.log.info(f"saving {out_paths[-1]}") frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) # resize if resize is None or resize is True: resize = self.resize_resolution elif resize is False: resize = None if resize is not None: frame = cv2.resize(frame, resize, interpolation=cv2.INTER_LINEAR) # mask if mask is None or mask is True: mask = self.mask_zones elif mask is False: mask = None if mask is not None: height, width, channels = frame.shape out = np.full([height, width, channels], [0] * channels) for zone_name in mask: zone = self.bench.zones[zone_name]["box"] out[zone[1]:zone[3], zone[0]:zone[2]] = frame[zone[1]:zone[3], zone[0]:zone[2]] else: out = frame # save frame cv2.imwrite(str(out_paths[-1]), out) if vision is not None: if suffix is not None: out_paths.append(save_dir / f"{timestamp}.{suffix}.json") else: out_paths.append(save_dir / f"{timestamp}.json") self.log.info(f"saving {out_paths[-1]}") # save vision with open(out_paths[-1], "w") as f: json.dump(vision, f) return list(map(str, out_paths)) def remove_older_images_if_needed(self): if self.minimum_disk_free_space_gb is None: return minimum_disk_free_bytes = self.minimum_disk_free_space_gb * 10**9 archive = os.path.abspath(self.location) free = shutil.disk_usage(archive)[-1] if free < minimum_disk_free_bytes: self.log.warning(f"LOW DISK SPACE {(free / 10 ** 9):3.2f}GB/{(minimum_disk_free_bytes / 10 ** 9):3.2f}GB), removing older vision saves") sections = sorted([os.path.dirname(section) for section in glob.glob(f"{archive}/*/")]) years = sorted({os.path.basename(os.path.dirname(year)) for section in sections for year in glob.glob(f"{section}/*/")}) while free < minimum_disk_free_bytes and len(years) > 0: year = years.pop(0) months = sorted({os.path.basename(os.path.dirname(month)) for section in sections for month in glob.glob(f"{section}/{year}/*/")}) while free < minimum_disk_free_bytes and len(months) > 0: month = months.pop(0) for section in sections: self.log.info(f"REMOVING '{section}/{year}/{month}'") shutil.rmtree(f"{section}/{year}/{month}", ignore_errors=True) free = shutil.disk_usage(archive)[-1] if len(months) == 0: for section in sections: self.log.info(f"REMOVING '{section}/{year}'") shutil.rmtree(f"{section}/{year}", ignore_errors=True) free = shutil.disk_usage(archive)[-1]