import copy
import logging
import os
import sys
import weakref
from datetime import datetime, timedelta
from PyQt5.QtCore import QTimer, pyqtSlot
from PyQt5.QtWidgets import QMessageBox
from lib.db import Archive, Recipes, Users
from lib.helpers import get_shift
from lib.helpers.step import Step
from playhouse.shortcuts import model_to_dict
from ui.barcode_recipe_selection import Barcode_Recipe_Selection
from ui.helpers import replace_widget
from ui.recipe_selection import Recipe_Selection
from ui.test_assembly import Test_Assembly
from ui.test_barcodes import Test_Barcodes
from ui.test_connector import Test_Connector
from ui.test_count import Test_Count
from ui.test_count_end import Test_Count_End
from ui.test_fail import Test_Fail
from ui.test_instructions import Test_Instructions
from ui.test_pipe_cutter import Test_Pipe_Cutter
from ui.test_leak import Test_Leak
from ui.test_resistance import Test_Resistance
from ui.test_screws import Test_Screws
from ui.test_vision import Test_Vision
from ui.test_warning_img import Test_Warning_Img
from ui.widget import Widget
from components import ArchiveSynchronizer
from src.components.rfid_pn532 import RFID_PN532


class Test(Widget):
    """Test-cycle widget.

    Lets the operator select a recipe, walks through the recipe's steps by
    swapping the central widget, archives the collected data through
    ``Archive.archive`` and prints the product label.
    """

    def __init__(self, config, components=None, main_window=None):
        """Build every step widget, wire the signals and start the cycle.

        Args:
            config: configuration object; it is indexed like a mapping and
                also exposes a ``machine_id`` attribute.
            components: mapping of component name -> component object. When
                omitted an empty mapping is used, which marks every step
                that has a hardware dependency as unsupported.
            main_window: owning window; used to open the login page and to
                emit ``run_request``.
        """
        super().__init__()
        self.autotest_timer = None
        self.main_window = main_window
        self.config = config
        # A missing mapping used to raise TypeError on the first `in` test.
        self.components = components if components is not None else {}

        # GET LOGGER
        self.log = logging.getLogger("Test")

        # SHOW MACHINE DESCRIPTION
        self.machine_description_l.setText(self.config.get("machine", {}).get("description", "N/A"))

        # SHOW USERNAME
        session = Users.get_session()
        self.user_l.setText(session.username)
        if session.is_admin:
            self.user_l.setStyleSheet("QLabel { color: red; }")
        else:
            self.user_l.setStyleSheet("")

        # SHOW AND UPDATE TIME CLOCK
        self.refresh_time(init=True)

        # INIT RECIPE
        self.recipe = None
        self.auto_import = Recipe_Selection
        self.archive_synch = ArchiveSynchronizer(config=config)
        self.rfid = RFID_PN532(config=config)
        self.error_label.setText("")
        self.error_label.setStyleSheet("QLabel { color: red; }")
        self.rfid.rfid_error_signal.connect(self.handle_rfid_error)

        hardware = self.config["hardware_config"]
        if hardware["barcode_recipe_selection"] == "present":
            self.recipe_selection_mode = "barcode"
        else:
            self.recipe_selection_mode = "table"
        self.step = None

        # Name of the leak tester component, or None when no tester is present.
        self.tester_component = None
        if hardware["tecna_t3"] == "present":
            self.tester_component = "tecna_t3"
        elif hardware["furness_controls"] == "present":
            self.tester_component = "furness_control"

        # step type -> required components; a tuple means "any one of these".
        self.steps_dependencies = {
            "count": set(),
            "connector": {"multicomp", },
            "instruction": {"digital_io"},
            "screws": {"screwdriver", "tecna_t3", },
            "resistance": {"multicomp", },
            "leak_1": {self.tester_component, },
            "leak_2": {self.tester_component, },
            "pipe_cutter": {"pipe_cutter"},
            "vision": {("uvc_camera", "galaxy_camera",), "vision", "vision_saver", },  # "neo_pixels", },
            "print": {"label_printer_2"} if hardware["label_printer"] != "present" else {"label_printer"},
        }

        # A step is unsupported when one of its dependencies is not installed
        # (for a tuple dependency: when none of the alternatives is installed).
        self.unsupported_steps = set()
        for step_name, dependencies in self.steps_dependencies.items():
            for dependency in dependencies:
                if isinstance(dependency, tuple):
                    if all(d not in self.components for d in dependency):
                        self.unsupported_steps.add(step_name)
                elif dependency not in self.components:
                    self.unsupported_steps.add(step_name)

        # INIT PIECES COUNTER
        self.pieces = {"ok": 0, "ko": 0}

        leak_available = hardware["tecna_t3"] != "absent" or hardware["furness_controls"] != "absent"

        def leak_step():
            # Leak pages exist only when a leak tester is configured.
            if not leak_available:
                return None
            return Test_Assembly(
                img_path=None,
                text=None,
                widget=Test_Leak(config=self.config, components=self.components, recipe=self.recipe,
                                 step=self.step, pieces=self.pieces, parent=self),
            )

        def instructions_widget():
            return Test_Instructions(config=self.config, components=self.components, recipe=self.recipe,
                                     bench_name=self.config.machine_id, step=self.step)

        # INIT CYCLE STATES
        self.cycle_available_steps = {
            # "assembly_1": Test_Assembly(img_path=self.select_step_img("assembly_1"), text=u"INSERIRE SENSORE", widget=None),
            "barcodes": Test_Assembly(
                img_path=self.select_step_img("scan"),
                text=u"LEGGERE IL BARCODE DEL PEZZO DA COLLAUDARE",
                widget=Test_Barcodes(),
            ),
            "connector": Test_Assembly(
                img_path=self.select_step_img("scan"),
                text=u"COLLEGARE IL CONNETTORE INDICATO AL PEZZO E LEGGERE IL SUO BARCODE",
                widget=Test_Connector(run_once=True),
            ),
            "count": Test_Assembly(
                img_path=None,
                text=u"INSERIRE IL NUMERO DI PEZZI ATTESI PER IL LOTTO",
                widget=Test_Count(components=self.components, recipe=self.recipe, step=self.step,
                                  pieces=self.pieces, run_once=True),
            ),
            "warning_img": Test_Assembly(
                img_path=None,
                text=u"ATTENZIONE - PER QUESTO CODICE ESEGUIRE LE OPERAZIONI INDICATE IN FIGURA",
                widget=Test_Warning_Img(components=self.components, recipe=self.recipe,
                                        bench_name=self.config["machine"]["image_for_warning"],
                                        step=self.step, run_once=True),
            ),
            "count_end": Test_Assembly(
                img_path=None,
                text=u"LOTTO TERMINATO, PREMERE CONTINUA PERCOMINCIARNE UNO NUOVO",
                widget=Test_Count_End(components=self.components, recipe=self.recipe, step=self.step,
                                      pieces=self.pieces),
            ),
            "done": Test_Assembly(
                img_path=self.select_step_img("success"),
                text=u"COLLAUDO COMPLETATO",
                widget=None,
            ),
            "emergency": Test_Assembly(
                img_path=self.select_step_img("reset_emergency"),
                text=u"EMERGENZA INTERVENUTA - RIPRISTINARE PULSANTE E SELEZIONARE \"RESET EMERGENZA\" DAL MEN\u00d9 \"STRUMENTI\"",
                widget=None,
            ),
            "fail": Test_Assembly(
                img_path=self.select_step_img("fail"),
                text=u"CICLO INTERROTTO, PREMERE CONTINUA PER COMINCIARE UN NUOVO CICLO",
                widget=Test_Fail(parent=self),
            ),
            "blow": Test_Assembly(
                img_path=None,
                text=u"SOFFIAGGIO TUBO IN CORSO - ATTENDERE...",
                widget=Test_Warning_Img(components=self.components, recipe=self.recipe, step=self.step),
            ),
            "leak_1": leak_step(),
            "leak_2": leak_step(),
            "flush": Test_Assembly(
                img_path=None,
                text=u"SCARICO ARIA IN CORSO - ATTENDERE...",
                widget=Test_Warning_Img(components=self.components, recipe=self.recipe, step=self.step),
            ),
            "instruction": Test_Assembly(
                img_path=None,
                text=u"ESEGUIRE LE OPERAZIONI DI MONTAGGIO INDICATE IN FIGURA",
                widget=instructions_widget(),
            ),
            "pipe_cutter": Test_Assembly(
                img_path=None,
                text=u"ATTENZIONE TAGLIO CORRUGATOIN CORSO",
                widget=Test_Pipe_Cutter(config=self.config, components=self.components, recipe=self.recipe,
                                        bench_name=self.config.machine_id, step=self.step),
            ),
            "instruction_extra": Test_Assembly(
                img_path=None,
                text=u"ESEGUIRE LE OPERAZIONI DI MONTAGGIO EXTRA INDICATE IN FIGURA",
                widget=instructions_widget(),
            ),
            "piece_removal": Test_Assembly(
                img_path=None,
                text=u"RIMUOVERE IL PEZZO APRENDO TUTTE LE CHIUSURE",
                widget=instructions_widget(),
            ),
            "print": Test_Assembly(
                img_path=self.select_step_img("print"),
                text=u"STAMPA ETICHETTA IN CORSO",
                widget=None,
            ),
            "resistance": Test_Assembly(
                img_path=None,
                text=u"COLLEGARE CONNETTORE ELETTRICO PER EFFETTUARE PROVA RESISTENZA",
                widget=Test_Resistance(components=self.components, recipe=self.recipe, step=self.step,
                                       pieces=self.pieces),
            ),
            "screws": Test_Assembly(
                img_path=None,
                text=u"AVVITARE TUTE LE VITI COME INDICATO",
                widget=Test_Screws(components=self.components, recipe=self.recipe, step=self.step,
                                   pieces=self.pieces),
            ),
            "select_recipe": Test_Assembly(
                img_path=None,
                text=u"SELEZIONARE IL CODICE DA COLLAUDARE",
                widget=Recipe_Selection(config=self.config, unsupported_steps=self.unsupported_steps),
            ),
            "barcode_recipe_selection": Test_Assembly(
                img_path=self.select_step_img("scan"),
                text=u"LEGGERE IL BARCODE SULLA DIMA DEL COMPONENTE DA COLLAUDARE",
                widget=Barcode_Recipe_Selection(parent=self),
            ),
            "vision": Test_Assembly(
                img_path=None,
                text=u"VERIFICARE CONTROLLO CON TELECAMERA",
                widget=Test_Vision(components=self.components, recipe=self.recipe, step=self.step,
                                   pieces=self.pieces),
            ),
            "wait": Test_Assembly(
                img_path=self.select_step_img("wait"),
                text=u"ATTENDERE - PAUSA INTER CICLO",
                widget=None,
            ),
            None: Test_Assembly(
                img_path=self.select_step_img("warning"),
                text=u"ATTENZIONE - LA RICETTA SELEZIONATA NON CONTIENE FASI DI TEST",
                widget=None,
            ),
        }
        self.cycle_steps = None
        self.cycle_index = -1
        self.print_step = None
        self.last_label = None
        self.require_discard_piece = False

        # SETUP AUTOTEST
        self.autotest_request = False
        self.autotesting = False
        self.autotesting_reason = None
        self.autotest_cycle_steps = None
        if "--no-autotest" not in sys.argv:
            self.autotest_period = int(8.5 * 60 * 60 * 1000)  # 8.5 HOURS
            # self.autotest_period = 12 * 60 * 60 * 1000  # 12 HOURS
            # if not self.config["autotest_done"]:
            #     self.request_autotest("init")
        else:
            self.autotest_period = None

        # INIT TEST DATA
        self.data = {"ok": True, "overridden": False}
        self.archived = None

        # CONNECT CYCLE CONTROLS
        self.cancel_b.clicked.connect(self.fail_cycle)
        self.change_recipe_b.clicked.connect(self.change_recipe)
        self.reset_count_b.clicked.connect(self.reset_count)
        for step_name, w in self.cycle_available_steps.items():
            if hasattr(w, "ok"):
                # custom ok handlers should call next again
                if isinstance(w.widget, Recipe_Selection):
                    w.ok.connect(self.set_recipe)
                else:
                    # step name and a weak self reference are bound as lambda
                    # defaults so each connection keeps its own step name
                    w.ok.connect(lambda data=None, step_namel=step_name, selfie=weakref.ref(self): selfie().set_step(step_namel, data))
            if hasattr(w, "ko"):
                w.ko.connect(self.fail_cycle)

        # CUSTOM STEP CONNECTIONS
        self.cycle_available_steps["count"].ok.connect(self.cycle_available_steps["count_end"].widget.set_amount)
        # self.cycle_available_steps["warning_img"].ok.connect(self.cycle_available_steps["warning_img"].widget.set_done)
        if "fixture_id" in self.components.keys():
            self.components["fixture_id"].new_id_signal.connect(self.load_recipe_from_rfid)
            self.components["fixture_id"].rfid_error_signal.connect(self.handle_rfid_error)
        # Read defensively: main_window defaults to None.
        self.tag_loaded_recipe = getattr(self.main_window, "tag_loaded_recipe", None)

        # TESTING
        self.testing = "--test" in sys.argv
        # /TESTING

        # START CYCLE
        self.next_timer = QTimer()
        self.next_timer.setSingleShot(True)
        self.next_timer.timeout.connect(self.next)
        self.next()

    def refresh_time(self, init=False):
        """Show the current date/time and schedule the next refresh.

        Args:
            init: when True the single-shot refresh timer is created first.
        """
        if init:
            self.time_timer = QTimer()
            self.time_timer.setSingleShot(True)
            self.time_timer.timeout.connect(self.refresh_time)
        t = datetime.now()
        self.time_l.setText("{d}/{mo}/{y}\n{h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute))
        # QTimer.start() expects milliseconds: wait until the next minute.
        self.time_timer.start((60 - t.second) * 1000)

    def select_step_img(self, step, suffix=None):
        """Return the path of the most specific image available for a step.

        Candidates are tried from most to least specific (suffix + machine
        id, suffix, machine id, bare step name), each as ``png`` then ``jpg``.

        Raises:
            FileNotFoundError: when no candidate file exists.
        """
        img_path = "./src/ui/imgs"
        names = []
        if suffix is not None:
            names.append(f"{step}_{suffix}_{self.config.machine_id}")
            names.append(f"{step}_{suffix}")
        names.append(f"{step}_{self.config.machine_id}")
        names.append(f"{step}")
        for name in names:
            for ext in ["png", "jpg"]:
                path = f"{img_path}/{name}.{ext}"
                if os.path.isfile(path):
                    return path
        raise FileNotFoundError(f"No image was found for step {step}")

    def change_recipe(self):
        """Drop the current recipe and go back to recipe selection."""
        self.next(action="change_recipe")

    def set_recipe_mode_table(self):
        """Switch recipe selection to the table page and reopen it."""
        self.recipe_selection_mode = "table"
        self.change_recipe()

    def set_recipe_mode_barcode(self):
        """Switch recipe selection to the barcode page and reopen it."""
        self.recipe_selection_mode = "barcode"
        self.change_recipe()

    def reprint_label(self):
        """Print again the label of the last archived piece."""
        self.print(self.last_label, self.print_step.spec.get("template", "EtichettaR5"))

    def fail_cycle(self):
        """Interrupt the running cycle and count the piece as failed."""
        self.next(action="fail")

    def setCentralWidget(self, widget):
        """Replace the widget stored in ``centralWidget`` with ``widget``."""
        replace_widget(self, "centralWidget", widget)

    def request_autotest(self, reason):
        """Record an autotest request to be served by ``next``.

        Args:
            reason: ``"init"``/``"login"`` (re)start the periodic timer and are
                recorded as ``"boot"``; ``"logout"`` additionally aborts the
                running cycle. Any other value is stored unchanged.
        """
        # you can cancel the request calling request_autotest(False)
        if "--no-autotest" not in sys.argv:
            self.log.info(f"cycle request autotest: reason: {reason!r} autotest_request: {self.autotest_request!r}")
            if reason in ("init", "login"):
                self.autotest_timer = QTimer()
                self.autotest_timer.setSingleShot(False)
                self.autotest_timer.timeout.connect(self.request_periodic_autotest)
                if self.autotest_period is not None:
                    self.autotest_timer.start(self.autotest_period)
                reason = "boot"
            if self.config["autotest_leak"]["enabled"] == "true":
                self.autotest_request = reason
            else:
                self.autotest_request = False
        else:
            self.log.info(f"Autotest request ignored (reason: {reason!r}) --no-autotest flag detected")
        if reason == "logout":
            self.next(action="abort")

    def request_periodic_autotest(self):
        """Timer slot: request the periodic autotest."""
        self.request_autotest("periodic")

    def next(self, action=None):
        """Advance the cycle state machine and show the next step.

        Args:
            action: ``None`` to advance normally, ``"change_recipe"`` to go
                back to recipe selection, ``"fail"`` to archive a failed
                piece and show the fail page, ``"abort"`` to show the fail
                page without archiving.

        Raises:
            NotImplementedError: for any other ``action`` value.
        """
        shown_step = self.step.step_type if self.step is not None else None
        self.log.info(f"cycle step: {shown_step!r} action: {action!r} current index:{self.cycle_index}")

        current_w = self.centralWidget
        if hasattr(current_w, "stop"):
            current_w.stop()

        hardware = self.config["hardware_config"]
        if action == "change_recipe":
            self.log.info(f"cycle next: action: {action!r}")
            self.set_recipe(recipe=None)
            if hardware["tecna_t3"] == "present" or hardware["furness_controls"] == "present":
                self.cycle_available_steps["leak_1"].widget.recipe_written = False
                self.cycle_available_steps["leak_2"].widget.recipe_written = False
            self.step = Step(step_type="select_recipe")
            self.cycle_index = -1
            self.recipe = None
            self.cycle_steps = None
            # COUNT RESET
            self.pieces["ok"] = 0
            self.pieces["ko"] = 0
        elif action in ("fail", "abort"):
            self.log.info(f"cycle next: action: {action!r}")
            # FAIL AND RESTART TEST
            self.step = Step(step_type="fail")
            self.cycle_index = -1
            # COUNT FAIL
            if action == "fail":
                self.done(ok=False)
        elif action is not None:
            raise NotImplementedError(f"cycle next: action {action!r} is not a valid action")

        # if action did not set the next cycle step
        # set next cycle step normally
        if self.recipe is None or self.cycle_steps is None:
            # if recipe not set: select_recipe
            if self.recipe_selection_mode == "barcode":
                self.log.info("returning to barcode recipe selection")
                self.step = Step(step_type="barcode_recipe_selection")
            else:
                self.log.info("returning to recipe selection table")
                self.step = Step(step_type="select_recipe")
        elif action is None:
            cycle_idle = self.cycle_index == -1 or self.cycle_index + 1 >= len(self.cycle_steps)
            if (self.autotest_request is not False and self.autotest_cycle_steps is not None
                    and not self.autotesting and cycle_idle):
                # if autotest was requested
                # and if cycle_steps is not started or has ended
                self.autotesting = True
                self.autotesting_reason = self.autotest_request
                self.autotest_request = False
                # The autotest sequence has its own indexing: start from its
                # first step instead of reusing the finished cycle's index.
                self.cycle_index = -1
                self.log.info(f"Autotest requested (reason: {self.autotesting_reason})")
                if self.autotest_period is not None and self.autotest_timer is not None:
                    # reset periodic autotest timer (not the clock timer)
                    self.autotest_timer.start(self.autotest_period)
                self.require_discard_piece = False

            if self.autotesting:
                if self.cycle_index + 1 < len(self.autotest_cycle_steps):
                    # goto next step in autotest_cycle_steps
                    self.cycle_index = (self.cycle_index + 1) % len(self.autotest_cycle_steps)
                    self.step = self.autotest_cycle_steps[self.cycle_index]
                else:
                    # autotest ended
                    self.autotesting = False
                    if self.autotesting_reason == "logout":
                        Users.logout()
                        self.main_window.open_login()
                    else:
                        t = datetime.now()
                        self.last_at_l.setText("{d}/{mo}/{y} {h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute))
                        t += timedelta(seconds=int(self.autotest_period / 1000))
                        self.next_at_l.setText("{d}/{mo}/{y} {h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute))
                    self.autotesting_reason = None
                    self.cycle_index = -1
                    self.config["autotest_done"] = True

            if not self.autotesting:
                if len(self.cycle_steps):
                    # goto next step in cycle_steps
                    self.cycle_index = (self.cycle_index + 1) % len(self.cycle_steps)
                    self.step = self.cycle_steps[self.cycle_index]
                else:
                    self.cycle_index = -1
                    self.step = Step(step_type=None)

        # enable/disable cycle controls
        step_type = self.step.step_type
        self.change_recipe_b.setEnabled(self.recipe is not None)
        self.cancel_b.setEnabled(step_type is not None and step_type not in {
            "emergency", "fail", "select_recipe", "wait",
        })
        self.log.info(f"next cycle step: {step_type!r}")

        # INIT TEST DATA IF STARTING CYCLE LOOP OR IF RESET IS NEEDED
        if self.cycle_index == 0:
            self.data = {"ok": True, "overridden": False}
            self.archived = None
        if self.recipe is not None and "recipe" not in self.data:
            self.data["recipe"] = model_to_dict(self.recipe)

        w = self.cycle_available_steps[step_type]
        show = None
        if step_type == "leak_2":
            self.setCentralWidget(w)  # NEED TO PRESHOW UI
        if hasattr(w, "start"):
            show = w.start(recipe=self.recipe, step=self.step, pieces=self.pieces)
        if show is not False and w is not current_w:
            self.setCentralWidget(w)
        elif show is False:
            # the page asked not to be shown: move on immediately
            self.next_timer.start(0)

        if step_type == "done":
            self.archived = self.done()
            self.last_label = copy.deepcopy(self.archived)
            self.next_timer.start(500)
        elif step_type == "print":
            compiled_label = self.print(self.archived, self.step.spec.get("template", "EtichettaR5"))
            self.archived.label = compiled_label
            self.log.info("Label printed. Saving...")
            # self.archived.save()
            self.main_window.main_window.run_request.emit(self.archived.save, [], {})
            self.next_timer.start(500)
        elif step_type == "wait":
            self.next_timer.start(500)

        # UPDATE COUNT DISPLAY
        self.update_count_display()

    def reset_count(self):
        """Zero the ok/ko counters and refresh the counter label."""
        # COUNT RESET
        self.pieces["ok"] = 0
        self.pieces["ko"] = 0
        self.update_count_display()

    def update_count_display(self):
        """Write the ok / ko / total counters to the counter label."""
        self.pieces_count_l.setText(f"{self.pieces['ok']} OK / {self.pieces['ko']} NOK / {sum(self.pieces.values())} TOT")

    def set_recipe(self, recipe=None):
        """Select a recipe and build its cycle and autotest step sequences.

        Args:
            recipe: recipe object exposing ``get_steps()`` and ``name``, or
                ``None`` to clear the current selection.
        """
        self.recipe = recipe
        inserted_instruction = False
        self.require_discard_piece = False
        if self.recipe is None:
            self.cycle_steps = None
            self.autotest_cycle_steps = None
        else:
            steps = self.recipe.get_steps()
            skip = set()
            print_found = False
            count_found = False
            # create step sequence list
            barcode_names = ['serial', 'barcode_input_2', 'barcode_input_3', 'barcode_input_4', 'barcode_input_5']
            # NOTE(review): `steps` is mutated while being enumerated; `skip`
            # holds the indexes of steps that must not be expanded again.
            # Statement order matters here, so it is left untouched.
            for i, step in enumerate(steps):
                if step.step_type == "barcodes":
                    n_pieces = int(step.spec.get("n_pieces", 1))
                    n_pieces_adapted = n_pieces
                    if n_pieces_adapted == 1:
                        step.spec["barcode_name"] = 'serial'
                    else:
                        # one barcode step per piece: queue a copy with one
                        # piece less right after the current step
                        step.spec["barcode_name"] = barcode_names[(n_pieces_adapted - 1) % len(barcode_names)]
                        n_pieces_adapted -= 1
                        new_barcode_step = copy.deepcopy(step)
                        new_barcode_step.spec["n_pieces"] = str(n_pieces_adapted)
                        steps.insert(i + 1, new_barcode_step)
                if i in skip:
                    continue
                if step.step_type == "vision":
                    self.components["vision"].config_changed(vision_recipe=self.recipe.name)
                if step.step_type == "count":
                    count_found = True
                    if "warning_img" in step.spec:
                        if step.spec["warning_img"]:
                            steps.insert(i, Step(step_type="warning_img", spec={"warning_img": step.spec["warning_img"]}))
                            skip.add(i + 1)
                # Equality, not substring membership: `in "pipe_cutter"` also
                # matched "pipe" or "" and raised TypeError for a None type.
                if step.step_type == "pipe_cutter":
                    self.pipe_cutter_step = step
                if "assembly" in step.spec:
                    if step.spec["assembly"]:
                        # NOTE(review): the registered page is "instruction";
                        # "instructions" is not a key of cycle_available_steps
                        # and is reported as unsupported - confirm intent.
                        steps.insert(i, Step(step_type="instructions", spec={}))
                        skip.add(i + 1)
                if "require_discard_piece" in step.spec:
                    if step.spec["require_discard_piece"]:
                        self.require_discard_piece = True
                if step.step_type == "resistance":
                    # ADD STEP TO ENSURE REMOVAL OF CONNECTOR
                    steps.insert(i + 1, Step(step_type="resistance", spec={
                        "scale": 500,
                        "expected": float("+inf"),
                        "tolerance_pos": 0,
                        "tolerance_neg": 0,
                    }))
                    skip.add(i + 1)
                if step.step_type == "print":
                    if print_found:
                        continue
                    steps.insert(i, Step(step_type="done"))
                    print_found = True
                    self.print_step = step
                    if self.config["hardware_config"].get("enforce_piece_removal", "no") == "yes":
                        steps.append(Step(step_type="piece_removal"))
                    if count_found:
                        steps.append(Step(step_type="count_end"))
                if step.step_type in ("leak_1", "leak_2"):
                    self.leak_step = step

            step_types = [step.step_type for step in steps]
            if "leak_1" in step_types and "leak_2" in step_types:
                leak1_index = step_types.index("leak_1")
                leak2_index = step_types.index("leak_2")
                if leak1_index + 1 == leak2_index:  # Ensure 'leak_1' is immediately followed by 'leak_2'
                    if self.config["hardware_config"].get("second_leak_test", "yes") == "no":
                        steps.insert(leak2_index, Step(step_type="instruction_extra", spec={}))
                        inserted_instruction = True

            # Insert 'instruction_extra' after the first 'instructions' if not inserted between leaks
            if not inserted_instruction:
                for i, step in enumerate(steps):
                    if step.step_type == "instructions":
                        steps.insert(i + 1, Step(step_type="instruction_extra", spec={}))
                        inserted_instruction = True
                        break

            if not print_found:
                steps.append(Step(step_type="done"))
                if count_found:
                    steps.append(Step(step_type="count_end"))
            steps.append(Step(step_type="wait"))
            self.cycle_steps = steps
            self.check_steps_dependencies(self.cycle_steps)

            leak_autotest_steps = []
            # CONFIGURE LEAK AUTOTEST PARAMETERS
            if self.config["autotest_leak"]["enabled"] == "true":
                base_spec = {
                    key: float(value)
                    for key, value in self.config["autotest_leak"].items()
                    if key != "enabled"
                }
                l_at_1 = dict(base_spec)
                l_at_1["autotest"] = "ko_check"
                l_at_2 = dict(base_spec)
                l_at_2["test_pressure_qneg"] = l_at_2["test_pressure_tt_qneg"]
                l_at_2["test_pressure_qpos"] = l_at_2["test_pressure_tt_qpos"]
                l_at_2["autotest"] = "ok_check"
                leak_autotest_steps = [Step(step_type="leak_1", spec=l_at_1), Step(step_type="leak_1", spec=l_at_2)]

            def resistance_autotest(expected, tolerance):
                return Step(step_type="resistance", spec={
                    "scale": 500,
                    "expected": expected,
                    "tolerance_pos": tolerance,
                    "tolerance_neg": tolerance,
                    "autotest": True,
                })

            resistance_autotest_steps = []
            if "resistance" not in self.unsupported_steps:
                resistance_autotest_steps = [
                    resistance_autotest(1, 5),
                    resistance_autotest(float("+inf"), 0),
                    resistance_autotest(10, 1),
                    resistance_autotest(float("+inf"), 0),
                ]
            self.autotest_cycle_steps = [
                *resistance_autotest_steps,
                *leak_autotest_steps,
                Step(step_type="done"),
                Step(step_type="wait"),
            ]

        for w in self.cycle_available_steps.values():
            if hasattr(w, "reset"):
                w.reset()

        # UPDATE RECIPE DISPLAY
        if self.recipe is not None:
            self.log.info(f"set recipe: {model_to_dict(self.recipe)!r} cycle steps: {[s.step_type for s in self.cycle_steps]}")
            self.recipe_l.setText(self.recipe.name)
            self.recipe_l.setStyleSheet("")
            self.cycle_index = -1
            self.next()
        else:
            self.log.info(f"set recipe: {self.recipe!r} cycle steps: {self.cycle_steps}")
            self.recipe_l.setText("NON SELEZIONATA")
            self.recipe_l.setStyleSheet("QLabel { color: red; }")

    def _ensure_component_ready(self, name):
        """Return whether component ``name`` is ready, reconfiguring it once if not."""
        component = self.components[name]
        if not component.ready:
            component.reconfigure()
        return bool(component.ready)

    def check_steps_dependencies(self, steps):
        """Verify that this bench can run ``steps``.

        Unsupported step types and components that stay not ready after one
        reconfigure attempt are reported in message boxes; if anything is
        reported the recipe selection is reopened.
        """
        unsupported_steps = set()
        missing_components = set()
        for step in steps:
            if step.step_type in self.unsupported_steps or step.step_type not in self.cycle_available_steps:
                unsupported_steps.add(step.step_type)
                continue
            for dependency in self.steps_dependencies.get(step.step_type, []):
                if isinstance(dependency, tuple):
                    # alternatives: at least one must be installed and ready
                    if all(d not in self.components for d in dependency):
                        unsupported_steps.add(step.step_type)
                        continue
                    unready = {
                        d for d in dependency
                        if d not in self.components or not self._ensure_component_ready(d)
                    }
                    if unready == set(dependency):
                        missing_components.add(" or ".join(dependency))
                elif dependency not in self.components:
                    unsupported_steps.add(step.step_type)
                elif not self._ensure_component_ready(dependency):
                    missing_components.add(dependency)

        if len(unsupported_steps):
            QMessageBox.critical(None, "Errore Ricetta", f"Questa ricetta contiene uno o piu step non supportati da questo banco:\n{', '.join(sorted(unsupported_steps))}\nModificare la ricetta adeguatamente.")
        if len(missing_components):
            QMessageBox.critical(None, "Errore Componenti Ricetta", f"Questa ricetta richiede i seguenti componenti per essere eseguita:\n{', '.join(sorted(missing_components))}\nModificare la ricetta adeguatamente o collegare i componenti elencati.")
        if len(unsupported_steps) or len(missing_components):
            self.change_recipe()

    def set_step(self, step_name, data=None):
        """Store the result of a finished step and advance the cycle.

        Args:
            step_name: key under which the result is stored in ``self.data``.
            data: result dict emitted by the step page, or ``None``.
        """
        if step_name not in self.data:
            self.data[step_name] = {}
        if data is not None:
            data["step"] = self.step.spec
            data["step"].pop("name", None)
            # MAKE ARRAY ONLY IF MORE THAN ONE TEST OF SAME TYPE
            if len(self.data[step_name]) > 1:
                self.data[step_name][str(len(self.data[step_name]))] = data
            else:
                self.data[step_name] = data
            self.data["overridden"] = self.data["overridden"] or data.get("overridden", False)
            self.data["ok"] = self.data["ok"] and data.get("ok", False)
        self.next()

    def done(self, ok=True):
        """Trim the collected data, archive it and update the counters.

        Args:
            ok: False when the cycle was interrupted.

        Returns:
            The object returned by ``Archive.archive``.
        """
        self.log.info("cycle done, saving data...")
        # remove useless info
        self.data.get("recipe", {}).get("spec", {}).pop("steps", None)
        for leak in ("leak_1", "leak_2"):
            if leak not in self.data:
                continue
            raw_results = self.data[leak].get("results", {})
            # Skip when the tester section is absent, e.g. the results were
            # already flattened by an earlier call for this same cycle.
            if self.tester_component not in raw_results:
                continue
            tester_results = raw_results[self.tester_component]
            results = {k: tester_results[k] for k in ["Running test: result"]}
            results.update({
                k: round(float(tester_results[k]), 2)
                for k in [
                    "Running test: filling pressure",
                    "Running test: measured leak",
                    "Running test: pressure at the end of settling",
                ]
            })
            self.data[leak]["results"] = results

        if "vision" in self.data:
            vision_test_1 = self.data.get("vision", {}).get("0", {})
            out_paths = self.components["vision_saver"].save(
                save_time=vision_test_1.get("time", None),
                frame=vision_test_1.get("frame", None),
                # vision=vision_test_1.get("detections", None),
            )
            self.data.get("vision", {}).get("0", {})["files"] = out_paths
            self.log.info(f"cycle vision saved locally: {out_paths!r}")
            for vision in self.data.get("vision", {}).values():
                vision.pop("frame", None)
                vision.pop("render", None)
                vision.pop("detections", None)
                if "results" in vision.keys():
                    vision["results"].pop("results", None)

        if self.autotesting:
            self.data["autotest"] = True
            self.data["autotest_reason"] = self.autotesting_reason
            self.data["recipe"]["name"] = "AUTOTEST"

        archived = Archive.archive(self.data, ok and self.data["ok"], overridden=self.data["overridden"])
        self.log.info(f"cycle archived locally: {archived!r}")

        if not self.autotesting:
            # COUNT OK
            if ok:
                self.pieces["ok"] += 1
            else:
                self.pieces["ko"] += 1
        elif self.autotesting_reason == "logout" and ok:
            self.main_window.open_login()
        return archived

    @staticmethod
    def labellify(v):
        """Format a value for a label: None -> "-", floats with one decimal, else ``str``."""
        if v is None:
            v = "-"
        elif isinstance(v, float):
            v = f"{v:.1f}"
        if not isinstance(v, str):
            v = str(v)
        return v

    @staticmethod
    def _pressure_at_end_of_measure(results):
        """Return settling pressure + measured leak, or None when either value is missing."""
        settling = results.get("Running test: pressure at the end of settling")
        leak = results.get("Running test: measured leak")
        if settling is None or leak is None:
            return None
        return settling + leak

    def print(self, archived, label):
        """Compile the label context for an archived piece and print it.

        Args:
            archived: archived record (``test_data``, ``id``, ``time``,
                ``user`` and ``label`` are read), or ``None``.
            label: template name passed to the main label printer.

        Returns:
            The value returned by the printer component, or ``None`` when
            ``archived`` is ``None``.
        """
        self.log.info("cycle print")
        if archived is None:
            self.log.error("attempting to print empty label")
            return None
        if archived.label is not None:
            # raise AssertionError("this should never happen")
            self.log.info("printing already compiled label")

        # LABEL PRINT
        recipe = archived.test_data.get("recipe", {})
        leak_test_1 = archived.test_data.get("leak_1", {})
        leak_test_1_step = leak_test_1.get("step", {})
        leak_test_1_step_spec = leak_test_1_step.get("spec", {})
        leak_test_1_results = leak_test_1.get("results", {})
        leak_test_2 = archived.test_data.get("leak_2", {})
        leak_test_2_step = leak_test_2.get("step", {})
        leak_test_2_step_spec = leak_test_2_step.get("spec", {})
        leak_test_2_results = leak_test_2.get("results", {})

        # NOTE(review): these formulas compute base * (100 + q / 100) and the
        # first one starts from "test_pressure" while the other three start
        # from a settling percentage - confirm the intended calculation.
        psetminp_a = leak_test_1_step_spec.get("test_pressure", 0) * (100 + leak_test_1_step_spec.get("test_pressure_qneg", 0) / 100)
        psetmaxp_a = leak_test_1_step_spec.get("settling_pressure_max_percent", 0) * (100 + leak_test_1_step_spec.get("test_pressure_qpos", 0) / 100)
        psetminp2_a = leak_test_2_step_spec.get("settling_pressure_min_percent", 0) * (100 + leak_test_2_step_spec.get("test_pressure_qneg", 0) / 100)
        psetmaxp2_a = leak_test_2_step_spec.get("settling_pressure_max_percent", 0) * (100 + leak_test_2_step_spec.get("test_pressure_qpos", 0) / 100)

        end_of_measure = "Running test: pressure at the end of measure"
        resp_end_2 = None
        if self.tester_component is not None:
            # Only derive the value when the leak data exists: a recipe that
            # prints without a leak step used to raise KeyError here.
            resp_end_1 = self._pressure_at_end_of_measure(leak_test_1_results)
            if resp_end_1 is not None:
                leak_test_1_results[end_of_measure] = resp_end_1
            resp_end_2 = self._pressure_at_end_of_measure(leak_test_2_results)

        printer_fields = self.print_step.spec
        forced = str(" FORZATO" if self.data.get("overridden", False) else "")
        outcome = str("CONFORME" if leak_test_1.get("ok", False) else "SCARTO")
        context = {
            # RECIPE DATA
            "RECIPE": self.labellify(recipe.get("name", "-")),
            "CLIENT": self.labellify(recipe.get("client", "-")),
            "PART": self.labellify(recipe.get("part_number", "-")),
            "DESCRIPTION": self.labellify(recipe.get("description", "-")),
            "RECIPE_TO_PRINT": self.labellify(self.print_step.spec.get("labeltxt_5", "-").replace('{N11}', '')),
            # STEP SPEC
            "TPREFILL": self.labellify(leak_test_1_step.get("pre_filling_time", "-")),
            "PPREFILL": self.labellify(leak_test_1_step.get("pre_filling_pressure", "-")),
            "TFILL": self.labellify(leak_test_1_step.get("filling_time", "-")),
            "TSET": self.labellify(leak_test_1_step.get("settling_time", "-")),
            "TPREFILL2": self.labellify(leak_test_2_step.get("pre_filling_time", "-")),
            "PPREFILL2": self.labellify(leak_test_2_step.get("pre_filling_pressure", "-")),
            "TFILL2": self.labellify(leak_test_2_step.get("filling_time", "-")),
            "TSET2": self.labellify(leak_test_2_step.get("settling_time", "-")),
            "PSETMINP": self.labellify(leak_test_1_step.get("settling_pressure_min_percent", " -")),
            "PSETMAXP": self.labellify(leak_test_1_step.get("settling_pressure_max_percent", " -")),
            "PSETMINP2": self.labellify(leak_test_2_step.get("settling_pressure_min_percent", " -")),
            "PSETMAXP2": self.labellify(leak_test_2_step.get("settling_pressure_max_percent", " -")),
            "PSETMINP_A": self.labellify(psetminp_a),
            "PSETMAXP_A": self.labellify(psetmaxp_a),
            "PSETMINP2_A": self.labellify(psetminp2_a),
            "PSETMAXP2_A": self.labellify(psetmaxp2_a),
            "TTEST": self.labellify(leak_test_1_step.get("test_time", "-")),
            "TTEST2": self.labellify(leak_test_2_step.get("test_time", "-")),
            "PMIN": self.labellify(leak_test_1_step.get("test_pressure_qneg", "-")),
            "PMIN2": self.labellify(leak_test_2_step.get("test_pressure_qneg", "-")),
            "PTEST": self.labellify(leak_test_1_step.get("test_pressure", "-")),
            "PTEST2": self.labellify(leak_test_2_step.get("test_pressure", "-")),
            "PMAX": self.labellify(leak_test_1_step.get("test_pressure_qpos", "-")),
            "TFLUSH": self.labellify(leak_test_1_step.get("flush_time", "-")),
            "PFLUSH": self.labellify(leak_test_1_step.get("flush_pressure", "-")),
            # ACTUAL TESTED VALUES
            "RESPFILL": self.labellify(leak_test_1_results.get("Running test: filling pressure", "-")),
            "RESPFILL2": self.labellify(leak_test_2_results.get("Running test: filling pressure", "-")),
            "RESPSET": self.labellify(leak_test_1_results.get("Running test: pressure at the end of settling", "-")),
            "RESPSET2": self.labellify(leak_test_2_results.get("Running test: pressure at the end of settling", "-")),
            "RESPEND": self.labellify(leak_test_1_results.get(end_of_measure, "-")),
            # second test value comes from the second leak test's results
            "RESPEND2": self.labellify(resp_end_2),
            "RESLEAK": self.labellify(leak_test_1_results.get("Running test: measured leak", "-")),
            "RESLEAK2": self.labellify(leak_test_2_results.get("Running test: measured leak", "-")),
            "RESRES": self.labellify(leak_test_1_results.get("Running test: result", "-")),
            "RESRES2": self.labellify(leak_test_2_results.get("Running test: result", "-")),
            # SERIAL DEFINITION
            "SN": str(archived.id),
            "SN4": f"{archived.id:0>4}",
            "SN5": f"{archived.id:0>5}",
            "SN6": f"{archived.id:0>6}",
            # TIME DEFINITION
            "DATETIME": archived.time.strftime("%d/%m/%Y %H:%M:%S"),
            "DATE": archived.time.strftime("%d/%m/%Y"),
            "TIME": archived.time.strftime("%H:%M:%S"),
            "YYYY": archived.time.strftime("%Y"),
            "YY": archived.time.strftime("%y"),
            "MO": archived.time.strftime("%m"),
            "DD": archived.time.strftime("%d"),
            "HH": archived.time.strftime("%H"),
            "MI": archived.time.strftime("%M"),
            "SS": archived.time.strftime("%S"),
            "JJJ": archived.time.strftime("%j"),
            # EXTRA DATA
            "SHIFT": str(get_shift(archived.time)),
            "STATION": str(self.config.machine_id),
            "OPERATOR": str(archived.user.username),
            "BADGE_NUM": str(archived.user.badge_number),
            # RESULT
            "RESULT": outcome + forced,
            "RESULT_L1": "ESITO" + forced,
            "RESULT_L2": outcome,
        }

        # TESTING BROTHER
        label_brother = context.get("RECIPE_TO_PRINT", "-") + context.get("DD", "-") + context.get("MO", "-") + context.get("YY", "-") + context.get("SN5", "-")
        barcode = str(label_brother)

        # non-empty free-text fields of the print step override the context
        for n in range(5):
            field = f"labeltxt_{n + 1}"
            if field in printer_fields.keys():
                if printer_fields[field] != "":
                    context[field.upper()] = printer_fields[field]

        # PRINT MAIN PRODUCT LABEL
        hardware = self.config["hardware_config"]
        if hardware["tecna_t3"] == "absent" and hardware["furness_controls"] == "absent":
            compiled_label = self.components["label_printer_2"].print_label(barcode)  # Printing with Brother label printer
        else:
            compiled_label = self.components["label_printer"].print_label(label, context=context)
        self.log.info(f"Main label printed: {context!r}")
        return compiled_label

    def print_extra_labels(self):
        """Print the comma-separated extra labels listed in the print step, if any."""
        # PRINT EXTRA LABELS IF NEEDED (BEFORE LEAK TEST)
        if "extra_label_printer" in self.components.keys() and self.print_step is not None:
            printer_fields = self.print_step.spec
            # a print step without the field means "no extra labels"
            extra_label = printer_fields.get("extra_label") or ""
            if len(extra_label) > 0:
                labels = extra_label.split(",")
                for label in labels:
                    self.components["extra_label_printer"].print_label(f"{label}.prn", context=None)

    @pyqtSlot(str)
    def load_recipe_from_rfid(self, data):
        """Slot for the fixture id reader.

        Args:
            data: id that was read; ``None`` or an empty string is handled
                as "fixture removed".
        """
        if data not in (None, ''):
            self.tag_loaded_recipe = data
            if self.step.step_type == "barcode_recipe_selection":
                self.cycle_available_steps["barcode_recipe_selection"].widget.get(data)
        else:
            # fixture removed
            self.tag_loaded_recipe = None
            self.fail_cycle()
            self.change_recipe()

    @pyqtSlot(bool)
    def handle_rfid_error(self, connected):
        """Slot: clear the error label when connected, show the RFID error otherwise."""
        if connected:
            self.error_label.setText("")  # Update the label from Designer
        else:
            self.error_label.setText("Errore RFID.")  # Update the label from Designer
            self.error_label.setStyleSheet("color: red;")