st-ten-1/src/components/tecna_marposs_provaset_t3.py

127 lines
6.4 KiB
Python
Raw Normal View History

2022-06-08 07:11:38 +00:00
from .modbus_component import ModbusComponent
from .tecna_marposs_provaset_t3_registers import registers
2022-06-01 16:37:19 +00:00
# from pymodbus.client.sync import ModbusSerialClient as ModbusClient
# import serial
# client = ModbusClient(method="rtu", port="COM3", stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, baudrate=115200, timeout=1, strict=False)
# client.connect()
# client.read_holding_registers(1, count=1)
2022-06-08 07:11:38 +00:00
class TecnaMarpossProvasetT3(ModbusComponent):
    """Modbus component for the Tecna Marposs Provaset T3 instrument.

    Extends ``ModbusComponent`` with:

    * scaling of register values by the per-quantity format (power-of-ten
      factor) that is read back from the instrument in ``config_changed``;
    * optional decoding/encoding maps taken from the register definitions;
    * helpers to start/stop a test and to collect its results.
    """

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        """Forward all arguments to ``ModbusComponent`` together with the T3 register table."""
        super().__init__(
            config=config,
            name=name,
            period=period,
            lazy=lazy,
            paused=paused,
            threaded=threaded,
            registers=registers,
        )

    def config_changed(self):
        """Run the base handler, then refresh the cached measure-unit formats."""
        super().config_changed()
        # The units configured on the instrument are read back here; they are
        # not forced with set_measure_units().
        self.units = self.get_measure_units()
        self.log.info(f"units: {self.units}")

    # Unit name -> numeric unit code, one table per kind of quantity.
    _pressure_units = {"mH2O": 0, "mbar": 1, "kPa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, "mmH2O": 6, }  # (if full scale <= 6 bar)
    _leak_units = {"mmH2O": 0, "mbar": 1, "Pa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, }
    _leak_flow_units = {"cm3/min": 0, "cm3/h": 1, }
    _volume_units = {"litri": 0, "cm3": 1, }
    _time_units = {"seconds": 0, }
    _flow_units = {"liters/min": 0, "liters/h": 1, "m3/h": 2, }

    # Reverse tables: numeric unit code -> unit name.
    _pressure_units_map = {v: k for k, v in _pressure_units.items()}
    _leak_units_map = {v: k for k, v in _leak_units.items()}
    _leak_flow_units_map = {v: k for k, v in _leak_flow_units.items()}
    _volume_units_map = {v: k for k, v in _volume_units.items()}
    _time_units_map = {v: k for k, v in _time_units.items()}
    _flow_units_map = {v: k for k, v in _flow_units.items()}

    def set_measure_units(self):
        """Write a fixed set of measure units to the instrument.

        Returns:
            list: the results of the individual ``write`` calls, in order
            (pressure, leak, leak flow rate, volume, flow rate).
        """
        return [
            self.write("MEASURE UNITS: pressure measure units", self._pressure_units["mbar"]),  # red, ?purple?
            self.write("MEASURE UNITS: Leak measure units", self._leak_units["mbar"]),  # yellow
            self.write("MEASURE UNITS: leak flow rate measure units", self._leak_flow_units["cm3/min"]),  # blue
            self.write("MEASURE UNITS: Volume", self._volume_units["litri"]),  # green
            self.write("MEASURE UNITS: Flow rate measure units", self._flow_units["liters/min"]),  # orange
        ]

    def get_measure_units(self):
        """Read the format registers and build the unit lookup table.

        Each format register value is split into two bytes: the high byte is
        used as a negative power-of-ten exponent (the scale factor) and the
        low byte is the unit code, translated to a unit name.

        Returns:
            dict: alias -> ``[scale_factor, unit_name]``.  Every format is
            reachable under several aliases: a short name, the documentation
            color, the color's initial and the register number.

        Raises:
            KeyError: if the instrument reports a unit code that is not in
                the corresponding table.
        """
        format_registers = [
            ("Relative pressure variable format - high resolution", self._pressure_units_map, ["pressure_hr", "red", "r", 21, ]),
            ("Relative pressure variable format - low resolution", self._pressure_units_map, ["pressure_lr", "purple", "p", 22, ]),
            ("Format of the variables related to the measurement of the differential leak pressure", self._leak_units_map, ["leak", "yellow", "y", 23, ]),
            ("Format of the variables related to the calculated leak flow", self._leak_flow_units_map, ["leak_flow", "blue", "b", 24, ]),
            ("Format of volume variables", self._volume_units_map, ["volume", "green", "g", 25, ]),
            ("Format of time variables", self._time_units_map, ["time", "tangerine", "t", 26, ]),
            ("Format of variables related to flow measurements", self._flow_units_map, ["flow", "orange", "o", 27, ]),
        ]
        units = {}
        for register, unit_map, aliases in format_registers:
            raw = self.read(register)
            exponent = (raw >> 8) & 0xff
            unit_code = raw & 0xff
            unit = [10 ** (-exponent), unit_map[unit_code]]
            # All aliases intentionally share the same list object.
            for alias in aliases:
                units[alias] = unit
        return units

    def _convert_from_format(self, data, formatting=None, decoding_map=None):
        """Convert a raw register value to its application value.

        Args:
            data: raw value as returned by the base ``read``.
            formatting: key into ``self.units``; when given, the value is
                multiplied by that format's scale factor.
            decoding_map: optional mapping applied to the raw value first.

        Returns:
            The decoded and/or scaled value (unchanged if neither applies).
        """
        if decoding_map is not None:
            data = decoding_map[data]
        if formatting is not None:
            data = data * self.units[formatting][0]
        return data

    def _convert_to_format(self, data, formatting=None, encoding_map=None):
        """Convert an application value to the raw register value.

        Args:
            data: value in engineering units (or a key of ``encoding_map``).
            formatting: key into ``self.units``; when given, the value is
                divided by that format's scale factor and made an integer.
            encoding_map: optional mapping applied after scaling.

        Returns:
            The value to hand to the base ``write``.
        """
        if formatting is not None:
            scale = self.units[formatting][0]
            # Round to nearest instead of truncating: with a scale such as
            # 0.1, ``0.3 / 0.1`` evaluates to 2.9999999999999996 and a bare
            # int() would encode 2 instead of 3.
            data = int(round(data / scale))
        if encoding_map is not None:
            data = encoding_map[data]
        return data

    def read(self, register, *args, formatting=None, decoding_map=None, **kwargs):
        """Read a register and convert the result.

        When ``register`` is a name, missing ``formatting`` / ``decoding_map``
        arguments default to the ``"f"`` / ``"decoding"`` entries of that
        register's definition in ``self.registers``.  Remaining arguments are
        passed through to the base ``read``.
        """
        if isinstance(register, str):
            _, spec = self.registers[register]
            if formatting is None:
                formatting = spec.get("f", None)
            if decoding_map is None:
                decoding_map = spec.get("decoding", None)
        raw = super().read(register, *args, **kwargs)
        return self._convert_from_format(raw, formatting=formatting, decoding_map=decoding_map)

    def write(self, register, data, *args, formatting=None, encoding_map=None, **kwargs):
        """Convert a value and write it to a register.

        When ``register`` is a name, missing ``formatting`` / ``encoding_map``
        arguments default to the ``"f"`` / ``"encoding"`` entries of that
        register's definition in ``self.registers``.  Remaining arguments are
        passed through to the base ``write``, whose result is returned.
        """
        if isinstance(register, str):
            _, spec = self.registers[register]
            if formatting is None:
                formatting = spec.get("f", None)
            if encoding_map is None:
                encoding_map = spec.get("encoding", None)
        raw = self._convert_to_format(data, formatting=formatting, encoding_map=encoding_map)
        return super().write(register, raw, *args, **kwargs)

    def _get(self):
        """Read the live status registers and hand them to the base ``_get``.

        If the "type of test" register reads as ``None``, the results of the
        test are added to the sample and a new test is started.
        """
        # READ INFO
        info = {r: self.read(r) for r in [
            "Instrument status: active phase",
            "Test circuit pressure, in real time",
            "Measured leak, in real time",
            "Regulated pressure, in real time",
            "Active alarm flags",
            "Running test: type of test",
            "Testing in progress: progressive sequence index",
        ]}
        if info["Running test: type of test"] is None:
            info.update(self.get_test_results())
            self.start_test()
        self.log.debug(str(info))
        super()._get([info])

    def start_test(self):
        """Log and write 1 to the "Start testing" register."""
        self.log.info("starting test")
        self.write("Start testing", 1)

    def stop_test(self):
        """Log and write to the "Stop the test in progress" register."""
        self.log.warning("stopping test")
        # NOTE(review): this writes 0 whereas start_test writes 1 - confirm
        # against the instrument's register documentation.
        self.write("Stop the test in progress", 0)

    def get_test_results(self):
        """Return the result registers of the test as a name -> value dict."""
        return {r: self.read(r) for r in [
            "Running test: measured leak",
            "Running test: calculated leak flow rate",
            "Running test: calculate RVP%",
            "Running test: result",
        ]}