from .modbus_component import ModbusComponent from .tecna_marposs_provaset_t3_registers import registers # from pymodbus.client.sync import ModbusSerialClient as ModbusClient # import serial # client = ModbusClient(method="rtu", port="COM3", stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, baudrate=115200, timeout=1, strict=False) # client.connect() # client.read_holding_registers(1, count=1) class TecnaMarpossProvasetT3(ModbusComponent): def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True): super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded, registers=registers) def config_changed(self): super().config_changed() # self.set_measure_units() self.units = self.get_measure_units() self.log.info(f"units: {self.units}") _pressure_units = {"mH2O": 0, "mbar": 1, "kPa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, "mmH2O": 6, } # (se fondoscala <=6 bar) _leak_units = {"mmH2O": 0, "mbar": 1, "Pa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, } _leak_flow_units = {"cm3/min": 0, "cm3/h": 1, } _volume_units = {"litri": 0, "cm3": 1, } _time_units = {"seconds": 0, } _flow_units = {"liters/min": 0, "liters/h": 1, "m3/h": 2, } _pressure_units_map = {v: k for k, v in _pressure_units.items()} _leak_units_map = {v: k for k, v in _leak_units.items()} _leak_flow_units_map = {v: k for k, v in _leak_flow_units.items()} _volume_units_map = {v: k for k, v in _volume_units.items()} _time_units_map = {v: k for k, v in _time_units.items()} _flow_units_map = {v: k for k, v in _flow_units.items()} def set_measure_units(self): return [ self.write("MEASURE UNITS: pressure measure units", self._pressure_units["mbar"]), # red, ?purple? self.write("MEASURE UNITS: Leak measure units", self._leak_units["mbar"]), # yellow self.write("MEASURE UNITS: leak flow rate measure units", self._leak_flow_units["cm3/min"]), # blue self.write("MEASURE UNITS: Volume", self._volume_units["litri"]), # green self.write("MEASURE UNITS: Flow rate measure units", self._flow_units["liters/min"]), # orange ] def get_measure_units(self): units = {} for [register, unit_map, unit_names] in [ ["Relative pressure variable format - high resolution", self._pressure_units_map, ["pressure_hr", "red", "r", 21, ]], # also by documentation color and register number ["Relative pressure variable format - low resolution", self._pressure_units_map, ["pressure_lr", "purple", "p", 22, ]], # also by documentation color and register number ["Format of the variables related to the measurement of the differential leak pressure", self._leak_units_map, ["leak", "yellow", "y", 23, ]], # also by documentation color and register number ["Format of the variables related to the calculated leak flow", self._leak_flow_units_map, ["leak_flow", "blue", "b", 24, ]], # also by documentation color and register number ["Format of volume variables", self._volume_units_map, ["volume", "green", "g", 25, ]], # also by documentation color and register number ["Format of time variables", self._time_units_map, ["time", "tangerine", "t", 26, ]], # also by documentation color and register number ["Format of variables related to flow measurements", self._flow_units_map, ["flow", "orange", "o", 27, ]], # also by documentation color and register number ]: v = self.read(register) unit = [10**(-((v >> 8) & 0xff)), unit_map[v & 0xff]] for unit_name in unit_names: units[unit_name] = unit return units def _convert_from_format(self, data, formatting=None, decoding_map=None): if decoding_map is not None: data = decoding_map[data] if formatting is not None: # units = self.units[formatting] # data = [data * units[0], units[1]] data = data * self.units[formatting][0] return data def _convert_to_format(self, data, formatting=None, encoding_map=None): if formatting is not None: data = int(data / self.units[formatting][0]) if encoding_map is not None: data = encoding_map[data] return data def read(self, register, *args, formatting=None, decoding_map=None, **kwargs): if type(register) is str: _, s = self.registers[register] if formatting is None: formatting = s.get("f", None) if decoding_map is None: decoding_map = s.get("decoding", None) return self._convert_from_format(super().read(register, *args, **kwargs), formatting=formatting, decoding_map=decoding_map) def write(self, register, data, *args, formatting=None, encoding_map=None, **kwargs): if type(register) is str: _, s = self.registers[register] if formatting is None: formatting = s.get("f", None) if encoding_map is None: encoding_map = s.get("encoding", None) return super().write(register, self._convert_to_format(data, formatting=formatting, encoding_map=encoding_map), *args, **kwargs) def _get(self): # print("TECNA", str(int(QThread.currentThreadId())), flush=True) # READ INFO info = {r: self.read(r) for r in [ "Instrument status: table of parameters in use", "Instrument status: active phase", "Test circuit pressure, in real time", "Measured leak, in real time", "Regulated pressure, in real time", "Active alarm flags", "Running test: type of test", "Testing in progress: progressive sequence index", ]} if info["Running test: type of test"] is None: info.update(self.get_test_results()) self.log.debug(str(info)) super()._get([info]) def start_test(self, table=1): self.log.info(f"starting test table {table!r}") self.write("INSTRUMENT SETTING: Selected table", table) self.write("Start testing", table) def stop_test(self): self.log.warning("stopping test") self.write("Stop the test in progress", 0) def get_test_results(self): self.log.info("getting test results") return {r: self.read(r) for r in [ "Running test: measured leak", "Running test: calculated leak flow rate", "Running test: calculate RVP%", "Running test: result", ]} def write_recipe(self, recipe, table=1): # "pressure_ramp": self.pressure_ramp_sb, # "stabilization_cycles": self.stabilization_cycles_sb, recipe_name = recipe.spec["part_number"][:16].encode("ascii") recipe_name += b"\x00" * (16 - len(recipe_name)) recipe = { # "PASSWORD: Administrator": 1909, # "PASSWORD: Modify program": 0, # "PASSWORD: Select program": 0, "INSTRUMENT SETTING: Various flags": 0b0000000000000000, **{719 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)}, "Selection number of test table on which you intend to work": table, "Type of test": "LEAK TEST", "Test flags": 0b0000000001010000, "T0 - Pre-filling time": recipe.spec["pre_filling_time"], "P0 - Pre-filling pressure": recipe.spec["pre_filling_pressure"], "T1 - Filling time": recipe.spec["filling_time"], "T2 - Settling time": recipe.spec["settling_time"], "PR%- Lower tolerance on pressure / P- Minimum pressure": recipe.spec["settling_pressure_min_percent"], "PR%+ Superior tolerance on pressure / P+ Pressure max": recipe.spec["settling_pressure_max_percent"], "T3 - Measure time": recipe.spec["test_time"], "Q- Leak limit": recipe.spec["test_pressure_min_delta"], "PREL - Nominal test pressure": recipe.spec["test_pressure"], "Q+ Upper limit": recipe.spec["test_pressure_max_delta"], "FST - discharge time": recipe.spec["flush_time"], "FSL - discharge limit": recipe.spec["flush_pressure"], } self.log.debug(str(recipe)) for register, value in recipe.items(): self.write(register, value)