2022-10-18 14:41:53 +00:00
from lib . db import Recipes , Steps , db
from PyQt5 . QtCore import QSemaphore , pyqtSignal
from PyQt5 . QtWidgets import QMessageBox
2022-09-06 10:06:43 +00:00
from . component import Component
2022-06-08 07:11:38 +00:00
from . modbus_component import ModbusComponent
2022-09-14 12:47:09 +00:00
from . tecna_marposs_provaset_t3l_registers import registers as t3l_registers
from . tecna_marposs_provaset_t3p_registers import registers as t3p_registers
2022-06-01 16:37:19 +00:00
2022-09-14 12:47:09 +00:00
class TecnaMarpossProvasetT3 ( ModbusComponent ) :
2022-10-18 14:41:53 +00:00
# Signal carrying the list of [recipe, step] pairs; it is connected to
# `_store_recipes` in `config_changed` and emitted by `store_recipes`.
_store_recipes_signal = pyqtSignal ( object )
# Semaphore starting at 0: `store_recipes` blocks on it after emitting the
# signal and `_store_recipes` releases it when it is done. Being a class
# attribute, it is shared by every instance of this class.
_store_recipes_lock = QSemaphore ( 0 )
2022-06-01 16:37:19 +00:00
def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
    """Build the component without a register table.

    Every argument is forwarded unchanged to the parent constructor.
    ``registers`` is passed as ``None``; the table is assigned to
    ``self.registers`` by ``config_changed`` once the model is known.
    """
    parent_options = {
        "config": config,
        "name": name,
        "period": period,
        "lazy": lazy,
        "paused": paused,
        "threaded": threaded,
    }
    super().__init__(registers=None, **parent_options)
2022-06-01 16:37:19 +00:00
def config_changed(self):
    """Apply the current configuration to this component.

    Selects the register table for the configured model, programs and
    reads back the measure units, reads the number of available test
    programs and caches the label-printing ("saver") options.

    Raises:
        NotImplementedError: if the configured model is neither "t3p"
            nor "t3l".
    """
    super().config_changed()

    # Connect the hand-over signal only once. Qt invokes a slot once per
    # connection, so reconnecting on every reconfiguration would run
    # `_store_recipes` several times per emit and over-release the semaphore.
    if not getattr(self, "_store_recipes_connected", False):
        self._store_recipes_signal.connect(self._store_recipes)
        self._store_recipes_connected = True

    section = self.config[self.name]
    self.model = section["model"].lower()
    if self.model == "t3p":
        self.registers = t3p_registers
    elif self.model == "t3l":
        self.registers = t3l_registers
    else:
        raise NotImplementedError(f"tecna t3 model {self.model!r} not implemented.")

    self.set_measure_units()
    self.units = self.get_measure_units()
    self.max_program_number = self.read("Max number of programs")

    # Label options are later packed into one 16-bit "Print options" word
    # (see `write_recipe`), hence the 4-bit / 1-bit / 8-bit clamping.
    self.saver_label_count = min(abs(int(section.get("saver_label_count", 1))), 0b1111)
    truthy = {"yes", "y", "on", "true", "1", "x"}
    self.saver_print_on_fail = 1 if section.get("saver_print_on_fail", "no").lower() in truthy else 0
    self.saver_label_template = min(abs(int(section.get("saver_label_template", 1))), 0b11111111)

    self.log.info(f"units: {self.units}")
