st-ten-1/src/ui/test_leak/test_leak.py
2026-01-23 14:42:45 +01:00

586 lines
28 KiB
Python

import sys
import time
import weakref
import os
from PyQt5.QtGui import QPixmap
from PyQt5.QtWidgets import QMessageBox, QDialog, QApplication
from PyQt5.QtCore import Qt, pyqtSlot, QTimer
from ui import Dialog
from ui.test_instructions_reminder import Test_Instructions_Reminder
from ui.test_test import Test_Test
# Pause, in seconds, passed to time.sleep() after switching valve-related
# digital outputs (channel select, blow, flush) in this module.
VALVE_TIME=0.5
class Test_Leak(Test_Test):
def __init__(
    self,
    config,
    components=None,
    recipe=None,
    step=None,
    pieces=None,
    run_once=False,
    reset_on_start=True,
    enable_override=False,
    parent=None,
):
    """Create the leak-test page and wire its buttons and the tester error signal.

    All arguments except ``config`` and ``parent`` are forwarded unchanged to
    the base class. ``parent`` must expose ``config["hardware_config"]``,
    which is read here to decide whether the instructions button is visible.
    """
    super().__init__(
        components=components,
        recipe=recipe,
        step=step,
        pieces=pieces,
        run_once=run_once,
        reset_on_start=reset_on_start,
        enable_override=enable_override,
    )

    # Plain instance state.
    self.get_connection = None    # handle returned by the tester's out.connect()
    self.io_ok = True             # cleared by set_digital_out() after an I/O failure
    self.blow_on = False          # True while a blow/flush sequence is being driven
    # NOTE(review): if the base class is a Qt object this attribute shadows
    # QObject.parent() - confirm that is intended.
    self.parent = parent
    self.step = step
    self.config = config
    self.recipe_written = False   # True once the recipe has been sent to the tester

    # Button wiring.
    self.start_b.clicked.connect(self.start_test)
    self.stop_b.clicked.connect(self.stop_test)

    hardware_cfg = self.parent.config["hardware_config"]
    self.show_instruction_b.setVisible("show_instructions" in hardware_cfg.keys())
    self.show_instruction_b.clicked.connect(self.show_instruction)

    # Errors reported by the tester component are routed to handle_modbus_error().
    tester = self.components[self.tester_component]
    tester.tecna_error_signal.connect(self.handle_modbus_error)
def _test_output_sequence(self, step):
    """Drive the "first_output" digital line according to the step type.

    The line is set to 1 for a "test_freefall_leak" step and to 0 for every
    other step type.
    """
    level = 1 if step.step_type == "test_freefall_leak" else 0
    self.set_digital_out("first_output", level)
def show_instruction(self):
    """Open the instructions reminder for the current recipe in its own dialog.

    The dialog is opened with ``show()`` (non-blocking), so this method
    returns while the window is still on screen. A reference is therefore kept
    on the instance: a parentless widget that is referenced only by a local
    variable can be garbage-collected, and its window closed, as soon as the
    method returns.
    """
    dialog = Dialog()
    reminder = Test_Instructions_Reminder(
        recipe=self.parent.recipe,
        bench_name=self.parent.config.machine_id,
    )
    dialog.setCentralWidget(reminder)
    dialog.show()
    # Keep the wrapper alive for as long as this page exists (or until the
    # button is pressed again and the reference is replaced).
    self._instruction_dialog = dialog
def reset(self):
    """Stop any test on the tester component, then run the base-class reset."""
    tester = self.components[self.tester_component]
    tester.stop_test()
    super().reset()
def stop_test(self):
    """Abort the running test at the operator's request.

    Tells the tester component to stop, shows "PROVA INTERROTTA" on a yellow
    background, waits one second, then enables Start and disables Stop.
    """
    tester = self.components[self.tester_component]
    tester.stop_test()
    self.display_text(text="PROVA INTERROTTA", bg_color="yellow")
    time.sleep(1)
    for button, enabled in ((self.start_b, True), (self.stop_b, False)):
        button.setEnabled(enabled)
