import logging import os import sys import time from datetime import datetime from lib.db import Archive, Recipes, Users from PyQt5.QtCore import Qt, QTimer from ui.helpers import replace_widget from ui.recipe_selection import Recipe_Selection from ui.test_assembly import Test_Assembly from ui.test_autotest import Test_Autotest from ui.widget import Widget class Test(Widget): def __init__(self, system_name=None): super().__init__() self.system_name = system_name # GET LOGGER self.log = logging.getLogger("Test") # SHOW USERNAME session = Users.get_session() self.user_l.setText(session.username) if session.is_admin: self.user_l.setStyleSheet("QLabel { color: red; }") else: self.user_l.setStyleSheet("") # SHOW AND UPDATE TIME CLOCK self.refresh_time(init=True) # INIT RECIPE self.recipe = None # INIT CYCLE STATES self.cycle_state = None self.cycle_states = { "select_recipe": Test_Assembly(None, u"SELEZIONARE IL CODICE DA COLLAUDARE", Recipe_Selection()), "autotest": Test_Assembly(None, u"ESEGUIRE PROCEDURA DI AUTOTEST", Test_Autotest()), "assembly_1": Test_Assembly(self.select_step_img("assembly_1"), u"INSERIRE SENSORE"), "done": Test_Assembly(self.select_step_img("success"), u"COLLAUDO COMPLETATO - RIMUOVERE IL SENSORE"), "wait": Test_Assembly(self.select_step_img("wait"), u"ATTENDERE - PAUSA INTER CICLO"), "fail": Test_Assembly(self.select_step_img("fail"), u"CICLO INTERROTTO - RIMUOVERE IL SENSORE"), "emergency": Test_Assembly(self.select_step_img("reset_emergency"), u"EMERGENZA INTERVENUTA - RIPRISTINARE PULSANTE E SELEZIONARE \"RESET EMERGENZA\" DAL MEN\u00d9 \"STRUMENTI\""), } self.cycle_loop = ["assembly_1", "done", "wait"] self.cycle_changing_state = False # SETUP AUTOTEST self.autotest_request = False if "--no-autotest" not in sys.argv: self.autotest_period = 12 * 60 * 60 * 1000 self.request_autotest("init") else: self.autotest_period = None # INIT PIECES COUNTER ([pieces_ok, pieces_failed]) self.pieces = [0, 0] # CONNECT CYCLE CONTROLS self.cancel_b.clicked.connect(self.fail_cycle) self.change_recipe_b.clicked.connect(self.change_recipe) for w in self.cycle_states.values(): if hasattr(w, "ko"): w.ko.connect(self.fail_cycle) if hasattr(w, "ok"): # custom ok handlers should call next again if type(w.widget) is Recipe_Selection: w.ok.connect(self.set_recipe) # elif type(w) is Test_Camera: # w.ok.connect(self.set_data) else: w.ok.connect(self.next) # TESTING if "--test" in sys.argv: self.testing = True else: self.testing = False # /TESTING # START CYCLE self.next() def refresh_time(self, init=False): if init: self.time_timer = QTimer() self.time_timer.setSingleShot(True) self.time_timer.timeout.connect(self.refresh_time) t = datetime.now() self.time_l.setText("{d}/{mo}/{y}\n{h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute)) self.time_timer.start(60 - t.second) def select_step_img(self, step, suffix=None): img_path = "./src/ui/imgs" names = [] if suffix is not None: names.append(f"{step}_{suffix}_{self.system_name}") names.append(f"{step}_{suffix}") names.append(f"{step}_{self.system_name}") names.append(f"{step}") for name in names: for ext in ["png", "jpg"]: path = f"{img_path}/{name}.{ext}" if os.path.isfile(path): return path raise FileNotFoundError(f"No image was found for step {step}") def change_recipe(self): self.next(action="change_recipe") def fail_cycle(self): self.next(action="fail") def setCentralWidget(self, widget): replace_widget(self, "centralWidget", widget) # def check_next(self, interpreted): # self.disconnect(self.watched) # if "digital_io" in interpreted: # if interpreted["digital_io"]["emergency"] is True and self.cycle_state != "emergency": # self.cycle_state = "emergency" # self.next() # elif interpreted["digital_io"]["emergency"] is False and self.cycle_state == "emergency" and self.is_first_cycle_time: # self.is_first_cycle_time = False # # reset cycle # if self.recipe is None: # self.cycle_state = -1 # go to recipe selection # else: # self.cycle_state = 0 # skip recipe selection # self.next(fail=True) # if type(self.centralWidget()) is not Test_Autotest: # IGNORE DURING AUTOTEST # if self.cycle_state == 1: # WAIT FOR PIECE PRESENCE # if self.testing is True and self.is_first_cycle_time: # self.bench.inputs["io"].set_bit(*self.bench.parsers["digital_io"].presence1, True) # self.bench.inputs["io"].set_bit(*self.bench.parsers["digital_io"].presence2, True) # # /TESTING # if self.is_first_cycle_time: # self.is_first_cycle_time = False # log_msg("cycle state:", self.cycle_state, "done", msg_type="test") # if "digital_io" in interpreted and interpreted["digital_io"]["presence1"] and interpreted["digital_io"]["presence2"]: # self.bench.parsers["digital_io"].handler({"lock": True}) # self.next() # elif self.cycle_state == 2: # PIECE INSERTED, LOCK PIECE AFTER DELAY # if self.is_first_cycle_time: # self.is_first_cycle_time = False # log_msg("cycle state:", self.cycle_state, "done", msg_type="test") # self.timer.start(2000) # elif self.cycle_state == 4: # CAMERA TEST & BARCODE READ DONE, START ASSEMBLY PHASE # if self.is_first_cycle_time: # self.is_first_cycle_time = False # log_msg("cycle state:", self.cycle_state, "done", msg_type="test") # self.timer.start(2000) # elif self.cycle_state == 5: # WAIT FOR SCREWDRIVER COUNTS # if self.is_first_cycle_time and self.bench.parsers["pick_to_light"].request is None: # self.bench.parsers["pick_to_light"].handler({ # "vtfe10": self.recipe.vtfe10, # "vtfe11": self.recipe.vtfe11, # "vtfe13": self.recipe.vtfe13, # }) # log_msg("SCREWDRIVER ENABLE", msg_type="test") # elif "pick_to_light" in interpreted and interpreted["pick_to_light"] is not None: # screws = [interpreted["pick_to_light"][k] for k in ["vtfe10", "vtfe11", "vtfe13"]] # needed = [self.bench.parsers["pick_to_light"].request[k] for k in ["vtfe10", "vtfe11", "vtfe13"]] # done = all([screwed >= requested for screwed, requested in zip(screws, needed)]) # self.screw_text(screws) # if self.is_first_cycle_time and done: # self.is_first_cycle_time = False # log_msg("cycle state:", self.cycle_state, "done", msg_type="test") # self.bench.parsers["pick_to_light"].handler(None) # self.screw_text([0, 0, 0]) # self.bench.parsers["digital_io"].handler({"lock": False}) # self.next() # elif self.cycle_state == 6: # # TESTING # if self.testing is True: # self.bench.inputs["io"].set_bit(*self.bench.parsers["digital_io"].presence1, False) # self.bench.inputs["io"].set_bit(*self.bench.parsers["digital_io"].presence2, False) # # /TESTING # if not self.is_done: # self.is_done = True # self.done(True) # if self.is_done and "digital_io" in interpreted and not interpreted["digital_io"]["presence1"] and not interpreted["digital_io"]["presence2"] and self.is_first_cycle_time: # self.is_first_cycle_time = False # log_msg("cycle state:", self.cycle_state, "done", msg_type="test") # if self.skip: # # reset cycle # self.cycle_state = 0 # skip recipe selection # self.skip = False # self.next() # else: # self.timer.start(2000) # elif self.cycle_state == 7: # WAITING BEFORE NEW CYCLE # if self.is_first_cycle_time: # self.is_first_cycle_time = False # log_msg("cycle state:", self.cycle_state, "wait", msg_type="test") # # reset cycle # self.cycle_state = 0 # skip recipe selection # self.timer.start(6000) # self.watched = self.bench.interpreter.update.connect(self.check_next) def request_autotest(self, reason): # you can cancel the request calling request_autotest(False) self.log.info(f"cycle request autotest: reason: {reason!r} autotest_request: {self.autotest_request!r}") if reason == "init": self.autotest_timer = QTimer() self.autotest_timer.setSingleShot(False) self.autotest_timer.timeout.connect(self.request_periodic_autotest) self.time_timer.start(self.autotest_period) reason = "boot" self.autotest_request = reason self.cycle_states["autotest"].widget.set_reason(reason) def request_periodic_autotest(self): self.request_autotest("periodic") def next(self, action=None): self.log.debug(f"cycle next: cycle_state: {self.cycle_state!r} action: {action!r}") if action == "change_recipe": self.log.info(f"cycle next: action: {action!r}") self.set_recipe(recipe=None) self.cycle_changing_state = True self.cycle_state = "select_recipe" elif action == "fail": self.log.info(f"cycle next: action: {action!r}") if self.cycle_state in self.cycle_loop: pass # COUNT FAIL self.pieces[1] += 1 # FAIL AND RESTART TEST self.cycle_changing_state = True self.cycle_state = "fail" elif action is not None: raise NotImplementedError(f"cycle next: action {action!r} is not a valid action") # if action did not set the next cycle_state # set next cycle_state normally if not self.cycle_changing_state: self.cycle_changing_state = True if self.recipe is None: # if recipe not set: select_recipe self.cycle_state = "select_recipe" else: try: # get current cycle_state index in cycle_loop cycle_index = self.cycle_loop.index(self.cycle_state) except ValueError: # if current cycle_state not in cycle_loop # start the cycle_loop cycle_index = -1 if cycle_index == -1 and self.autotest_request is not False: # if cycle_loop is not started or has ended # and autotest was requested self.autotest_request = False self.cycle_state = "autotest" if self.autotest_period is not None: # reset periodic autotest timer self.time_timer.start(self.autotest_period) else: # goto next step in cycle_loop self.cycle_state = self.cycle_loop[(cycle_index + 1) % len(self.cycle_loop)] # enable/disable cycle controls self.change_recipe_b.setEnabled(self.recipe is not None) self.cancel_b.setEnabled(self.cycle_state not in { "emergency", "fail", "select_recipe", "wait", }) self.log.info(f"cycle next: next cycle_state: {self.cycle_state!r}") if self.cycle_state == "done": self.done() w = self.cycle_states[self.cycle_state] # UPDATE PIECES DISPLAY self.pieces_count_l.setText(f"{self.pieces[0]} OK / {self.pieces[1]} NOK / {sum(self.pieces)} TOT") self.setCentralWidget(w) self.cycle_changing_state = False def set_recipe(self, recipe=None): self.recipe = recipe # UPDATE RECIPE DISPLAY if self.recipe is not None: self.recipe_l.setText(self.recipe.name) self.recipe_l.setStyleSheet("") self.next() else: self.recipe_l.setText("NON SELEZIONATA") self.recipe_l.setStyleSheet("QLabel { color: red; }") def done(self, ok=False): self.log.info("cycle done") archived = Archive.archive(self.recipe, self.data, overridden=self.data["overridden"]) self.log.info(f"cycle archived locally: {archived!r}") # LABEL PRINT # self.printer.print_label("1", archived) # self.log.info(f"cycle printed: {archived!r}") # COUNT OK self.pieces[0] += 1