#!/usr/bin/env python3
"""Launcher of the ST-TEN leak test system.

Signal handling, faulthandler and logging are configured first; only then are
the project components and the Qt UI imported, the ``Main`` controller built
and the Qt event loop started.
"""
import argparse
import faulthandler
import logging
import os
import platform
import signal
import sys
import traceback
import weakref
from datetime import datetime
from pathlib import Path

from ui.diagnostics import Diagnostics
from lib.helpers.single_process import SingleProcess
from ui.logs_management.info import Logs_Management

# os.path.join picks the right separator on every platform, so no
# Windows-specific branch (and no backslash escapes) is needed.
sys.path.append(os.path.join(os.getcwd(), "src", "components"))

# Set in the ``__main__`` section below; read by quit_app().
app = None

parser = argparse.ArgumentParser(prog='ST-TEN', description='Leak test system')
parser.add_argument('-s', '--system-id')
args, unspec = parser.parse_known_args()


def quit_app(signalnum=None, handler=None):
    """Quit the Qt application, if one exists, and terminate the interpreter.

    The function is installed as the SIGINT handler and is also connected to
    the "quit" menu action, hence both arguments are optional and are only
    logged.

    Raises:
        SystemExit: always, to stop the interpreter.
    """
    logging.info("quitting app. signal: %r, handler: %r", signalnum, handler)
    if app is not None:
        app.quit()
    # sys.exit() instead of the ``quit`` builtin: the latter is injected by
    # the ``site`` module and is missing with ``-S`` or in frozen builds.
    sys.exit()


# SETUP QUITTING ON CTRL+C
signal.signal(signal.SIGINT, quit_app)

# SETUP FAULTHANDLER
faulthandler.enable(file=sys.stderr, all_threads=True)

# SETUP LOGS
logs_dir = Path(".") / "data" / "logs"
logs_dir.mkdir(parents=True, exist_ok=True)

# Tuple comparison keeps working for any future major version, unlike
# checking ``major`` and ``minor`` separately.
_PY310_PLUS = sys.version_info >= (3, 10)
_errors_kwargs = {"errors": "surrogateescape"} if _PY310_PLUS else {}
_encoding_kwargs = {"encoding": "utf-8"} if _PY310_PLUS else {}

logging.basicConfig(
    format="{asctime}:{name}:{levelname}:{message}",
    datefmt="%Y-%m-%d_%H-%M-%S",
    style="{",
    level="DEBUG" if "--debug" in sys.argv else "INFO",
    handlers=[
        logging.StreamHandler(stream=sys.stderr),
        logging.FileHandler(
            logs_dir / f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.log",
            mode="a",
            encoding="utf-8",
            delay=False,
            **_errors_kwargs,
        ),
    ],
    force=True,
    **_encoding_kwargs,
    **_errors_kwargs,
)

