2022-06-01 16:37:19 +00:00
import json
2022-10-03 11:48:59 +00:00
import os
2022-06-01 16:37:19 +00:00
import re
import sys
2024-04-23 09:15:23 +00:00
import threading
import time
import requests
2022-06-01 16:37:19 +00:00
import traceback
2024-10-21 15:17:10 +00:00
from bs4 import BeautifulSoup
2022-06-01 16:37:19 +00:00
import requests
from google . api_core . exceptions import Forbidden
from google . cloud import storage
from lib . db import Archive , db
from PyQt5 . QtCore import QThread
from requests . adapters import HTTPAdapter , Retry
from urllib3 . exceptions import InsecureRequestWarning
from . component import Component
# The requests.post/get calls in this module pass verify=False, which makes
# urllib3 emit an InsecureRequestWarning on every call; silence that warning
# category for the whole process.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
class ArchiveSynchronizer(Component):
    """Push locally stored test records to the remote archive and to GCS.

    Each poll cycle (``_get``) selects every ``Archive`` row that is not yet
    both archived and uploaded, posts its metadata to ``archive_endpoint``
    (``remote_archive``) and uploads its vision images to the configured GCS
    bucket (``remote_store``), saving the per-record success flags.  With
    ``--dev-portal`` on the command line it also reports the machine status to
    the dev portal and, while logged in, downloads a file from it.  With
    ``--sim-archiver`` the archive, upload, status and fetch requests are skipped.
    """

    # Class-level (shared) status read by update_machine_status(); nothing in
    # this class assigns it — NOTE(review): presumably set by other components.
    machine_status = "offline"

    # Dev-portal endpoints, used instead of the configured ones with --dev-portal.
    _DEV_ARCHIVE_ENDPOINT = "https://dev.r5portal.it/api/st-ten-save/"
    _DEV_STATUS_ENDPOINT = "https://dev.r5portal.it/api/device-info-update"
    _DEV_FETCH_URL = "https://dev.r5portal.it/media/uploads/warning_images/st-ten-10/st-ten-11.csv"
    # Default download directory for remote_fetch().
    # NOTE(review): developer-specific absolute path; should most likely come
    # from the configuration before this is deployed.
    _FETCH_DIR = "/home/edo-neo/PycharmProjects/st-ten-1/tmp"
    # Statuses forwarded as-is in "last_status"; anything else is sent as "offline".
    _REPORTED_STATUSES = ("working", "logged-in", "logged-out")
    # Timeout in seconds for every request made through the requests library.
    _HTTP_TIMEOUT = 5
    # GCS error message for an object locked by the bucket's retention policy,
    # which remote_store() treats as "already stored".
    _RETENTION_RE = re.compile(
        r"^Object '.*?' is subject to bucket's retention policy and cannot be deleted, "
        r"overwritten or archived until .*?$"
    )

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
        # Dry-run mode: no archive/upload/status/fetch request is sent.
        self.simulate = "--sim-archiver" in sys.argv
        self.status = ArchiveSynchronizer.machine_status

    def config_changed(self):
        """(Re)load endpoints, timings and the GCS client from the configuration."""
        self.machine_id = self.config.machine_id
        section = self.config[self.name]
        if "--dev-portal" in sys.argv:
            self.archive_endpoint = self._DEV_ARCHIVE_ENDPOINT
            self.url = self._DEV_FETCH_URL
        else:
            self.archive_endpoint = section["archive_endpoint"]
        self._do_set_period({"period": float(section["poll_time"])})
        # Configured in seconds; QThread.msleep() takes milliseconds.
        self.hold_time = round(float(section["hold_time"]) * 1000)
        self.gcs_client = storage.Client.from_service_account_json(section["service_account_json"])
        # Mount a no-retry adapter under the catch-all prefix "" and move it to the
        # front of the adapter table so Session.get_adapter() selects it for every
        # URL.  The original author observed no effect — NOTE(review): the GCS
        # client presumably retries above the transport layer.
        self.gcs_client._http.mount("", HTTPAdapter(max_retries=Retry(total=0)))
        self.gcs_client._http.adapters.move_to_end("", last=False)
        self.bucket_id = section["bucket_id"]
        self.gcs_bucket = None

    @staticmethod
    def _no_retry_session():
        """Return a requests Session whose http/https adapters never retry."""
        session = requests.Session()
        adapter = HTTPAdapter(max_retries=Retry(total=0))
        # Mount on the scheme prefixes so this adapter replaces the defaults; a
        # mount on "" sorts after them and would never be selected.
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    @db.connection_context()
    def _get(self):
        """Run one synchronisation cycle over all pending ``Archive`` records.

        A record is pending until both ``archived`` and ``uploaded`` are True;
        each flag is retried only while it is not yet True.  Every record is
        saved after its attempt, followed by a ``hold_time`` ms pause.  With
        ``--dev-portal`` the machine status is then reported and, when logged
        in, the portal file is downloaded.
        """
        # list() executes the whole query up front, releasing the database before
        # the slow network calls below (iterating the query lazily would not).
        # peewee needs "!=" here: "is not True" cannot be overloaded and breaks it.
        pending = list(
            Archive.select().where((Archive.archived != True) | (Archive.uploaded != True))  # noqa: E712
        )
        for record in pending:
            if self.simulate:
                self.log.info("simulated archive synchronizer cycle")
            else:
                if record.archived is not True:
                    record.archived = self.remote_archive(record) is True
                if record.uploaded is not True:
                    record.uploaded = self.remote_store(record) is True
            record.save()
            if self.hold_time > 0:
                QThread.msleep(self.hold_time)
        # Drop the bucket handle; remote_store() fetches a fresh one per record.
        self.gcs_bucket = None
        if "--dev-portal" in sys.argv:
            self.update_machine_status()
            # remote_fetch() expects the machine status, not the URL: passing the
            # URL as the status meant the "logged-in" check could never succeed.
            self.remote_fetch(self.status, url=self.url)
        super()._get()

    def update_machine_status(self):
        """Report ``machine_status`` to the dev portal.

        Copies the class-level ``machine_status`` into ``self.status`` and posts
        it both as the ``status`` query parameter and, in the JSON body, as
        ``{"last_status": ...}`` (statuses outside ``_REPORTED_STATUSES`` are sent
        as ``"offline"``).

        :return: True on success or in simulate mode, False if the request failed.
        """
        self.status = ArchiveSynchronizer.machine_status
        self.status_endpoint = (
            f"{self._DEV_STATUS_ENDPOINT}?machine-id={self.machine_id.upper()}&status={self.status}"
        )
        last_status = self.status if self.status in self._REPORTED_STATUSES else "offline"
        status_dict = {"last_status": last_status}
        response = None
        try:
            if not self.simulate:
                with self._no_retry_session() as session:
                    response = session.post(
                        self.status_endpoint, json=status_dict, timeout=self._HTTP_TIMEOUT, verify=False
                    )
                if response.status_code != 200:
                    self.log.warning(
                        f"Status: {self.status}: failed to update machine status: bad status response: "
                        f"{response.status_code}: {response.content}"
                    )
                    return False
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(
                f"Status: {self.status}: failed to update machine status, "
                f"status_endpoint might be unreachable: {str(e)}"
            )
            return False
        except Exception:
            # "is not None" rather than truthiness: a Response is falsy for
            # 4xx/5xx, which would hide the very response worth logging.
            code = response.status_code if response is not None else "no response"
            content = response.content if response is not None else "no response"
            self.log.error(
                f"Status: {self.status}: failed to update machine status:\n"
                f"{traceback.format_exc()}:\n{code}: {content}"
            )
            return False
        self.log.info(f"Status: {self.status}: Machine Status Updated Successfully")
        return True

    def remote_archive(self, record):
        """Send one record's metadata to ``archive_endpoint`` as query parameters.

        :param record: ``Archive`` row to send.
        :return: True if the endpoint answered HTTP 200 (always True in simulate
            mode), False on another status or a connection/timeout error.
        """
        if self.simulate:
            return True
        params = {
            "data": json.dumps(record.test_data),
            "machine_id": self.machine_id,
            "overridden": record.overridden,
            "recipe": record.test_data.get("recipe", {}).get("name", None),
            "result": "OK" if record.result else "KO",
            "serial": record.id,
            "time": record.time.isoformat(),
            "user": record.user.username,
        }
        try:
            with self._no_retry_session() as session:
                r = session.post(self.archive_endpoint, params=params, timeout=self._HTTP_TIMEOUT, verify=False)
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(
                f"id: {record.id}: failed to archive remotely, archive_endpoint might be unreachable: {str(e)}"
            )
            return False
        if r.status_code != 200:
            self.log.warning(
                f"id: {record.id}: failed to archive remotely: bad status response: {r.status_code}: {r.content}"
            )
            return False
        return True

    @staticmethod
    def _gcs_error_payload(exc):
        """Return the decoded JSON body of a google-api-core HTTP error, or None.

        Reads the exception's private ``_response`` attribute; a missing response
        or a non-JSON body yields None instead of raising inside a handler.
        """
        try:
            return exc._response.json()
        except Exception:
            return None

    def remote_store(self, record):
        """Upload a record's vision images to the GCS bucket ``bucket_id``.

        Every path in ``record.test_data["vision"]["0"]["files"]`` is stored as
        ``<machine_id>/<YYYY>/<MM>/<basename>`` using the record's time.  An
        upload rejected by the bucket's retention policy counts as already stored.

        :param record: ``Archive`` row whose images should be uploaded.
        :return: True when every file is stored (or the record has no vision
            data), False if the bucket cannot be reached or any upload fails.
        """
        if "vision" not in record.test_data:
            return True
        try:
            self.gcs_bucket = self.gcs_client.get_bucket(self.bucket_id, timeout=(5, 30), retry=None)  # retry=None seems to be ignored
        except Exception:
            self.log.warning(
                f"id {record.id}: failed to connect to bucket {repr(self.bucket_id)}:\n{traceback.format_exc()}"
            )
            self.gcs_bucket = None
        if self.gcs_bucket is None:
            return False
        dt = record.time
        for path_in in record.test_data["vision"]["0"]["files"]:
            path_out = f"{self.machine_id}/{dt.strftime('%Y')}/{dt.strftime('%m')}/{os.path.basename(path_in)}"
            try:
                blob = self.gcs_bucket.blob(path_out)
                if not self.simulate:
                    blob.upload_from_filename(filename=path_in)
            except FileNotFoundError:
                self.log.error(f"id {record.id}: {path_in}: file not found.")
                return False
            except Forbidden as e:
                payload = self._gcs_error_payload(e)
                try:
                    message = str(payload["error"]["message"])
                except (TypeError, KeyError):
                    message = ""
                if self._RETENTION_RE.match(message) is None:
                    self.log.warning(f"id: {record.id}: failed to store remotely: {str(e)}: {payload}")
                    return False
                self.log.info(f"id {record.id}: {path_in}: already stored.")
            except Exception:
                # Must not fall through to "stored remotely": _get() would mark the
                # record uploaded and never retry the failed file.
                self.log.error(f"id: {record.id}: failed to store remotely:\n{traceback.format_exc()}")
                return False
        self.log.info(f"id: {record.id}: stored remotely")
        return True

    def remote_fetch(self, machine_status, url=None, dest_dir=None):
        """Download a file from the portal while the machine is logged in.

        :param machine_status: current machine status; nothing is downloaded
            unless it equals ``"logged-in"``.
        :param url: URL of the file to download; defaults to ``_DEV_FETCH_URL``.
            The local file name is ``os.path.basename(url)``.
        :param dest_dir: directory the file is written to (created if missing);
            defaults to ``_FETCH_DIR``.
        :return: the local path of the downloaded file, False on any error, or
            None when skipped (not logged in, or simulate mode).
        """
        if machine_status != "logged-in" or self.simulate:
            return None
        if url is None:
            url = self._DEV_FETCH_URL
        if dest_dir is None:
            dest_dir = self._FETCH_DIR
        try:
            self.log.info(f"Attempting to fetch file from: {url}")
            with self._no_retry_session() as session:
                response = session.get(url, timeout=self._HTTP_TIMEOUT, verify=False)
            self.log.info(f"HTTP Status Code: {response.status_code}")
            self.log.info(f"Response Headers: {response.headers}")
            if response.status_code == 404:
                self.log.warning(
                    f"File not found: {url}. Please check the URL path and ensure the server has the expected file."
                )
                return False
            if response.status_code == 403:
                self.log.warning(f"Access forbidden for file: {url}")
                return False
            if response.status_code != 200:
                self.log.error(f"Unexpected HTTP response status: {response.status_code} for URL: {url}")
                return False
            local_file_path = os.path.join(dest_dir, os.path.basename(url))
            os.makedirs(dest_dir, exist_ok=True)
            with open(local_file_path, "wb") as f:
                f.write(response.content)
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(f"Failed to download file, the URL might be unreachable: {str(e)}")
            return False
        except Exception as e:
            self.log.error(f"An unexpected error occurred: {str(e)}")
            return False
        self.log.info(f"File downloaded successfully: {local_file_path}")
        return local_file_path
2024-10-21 15:17:10 +00:00
2024-10-23 07:20:41 +00:00