2022-06-01 16:37:19 +00:00
import json
2022-10-03 11:48:59 +00:00
import os
2022-06-01 16:37:19 +00:00
import re
import sys
2024-04-23 09:15:23 +00:00
import threading
import time
2024-10-23 09:46:57 +00:00
from pathlib import Path
2024-04-23 09:15:23 +00:00
import requests
2022-06-01 16:37:19 +00:00
import traceback
2024-10-21 15:17:10 +00:00
from bs4 import BeautifulSoup
2022-06-01 16:37:19 +00:00
import requests
from google . api_core . exceptions import Forbidden
from google . cloud import storage
from lib . db import Archive , db
from PyQt5 . QtCore import QThread
from requests . adapters import HTTPAdapter , Retry
from urllib3 . exceptions import InsecureRequestWarning
from . component import Component
# Suppress insecure request warning
requests . packages . urllib3 . disable_warnings ( category = InsecureRequestWarning )
class ArchiveSynchronizer ( Component ) :
2024-04-23 09:15:23 +00:00
machine_status = " offline "
2022-06-01 16:37:19 +00:00
def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
    """Create the synchronizer.

    Every argument is handed unchanged to the parent ``__init__``.
    """
    super().__init__(
        config=config,
        name=name,
        period=period,
        lazy=lazy,
        paused=paused,
        threaded=threaded,
    )
    # "--sim-archiver" on the command line sets the flag that the remote
    # calls of this class check before talking to the network.
    self.simulate = "--sim-archiver" in sys.argv
    # Start from the class-level status value.
    self.status = ArchiveSynchronizer.machine_status
2022-06-01 16:37:19 +00:00
def config_changed(self):
    """Re-read every setting this component takes from the configuration.

    Sets the machine id, the archive endpoint (fixed dev-portal URL when
    ``--dev-portal`` is on the command line), the polling period, the
    per-record hold time in milliseconds and the GCS client / bucket id.
    """
    self.machine_id = self.config.machine_id

    if "--dev-portal" in sys.argv:
        endpoint = "https://dev.r5portal.it/api/st-ten-save/"
    else:
        endpoint = self.config[self.name]["archive_endpoint"]
    self.archive_endpoint = endpoint

    settings = self.config[self.name]
    self._do_set_period({"period": float(settings["poll_time"])})
    # hold_time is configured in seconds and stored in milliseconds.
    self.hold_time = round(float(settings["hold_time"]) * 1000)

    self.gcs_client = storage.Client.from_service_account_json(settings["service_account_json"])
    gcs_http = self.gcs_client._http
    gcs_http.mount("", HTTPAdapter(max_retries=Retry(total=0)))  # this seems to be useless
    gcs_http.adapters.move_to_end("", last=False)  # this seems to be useless

    self.bucket_id = settings["bucket_id"]
    # The bucket handle is obtained lazily by remote_store().
    self.gcs_bucket = None
@db.connection_context()
def _get(self):
    """Run one synchronisation cycle.

    Every record that is not yet archived or not yet uploaded is retried,
    saved, and followed by the configured hold time. With ``--dev-portal``
    the machine status is then pushed and remote files are fetched before
    the parent ``_get`` runs.
    """
    # using "is not True" breaks the query, hence the explicit comparisons
    pending = (Archive.archived != True) | (Archive.uploaded != True)
    # list() forces the complete execution of the query unlocking the db unlike __enter__()
    for entry in list(Archive.select().where(pending)):
        if self.simulate:
            self.log.info("simulated archive synchronizer cycle")
        else:
            if entry.archived is not True:
                entry.archived = self.remote_archive(entry) is True
            if entry.uploaded is not True:
                entry.uploaded = self.remote_store(entry) is True
        entry.save()
        if self.hold_time > 0:
            QThread.msleep(self.hold_time)

    # Drop the bucket handle so the next cycle reconnects.
    self.gcs_bucket = None

    if "--dev-portal" in sys.argv:
        self.update_machine_status()
        self.remote_fetch(self.machine_status, self.machine_id)

    super()._get()
