import sys import time import weakref import os from PyQt5.QtGui import QPixmap from PyQt5.QtWidgets import QMessageBox, QDialog, QApplication from PyQt5.QtCore import Qt, pyqtSlot from ui import Dialog from ui.test_instructions_reminder import Test_Instructions_Reminder from ui.test_test import Test_Test VALVE_TIME=0.5 class Test_Leak(Test_Test): def __init__(self, config,components=None, recipe=None, step=None, pieces=None, run_once=False, reset_on_start=True, enable_override=False,parent=None): super().__init__(components=components, recipe=recipe, step=step, pieces=pieces, run_once=run_once, reset_on_start=reset_on_start, enable_override=enable_override) self.get_connection = None self.io_ok = True self.blow_on = False self.parent=parent self.step=step self.config = config self.recipe_written = False self.start_b.clicked.connect(self.start_test) self.stop_b.clicked.connect(self.stop_test) self.show_instruction_b.setVisible("show_instructions" in self.parent.config["hardware_config"].keys()) self.show_instruction_b.clicked.connect(self.show_instruction) # Connect to the tecna_error_signal to handle connection issues self.components[self.tester_component].tecna_error_signal.connect(self.handle_modbus_error) def show_instruction(self): dialog=Dialog() dialog.setCentralWidget(Test_Instructions_Reminder(recipe=self.parent.recipe,bench_name=self.parent.config.machine_id)) dialog.show() def reset(self): self.components[self.tester_component].stop_test() super().reset() def stop_test(self): self.components[self.tester_component].stop_test() self.display_text(text="PROVA INTERROTTA", bg_color="yellow") time.sleep(1) self.start_b.setEnabled(True) self.stop_b.setEnabled(False) def start_test(self): # print extra labels if self.step.step_type == "leak_1": self.parent.print_extra_labels() # SELECT TEST CHANNEL if self.parent.config["hardware_config"].get("external_flush_blow", None) == "present": if self.parent.config["hardware_config"].get("dual_channel", None) == "present": chan_sel = self.step.spec["chan_sel"] # 0=CH1, 1=CH2 self.set_digital_out("out_channel_select", chan_sel) self.set_digital_out("in_channel_select", chan_sel) time.sleep(VALVE_TIME) # SET LED INDICATORS if chan_sel == 0: self.set_digital_out("ch1_led", True) else: self.set_digital_out("ch2_led", True) self.blow_on=True self.display_text("SOFFIAGGIO IN CORSO...") self.set_digital_out("blow_led",True) self.set_digital_out("blow_on",True) time.sleep(VALVE_TIME) self.set_digital_out("flush_on", True) blow_time=int(self.step.spec.get('ext_blow_time',3)) self.set_digital_out("blow_led", True) time.sleep(blow_time) self.set_digital_out("blow_led", False) self.set_digital_out("blow_on", False) time.sleep(VALVE_TIME) self.set_digital_out("flush_on", False) if self.parent.config["hardware_config"].get("dual_channel", None) != "present": self.set_digital_out("ch1_led", True) self.blow_on = False if not self.simulate: self.components[self.tester_component].start_test() def start(self, recipe=None, step=None, pieces=None): # TESTING if "--test-leak" in sys.argv: self.simulate = True else: self.simulate = False if "--autostart" in sys.argv: self.start_b.setEnabled(True) self.start_b.click() # /TESTING show = super().start(recipe=recipe, step=step, pieces=pieces) if show is False: return show if "leak_2" in [s.step_type for s in self.parent.cycle_steps]: if self.step.step_type=="leak_1": self.test_num_l.setText("1/2") else: self.test_num_l.setText("2/2") else: self.test_num_l.setText("1/1") if self.step.spec.get("autotest", False): self.template_print_l.setText(f"AUTOTEST") else: self.template_print_l.setText(f"{self.parent.print_template}") # Populate labels defensively to support both Leak and Free Fall specs pressure_val = self.step.spec.get('test_pressure', self.step.spec.get('filling_pressure', "")) leak_min_val = self.step.spec.get('test_pressure_qneg', self.step.spec.get('pressure_min', "")) leak_max_val = self.step.spec.get('test_pressure_qpos', self.step.spec.get('pressure_max', "")) self.recipe_pressure_l.setText(f"{pressure_val}") self.leak_min_l.setText(f"{leak_min_val}") self.leak_max_l.setText(f"{leak_max_val}") self.fill_time_l.setText(f"{self.step.spec.get('filling_time', '')}") self.settle_time_l.setText(f"{self.step.spec.get('settling_time', '')}") self.meas_time_l.setText(f"{self.step.spec.get('test_time', '')}") self.valore_PID_l.setText(f"{self.step.spec.get('pid_pressure_correction', '')}") # SETUP TEST LOOP if self.step.spec.get("autotest", False): # IF AUTOTESTING UPLOAD RECIPE EVERY TIME self.recipe_written = False if self.parent.config["hardware_config"].get("second_leak_test", "absent") == "present": # IF SECOND LEAK TEST ENABLED UPLOAD RECIPE EVERY TIME self.recipe_written = False if not self.recipe_written: self.components[self.tester_component].write_recipe(self.recipe, self.step) self.recipe_written=True self.get_connection = self.components[self.tester_component].out.connect(self.get) self.components[self.tester_component].resume() if self.parent_assembly_widget is not None: self.display_text(text="ATTENDERE") self.start_b.setEnabled(False) self.stop_b.setEnabled(False) if self.step.spec.get("autotest", False) == "ok_check": self.template_print_l.setVisible(False) self.template_label.setVisible(False) self.display_text(text="AUTOTEST: RIMUOVERE FUGA CALIBRATA E PREMERE START PER INIZIARE LA PROVA TENUTA", bg_color="blue", text_color="white") super().visualize(None, img=self.status_imgs_full["calibrated-leak-remove"]) elif self.step.spec.get("autotest", False) == "ko_check": self.template_print_l.setVisible(False) self.template_label.setVisible(False) self.display_text( text="AUTOTEST: COLLEGARE TUBO-TUBO + FUGA CALIBRATA E PREMERE START PER INIZIARE LA PROVA TENUTA DI PROVA", bg_color="blue", text_color="white") super().visualize(None, img=self.status_imgs_full["calibrated-leak"]) else: if step.step_type == "test_freefall_leak": # Flag Free Fall mode for special UI handling self._free_fall_mode = True self._free_fall_img_scale = 1.75 # scale image to almost full available space # Hide parameters not relevant during free-fall for name in [ "label_18", # Tempo di riempimento "fill_time_l", "label_19", # Tempo di assestamento "settle_time_l", "label_22", # Tempo di prova "meas_time_l", "template_label", # Etichetta selezionata (caption) "template_print_l", # Etichetta selezionata (value) "valore_PID", # Valore PID (caption) "valore_PID_l", # Valore PID (value) "label_17", # unit 's' for fill time "label_20", # unit 's' for settling/measuring time "label_21", # possibly another unit/aux label to hide in free fall ]: if hasattr(self, name): getattr(self, name).setVisible(False) self.display_text(text="USARE IL SISTEMA DI FLUSSAGGIO. AL TERMINE POSIZIONARE IL PEZZO PER LA PROVA TENUTA") # Show placeholder image for free-fall: PERVIETÀ.png pervieta_path_candidates = [ "config/warning_images/generic/PERVIETÀ.png", "config/warning_images/generic/PERVIETA.png", "config/warning_images/generic/pervieta.png", ] img_path = next((p for p in pervieta_path_candidates if os.path.exists(p)), None) if img_path is not None: super().visualize(None, img=QPixmap(img_path)) else: # Fallbacks: try the older instruction image or default try: instr_folder = (self.config.get("machine", {}) or {}).get("instruction_folder", getattr(self.config, "machine_id", "")).strip() or getattr(self.config, "machine_id", "") except Exception: instr_folder = getattr(self.config, "machine_id", "") ff_img = None for ext in ("png", "jpg", "jpeg"): candidate = f"config/instruction_images/{instr_folder}/free_fall.{ext}" if os.path.exists(candidate): ff_img = candidate break if ff_img is not None: super().visualize(None, img=QPixmap(ff_img)) else: super().visualize(None, img=self.status_imgs_full[None]) else: # If the recipe contains a Free Fall test, show the PERVIETÀ image as default even on Leak steps try: has_free_fall = any(getattr(s, "step_type", None) == "test_freefall_leak" for s in (self.parent.cycle_steps or [])) except Exception: has_free_fall = False if has_free_fall: # Reuse Free Fall mode to benefit from special scaling and image persistence self._free_fall_mode = True self._free_fall_img_scale = 1.75 # Keep normal Leak instruction text self.display_text(text="COLLEGARE GLI ATTACCHI PNEUMATICI E PREMERE START PER INIZIARE LA PROVA TENUTA") # Load PERVIETÀ image with fallbacks pervieta_path_candidates = [ "config/warning_images/generic/PERVIETÀ_2.png", "config/warning_images/generic/PERVIETA_2.png", "config/warning_images/generic/pervieta_2.png", ] img_path = next((p for p in pervieta_path_candidates if os.path.exists(p)), None) if img_path is not None: super().visualize(None, img=QPixmap(img_path)) else: super().visualize(None, img=self.status_imgs_full[None]) else: # Ensure Free Fall mode is disabled for other steps self._free_fall_mode = False self._free_fall_img_scale = None self.display_text(text="COLLEGARE GLI ATTACCHI PNEUMATICI E PREMERE START PER INIZIARE LA PROVA TENUTA") super().visualize(None, img=self.status_imgs_full[None]) if step.step_type != "test_freefall_leak": self.template_print_l.setVisible(True) self.template_label.setVisible(True) if self.simulate: QApplication.processEvents() time.sleep(2) # AUTO START SECOND TEST if step.step_type == "leak_2": if self.config["hardware_config"].get("dual_channel", "absent") == "present": self.recipe_written = False time.sleep(1) self.start_b.setEnabled(True) self.start_b.click() else: self.recipe_written = False time.sleep(1) self.start_b.setEnabled(True) return show def stop(self): # disable test loop self.components[self.tester_component].stop_test() self.components[self.tester_component].pause() self.disconnect(self.get_connection) super().stop() self.start_b.setEnabled(False) self.stop_b.setEnabled(False) def get(self, data=None, override=False): if self.done: # avoid processing if completed return if data is None or data[-1] is None: super().get(None, override=override) return data = data[-1] # Note: Connection issues are now handled in the visualize method # to ensure they take precedence over other messages # TESTING if self.simulate: # Different simulation profiles for free-fall vs leak_1/leak_2 is_free_fall = False try: is_free_fall = getattr(self.step, "step_type", None) == "test_freefall_leak" except Exception: # Fallback if step is a dict-like try: is_free_fall = (self.step.get("type") or self.step.get("step_type")) == "test_freefall_leak" except Exception: pass if is_free_fall: # Free-fall requested values (distinct from leak_1) if "--fail-leak" in sys.argv: data[self.tester_component] = { "Running test: active phase": "WAITING START", "Running test: result": "-----TESTING----- fail", "Running test: filling pressure": 3000, "Running test: measured leak": 0, "Running test: pressure at the end of settling": 1831, "Running test: pressure at the end of measure": 1831, } else: data[self.tester_component] = { "Running test: active phase": "WAITING START", "Running test: result": "-----TESTING----- passed", "Running test: filling pressure": 3000, "Running test: measured leak": 0, "Running test: pressure at the end of settling": 1831, "Running test: pressure at the end of measure": 1831, } else: # Standard leak (leak_1/leak_2) simulation — keep distinct values if "--fail-leak" in sys.argv: data[self.tester_component] = { "Running test: active phase": "WAITING START", "Running test: result": "-----TESTING----- fail", # Legacy simulation values for fail "Running test: filling pressure": 5000, "Running test: measured leak": 50, "Running test: pressure at the end of settling": 4999, # Let end-of-measure be derived by saver when possible } else: data[self.tester_component] = { "Running test: active phase": "WAITING START", "Running test: result": "-----TESTING----- passed", # Legacy simulation values for pass "Running test: filling pressure": 5000, "Running test: measured leak": 5, "Running test: pressure at the end of settling": 4999, # Let end-of-measure be derived by saver when possible } if "Running test: result" in data[self.tester_component]: # TEST ENDED, CHECK RESULT result = data[self.tester_component]["Running test: result"] step=self.step.spec.get("autotest", "") if step == "ok_check": ok = type(result) is str and "passed" in result.lower() # AUTOTEST - NO LEAK self.recipe_written=False elif step == "ko_check": if self.tester_component == "tecna_t3": # AUTOTEST - CALIBRATED LEAK - WINDOWED TYPE, OK EXPECTED (TECNA) ok = type(result) is str and "passed" in result.lower() else: # AUTOTEST - CALIBRATED LEAK - NON WINDOWED TYPE, FAIL EXPECTED (FURNESS CONTROLS) ok = type(result) is str and "fail" in result.lower() self.recipe_written = False else: ok = type(result) is str and "passed" in result.lower() # NORMAL TEST, OK EXPECTED # SET DIGITAL OUTPUTS if self.parent.config["hardware_config"].get("external_flush_blow", None) == "present": self.blow_on = True if self.parent.config["hardware_config"].get("dual_channel", None) != "present": self.set_digital_out("ch1_led", False) self.display_text("SCARICO ESTERNO IN CORSO...") self.set_digital_out("flush_led", True) self.set_digital_out("flush_on", True) time.sleep(VALVE_TIME) flush_time = int(self.step.spec.get('ext_flush_time',3)) time.sleep(flush_time) self.set_digital_out("flush_led", False) #self.set_digital_out("flush_on", False) if self.parent.config["hardware_config"].get("dual_channel", None) == "present": self.set_digital_out("out_channel_select", False) self.set_digital_out("in_channel_select", False) self.set_digital_out("ch1_led", False) self.set_digital_out("ch2_led", False) else: #result = None ok = None results={"ok":ok} results.update(data) super().get([{ "time": data.get("time", None), "results": results #"results": { #"ok": ok, #"result": result, #"data": data[self.tester_component], #}, }], override=override, fail=ok is False) def visualize(self, data=None): if data is None: data = {} d = data.get("results", {}).get(self.tester_component, {}) for k, l in { "Running test: active phase": self.test_phase_l, "Real time test pressure output": self.circuit_pressure_l, #"Real time differential pressure output": self.leak_l, "Running test: measured leak": self.leak_l, "Real time pressure line regulator": self.regulated_pressure_l, # "Active alarm flags": self._l, "Running test: test type": self.test_type_l, "Running test: sequence index": self.sequence_index_l, }.items(): v = d.get(k, "-") if type(v) is float: v = round(v, 2) l.setText(str(v)) # Check if there's a connection issue before displaying test status if hasattr(self.components[self.tester_component], 'connection_lost') and self.components[self.tester_component].connection_lost: # Connection is lost, don't display test status # The handle_modbus_error method will display the appropriate message # Just ensure buttons are in the correct state self.start_b.setEnabled(False) self.stop_b.setEnabled(True) return # Check if the connection was just restored if hasattr(self.components[self.tester_component], '_previous_connection_lost') and self.components[self.tester_component]._previous_connection_lost: # Connection was just restored, don't display test status yet # The handle_modbus_error method will display the appropriate message # Just ensure buttons are in the correct state self.start_b.setEnabled(False) self.stop_b.setEnabled(True) return # Check test phase if d.get("Running test: active phase", None) in { "WAITING START", "ATTESA START", "END TEST, WAITING THE START OF A NEW TEST" "FINE TEST", "STANDBY", "PRESSIONE BASSA", "PRESSIONE ALTA", "ERRORE" }: self.start_b.setEnabled(True) self.start_b.setDefault(True) self.start_b.setFocus() self.stop_b.setEnabled(False) else: if self.step is not None and not self.blow_on: if not self.step.spec.get("autotest", False): self.display_text(text="PROVA TENUTA IN CORSO") else: self.display_text(text="AUTOTEST: PROVA TENUTA IN CORSO") self.start_b.setEnabled(False) self.stop_b.setEnabled(True) ok = data.get("results", {}).get("ok", None) # Preserve PERVIETÀ image during Free Fall-related steps if getattr(self, "_free_fall_mode", False): # Keep the current image (likely PERVIETÀ) instead of switching to status icons cur_img = getattr(self, "img", None) if cur_img is not None: super().visualize(data, img=cur_img) else: super().visualize(data, img=self.status_imgs_full.get(ok, self.status_imgs_full[None])) else: super().visualize(data, img=self.status_imgs_full.get(ok, self.status_imgs_full[None])) def display_text(self,text="", bg_color=None,text_color=None): # Display the message in the parent assembly widget if available if self.parent_assembly_widget is not None: self.parent_assembly_widget().set_text(text=text, bg_color=bg_color,text_color=text_color) QApplication.processEvents() time.sleep(0.3) QApplication.processEvents() # If parent_assembly_widget is None, display the message directly in the UI else: # Set the text in a label in the current widget if available for label_name in ["status_l", "test_phase_l"]: if hasattr(self, label_name): label = getattr(self, label_name) if label is not None: # Set background color if specified if bg_color is not None: label.setStyleSheet(f"background-color: {bg_color}; color: {text_color or 'black'};") # Set the text label.setText(text) QApplication.processEvents() break # If no suitable label is found, show a message box for critical errors if bg_color == "red" or bg_color == "yellow": QMessageBox.warning(self, "Avviso", text) QApplication.processEvents() def set_digital_out(self,out_name=None,state=1,component_name="digital_io_flush_blow"): if self.io_ok: bit = int(self.parent.config[component_name][out_name]) ret = self.components[component_name].set_bit_verify(0,bit,state) if not ret: QMessageBox.critical(None, "ERRORE", f"ERRORE I/O DIGITALE - VERIFICARE CONNESSIONE USB") self.io_ok = False def save_last(self): if self.last is None: return @pyqtSlot(bool, str) def handle_modbus_error(self, error, error_message): """ Handle errors received from the Modbus component. If it's a connection error, display a message and stop the test. For other errors, stop the test gracefully. If error is False, it means the connection has been restored. """ if error: if "Connection error" in error_message or "Cannot connect" in error_message: # Connection is lost, display a message and stop the test if self.step is not None and self.step.spec.get("autotest", False): self.display_text( text=f"AUTOTEST: ERRORE DI CONNESSIONE TECNA. TEST INTERROTTO.", bg_color="red", text_color="white" ) else: self.display_text( text=f"ERRORE DI CONNESSIONE TECNA. TEST INTERROTTO.", bg_color="red", text_color="white" ) # Stop the test when a connection error is detected self.parent.fail_cycle() # Always disable start button and enable stop button during connection issues self.start_b.setEnabled(False) self.stop_b.setEnabled(True) else: self.display_text( text=f"ERRORE TECNA TEST INTERROTTO.", bg_color="red", text_color="white" ) self.stop_test() elif error_message == "Connection restored": # Connection has been restored if self.step is not None and self.step.spec.get("autotest", False): self.display_text( text=f"AUTOTEST: CONNESSIONE TECNA RIPRISTINATA. CONTINUARE IL TEST.", bg_color="green", text_color="white" ) else: self.display_text( text=f"CONNESSIONE TECNA RIPRISTINATA. CONTINUARE IL TEST.", bg_color="green", text_color="white" ) # Reset the flag immediately to ensure the reconnection message is displayed self.components[self.tester_component]._previous_connection_lost = False # Force a UI update to ensure the message is displayed QApplication.processEvents() # Always disable start button and enable stop button during connection issues self.start_b.setEnabled(False) self.stop_b.setEnabled(True) def resizeEvent(self, event=None): # First, let the base class handle default behavior try: super().resizeEvent(event) except Exception: # Fallback: ignore if base cannot handle the event pass # Apply Free Fall specific image scaling (half space) if getattr(self, "_free_fall_mode", False): if hasattr(self, "img_l") and hasattr(self, "img") and self.img_l is not None and self.img is not None: try: scale = float(getattr(self, "_free_fall_img_scale", 0.5) or 0.5) except Exception: scale = 0.5 w = max(1, int(self.img_l.width() * scale)) h = max(1, int(self.img_l.height() * scale)) try: self.img_l.setPixmap(self.img.scaled(w, h, Qt.KeepAspectRatio, Qt.SmoothTransformation)) except Exception: # If scaling fails, leave as base-class result pass