from .modbus_component import ModbusComponent
from .tecna_marposs_provaset_t3_registers import registers

# Reference snippet: talking to the instrument directly with pymodbus (RTU).
# from pymodbus.client.sync import ModbusSerialClient as ModbusClient
# import serial
# client = ModbusClient(method="rtu", port="COM3", stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, baudrate=115200, timeout=1, strict=False)
# client.connect()
# client.read_holding_registers(1, count=1)


class TecnaMarpossProvasetT3(ModbusComponent):
    """Modbus component for the Tecna / Marposs Provaset T3 instrument.

    Raw register values are integers.  Registers whose description carries an
    ``"f"`` (format) key are scaled on read/write using the scale factors
    stored in ``self.units`` (see :meth:`get_measure_units`).
    """

    # Format table, filled by ``config_changed`` or lazily by ``_get``.
    # Defined at class level so that ``_get`` can test ``self.units is None``
    # even if it runs before ``config_changed`` has been called.
    units = None

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        """Forward all arguments to ``ModbusComponent`` with this device's register table."""
        super().__init__(
            config=config,
            name=name,
            period=period,
            lazy=lazy,
            paused=paused,
            threaded=threaded,
            registers=registers,
        )

    def config_changed(self):
        """Reload the variable formats from the instrument after a config change."""
        super().config_changed()
        # self.set_measure_units()
        self.units = self.get_measure_units()

    # Unit name -> unit code, one table per family of variables.
    _pressure_units = {
        "mH2O": 0,
        "mbar": 1,
        "kPa": 2,
        "mmHg": 3,
        "inH2O": 4,
        "psi": 5,
        "mmH2O": 6,
    }  # (if full scale <= 6 bar)
    _leak_units = {
        "mmH2O": 0,
        "mbar": 1,
        "Pa": 2,
        "mmHg": 3,
        "inH2O": 4,
        "psi": 5,
    }
    _leak_flow_units = {
        "cm3/min": 0,
        "cm3/h": 1,
    }
    _volume_units = {
        "litri": 0,
        "cm3": 1,
    }
    _time_units = {
        "seconds": 0,
    }
    _flow_units = {
        "liters/min": 0,
        "liters/h": 1,
        "m3/h": 2,
    }

    # Reverse tables: unit code -> unit name.
    _pressure_units_map = {v: k for k, v in _pressure_units.items()}
    _leak_units_map = {v: k for k, v in _leak_units.items()}
    _leak_flow_units_map = {v: k for k, v in _leak_flow_units.items()}
    _volume_units_map = {v: k for k, v in _volume_units.items()}
    _time_units_map = {v: k for k, v in _time_units.items()}
    _flow_units_map = {v: k for k, v in _flow_units.items()}

    def set_measure_units(self):
        """Write the default measure units to the instrument.

        Returns:
            A list with the result of each ``write`` call, in the order
            pressure, leak, leak flow rate, volume, flow rate.
        """
        return [
            self.write("MEASURE UNITS: pressure measure units", self._pressure_units["mbar"]),
            self.write("MEASURE UNITS: Leak measure units", self._leak_units["mbar"]),
            self.write("MEASURE UNITS: leak flow rate measure units", self._leak_flow_units["cm3/min"]),
            self.write("MEASURE UNITS: Volume", self._volume_units["litri"]),
            self.write("MEASURE UNITS: Flow rate measure units", self._flow_units["liters/min"]),
        ]

    def get_measure_units(self):
        """Read the variable-format registers and build the format table.

        Each format register is decoded as: high byte = number of decimal
        digits, low byte = unit code.

        Returns:
            A dict mapping every alias of a format to the same
            ``[scale_factor, unit_name]`` list, where ``scale_factor`` is
            ``10 ** -decimals``.

        Raises:
            KeyError: if the instrument reports a unit code that is missing
                from the corresponding unit table.
        """
        # Each format is reachable by name and also by documentation color
        # and register number.
        format_registers = [
            (
                "Relative pressure variable format - high resolution",
                self._pressure_units_map,
                ["pressure_hr", "red", "r", 21],
            ),
            (
                "Relative pressure variable format - low resolution",
                self._pressure_units_map,
                ["pressure_lr", "purple", "p", 22],
            ),
            (
                "Format of the variables related to the measurement of the differential leak pressure",
                self._leak_units_map,
                ["leak", "yellow", "y", 23],
            ),
            (
                "Format of the variables related to the calculated leak flow",
                self._leak_flow_units_map,
                ["leak_flow", "blue", "b", 24],
            ),
            (
                "Format of volume variables",
                self._volume_units_map,
                ["volume", "green", "g", 25],
            ),
            (
                "Format of time variables",
                self._time_units_map,
                ["time", "tangerine", "t", 26],
            ),
            (
                "Format of variables related to flow measurements",
                self._flow_units_map,
                ["flow", "orange", "o", 27],
            ),
        ]

        units = {}
        for register, unit_map, unit_names in format_registers:
            v = self.read(register)
            decimals = (v >> 8) & 0xFF
            unit = [10 ** (-decimals), unit_map[v & 0xFF]]
            for unit_name in unit_names:
                units[unit_name] = unit
        return units

    def _convert_from_format(self, data, format=None):
        """Scale a raw register value into engineering units.

        ``data`` is returned untouched when ``format`` is ``None``.
        """
        if format is None:
            return data
        # units = self.units[format]
        # return [data * units[0], units[1]]
        return data * self.units[format][0]

    def _convert_to_format(self, data, format=None):
        """Scale an engineering value into the raw integer register value.

        ``data`` is returned untouched when ``format`` is ``None``.
        """
        if format is None:
            return data
        # Round to nearest instead of truncating: decimal scale factors are
        # not exactly representable as floats (0.3 / 0.1 is
        # 2.9999999999999996), so plain int() would lose one count.
        return int(round(data / self.units[format][0]))

    def _resolve_format(self, register, format):
        """Return the format to apply for ``register``.

        An explicit ``format`` wins; otherwise the ``"f"`` entry of the
        register description is used for registers addressed by name.
        """
        if isinstance(register, str):
            _, spec = self.registers[register]
            if format is None:
                format = spec.get("f")
        return format

    def read(self, register, *args, format=None, **kwargs):
        """Read ``register`` and scale the value according to its format."""
        format = self._resolve_format(register, format)
        raw = super().read(register, *args, **kwargs)
        return self._convert_from_format(raw, format=format)

    def write(self, register, data, *args, format=None, **kwargs):
        """Scale ``data`` according to the register format and write it."""
        format = self._resolve_format(register, format)
        raw = self._convert_to_format(data, format=format)
        return super().write(register, raw, *args, **kwargs)

    def _get(self):
        """Poll the live status registers and hand them to the base class."""
        # print("TECNA", str(int(QThread.currentThreadId())), flush=True)
        # READ INFO
        if self.units is None:
            # Formats not loaded yet: push the default units, then read the
            # resulting formats back.
            self.set_measure_units()
            self.units = self.get_measure_units()
        info = {r: self.read(r) for r in [
            "Instrument status: active phase",
            "Test circuit pressure, in real time",
            "Measured leak, in real time",
            "Regulated pressure, in real time",
            "Active alarm flags",
            "Running test: type of test",
            "Testing in progress: progressive sequence index",
        ]}
        super()._get([info])

    def get_test_results(self):
        """Read the result registers of the running test, keyed by register name."""
        return {r: self.read(r) for r in [
            "Running test: measured leak",
            "Running test: calculated leak flow rate",
            "Running test: calculate RVP%",
            "Running test: result",
        ]}