def start_test(self):
    """Run the optional external blow sequence, then start the tester.

    Steps, in order:
      1. For a "leak_1" step, ask the parent to print the extra labels.
      2. If ``hardware_config["external_flush_blow"]`` is "present":
         select the channel and light its LED (dual-channel benches only),
         drive the blow/flush outputs for ``ext_blow_time`` seconds
         (default 3), then switch them off again. ``blow_on`` is True for the
         duration of this sequence.
      3. Unless simulating, call ``start_test()`` on the tester component.
    """
    if self.step.step_type == "leak_1":
        self.parent.print_extra_labels()

    hardware_cfg = self.parent.config["hardware_config"]
    if hardware_cfg.get("external_flush_blow", None) == "present":
        dual_channel = hardware_cfg.get("dual_channel", None) == "present"

        if dual_channel:
            # Channel selection: 0 = CH1, 1 = CH2.
            chan_sel = self.step.spec["chan_sel"]
            for line in ("out_channel_select", "in_channel_select"):
                self.set_digital_out(line, chan_sel)
            time.sleep(VALVE_TIME)
            # Light the LED of the selected channel.
            self.set_digital_out("ch1_led" if chan_sel == 0 else "ch2_led", True)

        # Blow phase.
        self.blow_on = True
        self.display_text("SOFFIAGGIO IN CORSO...")
        self.set_digital_out("blow_led", True)
        self.set_digital_out("blow_on", True)
        time.sleep(VALVE_TIME)
        self.set_digital_out("flush_on", True)
        blow_time = int(self.step.spec.get('ext_blow_time', 3))
        self.set_digital_out("blow_led", True)
        time.sleep(blow_time)

        # Switch the blow off, give the valve time to move, then close the flush.
        self.set_digital_out("blow_led", False)
        self.set_digital_out("blow_on", False)
        time.sleep(VALVE_TIME)
        self.set_digital_out("flush_on", False)

        if not dual_channel:
            self.set_digital_out("ch1_led", True)
        self.blow_on = False

    if not self.simulate:
        self.components[self.tester_component].start_test()
