2022-06-08 07:11:38 +00:00
from . modbus_component import ModbusComponent
from . tecna_marposs_provaset_t3_registers import registers
2022-06-01 16:37:19 +00:00
# from pymodbus.client.sync import ModbusSerialClient as ModbusClient
# import serial
# client = ModbusClient(method="rtu", port="COM3", stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, baudrate=115200, timeout=1, strict=False)
# client.connect()
# client.read_holding_registers(1, count=1)
2022-06-08 07:11:38 +00:00
class TecnaMarpossProvasetT3 ( ModbusComponent ) :
    """Modbus component for a Tecna/Marposs Provaset T3 instrument.

    The register table imported from ``tecna_marposs_provaset_t3_registers``
    is handed to ``ModbusComponent`` at construction time; reads and writes
    by register name are converted using the per-register format and
    encoding/decoding maps found in that table.

    NOTE(review): the register names used in this class suggest a leak
    tester -- confirm against the instrument documentation.
    """
2022-06-01 16:37:19 +00:00
def __init__(
    self,
    config=None,
    name=None,
    period=1,
    lazy=True,
    paused=False,
    threaded=True,
):
    """Create the component bound to the Provaset T3 register table.

    Every argument is forwarded unchanged to the parent initializer,
    together with the ``registers`` definition imported by this module.
    """
    parent_options = dict(
        config=config,
        name=name,
        period=period,
        lazy=lazy,
        paused=paused,
        threaded=threaded,
    )
    super().__init__(registers=registers, **parent_options)
2022-06-01 16:37:19 +00:00
2022-07-13 08:36:58 +00:00
# Config pin name -> name of the password register that receives that pin.
# The "was N" notes are carried over from the original source; presumably
# they record earlier pin values -- verify before relying on them.
pin_registers = dict(
    (
        (" admin_pin ", " PASSWORD: Administrator "),  # was 1909
        (" modify_pin ", " PASSWORD: Modify program "),  # was 0
        (" select_pin ", " PASSWORD: Select program "),  # was 0
    )
)
2022-06-01 16:37:19 +00:00
def config_changed(self):
    """Re-apply instrument settings after the configuration changed.

    Runs the parent handler, caches the three optional pins from the
    config, writes the configured pins to the instrument, programs the
    measure units and finally reads the unit formats back into
    ``self.units``.
    """
    super().config_changed()
    pin_names = (" admin_pin ", " modify_pin ", " select_pin ")
    # A pin missing from the config is stored as None, which
    # unlock_tecna() treats as "do not write this pin".
    self.pins = {pin_name: self.config.get(pin_name, None) for pin_name in pin_names}
    self.unlock_tecna()
    self.set_measure_units()
    current_units = self.get_measure_units()
    self.units = current_units
    self.log.info(f" units: {self.units} ")
2022-06-08 07:11:38 +00:00
2022-07-13 08:36:58 +00:00
def unlock_tecna(self, **kwargs):
    """Write the access pins to their password registers.

    Keyword arguments override the pins cached in ``self.pins`` for this
    call only (the cached dict is not modified). A pin whose effective
    value is None is skipped.
    """
    effective_pins = {**self.pins, **kwargs}
    for pin_name, register in self.pin_registers.items():
        pin_value = effective_pins[pin_name]
        if pin_value is None:
            continue
        self.write(register, pin_value)
2022-06-08 07:11:38 +00:00
# Unit name -> numeric unit code, one table per kind of quantity.
# Original note on the pressure table: "(if full scale <= 6 bar)"; which
# entries that restriction applies to is not stated here.
_pressure_units = {
    " mH2O ": 0,
    " mbar ": 1,
    " kPa ": 2,
    " mmHg ": 3,
    " inH2O ": 4,
    " psi ": 5,
    " mmH2O ": 6,
}
_leak_units = {
    " mmH2O ": 0,
    " mbar ": 1,
    " Pa ": 2,
    " mmHg ": 3,
    " inH2O ": 4,
    " psi ": 5,
}
_leak_flow_units = {
    " cm3/min ": 0,
    " cm3/h ": 1,
}
_volume_units = {
    " litri ": 0,
    " cm3 ": 1,
}
_time_units = {
    " seconds ": 0,
}
_flow_units = {
    " liters/min ": 0,
    " liters/h ": 1,
    " m3/h ": 2,
}

