import csv import locale import os import sys import weakref from glob import glob from lib.db import Recipes, Users, db from PyQt5.QtCore import QTimer, pyqtSignal from PyQt5.QtGui import QKeySequence from PyQt5.QtWidgets import QFileDialog, QMessageBox, QShortcut import shutil from lib.helpers.step import Step from ui.crud import Crud, Json_External_Dialog_Editor_Cell_Widget from ui.helpers import replace_widget from ui.recipe_spec_and_step_editor import Recipe_Spec_And_Step_Editor from ui.widget import Widget from datetime import datetime from src.components import ArchiveSynchronizer class Noner: def __getitem__(self, key): return None noner = Noner() class Recipe_Selection(Widget): ok = pyqtSignal(Recipes) def __init__(self, config, unsupported_steps=None): global noner super().__init__() self.config = config self.archive_sync = ArchiveSynchronizer() self.second_leak_test_enabled = self.config["hardware_config"]["second_leak_test"] == "present" self.defaults = self.config.get("recipes_defaults", noner) self.unsupported_steps = unsupported_steps session = Users.get_session() if session.is_admin: readonly = False crud_aliases = { "name": "Ricetta", "client": "Cliente", "part_number": "N° disegno", "spec": "Specifica", "description": "Descrizione", } filters = None else: readonly = True crud_aliases = { "name": "Ricetta", "client": "Cliente", "part_number": "N° disegno", "spec": "Specifica", "description": "Descrizione", } filters = {"archived": False} step_defaults = self.read_steps(self.config.get("recipes_defaults", noner), noner) custom_label_folder = f"config/label_templates/{str(self.config.machine_id)}/" standard_label_folder = f"config/label_templates/" if os.path.exists(custom_label_folder): label_folder = custom_label_folder else: label_folder = standard_label_folder step_defaults.update({ "vision": { # "recipe": sorted(glob("*.ini", root_dir="./config/vision/recipes/")), # only in python3.10 "recipe": sorted(map(os.path.basename, glob("./config/vision/recipes/*.ini"))), }, "print": { # "template": sorted(glob("*.prn", root_dir="./config/label_templates/")), # only in python3.10 "template": sorted(map(os.path.basename, glob(f"{label_folder}*.prn"))), }, }), self.crud = Crud( "recipes", display_name="SELEZIONE RICETTA", readonly=readonly, select=list(crud_aliases.keys()), filters=filters, fields_aliases=crud_aliases, autocomplete={ "name": self.config.get("recipes_defaults", noner)["codice_ricetta"], "client": self.config.get("recipes_defaults", noner)["cliente"], "part_number": self.config.get("recipes_defaults", noner)["part_number"], "spec": { "count": len(self.config.get("recipes_defaults", noner)["dimensione_lotto_abilitata"]) and "count" not in self.unsupported_steps, "connector": len(self.config.get("recipes_defaults", noner)["verifica_connettore_abilitata"]) and "connector" not in self.unsupported_steps, "barcodes": len( self.config.get("recipes_defaults", noner)["verifica_codice_a_barre_abilitata"]) and "barcodes" not in self.unsupported_steps, "resistance": len( self.config.get("recipes_defaults", noner)["verifica_resistenza_connettore_abilitata"]) and "resistance" not in self.unsupported_steps, "screws": len(self.config.get("recipes_defaults", noner)["avvitatura_abilitata"]) and "screws" not in self.unsupported_steps, "instruction": len(self.config.get("recipes_defaults", noner)["istruzione_abilitata"]) and "instruction" not in self.unsupported_steps, "leak_1": len(self.config.get("recipes_defaults", noner)["prova_tenuta_abilitata"]) and "leak_1" not in self.unsupported_steps, "leak_2": len(self.config.get("recipes_defaults", noner)["prova_tenuta_abilitata_2"]) and "leak_2" not in self.unsupported_steps, "vision": len(self.config.get("recipes_defaults", noner)["test_visione_abilitato"]) and "vision" not in self.unsupported_steps, "print": len(self.config.get("recipes_defaults", noner)["stampa_etichetta_abilitata"]) and "print" not in self.unsupported_steps, "step_editors": step_defaults, }, "description": self.config.get("recipes_defaults", noner)["descrizione"], "archived": False, }, sort={"name": True}, widget_classes={"spec": lambda *args, **kwargs: Json_External_Dialog_Editor_Cell_Widget( Recipe_Spec_And_Step_Editor, *args, **kwargs, unsupported_steps=self.unsupported_steps ), }, pagination=25, ) replace_widget(self, "crud_w", self.crud) self.crud_modified = None self.selected = None self.select_b.setEnabled(False) self.select_b.clicked.connect(self.select) QShortcut(QKeySequence("Return"), self).activated.connect(self.select_b.click) QShortcut(QKeySequence("Enter"), self).activated.connect(self.select_b.click) self.crud.modified.connect(self.check_modified) self.crud.selected.connect(self.check_selected) self.crud.emit() self.crud.db_tw.setColumnWidth(0, 200) self.crud.db_tw.setColumnWidth(1, 200) self.crud.db_tw.setColumnWidth(2, 200) self.crud.db_tw.setColumnWidth(3, 200) self.crud.db_tw.setColumnWidth(4, 200) self.crud.db_tw.setColumnWidth(5, 400) if session.is_admin: self.import_b.clicked.connect(lambda checked, selfi=weakref.ref(self): selfi().import_recipes()) self.export_b.clicked.connect(lambda checked, selfi=weakref.ref(self): selfi().export_recipes()) self.delete_all_b.clicked.connect(lambda checked, selfi=weakref.ref(self): selfi().delete_recipes()) else: self.import_b.setVisible(False) self.export_b.setVisible(False) self.delete_all_b.setVisible(False) # TESTING if "--auto-select" in sys.argv: recipe = "R56738/1" cn = self.crud.select_index["name"] self.crud.db_tw.clearSelection() for rn in range(1, self.crud.db_tw.rowCount()): if self.crud.db_tw.cellWidget(rn, cn).text() == recipe: selection = self.crud.db_tw.model().index(rn, cn) self.crud.db_tw.setCurrentIndex(selection) break self.test_timer = QTimer() self.test_timer.setSingleShot(True) self.test_timer.timeout.connect(self.select_b.clicked.emit) self.test_timer.start(500) # /TESTING def check_modified(self, modified): self.crud_modified = modified self.check(self.crud_modified, self.selected) def check_selected(self, selected=None): if selected is not None and len(selected) == 1: selected = selected[0] - 1 # - 1 because rn starts from 1 (filters line) if selected >= 0 and selected < len(self.crud.data_index): selected = self.crud.data_index[selected] if selected is not None: self.selected = selected else: self.selected = None else: self.selected = None self.check(self.crud_modified, self.selected) def check(self, modified, selected): self.select_b.setEnabled(modified is False and selected is not None) def select(self): if self.selected is not None: self.ok.emit(self.crud.db.table_model.get_by_id(self.selected)) def get_def(self, dict, key): val = dict.get(key, self.defaults[key]) return val if val != "" else self.defaults[key] # READ RECIPE STEPS FROM CSV ROW def read_steps(self, row, defaults=None): if defaults is None: global noner defaults = self.config.get("recipes_defaults", noner) barcode_serial_field = self.config.get("recipe", {}).get("barcode_serial_field", "codice_a_barre").strip() warning_image_field = self.config.get("recipe", {}).get("warning_image_field", "warning_img").strip() decsep = locale.localeconv()["decimal_point"] rcsv = row.get("r nominale", defaults["r nominale"]).replace(" ", "").replace(",", decsep).replace("Ω", "").replace("?", "") if rcsv == "": rcsv = "999" print_template_field = self.config.get("recipe", {}).get("label_template_field", "modello_etichetta").strip() return { "count": { "amount": row.get("dimensione_lotto", defaults["dimensione_lotto"]), "warning_img": row.get(warning_image_field, defaults["warning_img"]), "require_discard_piece": row.get("richiedi_inserimento_scarto", defaults["richiedi_inserimento_scarto"]) }, "connector": { "connector": row.get("connettore", defaults["connettore"]), }, "barcodes": { "serial": row.get(barcode_serial_field, defaults["codice_a_barre"]), "n_pieces": row.get("n_componenti") or defaults["n_componenti"], "barcode_input_2": row.get("barcode_input_2", "-"), "barcode_input_3": row.get("barcode_input_3", "-"), "barcode_input_4": row.get("barcode_input_4", "-"), "barcode_input_5": row.get("barcode_input_5", "-"), }, "resistance": { "scale": locale.atof(row.get("scala_resistenza", defaults["scala_resistenza"])), "expected": locale.atof(rcsv), "tolerance_pos": locale.atof(self.get_def(row, "tolleranza_resistenza_pos")), "tolerance_neg": locale.atof(self.get_def(row, "tolleranza_resistenza_neg")), }, "screws": { "quantity": row.get("viti", defaults["viti"]) }, "instruction": {}, "leak_1": { "pre_filling_time": int(row.get("tempo_pre_riempimento", defaults["tempo_pre_riempimento"])), "pre_filling_pressure": int(row.get("pressione_pre_riempimento", defaults["pressione_pre_riempimento"])), "filling_time": int(row.get("tempo_riempimento", defaults["tempo_riempimento"])), "settling_time": int(self.get_def(row, "tempo_assestamento")), "settling_pressure_min_percent": int( row.get("percentuale_minima_pressione_assestamento", defaults["percentuale_minima_pressione_assestamento"])), "settling_pressure_max_percent": int( row.get("percentuale_massima_pressione_assestamento", defaults["percentuale_massima_pressione_assestamento"])), "test_time": int(row.get("tempo_di_test", defaults["tempo_di_test"])), "test_pressure_qneg": int(row.get("pressione_di_test_delta_minimo", defaults["pressione_di_test_delta_minimo"])), "test_pressure": int(row.get("pressione_di_test", defaults["pressione_di_test"])), "test_pressure_qpos": int(row.get("pressione_di_test_delta_massimo", defaults["pressione_di_test_delta_massimo"])), "flush_time": int(row.get("tempo_svuotamento", defaults["tempo_svuotamento"])), "flush_pressure": int(row.get("pressione_svuotamento", defaults["pressione_svuotamento"])), "chan_sel": int(row.get("canale_di_prova", defaults["canale_di_prova"])), "ext_flush_time": int(row.get("tempo_svuotamento_esterno", defaults["tempo_svuotamento_esterno"])), "ext_blow_time": int(row.get("tempo_soffiaggio_esterno", defaults["tempo_soffiaggio_esterno"])), }, "leak_2": { "pre_filling_time": int(row.get("tempo_pre_riempimento_2", defaults["tempo_pre_riempimento_2"])), "pre_filling_pressure": int(row.get("pressione_pre_riempimento_2", defaults["pressione_pre_riempimento_2"])), "filling_time": int(row.get("tempo_riempimento_2", defaults["tempo_riempimento_2"])), "settling_time": int(row.get("tempo_assestamento_2", defaults["tempo_assestamento_2"])), "settling_pressure_min_percent": int( row.get("percentuale_minima_pressione_assestamento_2", defaults["percentuale_minima_pressione_assestamento_2"])), "settling_pressure_max_percent": int( row.get("percentuale_massima_pressione_assestamento_2", defaults["percentuale_massima_pressione_assestamento_2"])), "test_time": int(row.get("tempo_di_test_2", defaults["tempo_di_test_2"])), "test_pressure_qneg": int(row.get("pressione_di_test_delta_minimo_2", defaults["pressione_di_test_delta_minimo_2"])), "test_pressure": int(row.get("pressione_di_test_2", defaults["pressione_di_test_2"])), "test_pressure_qpos": int(row.get("pressione_di_test_delta_massimo_2", defaults["pressione_di_test_delta_massimo_2"])), "flush_time": int(row.get("tempo_svuotamento_2", defaults["tempo_svuotamento_2"])), "flush_pressure": int(row.get("pressione_svuotamento_2", defaults["pressione_svuotamento_2"])), "chan_sel": int(row.get("canale_di_prova_2", defaults["canale_di_prova_2"])), "ext_flush_time": int(row.get("tempo_svuotamento_esterno_2", defaults["tempo_svuotamento_esterno"])), "ext_blow_time": int(row.get("tempo_soffiaggio_esterno_2", defaults["tempo_soffiaggio_esterno"])), }, "vision": { "recipe": row.get("ricetta_visione", defaults["ricetta_visione"]), }, "print": { "template": row.get(print_template_field, defaults["modello_etichetta"]), "labeltxt_1": row.get("testo_etich_1", ""), "labeltxt_2": row.get("testo_etich_2", ""), "labeltxt_3": row.get("testo_etich_3", ""), "labeltxt_4": row.get("testo_etich_4", ""), "labeltxt_5": row.get("barcode_input_finelinea", ""), "extra_label": row.get("etichette_supplementari", ""), }, } # IMPORT RECIPES FROM CSV FILE TO DATABASE def import_recipes(self, csv_path=None, defaults=None): if defaults is None: global noner defaults = self.config.get("recipes_defaults", noner) if csv_path is None: options = QFileDialog.Options() options |= QFileDialog.DontUseNativeDialog csv_path, _ = QFileDialog.getOpenFileName( self, "Importazione ricette", "ricette.csv", "CSV data (*.csv);;All Files (*)", options=options, ) csv_path = str(csv_path) if not len(csv_path): return self.log.info(f"recipes: importing recipes from {csv_path}") recipe_name_field = self.config.get("recipe", {}).get("recipe_name_field", "codice_ricetta").strip() part_number_field = self.config.get("recipe", {}).get("part_number_field", "part number").strip() description_field = self.config.get("recipe", {}).get("description_field", "descrizione").strip() barcode_enable_field = self.config.get("recipe", {}).get("barcode_enable_field", "verifica_codice_a_barre_abilitata").strip() with open(csv_path, "r", encoding="utf-8-sig") as f: reader = csv.DictReader(f) count = 0 for ucrow in reader: row = dict((k.lower(), v) for k, v in ucrow.items()) recipe_name = row.get(recipe_name_field, defaults["codice_ricetta"]) steps_specs = self.read_steps(row, defaults=defaults) # create recipe or update existing one in DB try: recipe = Recipes.get_by_id(recipe_name) recipe_is_new = False except Recipes.DoesNotExist: recipe = Recipes(name=recipe_name, part_number="TEMPORARY") recipe_is_new = True recipe.client = row.get("cliente", defaults["cliente"]) recipe.part_number = row.get(part_number_field, defaults["part_number"]) recipe.description = row.get(description_field, defaults["descrizione"]) recipe.spec = { "count": len(row.get("dimensione_lotto_abilitata", defaults["dimensione_lotto_abilitata"])) and "count" not in self.unsupported_steps, "connector": len( row.get("verifica_connettore_abilitata", defaults["verifica_connettore_abilitata"])) and "connector" not in self.unsupported_steps, "barcodes": len(row.get(barcode_enable_field, defaults["verifica_codice_a_barre_abilitata"])) and "barcodes" not in self.unsupported_steps, "resistance": len(row.get("verifica_resistenza_connettore_abilitata", defaults["verifica_resistenza_connettore_abilitata"])) and "resistance" not in self.unsupported_steps, "screws": len(row.get("avvitatura_abilitata", defaults["avvitatura_abilitata"])) and "screws" not in self.unsupported_steps, "instruction": len(row.get("istruzione_abilitata", defaults["istruzione_abilitata"])) and "instruction" not in self.unsupported_steps, "leak_1": len(row.get("prova_tenuta_abilitata", defaults["prova_tenuta_abilitata"])) and "leak_1" not in self.unsupported_steps, "leak_2": len(row.get("prova_tenuta_abilitata_2", defaults["prova_tenuta_abilitata_2"])) and "leak_2" not in self.unsupported_steps, "vision": len(row.get("test_visione_abilitato", defaults["test_visione_abilitato"])) and "vision" not in self.unsupported_steps, "print": len(row.get("stampa_etichetta_abilitata", defaults["stampa_etichetta_abilitata"])) and "print" not in self.unsupported_steps, } if recipe_is_new: recipe.save(force_insert=True) else: recipe.save() count += 1 db.commit() self.log.info(f"recipes: imported {count} rows.") self.crud.refresh() # EXPORT RECIPES TABLE TO CSV FILE def export_recipes(self, csv_path=None): if csv_path is None: options = QFileDialog.Options() options |= QFileDialog.DontUseNativeDialog csv_path, _ = QFileDialog.getSaveFileName( self, "Esportazione ricette", "ricette.csv", "CSV data (*.csv);;All Files (*)", options=options, ) csv_path = str(csv_path) if not len(csv_path): return if not csv_path.lower().endswith(".csv"): csv_path += ".csv" csv_dir = os.path.dirname(csv_path) if len(csv_dir): os.makedirs(csv_dir, exist_ok=True) recipe_name_field = self.config.get("recipe", {}).get("recipe_name_field", "codice_ricetta").strip() barcode_enable_field = self.config.get("recipe", {}).get("barcode_enable_field", "verifica_codice_a_barre_abilitata").strip() barcode_serial_field = self.config.get("recipe", {}).get("barcode_serial_field", "codice_a_barre").strip() print_template_field = self.config.get("recipe", {}).get("label_template_field", "modello_etichetta").strip() data = [] fieldnames = set() # Use a set to avoid duplicates for recipe in list(Recipes.select()): steps = recipe.get_steps_map() exportable = { # BASE SECTION recipe_name_field: recipe.name, "cliente": recipe.client, "part_number": recipe.part_number, } # Add base fields to the fieldnames fieldnames.update([recipe_name_field, "cliente", "part_number"]) # Check and add fields conditionally for each section if "connector" in steps: exportable.update({ "verifica_connettore_abilitata": "x", "connettore": steps["connector"].spec["connector"] }) fieldnames.update(["verifica_connettore_abilitata", "connettore"]) if "resistance" in steps: exportable.update({ "verifica_resistenza_connettore_abilitata": "x", "scala_resistenza": steps["resistance"].spec["scale"], "r nominale": steps["resistance"].spec["expected"], "tolleranza_resistenza_pos": steps["resistance"].spec["tolerance_pos"], "tolleranza_resistenza_neg": steps["resistance"].spec["tolerance_neg"] }) fieldnames.update(["verifica_resistenza_connettore_abilitata", "scala_resistenza", "r nominale", "tolleranza_resistenza_pos", "tolleranza_resistenza_neg"]) if "barcodes" in steps: exportable.update({ barcode_enable_field: "x", barcode_serial_field: steps["barcodes"].spec["serial"] }) fieldnames.update([barcode_enable_field, barcode_serial_field]) if recipe.spec.get("steps", {}).get("screws") and "screws" in steps: exportable.update({ "avvitatura_abilitata": "x", "viti": steps["screws"].spec["quantity"] }) fieldnames.update(["avvitatura_abilitata", "viti"]) if "leak_1" in steps: exportable.update({ "prova_tenuta_abilitata": "x", "tempo_pre_riempimento": steps["leak_1"].spec["pre_filling_time"], "pressione_pre_riempimento": steps["leak_1"].spec["pre_filling_pressure"], "tempo_riempimento": steps["leak_1"].spec["filling_time"], "tempo_assestamento": steps["leak_1"].spec["settling_time"], "percentuale_minima_pressione_assestamento": steps["leak_1"].spec["settling_pressure_min_percent"], "percentuale_massima_pressione_assestamento": steps["leak_1"].spec["settling_pressure_max_percent"], "tempo_di_test": steps["leak_1"].spec["test_time"], "pressione_di_test_delta_minimo": steps["leak_1"].spec["test_pressure_qneg"], "pressione_di_test": steps["leak_1"].spec["test_pressure"], "pressione_di_test_delta_massimo": steps["leak_1"].spec["test_pressure_qpos"], "tempo_svuotamento": steps["leak_1"].spec["flush_time"], "pressione_svuotamento": steps["leak_1"].spec["flush_pressure"], }) fieldnames.update(["prova_tenuta_abilitata", "tempo_pre_riempimento", "pressione_pre_riempimento", "tempo_riempimento", "tempo_assestamento", "percentuale_minima_pressione_assestamento", "percentuale_massima_pressione_assestamento", "tempo_di_test", "pressione_di_test_delta_minimo", "pressione_di_test", "pressione_di_test_delta_massimo", "tempo_svuotamento", "pressione_svuotamento"]) if "leak_2" in steps: exportable.update({ "prova_tenuta_abilitata_2": "x", "tempo_pre_riempimento_2": steps["leak_2"].spec["pre_filling_time"], "pressione_pre_riempimento_2": steps["leak_2"].spec["pre_filling_pressure"], "tempo_riempimento_2": steps["leak_2"].spec["filling_time"], "tempo_assestamento_2": steps["leak_2"].spec["settling_time"], "percentuale_minima_pressione_assestamento_2": steps["leak_2"].spec[ "settling_pressure_min_percent"], "percentuale_massima_pressione_assestamento_2": steps["leak_2"].spec[ "settling_pressure_max_percent"], "tempo_di_test_2": steps["leak_2"].spec["test_time"], "pressione_di_test_delta_minimo_2": steps["leak_2"].spec["test_pressure_qneg"], "pressione_di_test_2": steps["leak_2"].spec["test_pressure"], "pressione_di_test_delta_massimo_2": steps["leak_2"].spec["test_pressure_qpos"], "tempo_svuotamento_2": steps["leak_2"].spec["flush_time"], "pressione_svuotamento_2": steps["leak_2"].spec["flush_pressure"], }) fieldnames.update(["prova_tenuta_abilitata_2", "tempo_pre_riempimento_2", "pressione_pre_riempimento_2", "tempo_riempimento_2", "tempo_assestamento_2", "percentuale_minima_pressione_assestamento_2", "percentuale_massima_pressione_assestamento_2", "tempo_di_test_2", "pressione_di_test_delta_minimo_2", "pressione_di_test_2", "pressione_di_test_delta_massimo_2", "tempo_svuotamento_2", "pressione_svuotamento_2"]) if "vision" in steps: exportable.update({ "test_visione_abilitato": recipe.spec["vision"], "ricetta_visione": steps["vision"].spec["recipe"] }) fieldnames.update(["test_visione_abilitato", "ricetta_visione"]) if "print" in steps: exportable.update({ "stampa_etichetta_abilitata": "x", print_template_field: steps["print"].spec["template"], "etichette_supplementari": steps["print"].spec["extra_label"] }) fieldnames.update(["stampa_etichetta_abilitata", print_template_field, "etichette_supplementari"]) # Append the exportable dictionary to the data list data.append(exportable) # Convert the set to a list for CSV writing fieldnames = list(fieldnames) # Export to CSV if there is data if len(data): self.log.info(f"recipes: exporting recipes to {csv_path}") with open(csv_path, "w", newline="") as f: w = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore") w.writeheader() w.writerows(data) self.log.info(f"recipes: exported {len(data)} rows.") def delete_recipes(self): ret = QMessageBox.warning( None, "Attenzione si sta cercando di cancellare tutte le ricette!", "Si è sicuri di voler eliminare tutte le ricette?\nQuesta operazione non può essere annullata!", buttons=QMessageBox.Ok | QMessageBox.Cancel, defaultButton=QMessageBox.Cancel ) if ret == QMessageBox.Ok: Recipes.delete().execute() self.crud.refresh() def backup_current_recipes(self): # Define the backup directory and file name backup_dir = os.path.join('config', 'csv_import', 'backup_csv') timestamp = datetime.now().strftime("%d%m%y%H%M%S") backup_file = f"backup_{timestamp}.csv" backup_path = os.path.join(backup_dir, backup_file) # Ensure the backup directory exists os.makedirs(backup_dir, exist_ok=True) # Export current recipes to backup file self.export_recipes(csv_path=backup_path) def move_imported_csv(self, csv_path): # Move the imported CSV to the 'imported_csv' directory imported_dir = os.path.join('config', 'csv_import', 'imported_csv') os.makedirs(imported_dir, exist_ok=True) imported_path = os.path.join(imported_dir, os.path.basename(csv_path)) shutil.move(csv_path, imported_path) self.log.info(f"Imported CSV moved to {imported_path}") return imported_path def check_and_import_auto_csv(self): # Define the directory to check auto_import_dir = os.path.join('config', 'csv_import', 'auto_csv_import') # Check if the directory exists and is not empty if os.path.exists(auto_import_dir) and os.listdir(auto_import_dir): # Perform backup self.backup_current_recipes() # Move and import each CSV file in the directory for csv_file in os.listdir(auto_import_dir): csv_path = os.path.join(auto_import_dir, csv_file) if os.path.isfile(csv_path) and csv_file.endswith(".csv"): self.import_recipes(csv_path=csv_path) self.move_imported_csv(csv_path)