2022-06-08 07:11:38 +00:00
_pressure_units = { " mH2O " : 0 , " mbar " : 1 , " kPa " : 2 , " mmHg " : 3 , " inH2O " : 4 , " psi " : 5 , " mmH2O " : 6 , } # (se fondoscala <=6 bar)
_leak_units = { " mmH2O " : 0 , " mbar " : 1 , " Pa " : 2 , " mmHg " : 3 , " inH2O " : 4 , " psi " : 5 , }
_leak_flow_units = { " cm3/min " : 0 , " cm3/h " : 1 , }
_volume_units = { " litri " : 0 , " cm3 " : 1 , }
_time_units = { " seconds " : 0 , }
2022-09-14 12:47:09 +00:00
_flow_units = { " liters/min " : 0 , " liters/h " : 1 , " m3/h " : 2 , " cfm " : 3 }
2022-06-08 07:11:38 +00:00
_pressure_units_map = { v : k for k , v in _pressure_units . items ( ) }
_leak_units_map = { v : k for k , v in _leak_units . items ( ) }
_leak_flow_units_map = { v : k for k , v in _leak_flow_units . items ( ) }
_volume_units_map = { v : k for k , v in _volume_units . items ( ) }
_time_units_map = { v : k for k , v in _time_units . items ( ) }
_flow_units_map = { v : k for k , v in _flow_units . items ( ) }
def set_measure_units(self):
    """Program the instrument's measure-unit registers.

    The same five registers are written for both supported models. On
    "t3p" the written word is ``(decimals << 8) + unit``; on "t3l" only
    the unit code is written.

    Raises:
        NotImplementedError: if ``self.model`` is neither "t3p" nor "t3l"
            (raised before anything is written).
    """
    if self.model not in ("t3p", "t3l"):
        raise NotImplementedError(f"tecna t3 model {self.model!r} not implemented.")

    with_decimals = self.model == "t3p"
    selections = (
        ("MEASURE UNITS: Relative pressure", self._pressure_units["mbar"], 0),  # red, purple
        ("MEASURE UNITS: Differential (leak) pressure", self._leak_units["mbar"], 0),  # yellow
        ("MEASURE UNITS: Calculated leak flow rate", self._leak_flow_units["cm3/min"], 0),  # blue
        ("MEASURE UNITS: Volume", self._volume_units["litri"], 0),  # green
        ("MEASURE UNITS: Flow rate", self._flow_units["liters/min"], 0),  # orange
    )
    for register_name, unit_code, decimals in selections:
        word = (decimals << 8) + unit_code if with_decimals else unit_code
        self.write(register_name, word)
2022-06-08 07:11:38 +00:00
def get_measure_units(self):
    """Read the measure formats currently used by the instrument.

    Returns:
        dict: maps every alias of a quantity (descriptive name, color name,
        one-letter code and register number) to the same
        ``[scale_factor, unit]`` list, where ``scale_factor`` is
        ``10 ** -decimals``.

    Raises:
        NotImplementedError: if ``self.model`` is neither "t3p" nor "t3l".
    """
    pressure_map = self._pressure_units_map
    leak_map = self._leak_units_map

    # Each row: register name (or [scale register, decimals register]),
    # code -> unit table, aliases (also by documentation color and register number).
    if self.model == "t3p":
        layout = [
            ["Running test: relative pressure format", pressure_map, ["relative_pressure", "red", "r", 21]],
            ["Running test: differential pressure format", pressure_map, ["differential_pressure", "purple", "p", 22]],
            ["Running test: relative pressure format (low resolution)", leak_map, ["relative_pressure_lr", "yellow", "y", 23]],
            ["Running test: calculated leak flow rate format", self._leak_flow_units_map, ["leak_flow", "blue", "b", 24]],
            ["Running test: volume format", self._volume_units_map, ["volume", "green", "g", 25]],
            ["Running test: time format", self._time_units_map, ["time", "orange", "t", 26]],
            ["Running test: flow rate format", self._flow_units_map, ["flow", "white", "o", 27]],
        ]
    elif self.model == "t3l":
        layout = [
            [["Running test: relative pressure scale", "Running test: relative pressure decimals"], pressure_map, ["relative_pressure", "red", "r", 1501]],
            [["Running test: differential pressure scale", "Running test: differential pressure decimals"], pressure_map, ["differential_pressure", "purple", "p", 1503]],
            [["Running test: relative pressure scale (low resolution)", "Running test: relative pressure decimals (low resolution)"], leak_map, ["relative_pressure_lr", "yellow", "y", 1505]],
            ["Running test: calculated leak flow rate format", self._leak_flow_units_map, ["leak_flow", "blue", "b", 1507]],
            ["Running test: volume format", self._volume_units_map, ["volume", "green", "g", 1508]],
            ["Running test: time format", self._time_units_map, ["time", "orange", "t", 1509]],
            ["Running test: flow rate format", self._flow_units_map, ["flow", "white", "o", 1510]],
            ["Running test: line pressure format", pressure_map, ["line_pressure", "lp", "l", 1511]],
        ]
    else:
        raise NotImplementedError(f"tecna t3 model {self.model!r} not implemented.")

    units = {}
    for source, code_to_unit, aliases in layout:
        if isinstance(source, list):
            # Split format: scale and decimals live in two registers.
            # NOTE(review): the scale value is stored as read, without going
            # through `code_to_unit` - presumably `read` already decodes it; confirm.
            scale_value, decimals_value = [self.read(name) for name in source]
            spec = [10 ** (-(decimals_value & 0xff)), scale_value]
        else:
            # Packed format: high byte = decimals, low byte = unit code.
            packed = self.read(source)
            spec = [10 ** (-((packed >> 8) & 0xff)), code_to_unit[packed & 0xff]]
        # All aliases deliberately share the very same list object.
        units.update(dict.fromkeys(aliases, spec))
    return units
