st-ten-1/src/ui/test/test.py
matteo porta c450ab75c1 wip
2022-07-26 20:25:23 +02:00

304 lines
14 KiB
Python
Executable File

import logging
import os
import sys
from datetime import datetime
from lib.db import Archive, Steps, Users
from lib.helpers import get_shift
from playhouse.shortcuts import model_to_dict
from PyQt5.QtCore import QTimer
from ui.helpers import replace_widget
from ui.recipe_selection import Recipe_Selection
from ui.test_assembly import Test_Assembly
from ui.test_autotest import Test_Autotest
from ui.test_barcodes import Test_Barcodes
from ui.test_leak import Test_Leak
from ui.test_vision import Test_Vision
from ui.widget import Widget
class Test(Widget):
def __init__(self, machine_id=None, components=None):
    """Set up the test screen, build every step widget and start the cycle.

    Args:
        machine_id: Station identifier; stored on the instance and used
            when looking up machine-specific step images.
        components: Object handed to the leak and vision step widgets;
            stored on the instance.
    """
    super().__init__()
    self.machine_id = machine_id
    self.components = components
    self.log = logging.getLogger("Test")

    # Show the session user; an admin session is rendered in red.
    session = Users.get_session()
    self.user_l.setText(session.username)
    self.user_l.setStyleSheet("QLabel { color: red; }" if session.is_admin else "")

    # Show the clock and create the timer that keeps it refreshed.
    self.refresh_time(init=True)

    # Nothing is selected until the operator picks a recipe.
    self.recipe = None
    self.step = None

    # One pre-built screen per step type, keyed by Steps.type
    # (the None key is the screen shown for a recipe without test steps).
    img = self.select_step_img
    self.cycle_available_steps = {
        # "assembly_1": Test_Assembly(self.select_step_img("assembly_1"), u"INSERIRE SENSORE"),
        "autotest": Test_Assembly(
            None,
            u"ESEGUIRE PROCEDURA DI AUTOTEST",
            Test_Autotest(),
        ),
        "barcodes": Test_Assembly(
            img("scan"),
            u"LEGGERE IL BARCODE DEL PEZZO DA COLLAUDARE",
            Test_Barcodes(),
        ),
        "done": Test_Assembly(img("success"), u"COLLAUDO COMPLETATO"),
        "emergency": Test_Assembly(
            img("reset_emergency"),
            u"EMERGENZA INTERVENUTA - RIPRISTINARE PULSANTE E SELEZIONARE \"RESET EMERGENZA\" DAL MEN\u00d9 \"STRUMENTI\"",
        ),
        "fail": Test_Assembly(img("fail"), u"CICLO INTERROTTO"),
        "leak": Test_Assembly(
            None,
            u"ESECUZIONE TEST IN CORSO - ATTENDERE",
            Test_Leak(components=self.components, recipe=self.recipe, step=self.step),
        ),
        "print": Test_Assembly(img("print"), u"STAMPA ETICHETTA IN CORSO"),
        "select_recipe": Test_Assembly(
            None,
            u"SELEZIONARE IL CODICE DA COLLAUDARE",
            Recipe_Selection(),
        ),
        "vision": Test_Assembly(
            None,
            u"VERIFICARE CONTROLLO CON TELECAMERA",
            Test_Vision(components=self.components, recipe=self.recipe, step=self.step),
        ),
        "wait_start": Test_Assembly(
            img("wait_start"),
            u"PREMERE START PER INIZIARE PROVA TENUTA",
        ),
        "wait": Test_Assembly(img("wait"), u"ATTENDERE - PAUSA INTER CICLO"),
        None: Test_Assembly(
            img("warning"),
            u"ATTENZIONE - LA RICETTA SELEZIONATA NON CONTIENE FASI DI TEST",
        ),
    }
    self.cycle_steps = None
    self.cycle_index = -1

    # NOTE: the boot/periodic autotest (12 h period, opt-out through the
    # "--no-autotest" argument) is currently switched off, so no request
    # is pending and no period is configured.
    self.autotest_request = False
    self.autotest_period = None

    # Data collected during the running cycle and its archived record.
    self.data = {"ok": True, "overridden": False}
    self.archived = None

    # Pieces counter: [pieces_ok, pieces_failed].
    self.pieces = [0, 0]

    # Operator buttons.
    self.cancel_b.clicked.connect(self.fail_cycle)
    self.change_recipe_b.clicked.connect(self.change_recipe)

    # Step screens: widgets that deliver a payload get a dedicated handler
    # (each of which advances the cycle itself); everything else advances
    # the cycle directly.
    ok_handlers = {
        Recipe_Selection: self.set_recipe,
        Test_Barcodes: self.set_barcodes,
        Test_Leak: self.set_leak,
        Test_Vision: self.set_vision,
    }
    for step_w in self.cycle_available_steps.values():
        if hasattr(step_w, "ok"):
            step_w.ok.connect(ok_handlers.get(type(step_w.widget), self.next))
        if hasattr(step_w, "ko"):
            step_w.ko.connect(self.fail_cycle)

    # Testing mode is selected from the command line.
    self.testing = "--test" in sys.argv

    # Single-shot timer used by timed steps to advance automatically,
    # then kick off the cycle.
    self.next_timer = QTimer()
    self.next_timer.setSingleShot(True)
    self.next_timer.timeout.connect(self.next)
    self.next()
