2022-06-08 07:11:38 +00:00
from . modbus_component import ModbusComponent
2022-07-18 10:32:05 +00:00
from . tecna_marposs_provaset_t3p_registers import registers
2022-06-01 16:37:19 +00:00
# from pymodbus.client.sync import ModbusSerialClient as ModbusClient
# import serial
# client = ModbusClient(method="rtu", port="COM3", stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, baudrate=115200, timeout=1, strict=False)
# client.connect()
# client.read_holding_registers(1, count=1)
2022-07-18 10:32:05 +00:00
class TecnaMarpossProvasetT3P ( ModbusComponent ) :
2022-06-01 16:37:19 +00:00
def __init__(
    self,
    config=None,
    name=None,
    period=1,
    lazy=True,
    paused=False,
    threaded=True,
):
    """Create the component and hand the imported register table to the base class.

    Every argument is forwarded unchanged to the parent constructor, together
    with the module-level ``registers`` table as the ``registers`` keyword.
    """
    base_options = dict(
        config=config,
        name=name,
        period=period,
        lazy=lazy,
        paused=paused,
        threaded=threaded,
    )
    super().__init__(registers=registers, **base_options)
2022-06-01 16:37:19 +00:00
2022-07-18 10:32:05 +00:00
# pin_registers = {
# "admin_pin": "PASSWORD: Administration", # was 1909
# "modify_pin": "PASSWORD: Modify program", # was 0
# "select_pin": "PASSWORD: Select program", # was 0
# }
2022-07-13 08:36:58 +00:00
2022-06-01 16:37:19 +00:00
def config_changed(self):
    """Re-apply the device setup after a configuration change.

    Runs the base-class handling first, then rewrites the measure-unit
    registers and caches the formats reported back by the device in
    ``self.units``.
    """
    super().config_changed()
    # NOTE: PIN handling (reading "admin_pin" / "modify_pin" / "select_pin" from
    # the config and unlocking the device) is currently disabled.
    self.set_measure_units()
    reported_units = self.get_measure_units()
    self.units = reported_units
    self.log.info(f" units: {self.units} ")
2022-06-08 07:11:38 +00:00
2022-07-18 10:32:05 +00:00
# def unlock_tecna(self, **kwargs):
# pins = self.pins.copy()
# pins.update(kwargs)
# for pin_name, register in self.pin_registers.items():
# pin = pins[pin_name]
# if pin is not None:
# self.write(register, pin)
2022-07-13 08:36:58 +00:00
2022-06-08 07:11:38 +00:00
# Unit name -> device unit code, one table per measurement kind.
# Original note on the pressure table: "(if full scale <= 6 bar)".
_pressure_units = {
    " mH2O ": 0,
    " mbar ": 1,
    " kPa ": 2,
    " mmHg ": 3,
    " inH2O ": 4,
    " psi ": 5,
    " mmH2O ": 6,
}
_leak_units = {
    " mmH2O ": 0,
    " mbar ": 1,
    " Pa ": 2,
    " mmHg ": 3,
    " inH2O ": 4,
    " psi ": 5,
}
_leak_flow_units = {
    " cm3/min ": 0,
    " cm3/h ": 1,
}
_volume_units = {
    " litri ": 0,
    " cm3 ": 1,
}
_time_units = {
    " seconds ": 0,
}
_flow_units = {
    " liters/min ": 0,
    " liters/h ": 1,
    " m3/h ": 2,
}

