st-ten-1/src/components/vision_saver.py
matteo porta af6951c8f9 wip
2022-10-03 17:17:47 +02:00

110 lines
5.1 KiB
Python
Executable File

import glob
import json
import os
import shutil
from datetime import datetime
from pathlib import Path
import cv2
import numpy as np
from .component import Component
class VisionSaver(Component):
    """Archive camera frames (PNG) and vision results (JSON) on disk.

    Files are written to ``<path>/<YYYY>/<MM>/<timestamp>[.<suffix>].<ext>``.
    Before every save the free disk space is checked and, if it is below the
    configured minimum, the oldest month directories are deleted.

    NOTE(review): ``self.config``, ``self.name``, ``self.log`` and
    ``self.bench`` are not defined in this class; they are presumably provided
    by the ``Component`` base class -- confirm there.
    """

    def __init__(self, config=None, name=None):
        """Initialise the base component with ``threaded=False``."""
        super().__init__(config=config, name=name, threaded=False)

    def config_changed(self):
        """Load this component's settings from ``self.config[self.name]``.

        Keys read:
            path: root directory of the archive (required, created if missing).
            resize_resolution: optional ``"<int>x<int>"`` string; stored as a
                list of two ints and later handed to ``cv2.resize`` as
                ``dsize`` (which OpenCV interprets as width, height).
            mask_zones: optional iterable of zone names used for masking.
            minimum_disk_free_space_gb: optional free-space threshold in GB
                (10**9 bytes); ``None`` disables pruning.
            time_format: ``strftime`` format for the file name (required).

        Raises:
            KeyError: if ``path`` or ``time_format`` is missing.
        """
        section = self.config[self.name]

        self.location = Path(section["path"])
        os.makedirs(self.location, exist_ok=True)

        self.resize_resolution = section.get("resize_resolution", None)
        if self.resize_resolution is not None:
            self.resize_resolution = list(map(int, self.resize_resolution.split("x")))

        self.mask_zones = section.get("mask_zones", None)

        self.minimum_disk_free_space_gb = section.get("minimum_disk_free_space_gb", None)
        if self.minimum_disk_free_space_gb is not None:
            self.minimum_disk_free_space_gb = float(self.minimum_disk_free_space_gb)

        self.time_format = section["time_format"]

    @Component.reconfig_on_error
    def save(self, save_time=None, suffix=None, frame=None, vision=None, resize=None, mask=None):
        """Write a frame and/or a vision result to the archive.

        Args:
            save_time: ``None`` (use the current time), a POSIX timestamp
                (``int`` or ``float``; floats are truncated to whole seconds)
                or a ``datetime``.
            suffix: optional tag inserted between the timestamp and the file
                extension.
            frame: optional image array. It is converted with
                ``cv2.COLOR_RGB2BGR`` before being written as PNG.
            vision: optional object dumped with ``json.dump``.
            resize: ``None``/``True`` use the configured ``resize_resolution``,
                ``False`` disables resizing, anything else is used as the
                ``dsize`` for ``cv2.resize``.
            mask: ``None``/``True`` use the configured ``mask_zones``,
                ``False`` disables masking, anything else is iterated as zone
                names. Only the pixels inside the listed zones are kept; the
                rest of the image is black.

        Returns:
            list[str]: paths of the files written, frame first.

        Raises:
            ValueError: if ``save_time`` has an unsupported type.
        """
        self.remove_older_images_if_needed()
        save_time = self._coerce_save_time(save_time)

        timestamp = save_time.strftime(self.time_format)
        save_dir = self.location / save_time.strftime("%Y") / save_time.strftime("%m")
        os.makedirs(save_dir, exist_ok=True)
        stem = timestamp if suffix is None else f"{timestamp}.{suffix}"

        out_paths = []

        if frame is not None:
            frame_path = save_dir / f"{stem}.png"
            out_paths.append(frame_path)
            self.log.info(f"saving {frame_path}")
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

            # resize
            if resize is None or resize is True:
                resize = self.resize_resolution
            elif resize is False:
                resize = None
            if resize is not None:
                # dsize is passed as a tuple: accepted by every OpenCV binding,
                # whereas a list is rejected by some versions.
                frame = cv2.resize(frame, tuple(resize), interpolation=cv2.INTER_LINEAR)

            # mask
            if mask is None or mask is True:
                mask = self.mask_zones
            elif mask is False:
                mask = None
            if mask is not None:
                # Black canvas with the frame's own shape and dtype, so the
                # array handed to imwrite keeps the image depth.
                out = np.zeros_like(frame)
                for zone_name in mask:
                    # "box" is indexed as [x0, y0, x1, y1]; rows are y, columns x.
                    # NOTE: boxes are applied after the resize step above, so
                    # they address pixels of the (possibly resized) frame.
                    box = self.bench.zones[zone_name]["box"]
                    out[box[1]:box[3], box[0]:box[2]] = frame[box[1]:box[3], box[0]:box[2]]
            else:
                out = frame

            # save frame
            cv2.imwrite(str(frame_path), out)

        if vision is not None:
            vision_path = save_dir / f"{stem}.json"
            out_paths.append(vision_path)
            self.log.info(f"saving {vision_path}")
            # save vision
            with open(vision_path, "w") as f:
                json.dump(vision, f)

        return list(map(str, out_paths))

    @staticmethod
    def _coerce_save_time(save_time):
        """Turn ``None`` / POSIX timestamp / ``datetime`` into a ``datetime``."""
        if save_time is None:
            return datetime.now()
        # bool is an int subclass but is not a meaningful timestamp.
        if isinstance(save_time, (int, float)) and not isinstance(save_time, bool):
            # Sub-second precision is intentionally dropped (historic behaviour).
            return datetime.fromtimestamp(int(save_time))
        if not isinstance(save_time, datetime):
            raise ValueError(f"save_time must be float int or datetime, not {type(save_time)}")
        return save_time

    @staticmethod
    def _numeric_subdirs(parent):
        """Return the all-digit sub-directories of ``parent`` in numeric order."""
        try:
            entries = [p for p in parent.iterdir() if p.is_dir() and p.name.isdecimal()]
        except OSError:
            # Directory vanished or is unreadable: nothing to prune there.
            return []
        return sorted(entries, key=lambda p: (int(p.name), p.name))

    def remove_older_images_if_needed(self):
        """Delete the oldest month directories until enough disk space is free.

        Does nothing when ``minimum_disk_free_space_gb`` is ``None`` or when
        the free space on the archive's filesystem already meets it. Otherwise
        walks ``<location>/<YYYY>/<MM>/`` (the layout written by ``save``) from
        the oldest year and month forward, removing one month directory at a
        time and re-checking the free space after each removal. Year
        directories are removed once they are empty. Only directories whose
        names consist solely of digits are ever touched.
        """
        if self.minimum_disk_free_space_gb is None:
            return

        minimum_disk_free_bytes = self.minimum_disk_free_space_gb * 10**9
        archive = Path(os.path.abspath(self.location))
        free = shutil.disk_usage(archive).free
        if free >= minimum_disk_free_bytes:
            return

        self.log.warning(f"LOW DISK SPACE {(free / 10 ** 9):3.2f}GB/{(minimum_disk_free_bytes / 10 ** 9):3.2f}GB), removing older vision saves")

        for year_dir in self._numeric_subdirs(archive):
            for month_dir in self._numeric_subdirs(year_dir):
                if free >= minimum_disk_free_bytes:
                    return
                self.log.info(f"REMOVING '{month_dir}'")
                # Best effort: a file that cannot be deleted must not abort the save.
                shutil.rmtree(month_dir, ignore_errors=True)
                free = shutil.disk_usage(archive).free
            try:
                # rmdir only succeeds on an empty directory, so a year that
                # still holds data (or stray files) is left in place.
                year_dir.rmdir()
                self.log.info(f"REMOVING '{year_dir}'")
            except OSError:
                pass
            if free >= minimum_disk_free_bytes:
                return