st-ten-1/src/ui/recipe_selection/recipe_selection.py
2022-11-04 15:17:32 +01:00

441 lines
23 KiB
Python
Executable File

import csv
import itertools
import os
import re
import sys
import weakref
from glob import glob
from lib.db import Recipes, Steps, Users, db
from PyQt5.QtCore import QTimer, pyqtSignal
from PyQt5.QtGui import QKeySequence
from PyQt5.QtWidgets import QFileDialog, QMessageBox, QShortcut
from ui.crud import Crud, Json_External_Dialog_Editor_Cell_Widget
from ui.helpers import replace_widget
from ui.recipe_spec_and_step_editor import Recipe_Spec_And_Step_Editor
from ui.widget import Widget
class Noner:
def __getitem__(self, key):
return None
class Recipe_Selection(Widget):
ok = pyqtSignal(Recipes)
def __init__(self, config, unsupported_steps=None):
super().__init__()
self.config = config
self.unsupported_steps = unsupported_steps
session = Users.get_session()
if session.is_admin:
# readonly = ["id"]
readonly = False
crud_aliases = {
# "id": "Id",
"name": "Ricetta",
"client": "Cliente",
"part_number": "N° disegno",
"spec": "Specifica",
"description": "Descrizione",
# "archived": "Archiviata",
}
filters = None
else:
readonly = True
crud_aliases = {
"name": "Ricetta",
"client": "Cliente",
"part_number": "N° disegno",
"spec": "Specifica",
"description": "Descrizione",
# "archived": "Archiviata",
}
filters = {"archived": False}
with open("config/machine_settings/recipes_defaults.csv", "r") as f:
self.defaults = dict(next(csv.DictReader(f)))
step_defaults = self.read_steps(self.defaults, Noner())
step_defaults.update({
"vision": {
# "recipe": sorted(glob("*.ini", root_dir="./config/vision/recipes/")), # only in python3.10
"recipe": sorted(map(os.path.basename, glob("./config/vision/recipes/*.ini"))),
},
"print": {
# "template": sorted(glob("*.prn", root_dir="./config/label_templates/")), # only in python3.10
"template": sorted(map(os.path.basename, glob("./config/label_templates/*.prn"))),
},
}),
self.crud = Crud(
"recipes",
display_name="SELEZIONE RICETTA",
readonly=readonly,
select=list(crud_aliases.keys()),
filters=filters,
fields_aliases=crud_aliases,
autocomplete={
"spec": {
"step_editors": step_defaults,
},
"archived": False,
},
sort={"name": True},
widget_classes={"spec": lambda *args, **kwargs: Json_External_Dialog_Editor_Cell_Widget(
Recipe_Spec_And_Step_Editor,
*args,
**kwargs,
unsupported_steps=self.unsupported_steps
), },
pagination=25,
)
replace_widget(self, "crud_w", self.crud)
self.crud_modified = None
self.selected = None
self.select_b.setEnabled(False)
self.select_b.clicked.connect(self.select)
QShortcut(QKeySequence("Return"), self).activated.connect(self.select_b.click)
QShortcut(QKeySequence("Enter"), self).activated.connect(self.select_b.click)
self.crud.modified.connect(self.check_modified)
self.crud.selected.connect(self.check_selected)
self.crud.emit()
self.crud.db_tw.setColumnWidth(0, 200)
self.crud.db_tw.setColumnWidth(1, 200)
self.crud.db_tw.setColumnWidth(2, 200)
self.crud.db_tw.setColumnWidth(3, 200)
self.crud.db_tw.setColumnWidth(4, 200)
self.crud.db_tw.setColumnWidth(5, 400)
if session.is_admin:
self.import_b.clicked.connect(lambda checked, self=weakref.ref(self): self().import_recipes())
self.export_b.clicked.connect(lambda checked, self=weakref.ref(self): self().export_recipes())
self.delete_all_b.clicked.connect(lambda checked, self=weakref.ref(self): self().delete_recipes())
else:
self.import_b.setVisible(False)
self.export_b.setVisible(False)
self.delete_all_b.setVisible(False)
# TESTING
if "--auto-select" in sys.argv or "--test" in sys.argv:
recipe = "TEST"
cn = self.crud.select_index["name"]
self.crud.db_tw.clearSelection()
for rn in range(1, self.crud.db_tw.rowCount()):
if self.crud.db_tw.cellWidget(rn, cn).text() == recipe:
selection = self.crud.db_tw.model().index(rn, cn)
self.crud.db_tw.setCurrentIndex(selection)
break
self.test_timer = QTimer()
self.test_timer.setSingleShot(True)
self.test_timer.timeout.connect(self.select_b.clicked.emit)
self.test_timer.start(500)
# /TESTING
def check_modified(self, modified):
self.crud_modified = modified
self.check(self.crud_modified, self.selected)
def check_selected(self, selected=None):
if selected is not None and len(selected) == 1:
selected = selected[0] - 1 # - 1 because rn starts from 1 (filters line)
if selected >= 0 and selected < len(self.crud.data_index):
selected = self.crud.data_index[selected]
if selected is not None:
self.selected = selected
else:
self.selected = None
else:
self.selected = None
self.check(self.crud_modified, self.selected)
def check(self, modified, selected):
self.select_b.setEnabled(modified is False and selected is not None)
def select(self):
if self.selected is not None:
self.ok.emit(self.crud.db.table_model.get_by_id(self.selected))
def read_steps(self, row, defaults=None):
if defaults is None:
defaults = self.defaults
barcode_serial_field = self.config.get("recipe", {}).get("barcode_serial_field", "codice_a_barre").strip()
description_field = self.config.get("recipe", {}).get("description_field", "codice_a_barre").strip()
rcsv=row.get("r nominale", defaults["r nominale"]).replace(" ", "").replace(",", ".").replace("Ω", "").replace("?", "")
if rcsv=="":
rcsv="0"
print_template_field = self.config.get("recipe", {}).get("label_template_field", "modello_etichetta").strip()
return {
"count": {
"amount": row.get("dimensione_lotto", defaults["dimensione_lotto"]),
},
"connector": {
"connector": row.get("connettore", defaults["connettore"]),
},
"barcodes": {
"serial": row.get(barcode_serial_field, defaults["codice_a_barre"]),
},
"resistance": {
"scale": float(row.get("scala_resistenza", defaults["scala_resistenza"])),
"expected": float(rcsv),
"tolerance": float(row.get("tolleranza_resistenza", defaults["tolleranza_resistenza"])),
},
"screws": {
"quantity": row.get("viti", defaults["viti"])
},
"leak_1": {
"pre_filling_time": int(row.get("tempo_pre_riempimento", defaults["tempo_pre_riempimento"])),
"pre_filling_pressure": int(row.get("pressione_pre_riempimento", defaults["pressione_pre_riempimento"])),
"filling_time": int(row.get("tempo_riempimento", defaults["tempo_riempimento"])),
"settling_time": int(row.get("tempo_assestamento", defaults["tempo_assestamento"])),
"settling_pressure_min_percent": int(row.get("percentuale_minima_pressione_assestamento", defaults["percentuale_minima_pressione_assestamento"])),
"settling_pressure_max_percent": int(row.get("percentuale_massima_pressione_assestamento", defaults["percentuale_massima_pressione_assestamento"])),
"test_time": int(row.get("tempo_di_test", defaults["tempo_di_test"])),
"test_pressure_min_delta": int(row.get("pressione_di_test_delta_minimo", defaults["pressione_di_test_delta_minimo"])),
"test_pressure": int(row.get("pressione_di_test", defaults["pressione_di_test"])),
"test_pressure_max_delta": int(row.get("pressione_di_test_delta_massimo", defaults["pressione_di_test_delta_massimo"])),
"flush_time": int(row.get("tempo_svuotamento", defaults["tempo_svuotamento"])),
"flush_pressure": int(row.get("pressione_svuotmento", defaults["pressione_svuotmento"])),
},
"leak_2": {
"pre_filling_time": int(row.get("tempo_pre_riempimento_2", defaults["tempo_pre_riempimento_2"])),
"pre_filling_pressure": int(row.get("pressione_pre_riempimento_2", defaults["pressione_pre_riempimento_2"])),
"filling_time": int(row.get("tempo_riempimento_2", defaults["tempo_riempimento_2"])),
"settling_time": int(row.get("tempo_assestamento_2", defaults["tempo_assestamento_2"])),
"settling_pressure_min_percent": int(row.get("percentuale_minima_pressione_assestamento_2", defaults["percentuale_minima_pressione_assestamento_2"])),
"settling_pressure_max_percent": int(row.get("percentuale_massima_pressione_assestamento_2", defaults["percentuale_massima_pressione_assestamento_2"])),
"test_time": int(row.get("tempo_di_test_2", defaults["tempo_di_test_2"])),
"test_pressure_min_delta": int(row.get("pressione_di_test_delta_minimo_2", defaults["pressione_di_test_delta_minimo_2"])),
"test_pressure": int(row.get("pressione_di_test_2", defaults["pressione_di_test_2"])),
"test_pressure_max_delta": int(row.get("pressione_di_test_delta_massimo_2", defaults["pressione_di_test_delta_massimo_2"])),
"flush_time": int(row.get("tempo_svuotamento_2", defaults["tempo_svuotamento_2"])),
"flush_pressure": int(row.get("pressione_svuotmento_2", defaults["pressione_svuotmento_2"])),
},
"vision": {
"recipe": row.get("ricetta_visione", defaults["ricetta_visione"]),
},
"print": {
"template": row.get(print_template_field, defaults["modello_etichetta"]),
},
}
def import_recipes(self, csv_path=None, defaults=None):
if defaults is None:
defaults = self.defaults
if csv_path is None:
csv_path, _ = QFileDialog.getOpenFileName(
None,
"Esportazione ricette",
"ricette.csv",
"CSV data (*.csv);;All Files (*)",
)
csv_path = str(csv_path)
if not len(csv_path):
return
self.log.info(f"recipes: importing recipes from {csv_path}")
recipe_name_field = self.config.get("recipe", {}).get("recipe_name_field", "codice_ricetta").strip()
part_number_field = self.config.get("recipe", {}).get("part_number_field", "part number").strip()
description_field = self.config.get("description", {}).get("description_field", "descrizione").strip()
barcode_enable_field = self.config.get("recipe", {}).get("barcode_enable_field", "verifica_codice_a_barre_abilitata").strip()
def lower_first(iterator):
return itertools.chain([next(iterator).lower()], iterator)
with open(csv_path, "r") as f:
reader = csv.DictReader(lower_first(f))
count = 0
for row in reader:
recipe_name = row.get(recipe_name_field, defaults["codice_ricetta"])
steps_specs = self.read_steps(row, defaults=defaults)
try:
recipe = Recipes.get_by_id(recipe_name)
steps = recipe.get_steps_map()
recipe_is_new = False
except Recipes.DoesNotExist:
recipe = Recipes(name=recipe_name, part_number="TEMPORARY")
steps = {}
for step_type, step_spec in steps_specs.items():
step = Steps()
steps[step_type] = step
recipe_is_new = True
for step_name, step_spec in steps_specs.items():
step = steps[step_name]
step.type = re.sub(r"^(.*)_[0-9]+$", r"\1", step_name)
step.spec = step_spec
if recipe_is_new:
step.save(force_insert=True)
else:
step.save()
recipe.client = row.get("cliente", defaults["cliente"])
recipe.part_number = row.get(part_number_field, defaults["part_number"])
recipe.description = row.get(description_field, defaults["descrizione"])
recipe.spec = {
"count": len(row.get("dimensione_lotto_abilitata", defaults["dimensione_lotto_abilitata"])) and "count" not in self.unsupported_steps,
"connector": len(row.get("verifica_connettore_abilitata", defaults["verifica_connettore_abilitata"])) and "connector" not in self.unsupported_steps,
"barcodes": len(row.get(barcode_enable_field, defaults["verifica_codice_a_barre_abilitata"])) and "barcodes" not in self.unsupported_steps,
"resistance": len(row.get("verifica_resistenza_connettore_abilitata", defaults["verifica_resistenza_connettore_abilitata"])) and "resistance" not in self.unsupported_steps,
"screws": len(row.get("avvitatura_abilitata", defaults["avvitatura_abilitata"])) and "screws" not in self.unsupported_steps,
"leak_1": len(row.get("prova_tenuta_abilitata", defaults["prova_tenuta_abilitata"])) and "leak" not in self.unsupported_steps,
"leak_2": len(row.get("prova_tenuta_abilitata_2", defaults["prova_tenuta_abilitata_2"])) and "leak" not in self.unsupported_steps,
"vision": len(row.get("test_visione_abilitato", defaults["test_visione_abilitato"])) and "vision" not in self.unsupported_steps,
"print": len(row.get("stampa_etichetta_abilitata", defaults["stampa_etichetta_abilitata"])) and "print" not in self.unsupported_steps,
"steps": [], # should be pks of the enabled steps
"available_steps": {
"count": steps["count"].get_id(),
"connector": steps["connector"].get_id(),
"barcodes": steps["barcodes"].get_id(),
"resistance": steps["resistance"].get_id(),
"screws": steps["screws"].get_id(),
"leak_1": steps["leak_1"].get_id(),
"leak_2": steps["leak_2"].get_id(),
"vision": steps["vision"].get_id(),
"print": steps["print"].get_id(),
},
}
for step_name, step in recipe.spec["available_steps"].items():
if recipe.spec[step_name]:
recipe.spec["steps"].append(step)
if recipe_is_new:
recipe.save(force_insert=True)
else:
recipe.save()
count += 1
db.commit()
self.log.info(f"recipes: imported {count} rows.")
self.crud.refresh()
def export_recipes(self, csv_path=None):
if csv_path is None:
csv_path, _ = QFileDialog.getSaveFileName(
None,
"Esportazione ricette",
"ricette.csv",
"CSV data (*.csv);;All Files (*)",
)
csv_path = str(csv_path)
if not len(csv_path):
return
if not csv_path.lower().endswith(".csv"):
csv_path += ".csv"
csv_dir = os.path.dirname(csv_path)
if len(csv_dir):
os.makedirs(csv_dir, exist_ok=True)
recipe_name_field = self.config.get("recipe", {}).get("recipe_name_field", "codice_ricetta").strip()
barcode_enable_field = self.config.get("recipe", {}).get("barcode_enable_field", "verifica_codice_a_barre_abilitata").strip()
barcode_serial_field = self.config.get("recipe", {}).get("barcode_serial_field", "codice_a_barre").strip()
print_template_field = self.config.get("recipe", {}).get("label_template_field", "modello_etichetta").strip()
data = []
fieldnames = [
recipe_name_field,
"cliente",
"part_number",
"dimensione_lotto_abilitata",
"dimensione_lotto",
"verifica_connettore_abilitata",
"connettore",
barcode_enable_field,
barcode_serial_field,
"verifica_resistenza_connettore_abilitata",
"scala_resistenza",
"r nominale",
"tolleranza_resistenza",
"avvitatura_abilitata",
"viti",
"prova_tenuta_abilitata",
"tempo_pre_riempimento",
"pressione_pre_riempimento",
"tempo_riempimento",
"tempo_assestamento",
"percentuale_minima_pressione_assestamento",
"percentuale_massima_pressione_assestamento",
"tempo_di_test",
"pressione_di_test_delta_minimo",
"pressione_di_test",
"pressione_di_test_delta_massimo",
"tempo_svuotamento",
"pressione_svuotmento",
"prova_tenuta_abilitata_2",
"tempo_pre_riempimento_2",
"pressione_pre_riempimento_2",
"tempo_riempimento_2",
"tempo_assestamento_2",
"percentuale_minima_pressione_assestamento_2",
"percentuale_massima_pressione_assestamento_2",
"tempo_di_test_2",
"pressione_di_test_delta_minimo_2",
"pressione_di_test_2",
"pressione_di_test_delta_massimo_2",
"tempo_svuotamento_2",
"pressione_svuotmento_2",
"test_visione_abilitato",
"ricetta_visione",
"stampa_etichetta_abilitata",
print_template_field,
]
for recipe in Recipes.select():
steps = recipe.get_steps_map()
exportable = {
recipe_name_field: recipe.name,
"cliente": recipe.client,
"part_number": recipe.part_number,
#"dimensione_lotto_abilitata": "x" if recipe.spec["count"] else "",
#"dimensione_lotto": steps["count"].spec["amount"],
"verifica_connettore_abilitata": "x" if recipe.spec["connector"] else "",
"connettore": steps["connector"].spec["connector"],
barcode_enable_field: "x" if recipe.spec["barcodes"] else "",
barcode_serial_field: steps["barcodes"].spec["serial"],
"verifica_resistenza_connettore_abilitata": "x" if recipe.spec["resistance"] else "",
"scala_resistenza": steps["resistance"].spec["scale"],
"r nominale": steps["resistance"].spec["expected"],
"tolleranza_resistenza": steps["resistance"].spec["tolerance"],
#"avvitatura_abilitata": "x" if recipe.spec["screws"] else "",
#"viti": steps["screws"].spec["quantity"],
"prova_tenuta_abilitata": "x" if recipe.spec["leak_1"] else "",
"tempo_pre_riempimento": steps["leak_1"].spec["pre_filling_time"],
"pressione_pre_riempimento": steps["leak_1"].spec["pre_filling_pressure"],
"tempo_riempimento": steps["leak_1"].spec["filling_time"],
"tempo_assestamento": steps["leak_1"].spec["settling_time"],
"percentuale_minima_pressione_assestamento": steps["leak_1"].spec["settling_pressure_min_percent"],
"percentuale_massima_pressione_assestamento": steps["leak_1"].spec["settling_pressure_max_percent"],
"tempo_di_test": steps["leak_1"].spec["test_time"],
"pressione_di_test_delta_minimo": steps["leak_1"].spec["test_pressure_min_delta"],
"pressione_di_test": steps["leak_1"].spec["test_pressure"],
"pressione_di_test_delta_massimo": steps["leak_1"].spec["test_pressure_max_delta"],
"tempo_svuotamento": steps["leak_1"].spec["flush_time"],
"pressione_svuotmento": steps["leak_1"].spec["flush_pressure"],
"prova_tenuta_abilitata_2": "x" if recipe.spec["leak_2"] else "",
"tempo_pre_riempimento_2": steps["leak_2"].spec["pre_filling_time"],
"pressione_pre_riempimento_2": steps["leak_2"].spec["pre_filling_pressure"],
"tempo_riempimento_2": steps["leak_2"].spec["filling_time"],
"tempo_assestamento_2": steps["leak_2"].spec["settling_time"],
"percentuale_minima_pressione_assestamento_2": steps["leak_2"].spec["settling_pressure_min_percent"],
"percentuale_massima_pressione_assestamento_2": steps["leak_2"].spec["settling_pressure_max_percent"],
"tempo_di_test_2": steps["leak_2"].spec["test_time"],
"pressione_di_test_delta_minimo_2": steps["leak_2"].spec["test_pressure_min_delta"],
"pressione_di_test_2": steps["leak_2"].spec["test_pressure"],
"pressione_di_test_delta_massimo_2": steps["leak_2"].spec["test_pressure_max_delta"],
"tempo_svuotamento_2": steps["leak_2"].spec["flush_time"],
"pressione_svuotmento_2": steps["leak_2"].spec["flush_pressure"],
"test_visione_abilitato": recipe.spec["vision"],
"ricetta_visione": steps["vision"].spec["recipe"],
"stampa_etichetta_abilitata": "x" if recipe.spec["print"] else "",
print_template_field: steps["print"].spec["template"],
}
data.append(exportable)
if len(data):
self.log.info(f"recipes: exporting recipes to {csv_path}")
with open(csv_path, "w", newline="") as f:
w = csv.DictWriter(f, fieldnames, extrasaction="ignore")
w.writeheader()
w.writerows(data)
self.log.info(f"recipes: exported {len(data)} rows.")
def delete_recipes(self):
ret = QMessageBox.warning(
None,
"Attenzione si sta cercando di cancellare tutte le ricette!",
"Si è sicuri di voler eliminare tutte le ricette?\nQuesta operazione non può essere annullata!",
buttons=QMessageBox.Ok | QMessageBox.Cancel,
defaultButton=QMessageBox.Cancel
)
if ret == QMessageBox.Ok:
Recipes.delete().execute()
Steps.delete().execute()
self.crud.refresh()