110 lines
5.2 KiB
Python
110 lines
5.2 KiB
Python
|
|
import json
|
||
|
|
import re
|
||
|
|
import sys
|
||
|
|
import traceback
|
||
|
|
|
||
|
|
import requests
|
||
|
|
from google.api_core.exceptions import Forbidden
|
||
|
|
from google.cloud import storage
|
||
|
|
from lib.db import Archive, db
|
||
|
|
from PyQt5.QtCore import QThread
|
||
|
|
from requests.adapters import HTTPAdapter, Retry
|
||
|
|
from urllib3.exceptions import InsecureRequestWarning
|
||
|
|
|
||
|
|
from .component import Component
|
||
|
|
|
||
|
|
# ArchiveSynchronizer.remote_archive posts to its endpoint with verify=False, so
# mute urllib3's InsecureRequestWarning (installed as a process-wide warnings filter).
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
|
||
|
|
|
||
|
|
|
||
|
|
class ArchiveSynchronizer(Component):
    """Push locally stored ``Archive`` records to remote storage on every poll.

    Each pass selects records that are not yet both ``archived`` and
    ``uploaded``. For each one it POSTs the record's metadata to
    ``archive_endpoint`` and uploads the record's frame image to a Google Cloud
    Storage bucket. It then saves the resulting flags, so steps that already
    succeeded are skipped on later passes.

    Running with ``--sim-archiver`` on the command line enables simulation mode.
    Pending records are still iterated (including the per-record hold time),
    but nothing is sent and nothing is saved.
    """

    # Message GCS puts in a 403 body when the target object is locked by the
    # bucket's retention policy; remote_store treats this as "already stored".
    _RETENTION_POLICY_RE = re.compile(
        r"^Object '.*?' is subject to bucket's retention policy and cannot be "
        r"deleted, overwritten or archived until .*?$"
    )

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
        # Simulation mode: walk pending records without contacting remote services.
        self.simulate = "--sim-archiver" in sys.argv

    def config_changed(self):
        """(Re)load settings from the ``archive_synchronizer`` config section.

        Reads the archive endpoint, local image directory, poll and hold times,
        GCS service-account credentials and bucket id, and builds a fresh GCS
        client. Any cached bucket handle is discarded.
        """
        self.machine_id = self.config.machine_id
        config = self.config["archive_synchronizer"]
        self.archive_endpoint = config["archive_endpoint"]
        self.images_path = config["images_path"]
        self._do_set_period({"period": float(config["poll_time"])})
        # Pause between records, in whole milliseconds for QThread.msleep
        # (the config value is presumably in seconds -- confirm).
        self.hold_time = round(float(config["hold_time"]) * 1000)
        self.gcs_client = storage.Client.from_service_account_json(config["service_account_json"])
        # Route every URL on the client's private HTTP session through a
        # no-retry adapter. requests orders adapters by descending prefix
        # length, so an adapter mounted on "" is moved to the front explicitly.
        # NOTE(review): the author observed this has no visible effect --
        # presumably google-cloud-storage retries above the HTTP adapter; confirm.
        self.gcs_client._http.mount("", HTTPAdapter(max_retries=Retry(total=0)))
        self.gcs_client._http.adapters.move_to_end("", last=False)
        self.bucket_id = config["bucket_id"]
        self.gcs_bucket = None

    @db.connection_context()
    @db.atomic()
    def _get(self):
        """Run one synchronization pass over all pending ``Archive`` records.

        Each record gets only the steps (remote archive, image upload) that have
        not succeeded yet, and the outcome is saved on the record. The pass ends
        by delegating to ``Component._get``.

        NOTE(review): the whole pass runs inside a single ``db.atomic()`` block,
        so the transaction stays open across network calls and ``hold_time``
        sleeps. Saves presumably only commit when the pass returns; confirm
        this is acceptable for the DB backend.
        """
        pending = Archive.select().where(
            # `!=` is deliberate: "is not True" cannot be overloaded and breaks the query.
            (Archive.archived != True) | (Archive.uploaded != True)  # noqa: E712
        )
        for record in pending:
            if not self.simulate:
                if record.archived is not True:
                    record.archived = self.remote_archive(record) is True
                if record.uploaded is not True:
                    record.uploaded = self.remote_store(record) is True
                record.save()
            if self.hold_time > 0:
                QThread.msleep(self.hold_time)
        # Release the bucket handle obtained during this pass.
        self.gcs_bucket = None
        super()._get()

    def remote_archive(self, record):
        """POST ``record``'s metadata to ``archive_endpoint``.

        Args:
            record: an ``Archive`` row; its ``time``, ``user.username``,
                ``recipe.name``, ``test_data``, ``result`` and ``overridden``
                are sent as query parameters.

        Returns:
            True on HTTP 200 (and always in simulation mode), False otherwise.
            Failures are logged and never raised.
        """
        if self.simulate:
            self.log.info(f"id: {record.id}: archived remotely")
            return True

        try:
            params = {
                "machine_id": self.machine_id,
                "time": record.time.isoformat(),
                "user": record.user.username,
                "recipe": record.recipe.name,
                "test_data": json.dumps(record.test_data),
                "result": record.result,
                "overridden": record.overridden,
            }
            # Mount on both scheme prefixes so this adapter replaces the
            # session's default http(s) adapters. One mounted on "" would be
            # consulted after them.
            no_retry = HTTPAdapter(max_retries=Retry(total=0))
            with requests.Session() as session:
                session.mount("http://", no_retry)
                session.mount("https://", no_retry)
                # TLS verification is deliberately disabled for this endpoint.
                response = session.post(self.archive_endpoint, params=params, timeout=5, verify=False)
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(f"id: {record.id}: failed to archive remotely, archive_endpoint might be unreachable: {str(e)}")
            return False
        except Exception:
            # No response may exist here, so only the traceback is logged.
            self.log.error(f"id: {record.id}: failed to archive remotely:\n{traceback.format_exc()}")
            return False

        if response.status_code != 200:
            self.log.warning(f"id: {record.id}: failed to archive remotely: bad status response: {response.status_code}: {response.content}")
            return False

        self.log.info(f"id: {record.id}: archived remotely")
        return True

    def _image_paths(self, record):
        """Return ``(local_path, bucket_object_name)`` for ``record``'s frame image.

        Both are keyed by ``record.vision_time``: year/month subdirectories
        plus a ``time_format`` timestamp, with a ``.png`` extension.
        """
        dt = record.vision_time
        timestamp = dt.strftime(self.time_format)
        relative = f"{dt.strftime('%Y')}/{dt.strftime('%m')}/{timestamp}.png"
        return f"{self.images_path}/frames/{relative}", f"{self.bench.type}/{relative}"

    @staticmethod
    def _error_payload(error):
        """Return the decoded JSON body of a google-api-core HTTP error, or None if unavailable."""
        try:
            return error.response.json()
        except (AttributeError, ValueError):
            # No response attached, or a body that is not JSON.
            return None

    def remote_store(self, record):
        """Upload ``record``'s frame image to the configured GCS bucket.

        Returns:
            True when the image was uploaded (or the upload was skipped in
            simulation mode), or when GCS reports the object is already locked
            by the bucket's retention policy. False on any failure. Failures are
            logged and never raised.
        """
        try:
            self.gcs_bucket = self.gcs_client.get_bucket(self.bucket_id, timeout=(5, 30), retry=None)  # retry=None seems to be ignored
        except Exception:
            self.log.warning(f"id {record.id}: failed to connect to bucket {repr(self.bucket_id)}:\n{traceback.format_exc()}")
            self.gcs_bucket = None
        if self.gcs_bucket is None:
            return False

        try:
            img_in, img_out = self._image_paths(record)
            blob = self.gcs_bucket.blob(img_out)
            if not self.simulate:
                blob.upload_from_filename(filename=img_in)
        except FileNotFoundError:
            self.log.error(f"id {record.id}: {img_in}: file not found.")
            return False
        except Forbidden as e:
            payload = self._error_payload(e)
            try:
                message = payload["error"]["message"]
            except (TypeError, KeyError):
                message = None
            if not (isinstance(message, str) and self._RETENTION_POLICY_RE.match(message)):
                self.log.warning(f"id: {record.id}: failed to store remotely: {str(e)}: {payload}")
                return False
            # Object already exists and is retention-locked: treat as stored.
            self.log.info(f"id {record.id}: {img_in}: already stored.")
        except Exception:
            self.log.error(f"id: {record.id}: failed to store remotely:\n{traceback.format_exc()}")
            return False

        self.log.info(f"id: {record.id}: stored remotely")
        return True
|