st-ten-1/src/components/vision_saver.py
matteo porta 05dde48ee6 lights
2022-06-29 11:02:58 +02:00

71 lines
3.3 KiB
Python
Executable File

import glob
import os
import shutil
from datetime import datetime
from pathlib import Path
import cv2
import numpy as np
from .component import Component
class VisionSaver(Component):
    """Archive images as PNG files and prune old ones when disk space runs low.

    Settings are read from ``self.config[self.name]``:

    * ``path`` (required): root directory of the archive; created if missing.
    * ``time_format`` (required): ``strftime`` pattern used for file names.
    * ``mask_zones`` (optional): names of zones whose pixels are kept when
      saving with masking enabled; everything outside them is blacked out.
    * ``minimum_disk_free_space_gb`` (optional): free-space threshold, in
      units of 10**9 bytes, below which old saves are removed.
    """

    def __init__(self, config=None, name=None):
        """Initialise the base component with ``threaded=False``."""
        super().__init__(config=config, name=name, threaded=False)

    def config_changed(self):
        """Reload all saver settings from ``self.config[self.name]``.

        Raises:
            KeyError: if ``path`` or ``time_format`` is not configured.
        """
        settings = self.config[self.name]

        self.location = Path(settings["path"])
        os.makedirs(self.location, exist_ok=True)

        self.mask_zones = settings.get("mask_zones", None)

        # The threshold may arrive as a string or an int; normalise to float
        # and keep None as the "pruning disabled" marker.
        threshold = settings.get("minimum_disk_free_space_gb", None)
        self.minimum_disk_free_space_gb = None if threshold is None else float(threshold)

        self.time_format = settings["time_format"]

    def save(self, save_time, img, mask=True):
        """Write ``img`` to ``<location>/<YYYY>/<MM>/<timestamp>.png``.

        Args:
            save_time: capture time, either a ``datetime`` or a POSIX
                timestamp accepted by ``datetime.fromtimestamp``.
            img: image array in RGB channel order; it is converted to BGR
                before being handed to OpenCV.
            mask: when true and ``mask_zones`` is configured, only the pixels
                inside the configured zones are written; the rest is zero.

        Returns:
            The ``Path`` of the PNG file.
        """
        # Normalise once: the file name and the year/month folders must be
        # derived from the same datetime, whichever form the caller passed.
        if isinstance(save_time, datetime):
            moment = save_time
        else:
            moment = datetime.fromtimestamp(save_time)

        timestamp = moment.strftime(self.time_format)
        save_dir = self.location / moment.strftime("%Y") / moment.strftime("%m")
        os.makedirs(save_dir, exist_ok=True)
        out_path = save_dir / f"{timestamp}.png"
        self.log.info(f"saving {out_path}")

        img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        if mask and self.mask_zones is not None:
            # Black canvas with the image's own shape and dtype, so the
            # result stays writable by OpenCV (a default-int array is not).
            out = np.zeros_like(img)
            for zone_name in self.mask_zones:
                # Box indices 1 and 3 bound the rows, 0 and 2 the columns.
                # self.bench is not defined in this class -- presumably it is
                # provided by Component; verify.
                zone = self.bench.zones[zone_name]["box"]
                out[zone[1]:zone[3], zone[0]:zone[2]] = img[zone[1]:zone[3], zone[0]:zone[2]]
        else:
            out = img

        # Pass a plain string: not every OpenCV build accepts path objects.
        # imwrite reports failure through its return value, not an exception.
        if not cv2.imwrite(str(out_path), out):
            self.log.warning(f"could not write {out_path}")
        return out_path

    def remove_older_images_if_needed(self):
        """Delete the oldest saves until the free-space threshold is met.

        Does nothing when ``minimum_disk_free_space_gb`` is not configured or
        when enough space is already free. Otherwise month directories are
        removed in sorted name order, year by year, re-reading the free space
        after every removal round; a year directory is removed once all of
        its months have been processed.

        NOTE(review): this walk expects ``<location>/<section>/<year>/<month>``
        while ``save`` writes ``<location>/<year>/<month>`` -- confirm that
        the configured paths / other writers make the two layouts agree.
        """
        if self.minimum_disk_free_space_gb is None:
            return

        minimum_disk_free_bytes = self.minimum_disk_free_space_gb * 10**9
        archive = os.path.abspath(self.location)
        free = shutil.disk_usage(archive).free
        if free >= minimum_disk_free_bytes:
            return

        self.log.warning(f"LOW DISK SPACE {(free / 10 ** 9):3.2f}GB/{(minimum_disk_free_bytes / 10 ** 9):3.2f}GB), removing older vision saves")

        # The glob results end with a separator; dirname() strips it.
        sections = sorted(os.path.dirname(entry) for entry in glob.glob(f"{archive}/*/"))
        # Union of the year names found under any section, oldest name first.
        years = sorted({
            os.path.basename(os.path.dirname(entry))
            for section in sections
            for entry in glob.glob(f"{section}/*/")
        })

        while free < minimum_disk_free_bytes and years:
            year = years.pop(0)
            months = sorted({
                os.path.basename(os.path.dirname(entry))
                for section in sections
                for entry in glob.glob(f"{section}/{year}/*/")
            })

            while free < minimum_disk_free_bytes and months:
                month = months.pop(0)
                for section in sections:
                    self.log.info(f"REMOVING '{section}/{year}/{month}'")
                    # Best effort: a section may not contain this month.
                    shutil.rmtree(f"{section}/{year}/{month}", ignore_errors=True)
                free = shutil.disk_usage(archive).free

            # Every month of this year was consumed (or there were none):
            # drop the now-empty year directories as well.
            if not months:
                for section in sections:
                    self.log.info(f"REMOVING '{section}/{year}'")
                    shutil.rmtree(f"{section}/{year}", ignore_errors=True)
                free = shutil.disk_usage(archive).free