import sys import time import weakref import os from PyQt5.QtGui import QPixmap from PyQt5.QtWidgets import QMessageBox, QDialog, QApplication from PyQt5.QtCore import Qt, pyqtSlot, QTimer from ui import Dialog from ui.test_instructions_reminder import Test_Instructions_Reminder from ui.test_test import Test_Test VALVE_TIME=0.5 class Test_Leak(Test_Test): def __init__(self, config,components=None, recipe=None, step=None, pieces=None, run_once=False, reset_on_start=True, enable_override=False,parent=None): super().__init__(components=components, recipe=recipe, step=step, pieces=pieces, run_once=run_once, reset_on_start=reset_on_start, enable_override=enable_override) self.get_connection = None self.io_ok = True self.blow_on = False self.parent=parent self.step=step self.config = config self.recipe_written = False self.start_b.clicked.connect(self.start_test) self.stop_b.clicked.connect(self.stop_test) self.show_instruction_b.setVisible("show_instructions" in self.parent.config["hardware_config"].keys()) self.show_instruction_b.clicked.connect(self.show_instruction) # Connect to the tecna_error_signal to handle connection issues self.components[self.tester_component].tecna_error_signal.connect(self.handle_modbus_error) def show_instruction(self): dialog=Dialog() dialog.setCentralWidget(Test_Instructions_Reminder(recipe=self.parent.recipe,bench_name=self.parent.config.machine_id)) dialog.show() def reset(self): self.components[self.tester_component].stop_test() super().reset() def stop_test(self): self.components[self.tester_component].stop_test() self.display_text(text="PROVA INTERROTTA", bg_color="yellow") time.sleep(1) self.start_b.setEnabled(True) self.stop_b.setEnabled(False) def start_test(self): if self.step.step_type == "test_freefall_leak": self.set_digital_out("first_output", 1) # Set high # print extra labels if self.step.step_type == "leak_1": self.parent.print_extra_labels() # SELECT TEST CHANNEL if self.parent.config["hardware_config"].get("external_flush_blow", None) == "present": if self.parent.config["hardware_config"].get("dual_channel", None) == "present": chan_sel = self.step.spec["chan_sel"] # 0=CH1, 1=CH2 self.set_digital_out("out_channel_select", chan_sel) self.set_digital_out("in_channel_select", chan_sel) time.sleep(VALVE_TIME) # SET LED INDICATORS if chan_sel == 0: self.set_digital_out("ch1_led", True) else: self.set_digital_out("ch2_led", True) self.blow_on=True self.display_text("SOFFIAGGIO IN CORSO...") self.set_digital_out("blow_led",True) self.set_digital_out("blow_on",True) time.sleep(VALVE_TIME) self.set_digital_out("flush_on", True) blow_time=int(self.step.spec.get('ext_blow_time',3)) self.set_digital_out("blow_led", True) time.sleep(blow_time) self.set_digital_out("blow_led", False) self.set_digital_out("blow_on", False) time.sleep(VALVE_TIME) self.set_digital_out("flush_on", False) if self.parent.config["hardware_config"].get("dual_channel", None) != "present": self.set_digital_out("ch1_led", True) self.blow_on = False if not self.simulate: self.components[self.tester_component].start_test() def start(self, recipe=None, step=None, pieces=None): # TESTING if "--test-leak" in sys.argv: self.simulate = True else: self.simulate = False if "--autostart" in sys.argv: self.start_b.setEnabled(True) self.start_b.click() # /TESTING show = super().start(recipe=recipe, step=step, pieces=pieces) if show is False: return show # Autostart if the flag was set by the previous step (e.g., freefall_leak) if getattr(self.parent, 'autostart_next_step', False): self.parent.autostart_next_step = False # Reset the flag self.start_b.setEnabled(True) self.start_b.click() if "leak_2" in [s.step_type for s in self.parent.cycle_steps]: if self.step.step_type=="leak_1": self.test_num_l.setText("1/2") else: self.test_num_l.setText("2/2") else: self.test_num_l.setText("1/1") if self.step.spec.get("autotest", False): self.template_print_l.setText(f"AUTOTEST") else: self.template_print_l.setText(f"{self.parent.print_template}") # Populate labels defensively to support both Leak and Free Fall specs pressure_val = self.step.spec.get('test_pressure', self.step.spec.get('filling_pressure', "")) leak_min_val = self.step.spec.get('test_pressure_qneg', self.step.spec.get('pressure_min', "")) leak_max_val = self.step.spec.get('test_pressure_qpos', self.step.spec.get('pressure_max', "")) self.recipe_pressure_l.setText(f"{pressure_val}") self.leak_min_l.setText(f"{leak_min_val}") self.leak_max_l.setText(f"{leak_max_val}") self.fill_time_l.setText(f"{self.step.spec.get('filling_time', '')}") self.settle_time_l.setText(f"{self.step.spec.get('settling_time', '')}") self.meas_time_l.setText(f"{self.step.spec.get('test_time', '')}") self.valore_PID_l.setText(f"{self.step.spec.get('pid_pressure_correction', '')}") # SETUP TEST LOOP if self.step.spec.get("autotest", False): # IF AUTOTESTING UPLOAD RECIPE EVERY TIME self.recipe_written = False if self.parent.config["hardware_config"].get("second_leak_test", "absent") == "present": # IF SECOND LEAK TEST ENABLED UPLOAD RECIPE EVERY TIME self.recipe_written = False if not self.recipe_written: self.components[self.tester_component].write_recipe(self.recipe, self.step) self.recipe_written=True self.get_connection = self.components[self.tester_component].out.connect(self.get) self.components[self.tester_component].resume() if self.parent_assembly_widget is not None: self.display_text(text="ATTENDERE") self.start_b.setEnabled(False) self.stop_b.setEnabled(False) if self.step.spec.get("autotest", False) == "ok_check": self.template_print_l.setVisible(False) self.template_label.setVisible(False) self.display_text(text="AUTOTEST: RIMUOVERE FUGA CALIBRATA E PREMERE START PER INIZIARE LA PROVA TENUTA", bg_color="blue", text_color="white") super().visualize(None, img=self.status_imgs_full["calibrated-leak-remove"]) elif self.step.spec.get("autotest", False) == "ko_check": self.template_print_l.setVisible(False) self.template_label.setVisible(False) self.display_text( text="AUTOTEST: COLLEGARE TUBO-TUBO + FUGA CALIBRATA E PREMERE START PER INIZIARE LA PROVA TENUTA DI PROVA", bg_color="blue", text_color="white") super().visualize(None, img=self.status_imgs_full["calibrated-leak"]) else: if step.step_type == "test_freefall_leak": # Hide parameters not relevant during free-fall for name in [ "label_18", # Tempo di riempimento "fill_time_l", "label_19", # Tempo di assestamento "settle_time_l", "label_22", # Tempo di prova "meas_time_l", "template_label", # Etichetta selezionata (caption) "template_print_l", # Etichetta selezionata (value) "valore_PID", # Valore PID (caption) "valore_PID_l", # Valore PID (value) "label_17", # unit 's' for fill time "label_20", # unit 's' for settling/measuring time "label_21", # possibly another unit/aux label to hide in free fall ]: if hasattr(self, name): getattr(self, name).setVisible(False) self.display_text(text="USARE IL SISTEMA DI FLUSSAGGIO. AL TERMINE POSIZIONARE IL PEZZO PER LA PROVA TENUTA") # Show placeholder image for free-fall super().visualize(None, img=self.status_imgs_full[None]) else: # If the recipe contains a Free Fall test, show a default image even on Leak steps try: has_free_fall = any(getattr(s, "step_type", None) == "test_freefall_leak" for s in (self.parent.cycle_steps or [])) except Exception: has_free_fall = False if has_free_fall: # Keep normal Leak instruction text self.display_text(text="COLLEGARE GLI ATTACCHI PNEUMATICI E PREMERE START PER INIZIARE LA PROVA TENUTA") # Load default image super().visualize(None, img=self.status_imgs_full[None]) else: # Ensure Free Fall mode is disabled for other steps self._free_fall_mode = False self._free_fall_img_scale = None self.display_text(text="COLLEGARE GLI ATTACCHI PNEUMATICI E PREMERE START PER INIZIARE LA PROVA TENUTA") super().visualize(None, img=self.status_imgs_full[None]) if step.step_type != "test_freefall_leak": self.template_print_l.setVisible(True) self.template_label.setVisible(True) if self.simulate: QApplication.processEvents() time.sleep(2) # AUTO START SECOND TEST if step.step_type == "leak_2": if self.config["hardware_config"].get("dual_channel", "absent") == "present": self.recipe_written = False time.sleep(1) self.start_b.setEnabled(True) self.start_b.click() else: self.recipe_written = False time.sleep(1) self.start_b.setEnabled(True) return show def stop(self): # disable test loop self.components[self.tester_component].stop_test() self.components[self.tester_component].pause() self.disconnect(self.get_connection) if self.step.step_type == "test_freefall_leak": self.set_digital_out("first_output", 0) # Set low when test stops super().stop() self.start_b.setEnabled(False) self.stop_b.setEnabled(False) def get(self, data=None, override=False): if self.done: # avoid processing if completed return if data is None or data[-1] is None: super().get(None, override=override) return data = data[-1] # Note: Connection issues are now handled in the visualize method # to ensure they take precedence over other messages # TESTING if self.simulate: # Different simulation profiles for free-fall vs leak_1/leak_2 is_free_fall = False try: is_free_fall = getattr(self.step, "step_type", None) == "test_freefall_leak" except Exception: # Fallback if step is a dict-like try: is_free_fall = (self.step.get("type") or self.step.get("step_type")) == "test_freefall_leak" except Exception: pass if is_free_fall: # Free-fall requested values (distinct from leak_1) if "--fail-leak" in sys.argv: data[self.tester_component] = { "Running test: active phase": "WAITING START", "Running test: result": "-----TESTING----- fail", "Running test: filling pressure": 3000, "Running test: measured leak": 0, "Running test: pressure at the end of settling": 1831, "Running test: pressure at the end of measure": 1831, } else: data[self.tester_component] = { "Running test: active phase": "WAITING START", "Running test: result": "-----TESTING----- passed", "Running test: filling pressure": 3000, "Running test: measured leak": 0, "Running test: pressure at the end of settling": 1831, "Running test: pressure at the end of measure": 1831, } else: # Standard leak (leak_1/leak_2) simulation — keep distinct values if "--fail-leak" in sys.argv: data[self.tester_component] = { "Running test: active phase": "WAITING START", "Running test: result": "-----TESTING----- fail", # Legacy simulation values for fail "Running test: filling pressure": 5000, "Running test: measured leak": 50, "Running test: pressure at the end of settling": 4999, # Let end-of-measure be derived by saver when possible } else: data[self.tester_component] = { "Running test: active phase": "WAITING START", "Running test: result": "-----TESTING----- passed", # Legacy simulation values for pass "Running test: filling pressure": 5000, "Running test: measured leak": 5, "Running test: pressure at the end of settling": 4999, # Let end-of-measure be derived by saver when possible } if "Running test: result" in data[self.tester_component]: # TEST ENDED, CHECK RESULT result = data[self.tester_component]["Running test: result"] step=self.step.spec.get("autotest", "") if step == "ok_check": ok = type(result) is str and "passed" in result.lower() # AUTOTEST - NO LEAK self.recipe_written=False elif step == "ko_check": if self.tester_component == "tecna_t3": # AUTOTEST - CALIBRATED LEAK - WINDOWED TYPE, OK EXPECTED (TECNA) ok = type(result) is str and "passed" in result.lower() else: # AUTOTEST - CALIBRATED LEAK - NON WINDOWED TYPE, FAIL EXPECTED (FURNESS CONTROLS) ok = type(result) is str and "fail" in result.lower() self.recipe_written = False else: ok = type(result) is str and "passed" in result.lower() # NORMAL TEST, OK EXPECTED # If freefall test is OK, autostart the next test (leak_1) if self.step.step_type == "test_freefall_leak" and ok: # The parent's `get` will advance to the next step. # When the new step (leak_1) starts, we want it to auto-click. # We can't click here because the new step widget is not yet created. # So we set a flag on the parent. if self.parent: self.parent.autostart_next_step = True # SET DIGITAL OUTPUTS if self.parent.config["hardware_config"].get("external_flush_blow", None) == "present": self.blow_on = True if self.parent.config["hardware_config"].get("dual_channel", None) != "present": self.set_digital_out("ch1_led", False) self.display_text("SCARICO ESTERNO IN CORSO...") self.set_digital_out("flush_led", True) self.set_digital_out("flush_on", True) time.sleep(VALVE_TIME) flush_time = int(self.step.spec.get('ext_flush_time',3)) time.sleep(flush_time) self.set_digital_out("flush_led", False) #self.set_digital_out("flush_on", False) if self.parent.config["hardware_config"].get("dual_channel", None) == "present": self.set_digital_out("out_channel_select", False) self.set_digital_out("in_channel_select", False) self.set_digital_out("ch1_led", False) self.set_digital_out("ch2_led", False) else: #result = None ok = None results={"ok":ok} results.update(data) super().get([{ "time": data.get("time", None), "results": results #"results": { #"ok": ok, #"result": result, #"data": data[self.tester_component], #}, }], override=override, fail=ok is False) def visualize(self, data=None): if data is None: data = {} d = data.get("results", {}).get(self.tester_component, {}) for k, l in { "Running test: active phase": self.test_phase_l, "Real time test pressure output": self.circuit_pressure_l, #"Real time differential pressure output": self.leak_l, "Running test: measured leak": self.leak_l, "Real time pressure line regulator": self.regulated_pressure_l, # "Active alarm flags": self._l, "Running test: test type": self.test_type_l, "Running test: sequence index": self.sequence_index_l, }.items(): v = d.get(k, "-") if type(v) is float: v = round(v, 2) l.setText(str(v)) # Check if there's a connection issue before displaying test status if hasattr(self.components[self.tester_component], 'connection_lost') and self.components[self.tester_component].connection_lost: # Connection is lost, don't display test status # The handle_modbus_error method will display the appropriate message # Just ensure buttons are in the correct state self.start_b.setEnabled(False) self.stop_b.setEnabled(True) return # Check if the connection was just restored if hasattr(self.components[self.tester_component], '_previous_connection_lost') and self.components[self.tester_component]._previous_connection_lost: # Connection was just restored, don't display test status yet # The handle_modbus_error method will display the appropriate message # Just ensure buttons are in the correct state self.start_b.setEnabled(False) self.stop_b.setEnabled(True) return # Check test phase if d.get("Running test: active phase", None) in { "WAITING START", "ATTESA START", "END TEST, WAITING THE START OF A NEW TEST" "FINE TEST", "STANDBY", "PRESSIONE BASSA", "PRESSIONE ALTA", "ERRORE" }: self.start_b.setEnabled(True) self.start_b.setDefault(True) self.start_b.setFocus() self.stop_b.setEnabled(False) else: if self.step is not None and not self.blow_on: if not self.step.spec.get("autotest", False): self.display_text(text="PROVA TENUTA IN CORSO") else: self.display_text(text="AUTOTEST: PROVA TENUTA IN CORSO") self.start_b.setEnabled(False) self.stop_b.setEnabled(True) ok = data.get("results", {}).get("ok", None) # Preserve PERVIETÀ image during Free Fall-related steps #if getattr(self, "_free_fall_mode", False): # Keep the current image (likely PERVIETÀ) instead of switching to status icons #cur_img = getattr(self, "img", None) #if cur_img is not None: #super().visualize(data, img=cur_img) #else: #super().visualize(data, img=self.status_imgs_full.get(ok, self.status_imgs_full[None])) #else: #super().visualize(data, img=self.status_imgs_full.get(ok, self.status_imgs_full[None])) def display_text(self,text="", bg_color=None,text_color=None): # Display the message in the parent assembly widget if available if self.parent_assembly_widget is not None: self.parent_assembly_widget().set_text(text=text, bg_color=bg_color,text_color=text_color) QApplication.processEvents() time.sleep(0.3) QApplication.processEvents() # If parent_assembly_widget is None, display the message directly in the UI else: # Set the text in a label in the current widget if available for label_name in ["status_l", "test_phase_l"]: if hasattr(self, label_name): label = getattr(self, label_name) if label is not None: # Set background color if specified if bg_color is not None: label.setStyleSheet(f"background-color: {bg_color}; color: {text_color or 'black'};") # Set the text label.setText(text) QApplication.processEvents() break # If no suitable label is found, show a message box for critical errors if bg_color == "red" or bg_color == "yellow": QMessageBox.warning(self, "Avviso", text) QApplication.processEvents() def set_digital_out(self,out_name=None,state=1,component_name="digital_io"): if self.io_ok: bit = int(self.parent.config[component_name][out_name]) ret = self.components[component_name].set_bit_verify(0,bit,state) if not ret: QMessageBox.critical(None, "ERRORE", f"ERRORE I/O DIGITALE - VERIFICARE CONNESSIONE USB") self.io_ok = False def save_last(self): if self.last is None: return @pyqtSlot(bool, str) def handle_modbus_error(self, error, error_message): """ Handle errors received from the Modbus component. If it's a connection error, display a message and stop the test. For other errors, stop the test gracefully. If error is False, it means the connection has been restored. """ if error: if "Connection error" in error_message or "Cannot connect" in error_message: # Connection is lost, display a message and stop the test if self.step is not None and self.step.spec.get("autotest", False): self.display_text( text=f"AUTOTEST: ERRORE DI CONNESSIONE TECNA. TEST INTERROTTO.", bg_color="red", text_color="white" ) else: self.display_text( text=f"ERRORE DI CONNESSIONE TECNA. TEST INTERROTTO.", bg_color="red", text_color="white" ) # Stop the test when a connection error is detected self.parent.fail_cycle() # Always disable start button and enable stop button during connection issues self.start_b.setEnabled(False) self.stop_b.setEnabled(True) else: self.display_text( text=f"ERRORE TECNA TEST INTERROTTO.", bg_color="red", text_color="white" ) self.stop_test() elif error_message == "Connection restored": # Connection has been restored if self.step is not None and self.step.spec.get("autotest", False): self.display_text( text=f"AUTOTEST: CONNESSIONE TECNA RIPRISTINATA. CONTINUARE IL TEST.", bg_color="green", text_color="white" ) else: self.display_text( text=f"CONNESSIONE TECNA RIPRISTINATA. CONTINUARE IL TEST.", bg_color="green", text_color="white" ) # Reset the flag immediately to ensure the reconnection message is displayed self.components[self.tester_component]._previous_connection_lost = False # Force a UI update to ensure the message is displayed QApplication.processEvents() # Always disable start button and enable stop button during connection issues self.start_b.setEnabled(False) self.stop_b.setEnabled(True)