import logging import os import sys from datetime import datetime from lib.db import Archive, Steps, Users from lib.helpers import get_shift from playhouse.shortcuts import model_to_dict from PyQt5.QtCore import QTimer from ui.helpers import replace_widget from ui.recipe_selection import Recipe_Selection from ui.test_assembly import Test_Assembly from ui.test_autotest import Test_Autotest from ui.test_barcodes import Test_Barcodes from ui.test_leak import Test_Leak from ui.test_vision import Test_Vision from ui.widget import Widget class Test(Widget): def __init__(self, machine_id=None, components=None): super().__init__() self.machine_id = machine_id self.components = components # GET LOGGER self.log = logging.getLogger("Test") # SHOW USERNAME session = Users.get_session() self.user_l.setText(session.username) if session.is_admin: self.user_l.setStyleSheet("QLabel { color: red; }") else: self.user_l.setStyleSheet("") # SHOW AND UPDATE TIME CLOCK self.refresh_time(init=True) # INIT RECIPE self.recipe = None self.step = None # INIT CYCLE STATES self.cycle_available_steps = { # "assembly_1": Test_Assembly(img_path=self.select_step_img("assembly_1"), text=u"INSERIRE SENSORE", widget=None), "autotest": Test_Assembly(img_path=None, text=u"ESEGUIRE PROCEDURA DI AUTOTEST", widget=Test_Autotest()), "barcodes": Test_Assembly(img_path=self.select_step_img("scan"), text=u"LEGGERE IL BARCODE DEL PEZZO DA COLLAUDARE", widget=Test_Barcodes()), "done": Test_Assembly(img_path=self.select_step_img("success"), text=u"COLLAUDO COMPLETATO", widget=None), "emergency": Test_Assembly(img_path=self.select_step_img("reset_emergency"), text=u"EMERGENZA INTERVENUTA - RIPRISTINARE PULSANTE E SELEZIONARE \"RESET EMERGENZA\" DAL MEN\u00d9 \"STRUMENTI\"", widget=None), "fail": Test_Assembly(img_path=self.select_step_img("fail"), text=u"CICLO INTERROTTO", widget=None), "leak": Test_Assembly(img_path=None, text=None, widget=Test_Leak(components=self.components, recipe=self.recipe, step=self.step)), "print": Test_Assembly(img_path=self.select_step_img("print"), text=u"STAMPA ETICHETTA IN CORSO", widget=None), "select_recipe": Test_Assembly(img_path=None, text=u"SELEZIONARE IL CODICE DA COLLAUDARE", widget=Recipe_Selection()), "vision": Test_Assembly(img_path=None, text=u"VERIFICARE CONTROLLO CON TELECAMERA", widget=Test_Vision(components=self.components, recipe=self.recipe, step=self.step)), "wait": Test_Assembly(img_path=self.select_step_img("wait"), text=u"ATTENDERE - PAUSA INTER CICLO", widget=None), None: Test_Assembly(img_path=self.select_step_img("warning"), text=u"ATTENZIONE - LA RICETTA SELEZIONATA NON CONTIENE FASI DI TEST", widget=None), } self.cycle_steps = None self.cycle_index = -1 # SETUP AUTOTEST self.autotest_request = False # if "--no-autotest" not in sys.argv: # self.autotest_period = 12 * 60 * 60 * 1000 # self.request_autotest("init") # else: self.autotest_period = None # INIT TEST DATA self.data = {"ok": True, "overridden": False} self.archived = None # INIT PIECES COUNTER ([pieces_ok, pieces_failed]) self.pieces = [0, 0] # CONNECT CYCLE CONTROLS self.cancel_b.clicked.connect(self.fail_cycle) self.change_recipe_b.clicked.connect(self.change_recipe) for w in self.cycle_available_steps.values(): if hasattr(w, "ok"): # custom ok handlers should call next again if type(w.widget) is Recipe_Selection: w.ok.connect(self.set_recipe) elif type(w.widget) is Test_Barcodes: w.ok.connect(self.set_barcodes) elif type(w.widget) is Test_Leak: w.ok.connect(self.set_leak) elif type(w.widget) is Test_Vision: w.ok.connect(self.set_vision) else: w.ok.connect(self.next) if hasattr(w, "ko"): w.ko.connect(self.fail_cycle) # TESTING if "--test" in sys.argv: self.testing = True else: self.testing = False # /TESTING # START CYCLE self.next_timer = QTimer() self.next_timer.setSingleShot(True) self.next_timer.timeout.connect(self.next) self.next() def refresh_time(self, init=False): if init: self.time_timer = QTimer() self.time_timer.setSingleShot(True) self.time_timer.timeout.connect(self.refresh_time) t = datetime.now() self.time_l.setText("{d}/{mo}/{y}\n{h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute)) self.time_timer.start(60 - t.second) def select_step_img(self, step, suffix=None): img_path = "./src/ui/imgs" names = [] if suffix is not None: names.append(f"{step}_{suffix}_{self.machine_id}") names.append(f"{step}_{suffix}") names.append(f"{step}_{self.machine_id}") names.append(f"{step}") for name in names: for ext in ["png", "jpg"]: path = f"{img_path}/{name}.{ext}" if os.path.isfile(path): return path raise FileNotFoundError(f"No image was found for step {step}") def change_recipe(self): self.next(action="change_recipe") def fail_cycle(self): self.next(action="fail") def setCentralWidget(self, widget): replace_widget(self, "centralWidget", widget) def request_autotest(self, reason): # you can cancel the request calling request_autotest(False) self.log.info(f"cycle request autotest: reason: {reason!r} autotest_request: {self.autotest_request!r}") if reason == "init": self.autotest_timer = QTimer() self.autotest_timer.setSingleShot(False) self.autotest_timer.timeout.connect(self.request_periodic_autotest) self.time_timer.start(self.autotest_period) reason = "boot" self.autotest_request = reason self.cycle_available_steps["autotest"].widget.set_reason(reason) def request_periodic_autotest(self): self.request_autotest("periodic") def next(self, action=None): self.log.debug(f"cycle next: cycle step: {self.step!r} action: {action!r}") current_w = self.centralWidget if hasattr(current_w, "stop"): current_w.stop() if action == "change_recipe": self.log.info(f"cycle next: action: {action!r}") self.set_recipe(recipe=None) self.step = Steps(type="select_recipe") self.cycle_index = -1 # RESET TEST DATA self.data = {"ok": True, "overridden": False} self.archived = None # COUNT RESET self.pieces[0] = 0 self.pieces[1] = 0 elif action == "fail": self.log.info(f"cycle next: action: {action!r}") # FAIL AND RESTART TEST self.step = Steps(type="fail") self.cycle_index = -1 # RESET TEST DATA self.data = {"ok": True, "overridden": False} self.archived = None # COUNT FAIL self.pieces[1] += 1 elif action is not None: raise NotImplementedError(f"cycle next: action {action!r} is not a valid action") # if action did not set the next cycle step # set next cycle step normally if self.recipe is None or self.cycle_steps is None: # if recipe not set: select_recipe self.step = Steps(type="select_recipe") elif action is None: if self.cycle_index == -1 and self.autotest_request is not False: # if cycle_steps is not started or has ended # and autotest was requested self.autotest_request = False self.step = Steps(type="autotest") if self.autotest_period is not None: # reset periodic autotest timer self.time_timer.start(self.autotest_period) elif len(self.cycle_steps): # goto next step in cycle_steps self.cycle_index = (self.cycle_index + 1) % len(self.cycle_steps) self.step = self.cycle_steps[self.cycle_index] else: self.cycle_index = -1 self.archived = None self.step = Steps(type=None) # enable/disable cycle controls self.change_recipe_b.setEnabled(self.recipe is not None) self.cancel_b.setEnabled(self.step.type is not None and self.step.type not in { "emergency", "fail", "select_recipe", "wait", }) self.log.info(f"cycle next: next cycle step: {self.step!r}") # INIT TEST DATA IF STARTING CYCLE LOOP if self.cycle_index == 0: self.data = {"ok": True, "overridden": False} self.archived = None w = self.cycle_available_steps[self.step.type] if hasattr(w, "start"): w.start(recipe=self.recipe, step=self.cycle_steps[self.cycle_index]) if w is not current_w: self.setCentralWidget(w) if self.step.type == "done": self.archived = self.done() self.next_timer.start(2000) elif self.step.type == "print": self.print(self.archived) self.next_timer.start(2000) elif self.step.type == "fail": self.next_timer.start(2000) elif self.step.type == "wait": self.next_timer.start(6000) # UPDATE PIECES DISPLAY self.pieces_count_l.setText(f"{self.pieces[0]} OK / {self.pieces[1]} NOK / {sum(self.pieces)} TOT") def set_recipe(self, recipe=None): self.recipe = recipe if self.recipe is None: self.cycle_steps = None else: steps = self.recipe.get_steps() print_found = False for i, step in enumerate(steps): if step.type == "print": steps.insert(i, Steps(type="done")) print_found = True break if not print_found: steps.append(Steps(type="done")) steps.append(Steps(type="wait")) self.cycle_steps = steps # UPDATE RECIPE DISPLAY if self.recipe is not None: self.recipe_l.setText(self.recipe.name) self.recipe_l.setStyleSheet("") self.next() else: self.recipe_l.setText("NON SELEZIONATA") self.recipe_l.setStyleSheet("QLabel { color: red; }") def set_barcodes(self, barcodes=None): if "barcodes" not in self.data: self.data["barcodes"] = {} self.data["barcodes"].update(barcodes) self.next() def set_leak(self, leak=None): if "leak" not in self.data: self.data["leak"] = {} leak["step"] = model_to_dict(self.step) self.data["leak"][len(self.data["leak"])] = leak self.data["overridden"] = self.data["overridden"] or leak.get("overridden", False) self.data["ok"] = self.data["ok"] and leak.get("ok", False) self.next() def set_vision(self, vision=None): if "vision" not in self.data: self.data["vision"] = {} vision["step"] = model_to_dict(self.step) self.data["vision"][len(self.data["vision"])] = vision self.data["overridden"] = self.data["overridden"] or vision.get("overridden", False) self.data["ok"] = self.data["ok"] and vision.get("ok", False) self.next() def done(self, ok=True): self.log.info("cycle done") vision_test_1 = self.data.get("vision", {}).get(0, {}) out_paths = self.components["vision_saver"].save( save_time=vision_test_1.get("time", None), frame=vision_test_1.get("frame", None), # vision=vision_test_1.get("detections", None), ) self.log.info(f"cycle vision saved locally: {out_paths!r}") for vision in self.data["vision"].values(): vision.pop("frame", None) vision.pop("render", None) archived = Archive.archive(self.recipe, self.data, ok and self.data["ok"], overridden=self.data["overridden"]) self.log.info(f"cycle archived locally: {archived!r}") # COUNT OK self.pieces[0] += 1 return archived def print(self, archived=None): self.log.info("cycle print") # LABEL PRINT leak_test_1 = self.data.get("leak", {}).get(0, {}) leak_test_1_step_spec = leak_test_1.get("step", {}).get("spec", {}) leak_test_1_results = leak_test_1.get("results", {}) leak_test_1_results_data = leak_test_1_results.get("data", {}) datamatrix = str(archived.recipe.part_number) + archived.time.strftime("%m%y") + f"{archived.id:0>5}" pmax = leak_test_1_step_spec.get("test_pressure", "-") pmin = leak_test_1_results_data.get("Running test: pressure at the end of settling", "-") leak = leak_test_1_results_data.get("Running test: measured leak", "-") context = { "DATAMATRIX": datamatrix, "DATAMATRIX_TEXT": datamatrix, "PIECE_NUM": str(archived.id), "DATETIME": archived.time.strftime("%d/%m/%Y %H:%M:%S"), "SHIFT": str(get_shift(archived.time)), "STATION": str(self.machine_id), "OPERATOR": str(Users.get_session().user.username), "PMAX": f"{pmax:.3f}" if type(pmax) is not str else pmax, "PMIN": f"{pmin:.3f}" if type(pmin) is not str else pmin, "LEAK": f"{leak:.3f}" if type(leak) is not str else leak, "TFILL": str(leak_test_1_step_spec.get("filling_time", "-")), "TSTAB": str(leak_test_1_step_spec.get("settling_time", "-")), "TTEST": str(leak_test_1_step_spec.get("test_time", "-")), "MAXLEAK": str(leak_test_1_step_spec.get("test_pressure_max_delta", "-")), "PTESTMIN": str(leak_test_1_step_spec.get("test_pressure_min_delta", "-")), "RESULT_L1": "ESITO" + str(" FORZATO" if self.data.get("overridden", False) else ""), "RESULT_L2": str("CONFORME" if leak_test_1_results.get("ok", False) else "SCARTO"), } self.components["label_printer"].print_label("EtichettaR5", context=context) self.log.info(f"cycle printed: {context!r}")