def refresh_time(self, init=False):
    """Show the current date/time and re-arm the clock for the next minute.

    Args:
        init: When true, first create the single-shot timer whose timeout
            re-invokes this method.
    """
    if init:
        self.time_timer = QTimer()
        self.time_timer.setSingleShot(True)
        self.time_timer.timeout.connect(self.refresh_time)
    t = datetime.now()
    # Zero-pad hours and minutes so 20:05 is not rendered as "20:5".
    self.time_l.setText(f"{t.day}/{t.month}/{t.year}\n{t.hour:02d}:{t.minute:02d}")
    # QTimer intervals are expressed in milliseconds: wait for the seconds
    # left in the current minute, not for that many milliseconds.
    self.time_timer.start((60 - t.second) * 1000)
def select_step_img(self, step, suffix=None):
    """Return the path of the most specific image available for a step.

    Candidate file names are tried from most to least specific:
    ``step_suffix_machine``, ``step_suffix`` (both only when a suffix is
    given), ``step_machine`` and finally ``step``; for each name ``.png``
    is tried before ``.jpg``.

    Args:
        step: Base name of the step image.
        suffix: Optional variant name inserted after the step name.

    Returns:
        The first candidate path that exists as a file.

    Raises:
        FileNotFoundError: If none of the candidates exists.
    """
    stems = []
    if suffix is not None:
        stems += [f"{step}_{suffix}_{self.machine_id}", f"{step}_{suffix}"]
    stems += [f"{step}_{self.machine_id}", f"{step}"]

    # Paths are relative to the process working directory.
    candidates = (
        f"./src/ui/imgs/{stem}.{ext}"
        for stem in stems
        for ext in ("png", "jpg")
    )
    for candidate in candidates:
        if os.path.isfile(candidate):
            return candidate
    raise FileNotFoundError(f"No image was found for step {step}")
def change_recipe(self):
    """Ask the cycle to drop the current recipe (``change_recipe`` action)."""
    requested_action = "change_recipe"
    self.next(action=requested_action)
def fail_cycle(self):
    """Abort the running cycle by sending the ``fail`` action to the cycle."""
    requested_action = "fail"
    self.next(action=requested_action)
def setCentralWidget(self, widget):
    """Install ``widget`` in the ``centralWidget`` slot via ``replace_widget``.

    Args:
        widget: The widget passed on to ``replace_widget``.
    """
    slot_name = "centralWidget"
    replace_widget(self, slot_name, widget)
def request_autotest(self, reason):
    """Record that an autotest must run before the next cycle starts.

    Args:
        reason: Why the autotest is requested. ``"init"`` additionally
            creates the repeating autotest timer and is recorded as
            ``"boot"``. Passing ``False`` cancels a pending request.
    """
    self.log.info(f"cycle request autotest: reason: {reason!r} autotest_request: {self.autotest_request!r}")
    if reason == "init":
        self.autotest_timer = QTimer()
        self.autotest_timer.setSingleShot(False)
        self.autotest_timer.timeout.connect(self.request_periodic_autotest)
        # Arm the autotest timer itself (not the clock timer), and only
        # when a period is actually configured.
        if self.autotest_period is not None:
            self.autotest_timer.start(self.autotest_period)
        reason = "boot"
    self.autotest_request = reason
    self.cycle_available_steps["autotest"].widget.set_reason(reason)
def request_periodic_autotest(self):
    """Request an autotest with the ``periodic`` reason."""
    reason = "periodic"
    self.request_autotest(reason)