2022-07-04 10:36:51 +00:00
def _convert_from_format ( self , data , formatting = None , decoding_map = None ) :
2022-07-12 14:02:45 +00:00
if decoding_map is not None and data in decoding_map :
2022-07-04 10:36:51 +00:00
data = decoding_map [ data ]
if formatting is not None :
# units = self.units[formatting]
# data = [data * units[0], units[1]]
data = data * self . units [ formatting ] [ 0 ]
return data
2022-06-08 07:11:38 +00:00
2022-07-04 10:36:51 +00:00
def _convert_to_format ( self , data , formatting = None , encoding_map = None ) :
if formatting is not None :
data = int ( data / self . units [ formatting ] [ 0 ] )
2022-07-12 14:02:45 +00:00
if encoding_map is not None and data in encoding_map :
2022-07-04 10:36:51 +00:00
data = encoding_map [ data ]
return data
2022-06-08 07:11:38 +00:00
2022-09-06 10:06:43 +00:00
@Component.reconfig_on_error
def read(self, register, *args, data_type=None, gain=None, offset=None, formatting=None, decoding_map=None, **kwargs):
    """Read one register and convert it to its application-level value.

    Args:
        register: either a register name, looked up in ``self.registers``
            (which yields an ``(address, settings)`` pair), or a value that
            is passed to the parent ``read`` as is.
        *args, **kwargs: forwarded to the parent ``read``; when empty and
            the register is given by name, the settings entries "a" / "k"
            are used instead.
        data_type, gain, offset: forwarded to the parent ``read``. Missing
            values come from the settings entries "dt" / "g" / "o" and
            finally default to "16bit_uint", 1 and 0.
        formatting, decoding_map: passed to ``_convert_from_format``;
            missing values come from the settings entries "f" / "decoding".

    Returns:
        The value produced by ``_convert_from_format``.
    """
    if type(register) is str:
        register, settings = self.registers[register]
        # Explicit arguments always win over the per-register settings.
        data_type = settings.get("dt", None) if data_type is None else data_type
        gain = settings.get("g", None) if gain is None else gain
        offset = settings.get("o", None) if offset is None else offset
        formatting = settings.get("f", None) if formatting is None else formatting
        decoding_map = settings.get("decoding", None) if decoding_map is None else decoding_map
        if not args:
            args = settings.get("a", [])
        if not kwargs:
            kwargs = settings.get("k", {})

    raw = super().read(
        register,
        *args,
        data_type="16bit_uint" if data_type is None else data_type,
        gain=1 if gain is None else gain,
        offset=0 if offset is None else offset,
        **kwargs,
    )
    return self._convert_from_format(raw, formatting=formatting, decoding_map=decoding_map)
