import json
import os
import re
import sys
import threading
import time
import traceback
from pathlib import Path

import requests
from google.api_core.exceptions import Forbidden
from google.cloud import storage
from PyQt5.QtCore import QThread
from requests import JSONDecodeError
from requests.adapters import HTTPAdapter, Retry
from urllib3.exceptions import InsecureRequestWarning

from lib.db import Archive, db

from .component import Component

# Suppress insecure request warning
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)


class ArchiveSynchronizer(Component):
    """Push pending ``Archive`` records to the portal and to cloud storage.

    On every cycle (``_get``) each record that is not yet archived and/or
    uploaded is posted to the archive endpoint and its vision files are
    uploaded to the configured Google Cloud Storage bucket.  Afterwards the
    machine status is posted to the status endpoint, and any download
    actions contained in the reply are executed.

    Passing ``--sim-archiver`` on the command line turns on simulation mode,
    in which no network request or upload is performed.
    """

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        """Forward all arguments to ``Component`` and set the initial state."""
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
        self.simulate = "--sim-archiver" in sys.argv
        self.machine_status = "logged-in"
        self.machine_id = None

    def config_changed(self) -> None:
        """Re-read endpoints, timings and the storage client from the config.

        With ``--dev-portal`` on the command line the three endpoints point
        at the development portal; otherwise they are built from
        ``portal_address`` plus the matching ``*_root`` configuration entry.
        """
        self.machine_id = self.config.machine_id
        if "--dev-portal" in sys.argv:
            self.archive_endpoint = "https://dev.r5portal.it/api/st-ten-save/"
            self.status_endpoint = "https://dev.r5portal.it/api/device-info-update/"
            self.download_endpoint = "https://dev.r5portal.it/media/uploads/"
        else:
            section = self.config[self.name]
            self.download_endpoint = section["portal_address"] + section["download_root"]
            self.archive_endpoint = section["portal_address"] + section["archive_root"]
            self.status_endpoint = section["portal_address"] + section["status_root"]
        self._do_set_period({"period": float(self.config[self.name]["poll_time"])})
        # hold_time is configured in seconds; QThread.msleep() takes milliseconds.
        self.hold_time = round(float(self.config[self.name]["hold_time"]) * 1000)
        self.gcs_client = storage.Client.from_service_account_json(self.config[self.name]["service_account_json"])
        self.gcs_client._http.mount("", HTTPAdapter(max_retries=Retry(total=0)))  # this seems to be useless
        self.gcs_client._http.adapters.move_to_end("", last=False)  # this seems to be useless
        self.bucket_id = self.config[self.name]["bucket_id"]
        self.gcs_bucket = None

    @db.connection_context()
    def _get(self):
        """Run one synchronisation cycle, then report the machine status."""
        # Comparing with "!= True" is intentional: using "is not True" breaks the query.
        # list() forces the complete execution of the query, unlocking the db
        # (unlike __enter__()).
        pending = list(
            Archive.select().where((Archive.archived != True) | (Archive.uploaded != True))  # noqa: E712
        )
        for record in pending:
            if not self.simulate:
                if record.archived is not True:
                    record.archived = self.remote_archive(record) is True
                if record.uploaded is not True:
                    record.uploaded = self.remote_store(record) is True
            else:
                self.log.info("simulated archive synchronizer cycle")
            record.save()
            if self.hold_time > 0:
                QThread.msleep(self.hold_time)
        self.gcs_bucket = None

        # UPDATE MACHINE STATUS
        self.machine_status = "working"
        super()._get()
        self.update_machine_status()

    @staticmethod
    def _response_summary(response):
        """Return ``(status_code, content)`` of *response* for log messages.

        Both items are the string ``'no response'`` when *response* is None.
        The check is an explicit ``is None`` because a ``requests.Response``
        is falsy for 4xx/5xx status codes, which are exactly the responses
        worth logging.
        """
        if response is None:
            return "no response", "no response"
        return response.status_code, response.content

    def update_machine_status(self) -> bool:
        """Post the current machine status to the status endpoint.

        A 200 reply is handed to ``parse_response_and_execute``.  Every
        failure is logged and reported through the return value.

        :return: True on success (or in simulation mode), False otherwise.
        """
        status_call = f"{self.status_endpoint}?machine-id={self.machine_id.upper()}&status={self.machine_status}"
        response = None
        try:
            if not self.simulate:
                with requests.Session() as s:
                    s.mount("", HTTPAdapter(max_retries=Retry(total=0)))  # this disables retries
                    response = s.post(status_call, timeout=5, verify=False)
                    if response.status_code != 200:
                        raise AssertionError("bad status response")
                    self.parse_response_and_execute(response)
        except AssertionError as e:
            status_code, content = self._response_summary(response)
            self.log.warning(
                f"Status: {self.machine_status}: failed to update machine status: {str(e)}: {status_code}: {content}"
            )
            return False
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(
                f"Status: {self.machine_status}: failed to update machine status, archive_endpoint might be unreachable: {str(e)}"
            )
            return False
        except Exception:
            status_code, content = self._response_summary(response)
            self.log.error(
                f"Status: {self.machine_status}: failed to update machine status:\n{traceback.format_exc()}:\n{status_code}: {content}"
            )
            return False
        self.log.info(f"Status: {self.machine_status}: Machine Status Updated Successfully")
        return True

    def parse_response_and_execute(self, response) -> None:
        """Run every download action listed in a status reply.

        The reply body must be a JSON object.  Its ``ACTIONS_TO_DO`` entry is
        either a single action object or a list of them; each action's
        ``remote_path`` and ``local_path`` are passed to ``remote_fetch``.
        Errors are logged and never propagated to the caller.
        """
        try:
            data = response.json()
            if not isinstance(data, dict):
                raise ValueError("Parsed response is not a dictionary")
            actions = data.get("ACTIONS_TO_DO", [])
            # Ensure actions is a list
            if isinstance(actions, dict):
                actions = [actions]
            for action in actions:
                remote_path = action.get("remote_path")
                local_path = action.get("local_path")
                self.log.info(f"Executing remote fetch with remote_path: {remote_path} and local_path: {local_path}")
                result = self.remote_fetch(remote_path=remote_path, local_path=local_path)
                self.log.info(f"Remote fetch result: {result}")
        except (json.JSONDecodeError, JSONDecodeError):
            # response.json() raises the requests flavour of JSONDecodeError.
            self.log.error("Failed to decode JSON response")
        except Exception as e:
            self.log.error(f"An unexpected error occurred while parsing response: {str(e)}")

    def remote_archive(self, record) -> bool:
        """Post one record to the archive endpoint.

        :param record: the ``Archive`` row to send.
        :return: True when the endpoint answered 200 (or in simulation mode),
            False on a bad status, a connection error or a timeout.
        """
        r = None
        try:
            if not self.simulate:
                payload = {
                    "data": json.dumps(record.test_data),
                    "machine_id": self.machine_id,
                    "overridden": record.overridden,
                    "recipe": record.test_data.get("recipe", {}).get("name", None),
                    "result": "OK" if record.result else "KO",
                    "serial": record.id,
                    "time": record.time.isoformat(),
                    "user": record.user.username,
                }
                with requests.Session() as s:
                    s.mount("", HTTPAdapter(max_retries=Retry(total=0)))  # this disables retries
                    # Send through the session configured above rather than
                    # the module-level requests.post().
                    r = s.post(self.archive_endpoint, params=payload, timeout=5, verify=False)
                if r.status_code != 200:
                    raise AssertionError("bad status response")
        except AssertionError as e:
            self.log.warning(f"id: {record.id}: failed to archive remotely: {str(e)}: {r.status_code}: {r.content}")
            return False
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(f"id: {record.id}: failed to archive remotely, archive_endpoint might be unreachable: {str(e)}")
            return False
        return True

    def remote_store(self, record) -> bool:
        """Upload the vision files of one record to the storage bucket.

        Records without a ``"vision"`` entry have nothing to upload and count
        as stored.  A file rejected because of the bucket's retention policy
        is treated as already stored.

        :param record: the ``Archive`` row whose files are uploaded.
        :return: True when every file is stored, False as soon as one upload
            fails, so that the record is picked up again on a later cycle.
        """
        if "vision" not in record.test_data:
            return True
        try:
            # retry=None seems to be ignored
            self.gcs_bucket = self.gcs_client.get_bucket(self.bucket_id, timeout=(5, 30), retry=None)
        except Exception:
            self.log.warning(f"id {record.id}: failed to connect to bucket {repr(self.bucket_id)}:\n{traceback.format_exc()}")
            self.gcs_bucket = None
        if self.gcs_bucket is None:
            return False
        for path in record.test_data["vision"]["0"]["files"]:
            dt = record.time
            path_in = path
            path_out = f"{self.machine_id}/{dt.strftime('%Y')}/{dt.strftime('%m')}/{os.path.basename(path)}"
            try:
                blob = self.gcs_bucket.blob(path_out)
                if not self.simulate:
                    blob.upload_from_filename(filename=path_in)
            except FileNotFoundError:
                self.log.error(f"id {record.id}: {path_in}: file not found.")
                return False
            except Forbidden as e:
                error_body = e._response.json()
                if re.match("^Object '.*?' is subject to bucket's retention policy and cannot be deleted, overwritten or archived until .*?$", error_body["error"]["message"]) is not None:
                    self.log.info(f"id {record.id}: {path_in}: already stored.")
                else:
                    self.log.warning(f"id: {record.id}: failed to store remotely: {str(e)}: {error_body}")
                    return False
            except Exception:
                self.log.error(f"id: {record.id}: failed to store remotely:\n{traceback.format_exc()}")
                # Report the failure instead of falling through to the
                # success path, which would flag the record as uploaded.
                return False
        self.log.info(f"id: {record.id}: stored remotely")
        return True

    def remote_fetch(self, remote_path=None, local_path=None):
        """
        download a single file from the server.

        :param remote_path: path of where to download the file from
        :param local_path: directory where the file is saved; it is created
            when missing and the file keeps the base name of ``remote_path``
        :return: ``{"downloaded_file": path}`` on success, ``{"error": message}``
            on failure, None in simulation mode.
        :raises ValueError: if ``remote_path`` or ``local_path`` is None.
        """
        if remote_path is None:
            raise ValueError("remote_path cannot be None")
        if local_path is None:
            raise ValueError("local_path cannot be None")
        call_url = f"{self.download_endpoint}{remote_path}"
        # NOTE(review): both paths come from the status reply, which is fetched
        # with verify=False, so the write location is controlled by whoever
        # answers that request - confirm this is acceptable.
        try:
            if not self.simulate:
                with requests.Session() as s:
                    self.log.info(f"Fetching file from: {call_url}")
                    # Make the HTTP GET request to fetch the file
                    response = s.get(call_url, timeout=5, verify=False)
                    # Log response details
                    self.log.info(f"HTTP Status Code: {response.status_code}")
                    # Handle HTTP errors
                    if response.status_code == 404:
                        self.log.warning(f"File not found: {call_url}. Please check the URL path.")
                        return {"error": "File not found"}
                    if response.status_code in (401, 403):
                        self.log.warning(f"Access forbidden or not logged in for file: {call_url}")
                        return {"error": "Access forbidden or not logged in"}
                    if response.status_code != 200:
                        self.log.error(f"Unexpected HTTP response status: {response.status_code} for URL: {call_url}")
                        return {"error": "Unexpected HTTP response status"}
                    # Ensure the directory exists
                    os.makedirs(local_path, exist_ok=True)
                    # Save the file to the local path
                    local_file_path = os.path.join(local_path, os.path.basename(remote_path))
                    with open(local_file_path, "wb") as f:
                        f.write(response.content)
                    self.log.info(f"File downloaded successfully: {local_file_path}")
                    return {"downloaded_file": local_file_path}
        except requests.ConnectionError as e:
            self.log.error(f"Connection error occurred while fetching the file: {str(e)}")
            return {"error": "Connection error"}
        except requests.Timeout as e:
            self.log.error(f"Timeout error occurred while fetching the file: {str(e)}")
            return {"error": "Timeout error"}
        except Exception as e:
            self.log.error(f"An unexpected error occurred: {str(e)}")
            return {"error": "Unexpected error"}