def next(self, action=None):
    """Advance the test cycle by one step and show the matching screen.

    Args:
        action: ``None`` to advance normally, ``"change_recipe"`` to drop
            the current recipe and return to recipe selection, or
            ``"fail"`` to count a failed piece and show the failure step.

    Raises:
        NotImplementedError: If ``action`` is any other value.
    """
    self.log.debug(f"cycle next: cycle step: {self.step!r} action: {action!r}")

    # Let the outgoing screen stop whatever it is doing, if it can.
    current_w = self.centralWidget
    if hasattr(current_w, "stop"):
        current_w.stop()

    if action == "change_recipe":
        self.log.info(f"cycle next: action: {action!r}")
        self.set_recipe(recipe=None)
        self.step = Steps(type="select_recipe")
        self.cycle_index = -1
        # Reset test data.
        self.data = {"ok": True, "overridden": False}
        self.archived = None
    elif action == "fail":
        self.log.info(f"cycle next: action: {action!r}")
        # Count the failed piece.
        self.pieces[1] += 1
        # Show the failure step; the cycle restarts from its first step.
        self.step = Steps(type="fail")
        self.cycle_index = -1
        # Reset test data.
        self.data = {"ok": True, "overridden": False}
        self.archived = None
    elif action is not None:
        raise NotImplementedError(f"cycle next: action {action!r} is not a valid action")

    # When the action did not already pick the step, pick it normally.
    if self.recipe is None or self.cycle_steps is None:
        # Without a recipe the only possible step is recipe selection.
        self.step = Steps(type="select_recipe")
    elif action is None:
        if self.cycle_index == -1 and self.autotest_request is not False:
            # The cycle is between pieces and an autotest was requested.
            self.autotest_request = False
            self.step = Steps(type="autotest")
            if self.autotest_period is not None:
                # Restart the periodic autotest countdown; this must be the
                # autotest timer, not the clock refresh timer.
                self.autotest_timer.start(self.autotest_period)
        elif len(self.cycle_steps):
            # Go to the following step, wrapping around at the end.
            self.cycle_index = (self.cycle_index + 1) % len(self.cycle_steps)
            self.step = self.cycle_steps[self.cycle_index]
        else:
            # The recipe has no steps at all: show the warning screen.
            self.cycle_index = -1
            self.archived = None
            self.step = Steps(type=None)

    # Enable/disable the cycle controls for the selected step.
    self.change_recipe_b.setEnabled(self.recipe is not None)
    self.cancel_b.setEnabled(self.step.type is not None and self.step.type not in {
        "emergency",
        "fail",
        "select_recipe",
        "wait",
    })
    self.log.info(f"cycle next: next cycle step: {self.step!r}")

    # Start from clean test data whenever the cycle loop (re)starts.
    if self.cycle_index == 0:
        self.data = {"ok": True, "overridden": False}
        self.archived = None

    w = self.cycle_available_steps[self.step.type]
    if hasattr(w, "start"):
        # Pass the step that is actually being shown. Indexing cycle_steps
        # here would hand over the last recipe step for out-of-cycle steps
        # (cycle_index == -1: select_recipe, fail, autotest) and would
        # raise while cycle_steps is still None.
        w.start(recipe=self.recipe, step=self.step)
    if w is not current_w:
        self.setCentralWidget(w)

    # Steps that complete on their own schedule the next advance.
    if self.step.type == "done":
        self.archived = self.done()
        self.next_timer.start(2000)
    elif self.step.type == "print":
        self.print(self.archived)
        self.next_timer.start(2000)
    elif self.step.type == "fail":
        self.next_timer.start(2000)
    elif self.step.type == "wait":
        self.next_timer.start(6000)

    # Update the pieces display.
    self.pieces_count_l.setText(f"{self.pieces[0]} OK / {self.pieces[1]} NOK / {sum(self.pieces)} TOT")
def set_recipe(self, recipe=None):
    """Select (or clear) the recipe and rebuild the list of cycle steps.

    With a recipe, its steps are extended with a ``done`` step (placed
    right before the first ``print`` step, or at the end when there is
    none) and a trailing ``wait`` step, the recipe name is shown and the
    cycle is advanced. Without a recipe the steps are cleared and the
    label shows the "not selected" text in red; the cycle is not advanced.

    Args:
        recipe: The recipe to use, or ``None`` to clear the selection.
    """
    self.recipe = recipe

    if recipe is None:
        self.cycle_steps = None
        self.recipe_l.setText("NON SELEZIONATA")
        self.recipe_l.setStyleSheet("QLabel { color: red; }")
        return

    steps = recipe.get_steps()
    for position, candidate in enumerate(steps):
        if candidate.type == "print":
            # Archive the result before the label gets printed.
            steps.insert(position, Steps(type="done"))
            break
    else:
        steps.append(Steps(type="done"))
    steps.append(Steps(type="wait"))
    self.cycle_steps = steps

    # Update the recipe display and start the cycle.
    self.recipe_l.setText(recipe.name)
    self.recipe_l.setStyleSheet("")
    self.next()
def set_barcodes(self, barcodes=None):
    """Merge scanned barcodes into the test data and advance the cycle.

    Args:
        barcodes: Mapping merged into ``self.data["barcodes"]``. ``None``
            (the default) means there is nothing to merge.
    """
    stored = self.data.setdefault("barcodes", {})
    # dict.update(None) raises TypeError, so skip the merge for the default.
    if barcodes is not None:
        stored.update(barcodes)
    self.next()
