st-ten-1/src/ui/recipe_selection/recipe_selection.py

386 lines
19 KiB
Python
Raw Normal View History

2022-09-20 15:42:59 +00:00
import csv
2022-07-27 12:54:19 +00:00
import os
2022-09-20 15:42:59 +00:00
import re
2022-06-01 16:37:19 +00:00
import sys
2022-09-20 15:42:59 +00:00
import weakref
2022-07-27 12:54:19 +00:00
from glob import glob
2022-06-01 16:37:19 +00:00
2022-09-20 15:42:59 +00:00
from lib.db import Recipes, Steps, Users
2022-07-19 09:59:00 +00:00
from PyQt5.QtCore import QTimer, pyqtSignal
2022-06-01 16:37:19 +00:00
from PyQt5.QtGui import QKeySequence
2022-09-20 15:42:59 +00:00
from PyQt5.QtWidgets import QFileDialog, QShortcut
2022-07-19 09:59:00 +00:00
from ui.crud import Crud, Json_External_Dialog_Editor_Cell_Widget
2022-06-01 16:37:19 +00:00
from ui.helpers import replace_widget
2022-07-25 09:16:14 +00:00
from ui.recipe_spec_and_step_editor import Recipe_Spec_And_Step_Editor
2022-06-01 16:37:19 +00:00
from ui.widget import Widget
2022-09-20 15:42:59 +00:00
class Noner:
def __getitem__(self, key):
return None
2022-06-01 16:37:19 +00:00
class Recipe_Selection(Widget):
ok = pyqtSignal(Recipes)
2022-09-20 15:42:59 +00:00
def __init__(self, config, unsupported_steps=None):
2022-06-01 16:37:19 +00:00
super().__init__()
2022-09-20 15:42:59 +00:00
self.config = config
self.recipe_name_field = self.config.get("recipe_name", "name").strip()
2022-06-01 16:37:19 +00:00
session = Users.get_session()
if session.is_admin:
2022-09-07 15:24:40 +00:00
# readonly = ["id"]
readonly = False
2022-06-01 16:37:19 +00:00
crud_aliases = {
2022-09-07 15:24:40 +00:00
# "id": "Id",
2022-06-01 16:37:19 +00:00
"name": "Ricetta",
2022-07-19 09:59:00 +00:00
"client": "Cliente",
"part_number": "N° disegno",
2022-06-01 16:37:19 +00:00
"spec": "Specifica",
2022-07-26 13:34:09 +00:00
"description": "Descrizione",
2022-09-13 10:39:09 +00:00
# "archived": "Archiviata",
2022-06-01 16:37:19 +00:00
}
filters = None
else:
readonly = True
crud_aliases = {
"name": "Ricetta",
"client": "Cliente",
"part_number": "N° disegno",
"spec": "Specifica",
2022-07-26 13:34:09 +00:00
"description": "Descrizione",
2022-07-19 09:59:00 +00:00
# "archived": "Archiviata",
2022-06-01 16:37:19 +00:00
}
filters = {"archived": False}
2022-09-20 15:42:59 +00:00
with open("config/machine_settings/recipes_defaults.csv", "r") as f:
self.defaults = dict(next(csv.DictReader(f)))
step_defaults = self.read_steps(self.defaults, Noner())
2022-06-01 16:37:19 +00:00
self.crud = Crud(
"recipes",
display_name="SELEZIONE RICETTA",
readonly=readonly,
select=list(crud_aliases.keys()),
filters=filters,
fields_aliases=crud_aliases,
2022-09-20 15:42:59 +00:00
# autocomplete={
# "spec": {
# "step_editors": step_defaults.update({
# "vision": {
# # "recipe": sorted(glob("*.ini", root_dir="./config/vision/recipes/")), # only in python3.10
# "recipe": sorted(map(os.path.basename, glob("./config/vision/recipes/*.ini"))),
# },
# "print": {
# # "template": sorted(glob("*.prn", root_dir="./config/label_templates/")), # only in python3.10
# "template": sorted(map(os.path.basename, glob("./config/label_templates/*.prn"))),
# },
# }),
# },
# "archived": False,
# },
widget_classes={"spec": lambda *args, **kwargs: Json_External_Dialog_Editor_Cell_Widget(
Recipe_Spec_And_Step_Editor,
*args,
**kwargs,
unsupported_steps=unsupported_steps
), },
2022-09-13 10:39:09 +00:00
pagination=25,
2022-06-01 16:37:19 +00:00
)
2022-09-13 10:39:09 +00:00
self.crud.delete_b.setEnabled(False)
self.crud.delete_b.setVisible(False)
2022-06-01 16:37:19 +00:00
replace_widget(self, "crud_w", self.crud)
self.crud_modified = None
self.selected = None
self.select_b.setEnabled(False)
self.select_b.clicked.connect(self.select)
QShortcut(QKeySequence("Return"), self).activated.connect(self.select_b.click)
2022-09-06 15:35:49 +00:00
QShortcut(QKeySequence("Enter"), self).activated.connect(self.select_b.click)
2022-06-01 16:37:19 +00:00
self.crud.modified.connect(self.check_modified)
self.crud.selected.connect(self.check_selected)
self.crud.emit()
2022-09-13 10:39:09 +00:00
self.crud.db_tw.setColumnWidth(0, 200)
2022-07-26 13:47:31 +00:00
self.crud.db_tw.setColumnWidth(1, 200)
self.crud.db_tw.setColumnWidth(2, 200)
self.crud.db_tw.setColumnWidth(3, 200)
self.crud.db_tw.setColumnWidth(4, 200)
self.crud.db_tw.setColumnWidth(5, 400)
2022-09-20 15:42:59 +00:00
self.import_b.clicked.connect(lambda checked, self=weakref.ref(self): self().import_recipes())
self.export_b.clicked.connect(lambda checked, self=weakref.ref(self): self().export_recipes())
2022-06-01 16:37:19 +00:00
# TESTING
if "--auto-select" in sys.argv or "--test" in sys.argv:
2022-09-13 10:43:38 +00:00
recipe = "TEST"
2022-06-01 16:37:19 +00:00
cn = self.crud.select_index["name"]
2022-09-06 13:15:01 +00:00
self.crud.db_tw.clearSelection()
2022-06-01 16:37:19 +00:00
for rn in range(1, self.crud.db_tw.rowCount()):
2022-09-13 10:43:38 +00:00
if self.crud.db_tw.cellWidget(rn, cn).text() == recipe:
2022-06-01 16:37:19 +00:00
selection = self.crud.db_tw.model().index(rn, cn)
self.crud.db_tw.setCurrentIndex(selection)
break
self.test_timer = QTimer()
self.test_timer.setSingleShot(True)
self.test_timer.timeout.connect(self.select_b.clicked.emit)
self.test_timer.start(500)
# /TESTING
def check_modified(self, modified):
self.crud_modified = modified
self.check(self.crud_modified, self.selected)
2022-07-26 14:05:04 +00:00
def check_selected(self, selected=None):
if selected is not None and len(selected) == 1:
2022-06-01 16:37:19 +00:00
selected = selected[0] - 1 # - 1 because rn starts from 1 (filters line)
if selected >= 0 and selected < len(self.crud.data_index):
selected = self.crud.data_index[selected]
2022-07-06 13:54:22 +00:00
if selected is not None:
2022-07-26 14:05:04 +00:00
self.selected = selected
2022-07-06 13:54:22 +00:00
else:
self.selected = None
2022-06-01 16:37:19 +00:00
else:
self.selected = None
self.check(self.crud_modified, self.selected)
def check(self, modified, selected):
self.select_b.setEnabled(modified is False and selected is not None)
def select(self):
if self.selected is not None:
2022-07-26 14:05:04 +00:00
self.ok.emit(self.crud.db.table_model.get_by_id(self.selected))
2022-09-20 15:42:59 +00:00
def read_steps(self, row, defaults=None):
if defaults is None:
defaults = self.defaults
return {
"connector": {
"connector": row.get("connector", defaults["connector"])
},
"barcodes": {
"serial": row.get("barcodes_serial", defaults["barcodes_serial"]),
},
"resistance": {
"scale": float(row.get("resistance_scale", defaults["resistance_scale"])),
"expected": float(row.get("resistance_expected", defaults["resistance_expected"])),
"tolerance": float(row.get("resistance_tolerance", defaults["resistance_tolerance"]))
},
"leak_1": {
"pre_filling_time": int(row.get("leak_1_pre_filling_time", defaults["leak_1_pre_filling_time"])),
"pre_filling_pressure": int(row.get("leak_1_pre_filling_pressure", defaults["leak_1_pre_filling_pressure"])),
"filling_time": int(row.get("leak_1_filling_time", defaults["leak_1_filling_time"])),
"settling_time": int(row.get("leak_1_settling_time", defaults["leak_1_settling_time"])),
"settling_pressure_min_percent": int(row.get("leak_1_settling_pressure_min_percent", defaults["leak_1_settling_pressure_min_percent"])),
"settling_pressure_max_percent": int(row.get("leak_1_settling_pressure_max_percent", defaults["leak_1_settling_pressure_max_percent"])),
"test_time": int(row.get("leak_1_test_time", defaults["leak_1_test_time"])),
"test_pressure_min_delta": int(row.get("leak_1_test_pressure_min_delta", defaults["leak_1_test_pressure_min_delta"])),
"test_pressure": int(row.get("leak_1_test_pressure", defaults["leak_1_test_pressure"])),
"test_pressure_max_delta": int(row.get("leak_1_test_pressure_max_delta", defaults["leak_1_test_pressure_max_delta"])),
"flush_time": int(row.get("leak_1_flush_time", defaults["leak_1_flush_time"])),
"flush_pressure": int(row.get("leak_1_flush_pressure", defaults["leak_1_flush_pressure"]))
},
"leak_2": {
"pre_filling_time": int(row.get("leak_2_pre_filling_time", defaults["leak_2_pre_filling_time"])),
"pre_filling_pressure": int(row.get("leak_2_pre_filling_pressure", defaults["leak_2_pre_filling_pressure"])),
"filling_time": int(row.get("leak_2_filling_time", defaults["leak_2_filling_time"])),
"settling_time": int(row.get("leak_2_settling_time", defaults["leak_2_settling_time"])),
"settling_pressure_min_percent": int(row.get("leak_2_settling_pressure_min_percent", defaults["leak_2_settling_pressure_min_percent"])),
"settling_pressure_max_percent": int(row.get("leak_2_settling_pressure_max_percent", defaults["leak_2_settling_pressure_max_percent"])),
"test_time": int(row.get("leak_2_test_time", defaults["leak_2_test_time"])),
"test_pressure_min_delta": int(row.get("leak_2_test_pressure_min_delta", defaults["leak_2_test_pressure_min_delta"])),
"test_pressure": int(row.get("leak_2_test_pressure", defaults["leak_2_test_pressure"])),
"test_pressure_max_delta": int(row.get("leak_2_test_pressure_max_delta", defaults["leak_2_test_pressure_max_delta"])),
"flush_time": int(row.get("leak_2_flush_time", defaults["leak_2_flush_time"])),
"flush_pressure": int(row.get("leak_2_flush_pressure", defaults["leak_2_flush_pressure"]))
},
"vision": {
"recipe": row.get("vision_recipe", defaults["vision_recipe"]),
},
"print": {
"template": row.get("print_template", defaults["print_template"]),
},
}
def import_recipes(self, csv_path=None, defaults=None):
if defaults is None:
defaults = self.defaults
if csv_path is None:
csv_path, _ = QFileDialog.getOpenFileName(
None,
"Esportazione ricette",
"ricette.csv",
"CSV data (*.csv);;All Files (*)",
)
csv_path = str(csv_path)
if not len(csv_path):
return
self.log.info(f"recipes: importing recipes from {csv_path}")
with open(csv_path, "r") as f:
reader = csv.DictReader(f)
count = 0
for row in reader:
recipe_name = row.get(self.recipe_name_field, row.get(defaults.get(self.recipe_name_field, "name")))
try:
recipe = Recipes.get_by_id(recipe_name)
recipe_is_new = False
except Recipes.DoesNotExist:
recipe = Recipes(name=recipe_name)
recipe_is_new = True
steps = {}
for step_type, step_spec in self.read_steps(row, defaults=defaults).items():
try:
if recipe.spec is None:
raise Steps.DoesNotExist()
step = Steps.get_by_id(recipe.spec.get("available_steps", {}).get(step_type, None))
step_is_new = False
except Steps.DoesNotExist:
step = Steps()
step_is_new = True
step.type = re.sub(r"^(.*)_[0-9]+$", r"\1", step_type)
step.spec = step_spec
if step_is_new:
step.save(force_insert=True)
else:
step.save()
steps[step_type] = step
recipe.client = row.get("client", defaults["client"])
recipe.part_number = row.get("part_number", defaults["part_number"])
recipe.spec = {
"connector": bool(len(row.get("connector_enabled", defaults["connector_enabled"]))),
"barcodes": bool(len(row.get("barcodes_enabled", defaults["barcodes_enabled"]))),
"resistance": bool(len(row.get("resistance_enabled", defaults["resistance_enabled"]))),
"leak_1": bool(len(row.get("leak_1_enabled", defaults["leak_1_enabled"]))),
"leak_2": bool(len(row.get("leak_2_enabled", defaults["leak_2_enabled"]))),
"vision": bool(len(row.get("vision_enabled", defaults["vision_enabled"]))),
"print": bool(len(row.get("print_enabled", defaults["print_enabled"]))),
"steps": [], # should be pks of the enabled steps
"available_steps": {
"connector": steps["connector"].name,
"barcodes": steps["barcodes"].name,
"resistance": steps["resistance"].name,
"leak_1": steps["leak_1"].name,
"leak_2": steps["leak_2"].name,
"vision": steps["vision"].name,
"print": steps["print"].name,
},
}
for step_name, step in recipe.spec["available_steps"].items():
if recipe.spec[step_name]:
recipe.spec["steps"].append(step)
if recipe_is_new:
recipe.save(force_insert=True)
else:
recipe.save()
count += 1
self.log.info(f"recipes: imported {count} rows.")
self.crud.refresh()
def export_recipes(self, csv_path=None):
if csv_path is None:
csv_path, _ = QFileDialog.getSaveFileName(
None,
"Esportazione ricette",
"ricette.csv",
"CSV data (*.csv);;All Files (*)",
)
csv_path = str(csv_path)
if not len(csv_path):
return
if not csv_path.lower().endswith(".csv"):
csv_path += ".csv"
csv_dir = os.path.dirname(csv_path)
if len(csv_dir):
os.makedirs(csv_dir, exist_ok=True)
data = []
fieldnames = [
self.recipe_name_field,
"client",
"part_number",
"connector_enabled",
"connector",
"barcodes_enabled",
"barcodes_serial",
"resistance_enabled",
"resistance_scale",
"resistance_expected",
"resistance_tolerance",
"leak_1_enabled",
"leak_1_pre_filling_time",
"leak_1_pre_filling_pressure",
"leak_1_filling_time",
"leak_1_settling_time",
"leak_1_settling_pressure_min_percent",
"leak_1_settling_pressure_max_percent",
"leak_1_test_time",
"leak_1_test_pressure_min_delta",
"leak_1_test_pressure",
"leak_1_test_pressure_max_delta",
"leak_1_flush_time",
"leak_1_flush_pressure",
"leak_2_enabled",
"leak_2_pre_filling_time",
"leak_2_pre_filling_pressure",
"leak_2_filling_time",
"leak_2_settling_time",
"leak_2_settling_pressure_min_percent",
"leak_2_settling_pressure_max_percent",
"leak_2_test_time",
"leak_2_test_pressure_min_delta",
"leak_2_test_pressure",
"leak_2_test_pressure_max_delta",
"leak_2_flush_time",
"leak_2_flush_pressure",
"vision_enabled",
"vision_recipe",
"print_enabled",
"print_template",
]
for recipe in Recipes.select():
steps = recipe.get_steps_map()
exportable = {
self.recipe_name_field: recipe.name,
"client": recipe.client,
"part_number": recipe.part_number,
"connector_enabled": recipe.spec["connector"],
"connector": steps["connector"].spec["connector"],
"barcodes_enabled": recipe.spec["barcodes"],
"barcodes_serial": steps["barcodes"].spec["serial"],
"resistance_enabled": recipe.spec["resistance"],
"resistance_scale": steps["resistance"].spec["scale"],
"resistance_expected": steps["resistance"].spec["expected"],
"resistance_tolerance": steps["resistance"].spec["tolerance"],
"leak_1_enabled": recipe.spec["leak_1"],
"leak_1_pre_filling_time": steps["leak_1"].spec["pre_filling_time"],
"leak_1_pre_filling_pressure": steps["leak_1"].spec["pre_filling_pressure"],
"leak_1_filling_time": steps["leak_1"].spec["filling_time"],
"leak_1_settling_time": steps["leak_1"].spec["settling_time"],
"leak_1_settling_pressure_min_percent": steps["leak_1"].spec["settling_pressure_min_percent"],
"leak_1_settling_pressure_max_percent": steps["leak_1"].spec["settling_pressure_max_percent"],
"leak_1_test_time": steps["leak_1"].spec["test_time"],
"leak_1_test_pressure_min_delta": steps["leak_1"].spec["test_pressure_min_delta"],
"leak_1_test_pressure": steps["leak_1"].spec["test_pressure"],
"leak_1_test_pressure_max_delta": steps["leak_1"].spec["test_pressure_max_delta"],
"leak_1_flush_time": steps["leak_1"].spec["flush_time"],
"leak_1_flush_pressure": steps["leak_1"].spec["flush_pressure"],
"leak_2_enabled": recipe.spec["leak_2"],
"leak_2_pre_filling_time": steps["leak_2"].spec["pre_filling_time"],
"leak_2_pre_filling_pressure": steps["leak_2"].spec["pre_filling_pressure"],
"leak_2_filling_time": steps["leak_2"].spec["filling_time"],
"leak_2_settling_time": steps["leak_2"].spec["settling_time"],
"leak_2_settling_pressure_min_percent": steps["leak_2"].spec["settling_pressure_min_percent"],
"leak_2_settling_pressure_max_percent": steps["leak_2"].spec["settling_pressure_max_percent"],
"leak_2_test_time": steps["leak_2"].spec["test_time"],
"leak_2_test_pressure_min_delta": steps["leak_2"].spec["test_pressure_min_delta"],
"leak_2_test_pressure": steps["leak_2"].spec["test_pressure"],
"leak_2_test_pressure_max_delta": steps["leak_2"].spec["test_pressure_max_delta"],
"leak_2_flush_time": steps["leak_2"].spec["flush_time"],
"leak_2_flush_pressure": steps["leak_2"].spec["flush_pressure"],
"vision_enabled": recipe.spec["vision"],
"vision_recipe": steps["vision"].spec["recipe"],
"print_enabled": recipe.spec["print"],
"print_template": steps["print"].spec["template"],
}
data.append(exportable)
if len(data):
self.log.info(f"recipes: exporting recipes to {csv_path}")
with open(csv_path, "w", newline="") as f:
w = csv.DictWriter(f, fieldnames, extrasaction="ignore")
w.writeheader()
w.writerows(data)
self.log.info(f"recipes: exported {len(data)} rows.")