2024-04-23 09:15:23 +00:00
def update_machine_status(self):
    """Push the current machine status to the dev portal.

    The status is taken from ``ArchiveSynchronizer.machine_status``; any
    value other than ``working``, ``logged-in`` or ``logged-out`` is sent
    as ``offline`` in the JSON body. No request is made in simulation mode.

    :return: ``True`` when the portal answered 200 (or when simulating),
        ``False`` on any failure. Failures are logged, never raised.
    """
    self.status = ArchiveSynchronizer.machine_status
    self.status_endpoint = (
        "https://dev.r5portal.it/api/device-info-update"
        f"?machine-id={self.machine_id.upper()}&status={self.status}"
    )
    known_statuses = ("working", "logged-in", "logged-out")
    status_dict = {"last_status": self.status if self.status in known_statuses else "offline"}

    def _describe(resp):
        # Compare against None explicitly: a requests Response is falsy for
        # 4xx/5xx answers, which are exactly the ones worth logging.
        if resp is None:
            return "no response: no response"
        return f"{resp.status_code}: {resp.content}"

    response = None
    try:
        if not self.simulate:
            with requests.Session() as s:
                # Mount on the real scheme prefixes; an adapter mounted on ""
                # is shadowed by the session's default adapters.
                no_retry = HTTPAdapter(max_retries=Retry(total=0))
                s.mount("https://", no_retry)
                s.mount("http://", no_retry)
                response = s.post(self.status_endpoint, json=status_dict, timeout=5, verify=False)
            if response.status_code != 200:
                self.log.warning(
                    f"Status: {self.status}: failed to update machine status: "
                    f"bad status response: {_describe(response)}"
                )
                return False
    except (requests.ConnectionError, requests.Timeout) as e:
        self.log.warning(
            f"Status: {self.status}: failed to update machine status, "
            f"archive_endpoint might be unreachable: {str(e)}"
        )
        return False
    except Exception:
        self.log.error(
            f"Status: {self.status}: failed to update machine status:\n"
            f"{traceback.format_exc()}:\n{_describe(response)}"
        )
        return False

    self.log.info(f"Status: {self.status}: Machine Status Updated Successfully")
    return True
2022-06-01 16:37:19 +00:00
def remote_archive(self, record):
    """Send one archive record to ``self.archive_endpoint``.

    The record fields are sent as query parameters of a POST request made
    through a session with retries disabled.

    :param record: archive row; ``test_data``, ``overridden``, ``result``,
        ``id``, ``time`` and ``user.username`` are read from it.
    :return: ``True`` when the endpoint answered 200 (or when simulating),
        ``False`` when the request failed or returned another status.
    """
    if self.simulate:
        # Nothing is sent in simulation mode.
        return True

    payload = {
        "data": json.dumps(record.test_data),
        "machine_id": self.machine_id,
        "overridden": record.overridden,
        "recipe": record.test_data.get("recipe", {}).get("name", None),
        "result": "OK" if record.result else "KO",
        "serial": record.id,
        "time": record.time.isoformat(),
        "user": record.user.username,
    }
    try:
        with requests.Session() as s:
            # Mount on the real scheme prefixes; an adapter mounted on "" is
            # shadowed by the session's default adapters.
            no_retry = HTTPAdapter(max_retries=Retry(total=0))
            s.mount("https://", no_retry)
            s.mount("http://", no_retry)
            r = s.post(self.archive_endpoint, params=payload, timeout=5, verify=False)
    except (requests.ConnectionError, requests.Timeout) as e:
        self.log.warning(
            f"id: {record.id}: failed to archive remotely, archive_endpoint might be unreachable: {str(e)}"
        )
        return False
    except requests.RequestException as e:
        # Any other transport-level failure must not abort the whole cycle.
        self.log.warning(f"id: {record.id}: failed to archive remotely: {str(e)}")
        return False

    if r.status_code != 200:
        self.log.warning(
            f"id: {record.id}: failed to archive remotely: bad status response: {r.status_code}: {r.content}"
        )
        return False
    return True