# Reverse tables: device unit code -> unit name.
_pressure_units_map = dict(zip(_pressure_units.values(), _pressure_units.keys()))
_leak_units_map = dict(zip(_leak_units.values(), _leak_units.keys()))
_leak_flow_units_map = dict(zip(_leak_flow_units.values(), _leak_flow_units.keys()))
_volume_units_map = dict(zip(_volume_units.values(), _volume_units.keys()))
_time_units_map = dict(zip(_time_units.values(), _time_units.keys()))
_flow_units_map = dict(zip(_flow_units.values(), _flow_units.keys()))
def set_measure_units(self):
    """Write the chosen unit and decimal count to each "MEASURE UNITS" register.

    The value written is the decimal count shifted left by 8 bits plus the
    unit code. All five settings currently use zero decimals.
    """
    no_decimals = 0
    # (register name, unit code, decimals); the trailing colour tags are
    # carried over from the original comments.
    plan = (
        (" MEASURE UNITS: Relative pressure ", self._pressure_units[" mbar "], no_decimals),  # red, purple
        (" MEASURE UNITS: Differential (leak) pressure ", self._leak_units[" mbar "], no_decimals),  # yellow
        (" MEASURE UNITS: Calculated leak flow rate ", self._leak_flow_units[" cm3/min "], no_decimals),  # blue
        (" MEASURE UNITS: Volume ", self._volume_units[" litri "], no_decimals),  # green
        (" MEASURE UNITS: Flow rate ", self._flow_units[" liters/min "], no_decimals),  # orange
    )
    for register_name, unit_code, decimal_count in plan:
        self.write(register_name, (decimal_count << 8) + unit_code)
2022-06-08 07:11:38 +00:00
def get_measure_units(self):
    """Read the "Running test: ... format" registers and build the unit table.

    For each register the low 8 bits select the unit name from the matching
    code table and bits 8-15 give the decimal count.

    Returns:
        dict: every alias of a quantity (name, documentation colour, single
        letter and register number) mapped to one shared
        ``[10 ** -decimals, unit_name]`` list.
    """
    # (format register, unit code -> name table, aliases)
    format_sources = (
        (" Running test: relative pressure format ", self._pressure_units_map,
         (" relative_pressure ", " red ", " r ", 21)),
        (" Running test: differential pressure format ", self._pressure_units_map,
         (" differential_pressure ", " purple ", " p ", 22)),
        (" Running test: relative pressure format (low resolution) ", self._leak_units_map,
         (" relative_pressure_lr ", " yellow ", " y ", 23)),
        (" Running test: calculated leak flow rate format ", self._leak_flow_units_map,
         (" leak_flow ", " blue ", " b ", 24)),
        (" Running test: volume format ", self._volume_units_map,
         (" volume ", " green ", " g ", 25)),
        (" Running test: time format ", self._time_units_map,
         (" time ", " orange ", " t ", 26)),
        (" Running test: flow rate format ", self._flow_units_map,
         (" flow ", " white ", " o ", 27)),
    )
    units = {}
    for register_name, code_to_unit, aliases in format_sources:
        raw = self.read(register_name)
        decimal_count = (raw >> 8) & 0xff
        unit_code = raw & 0xff
        shared_spec = [10 ** (-decimal_count), code_to_unit[unit_code]]
        # All aliases point at the very same list object.
        units.update(dict.fromkeys(aliases, shared_spec))
    return units
2022-07-04 10:36:51 +00:00
def _convert_from_format(self, data, formatting=None, decoding_map=None):
    """Translate a value read from the device into its application form.

    Args:
        data: value as returned by the base-class read.
        formatting: optional key into ``self.units``; when given, the value
            is multiplied by the scale stored at ``self.units[formatting][0]``.
        decoding_map: optional mapping; a value found in it is replaced by
            its mapped entry, other values pass through untouched.

    Returns:
        The decoded and, if requested, scaled value.
    """
    decoded = data
    if decoding_map is not None and decoded in decoding_map:
        decoded = decoding_map[decoded]
    if formatting is None:
        return decoded
    scale = self.units[formatting][0]
    return decoded * scale
2022-06-08 07:11:38 +00:00
2022-07-04 10:36:51 +00:00
def _convert_to_format(self, data, formatting=None, encoding_map=None):
    """Translate an application value into the form written to the device.

    Args:
        data: value supplied by the caller.
        formatting: optional key into ``self.units``; when given, the value
            is divided by the scale stored at ``self.units[formatting][0]``
            and converted to an integer count.
        encoding_map: optional mapping; a value found in it is replaced by
            its mapped entry, other values pass through untouched.

    Returns:
        The scaled and, if applicable, encoded value.
    """
    if formatting is not None:
        scale = self.units[formatting][0]
        # Round to the nearest count instead of truncating: with a fractional
        # scale the quotient is a float, and e.g. 0.3 / 0.1 evaluates to
        # 2.9999999999999996, which plain int() would turn into 2.
        data = int(round(data / scale))
    if encoding_map is not None and data in encoding_map:
        data = encoding_map[data]
    return data
