# st-ten-1/src/components/archive_synchronizer.py
import json
import os
import re
import sys
import threading
import time
import traceback
from pathlib import Path
from urllib.parse import urlparse

import requests
from google.api_core.exceptions import Forbidden
from google.cloud import storage
from PyQt5.QtCore import QThread
from requests import JSONDecodeError
from requests.adapters import HTTPAdapter, Retry
from urllib3.exceptions import InsecureRequestWarning

from lib.db import Archive, db

from .component import Component
# Several portal calls in this module are made with verify=False; silence
# urllib3's InsecureRequestWarning for them.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
class ArchiveSynchronizer(Component):
    """Push locally archived test records to the remote portal and to GCS.

    Each cycle (``_get``) retries every :class:`Archive` row that is not yet
    both ``archived`` (test data POSTed to ``archive_endpoint``) and
    ``uploaded`` (vision image files copied to the GCS bucket ``bucket_id``).

    With ``--dev-portal`` on the command line:
    - the archive endpoint switches to the dev portal;
    - each cycle also reports the machine status;
    - each cycle downloads the files the portal publishes for this machine.

    ``--sim-archiver`` skips the per-record remote calls.
    """

    # Message of the Forbidden error GCS returns when an object protected by
    # the bucket's retention policy already exists; treated as "already stored".
    _ALREADY_STORED_RE = re.compile(
        r"^Object '.*?' is subject to bucket's retention policy and cannot be "
        r"deleted, overwritten or archived until .*?$"
    )

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
        # --sim-archiver: run the sync loop without archiving/uploading records.
        self.simulate = "--sim-archiver" in sys.argv
        # Status reported by update_machine_status(); hard-coded (was "offline").
        self.machine_status = "logged-in"

    def config_changed(self):
        """(Re)load endpoints, timings and the GCS client from the configuration."""
        self.machine_id = self.config.machine_id
        section = self.config[self.name]
        if "--dev-portal" in sys.argv:
            self.archive_endpoint = "https://dev.r5portal.it/api/st-ten-save/"
        else:
            self.archive_endpoint = section["archive_endpoint"]
        self._do_set_period({"period": float(section["poll_time"])})
        # hold_time is configured in seconds; QThread.msleep() expects milliseconds.
        self.hold_time = round(float(section["hold_time"]) * 1000)
        self.gcs_client = storage.Client.from_service_account_json(section["service_account_json"])
        # Attempt to disable retries on the GCS transport. A "" prefix would be
        # shadowed by the default "http(s)://" adapters, hence move_to_end();
        # whether the GCS client honours this at all is unverified.
        self.gcs_client._http.mount("", HTTPAdapter(max_retries=Retry(total=0)))
        self.gcs_client._http.adapters.move_to_end("", last=False)
        self.bucket_id = section["bucket_id"]
        # Bucket handle is fetched lazily by remote_store() and cached per cycle.
        self.gcs_bucket = None

    @staticmethod
    def _no_retry_session():
        """Return a ``requests.Session`` whose HTTP(S) adapters never retry.

        The adapter is mounted on the "https://" and "http://" prefixes.
        requests matches longer prefixes first, so a mount on "" would never
        be selected.
        """
        session = requests.Session()
        adapter = HTTPAdapter(max_retries=Retry(total=0))
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        return session

    @db.connection_context()
    def _get(self):
        """Run one synchronization cycle over every not-yet-synchronized record."""
        # `!= True` is intentional: `is not True` breaks the ORM query.
        # list() executes the whole query up front, releasing the db before the
        # slow network calls below (iterating the query lazily would not).
        pending = list(Archive.select().where((Archive.archived != True) | (Archive.uploaded != True)))  # noqa: E712
        for record in pending:
            if self.simulate:
                self.log.info("simulated archive synchronizer cycle")
            else:
                if record.archived is not True:
                    record.archived = self.remote_archive(record) is True
                if record.uploaded is not True:
                    record.uploaded = self.remote_store(record) is True
            record.save()
            if self.hold_time > 0:
                QThread.msleep(self.hold_time)
        # Drop the cached bucket handle so the next cycle reconnects.
        self.gcs_bucket = None
        if "--dev-portal" in sys.argv:
            self.update_machine_status()
            self.check_actions_to_do(self.machine_id)
        super()._get()

    def update_machine_status(self):
        """Report ``self.machine_status`` to the dev portal.

        :return: True on HTTP 200 (or in simulation mode, where nothing is
            sent), False on any failure (logged).
        """
        self.status_endpoint = (
            "https://dev.r5portal.it/api/device-info-update"
            f"?machine-id={self.machine_id.upper()}&status={self.machine_status}"
        )
        status_dict = {"last_status": self.machine_status}
        response = None
        try:
            if not self.simulate:
                with self._no_retry_session() as s:
                    response = s.post(self.status_endpoint, json=status_dict, timeout=5, verify=False)
                if response.status_code != 200:
                    self.log.warning(
                        f"Status: {self.machine_status}: failed to update machine status: bad status response: "
                        f"{response.status_code}: {response.content}"
                    )
                    return False
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(
                f"Status: {self.machine_status}: failed to update machine status, archive_endpoint might be unreachable: {str(e)}"
            )
            return False
        except Exception:
            # Test `is not None`, not truthiness: a Response is falsy for 4xx/5xx.
            status = response.status_code if response is not None else "no response"
            content = response.content if response is not None else "no response"
            self.log.error(
                f"Status: {self.machine_status}: failed to update machine status:\n{traceback.format_exc()}:\n{status}: {content}"
            )
            return False
        self.log.info(f"Status: {self.machine_status}: Machine Status Updated Successfully")
        return True

    def remote_archive(self, record):
        """POST one record's test data to ``self.archive_endpoint``.

        Everything is sent as query parameters. ``data`` is the JSON-encoded
        ``record.test_data``.

        :return: True on HTTP 200 (or in simulation mode), False on a non-200
            response or a request error (logged).
        """
        if self.simulate:
            return True
        # `or {}` also covers an explicit "recipe": null in the test data.
        recipe = record.test_data.get("recipe") or {}
        params = {
            "data": json.dumps(record.test_data),
            "machine_id": self.machine_id,
            "overridden": record.overridden,
            "recipe": recipe.get("name", None),
            "result": "OK" if record.result else "KO",
            "serial": record.id,
            "time": record.time.isoformat(),
            "user": record.user.username,
        }
        try:
            with self._no_retry_session() as s:
                r = s.post(self.archive_endpoint, params=params, timeout=5, verify=False)
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(f"id: {record.id}: failed to archive remotely, archive_endpoint might be unreachable: {str(e)}")
            return False
        except requests.RequestException as e:
            # Other request failures (bad URL, too many redirects...) must not
            # abort the whole synchronization cycle.
            self.log.warning(f"id: {record.id}: failed to archive remotely: {str(e)}")
            return False
        if r.status_code != 200:
            self.log.warning(f"id: {record.id}: failed to archive remotely: bad status response: {r.status_code}: {r.content}")
            return False
        return True

    def _get_bucket(self, record):
        """Return the GCS bucket handle, connecting on first use within a cycle.

        The handle is cached in ``self.gcs_bucket``; ``_get`` resets it at the
        end of each cycle. On failure a warning is logged and None is returned.
        """
        if self.gcs_bucket is None:
            try:
                # retry=None seems to be ignored by the client.
                self.gcs_bucket = self.gcs_client.get_bucket(self.bucket_id, timeout=(5, 30), retry=None)
            except Exception:
                self.log.warning(f"id {record.id}: failed to connect to bucket {repr(self.bucket_id)}:\n{traceback.format_exc()}")
                self.gcs_bucket = None
        return self.gcs_bucket

    @staticmethod
    def _gcs_error_details(error):
        """Return ``(json_body, message)`` for a google-api-core HTTP error.

        ``json_body`` is None and ``message`` is "" when the error has no
        response or its body is not JSON. ``message`` is "" when the body lacks
        ``error.message``.
        """
        try:
            body = error.response.json()
        except Exception:  # no response attached, or body is not JSON
            return None, ""
        try:
            message = str(body["error"]["message"])
        except (TypeError, KeyError):
            message = ""
        return body, message

    def remote_store(self, record):
        """Upload the record's vision image files to the GCS bucket.

        Each file goes to ``<machine_id>/<YYYY>/<MM>/<basename>``, using the
        record's timestamp. Records without "vision" data need no upload.

        :return: True when every file was stored (or is already present under
            the bucket's retention policy), False on the first failure.
        """
        if "vision" not in record.test_data:
            return True
        bucket = self._get_bucket(record)
        if bucket is None:
            return False
        dt = record.time
        for path in record.test_data["vision"]["0"]["files"]:
            path_out = f"{self.machine_id}/{dt.strftime('%Y')}/{dt.strftime('%m')}/{os.path.basename(path)}"
            try:
                blob = bucket.blob(path_out)
                if not self.simulate:
                    blob.upload_from_filename(filename=path)
            except FileNotFoundError:
                self.log.error(f"id {record.id}: {path}: file not found.")
                return False
            except Forbidden as e:
                body, message = self._gcs_error_details(e)
                if self._ALREADY_STORED_RE.match(message) is None:
                    self.log.warning(f"id: {record.id}: failed to store remotely: {str(e)}: {body}")
                    return False
                self.log.info(f"id {record.id}: {path}: already stored.")
            except Exception:
                # Any other failure must not mark the record as uploaded.
                self.log.error(f"id: {record.id}: failed to store remotely:\n{traceback.format_exc()}")
                return False
        self.log.info(f"id: {record.id}: stored remotely")
        return True

    def remote_fetch(self, machine_id=None, base_path=None):
        """
        Fetch the list of file URLs published for a machine and download each file.

        Where each file is saved, relative to ``base_path``:
        - URLs containing "warning_images" go to ``config/warning_images/<machine_id>``;
        - URLs containing "ricette" go to ``config/csv_import/auto_csv_import``;
        - anything else goes to ``tmp``.

        :param machine_id: ID of the machine.
        :param base_path: download root; defaults to ``~/PycharmProjects/st-ten-1``
            (the historical hard-coded location).
        :return: ``{"downloaded_files": [...], "errors": {url: reason}}``;
            ``{"error": reason}`` if the URL list itself cannot be obtained;
            None in simulation mode.
        :raises ValueError: if ``machine_id`` is None.
        """
        if machine_id is None:
            raise ValueError("machine_id cannot be None")
        if self.simulate:
            return None
        urls_endpoint = f"http://dev.r5portal.it/get-file-urls/?machine_id={machine_id}"
        try:
            if base_path is None:
                base_path = os.path.join(Path.home(), "PycharmProjects", "st-ten-1")
            with requests.Session() as s:
                file_urls, error = self._fetch_file_urls(s, urls_endpoint)
                if error is not None:
                    return error
                downloaded_files = []
                errors = {}
                for specific_file_url in file_urls:
                    local_path, reason = self._download_file(s, specific_file_url, base_path, machine_id)
                    if reason is None:
                        downloaded_files.append(local_path)
                    else:
                        errors[specific_file_url] = reason
                return {
                    "downloaded_files": downloaded_files,
                    "errors": errors,
                }
        except Exception as e:
            self.log.error(f"An unexpected error occurred while fetching URLs: {str(e)}")
            return {"error": "Unexpected error"}

    def _fetch_file_urls(self, session, urls_endpoint):
        """GET the JSON array of file URLs.

        :return: ``(urls, None)`` on success, ``(None, error_dict)`` otherwise.
            Network errors propagate to the caller.
        """
        self.log.info(f"Fetching list of file URLs from: {urls_endpoint}")
        response = session.get(urls_endpoint, timeout=5, verify=False)
        self.log.info(f"HTTP Status Code: {response.status_code}")
        self.log.info(f"Response Headers: {response.headers}")
        self.log.info(f"Response Content: {response.content}")
        if response.status_code == 404:
            self.log.warning(f"URLs endpoint not found: {urls_endpoint}. Please check the URL path.")
            return None, {"error": "URLs endpoint not found"}
        if response.status_code in (401, 403):
            self.log.warning(f"Access forbidden or not logged in for URLs endpoint: {urls_endpoint}")
            return None, {"error": "Access forbidden or not logged in"}
        if response.status_code != 200:
            self.log.error(f"Unexpected HTTP response status: {response.status_code} for URL: {urls_endpoint}")
            return None, {"error": "Unexpected HTTP response status"}
        try:
            file_urls = response.json()
        except ValueError as e:
            self.log.error(f"Error parsing JSON response: {str(e)}")
            self.log.error(f"Response Content: {response.content}")
            return None, {"error": "Failed to parse JSON response"}
        self.log.info(f"Fetched file URLs: {file_urls}")
        # Iterating a JSON object or string would treat keys/characters as URLs.
        if not isinstance(file_urls, list):
            self.log.error(f"Expected a JSON array of URLs, got: {type(file_urls).__name__}")
            return None, {"error": "Unexpected JSON response format"}
        return file_urls, None

    @staticmethod
    def _download_subdir(file_url, machine_id):
        """Pick the folder, relative to the download root, for ``file_url``."""
        if "warning_images" in file_url:
            return os.path.join("config", "warning_images", machine_id)
        if "ricette" in file_url:
            return os.path.join("config", "csv_import", "auto_csv_import")
        return "tmp"  # anything unrecognised goes to the temporary folder

    def _download_file(self, session, file_url, base_path, machine_id):
        """Download one file below ``base_path``.

        :return: ``(local_path, None)`` on success, ``(None, reason)`` on failure (logged).
        """
        try:
            self.log.info(f"Attempting to fetch file from: {file_url}")
            response = session.get(file_url, timeout=5, verify=False)
            self.log.info(f"HTTP Status Code: {response.status_code}")
            self.log.info(f"Response Headers: {response.headers}")
            if response.status_code == 404:
                self.log.warning(f"File not found: {file_url}. Please check the URL path.")
                return None, "File not found"
            if response.status_code == 403:
                self.log.warning(f"Access forbidden for file: {file_url}")
                return None, "Access forbidden"
            if response.status_code != 200:
                self.log.error(f"Unexpected HTTP response status: {response.status_code} for URL: {file_url}")
                return None, f"Unexpected status {response.status_code}"
            # Name the file after the URL *path* only, so a query string never
            # ends up in the local file name.
            file_name = os.path.basename(urlparse(file_url).path)
            if not file_name:
                self.log.error(f"Cannot derive a file name from URL: {file_url}")
                return None, "Invalid file name"
            download_path = os.path.join(base_path, self._download_subdir(file_url, machine_id))
            os.makedirs(download_path, exist_ok=True)
            local_file_path = os.path.join(download_path, file_name)
            with open(local_file_path, "wb") as f:
                f.write(response.content)
            self.log.info(f"File downloaded successfully: {local_file_path}")
            return local_file_path, None
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(f"Failed to download file from {file_url}, the URL might be unreachable: {str(e)}")
            return None, "URL unreachable"
        except Exception as e:
            self.log.error(f"An unexpected error occurred while downloading {file_url}: {str(e)}")
            return None, "Unexpected error"

    def check_actions_to_do(self, machine_id: str) -> None:
        """Poll the portal's device page and, if it answers 200, download pending files.

        Network errors and non-200 responses are logged and otherwise ignored,
        so a portal outage cannot abort the synchronization cycle.
        """
        update_url = f"http://dev.r5portal.it/device/?machine-id={machine_id}"
        try:
            # A timeout is essential: this runs on every poll cycle.
            response = requests.get(update_url, timeout=5)
        except requests.RequestException as e:
            self.log.warning(f"Failed to reach device endpoint {update_url}: {str(e)}")
            return
        if response.status_code != 200:
            self.log.error(f"Failed to update device info: {response.status_code} - {response.content}")
            return
        # NOTE: the "ACTIONS_TO_DO" gate is disabled. When enabled it would parse
        # response.json() and fetch only for "DOWNLOAD_FILES", catching
        # JSONDecodeError. Until then every successful poll triggers a fetch.
        self.remote_fetch(machine_id)