2022-06-01 16:37:19 +00:00
import json
2022-10-03 11:48:59 +00:00
import os
2022-06-01 16:37:19 +00:00
import re
import sys
2024-04-23 09:15:23 +00:00
import threading
import time
2024-12-02 11:24:02 +00:00
from datetime import datetime
2024-10-23 09:46:57 +00:00
from pathlib import Path
2024-04-23 09:15:23 +00:00
import requests
2022-06-01 16:37:19 +00:00
import traceback
import requests
from google . api_core . exceptions import Forbidden
from google . cloud import storage
2024-10-29 16:28:35 +00:00
from requests import JSONDecodeError
2022-06-01 16:37:19 +00:00
from lib . db import Archive , db
2024-12-02 11:24:02 +00:00
from lib . db . models import Log
2022-06-01 16:37:19 +00:00
from PyQt5 . QtCore import QThread
from requests . adapters import HTTPAdapter , Retry
from urllib3 . exceptions import InsecureRequestWarning
2024-12-18 13:46:25 +00:00
from lib . helpers . recipe_manager import import_recipes , backup_current_recipes
2022-06-01 16:37:19 +00:00
from . component import Component
2024-12-02 16:00:32 +00:00
from ui . helpers import get_main_window
2022-06-01 16:37:19 +00:00
# The HTTP calls in this module are issued with verify=False; silence the
# InsecureRequestWarning that would otherwise be emitted for them.
requests . packages . urllib3 . disable_warnings ( category = InsecureRequestWarning )
2024-11-06 20:00:17 +00:00
2022-06-01 16:37:19 +00:00
class ArchiveSynchronizer(Component):
    """Push locally stored archive records to the remote portal.

    On every cycle the component selects the ``Archive`` rows that still have
    to be sent, submits each of them to the configured HTTP endpoint and
    uploads the files listed under ``test_data["vision"]["files"]`` to a
    Google Cloud Storage bucket. The instance named ``archive_synchronizer``
    additionally reports the machine status and executes the actions returned
    by the server.

    ``Archive.archived`` is used as a bit field: bit 0 is set once the record
    was accepted by the main server, bit 1 once it was accepted by the extra
    server (the instance named ``archive_synchronizer_extra``).
    """

    # Error message of the 403 answered when an object is protected by the
    # bucket retention policy; it is treated as "file already stored".
    _RETENTION_POLICY_RE = re.compile(
        r"^Object '.*?' is subject to bucket's retention policy and cannot be deleted, overwritten or archived until .*?$"
    )

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        """Create the synchronizer; every argument is forwarded to ``Component``."""
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
        # Resolved lazily on the first cycle through get_main_window().
        self.main_window = None
        # With --sim-archiver on the command line the remote calls guarded by
        # this flag are skipped.
        self.simulate = "--sim-archiver" in sys.argv
        self.machine_status = "logged-in"
        self.machine_id = None

    def config_changed(self):
        """Reload endpoints, timing and storage settings from the configuration."""
        self.machine_id = self.config.machine_id
        section = self.config[self.name]

        if "--dev-portal" in sys.argv:
            # Development portal: endpoints are fixed, the configuration is ignored.
            self.archive_endpoint = "https://dev.r5portal.it/api/st-ten-save/"
            self.status_endpoint = "https://dev.r5portal.it/api/device-info-update/"
            self.download_endpoint = "https://dev.r5portal.it/media/uploads/"
        else:
            portal_address = section["portal_address"]
            self.download_endpoint = portal_address + section["download_root"]
            self.archive_endpoint = portal_address + section["archive_root"]
            self.status_endpoint = portal_address + section["status_root"]

        self._do_set_period({"period": float(section["poll_time"])})
        # hold_time is configured in seconds and stored in milliseconds.
        self.hold_time = round(float(section["hold_time"]) * 1000)

        if self.name == "archive_synchronizer":
            # Only this instance owns a Google Cloud Storage client.
            self.gcs_client = storage.Client.from_service_account_json(section["service_account_json"])
            self.gcs_client._http.mount("", HTTPAdapter(max_retries=Retry(total=0)))  # this seems to be useless
            self.gcs_client._http.adapters.move_to_end("", last=False)  # this seems to be useless
            self.bucket_id = section["bucket_id"]
        self.gcs_bucket = None

    @db.connection_context()
    def _get(self):
        """Run one synchronization cycle over the records still to be sent."""
        if self.main_window is None:
            self.main_window = get_main_window()

        if self.name != "archive_synchronizer_extra":
            # MAIN SERVER
            bit_pos = 0
            unsaved_records = Archive.select().where(
                (Archive.archived == 0) | (Archive.archived == 2) | (Archive.uploaded == 0)
            )
            saved_mask = [1, 3]
        else:
            # EXTRA SERVER (VERO PROJECT SPA)
            bit_pos = 1
            unsaved_records = Archive.select().where((Archive.archived == 0) | (Archive.archived == 1))
            # NOTE(review): [0, 2] are the values with bit 0 clear, not the
            # values with bit 1 set, so a record with archived == 0 is flagged
            # below without being sent to the extra server - confirm intent.
            saved_mask = [0, 2]

        for record in unsaved_records:
            if self.simulate:
                self.log.info("simulated archive synchronizer cycle")
                save_ok = True
            else:
                if record.archived in saved_mask:
                    # Considered already archived: only the upload may be pending.
                    save_ok = True
                else:
                    save_ok = self.remote_archive(record) is True
                # NOTE(review): this also runs for the extra instance, which
                # has no gcs_client; remote_store then logs and returns False.
                if record.uploaded is not True:
                    record.uploaded = self.remote_store(record) is True

            if save_ok:
                record.archived |= 1 << bit_pos

            # The save is handed over to the main window instead of being
            # executed here (presumably to run it on the GUI thread - confirm).
            if self.main_window is not None:
                self.main_window.run_request.emit(record.save, [], {})

            # NOTE(review): the source indentation was lost; the hold is
            # assumed to apply after every record rather than once per cycle.
            if self.hold_time > 0:
                QThread.msleep(self.hold_time)
        self.gcs_bucket = None

        # UPDATE MACHINE STATUS
        self.machine_status = "working"

        super()._get()
        if self.name == "archive_synchronizer":
            self.update_machine_status()

    @staticmethod
    def _no_retry_session():
        """Return a ``requests.Session`` whose adapters never retry a request."""
        session = requests.Session()
        adapter = HTTPAdapter(max_retries=Retry(total=0))
        # Adapters are selected by the longest matching URL prefix, so the
        # adapter must be mounted on the scheme prefixes to be used at all.
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        return session

    def update_machine_status(self):
        """Report the current machine status to the portal.

        When the server answers 200 the response is passed to
        ``parse_response_and_execute``. In simulation mode nothing is sent.

        :return: True on success (or in simulation mode), False otherwise.
        """
        if self.machine_id is None:
            # config_changed() has not provided an id yet: nothing to report.
            self.log.warning(f"Status: {self.machine_status}: failed to update machine status: machine_id is not set")
            return False

        response = None
        try:
            status_call = f"{self.status_endpoint}?machine-id={self.machine_id.upper()}&status={self.machine_status}"
            if not self.simulate:
                with self._no_retry_session() as s:
                    # verify=False disables TLS certificate validation.
                    response = s.post(status_call, timeout=5, verify=False)
                    if response.status_code != 200:
                        self.log.warning(
                            f"Status: {self.machine_status}: failed to update machine status: bad status response: {response.status_code}: {response.content}"
                        )
                        return False
                    self.parse_response_and_execute(response)
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(
                f"Status: {self.machine_status}: failed to update machine status, archive_endpoint might be unreachable: {str(e)}"
            )
            return False
        except Exception:
            # The failure may have happened before any response was received.
            details = "no response" if response is None else f"{response.status_code}: {response.content}"
            self.log.error(
                f"Status: {self.machine_status}: failed to update machine status:\n{traceback.format_exc()}:\n{details}"
            )
            return False
        self.log.info(f"Status: {self.machine_status}: Machine Status Updated Successfully")
        return True

    def parse_response_and_execute(self, response):
        """
        Parse the response and execute actions based on the `ACTIONS_TO_DO` received.

        Malformed entries are skipped with a warning so that the remaining
        actions are still executed. Errors are logged, never raised.

        :param response: HTTP response whose JSON body is a dictionary.
        """
        try:
            data = response.json()
            if not isinstance(data, dict):
                raise ValueError("Parsed response is not a dictionary")

            actions = data.get("ACTIONS_TO_DO", [])
            # Ensure actions is a list
            if isinstance(actions, dict):
                actions = [actions]
            elif actions is None:
                actions = []
            elif not isinstance(actions, (list, tuple)):
                self.log.warning(f"Ignoring malformed ACTIONS_TO_DO: {actions!r}")
                return

            for action in actions:
                if not isinstance(action, dict):
                    self.log.warning(f"Ignoring malformed action: {action!r}")
                    continue
                action_type = action.get("action")  # Determine which type of action to perform
                if action_type == "import":
                    self._run_import_action(action)
                elif action_type == "download":
                    self._run_download_action(action)
                else:
                    self.log.warning(f"Unhandled action type: {action_type}")
        except (json.JSONDecodeError, JSONDecodeError):
            self.log.error("Failed to decode JSON response")
        except Exception as e:
            self.log.error(f"An unexpected error occurred while parsing response: {str(e)}")

    def _run_import_action(self, action):
        """Download the recipe file named by *action*, back up the recipes and import it."""
        remote_path = action.get("remote_path")
        if not remote_path:
            self.log.warning("Import action received without a remote_path.")
            return

        # Use remote_fetch to download the recipe file from the server
        fetch_result = self.remote_fetch(remote_path=remote_path, local_path="tmp") or {}
        if 'downloaded_file' not in fetch_result:
            self.log.warning(f"Failed to fetch the recipe file: {fetch_result.get('error')}.")
            return

        downloaded_file = fetch_result['downloaded_file']
        self.log.info(f"Recipe file downloaded successfully to {downloaded_file}.")
        try:
            # Backup current recipes before importing
            backup_path = backup_current_recipes(config=self.config, logger=self.log)
            self.log.info(f"Backup created successfully at {backup_path}.")
            # Proceed with importing recipes
            import_recipes(config=self.config, csv_path=downloaded_file, logger=self.log)
            self.log.info(f"Imported recipes successfully from {downloaded_file}.")
        except Exception as e:
            self.log.error(f"Failed to import recipes: {str(e)}")

    def _run_download_action(self, action):
        """Download the file named by *action* into its ``local_path`` (default ``tmp``)."""
        remote_path = action.get("remote_path")
        if not remote_path:
            # remote_fetch raises on a missing path; skip instead of aborting
            # the actions that follow.
            self.log.warning("Download action received without a remote_path.")
            return
        # NOTE(review): local_path comes verbatim from the server response;
        # restrict it if the server is not fully trusted.
        local_path = action.get("local_path", "tmp")  # Use "tmp" as a fallback local path
        self.log.info(
            f"Executing remote fetch with remote_path: {remote_path} and local_path: {local_path}")
        result = self.remote_fetch(remote_path=remote_path, local_path=local_path)
        self.log.info(f"Remote fetch result: {result}")

    def remote_archive(self, record):
        """Send one archive record to the configured archive endpoint.

        The instance named ``archive_synchronizer`` POSTs the record including
        its JSON-encoded test data; any other instance issues a GET without
        the test data. In simulation mode nothing is sent.

        :param record: ``Archive`` row to send.
        :return: True when the server answered 200 (or in simulation mode),
            False otherwise.
        """
        try:
            if not self.simulate:
                params = {
                    "machine_id": self.machine_id,
                    "overridden": record.overridden,
                    "recipe": record.test_data.get("recipe", {}).get("name", None),
                    "result": "OK" if record.result else "KO",
                    "serial": record.id,
                    "time": record.time.isoformat(),
                    "user": record.user.username,
                }
                with self._no_retry_session() as s:
                    # verify=False disables TLS certificate validation.
                    if self.name == "archive_synchronizer":
                        params = {"data": json.dumps(record.test_data), **params}
                        r = s.post(self.archive_endpoint, params=params, timeout=5, verify=False)
                    else:
                        r = s.get(self.archive_endpoint, params=params, timeout=5, verify=False)
                if r.status_code != 200:
                    self.log.warning(
                        f"id: {record.id}: failed to archive remotely: bad status response: {r.status_code}: {r.content}"
                    )
                    return False
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(f"id: {record.id}: failed to archive remotely, archive_endpoint might be unreachable: {str(e)}")
            return False
        except Exception:
            # A single malformed record must not abort the whole cycle.
            self.log.error(f"id: {record.id}: failed to archive remotely:\n{traceback.format_exc()}")
            return False
        self.log.info(f"Archived successfully: {record.id}")
        return True

    def remote_store(self, record):
        """Upload the vision files of *record* to the storage bucket.

        Files are stored as ``<machine_id>/<year>/<month>/<file name>``. A 403
        caused by the bucket retention policy means the object already exists
        and is not treated as a failure.

        :param record: ``Archive`` row whose files have to be uploaded.
        :return: True when there is nothing to upload or every file is stored,
            False as soon as one file cannot be stored.
        """
        if "vision" not in record.test_data:
            return True

        try:
            self.gcs_bucket = self.gcs_client.get_bucket(self.bucket_id, timeout=(5, 30), retry=None)  # retry=None seems to be ignored
        except Exception:
            self.log.warning(f"id {record.id}: failed to connect to bucket {repr(self.bucket_id)}:\n{traceback.format_exc()}")
            self.gcs_bucket = None
        if self.gcs_bucket is None:
            return False

        dt = record.time
        for path in record.test_data["vision"]["files"]:
            path_in = path
            path_out = f"{self.machine_id}/{dt.strftime('%Y')}/{dt.strftime('%m')}/{os.path.basename(path)}"
            try:
                blob = self.gcs_bucket.blob(path_out)
                if not self.simulate:
                    blob.upload_from_filename(filename=path_in)
            except FileNotFoundError:
                self.log.error(f"id {record.id}: {path_in}: file not found.")
                return False
            except Forbidden as e:
                # The error body may not be the expected JSON document.
                try:
                    payload = e._response.json()
                    message = payload["error"]["message"]
                except Exception:
                    payload, message = None, str(e)
                if isinstance(message, str) and self._RETENTION_POLICY_RE.match(message) is not None:
                    self.log.info(f"id {record.id}: {path_in}: already stored.")
                else:
                    self.log.warning(f"id: {record.id}: failed to store remotely: {str(e)}: {payload}")
                    return False
            except Exception:
                self.log.error(f"id: {record.id}: failed to store remotely:\n{traceback.format_exc()}")
                # The file was not uploaded: report the failure so that the
                # record is retried instead of being flagged as uploaded.
                return False
        self.log.info(f"Stored successfully: {record.id}")
        return True

    def remote_fetch(self, remote_path=None, local_path=None):
        """
        Download a single file from the server and retrieve the last update time and response status.

        The file is saved in ``local_path`` under the base name of
        ``remote_path``. Every outcome of a real attempt is logged and stored
        through ``log_to_db``. In simulation mode nothing is downloaded.

        :param remote_path: Path of where to download the file from
        :param local_path: Path of where to save the file to
        :return: A dictionary with ``downloaded_file`` on success or ``error``
            on failure, plus ``last_update_info`` (None when no response was
            received).
        :raises ValueError: if ``remote_path`` or ``local_path`` is None.
        """
        if remote_path is None:
            raise ValueError("remote_path cannot be None")
        if local_path is None:
            raise ValueError("local_path cannot be None")

        if "--dev-portal" in sys.argv:
            call_url = f"https://dev.r5portal.it/{remote_path}"
        else:
            call_url = f"https://r5portal.it/{remote_path}"

        log_time = datetime.now()
        log_info = f"Attempted to download from {call_url}"
        last_update_info = None

        if self.simulate:
            # Keep the documented return type even when nothing is fetched.
            self.log.info(f"Simulated fetch, nothing downloaded from: {call_url}")
            return {"error": "Simulation mode: no download performed", "last_update_info": last_update_info}

        try:
            with requests.Session() as s:
                self.log.info(f"Fetching file from: {call_url}")
                # verify=False disables TLS certificate validation.
                response = s.get(call_url, timeout=5, verify=False)
            self.log.info(f"HTTP Status Code: {response.status_code}")
            last_update_info = {
                "last_update_time": log_time.isoformat(),
                "response_status": response.status_code,
            }

            if response.status_code == 404:
                self._report_fetch(self.log.warning, log_time, log_info + " - File not found")
                return {"error": "File not found", "last_update_info": last_update_info}
            if response.status_code in [403, 401]:
                self._report_fetch(self.log.warning, log_time, log_info + " - Access forbidden or not logged in")
                return {"error": "Access forbidden or not logged in", "last_update_info": last_update_info}
            if response.status_code != 200:
                self._report_fetch(
                    self.log.error, log_time, log_info + f" - Unexpected HTTP response status: {response.status_code}"
                )
                return {"error": "Unexpected HTTP response status", "last_update_info": last_update_info}

            # Make sure the local path exists
            os.makedirs(local_path, exist_ok=True)

            # Construct the correct file path
            local_file_path = os.path.join(local_path, os.path.basename(remote_path))
            with open(local_file_path, "wb") as f:
                f.write(response.content)

            self._report_fetch(self.log.info, log_time, log_info + f" - File downloaded successfully: {local_file_path}")
            return {"downloaded_file": local_file_path, "last_update_info": last_update_info}
        except requests.ConnectionError as e:
            self._report_fetch(self.log.error, log_time, log_info + f" - Connection error: {str(e)}")
            return {"error": "Connection error", "last_update_info": last_update_info}
        except requests.Timeout as e:
            self._report_fetch(self.log.error, log_time, log_info + f" - Timeout error: {str(e)}")
            return {"error": "Timeout error", "last_update_info": last_update_info}
        except Exception as e:
            self._report_fetch(self.log.error, log_time, log_info + f" - Unexpected error: {str(e)}")
            return {"error": "Unexpected error", "last_update_info": last_update_info}

    def _report_fetch(self, log_method, log_time, message):
        """Write a download outcome to the component log and to the database."""
        log_method(message)
        self.log_to_db(log_time, "Download", message)

    def log_to_db(self, log_time, log_info_type, log_info):
        """Save log information to the database.

        Failures are logged and never raised.

        :param log_time: Time stored in the ``time`` field of the entry.
        :param log_info_type: Value stored in the ``info_type`` field.
        :param log_info: Text stored in the ``info`` field.
        """
        try:
            # Use the Log class instead of the log instance
            new_log_entry = Log(time=log_time, info_type=log_info_type, info=log_info)
            new_log_entry.save()  # Save the log entry to the database
        except Exception as e:
            self.log.error(f"Failed to save log to database: {str(e)}")
            self.log.error(traceback.format_exc())