def start(self, recipe=None, step=None, pieces=None):
    """Prepare and show the leak-test page for one step of the cycle.

    In order, this method:
      * sets the "first_output" digital line for the step type;
      * reads the ``--test-leak`` / ``--autostart`` command-line switches;
      * delegates to the base-class ``start`` and returns early if it
        returns ``False``;
      * fills the informational labels from ``self.step.spec``;
      * writes the recipe to the tester when ``recipe_written`` is False
        (forced for autotest steps and when "second_leak_test" is present);
      * connects the tester ``out`` signal to :meth:`get` and resumes it;
      * shows the instruction text and image matching the step
        (autotest ok/ko check, free-fall step, or regular leak step);
      * for a "leak_2" step on a dual-channel bench, clicks Start itself.

    Args:
        recipe, step, pieces: forwarded to the base-class ``start``. ``step``
            is also dereferenced directly (``step.step_type``), so it must
            not be ``None`` despite the default.

    Returns:
        Whatever the base-class ``start`` returned.

    NOTE(review): the file's indentation was lost; the nesting below was
    reconstructed from the logic and should be checked against the original,
    in particular the final ``else`` of the "leak_2" section.
    """
    self._test_output_sequence(step) # Call with the step object
    # TESTING
    # --test-leak makes get() replace tester data with canned values.
    if "--test-leak" in sys.argv:
        self.simulate = True
    else:
        self.simulate = False
    # NOTE(review): the click is issued before super().start() and before the
    # recipe is written - confirm this ordering is intended.
    if "--autostart" in sys.argv:
        self.start_b.setEnabled(True)
        self.start_b.click()
    # /TESTING
    show = super().start(recipe=recipe, step=step, pieces=pieces)
    if show is False:
        return show
    # Test counter: "n/2" when the cycle contains a leak_2 step, else "1/1".
    if "leak_2" in [s.step_type for s in self.parent.cycle_steps]:
        if self.step.step_type=="leak_1":
            self.test_num_l.setText("1/2")
        else:
            self.test_num_l.setText("2/2")
    else:
        self.test_num_l.setText("1/1")
    if self.step.spec.get("autotest", False):
        self.template_print_l.setText(f"AUTOTEST")
    else:
        self.template_print_l.setText(f"{self.parent.print_template}")
    # Populate labels defensively to support both Leak and Free Fall specs
    # (each value falls back to the alternative key, then to an empty string).
    pressure_val = self.step.spec.get('test_pressure', self.step.spec.get('filling_pressure', ""))
    leak_min_val = self.step.spec.get('test_pressure_qneg', self.step.spec.get('pressure_min', ""))
    leak_max_val = self.step.spec.get('test_pressure_qpos', self.step.spec.get('pressure_max', ""))
    self.recipe_pressure_l.setText(f"{pressure_val}")
    self.leak_min_l.setText(f"{leak_min_val}")
    self.leak_max_l.setText(f"{leak_max_val}")
    self.fill_time_l.setText(f"{self.step.spec.get('filling_time', '')}")
    self.settle_time_l.setText(f"{self.step.spec.get('settling_time', '')}")
    self.meas_time_l.setText(f"{self.step.spec.get('test_time', '')}")
    self.valore_PID_l.setText(f"{self.step.spec.get('pid_pressure_correction', '')}")
    # SETUP TEST LOOP
    if self.step.spec.get("autotest", False): # IF AUTOTESTING UPLOAD RECIPE EVERY TIME
        self.recipe_written = False
    if self.parent.config["hardware_config"].get("second_leak_test", "absent") == "present": # IF SECOND LEAK TEST ENABLED UPLOAD RECIPE EVERY TIME
        self.recipe_written = False
    if not self.recipe_written:
        self.components[self.tester_component].write_recipe(self.recipe, self.step)
        self.recipe_written=True
    # Keep the connection handle so stop() can disconnect it again.
    self.get_connection = self.components[self.tester_component].out.connect(self.get)
    self.components[self.tester_component].resume()
    if self.parent_assembly_widget is not None:
        self.display_text(text="ATTENDERE")
    # Both buttons stay disabled here; visualize() re-enables Start based on
    # the phase reported by the tester.
    self.start_b.setEnabled(False)
    self.stop_b.setEnabled(False)
    if self.step.spec.get("autotest", False) == "ok_check":
        # Autotest without the calibrated leak.
        self.template_print_l.setVisible(False)
        self.template_label.setVisible(False)
        self.display_text(text="AUTOTEST: RIMUOVERE FUGA CALIBRATA E PREMERE START PER INIZIARE LA PROVA TENUTA",
                          bg_color="blue", text_color="white")
        super().visualize(None, img=self.status_imgs_full["calibrated-leak-remove"])
    elif self.step.spec.get("autotest", False) == "ko_check":
        # Autotest with the calibrated leak connected.
        self.template_print_l.setVisible(False)
        self.template_label.setVisible(False)
        self.display_text(
            text="AUTOTEST: COLLEGARE TUBO-TUBO + FUGA CALIBRATA E PREMERE START PER INIZIARE LA PROVA TENUTA DI PROVA",
            bg_color="blue", text_color="white")
        super().visualize(None, img=self.status_imgs_full["calibrated-leak"])
    else:
        if step.step_type == "test_freefall_leak":
            # Flag Free Fall mode for special UI handling
            # (read by visualize() and resizeEvent()).
            self._free_fall_mode = True
            self._free_fall_img_scale = 1.75 # scale factor applied to the image label size in resizeEvent()
            # Hide parameters not relevant during free-fall
            for name in [
                "label_18", # fill time (caption)
                "fill_time_l",
                "label_19", # settling time (caption)
                "settle_time_l",
                "label_22", # test time (caption)
                "meas_time_l",
                "template_label", # selected label template (caption)
                "template_print_l", # selected label template (value)
                "valore_PID", # PID value (caption)
                "valore_PID_l", # PID value (value)
                "label_17", # unit 's' for fill time
                "label_20", # unit 's' for settling/measuring time
                "label_21", # possibly another unit/aux label to hide in free fall
            ]:
                # hasattr guard: not every widget name needs to exist on this form.
                if hasattr(self, name):
                    getattr(self, name).setVisible(False)
            self.display_text(text="USARE IL SISTEMA DI FLUSSAGGIO. AL TERMINE POSIZIONARE IL PEZZO PER LA PROVA TENUTA")
            # Show placeholder image for free-fall: PERVIETÀ.png
            # (first existing candidate wins; paths are relative to the working directory).
            pervieta_path_candidates = [
                "config/warning_images/generic/PERVIETÀ.png",
                "config/warning_images/generic/PERVIETA.png",
                "config/warning_images/generic/pervieta.png",
            ]
            img_path = next((p for p in pervieta_path_candidates if os.path.exists(p)), None)
            if img_path is not None:
                super().visualize(None, img=QPixmap(img_path))
            else:
                # Fallbacks: try the older instruction image or default
                try:
                    instr_folder = (self.config.get("machine", {}) or {}).get("instruction_folder", getattr(self.config, "machine_id", "")).strip() or getattr(self.config, "machine_id", "")
                except Exception:
                    # Any failure reading the config falls back to machine_id.
                    instr_folder = getattr(self.config, "machine_id", "")
                ff_img = None
                for ext in ("png", "jpg", "jpeg"):
                    candidate = f"config/instruction_images/{instr_folder}/free_fall.{ext}"
                    if os.path.exists(candidate):
                        ff_img = candidate
                        break
                if ff_img is not None:
                    super().visualize(None, img=QPixmap(ff_img))
                else:
                    super().visualize(None, img=self.status_imgs_full[None])
        else:
            # If the recipe contains a Free Fall test, show the PERVIETÀ image as default even on Leak steps
            try:
                has_free_fall = any(getattr(s, "step_type", None) == "test_freefall_leak" for s in (self.parent.cycle_steps or []))
            except Exception:
                has_free_fall = False
            if has_free_fall:
                # Reuse Free Fall mode to benefit from special scaling and image persistence
                self._free_fall_mode = True
                self._free_fall_img_scale = 1.75
                # Keep normal Leak instruction text
                self.display_text(text="COLLEGARE GLI ATTACCHI PNEUMATICI E PREMERE START PER INIZIARE LA PROVA TENUTA")
                # Load PERVIETÀ image with fallbacks
                pervieta_path_candidates = [
                    "config/warning_images/generic/PERVIETÀ_2.png",
                    "config/warning_images/generic/PERVIETA_2.png",
                    "config/warning_images/generic/pervieta_2.png",
                ]
                img_path = next((p for p in pervieta_path_candidates if os.path.exists(p)), None)
                if img_path is not None:
                    super().visualize(None, img=QPixmap(img_path))
                else:
                    super().visualize(None, img=self.status_imgs_full[None])
            else:
                # Ensure Free Fall mode is disabled for other steps
                self._free_fall_mode = False
                self._free_fall_img_scale = None
                self.display_text(text="COLLEGARE GLI ATTACCHI PNEUMATICI E PREMERE START PER INIZIARE LA PROVA TENUTA")
                super().visualize(None, img=self.status_imgs_full[None])
        # The template labels are shown again for every non-free-fall,
        # non-autotest step (they are hidden in the branches above).
        if step.step_type != "test_freefall_leak":
            self.template_print_l.setVisible(True)
            self.template_label.setVisible(True)
    if self.simulate:
        # In simulation, let the UI paint and pause so the page is visible.
        QApplication.processEvents()
        time.sleep(2)
    # AUTO START SECOND TEST
    # recipe_written is cleared in both branches, so the recipe is uploaded
    # again on the next call to start().
    if step.step_type == "leak_2":
        if self.config["hardware_config"].get("dual_channel", "absent") == "present":
            self.recipe_written = False
            time.sleep(1)
            self.start_b.setEnabled(True)
            self.start_b.click()
        else:
            # NOTE(review): assumed to pair with the dual_channel check
            # (leak_2 on a single-channel bench: enable Start, no auto-click).
            self.recipe_written = False
            time.sleep(1)
            self.start_b.setEnabled(True)
    return show
