import json
import os
import re
import sys
import threading
import time
import traceback
from pathlib import Path

import requests
from google.api_core.exceptions import Forbidden
from google.cloud import storage
from lib.db import Archive, db
from PyQt5.QtCore import QThread
from requests.adapters import HTTPAdapter, Retry
from urllib3.exceptions import InsecureRequestWarning

from .component import Component

# Suppress insecure request warning (requests below are sent with verify=False).
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

# Message GCS returns when an object already exists and is protected by the
# bucket's retention policy; remote_store() treats that as "already stored".
_RETENTION_POLICY_RE = re.compile(
    r"^Object '.*?' is subject to bucket's retention policy and cannot be "
    r"deleted, overwritten or archived until .*?$"
)


class ArchiveSynchronizer(Component):
    """Push unsynchronized ``Archive`` records to remote services.

    Each cycle (``_get``) selects the records that are not yet flagged as
    ``archived`` or ``uploaded``, posts their data to the HTTP archive
    endpoint, and uploads their vision files to a Google Cloud Storage
    bucket. When the process is started with ``--dev-portal`` it also reports
    the machine status and downloads files listed by the dev portal.
    """

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        """Forward all arguments to ``Component`` and read the simulation flag."""
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
        # With --sim-archiver no remote request or upload is performed.
        self.simulate = "--sim-archiver" in sys.argv
        # NOTE(review): self.machine_status is read by _get() and
        # update_machine_status() but is never assigned in this class
        # (a former default of "offline" was commented out here) — confirm
        # that it is set elsewhere before the first --dev-portal cycle.

    @staticmethod
    def _no_retry_session():
        """Return a ``requests.Session`` whose adapters never retry.

        The adapter is mounted on both URL schemes explicitly: a session
        picks the first adapter whose prefix matches the URL, and the default
        ``https://`` / ``http://`` adapters are consulted before an adapter
        mounted on the empty prefix.
        """
        session = requests.Session()
        adapter = HTTPAdapter(max_retries=Retry(total=0))
        for scheme in ("http://", "https://"):
            session.mount(scheme, adapter)
        return session

    def config_changed(self):
        """(Re)load endpoints, timing and the GCS client from ``self.config``."""
        self.machine_id = self.config.machine_id
        if "--dev-portal" in sys.argv:
            self.archive_endpoint = "https://dev.r5portal.it/api/st-ten-save/"
        else:
            self.archive_endpoint = self.config[self.name]["archive_endpoint"]
        self._do_set_period({"period": float(self.config[self.name]["poll_time"])})
        # hold_time is multiplied by 1000 and later passed to QThread.msleep().
        self.hold_time = round(float(self.config[self.name]["hold_time"]) * 1000)
        self.gcs_client = storage.Client.from_service_account_json(self.config[self.name]["service_account_json"])
        self.gcs_client._http.mount("", HTTPAdapter(max_retries=Retry(total=0)))  # this seems to be useless
        self.gcs_client._http.adapters.move_to_end("", last=False)  # this seems to be useless
        self.bucket_id = self.config[self.name]["bucket_id"]
        self.gcs_bucket = None

    @db.connection_context()
    def _get(self):
        """Run one synchronization cycle over all pending records."""
        # "!= True" is intentional: using "is not True" breaks the query.
        # list() forces the complete execution of the query unlocking the db
        # unlike __enter__().
        pending = list(
            Archive.select().where((Archive.archived != True) | (Archive.uploaded != True))  # noqa: E712
        )
        for record in pending:
            if not self.simulate:
                if record.archived is not True:
                    record.archived = self.remote_archive(record) is True
                if record.uploaded is not True:
                    record.uploaded = self.remote_store(record) is True
            else:
                self.log.info("simulated archive synchronizer cycle")
            record.save()
            if self.hold_time > 0:
                QThread.msleep(self.hold_time)
        self.gcs_bucket = None
        if "--dev-portal" in sys.argv:
            self.update_machine_status()
            self.remote_fetch(self.machine_status, self.machine_id)
        super()._get()

    def update_machine_status(self):
        """Post the current machine status to the dev portal.

        :return: True on an HTTP 200 answer (or when simulating, in which
            case nothing is sent), False on any failure. Failures are logged,
            never raised.
        """
        self.status_endpoint = (
            "https://dev.r5portal.it/api/device-info-update"
            f"?machine-id={self.machine_id.upper()}&status={self.machine_status}"
        )
        status_dict = {"last_status": self.machine_status}
        response = None
        try:
            if not self.simulate:
                with self._no_retry_session() as s:
                    response = s.post(self.status_endpoint, json=status_dict, timeout=5, verify=False)
                if response.status_code != 200:
                    self.log.warning(
                        f"Status: {self.machine_status}: failed to update machine status: "
                        f"bad status response: {response.status_code}: {response.content}"
                    )
                    return False
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(
                f"Status: {self.machine_status}: failed to update machine status, "
                f"archive_endpoint might be unreachable: {str(e)}"
            )
            return False
        except Exception:
            # Compare against None explicitly: a Response is falsy for any
            # 4xx/5xx status, which would be misreported as "no response".
            if response is not None:
                status, content = response.status_code, response.content
            else:
                status = content = "no response"
            self.log.error(
                f"Status: {self.machine_status}: failed to update machine status:\n"
                f"{traceback.format_exc()}:\n{status}: {content}"
            )
            return False
        self.log.info(f"Status: {self.machine_status}: Machine Status Updated Successfully")
        return True

    def remote_archive(self, record):
        """Post one record's data to ``self.archive_endpoint``.

        :param record: the ``Archive`` row to send.
        :return: True on an HTTP 200 answer (or when simulating), False on a
            non-200 answer, a connection error or a timeout.
        """
        if self.simulate:
            return True
        payload = {
            "data": json.dumps(record.test_data),
            "machine_id": self.machine_id,
            "overridden": record.overridden,
            "recipe": record.test_data.get("recipe", {}).get("name", None),
            "result": "OK" if record.result else "KO",
            "serial": record.id,
            "time": record.time.isoformat(),
            "user": record.user.username,
        }
        try:
            with self._no_retry_session() as s:
                r = s.post(self.archive_endpoint, params=payload, timeout=5, verify=False)
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(
                f"id: {record.id}: failed to archive remotely, archive_endpoint might be unreachable: {str(e)}"
            )
            return False
        if r.status_code != 200:
            self.log.warning(
                f"id: {record.id}: failed to archive remotely: bad status response: {r.status_code}: {r.content}"
            )
            return False
        return True

    def remote_store(self, record):
        """Upload the record's vision files to the GCS bucket.

        :param record: the ``Archive`` row whose files are uploaded.
        :return: True when the record has no ``"vision"`` data or every file
            was stored (or was already stored under the bucket's retention
            policy); False as soon as the bucket is unreachable or any file
            fails, so that the record is retried on a later cycle.
        """
        if "vision" not in record.test_data:
            return True
        try:
            # retry=None seems to be ignored
            self.gcs_bucket = self.gcs_client.get_bucket(self.bucket_id, timeout=(5, 30), retry=None)
        except Exception:
            self.log.warning(
                f"id {record.id}: failed to connect to bucket {repr(self.bucket_id)}:\n{traceback.format_exc()}"
            )
            self.gcs_bucket = None
        if self.gcs_bucket is None:
            return False

        dt = record.time
        remote_dir = f"{self.machine_id}/{dt.strftime('%Y')}/{dt.strftime('%m')}"
        for path in record.test_data["vision"]["0"]["files"]:
            path_out = f"{remote_dir}/{os.path.basename(path)}"
            try:
                blob = self.gcs_bucket.blob(path_out)
                if not self.simulate:
                    blob.upload_from_filename(filename=path)
            except FileNotFoundError:
                self.log.error(f"id {record.id}: {path}: file not found.")
                return False
            except Forbidden as e:
                # Read the error payload defensively so that a malformed
                # response cannot raise out of this handler.
                try:
                    details = e._response.json()
                    message = details["error"]["message"]
                except (AttributeError, KeyError, TypeError, ValueError):
                    details, message = None, ""
                if _RETENTION_POLICY_RE.match(message) is not None:
                    self.log.info(f"id {record.id}: {path}: already stored.")
                else:
                    self.log.warning(f"id: {record.id}: failed to store remotely: {str(e)}: {details}")
                    return False
            except Exception:
                self.log.error(f"id: {record.id}: failed to store remotely:\n{traceback.format_exc()}")
                # Report the failure; otherwise the record would be flagged
                # as uploaded although this file never reached the bucket.
                return False
        self.log.info(f"id: {record.id}: stored remotely")
        return True

    def remote_fetch(self, machine_status=None, machine_id=None):
        """Fetch and download the files listed by the dev portal.

        Nothing is fetched unless ``machine_status`` is ``"logged-in"`` and
        the synchronizer is not simulating.

        :param machine_status: Status of the machine (e.g., 'logged-in').
        :param machine_id: ID of the machine.
        :return: None when nothing is fetched; ``{"error": ...}`` when the
            URL list cannot be obtained; otherwise a dict with
            ``"downloaded_files"`` (list of local paths) and ``"errors"``
            (mapping of URL to error description).
        :raises ValueError: if ``machine_id`` is None.
        """
        if machine_id is None:
            raise ValueError("machine_id cannot be None")
        # URLs endpoint of the Django server, with machine_id as a query parameter.
        urls_endpoint = f"http://dev.r5portal.it/get-file-urls/?machine_id={machine_id}"
        if machine_status != "logged-in" or self.simulate:
            return None

        try:
            with requests.Session() as s:
                self.log.info(f"Fetching list of file URLs from: {urls_endpoint}")
                response = s.get(urls_endpoint, timeout=5, verify=False)
                self.log.info(f"HTTP Status Code: {response.status_code}")
                self.log.info(f"Response Headers: {response.headers}")
                self.log.info(f"Response Content: {response.content}")

                if response.status_code == 404:
                    self.log.warning(f"URLs endpoint not found: {urls_endpoint}. Please check the URL path.")
                    return {"error": "URLs endpoint not found"}
                if response.status_code == 403:
                    self.log.warning(f"Access forbidden for URLs endpoint: {urls_endpoint}")
                    return {"error": "Access forbidden"}
                if response.status_code != 200:
                    self.log.error(
                        f"Unexpected HTTP response status: {response.status_code} for URL: {urls_endpoint}"
                    )
                    return {"error": "Unexpected HTTP response status"}

                # The response is assumed to contain a JSON array of URLs.
                try:
                    file_urls = response.json()
                    self.log.info(f"Fetched file URLs: {file_urls}")
                except ValueError as e:
                    self.log.error(f"Error parsing JSON response: {str(e)}")
                    self.log.error(f"Response Content: {response.content}")
                    return {"error": "Failed to parse JSON response"}

                downloaded_files = []
                errors = {}
                base_download_path = os.path.join(Path.home(), "PycharmProjects", "st-ten-1")
                for specific_file_url in file_urls:
                    local_file_path, error = self._download_file(
                        s, specific_file_url, base_download_path, machine_id
                    )
                    if error is None:
                        downloaded_files.append(local_file_path)
                    else:
                        errors[specific_file_url] = error
                return {"downloaded_files": downloaded_files, "errors": errors}
        except Exception as e:
            self.log.error(f"An unexpected error occurred while fetching URLs: {str(e)}")
            return {"error": "Unexpected error"}

    def _download_file(self, session, url, base_download_path, machine_id):
        """Download one file and save it below ``base_download_path``.

        :return: ``(local_path, None)`` on success, ``(None, description)``
            on failure. Failures are logged, never raised.
        """
        try:
            self.log.info(f"Attempting to fetch file from: {url}")
            response = session.get(url, timeout=5, verify=False)
            self.log.info(f"HTTP Status Code: {response.status_code}")
            self.log.info(f"Response Headers: {response.headers}")

            if response.status_code == 404:
                self.log.warning(f"File not found: {url}. Please check the URL path.")
                return None, "File not found"
            if response.status_code == 403:
                self.log.warning(f"Access forbidden for file: {url}")
                return None, "Access forbidden"
            if response.status_code != 200:
                self.log.error(f"Unexpected HTTP response status: {response.status_code} for URL: {url}")
                return None, f"Unexpected status {response.status_code}"

            # The destination sub-directory is chosen from the URL content.
            if "warning_images" in url:
                sub_dir = os.path.join("config", "warning_images", machine_id)
            elif "ricette" in url:
                sub_dir = os.path.join("config", "csv_import", "auto_csv_import")
            else:
                sub_dir = "tmp"  # everything else goes under the temporary folder

            download_path = os.path.join(base_download_path, sub_dir)
            os.makedirs(download_path, exist_ok=True)
            local_file_path = os.path.join(download_path, os.path.basename(url))
            with open(local_file_path, "wb") as f:
                f.write(response.content)
            self.log.info(f"File downloaded successfully: {local_file_path}")
            return local_file_path, None
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(f"Failed to download file from {url}, the URL might be unreachable: {str(e)}")
            return None, "URL unreachable"
        except Exception as e:
            self.log.error(f"An unexpected error occurred while downloading {url}: {str(e)}")
            return None, "Unexpected error"