try:
    # IMPORT PROJECT ONLY AFTER SETTING UP SIGNAL, FAULTHANDLER AND LOGGING
    from components import (ArchiveSynchronizer, Multicomp730424, Os_Label_Printer, RemoteAPI,
                            TecnaMarpossProvasetT3, FurnessControlsLeakTester, TecnaScrewdriver, USB_586x,
                            RFID_PN532, BrotherLabelPrinter)
    from lib.db import Users
    from lib.helpers import ConfigReader
    from PyQt5.QtCore import QObject, QThread, pyqtSignal, pyqtSlot
    from PyQt5.QtWidgets import QApplication, QMessageBox
    from ui import About, Archive, Login, Main_Window, Test, Users_Management, Logs_Management, \
        Recipe_Selection, Barcode_Recipe_Selection

    if "--vision" in sys.argv:
        from components import GalaxyCamera, NeoPixels, UVCCamera, Vision, VisionSaver


    class Main(QObject):
        """Application controller: owns the config, the components and the GUI.

        Component specs are dicts with the keys ``"c"`` (class to instantiate),
        ``"a"`` (optional positional args), ``"k"`` (optional keyword args) and
        ``"t"`` (whether the component is moved to its own ``QThread``;
        defaults to ``True``).
        """

        # Emitting ``do`` with {"f": callable, "a": [...], "k": {...}} runs
        # the callable through the ``_do`` slot.
        do = pyqtSignal(dict)

        @staticmethod
        def _do(config):
            """Call ``config["f"]`` with the optional ``"a"`` args and ``"k"`` kwargs."""
            return config["f"](*config.get("a", []), **config.get("k", {}))

        def __init__(self, parent=None):
            """Read the config, start the components and, unless ``--no-gui``, build the GUI.

            Any exception raised while reading the config or starting the
            components is logged, shown in a message box, and the program
            exits.
            """
            super().__init__(parent)
            self.tag_loaded_recipe = None
            self.do.connect(self._do)
            try:
                # READ CONFIG
                system_id = args.system_id if "system_id" in args else None
                self.config = ConfigReader(system_id=system_id)
                logging.info(f"STARTING SESSION ON MACHINE {self.config['machine']['description']}")
                self.config["autotest_done"] = False
                # INIT COMPONENT
                self.components_specs = self._build_components_specs()
                self._start_components()
            except Exception as e:
                # logging.exception() already appends the traceback.
                logging.exception("Startup of the test program failed")
                QMessageBox.critical(None, "Errore", f"Errore di avvio del programma di collaudo:\n\n{e}")
                sys.exit()
            self._wire_components()
            # GUI INIT
            if "--no-gui" not in sys.argv:
                self._init_gui()

        def _build_components_specs(self):
            """Return the specs of the components marked "present" in the hardware config."""
            specs = {
                "archive_synchronizer": {"c": ArchiveSynchronizer},
                "archive_synchronizer_extra": {"c": ArchiveSynchronizer},
                "label_printer": {"c": Os_Label_Printer, "t": False},
                "extra_label_printer": {"c": Os_Label_Printer, "t": False},
                "label_printer_2": {"c": BrotherLabelPrinter, "t": False},
                "multicomp": {"c": Multicomp730424, "k": {"paused": True}},
                "remote_api": {"c": RemoteAPI, "k": {"main": self}},
                "screwdriver": {"c": TecnaScrewdriver, "k": {"paused": True}},
                "tecna_t3": {"c": TecnaMarpossProvasetT3, "k": {"paused": True}},
                "furness_controls": {"c": FurnessControlsLeakTester, "k": {"paused": True}},
                "digital_io": {"c": USB_586x, "k": {"paused": True}},
                "digital_io_flush_blow": {"c": USB_586x, "k": {"paused": True}},
                "fixture_id": {"c": RFID_PN532, "k": {"paused": False, "lazy": False}},
            }
            # VISION COMPONENT IS OPTIONAL AND DISABLED BY DEFAULT
            if "--vision" in sys.argv:
                specs.update({
                    "galaxy_camera": {"c": GalaxyCamera, "k": {"paused": True}},
                    "neo_pixels": {"c": NeoPixels, "t": False},
                    "uvc_camera": {"c": UVCCamera, "k": {"paused": True}},
                    "vision_saver": {"c": VisionSaver, "t": False},
                    "vision": {"c": Vision, "k": {"paused": True}},
                })
            hardware_config = self.config.get("hardware_config", {})
            return {
                component_name: spec
                for component_name, spec in specs.items()
                if hardware_config.get(component_name, None) == "present"
            }

        def _start_components(self):
            """Instantiate every selected component and start the threaded ones."""
            self.components = {}
            self.threads = {}
            for component_name, spec in self.components_specs.items():
                component = spec["c"](*spec.get("a", []), config=self.config, name=component_name,
                                      **spec.get("k", {}))
                self.components[component_name] = component
                if spec.get("t", True):
                    thread = QThread()
                    self.threads[component_name] = thread
                    thread.setTerminationEnabled(True)
                    component.moveToThread(thread)
            if "fixture_id" in self.components:
                self.components["fixture_id"].new_id_signal.connect(self.load_recipe_from_rfid)
            for component_name, thread in self.threads.items():
                component = self.components[component_name]
                thread.started.connect(component.start)
                thread.start()
                # DEBUGGER WORKAROUND
                QApplication.processEvents()
                QThread.msleep(1000)
                QApplication.processEvents()
                if component_name == "vision":
                    component.wait_completion(timeout=60)
                else:
                    component.wait_completion()

        def _wire_components(self):
            """Connect the outputs of some components to the inputs of others."""
            components = self.components
            # connect camera frames to vision
            if "vision" in components and "uvc_camera" in components:
                components["vision"].set_sources({"uvc_camera": components["uvc_camera"].out})
            elif "vision" in components and "galaxy_camera" in components:
                components["vision"].set_sources({"galaxy_camera": components["galaxy_camera"].out})
            # connect tecna to screwdriver
            if "screwdriver" in components and "tecna_t3" in components:
                components["tecna_t3"].set_requestors({"screwdriver": components["screwdriver"].request})
                components["screwdriver"].set_sources({"tecna_t3": components["tecna_t3"].out})

        def _init_gui(self):
            """Create the main window, connect its actions, open the login tab and show it."""
            self.main_window = Main_Window()
            window = self.main_window
            # The lambdas hold only a weak reference to this object.
            self_ref = weakref.ref(self)
            # CONNECT MAIN WINDOW ACTIONS
            window.logout_a.triggered.connect(lambda checked, selfie=self_ref: selfie().logout())
            window.archive_a.triggered.connect(
                lambda checked, selfie=self_ref: selfie().main_window.open_dialog(
                    Archive(hide_cloud_image="vision_saver" not in selfie().components)))
            if "--archive" in sys.argv:
                window.archive_a.trigger()
            # admin menu should not be visible before an admin logs in
            window.admin_m.menuAction().setVisible(False)
            window.about_a.triggered.connect(
                lambda checked, selfie=self_ref: selfie().main_window.open_dialog(About()))
            # Triggered only after the connection above exists, otherwise
            # "--about" would have no effect.
            if "--about" in sys.argv:
                window.about_a.trigger()
            window.download_a.triggered.connect(
                lambda checked, selfie=self_ref: selfie().main_window.open_dialog(Logs_Management()))
            window.quit_a.triggered.connect(quit_app)
            window.users_management_a.triggered.connect(
                lambda checked, selfie=self_ref: selfie().main_window.open_dialog(Users_Management()))
            window.table_selection_a.triggered.connect(self.set_recipe_mode_table)
            window.barcode_selection_a.triggered.connect(self.set_recipe_mode_barcode)
            window.ristampa_etichetta_a.triggered.connect(self.reprint_label)
            window.diagnostics_a.triggered.connect(
                lambda checked, selfie=self_ref: selfie().main_window.open_dialog(Diagnostics(selfie())))
            if "--users-management" in sys.argv:
                window.users_management_a.trigger()
            # CONFIG-SPECIFIC MENU ENTRY ACTIVATION
            saving_enabled = (
                "--enable-saving-tecna-recipes" in sys.argv
                or self.config.get("tecna_t3", {}).get("saver", None) == "present"
            )
            if "tecna_t3" in self.components and saving_enabled:
                window.save_tecna_recipes_a.triggered.connect(self.components["tecna_t3"].store_recipes)
                window.save_tecna_recipes_a.setVisible(True)
                if "--save-tecna-recipes" in sys.argv:
                    window.save_tecna_recipes_a.trigger()
            else:
                window.save_tecna_recipes_a.setVisible(False)
            # .get() so that a missing key hides the entry instead of raising
            # KeyError outside the startup error handling.
            window.barcode_selection_a.setVisible(
                self.config.get("hardware_config", {}).get("barcode_recipe_selection", None) == "present")
            # OPEN LOGIN TAB
            self.open_login()
            # SHOW MAIN WINDOW
            if "--panel" in sys.argv:
                window.show()
            elif "--maximized" in sys.argv:
                window.showMaximized()
            else:
                # "--full-screen" is also the default mode.
                window.showFullScreen()

        def open_login(self):
            """Open a new login tab whose successful login calls ``logged_in``."""
            tab = Login()
            tab.successful_login.connect(self.logged_in)
            self.main_window.open_tab(tab)

        def logged_in(self):
            """Show or hide the admin menu for the current session and open the test tab."""
            session = Users.get_session()
            if session is None:
                self.main_window.admin_m.menuAction().setVisible(False)
                return
            self.main_window.admin_m.menuAction().setVisible(bool(session.is_admin))
            # open test
            self.main_window.open_tab(Test(self.config, self.components, self))
            self.main_window.centralWidget().request_autotest("login")

        def logout(self):
            """Log out now, or request the logout autotest when a test is in progress."""
            self.main_window.admin_m.menuAction().setVisible(False)
            central = self.main_window.centralWidget()
            if type(central.centralWidget.widget) in (Recipe_Selection, Barcode_Recipe_Selection):
                # LOGOUT IMMEDIATELY IF NOT TESTING
                Users.logout()
                ArchiveSynchronizer.machine_status = "logged-out"
                self.open_login()
            elif not central.autotesting:
                # LOGOUT AFTER AUTOTEST IF TESTING
                central.request_autotest("logout")
            else:
                # LOGOUT IMMEDIATELY IF AUTOTESTING
                Users.logout()
                self.open_login()

        def set_recipe_mode_table(self):
            """Forward the "table selection" action to the central widget."""
            self.main_window.centralWidget().set_recipe_mode_table()

        def set_recipe_mode_barcode(self):
            """Forward the "barcode selection" action to the central widget."""
            self.main_window.centralWidget().set_recipe_mode_barcode()

        def reprint_label(self):
            """Forward the "reprint label" action to the central widget."""
            self.main_window.centralWidget().reprint_label()

        @pyqtSlot(str)
        def load_recipe_from_rfid(self, data):
            """Store the data received from the ``fixture_id`` component."""
            self.tag_loaded_recipe = data


    if __name__ == "__main__":
        app = QApplication(sys.argv)
        with SingleProcess() as single_process_lock:
            if not single_process_lock and "--no-lock" not in sys.argv:
                logging.error("Program already opened, exiting...")
                QMessageBox.critical(None, "ERRORE", "IL PROGRAMMA E' GIA' IN ESECUZIONE")
                sys.exit(0)
            main = Main()
            if "--no-gui" not in sys.argv:
                app.exec()
            if "--interact" in sys.argv:
                import code
                try:
                    # Imported only for its line-editing side effect; the
                    # module does not exist on every platform.
                    import readline  # noqa: F401
                except ImportError:
                    pass
                variables = globals().copy()
                variables.update(locals())
                shell = code.InteractiveConsole(variables)
                shell.interact()
except Exception:
    # logging.exception() already appends the traceback.
    logging.exception("Unhandled exception")
    # For interactive debugging: pdb.post_mortem(sys.exc_info()[2])