def stop(self):
    """Leave the leak-test page: halt the tester and detach from its data signal.

    Order of operations:
      1. stop and pause the tester component;
      2. disconnect the ``out`` -> :meth:`get` connection made in ``start()``;
      3. drive "first_output" low;
      4. run the base-class ``stop``;
      5. disable both Start and Stop.

    The connection handle starts as ``None`` and is only set by ``start()``,
    so it is checked before disconnecting and cleared afterwards. This makes
    ``stop()`` safe to call before ``start()`` or twice in a row.
    """
    tester = self.components[self.tester_component]
    # disable test loop
    tester.stop_test()
    tester.pause()

    if self.get_connection is not None:
        self.disconnect(self.get_connection)
        self.get_connection = None

    self.set_digital_out("first_output", 0)  # Set low when test stops
    super().stop()
    self.start_b.setEnabled(False)
    self.stop_b.setEnabled(False)
def get(self, data=None, override=False):
    """Process one batch of tester data and forward the verdict to the base class.

    Args:
        data: sequence whose last element is the newest reading, a dict-like
            object that holds the tester payload under ``self.tester_component``
            and may hold a "time" entry. ``None`` (or a ``None`` last element)
            is forwarded to the base class as "no data".
        override: passed through to the base-class ``get``.

    Behaviour:
        * Does nothing once ``self.done`` is set.
        * In simulation mode the tester payload is replaced with canned values.
        * When the payload contains "Running test: result" the test has ended:
          ``ok`` is computed (see below), the external flush sequence is run
          if the bench has one, and the result is forwarded. Otherwise ``ok``
          is ``None`` (test still running).
        * ``ok`` is True when the result text contains "passed". The only
          exception is a "ko_check" autotest on a tester other than
          "tecna_t3", where a result containing "fail" is the expected outcome.
        * A missing tester payload is treated as "no result yet" instead of
          raising ``KeyError``, matching the defensive lookup in ``visualize``.
    """
    if self.done:  # avoid processing if completed
        return
    if data is None or data[-1] is None:
        super().get(None, override=override)
        return
    data = data[-1]
    # Note: connection issues are handled in visualize(), not here.

    if self.simulate:
        data[self.tester_component] = self._simulated_reading()

    reading = data.get(self.tester_component) or {}
    if "Running test: result" in reading:
        # TEST ENDED, CHECK RESULT
        result = reading["Running test: result"]
        passed = isinstance(result, str) and "passed" in result.lower()
        autotest = self.step.spec.get("autotest", "")
        if autotest == "ok_check":
            # AUTOTEST - NO LEAK: a pass is expected.
            ok = passed
            self.recipe_written = False
        elif autotest == "ko_check":
            if self.tester_component == "tecna_t3":
                # AUTOTEST - CALIBRATED LEAK - WINDOWED TYPE, OK EXPECTED (TECNA)
                ok = passed
            else:
                # AUTOTEST - CALIBRATED LEAK - NON WINDOWED TYPE, FAIL EXPECTED (FURNESS CONTROLS)
                ok = isinstance(result, str) and "fail" in result.lower()
            self.recipe_written = False
        else:
            # NORMAL TEST, OK EXPECTED
            ok = passed

        # SET DIGITAL OUTPUTS: external flush after the test.
        hardware_cfg = self.parent.config["hardware_config"]
        if hardware_cfg.get("external_flush_blow", None) == "present":
            dual_channel = hardware_cfg.get("dual_channel", None) == "present"
            # NOTE(review): blow_on is not cleared here; within this file it is
            # only reset at the end of the blow sequence in start_test().
            self.blow_on = True
            if not dual_channel:
                self.set_digital_out("ch1_led", False)
            self.display_text("SCARICO ESTERNO IN CORSO...")
            self.set_digital_out("flush_led", True)
            self.set_digital_out("flush_on", True)
            time.sleep(VALVE_TIME)
            flush_time = int(self.step.spec.get('ext_flush_time', 3))
            time.sleep(flush_time)
            self.set_digital_out("flush_led", False)
            # "flush_on" is deliberately left as-is here (it was commented out
            # in the original code).
            if dual_channel:
                self.set_digital_out("out_channel_select", False)
                self.set_digital_out("in_channel_select", False)
                self.set_digital_out("ch1_led", False)
                self.set_digital_out("ch2_led", False)
    else:
        # No result yet: the test is still running.
        ok = None

    # The verdict goes first, then every entry of the raw reading is merged in.
    results = {"ok": ok}
    results.update(data)
    super().get(
        [{
            "time": data.get("time", None),
            "results": results,
        }],
        override=override,
        fail=ok is False,
    )

