import json
import os
import re
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
import requests
import traceback
from google.api_core.exceptions import Forbidden
from google.cloud import storage
from requests import JSONDecodeError
from lib.db import Archive, db
from lib.db.models import Log
from PyQt5.QtCore import QThread
from requests.adapters import HTTPAdapter, Retry
from urllib3.exceptions import InsecureRequestWarning
from .component import Component
from ui.helpers import get_main_window

# Suppress insecure request warning (all portal calls below use verify=False)
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)


class ArchiveSynchronizer(Component):
    """Push local ``Archive`` records to a remote portal and report machine status.

    Each cycle (``_get``) selects the records that still have to be archived
    and/or uploaded, sends them to the configured archive endpoint, uploads
    the associated vision files to a Google Cloud Storage bucket and finally,
    for the component named ``"archive_synchronizer"``, posts the machine
    status to the portal and executes any download action the portal returns.

    ``Archive.archived`` is used as a bitmask: bit 0 is set once the main
    server has accepted the record, bit 1 once the extra server
    (component named ``"archive_synchronizer_extra"``) has accepted it.
    """

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        """Forward all arguments to ``Component`` and set the initial state."""
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
        self.main_window = None  # resolved lazily on the first cycle
        self.simulate = "--sim-archiver" in sys.argv  # when set, no network call is made
        self.machine_status = "logged-in"
        self.machine_id = None  # filled in by config_changed()

    def config_changed(self):
        """Re-read endpoints, timing and bucket settings from ``self.config``."""
        self.machine_id = self.config.machine_id
        if "--dev-portal" in sys.argv:
            # Development portal: endpoints are fixed, the config is ignored.
            self.archive_endpoint = "https://dev.r5portal.it/api/st-ten-save/"
            self.status_endpoint = "https://dev.r5portal.it/api/device-info-update/"
            self.download_endpoint = "https://dev.r5portal.it/media/uploads/"
        else:
            section = self.config[self.name]
            self.download_endpoint = section["portal_address"] + section["download_root"]
            self.archive_endpoint = section["portal_address"] + section["archive_root"]
            self.status_endpoint = section["portal_address"] + section["status_root"]
        self._do_set_period({"period": float(self.config[self.name]["poll_time"])})
        # hold_time is configured in seconds and stored in milliseconds (QThread.msleep).
        self.hold_time = round(float(self.config[self.name]["hold_time"]) * 1000)
        if self.name == "archive_synchronizer":
            # Only the main synchronizer owns a bucket client.
            self.gcs_client = storage.Client.from_service_account_json(self.config[self.name]["service_account_json"])
            self.gcs_client._http.mount("", HTTPAdapter(max_retries=Retry(total=0)))  # this seems to be useless
            self.gcs_client._http.adapters.move_to_end("", last=False)  # this seems to be useless
            self.bucket_id = self.config[self.name]["bucket_id"]
        self.gcs_bucket = None

    @db.connection_context()
    def _get(self):
        """Run one synchronization cycle over every record still pending.

        For each pending record the remote archive call is made only when this
        synchronizer's own bit is not yet set in ``record.archived``, and the
        file upload only when ``record.uploaded`` is falsy. The record is then
        saved through the main window's ``run_request`` signal.
        """
        if self.main_window is None:
            self.main_window = get_main_window()

        if self.name != "archive_synchronizer_extra":
            # MAIN SERVER
            bit_pos = 0
            unsaved_records = Archive.select().where(
                (Archive.archived == 0) | (Archive.archived == 2) | (Archive.uploaded == 0)
            )
        else:
            # EXTRA SERVER (VERO PROJECT SPA)
            bit_pos = 1
            unsaved_records = Archive.select().where((Archive.archived == 0) | (Archive.archived == 1))
        archive_bit = 1 << bit_pos

        for record in unsaved_records:
            if not self.simulate:
                # A record can be selected only because its files are not
                # uploaded yet; in that case it must not be archived again.
                if record.archived & archive_bit:
                    save_ok = True
                else:
                    save_ok = self.remote_archive(record) is True
                if not record.uploaded:
                    record.uploaded = self.remote_store(record) is True
            else:
                self.log.info("simulated archive synchronizer cycle")
                save_ok = True

            if save_ok:
                record.archived |= archive_bit
                self.log.info(f"({self.name}) id {record.id}: archived remotely")
            else:
                self.log.info(f"({self.name}) id {record.id}: failed to archive remotely")

            # The save is handed to the main window rather than executed here.
            self.main_window.run_request.emit(record.save, [], {})
            if self.hold_time > 0:
                QThread.msleep(self.hold_time)

        self.gcs_bucket = None

        # UPDATE MACHINE STATUS
        self.machine_status = "working"
        super()._get()
        if self.name == "archive_synchronizer":
            self.update_machine_status()

    @staticmethod
    def _new_session():
        """Return a ``requests`` session with a zero-retry adapter mounted on ``""``."""
        session = requests.Session()
        # Intended to disable retries.
        # NOTE(review): requests uses the first adapter whose prefix matches the
        # URL and keeps longer prefixes first, so the default "https://" and
        # "http://" adapters presumably win over "" -- confirm this has any effect.
        session.mount("", HTTPAdapter(max_retries=Retry(total=0)))
        return session

    def update_machine_status(self):
        """Post the current machine status to the portal.

        On a 200 response the body is handed to
        ``parse_response_and_execute``. In simulation mode no request is made.

        :return: True on success (or in simulation mode), False on any failure.
        """
        status_call = f"{self.status_endpoint}?machine-id={self.machine_id.upper()}&status={self.machine_status}"
        response = None
        try:
            if not self.simulate:
                with self._new_session() as s:
                    response = s.post(status_call, timeout=5, verify=False)
                if response.status_code != 200:
                    self.log.warning(
                        f"Status: {self.machine_status}: failed to update machine status: bad status response: {response.status_code}: {response.content}"
                    )
                    return False
                self.parse_response_and_execute(response)
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(
                f"Status: {self.machine_status}: failed to update machine status, archive_endpoint might be unreachable: {str(e)}"
            )
            return False
        except Exception:
            # Compare against None explicitly: a Response is falsy for any
            # 4xx/5xx status, which would hide the very status being reported.
            status = response.status_code if response is not None else 'no response'
            content = response.content if response is not None else 'no response'
            self.log.error(
                f"Status: {self.machine_status}: failed to update machine status:\n{traceback.format_exc()}:\n{status}: {content}"
            )
            return False
        self.log.info(f"Status: {self.machine_status}: Machine Status Updated Successfully")
        return True

    def parse_response_and_execute(self, response):
        """Run every download action listed under ``ACTIONS_TO_DO`` in *response*.

        The JSON body must be a dictionary. ``ACTIONS_TO_DO`` may be a list of
        actions or a single action dictionary; each action provides
        ``remote_path`` and ``local_path`` for ``remote_fetch``. Errors are
        logged and never propagated.

        :param response: Response object exposing ``json()``.
        """
        try:
            data = response.json()
            if not isinstance(data, dict):
                raise ValueError("Parsed response is not a dictionary")

            # A missing or null entry means there is nothing to do.
            actions = data.get("ACTIONS_TO_DO", []) or []
            # Ensure actions is a list
            if isinstance(actions, dict):
                actions = [actions]

            for action in actions:
                remote_path = action.get("remote_path")
                local_path = action.get("local_path")
                self.log.info(f"Executing remote fetch with remote_path: {remote_path} and local_path: {local_path}")
                result = self.remote_fetch(remote_path=remote_path, local_path=local_path)
                self.log.info(f"Remote fetch result: {result}")
        except (json.JSONDecodeError, JSONDecodeError):
            # response.json() raises the requests flavour of JSONDecodeError,
            # which is not guaranteed to derive from json.JSONDecodeError.
            self.log.error("Failed to decode JSON response")
        except Exception as e:
            self.log.error(f"An unexpected error occurred while parsing response: {str(e)}")

    def remote_archive(self, record):
        """Send one record to the archive endpoint.

        The main synchronizer POSTs the record including its full
        ``test_data``; any other synchronizer GETs the same fields without
        ``data``. In both cases the fields travel as query parameters.

        :param record: Archive row to send.
        :return: True when the portal answered 200 (or in simulation mode),
            False on a bad status, connection error or timeout.
        """
        r = None
        try:
            if not self.simulate:
                params = {
                    "machine_id": self.machine_id,
                    "overridden": record.overridden,
                    "recipe": record.test_data.get("recipe", {}).get("name", None),
                    "result": "OK" if record.result else "KO",
                    "serial": record.id,
                    "time": record.time.isoformat(),
                    "user": record.user.username,
                }
                with self._new_session() as s:
                    # Send through the session so its adapter configuration applies.
                    if self.name == "archive_synchronizer":
                        params = {"data": json.dumps(record.test_data), **params}
                        r = s.post(self.archive_endpoint, params=params, timeout=5, verify=False)
                    else:
                        r = s.get(self.archive_endpoint, params=params, timeout=5, verify=False)
                if r.status_code != 200:
                    self.log.warning(
                        f"id: {record.id}: failed to archive remotely: bad status response: {r.status_code}: {r.content}"
                    )
                    return False
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(f"id: {record.id}: failed to archive remotely, archive_endpoint might be unreachable: {str(e)}")
            return False
        self.log.info(f"Archived successfully: {record.id}")
        return True

    def remote_store(self, record):
        """Upload the vision files of *record* to the configured bucket.

        Files are stored as ``<machine_id>/<YYYY>/<MM>/<basename>``. A
        ``Forbidden`` error whose message says the object is protected by the
        bucket's retention policy is treated as "already stored".

        :param record: Archive row whose ``test_data["vision"]["0"]["files"]``
            lists the local file paths to upload.
        :return: True when the record has no vision data or every file is
            stored, False as soon as the bucket or one upload fails.
        """
        if "vision" not in record.test_data:
            return True

        try:
            # retry=None seems to be ignored
            self.gcs_bucket = self.gcs_client.get_bucket(self.bucket_id, timeout=(5, 30), retry=None)
        except Exception:
            self.log.warning(f"id {record.id}: failed to connect to bucket {repr(self.bucket_id)}:\n{traceback.format_exc()}")
            self.gcs_bucket = None
        if self.gcs_bucket is None:
            return False

        dt = record.time
        remote_dir = f"{self.machine_id}/{dt.strftime('%Y')}/{dt.strftime('%m')}"
        for path in record.test_data["vision"]["0"]["files"]:
            # The path recorded in test_data is used as-is for the local file.
            path_in = path
            path_out = f"{remote_dir}/{os.path.basename(path)}"
            try:
                blob = self.gcs_bucket.blob(path_out)
                if not self.simulate:
                    blob.upload_from_filename(filename=path_in)
            except FileNotFoundError:
                self.log.error(f"id {record.id}: {path_in}: file not found.")
                return False
            except Forbidden as e:
                if re.match("^Object '.*?' is subject to bucket's retention policy and cannot be deleted, overwritten or archived until .*?$", e._response.json()["error"]["message"]) is not None:
                    self.log.info(f"id {record.id}: {path_in}: already stored.")
                else:
                    self.log.warning(f"id: {record.id}: failed to store remotely: {str(e)}: {e._response.json()}")
                    return False
            except Exception:
                self.log.error(f"id: {record.id}: failed to store remotely:\n{traceback.format_exc()}")
                # Do not fall through to the success path: the file is not stored.
                return False
        self.log.info(f"Stored successfully: {record.id}")
        return True

    def remote_fetch(self, remote_path=None, local_path=None):
        """
        Download a single file from the server and retrieve the last update time and response status.

        The file is saved as ``<local_path>/<basename of remote_path>``; the
        directory is created when missing. Every outcome is written both to
        the component log and to the database log table.

        :param remote_path: Path of where to download the file from
        :param local_path: Path of where to save the file to
        :return: A dictionary with errors if any occur, and last update information.
            On success it holds ``downloaded_file`` instead of ``error``.
            In simulation mode nothing is fetched and None is returned.
        :raises ValueError: if ``remote_path`` or ``local_path`` is None.
        """
        if remote_path is None:
            raise ValueError("remote_path cannot be None")
        if local_path is None:
            raise ValueError("local_path cannot be None")

        if "--dev-portal" in sys.argv:
            call_url = f"https://dev.r5portal.it/{remote_path}"
        else:
            call_url = f"https://r5portal.it/{remote_path}"

        log_info_type = "Download"
        log_time = datetime.now()
        log_info = f"Attempted to download from {call_url}"
        last_update_info = None

        def report(log_method, detail, key, value):
            """Log the outcome to file and database, then build the result dictionary."""
            message = log_info + detail
            log_method(message)
            self.log_to_db(log_time, log_info_type, message)
            return {key: value, "last_update_info": last_update_info}

        try:
            if not self.simulate:
                with requests.Session() as s:
                    self.log.info(f"Fetching file from: {call_url}")
                    response = s.get(call_url, timeout=5, verify=False)
                    self.log.info(f"HTTP Status Code: {response.status_code}")

                last_update_info = {
                    "last_update_time": log_time.isoformat(),
                    "response_status": response.status_code
                }

                if response.status_code == 404:
                    return report(self.log.warning, " - File not found", "error", "File not found")
                if response.status_code in [403, 401]:
                    return report(
                        self.log.warning,
                        " - Access forbidden or not logged in",
                        "error",
                        "Access forbidden or not logged in",
                    )
                if response.status_code != 200:
                    return report(
                        self.log.error,
                        f" - Unexpected HTTP response status: {response.status_code}",
                        "error",
                        "Unexpected HTTP response status",
                    )

                os.makedirs(local_path, exist_ok=True)
                # Only the base name of the remote path is used for the local file.
                local_file_path = os.path.join(local_path, os.path.basename(remote_path))
                with open(local_file_path, "wb") as f:
                    f.write(response.content)

                return report(
                    self.log.info,
                    f" - File downloaded successfully: {local_file_path}",
                    "downloaded_file",
                    local_file_path,
                )
        except requests.ConnectionError as e:
            return report(self.log.error, f" - Connection error: {str(e)}", "error", "Connection error")
        except requests.Timeout as e:
            return report(self.log.error, f" - Timeout error: {str(e)}", "error", "Timeout error")
        except Exception as e:
            return report(self.log.error, f" - Unexpected error: {str(e)}", "error", "Unexpected error")
        return None

    def log_to_db(self, log_time, log_info_type, log_info):
        """Save log information to the database.

        Failures are logged to the component log and never propagated.

        :param log_time: Timestamp stored in the ``time`` field.
        :param log_info_type: Category stored in the ``info_type`` field.
        :param log_info: Message stored in the ``info`` field.
        """
        try:
            # Use the Log model class, not the ``self.log`` logger instance
            new_log_entry = Log(time=log_time, info_type=log_info_type, info=log_info)
            new_log_entry.save()  # Save the log entry to the database
        except Exception as e:
            self.log.error(f"Failed to save log to database: {str(e)}")
            self.log.error(traceback.format_exc())