import json
import os
import re
import sys
import threading
import time
from pathlib import Path
import requests
import traceback
from google.api_core.exceptions import Forbidden
from google.cloud import storage
from requests import JSONDecodeError
from lib.db import Archive, db
from PyQt5.QtCore import QThread
from requests.adapters import HTTPAdapter, Retry
from urllib3.exceptions import InsecureRequestWarning

from .component import Component

# Suppress insecure request warning (every request below is sent with verify=False).
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

# Message returned with a 403 when an object is protected by the bucket's retention
# policy; remote_store() treats that case as "file already stored".
_RETENTION_POLICY_RE = re.compile(
    "^Object '.*?' is subject to bucket's retention policy and cannot be deleted, overwritten or archived until .*?$"
)


class ArchiveSynchronizer(Component):
    """Component that pushes pending ``Archive`` records to the remote services.

    Each cycle (``_get``) selects the records that are not yet archived or not
    yet uploaded, posts their data to ``archive_endpoint``, uploads the files
    they reference to a Google Cloud Storage bucket, and finally reports the
    machine status to ``status_endpoint``.

    Command-line switches read from ``sys.argv``:
      * ``--sim-archiver``: simulate, i.e. skip the network calls.
      * ``--dev-portal``: use the hard-coded development endpoints instead of
        the ones found in the configuration.
    """

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        """Forward all arguments to ``Component`` and initialise local state."""
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
        self.simulate = "--sim-archiver" in sys.argv
        self.machine_status = "logged-in"
        self.machine_id = None

    def config_changed(self):
        """Reload endpoints, timings and the GCS client from ``self.config``."""
        section = self.config[self.name]
        self.machine_id = self.config.machine_id

        if "--dev-portal" in sys.argv:
            self.archive_endpoint = "https://dev.r5portal.it/api/st-ten-save/"
            self.status_endpoint = "https://dev.r5portal.it/api/device-info-update/"
            self.api_endpoint = "https://dev.r5portal.it/api/"
        else:
            self.api_endpoint = section["api_endpoint"]
            self.archive_endpoint = section["archive_endpoint"]
            self.status_endpoint = section["status_endpoint"]

        self._do_set_period({"period": float(section["poll_time"])})
        # hold_time is configured as a float and stored in milliseconds for QThread.msleep().
        self.hold_time = round(float(section["hold_time"]) * 1000)

        self.gcs_client = storage.Client.from_service_account_json(section["service_account_json"])
        self.gcs_client._http.mount("", HTTPAdapter(max_retries=Retry(total=0)))  # this seems to be useless
        self.gcs_client._http.adapters.move_to_end("", last=False)  # this seems to be useless
        self.bucket_id = section["bucket_id"]
        self.gcs_bucket = None

    @db.connection_context()
    def _get(self):
        """Run one synchronisation cycle over every pending record."""
        # using "is not True" breaks the query..
        # list() forces the complete execution of the query unlocking the db unlike __enter__()
        pending = list(Archive.select().where((Archive.archived != True) | (Archive.uploaded != True)))  # noqa: E712
        for record in pending:
            if not self.simulate:
                if record.archived is not True:
                    record.archived = self.remote_archive(record) is True
                if record.uploaded is not True:
                    record.uploaded = self.remote_store(record) is True
            else:
                self.log.info("simulated archive synchronizer cycle")
            record.save()
            if self.hold_time > 0:
                QThread.msleep(self.hold_time)
        self.gcs_bucket = None

        # UPDATE MACHINE STATUS
        self.update_machine_status()
        super()._get()

    @staticmethod
    def _response_summary(response):
        """Return ``(status_code, content)`` for logging, or placeholders when there is no response.

        The check is ``is not None`` on purpose: a ``requests.Response`` is falsy
        whenever its status is an error (4xx/5xx), so a plain truthiness test
        would hide exactly the responses that need to be logged.
        """
        if response is None:
            return "no response", "no response"
        return response.status_code, response.content

    def update_machine_status(self):
        """POST the current machine status to ``status_endpoint``.

        :return: True on success (or when simulating), False on any failure.
                 Failures are logged, never raised.
        """
        status_call = f"{self.status_endpoint}?machine-id={self.machine_id.upper()}&status={self.machine_status}"
        response = None
        try:
            if not self.simulate:
                with requests.Session() as s:
                    s.mount("", HTTPAdapter(max_retries=Retry(total=0)))  # this disables retries
                    # Send through the session so the adapter configured above is the one in use.
                    response = s.post(status_call, timeout=5, verify=False)
                if response.status_code != 200:
                    raise AssertionError("bad status response")
        except AssertionError as e:
            status_code, content = self._response_summary(response)
            self.log.warning(
                f"Status: {self.machine_status}: failed to update machine status: {str(e)}: {status_code}: {content}"
            )
            return False
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(
                f"Status: {self.machine_status}: failed to update machine status, archive_endpoint might be unreachable: {str(e)}"
            )
            return False
        except Exception:
            status_code, content = self._response_summary(response)
            self.log.error(
                f"Status: {self.machine_status}: failed to update machine status:\n{traceback.format_exc()}:\n{status_code}: {content}"
            )
            return False
        self.log.info(f"Status: {self.machine_status}: Machine Status Updated Successfully")
        return True

    def remote_archive(self, record):
        """POST the test data of ``record`` to ``archive_endpoint``.

        :param record: ``Archive`` row to send.
        :return: True when the endpoint answered 200 (or when simulating),
                 False on a non-200 answer, connection error or timeout.
        """
        r = None
        try:
            if not self.simulate:
                payload = {
                    "data": json.dumps(record.test_data),
                    "machine_id": self.machine_id,
                    "overridden": record.overridden,
                    "recipe": record.test_data.get("recipe", {}).get("name", None),
                    "result": "OK" if record.result else "KO",
                    "serial": record.id,
                    "time": record.time.isoformat(),
                    "user": record.user.username,
                }
                with requests.Session() as s:
                    s.mount("", HTTPAdapter(max_retries=Retry(total=0)))  # this disables retries
                    # Send through the session so the adapter configured above is the one in use.
                    r = s.post(self.archive_endpoint, params=payload, timeout=5, verify=False)
                if r.status_code != 200:
                    raise AssertionError("bad status response")
        except AssertionError as e:
            self.log.warning(f"id: {record.id}: failed to archive remotely: {str(e)}: {r.status_code}: {r.content}")
            return False
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(f"id: {record.id}: failed to archive remotely, archive_endpoint might be unreachable: {str(e)}")
            return False
        return True

    def remote_store(self, record):
        """Upload the files listed in ``record.test_data["vision"]["0"]["files"]`` to the bucket.

        Files are stored as ``<machine_id>/<YYYY>/<MM>/<basename>`` using the
        year and month of ``record.time``.

        :param record: ``Archive`` row whose files must be uploaded.
        :return: True when there is nothing to upload or every file was stored
                 (a 403 caused by the bucket retention policy counts as
                 "already stored"); False as soon as one upload fails.
        """
        if "vision" not in record.test_data:
            return True

        try:
            # retry=None seems to be ignored
            self.gcs_bucket = self.gcs_client.get_bucket(self.bucket_id, timeout=(5, 30), retry=None)
        except Exception:
            self.log.warning(f"id {record.id}: failed to connect to bucket {repr(self.bucket_id)}:\n{traceback.format_exc()}")
            self.gcs_bucket = None
        if self.gcs_bucket is None:
            return False

        dt = record.time
        for path in record.test_data["vision"]["0"]["files"]:
            # path_in = f"{self.images_path}/{dt.strftime('%Y')}/{dt.strftime('%m')}/{os.path.basename(path)}"
            path_in = path
            path_out = f"{self.machine_id}/{dt.strftime('%Y')}/{dt.strftime('%m')}/{os.path.basename(path)}"
            try:
                blob = self.gcs_bucket.blob(path_out)
                if not self.simulate:
                    blob.upload_from_filename(filename=path_in)
            except FileNotFoundError:
                self.log.error(f"id {record.id}: {path_in}: file not found.")
                return False
            except Forbidden as e:
                if _RETENTION_POLICY_RE.match(e._response.json()["error"]["message"]) is not None:
                    self.log.info(f"id {record.id}: {path_in}: already stored.")
                else:
                    self.log.warning(f"id: {record.id}: failed to store remotely: {str(e)}: {e._response.json()}")
                    return False
            except Exception:
                self.log.error(f"id: {record.id}: failed to store remotely:\n{traceback.format_exc()}")
                # Report the failure so the record is not flagged as uploaded and is retried later.
                return False

        self.log.info(f"id: {record.id}: stored remotely")
        return True

    def remote_fetch(self, remote_path=None, local_path=None):
        """
        Fetch a list of file URLs from the server and download each of them.

        The list is requested from ``<api_endpoint>uploads/<remote_path>`` and is
        expected to be a JSON array of URLs.

        :param remote_path: path appended to the ``uploads/`` endpoint.
        :param local_path: directory the files are saved into; when omitted the
            historical default ``~/PycharmProjects/st-ten-1`` is used.
            NOTE(review): interpreted as a directory because several files may
            be downloaded - confirm against callers.
        :return: ``{"downloaded_files": [...], "errors": {url: reason}}`` when
            the list could be fetched, ``{"error": reason}`` when it could not,
            and None when simulating.
        :raises ValueError: if ``remote_path`` is None.
        """
        if remote_path is None:
            raise ValueError("remote_path cannot be None")

        call_url = f"{self.api_endpoint}uploads/{remote_path}"
        try:
            if self.simulate:
                return None

            with requests.Session() as s:
                self.log.info(f"Fetching list of file URLs from: {call_url}")
                # Make the HTTP GET request to fetch the list of URLs
                response = s.get(call_url, timeout=5, verify=False)

                # Log response details
                self.log.info(f"HTTP Status Code: {response.status_code}")
                self.log.info(f"Response Headers: {response.headers}")
                self.log.info(f"Response Content: {response.content}")

                # Handle HTTP errors for the list of URLs fetch
                if response.status_code == 404:
                    self.log.warning(f"URLs endpoint not found: {call_url}. Please check the URL path.")
                    return {"error": "URLs endpoint not found"}
                if response.status_code in (401, 403):
                    self.log.warning(f"Access forbidden or not logged in for URLs endpoint: {call_url}")
                    return {"error": "Access forbidden or not logged in"}
                if response.status_code != 200:
                    self.log.error(f"Unexpected HTTP response status: {response.status_code} for URL: {call_url}")
                    return {"error": "Unexpected HTTP response status"}

                # Parse the list of URLs (assuming the response contains a JSON array of URLs)
                try:
                    file_urls = response.json()
                    self.log.info(f"Fetched file URLs: {file_urls}")
                except ValueError as e:
                    self.log.error(f"Error parsing JSON response: {str(e)}")
                    self.log.error(f"Response Content: {response.content}")
                    return {"error": "Failed to parse JSON response"}

                downloaded_files = []
                errors = {}

                # Setup the base download path
                if local_path is not None:
                    base_download_path = local_path
                else:
                    base_download_path = os.path.join(Path.home(), "PycharmProjects", "st-ten-1")

                # Iterate over the list of URLs to download each file
                for specific_file_url in file_urls:
                    try:
                        self.log.info(f"Attempting to fetch file from: {specific_file_url}")
                        # Make the HTTP GET request to fetch the file URL
                        response = s.get(specific_file_url, timeout=5, verify=False)

                        # Log response details
                        self.log.info(f"HTTP Status Code: {response.status_code}")
                        self.log.info(f"Response Headers: {response.headers}")

                        # Handle HTTP errors appropriately for each file
                        if response.status_code == 404:
                            self.log.warning(f"File not found: {specific_file_url}. Please check the URL path.")
                            errors[specific_file_url] = "File not found"
                            continue
                        if response.status_code == 403:
                            self.log.warning(f"Access forbidden for file: {specific_file_url}")
                            errors[specific_file_url] = "Access forbidden"
                            continue
                        if response.status_code != 200:
                            self.log.error(
                                f"Unexpected HTTP response status: {response.status_code} for URL: {specific_file_url}")
                            errors[specific_file_url] = f"Unexpected status {response.status_code}"
                            continue

                        # Ensure the directory exists
                        download_path = base_download_path
                        os.makedirs(download_path, exist_ok=True)

                        # Save the file to the determined directory
                        local_file_path = os.path.join(download_path, os.path.basename(specific_file_url))
                        with open(local_file_path, "wb") as f:
                            f.write(response.content)

                        self.log.info(f"File downloaded successfully: {local_file_path}")
                        downloaded_files.append(local_file_path)
                    except (requests.ConnectionError, requests.Timeout) as e:
                        self.log.warning(
                            f"Failed to download file from {specific_file_url}, the URL might be unreachable: {str(e)}")
                        errors[specific_file_url] = "URL unreachable"
                    except Exception as e:
                        self.log.error(
                            f"An unexpected error occurred while downloading {specific_file_url}: {str(e)}")
                        errors[specific_file_url] = "Unexpected error"

                return {
                    "downloaded_files": downloaded_files,
                    "errors": errors,
                }
        except Exception as e:
            self.log.error(f"An unexpected error occurred while fetching URLs: {str(e)}")
            return {"error": "Unexpected error"}

    def check_actions_to_do(self, machine_id: str):
        """Query the device endpoint and, on a 200 answer, fetch the files for ``machine_id``.

        :param machine_id: identifier sent as the ``machine-id`` query parameter
            and then used as the ``remote_path`` of ``remote_fetch``.
        :raises requests.RequestException: connection errors and timeouts of the
            initial request are not handled here and propagate to the caller.
        """
        update_url = f"http://dev.r5portal.it/device/?machine-id={machine_id}"
        # A timeout is required: without one an unresponsive server blocks this call forever.
        response = requests.get(update_url, timeout=5)
        if response.status_code == 200:
            try:
                # json_response = response.json()
                # action_to_do = json_response.get("ACTIONS_TO_DO", "NO_ACTION")
                # if action_to_do == "DOWNLOAD_FILES":
                self.remote_fetch(machine_id)
                # else:
                #     self.log.info(f"No action required: {json_response}")
            except JSONDecodeError as e:
                self.log.error(f"JSON decode error: {str(e)} - Content: {response.content}")
        else:
            self.log.error(f"Failed to update device info: {response.status_code} - {response.content}")