684 lines
36 KiB
Python
Executable File
684 lines
36 KiB
Python
Executable File
import copy
|
|
import logging
|
|
import os
|
|
import sys
|
|
import weakref
|
|
from datetime import datetime, timedelta
|
|
|
|
from lib.db import Archive, Steps, Users
|
|
from lib.helpers import get_shift
|
|
from playhouse.shortcuts import model_to_dict
|
|
from PyQt5.QtCore import QTimer, pyqtSlot
|
|
from PyQt5.QtWidgets import QMessageBox
|
|
from ui.helpers import replace_widget
|
|
from ui.recipe_selection import Recipe_Selection
|
|
from ui.barcode_recipe_selection import Barcode_Recipe_Selection
|
|
from ui.test_assembly import Test_Assembly
|
|
from ui.test_barcodes import Test_Barcodes
|
|
from ui.test_connector import Test_Connector
|
|
from ui.test_count import Test_Count
|
|
from ui.test_count_end import Test_Count_End
|
|
from ui.test_warning_img import Test_Warning_Img
|
|
from ui.test_instructions import Test_Instructions
|
|
from ui.test_fail import Test_Fail
|
|
from ui.test_leak import Test_Leak
|
|
from ui.test_resistance import Test_Resistance
|
|
from ui.test_screws import Test_Screws
|
|
from ui.test_vision import Test_Vision
|
|
from ui.widget import Widget
|
|
|
|
|
|
class Test(Widget):
|
|
def __init__(self, config, components=None,main_window=None):
|
|
super().__init__()
|
|
self.main_window=main_window
|
|
self.config = config
|
|
self.components = components
|
|
# GET LOGGER
|
|
self.log = logging.getLogger("Test")
|
|
# SHOW MACHINE DESCRIPTION
|
|
self.machine_description_l.setText(self.config.get("machine", {}).get("description", "N/A"))
|
|
# SHOW USERNAME
|
|
session = Users.get_session()
|
|
self.user_l.setText(session.username)
|
|
if session.is_admin:
|
|
self.user_l.setStyleSheet("QLabel { color: red; }")
|
|
else:
|
|
self.user_l.setStyleSheet("")
|
|
# SHOW AND UPDATE TIME CLOCK
|
|
self.refresh_time(init=True)
|
|
# INIT RECIPE
|
|
self.recipe = None
|
|
|
|
if self.config["hardware_config"]["barcode_recipe_selection"]=="present":
|
|
self.recipe_selection_mode = "barcode"
|
|
else:
|
|
self.recipe_selection_mode = "table"
|
|
self.step = None
|
|
self.unsupported_steps = set()
|
|
self.steps_dependencies = {
|
|
"count": set(),
|
|
"connector": {"multicomp", },
|
|
"instruction":{"digital_io"},
|
|
"screws": {"screwdriver", "tecna_t3", },
|
|
"resistance": {"multicomp", },
|
|
"leak_1": {"tecna_t3", },
|
|
"leak_2": {"tecna_t3", },
|
|
"vision": {("uvc_camera", "galaxy_camera", ), "vision", "vision_saver", }, # "neo_pixels", },
|
|
"print": {"label_printer", },
|
|
}
|
|
self.unsupported_steps = set()
|
|
for step_name, dependencies in self.steps_dependencies.items():
|
|
for dependency in dependencies:
|
|
if isinstance(dependency, tuple):
|
|
# if all([d not in self.components or not self.components[d].ready for d in dependency]):
|
|
if all([d not in self.components for d in dependency]):
|
|
self.unsupported_steps.add(step_name)
|
|
else:
|
|
# if dependency not in self.components or not self.components[dependency].ready:
|
|
if dependency not in self.components:
|
|
self.unsupported_steps.add(step_name)
|
|
# INIT PIECES COUNTER
|
|
self.pieces = {"ok": 0, "ko": 0}
|
|
# INIT CYCLE STATES
|
|
self.cycle_available_steps = {
|
|
# "assembly_1": Test_Assembly(img_path=self.select_step_img("assembly_1"), text=u"INSERIRE SENSORE", widget=None),
|
|
"barcodes": Test_Assembly(img_path=self.select_step_img("scan"), text=u"LEGGERE IL BARCODE DEL PEZZO DA COLLAUDARE", widget=Test_Barcodes()),
|
|
"connector": Test_Assembly(img_path=self.select_step_img("scan"), text=u"COLLEGARE IL CONNETTORE INDICATO AL PEZZO E LEGGERE IL SUO BARCODE", widget=Test_Connector(run_once=True)),
|
|
"count": Test_Assembly(img_path=None, text=u"INSERIRE IL NUMERO DI PEZZI ATTESI PER IL LOTTO", widget=Test_Count(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces, run_once=True)),
|
|
"warning_img": Test_Assembly(img_path=None, text=u"ATTENZIONE - PER QUESTO CODICE ESEGUIRE LE OPERAZIONI INDICATE IN FIGURA", widget=Test_Warning_Img(components=self.components, recipe=self.recipe, step=self.step,run_once=True)),
|
|
"count_end": Test_Assembly(img_path=None, text=u"LOTTO TERMINATO, PREMERE CONTINUA PERCOMINCIARNE UNO NUOVO", widget=Test_Count_End(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)),
|
|
"done": Test_Assembly(img_path=self.select_step_img("success"), text=u"COLLAUDO COMPLETATO", widget=None),
|
|
"emergency": Test_Assembly(img_path=self.select_step_img("reset_emergency"), text=u"EMERGENZA INTERVENUTA - RIPRISTINARE PULSANTE E SELEZIONARE \"RESET EMERGENZA\" DAL MEN\u00d9 \"STRUMENTI\"", widget=None),
|
|
"fail": Test_Assembly(img_path=self.select_step_img("fail"), text=u"CICLO INTERROTTO, PREMERE CONTINUA PER COMINCIARE UN NUOVO CICLO", widget=Test_Fail(parent=self)),
|
|
"blow": Test_Assembly(img_path=None, text=u"SOFFIAGGIO TUBO IN CORSO - ATTENDERE...", widget=Test_Warning_Img(components=self.components, recipe=self.recipe, step=self.step)),
|
|
"leak_1": Test_Assembly(img_path=None, text=None, widget=Test_Leak(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces,parent=self)),
|
|
"leak_2": Test_Assembly(img_path=None, text=None, widget=Test_Leak(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces,parent=self)),
|
|
"flush": Test_Assembly(img_path=None, text=u"SCARICO ARIA IN CORSO - ATTENDERE...", widget=Test_Warning_Img(components=self.components, recipe=self.recipe, step=self.step)),
|
|
"instruction": Test_Assembly(img_path=None, text=u"ESEGUIRE LE OPERAZIONI DI MONTAGGIO INDICATE IN FIGURA", widget=Test_Instructions(components=self.components, recipe=self.recipe,bench_name=self.config.machine_id, step=self.step)),
|
|
"piece_removal": Test_Assembly(img_path=None, text=u"RIMUOVERE IL PEZZO APRENDO TUTTE LE CHIUSURE", widget=Test_Instructions(components=self.components, recipe=self.recipe,bench_name=self.config.machine_id, step=self.step)),
|
|
"print": Test_Assembly(img_path=self.select_step_img("print"), text=u"STAMPA ETICHETTA IN CORSO", widget=None),
|
|
"resistance": Test_Assembly(img_path=None, text=u"COLLEGARE CONNETTORE ELETTRICO PER EFFETTUARE PROVA RESISTENZA", widget=Test_Resistance(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)),
|
|
"screws": Test_Assembly(img_path=None, text=u"AVVITARE TUTE LE VITI COME INDICATO", widget=Test_Screws(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)),
|
|
"select_recipe": Test_Assembly(img_path=None, text=u"SELEZIONARE IL CODICE DA COLLAUDARE", widget=Recipe_Selection(config=self.config, unsupported_steps=self.unsupported_steps)),
|
|
"barcode_recipe_selection": Test_Assembly(img_path=self.select_step_img("scan"), text=u"LEGGERE IL BARCODE SULLA DIMA DEL COMPONENTE DA COLLAUDARE", widget=Barcode_Recipe_Selection(parent=self)),
|
|
"vision": Test_Assembly(img_path=None, text=u"VERIFICARE CONTROLLO CON TELECAMERA", widget=Test_Vision(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)),
|
|
"wait": Test_Assembly(img_path=self.select_step_img("wait"), text=u"ATTENDERE - PAUSA INTER CICLO", widget=None),
|
|
None: Test_Assembly(img_path=self.select_step_img("warning"), text=u"ATTENZIONE - LA RICETTA SELEZIONATA NON CONTIENE FASI DI TEST", widget=None),
|
|
}
|
|
self.cycle_steps = None
|
|
self.cycle_index = -1
|
|
self.print_step = None
|
|
# SETUP AUTOTEST
|
|
self.autotest_request = False
|
|
self.autotesting = False
|
|
self.autotesting_reason = None
|
|
self.autotest_cycle_steps = None
|
|
if "--no-autotest" not in sys.argv:
|
|
self.autotest_period = 8.5 * 60 * 60 * 1000 # 8.5 HOURS
|
|
# self.autotest_period = 12 * 60 * 60 * 1000 # 12 HOURS
|
|
#if not self.config["autotest_done"]:
|
|
# self.request_autotest("init")
|
|
else:
|
|
self.autotest_period = None
|
|
# INIT TEST DATA
|
|
self.data = {"ok": True, "overridden": False}
|
|
self.archived = None
|
|
# CONNECT CYCLE CONTROLS
|
|
self.cancel_b.clicked.connect(self.fail_cycle)
|
|
self.change_recipe_b.clicked.connect(self.change_recipe)
|
|
for step_name, w in self.cycle_available_steps.items():
|
|
if hasattr(w, "ok"):
|
|
# custom ok handlers should call next again
|
|
if isinstance(w.widget, (Recipe_Selection)):
|
|
w.ok.connect(self.set_recipe)
|
|
else:
|
|
w.ok.connect(lambda data=None, step_name=step_name, self=weakref.ref(self): self().set_step(step_name, data))
|
|
if hasattr(w, "ko"):
|
|
w.ko.connect(self.fail_cycle)
|
|
# CUSTOM STEP CONNECTIONS
|
|
self.cycle_available_steps["count"].ok.connect(self.cycle_available_steps["count_end"].widget.set_amount)
|
|
#self.cycle_available_steps["warning_img"].ok.connect(self.cycle_available_steps["warning_img"].widget.set_done)
|
|
|
|
if "fixture_id" in self.components.keys():
|
|
self.components["fixture_id"].new_id_signal.connect(self.load_recipe_from_rfid)
|
|
|
|
# TESTING
|
|
if "--test" in sys.argv:
|
|
self.testing = True
|
|
else:
|
|
self.testing = False
|
|
# /TESTING
|
|
# START CYCLE
|
|
self.next_timer = QTimer()
|
|
self.next_timer.setSingleShot(True)
|
|
self.next_timer.timeout.connect(self.next)
|
|
self.next()
|
|
|
|
def refresh_time(self, init=False):
|
|
if init:
|
|
self.time_timer = QTimer()
|
|
self.time_timer.setSingleShot(True)
|
|
self.time_timer.timeout.connect(self.refresh_time)
|
|
t = datetime.now()
|
|
self.time_l.setText("{d}/{mo}/{y}\n{h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute))
|
|
self.time_timer.start(60 - t.second)
|
|
|
|
def select_step_img(self, step, suffix=None):
|
|
img_path = "./src/ui/imgs"
|
|
names = []
|
|
if suffix is not None:
|
|
names.append(f"{step}_{suffix}_{self.config.machine_id}")
|
|
names.append(f"{step}_{suffix}")
|
|
names.append(f"{step}_{self.config.machine_id}")
|
|
names.append(f"{step}")
|
|
for name in names:
|
|
for ext in ["png", "jpg"]:
|
|
path = f"{img_path}/{name}.{ext}"
|
|
if os.path.isfile(path):
|
|
return path
|
|
raise FileNotFoundError(f"No image was found for step {step}")
|
|
|
|
def change_recipe(self):
|
|
self.next(action="change_recipe")
|
|
|
|
def set_recipe_mode_table(self):
|
|
self.recipe_selection_mode = "table"
|
|
self.change_recipe()
|
|
|
|
def set_recipe_mode_barcode(self):
|
|
self.recipe_selection_mode = "barcode"
|
|
self.change_recipe()
|
|
|
|
def fail_cycle(self):
|
|
self.next(action="fail")
|
|
|
|
def setCentralWidget(self, widget):
|
|
replace_widget(self, "centralWidget", widget)
|
|
|
|
def request_autotest(self, reason): # you can cancel the request calling request_autotest(False)
|
|
if "--no-autotest" not in sys.argv:
|
|
|
|
self.log.info(f"cycle request autotest: reason: {reason!r} autotest_request: {self.autotest_request!r}")
|
|
if reason in ("init","login"):
|
|
self.autotest_timer = QTimer()
|
|
self.autotest_timer.setSingleShot(False)
|
|
self.autotest_timer.timeout.connect(self.request_periodic_autotest)
|
|
if self.autotest_period is not None:
|
|
self.autotest_timer.start(self.autotest_period)
|
|
reason = "boot"
|
|
self.autotest_request = reason
|
|
if reason == "logout":
|
|
self.next(action="abort")
|
|
|
|
|
|
def request_periodic_autotest(self):
|
|
self.request_autotest("periodic")
|
|
|
|
def next(self, action=None):
|
|
if self.step is not None:
|
|
self.log.debug(f"cycle next: cycle step: {model_to_dict(self.step)!r} action: {action!r}")
|
|
else:
|
|
self.log.debug(f"cycle next: cycle step: {self.step!r} action: {action!r}")
|
|
current_w = self.centralWidget
|
|
if hasattr(current_w, "stop"):
|
|
current_w.stop()
|
|
if action == "change_recipe":
|
|
self.log.info(f"cycle next: action: {action!r}")
|
|
self.set_recipe(recipe=None)
|
|
self.cycle_available_steps["leak_1"].recipe_written=False
|
|
self.cycle_available_steps["leak_2"].recipe_written=False
|
|
self.step = Steps(type="select_recipe")
|
|
self.cycle_index = -1
|
|
# COUNT RESET
|
|
self.pieces["ok"] = 0
|
|
self.pieces["ko"] = 0
|
|
elif action in ("fail","abort"):
|
|
self.log.info(f"cycle next: action: {action!r}")
|
|
# FAIL AND RESTART TEST
|
|
self.step = Steps(type="fail")
|
|
self.cycle_index = -1
|
|
# COUNT FAIL
|
|
if action == "fail":
|
|
self.done(ok=False)
|
|
elif action is not None:
|
|
raise NotImplementedError(f"cycle next: action {action!r} is not a valid action")
|
|
# if action did not set the next cycle step
|
|
# set next cycle step normally
|
|
if self.recipe is None or self.cycle_steps is None:
|
|
# if recipe not set: select_recipe
|
|
if self.recipe_selection_mode == "barcode":
|
|
self.step = Steps(type="barcode_recipe_selection")
|
|
else:
|
|
self.step = Steps(type="select_recipe")
|
|
elif action is None:
|
|
if self.autotest_request is not False and self.autotest_cycle_steps is not None and not self.autotesting and (self.cycle_index == -1 or self.cycle_index + 1 >= len(self.cycle_steps)):
|
|
# if autotest was requested
|
|
# and if cycle_steps is not started or has ended
|
|
self.autotesting = True
|
|
self.autotesting_reason = self.autotest_request
|
|
self.autotest_request = False
|
|
if self.autotest_period is not None: # reset periodic autotest timer
|
|
self.time_timer.start(self.autotest_period)
|
|
if self.autotesting:
|
|
if self.cycle_index + 1 < len(self.autotest_cycle_steps):
|
|
# goto next step in autotest_cycle_steps
|
|
self.cycle_index = (self.cycle_index + 1) % len(self.autotest_cycle_steps)
|
|
self.step = self.autotest_cycle_steps[self.cycle_index]
|
|
else:
|
|
# autotest ended
|
|
self.autotesting = False
|
|
if self.autotesting_reason == "logout":
|
|
Users.logout()
|
|
self.main_window.open_login()
|
|
else:
|
|
t = datetime.now()
|
|
self.last_at_l.setText("{d}/{mo}/{y} {h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute))
|
|
t+=timedelta(seconds=int(self.autotest_period/1000))
|
|
self.next_at_l.setText("{d}/{mo}/{y} {h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute))
|
|
self.autotesting_reason = None
|
|
self.cycle_index = -1
|
|
self.config["autotest_done"] = True
|
|
if not self.autotesting:
|
|
if len(self.cycle_steps):
|
|
# goto next step in cycle_steps
|
|
self.cycle_index = (self.cycle_index + 1) % len(self.cycle_steps)
|
|
self.step = self.cycle_steps[self.cycle_index]
|
|
|
|
else:
|
|
self.cycle_index = -1
|
|
self.step = Steps(type=None)
|
|
# enable/disable cycle controls
|
|
self.change_recipe_b.setEnabled(self.recipe is not None)
|
|
self.cancel_b.setEnabled(self.step.type is not None and self.step.type not in {
|
|
"emergency",
|
|
"fail",
|
|
"select_recipe",
|
|
"wait",
|
|
})
|
|
self.log.info(f"cycle next: next cycle step: {model_to_dict(self.step)!r}")
|
|
# INIT TEST DATA IF STARTING CYCLE LOOP OR IF RESET IS NEEDED
|
|
if self.cycle_index == 0:
|
|
self.data = {"ok": True, "overridden": False}
|
|
self.archived = None
|
|
if self.recipe is not None and "recipe" not in self.data:
|
|
self.data["recipe"] = model_to_dict(self.recipe)
|
|
w = self.cycle_available_steps[self.step.type]
|
|
show = None
|
|
if self.step.type=="leak_2":
|
|
self.setCentralWidget(w) # NEED TO PRESHOW UI
|
|
if hasattr(w, "start"):
|
|
show = w.start(recipe=self.recipe, step=self.step, pieces=self.pieces)
|
|
if show is not False and w is not current_w:
|
|
self.setCentralWidget(w)
|
|
elif show is False:
|
|
self.next_timer.start(0)
|
|
if self.step.type == "done":
|
|
self.archived = self.done()
|
|
self.next_timer.start(2000)
|
|
elif self.step.type == "print":
|
|
compiled_label = self.print(self.archived, self.step.spec.get("template", "EtichettaR5"))
|
|
self.archived.label = compiled_label
|
|
self.archived.save()
|
|
self.next_timer.start(2000)
|
|
elif self.step.type == "wait":
|
|
self.next_timer.start(2000)
|
|
# UPDATE PIECES DISPLAY
|
|
self.pieces_count_l.setText(f"{self.pieces['ok']} OK / {self.pieces['ko']} NOK / {sum(self.pieces.values())} TOT")
|
|
|
|
def set_recipe(self, recipe=None):
|
|
self.recipe = recipe
|
|
if self.recipe is None:
|
|
self.cycle_steps = None
|
|
self.autotest_cycle_steps = None
|
|
else:
|
|
steps = self.recipe.get_steps()
|
|
skip = set()
|
|
print_found = False
|
|
count_found = False
|
|
# create step sequence list
|
|
for i, step in enumerate(steps):
|
|
if i in skip:
|
|
continue
|
|
if step.type == "vision":
|
|
self.components["vision"].config_changed(vision_recipe=self.recipe.name)
|
|
if step.type == "count":
|
|
count_found = True
|
|
if "warning_img" in step.spec:
|
|
if step.spec["warning_img"]:
|
|
steps.insert(i,Steps(type="warning_img", spec={"warning_img":step.spec["warning_img"]}))
|
|
skip.add(i + 1)
|
|
if "assembly" in step.spec:
|
|
if step.spec["assembly"]:
|
|
steps.insert(i, Steps(type="instructions", spec={"num_tape": step.spec["num_tape"],
|
|
"num_piece": step.spec["num_piece"],
|
|
"num_ring": step.spec["num_ring"]}))
|
|
skip.add(i + 1)
|
|
if step.type == "resistance":
|
|
steps.insert(i + 1, Steps(type="resistance", spec={
|
|
"scale": 500,
|
|
"expected": float("+inf"),
|
|
"tolerance_pos": 0,
|
|
"tolerance_neg": 0,
|
|
}))
|
|
skip.add(i + 1)
|
|
if step.type == "print":
|
|
steps.insert(i, Steps(type="done"))
|
|
print_found = True
|
|
self.print_step=step
|
|
skip.add(i + 1)
|
|
if count_found:
|
|
steps.insert(i + 2, Steps(type="count_end"))
|
|
|
|
if self.config["hardware_config"].get("enforce_piece_removal", "no") == "yes":
|
|
steps.append(Steps(type="piece_removal"))
|
|
|
|
if not print_found:
|
|
steps.append(Steps(type="done"))
|
|
if count_found:
|
|
steps.append(Steps(type="count_end"))
|
|
steps.append(Steps(type="wait"))
|
|
self.cycle_steps = steps
|
|
self.check_steps_dependencies(self.cycle_steps)
|
|
leak_autotest_steps=[]
|
|
# CONFIGURE LEAK AUTOTEST PARAMETERS
|
|
if self.config["autotest_leak"]["enabled"]=="true":
|
|
l_at_1=copy.deepcopy(self.config["autotest_leak"])
|
|
l_at_1.pop("enabled")
|
|
l_at_1={a: float(x) for a, x in l_at_1.items()}
|
|
l_at_1["autotest"]="ko_check"
|
|
l_at_2=copy.deepcopy(self.config["autotest_leak"])
|
|
l_at_2.pop("enabled")
|
|
l_at_2={a: float(x) for a, x in l_at_2.items()}
|
|
l_at_2["test_pressure_qneg"]=l_at_2["test_pressure_tt_qneg"]
|
|
l_at_2["test_pressure_qpos"]=l_at_2["test_pressure_tt_qpos"]
|
|
l_at_2["autotest"]="ok_check"
|
|
leak_autotest_steps=[Steps(type="leak_1",spec=l_at_1),Steps(type="leak_1",spec=l_at_2)]
|
|
|
|
self.autotest_cycle_steps = [
|
|
*([
|
|
Steps(type="resistance", spec={
|
|
"scale": 500,
|
|
"expected": 1,
|
|
"tolerance_pos": 5,
|
|
"tolerance_neg": 5,
|
|
"autotest": True,
|
|
}),
|
|
Steps(type="resistance", spec={
|
|
"scale": 500,
|
|
"expected": float("+inf"),
|
|
"tolerance_pos": 0,
|
|
"tolerance_neg": 0,
|
|
"autotest": True,
|
|
}),
|
|
Steps(type="resistance", spec={
|
|
"scale": 500,
|
|
"expected": 10,
|
|
"tolerance_pos": 1,
|
|
"tolerance_neg": 1,
|
|
"autotest": True,
|
|
}),
|
|
Steps(type="resistance", spec={
|
|
"scale": 500,
|
|
"expected": float("+inf"),
|
|
"tolerance_pos": 0,
|
|
"tolerance_neg": 0,
|
|
"autotest": True,
|
|
}),
|
|
] if "resistance" not in self.unsupported_steps else []),
|
|
*(leak_autotest_steps),
|
|
Steps(type="done"),
|
|
Steps(type="wait"),
|
|
]
|
|
for w in self.cycle_available_steps.values():
|
|
if hasattr(w, "reset"):
|
|
w.reset()
|
|
# UPDATE RECIPE DISPLAY
|
|
if self.recipe is not None:
|
|
self.log.info(f"cycle recipe: cycle recipe: {model_to_dict(self.recipe)!r} cycle steps: {[model_to_dict(s) for s in self.cycle_steps]}")
|
|
self.recipe_l.setText(self.recipe.name)
|
|
self.recipe_l.setStyleSheet("")
|
|
self.next()
|
|
else:
|
|
self.log.info(f"cycle recipe: cycle recipe: {self.recipe!r} cycle steps: {self.cycle_steps}")
|
|
self.recipe_l.setText("NON SELEZIONATA")
|
|
self.recipe_l.setStyleSheet("QLabel { color: red; }")
|
|
|
|
def check_steps_dependencies(self, steps):
|
|
unsupported_steps = set()
|
|
missing_components = set()
|
|
for step in steps:
|
|
if step.type in self.unsupported_steps or step.type not in self.cycle_available_steps:
|
|
unsupported_steps.add(step.type)
|
|
else:
|
|
for dependency in self.steps_dependencies.get(step.type, []):
|
|
if isinstance(dependency, tuple):
|
|
if all([d not in self.components for d in dependency]):
|
|
unsupported_steps.add(step.type)
|
|
else:
|
|
unready = set()
|
|
for d in dependency:
|
|
if d in self.components:
|
|
if not self.components[d].ready:
|
|
self.components[d].reconfigure()
|
|
if not self.components[d].ready:
|
|
unready.add(d)
|
|
else:
|
|
unready.add(d)
|
|
if unready == set(dependency):
|
|
missing_components.add(" or ".join(dependency))
|
|
else:
|
|
if dependency not in self.components:
|
|
unsupported_steps.add(step.type)
|
|
else:
|
|
if not self.components[dependency].ready:
|
|
self.components[dependency].reconfigure()
|
|
if not self.components[dependency].ready:
|
|
missing_components.add(dependency)
|
|
if len(unsupported_steps):
|
|
QMessageBox.critical(None, "Errore Ricetta", f"Questa ricetta contiene uno o piu step non supportati da questo banco:\n{', '.join(sorted(unsupported_steps))}\nModificare la ricetta adeguatamente.")
|
|
if len(missing_components):
|
|
QMessageBox.critical(None, "Errore Componenti Ricetta", f"Questa ricetta richiede i seguenti componenti per essere eseguita:\n{', '.join(sorted(missing_components))}\nModificare la ricetta adeguatamente o collegare i componenti elencati.")
|
|
if len(unsupported_steps) or len(missing_components):
|
|
self.change_recipe()
|
|
|
|
def set_step(self, step_name, data=None):
|
|
if step_name not in self.data:
|
|
self.data[step_name] = {}
|
|
if data is not None:
|
|
data["step"] = model_to_dict(self.step)
|
|
data["step"].pop("name",None)
|
|
|
|
# MAKE ARRAY ONLY IF MORE THAN ONE TEST OF SAME TYPE
|
|
if len(self.data[step_name])>1:
|
|
self.data[step_name][str(len(self.data[step_name]))] = data
|
|
else:
|
|
self.data[step_name] = data
|
|
|
|
self.data["overridden"] = self.data["overridden"] or data.get("overridden", False)
|
|
self.data["ok"] = self.data["ok"] and data.get("ok", False)
|
|
self.next()
|
|
|
|
def done(self, ok=True):
|
|
self.log.info("cycle done, saving data...")
|
|
|
|
#remove useless info
|
|
self.data.get("recipe",{}).get("spec",{}).pop("steps",None)
|
|
self.data.get("recipe",{}).get("spec",{}).pop("available_steps",None)
|
|
for leak in ["leak_1","leak_2"]:
|
|
if leak in self.data.keys():
|
|
self.data[leak]["results"]={k:self.data[leak]["results"]["tecna_t3"][k] for k in ["Running test: filling pressure",
|
|
"Running test: measured leak",
|
|
"Running test: pressure at the end of settling",
|
|
"Running test: result"]
|
|
|
|
}
|
|
if "vision" in self.data:
|
|
vision_test_1 = self.data.get("vision", {}).get("0", {})
|
|
out_paths = self.components["vision_saver"].save(
|
|
save_time=vision_test_1.get("time", None),
|
|
frame=vision_test_1.get("frame", None),
|
|
# vision=vision_test_1.get("detections", None),
|
|
)
|
|
self.data.get("vision", {}).get("0", {})["files"] = out_paths
|
|
self.log.info(f"cycle vision saved locally: {out_paths!r}")
|
|
for vision in self.data.get("vision", {}).values():
|
|
vision.pop("frame", None)
|
|
vision.pop("render", None)
|
|
vision.pop("detections", None)
|
|
if "results" in vision.keys():
|
|
vision["results"].pop("results", None)
|
|
if self.autotesting:
|
|
self.data["autotest"] = True
|
|
self.data["autotest_reason"] = self.autotesting_reason
|
|
self.data["recipe"]["name"] = "AUTOTEST"
|
|
archived = Archive.archive(self.data, ok and self.data["ok"], overridden=self.data["overridden"])
|
|
self.log.info(f"cycle archived locally: {archived!r}")
|
|
if not self.autotesting:
|
|
# COUNT OK
|
|
if ok:
|
|
self.pieces["ok"] += 1
|
|
else:
|
|
self.pieces["ko"] += 1
|
|
else:
|
|
if self.autotesting_reason == "logout":
|
|
if ok:
|
|
self.main_window.open_login()
|
|
|
|
return archived
|
|
|
|
@staticmethod
|
|
def labellify(v):
|
|
if v is None:
|
|
v = "-"
|
|
elif isinstance(v, float):
|
|
v = f"{v:.1f}"
|
|
if not isinstance(v, str):
|
|
v = str(v)
|
|
return v
|
|
|
|
def print(self, archived, label):
|
|
self.log.info("cycle print")
|
|
if archived.label is not None:
|
|
raise AssertionError("this should never happen")
|
|
self.components["label_printer"].print_label(archived.label, context=None)
|
|
self.log.info("cycle printed already compiled label")
|
|
# LABEL PRINT
|
|
recipe = archived.test_data.get("recipe", {})
|
|
leak_test_1 = archived.test_data.get("leak_1", {})
|
|
leak_test_1_step = leak_test_1.get("step", {})
|
|
leak_test_1_step_spec = leak_test_1_step.get("spec", {})
|
|
leak_test_1_results = leak_test_1.get("results", {})
|
|
leak_test_2 = archived.test_data.get("leak_2", {})
|
|
leak_test_2_step = leak_test_2.get("step", {})
|
|
leak_test_2_step_spec = leak_test_2_step.get("spec", {})
|
|
leak_test_2_results = leak_test_2.get("results", {})
|
|
|
|
psetminp_a = leak_test_1_step_spec.get("test_pressure", 0) * (100+leak_test_1_step_spec.get("test_pressure_qneg", 0)/100)
|
|
psetmaxp_a = leak_test_1_step_spec.get("settling_pressure_max_percent", 0) * (100+leak_test_1_step_spec.get("test_pressure_qpos", 0)/100)
|
|
psetminp2_a = leak_test_2_step_spec.get("settling_pressure_min_percent", 0) * (100+leak_test_2_step_spec.get("test_pressure_qneg", 0)/100)
|
|
psetmaxp2_a = leak_test_2_step_spec.get("settling_pressure_max_percent", 0) * (100+leak_test_2_step_spec.get("test_pressure_qpos", 0)/100)
|
|
|
|
printer_fields = self.step.spec
|
|
context = {
|
|
# RECIPE DATA
|
|
"RECIPE": self.labellify(recipe.get("name", "-")),
|
|
"CLIENT": self.labellify(recipe.get("client", "-")),
|
|
"PART": self.labellify(recipe.get("part_number", "-")),
|
|
"DESCRIPTION": self.labellify(recipe.get("description", "-")),
|
|
# STEP SPEC
|
|
"TPREFILL": self.labellify(leak_test_1_step_spec.get("pre_filling_time", "-")),
|
|
"PPREFILL": self.labellify(leak_test_1_step_spec.get("pre_filling_pressure", "-")),
|
|
"TFILL": self.labellify(leak_test_1_step_spec.get("filling_time", "-")),
|
|
"TSET": self.labellify(leak_test_1_step_spec.get("settling_time", "-")),
|
|
"TPREFILL2": self.labellify(leak_test_2_step_spec.get("pre_filling_time", "-")),
|
|
"PPREFILL2": self.labellify(leak_test_2_step_spec.get("pre_filling_pressure", "-")),
|
|
"TFILL2": self.labellify(leak_test_2_step_spec.get("filling_time", "-")),
|
|
"TSET2": self.labellify(leak_test_2_step_spec.get("settling_time", "-")),
|
|
"PSETMINP": self.labellify(leak_test_1_step_spec.get("settling_pressure_min_percent", " -")),
|
|
"PSETMAXP": self.labellify(leak_test_1_step_spec.get("settling_pressure_max_percent", " -")),
|
|
"PSETMINP2": self.labellify(leak_test_2_step_spec.get("settling_pressure_min_percent", " -")),
|
|
"PSETMAXP2": self.labellify(leak_test_2_step_spec.get("settling_pressure_max_percent", " -")),
|
|
"PSETMINP_A": self.labellify(psetminp_a),
|
|
"PSETMAXP_A": self.labellify(psetmaxp_a),
|
|
"PSETMINP2_A": self.labellify(psetminp2_a),
|
|
"PSETMAXP2_A": self.labellify(psetmaxp2_a),
|
|
"TTEST": self.labellify(leak_test_1_step_spec.get("test_time", "-")),
|
|
"TTEST2": self.labellify(leak_test_2_step_spec.get("test_time", "-")),
|
|
"PMIN": self.labellify(leak_test_1_step_spec.get("test_pressure_qneg", "-")),
|
|
"PMIN2": self.labellify(leak_test_2_step_spec.get("test_pressure_qneg", "-")),
|
|
"PTEST": self.labellify(leak_test_1_step_spec.get("test_pressure", "-")),
|
|
"PTEST2": self.labellify(leak_test_2_step_spec.get("test_pressure", "-")),
|
|
"PMAX": self.labellify(leak_test_1_step_spec.get("test_pressure_qpos", "-")),
|
|
"TFLUSH": self.labellify(leak_test_1_step_spec.get("flush_time", "-")),
|
|
"PFLUSH": self.labellify(leak_test_1_step_spec.get("flush_pressure", "-")),
|
|
# ACTUAL TESTED VALUES
|
|
"RESPFILL": self.labellify(leak_test_1_results.get("Running test: filling pressure", "-")),
|
|
"RESPFILL2": self.labellify(leak_test_2_results.get("Running test: filling pressure", "-")),
|
|
"RESPSET": self.labellify(leak_test_1_results.get("Running test: pressure at the end of settling", "-")),
|
|
"RESPSET2": self.labellify(leak_test_2_results.get("Running test: pressure at the end of settling", "-")),
|
|
"RESLEAK": self.labellify(leak_test_1_results.get("Running test: measured leak", "-")),
|
|
"RESLEAK2": self.labellify(leak_test_2_results.get("Running test: measured leak", "-")),
|
|
"RESRES": self.labellify(leak_test_1_results.get("Running test: result", "-")),
|
|
"RESRES2": self.labellify(leak_test_2_results.get("Running test: result", "-")),
|
|
# SERIAL DEFINITION
|
|
"SN": str(archived.id),
|
|
"SN4": f"{archived.id:0>4}",
|
|
"SN5": f"{archived.id:0>5}",
|
|
"SN6": f"{archived.id:0>6}",
|
|
# TIME DEFINITION
|
|
"DATETIME": archived.time.strftime("%d/%m/%Y %H:%M:%S"),
|
|
"DATE": archived.time.strftime("%d/%m/%Y"),
|
|
"TIME": archived.time.strftime("%H:%M:%S"),
|
|
"YYYY": archived.time.strftime("%Y"),
|
|
"YY": archived.time.strftime("%y"),
|
|
"MO": archived.time.strftime("%m"),
|
|
"DD": archived.time.strftime("%d"),
|
|
"HH": archived.time.strftime("%H"),
|
|
"MI": archived.time.strftime("%M"),
|
|
"SS": archived.time.strftime("%S"),
|
|
"JJJ": archived.time.strftime("%j"),
|
|
# EXTRA DATA
|
|
"SHIFT": str(get_shift(archived.time)),
|
|
"STATION": str(self.config.machine_id),
|
|
"OPERATOR": str(archived.user.username),
|
|
"BADGE_NUM": str(archived.user.badge_number),
|
|
|
|
# RESULT
|
|
"RESULT": str("CONFORME" if leak_test_1_results.get("ok", False) else "SCARTO") + str(" FORZATO" if self.data.get("overridden", False) else ""),
|
|
"RESULT_L1": "ESITO" + str(" FORZATO" if self.data.get("overridden", False) else ""),
|
|
"RESULT_L2": str("CONFORME" if leak_test_1_results.get("ok", False) else "SCARTO"),
|
|
}
|
|
for n in range(5):
|
|
field=f"labeltxt_{n+1}"
|
|
if field in printer_fields.keys():
|
|
if printer_fields[field] !="":
|
|
context[field.upper()]=printer_fields[field]
|
|
|
|
# PRINT MAIN PRODUCT LABEL
|
|
compiled_label = self.components["label_printer"].print_label(label, context=context)
|
|
self.log.info(f"Main label printed: {context!r}")
|
|
return compiled_label
|
|
|
|
def print_extra_labels(self):
|
|
# PRINT EXTRA LABELS IF NEEDED (BEFORE LEAK TEST)
|
|
if "extra_label_printer" in self.components.keys() and self.print_step is not None:
|
|
printer_fields = self.print_step.spec
|
|
if len(printer_fields["extra_label"]) > 0:
|
|
labels = printer_fields["extra_label"].split(",")
|
|
for label in labels:
|
|
self.components["extra_label_printer"].print_label(f"{label}.prn", context=None)
|
|
|
|
@pyqtSlot(str)
|
|
def load_recipe_from_rfid(self,data):
|
|
if self.step.type == "barcode_recipe_selection":
|
|
if data is not None:
|
|
self.cycle_available_steps["barcode_recipe_selection"].widget.get(data)
|
|
else:
|
|
# fixture removed
|
|
self.fail_cycle()
|
|
self.change_recipe()
|
|
else:
|
|
if data is not None:
|
|
self.set_recipe_mode_barcode()
|