from .modbus_component import ModbusComponent
from .tecna_marposs_provaset_t3p_registers import registers

# from pymodbus.client.sync import ModbusSerialClient as ModbusClient
# import serial
# client = ModbusClient(method="rtu", port="COM3", stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, baudrate=115200, timeout=1, strict=False)
# client.connect()
# client.read_holding_registers(1, count=1)


class TecnaMarpossProvasetT3P(ModbusComponent):
    """Modbus component built on the ``tecna_marposs_provaset_t3p_registers`` map.

    On every configuration change the measure-unit registers are written and
    the resulting per-quantity formats are read back into ``self.units``.
    ``read``/``write`` then use those formats (and optional value maps from
    the register spec) to convert between raw register values and scaled /
    symbolic values.
    """

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        """Forward all arguments to ``ModbusComponent`` together with this device's register map."""
        super().__init__(
            config=config,
            name=name,
            period=period,
            lazy=lazy,
            paused=paused,
            threaded=threaded,
            registers=registers,
        )

    # pin_registers = {
    #     "admin_pin": "PASSWORD: Administration",  # was 1909
    #     "modify_pin": "PASSWORD: Modify program",  # was 0
    #     "select_pin": "PASSWORD: Select program",  # was 0
    # }

    def config_changed(self):
        """Re-apply the measure units and cache the formats reported by the device.

        NOTE(review): ``set_measure_units`` and ``get_measure_units`` go through
        ``write``/``read`` before ``self.units`` is assigned here, so the
        registers they touch must not declare an ``"f"`` format in their spec
        (unless the base class already provides ``units``) - confirm against
        the register map.
        """
        super().config_changed()
        # self.pins = {
        #     "admin_pin": self.config.get("admin_pin", None),
        #     "modify_pin": self.config.get("modify_pin", None),
        #     "select_pin": self.config.get("select_pin", None),
        # }
        # self.unlock_tecna()
        self.set_measure_units()
        self.units = self.get_measure_units()
        self.log.info(f"units: {self.units}")

    # def unlock_tecna(self, **kwargs):
    #     pins = self.pins.copy()
    #     pins.update(kwargs)
    #     for pin_name, register in self.pin_registers.items():
    #         pin = pins[pin_name]
    #         if pin is not None:
    #             self.write(register, pin)

    # Unit name -> unit code, one table per measured quantity.
    _pressure_units = {"mH2O": 0, "mbar": 1, "kPa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, "mmH2O": 6, }  # (if full scale <= 6 bar)
    _leak_units = {"mmH2O": 0, "mbar": 1, "Pa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, }
    _leak_flow_units = {"cm3/min": 0, "cm3/h": 1, }
    _volume_units = {"litri": 0, "cm3": 1, }
    _time_units = {"seconds": 0, }
    _flow_units = {"liters/min": 0, "liters/h": 1, "m3/h": 2, }

    # Reverse tables: unit code -> unit name.
    _pressure_units_map = {v: k for k, v in _pressure_units.items()}
    _leak_units_map = {v: k for k, v in _leak_units.items()}
    _leak_flow_units_map = {v: k for k, v in _leak_flow_units.items()}
    _volume_units_map = {v: k for k, v in _volume_units.items()}
    _time_units_map = {v: k for k, v in _time_units.items()}
    _flow_units_map = {v: k for k, v in _flow_units.items()}

    def set_measure_units(self):
        """Write the fixed unit selection to every ``MEASURE UNITS`` register.

        Each register value packs the number of decimals in the high byte and
        the unit code in the low byte.
        """
        selection = {
            "MEASURE UNITS: Relative pressure": [self._pressure_units["mbar"], 0],  # red, purple
            "MEASURE UNITS: Differential (leak) pressure": [self._leak_units["mbar"], 0],  # yellow
            "MEASURE UNITS: Calculated leak flow rate": [self._leak_flow_units["cm3/min"], 0],  # blue
            "MEASURE UNITS: Volume": [self._volume_units["litri"], 0],  # green
            "MEASURE UNITS: Flow rate": [self._flow_units["liters/min"], 0],  # orange
        }
        for register, (unit, decimals) in selection.items():
            self.write(register, (decimals << 8) + unit)

    def get_measure_units(self):
        """Read the active format registers and return them as a lookup table.

        Returns:
            dict mapping every alias of a quantity to ``[scale, unit_name]``,
            where ``scale`` is ``10 ** -decimals`` (decimals taken from the
            high byte of the register) and ``unit_name`` is decoded from the
            low byte.

        Raises:
            KeyError: if the device reports a unit code missing from the
                corresponding ``_*_units_map`` table.
        """
        # Each quantity is addressable by name and also by documentation
        # colour, one-letter code and register number.
        format_registers = [
            ["Running test: relative pressure format", self._pressure_units_map, ["relative_pressure", "red", "r", 21, ]],
            ["Running test: differential pressure format", self._pressure_units_map, ["differential_pressure", "purple", "p", 22, ]],
            ["Running test: relative pressure format (low resolution)", self._leak_units_map, ["relative_pressure_lr", "yellow", "y", 23, ]],
            ["Running test: calculated leak flow rate format", self._leak_flow_units_map, ["leak_flow", "blue", "b", 24, ]],
            ["Running test: volume format", self._volume_units_map, ["volume", "green", "g", 25, ]],
            ["Running test: time format", self._time_units_map, ["time", "orange", "t", 26, ]],
            ["Running test: flow rate format", self._flow_units_map, ["flow", "white", "o", 27, ]],
        ]
        units = {}
        for register, unit_map, unit_names in format_registers:
            v = self.read(register)
            unit_spec = [10**(-((v >> 8) & 0xff)), unit_map[v & 0xff]]
            # All aliases intentionally share the same list object.
            for unit_name in unit_names:
                units[unit_name] = unit_spec
        return units

    def _convert_from_format(self, data, formatting=None, decoding_map=None):
        """Turn a raw register value into its decoded / scaled form.

        ``decoding_map`` (raw -> symbolic) is applied first when it contains
        the value; afterwards, if ``formatting`` is given, the value is
        multiplied by the scale stored in ``self.units[formatting]``.
        """
        if decoding_map is not None and data in decoding_map:
            data = decoding_map[data]
        if formatting is not None:
            # units = self.units[formatting]
            # data = [data * units[0], units[1]]
            data = data * self.units[formatting][0]
        return data

    def _convert_to_format(self, data, formatting=None, encoding_map=None):
        """Turn a scaled / symbolic value into the raw register value.

        If ``formatting`` is given the value is divided by the scale stored in
        ``self.units[formatting]`` and rounded to the nearest integer count;
        afterwards ``encoding_map`` (symbolic -> raw) is applied when it
        contains the value.
        """
        if formatting is not None:
            # round() rather than int(): binary floating point makes e.g.
            # 1.15 / 0.01 == 114.99999999999999, which int() would truncate
            # to 114 and silently write a value one count too low.
            data = round(data / self.units[formatting][0])
        if encoding_map is not None and data in encoding_map:
            data = encoding_map[data]
        return data

    def read(self, register, *args, formatting=None, decoding_map=None, **kwargs):
        """Read ``register`` through the base class and convert the result.

        When ``register`` is a register name, missing ``formatting`` /
        ``decoding_map`` arguments default to the ``"f"`` / ``"decoding"``
        entries of that register's spec in ``self.registers``.
        """
        if isinstance(register, str):
            _, spec = self.registers[register]
            if formatting is None:
                formatting = spec.get("f", None)
            if decoding_map is None:
                decoding_map = spec.get("decoding", None)
        raw = super().read(register, *args, **kwargs)
        return self._convert_from_format(raw, formatting=formatting, decoding_map=decoding_map)

    def write(self, register, data, *args, formatting=None, encoding_map=None, **kwargs):
        """Convert ``data`` to its raw form and write it through the base class.

        When ``register`` is a register name, missing ``formatting`` /
        ``encoding_map`` arguments default to the ``"f"`` / ``"encoding"``
        entries of that register's spec in ``self.registers``.
        """
        if isinstance(register, str):
            _, spec = self.registers[register]
            if formatting is None:
                formatting = spec.get("f", None)
            if encoding_map is None:
                encoding_map = spec.get("encoding", None)
        raw = self._convert_to_format(data, formatting=formatting, encoding_map=encoding_map)
        return super().write(register, raw, *args, **kwargs)

    def _get(self):
        """Read the live status registers and pass one sample to the base class.

        When the active phase reads ``"RESULT PRESENT"`` the test result
        registers are read as well and merged into the same sample.
        """
        # print("TECNA", str(int(QThread.currentThreadId())), flush=True)
        # READ INFO
        status_registers = [
            "Real time test pressure output",
            "Real time differential pressure output",
            "Real time pressure line regulator",
            "Active alarm flags",
            "Active test program number",
            "Running test: active phase",
            "Running test: test type",
            "Running test: sequence index",
        ]
        info = {r: self.read(r) for r in status_registers}
        if info["Running test: active phase"] == "RESULT PRESENT":
            info.update(self.get_test_results())
        self.log.debug(str(info))
        super()._get([info])

    def start_test(self, table=100):
        """Select test program ``table`` and start it."""
        self.log.info(f"starting test table {table!r}")
        self.write("Source of test program number selection", "FROM PARAMETER (SET BY LCD OR SERIAL LINE)")
        self.write("Selected program", table)
        self.write("Start test", table)

    def stop_test(self):
        """Write 0 to the ``Reset running test`` register."""
        self.log.warning("stopping test")
        self.write("Reset running test", 0)

    def get_test_results(self):
        """Read the result registers and return them keyed by register name."""
        self.log.info("getting test results")
        result_registers = [
            "Running test: phase backwards time",
            "Running test: filling pressure",
            "Running test: pressure at the end of settling",
            "Running test: burst pressure",
            "Running test: measured leak",
            "Running test: calculated leak flow rate",
            "Running test: calculate RVP%",
            "Running test: result",
        ]
        return {r: self.read(r) for r in result_registers}

    def write_recipe(self, recipe, table=100):
        """Write ``recipe`` into test program slot ``table``.

        Args:
            recipe: object whose ``spec`` mapping provides ``part_number`` and
                the timing / pressure / limit keys read below.
            table: test program number to write to.

        Raises:
            UnicodeEncodeError: if the first 16 characters of
                ``recipe.spec["part_number"]`` are not ASCII.
            KeyError: if a required key is missing from ``recipe.spec``.
        """
        # "pressure_ramp": self.pressure_ramp_sb,
        # "stabilization_cycles": self.stabilization_cycles_sb,
        spec = recipe.spec

        # Program name: at most 16 ASCII bytes, NUL-padded to exactly 16.
        recipe_name = spec["part_number"][:16].encode("ascii").ljust(16, b"\x00")

        # The name occupies 8 consecutive registers, two characters each, with
        # the first character of each pair in the low byte.
        # NOTE(review): "719 - 1" is kept from the original; presumably it
        # converts a documented register number to an address - confirm.
        name_registers = {
            719 - 1 + i: int.from_bytes(recipe_name[2 * i:2 * i + 2], "little")
            for i in range(8)
        }

        # Insertion order is the write order; keep it unchanged.
        payload = {
            "Flag: Instrument settings": 0b0000000000000000,
            "Test program for read/write operation": table,
            **name_registers,
            "Test type": "Leak Test",
            "Test flags": 0b0110000001011100,
            "T0 - Pre-filling time": spec["pre_filling_time"],
            "P0 - Pre-filling pressure": spec["pre_filling_pressure"],
            "T1 - Filling time": spec["filling_time"],
            "T2 - Settling time": spec["settling_time"],
            "PR- - Min pressure tolerance %": spec["settling_pressure_min_percent"],
            "PR+ - Max pressure tolerance % (P+)": spec["settling_pressure_max_percent"],
            "T3 - Measure time": spec["test_time"],
            "Q- Lower test leak limit": spec["test_pressure_min_delta"],
            "PREL - Nominal test pressure": spec["test_pressure"],
            "Q+ Upper test leak limit": spec["test_pressure_max_delta"],
            "FST - Discharge time": spec["flush_time"],
            "FSL - Discharge limit": spec["flush_pressure"],
        }
        self.log.debug(str(payload))
        for register, value in payload.items():
            self.write(register, value)