2022-06-01 16:37:19 +00:00
import json
2022-10-03 11:48:59 +00:00
import os
2022-06-01 16:37:19 +00:00
import re
import sys
2024-04-23 09:15:23 +00:00
import threading
import time
2024-10-23 09:46:57 +00:00
from pathlib import Path
2024-04-23 09:15:23 +00:00
import requests
2022-06-01 16:37:19 +00:00
import traceback
import requests
from google . api_core . exceptions import Forbidden
from google . cloud import storage
2024-10-29 16:28:35 +00:00
from requests import JSONDecodeError
2022-06-01 16:37:19 +00:00
from lib . db import Archive , db
from PyQt5 . QtCore import QThread
from requests . adapters import HTTPAdapter , Retry
from urllib3 . exceptions import InsecureRequestWarning
from . component import Component
# Suppress urllib3's InsecureRequestWarning. The portal requests made by
# ArchiveSynchronizer pass verify=False (TLS certificates are not checked),
# which would otherwise make urllib3 emit this warning for HTTPS calls.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
2024-11-06 20:00:17 +00:00
2022-06-01 16:37:19 +00:00
class ArchiveSynchronizer(Component):
    """Synchronize local ``Archive`` records with the remote portal and GCS.

    Each cycle (:meth:`_get`) POSTs pending records to the portal archive
    endpoint, uploads their vision images to a Google Cloud Storage bucket,
    reports the machine status to the portal and executes any download
    actions contained in the status response.

    Command-line flags read from ``sys.argv``:

    * ``--sim-archiver``: skip the portal/GCS requests (simulation mode).
    * ``--dev-portal``: use the hard-coded dev portal endpoints instead of the
      configured ones.
    """

    # Base URL prepended to ``remote_path`` by remote_fetch.
    # NOTE(review): this is the dev portal and it is used even without
    # --dev-portal; confirm whether production should derive it from the
    # configured "portal_address".
    FETCH_BASE_URL = "https://dev.r5portal.it/"

    # GCS error message returned when an object protected by the bucket's
    # retention policy is uploaded again; such files count as already stored.
    _RETENTION_RE = re.compile(
        r"^Object '.*?' is subject to bucket's retention policy and cannot be "
        r"deleted, overwritten or archived until .*?$"
    )

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
        self.simulate = "--sim-archiver" in sys.argv
        # Status string sent by update_machine_status; _get switches it to "working".
        self.machine_status = "logged-in"
        # Filled in by config_changed.
        # NOTE(review): assigned after super().__init__(); if Component.__init__
        # ever calls config_changed, this would reset the configured id - confirm.
        self.machine_id = None

    def config_changed(self):
        """(Re)load endpoints, polling/hold times and the GCS client from the configuration."""
        self.machine_id = self.config.machine_id
        cfg = self.config[self.name]

        if "--dev-portal" in sys.argv:
            self.archive_endpoint = "https://dev.r5portal.it/api/st-ten-save/"
            self.status_endpoint = "https://dev.r5portal.it/api/device-info-update/"
            self.download_endpoint = "https://dev.r5portal.it/media/uploads/"
        else:
            portal = cfg["portal_address"]
            self.download_endpoint = portal + cfg["download_root"]
            self.archive_endpoint = portal + cfg["archive_root"]
            self.status_endpoint = portal + cfg["status_root"]

        self._do_set_period({"period": float(cfg["poll_time"])})
        # Stored in milliseconds because _get passes it to QThread.msleep
        # (the configured value is presumably in seconds).
        self.hold_time = round(float(cfg["hold_time"]) * 1000)

        self.gcs_client = storage.Client.from_service_account_json(cfg["service_account_json"])
        # Attempt to disable retries on the GCS client's HTTP session: the ""
        # prefix is moved to the front so it is matched before the default
        # adapters. Per the original notes this appears to have no effect.
        self.gcs_client._http.mount("", HTTPAdapter(max_retries=Retry(total=0)))  # this seems to be useless
        self.gcs_client._http.adapters.move_to_end("", last=False)  # this seems to be useless
        self.bucket_id = cfg["bucket_id"]
        self.gcs_bucket = None

    @db.connection_context()
    def _get(self):
        """Run one synchronizer cycle.

        Sends every ``Archive`` record that is not yet archived and/or uploaded,
        saves the resulting flags, sleeps ``hold_time`` ms between records, then
        reports the machine status to the portal.
        """
        # "!= True" is required: "is not True" breaks the peewee query.
        # list() forces the complete execution of the query, unlocking the db
        # (unlike iterating the query lazily).
        pending = list(Archive.select().where((Archive.archived != True) | (Archive.uploaded != True)))  # noqa: E712
        for record in pending:
            if self.simulate:
                self.log.info("simulated archive synchronizer cycle")
            else:
                if record.archived is not True:
                    record.archived = self.remote_archive(record) is True
                if record.uploaded is not True:
                    record.uploaded = self.remote_store(record) is True
            record.save()
            if self.hold_time > 0:
                QThread.msleep(self.hold_time)
        # Release the bucket handle acquired by remote_store.
        self.gcs_bucket = None

        # UPDATE MACHINE STATUS
        self.machine_status = "working"

        super()._get()
        self.update_machine_status()

    @staticmethod
    def _response_summary(response):
        """Format ``"<status_code>: <content>"`` for logging, tolerating ``None``.

        ``response is None`` is tested explicitly: ``bool(response)`` is
        ``Response.ok``, which is False for 4xx/5xx responses that do exist.
        """
        if response is None:
            return "no response: no response"
        return f"{response.status_code}: {response.content}"

    def update_machine_status(self):
        """Report ``self.machine_status`` to the portal's status endpoint.

        On HTTP 200 the response body is handed to
        :meth:`parse_response_and_execute`. Failures are logged, never raised.

        :return: True if the status was posted (or simulated), False otherwise.
        """
        response = None
        try:
            # Built inside the try so a missing machine_id/status_endpoint is
            # logged instead of aborting the whole synchronizer cycle.
            status_call = f"{self.status_endpoint}?machine-id={self.machine_id.upper()}&status={self.machine_status}"
            if not self.simulate:
                # requests' default adapters already perform no retries.
                with requests.Session() as s:
                    response = s.post(status_call, timeout=5, verify=False)
                if response.status_code != 200:
                    raise AssertionError("bad status response")
                self.parse_response_and_execute(response)
        except AssertionError as e:
            self.log.warning(
                f"Status: {self.machine_status}: failed to update machine status: {str(e)}: {self._response_summary(response)}"
            )
            return False
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(
                f"Status: {self.machine_status}: failed to update machine status, status_endpoint might be unreachable: {str(e)}"
            )
            return False
        except Exception:
            self.log.error(
                f"Status: {self.machine_status}: failed to update machine status:\n{traceback.format_exc()}:\n{self._response_summary(response)}"
            )
            return False
        self.log.info(f"Status: {self.machine_status}: Machine Status Updated Successfully")
        return True

    def parse_response_and_execute(self, response):
        """Execute the download actions listed in a status-endpoint response.

        The JSON body must be an object; its optional ``ACTIONS_TO_DO`` key may
        hold a single action or a list of actions. Each action is a dict whose
        ``remote_path`` and ``local_path`` are passed to :meth:`remote_fetch`.
        Errors are logged, never raised; a malformed or failing action is
        skipped without stopping the remaining ones.
        """
        try:
            data = response.json()
        except (json.JSONDecodeError, JSONDecodeError):
            # requests.JSONDecodeError is not a json.JSONDecodeError when
            # simplejson is installed, so both are caught.
            self.log.error("Failed to decode JSON response")
            return
        if not isinstance(data, dict):
            self.log.error("An unexpected error occurred while parsing response: Parsed response is not a dictionary")
            return

        # "or []" also covers an explicit null value.
        actions = data.get("ACTIONS_TO_DO") or []
        if isinstance(actions, dict):
            actions = [actions]
        elif not isinstance(actions, list):
            self.log.error(
                f"An unexpected error occurred while parsing response: ACTIONS_TO_DO is a {type(actions).__name__}, expected a list"
            )
            return

        for action in actions:
            if not isinstance(action, dict):
                self.log.warning(f"Skipping malformed action: {action!r}")
                continue
            remote_path = action.get("remote_path")
            local_path = action.get("local_path")
            self.log.info(f"Executing remote fetch with remote_path: {remote_path} and local_path: {local_path}")
            try:
                result = self.remote_fetch(remote_path=remote_path, local_path=local_path)
            except Exception as e:
                self.log.error(f"An unexpected error occurred while executing action {action!r}: {str(e)}")
                continue
            self.log.info(f"Remote fetch result: {result}")

    def remote_archive(self, record):
        """POST one archive record to the portal's archive endpoint.

        The record is sent as query parameters (test data JSON-encoded).

        :param record: ``Archive`` row to send.
        :return: True on HTTP 200 (or in simulation mode), False on any failure.
        """
        r = None
        try:
            if not self.simulate:
                recipe = record.test_data.get("recipe") or {}
                params = {
                    "data": json.dumps(record.test_data),
                    "machine_id": self.machine_id,
                    "overridden": record.overridden,
                    "recipe": recipe.get("name", None),
                    "result": "OK" if record.result else "KO",
                    "serial": record.id,
                    "time": record.time.isoformat(),
                    "user": record.user.username,
                }
                # requests' default adapters already perform no retries.
                with requests.Session() as s:
                    r = s.post(self.archive_endpoint, params=params, timeout=5, verify=False)
                if r.status_code != 200:
                    raise AssertionError("bad status response")
        except AssertionError as e:
            self.log.warning(f"id: {record.id}: failed to archive remotely: {str(e)}: {r.status_code}: {r.content}")
            return False
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(f"id: {record.id}: failed to archive remotely, archive_endpoint might be unreachable: {str(e)}")
            return False
        except Exception:
            # Keep the sync loop alive: the record stays unarchived and is retried next cycle.
            self.log.error(f"id: {record.id}: failed to archive remotely:\n{traceback.format_exc()}")
            return False
        return True

    def remote_store(self, record):
        """Upload the vision image files referenced by ``record`` to the GCS bucket.

        Each file is stored as ``<machine_id>/<YYYY>/<MM>/<basename>`` using the
        record time. Records without a ``"vision"`` entry need no upload.

        :return: True if there was nothing to upload or every file was uploaded
            (or was already present under the bucket's retention policy),
            False otherwise.
        """
        if "vision" not in record.test_data:
            return True
        try:
            self.gcs_bucket = self.gcs_client.get_bucket(self.bucket_id, timeout=(5, 30), retry=None)  # retry=None seems to be ignored
        except Exception:
            self.log.warning(f"id {record.id}: failed to connect to bucket {repr(self.bucket_id)}:\n{traceback.format_exc()}")
            self.gcs_bucket = None
        if self.gcs_bucket is None:
            return False

        dt = record.time
        folder = f"{self.machine_id}/{dt.strftime('%Y')}/{dt.strftime('%m')}"
        for path in record.test_data["vision"]["0"]["files"]:
            path_out = f"{folder}/{os.path.basename(path)}"
            try:
                blob = self.gcs_bucket.blob(path_out)
                if not self.simulate:
                    blob.upload_from_filename(filename=path)
            except FileNotFoundError:
                self.log.error(f"id {record.id}: {path}: file not found.")
                return False
            except Forbidden as e:
                try:
                    error_body = e._response.json()
                    message = str(error_body["error"]["message"])
                except Exception:  # missing, non-JSON or unexpectedly shaped body
                    error_body, message = None, ""
                if self._RETENTION_RE.match(message) is not None:
                    self.log.info(f"id {record.id}: {path}: already stored.")
                else:
                    self.log.warning(f"id: {record.id}: failed to store remotely: {str(e)}: {error_body}")
                    return False
            except Exception:
                self.log.error(f"id: {record.id}: failed to store remotely:\n{traceback.format_exc()}")
                # Do not report success: the caller would mark the record as uploaded.
                return False
        self.log.info(f"id: {record.id}: stored remotely")
        return True

    def remote_fetch(self, remote_path=None, local_path=None):
        """
        Download a single file from the portal.

        :param remote_path: path of the file on the portal, relative to FETCH_BASE_URL
        :param local_path: directory to save the file into; created if missing.
            The file keeps the basename of ``remote_path``.
        :return: ``{"downloaded_file": <path>}`` on success or
            ``{"error": <reason>}`` on failure; None in simulation mode.
        :raises ValueError: if ``remote_path`` or ``local_path`` is None.
        """
        if remote_path is None:
            raise ValueError("remote_path cannot be None")
        if local_path is None:
            raise ValueError("local_path cannot be None")
        remote_path = str(remote_path)
        # Strip leading slashes so an absolute path does not produce "//" in the URL.
        call_url = f"{self.FETCH_BASE_URL}{remote_path.lstrip('/')}"
        if self.simulate:
            return None

        file_name = os.path.basename(remote_path)
        if not file_name:
            self.log.warning(f"remote_path has no file name: {remote_path}")
            return {"error": "Invalid remote_path"}

        # NOTE(review): local_path comes verbatim from the portal response and the
        # download uses verify=False, so the server (or anyone able to intercept
        # the connection) chooses where the file is written. Consider restricting
        # it to an allowed base directory.
        try:
            self.log.info(f"Fetching file from: {call_url}")
            with requests.Session() as s:
                response = s.get(call_url, timeout=5, verify=False)
            self.log.info(f"HTTP Status Code: {response.status_code}")

            if response.status_code == 404:
                self.log.warning(f"File not found: {call_url}. Please check the URL path.")
                return {"error": "File not found"}
            if response.status_code in (401, 403):
                self.log.warning(f"Access forbidden or not logged in for file: {call_url}")
                return {"error": "Access forbidden or not logged in"}
            if response.status_code != 200:
                self.log.error(f"Unexpected HTTP response status: {response.status_code} for URL: {call_url}")
                return {"error": "Unexpected HTTP response status"}

            os.makedirs(local_path, exist_ok=True)
            local_file_path = os.path.join(local_path, file_name)
            with open(local_file_path, "wb") as f:
                f.write(response.content)

            self.log.info(f"File downloaded successfully: {local_file_path}")
            return {"downloaded_file": local_file_path}

        except requests.ConnectionError as e:
            self.log.error(f"Connection error occurred while fetching the file: {str(e)}")
            return {"error": "Connection error"}
        except requests.Timeout as e:
            self.log.error(f"Timeout error occurred while fetching the file: {str(e)}")
            return {"error": "Timeout error"}
        except Exception as e:
            self.log.error(f"An unexpected error occurred: {str(e)}")
            return {"error": "Unexpected error"}
2024-10-23 07:20:41 +00:00