2022-06-08 07:11:38 +00:00
2022-07-19 09:59:00 +00:00
def read(
    self,
    register,
    *args,
    data_type=None,
    gain=None,
    offset=None,
    formatting=None,
    decoding_map=None,
    **kwargs,
):
    """Read a register and convert the result to its application form.

    When ``register`` is a ``str`` it is looked up in ``self.registers``,
    which yields the value passed on to the base-class read plus a settings
    dict. Any option the caller left unset (``None``, or empty ``args`` /
    ``kwargs``) is then taken from that dict: `` dt `` (data type), `` g ``
    (gain), `` o `` (offset), `` f `` (formatting), `` decoding ``, `` a ``
    (positional arguments) and `` k `` (keyword arguments).

    Options still unset afterwards default to data type `` 16bit_uint ``,
    gain 1 and offset 0.

    Returns:
        The base-class read result passed through ``_convert_from_format``.
    """
    if type(register) is str:
        register, settings = self.registers[register]
        data_type = settings.get(" dt ", None) if data_type is None else data_type
        gain = settings.get(" g ", None) if gain is None else gain
        offset = settings.get(" o ", None) if offset is None else offset
        formatting = settings.get(" f ", None) if formatting is None else formatting
        decoding_map = settings.get(" decoding ", None) if decoding_map is None else decoding_map
        args = args or settings.get(" a ", [])
        kwargs = kwargs or settings.get(" k ", {})

    data_type = " 16bit_uint " if data_type is None else data_type
    gain = 1 if gain is None else gain
    offset = 0 if offset is None else offset

    raw_value = super().read(
        register,
        *args,
        data_type=data_type,
        gain=gain,
        offset=offset,
        **kwargs,
    )
    return self._convert_from_format(
        raw_value,
        formatting=formatting,
        decoding_map=decoding_map,
    )
def write(
    self,
    register,
    data,
    *args,
    data_type=None,
    gain=None,
    offset=None,
    formatting=None,
    encoding_map=None,
    **kwargs,
):
    """Convert a value to its device form and write it to a register.

    When ``register`` is a ``str`` it is looked up in ``self.registers``,
    which yields the value passed on to the base-class write plus a settings
    dict. Any option the caller left unset (``None``, or empty ``args`` /
    ``kwargs``) is then taken from that dict: `` dt `` (data type), `` g ``
    (gain), `` o `` (offset), `` f `` (formatting), `` encoding ``, `` a ``
    (positional arguments) and `` k `` (keyword arguments).

    Options still unset afterwards default to data type `` 16bit_uint ``,
    gain 1 and offset 0.

    Returns:
        Whatever the base-class write returns.
    """
    if type(register) is str:
        register, settings = self.registers[register]
        data_type = settings.get(" dt ", None) if data_type is None else data_type
        gain = settings.get(" g ", None) if gain is None else gain
        offset = settings.get(" o ", None) if offset is None else offset
        formatting = settings.get(" f ", None) if formatting is None else formatting
        encoding_map = settings.get(" encoding ", None) if encoding_map is None else encoding_map
        args = args or settings.get(" a ", [])
        kwargs = kwargs or settings.get(" k ", {})

    data_type = " 16bit_uint " if data_type is None else data_type
    gain = 1 if gain is None else gain
    offset = 0 if offset is None else offset

    device_value = self._convert_to_format(
        data,
        formatting=formatting,
        encoding_map=encoding_map,
    )
    return super().write(
        register,
        device_value,
        *args,
        data_type=data_type,
        gain=gain,
        offset=offset,
        **kwargs,
    )
2022-06-01 16:37:19 +00:00
def _get(self):
    """Poll the live status registers and pass one sample to the base class.

    When the active phase reads `` FINE TEST `` the test result registers are
    read as well and merged into the same sample.
    """
    status_registers = (
        " Real time test pressure output ",
        " Real time differential pressure output ",
        " Real time pressure line regulator ",
        " Active alarm flags ",
        " Active test program number ",
        " Running test: active phase ",
        " Running test: test type ",
        " Running test: sequence index ",
    )
    info = {}
    for register_name in status_registers:
        info[register_name] = self.read(register_name)

    # Per the original note this phase label stands for
    # "END TEST, WAITING THE START OF A NEW TEST".
    test_finished = info[" Running test: active phase "] == " FINE TEST "
    if test_finished:
        info.update(self.get_test_results())

    self.log.debug(str(info))
    super()._get([info])