def _simulated_reading(self):
    """Return the canned tester payload used when running with ``--test-leak``.

    ``--fail-leak`` on the command line selects the failing variant. Free-fall
    steps ("test_freefall_leak") and regular leak steps use different
    pressure values; the regular-leak payload carries no end-of-measure
    pressure.
    """
    failing = "--fail-leak" in sys.argv
    reading = {
        "Running test: active phase": "WAITING START",
        "Running test: result": (
            "-----TESTING----- fail" if failing else "-----TESTING----- passed"
        ),
    }
    if getattr(self.step, "step_type", None) == "test_freefall_leak":
        reading.update({
            "Running test: filling pressure": 3000,
            "Running test: measured leak": 0,
            "Running test: pressure at the end of settling": 1831,
            "Running test: pressure at the end of measure": 1831,
        })
    else:
        reading.update({
            "Running test: filling pressure": 5000,
            "Running test: measured leak": 50 if failing else 5,
            "Running test: pressure at the end of settling": 4999,
        })
    return reading
def visualize(self, data=None):
    """Refresh the live labels, the Start/Stop buttons and the status image.

    Args:
        data: dict-like with an optional "results" mapping that holds the
            tester payload under ``self.tester_component`` and the verdict
            under "ok". ``None`` is treated as an empty dict.

    Behaviour:
        * Every live label shows its value from the payload, or "-" when
          missing; floats are rounded to two decimals.
        * While the tester reports a lost (or just-restored) connection, only
          the buttons are updated (Start off, Stop on) and the method
          returns; the message itself is shown by ``handle_modbus_error``.
        * If the active phase is one of the idle phases, Start is enabled and
          focused. Otherwise the "test in progress" text is shown (unless a
          blow/flush sequence is running) and only Stop is enabled.
        * The status image follows ``ok``, except in free-fall mode, where
          the image already on screen is kept.
    """
    if data is None:
        data = {}
    readings = data.get("results", {}).get(self.tester_component, {})

    # Payload key -> label that displays it. The leak label shows the
    # measured leak, not the real-time differential pressure.
    live_labels = {
        "Running test: active phase": self.test_phase_l,
        "Real time test pressure output": self.circuit_pressure_l,
        "Running test: measured leak": self.leak_l,
        "Real time pressure line regulator": self.regulated_pressure_l,
        "Running test: test type": self.test_type_l,
        "Running test: sequence index": self.sequence_index_l,
    }
    for key, label in live_labels.items():
        value = readings.get(key, "-")
        if isinstance(value, float):
            value = round(value, 2)
        label.setText(str(value))

    # Connection lost, or restored but not yet acknowledged: keep the
    # connection message on screen and just hold the buttons.
    tester = self.components[self.tester_component]
    if getattr(tester, "connection_lost", False) or getattr(
        tester, "_previous_connection_lost", False
    ):
        self.start_b.setEnabled(False)
        self.stop_b.setEnabled(True)
        return

    # Phases in which the tester is idle and a new test may be started.
    # Every entry needs its own trailing comma: two adjacent string literals
    # are silently concatenated into one, which previously made both
    # "END TEST, ..." and "FINE TEST" impossible to match.
    idle_phases = {
        "WAITING START",
        "ATTESA START",
        "END TEST, WAITING THE START OF A NEW TEST",
        "FINE TEST",
        "STANDBY",
        "PRESSIONE BASSA",
        "PRESSIONE ALTA",
        "ERRORE",
    }
    if readings.get("Running test: active phase", None) in idle_phases:
        self.start_b.setEnabled(True)
        self.start_b.setDefault(True)
        self.start_b.setFocus()
        self.stop_b.setEnabled(False)
    else:
        if self.step is not None and not self.blow_on:
            if not self.step.spec.get("autotest", False):
                self.display_text(text="PROVA TENUTA IN CORSO")
            else:
                self.display_text(text="AUTOTEST: PROVA TENUTA IN CORSO")
        self.start_b.setEnabled(False)
        self.stop_b.setEnabled(True)

    ok = data.get("results", {}).get("ok", None)
    # Preserve the current image (the free-fall placeholder) in free-fall
    # mode instead of switching to the status icons.
    img = getattr(self, "img", None) if getattr(self, "_free_fall_mode", False) else None
    if img is None:
        img = self.status_imgs_full.get(ok, self.status_imgs_full[None])
    super().visualize(data, img=img)
