651 lines
33 KiB
Python
651 lines
33 KiB
Python
import time
|
|
|
|
from lib.db import Recipes, db
|
|
from PyQt5.QtCore import QSemaphore, pyqtSignal, pyqtSlot
|
|
from PyQt5.QtWidgets import QMessageBox
|
|
|
|
from .component import Component
|
|
from .modbus_component import ModbusComponent
|
|
from .tecna_marposs_provaset_t3l_registers import registers as t3l_registers
|
|
from .tecna_marposs_provaset_t3p_registers import registers as t3p_registers
|
|
|
|
|
|
class TecnaMarpossProvasetT3(ModbusComponent):
|
|
_store_recipes_signal = pyqtSignal(object)
|
|
_store_recipes_lock = QSemaphore(0)
|
|
tecna_error_signal = pyqtSignal(bool, str) # Emits (True, error_message) if error exists, else (False, "")
|
|
|
|
def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
|
|
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded, registers=None)
|
|
# Connect to the modbus_error_signal to handle connection issues
|
|
self.modbus_error_signal.connect(self.handle_modbus_error)
|
|
self.connection_lost = False
|
|
self._previous_connection_lost = False
|
|
|
|
def config_changed(self):
|
|
super().config_changed()
|
|
self._store_recipes_signal.connect(self._store_recipes)
|
|
self.model = self.config[self.name]["model"].lower()
|
|
if self.model == "t3p":
|
|
self.registers = t3p_registers
|
|
elif self.model == "t3l":
|
|
self.registers = t3l_registers
|
|
else:
|
|
raise NotImplementedError(f"tecna t3 model {self.model!r} not implemented.")
|
|
self.set_measure_units()
|
|
try:
|
|
self.units = self.get_measure_units()
|
|
self.max_program_number = self.read("Max number of programs")
|
|
self.saver_label_count = min(abs(int(self.config[self.name].get("saver_label_count", 1))), 0b1111)
|
|
self.saver_print_on_fail = 1 if self.config[self.name].get("saver_print_on_fail", "no").lower() in {"yes", "y", "on", "true", "1", "x"} else 0
|
|
self.saver_label_template = min(abs(int(self.config[self.name].get("saver_label_template", 1))), 0b11111111)
|
|
self.model = self.config[self.name]["model"].lower()
|
|
self.log.info(f"units: {self.units}")
|
|
except Exception as e:
|
|
error_message = f"Error during config_changed operation: {str(e)}"
|
|
# Mark connection as lost if it's a connection error
|
|
if "Connection error" in str(e) or "Cannot connect" in str(e):
|
|
self.connection_lost = True
|
|
self.log.warning("Connection to Tecna Marposs lost during config_changed operation, will attempt to reconnect on next periodic call")
|
|
# Emit a signal to notify the UI of the connection loss
|
|
self.tecna_error_signal.emit(True, error_message)
|
|
|
|
_pressure_units = {"mH2O": 0, "mbar": 1, "kPa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, "mmH2O": 6, } # (se fondoscala <=6 bar)
|
|
_leak_units = {"mmH2O": 0, "mbar": 1, "Pa": 2, "mmHg": 3, "inH2O": 4, "psi": 5, }
|
|
_leak_flow_units = {"cm3/min": 0, "cm3/h": 1, }
|
|
_volume_units = {"litri": 0, "cm3": 1, }
|
|
_time_units = {"seconds": 0, }
|
|
_flow_units = {"liters/min": 0, "liters/h": 1, "m3/h": 2, "cfm": 3}
|
|
_pressure_units_map = {v: k for k, v in _pressure_units.items()}
|
|
_leak_units_map = {v: k for k, v in _leak_units.items()}
|
|
_leak_flow_units_map = {v: k for k, v in _leak_flow_units.items()}
|
|
_volume_units_map = {v: k for k, v in _volume_units.items()}
|
|
_time_units_map = {v: k for k, v in _time_units.items()}
|
|
_flow_units_map = {v: k for k, v in _flow_units.items()}
|
|
|
|
def set_measure_units(self):
|
|
if self.model == "t3p":
|
|
for register, [unit, decimals] in {
|
|
"MEASURE UNITS: Relative pressure": [self._pressure_units["mbar"], 0], # red, purple
|
|
"MEASURE UNITS: Differential (leak) pressure": [self._leak_units["mbar"], 0], # yellow
|
|
"MEASURE UNITS: Calculated leak flow rate": [self._leak_flow_units["cm3/min"], 0], # blue
|
|
"MEASURE UNITS: Volume": [self._volume_units["litri"], 0], # green
|
|
"MEASURE UNITS: Flow rate": [self._flow_units["liters/min"], 0], # orange
|
|
}.items():
|
|
self.write(register, (decimals << 8) + unit)
|
|
elif self.model == "t3l":
|
|
for register, [unit, decimals] in {
|
|
"MEASURE UNITS: Relative pressure": [self._pressure_units["mbar"], 0], # red, purple
|
|
"MEASURE UNITS: Differential (leak) pressure": [self._leak_units["mbar"], 0], # yellow
|
|
"MEASURE UNITS: Calculated leak flow rate": [self._leak_flow_units["cm3/min"], 0], # blue
|
|
"MEASURE UNITS: Volume": [self._volume_units["litri"], 0], # green
|
|
"MEASURE UNITS: Flow rate": [self._flow_units["liters/min"], 0], # orange
|
|
}.items():
|
|
self.write(register, unit) # (decimals << 8) + unit)
|
|
else:
|
|
raise NotImplementedError(f"tecna t3 model {self.model!r} not implemented.")
|
|
|
|
def get_measure_units(self):
|
|
units = {}
|
|
if self.model == "t3p":
|
|
for [register, unit_map, unit_names] in [
|
|
["Running test: relative pressure format", self._pressure_units_map, ["relative_pressure", "red", "r", 21, ]], # also by documentation color and register number
|
|
["Running test: differential pressure format", self._pressure_units_map, ["differential_pressure", "purple", "p", 22, ]], # also by documentation color and register number
|
|
["Running test: relative pressure format (low resolution)", self._leak_units_map, ["relative_pressure_lr", "yellow", "y", 23, ]], # also by documentation color and register number
|
|
["Running test: calculated leak flow rate format", self._leak_flow_units_map, ["leak_flow", "blue", "b", 24, ]], # also by documentation color and register number
|
|
["Running test: volume format", self._volume_units_map, ["volume", "green", "g", 25, ]], # also by documentation color and register number
|
|
["Running test: time format", self._time_units_map, ["time", "orange", "t", 26, ]], # also by documentation color and register number
|
|
["Running test: flow rate format", self._flow_units_map, ["flow", "white", "o", 27, ]], # also by documentation color and register number
|
|
]:
|
|
v = self.read(register)
|
|
unit_spec = [10**(-((v >> 8) & 0xff)), unit_map[v & 0xff]]
|
|
for unit_name in unit_names:
|
|
units[unit_name] = unit_spec
|
|
elif self.model == "t3l":
|
|
for [register, unit_map, unit_names] in [
|
|
[["Running test: relative pressure scale", "Running test: relative pressure decimals"], self._pressure_units_map, ["relative_pressure", "red", "r", 1501, ]], # also by documentation color and register number
|
|
[["Running test: differential pressure scale", "Running test: differential pressure decimals"], self._pressure_units_map, ["differential_pressure", "purple", "p", 1503, ]], # also by documentation color and register number
|
|
[["Running test: relative pressure scale (low resolution)", "Running test: relative pressure decimals (low resolution)"], self._leak_units_map, ["relative_pressure_lr", "yellow", "y", 1505, ]], # also by documentation color and register number
|
|
["Running test: calculated leak flow rate format", self._leak_flow_units_map, ["leak_flow", "blue", "b", 1507, ]], # also by documentation color and register number
|
|
["Running test: volume format", self._volume_units_map, ["volume", "green", "g", 1508, ]], # also by documentation color and register number
|
|
["Running test: time format", self._time_units_map, ["time", "orange", "t", 1509, ]], # also by documentation color and register number
|
|
["Running test: flow rate format", self._flow_units_map, ["flow", "white", "o", 1510, ]], # also by documentation color and register number
|
|
["Running test: line pressure format", self._pressure_units_map, ["line_pressure", "lp", "l", 1511, ]], # also by documentation color and register number
|
|
]:
|
|
if type(register) is list:
|
|
v = [self.read(r) for r in register]
|
|
unit_spec = [10**(-(v[1] & 0xff)), v[0]]
|
|
else:
|
|
v = self.read(register)
|
|
unit_spec = [10**(-((v >> 8) & 0xff)), unit_map[v & 0xff]]
|
|
for unit_name in unit_names:
|
|
units[unit_name] = unit_spec
|
|
else:
|
|
raise NotImplementedError(f"tecna t3 model {self.model!r} not implemented.")
|
|
return units
|
|
|
|
def _convert_from_format(self, data, formatting=None, decoding_map=None):
|
|
if decoding_map is not None and data in decoding_map:
|
|
data = decoding_map[data]
|
|
if formatting is not None:
|
|
# units = self.units[formatting]
|
|
# data = [data * units[0], units[1]]
|
|
if data is not None:
|
|
data = data * self.units[formatting][0]
|
|
else:
|
|
return None
|
|
return data
|
|
|
|
def _convert_to_format(self, data, formatting=None, encoding_map=None):
|
|
if formatting is not None:
|
|
data = int(data / self.units[formatting][0])
|
|
if encoding_map is not None and data in encoding_map:
|
|
data = encoding_map[data]
|
|
return data
|
|
|
|
@Component.reconfig_on_error
|
|
def read(self, register, *args, data_type=None, gain=None, offset=None, formatting=None, decoding_map=None,
|
|
**kwargs):
|
|
try:
|
|
if type(register) is str:
|
|
register, s = self.registers[register]
|
|
if data_type is None:
|
|
data_type = s.get("dt", None)
|
|
if gain is None:
|
|
gain = s.get("g", None)
|
|
if offset is None:
|
|
offset = s.get("o", None)
|
|
if formatting is None:
|
|
formatting = s.get("f", None)
|
|
if decoding_map is None:
|
|
decoding_map = s.get("decoding", None)
|
|
if not len(args):
|
|
args = s.get("a", [])
|
|
if not len(kwargs):
|
|
kwargs = s.get("k", {})
|
|
if data_type is None:
|
|
data_type = "16bit_uint"
|
|
if gain is None:
|
|
gain = 1
|
|
if offset is None:
|
|
offset = 0
|
|
return self._convert_from_format(
|
|
super().read(
|
|
register,
|
|
*args,
|
|
data_type=data_type,
|
|
gain=gain,
|
|
offset=offset,
|
|
**kwargs,
|
|
),
|
|
formatting=formatting,
|
|
decoding_map=decoding_map,
|
|
)
|
|
|
|
except Exception as e:
|
|
error_message = f"Error during read operation on register {register}: {str(e)}"
|
|
self.log.error(error_message, exc_info=True)
|
|
# Do not use tecna_error_signal as per issue description
|
|
# self.tecna_error_signal.emit(True, error_message)
|
|
# Mark connection as lost if it's a connection error
|
|
if "Connection error" in str(e) or "Cannot connect" in str(e):
|
|
self.connection_lost = True
|
|
self.log.warning("Connection to Tecna Marposs lost during read operation, will attempt to reconnect on next periodic call")
|
|
raise # Re-raise the exception for further upstream handling if needed
|
|
|
|
# @Component.reconfig_on_error
|
|
@Component.reconfig_on_error
|
|
def write(self, register, data, *args, data_type=None, gain=None, offset=None, formatting=None, encoding_map=None,
|
|
**kwargs):
|
|
try:
|
|
if type(register) is str:
|
|
register, s = self.registers[register]
|
|
if data_type is None:
|
|
data_type = s.get("dt", None)
|
|
if gain is None:
|
|
gain = s.get("g", None)
|
|
if offset is None:
|
|
offset = s.get("o", None)
|
|
if formatting is None:
|
|
formatting = s.get("f", None)
|
|
if encoding_map is None:
|
|
encoding_map = s.get("encoding", None)
|
|
if not len(args):
|
|
args = s.get("a", [])
|
|
if not len(kwargs):
|
|
kwargs = s.get("k", {})
|
|
if data_type is None:
|
|
data_type = "16bit_uint"
|
|
if gain is None:
|
|
gain = 1
|
|
if offset is None:
|
|
offset = 0
|
|
return super().write(
|
|
register,
|
|
self._convert_to_format(
|
|
data,
|
|
formatting=formatting,
|
|
encoding_map=encoding_map,
|
|
),
|
|
*args,
|
|
data_type=data_type,
|
|
gain=gain,
|
|
offset=offset,
|
|
**kwargs,
|
|
)
|
|
except Exception as e:
|
|
error_message = f"Error during write operation on register {register} with data {data}: {str(e)}"
|
|
self.log.error(error_message, exc_info=True)
|
|
# Mark connection as lost if it's a connection error
|
|
if "Connection error" in str(e) or "Cannot connect" in str(e):
|
|
self.connection_lost = True
|
|
self.log.warning("Connection to Tecna Marposs lost during write operation, will attempt to reconnect on next periodic call")
|
|
# Emit a signal to notify the UI of the connection loss
|
|
self.tecna_error_signal.emit(True, error_message)
|
|
raise # Re-raise the exception for further upstream handling if needed
|
|
|
|
@Component.reconfig_on_error
|
|
def _get(self):
|
|
try:
|
|
# If connection was lost, try to reconnect
|
|
if self.connection_lost:
|
|
try:
|
|
self.log.info("Attempting to reconnect to Tecna Marposs...")
|
|
if self.connection_type == "ethernet":
|
|
if not self.client.connect():
|
|
self.log.warning("Reconnection attempt failed")
|
|
return
|
|
else:
|
|
if not self.client.connect():
|
|
self.log.warning("Reconnection attempt failed")
|
|
return
|
|
|
|
if not self.client.is_socket_open():
|
|
self.log.warning("Reconnection socket not open")
|
|
return
|
|
|
|
# If we get here, connection was successful
|
|
self.log.info("Successfully reconnected to Tecna Marposs")
|
|
# Store the previous connection state before updating it
|
|
self._previous_connection_lost = True
|
|
self.connection_lost = False
|
|
# Emit a signal to notify the UI that the connection has been restored
|
|
self.tecna_error_signal.emit(False, "Connection restored")
|
|
# Force a small delay to ensure the UI has time to process the signal
|
|
time.sleep(0.1)
|
|
except Exception as e:
|
|
self.log.error(f"Error during reconnection attempt: {str(e)}")
|
|
return
|
|
|
|
# READ INFO
|
|
info = {r: self.read(r) for r in [
|
|
"Real time test pressure output",
|
|
"Real time differential pressure output",
|
|
"Real time pressure line regulator",
|
|
"Active alarm flags",
|
|
"Active test program number",
|
|
"Running test: active phase",
|
|
"Running test: test type",
|
|
"Running test: sequence index",
|
|
"Digital inputs status (mask)",
|
|
# "Digital outputs status (mask)",
|
|
]}
|
|
if self.model == "t3p":
|
|
pass
|
|
elif self.model == "t3l":
|
|
info.update({r: self.read(r) for r in [
|
|
"Active not severe alarm flags",
|
|
]})
|
|
else:
|
|
raise NotImplementedError(f"Tecna t3 model {self.model!r} not implemented.")
|
|
|
|
if info["Running test: active phase"] == "FINE TEST": # "END TEST, WAITING THE START OF A NEW TEST":
|
|
info.update(self.get_test_results())
|
|
|
|
for round_me in ["measured leak"]:
|
|
if round_me in info.keys():
|
|
info.update({round_me: float(f"{info[round_me]:.2f}")})
|
|
self.log.debug(str(info))
|
|
super()._get([info])
|
|
except Exception as e:
|
|
error_message = f"Error during _get operation: {str(e)}"
|
|
self.log.error(error_message, exc_info=True)
|
|
# Mark connection as lost if it's a connection error
|
|
if "Connection error" in str(e) or "Cannot connect" in str(e):
|
|
self.connection_lost = True
|
|
self.log.warning("Connection to Tecna Marposs lost, will attempt to reconnect on next periodic call")
|
|
# Emit a signal to notify the UI of the connection loss
|
|
self.tecna_error_signal.emit(True, error_message)
|
|
# Don't raise the exception to allow periodic reconnection attempts
|
|
# Just return without emitting data
|
|
|
|
@Component.reconfig_on_error
|
|
def _set(self, val=None):
|
|
if val is not None: # handle request:
|
|
for register, value in val.items():
|
|
print(register, value)
|
|
self.write(register, value)
|
|
super()._set(val)
|
|
|
|
def start_test(self, table=None):
|
|
if table is None:
|
|
table = self.max_program_number
|
|
self.log.info(f"starting test table {table!r}")
|
|
self.write("Source of test program number selection", "FROM PARAMETER (SET BY LCD OR SERIAL LINE)")
|
|
self.write("Selected program", table)
|
|
self.write("Start test", table)
|
|
|
|
def stop_test(self):
|
|
self.log.warning("stopping test")
|
|
self.write("Reset running test", 0)
|
|
|
|
def get_test_results(self):
|
|
self.log.info("getting test results")
|
|
return {r: self.read(r) for r in [
|
|
#"Running test: phase backwards time",
|
|
"Running test: filling pressure",
|
|
"Running test: pressure at the end of settling",
|
|
#"Running test: burst pressure",
|
|
"Running test: measured leak",
|
|
#"Running test: calculated leak flow rate",
|
|
#"Running test: calculate RVP%",
|
|
"Running test: result",
|
|
]}
|
|
|
|
def write_recipe(self, recipe, step, table=None):
|
|
if table is None:
|
|
table = self.max_program_number
|
|
recipe_name = recipe.part_number[:16].encode("ascii")
|
|
recipe_name += b"\x00" * (16 - len(recipe_name))
|
|
recipe_barcode = f"j{recipe.part_number}"[:16].encode("ascii")
|
|
recipe_barcode += b"\x00" * (24 - len(recipe_barcode))
|
|
# Base flags and PID mode
|
|
test_flags = 0b0110100001010000 if (step.spec.get("autotest", False) in ["ko_check"]) else 0b0110000001010000
|
|
pid_mode_text = step.spec.get("pid_mod_config", "AUTO") # Get the selected text from the combobox
|
|
pid_mode_value = { # Mapping of text to numeric values
|
|
"AUTO": 5,
|
|
"FAST": 0,
|
|
"MEDIUM": 1,
|
|
"SLOW": 2,
|
|
}.get(pid_mode_text, 5)
|
|
# Inject PID mode bits (bits 4..6)
|
|
test_flags = (test_flags & ~(7 << 4)) | (pid_mode_value << 4)
|
|
|
|
pid_ramps = 0b0000000000000000 | int(self.config["recipes_defaults"]["pid_level"]) << 8 | int(self.config["recipes_defaults"]["pid_speed"]) << 12
|
|
# Build a robust map of parameters with safe defaults, so Free Fall (minimal spec) won't crash
|
|
_defs = self.config.get("recipes_defaults", {}) or {}
|
|
_s = step.spec or {}
|
|
|
|
def _gv(key, def_key=None, fallback_key=None):
|
|
if key in _s:
|
|
return _s.get(key)
|
|
if fallback_key and fallback_key in _s:
|
|
return _s.get(fallback_key)
|
|
if def_key is not None:
|
|
return int(_defs.get(def_key, 0)) if isinstance(_defs.get(def_key, 0), (int, float, str)) else 0
|
|
return 0
|
|
|
|
# Determine nominal pressure; for Free Fall we allow "filling_pressure" to be the nominal
|
|
test_pressure_val = _gv("test_pressure", "pressione_di_test", fallback_key="filling_pressure")
|
|
|
|
# Free Fall specific behavior
|
|
is_free_fall = (step.step_type == "test_freefall_leak")
|
|
test_type_value = "Leak Test"
|
|
if is_free_fall:
|
|
# Use Blockage test type for Free Fall as requested (code 2 in registers map)
|
|
test_type_value = "Blockage"
|
|
# Set/clear T1/Pr (bit 1) based on continuous_filling: True => pressure mode
|
|
if bool(_s.get("continuous_filling", False)):
|
|
test_flags |= (1 << 1)
|
|
else:
|
|
test_flags &= ~(1 << 1)
|
|
|
|
# Compute pressure tolerances in % from absolute min/max if provided (for Free Fall)
|
|
pr_minus_percent = None
|
|
pr_plus_percent = None
|
|
try:
|
|
nominal = float(test_pressure_val) if test_pressure_val is not None else 0.0
|
|
if is_free_fall and nominal > 0:
|
|
if _s.get("pressure_min") is not None:
|
|
pmin = max(0.0, float(_s.get("pressure_min")))
|
|
pr_minus_percent = max(0.0, min(999.9, ((nominal - pmin) * 100.0) / nominal))
|
|
if _s.get("pressure_max") is not None:
|
|
pmax = max(0.0, float(_s.get("pressure_max")))
|
|
pr_plus_percent = max(0.0, min(999.9, ((pmax - nominal) * 100.0) / nominal))
|
|
except Exception:
|
|
pr_minus_percent = pr_minus_percent if pr_minus_percent is not None else None
|
|
pr_plus_percent = pr_plus_percent if pr_plus_percent is not None else None
|
|
|
|
spec = {
|
|
"Flag: Instrument settings": 0b0000000000000000,
|
|
"Test program for read/write operation": table,
|
|
**{719 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)}, # program name
|
|
**{727 - 1 + i: (recipe_barcode[i * 2 + 1] << 8) + recipe_barcode[i * 2] for i in range(12)}, # program associated bar-code
|
|
**{761 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)}, # print field 1
|
|
# **{769 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)}, # print field 2
|
|
"Print options": 0b0000000000000000 | self.saver_label_count << 12 | self.saver_print_on_fail << 8 | self.saver_label_template,
|
|
"Test type": test_type_value,
|
|
"Test flags": test_flags,
|
|
"T0 - Pre-filling time": _gv("pre_filling_time", "tempo_pre_riempimento"),
|
|
"P0 - Pre-filling pressure": _gv("pre_filling_pressure", "pressione_pre_riempimento"),
|
|
"T1 - Filling time": _gv("filling_time", "tempo_riempimento"),
|
|
"T2 - Settling time": _gv("settling_time", "tempo_assestamento"),
|
|
"PR- - Min pressure tolerance %": (pr_minus_percent if pr_minus_percent is not None else _gv("settling_pressure_min_percent", "percentuale_minima_pressione_assestamento")),
|
|
"PR+ - Max pressure tolerance % (P+)": (pr_plus_percent if pr_plus_percent is not None else _gv("settling_pressure_max_percent", "percentuale_massima_pressione_assestamento")),
|
|
"T3 - Measure time": _gv("test_time", "tempo_di_test"),
|
|
"Q- Lower test leak limit": _gv("test_pressure_qneg", "pressione_di_test_delta_minimo"),
|
|
"PREL - Nominal test pressure": test_pressure_val,
|
|
"Q+ Upper test leak limit": _gv("test_pressure_qpos", "pressione_di_test_delta_massimo"),
|
|
"FST - Discharge time": _gv("flush_time", "tempo_svuotamento"),
|
|
"FSL - Discharge limit": _gv("flush_pressure", "pressione_svuotamento"),
|
|
"PSQ - Next sequence program PSOUT mode": 0,
|
|
"RAMPS: T1 configuration": pid_ramps,
|
|
"PID: pressure correction": _gv("pid_pressure_correction", "pid_pressure_correction"),
|
|
"Various flags": 0b0000000000010000 if self.config["recipes_defaults"]["tester_discharge_enable"] in ("yes", "x") else 0b0000000000000000
|
|
|
|
}
|
|
if self.model == "t3p":
|
|
pass
|
|
elif self.model == "t3l":
|
|
spec.update({
|
|
"Use programs or use products": 0,
|
|
"Nominal peak pressure": test_pressure_val,
|
|
"Pn - Nominal test pressure": test_pressure_val,
|
|
})
|
|
else:
|
|
raise NotImplementedError(f"tecna t3 model {self.model!r} not implemented.")
|
|
self.log.debug(str(spec))
|
|
for register, value in spec.items():
|
|
self.write(register, value)
|
|
# Override PR-/PR+ handling for Free Fall (Blockage) to write ABSOLUTE pressures
|
|
if is_free_fall:
|
|
try:
|
|
ptet = _s.get("pre_filling_pressure", None)
|
|
pmin = _s.get("pressure_min", None)
|
|
pmax = _s.get("pressure_max", None)
|
|
# Sanity checks and auto-correct
|
|
if pmin is not None and pmax is not None:
|
|
try:
|
|
pmin_v = float(pmin)
|
|
pmax_v = float(pmax)
|
|
if pmin_v > pmax_v:
|
|
self.log.warning(f"Free Fall: pressure_min ({pmin_v}) > pressure_max ({pmax_v}); swapping values to maintain consistency")
|
|
pmin_v, pmax_v = pmax_v, pmin_v
|
|
pmin, pmax = pmin_v, pmax_v
|
|
except Exception:
|
|
pass
|
|
# Write absolute values using relative pressure low-res format (23) and no gain
|
|
if pmin is not None:
|
|
self.write("PR- - Min pressure tolerance %", pmin)
|
|
self.log.info(f"Free Fall: wrote PR- (min final pressure) = {pmin} mbar (format 23)")
|
|
if pmax is not None:
|
|
self.write("PR+ - Max pressure tolerance % (P+)", pmax)
|
|
self.log.info(f"Free Fall: wrote PR+ (max final pressure) = {pmax} mbar (format 23)")
|
|
if ptet is not None:
|
|
self.write("P0 - Pre-filling pressure", ptet)
|
|
self.log.info(f"Free Fall: wrote PR+ (max final pressure) = {ptet} mbar (format 23)")
|
|
|
|
except Exception as e:
|
|
try:
|
|
self.log.exception(f"Free Fall: failed to write absolute PR-/PR+ values: {e}")
|
|
except Exception:
|
|
pass
|
|
pass
|
|
|
|
@db.connection_context()
|
|
def store_recipes(self, recipes):
|
|
if not self.ready:
|
|
self.resume()
|
|
resumed = True
|
|
if not self.ready:
|
|
QMessageBox.critical(
|
|
None,
|
|
"Impossibile salvare le ricette sulla tecna",
|
|
"La tecna non sembra essere pronta",
|
|
)
|
|
return
|
|
recipes = []
|
|
for recipe in list(Recipes.select().order_by(Recipes.name.asc())):
|
|
# if recipe.spec["leak_1"]:
|
|
recipes.append([recipe, recipe.spec["steps"]["leak_1"]])
|
|
# reverve last for our recipe control
|
|
if len(recipes) > max(self.max_program_number - 1, 0):
|
|
self.log.warning(f"too many recipes ({len(recipes)}), saving only first {max(self.max_program_number - 1, 0)}")
|
|
QMessageBox.warning(
|
|
None,
|
|
"Impossibile salvare tutte le ricette sulla tecna",
|
|
f"Troppe ricette ({len(recipes)}), saranno salvate solamente le prime {max(self.max_program_number - 1, 0)}",
|
|
)
|
|
self._store_recipes_signal.emit(recipes[:max(self.max_program_number - 1, 0)])
|
|
self._store_recipes_lock.acquire(max(self._store_recipes_lock.available(), 1))
|
|
QMessageBox.information(
|
|
None,
|
|
"Ricette salvate sulla tecna",
|
|
f"Salvate {min(len(recipes), max(self.max_program_number - 1, 0))} ricette",
|
|
)
|
|
if resumed:
|
|
self.pause()
|
|
|
|
def _store_recipes(self, recipes):
|
|
if len(recipes) > max(self.max_program_number - 1, 0):
|
|
self.log.warning(f"too many recipes ({len(recipes)}) saving only first {max(self.max_program_number - 1, 0)}")
|
|
for i, [recipe, step] in enumerate(recipes[:max(self.max_program_number - 1, 0)], start=1):
|
|
self.log.debug(f"saving recipe {recipe.part_number} to table {i}")
|
|
self.write_recipe(recipe, step, table=i)
|
|
self.log.info(f"saved {min(len(recipes), max(self.max_program_number - 1, 0))} recipes")
|
|
self._store_recipes_lock.release(1)
|
|
|
|
# SELECT & READ RECIPE FROM TECNA MEMORY
|
|
|
|
def read_recipe(self, recipe_number):
|
|
spec = {
|
|
**{(719 - 1 + i) : f"R{i}" for i in range(8)},
|
|
**{(727 - 1 + i) : f"C{i}" for i in range(12)},
|
|
**{(761 - 1 + i) : f"F1_{i}" for i in range(8)},
|
|
**{(769 - 1 + i) : f"F2_{i}" for i in range(8)},
|
|
"T0 - Pre-filling time": "pre_filling_time",
|
|
"P0 - Pre-filling pressure": "pre_filling_pressure",
|
|
"T1 - Filling time": "filling_time",
|
|
"T2 - Settling time": "settling_time",
|
|
"PR- - Min pressure tolerance %": "settling_pressure_min_percent",
|
|
"PR+ - Max pressure tolerance % (P+)": "settling_pressure_max_percent",
|
|
"T3 - Measure time": "test_time",
|
|
"Q- Lower test leak limit": "test_pressure_qneg",
|
|
"Q+ Upper test leak limit": "test_pressure_qpos",
|
|
"FST - Discharge time": "flush_time",
|
|
"FSL - Discharge limit": "flush_pressure",
|
|
"Print options": "print_options"
|
|
}
|
|
if self.model == "t3p":
|
|
spec.update({
|
|
"PREL - Nominal test pressure": "test_pressure",
|
|
})
|
|
elif self.model == "t3l":
|
|
spec.update({
|
|
"PREL - Nominal test pressure": "test_pressure",
|
|
})
|
|
else:
|
|
raise NotImplementedError(f"tecna t3 model {self.model!r} not implemented.")
|
|
|
|
# SET RECIPE NUMBER
|
|
#self.write("Source of test program number selection", "FROM PARAMETER (SET BY LCD OR SERIAL LINE)")
|
|
#self.write("Selected program", recipe_number)
|
|
self.write("Test program for read/write operation", recipe_number)
|
|
|
|
recipe_data = {}
|
|
# READ ALL PARAMETERS
|
|
for register_name, field_name in spec.items():
|
|
recipe_data[field_name] = self.read(register_name)
|
|
time.sleep(0.01)
|
|
recipe_name=self.int_array_to_str([recipe_data[f"R{wn}"] for wn in range(8)])
|
|
recipe_code=self.int_array_to_str([recipe_data[f"C{wn}"] for wn in range(12)])
|
|
recipe_f1=self.int_array_to_str([recipe_data[f"F1_{wn}"] for wn in range(8)])
|
|
recipe_f2=self.int_array_to_str([recipe_data[f"F2_{wn}"] for wn in range(8)])
|
|
recipe_data["recipe_name"]=recipe_name
|
|
recipe_data["recipe_code"]=recipe_code
|
|
recipe_data["recipe_f1"]=recipe_f1
|
|
recipe_data["recipe_f2"]=recipe_f2
|
|
recipe_data["print_template"] = int(recipe_data["print_options"] & 0xFF)
|
|
if self.model == "t3p":
|
|
recipe_data["test_time"]=int(recipe_data["test_time"]/10)
|
|
recipe_data["pre_filling_time"] = int(recipe_data["pre_filling_time"] / 10)
|
|
recipe_data["filling_time"] = int(recipe_data["filling_time"] / 10)
|
|
recipe_data["settling_time"] = int(recipe_data["settling_time"] / 10)
|
|
recipe_data["test_pressure_qneg"] = int(recipe_data["test_pressure_qneg"] / 100)
|
|
recipe_data["test_pressure_qpos"] = int(recipe_data["test_pressure_qpos"] / 100)
|
|
|
|
return recipe_data
|
|
|
|
@staticmethod
|
|
def int_array_to_str(arr):
|
|
"""
|
|
Translates an array of 16-bit integers, where each integer contains 2 ASCII characters,
|
|
into a string.
|
|
|
|
Args:
|
|
arr: The array of 16-bit integers.
|
|
|
|
Returns:
|
|
The translated string.
|
|
"""
|
|
result = ""
|
|
for value in arr:
|
|
char1 = chr(value & 0xFF)
|
|
char2 = chr(value >> 8)
|
|
|
|
# Stop if we encounter a null byte
|
|
if char1 == '\0':
|
|
break
|
|
result+=char1
|
|
if char2 == '\0':
|
|
break
|
|
result+=char2
|
|
return result
|
|
|
|
@pyqtSlot(str)
|
|
def handle_modbus_error(self, error_message):
|
|
"""
|
|
Handle errors received from the ModbusComponent.
|
|
Emit a signal to notify the UI of the error.
|
|
"""
|
|
if error_message:
|
|
# Log the error and emit a signal
|
|
self.log.error(f"Modbus error encountered: {error_message}")
|
|
|
|
# Mark connection as lost if it's a connection error
|
|
if "Connection error" in error_message or "Cannot connect" in error_message:
|
|
self.connection_lost = True
|
|
self.log.warning("Connection to Tecna Marposs lost, will attempt to reconnect on next periodic call")
|
|
# Emit a signal to notify the UI of the connection loss
|
|
self.tecna_error_signal.emit(True, error_message)
|
|
|
|
return True, error_message
|
|
else:
|
|
# Connection restored
|
|
if self.connection_lost:
|
|
self.connection_lost = False
|
|
self.log.info("Connection to Tecna Marposs restored")
|
|
# Emit a signal to notify the UI that the connection has been restored
|
|
self.tecna_error_signal.emit(False, "Connection restored")
|
|
return False, None
|