def set_leak(self, leak=None):
    """Record a leak-test result in the test data and advance the cycle.

    The result is tagged in place with the current step (as a dict),
    appended to ``self.data["leak"]`` and folded into the cycle-wide
    ``overridden`` / ``ok`` flags.

    Args:
        leak: Mapping with the leak result; its optional ``"overridden"``
            and ``"ok"`` entries default to ``False``.
            NOTE(review): the ``None`` default is not usable here, the
            item assignment below needs a mapping.
    """
    records = self.data.setdefault("leak", [])
    leak["step"] = model_to_dict(self.step)
    records.append(leak)

    overridden_so_far = self.data["overridden"]
    self.data["overridden"] = overridden_so_far or leak.get("overridden", False)
    ok_so_far = self.data["ok"]
    self.data["ok"] = ok_so_far and leak.get("ok", False)
    self.next()
def set_vision(self, vision=None):
    """Record a vision-check result in the test data and advance the cycle.

    The result is tagged in place with the current step (as a dict),
    appended to ``self.data["vision"]`` and folded into the cycle-wide
    ``overridden`` / ``ok`` flags.

    Args:
        vision: Mapping with the vision result; its optional
            ``"overridden"`` and ``"ok"`` entries default to ``False``.
            NOTE(review): the ``None`` default is not usable here, the
            item assignment below needs a mapping.
    """
    records = self.data.setdefault("vision", [])
    vision["step"] = model_to_dict(self.step)
    records.append(vision)

    overridden_so_far = self.data["overridden"]
    self.data["overridden"] = overridden_so_far or vision.get("overridden", False)
    ok_so_far = self.data["ok"]
    self.data["ok"] = ok_so_far and vision.get("ok", False)
    self.next()
def done(self, ok=True):
    """Archive the finished cycle and update the pieces counter.

    Args:
        ok: Extra verdict that is and-ed with ``self.data["ok"]`` to get
            the verdict stored in the archive.

    Returns:
        The value returned by ``Archive.archive`` for this cycle.
    """
    self.log.info("cycle done")
    passed = ok and self.data["ok"]
    archived = Archive.archive(self.recipe, self.data, passed, overridden=self.data["overridden"])
    self.log.info(f"cycle archived locally: {archived!r}")
    # Count the piece under the verdict that was archived: a cycle that
    # completes with a failed result is a NOK piece, not an OK one.
    self.pieces[0 if passed else 1] += 1
    return archived
def print(self, archived=None):
    """Print the result label for the piece that was just archived.

    The label fields are built from the scanned barcodes, the archived
    record's ``time`` and the first recorded leak result, then sent to
    ``self.components["label_printer"]`` with the ``EtichettaR5`` template.
    Missing values are printed as ``-`` (empty for the datamatrix itself).

    Args:
        archived: Archived record of the cycle; its ``time`` attribute is
            read. NOTE(review): the ``None`` default cannot be printed.
    """
    self.log.info("cycle print")

    # Piece identification and bookkeeping fields.
    barcodes = self.data.get("barcodes", {})
    context = {
        "DATAMATRIX": str(barcodes.get("serial", "")),
        "DATAMATRIX_TEXT": str(barcodes.get("serial", "-")),
        "PIECE_NUM": str(barcodes.get("serial", "-")),
        "DATETIME": str(archived.time.isoformat()),
        "SHIFT": str(get_shift(archived.time)),
        "STATION": str(self.machine_id),
        "OPERATOR": str(Users.get_session().user.username),
    }

    # Leak-test fields, all taken from the first recorded leak result.
    first_leak = self.data.get("leak", [{}, {}])[0]
    spec = first_leak.get("step", {}).get("spec", {})
    results = first_leak.get("results", {})
    measures = results.get("data", {})
    context.update({
        "PMAX": str(spec.get("test_pressure", "-")),
        "PMIN": str(measures.get("Running test: pressure at the end of settling", "-")),
        "LEAK": str(measures.get("Running test: measured leak", "-")),
        "TFILL": str(spec.get("filling_time", "-")),
        "TSTAB": str(spec.get("settling_time", "-")),
        "TTEST": str(spec.get("test_time", "-")),
        "MAXLEAK": str(spec.get("test_pressure_max_delta", "-")),
        "PTESTMIN": str(spec.get("test_pressure_min_delta", "-")),
        "RESULT_L1": "ESITO",
        "RESULT_L2": "CONFORME" if results.get("ok", False) else "SCARTO",
    })

    self.components["label_printer"].print_label("EtichettaR5", context=context)
    self.log.info(f"cycle printed: {context!r}")