import csv import locale import os import sys import weakref from glob import glob from lib.db import Recipes, Steps, Users, db from PyQt5.QtCore import QTimer, pyqtSignal from PyQt5.QtGui import QKeySequence from PyQt5.QtWidgets import QFileDialog, QMessageBox, QShortcut from ui.crud import Crud, Json_External_Dialog_Editor_Cell_Widget from ui.helpers import replace_widget from ui.recipe_spec_and_step_editor import Recipe_Spec_And_Step_Editor from ui.widget import Widget class Noner: def __getitem__(self, key): return None noner = Noner() class Recipe_Selection(Widget): ok = pyqtSignal(Recipes) def __init__(self, config, unsupported_steps=None): global noner super().__init__() self.config = config self.second_leak_test_enabled=self.config["hardware_config"]["second_leak_test"] == "present" self.defaults = self.config.get("recipes_defaults", noner) self.unsupported_steps = unsupported_steps if not self.second_leak_test_enabled: self.unsupported_steps.add("second_leak_test") session = Users.get_session() if session.is_admin: readonly = False crud_aliases = { "name": "Ricetta", "client": "Cliente", "part_number": "N° disegno", "spec": "Specifica", "description": "Descrizione", } filters = None else: readonly = True crud_aliases = { "name": "Ricetta", "client": "Cliente", "part_number": "N° disegno", "spec": "Specifica", "description": "Descrizione", } filters = {"archived": False} step_defaults = self.read_steps(self.config.get("recipes_defaults", noner), noner) step_defaults.update({ "vision": { # "recipe": sorted(glob("*.ini", root_dir="./config/vision/recipes/")), # only in python3.10 "recipe": sorted(map(os.path.basename, glob("./config/vision/recipes/*.ini"))), }, "print": { # "template": sorted(glob("*.prn", root_dir="./config/label_templates/")), # only in python3.10 "template": sorted(map(os.path.basename, glob("./config/label_templates/*.prn"))), }, }), self.crud = Crud( "recipes", display_name="SELEZIONE RICETTA", readonly=readonly, select=list(crud_aliases.keys()), filters=filters, fields_aliases=crud_aliases, autocomplete={ "name": self.config.get("recipes_defaults", noner)["codice_ricetta"], "client": self.config.get("recipes_defaults", noner)["cliente"], "part_number": self.config.get("recipes_defaults", noner)["part_number"], "spec": { "count": len(self.config.get("recipes_defaults", noner)["dimensione_lotto_abilitata"]) and "count" not in self.unsupported_steps, "connector": len(self.config.get("recipes_defaults", noner)["verifica_connettore_abilitata"]) and "connector" not in self.unsupported_steps, "barcodes": len(self.config.get("recipes_defaults", noner)["verifica_codice_a_barre_abilitata"]) and "barcodes" not in self.unsupported_steps, "resistance": len(self.config.get("recipes_defaults", noner)["verifica_resistenza_connettore_abilitata"]) and "resistance" not in self.unsupported_steps, "screws": len(self.config.get("recipes_defaults", noner)["avvitatura_abilitata"]) and "screws" not in self.unsupported_steps, "instruction": len(self.config.get("recipes_defaults", noner)["istruzione_abilitata"]) and "instruction" not in self.unsupported_steps, "leak_1": len(self.config.get("recipes_defaults", noner)["prova_tenuta_abilitata"]) and "leak_1" not in self.unsupported_steps, "leak_2": len(self.config.get("recipes_defaults", noner)["prova_tenuta_abilitata_2"]) and "leak_2" not in self.unsupported_steps, "vision": len(self.config.get("recipes_defaults", noner)["test_visione_abilitato"]) and "vision" not in self.unsupported_steps, "print": len(self.config.get("recipes_defaults", noner)["stampa_etichetta_abilitata"]) and "print" not in self.unsupported_steps, "step_editors": step_defaults, }, "description": self.config.get("recipes_defaults", noner)["descrizione"], "archived": False, }, sort={"name": True}, widget_classes={"spec": lambda *args, **kwargs: Json_External_Dialog_Editor_Cell_Widget( Recipe_Spec_And_Step_Editor, *args, **kwargs, unsupported_steps=self.unsupported_steps ), }, pagination=25, ) replace_widget(self, "crud_w", self.crud) self.crud_modified = None self.selected = None self.select_b.setEnabled(False) self.select_b.clicked.connect(self.select) QShortcut(QKeySequence("Return"), self).activated.connect(self.select_b.click) QShortcut(QKeySequence("Enter"), self).activated.connect(self.select_b.click) self.crud.modified.connect(self.check_modified) self.crud.selected.connect(self.check_selected) self.crud.emit() self.crud.db_tw.setColumnWidth(0, 200) self.crud.db_tw.setColumnWidth(1, 200) self.crud.db_tw.setColumnWidth(2, 200) self.crud.db_tw.setColumnWidth(3, 200) self.crud.db_tw.setColumnWidth(4, 200) self.crud.db_tw.setColumnWidth(5, 400) if session.is_admin: self.import_b.clicked.connect(lambda checked, self=weakref.ref(self): self().import_recipes()) self.export_b.clicked.connect(lambda checked, self=weakref.ref(self): self().export_recipes()) self.delete_all_b.clicked.connect(lambda checked, self=weakref.ref(self): self().delete_recipes()) else: self.import_b.setVisible(False) self.export_b.setVisible(False) self.delete_all_b.setVisible(False) # TESTING if "--auto-select" in sys.argv: recipe = "000952054" cn = self.crud.select_index["name"] self.crud.db_tw.clearSelection() for rn in range(1, self.crud.db_tw.rowCount()): if self.crud.db_tw.cellWidget(rn, cn).text() == recipe: selection = self.crud.db_tw.model().index(rn, cn) self.crud.db_tw.setCurrentIndex(selection) break self.test_timer = QTimer() self.test_timer.setSingleShot(True) self.test_timer.timeout.connect(self.select_b.clicked.emit) self.test_timer.start(500) # /TESTING def check_modified(self, modified): self.crud_modified = modified self.check(self.crud_modified, self.selected) def check_selected(self, selected=None): if selected is not None and len(selected) == 1: selected = selected[0] - 1 # - 1 because rn starts from 1 (filters line) if selected >= 0 and selected < len(self.crud.data_index): selected = self.crud.data_index[selected] if selected is not None: self.selected = selected else: self.selected = None else: self.selected = None self.check(self.crud_modified, self.selected) def check(self, modified, selected): self.select_b.setEnabled(modified is False and selected is not None) def select(self): if self.selected is not None: self.ok.emit(self.crud.db.table_model.get_by_id(self.selected)) def get_def(self,dict,key): val=dict.get(key,self.defaults[key]) return val if val != "" else self.defaults[key] def read_steps(self, row, defaults=None): if defaults is None: global noner defaults = self.config.get("recipes_defaults", noner) barcode_serial_field = self.config.get("recipe", {}).get("barcode_serial_field", "codice_a_barre").strip() decsep= locale.localeconv()["decimal_point"] rcsv = row.get("r nominale", defaults["r nominale"]).replace(" ", "").replace(",", decsep).replace("Ω", "").replace("?", "") if rcsv == "": rcsv = "999" print_template_field = self.config.get("recipe", {}).get("label_template_field", "modello_etichetta").strip() return { "count": { "amount": row.get("dimensione_lotto", defaults["dimensione_lotto"]), "warning_img": row.get("immagine_warning", ""), }, "connector": { "connector": row.get("connettore", defaults["connettore"]), }, "barcodes": { "serial": row.get(barcode_serial_field, defaults["codice_a_barre"]), }, "resistance": { "scale": locale.atof(row.get("scala_resistenza", defaults["scala_resistenza"])), "expected": locale.atof(rcsv), "tolerance_pos": locale.atof(self.get_def(row,"tolleranza_resistenza_pos")), "tolerance_neg": locale.atof(self.get_def(row,"tolleranza_resistenza_neg")), }, "screws": { "quantity": row.get("viti", defaults["viti"]) }, "instruction": { "num_tape": int(self.get_def(row,"numero nastri (n)")), "num_ring": int(self.get_def(row,"numero sensori anello (sa)")), "num_piece": int(self.get_def(row,"numero sensori presenza (sp)")) }, "leak_1": { "pre_filling_time": int(row.get("tempo_pre_riempimento", defaults["tempo_pre_riempimento"])), "pre_filling_pressure": int(row.get("pressione_pre_riempimento", defaults["pressione_pre_riempimento"])), "filling_time": int(row.get("tempo_riempimento", defaults["tempo_riempimento"])), "settling_time": int(self.get_def(row,"tempo_assestamento")), "settling_pressure_min_percent": int(row.get("percentuale_minima_pressione_assestamento", defaults["percentuale_minima_pressione_assestamento"])), "settling_pressure_max_percent": int(row.get("percentuale_massima_pressione_assestamento", defaults["percentuale_massima_pressione_assestamento"])), "test_time": int(row.get("tempo_di_test", defaults["tempo_di_test"])), "test_pressure_qneg": int(row.get("pressione_di_test_delta_minimo", defaults["pressione_di_test_delta_minimo"])), "test_pressure": int(row.get("pressione_di_test", defaults["pressione_di_test"])), "test_pressure_qpos": int(row.get("pressione_di_test_delta_massimo", defaults["pressione_di_test_delta_massimo"])), "flush_time": int(row.get("tempo_svuotamento", defaults["tempo_svuotamento"])), "flush_pressure": int(row.get("pressione_svuotamento", defaults["pressione_svuotamento"])), "relay_config": int(row.get("config_elettrovalvole", defaults["config_elettrovalvole"])) }, "leak_2": { "pre_filling_time": int(row.get("tempo_pre_riempimento_2", defaults["tempo_pre_riempimento_2"])), "pre_filling_pressure": int(row.get("pressione_pre_riempimento_2", defaults["pressione_pre_riempimento_2"])), "filling_time": int(row.get("tempo_riempimento_2", defaults["tempo_riempimento_2"])), "settling_time": int(row.get("tempo_assestamento_2", defaults["tempo_assestamento_2"])), "settling_pressure_min_percent": int(row.get("percentuale_minima_pressione_assestamento_2", defaults["percentuale_minima_pressione_assestamento_2"])), "settling_pressure_max_percent": int(row.get("percentuale_massima_pressione_assestamento_2", defaults["percentuale_massima_pressione_assestamento_2"])), "test_time": int(row.get("tempo_di_test_2", defaults["tempo_di_test_2"])), "test_pressure_qneg": int(row.get("pressione_di_test_delta_minimo_2", defaults["pressione_di_test_delta_minimo_2"])), "test_pressure": int(row.get("pressione_di_test_2", defaults["pressione_di_test_2"])), "test_pressure_qpos": int(row.get("pressione_di_test_delta_massimo_2", defaults["pressione_di_test_delta_massimo_2"])), "flush_time": int(row.get("tempo_svuotamento_2", defaults["tempo_svuotamento_2"])), "flush_pressure": int(row.get("pressione_svuotamento_2", defaults["pressione_svuotamento_2"])), "relay_config": int(row.get("config_elettrovalvole_2", defaults["config_elettrovalvole_2"])) }, "vision": { "recipe": row.get("ricetta_visione", defaults["ricetta_visione"]), }, "print": { "template": row.get(print_template_field, defaults["modello_etichetta"]), "labeltxt_1": row.get("testo_etich_1",""), "labeltxt_2": row.get("testo_etich_2", ""), "labeltxt_3": row.get("testo_etich_3", ""), "labeltxt_4": row.get("testo_etich_4", ""), "labeltxt_5": row.get("testo_etich_5", ""), "extra_label": row.get("etichette_supplementari", ""), }, } def import_recipes(self, csv_path=None, defaults=None): if defaults is None: global noner defaults = self.config.get("recipes_defaults", noner) if csv_path is None: csv_path, _ = QFileDialog.getOpenFileName( None, "Esportazione ricette", "ricette.csv", "CSV data (*.csv);;All Files (*)", ) csv_path = str(csv_path) if not len(csv_path): return self.log.info(f"recipes: importing recipes from {csv_path}") recipe_name_field = self.config.get("recipe", {}).get("recipe_name_field", "codice_ricetta").strip() part_number_field = self.config.get("recipe", {}).get("part_number_field", "part number").strip() description_field = self.config.get("recipe", {}).get("description_field", "descrizione").strip() barcode_enable_field = self.config.get("recipe", {}).get("barcode_enable_field", "verifica_codice_a_barre_abilitata").strip() with open(csv_path, "r",encoding="utf-8-sig") as f: reader = csv.DictReader(f) count = 0 for ucrow in reader: row= dict((k.lower(), v) for k, v in ucrow.items()) recipe_name = row.get(recipe_name_field, defaults["codice_ricetta"]) steps_specs = self.read_steps(row, defaults=defaults) try: recipe = Recipes.get_by_id(recipe_name) steps = recipe.get_steps_map() recipe_is_new = False except Recipes.DoesNotExist: recipe = Recipes(name=recipe_name, part_number="TEMPORARY") steps = {} for step_type, step_spec in steps_specs.items(): step = Steps() steps[step_type] = step recipe_is_new = True for step_name, step_spec in steps_specs.items(): step = steps[step_name] #step.type = re.sub(r"^(.*)_[0-9]+$", r"\1", step_name) step.type = step_name step.spec = step_spec if recipe_is_new: step.save(force_insert=True) else: step.save() recipe.client = row.get("cliente", defaults["cliente"]) recipe.part_number = row.get(part_number_field, defaults["part_number"]) recipe.description = row.get(description_field, defaults["descrizione"]) recipe.spec = { "count": len(row.get("dimensione_lotto_abilitata", defaults["dimensione_lotto_abilitata"])) and "count" not in self.unsupported_steps, "connector": len(row.get("verifica_connettore_abilitata", defaults["verifica_connettore_abilitata"])) and "connector" not in self.unsupported_steps, "barcodes": len(row.get(barcode_enable_field, defaults["verifica_codice_a_barre_abilitata"])) and "barcodes" not in self.unsupported_steps, "resistance": len(row.get("verifica_resistenza_connettore_abilitata", defaults["verifica_resistenza_connettore_abilitata"])) and "resistance" not in self.unsupported_steps, "screws": len(row.get("avvitatura_abilitata", defaults["avvitatura_abilitata"])) and "screws" not in self.unsupported_steps, "instruction": len(row.get("istruzione_abilitata", defaults["istruzione_abilitata"])) and "instruction" not in self.unsupported_steps, "leak_1": len(row.get("prova_tenuta_abilitata", defaults["prova_tenuta_abilitata"])) and "leak_1" not in self.unsupported_steps, "leak_2": len(row.get("prova_tenuta_abilitata_2", defaults["prova_tenuta_abilitata_2"])) and "leak_2" not in self.unsupported_steps, "vision": len(row.get("test_visione_abilitato", defaults["test_visione_abilitato"])) and "vision" not in self.unsupported_steps, "print": len(row.get("stampa_etichetta_abilitata", defaults["stampa_etichetta_abilitata"])) and "print" not in self.unsupported_steps, "steps": [], # should be pks of the enabled steps "available_steps": { "count": steps["count"].get_id(), "connector": steps["connector"].get_id(), "barcodes": steps["barcodes"].get_id(), "resistance": steps["resistance"].get_id(), "screws": steps["screws"].get_id(), "instruction": steps["instruction"].get_id(), "leak_1": steps["leak_1"].get_id(), "leak_2": steps["leak_2"].get_id(), "vision": steps["vision"].get_id(), "print": steps["print"].get_id(), }, } for step_name, step in recipe.spec["available_steps"].items(): if recipe.spec[step_name]: recipe.spec["steps"].append(step) if recipe_is_new: recipe.save(force_insert=True) else: recipe.save() count += 1 db.commit() self.log.info(f"recipes: imported {count} rows.") self.crud.refresh() def export_recipes(self, csv_path=None): if csv_path is None: csv_path, _ = QFileDialog.getSaveFileName( None, "Esportazione ricette", "ricette.csv", "CSV data (*.csv);;All Files (*)", ) csv_path = str(csv_path) if not len(csv_path): return if not csv_path.lower().endswith(".csv"): csv_path += ".csv" csv_dir = os.path.dirname(csv_path) if len(csv_dir): os.makedirs(csv_dir, exist_ok=True) recipe_name_field = self.config.get("recipe", {}).get("recipe_name_field", "codice_ricetta").strip() barcode_enable_field = self.config.get("recipe", {}).get("barcode_enable_field", "verifica_codice_a_barre_abilitata").strip() barcode_serial_field = self.config.get("recipe", {}).get("barcode_serial_field", "codice_a_barre").strip() print_template_field = self.config.get("recipe", {}).get("label_template_field", "modello_etichetta").strip() data = [] fieldnames = [ recipe_name_field, "cliente", "part_number", "dimensione_lotto_abilitata", "dimensione_lotto", "verifica_connettore_abilitata", "connettore", barcode_enable_field, barcode_serial_field, "verifica_resistenza_connettore_abilitata", "scala_resistenza", "r nominale", "tolleranza_resistenza_pos", "tolleranza_resistenza_neg", "avvitatura_abilitata", "viti", "prova_tenuta_abilitata", "tempo_pre_riempimento", "pressione_pre_riempimento", "tempo_riempimento", "tempo_assestamento", "percentuale_minima_pressione_assestamento", "percentuale_massima_pressione_assestamento", "tempo_di_test", "pressione_di_test_delta_minimo", "pressione_di_test", "pressione_di_test_delta_massimo", "tempo_svuotamento", "pressione_svuotamento", "prova_tenuta_abilitata_2", "tempo_pre_riempimento_2", "pressione_pre_riempimento_2", "tempo_riempimento_2", "tempo_assestamento_2", "percentuale_minima_pressione_assestamento_2", "percentuale_massima_pressione_assestamento_2", "tempo_di_test_2", "pressione_di_test_delta_minimo_2", "pressione_di_test_2", "pressione_di_test_delta_massimo_2", "tempo_svuotamento_2", "pressione_svuotamento_2", "test_visione_abilitato", "ricetta_visione", "stampa_etichetta_abilitata", print_template_field, ] for recipe in list(Recipes.select()): steps = recipe.get_steps_map() exportable = { recipe_name_field: recipe.name, "cliente": recipe.client, "part_number": recipe.part_number, # "dimensione_lotto_abilitata": "x" if recipe.spec["count"] else "", # "dimensione_lotto": steps["count"].spec["amount"], "verifica_connettore_abilitata": "x" if recipe.spec["connector"] else "", "connettore": steps["connector"].spec["connector"], barcode_enable_field: "x" if recipe.spec["barcodes"] else "", barcode_serial_field: steps["barcodes"].spec["serial"], "verifica_resistenza_connettore_abilitata": "x" if recipe.spec["resistance"] else "", "scala_resistenza": steps["resistance"].spec["scale"], "r nominale": steps["resistance"].spec["expected"], "tolleranza_resistenza_pos": steps["resistance"].spec["tolerance_pos"], "tolleranza_resistenza_neg": steps["resistance"].spec["tolerance_neg"], # "avvitatura_abilitata": "x" if recipe.spec["screws"] else "", # "viti": steps["screws"].spec["quantity"], "prova_tenuta_abilitata": "x" if recipe.spec["leak_1"] else "", "tempo_pre_riempimento": steps["leak_1"].spec["pre_filling_time"], "pressione_pre_riempimento": steps["leak_1"].spec["pre_filling_pressure"], "tempo_riempimento": steps["leak_1"].spec["filling_time"], "tempo_assestamento": steps["leak_1"].spec["settling_time"], "percentuale_minima_pressione_assestamento": steps["leak_1"].spec["settling_pressure_min_percent"], "percentuale_massima_pressione_assestamento": steps["leak_1"].spec["settling_pressure_max_percent"], "tempo_di_test": steps["leak_1"].spec["test_time"], "pressione_di_test_delta_minimo": steps["leak_1"].spec["test_pressure_qneg"], "pressione_di_test": steps["leak_1"].spec["test_pressure"], "pressione_di_test_delta_massimo": steps["leak_1"].spec["test_pressure_qpos"], "tempo_svuotamento": steps["leak_1"].spec["flush_time"], "pressione_svuotamento": steps["leak_1"].spec["flush_pressure"], "prova_tenuta_abilitata_2": "x" if recipe.spec["leak_2"] else "", "tempo_pre_riempimento_2": steps["leak_2"].spec["pre_filling_time"], "pressione_pre_riempimento_2": steps["leak_2"].spec["pre_filling_pressure"], "tempo_riempimento_2": steps["leak_2"].spec["filling_time"], "tempo_assestamento_2": steps["leak_2"].spec["settling_time"], "percentuale_minima_pressione_assestamento_2": steps["leak_2"].spec["settling_pressure_min_percent"], "percentuale_massima_pressione_assestamento_2": steps["leak_2"].spec["settling_pressure_max_percent"], "tempo_di_test_2": steps["leak_2"].spec["test_time"], "pressione_di_test_delta_minimo_2": steps["leak_2"].spec["test_pressure_qneg"], "pressione_di_test_2": steps["leak_2"].spec["test_pressure"], "pressione_di_test_delta_massimo_2": steps["leak_2"].spec["test_pressure_qpos"], "tempo_svuotamento_2": steps["leak_2"].spec["flush_time"], "pressione_svuotamento_2": steps["leak_2"].spec["flush_pressure"], "test_visione_abilitato": recipe.spec["vision"], "ricetta_visione": steps["vision"].spec["recipe"], "stampa_etichetta_abilitata": "x" if recipe.spec["print"] else "", print_template_field: steps["print"].spec["template"], } data.append(exportable) if len(data): self.log.info(f"recipes: exporting recipes to {csv_path}") with open(csv_path, "w", newline="") as f: w = csv.DictWriter(f, fieldnames, extrasaction="ignore") w.writeheader() w.writerows(data) self.log.info(f"recipes: exported {len(data)} rows.") def delete_recipes(self): ret = QMessageBox.warning( None, "Attenzione si sta cercando di cancellare tutte le ricette!", "Si è sicuri di voler eliminare tutte le ricette?\nQuesta operazione non può essere annullata!", buttons=QMessageBox.Ok | QMessageBox.Cancel, defaultButton=QMessageBox.Cancel ) if ret == QMessageBox.Ok: Recipes.delete().execute() Steps.delete().execute() self.crud.refresh()