def remote_store(self, record):
    """Upload the vision files of ``record`` to the configured GCS bucket.

    Files are stored as ``<machine_id>/<year>/<month>/<file name>``. An
    upload refused because of the bucket retention policy is treated as
    "already stored".

    :param record: archive row; ``test_data``, ``time`` and ``id`` are read.
    :return: ``True`` when the record has no ``vision`` data or every file
        is stored; ``False`` on any failure so the caller retries later.
    """
    if "vision" not in record.test_data:
        return True

    try:
        self.gcs_bucket = self.gcs_client.get_bucket(
            self.bucket_id, timeout=(5, 30), retry=None
        )  # retry=None seems to be ignored
    except Exception:
        self.log.warning(
            f"id {record.id}: failed to connect to bucket {repr(self.bucket_id)}:\n{traceback.format_exc()}"
        )
        self.gcs_bucket = None
    if self.gcs_bucket is None:
        return False

    dt = record.time
    remote_dir = f"{self.machine_id}/{dt.strftime('%Y')}/{dt.strftime('%m')}"
    retention_msg = (
        r"^Object '.*?' is subject to bucket's retention policy and cannot be "
        r"deleted, overwritten or archived until .*?$"
    )

    for path in record.test_data["vision"]["0"]["files"]:
        path_in = path
        path_out = f"{remote_dir}/{os.path.basename(path)}"
        try:
            blob = self.gcs_bucket.blob(path_out)
            if not self.simulate:
                blob.upload_from_filename(filename=path_in)
        except FileNotFoundError:
            self.log.error(f"id {record.id}: {path_in}: file not found.")
            return False
        except Forbidden as e:
            # The error payload may be missing or malformed; never let the
            # handler itself raise.
            try:
                detail = e._response.json()
                message = detail["error"]["message"]
            except Exception:
                detail, message = None, ""
            if re.match(retention_msg, message) is not None:
                self.log.info(f"id {record.id}: {path_in}: already stored.")
            else:
                self.log.warning(f"id: {record.id}: failed to store remotely: {str(e)}: {detail}")
                return False
        except Exception:
            self.log.error(f"id: {record.id}: failed to store remotely:\n{traceback.format_exc()}")
            # A failed upload must not be reported as stored.
            return False

    self.log.info(f"id: {record.id}: stored remotely")
    return True
2024-10-21 15:17:10 +00:00
2024-10-23 14:51:27 +00:00
def remote_fetch(self, machine_status=None, machine_id=None, download_root=None):
    """Fetch and download the files the server lists for this machine.

    Nothing is requested unless ``machine_status`` is ``"logged-in"``, and
    no network traffic happens in simulation mode.

    :param machine_status: Status of the machine (e.g., 'logged-in').
    :param machine_id: ID of the machine; sent as the ``machine_id`` query
        parameter and used as folder name for warning images.
    :param download_root: Directory the files are stored under. Defaults to
        ``~/PycharmProjects/st-ten-1``.
    :return: ``None`` when the machine is not logged in; ``{"error": msg}``
        when the URL list cannot be obtained; otherwise
        ``{"downloaded_files": [...], "errors": {url: reason}}``.
    :raises ValueError: if ``machine_id`` is ``None``.
    """
    from urllib.parse import quote

    if machine_id is None:
        raise ValueError("machine_id cannot be None")
    if machine_status != "logged-in":
        return None

    downloaded_files = []
    errors = {}
    if self.simulate:
        return {"downloaded_files": downloaded_files, "errors": errors}

    # NOTE(review): this endpoint uses plain http while the other portal
    # endpoints in this class use https; confirm which one is intended.
    urls_endpoint = (
        "http://dev.r5portal.it/get-file-urls/"
        f"?machine_id={quote(str(machine_id), safe='')}"
    )
    if download_root is None:
        download_root = os.path.join(Path.home(), "PycharmProjects", "st-ten-1")

    try:
        with requests.Session() as s:
            file_urls, failure = self._fetch_file_urls(s, urls_endpoint)
            if failure is not None:
                return {"error": failure}

            for specific_file_url in file_urls:
                local_file_path = None
                try:
                    local_file_path, problem = self._download_file(
                        s, specific_file_url, machine_id, download_root
                    )
                except (requests.ConnectionError, requests.Timeout) as e:
                    self.log.warning(
                        f"Failed to download file from {specific_file_url}, the URL might be unreachable: {str(e)}"
                    )
                    problem = "URL unreachable"
                except Exception as e:
                    self.log.error(
                        f"An unexpected error occurred while downloading {specific_file_url}: {str(e)}"
                    )
                    problem = "Unexpected error"

                if problem is None:
                    downloaded_files.append(local_file_path)
                else:
                    # str() keeps the key hashable even for malformed entries.
                    errors[str(specific_file_url)] = problem
    except Exception as e:
        self.log.error(f"An unexpected error occurred while fetching URLs: {str(e)}")
        return {"error": "Unexpected error"}

    return {"downloaded_files": downloaded_files, "errors": errors}