def display_text(self, text="", bg_color=None, text_color=None):
    """Show a status message to the operator.

    Args:
        text: message to show.
        bg_color: optional background colour name.
        text_color: optional text colour name (defaults to black when only a
            background is given and a local label is used).

    Where the message goes:
        1. The parent assembly widget, when one is available. It is obtained
           by calling ``self.parent_assembly_widget`` (presumably a weak
           reference - confirm); if that call yields ``None`` the local
           fallback below is used instead of failing.
        2. Otherwise the first existing label among ``status_l`` and
           ``test_phase_l``.
        3. Only if no such label exists, and the background is "red" or
           "yellow", a warning message box. Previously the box was shown
           even when a label had already displayed the text.
    """
    target = None
    if self.parent_assembly_widget is not None:
        target = self.parent_assembly_widget()

    if target is not None:
        target.set_text(text=text, bg_color=bg_color, text_color=text_color)
        # Let Qt paint the message, hold it briefly, then process again.
        QApplication.processEvents()
        time.sleep(0.3)
        QApplication.processEvents()
        return

    for label_name in ("status_l", "test_phase_l"):
        label = getattr(self, label_name, None)
        if label is None:
            continue
        if bg_color is not None:
            label.setStyleSheet(f"background-color: {bg_color}; color: {text_color or 'black'};")
        label.setText(text)
        QApplication.processEvents()
        break
    else:
        # No suitable label was found: fall back to a message box, but only
        # for error/warning colours.
        if bg_color in ("red", "yellow"):
            QMessageBox.warning(self, "Avviso", text)
            QApplication.processEvents()
