# st-ten-1/src/components/vision_saver.py
#
# Repository-viewer residue, kept for reference: 110 lines, 5.1 KiB, Python
# (blame view; blame timestamp 2022-06-01 16:37:19 +00:00).
import glob
# blame: 2022-08-02 16:15:30 +00:00
import json
# blame: 2022-06-01 16:37:19 +00:00
import os
import shutil
from datetime import datetime
from pathlib import Path
import cv2
import numpy as np
from .component import Component
class VisionSaver(Component):
    """Save camera frames as PNG and vision results as JSON in a dated tree.

    Files are written to ``<path>/<YYYY>/<MM>/<timestamp>[.<suffix>].png``
    (and ``.json``), where ``path`` and ``time_format`` come from this
    component's config section.  Before every save, the oldest month
    directories are pruned while free disk space is below the configured
    ``minimum_disk_free_space_gb``.

    ``self.config``, ``self.name``, ``self.log`` and ``self.bench`` are not
    defined in this class; they are presumably provided by ``Component``.
    """

    def __init__(self, config=None, name=None):
        """Create the saver as a non-threaded component."""
        super().__init__(config=config, name=name, threaded=False)

    def config_changed(self):
        """Reload all settings from ``self.config[self.name]``.

        Required keys: ``path``, ``time_format``.
        Optional keys: ``resize_resolution`` (``"WIDTHxHEIGHT"``),
        ``mask_zones``, ``minimum_disk_free_space_gb``.

        Raises:
            KeyError: if a required key is missing.
            ValueError: if ``resize_resolution`` or
                ``minimum_disk_free_space_gb`` cannot be parsed as numbers.
        """
        settings = self.config[self.name]

        self.location = Path(settings["path"])
        os.makedirs(self.location, exist_ok=True)

        # "WIDTHxHEIGHT" -> [width, height]; the value is later handed to
        # cv2.resize as dsize.  None means "do not resize by default".
        self.resize_resolution = settings.get("resize_resolution", None)
        if self.resize_resolution is not None:
            self.resize_resolution = list(map(int, self.resize_resolution.split("x")))

        # Names of zones (looked up in self.bench.zones) that stay visible
        # when masking; None means "do not mask by default".
        self.mask_zones = settings.get("mask_zones", None)

        # Free-space threshold in GB (10**9 bytes); None disables pruning.
        self.minimum_disk_free_space_gb = settings.get("minimum_disk_free_space_gb", None)
        if self.minimum_disk_free_space_gb is not None:
            self.minimum_disk_free_space_gb = float(self.minimum_disk_free_space_gb)

        self.time_format = settings["time_format"]

    @staticmethod
    def _to_datetime(save_time):
        """Normalise ``save_time`` (None, epoch number or datetime) to a datetime."""
        if save_time is None:
            return datetime.now()
        if isinstance(save_time, datetime):
            return save_time
        # bool is an int subclass but is never a meaningful timestamp.
        if isinstance(save_time, (int, float)) and not isinstance(save_time, bool):
            # Sub-second precision is deliberately dropped, as before.
            return datetime.fromtimestamp(int(save_time))
        raise ValueError(f"save_time must be float int or datetime, not {type(save_time)}")

    @Component.reconfig_on_error
    def save(self, save_time=None, suffix=None, frame=None, vision=None, resize=None, mask=None):
        """Write ``frame`` and/or ``vision`` to disk and return the paths.

        Args:
            save_time: ``None`` (use the current time), an epoch timestamp
                (``int`` or ``float``, truncated to whole seconds) or a
                ``datetime``.
            suffix: optional tag inserted before the file extension.
            frame: image array to save as PNG.  It is converted from RGB to
                BGR before writing, so callers are expected to pass RGB.
            vision: JSON-serialisable object to save next to the frame.
            resize: ``None``/``True`` use the configured
                ``resize_resolution``; ``False`` disables resizing; any
                other value is used as the ``(width, height)`` target.
            mask: ``None``/``True`` use the configured ``mask_zones``;
                ``False`` disables masking; any other value is used as the
                iterable of zone names to keep visible.

        Returns:
            list[str]: paths of the files written (PNG first, then JSON).

        Raises:
            ValueError: if ``save_time`` has an unsupported type.
        """
        # Make room first, so the directories created below survive pruning.
        self.remove_older_images_if_needed()

        save_time = self._to_datetime(save_time)
        timestamp = save_time.strftime(self.time_format)
        stem = timestamp if suffix is None else f"{timestamp}.{suffix}"

        save_dir = self.location / save_time.strftime("%Y") / save_time.strftime("%m")
        os.makedirs(save_dir, exist_ok=True)

        out_paths = []

        if frame is not None:
            image_path = save_dir / f"{stem}.png"
            out_paths.append(image_path)
            self.log.info(f"saving {image_path}")

            # cv2.imwrite expects BGR channel order.
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

            # resize
            if resize is None or resize is True:
                resize = self.resize_resolution
            elif resize is False:
                resize = None
            if resize is not None:
                # dsize is passed as a (width, height) tuple.
                frame = cv2.resize(frame, tuple(resize), interpolation=cv2.INTER_LINEAR)

            # mask
            if mask is None or mask is True:
                mask = self.mask_zones
            elif mask is False:
                mask = None
            if mask is not None:
                # Black canvas with the frame's own shape and dtype; only the
                # listed zones are copied over from the frame.
                out = np.zeros_like(frame)
                for zone_name in mask:
                    # box is indexed as (x1, y1, x2, y2) in pixel coordinates.
                    box = self.bench.zones[zone_name]["box"]
                    out[box[1]:box[3], box[0]:box[2]] = frame[box[1]:box[3], box[0]:box[2]]
            else:
                out = frame

            # save frame
            cv2.imwrite(str(image_path), out)

        if vision is not None:
            vision_path = save_dir / f"{stem}.json"
            out_paths.append(vision_path)
            self.log.info(f"saving {vision_path}")

            # save vision
            with open(vision_path, "w") as f:
                json.dump(vision, f)

        return list(map(str, out_paths))

    @staticmethod
    def _numeric_subdirs(parent):
        """Return the digit-named sub-directories of ``parent``, ascending."""
        try:
            found = [
                child
                for child in Path(parent).iterdir()
                if child.name.isdecimal() and child.is_dir()
            ]
        except OSError:
            # Directory vanished or is unreadable: nothing to prune there.
            return []
        return sorted(found, key=lambda child: int(child.name))

    def remove_older_images_if_needed(self):
        """Delete the oldest ``<YYYY>/<MM>`` directories while disk space is low.

        Does nothing when ``minimum_disk_free_space_gb`` is ``None`` or when
        free space on the volume holding ``self.location`` is already at or
        above the threshold.  Otherwise month directories are removed oldest
        first, re-checking free space after each removal; a year directory is
        removed once it is empty.  Only digit-named directories (the layout
        written by ``save``) are considered.

        NOTE(review): the previous implementation walked
        ``<location>/<section>/<year>/<month>``, which does not match what
        ``save`` writes; confirm no other component relies on that layout
        under the same path.
        """
        if self.minimum_disk_free_space_gb is None:
            return

        minimum_disk_free_bytes = self.minimum_disk_free_space_gb * 10**9
        archive = os.path.abspath(self.location)
        free = shutil.disk_usage(archive).free
        if free >= minimum_disk_free_bytes:
            return

        self.log.warning(f"LOW DISK SPACE {(free / 10 ** 9):3.2f}GB/{(minimum_disk_free_bytes / 10 ** 9):3.2f}GB), removing older vision saves")

        for year_dir in self._numeric_subdirs(archive):
            for month_dir in self._numeric_subdirs(year_dir):
                if free >= minimum_disk_free_bytes:
                    return
                self.log.info(f"REMOVING '{month_dir}'")
                # Best effort: a file that cannot be deleted must not abort the save.
                shutil.rmtree(month_dir, ignore_errors=True)
                free = shutil.disk_usage(archive).free

            # Drop the year directory once nothing is left inside it.
            try:
                year_dir.rmdir()
            except OSError:
                # Not empty (or already gone): leave it alone.
                pass
            else:
                self.log.info(f"REMOVING '{year_dir}'")

            if free >= minimum_disk_free_bytes:
                return