import logging
import os
import sys
import weakref
from datetime import datetime

from lib.db import Archive, Steps, Users
from lib.helpers import get_shift
from playhouse.shortcuts import model_to_dict
from PyQt5.QtCore import QTimer
from PyQt5.QtWidgets import QMessageBox
from ui.helpers import replace_widget
from ui.recipe_selection import Recipe_Selection
from ui.test_assembly import Test_Assembly
from ui.test_autotest import Test_Autotest
from ui.test_barcodes import Test_Barcodes
from ui.test_connector import Test_Connector
from ui.test_count import Test_Count
from ui.test_count_end import Test_Count_End
from ui.test_fail import Test_Fail
from ui.test_leak import Test_Leak
from ui.test_resistance import Test_Resistance
from ui.test_screws import Test_Screws
from ui.test_vision import Test_Vision
from ui.widget import Widget


class Test(Widget):
    """Main test-cycle window.

    Holds the selected recipe, walks through its steps one at a time by
    swapping the central widget, collects the data reported by each step,
    archives the result and optionally prints a label.
    """

    def __init__(self, config, components=None):
        """Build the window, wire every step widget and start the cycle.

        Args:
            config: machine configuration; read both through ``.get(...)``
                and through the ``machine_id`` attribute.
            components: mapping of component name -> component object
                exposing ``ready`` (and ``reconfigure()``). ``None`` is
                treated as "no components available".
        """
        super().__init__()
        self.config = config
        # A missing mapping would make every ``in self.components`` test
        # below raise TypeError, so fall back to an empty one.
        self.components = components if components is not None else {}

        # GET LOGGER
        self.log = logging.getLogger("Test")

        # SHOW MACHINE DESCRIPTION
        self.machine_description_l.setText(
            self.config.get("machine", {}).get("description", "N/A")
        )

        # SHOW USERNAME (admins are highlighted in red)
        session = Users.get_session()
        self.user_l.setText(session.username)
        if session.is_admin:
            self.user_l.setStyleSheet("QLabel { color: red; }")
        else:
            self.user_l.setStyleSheet("")

        # SHOW AND UPDATE TIME CLOCK
        self.refresh_time(init=True)

        # INIT RECIPE
        self.recipe = None
        self.step = None

        # Components required by each step type. A tuple means "at least
        # one of these alternatives".
        self.steps_dependencies = {
            "count": set(),
            "connector": {"multicomp", },
            "screws": {"screwdriver", "tecna_t3", },
            "resistance": {"multicomp", },
            "leak": {"tecna_t3", },
            # "neo_pixels" is intentionally not required at the moment
            "vision": {("uvc_camera", "galaxy_camera", ), "vision", "vision_saver", },
            "print": {"label_printer", },
        }

        # Step types that cannot run because a dependency is missing or
        # not ready at start-up.
        self.unsupported_steps = set()
        for step_name, dependencies in self.steps_dependencies.items():
            for dependency in dependencies:
                if isinstance(dependency, tuple):
                    if not any(self._component_ready(d) for d in dependency):
                        self.unsupported_steps.add(step_name)
                elif not self._component_ready(dependency):
                    self.unsupported_steps.add(step_name)

        # INIT PIECES COUNTER
        self.pieces = {"ok": 0, "ko": 0}

        # INIT CYCLE STATES
        # Keyword arguments shared by the step widgets that need the
        # cycle context.
        shared = {
            "components": self.components,
            "recipe": self.recipe,
            "step": self.step,
            "pieces": self.pieces,
        }
        self.cycle_available_steps = {
            # "assembly_1": Test_Assembly(img_path=self.select_step_img("assembly_1"), text="INSERIRE SENSORE", widget=None),
            "autotest": Test_Assembly(
                img_path=None,
                text="ESEGUIRE PROCEDURA DI AUTOTEST",
                widget=Test_Autotest(),
            ),
            "barcodes": Test_Assembly(
                img_path=self.select_step_img("scan"),
                text="LEGGERE IL BARCODE DEL PEZZO DA COLLAUDARE",
                widget=Test_Barcodes(),
            ),
            "connector": Test_Assembly(
                img_path=self.select_step_img("scan"),
                text="COLLEGARE IL CONNETTORE INDICATO AL PEZZO E LEGGERE IL SUO BARCODE",
                widget=Test_Connector(run_once=True),
            ),
            "count": Test_Assembly(
                img_path=None,
                text="INSERIRE IL NUMERO DI PEZZI ATTESI PER IL LOTTO",
                widget=Test_Count(run_once=True, **shared),
            ),
            "count_end": Test_Assembly(
                img_path=None,
                text="LOTTO TERMINATO, PREMERE CONTINUA PERCOMINCIARNE UNO NUOVO",
                widget=Test_Count_End(**shared),
            ),
            "done": Test_Assembly(
                img_path=self.select_step_img("success"),
                text="COLLAUDO COMPLETATO",
                widget=None,
            ),
            "emergency": Test_Assembly(
                img_path=self.select_step_img("reset_emergency"),
                text="EMERGENZA INTERVENUTA - RIPRISTINARE PULSANTE E SELEZIONARE \"RESET EMERGENZA\" DAL MEN\u00d9 \"STRUMENTI\"",
                widget=None,
            ),
            "fail": Test_Assembly(
                img_path=self.select_step_img("fail"),
                text="CICLO INTERROTTO, PREMERE CONTINUA PER COMINCIARE UN NUOVO CICLO",
                widget=Test_Fail(),
            ),
            "leak": Test_Assembly(
                img_path=None,
                text=None,
                widget=Test_Leak(**shared),
            ),
            "print": Test_Assembly(
                img_path=self.select_step_img("print"),
                text="STAMPA ETICHETTA IN CORSO",
                widget=None,
            ),
            "resistance": Test_Assembly(
                img_path=None,
                text="COLLEGARE CONNETTORE ELETTRICO PER EFFETTUARE PROVA RESISTENZA",
                widget=Test_Resistance(**shared),
            ),
            "screws": Test_Assembly(
                img_path=None,
                text="AVVITARE TUTE LE VITI COME INDICATO",
                widget=Test_Screws(**shared),
            ),
            "select_recipe": Test_Assembly(
                img_path=None,
                text="SELEZIONARE IL CODICE DA COLLAUDARE",
                widget=Recipe_Selection(
                    config=self.config,
                    unsupported_steps=self.unsupported_steps,
                ),
            ),
            "vision": Test_Assembly(
                img_path=None,
                text="VERIFICARE CONTROLLO CON TELECAMERA",
                widget=Test_Vision(**shared),
            ),
            "wait": Test_Assembly(
                img_path=self.select_step_img("wait"),
                text="ATTENDERE - PAUSA INTER CICLO",
                widget=None,
            ),
            # Shown when the selected recipe has no steps at all.
            None: Test_Assembly(
                img_path=self.select_step_img("warning"),
                text="ATTENZIONE - LA RICETTA SELEZIONATA NON CONTIENE FASI DI TEST",
                widget=None,
            ),
        }
        self.cycle_steps = None
        self.cycle_index = -1

        # SETUP AUTOTEST
        # The periodic timer is created by request_autotest("init").
        self.autotest_timer = None
        self.autotest_request = False
        # if "--no-autotest" not in sys.argv:
        #     self.autotest_period = 12 * 60 * 60 * 1000
        #     self.request_autotest("init")
        # else:
        self.autotest_period = None

        # INIT TEST DATA
        self.data = {"ok": True, "overridden": False}
        self.archived = None

        # CONNECT CYCLE CONTROLS
        self.cancel_b.clicked.connect(self.fail_cycle)
        self.change_recipe_b.clicked.connect(self.change_recipe)
        for step_name, w in self.cycle_available_steps.items():
            if hasattr(w, "ok"):
                # custom ok handlers should call next again
                if isinstance(w.widget, Recipe_Selection):
                    w.ok.connect(self.set_recipe)
                else:
                    # step_name is bound as a default to avoid late
                    # binding; self is held weakly by the lambda.
                    w.ok.connect(
                        lambda data=None, step_name=step_name, self=weakref.ref(self):
                        self().set_step(step_name, data)
                    )
            if hasattr(w, "ko"):
                w.ko.connect(self.fail_cycle)

        # CUSTOM STEP CONNECTIONS
        self.cycle_available_steps["count"].ok.connect(
            self.cycle_available_steps["count_end"].widget.set_amount
        )

        # TESTING
        self.testing = "--test" in sys.argv
        # /TESTING

        # START CYCLE
        self.next_timer = QTimer()
        self.next_timer.setSingleShot(True)
        self.next_timer.timeout.connect(self.next)
        self.next()

    def _component_ready(self, name):
        """Return True when component *name* is present and reports ready."""
        return name in self.components and bool(self.components[name].ready)

    def refresh_time(self, init=False):
        """Update the clock label and re-arm the timer for the next minute.

        Args:
            init: when True, create the single-shot timer first.
        """
        if init:
            self.time_timer = QTimer()
            self.time_timer.setSingleShot(True)
            self.time_timer.timeout.connect(self.refresh_time)
        t = datetime.now()
        # Hour and minute are zero-padded so 14:05 is not shown as "14:5".
        self.time_l.setText(
            "{d}/{mo}/{y}\n{h:02d}:{m:02d}".format(
                y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute
            )
        )
        # QTimer.start() takes milliseconds: wait until the minute rolls over.
        self.time_timer.start((60 - t.second) * 1000)

    def select_step_img(self, step, suffix=None):
        """Return the path of the most specific image available for *step*.

        Candidates are tried from most to least specific (suffix and
        machine id, suffix only, machine id only, bare step name), each
        with the ``png`` then ``jpg`` extension.

        Raises:
            FileNotFoundError: when no candidate file exists.
        """
        img_path = "./src/ui/imgs"
        names = []
        if suffix is not None:
            names.append(f"{step}_{suffix}_{self.config.machine_id}")
            names.append(f"{step}_{suffix}")
        names.append(f"{step}_{self.config.machine_id}")
        names.append(f"{step}")
        for name in names:
            for ext in ("png", "jpg"):
                path = f"{img_path}/{name}.{ext}"
                if os.path.isfile(path):
                    return path
        raise FileNotFoundError(f"No image was found for step {step}")

    def change_recipe(self):
        """Drop the current recipe and go back to recipe selection."""
        self.next(action="change_recipe")

    def fail_cycle(self):
        """Abort the running cycle and count the piece as failed."""
        self.next(action="fail")

    def setCentralWidget(self, widget):
        """Replace the ``centralWidget`` placeholder with *widget*."""
        replace_widget(self, "centralWidget", widget)

    def request_autotest(self, reason):
        """Flag that an autotest must run before the next cycle starts.

        Args:
            reason: ``"init"`` creates and starts the periodic timer and is
                recorded as ``"boot"``; any other value is stored as-is.
                You can cancel the request calling request_autotest(False).
        """
        self.log.info(f"cycle request autotest: reason: {reason!r} autotest_request: {self.autotest_request!r}")
        if reason == "init":
            self.autotest_timer = QTimer()
            self.autotest_timer.setSingleShot(False)
            self.autotest_timer.timeout.connect(self.request_periodic_autotest)
            # Start the autotest timer itself; starting the clock timer
            # here would leave the periodic autotest timer idle.
            self.autotest_timer.start(self.autotest_period)
            reason = "boot"
        self.autotest_request = reason
        self.cycle_available_steps["autotest"].widget.set_reason(reason)

    def request_periodic_autotest(self):
        """Timer slot: request an autotest with reason ``"periodic"``."""
        self.request_autotest("periodic")

    def next(self, action=None):
        """Advance the cycle to its next step and show the matching widget.

        Args:
            action: ``None`` to advance normally, ``"change_recipe"`` to
                clear the recipe and reset the counters, ``"fail"`` to
                abort the cycle and count a failed piece.

        Raises:
            NotImplementedError: for any other *action* value.
        """
        if self.step is not None:
            self.log.debug(f"cycle next: cycle step: {model_to_dict(self.step)!r} action: {action!r}")
        else:
            self.log.debug(f"cycle next: cycle step: {self.step!r} action: {action!r}")

        # Give the widget being left a chance to stop its work.
        current_w = self.centralWidget
        if hasattr(current_w, "stop"):
            current_w.stop()

        if action == "change_recipe":
            self.log.info(f"cycle next: action: {action!r}")
            self.set_recipe(recipe=None)
            self.step = Steps(type="select_recipe")
            self.cycle_index = -1
            # COUNT RESET
            self.pieces["ok"] = 0
            self.pieces["ko"] = 0
        elif action == "fail":
            self.log.info(f"cycle next: action: {action!r}")
            # FAIL AND RESTART TEST
            self.step = Steps(type="fail")
            self.cycle_index = -1
            # COUNT FAIL
            self.pieces["ko"] += 1
        elif action is not None:
            raise NotImplementedError(f"cycle next: action {action!r} is not a valid action")

        # if action did not set the next cycle step
        # set next cycle step normally
        if self.recipe is None or self.cycle_steps is None:
            # if recipe not set: select_recipe
            self.step = Steps(type="select_recipe")
        elif action is None:
            if self.cycle_index == -1 and self.autotest_request is not False:
                # if cycle_steps is not started or has ended
                # and autotest was requested
                self.autotest_request = False
                self.step = Steps(type="autotest")
                if self.autotest_period is not None and self.autotest_timer is not None:
                    # reset periodic autotest timer
                    self.autotest_timer.start(self.autotest_period)
            elif len(self.cycle_steps):
                # goto next step in cycle_steps
                self.cycle_index = (self.cycle_index + 1) % len(self.cycle_steps)
                self.step = self.cycle_steps[self.cycle_index]
            else:
                self.cycle_index = -1
                self.step = Steps(type=None)

        # enable/disable cycle controls
        self.change_recipe_b.setEnabled(self.recipe is not None)
        self.cancel_b.setEnabled(
            self.step.type is not None
            and self.step.type not in {"emergency", "fail", "select_recipe", "wait", }
        )
        self.log.info(f"cycle next: next cycle step: {model_to_dict(self.step)!r}")

        # INIT TEST DATA IF STARTING CYCLE LOOP OR IF RESET IS NEEDED
        if self.cycle_index <= 0:
            self.data = {"ok": True, "overridden": False}
            self.archived = None
        if self.recipe is not None and "recipe" not in self.data:
            self.data["recipe"] = model_to_dict(self.recipe)

        w = self.cycle_available_steps[self.step.type]
        show = None
        if hasattr(w, "start"):
            # Pass the step actually selected above. Indexing cycle_steps
            # fails when no recipe is set and yields the last recipe step
            # for out-of-cycle steps (fail / autotest, cycle_index == -1).
            show = w.start(recipe=self.recipe, step=self.step, pieces=self.pieces)
        if show is not False and w is not current_w:
            self.setCentralWidget(w)
        elif show is False:
            # The step asked not to be shown: skip to the following one.
            self.next_timer.start(0)

        if self.step.type == "done":
            self.archived = self.done()
            self.next_timer.start(2000)
        elif self.step.type == "print":
            compiled_label = self.print(self.archived, self.step.spec.get("template", "EtichettaR5"))
            self.archived.label = compiled_label
            self.archived.save()
            self.next_timer.start(2000)
        # elif self.step.type == "fail":
        #     self.next_timer.start(2000)
        elif self.step.type == "wait":
            self.next_timer.start(2000)

        # UPDATE PIECES DISPLAY
        self.pieces_count_l.setText(
            f"{self.pieces['ok']} OK / {self.pieces['ko']} NOK / {sum(self.pieces.values())} TOT"
        )

    def set_recipe(self, recipe=None):
        """Select *recipe* (or clear it) and rebuild the list of cycle steps.

        The recipe's steps are expanded with the implicit ones: a second
        ``resistance`` measurement after each resistance step, ``done``
        before ``print`` (or at the end when nothing is printed),
        ``count_end`` when a ``count`` step was seen, and a final ``wait``.
        """
        self.recipe = recipe
        if self.recipe is None:
            self.cycle_steps = None
        else:
            # Build a new list rather than inserting into the one being
            # iterated; the resulting order is the same.
            steps = []
            print_found = False
            count_found = False
            for step in self.recipe.get_steps():
                if step.type == "count":
                    count_found = True
                if step.type == "print":
                    # Archive the piece right before printing its label.
                    steps.append(Steps(type="done"))
                    steps.append(step)
                    print_found = True
                    if count_found:
                        steps.append(Steps(type="count_end"))
                elif step.type == "resistance":
                    steps.append(step)
                    steps.append(Steps(type="resistance", spec={
                        "scale": 500,
                        "expected": float("+inf"),
                        "tolerance": 0,
                    }))
                else:
                    steps.append(step)
            if not print_found:
                steps.append(Steps(type="done"))
                if count_found:
                    steps.append(Steps(type="count_end"))
            steps.append(Steps(type="wait"))
            self.cycle_steps = steps
            # May call change_recipe(), which clears the recipe again.
            self.check_steps_dependencies(self.cycle_steps)

        for w in self.cycle_available_steps.values():
            if hasattr(w, "reset"):
                w.reset()

        # UPDATE RECIPE DISPLAY
        if self.recipe is not None:
            self.log.info(f"cycle recipe: cycle recipe: {model_to_dict(self.recipe)!r} cycle steps: {[model_to_dict(s) for s in self.cycle_steps]}")
            self.recipe_l.setText(self.recipe.name)
            self.recipe_l.setStyleSheet("")
            self.next()
        else:
            self.log.info(f"cycle recipe: cycle recipe: {self.recipe!r} cycle steps: {self.cycle_steps}")
            self.recipe_l.setText("NON SELEZIONATA")
            self.recipe_l.setStyleSheet("QLabel { color: red; }")

    def check_steps_dependencies(self, steps):
        """Verify that every step in *steps* can run on this bench.

        Components that are present but not ready get one ``reconfigure()``
        attempt. Problems are shown in message boxes, after which the
        recipe is deselected through ``change_recipe()``.
        """
        unsupported_steps = set()
        missing_components = set()
        for step in steps:
            if step.type in self.unsupported_steps or step.type not in self.cycle_available_steps:
                unsupported_steps.add(step.type)
                continue
            for dependency in self.steps_dependencies.get(step.type, []):
                if isinstance(dependency, tuple):
                    # Alternatives: at least one must be present and ready.
                    if all(d not in self.components for d in dependency):
                        unsupported_steps.add(step.type)
                        continue
                    unready = set()
                    for d in dependency:
                        if d not in self.components:
                            unready.add(d)
                        elif not self.components[d].ready:
                            self.components[d].reconfigure()
                            if not self.components[d].ready:
                                unready.add(d)
                    if unready == set(dependency):
                        missing_components.add(" or ".join(dependency))
                elif dependency not in self.components:
                    unsupported_steps.add(step.type)
                elif not self.components[dependency].ready:
                    self.components[dependency].reconfigure()
                    if not self.components[dependency].ready:
                        missing_components.add(dependency)

        if len(unsupported_steps):
            QMessageBox.critical(
                None,
                "Errore Ricetta",
                f"Questa ricetta contiene uno o piu step non supportati da questo banco:\n{', '.join(sorted(unsupported_steps))}\nModificare la ricetta adeguatamente.",
            )
        if len(missing_components):
            QMessageBox.critical(
                None,
                "Errore Componenti Ricetta",
                f"Questa ricetta richiede i seguenti componenti per essere eseguita:\n{', '.join(sorted(missing_components))}\nModificare la ricetta adeguatamente o collegare i componenti elencati.",
            )
        if len(unsupported_steps) or len(missing_components):
            self.change_recipe()

    def set_step(self, step_name, data=None):
        """Record the result reported by a step and advance the cycle.

        Args:
            step_name: key under which results are stored in ``self.data``.
            data: result dict of the step, or ``None`` when the step
                produced no data. It is stored under the next free numeric
                index and folded into the cycle's ``ok`` / ``overridden``
                flags.
        """
        if step_name not in self.data:
            self.data[step_name] = {}
        if data is not None:
            data["step"] = model_to_dict(self.step)
            results = self.data[step_name]
            results[str(len(results))] = data
            self.data["overridden"] = self.data["overridden"] or data.get("overridden", False)
            self.data["ok"] = self.data["ok"] and data.get("ok", False)
        self.next()

    def done(self, ok=True):
        """Archive the collected cycle data and count the piece as OK.

        When vision data is present, the first vision result is saved
        through the ``vision_saver`` component and the ``frame`` /
        ``render`` entries are removed from the data before archiving.

        Returns:
            The object returned by ``Archive.archive``.
        """
        self.log.info("cycle done")
        if "vision" in self.data:
            vision_test_1 = self.data.get("vision", {}).get("0", {})
            out_paths = self.components["vision_saver"].save(
                save_time=vision_test_1.get("time", None),
                frame=vision_test_1.get("frame", None),
                # vision=vision_test_1.get("detections", None),
            )
            vision_test_1["files"] = out_paths
            self.log.info(f"cycle vision saved locally: {out_paths!r}")
            for vision in self.data.get("vision", {}).values():
                vision.pop("frame", None)
                vision.pop("render", None)
        archived = Archive.archive(self.data, ok and self.data["ok"], overridden=self.data["overridden"])
        self.log.info(f"cycle archived locally: {archived!r}")
        # COUNT OK
        self.pieces["ok"] += 1
        return archived

    @staticmethod
    def labellify(v):
        """Convert *v* to label text.

        ``None`` becomes ``"-"``, floats are rendered with one decimal and
        anything else that is not a string goes through ``str()``.
        """
        if v is None:
            v = "-"
        elif isinstance(v, float):
            v = f"{v:.1f}"
        if not isinstance(v, str):
            v = str(v)
        return v

    def print(self, archived, label):
        """Compile and print the label for an archived test.

        Args:
            archived: archive record; ``test_data``, ``id``, ``time``,
                ``user`` and ``label`` are read from it.
            label: template passed to the ``label_printer`` component.

        Returns:
            The value returned by ``print_label``.

        Raises:
            AssertionError: when *archived* already carries a label.
        """
        self.log.info("cycle print")
        if archived.label is not None:
            # The original misspelt the exception name, which raised
            # NameError instead; the statements after the raise were
            # unreachable and have been dropped.
            raise AssertionError("this should never happen")

        # LABEL PRINT
        recipe = archived.test_data.get("recipe", {})
        leak_test_1 = archived.test_data.get("leak", {}).get("0", {})
        leak_test_1_step = leak_test_1.get("step", {})
        leak_test_1_step_spec = leak_test_1_step.get("spec", {})
        leak_test_1_results = leak_test_1.get("results", {})
        leak_test_1_results_data = leak_test_1_results.get("data", {})

        # Short aliases keep the table below readable.
        lab = self.labellify
        spec = leak_test_1_step_spec.get
        res = leak_test_1_results_data.get
        overridden = self.data.get("overridden", False)
        leak_ok = leak_test_1_results.get("ok", False)
        verdict = "CONFORME" if leak_ok else "SCARTO"
        forced = " FORZATO" if overridden else ""

        context = {
            # RECIPE DATA
            "RECIPE": lab(recipe.get("name", "-")),
            "CLIENT": lab(recipe.get("client", "-")),
            "PART": lab(recipe.get("part_number", "-")),
            # STEP SPEC
            "TPREFILL": lab(spec("pre_filling_time", "-")),
            "PPREFILL": lab(spec("pre_filling_pressure", "-")),
            "TFILL": lab(spec("filling_time", "-")),
            "TSET": lab(spec("settling_time", "-")),
            "PSETMINP": lab(spec("settling_pressure_min_percent", " -")),
            "PSETMAXP": lab(spec("settling_pressure_max_percent", " -")),
            "TTEST": lab(spec("test_time", "-")),
            "PMIN": lab(spec("test_pressure_min_delta", "-")),
            "PTEST": lab(spec("test_pressure", "-")),
            "PMAX": lab(spec("test_pressure_max_delta", "-")),
            "TFLUSH": lab(spec("flush_time", "-")),
            "PFLUSH": lab(spec("flush_pressure", "-")),
            # ACTUAL TESTED VALUES
            "RESTPB": lab(res("Running test: phase backwards time", "-")),
            "RESPFILL": lab(res("Running test: filling pressure", "-")),
            "RESPSET": lab(res("Running test: pressure at the end of settling", "-")),
            "RESPB": lab(res("Running test: burst pressure", "-")),
            "RESLEAK": lab(res("Running test: measured leak", "-")),
            "RESFLOW": lab(res("Running test: calculated leak flow rate", "-")),
            "RESRVP": lab(res("Running test: calculate RVP%", "-")),
            "RESRES": lab(res("Running test: result", "-")),
            # SERIAL DEFINITION
            "SN": str(archived.id),
            "SN5": f"{archived.id:0>5}",
            "SN6": f"{archived.id:0>6}",
            # TIME DEFINITION
            "DATETIME": archived.time.strftime("%d/%m/%Y %H:%M:%S"),
            "DATE": archived.time.strftime("%d/%m/%Y"),
            "TIME": archived.time.strftime("%H:%M:%S"),
            "YYYY": archived.time.strftime("%Y"),
            "YY": archived.time.strftime("%y"),
            "MO": archived.time.strftime("%m"),
            "DD": archived.time.strftime("%d"),
            "HH": archived.time.strftime("%H"),
            "MI": archived.time.strftime("%M"),
            "SS": archived.time.strftime("%S"),
            # EXTRA DATA
            "SHIFT": str(get_shift(archived.time)),
            "STATION": str(self.config.machine_id),
            "OPERATOR": str(archived.user.username),
            # RESULT
            "RESULT": verdict + forced,
            "RESULT_L1": "ESITO" + forced,
            "RESULT_L2": verdict,
        }
        compiled_label = self.components["label_printer"].print_label(label, context=context)
        self.log.info(f"cycle printed: {context!r}")
        return compiled_label