2022-09-06 10:06:43 +00:00
@Component.reconfig_on_error
def write(self, register, data, *args, data_type=None, gain=None, offset=None, formatting=None, encoding_map=None, **kwargs):
    """Convert a value to its raw form and write it to one register.

    Args:
        register: either a register name, looked up in ``self.registers``
            (which yields an ``(address, settings)`` pair), or a value that
            is passed to the parent ``write`` as is.
        data: application-level value; it goes through
            ``_convert_to_format`` before being written.
        *args, **kwargs: forwarded to the parent ``write``; when empty and
            the register is given by name, the settings entries "a" / "k"
            are used instead.
        data_type, gain, offset: forwarded to the parent ``write``. Missing
            values come from the settings entries "dt" / "g" / "o" and
            finally default to "16bit_uint", 1 and 0.
        formatting, encoding_map: passed to ``_convert_to_format``;
            missing values come from the settings entries "f" / "encoding".

    Returns:
        Whatever the parent ``write`` returns.
    """
    if type(register) is str:
        register, settings = self.registers[register]
        # Explicit arguments always win over the per-register settings.
        data_type = settings.get("dt", None) if data_type is None else data_type
        gain = settings.get("g", None) if gain is None else gain
        offset = settings.get("o", None) if offset is None else offset
        formatting = settings.get("f", None) if formatting is None else formatting
        encoding_map = settings.get("encoding", None) if encoding_map is None else encoding_map
        if not args:
            args = settings.get("a", [])
        if not kwargs:
            kwargs = settings.get("k", {})

    raw = self._convert_to_format(data, formatting=formatting, encoding_map=encoding_map)
    return super().write(
        register,
        raw,
        *args,
        data_type="16bit_uint" if data_type is None else data_type,
        gain=1 if gain is None else gain,
        offset=0 if offset is None else offset,
        **kwargs,
    )
2022-06-01 16:37:19 +00:00
2022-09-06 10:06:43 +00:00
@Component.reconfig_on_error
def _get(self):
    """Poll the instrument and hand one info dict to the parent ``_get``.

    The dict is keyed by register name. When the active phase is
    "FINE TEST", the values from ``get_test_results`` are merged in and
    the measured leak is rounded to two decimals.

    Raises:
        NotImplementedError: if ``self.model`` is neither "t3p" nor "t3l".
    """
    always_read = [
        "Real time test pressure output",
        "Real time differential pressure output",
        "Real time pressure line regulator",
        "Active alarm flags",
        "Active test program number",
        "Running test: active phase",
        "Running test: test type",
        "Running test: sequence index",
        "Digital inputs status (mask)",
        # "Digital outputs status (mask)",
    ]
    info = {name: self.read(name) for name in always_read}

    if self.model == "t3p":
        pass
    elif self.model == "t3l":
        info["Active not severe alarm flags"] = self.read("Active not severe alarm flags")
    else:
        raise NotImplementedError(f"Tecna t3 model {self.model!r} not implemented.")

    if info["Running test: active phase"] == "FINE TEST":  # "END TEST, WAITING THE START OF A NEW TEST"
        info.update(self.get_test_results())

    # Results are keyed by full register name. The previous lookup of the
    # bare "measured leak" key never matched, so the rounding never ran;
    # the short key is still accepted in case it is ever present.
    for round_me in ("Running test: measured leak", "measured leak"):
        value = info.get(round_me)
        # Only round real numbers; a decoded (non-numeric) value is left as is.
        if isinstance(value, (int, float)):
            info[round_me] = float(f"{value:.2f}")

    self.log.debug(str(info))
    super()._get([info])
2022-10-04 11:51:36 +00:00
@Component.reconfig_on_error
def _set(self, val=None):
    """Write the requested register values, then delegate to the parent.

    Args:
        val: optional mapping of register -> value; every item is written
            with ``self.write`` in iteration order. ``None`` writes nothing.
    """
    if val is not None:  # handle request
        for register, value in val.items():
            # Use the component logger instead of a bare print() so the
            # trace follows the configured log level, like the rest of the class.
            self.log.debug(f"writing {register!r}: {value!r}")
            self.write(register, value)
    super()._set(val)