def _fetch_file_urls(self, session, urls_endpoint):
    """Request the list of file URLs; return ``(urls, error)``, one of them ``None``."""
    self.log.info(f"Fetching list of file URLs from: {urls_endpoint}")
    response = session.get(urls_endpoint, timeout=5, verify=False)

    self.log.info(f"HTTP Status Code: {response.status_code}")
    self.log.info(f"Response Headers: {response.headers}")
    self.log.info(f"Response Content: {response.content}")

    if response.status_code == 404:
        self.log.warning(f"URLs endpoint not found: {urls_endpoint}. Please check the URL path.")
        return None, "URLs endpoint not found"
    if response.status_code == 403:
        self.log.warning(f"Access forbidden for URLs endpoint: {urls_endpoint}")
        return None, "Access forbidden"
    if response.status_code != 200:
        self.log.error(f"Unexpected HTTP response status: {response.status_code} for URL: {urls_endpoint}")
        return None, "Unexpected HTTP response status"

    try:
        file_urls = response.json()
    except ValueError as e:
        self.log.error(f"Error parsing JSON response: {str(e)}")
        self.log.error(f"Response Content: {response.content}")
        return None, "Failed to parse JSON response"

    self.log.info(f"Fetched file URLs: {file_urls}")
    # The caller iterates the payload as a sequence of URLs; reject anything
    # else instead of iterating dict keys or the characters of a string.
    if not isinstance(file_urls, (list, tuple)):
        self.log.error(f"Expected a JSON array of URLs, got {type(file_urls).__name__}")
        return None, "Failed to parse JSON response"
    return file_urls, None


def _download_file(self, session, file_url, machine_id, download_root):
    """Download one URL below ``download_root``; return ``(local_path, error)``, one of them ``None``."""
    from urllib.parse import urlsplit

    self.log.info(f"Attempting to fetch file from: {file_url}")
    response = session.get(file_url, timeout=5, verify=False)

    self.log.info(f"HTTP Status Code: {response.status_code}")
    self.log.info(f"Response Headers: {response.headers}")

    if response.status_code == 404:
        self.log.warning(f"File not found: {file_url}. Please check the URL path.")
        return None, "File not found"
    if response.status_code == 403:
        self.log.warning(f"Access forbidden for file: {file_url}")
        return None, "Access forbidden"
    if response.status_code != 200:
        self.log.error(f"Unexpected HTTP response status: {response.status_code} for URL: {file_url}")
        return None, f"Unexpected status {response.status_code}"

    # The target folder is chosen from markers found in the URL.
    if "warning_images" in file_url:
        sub_dir = os.path.join("config", "warning_images", machine_id)
    elif "ricette" in file_url:
        sub_dir = os.path.join("config", "csv_import", "auto_csv_import")
    else:
        sub_dir = "tmp"  # Save others under the temporary folder

    # Use only the path component so a query string or fragment never ends
    # up in the local file name.
    file_name = os.path.basename(urlsplit(file_url).path)
    if not file_name:
        self.log.error(f"No file name in URL: {file_url}")
        return None, "Unexpected error"

    download_path = os.path.join(download_root, sub_dir)
    os.makedirs(download_path, exist_ok=True)
    local_file_path = os.path.join(download_path, file_name)
    with open(local_file_path, "wb") as f:
        f.write(response.content)

    self.log.info(f"File downloaded successfully: {local_file_path}")
    return local_file_path, None
2024-10-23 07:20:41 +00:00