import copy import logging import os import sys import weakref from datetime import datetime, timedelta from distutils.util import change_root from PyQt5.QtCore import QTimer, pyqtSlot, pyqtSignal from PyQt5.QtWidgets import QMessageBox from lib.db import Archive, Recipes, Users from lib.helpers import get_shift from lib.helpers.step import Step from playhouse.shortcuts import model_to_dict from ui.barcode_recipe_selection import Barcode_Recipe_Selection from ui.helpers import replace_widget from ui.recipe_selection import Recipe_Selection from ui.test_assembly import Test_Assembly from ui.test_barcodes import Test_Barcodes from ui.test_connector import Test_Connector from ui.test_count import Test_Count from ui.test_count_end import Test_Count_End from ui.test_fail import Test_Fail from ui.test_instructions import Test_Instructions from src.ui.test_pipe_cutter import Test_Pipe_Cutter from ui.test_leak import Test_Leak from ui.test_resistance import Test_Resistance from ui.test_screws import Test_Screws from ui.test_vision import Test_Vision from ui.test_warning_img import Test_Warning_Img from ui.widget import Widget from components import ArchiveSynchronizer class Test(Widget): def __init__(self, config, components=None, main_window=None): super().__init__() self.autotest_timer = None self.main_window = main_window self.config = config self.components = components # GET LOGGER self.log = logging.getLogger("Test") # SHOW MACHINE DESCRIPTION self.machine_description_l.setText(self.config.get("machine", {}).get("description", "N/A")) # SHOW USERNAME session = Users.get_session() self.original_username = session.username # Check if we should set the label to "ADMIN" if session.username.upper() == "ADMIN": self.user_l.setText("ADMIN") session._is_admin = True else: self.user_l.setText(session.username) session._is_admin = False if session.is_admin: self.user_l.setStyleSheet("QLabel { color: red; }") else: self.user_l.setStyleSheet("") # Store original admin status self.had_admin = session.is_admin self.flag_label.setVisible(False) if len(sys.argv) > 1: self.flag_label.setVisible(True) self.update_label_with_args() # Initial update else: self.flag_label.setVisible(False) self.active_errors = [] # List to hold current errors (type: tuples of (message, is_error)) self.current_error_index = 0 # Keeps track of the current error index during alternation # Timer for alternating errors self.error_timer = QTimer() self.error_timer.setInterval(2000) # Fire every 2 seconds self.error_timer.timeout.connect(self.display_current_error) # Connect the timer to the display logic # SHOW AND UPDATE TIME CLOCK self.refresh_time(init=True) # INIT RECIPE self.recipe = None if "fixture_id" in self.components: self.rfid = self.components["fixture_id"] self.rfid.rfid_error_signal.connect(self.handle_rfid_error) if "tecna_t3" in self.components: self.tecna = self.components["tecna_t3"] #self.tecna.tecna_error_signal.connect(self.handle_modbus_error) self.error_label.setText("") self.error_label.setStyleSheet("QLabel { color: red; }") if self.config["hardware_config"]["barcode_recipe_selection"] == "present": self.recipe_selection_mode = "barcode" else: self.recipe_selection_mode = "table" self.step = None self.tester_component = None if self.config["hardware_config"]["tecna_t3"] == "present": self.tester_component = "tecna_t3" #self.components["tecna_t3"].tecna_error_signal.connect(self.handle_modbus_error) elif self.config["hardware_config"]["furness_controls"] == "present": self.tester_component = "furness_control" self.unsupported_steps = set() self.steps_dependencies = { "count": set(), "connector": {"multicomp", }, "instruction": {"digital_io"}, "screws": {"screwdriver", "tecna_t3", }, "resistance": {"multicomp", }, "leak_1": {self.tester_component, }, "leak_2": {self.tester_component, }, "pipe_cutter": {"pipe_cutter"}, "vision": {("uvc_camera", "galaxy_camera","hikrobot_sc"), "vision", "vision_saver", }, # "neo_pixels", }, "print": {"label_printer_2"} if self.config["hardware_config"]["label_printer"] != "present" else {"label_printer"}, } self.unsupported_steps = set() for step_name, dependencies in self.steps_dependencies.items(): for dependency in dependencies: if isinstance(dependency, tuple): # if all([d not in self.components or not self.components[d].ready for d in dependency]): if all([d not in self.components for d in dependency]): self.unsupported_steps.add(step_name) else: # if dependency not in self.components or not self.components[dependency].ready: if dependency not in self.components: self.unsupported_steps.add(step_name) # INIT PIECES COUNTER self.pieces = {"ok": 0, "ko": 0} # INIT CYCLE STATES self.cycle_available_steps = { # "assembly_1": Test_Assembly(img_path=self.select_step_img("assembly_1"), text=u"INSERIRE SENSORE", widget=None), "barcodes": Test_Assembly(img_path=self.select_step_img("scan"), text=u"LEGGERE IL BARCODE DEL PEZZO DA COLLAUDARE", widget=Test_Barcodes()), "connector": Test_Assembly(img_path=self.select_step_img("scan"), text=u"COLLEGARE IL CONNETTORE INDICATO AL PEZZO E LEGGERE IL SUO BARCODE", widget=Test_Connector(run_once=True)), "count": Test_Assembly(img_path=None, text=u"INSERIRE IL NUMERO DI PEZZI ATTESI PER IL LOTTO", widget=Test_Count(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces, run_once=True)), "warning_img": Test_Assembly(img_path=None, text=u"ATTENZIONE - PER QUESTO CODICE ESEGUIRE LE OPERAZIONI INDICATE IN FIGURA", widget=Test_Warning_Img(components=self.components, recipe=self.recipe,bench_name=self.config["machine"]["image_for_warning"], step=self.step, run_once=True)), "count_end": Test_Assembly(img_path=None, text=u"LOTTO TERMINATO, PREMERE CONTINUA PERCOMINCIARNE UNO NUOVO", widget=Test_Count_End(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)), "done": Test_Assembly(img_path=self.select_step_img("success"), text=u"COLLAUDO COMPLETATO", widget=None), "emergency": Test_Assembly(img_path=self.select_step_img("reset_emergency"), text=u"EMERGENZA INTERVENUTA - RIPRISTINARE PULSANTE E SELEZIONARE \"RESET EMERGENZA\" DAL MEN\u00d9 \"STRUMENTI\"", widget=None), "fail": Test_Assembly(img_path=self.select_step_img("fail"), text=u"CICLO INTERROTTO, PREMERE CONTINUA PER COMINCIARE UN NUOVO CICLO", widget=Test_Fail(parent=self)), "blow": Test_Assembly(img_path=None, text=u"SOFFIAGGIO TUBO IN CORSO - ATTENDERE...", widget=Test_Warning_Img(components=self.components, recipe=self.recipe, step=self.step)), "leak_1": Test_Assembly(img_path=None, text=None, widget=Test_Leak(config=self.config,components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces, parent=self)) if self.config["hardware_config"]["tecna_t3"] != "absent" or self.config["hardware_config"]["furness_controls"] !="absent" else None, "leak_2": Test_Assembly(img_path=None, text=None, widget=Test_Leak(config=self.config,components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces, parent=self)) if self.config["hardware_config"]["tecna_t3"] != "absent" or self.config["hardware_config"]["furness_controls"] != "absent" else None, "flush": Test_Assembly(img_path=None, text=u"SCARICO ARIA IN CORSO - ATTENDERE...", widget=Test_Warning_Img(components=self.components, recipe=self.recipe, step=self.step)), "instruction": Test_Assembly(img_path=None, text=u"ESEGUIRE LE OPERAZIONI DI MONTAGGIO INDICATE IN FIGURA", widget=Test_Instructions(config=self.config,components=self.components, recipe=self.recipe, bench_name=self.config.machine_id, step=self.step)), "pipe_cutter": Test_Assembly(img_path=None, text=u"ATTENZIONE TAGLIO CORRUGATO IN CORSO",widget=Test_Pipe_Cutter(config=self.config, components=self.components,recipe=self.recipe, bench_name=self.config.machine_id,step=self.step)), "instruction_extra": Test_Assembly(img_path=None, text=u"ESEGUIRE LE OPERAZIONI DI MONTAGGIO EXTRA INDICATE IN FIGURA", widget=Test_Instructions(config=self.config, components=self.components,recipe=self.recipe, bench_name=self.config.machine_id,step=self.step)), "piece_removal": Test_Assembly(img_path=None, text=u"RIMUOVERE IL PEZZO APRENDO TUTTE LE CHIUSURE", widget=Test_Instructions(config=self.config,components=self.components, recipe=self.recipe, bench_name=self.config.machine_id, step=self.step)), "print": Test_Assembly(img_path=self.select_step_img("print"), text=u"STAMPA ETICHETTA IN CORSO", widget=None), "resistance": Test_Assembly(img_path=None, text=u"COLLEGARE CONNETTORE ELETTRICO PER EFFETTUARE PROVA RESISTENZA", widget=Test_Resistance(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)), "screws": Test_Assembly(img_path=None, text=u"AVVITARE TUTE LE VITI COME INDICATO", widget=Test_Screws(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)), "select_recipe": Test_Assembly(img_path=None, text=u"SELEZIONARE IL CODICE DA COLLAUDARE", widget=Recipe_Selection(config=self.config, unsupported_steps=self.unsupported_steps)), "barcode_recipe_selection": Test_Assembly(img_path=self.select_step_img("scan"), text=u"LEGGERE IL BARCODE SULLA DIMA DEL COMPONENTE DA COLLAUDARE", widget=Barcode_Recipe_Selection(parent=self)), "vision": Test_Assembly(img_path=None, text=u"VERIFICARE CONTROLLO CON TELECAMERA", widget=Test_Vision(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)), "wait": Test_Assembly(img_path=self.select_step_img("wait"), text=u"ATTENDERE - PAUSA INTER CICLO", widget=None), None: Test_Assembly(img_path=self.select_step_img("warning"), text=u"ATTENZIONE - LA RICETTA SELEZIONATA NON CONTIENE FASI DI TEST", widget=None), } self.cycle_steps = None self.cycle_index = -1 self.print_step = None self.last_label = None self.require_discard_piece = False # SETUP AUTOTEST self.autotest_request = False self.autotesting = False self.autotesting_reason = None self.autotest_cycle_steps = None if "--no-autotest" in sys.argv: self.setStyleSheet("background-color: red;") else: self.setStyleSheet("background-color: white;") if "--no-autotest" not in sys.argv: if "--test-autotest" in sys.argv: self.autotest_period = int(60 * 1000) # 1 min else: #self.autotest_period = int(8.5 * 60 * 60 * 1000)# 8.5 HOURS self.autotest_period = int(4 * 60 * 60 * 1000)# 4 HOURS # self.autotest_period = 12 * 60 * 60 * 1000 # 12 HOURS # if not self.config["autotest_done"]: # self.request_autotest("init") else: self.autotest_period = None # INIT TEST DATA self.data = {"ok": True, "overridden": False} self.archived = None # CONNECT CYCLE CONTROLS self.cancel_b.clicked.connect(self.fail_cycle) self.change_recipe_b.clicked.connect(self.change_recipe) self.reset_count_b.clicked.connect(self.reset_count) for step_name, w in self.cycle_available_steps.items(): if hasattr(w, "ok"): # custom ok handlers should call next again if isinstance(w.widget, Recipe_Selection): w.ok.connect(self.set_recipe) else: w.ok.connect(lambda data=None, step_namel=step_name, selfie=weakref.ref(self): selfie().set_step(step_namel, data)) if hasattr(w, "ko"): w.ko.connect(self.fail_cycle) # CUSTOM STEP CONNECTIONS self.cycle_available_steps["count"].ok.connect(self.cycle_available_steps["count_end"].widget.set_amount) # self.cycle_available_steps["warning_img"].ok.connect(self.cycle_available_steps["warning_img"].widget.set_done) if "fixture_id" in self.components.keys(): self.components["fixture_id"].new_id_signal.connect(self.load_recipe_from_rfid) self.components["fixture_id"].rfid_error_signal.connect(self.handle_rfid_error) self.tag_loaded_recipe = self.main_window.tag_loaded_recipe # TESTING if "--test" in sys.argv: self.testing = True else: self.testing = False # /TESTING # START CYCLE self.next_timer = QTimer() self.next_timer.setSingleShot(True) self.next_timer.timeout.connect(self.next) self.next() def refresh_time(self, init=False): if init and not hasattr(self, 'time_timer'): self.time_timer = QTimer() self.time_timer.setSingleShot(False) self.time_timer.timeout.connect(self.refresh_time) t = datetime.now() self.time_l.setText("{d}/{mo}/{y}\n{h}:{m:02d}".format( y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute )) # Always restart the timer, ensuring intervals are set correctly if hasattr(self, 'time_timer'): # Safeguard for uninitialized timer self.time_timer.start(30000) def select_step_img(self, step, suffix=None): img_path = "./src/ui/imgs" names = [] if suffix is not None: names.append(f"{step}_{suffix}_{self.config.machine_id}") names.append(f"{step}_{suffix}") names.append(f"{step}_{self.config.machine_id}") names.append(f"{step}") for name in names: for ext in ["png", "jpg"]: path = f"{img_path}/{name}.{ext}" if os.path.isfile(path): return path raise FileNotFoundError(f"No image was found for step {step}") def change_recipe(self): self.next(action="change_recipe") def set_recipe_mode_table(self): self.recipe_selection_mode = "table" self.change_recipe() def set_recipe_mode_barcode(self): self.recipe_selection_mode = "barcode" self.change_recipe() def cut_tube(self): self.components["pipe_cutter"].to_calibrate() self.components["pipe_cutter"].start_cutting() def reprint_label(self): self.print(self.last_label, self.print_step.spec.get("template", "EtichettaR5")) def fail_cycle(self): self.next(action="fail") def setCentralWidget(self, widget): replace_widget(self, "centralWidget", widget) def enable_temp_admin(self): """Enable temporary admin privileges for the current user""" session = Users.get_session() if session is None: self.log.warning("Cannot enable temporary admin privileges: No active session") return False # Save the current label text before changing it self.saved_user_label_text = self.user_l.text() # Enable temporary admin privileges using the Session class method session.enable_temp_admin() # Update UI to reflect admin status self.user_l.setText("ADMIN") self.user_l.setStyleSheet("QLabel { color: red; }") self.log.info(f"Temporary admin privileges enabled for user: {session.username}") return True def disable_temp_admin(self): """Disable temporary admin privileges and restore original user status""" session = Users.get_session() if session is None: self.log.warning("Cannot disable temporary admin privileges: No active session") return False # Disable temporary admin privileges using the Session class method session.disable_temp_admin() # Restore original UI # Use the saved label text if available, otherwise fall back to original username if hasattr(self, 'saved_user_label_text'): self.user_l.setText(self.saved_user_label_text) else: self.user_l.setText(self.original_username) # Set style based on original admin status if self.had_admin: self.user_l.setStyleSheet("QLabel { color: red; }") else: self.user_l.setStyleSheet("") self.log.info(f"Temporary admin privileges disabled for user: {session.username}") return True def request_autotest(self, reason): # you can cancel the request calling request_autotest(False) if "--no-autotest" not in sys.argv: self.log.info(f"cycle request autotest: reason: {reason!r} autotest_request: {self.autotest_request!r}") if reason in ("init", "login"): self.autotest_timer = QTimer() self.autotest_timer.setSingleShot(False) self.autotest_timer.timeout.connect(self.request_periodic_autotest) if self.autotest_period is not None: self.autotest_timer.start(self.autotest_period) reason = "boot" if not hasattr(self, "refresh_timer"): self.refresh_timer = QTimer() self.refresh_timer.setSingleShot(False) self.refresh_timer.timeout.connect(self.refresh_time) self.refresh_timer.start(30000) if self.config["autotest_leak"]["enabled"] == "true": self.autotest_request = reason else: self.autotest_request = False else: self.log.info(f"Autotest request ignored (reason: {reason!r}) --no-autotest flag detected") if reason == "logout": self.next(action="abort") def request_periodic_autotest(self): self.request_autotest("periodic") def next(self, action=None): if self.step is not None: self.log.info(f"cycle step: {self.step.step_type!r} action: {action!r} current index:{self.cycle_index}") else: self.log.info(f"cycle step: {self.step!r} action: {action!r} current index:{self.cycle_index}") current_w = self.centralWidget if hasattr(current_w, "stop"): self.log.info(f"stopping widget {self.step.step_type}") current_w.stop() if action == "change_recipe": self.log.info(f"cycle next: action: {action!r}") self.set_recipe(recipe=None) if self.config["hardware_config"]["tecna_t3"] == "present" or self.config["hardware_config"][ "furness_controls"] == "present": self.cycle_available_steps["leak_1"].widget.recipe_written = False self.cycle_available_steps["leak_2"].widget.recipe_written = False self.step = Step(step_type="select_recipe") self.cycle_index = -1 self.recipe = None self.cycle_steps = None # COUNT RESET self.pieces["ok"] = 0 self.pieces["ko"] = 0 elif action in ("fail", "abort"): self.log.info(f"cycle next: action: {action!r}") # FAIL AND RESTART TEST self.step = Step(step_type="fail") self.cycle_index = -1 # COUNT FAIL if action == "fail": self.done(ok=False) elif action is not None: raise NotImplementedError(f"cycle next: action {action!r} is not a valid action") # Set next cycle step normally if no action sets it explicitly if self.recipe is None or self.cycle_steps is None: # If recipe not set: select_recipe if self.recipe_selection_mode == "barcode": self.log.info(f"returning to barcode recipe selection") self.step = Step(step_type="barcode_recipe_selection") else: self.log.info(f"returning to recipe selection table") self.step = Step(step_type="select_recipe") elif action is None: if ( self.autotest_request is not False and self.autotest_cycle_steps is not None and not self.autotesting and (self.cycle_index == -1 or self.cycle_index + 1 >= len(self.cycle_steps)) ): # If autotest was requested # and if cycle_steps is not started or has ended self.cycle_index = -1 self.autotesting = True self.autotesting_reason = self.autotest_request self.autotest_request = False self.log.info(f"Autotest requested (reason: {self.autotesting_reason})") if self.autotest_period is not None: # Reset periodic autotest timer self.time_timer.start(self.autotest_period) self.require_discard_piece = False if self.autotesting: if self.cycle_index + 1 < len(self.autotest_cycle_steps): # Go to next step in autotest_cycle_steps self.cycle_index += 1 self.step = self.autotest_cycle_steps[self.cycle_index] else: # When autotest ends, check if it needs to restart if not self.data.get("ok"): # Autotest failed self.log.warning("Restarting autotest from the first step due to failure.") self.cycle_index = 0 self.step = self.autotest_cycle_steps[self.cycle_index] # Restart from the first step else: # Autotest succeeded; proceed to post-autotest actions self.autotesting = False if self.autotesting_reason == "logout": Users.logout() self.main_window.open_login() else: t = datetime.now() self.last_at_l.setText( "{d}/{mo}/{y} {h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute)) t += timedelta(seconds=int(self.autotest_period / 1000)) self.next_at_l.setText( "{d}/{mo}/{y} {h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute)) self.autotesting_reason = None self.cycle_index = -1 self.config["autotest_done"] = True if not self.autotesting: if len(self.cycle_steps): # Go to next step in cycle_steps self.cycle_index = (self.cycle_index + 1) % len(self.cycle_steps) self.step = self.cycle_steps[self.cycle_index] else: self.cycle_index = -1 self.step = Step(step_type=None) # Enable/disable cycle controls self.change_recipe_b.setEnabled(self.recipe is not None) self.cancel_b.setEnabled(self.step.step_type is not None and self.step.step_type not in { "emergency", "fail", "select_recipe", "wait", }) self.log.info(f"next cycle step: {self.step.step_type!r}") # INIT TEST DATA IF STARTING CYCLE OR RESET IS NEEDED if self.cycle_index == 0 and not self.autotesting: # Initialize test data for normal cycles self.data = {"ok": True, "overridden": False} self.archived = None if self.recipe is not None and "recipe" not in self.data: self.data["recipe"] = model_to_dict(self.recipe) # Remaining logic for updating widgets, starting timers, etc. w = self.cycle_available_steps[self.step.step_type] show = None if self.step.step_type == "leak_2": self.setCentralWidget(w) # Pre-show UI for leak_2 if hasattr(w, "start"): show = w.start(recipe=self.recipe, step=self.step, pieces=self.pieces) if show is not False and w is not current_w: self.setCentralWidget(w) elif show is False: self.next_timer.start(0) if self.step.step_type == "done": self.archived = self.done() self.last_label = copy.deepcopy(self.archived) self.next_timer.start(500) elif self.step.step_type == "print": compiled_label = self.print(self.archived, self.step.spec.get("template", "EtichettaR5")) self.archived.test_data.update({"print": compiled_label}) self.archived.test_data.update({"print_template": self.print_template}) self.archived.test_data.update({"barcode_stampato": self.printed_barcode}) self.archived.label = compiled_label self.log.info(f"Label printed. Saving...") # self.archived.save() self.main_window.main_window.run_request.emit(self.archived.save, [], {}) self.next_timer.start(500) elif self.step.step_type == "wait": self.next_timer.start(500) # Update display self.update_count_display() def reset_count(self): # COUNT RESET self.pieces["ok"] = 0 self.pieces["ko"] = 0 self.update_count_display() def update_count_display(self): self.pieces_count_l.setText(f"{self.pieces['ok']} OK / {self.pieces['ko']} NOK / {sum(self.pieces.values())} TOT") def set_recipe(self, recipe=None): self.recipe = recipe self.config.active_recipe = recipe inserted_instruction = False self.require_discard_piece = False if self.recipe is None: self.cycle_steps = None self.autotest_cycle_steps = None else: steps = self.recipe.get_steps() skip = set() print_found = False count_found = False # create step sequence list barcode_names = ['serial', 'barcode_input_2', 'barcode_input_3', 'barcode_input_4', 'barcode_input_5'] for i, step in enumerate(steps): if step.step_type == "barcodes": n_pieces_value = step.spec.get("n_pieces") # Fix: Handle empty string and None n_pieces = 1 if n_pieces_value in (None, '') else n_pieces_value try: n_pieces_adapted = int(n_pieces) except ValueError: self.log.error(f"Invalid value for n_pieces: {n_pieces}") # Log the error n_pieces_adapted = 1 # Default to 1 if conversion fails if n_pieces_adapted == 1: step.spec["barcode_name"] = 'serial' else: step.spec["barcode_name"] = barcode_names[(n_pieces_adapted - 1) % len(barcode_names)] n_pieces_adapted -= 1 new_barcode_step = copy.deepcopy(step) new_barcode_step.spec["n_pieces"] = str(n_pieces_adapted) steps.insert(i + 1, new_barcode_step) if i in skip: continue if step.step_type == "vision": self.components["vision"].config_changed(vision_recipe=self.recipe.name) if step.step_type == "count": count_found = True if "warning_img" in step.spec: if step.spec["warning_img"]: steps.insert(i, Step(step_type="warning_img", spec={"warning_img": step.spec["warning_img"]})) skip.add(i + 1) if "assembly" in step.spec: if step.spec["assembly"]: steps.insert(i, Step(step_type="instructions", spec={})) skip.add(i + 1) if "require_discard_piece" in step.spec: if step.spec["require_discard_piece"]: self.require_discard_piece = True if step.step_type == "resistance": # ADD STEP TO ENSURE REMOVAL OF CONNECTOR steps.insert(i + 1, Step(step_type="resistance", spec={ "scale": 500, "expected": float("+inf"), "tolerance_pos": 0, "tolerance_neg": 0, })) skip.add(i + 1) if step.step_type == "print": self.print_template = step.spec.get("template", "EtichettaR5") # Store the template if print_found: continue steps.insert(i, Step(step_type="done", spec={})) print_found = True self.print_step = step if self.config["hardware_config"].get("enforce_piece_removal", "no") == "yes": if recipe.spec.get("instruction",False) is not False: steps.append(Step(step_type="piece_removal", spec={})) skip.add(i + 1) if count_found: steps.append(Step(step_type="count_end", spec={})) skip.add(i + 1) if step.step_type in ("leak_1", "leak_2"): self.leak_step = step step_types = [step.step_type for step in steps] if "leak_1" in step_types and "leak_2" in step_types: leak1_index = step_types.index("leak_1") leak2_index = step_types.index("leak_2") if leak1_index + 1 == leak2_index: # Ensure 'leak_1' is immediately followed by 'leak_2' if self.config["hardware_config"].get("second_leak_test", "yes") == "no": steps.insert(leak2_index, Step(step_type="instruction_extra", spec={})) inserted_instruction = True # Insert 'instruction_extra' after the first 'instructions' if not inserted between leaks if not inserted_instruction: for i, step in enumerate(steps): if step.step_type == "instructions": steps.insert(i + 1, Step(step_type="instruction_extra", spec={})) inserted_instruction = True break if not print_found: steps.append(Step(step_type="done")) if count_found: steps.append(Step(step_type="count_end")) steps.append(Step(step_type="wait")) self.cycle_steps = steps self.check_steps_dependencies(self.cycle_steps) leak_autotest_steps = [] # CONFIGURE LEAK AUTOTEST PARAMETERS if self.config["autotest_leak"]["enabled"] == "true": l_at_1 = copy.deepcopy(self.config["autotest_leak"]) l_at_1.pop("enabled") l_at_1 = {a: float(x) for a, x in l_at_1.items()} l_at_1["autotest"] = "ko_check" l_at_2 = copy.deepcopy(self.config["autotest_leak"]) l_at_2.pop("enabled") l_at_2 = {a: float(x) for a, x in l_at_2.items()} l_at_2["test_pressure_qneg"] = l_at_2["test_pressure_tt_qneg"] l_at_2["test_pressure_qpos"] = l_at_2["test_pressure_tt_qpos"] l_at_2["autotest"] = "ok_check" leak_autotest_steps = [Step(step_type="leak_1", spec=l_at_1), Step(step_type="leak_1", spec=l_at_2)] self.autotest_cycle_steps = [ *([ Step(step_type="resistance", spec={ "scale": 500, "expected": 1, "tolerance_pos": 5, "tolerance_neg": 5, "autotest": True, }), Step(step_type="resistance", spec={ "scale": 500, "expected": float("+inf"), "tolerance_pos": 0, "tolerance_neg": 0, "autotest": True, }), Step(step_type="resistance", spec={ "scale": 500, "expected": 10, "tolerance_pos": 1, "tolerance_neg": 1, "autotest": True, }), Step(step_type="resistance", spec={ "scale": 500, "expected": float("+inf"), "tolerance_pos": 0, "tolerance_neg": 0, "autotest": True, }), ] if "resistance" not in self.unsupported_steps else []), *(leak_autotest_steps), Step(step_type="done"), Step(step_type="wait"), ] for w in self.cycle_available_steps.values(): if hasattr(w, "reset"): w.reset() # UPDATE RECIPE DISPLAY if self.recipe is not None: self.log.info(f"set recipe: {model_to_dict(self.recipe)!r} cycle steps: {[s.step_type for s in self.cycle_steps]}") self.recipe_l.setText(self.recipe.name) self.recipe_l.setStyleSheet("") self.cycle_index = -1 self.next() else: self.log.info(f"set recipe: {self.recipe!r} cycle steps: {self.cycle_steps}") self.recipe_l.setText("NON SELEZIONATA") self.recipe_l.setStyleSheet("QLabel { color: red; }") def check_steps_dependencies(self, steps): unsupported_steps = set() missing_components = set() for step in steps: if step.step_type in self.unsupported_steps or step.step_type not in self.cycle_available_steps: unsupported_steps.add(step.step_type) else: for dependency in self.steps_dependencies.get(step.step_type, []): if isinstance(dependency, tuple): if all([d not in self.components for d in dependency]): unsupported_steps.add(step.step_type) else: unready = set() for d in dependency: if d in self.components: if not self.components[d].ready: self.components[d].reconfigure() if not self.components[d].ready: unready.add(d) else: unready.add(d) if unready == set(dependency): missing_components.add(" or ".join(dependency)) else: if dependency not in self.components: unsupported_steps.add(step.step_type) else: if not self.components[dependency].ready: self.components[dependency].reconfigure() if not self.components[dependency].ready: missing_components.add(dependency) if len(unsupported_steps): QMessageBox.critical(None, "Errore Ricetta", f"Questa ricetta contiene uno o piu step non supportati da questo banco:\n{', '.join(sorted(unsupported_steps))}\nModificare la ricetta adeguatamente.") if len(missing_components): QMessageBox.critical(None, "Errore Componenti Ricetta", f"Questa ricetta richiede i seguenti componenti per essere eseguita:\n{', '.join(sorted(missing_components))}\nModificare la ricetta adeguatamente o collegare i componenti elencati.") if len(unsupported_steps) or len(missing_components): self.change_recipe() def set_step(self, step_name, data=None): if step_name not in self.data: self.data[step_name] = {} if data is not None: data["step"] = self.step.spec data["step"].pop("name", None) # MAKE ARRAY ONLY IF MORE THAN ONE TEST OF SAME TYPE if len(self.data[step_name]) > 1: self.data[step_name][str(len(self.data[step_name]))] = data else: self.data[step_name] = data self.data["overridden"] = self.data["overridden"] or data.get("overridden", False) self.data["ok"] = self.data["ok"] and data.get("ok", False) self.next() def done(self, ok=True): self.log.info("cycle done, saving data...") # Remove useless info self.data.get("recipe", {}).get("spec", {}).pop("steps", None) for leak in ["leak_1", "leak_2"]: if leak in self.data.keys() and "results" in self.data[leak]: # Check if tester_component exists in results if self.tester_component in self.data[leak]["results"]: # Safely extract results with necessary checks results = {k: self.data[leak]["results"][self.tester_component].get(k, "N/A") for k in ["Running test: result"]} results.update( { k: round(float(self.data[leak]["results"][self.tester_component].get(k, 0.0)), 2) for k in [ "Running test: filling pressure", "Running test: measured leak", "Running test: pressure at the end of settling", ] } ) self.data[leak]["results"] = results else: self.log.warning( f"Missing tester_component '{self.tester_component}' in leak results for '{leak}', skipping...") self.data[leak]["results"] = {"Running test: result": "MISSING"} else: self.log.warning(f"Leak '{leak}' has no results; skipping...") if "vision" in self.data: vision = self.data.get("vision", {}) # Save vision results if available out_paths = self.components["vision_saver"].save( save_time=vision.get("time", None), frame=vision.get("frame", None), ) vision["files"] = out_paths self.log.info(f"cycle vision saved locally: {out_paths!r}") vision.pop("frame", None) vision.pop("render", None) vision.pop("detections", None) if "results" in vision.keys(): vision["results"].pop("results", None) if self.autotesting: self.data["autotest"] = True self.data["autotest_reason"] = self.autotesting_reason self.data["recipe"]["name"] = "AUTOTEST" # Archive and update results archived = Archive.archive(self.data, ok and self.data["ok"], overridden=self.data["overridden"]) self.log.info(f"cycle archived locally: {archived!r}") if not self.autotesting: # Count results based on success or failure if ok: self.pieces["ok"] += 1 else: self.pieces["ko"] += 1 else: if self.autotesting_reason == "logout": if ok: # Make sure to properly logout the user before opening login screen from lib.db.models.users import Users from components import ArchiveSynchronizer Users.logout() ArchiveSynchronizer.machine_status = "logged-out" self.main_window.open_login() return archived @staticmethod def labellify(v): if v is None: v = "-" elif isinstance(v, float): v = f"{v:.1f}" if not isinstance(v, str): v = str(v) return v def print(self, archived, label): self.log.info("cycle print") if archived is None: self.log.error("attempting to print empty label") return None if archived.label is not None: # raise AssertionError("this should never happen") self.log.info("printing already compiled label") # LABEL PRINT recipe = archived.test_data.get("recipe", {}) # Define barcode format variables part_number = recipe.get("part_number", "-") self.barcode_prefix = f"#{part_number} ###*1IT ECE" self.module_data_format = f"1IT ECE{archived.id:0>7}" self.barcode_suffix = "*=" self.barcode_format = self.print_step.spec.get("barcode") leak_test_1 = archived.test_data.get("leak_1", {}) leak_test_1_step = leak_test_1.get("step", {}) leak_test_1_step_spec = leak_test_1_step.get("spec", {}) leak_test_1_results = leak_test_1.get("results", {}) leak_test_2 = archived.test_data.get("leak_2", {}) leak_test_2_step = leak_test_2.get("step", {}) leak_test_2_step_spec = leak_test_2_step.get("spec", {}) leak_test_2_results = leak_test_2.get("results", {}) psetminp_a = leak_test_1_step_spec.get("test_pressure", 0) * (100 + leak_test_1_step_spec.get("test_pressure_qneg", 0) / 100) psetmaxp_a = leak_test_1_step_spec.get("settling_pressure_max_percent", 0) * (100 + leak_test_1_step_spec.get("test_pressure_qpos", 0) / 100) psetminp2_a = leak_test_2_step_spec.get("settling_pressure_min_percent", 0) * (100 + leak_test_2_step_spec.get("test_pressure_qneg", 0) / 100) psetmaxp2_a = leak_test_2_step_spec.get("settling_pressure_max_percent", 0) * (100 + leak_test_2_step_spec.get("test_pressure_qpos", 0) / 100) if self.tester_component is not None: if self.recipe.spec["leak_1"]: leak_test_1_results["Running test: pressure at the end of measure"] = ( leak_test_1_results["Running test: pressure at the end of settling"] + leak_test_1_results["Running test: measured leak"]) if self.recipe.spec["leak_2"]: leak_test_2_results["Running test: pressure at the end of measure"] = ( leak_test_2_results["Running test: pressure at the end of settling"] + leak_test_2_results["Running test: measured leak"]) printer_fields = self.print_step.spec context = { # RECIPE DATA "RECIPE": self.labellify(recipe.get("name", "-")), "CLIENT": self.labellify(recipe.get("client", "-")), "PART": self.labellify(recipe.get("part_number", "-")), "DESCRIPTION": self.labellify(recipe.get("description", "-")), "RECIPE_TO_PRINT": self.labellify(self.print_step.spec.get("labeltxt_5", "-").replace('{N11}', '')), # STEP SPEC "TPREFILL": self.labellify(leak_test_1_step.get("pre_filling_time", "-")), "PPREFILL": self.labellify(leak_test_1_step.get("pre_filling_pressure", "-")), "TFILL": self.labellify(leak_test_1_step.get("filling_time", "-")), "TSET": self.labellify(leak_test_1_step.get("settling_time", "-")), "TPREFILL2": self.labellify(leak_test_2_step.get("pre_filling_time", "-")), "PPREFILL2": self.labellify(leak_test_2_step.get("pre_filling_pressure", "-")), "TFILL2": self.labellify(leak_test_2_step.get("filling_time", "-")), "TSET2": self.labellify(leak_test_2_step.get("settling_time", "-")), "PSETMINP": self.labellify(leak_test_1_step.get("settling_pressure_min_percent", " -")), "PSETMAXP": self.labellify(leak_test_1_step.get("settling_pressure_max_percent", " -")), "PSETMINP2": self.labellify(leak_test_2_step.get("settling_pressure_min_percent", " -")), "PSETMAXP2": self.labellify(leak_test_2_step.get("settling_pressure_max_percent", " -")), "PSETMINP_A": self.labellify(psetminp_a), "PSETMAXP_A": self.labellify(psetmaxp_a), "PSETMINP2_A": self.labellify(psetminp2_a), "PSETMAXP2_A": self.labellify(psetmaxp2_a), "TTEST": self.labellify(leak_test_1_step.get("test_time", "-")), "TTEST2": self.labellify(leak_test_2_step.get("test_time", "-")), "PMIN": self.labellify(leak_test_1_step.get("test_pressure_qneg", "-")), "PMIN2": self.labellify(leak_test_2_step.get("test_pressure_qneg", "-")), "PTEST": self.labellify(leak_test_1_step.get("test_pressure", "-")), "PTEST2": self.labellify(leak_test_2_step.get("test_pressure", "-")), "PMAX": self.labellify(leak_test_1_step.get("test_pressure_qpos", "-")), "TFLUSH": self.labellify(leak_test_1_step.get("flush_time", "-")), "PFLUSH": self.labellify(leak_test_1_step.get("flush_pressure", "-")), # ACTUAL TESTED VALUES "RESPFILL": self.labellify(leak_test_1_results.get("Running test: filling pressure", "-")), "RESPFILL2": self.labellify(leak_test_2_results.get("Running test: filling pressure", "-")), "RESPSET": self.labellify(leak_test_1_results.get("Running test: pressure at the end of settling", "-")), "RESPSET2": self.labellify(leak_test_2_results.get("Running test: pressure at the end of settling", "-")), "RESPEND": self.labellify(leak_test_1_results.get("Running test: pressure at the end of measure", "-")), "RESPEND2": self.labellify(leak_test_1_results.get("Running test: pressure at the end of measure", "-")), "RESLEAK": self.labellify(leak_test_1_results.get("Running test: measured leak", "-")), "RESLEAK2": self.labellify(leak_test_2_results.get("Running test: measured leak", "-")), "RESRES": self.labellify(leak_test_1_results.get("Running test: result", "-")), "RESRES2": self.labellify(leak_test_2_results.get("Running test: result", "-")), # SERIAL DEFINITION "SN": str(archived.id), "SN4": f"{archived.id:0>4}", "SN5": f"{archived.id:0>5}", "SN6": f"{archived.id:0>6}", "SN7": f"{archived.id:0>7}", # TIME DEFINITION "DATETIME": archived.time.strftime("%d/%m/%Y %H:%M:%S"), "DATE": archived.time.strftime("%d/%m/%Y"), "TIME": archived.time.strftime("%H:%M:%S"), "YYYY": archived.time.strftime("%Y"), "YY": archived.time.strftime("%y"), "MO": archived.time.strftime("%m"), "DD": archived.time.strftime("%d"), "HH": archived.time.strftime("%H"), "MI": archived.time.strftime("%M"), "SS": archived.time.strftime("%S"), "JJJ": archived.time.strftime("%j"), "WW": archived.time.strftime("%W"), # EXTRA DATA "SHIFT": str(get_shift(archived.time)), "STATION": str(self.config.machine_id), "OPERATOR": str(archived.user.username), "BADGE_NUM": str(archived.user.badge_number), # BARCODE "BCODE": str(self.step.spec.get("barcode","")), # RESULT "RESULT": str("CONFORME" if leak_test_1.get("ok", False) else "SCARTO") + str(" FORZATO" if self.data.get("overridden", False) else ""), "RESULT_L1": "ESITO" + str(" FORZATO" if self.data.get("overridden", False) else ""), "RESULT_L2": str("CONFORME" if leak_test_1.get("ok", False) else "SCARTO"), } #TESTING BROTHER label_brother = context.get("RECIPE_TO_PRINT", "-") + context.get("DD","-") + context.get("MO","-") + context.get("YY","-") + context.get("SN5","-") barcode = str(label_brother) formatted_module_data = self.module_data_format.format(**context) check = self.calculate_modulo43_checksum(formatted_module_data) context.update( { "barcode_prefix": self.barcode_prefix, "check": check, "barcode_suffix": self.barcode_suffix, } ) formatted_barcode = self.barcode_format.format(**context) context['BCODE'] = formatted_barcode self.printed_barcode = formatted_barcode if self.archived is not None: self.archived.barcode = self.printed_barcode for n in range(5): field = f"labeltxt_{n + 1}" if field in printer_fields.keys(): if printer_fields[field] != "": context[field.upper()] = printer_fields[field] # PRINT MAIN PRODUCT LABEL if self.config["hardware_config"]["tecna_t3"] == "absent" and self.config["hardware_config"]["furness_controls"] == "absent": compiled_label = self.components["label_printer_2"].print_label(barcode) # Printing with Brother label printer else: compiled_label = self.components["label_printer"].print_label(label, context=context) self.log.info(f"Main label printed: {context!r}") # return fields used to print label for saving into test archive return context def print_extra_labels(self): # PRINT EXTRA LABELS IF NEEDED (BEFORE LEAK TEST) if "extra_label_printer" in self.components.keys() and self.print_step is not None and self.autotesting is False: printer_fields = self.print_step.spec if len(printer_fields["extra_label"]) > 0: labels = printer_fields["extra_label"].split(",") for label in labels: self.components["extra_label_printer"].print_label(f"{label}.prn", context=None) @pyqtSlot(str) def load_recipe_from_rfid(self, data): if data not in(None,''): self.tag_loaded_recipe = data if self.step.step_type == "barcode_recipe_selection": if data is not None: self.cycle_available_steps["barcode_recipe_selection"].widget.get(data) else: pass else: # fixture removed self.tag_loaded_recipe = None self.fail_cycle() self.change_recipe() def add_error(self, message, is_error): """ Add a new error message to the list of active errors. Args: message (str): The error message to add. is_error (bool): True if it's an error that should show in red, False for non-errors. """ # Avoid duplicates if (message, is_error) not in self.active_errors: self.active_errors.append((message, is_error)) # Start the timer if it's not already active if not self.error_timer.isActive(): self.error_timer.start() def remove_error(self, message): """ Remove an error message from the list of active errors. Args: message (str): The error message to remove. """ self.active_errors = [err for err in self.active_errors if err[0] != message] # Reset the error index if necessary if self.current_error_index >= len(self.active_errors): self.current_error_index = 0 # Stop the timer if there are no more errors if not self.active_errors: self.error_label.setText("") self.error_label.setStyleSheet("") self.error_timer.stop() def display_current_error(self): """ Display the current error from the active errors list. If there are multiple errors, it alternates between them every time this function is called. """ if self.active_errors: # Get the current error message and update the label message, is_error = self.active_errors[self.current_error_index] self.error_label.setText(message) if is_error: self.error_label.setStyleSheet("color: red;") else: self.error_label.setStyleSheet("color: green;") # Move to the next error for the next call to this method self.current_error_index = (self.current_error_index + 1) % len(self.active_errors) else: # Clear the label if there are no errors self.error_label.setText("") self.error_label.setStyleSheet("") @pyqtSlot(bool) def handle_rfid_error(self, connected): """ Handle errors related to the RFID system. Args: connected (bool): True if connected, False if not. """ if connected: self.remove_error("Errore RFID.") # Remove the RFID error else: self.add_error("Errore RFID.", True) # Add the RFID error @pyqtSlot(bool, str) def handle_modbus_error(self, has_error, error_message): """ Handle Tecna/Modbus errors and manage the error list. Args: has_error (bool): True if there is an error, False otherwise. error_message (str): The error message to add. """ #print(f"DEBUG: Modbus error handler called - has_error={has_error}, error_message={error_message}") # Debugging if has_error: self.add_error(f"Errore Tecna", True) # Add the Modbus error else: self.remove_error(f"Errore Tecna") # Remove the Modbus error def update_label_with_args(self, extra_info=None): """ Updates the flag label with the string representation of current command-line arguments and optional extra info, directly displaying it on the label. Args: extra_info (str): Optional. Extra information to append to the label text. """ # Combine command-line arguments args_text = " ".join(sys.argv[1:]) if len( sys.argv) > 1 else "No system arguments provided." # Default to No args if args_text: # If there are CLI arguments (or default message) # Combine CLI args and extra info for label text if extra_info: args_text += f" | {extra_info}" # Update the label text directly with args_text self.flag_label.setText(args_text) self.flag_label.setStyleSheet("QLabel { color: red; font-weight: bold; }") # Customize color if needed self.flag_label.setVisible(True) # Ensure the label is visible else: # No args provided self.flag_label.setVisible(False) # Hide label def calculate_modulo43_checksum(self,data_sequence: str) -> str: """ Calculates the Modulo 43 checksum for a given data sequence. The function determines the checksum value for each character based on a predefined assignment table, calculates the total sum of these values, and then finds the remainder of the division by 43. The character corresponding to this remainder is the check digit. Args: data_sequence: The input string for which to calculate the checksum. Returns: The Modulo 43 check digit as a single character. Raises: ValueError: If the data_sequence contains a character not found in the assignment table. """ assignment_table = { '0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9, 'A': 10, 'B': 11, 'C': 12, 'D': 13, 'E': 14, 'F': 15, 'G': 16, 'H': 17, 'I': 18, 'J': 19, 'K': 20, 'L': 21, 'M': 22, 'N': 23, 'O': 24, 'P': 25, 'Q': 26, 'R': 27, 'S': 28, 'T': 29, 'U': 30, 'V': 31, 'W': 32, 'X': 33, 'Y': 34, 'Z': 35, '-': 36, '.': 37, ' ': 38, '$': 39, '/': 40, '+': 41, '%': 42 } total_sum = 0 for char in data_sequence: if char in assignment_table: total_sum += assignment_table[char] else: raise ValueError(f"Character '{char}' is not valid for checksum calculation.") remainder = total_sum % 43 # Invert the assignment_table to map the remainder back to the character for char, value in assignment_table.items(): if value == remainder: return char # This part of the code should be unreachable given the logic return "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"