2022-10-18 14:41:53 +00:00
def start_test(self, table=None):
    """Select a test program and start it.

    Args:
        table: program number to run; defaults to
            ``self.max_program_number`` when ``None``.
    """
    program = self.max_program_number if table is None else table
    self.log.info(f"starting test table {program!r}")
    # The order matters: selection source first, then the program, then start.
    sequence = (
        ("Source of test program number selection", "FROM PARAMETER (SET BY LCD OR SERIAL LINE)"),
        ("Selected program", program),
        ("Start test", program),
    )
    for register_name, payload in sequence:
        self.write(register_name, payload)
2022-07-04 10:36:51 +00:00
def stop_test(self):
    """Log a warning and write 0 to the "Reset running test" register."""
    reset_register, reset_value = "Reset running test", 0
    self.log.warning("stopping test")
    self.write(reset_register, reset_value)
2022-07-04 10:36:51 +00:00
2022-06-08 07:11:38 +00:00
def get_test_results(self):
    """Read the result registers of the running test.

    Returns:
        dict: register name -> value read, in the order listed below.
    """
    self.log.info("getting test results")
    wanted = (
        # "Running test: phase backwards time",
        "Running test: filling pressure",
        "Running test: pressure at the end of settling",
        # "Running test: burst pressure",
        "Running test: measured leak",
        # "Running test: calculated leak flow rate",
        # "Running test: calculate RVP%",
        "Running test: result",
    )
    results = {}
    for register_name in wanted:
        results[register_name] = self.read(register_name)
    return results
2022-07-12 08:48:04 +00:00
2022-10-18 14:41:53 +00:00
def write_recipe(self, recipe, step, table=None):
    """Write one recipe step into a test program of the instrument.

    Args:
        recipe: object exposing ``part_number`` (must be ASCII-encodable).
        step: object exposing a ``spec`` mapping with the leak-test
            parameters read below.
        table: program number to write; defaults to
            ``self.max_program_number`` when ``None``.

    Raises:
        NotImplementedError: if ``self.model`` is neither "t3p" nor "t3l".
    """
    if table is None:
        table = self.max_program_number

    def packed_words(first_register, payload, count):
        """Spread `payload` over `count` registers, two bytes per word (even byte low)."""
        return {
            first_register - 1 + index: (payload[2 * index + 1] << 8) + payload[2 * index]
            for index in range(count)
        }

    # Fixed-size, NUL-padded ASCII fields.
    name_bytes = recipe.part_number[:16].encode("ascii").ljust(16, b"\x00")
    # NOTE(review): the bar-code is the part number prefixed with a literal
    # "j" and cut to 16 characters although the field holds 24 bytes - confirm.
    barcode_bytes = f"j{recipe.part_number}"[:16].encode("ascii").ljust(24, b"\x00")

    params = step.spec
    if params.get("autotest", False) in ("ko_check",):
        test_flags = 0b0110100001010100
    else:
        test_flags = 0b0110000001010100
    test_flags |= int(params.get("pid_mode", self.config["recipes_defaults"]["pid_mode"])) << 4
    pid_level = int(params.get("pid_level", self.config["recipes_defaults"]["pid_level"]))
    pid_speed = int(params.get("pid_speed", self.config["recipes_defaults"]["pid_speed"]))
    pid_ramps = pid_level << 8 | pid_speed << 12

    # Insertion order is the write order, so build the dict step by step.
    spec = {
        "Flag: Instrument settings": 0b0000000000000000,
        "Test program for read/write operation": table,
    }
    spec.update(packed_words(719, name_bytes, 8))  # program name
    spec.update(packed_words(727, barcode_bytes, 12))  # program associated bar-code
    spec.update(packed_words(761, name_bytes, 8))  # print field 1
    # print field 2 (registers from 769) is currently not written
    spec.update({
        "Print options": self.saver_label_count << 12 | self.saver_print_on_fail << 8 | self.saver_label_template,
        "Test type": "Leak Test",
        "Test flags": test_flags,
        "T0 - Pre-filling time": params["pre_filling_time"],
        "P0 - Pre-filling pressure": params["pre_filling_pressure"],
        "T1 - Filling time": params["filling_time"],
        "T2 - Settling time": params["settling_time"],
        "PR- - Min pressure tolerance %": params["settling_pressure_min_percent"],
        "PR+ - Max pressure tolerance % (P+)": params["settling_pressure_max_percent"],
        "T3 - Measure time": params["test_time"],
        "Q- Lower test leak limit": params["test_pressure_qneg"],
        "PREL - Nominal test pressure": params["test_pressure"],
        "Q+ Upper test leak limit": params["test_pressure_qpos"],
        "FST - Discharge time": params["flush_time"],
        "FSL - Discharge limit": params["flush_pressure"],
        "PSQ - Next sequence program PSOUT mode": 0,
        "RAMPS: T1 configuration": pid_ramps,
        "PID: pressure correction": 100,
        "Various flags": 0b0000000000000000,
    })

    if self.model == "t3p":
        pass
    elif self.model == "t3l":
        spec["Use programs or use products"] = 0
        spec["Nominal peak pressure"] = params["test_pressure"]
        spec["Pn - Nominal test pressure"] = params["test_pressure"]
    else:
        raise NotImplementedError(f"tecna t3 model {self.model!r} not implemented.")

    self.log.debug(str(spec))
    for register, value in spec.items():
        self.write(register, value)
