st-ten-1/src/components/tecna_marposs_provaset_t3.py

150 lines
7.7 KiB
Python
Raw Normal View History

2022-06-08 07:11:38 +00:00
from .modbus_component import ModbusComponent
from .tecna_marposs_provaset_t3_registers import registers
2022-06-01 16:37:19 +00:00
# from pymodbus.client.sync import ModbusSerialClient as ModbusClient
# import serial
# client = ModbusClient(method="rtu", port="COM3", stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, baudrate=115200, timeout=1, strict=False)
# client.connect()
# client.read_holding_registers(1, count=1)
2022-06-08 07:11:38 +00:00
class TecnaMarpossProvasetT3(ModbusComponent):
    """Modbus component for the Tecna Marposs Provaset T3 instrument.

    Wraps the base ``read``/``write`` so that register values are scaled
    and (de)coded according to the per-register spec found in
    ``self.registers`` and the measure-unit formats read from the device.
    """

    # Unit name -> numeric code tables, one per measure family.
    _pressure_units = {"mH2O": 0, "mbar": 1, "kPa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, "mmH2O": 6, }  # (if full scale <= 6 bar)
    _leak_units = {"mmH2O": 0, "mbar": 1, "Pa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, }
    _leak_flow_units = {"cm3/min": 0, "cm3/h": 1, }
    _volume_units = {"litri": 0, "cm3": 1, }
    _time_units = {"seconds": 0, }
    _flow_units = {"liters/min": 0, "liters/h": 1, "m3/h": 2, }

    # Reverse tables: numeric code -> unit name.
    _pressure_units_map = {v: k for k, v in _pressure_units.items()}
    _leak_units_map = {v: k for k, v in _leak_units.items()}
    _leak_flow_units_map = {v: k for k, v in _leak_flow_units.items()}
    _volume_units_map = {v: k for k, v in _volume_units.items()}
    _time_units_map = {v: k for k, v in _time_units.items()}
    _flow_units_map = {v: k for k, v in _flow_units.items()}

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        """Forward all arguments to the base class, adding this device's register table."""
        super().__init__(
            config=config,
            name=name,
            period=period,
            lazy=lazy,
            paused=paused,
            threaded=threaded,
            registers=registers,
        )

    def config_changed(self):
        """Run the base handler, then refresh ``self.units`` from the device."""
        super().config_changed()
        # self.set_measure_units()
        self.units = self.get_measure_units()
        self.log.info(f"units: {self.units}")

    def set_measure_units(self):
        """Write a fixed set of measure units to the device.

        Returns:
            list: the result of each underlying ``write`` call, in order
            (pressure, leak, leak flow rate, volume, flow rate).
        """
        return [
            self.write("MEASURE UNITS: pressure measure units", self._pressure_units["mbar"]),  # red, ?purple?
            self.write("MEASURE UNITS: Leak measure units", self._leak_units["mbar"]),  # yellow
            self.write("MEASURE UNITS: leak flow rate measure units", self._leak_flow_units["cm3/min"]),  # blue
            self.write("MEASURE UNITS: Volume", self._volume_units["litri"]),  # green
            self.write("MEASURE UNITS: Flow rate measure units", self._flow_units["liters/min"]),  # orange
        ]

    def get_measure_units(self):
        """Read the variable-format registers and build the units table.

        Each format register is split into a high byte, used as a
        negative power-of-ten exponent, and a low byte, used as a unit
        code looked up in the matching ``*_units_map``.

        Returns:
            dict: alias -> ``[scale, unit_name]``. Every format is stored
            under several aliases (a name, a colour, a letter and a
            number), all pointing to the same list.

        Raises:
            KeyError: if the device reports a unit code missing from the
                corresponding map.
        """
        # (register, code -> name map, aliases) -- aliases are also by
        # documentation color and register number.
        format_registers = (
            ("Relative pressure variable format - high resolution", self._pressure_units_map, ["pressure_hr", "red", "r", 21, ]),
            ("Relative pressure variable format - low resolution", self._pressure_units_map, ["pressure_lr", "purple", "p", 22, ]),
            ("Format of the variables related to the measurement of the differential leak pressure", self._leak_units_map, ["leak", "yellow", "y", 23, ]),
            ("Format of the variables related to the calculated leak flow", self._leak_flow_units_map, ["leak_flow", "blue", "b", 24, ]),
            ("Format of volume variables", self._volume_units_map, ["volume", "green", "g", 25, ]),
            ("Format of time variables", self._time_units_map, ["time", "tangerine", "t", 26, ]),
            ("Format of variables related to flow measurements", self._flow_units_map, ["flow", "orange", "o", 27, ]),
        )
        units = {}
        for register, unit_map, unit_names in format_registers:
            raw = self.read(register)
            exponent = (raw >> 8) & 0xff
            unit_code = raw & 0xff
            unit = [10 ** (-exponent), unit_map[unit_code]]
            for unit_name in unit_names:
                units[unit_name] = unit
        return units

    def _convert_from_format(self, data, formatting=None, decoding_map=None):
        """Convert a raw register value to its application-level value.

        Args:
            data: raw value returned by the base ``read``.
            formatting: key into ``self.units``; when given, the value is
                multiplied by that format's scale.
            decoding_map: optional mapping applied to the raw value
                before scaling.

        Returns:
            The decoded and/or scaled value (unchanged if neither option
            is given).
        """
        if decoding_map is not None:
            data = decoding_map[data]
        if formatting is not None:
            # units = self.units[formatting]
            # data = [data * units[0], units[1]]
            data = data * self.units[formatting][0]
        return data

    def _convert_to_format(self, data, formatting=None, encoding_map=None):
        """Convert an application-level value to its raw register value.

        Args:
            data: value to be written.
            formatting: key into ``self.units``; when given, the value is
                divided by that format's scale and rounded to an integer.
            encoding_map: optional mapping applied after scaling.

        Returns:
            The scaled and/or encoded value (unchanged if neither option
            is given).
        """
        if formatting is not None:
            scale = self.units[formatting][0]
            # Round to nearest instead of truncating: binary float error
            # makes e.g. 0.3 / 0.1 == 2.9999999999999996, which plain
            # int() would turn into 2 instead of 3.
            data = int(round(data / scale))
        if encoding_map is not None:
            data = encoding_map[data]
        return data

    def read(self, register, *args, formatting=None, decoding_map=None, **kwargs):
        """Read a register and convert the result.

        When ``register`` is a string, missing ``formatting`` /
        ``decoding_map`` values default to the ``"f"`` / ``"decoding"``
        entries of that register's spec in ``self.registers``. Extra
        positional and keyword arguments go to the base ``read``.
        """
        if isinstance(register, str):
            _, spec = self.registers[register]
            if formatting is None:
                formatting = spec.get("f", None)
            if decoding_map is None:
                decoding_map = spec.get("decoding", None)
        raw = super().read(register, *args, **kwargs)
        return self._convert_from_format(raw, formatting=formatting, decoding_map=decoding_map)

    def write(self, register, data, *args, formatting=None, encoding_map=None, **kwargs):
        """Convert a value and write it to a register.

        When ``register`` is a string, missing ``formatting`` /
        ``encoding_map`` values default to the ``"f"`` / ``"encoding"``
        entries of that register's spec in ``self.registers``. Extra
        positional and keyword arguments go to the base ``write``.
        """
        if isinstance(register, str):
            _, spec = self.registers[register]
            if formatting is None:
                formatting = spec.get("f", None)
            if encoding_map is None:
                encoding_map = spec.get("encoding", None)
        raw = self._convert_to_format(data, formatting=formatting, encoding_map=encoding_map)
        return super().write(register, raw, *args, **kwargs)

    def _get(self):
        """Read the status registers and hand the sample to the base ``_get``.

        The test-result registers are added to the sample when
        "Running test: type of test" reads back as ``None``.
        """
        # print("TECNA", str(int(QThread.currentThreadId())), flush=True)
        # READ INFO
        status_registers = [
            "Instrument status: active phase",
            "Test circuit pressure, in real time",
            "Measured leak, in real time",
            "Regulated pressure, in real time",
            "Active alarm flags",
            "Running test: type of test",
            "Testing in progress: progressive sequence index",
        ]
        info = {r: self.read(r) for r in status_registers}
        if info["Running test: type of test"] is None:
            info.update(self.get_test_results())
        self.log.debug(str(info))
        super()._get([info])

    def start_test(self, table=1):
        """Write ``table`` to the "Start testing" register."""
        self.log.info(f"starting test table {table!r}")
        self.write("Start testing", table)

    def stop_test(self):
        """Write 0 to the "Stop the test in progress" register."""
        self.log.warning("stopping test")
        self.write("Stop the test in progress", 0)

    def get_test_results(self):
        """Read the test-result registers.

        Returns:
            dict: register name -> converted value.
        """
        self.log.info("getting test results")
        result_registers = [
            "Running test: measured leak",
            "Running test: calculated leak flow rate",
            "Running test: calculate RVP%",
            "Running test: result",
        ]
        return {r: self.read(r) for r in result_registers}

    def write_recipe(self, recipe, table=1):
        """Write the test parameters taken from ``recipe.spec`` to the device.

        Args:
            recipe: object whose ``spec`` mapping provides the keys read
                below (``"pre_filling_time"``, ``"test_pressure"``, ...).
            table: accepted for interface compatibility; not used by the
                current implementation.
                NOTE(review): presumably meant to select the target
                parameter table -- confirm against the register map.
        """
        # "pressure_ramp": self.pressure_ramp_sb,
        # "stabilization_cycles": self.stabilization_cycles_sb,
        spec = recipe.spec
        values = {
            "Type of test": "LEAK TEST",
            # "Test flags": recipe.spec[""],
            "T0 - Pre-filling time": spec["pre_filling_time"],
            "P0 - Pre-filling pressure": spec["pre_filling_pressure"],
            "T1 - Filling time": spec["filling_time"],
            "T2 - Settling time": spec["settling_time"],
            "PR%- Lower tolerance on pressure / P- Minimum pressure": spec["settling_pressure_min_percent"],
            "PR%+ Superior tolerance on pressure / P+ Pressure max": spec["settling_pressure_max_percent"],
            "T3 - Measure time": spec["test_time"],
            "Q- Leak limit": spec["test_pressure_min_delta"],
            "PREL - Nominal test pressure": spec["test_pressure"],
            "Q+ Upper limit": spec["test_pressure_max_delta"],
            "FST - discharge time": spec["flush_time"],
            "FSL - discharge limit": spec["flush_pressure"],
        }
        self.log.debug(str(values))
        for register, value in values.items():
            self.write(register, value)