def set_digital_out(self, out_name=None, state=1, component_name="digital_io"):
    """Set one named digital output and verify the write.

    The bit number is read from ``self.parent.config[component_name][out_name]``
    and written with ``set_bit_verify(0, bit, state)`` on the component of the
    same name. On the first failed write a critical message box is shown and
    ``io_ok`` is cleared, after which every further call is a no-op.
    """
    if not self.io_ok:
        return

    bit = int(self.parent.config[component_name][out_name])
    written = self.components[component_name].set_bit_verify(0, bit, state)
    if written:
        return

    QMessageBox.critical(None, "ERRORE", "ERRORE I/O DIGITALE - VERIFICARE CONNESSIONE USB")
    self.io_ok = False
def save_last(self):
    """Do nothing; this page does not persist anything here.

    ``self.last`` is still read, as in the original, and the method returns
    ``None`` whether or not it is set.
    """
    if self.last is not None:
        # Nothing to save even when a last result exists.
        pass
    return None
@pyqtSlot(bool, str)
def handle_modbus_error(self, error, error_message):
    """React to the tester component's error signal.

    Args:
        error: True when an error is reported, False otherwise.
        error_message: text describing the event.

    Cases:
        * error, and the message contains "Connection error" or
          "Cannot connect": show the connection-error text on red, fail the
          cycle through the parent, then disable Start and enable Stop.
        * any other error: show the generic tester-error text and call
          ``stop_test()``.
        * no error and the message is exactly "Connection restored": show
          the restored text on green, clear the tester's
          ``_previous_connection_lost`` flag, then disable Start and enable
          Stop.
        * anything else is ignored.

    Messages are prefixed with "AUTOTEST: " when the current step is an
    autotest step.
    """
    connection_failed = bool(error) and (
        "Connection error" in error_message or "Cannot connect" in error_message
    )

    if error and not connection_failed:
        # Non-connection error: report it and stop the test.
        self.display_text(
            text="ERRORE TECNA TEST INTERROTTO.",
            bg_color="red", text_color="white"
        )
        self.stop_test()
        return

    connection_restored = (not error) and error_message == "Connection restored"
    if not (connection_failed or connection_restored):
        return

    in_autotest = self.step is not None and self.step.spec.get("autotest", False)
    prefix = "AUTOTEST: " if in_autotest else ""

    if connection_failed:
        self.display_text(
            text=prefix + "ERRORE DI CONNESSIONE TECNA. TEST INTERROTTO.",
            bg_color="red", text_color="white"
        )
        # Stop the test when a connection error is detected.
        self.parent.fail_cycle()
    else:
        self.display_text(
            text=prefix + "CONNESSIONE TECNA RIPRISTINATA. CONTINUARE IL TEST.",
            bg_color="green", text_color="white"
        )
        # Clear the flag right away so visualize() stops holding the buttons
        # once the restored message has been displayed.
        self.components[self.tester_component]._previous_connection_lost = False
        # Force a UI update so the message is painted.
        QApplication.processEvents()

    # In both connection cases: Start disabled, Stop enabled.
    self.start_b.setEnabled(False)
    self.stop_b.setEnabled(True)
