import glob import os import shutil from datetime import datetime from pathlib import Path import cv2 import numpy as np from .component import Component class VisionSaver(Component): def __init__(self, config=None, name=None): super().__init__(config=config, name=name, threaded=False) def config_changed(self): self.location = Path(self.config[self.name]["path"]) os.makedirs(self.location, exist_ok=True) self.mask_zones = self.config[self.name].get("mask_zones", None) self.minimum_disk_free_space_gb = self.config[self.name].get("minimum_disk_free_space_gb", None) if self.minimum_disk_free_space_gb is not None: self.minimum_disk_free_space_gb = float(self.minimum_disk_free_space_gb) self.time_format = self.config[self.name]["time_format"] def save(self, save_time, img, mask=True): timestamp = datetime.fromtimestamp(save_time).strftime(self.time_format) save_dir = self.location / save_time.strftime("%Y") / save_time.strftime("%m") os.makedirs(save_dir, exist_ok=True) out_path = save_dir / f"{timestamp}.png" self.log.info(f"saving {out_path}") img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) if mask and self.mask_zones is not None: height, width, channels = img.shape out = np.full( [height, width, channels], [0] * channels ) for zone_name in self.mask_zones: zone = self.bench.zones[zone_name]["box"] out[zone[1]:zone[3], zone[0]:zone[2]] = img[zone[1]:zone[3], zone[0]:zone[2]] else: out = img cv2.imwrite(out_path, out) return out_path def remove_older_images_if_needed(self): if self.minimum_disk_free_space_gb is None: return minimum_disk_free_bytes = self.minimum_disk_free_space_gb * 10**9 archive = os.path.abspath(self.location) free = shutil.disk_usage(archive)[-1] if free < minimum_disk_free_bytes: self.log.warning(f"LOW DISK SPACE {(free / 10 ** 9):3.2f}GB/{(minimum_disk_free_bytes / 10 ** 9):3.2f}GB), removing older vision saves") sections = sorted([os.path.dirname(section) for section in glob.glob(f"{archive}/*/")]) years = sorted({os.path.basename(os.path.dirname(year)) for section in sections for year in glob.glob(f"{section}/*/")}) while free < minimum_disk_free_bytes and len(years) > 0: year = years.pop(0) months = sorted({os.path.basename(os.path.dirname(month)) for section in sections for month in glob.glob(f"{section}/{year}/*/")}) while free < minimum_disk_free_bytes and len(months) > 0: month = months.pop(0) for section in sections: self.log.info(f"REMOVING '{section}/{year}/{month}'") shutil.rmtree(f"{section}/{year}/{month}", ignore_errors=True) free = shutil.disk_usage(archive)[-1] if len(months) == 0: for section in sections: self.log.info(f"REMOVING '{section}/{year}'") shutil.rmtree(f"{section}/{year}", ignore_errors=True) free = shutil.disk_usage(archive)[-1]