2022-07-12 14:02:45 +00:00
def start_test(self, table=100):
    """Select test program ``table`` and start it.

    Three writes are issued in order: the program-selection source, the
    selected program number, and the start register (which also receives
    ``table``).
    """
    self.log.info(f" starting test table {table!r} ")
    commands = (
        (" Source of test program number selection ", " FROM PARAMETER (SET BY LCD OR SERIAL LINE) "),
        (" Selected program ", table),
        (" Start test ", table),
    )
    for register_name, value in commands:
        self.write(register_name, value)
2022-07-04 10:36:51 +00:00
def stop_test(self):
    """Log a warning and write 0 to the reset-running-test register."""
    logger, reset_register = self.log, " Reset running test "
    logger.warning(" stopping test ")
    self.write(reset_register, 0)
2022-07-04 10:36:51 +00:00
2022-06-08 07:11:38 +00:00
def get_test_results(self):
    """Read the result registers of the running test.

    Returns:
        dict: register name -> value returned by ``self.read``, in the order
        the registers are listed below.
    """
    self.log.info(" getting test results ")
    result_registers = (
        " Running test: phase backwards time ",
        " Running test: filling pressure ",
        " Running test: pressure at the end of settling ",
        " Running test: burst pressure ",
        " Running test: measured leak ",
        " Running test: calculated leak flow rate ",
        " Running test: calculate RVP % ",
        " Running test: result ",
    )
    results = {}
    for register_name in result_registers:
        results[register_name] = self.read(register_name)
    return results
2022-07-12 08:48:04 +00:00
2022-07-19 09:59:00 +00:00
def write_recipe(self, recipe, step, table=100):
    """Write one leak-test program (name plus parameters) to the device.

    Args:
        recipe: object whose ``part_number`` string supplies the program
            name; only the first 16 characters are used and they must be
            ASCII (``str.encode`` raises ``UnicodeEncodeError`` otherwise).
        step: object whose ``spec`` mapping supplies the timing, pressure
            and limit values read below (missing keys raise ``KeyError``).
        table: test program number the values are written to.

    The registers are written one by one, in the order they appear in the
    ``spec`` dict built below.
    """
    name_length = 16
    # Pad with single NUL bytes up to exactly 16 bytes; ljust never yields
    # more than the requested width and leaves a full-length name untouched.
    recipe_name = recipe.part_number[:name_length].encode("ascii").ljust(name_length, b"\x00")
    # The name occupies 8 consecutive registers starting at 719 - 1; each one
    # carries two characters, the first of the pair in the low byte.
    first_name_register = 719 - 1
    name_words = {
        first_name_register + i: int.from_bytes(recipe_name[2 * i:2 * i + 2], "little")
        for i in range(name_length // 2)
    }
    spec = {
        " Flag: Instrument settings ": 0b0000000000000000,
        " Test program for read/write operation ": table,
        **name_words,
        " Test type ": " Leak Test ",
        " Test flags ": 0b0110000001011100,
        " T0 - Pre-filling time ": step.spec[" pre_filling_time "],
        " P0 - Pre-filling pressure ": step.spec[" pre_filling_pressure "],
        " T1 - Filling time ": step.spec[" filling_time "],
        " T2 - Settling time ": step.spec[" settling_time "],
        " PR- - Min pressure tolerance % ": step.spec[" settling_pressure_min_percent "],
        " PR+ - Max pressure tolerance % (P+) ": step.spec[" settling_pressure_max_percent "],
        " T3 - Measure time ": step.spec[" test_time "],
        " Q- Lower test leak limit ": step.spec[" test_pressure_min_delta "],
        " PREL - Nominal test pressure ": step.spec[" test_pressure "],
        " Q+ Upper test leak limit ": step.spec[" test_pressure_max_delta "],
        " FST - Discharge time ": step.spec[" flush_time "],
        " FSL - Discharge limit ": step.spec[" flush_pressure "],
    }
    self.log.debug(str(spec))
    for register, value in spec.items():
        self.write(register, value)