2022-10-18 14:41:53 +00:00
@db.connection_context()
def store_recipes(self, recipes):
    """Store the database recipes into the instrument's test programs.

    The component is resumed if it is not ready and put back to pause
    when the method leaves, whatever the outcome. The recipes are handed
    to ``_store_recipes`` through ``_store_recipes_signal`` and this method
    waits on ``_store_recipes_lock`` until that slot has finished.

    Args:
        recipes: ignored - the list is always rebuilt from the ``Recipes``
            table (ordered by name). Kept for interface compatibility.
    """
    # Must be initialised up front: it used to be bound only inside the
    # branch below, so an already-ready component crashed on `if resumed`.
    resumed = False
    if not self.ready:
        self.resume()
        resumed = True
    try:
        if not self.ready:
            QMessageBox.critical(
                None,
                "Impossibile salvare le ricette sulla tecna",
                "La tecna non sembra essere pronta",
            )
            return

        # list() materialises the query before the per-recipe step lookups run.
        recipes = [
            [recipe, Steps.get_by_id(recipe.spec["available_steps"]["leak_1"])]
            for recipe in list(Recipes.select().order_by(Recipes.name.asc()))
        ]

        # Reserve the last program for our own recipe control.
        limit = max(self.max_program_number - 1, 0)
        if len(recipes) > limit:
            self.log.warning(f"too many recipes ({len(recipes)}), saving only first {limit}")
            QMessageBox.warning(
                None,
                "Impossibile salvare tutte le ricette sulla tecna",
                f"Troppe ricette ({len(recipes)}), saranno salvate solamente le prime {limit}",
            )

        self._store_recipes_signal.emit(recipes[:limit])
        # Wait for the slot; also drain any surplus permits already available.
        self._store_recipes_lock.acquire(max(self._store_recipes_lock.available(), 1))
        QMessageBox.information(
            None,
            "Ricette salvate sulla tecna",
            f"Salvate {min(len(recipes), limit)} ricette",
        )
    finally:
        # Restore the paused state on every exit path (early return, error).
        if resumed:
            self.pause()
2022-10-18 14:41:53 +00:00
def _store_recipes ( self , recipes ) :
if len ( recipes ) > max ( self . max_program_number - 1 , 0 ) :
self . log . warning ( f " too many recipes ( { len ( recipes ) } ) saving only first { max ( self . max_program_number - 1 , 0 ) } " )
for i , [ recipe , step ] in enumerate ( recipes [ : max ( self . max_program_number - 1 , 0 ) ] , start = 1 ) :
2022-10-24 10:19:20 +00:00
self . log . debug ( f " saving recipe { recipe . part_number } to table { i } " )
self . write_recipe ( recipe , step , table = i )
2022-10-18 14:45:13 +00:00
self . log . info ( f " saved { min ( len ( recipes ) , max ( self . max_program_number - 1 , 0 ) ) } recipes " )
2022-10-18 14:53:17 +00:00
self . _store_recipes_lock . release ( 1 )