183 lines
9.5 KiB
Python
183 lines
9.5 KiB
Python
from .modbus_component import ModbusComponent
|
|
from .tecna_marposs_provaset_t3_registers import registers
|
|
|
|
# from pymodbus.client.sync import ModbusSerialClient as ModbusClient
|
|
# import serial
|
|
# client = ModbusClient(method="rtu", port="COM3", stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, baudrate=115200, timeout=1, strict=False)
|
|
# client.connect()
|
|
# client.read_holding_registers(1, count=1)
|
|
|
|
|
|
class TecnaMarpossProvasetT3(ModbusComponent):
|
|
def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
|
|
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded, registers=registers)
|
|
|
|
pin_registers = {
|
|
"admin_pin": "PASSWORD: Administrator", # was 1909
|
|
"modify_pin": "PASSWORD: Modify program", # was 0
|
|
"select_pin": "PASSWORD: Select program", # was 0
|
|
}
|
|
|
|
def config_changed(self):
|
|
super().config_changed()
|
|
self.pins = {
|
|
"admin_pin": self.config.get("admin_pin", None),
|
|
"modify_pin": self.config.get("modify_pin", None),
|
|
"select_pin": self.config.get("select_pin", None),
|
|
}
|
|
self.unlock_tecna()
|
|
self.set_measure_units()
|
|
self.units = self.get_measure_units()
|
|
self.log.info(f"units: {self.units}")
|
|
|
|
def unlock_tecna(self, **kwargs):
|
|
pins = self.pins.copy()
|
|
pins.update(kwargs)
|
|
for pin_name, register in self.pin_registers.items():
|
|
pin = pins[pin_name]
|
|
if pin is not None:
|
|
self.write(register, pin)
|
|
|
|
_pressure_units = {"mH2O": 0, "mbar": 1, "kPa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, "mmH2O": 6, } # (se fondoscala <=6 bar)
|
|
_leak_units = {"mmH2O": 0, "mbar": 1, "Pa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, }
|
|
_leak_flow_units = {"cm3/min": 0, "cm3/h": 1, }
|
|
_volume_units = {"litri": 0, "cm3": 1, }
|
|
_time_units = {"seconds": 0, }
|
|
_flow_units = {"liters/min": 0, "liters/h": 1, "m3/h": 2, }
|
|
_pressure_units_map = {v: k for k, v in _pressure_units.items()}
|
|
_leak_units_map = {v: k for k, v in _leak_units.items()}
|
|
_leak_flow_units_map = {v: k for k, v in _leak_flow_units.items()}
|
|
_volume_units_map = {v: k for k, v in _volume_units.items()}
|
|
_time_units_map = {v: k for k, v in _time_units.items()}
|
|
_flow_units_map = {v: k for k, v in _flow_units.items()}
|
|
|
|
def set_measure_units(self):
|
|
for register, [unit, decimals] in {
|
|
"MEASURE UNITS: pressure measure units": [self._pressure_units["mbar"], 0], # red, ?purple?
|
|
"MEASURE UNITS: Leak measure units": [self._leak_units["mbar"], 0], # yellow
|
|
"MEASURE UNITS: leak flow rate measure units": [self._leak_flow_units["cm3/min"], 0], # blue
|
|
"MEASURE UNITS: Volume": [self._volume_units["litri"], 0], # green
|
|
"MEASURE UNITS: Flow rate measure units": [self._flow_units["liters/min"], 0], # orange
|
|
}.items():
|
|
print(self.write(register, (unit << 8) + decimals))
|
|
|
|
def get_measure_units(self):
|
|
units = {}
|
|
for [register, unit_map, unit_names] in [
|
|
["Relative pressure variable format - high resolution", self._pressure_units_map, ["pressure_hr", "red", "r", 21, ]], # also by documentation color and register number
|
|
["Relative pressure variable format - low resolution", self._pressure_units_map, ["pressure_lr", "purple", "p", 22, ]], # also by documentation color and register number
|
|
["Format of the variables related to the measurement of the differential leak pressure", self._leak_units_map, ["leak", "yellow", "y", 23, ]], # also by documentation color and register number
|
|
["Format of the variables related to the calculated leak flow", self._leak_flow_units_map, ["leak_flow", "blue", "b", 24, ]], # also by documentation color and register number
|
|
["Format of volume variables", self._volume_units_map, ["volume", "green", "g", 25, ]], # also by documentation color and register number
|
|
["Format of time variables", self._time_units_map, ["time", "tangerine", "t", 26, ]], # also by documentation color and register number
|
|
["Format of variables related to flow measurements", self._flow_units_map, ["flow", "orange", "o", 27, ]], # also by documentation color and register number
|
|
["MEASURE UNITS: pressure measure units": self._pressure_units, ["pressur"]],
|
|
["MEASURE UNITS: Leak measure units": self._leak_units, ["lea"]],
|
|
["MEASURE UNITS: leak flow rate measure units": self._leak_flow_units, ["leaflo"]],
|
|
["MEASURE UNITS: Volume": self._volume_units, ["volum"]],
|
|
["MEASURE UNITS: Flow rate measure units": self._flow_units, ["flo"]],
|
|
]:
|
|
v = self.read(register)
|
|
unit = [10**(-((v >> 8) & 0xff)), unit_map[v & 0xff]]
|
|
for unit_name in unit_names:
|
|
units[unit_name] = unit
|
|
return units
|
|
|
|
def _convert_from_format(self, data, formatting=None, decoding_map=None):
|
|
if decoding_map is not None and data in decoding_map:
|
|
data = decoding_map[data]
|
|
if formatting is not None:
|
|
# units = self.units[formatting]
|
|
# data = [data * units[0], units[1]]
|
|
data = data * self.units[formatting][0]
|
|
return data
|
|
|
|
def _convert_to_format(self, data, formatting=None, encoding_map=None):
|
|
if formatting is not None:
|
|
data = int(data / self.units[formatting][0])
|
|
if encoding_map is not None and data in encoding_map:
|
|
data = encoding_map[data]
|
|
return data
|
|
|
|
def read(self, register, *args, formatting=None, decoding_map=None, **kwargs):
|
|
if type(register) is str:
|
|
_, s = self.registers[register]
|
|
if formatting is None:
|
|
formatting = s.get("f", None)
|
|
if decoding_map is None:
|
|
decoding_map = s.get("decoding", None)
|
|
return self._convert_from_format(super().read(register, *args, **kwargs), formatting=formatting, decoding_map=decoding_map)
|
|
|
|
def write(self, register, data, *args, formatting=None, encoding_map=None, **kwargs):
|
|
if type(register) is str:
|
|
_, s = self.registers[register]
|
|
if formatting is None:
|
|
formatting = s.get("f", None)
|
|
if encoding_map is None:
|
|
encoding_map = s.get("encoding", None)
|
|
return super().write(register, self._convert_to_format(data, formatting=formatting, encoding_map=encoding_map), *args, **kwargs)
|
|
|
|
def _get(self):
|
|
# print("TECNA", str(int(QThread.currentThreadId())), flush=True)
|
|
# READ INFO
|
|
info = {r: self.read(r) for r in [
|
|
"Instrument status: table of parameters in use",
|
|
"Instrument status: active phase",
|
|
"Test circuit pressure, in real time",
|
|
"Measured leak, in real time",
|
|
"Regulated pressure, in real time",
|
|
"Active alarm flags",
|
|
"Running test: type of test",
|
|
"Testing in progress: progressive sequence index",
|
|
]}
|
|
if info["Running test: type of test"] is None:
|
|
info.update(self.get_test_results())
|
|
self.log.debug(str(info))
|
|
super()._get([info])
|
|
|
|
def start_test(self, table=100):
|
|
self.log.info(f"starting test table {table!r}")
|
|
self.write("INSTRUMENT SETTING: Selected table", table)
|
|
self.write("Start testing", table)
|
|
|
|
def stop_test(self):
|
|
self.log.warning("stopping test")
|
|
self.write("Stop the test in progress", 0)
|
|
|
|
def get_test_results(self):
|
|
self.log.info("getting test results")
|
|
return {r: self.read(r) for r in [
|
|
"Running test: measured leak",
|
|
"Running test: calculated leak flow rate",
|
|
"Running test: calculate RVP%",
|
|
"Running test: result",
|
|
]}
|
|
|
|
def write_recipe(self, recipe, table=100):
|
|
# "pressure_ramp": self.pressure_ramp_sb,
|
|
# "stabilization_cycles": self.stabilization_cycles_sb,
|
|
recipe_name = recipe.spec["part_number"][:16].encode("ascii")
|
|
recipe_name += b"\x00" * (16 - len(recipe_name))
|
|
recipe = {
|
|
"INSTRUMENT SETTING: Various flags": 0b0000000000000000,
|
|
**{719 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)},
|
|
"Selection number of test table on which you intend to work": table,
|
|
"Type of test": "LEAK TEST",
|
|
"Test flags": 0b0000000001010000,
|
|
"T0 - Pre-filling time": recipe.spec["pre_filling_time"],
|
|
"P0 - Pre-filling pressure": recipe.spec["pre_filling_pressure"],
|
|
"T1 - Filling time": recipe.spec["filling_time"],
|
|
"T2 - Settling time": recipe.spec["settling_time"],
|
|
"PR%- Lower tolerance on pressure / P- Minimum pressure": recipe.spec["settling_pressure_min_percent"],
|
|
"PR%+ Superior tolerance on pressure / P+ Pressure max": recipe.spec["settling_pressure_max_percent"],
|
|
"T3 - Measure time": recipe.spec["test_time"],
|
|
"Q- Leak limit": recipe.spec["test_pressure_min_delta"],
|
|
"PREL - Nominal test pressure": recipe.spec["test_pressure"],
|
|
"Q+ Upper limit": recipe.spec["test_pressure_max_delta"],
|
|
"FST - discharge time": recipe.spec["flush_time"],
|
|
"FSL - discharge limit": recipe.spec["flush_pressure"],
|
|
}
|
|
self.log.debug(str(recipe))
|
|
for register, value in recipe.items():
|
|
self.write(register, value)
|