st-ten-1/src/main.py

176 lines
7.5 KiB
Python
Raw Normal View History

2022-06-01 16:37:19 +00:00
#!/usr/bin/env python3
import faulthandler
import logging
import os
# import pdb
import signal
import sys
import traceback
2022-07-05 16:16:22 +00:00
import weakref
2022-06-01 16:37:19 +00:00
from datetime import datetime
from pathlib import Path
2022-07-04 10:36:51 +00:00
app = None
2022-06-01 16:37:19 +00:00
2022-06-29 11:04:31 +00:00
def quit_app(signalnum=None, handler=None):
logging.info(f"quitting app. signal: {signalnum!r}, handler: {handler!r}")
2022-06-01 16:37:19 +00:00
global app
2022-07-04 10:36:51 +00:00
if app is not None:
app.quit()
2022-06-01 16:37:19 +00:00
quit()
# SETUP QUITTING ON CTRL+C
signal.signal(signal.SIGINT, quit_app)
# SETUP FAULTHANDLER
faulthandler.enable(file=sys.stderr, all_threads=True)
# SETUP LOGS
logs_dir = Path(".") / "data" / "logs"
os.makedirs(logs_dir, exist_ok=True)
logging.basicConfig(
format="{asctime}:{name}:{levelname}:{message}",
datefmt="%Y-%m-%dT%H-%M-%S%z",
style="{",
level="INFO",
handlers=[
logging.StreamHandler(stream=sys.stderr),
logging.FileHandler(
2022-07-04 10:36:51 +00:00
logs_dir / f"{datetime.now().isoformat().replace(':', ';')}.log",
2022-06-01 16:37:19 +00:00
mode="a",
encoding="utf-8",
delay=False,
2022-07-06 11:23:46 +00:00
**({"errors": "surrogateescape"} if sys.version_info.major >= 3 and sys.version_info.minor >= 10 else {}),
2022-06-01 16:37:19 +00:00
),
],
force=True,
2022-07-06 11:23:46 +00:00
**({"encoding": "utf-8"} if sys.version_info.major >= 3 and sys.version_info.minor >= 10 else {}),
**({"errors": "surrogateescape"} if sys.version_info.major >= 3 and sys.version_info.minor >= 10 else {}),
2022-06-01 16:37:19 +00:00
)
try:
# IMPORT PROJECT ONLY AFTER SETTING UP SIGNAL, FAULTHANDLER AND LOGGHING
2022-07-06 09:54:44 +00:00
from components import (ArchiveSynchronizer, GalaxyCamera, NeoPixels,
Os_Label_Printer, RemoteAPI,
TecnaMarpossProvasetT3, TestComponent, Vision,
VisionSaver)
2022-06-01 16:37:19 +00:00
from lib.db import Users
from lib.helpers import ConfigReader
from PyQt5.QtCore import QObject, QThread, pyqtSignal
from PyQt5.QtWidgets import QApplication, QMessageBox
from ui import (About, Archive, Autotests_Archive, Login, Main_Window,
Test, Users_Management)
class Main(QObject):
do = pyqtSignal(dict)
@staticmethod
def _do(config):
return config["f"](*config.get("a", []), **config.get("k", {}))
def __init__(self, parent=None):
# print(f"MAIN {int(QThread.currentThreadId())}", flush=True)
super().__init__()
self.do.connect(self._do)
try:
# READ CONFIG
self.config = ConfigReader()
# INIT COMPONENT
self.components_specs = {
2022-06-08 07:11:38 +00:00
"archive_synchronizer": {"c": ArchiveSynchronizer},
2022-06-22 15:18:29 +00:00
"galaxy_camera": {"c": GalaxyCamera, "k": {"paused": True}},
2022-06-08 07:11:38 +00:00
"label_printer": {"c": Os_Label_Printer, "t": False},
2022-06-29 11:04:31 +00:00
"neo_pixels": {"c": NeoPixels, "t": False},
2022-06-21 12:10:52 +00:00
"remote_api": {"c": RemoteAPI, "k": {"main": self}},
2022-07-12 08:48:04 +00:00
"tecna_t3": {"c": TecnaMarpossProvasetT3, "k": {"paused": True}},
2022-06-21 12:10:52 +00:00
"vision_saver": {"c": VisionSaver, "t": False},
2022-06-22 15:18:29 +00:00
"vision": {"c": Vision, "k": {"paused": True}},
2022-06-01 16:37:19 +00:00
}
self.components = {}
self.threads = {}
for component_name, spec in self.components_specs.items():
2022-06-08 07:11:38 +00:00
self.components[component_name] = spec["c"](*spec.get("a", []), config=self.config, name=component_name, **spec.get("k", {}))
if spec.get("t", True):
2022-06-01 16:37:19 +00:00
self.threads[component_name] = QThread()
self.threads[component_name].setTerminationEnabled(True)
self.components[component_name].moveToThread(self.threads[component_name])
for component_name, thread in self.threads.items():
component = self.components[component_name]
thread.started.connect(component.start)
thread.start()
2022-06-21 12:10:52 +00:00
if component_name == "vision":
component.wait_ready(timeout=60)
else:
component.wait_ready()
2022-06-01 16:37:19 +00:00
except Exception as e:
logging.exception(traceback.format_exc())
QMessageBox.critical(None, "Errore Banco", f"Non e stato possibile connettersi al banco:\n\n{e}")
quit()
2022-06-22 15:18:29 +00:00
# connect camera frames to vision
2022-06-29 11:04:31 +00:00
if "vision" in self.components and "galaxy_camera" in self.components:
self.components["vision"].set_sources({"galaxy_camera": self.components["galaxy_camera"].out})
2022-06-01 16:37:19 +00:00
# GUI INIT
2022-06-29 11:04:31 +00:00
if "--no-gui" not in sys.argv:
# self.main_window = Main_Window(self.bench)
self.main_window = Main_Window()
# CONNECT MAIN WINDOW ACTIONS
2022-07-05 16:16:22 +00:00
self.main_window.archive_a.triggered.connect(lambda checked, self=weakref.ref(self): self().main_window.open_dialog(Archive()))
2022-06-29 11:04:31 +00:00
if "--archive" in sys.argv:
self.main_window.archive_a.trigger()
2022-07-05 16:16:22 +00:00
self.main_window.autotests_archive_a.triggered.connect(lambda checked, self=weakref.ref(self): self().main_window.open_dialog(Autotests_Archive()))
2022-06-29 11:04:31 +00:00
if "--autotests-archive" in sys.argv:
self.main_window.autotests_archive_a.trigger()
2022-07-05 16:16:22 +00:00
self.main_window.about_a.triggered.connect(lambda checked, self=weakref.ref(self): self().main_window.open_dialog(About()))
2022-06-29 11:04:31 +00:00
if "--about" in sys.argv:
self.main_window.about_a.trigger()
self.main_window.admin_m.menuAction().setVisible(False) # admin menu should not be visible before an admin logs in
self.main_window.quit_a.triggered.connect(quit_app)
2022-07-05 16:16:22 +00:00
self.main_window.users_management_a.triggered.connect(lambda checked, self=weakref.ref(self): self().main_window.open_dialog(Users_Management()))
2022-06-29 11:04:31 +00:00
if "--users-management" in sys.argv:
self.main_window.users_management_a.trigger()
# OPEN LOGIN TAB
self.open_login()
# SHOW MAIN WINDOW
if "--panel" in sys.argv:
self.main_window.show()
elif "--maximized" in sys.argv:
self.main_window.showMaximized()
elif "--full-screen" in sys.argv:
self.main_window.showFullScreen()
else:
self.main_window.showFullScreen()
2022-06-01 16:37:19 +00:00
def open_login(self):
tab = Login()
tab.successful_login.connect(self.logghed_in)
self.main_window.open_tab(tab)
def logghed_in(self):
session = Users.get_session()
if session is not None:
if session.is_admin:
self.main_window.admin_m.menuAction().setVisible(True)
else:
self.main_window.admin_m.menuAction().setVisible(False)
2022-07-05 16:16:22 +00:00
# open test
self.main_window.open_tab(Test(self.config.machine_id, self.components))
2022-06-29 11:04:31 +00:00
2022-06-01 16:37:19 +00:00
if __name__ == "__main__":
app = QApplication(sys.argv)
main = Main()
2022-06-29 11:04:31 +00:00
if "--no-gui" not in sys.argv:
app.exec()
if "--interact" in sys.argv:
import code
import readline
variables = globals().copy()
variables.update(locals())
shell = code.InteractiveConsole(variables)
shell.interact()
2022-06-01 16:37:19 +00:00
except Exception:
logging.exception(traceback.format_exc())
# extype, value, tb = sys.exc_info()
# pdb.post_mortem(tb)