2022-06-01 16:37:19 +00:00
import json
2022-10-03 11:48:59 +00:00
import os
2022-06-01 16:37:19 +00:00
import re
import sys
import traceback
import requests
from google . api_core . exceptions import Forbidden
from google . cloud import storage
from lib . db import Archive , db
from PyQt5 . QtCore import QThread
from requests . adapters import HTTPAdapter , Retry
from urllib3 . exceptions import InsecureRequestWarning
from . component import Component
# The archive endpoint is contacted with verify=False, so silence the
# InsecureRequestWarning urllib3 would otherwise emit for every such request.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
class ArchiveSynchronizer(Component):
    """Push pending ``Archive`` records to the remote archive and to GCS.

    On every cycle (``_get``) each ``Archive`` row whose ``archived`` or
    ``uploaded`` flag is not True is retried: its metadata is POSTed to
    ``archive_endpoint`` (``remote_archive``) and its vision image files are
    uploaded to a Google Cloud Storage bucket (``remote_store``). Passing
    ``--sim-archiver`` on the command line skips all remote calls.
    """

    # Error message GCS reports when an upload would overwrite an object that
    # is still locked by the bucket's retention policy; remote_store treats it
    # as "this file was already stored".
    _RETENTION_POLICY_MESSAGE = re.compile(
        r"^Object '.*?' is subject to bucket's retention policy and cannot be deleted, "
        r"overwritten or archived until .*?$"
    )

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
        # Simulation mode: records are visited but nothing is sent anywhere.
        self.simulate = "--sim-archiver" in sys.argv

    def config_changed(self):
        """(Re)read this component's settings and rebuild the GCS client.

        Reads ``archive_endpoint``, ``poll_time`` (passed to ``_do_set_period``
        as the new period), ``hold_time`` (converted to whole milliseconds),
        ``service_account_json`` and ``bucket_id`` from ``self.config[self.name]``.
        """
        self.machine_id = self.config.machine_id
        section = self.config[self.name]
        self.archive_endpoint = section["archive_endpoint"]
        self._do_set_period({"period": float(section["poll_time"])})
        self.hold_time = round(float(section["hold_time"]) * 1000)
        self.gcs_client = storage.Client.from_service_account_json(section["service_account_json"])
        # Try to disable transport-level retries on the client's HTTP session;
        # "" is moved to the front so it is consulted before the default adapters.
        # NOTE(review): this reportedly has no visible effect — retries observed in
        # practice are probably performed above the transport layer; confirm.
        self.gcs_client._http.mount("", HTTPAdapter(max_retries=Retry(total=0)))
        self.gcs_client._http.adapters.move_to_end("", last=False)
        self.bucket_id = section["bucket_id"]
        self.gcs_bucket = None

    @db.connection_context()
    def _get(self):
        """Run one synchronization pass over all pending ``Archive`` records.

        A record is pending while ``archived`` or ``uploaded`` is not True; only
        the step(s) that have not yet succeeded are attempted, and the outcome is
        saved so failures are retried on the next pass. After each record the
        thread sleeps ``hold_time`` milliseconds when it is positive.
        """
        # "!= True" must stay: peewee overloads != to build SQL, whereas
        # "is not True" cannot be overloaded and breaks the query.
        pending = (Archive.archived != True) | (Archive.uploaded != True)  # noqa: E712
        # list() executes the whole query up front, releasing the db instead of
        # keeping a cursor open while the slow remote calls below run.
        for record in list(Archive.select().where(pending)):
            if self.simulate:
                self.log.info("simulated archive synchronizer cycle")
            else:
                if record.archived is not True:
                    record.archived = self.remote_archive(record) is True
                if record.uploaded is not True:
                    record.uploaded = self.remote_store(record) is True
            record.save()
            if self.hold_time > 0:
                QThread.msleep(self.hold_time)
        # Force a fresh bucket lookup on the next pass.
        self.gcs_bucket = None
        super()._get()

    def remote_archive(self, record):
        """POST the metadata of *record* to ``archive_endpoint``.

        Values are sent as query-string parameters, with TLS certificate
        verification disabled and a 5 second timeout. In simulation mode no
        request is made.

        Returns:
            True when the endpoint answered HTTP 200 (or in simulation mode),
            False on any failure; failures are logged, never raised.
        """
        try:
            if not self.simulate:
                with requests.Session() as s:
                    # Mount on the scheme prefixes: requests uses the adapter with
                    # the longest matching prefix, so one mounted on "" would lose
                    # to the session's default http:// and https:// adapters.
                    no_retries = HTTPAdapter(max_retries=Retry(total=0))
                    s.mount("http://", no_retries)
                    s.mount("https://", no_retries)
                    r = s.post(self.archive_endpoint, params={
                        "data": json.dumps(record.test_data),
                        "machine_id": self.machine_id,
                        "overridden": record.overridden,
                        "recipe": record.test_data.get("recipe", {}).get("name", None),
                        "result": "OK" if record.result else "KO",
                        "serial": record.id,
                        "time": record.time.isoformat(),
                        "user": record.user.username,
                    }, timeout=5, verify=False)
                if r.status_code != 200:
                    self.log.warning(f"id: {record.id}: failed to archive remotely: bad status response: {r.status_code}: {r.content}")
                    return False
        except (requests.ConnectionError, requests.Timeout) as e:
            self.log.warning(f"id: {record.id}: failed to archive remotely, archive_endpoint might be unreachable: {str(e)}")
            return False
        except Exception:
            # No response object is available here: the failure happened while
            # building or sending the request.
            self.log.error(f"id: {record.id}: failed to archive remotely:\n{traceback.format_exc()}")
            return False
        self.log.info(f"id: {record.id}: archived remotely")
        return True

    @staticmethod
    def _error_details(error):
        """Return ``(payload, message)`` decoded from a GCS error response.

        ``payload`` is the parsed JSON body (None if it cannot be decoded) and
        ``message`` is its ``["error"]["message"]`` string ("" when missing).
        """
        try:
            payload = error._response.json()
        except Exception:
            return None, ""
        try:
            message = payload["error"]["message"]
        except (KeyError, IndexError, TypeError):
            message = None
        return payload, message if isinstance(message, str) else ""

    def remote_store(self, record):
        """Upload the vision image files of *record* to the configured GCS bucket.

        Objects are named ``<machine_id>/<YYYY>/<MM>/<file basename>`` using the
        record's time. In simulation mode the upload call itself is skipped.

        Returns:
            True when the record has no "vision" data or every file was uploaded
            (or is already present and locked by the bucket's retention policy);
            False on any failure, so the upload is retried on the next pass.
        """
        if "vision" not in record.test_data:
            return True
        try:
            self.gcs_bucket = self.gcs_client.get_bucket(self.bucket_id, timeout=(5, 30), retry=None)  # retry=None seems to be ignored
        except Exception:
            self.log.warning(f"id {record.id}: failed to connect to bucket {repr(self.bucket_id)}:\n{traceback.format_exc()}")
            self.gcs_bucket = None
        if self.gcs_bucket is None:
            return False
        try:
            paths = record.test_data["vision"]["0"]["files"]
        except (KeyError, IndexError, TypeError):
            # A malformed record must not abort the pass for every later record.
            self.log.error(f"id: {record.id}: failed to store remotely:\n{traceback.format_exc()}")
            return False
        dt = record.time
        folder = f"{self.machine_id}/{dt.strftime('%Y')}/{dt.strftime('%m')}"
        for path_in in paths:
            path_out = f"{folder}/{os.path.basename(path_in)}"
            try:
                blob = self.gcs_bucket.blob(path_out)
                if not self.simulate:
                    blob.upload_from_filename(filename=path_in)
            except FileNotFoundError:
                self.log.error(f"id {record.id}: {path_in}: file not found.")
                return False
            except Forbidden as e:
                payload, message = self._error_details(e)
                if self._RETENTION_POLICY_MESSAGE.match(message) is not None:
                    # Locked by retention policy: a previous pass already uploaded it.
                    self.log.info(f"id {record.id}: {path_in}: already stored.")
                else:
                    self.log.warning(f"id: {record.id}: failed to store remotely: {str(e)}: {payload}")
                    return False
            except Exception:
                # Previously this fell through and the record was marked uploaded.
                self.log.error(f"id: {record.id}: failed to store remotely:\n{traceback.format_exc()}")
                return False
        self.log.info(f"id: {record.id}: stored remotely")
        return True