from .component import Component
from .modbus_component import ModbusComponent
from .tecna_marposs_provaset_t3l_registers import registers as t3l_registers
from .tecna_marposs_provaset_t3p_registers import registers as t3p_registers

# Commented-out pymodbus snippet kept from the original file for reference:
# from pymodbus.client.sync import ModbusSerialClient as ModbusClient
# import serial
# client = ModbusClient(method="rtu", port="COM3", stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, baudrate=115200, timeout=1, strict=False)
# client.connect()
# client.read_holding_registers(1, count=1)


class TecnaMarpossProvasetT3(ModbusComponent):
    """Modbus component for the Tecna/Marposs Provaset T3 ("t3p" and "t3l" models).

    The model is taken from ``config[name]["model"]`` in ``config_changed``
    and selects which register table is used for name-based register access.
    """

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        """Forward all arguments to ``ModbusComponent``.

        ``registers`` is passed as ``None`` here; the register table is
        assigned in ``config_changed`` once the model is known.
        """
        super().__init__(
            config=config,
            name=name,
            period=period,
            lazy=lazy,
            paused=paused,
            threaded=threaded,
            registers=None,
        )

    # pin_registers = {
    #     "admin_pin": "PASSWORD: Administration",  # was 1909
    #     "modify_pin": "PASSWORD: Modify program",  # was 0
    #     "select_pin": "PASSWORD: Select program",  # was 0
    # }

    def config_changed(self):
        """Select the register table for the configured model and sync units.

        Raises:
            NotImplementedError: if the configured model is neither "t3p"
                nor "t3l".
        """
        super().config_changed()
        # self.pins = {
        #     "admin_pin": self.config.get("admin_pin", None),
        #     "modify_pin": self.config.get("modify_pin", None),
        #     "select_pin": self.config.get("select_pin", None),
        # }
        # self.unlock_tecna()
        self.model = self.config[self.name]["model"].lower()
        if self.model == "t3p":
            self.registers = t3p_registers
        elif self.model == "t3l":
            self.registers = t3l_registers
        else:
            raise self._unsupported_model_error()
        self.set_measure_units()
        self.units = self.get_measure_units()
        self.log.info(f"units: {self.units}")

    # def unlock_tecna(self, **kwargs):
    #     pins = self.pins.copy()
    #     pins.update(kwargs)
    #     for pin_name, register in self.pin_registers.items():
    #         pin = pins[pin_name]
    #         if pin is not None:
    #             self.write(register, pin)

    # Unit name -> numeric unit code tables.
    # Original note on the pressure table: "(if full scale <= 6 bar)".
    _pressure_units = {"mH2O": 0, "mbar": 1, "kPa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, "mmH2O": 6, }
    _leak_units = {"mmH2O": 0, "mbar": 1, "Pa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, }
    _leak_flow_units = {"cm3/min": 0, "cm3/h": 1, }
    _volume_units = {"litri": 0, "cm3": 1, }
    _time_units = {"seconds": 0, }
    _flow_units = {"liters/min": 0, "liters/h": 1, "m3/h": 2, "cfm": 3}

    # Inverse tables: numeric unit code -> unit name.
    _pressure_units_map = {v: k for k, v in _pressure_units.items()}
    _leak_units_map = {v: k for k, v in _leak_units.items()}
    _leak_flow_units_map = {v: k for k, v in _leak_flow_units.items()}
    _volume_units_map = {v: k for k, v in _volume_units.items()}
    _time_units_map = {v: k for k, v in _time_units.items()}
    _flow_units_map = {v: k for k, v in _flow_units.items()}

    def _unsupported_model_error(self):
        """Build the exception raised whenever ``self.model`` is not handled."""
        return NotImplementedError(f"techna t3 model {self.model!r} not implemented.")

    def set_measure_units(self):
        """Write the fixed measure-unit selection to the instrument.

        Both models receive the same unit codes. On the "t3p" the number of
        decimals is packed into the high byte of the written value; on the
        "t3l" only the unit code is written.

        Raises:
            NotImplementedError: if the model is neither "t3p" nor "t3l".
        """
        if self.model not in ("t3p", "t3l"):
            raise self._unsupported_model_error()

        # (register name, unit code, decimals); trailing comments keep the
        # colour labels from the original source.
        settings = (
            ("MEASURE UNITS: Relative pressure", self._pressure_units["mbar"], 0),  # red, purple
            ("MEASURE UNITS: Differential (leak) pressure", self._leak_units["mbar"], 0),  # yellow
            ("MEASURE UNITS: Calculated leak flow rate", self._leak_flow_units["cm3/min"], 0),  # blue
            ("MEASURE UNITS: Volume", self._volume_units["litri"], 0),  # green
            ("MEASURE UNITS: Flow rate", self._flow_units["liters/min"], 0),  # orange
        )
        pack_decimals = self.model == "t3p"
        for register, unit, decimals in settings:
            value = (decimals << 8) + unit if pack_decimals else unit
            self.write(register, value)

    def _read_unit_spec(self, register, unit_map):
        """Read one unit description and return it as ``[scale, unit]``.

        ``register`` is either a single register name whose value holds the
        decimals in the high byte and the unit code in the low byte, or a
        two-item list ``[scale_register, decimals_register]``.
        """
        if isinstance(register, list):
            raw = [self.read(r) for r in register]
            # NOTE(review): the value of the "scale" register is returned
            # as-is and ``unit_map`` is not applied here, exactly as in the
            # original code -- confirm whether it should be mapped to a name.
            return [10 ** (-(raw[1] & 0xff)), raw[0]]
        raw = self.read(register)
        return [10 ** (-((raw >> 8) & 0xff)), unit_map[raw & 0xff]]

    def get_measure_units(self):
        """Read the unit formats currently reported by the instrument.

        Returns:
            dict mapping every alias of a quantity (descriptive name, colour
            name, one-letter code and register number) to the same
            ``[scale, unit]`` list, where ``scale`` is ``10 ** -decimals``.

        Raises:
            NotImplementedError: if the model is neither "t3p" nor "t3l".
        """
        # Each entry: [register(s), unit code -> name map, aliases].
        # Aliases are also given by documentation colour and register number.
        if self.model == "t3p":
            sources = [
                ["Running test: relative pressure format", self._pressure_units_map, ["relative_pressure", "red", "r", 21, ]],
                ["Running test: differential pressure format", self._pressure_units_map, ["differential_pressure", "purple", "p", 22, ]],
                ["Running test: relative pressure format (low resolution)", self._leak_units_map, ["relative_pressure_lr", "yellow", "y", 23, ]],
                ["Running test: calculated leak flow rate format", self._leak_flow_units_map, ["leak_flow", "blue", "b", 24, ]],
                ["Running test: volume format", self._volume_units_map, ["volume", "green", "g", 25, ]],
                ["Running test: time format", self._time_units_map, ["time", "orange", "t", 26, ]],
                ["Running test: flow rate format", self._flow_units_map, ["flow", "white", "o", 27, ]],
            ]
        elif self.model == "t3l":
            sources = [
                [["Running test: relative pressure scale", "Running test: relative pressure decimals"], self._pressure_units_map, ["relative_pressure", "red", "r", 1501, ]],
                [["Running test: differential pressure scale", "Running test: differential pressure decimals"], self._pressure_units_map, ["differential_pressure", "purple", "p", 1503, ]],
                [["Running test: relative pressure scale (low resolution)", "Running test: relative pressure decimals (low resolution)"], self._leak_units_map, ["relative_pressure_lr", "yellow", "y", 1505, ]],
                ["Running test: calculated leak flow rate format", self._leak_flow_units_map, ["leak_flow", "blue", "b", 1507, ]],
                ["Running test: volume format", self._volume_units_map, ["volume", "green", "g", 1508, ]],
                ["Running test: time format", self._time_units_map, ["time", "orange", "t", 1509, ]],
                ["Running test: flow rate format", self._flow_units_map, ["flow", "white", "o", 1510, ]],
                ["Running test: line pressure format", self._pressure_units_map, ["line_pressure", "lp", "l", 1511, ]],
            ]
        else:
            raise self._unsupported_model_error()

        units = {}
        for register, unit_map, unit_names in sources:
            unit_spec = self._read_unit_spec(register, unit_map)
            for unit_name in unit_names:
                units[unit_name] = unit_spec
        return units

    def _convert_from_format(self, data, formatting=None, decoding_map=None):
        """Convert a raw register value into its user-facing value.

        The value is first replaced through ``decoding_map`` (when it is a
        key of that map) and then, if ``formatting`` is given, multiplied by
        the scale stored in ``self.units[formatting]``.
        """
        if decoding_map is not None and data in decoding_map:
            data = decoding_map[data]
        if formatting is not None:
            # units = self.units[formatting]
            # data = [data * units[0], units[1]]
            data = data * self.units[formatting][0]
        return data

    def _convert_to_format(self, data, formatting=None, encoding_map=None):
        """Convert a user-facing value into the raw value to be written.

        If ``formatting`` is given the value is divided by the scale stored
        in ``self.units[formatting]`` and rounded to the nearest integer;
        the result is then replaced through ``encoding_map`` when it is a
        key of that map.
        """
        if formatting is not None:
            # Round instead of truncating: with a fractional scale the
            # division is inexact (0.3 / 0.1 == 2.9999999999999996), and
            # plain int() would write 2 instead of 3.
            data = int(round(data / self.units[formatting][0]))
        if encoding_map is not None and data in encoding_map:
            data = encoding_map[data]
        return data

    def _t3_register_spec(self, register):
        """Resolve ``register`` to ``(address, spec)``.

        A register given by name is looked up in ``self.registers``; any
        other value is returned unchanged together with an empty spec.
        """
        if isinstance(register, str):
            address, spec = self.registers[register]
            return address, spec
        return register, {}

    @Component.reconfig_on_error
    def read(self, register, *args, data_type=None, gain=None, offset=None, formatting=None, decoding_map=None, **kwargs):
        """Read a register given by name or address and convert the value.

        For a named register, every option left as ``None`` (and empty
        ``args``/``kwargs``) is filled from the register's spec entry (keys
        "dt", "g", "o", "f", "decoding", "a", "k"). Remaining ``None``
        values default to ``data_type="16bit_uint"``, ``gain=1`` and
        ``offset=0``.

        Returns:
            The value returned by the parent ``read``, passed through
            ``_convert_from_format``.
        """
        register, spec = self._t3_register_spec(register)
        if data_type is None:
            data_type = spec.get("dt", None)
        if gain is None:
            gain = spec.get("g", None)
        if offset is None:
            offset = spec.get("o", None)
        if formatting is None:
            formatting = spec.get("f", None)
        if decoding_map is None:
            decoding_map = spec.get("decoding", None)
        if not args:
            args = spec.get("a", [])
        if not kwargs:
            kwargs = spec.get("k", {})

        if data_type is None:
            data_type = "16bit_uint"
        if gain is None:
            gain = 1
        if offset is None:
            offset = 0

        raw = super().read(
            register,
            *args,
            data_type=data_type,
            gain=gain,
            offset=offset,
            **kwargs,
        )
        return self._convert_from_format(raw, formatting=formatting, decoding_map=decoding_map)

    @Component.reconfig_on_error
    def write(self, register, data, *args, data_type=None, gain=None, offset=None, formatting=None, encoding_map=None, **kwargs):
        """Convert ``data`` and write it to a register given by name or address.

        Option resolution mirrors ``read``, except that the spec key
        "encoding" supplies ``encoding_map``. ``data`` is passed through
        ``_convert_to_format`` before being handed to the parent ``write``.

        Returns:
            Whatever the parent ``write`` returns.
        """
        register, spec = self._t3_register_spec(register)
        if data_type is None:
            data_type = spec.get("dt", None)
        if gain is None:
            gain = spec.get("g", None)
        if offset is None:
            offset = spec.get("o", None)
        if formatting is None:
            formatting = spec.get("f", None)
        if encoding_map is None:
            encoding_map = spec.get("encoding", None)
        if not args:
            args = spec.get("a", [])
        if not kwargs:
            kwargs = spec.get("k", {})

        if data_type is None:
            data_type = "16bit_uint"
        if gain is None:
            gain = 1
        if offset is None:
            offset = 0

        raw = self._convert_to_format(data, formatting=formatting, encoding_map=encoding_map)
        return super().write(
            register,
            raw,
            *args,
            data_type=data_type,
            gain=gain,
            offset=offset,
            **kwargs,
        )

    @Component.reconfig_on_error
    def _get(self):
        """Read the status registers and hand them to the parent ``_get``.

        When the active phase reads "FINE TEST" the test result registers
        are read as well and merged into the same dict.

        Raises:
            NotImplementedError: if the model is neither "t3p" nor "t3l".
        """
        # print("TECNA", str(int(QThread.currentThreadId())), flush=True)
        # READ INFO
        common_registers = [
            "Real time test pressure output",
            "Real time differential pressure output",
            "Real time pressure line regulator",
            "Active alarm flags",
            "Active test program number",
            "Running test: active phase",
            "Running test: test type",
            "Running test: sequence index",
            "Digital inputs status (mask)",
            # "Digital outputs status (mask)",
        ]
        info = {r: self.read(r) for r in common_registers}

        if self.model == "t3l":
            info.update({r: self.read(r) for r in [
                "Active not severe alarm flags",
            ]})
        elif self.model != "t3p":
            raise self._unsupported_model_error()

        if info["Running test: active phase"] == "FINE TEST":  # "END TEST, WAITING THE START OF A NEW TEST":
            info.update(self.get_test_results())
        self.log.debug(str(info))
        super()._get([info])

    @Component.reconfig_on_error
    def _set(self, val=None):
        """Write every ``register: value`` pair of ``val``, then call the parent ``_set``."""
        if val is not None:
            # handle request:
            for register, value in val.items():
                # Logged instead of printed so the output follows the
                # component's logging configuration.
                self.log.debug(f"{register} {value}")
                self.write(register, value)
        super()._set(val)

    def start_test(self, table=100):
        """Select test program ``table`` and start it."""
        self.log.info(f"starting test table {table!r}")
        self.write("Source of test program number selection", "FROM PARAMETER (SET BY LCD OR SERIAL LINE)")
        self.write("Selected program", table)
        self.write("Start test", table)

    def stop_test(self):
        """Write 0 to the "Reset running test" register."""
        self.log.warning("stopping test")
        self.write("Reset running test", 0)

    def get_test_results(self):
        """Read the result registers of the running test.

        Returns:
            dict mapping each result register name to its converted value.
        """
        self.log.info("getting test results")
        result_registers = [
            "Running test: phase backwards time",
            "Running test: filling pressure",
            "Running test: pressure at the end of settling",
            "Running test: burst pressure",
            "Running test: measured leak",
            "Running test: calculated leak flow rate",
            "Running test: calculate RVP%",
            "Running test: result",
        ]
        return {r: self.read(r) for r in result_registers}

    def write_recipe(self, recipe, step, table=100):
        """Write a leak-test program built from ``recipe`` and ``step``.

        Args:
            recipe: object whose ``part_number`` string provides the program
                name (first 16 characters, ASCII, NUL padded).
            step: object whose ``spec`` mapping provides the timing and
                pressure parameters read below.
            table: test program number the parameters are written to.

        Raises:
            UnicodeEncodeError: if the first 16 characters of the part
                number are not ASCII.
            NotImplementedError: if the model is neither "t3p" nor "t3l".
        """
        recipe_name = recipe.part_number[:16].encode("ascii").ljust(16, b"\x00")

        # The name occupies eight consecutive registers addressed by number,
        # two characters per register with the first character in the low byte.
        name_registers = {
            719 - 1 + i: int.from_bytes(recipe_name[2 * i:2 * i + 2], "little")
            for i in range(8)
        }

        # Insertion order is the order in which the registers are written.
        spec = {
            "Flag: Instrument settings": 0b0000000000000000,
            "Test program for read/write operation": table,
            **name_registers,
            "Test type": "Leak Test",
            "Test flags": 0b0110000001011100,
            "T0 - Pre-filling time": step.spec["pre_filling_time"],
            "P0 - Pre-filling pressure": step.spec["pre_filling_pressure"],
            "T1 - Filling time": step.spec["filling_time"],
            "T2 - Settling time": step.spec["settling_time"],
            "PR- - Min pressure tolerance %": step.spec["settling_pressure_min_percent"],
            "PR+ - Max pressure tolerance % (P+)": step.spec["settling_pressure_max_percent"],
            "T3 - Measure time": step.spec["test_time"],
            "Q- Lower test leak limit": step.spec["test_pressure_min_delta"],
            "PREL - Nominal test pressure": step.spec["test_pressure"],
            "Q+ Upper test leak limit": step.spec["test_pressure_max_delta"],
            "FST - Discharge time": step.spec["flush_time"],
            "FSL - Discharge limit": step.spec["flush_pressure"],
        }

        if self.model == "t3l":
            spec.update({
                "Use programs or use products": 0,
                "Nominal peak pressure": step.spec["test_pressure"],
                "Pn - Nominal test pressure": step.spec["test_pressure"],
            })
        elif self.model != "t3p":
            raise self._unsupported_model_error()

        self.log.debug(str(spec))
        for register, value in spec.items():
            self.write(register, value)