def resizeEvent(self, event=None):
    """Resize handler that rescales the image while free-fall mode is active.

    The base-class handler always runs first; any exception it raises is
    ignored. When ``_free_fall_mode`` is set and both ``img_l`` and ``img``
    exist, the pixmap is rescaled to the label size multiplied by
    ``_free_fall_img_scale`` (0.5 when that attribute is missing, falsy or
    not convertible to float), keeping the aspect ratio. A failure while
    scaling leaves the base-class result untouched.
    """
    try:
        super().resizeEvent(event)
    except Exception:
        # Best effort: the base class may not be able to handle the event.
        pass

    if not getattr(self, "_free_fall_mode", False):
        return

    image_label = getattr(self, "img_l", None)
    pixmap = getattr(self, "img", None)
    if image_label is None or pixmap is None:
        return

    try:
        factor = float(getattr(self, "_free_fall_img_scale", 0.5) or 0.5)
    except Exception:
        factor = 0.5

    # Never request a zero-sized pixmap.
    width = max(1, int(image_label.width() * factor))
    height = max(1, int(image_label.height() * factor))
    try:
        image_label.setPixmap(
            pixmap.scaled(width, height, Qt.KeepAspectRatio, Qt.SmoothTransformation)
        )
    except Exception:
        # If scaling fails, leave as base-class result.
        pass