# Inverse tables: numeric unit code -> unit name.
_pressure_units_map = dict(zip(_pressure_units.values(), _pressure_units.keys()))
_leak_units_map = dict(zip(_leak_units.values(), _leak_units.keys()))
_leak_flow_units_map = dict(zip(_leak_flow_units.values(), _leak_flow_units.keys()))
_volume_units_map = dict(zip(_volume_units.values(), _volume_units.keys()))
_time_units_map = dict(zip(_time_units.values(), _time_units.keys()))
_flow_units_map = dict(zip(_flow_units.values(), _flow_units.keys()))
def set_measure_units(self):
    """Program the instrument's measure-unit registers.

    Each ``MEASURE UNITS`` register receives a single word carrying the
    number of decimals in the high byte and the unit code in the low
    byte. All registers are currently written with 0 decimals.

    The result of every write is reported through ``self.log.debug``
    (the original printed it to stdout).
    """
    decimals = 0
    # The trailing colour names are the colours carried over from the
    # original comments (they match the aliases in get_measure_units).
    selections = (
        (" MEASURE UNITS: pressure measure units ", self._pressure_units[" mbar "]),  # red, ?purple?
        (" MEASURE UNITS: Leak measure units ", self._leak_units[" mbar "]),  # yellow
        (" MEASURE UNITS: leak flow rate measure units ", self._leak_flow_units[" cm3/min "]),  # blue
        (" MEASURE UNITS: Volume ", self._volume_units[" litri "]),  # green
        (" MEASURE UNITS: Flow rate measure units ", self._flow_units[" liters/min "]),  # orange
    )
    for register, unit in selections:
        result = self.write(register, (decimals << 8) + unit)
        self.log.debug(f"measure units write {register!r}: {result!r}")
2022-06-08 07:11:38 +00:00
def get_measure_units(self):
    """Read the unit/decimals format registers and index them by alias.

    Every register value is split into a unit code (low byte) and a
    number of decimals (next byte). The returned dict maps each alias of
    a register to ``[10 ** -decimals, unit]`` where ``unit`` is the unit
    name, or the raw code when the code is not in the register's map.
    All aliases of one register share the same list object.
    """
    pressure_names = self._pressure_units_map
    leak_names = self._leak_units_map
    leak_flow_names = self._leak_flow_units_map
    volume_names = self._volume_units_map
    time_names = self._time_units_map
    flow_names = self._flow_units_map

    # (register, code -> name map, aliases). The first group is also
    # reachable by documentation colour and by register number.
    format_registers = (
        (" Relative pressure variable format - high resolution ", pressure_names, (" pressure_hr ", " red ", " r ", 21)),
        (" Relative pressure variable format - low resolution ", pressure_names, (" pressure_lr ", " purple ", " p ", 22)),
        (" Format of the variables related to the measurement of the differential leak pressure ", leak_names, (" leak ", " yellow ", " y ", 23)),
        (" Format of the variables related to the calculated leak flow ", leak_flow_names, (" leak_flow ", " blue ", " b ", 24)),
        (" Format of volume variables ", volume_names, (" volume ", " green ", " g ", 25)),
        (" Format of time variables ", time_names, (" time ", " tangerine ", " t ", 26)),
        (" Format of variables related to flow measurements ", flow_names, (" flow ", " orange ", " o ", 27)),
        (" MEASURE UNITS: pressure measure units ", pressure_names, (" pressur ",)),
        (" MEASURE UNITS: Leak measure units ", leak_names, (" lea ",)),
        (" MEASURE UNITS: leak flow rate measure units ", leak_flow_names, (" leaflo ",)),
        (" MEASURE UNITS: Volume ", volume_names, (" volum ",)),
        (" MEASURE UNITS: Flow rate measure units ", flow_names, (" flo ",)),
    )

    units = {}
    for register, code_to_name, aliases in format_registers:
        raw = self.read(register)
        unit_code = raw & 0xff
        decimals = (raw >> 8) & 0xff
        spec = [10 ** (-decimals), code_to_name.get(unit_code, unit_code)]
        # fromkeys stores the very same list under every alias.
        units.update(dict.fromkeys(aliases, spec))
    return units
