import copy import logging import os import sys import weakref from datetime import datetime, timedelta from distutils.util import change_root from PyQt5.QtCore import QTimer, pyqtSlot, pyqtSignal from PyQt5.QtWidgets import QMessageBox from lib.db import Archive, Recipes, Users from lib.helpers import get_shift from lib.helpers.step import Step from playhouse.shortcuts import model_to_dict from ui.barcode_recipe_selection import Barcode_Recipe_Selection from ui.helpers import replace_widget from ui.recipe_selection import Recipe_Selection from ui.test_assembly import Test_Assembly from ui.test_barcodes import Test_Barcodes from ui.test_connector import Test_Connector from ui.test_count import Test_Count from ui.test_count_end import Test_Count_End from ui.test_fail import Test_Fail from ui.test_instructions import Test_Instructions from src.ui.test_pipe_cutter import Test_Pipe_Cutter from ui.test_leak import Test_Leak from ui.test_resistance import Test_Resistance from ui.test_screws import Test_Screws from ui.test_vision import Test_Vision from ui.test_warning_img import Test_Warning_Img from ui.widget import Widget from components import ArchiveSynchronizer class Test(Widget): def __init__(self, config, components=None, main_window=None): super().__init__() self.autotest_timer = None self.main_window = main_window self.config = config self.components = components # GET LOGGER self.log = logging.getLogger("Test") # SHOW MACHINE DESCRIPTION self.machine_description_l.setText(self.config.get("machine", {}).get("description", "N/A")) # SHOW USERNAME session = Users.get_session() self.original_username = session.username # Check if we should set the label to "ADMIN" if session.username.upper() == "ADMIN": self.user_l.setText("ADMIN") session._is_admin = True else: self.user_l.setText(session.username) session._is_admin = False if session.is_admin: self.user_l.setStyleSheet("QLabel { color: red; }") else: self.user_l.setStyleSheet("") # Store original admin status self.had_admin = session.is_admin self.flag_label.setVisible(False) if len(sys.argv) > 1: self.flag_label.setVisible(True) self.update_label_with_args() # Initial update else: self.flag_label.setVisible(False) self.active_errors = [] # List to hold current errors (type: tuples of (message, is_error)) self.current_error_index = 0 # Keeps track of the current error index during alternation # Timer for alternating errors self.error_timer = QTimer() self.error_timer.setInterval(2000) # Fire every 2 seconds self.error_timer.timeout.connect(self.display_current_error) # Connect the timer to the display logic # SHOW AND UPDATE TIME CLOCK self.refresh_time(init=True) # INIT RECIPE self.recipe = None if "fixture_id" in self.components: self.rfid = self.components["fixture_id"] self.rfid.rfid_error_signal.connect(self.handle_rfid_error) if "tecna_t3" in self.components: self.tecna = self.components["tecna_t3"] #self.tecna.tecna_error_signal.connect(self.handle_modbus_error) self.error_label.setText("") self.error_label.setStyleSheet("QLabel { color: red; }") if self.config["hardware_config"]["barcode_recipe_selection"] == "present": self.recipe_selection_mode = "barcode" else: self.recipe_selection_mode = "table" self.step = None self.tester_component = None if self.config["hardware_config"]["tecna_t3"] == "present": self.tester_component = "tecna_t3" #self.components["tecna_t3"].tecna_error_signal.connect(self.handle_modbus_error) elif self.config["hardware_config"]["furness_controls"] == "present": self.tester_component = "furness_control" self.unsupported_steps = set() self.steps_dependencies = { "count": set(), "connector": {"multicomp", }, "instruction": {"digital_io"}, "screws": {"screwdriver", "tecna_t3", }, "resistance": {"multicomp", }, "leak_1": {self.tester_component, }, "leak_2": {self.tester_component, }, "pipe_cutter": {"pipe_cutter"}, "vision": {("uvc_camera", "galaxy_camera","hikrobot_sc"), "vision", "vision_saver", }, # "neo_pixels", }, "print": {"label_printer_2"} if self.config["hardware_config"]["label_printer"] != "present" else {"label_printer"}, } self.unsupported_steps = set() for step_name, dependencies in self.steps_dependencies.items(): for dependency in dependencies: if isinstance(dependency, tuple): # if all([d not in self.components or not self.components[d].ready for d in dependency]): if all([d not in self.components for d in dependency]): self.unsupported_steps.add(step_name) else: # if dependency not in self.components or not self.components[dependency].ready: if dependency not in self.components: self.unsupported_steps.add(step_name) # INIT PIECES COUNTER self.pieces = {"ok": 0, "ko": 0} # INIT CYCLE STATES self.cycle_available_steps = { # "assembly_1": Test_Assembly(img_path=self.select_step_img("assembly_1"), text=u"INSERIRE SENSORE", widget=None), "barcodes": Test_Assembly(img_path=self.select_step_img("scan"), text=u"LEGGERE IL BARCODE DEL PEZZO DA COLLAUDARE", widget=Test_Barcodes()), "connector": Test_Assembly(img_path=self.select_step_img("scan"), text=u"COLLEGARE IL CONNETTORE INDICATO AL PEZZO E LEGGERE IL SUO BARCODE", widget=Test_Connector(run_once=True)), "count": Test_Assembly(img_path=None, text=u"INSERIRE IL NUMERO DI PEZZI ATTESI PER IL LOTTO", widget=Test_Count(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces, run_once=True)), "warning_img": Test_Assembly(img_path=None, text=u"ATTENZIONE - PER QUESTO CODICE ESEGUIRE LE OPERAZIONI INDICATE IN FIGURA", widget=Test_Warning_Img(components=self.components, recipe=self.recipe,bench_name=self.config["machine"]["image_for_warning"], step=self.step, run_once=True)), "count_end": Test_Assembly(img_path=None, text=u"LOTTO TERMINATO, PREMERE CONTINUA PERCOMINCIARNE UNO NUOVO", widget=Test_Count_End(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)), "done": Test_Assembly(img_path=self.select_step_img("success"), text=u"COLLAUDO COMPLETATO", widget=None), "emergency": Test_Assembly(img_path=self.select_step_img("reset_emergency"), text=u"EMERGENZA INTERVENUTA - RIPRISTINARE PULSANTE E SELEZIONARE \"RESET EMERGENZA\" DAL MEN\u00d9 \"STRUMENTI\"", widget=None), "fail": Test_Assembly(img_path=self.select_step_img("fail"), text=u"CICLO INTERROTTO, PREMERE CONTINUA PER COMINCIARE UN NUOVO CICLO", widget=Test_Fail(parent=self)), "blow": Test_Assembly(img_path=None, text=u"SOFFIAGGIO TUBO IN CORSO - ATTENDERE...", widget=Test_Warning_Img(components=self.components, recipe=self.recipe, step=self.step)), "leak_1": Test_Assembly(img_path=None, text=None, widget=Test_Leak(config=self.config,components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces, parent=self)) if self.config["hardware_config"]["tecna_t3"] != "absent" or self.config["hardware_config"]["furness_controls"] !="absent" else None, "leak_2": Test_Assembly(img_path=None, text=None, widget=Test_Leak(config=self.config,components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces, parent=self)) if self.config["hardware_config"]["tecna_t3"] != "absent" or self.config["hardware_config"]["furness_controls"] != "absent" else None, "flush": Test_Assembly(img_path=None, text=u"SCARICO ARIA IN CORSO - ATTENDERE...", widget=Test_Warning_Img(components=self.components, recipe=self.recipe, step=self.step)), "instruction": Test_Assembly(img_path=None, text=u"ESEGUIRE LE OPERAZIONI DI MONTAGGIO INDICATE IN FIGURA", widget=Test_Instructions(config=self.config,components=self.components, recipe=self.recipe, bench_name=self.config.machine_id, step=self.step)), "pipe_cutter": Test_Assembly(img_path=None, text=u"ATTENZIONE TAGLIO CORRUGATO IN CORSO",widget=Test_Pipe_Cutter(config=self.config, components=self.components,recipe=self.recipe, bench_name=self.config.machine_id,step=self.step)), "instruction_extra": Test_Assembly(img_path=None, text=u"ESEGUIRE LE OPERAZIONI DI MONTAGGIO EXTRA INDICATE IN FIGURA", widget=Test_Instructions(config=self.config, components=self.components,recipe=self.recipe, bench_name=self.config.machine_id,step=self.step)), "piece_removal": Test_Assembly(img_path=None, text=u"RIMUOVERE IL PEZZO APRENDO TUTTE LE CHIUSURE", widget=Test_Instructions(config=self.config,components=self.components, recipe=self.recipe, bench_name=self.config.machine_id, step=self.step)), "print": Test_Assembly(img_path=self.select_step_img("print"), text=u"STAMPA ETICHETTA IN CORSO", widget=None), "resistance": Test_Assembly(img_path=None, text=u"COLLEGARE CONNETTORE ELETTRICO PER EFFETTUARE PROVA RESISTENZA", widget=Test_Resistance(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)), "screws": Test_Assembly(img_path=None, text=u"AVVITARE TUTE LE VITI COME INDICATO", widget=Test_Screws(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)), "select_recipe": Test_Assembly(img_path=None, text=u"SELEZIONARE IL CODICE DA COLLAUDARE", widget=Recipe_Selection(config=self.config, unsupported_steps=self.unsupported_steps)), "barcode_recipe_selection": Test_Assembly(img_path=self.select_step_img("scan"), text=u"LEGGERE IL BARCODE SULLA DIMA DEL COMPONENTE DA COLLAUDARE", widget=Barcode_Recipe_Selection(parent=self)), "vision": Test_Assembly(img_path=None, text=u"VERIFICARE CONTROLLO CON TELECAMERA", widget=Test_Vision(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)), "wait": Test_Assembly(img_path=self.select_step_img("wait"), text=u"ATTENDERE - PAUSA INTER CICLO", widget=None), None: Test_Assembly(img_path=self.select_step_img("warning"), text=u"ATTENZIONE - LA RICETTA SELEZIONATA NON CONTIENE FASI DI TEST", widget=None), } self.cycle_steps = None self.cycle_index = -1 self.print_step = None self.last_label = None self.require_discard_piece = False # SETUP AUTOTEST self.autotest_request = False self.autotesting = False self.autotesting_reason = None self.autotest_cycle_steps = None if "--no-autotest" in sys.argv: self.setStyleSheet("background-color: red;") else: self.setStyleSheet("background-color: white;") if "--no-autotest" not in sys.argv: if "--test-autotest" in sys.argv: self.autotest_period = int(60 * 1000) # 1 min else: #self.autotest_period = int(8.5 * 60 * 60 * 1000)# 8.5 HOURS self.autotest_period = int(4 * 60 * 60 * 1000)# 4 HOURS # self.autotest_period = 12 * 60 * 60 * 1000 # 12 HOURS # if not self.config["autotest_done"]: # self.request_autotest("init") else: self.autotest_period = None # INIT TEST DATA self.data = {"ok": True, "overridden": False} self.archived = None # CONNECT CYCLE CONTROLS self.cancel_b.clicked.connect(self.fail_cycle) self.change_recipe_b.clicked.connect(self.change_recipe) self.reset_count_b.clicked.connect(self.reset_count) for step_name, w in self.cycle_available_steps.items(): if hasattr(w, "ok"): # custom ok handlers should call next again if isinstance(w.widget, Recipe_Selection): w.ok.connect(self.set_recipe) else: w.ok.connect(lambda data=None, step_namel=step_name, selfie=weakref.ref(self): selfie().set_step(step_namel, data)) if hasattr(w, "ko"): w.ko.connect(self.fail_cycle) # CUSTOM STEP CONNECTIONS self.cycle_available_steps["count"].ok.connect(self.cycle_available_steps["count_end"].widget.set_amount) # self.cycle_available_steps["warning_img"].ok.connect(self.cycle_available_steps["warning_img"].widget.set_done) if "fixture_id" in self.components.keys(): self.components["fixture_id"].new_id_signal.connect(self.load_recipe_from_rfid) self.components["fixture_id"].rfid_error_signal.connect(self.handle_rfid_error) self.tag_loaded_recipe = self.main_window.tag_loaded_recipe # TESTING if "--test" in sys.argv: self.testing = True else: self.testing = False # /TESTING # START CYCLE self.next_timer = QTimer() self.next_timer.setSingleShot(True) self.next_timer.timeout.connect(self.next) self.next() def refresh_time(self, init=False): if init and not hasattr(self, 'time_timer'): self.time_timer = QTimer() self.time_timer.setSingleShot(False) self.time_timer.timeout.connect(self.refresh_time) t = datetime.now() self.time_l.setText("{d}/{mo}/{y}\n{h}:{m:02d}".format( y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute )) # Always restart the timer, ensuring intervals are set correctly if hasattr(self, 'time_timer'): # Safeguard for uninitialized timer self.time_timer.start(30000) def select_step_img(self, step, suffix=None): img_path = "./src/ui/imgs" names = [] if suffix is not None: names.append(f"{step}_{suffix}_{self.config.machine_id}") names.append(f"{step}_{suffix}") names.append(f"{step}_{self.config.machine_id}") names.append(f"{step}") for name in names: for ext in ["png", "jpg"]: path = f"{img_path}/{name}.{ext}" if os.path.isfile(path): return path raise FileNotFoundError(f"No image was found for step {step}") def change_recipe(self): self.next(action="change_recipe") def set_recipe_mode_table(self): self.recipe_selection_mode = "table" self.change_recipe() def set_recipe_mode_barcode(self): self.recipe_selection_mode = "barcode" self.change_recipe() def cut_tube(self): self.components["pipe_cutter"].to_calibrate() self.components["pipe_cutter"].start_cutting() def reprint_label(self): self.print(self.last_label, self.print_step.spec.get("template", "EtichettaR5")) def fail_cycle(self): self.next(action="fail") def setCentralWidget(self, widget): replace_widget(self, "centralWidget", widget) def enable_temp_admin(self): """Enable temporary admin privileges for the current user""" session = Users.get_session() if session is None: self.log.warning("Cannot enable temporary admin privileges: No active session") return False # Save the current label text before changing it self.saved_user_label_text = self.user_l.text() # Enable temporary admin privileges using the Session class method session.enable_temp_admin() # Update UI to reflect admin status self.user_l.setText("ADMIN") self.user_l.setStyleSheet("QLabel { color: red; }") self.log.info(f"Temporary admin privileges enabled for user: {session.username}") return True def disable_temp_admin(self): """Disable temporary admin privileges and restore original user status""" session = Users.get_session() if session is None: self.log.warning("Cannot disable temporary admin privileges: No active session") return False # Disable temporary admin privileges using the Session class method session.disable_temp_admin() # Restore original UI # Use the saved label text if available, otherwise fall back to original username if hasattr(self, 'saved_user_label_text'): self.user_l.setText(self.saved_user_label_text) else: self.user_l.setText(self.original_username) # Set style based on original admin status if self.had_admin: self.user_l.setStyleSheet("QLabel { color: red; }") else: self.user_l.setStyleSheet("") self.log.info(f"Temporary admin privileges disabled for user: {session.username}") return True def request_autotest(self, reason): # you can cancel the request calling request_autotest(False) if "--no-autotest" not in sys.argv: self.log.info(f"cycle request autotest: reason: {reason!r} autotest_request: {self.autotest_request!r}") if reason in ("init", "login"): self.autotest_timer = QTimer() self.autotest_timer.setSingleShot(False) self.autotest_timer.timeout.connect(self.request_periodic_autotest) if self.autotest_period is not None: self.autotest_timer.start(self.autotest_period) reason = "boot" if not hasattr(self, "refresh_timer"): self.refresh_timer = QTimer() self.refresh_timer.setSingleShot(False) self.refresh_timer.timeout.connect(self.refresh_time) self.refresh_timer.start(30000) if self.config["autotest_leak"]["enabled"] == "true": self.autotest_request = reason else: self.autotest_request = False else: self.log.info(f"Autotest request ignored (reason: {reason!r}) --no-autotest flag detected") if reason == "logout": self.next(action="abort") def request_periodic_autotest(self): self.request_autotest("periodic") def next(self, action=None): if self.step is not None: self.log.info(f"cycle step: {self.step.step_type!r} action: {action!r} current index:{self.cycle_index}") else: self.log.info(f"cycle step: {self.step!r} action: {action!r} current index:{self.cycle_index}") current_w = self.centralWidget if hasattr(current_w, "stop"): self.log.info(f"stopping widget {self.step.step_type}") current_w.stop() if action == "change_recipe": self.log.info(f"cycle next: action: {action!r}") self.set_recipe(recipe=None) if self.config["hardware_config"]["tecna_t3"] == "present" or self.config["hardware_config"][ "furness_controls"] == "present": self.cycle_available_steps["leak_1"].widget.recipe_written = False self.cycle_available_steps["leak_2"].widget.recipe_written = False self.step = Step(step_type="select_recipe") self.cycle_index = -1 self.recipe = None self.cycle_steps = None # COUNT RESET self.pieces["ok"] = 0 self.pieces["ko"] = 0 elif action in ("fail", "abort"): self.log.info(f"cycle next: action: {action!r}") # FAIL AND RESTART TEST self.step = Step(step_type="fail") self.cycle_index = -1 # COUNT FAIL if action == "fail": self.done(ok=False) elif action is not None: raise NotImplementedError(f"cycle next: action {action!r} is not a valid action") # Set next cycle step normally if no action sets it explicitly if self.recipe is None or self.cycle_steps is None: # If recipe not set: select_recipe if self.recipe_selection_mode == "barcode": self.log.info(f"returning to barcode recipe selection") self.step = Step(step_type="barcode_recipe_selection") else: self.log.info(f"returning to recipe selection table") self.step = Step(step_type="select_recipe") elif action is None: if ( self.autotest_request is not False and self.autotest_cycle_steps is not None and not self.autotesting and (self.cycle_index == -1 or self.cycle_index + 1 >= len(self.cycle_steps)) ): # If autotest was requested # and if cycle_steps is not started or has ended self.cycle_index = -1 self.autotesting = True self.autotesting_reason = self.autotest_request self.autotest_request = False self.log.info(f"Autotest requested (reason: {self.autotesting_reason})") if self.autotest_period is not None: # Reset periodic autotest timer self.time_timer.start(self.autotest_period) self.require_discard_piece = False if self.autotesting: if self.cycle_index + 1 < len(self.autotest_cycle_steps): # Go to next step in autotest_cycle_steps self.cycle_index += 1 self.step = self.autotest_cycle_steps[self.cycle_index] else: # When autotest ends, check if it needs to restart if not self.data.get("ok"): # Autotest failed self.log.warning("Restarting autotest from the first step due to failure.") self.cycle_index = 0 self.step = self.autotest_cycle_steps[self.cycle_index] # Restart from the first step else: # Autotest succeeded; proceed to post-autotest actions self.autotesting = False if self.autotesting_reason == "logout": Users.logout() self.main_window.open_login() else: t = datetime.now() self.last_at_l.setText( "{d}/{mo}/{y} {h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute)) t += timedelta(seconds=int(self.autotest_period / 1000)) self.next_at_l.setText( "{d}/{mo}/{y} {h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute)) self.autotesting_reason = None self.cycle_index = -1 self.config["autotest_done"] = True if not self.autotesting: if len(self.cycle_steps): # Go to next step in cycle_steps self.cycle_index = (self.cycle_index + 1) % len(self.cycle_steps) self.step = self.cycle_steps[self.cycle_index] else: self.cycle_index = -1 self.step = Step(step_type=None) # Enable/disable cycle controls self.change_recipe_b.setEnabled(self.recipe is not None) self.cancel_b.setEnabled(self.step.step_type is not None and self.step.step_type not in { "emergency", "fail", "select_recipe", "wait", }) self.log.info(f"next cycle step: {self.step.step_type!r}") # INIT TEST DATA IF STARTING CYCLE OR RESET IS NEEDED if self.cycle_index == 0 and not self.autotesting: # Initialize test data for normal cycles self.data = {"ok": True, "overridden": False} self.archived = None if self.recipe is not None and "recipe" not in self.data: self.data["recipe"] = model_to_dict(self.recipe) # Remaining logic for updating widgets, starting timers, etc. w = self.cycle_available_steps[self.step.step_type] show = None if self.step.step_type == "leak_2": self.setCentralWidget(w) # Pre-show UI for leak_2 if hasattr(w, "start"): show = w.start(recipe=self.recipe, step=self.step, pieces=self.pieces) if show is not False and w is not current_w: self.setCentralWidget(w) elif show is False: self.next_timer.start(0) if self.step.step_type == "done": self.archived = self.done() self.last_label = copy.deepcopy(self.archived) self.next_timer.start(500) elif self.step.step_type == "print": compiled_label = self.print(self.archived, self.step.spec.get("template", "EtichettaR5")) self.archived.test_data.update({"print": compiled_label}) self.archived.test_data.update({"print_template": self.print_template}) self.archived.test_data.update({"barcode_stampato": self.printed_barcode}) self.archived.label = compiled_label self.log.info(f"Label printed. Saving...") # self.archived.save() self.main_window.main_window.run_request.emit(self.archived.save, [], {}) self.next_timer.start(500) elif self.step.step_type == "wait": self.next_timer.start(500) # Update display self.update_count_display() def reset_count(self): # COUNT RESET self.pieces["ok"] = 0 self.pieces["ko"] = 0 self.update_count_display() def update_count_display(self): self.pieces_count_l.setText(f"{self.pieces['ok']} OK / {self.pieces['ko']} NOK / {sum(self.pieces.values())} TOT") def set_recipe(self, recipe=None): self.recipe = recipe self.config.active_recipe = recipe inserted_instruction = False self.require_discard_piece = False if self.recipe is None: self.cycle_steps = None self.autotest_cycle_steps = None else: steps = self.recipe.get_steps() skip = set() print_found = False count_found = False # create step sequence list barcode_names = ['serial', 'barcode_input_2', 'barcode_input_3', 'barcode_input_4', 'barcode_input_5'] for i, step in enumerate(steps): if step.step_type == "barcodes": n_pieces_value = step.spec.get("n_pieces") # Fix: Handle empty string and None n_pieces = 1 if n_pieces_value in (None, '') else n_pieces_value try: n_pieces_adapted = int(n_pieces) except ValueError: self.log.error(f"Invalid value for n_pieces: {n_pieces}") # Log the error n_pieces_adapted = 1 # Default to 1 if conversion fails if n_pieces_adapted == 1: step.spec["barcode_name"] = 'serial' else: step.spec["barcode_name"] = barcode_names[(n_pieces_adapted - 1) % len(barcode_names)] n_pieces_adapted -= 1 new_barcode_step = copy.deepcopy(step) new_barcode_step.spec["n_pieces"] = str(n_pieces_adapted) steps.insert(i + 1, new_barcode_step) if i in skip: continue if step.step_type == "vision": self.components["vision"].config_changed(vision_recipe=self.recipe.name) if step.step_type == "count": count_found = True if "warning_img" in step.spec: if step.spec["warning_img"]: steps.insert(i, Step(step_type="warning_img", spec={"warning_img": step.spec["warning_img"]})) skip.add(i + 1) if "assembly" in step.spec: if step.spec["assembly"]: steps.insert(i, Step(step_type="instructions", spec={})) skip.add(i + 1) if "require_discard_piece" in step.spec: if step.spec["require_discard_piece"]: self.require_discard_piece = True if step.step_type == "resistance": # ADD STEP TO ENSURE REMOVAL OF CONNECTOR steps.insert(i + 1, Step(step_type="resistance", spec={ "scale": 500, "expected": float("+inf"), "tolerance_pos": 0, "tolerance_neg": 0, })) skip.add(i + 1) if step.step_type == "print": self.print_template = step.spec.get("template", "EtichettaR5") # Store the template if print_found: continue steps.insert(i, Step(step_type="done", spec={})) print_found = True self.print_step = step if self.config["hardware_config"].get("enforce_piece_removal", "no") == "yes": if recipe.spec.get("instruction",False) is not False: steps.append(Step(step_type="piece_removal", spec={})) skip.add(i + 1) if count_found: steps.append(Step(step_type="count_end", spec={})) skip.add(i + 1) if step.step_type in ("leak_1", "leak_2"): self.leak_step = step step_types = [step.step_type for step in steps] if "leak_1" in step_types and "leak_2" in step_types: leak1_index = step_types.index("leak_1") leak2_index = step_types.index("leak_2") if leak1_index + 1 == leak2_index: # Ensure 'leak_1' is immediately followed by 'leak_2' if self.config["hardware_config"].get("second_leak_test", "yes") == "no": steps.insert(leak2_index, Step(step_type="instruction_extra", spec={})) inserted_instruction = True # Insert 'instruction_extra' after the first 'instructions' if not inserted between leaks if not inserted_instruction: for i, step in enumerate(steps): if step.step_type == "instructions": steps.insert(i + 1, Step(step_type="instruction_extra", spec={})) inserted_instruction = True break if not print_found: steps.append(Step(step_type="done")) if count_found: steps.append(Step(step_type="count_end")) steps.append(Step(step_type="wait")) self.cycle_steps = steps self.check_steps_dependencies(self.cycle_steps) leak_autotest_steps = [] # CONFIGURE LEAK AUTOTEST PARAMETERS if self.config["autotest_leak"]["enabled"] == "true": l_at_1 = copy.deepcopy(self.config["autotest_leak"]) l_at_1.pop("enabled") l_at_1 = {a: float(x) for a, x in l_at_1.items()} l_at_1["autotest"] = "ko_check" l_at_2 = copy.deepcopy(self.config["autotest_leak"]) l_at_2.pop("enabled") l_at_2 = {a: float(x) for a, x in l_at_2.items()} l_at_2["test_pressure_qneg"] = l_at_2["test_pressure_tt_qneg"] l_at_2["test_pressure_qpos"] = l_at_2["test_pressure_tt_qpos"] l_at_2["autotest"] = "ok_check" leak_autotest_steps = [Step(step_type="leak_1", spec=l_at_1), Step(step_type="leak_1", spec=l_at_2)] self.autotest_cycle_steps = [ *([ Step(step_type="resistance", spec={ "scale": 500, "expected": 1, "tolerance_pos": 5, "tolerance_neg": 5, "autotest": True, }), Step(step_type="resistance", spec={ "scale": 500, "expected": float("+inf"), "tolerance_pos": 0, "tolerance_neg": 0, "autotest": True, }), Step(step_type="resistance", spec={ "scale": 500, "expected": 10, "tolerance_pos": 1, "tolerance_neg": 1, "autotest": True, }), Step(step_type="resistance", spec={ "scale": 500, "expected": float("+inf"), "tolerance_pos": 0, "tolerance_neg": 0, "autotest": True, }), ] if "resistance" not in self.unsupported_steps else []), *(leak_autotest_steps), Step(step_type="done"), Step(step_type="wait"), ] for w in self.cycle_available_steps.values(): if hasattr(w, "reset"): w.reset() # UPDATE RECIPE DISPLAY if self.recipe is not None: self.log.info(f"set recipe: {model_to_dict(self.recipe)!r} cycle steps: {[s.step_type for s in self.cycle_steps]}") self.recipe_l.setText(self.recipe.name) self.recipe_l.setStyleSheet("") self.cycle_index = -1 self.next() else: self.log.info(f"set recipe: {self.recipe!r} cycle steps: {self.cycle_steps}") self.recipe_l.setText("NON SELEZIONATA") self.recipe_l.setStyleSheet("QLabel { color: red; }") def check_steps_dependencies(self, steps): unsupported_steps = set() missing_components = set() for step in steps: if step.step_type in self.unsupported_steps or step.step_type not in self.cycle_available_steps: unsupported_steps.add(step.step_type) else: for dependency in self.steps_dependencies.get(step.step_type, []): if isinstance(dependency, tuple): if all([d not in self.components for d in dependency]): unsupported_steps.add(step.step_type) else: unready = set() for d in dependency: if d in self.components: if not self.components[d].ready: self.components[d].reconfigure() if not self.components[d].ready: unready.add(d) else: unready.add(d) if unready == set(dependency): missing_components.add(" or ".join(dependency)) else: if dependency not in self.components: unsupported_steps.add(step.step_type) else: if not self.components[dependency].ready: self.components[dependency].reconfigure() if not self.components[dependency].ready: missing_components.add(dependency) if len(unsupported_steps): QMessageBox.critical(None, "Errore Ricetta", f"Questa ricetta contiene uno o piu step non supportati da questo banco:\n{', '.join(sorted(unsupported_steps))}\nModificare la ricetta adeguatamente.") if len(missing_components): QMessageBox.critical(None, "Errore Componenti Ricetta", f"Questa ricetta richiede i seguenti componenti per essere eseguita:\n{', '.join(sorted(missing_components))}\nModificare la ricetta adeguatamente o collegare i componenti elencati.") if len(unsupported_steps) or len(missing_components): self.change_recipe() def set_step(self, step_name, data=None): if step_name not in self.data: self.data[step_name] = {} if data is not None: data["step"] = self.step.spec data["step"].pop("name", None) # MAKE ARRAY ONLY IF MORE THAN ONE TEST OF SAME TYPE if len(self.data[step_name]) > 1: self.data[step_name][str(len(self.data[step_name]))] = data else: self.data[step_name] = data self.data["overridden"] = self.data["overridden"] or data.get("overridden", False) self.data["ok"] = self.data["ok"] and data.get("ok", False) self.next() def done(self, ok=True): self.log.info("cycle done, saving data...") # Remove useless info self.data.get("recipe", {}).get("spec", {}).pop("steps", None) for leak in ["leak_1", "leak_2"]: if leak in self.data.keys() and "results" in self.data[leak]: # Check if tester_component exists in results if self.tester_component in self.data[leak]["results"]: # Safely extract results with necessary checks results = {k: self.data[leak]["results"][self.tester_component].get(k, "N/A") for k in ["Running test: result"]} results.update( { k: round(float(self.data[leak]["results"][self.tester_component].get(k, 0.0)), 2) for k in [ "Running test: filling pressure", "Running test: measured leak", "Running test: pressure at the end of settling", ] } ) self.data[leak]["results"] = results else: self.log.warning( f"Missing tester_component '{self.tester_component}' in leak results for '{leak}', skipping...") self.data[leak]["results"] = {"Running test: result": "MISSING"} else: self.log.warning(f"Leak '{leak}' has no results; skipping...") if "vision" in self.data: vision = self.data.get("vision", {}) # Save vision results if available out_paths = self.components["vision_saver"].save( save_time=vision.get("time", None), frame=vision.get("frame", None), ) vision["files"] = out_paths self.log.info(f"cycle vision saved locally: {out_paths!r}") vision.pop("frame", None) vision.pop("render", None) vision.pop("detections", None) if "results" in vision.keys(): vision["results"].pop("results", None) if self.autotesting: self.data["autotest"] = True self.data["autotest_reason"] = self.autotesting_reason self.data["recipe"]["name"] = "AUTOTEST" # Archive and update results archived = Archive.archive(self.data, ok and self.data["ok"], overridden=self.data["overridden"]) self.log.info(f"cycle archived locally: {archived!r}") if not self.autotesting: # Count results based on success or failure if ok: self.pieces["ok"] += 1 else: self.pieces["ko"] += 1 else: if self.autotesting_reason == "logout": if ok: # Make sure to properly logout the user before opening login screen from lib.db.models.users import Users from components import ArchiveSynchronizer Users.logout() ArchiveSynchronizer.machine_status = "logged-out" self.main_window.open_login() return archived @staticmethod def labellify(v): if v is None: v = "-" elif isinstance(v, float): v = f"{v:.1f}" if not isinstance(v, str): v = str(v) return v def print(self, archived, label): self.log.info("cycle print") if archived is None: self.log.error("attempting to print empty label") return None if archived.label is not None: # raise AssertionError("this should never happen") self.log.info("printing already compiled label") # LABEL PRINT recipe = archived.test_data.get("recipe", {}) leak_test_1 = archived.test_data.get("leak_1", {}) leak_test_1_step = leak_test_1.get("step", {}) leak_test_1_step_spec = leak_test_1_step.get("spec", {}) leak_test_1_results = leak_test_1.get("results", {}) leak_test_2 = archived.test_data.get("leak_2", {}) leak_test_2_step = leak_test_2.get("step", {}) leak_test_2_step_spec = leak_test_2_step.get("spec", {}) leak_test_2_results = leak_test_2.get("results", {}) psetminp_a = leak_test_1_step_spec.get("test_pressure", 0) * (100 + leak_test_1_step_spec.get("test_pressure_qneg", 0) / 100) psetmaxp_a = leak_test_1_step_spec.get("settling_pressure_max_percent", 0) * (100 + leak_test_1_step_spec.get("test_pressure_qpos", 0) / 100) psetminp2_a = leak_test_2_step_spec.get("settling_pressure_min_percent", 0) * (100 + leak_test_2_step_spec.get("test_pressure_qneg", 0) / 100) psetmaxp2_a = leak_test_2_step_spec.get("settling_pressure_max_percent", 0) * (100 + leak_test_2_step_spec.get("test_pressure_qpos", 0) / 100) if self.tester_component is not None: if self.recipe.spec["leak_1"]: leak_test_1_results["Running test: pressure at the end of measure"] = ( leak_test_1_results["Running test: pressure at the end of settling"] + leak_test_1_results["Running test: measured leak"]) if self.recipe.spec["leak_2"]: leak_test_2_results["Running test: pressure at the end of measure"] = ( leak_test_2_results["Running test: pressure at the end of settling"] + leak_test_2_results["Running test: measured leak"]) printer_fields = self.print_step.spec context = { # RECIPE DATA "RECIPE": self.labellify(recipe.get("name", "-")), "CLIENT": self.labellify(recipe.get("client", "-")), "PART": self.labellify(recipe.get("part_number", "-")), "DESCRIPTION": self.labellify(recipe.get("description", "-")), "RECIPE_TO_PRINT": self.labellify(self.print_step.spec.get("labeltxt_5", "-").replace('{N11}', '')), # STEP SPEC "TPREFILL": self.labellify(leak_test_1_step.get("pre_filling_time", "-")), "PPREFILL": self.labellify(leak_test_1_step.get("pre_filling_pressure", "-")), "TFILL": self.labellify(leak_test_1_step.get("filling_time", "-")), "TSET": self.labellify(leak_test_1_step.get("settling_time", "-")), "TPREFILL2": self.labellify(leak_test_2_step.get("pre_filling_time", "-")), "PPREFILL2": self.labellify(leak_test_2_step.get("pre_filling_pressure", "-")), "TFILL2": self.labellify(leak_test_2_step.get("filling_time", "-")), "TSET2": self.labellify(leak_test_2_step.get("settling_time", "-")), "PSETMINP": self.labellify(leak_test_1_step.get("settling_pressure_min_percent", " -")), "PSETMAXP": self.labellify(leak_test_1_step.get("settling_pressure_max_percent", " -")), "PSETMINP2": self.labellify(leak_test_2_step.get("settling_pressure_min_percent", " -")), "PSETMAXP2": self.labellify(leak_test_2_step.get("settling_pressure_max_percent", " -")), "PSETMINP_A": self.labellify(psetminp_a), "PSETMAXP_A": self.labellify(psetmaxp_a), "PSETMINP2_A": self.labellify(psetminp2_a), "PSETMAXP2_A": self.labellify(psetmaxp2_a), "TTEST": self.labellify(leak_test_1_step.get("test_time", "-")), "TTEST2": self.labellify(leak_test_2_step.get("test_time", "-")), "PMIN": self.labellify(leak_test_1_step.get("test_pressure_qneg", "-")), "PMIN2": self.labellify(leak_test_2_step.get("test_pressure_qneg", "-")), "PTEST": self.labellify(leak_test_1_step.get("test_pressure", "-")), "PTEST2": self.labellify(leak_test_2_step.get("test_pressure", "-")), "PMAX": self.labellify(leak_test_1_step.get("test_pressure_qpos", "-")), "TFLUSH": self.labellify(leak_test_1_step.get("flush_time", "-")), "PFLUSH": self.labellify(leak_test_1_step.get("flush_pressure", "-")), # ACTUAL TESTED VALUES "RESPFILL": self.labellify(leak_test_1_results.get("Running test: filling pressure", "-")), "RESPFILL2": self.labellify(leak_test_2_results.get("Running test: filling pressure", "-")), "RESPSET": self.labellify(leak_test_1_results.get("Running test: pressure at the end of settling", "-")), "RESPSET2": self.labellify(leak_test_2_results.get("Running test: pressure at the end of settling", "-")), "RESPEND": self.labellify(leak_test_1_results.get("Running test: pressure at the end of measure", "-")), "RESPEND2": self.labellify(leak_test_1_results.get("Running test: pressure at the end of measure", "-")), "RESLEAK": self.labellify(leak_test_1_results.get("Running test: measured leak", "-")), "RESLEAK2": self.labellify(leak_test_2_results.get("Running test: measured leak", "-")), "RESRES": self.labellify(leak_test_1_results.get("Running test: result", "-")), "RESRES2": self.labellify(leak_test_2_results.get("Running test: result", "-")), # SERIAL DEFINITION "SN": str(archived.id), "SN4": f"{archived.id:0>4}", "SN5": f"{archived.id:0>5}", "SN6": f"{archived.id:0>6}", "SN7": f"{archived.id:0>7}", # TIME DEFINITION "DATETIME": archived.time.strftime("%d/%m/%Y %H:%M:%S"), "DATE": archived.time.strftime("%d/%m/%Y"), "TIME": archived.time.strftime("%H:%M:%S"), "YYYY": archived.time.strftime("%Y"), "YY": archived.time.strftime("%y"), "MO": archived.time.strftime("%m"), "DD": archived.time.strftime("%d"), "HH": archived.time.strftime("%H"), "MI": archived.time.strftime("%M"), "SS": archived.time.strftime("%S"), "JJJ": archived.time.strftime("%j"), "WW": archived.time.strftime("%W"), # EXTRA DATA "SHIFT": str(get_shift(archived.time)), "STATION": str(self.config.machine_id), "OPERATOR": str(archived.user.username), "BADGE_NUM": str(archived.user.badge_number), # BARCODE "BCODE": str(self.step.spec.get("barcode","")), # RESULT "RESULT": str("CONFORME" if leak_test_1.get("ok", False) else "SCARTO") + str(" FORZATO" if self.data.get("overridden", False) else ""), "RESULT_L1": "ESITO" + str(" FORZATO" if self.data.get("overridden", False) else ""), "RESULT_L2": str("CONFORME" if leak_test_1.get("ok", False) else "SCARTO"), } #TESTING BROTHER label_brother = context.get("RECIPE_TO_PRINT", "-") + context.get("DD","-") + context.get("MO","-") + context.get("YY","-") + context.get("SN5","-") barcode = str(label_brother) barcode_format = self.print_step.spec.get("barcode", "-") self.printed_barcode = barcode_format.format_map(context) formatted_barcode = barcode_format.format(**context) context['BCODE'] = formatted_barcode if self.archived is not None: self.archived.barcode = self.printed_barcode for n in range(5): field = f"labeltxt_{n + 1}" if field in printer_fields.keys(): if printer_fields[field] != "": context[field.upper()] = printer_fields[field] # PRINT MAIN PRODUCT LABEL if self.config["hardware_config"]["tecna_t3"] == "absent" and self.config["hardware_config"]["furness_controls"] == "absent": compiled_label = self.components["label_printer_2"].print_label(barcode) # Printing with Brother label printer else: compiled_label = self.components["label_printer"].print_label(label, context=context) self.log.info(f"Main label printed: {context!r}") # return fields used to print label for saving into test archive return context def print_extra_labels(self): # PRINT EXTRA LABELS IF NEEDED (BEFORE LEAK TEST) if "extra_label_printer" in self.components.keys() and self.print_step is not None and self.autotesting is False: printer_fields = self.print_step.spec if len(printer_fields["extra_label"]) > 0: labels = printer_fields["extra_label"].split(",") for label in labels: self.components["extra_label_printer"].print_label(f"{label}.prn", context=None) @pyqtSlot(str) def load_recipe_from_rfid(self, data): if data not in(None,''): self.tag_loaded_recipe = data if self.step.step_type == "barcode_recipe_selection": if data is not None: self.cycle_available_steps["barcode_recipe_selection"].widget.get(data) else: pass else: # fixture removed self.tag_loaded_recipe = None self.fail_cycle() self.change_recipe() def add_error(self, message, is_error): """ Add a new error message to the list of active errors. Args: message (str): The error message to add. is_error (bool): True if it's an error that should show in red, False for non-errors. """ # Avoid duplicates if (message, is_error) not in self.active_errors: self.active_errors.append((message, is_error)) # Start the timer if it's not already active if not self.error_timer.isActive(): self.error_timer.start() def remove_error(self, message): """ Remove an error message from the list of active errors. Args: message (str): The error message to remove. """ self.active_errors = [err for err in self.active_errors if err[0] != message] # Reset the error index if necessary if self.current_error_index >= len(self.active_errors): self.current_error_index = 0 # Stop the timer if there are no more errors if not self.active_errors: self.error_label.setText("") self.error_label.setStyleSheet("") self.error_timer.stop() def display_current_error(self): """ Display the current error from the active errors list. If there are multiple errors, it alternates between them every time this function is called. """ if self.active_errors: # Get the current error message and update the label message, is_error = self.active_errors[self.current_error_index] self.error_label.setText(message) if is_error: self.error_label.setStyleSheet("color: red;") else: self.error_label.setStyleSheet("color: green;") # Move to the next error for the next call to this method self.current_error_index = (self.current_error_index + 1) % len(self.active_errors) else: # Clear the label if there are no errors self.error_label.setText("") self.error_label.setStyleSheet("") @pyqtSlot(bool) def handle_rfid_error(self, connected): """ Handle errors related to the RFID system. Args: connected (bool): True if connected, False if not. """ if connected: self.remove_error("Errore RFID.") # Remove the RFID error else: self.add_error("Errore RFID.", True) # Add the RFID error @pyqtSlot(bool, str) def handle_modbus_error(self, has_error, error_message): """ Handle Tecna/Modbus errors and manage the error list. Args: has_error (bool): True if there is an error, False otherwise. error_message (str): The error message to add. """ #print(f"DEBUG: Modbus error handler called - has_error={has_error}, error_message={error_message}") # Debugging if has_error: self.add_error(f"Errore Tecna", True) # Add the Modbus error else: self.remove_error(f"Errore Tecna") # Remove the Modbus error def update_label_with_args(self, extra_info=None): """ Updates the flag label with the string representation of current command-line arguments and optional extra info, directly displaying it on the label. Args: extra_info (str): Optional. Extra information to append to the label text. """ # Combine command-line arguments args_text = " ".join(sys.argv[1:]) if len( sys.argv) > 1 else "No system arguments provided." # Default to No args if args_text: # If there are CLI arguments (or default message) # Combine CLI args and extra info for label text if extra_info: args_text += f" | {extra_info}" # Update the label text directly with args_text self.flag_label.setText(args_text) self.flag_label.setStyleSheet("QLabel { color: red; font-weight: bold; }") # Customize color if needed self.flag_label.setVisible(True) # Ensure the label is visible else: # No args provided self.flag_label.setVisible(False) # Hide label