st-ten-1/src/ui/test/test.py
2025-01-28 18:29:00 +01:00

830 lines
44 KiB
Python
Executable File

import copy
import logging
import os
import sys
import weakref
from datetime import datetime, timedelta
from PyQt5.QtCore import QTimer, pyqtSlot, pyqtSignal
from PyQt5.QtWidgets import QMessageBox
from lib.db import Archive, Recipes, Users
from lib.helpers import get_shift
from lib.helpers.step import Step
from playhouse.shortcuts import model_to_dict
from ui.barcode_recipe_selection import Barcode_Recipe_Selection
from ui.helpers import replace_widget
from ui.recipe_selection import Recipe_Selection
from ui.test_assembly import Test_Assembly
from ui.test_barcodes import Test_Barcodes
from ui.test_connector import Test_Connector
from ui.test_count import Test_Count
from ui.test_count_end import Test_Count_End
from ui.test_fail import Test_Fail
from ui.test_instructions import Test_Instructions
from src.ui.test_pipe_cutter import Test_Pipe_Cutter
from ui.test_leak import Test_Leak
from ui.test_resistance import Test_Resistance
from ui.test_screws import Test_Screws
from ui.test_vision import Test_Vision
from ui.test_warning_img import Test_Warning_Img
from ui.widget import Widget
from components import ArchiveSynchronizer
from src.components.rfid_pn532 import RFID_PN532
class Test(Widget):
def __init__(self, config, components=None, main_window=None):
super().__init__()
self.autotest_timer = None
self.main_window = main_window
self.config = config
self.components = components
# GET LOGGER
self.log = logging.getLogger("Test")
# SHOW MACHINE DESCRIPTION
self.machine_description_l.setText(self.config.get("machine", {}).get("description", "N/A"))
# SHOW USERNAME
session = Users.get_session()
self.user_l.setText(session.username)
if session.is_admin:
self.user_l.setStyleSheet("QLabel { color: red; }")
else:
self.user_l.setStyleSheet("")
# SHOW AND UPDATE TIME CLOCK
self.refresh_time(init=True)
# INIT RECIPE
self.recipe = None
self.auto_import = Recipe_Selection
self.archive_synch = ArchiveSynchronizer(config=config)
self.rfid = RFID_PN532(config=config)
self.error_label.setText("")
self.error_label.setStyleSheet("QLabel { color: red; }")
self.rfid.rfid_error_signal.connect(self.handle_rfid_error)
if self.config["hardware_config"]["barcode_recipe_selection"] == "present":
self.recipe_selection_mode = "barcode"
else:
self.recipe_selection_mode = "table"
self.step = None
self.tester_component = None
if self.config["hardware_config"]["tecna_t3"] == "present":
self.tester_component = "tecna_t3"
elif self.config["hardware_config"]["furness_controls"] == "present":
self.tester_component = "furness_control"
self.unsupported_steps = set()
self.steps_dependencies = {
"count": set(),
"connector": {"multicomp", },
"instruction": {"digital_io"},
"screws": {"screwdriver", "tecna_t3", },
"resistance": {"multicomp", },
"leak_1": {self.tester_component, },
"leak_2": {self.tester_component, },
"pipe_cutter": {"pipe_cutter"},
"vision": {("uvc_camera", "galaxy_camera","hikrobot_sc"), "vision", "vision_saver", }, # "neo_pixels", },
"print": {"label_printer_2"} if self.config["hardware_config"]["label_printer"] != "present" else {"label_printer"},
}
self.unsupported_steps = set()
for step_name, dependencies in self.steps_dependencies.items():
for dependency in dependencies:
if isinstance(dependency, tuple):
# if all([d not in self.components or not self.components[d].ready for d in dependency]):
if all([d not in self.components for d in dependency]):
self.unsupported_steps.add(step_name)
else:
# if dependency not in self.components or not self.components[dependency].ready:
if dependency not in self.components:
self.unsupported_steps.add(step_name)
# INIT PIECES COUNTER
self.pieces = {"ok": 0, "ko": 0}
# INIT CYCLE STATES
self.cycle_available_steps = {
# "assembly_1": Test_Assembly(img_path=self.select_step_img("assembly_1"), text=u"INSERIRE SENSORE", widget=None),
"barcodes": Test_Assembly(img_path=self.select_step_img("scan"), text=u"LEGGERE IL BARCODE DEL PEZZO DA COLLAUDARE", widget=Test_Barcodes()),
"connector": Test_Assembly(img_path=self.select_step_img("scan"), text=u"COLLEGARE IL CONNETTORE INDICATO AL PEZZO E LEGGERE IL SUO BARCODE", widget=Test_Connector(run_once=True)),
"count": Test_Assembly(img_path=None, text=u"INSERIRE IL NUMERO DI PEZZI ATTESI PER IL LOTTO",
widget=Test_Count(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces, run_once=True)),
"warning_img": Test_Assembly(img_path=None, text=u"ATTENZIONE - PER QUESTO CODICE ESEGUIRE LE OPERAZIONI INDICATE IN FIGURA",
widget=Test_Warning_Img(components=self.components, recipe=self.recipe,bench_name=self.config["machine"]["image_for_warning"], step=self.step, run_once=True)),
"count_end": Test_Assembly(img_path=None, text=u"LOTTO TERMINATO, PREMERE CONTINUA PERCOMINCIARNE UNO NUOVO",
widget=Test_Count_End(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)),
"done": Test_Assembly(img_path=self.select_step_img("success"), text=u"COLLAUDO COMPLETATO", widget=None),
"emergency": Test_Assembly(img_path=self.select_step_img("reset_emergency"),
text=u"EMERGENZA INTERVENUTA - RIPRISTINARE PULSANTE E SELEZIONARE \"RESET EMERGENZA\" DAL MEN\u00d9 \"STRUMENTI\"", widget=None),
"fail": Test_Assembly(img_path=self.select_step_img("fail"), text=u"CICLO INTERROTTO, PREMERE CONTINUA PER COMINCIARE UN NUOVO CICLO", widget=Test_Fail(parent=self)),
"blow": Test_Assembly(img_path=None, text=u"SOFFIAGGIO TUBO IN CORSO - ATTENDERE...", widget=Test_Warning_Img(components=self.components, recipe=self.recipe, step=self.step)),
"leak_1": Test_Assembly(img_path=None, text=None, widget=Test_Leak(config=self.config,components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces, parent=self))
if self.config["hardware_config"]["tecna_t3"] != "absent" or self.config["hardware_config"]["furness_controls"] !="absent" else None,
"leak_2": Test_Assembly(img_path=None, text=None, widget=Test_Leak(config=self.config,components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces, parent=self))
if self.config["hardware_config"]["tecna_t3"] != "absent" or self.config["hardware_config"]["furness_controls"] != "absent" else None,
"flush": Test_Assembly(img_path=None, text=u"SCARICO ARIA IN CORSO - ATTENDERE...", widget=Test_Warning_Img(components=self.components, recipe=self.recipe, step=self.step)),
"instruction": Test_Assembly(img_path=None, text=u"ESEGUIRE LE OPERAZIONI DI MONTAGGIO INDICATE IN FIGURA",
widget=Test_Instructions(config=self.config,components=self.components, recipe=self.recipe, bench_name=self.config.machine_id, step=self.step)),
"pipe_cutter": Test_Assembly(img_path=None, text=u"ATTENZIONE TAGLIO CORRUGATO IN CORSO",widget=Test_Pipe_Cutter(config=self.config, components=self.components,recipe=self.recipe, bench_name=self.config.machine_id,step=self.step)),
"instruction_extra": Test_Assembly(img_path=None, text=u"ESEGUIRE LE OPERAZIONI DI MONTAGGIO EXTRA INDICATE IN FIGURA",
widget=Test_Instructions(config=self.config, components=self.components,recipe=self.recipe, bench_name=self.config.machine_id,step=self.step)),
"piece_removal": Test_Assembly(img_path=None, text=u"RIMUOVERE IL PEZZO APRENDO TUTTE LE CHIUSURE",
widget=Test_Instructions(config=self.config,components=self.components, recipe=self.recipe, bench_name=self.config.machine_id, step=self.step)),
"print": Test_Assembly(img_path=self.select_step_img("print"), text=u"STAMPA ETICHETTA IN CORSO", widget=None),
"resistance": Test_Assembly(img_path=None, text=u"COLLEGARE CONNETTORE ELETTRICO PER EFFETTUARE PROVA RESISTENZA",
widget=Test_Resistance(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)),
"screws": Test_Assembly(img_path=None, text=u"AVVITARE TUTE LE VITI COME INDICATO", widget=Test_Screws(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)),
"select_recipe": Test_Assembly(img_path=None, text=u"SELEZIONARE IL CODICE DA COLLAUDARE", widget=Recipe_Selection(config=self.config, unsupported_steps=self.unsupported_steps)),
"barcode_recipe_selection": Test_Assembly(img_path=self.select_step_img("scan"), text=u"LEGGERE IL BARCODE SULLA DIMA DEL COMPONENTE DA COLLAUDARE",
widget=Barcode_Recipe_Selection(parent=self)),
"vision": Test_Assembly(img_path=None, text=u"VERIFICARE CONTROLLO CON TELECAMERA", widget=Test_Vision(components=self.components, recipe=self.recipe, step=self.step, pieces=self.pieces)),
"wait": Test_Assembly(img_path=self.select_step_img("wait"), text=u"ATTENDERE - PAUSA INTER CICLO", widget=None),
None: Test_Assembly(img_path=self.select_step_img("warning"), text=u"ATTENZIONE - LA RICETTA SELEZIONATA NON CONTIENE FASI DI TEST", widget=None),
}
self.cycle_steps = None
self.cycle_index = -1
self.print_step = None
self.last_label = None
self.require_discard_piece = False
# SETUP AUTOTEST
self.autotest_request = False
self.autotesting = False
self.autotesting_reason = None
self.autotest_cycle_steps = None
if "--no-autotest" not in sys.argv:
self.autotest_period = int(8.5 * 60 * 60 * 1000) # 8.5 HOURS
# self.autotest_period = 12 * 60 * 60 * 1000 # 12 HOURS
# if not self.config["autotest_done"]:
# self.request_autotest("init")
else:
self.autotest_period = None
# INIT TEST DATA
self.data = {"ok": True, "overridden": False}
self.archived = None
# CONNECT CYCLE CONTROLS
self.cancel_b.clicked.connect(self.fail_cycle)
self.change_recipe_b.clicked.connect(self.change_recipe)
self.reset_count_b.clicked.connect(self.reset_count)
for step_name, w in self.cycle_available_steps.items():
if hasattr(w, "ok"):
# custom ok handlers should call next again
if isinstance(w.widget, Recipe_Selection):
w.ok.connect(self.set_recipe)
else:
w.ok.connect(lambda data=None, step_namel=step_name, selfie=weakref.ref(self): selfie().set_step(step_namel, data))
if hasattr(w, "ko"):
w.ko.connect(self.fail_cycle)
# CUSTOM STEP CONNECTIONS
self.cycle_available_steps["count"].ok.connect(self.cycle_available_steps["count_end"].widget.set_amount)
# self.cycle_available_steps["warning_img"].ok.connect(self.cycle_available_steps["warning_img"].widget.set_done)
if "fixture_id" in self.components.keys():
self.components["fixture_id"].new_id_signal.connect(self.load_recipe_from_rfid)
self.components["fixture_id"].rfid_error_signal.connect(self.handle_rfid_error)
self.tag_loaded_recipe = self.main_window.tag_loaded_recipe
# TESTING
if "--test" in sys.argv:
self.testing = True
else:
self.testing = False
# /TESTING
# START CYCLE
self.next_timer = QTimer()
self.next_timer.setSingleShot(True)
self.next_timer.timeout.connect(self.next)
self.next()
def refresh_time(self, init=False):
if init:
self.time_timer = QTimer()
self.time_timer.setSingleShot(True)
self.time_timer.timeout.connect(self.refresh_time)
t = datetime.now()
self.time_l.setText("{d}/{mo}/{y}\n{h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute))
self.time_timer.start(60 - t.second)
def select_step_img(self, step, suffix=None):
img_path = "./src/ui/imgs"
names = []
if suffix is not None:
names.append(f"{step}_{suffix}_{self.config.machine_id}")
names.append(f"{step}_{suffix}")
names.append(f"{step}_{self.config.machine_id}")
names.append(f"{step}")
for name in names:
for ext in ["png", "jpg"]:
path = f"{img_path}/{name}.{ext}"
if os.path.isfile(path):
return path
raise FileNotFoundError(f"No image was found for step {step}")
def change_recipe(self):
self.next(action="change_recipe")
def set_recipe_mode_table(self):
self.recipe_selection_mode = "table"
self.change_recipe()
def set_recipe_mode_barcode(self):
self.recipe_selection_mode = "barcode"
self.change_recipe()
def reprint_label(self):
self.print(self.last_label, self.print_step.spec.get("template", "EtichettaR5"))
def fail_cycle(self):
self.next(action="fail")
def setCentralWidget(self, widget):
replace_widget(self, "centralWidget", widget)
def request_autotest(self, reason): # you can cancel the request calling request_autotest(False)
if "--no-autotest" not in sys.argv:
self.log.info(f"cycle request autotest: reason: {reason!r} autotest_request: {self.autotest_request!r}")
if reason in ("init", "login"):
self.autotest_timer = QTimer()
self.autotest_timer.setSingleShot(False)
self.autotest_timer.timeout.connect(self.request_periodic_autotest)
if self.autotest_period is not None:
self.autotest_timer.start(self.autotest_period)
reason = "boot"
if self.config["autotest_leak"]["enabled"] == "true":
self.autotest_request = reason
else:
self.autotest_request = False
else:
self.log.info(f"Autotest request ignored (reason: {reason!r}) --no-autotest flag detected")
if reason == "logout":
self.next(action="abort")
def request_periodic_autotest(self):
self.request_autotest("periodic")
def next(self, action=None):
if self.step is not None:
self.log.info(f"cycle step: {self.step.step_type!r} action: {action!r} current index:{self.cycle_index}")
else:
self.log.info(f"cycle step: {self.step!r} action: {action!r} current index:{self.cycle_index}")
current_w = self.centralWidget
if hasattr(current_w, "stop"):
self.log.info(f"stopping widget {self.step.step_type}")
current_w.stop()
if action == "change_recipe":
self.log.info(f"cycle next: action: {action!r}")
self.set_recipe(recipe=None)
if self.config["hardware_config"]["tecna_t3"] == "present" or self.config["hardware_config"]["furness_controls"] == "present":
self.cycle_available_steps["leak_1"].widget.recipe_written = False
self.cycle_available_steps["leak_2"].widget.recipe_written = False
self.step = Step(step_type="select_recipe")
self.cycle_index = -1
self.recipe = None
self.cycle_steps = None
# COUNT RESET
self.pieces["ok"] = 0
self.pieces["ko"] = 0
elif action in ("fail", "abort"):
self.log.info(f"cycle next: action: {action!r}")
# FAIL AND RESTART TEST
self.step = Step(step_type="fail")
self.cycle_index = -1
# COUNT FAIL
if action == "fail":
self.done(ok=False)
elif action is not None:
raise NotImplementedError(f"cycle next: action {action!r} is not a valid action")
# if action did not set the next cycle step
# set next cycle step normally
if self.recipe is None or self.cycle_steps is None:
# if recipe not set: select_recipe
if self.recipe_selection_mode == "barcode":
self.log.info(f"returning to barcode recipe selection")
self.step = Step(step_type="barcode_recipe_selection")
else:
self.log.info(f"returning to recipe selection table")
self.step = Step(step_type="select_recipe")
elif action is None:
if self.autotest_request is not False and self.autotest_cycle_steps is not None and not self.autotesting and (self.cycle_index == -1 or self.cycle_index + 1 >= len(self.cycle_steps)):
# if autotest was requested
# and if cycle_steps is not started or has ended
self.autotesting = True
self.autotesting_reason = self.autotest_request
self.autotest_request = False
self.log.info(f"Autotest requested (reason: {self.autotesting_reason})")
if self.autotest_period is not None: # reset periodic autotest timer
self.time_timer.start(self.autotest_period)
self.require_discard_piece = False
if self.autotesting:
if self.cycle_index + 1 < len(self.autotest_cycle_steps):
# goto next step in autotest_cycle_steps
self.cycle_index = (self.cycle_index + 1) % len(self.autotest_cycle_steps)
self.step = self.autotest_cycle_steps[self.cycle_index]
else:
# autotest ended
self.autotesting = False
if self.autotesting_reason == "logout":
Users.logout()
self.main_window.open_login()
else:
t = datetime.now()
self.last_at_l.setText("{d}/{mo}/{y} {h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute))
t += timedelta(seconds=int(self.autotest_period / 1000))
self.next_at_l.setText("{d}/{mo}/{y} {h}:{m}".format(y=t.year, mo=t.month, d=t.day, h=t.hour, m=t.minute))
self.autotesting_reason = None
self.cycle_index = -1
self.config["autotest_done"] = True
if not self.autotesting:
if len(self.cycle_steps):
# goto next step in cycle_steps
self.cycle_index = (self.cycle_index + 1) % len(self.cycle_steps)
self.step = self.cycle_steps[self.cycle_index]
else:
self.cycle_index = -1
self.step = Step(step_type=None)
# enable/disable cycle controls
self.change_recipe_b.setEnabled(self.recipe is not None)
self.cancel_b.setEnabled(self.step.step_type is not None and self.step.step_type not in {
"emergency",
"fail",
"select_recipe",
"wait",
})
self.log.info(f"next cycle step: {self.step.step_type!r}")
# INIT TEST DATA IF STARTING CYCLE LOOP OR IF RESET IS NEEDED
if self.cycle_index == 0:
self.data = {"ok": True, "overridden": False}
self.archived = None
if self.recipe is not None and "recipe" not in self.data:
self.data["recipe"] = model_to_dict(self.recipe)
w = self.cycle_available_steps[self.step.step_type]
show = None
if self.step.step_type == "leak_2":
self.setCentralWidget(w) # NEED TO PRESHOW UI
if hasattr(w, "start"):
show = w.start(recipe=self.recipe, step=self.step, pieces=self.pieces)
if show is not False and w is not current_w:
self.setCentralWidget(w)
elif show is False:
self.next_timer.start(0)
if self.step.step_type == "done":
self.archived = self.done()
self.last_label = copy.deepcopy(self.archived)
self.next_timer.start(500)
elif self.step.step_type == "print":
compiled_label = self.print(self.archived, self.step.spec.get("template", "EtichettaR5"))
self.archived.label = compiled_label
self.log.info(f"Label printed. Saving...")
#self.archived.save()
self.main_window.main_window.run_request.emit(self.archived.save, [], {})
self.next_timer.start(500)
elif self.step.step_type == "wait":
self.next_timer.start(500)
# UPDATE COUNT DISPLAY
self.update_count_display()
def reset_count(self):
# COUNT RESET
self.pieces["ok"] = 0
self.pieces["ko"] = 0
self.update_count_display()
def update_count_display(self):
self.pieces_count_l.setText(f"{self.pieces['ok']} OK / {self.pieces['ko']} NOK / {sum(self.pieces.values())} TOT")
def set_recipe(self, recipe=None):
self.recipe = recipe
self.config.active_recipe = recipe
inserted_instruction = False
self.require_discard_piece = False
if self.recipe is None:
self.cycle_steps = None
self.autotest_cycle_steps = None
else:
steps = self.recipe.get_steps()
skip = set()
print_found = False
count_found = False
# create step sequence list
barcode_names = ['serial', 'barcode_input_2', 'barcode_input_3', 'barcode_input_4', 'barcode_input_5']
for i, step in enumerate(steps):
if step.step_type == "barcodes":
n_pieces = int(step.spec.get("n_pieces", 1))
n_pieces_adapted = n_pieces
if n_pieces_adapted == 1:
step.spec["barcode_name"] = 'serial'
else:
step.spec["barcode_name"] = barcode_names[(n_pieces_adapted - 1) % len(barcode_names)]
n_pieces_adapted -= 1
new_barcode_step = copy.deepcopy(step)
new_barcode_step.spec["n_pieces"] = str(n_pieces_adapted)
steps.insert(i + 1, new_barcode_step)
if i in skip:
continue
if step.step_type == "vision":
self.components["vision"].config_changed(vision_recipe=self.recipe.name)
if step.step_type == "count":
count_found = True
if "warning_img" in step.spec:
if step.spec["warning_img"]:
steps.insert(i, Step(step_type="warning_img", spec={"warning_img": step.spec["warning_img"]}))
skip.add(i + 1)
if "assembly" in step.spec:
if step.spec["assembly"]:
steps.insert(i, Step(step_type="instructions", spec={}))
skip.add(i + 1)
if "require_discard_piece" in step.spec:
if step.spec["require_discard_piece"]:
self.require_discard_piece = True
if step.step_type == "resistance": # ADD STEP TO ENSURE REMOVAL OF CONNECTOR
steps.insert(i + 1, Step(step_type="resistance", spec={
"scale": 500,
"expected": float("+inf"),
"tolerance_pos": 0,
"tolerance_neg": 0,
}))
skip.add(i + 1)
if step.step_type == "print":
if print_found:
continue
steps.insert(i, Step(step_type="done"))
print_found = True
self.print_step = step
if self.config["hardware_config"].get("enforce_piece_removal", "no") == "yes":
steps.append(Step(step_type="piece_removal"))
if count_found:
steps.append(Step(step_type="count_end"))
if step.step_type in ("leak_1", "leak_2"):
self.leak_step = step
step_types = [step.step_type for step in steps]
if "leak_1" in step_types and "leak_2" in step_types:
leak1_index = step_types.index("leak_1")
leak2_index = step_types.index("leak_2")
if leak1_index + 1 == leak2_index: # Ensure 'leak_1' is immediately followed by 'leak_2'
if self.config["hardware_config"].get("second_leak_test", "yes") == "no":
steps.insert(leak2_index, Step(step_type="instruction_extra", spec={}))
inserted_instruction = True
# Insert 'instruction_extra' after the first 'instructions' if not inserted between leaks
if not inserted_instruction:
for i, step in enumerate(steps):
if step.step_type == "instructions":
steps.insert(i + 1, Step(step_type="instruction_extra", spec={}))
inserted_instruction = True
break
if not print_found:
steps.append(Step(step_type="done"))
if count_found:
steps.append(Step(step_type="count_end"))
steps.append(Step(step_type="wait"))
self.cycle_steps = steps
self.check_steps_dependencies(self.cycle_steps)
leak_autotest_steps = []
# CONFIGURE LEAK AUTOTEST PARAMETERS
if self.config["autotest_leak"]["enabled"] == "true":
l_at_1 = copy.deepcopy(self.config["autotest_leak"])
l_at_1.pop("enabled")
l_at_1 = {a: float(x) for a, x in l_at_1.items()}
l_at_1["autotest"] = "ko_check"
l_at_2 = copy.deepcopy(self.config["autotest_leak"])
l_at_2.pop("enabled")
l_at_2 = {a: float(x) for a, x in l_at_2.items()}
l_at_2["test_pressure_qneg"] = l_at_2["test_pressure_tt_qneg"]
l_at_2["test_pressure_qpos"] = l_at_2["test_pressure_tt_qpos"]
l_at_2["autotest"] = "ok_check"
leak_autotest_steps = [Step(step_type="leak_1", spec=l_at_1), Step(step_type="leak_1", spec=l_at_2)]
self.autotest_cycle_steps = [
*([
Step(step_type="resistance", spec={
"scale": 500,
"expected": 1,
"tolerance_pos": 5,
"tolerance_neg": 5,
"autotest": True,
}),
Step(step_type="resistance", spec={
"scale": 500,
"expected": float("+inf"),
"tolerance_pos": 0,
"tolerance_neg": 0,
"autotest": True,
}),
Step(step_type="resistance", spec={
"scale": 500,
"expected": 10,
"tolerance_pos": 1,
"tolerance_neg": 1,
"autotest": True,
}),
Step(step_type="resistance", spec={
"scale": 500,
"expected": float("+inf"),
"tolerance_pos": 0,
"tolerance_neg": 0,
"autotest": True,
}),
] if "resistance" not in self.unsupported_steps else []),
*(leak_autotest_steps),
Step(step_type="done"),
Step(step_type="wait"),
]
for w in self.cycle_available_steps.values():
if hasattr(w, "reset"):
w.reset()
# UPDATE RECIPE DISPLAY
if self.recipe is not None:
self.log.info(f"set recipe: {model_to_dict(self.recipe)!r} cycle steps: {[s.step_type for s in self.cycle_steps]}")
self.recipe_l.setText(self.recipe.name)
self.recipe_l.setStyleSheet("")
self.cycle_index = -1
self.next()
else:
self.log.info(f"set recipe: {self.recipe!r} cycle steps: {self.cycle_steps}")
self.recipe_l.setText("NON SELEZIONATA")
self.recipe_l.setStyleSheet("QLabel { color: red; }")
def check_steps_dependencies(self, steps):
unsupported_steps = set()
missing_components = set()
for step in steps:
if step.step_type in self.unsupported_steps or step.step_type not in self.cycle_available_steps:
unsupported_steps.add(step.step_type)
else:
for dependency in self.steps_dependencies.get(step.step_type, []):
if isinstance(dependency, tuple):
if all([d not in self.components for d in dependency]):
unsupported_steps.add(step.step_type)
else:
unready = set()
for d in dependency:
if d in self.components:
if not self.components[d].ready:
self.components[d].reconfigure()
if not self.components[d].ready:
unready.add(d)
else:
unready.add(d)
if unready == set(dependency):
missing_components.add(" or ".join(dependency))
else:
if dependency not in self.components:
unsupported_steps.add(step.step_type)
else:
if not self.components[dependency].ready:
self.components[dependency].reconfigure()
if not self.components[dependency].ready:
missing_components.add(dependency)
if len(unsupported_steps):
QMessageBox.critical(None, "Errore Ricetta",
f"Questa ricetta contiene uno o piu step non supportati da questo banco:\n{', '.join(sorted(unsupported_steps))}\nModificare la ricetta adeguatamente.")
if len(missing_components):
QMessageBox.critical(None, "Errore Componenti Ricetta",
f"Questa ricetta richiede i seguenti componenti per essere eseguita:\n{', '.join(sorted(missing_components))}\nModificare la ricetta adeguatamente o collegare i componenti elencati.")
if len(unsupported_steps) or len(missing_components):
self.change_recipe()
def set_step(self, step_name, data=None):
if step_name not in self.data:
self.data[step_name] = {}
if data is not None:
data["step"] = self.step.spec
data["step"].pop("name", None)
# MAKE ARRAY ONLY IF MORE THAN ONE TEST OF SAME TYPE
if len(self.data[step_name]) > 1:
self.data[step_name][str(len(self.data[step_name]))] = data
else:
self.data[step_name] = data
self.data["overridden"] = self.data["overridden"] or data.get("overridden", False)
self.data["ok"] = self.data["ok"] and data.get("ok", False)
self.next()
def done(self, ok=True):
self.log.info("cycle done, saving data...")
# remove useless info
self.data.get("recipe", {}).get("spec", {}).pop("steps", None)
for leak in ["leak_1", "leak_2"]:
if leak in self.data.keys():
results = {k: self.data[leak]["results"][self.tester_component][k] for k in ["Running test: result"]}
results.update({k: round(float(self.data[leak]["results"][self.tester_component][k]), 2) for k in ["Running test: filling pressure",
"Running test: measured leak",
"Running test: pressure at the end of settling"]}
)
self.data[leak]["results"] = results
if "vision" in self.data:
vision = self.data.get("vision", {})
out_paths = self.components["vision_saver"].save(
save_time=vision.get("time", None),
frame=vision.get("frame", None),
# vision=vision_test_1.get("detections", None),
)
vision["files"] = out_paths
self.log.info(f"cycle vision saved locally: {out_paths!r}")
vision.pop("frame", None)
vision.pop("render", None)
vision.pop("detections", None)
if "results" in vision.keys():
vision["results"].pop("results", None)
if self.autotesting:
self.data["autotest"] = True
self.data["autotest_reason"] = self.autotesting_reason
self.data["recipe"]["name"] = "AUTOTEST"
archived = Archive.archive(self.data, ok and self.data["ok"], overridden=self.data["overridden"])
self.log.info(f"cycle archived locally: {archived!r}")
if not self.autotesting:
# COUNT OK
if ok:
self.pieces["ok"] += 1
else:
self.pieces["ko"] += 1
else:
if self.autotesting_reason == "logout":
if ok:
self.main_window.open_login()
return archived
@staticmethod
def labellify(v):
if v is None:
v = "-"
elif isinstance(v, float):
v = f"{v:.1f}"
if not isinstance(v, str):
v = str(v)
return v
def print(self, archived, label):
self.log.info("cycle print")
if archived is None:
self.log.error("attempting to print empty label")
return None
if archived.label is not None:
# raise AssertionError("this should never happen")
self.log.info("printing already compiled label")
# LABEL PRINT
recipe = archived.test_data.get("recipe", {})
leak_test_1 = archived.test_data.get("leak_1", {})
leak_test_1_step = leak_test_1.get("step", {})
leak_test_1_step_spec = leak_test_1_step.get("spec", {})
leak_test_1_results = leak_test_1.get("results", {})
leak_test_2 = archived.test_data.get("leak_2", {})
leak_test_2_step = leak_test_2.get("step", {})
leak_test_2_step_spec = leak_test_2_step.get("spec", {})
leak_test_2_results = leak_test_2.get("results", {})
psetminp_a = leak_test_1_step_spec.get("test_pressure", 0) * (100 + leak_test_1_step_spec.get("test_pressure_qneg", 0) / 100)
psetmaxp_a = leak_test_1_step_spec.get("settling_pressure_max_percent", 0) * (100 + leak_test_1_step_spec.get("test_pressure_qpos", 0) / 100)
psetminp2_a = leak_test_2_step_spec.get("settling_pressure_min_percent", 0) * (100 + leak_test_2_step_spec.get("test_pressure_qneg", 0) / 100)
psetmaxp2_a = leak_test_2_step_spec.get("settling_pressure_max_percent", 0) * (100 + leak_test_2_step_spec.get("test_pressure_qpos", 0) / 100)
if self.tester_component is not None:
if self.recipe.spec["leak_1"]:
leak_test_1_results["Running test: pressure at the end of measure"] = (
leak_test_1_results["Running test: pressure at the end of settling"]
+ leak_test_1_results["Running test: measured leak"])
if self.recipe.spec["leak_2"]:
leak_test_2_results["Running test: pressure at the end of measure"] = (
leak_test_2_results["Running test: pressure at the end of settling"]
+ leak_test_2_results["Running test: measured leak"])
printer_fields = self.print_step.spec
context = {
# RECIPE DATA
"RECIPE": self.labellify(recipe.get("name", "-")),
"CLIENT": self.labellify(recipe.get("client", "-")),
"PART": self.labellify(recipe.get("part_number", "-")),
"DESCRIPTION": self.labellify(recipe.get("description", "-")),
"RECIPE_TO_PRINT": self.labellify(self.print_step.spec.get("labeltxt_5", "-").replace('{N11}', '')),
# STEP SPEC
"TPREFILL": self.labellify(leak_test_1_step.get("pre_filling_time", "-")),
"PPREFILL": self.labellify(leak_test_1_step.get("pre_filling_pressure", "-")),
"TFILL": self.labellify(leak_test_1_step.get("filling_time", "-")),
"TSET": self.labellify(leak_test_1_step.get("settling_time", "-")),
"TPREFILL2": self.labellify(leak_test_2_step.get("pre_filling_time", "-")),
"PPREFILL2": self.labellify(leak_test_2_step.get("pre_filling_pressure", "-")),
"TFILL2": self.labellify(leak_test_2_step.get("filling_time", "-")),
"TSET2": self.labellify(leak_test_2_step.get("settling_time", "-")),
"PSETMINP": self.labellify(leak_test_1_step.get("settling_pressure_min_percent", " -")),
"PSETMAXP": self.labellify(leak_test_1_step.get("settling_pressure_max_percent", " -")),
"PSETMINP2": self.labellify(leak_test_2_step.get("settling_pressure_min_percent", " -")),
"PSETMAXP2": self.labellify(leak_test_2_step.get("settling_pressure_max_percent", " -")),
"PSETMINP_A": self.labellify(psetminp_a),
"PSETMAXP_A": self.labellify(psetmaxp_a),
"PSETMINP2_A": self.labellify(psetminp2_a),
"PSETMAXP2_A": self.labellify(psetmaxp2_a),
"TTEST": self.labellify(leak_test_1_step.get("test_time", "-")),
"TTEST2": self.labellify(leak_test_2_step.get("test_time", "-")),
"PMIN": self.labellify(leak_test_1_step.get("test_pressure_qneg", "-")),
"PMIN2": self.labellify(leak_test_2_step.get("test_pressure_qneg", "-")),
"PTEST": self.labellify(leak_test_1_step.get("test_pressure", "-")),
"PTEST2": self.labellify(leak_test_2_step.get("test_pressure", "-")),
"PMAX": self.labellify(leak_test_1_step.get("test_pressure_qpos", "-")),
"TFLUSH": self.labellify(leak_test_1_step.get("flush_time", "-")),
"PFLUSH": self.labellify(leak_test_1_step.get("flush_pressure", "-")),
# ACTUAL TESTED VALUES
"RESPFILL": self.labellify(leak_test_1_results.get("Running test: filling pressure", "-")),
"RESPFILL2": self.labellify(leak_test_2_results.get("Running test: filling pressure", "-")),
"RESPSET": self.labellify(leak_test_1_results.get("Running test: pressure at the end of settling", "-")),
"RESPSET2": self.labellify(leak_test_2_results.get("Running test: pressure at the end of settling", "-")),
"RESPEND": self.labellify(leak_test_1_results.get("Running test: pressure at the end of measure", "-")),
"RESPEND2": self.labellify(leak_test_1_results.get("Running test: pressure at the end of measure", "-")),
"RESLEAK": self.labellify(leak_test_1_results.get("Running test: measured leak", "-")),
"RESLEAK2": self.labellify(leak_test_2_results.get("Running test: measured leak", "-")),
"RESRES": self.labellify(leak_test_1_results.get("Running test: result", "-")),
"RESRES2": self.labellify(leak_test_2_results.get("Running test: result", "-")),
# SERIAL DEFINITION
"SN": str(archived.id),
"SN4": f"{archived.id:0>4}",
"SN5": f"{archived.id:0>5}",
"SN6": f"{archived.id:0>6}",
# TIME DEFINITION
"DATETIME": archived.time.strftime("%d/%m/%Y %H:%M:%S"),
"DATE": archived.time.strftime("%d/%m/%Y"),
"TIME": archived.time.strftime("%H:%M:%S"),
"YYYY": archived.time.strftime("%Y"),
"YY": archived.time.strftime("%y"),
"MO": archived.time.strftime("%m"),
"DD": archived.time.strftime("%d"),
"HH": archived.time.strftime("%H"),
"MI": archived.time.strftime("%M"),
"SS": archived.time.strftime("%S"),
"JJJ": archived.time.strftime("%j"),
# EXTRA DATA
"SHIFT": str(get_shift(archived.time)),
"STATION": str(self.config.machine_id),
"OPERATOR": str(archived.user.username),
"BADGE_NUM": str(archived.user.badge_number),
# RESULT
"RESULT": str("CONFORME" if leak_test_1.get("ok", False) else "SCARTO") + str(" FORZATO" if self.data.get("overridden", False) else ""),
"RESULT_L1": "ESITO" + str(" FORZATO" if self.data.get("overridden", False) else ""),
"RESULT_L2": str("CONFORME" if leak_test_1.get("ok", False) else "SCARTO"),
}
#TESTING BROTHER
label_brother = context.get("RECIPE_TO_PRINT", "-") + context.get("DD","-") + context.get("MO","-") + context.get("YY","-") + context.get("SN5","-")
barcode = str(label_brother)
for n in range(5):
field = f"labeltxt_{n + 1}"
if field in printer_fields.keys():
if printer_fields[field] != "":
context[field.upper()] = printer_fields[field]
# PRINT MAIN PRODUCT LABEL
if self.config["hardware_config"]["tecna_t3"] == "absent" and self.config["hardware_config"]["furness_controls"] == "absent":
compiled_label = self.components["label_printer_2"].print_label(barcode) # Printing with Brother label printer
else:
compiled_label = self.components["label_printer"].print_label(label, context=context)
self.log.info(f"Main label printed: {context!r}")
# return fields used to print label for saving into test archive
return context
def print_extra_labels(self):
# PRINT EXTRA LABELS IF NEEDED (BEFORE LEAK TEST)
if "extra_label_printer" in self.components.keys() and self.print_step is not None:
printer_fields = self.print_step.spec
if len(printer_fields["extra_label"]) > 0:
labels = printer_fields["extra_label"].split(",")
for label in labels:
self.components["extra_label_printer"].print_label(f"{label}.prn", context=None)
@pyqtSlot(str)
def load_recipe_from_rfid(self, data):
if data not in(None,''):
self.tag_loaded_recipe = data
if self.step.step_type == "barcode_recipe_selection":
if data is not None:
self.cycle_available_steps["barcode_recipe_selection"].widget.get(data)
else:
pass
else:
# fixture removed
self.tag_loaded_recipe = None
self.fail_cycle()
self.change_recipe()
@pyqtSlot(bool)
def handle_rfid_error(self, connected):
if connected:
self.error_label.setText("") # Update the GUI label
else:
self.error_label.setText("Errore RFID.") # Update the GUI label
self.error_label.setStyleSheet("color: red;")