2022-07-04 10:36:51 +00:00
def _convert_from_format(self, data, formatting=None, decoding_map=None):
    """Translate a raw register value into its application-level value.

    Args:
        data: value as read from the instrument.
        formatting: key into ``self.units``; when given, the value is
            multiplied by that entry's scale factor (element 0).
        decoding_map: optional mapping applied first; values that are not
            keys of the map are left untouched.

    Returns:
        The decoded and scaled value. Only the scale is applied; the unit
        name stored next to it in ``self.units`` is not attached.
    """
    if decoding_map is not None and data in decoding_map:
        data = decoding_map[data]
    # A None value (missing reading, or a code that decodes to None) has
    # nothing to scale; pass it through instead of raising TypeError.
    if formatting is not None and data is not None:
        data = data * self.units[formatting][0]
    return data
2022-06-08 07:11:38 +00:00
2022-07-04 10:36:51 +00:00
def _convert_to_format(self, data, formatting=None, encoding_map=None):
    """Translate an application-level value into a raw register value.

    Args:
        data: value to be written.
        formatting: key into ``self.units``; when given, the value is
            divided by that entry's scale factor (element 0) and turned
            into an integer.
        encoding_map: optional mapping applied last; values that are not
            keys of the map are left untouched.

    Returns:
        The scaled and encoded value.
    """
    if formatting is not None:
        # Round to the nearest integer: plain int() truncates, so float
        # noise such as 0.3 / 0.1 == 2.9999999999999996 would yield 2.
        data = int(round(data / self.units[formatting][0]))
    if encoding_map is not None and data in encoding_map:
        data = encoding_map[data]
    return data
2022-06-08 07:11:38 +00:00
2022-07-04 10:36:51 +00:00
def read(self, register, *args, formatting=None, decoding_map=None, **kwargs):
    """Read a register through the parent class and convert the result.

    Args:
        register: register name, or any other identifier accepted by the
            parent ``read``. Only names are looked up in
            ``self.registers``.
        formatting: overrides the register spec's format key when given.
        decoding_map: overrides the register spec's decoding map when
            given.
        *args, **kwargs: forwarded unchanged to the parent ``read``.

    Returns:
        The parent's result after ``_convert_from_format``.
    """
    # isinstance (rather than an exact type comparison) so that str
    # subclasses used as register names also get their spec defaults.
    if isinstance(register, str):
        _, spec = self.registers[register]
        if formatting is None:
            formatting = spec.get(" f ", None)
        if decoding_map is None:
            decoding_map = spec.get(" decoding ", None)
    raw = super().read(register, *args, **kwargs)
    return self._convert_from_format(raw, formatting=formatting, decoding_map=decoding_map)
2022-06-01 16:37:19 +00:00
2022-07-04 10:36:51 +00:00
def write(self, register, data, *args, formatting=None, encoding_map=None, **kwargs):
    """Convert a value and write it to a register through the parent class.

    Args:
        register: register name, or any other identifier accepted by the
            parent ``write``. Only names are looked up in
            ``self.registers``.
        data: application-level value to write.
        formatting: overrides the register spec's format key when given.
        encoding_map: overrides the register spec's encoding map when
            given.
        *args, **kwargs: forwarded unchanged to the parent ``write``.

    Returns:
        Whatever the parent ``write`` returns.
    """
    # isinstance (rather than an exact type comparison) so that str
    # subclasses used as register names also get their spec defaults.
    if isinstance(register, str):
        _, spec = self.registers[register]
        if formatting is None:
            formatting = spec.get(" f ", None)
        if encoding_map is None:
            encoding_map = spec.get(" encoding ", None)
    raw = self._convert_to_format(data, formatting=formatting, encoding_map=encoding_map)
    return super().write(register, raw, *args, **kwargs)
2022-06-01 16:37:19 +00:00
def _get(self):
    """Poll the live status registers and pass one sample to the parent.

    The sample is a dict keyed by register name. When the type-of-test
    register reads back as None, the test result registers are read as
    well and merged into the sample.
    """
    polled_registers = (
        " Instrument status: table of parameters in use ",
        " Instrument status: active phase ",
        " Test circuit pressure, in real time ",
        " Measured leak, in real time ",
        " Regulated pressure, in real time ",
        " Active alarm flags ",
        " Running test: type of test ",
        " Testing in progress: progressive sequence index ",
    )
    info = {}
    for register_name in polled_registers:
        info[register_name] = self.read(register_name)

    # NOTE(review): None here presumably means that no test is running
    # any more -- confirm against the register's decoding map.
    running_test_type = info[" Running test: type of test "]
    if running_test_type is None:
        info.update(self.get_test_results())

    self.log.debug(str(info))
    super()._get([info])
2022-07-12 14:02:45 +00:00
def start_test(self, table=100):
    """Select a parameter table and start a test.

    Args:
        table: value written first to the selected-table register and
            then to the start-testing register (default 100).
    """
    self.log.info(f" starting test table {table!r} ")
    start_sequence = (
        " INSTRUMENT SETTING: Selected table ",
        " Start testing ",
    )
    for register in start_sequence:
        self.write(register, table)
2022-07-04 10:36:51 +00:00
def stop_test(self):
    """Log a warning and write 0 to the stop-test register."""
    self.log.warning(" stopping test ")
    stop_register, stop_value = " Stop the test in progress ", 0
    self.write(stop_register, stop_value)
2022-06-08 07:11:38 +00:00
def get_test_results(self):
    """Read the result registers of the running test.

    Returns:
        A dict keyed by register name holding the value ``self.read``
        returned for each of the four result registers.
    """
    self.log.info(" getting test results ")
    result_registers = (
        " Running test: measured leak ",
        " Running test: calculated leak flow rate ",
        " Running test: calculate RVP % ",
        " Running test: result ",
    )
    results = {}
    for register_name in result_registers:
        results[register_name] = self.read(register_name)
    return results
2022-07-12 08:48:04 +00:00
2022-07-12 14:02:45 +00:00
def write_recipe(self, recipe, table=100):
    """Write a leak-test recipe into one of the instrument's test tables.

    Args:
        recipe: object exposing a ``spec`` mapping with the part number
            and the time/pressure/tolerance entries read below.
        table: number written to the test-table selection register
            (default 100).

    The part number is cut to 16 characters, ASCII-encoded, padded with
    NUL bytes to exactly 16 bytes and packed two bytes per register (low
    byte first) into registers 719..726. Registers are written in the
    order in which they are listed.

    Raises:
        UnicodeEncodeError: if the part number contains non-ASCII
            characters.
    """
    spec = recipe.spec

    # ljust pins the padding to single NUL bytes and the total length to
    # 16, so the eight name registers below never pick up stray bytes.
    name_bytes = spec[" part_number "][:16].encode(" ascii ").ljust(16, b"\x00")
    first_name_register = 719
    name_words = {
        first_name_register + index: (name_bytes[2 * index + 1] << 8) + name_bytes[2 * index]
        for index in range(8)
    }

    # NOTE: "pressure_ramp" and "stabilization_cycles" appeared only as
    # commented-out entries in the original and are still not written.
    register_values = {
        " INSTRUMENT SETTING: Various flags ": 0b0000000000000000,
        **name_words,
        " Selection number of test table on which you intend to work ": table,
        " Type of test ": " LEAK TEST ",
        " Test flags ": 0b0000000001010000,
        " T0 - Pre-filling time ": spec[" pre_filling_time "],
        " P0 - Pre-filling pressure ": spec[" pre_filling_pressure "],
        " T1 - Filling time ": spec[" filling_time "],
        " T2 - Settling time ": spec[" settling_time "],
        " PR %- Lo wer tolerance on pressure / P- Minimum pressure ": spec[" settling_pressure_min_percent "],
        " PR % + Superior tolerance on pressure / P+ Pressure max ": spec[" settling_pressure_max_percent "],
        " T3 - Measure time ": spec[" test_time "],
        " Q- Leak limit ": spec[" test_pressure_min_delta "],
        " PREL - Nominal test pressure ": spec[" test_pressure "],
        " Q+ Upper limit ": spec[" test_pressure_max_delta "],
        " FST - discharge time ": spec[" flush_time "],
        " FSL - discharge limit ": spec[" flush_pressure "],
    }
    self.log.debug(str(register_values))
    for register, value in register_values.items():
        self.write(register, value)