From 533d010881c80b5fe81b5c157b615ac69defbc1d Mon Sep 17 00:00:00 2001 From: edo-neo Date: Tue, 24 Dec 2024 11:44:57 +0100 Subject: [PATCH] dev --- src/lib/helpers/recipe_manager.py | 405 ++++++++++++++++++++++++++++++ 1 file changed, 405 insertions(+) create mode 100644 src/lib/helpers/recipe_manager.py diff --git a/src/lib/helpers/recipe_manager.py b/src/lib/helpers/recipe_manager.py new file mode 100644 index 0000000..7b6aa79 --- /dev/null +++ b/src/lib/helpers/recipe_manager.py @@ -0,0 +1,405 @@ +import os +import csv +import locale +from datetime import datetime +import shutil +from PyQt5.QtWidgets import QFileDialog +from lib.db import Recipes, db # Assuming these are part of your project structure + + +def read_steps(row, config, defaults=None, unsupported_steps=None): + if defaults is None: + defaults = config.get("recipes_defaults", lambda k: None) + + # Configurable fields from the config object + barcode_serial_field = config.get("recipe", {}).get("barcode_serial_field", "codice_a_barre").strip() + warning_image_field = config.get("recipe", {}).get("warning_image_field", "warning_img").strip() + print_template_field = config.get("recipe", {}).get("label_template_field", "modello_etichetta").strip() + decsep = locale.localeconv()["decimal_point"] + + # Extract and clean "r nominale" value + rcsv = ( + row.get("r nominale", defaults["r nominale"]) + .replace(" ", "").replace(",", decsep).replace("Ω", "").replace("?", "") + ) + if rcsv == "": + rcsv = "999" # Default fallback for "r nominale" if empty + + # Helper functions + def get_default_value(field, key): + value = field.get(key, defaults[key]) + return value if value != "" else defaults[key] + + def safe_parse(value): + try: + return int(float(value)) + except ValueError: + return 0 # Default to 0 if parsing fails + + # Define the steps dictionary + steps = { + "count": { + "amount": row.get("dimensione_lotto", defaults["dimensione_lotto"]), + "warning_img": row.get(warning_image_field, defaults["warning_img"]), + "require_discard_piece": row.get("richiedi_inserimento_scarto", defaults["richiedi_inserimento_scarto"]), + }, + "connector": { + "connector": row.get("connettore", defaults["connettore"]), + }, + "barcodes": { + "serial": row.get(barcode_serial_field, defaults["codice_a_barre"]), + "n_pieces": row.get("n_componenti", defaults["n_componenti"]), + "barcode_input_2": row.get("barcode_input_2", "-"), + "barcode_input_3": row.get("barcode_input_3", "-"), + "barcode_input_4": row.get("barcode_input_4", "-"), + "barcode_input_5": row.get("barcode_input_5", "-"), + }, + "resistance": { + "scale": locale.atof(row.get("scala_resistenza", defaults["scala_resistenza"])), + "expected": locale.atof(rcsv), + "tolerance_pos": locale.atof(get_default_value(row, "tolleranza_resistenza_pos")), + "tolerance_neg": locale.atof(get_default_value(row, "tolleranza_resistenza_neg")), + }, + "screws": { + "quantity": row.get("viti", defaults["viti"]), + }, + "instruction": {}, # Empty placeholder for future extensions + "leak_1": { + "pre_filling_time": safe_parse(row.get("tempo_pre_riempimento", defaults["tempo_pre_riempimento"])), + "pre_filling_pressure": safe_parse( + row.get("pressione_pre_riempimento", defaults["pressione_pre_riempimento"])), + "filling_time": safe_parse(row.get("tempo_riempimento", defaults["tempo_riempimento"])), + "settling_time": safe_parse(get_default_value(row, "tempo_assestamento")), + "settling_pressure_min_percent": safe_parse( + row.get("percentuale_minima_pressione_assestamento", + defaults["percentuale_minima_pressione_assestamento"]) + ), + "settling_pressure_max_percent": safe_parse( + row.get("percentuale_massima_pressione_assestamento", + defaults["percentuale_massima_pressione_assestamento"]) + ), + "test_time": safe_parse(row.get("tempo_di_test", defaults["tempo_di_test"])), + "test_pressure_qneg": safe_parse( + row.get("pressione_di_test_delta_minimo", defaults["pressione_di_test_delta_minimo"])), + "test_pressure": safe_parse(row.get("pressione_di_test", defaults["pressione_di_test"])), + "test_pressure_qpos": safe_parse( + row.get("pressione_di_test_delta_massimo", defaults["pressione_di_test_delta_massimo"])), + "flush_time": safe_parse(row.get("tempo_svuotamento", defaults["tempo_svuotamento"])), + "flush_pressure": safe_parse(row.get("pressione_svuotamento", defaults["pressione_svuotamento"])), + "chan_sel": safe_parse(row.get("canale_di_prova", defaults["canale_di_prova"])), + "ext_flush_time": safe_parse(row.get("tempo_svuotamento_esterno", defaults["tempo_svuotamento_esterno"])), + "ext_blow_time": safe_parse(row.get("tempo_soffiaggio_esterno", defaults["tempo_soffiaggio_esterno"])), + }, + "leak_2": { + "pre_filling_time": safe_parse(row.get("tempo_pre_riempimento_2", defaults["tempo_pre_riempimento_2"])), + "pre_filling_pressure": safe_parse( + row.get("pressione_pre_riempimento_2", defaults["pressione_pre_riempimento_2"])), + "filling_time": safe_parse(row.get("tempo_riempimento_2", defaults["tempo_riempimento_2"])), + "settling_time": safe_parse(row.get("tempo_assestamento_2", defaults["tempo_assestamento_2"])), + "settling_pressure_min_percent": safe_parse( + row.get("percentuale_minima_pressione_assestamento_2", + defaults["percentuale_minima_pressione_assestamento_2"]) + ), + "settling_pressure_max_percent": safe_parse( + row.get("percentuale_massima_pressione_assestamento_2", + defaults["percentuale_massima_pressione_assestamento_2"]) + ), + "test_time": safe_parse(row.get("tempo_di_test_2", defaults["tempo_di_test_2"])), + "test_pressure_qneg": safe_parse( + row.get("pressione_di_test_delta_minimo_2", defaults["pressione_di_test_delta_minimo_2"])), + "test_pressure": safe_parse(row.get("pressione_di_test_2", defaults["pressione_di_test_2"])), + "test_pressure_qpos": safe_parse( + row.get("pressione_di_test_delta_massimo_2", defaults["pressione_di_test_delta_massimo_2"])), + "flush_time": safe_parse(row.get("tempo_svuotamento_2", defaults["tempo_svuotamento_2"])), + "flush_pressure": safe_parse(row.get("pressione_svuotamento_2", defaults["pressione_svuotamento_2"])), + "chan_sel": safe_parse(row.get("canale_di_prova_2", defaults["canale_di_prova_2"])), + "ext_flush_time": safe_parse(row.get("tempo_svuotamento_esterno_2", defaults["tempo_svuotamento_esterno"])), + "ext_blow_time": safe_parse(row.get("tempo_soffiaggio_esterno_2", defaults["tempo_soffiaggio_esterno"])), + }, + "vision": { + "recipe": row.get("ricetta_visione", defaults["ricetta_visione"]), + }, + "print": { + "template": row.get(print_template_field, defaults["modello_etichetta"]), + "labeltxt_1": row.get("testo_etich_1", ""), + "labeltxt_2": row.get("testo_etich_2", ""), + "labeltxt_3": row.get("testo_etich_3", ""), + "labeltxt_4": row.get("testo_etich_4", ""), + "labeltxt_5": row.get("barcode_input_finelinea", ""), + "extra_label": row.get("etichette_supplementari", ""), + }, + } + + # Remove unsupported steps if specified + if unsupported_steps: + for step in unsupported_steps: + steps.pop(step, None) + + return steps + + + +def import_recipes(config, csv_path=None, defaults=None, unsupported_steps=None, logger=None): + """ + Import recipes from CSV and update or create new ones in the database. + + :param config: Configuration object with recipe settings. + :param csv_path: Path to the CSV file (optional). If None, a file dialog will open. + :param defaults: Default values to use for missing fields in the CSV. + :param unsupported_steps: A list of unsupported step names to exclude. + :param logger: Logger object for logging messages (optional). + """ + if defaults is None: + defaults = config.get("recipes_defaults", lambda k: None) + + # Open file dialog if csv_path is not provided + if csv_path is None: + options = QFileDialog.Options() + options |= QFileDialog.DontUseNativeDialog + csv_path, _ = QFileDialog.getOpenFileName( + None, + "Import Recipes", + "recipes.csv", + "CSV files (*.csv);;All Files (*)", + options=options, + ) + csv_path = str(csv_path) + if not len(csv_path): + return + + if logger: + logger.info(f"Importing recipes from: {csv_path}.") + + # Get field mappings from the config + recipe_name_field = config.get("recipe", {}).get("recipe_name_field", "codice_ricetta").strip() + part_number_field = config.get("recipe", {}).get("part_number_field", "part_number").strip() + description_field = config.get("recipe", {}).get("description_field", "descrizione").strip() + barcode_enable_field = config.get( + "recipe", {} + ).get("barcode_enable_field", "verifica_codice_a_barre_abilitata").strip() + + with open(csv_path, "r", encoding="utf-8-sig") as file: + reader = csv.DictReader(file) + count = 0 + + for ucrow in reader: + # Normalize row keys to lowercase for consistency + row = dict((k.lower(), v) for k, v in ucrow.items()) + recipe_name = row.get(recipe_name_field, defaults["codice_ricetta"]) + steps_specs = read_steps(row, config, defaults=defaults, unsupported_steps=unsupported_steps) + + # Create or update recipe in the database + try: + # Try to fetch existing recipe + recipe = Recipes.get_by_id(recipe_name) + recipe_is_new = False + except Recipes.DoesNotExist: + # Create a new recipe if it doesn't exist + recipe = Recipes(name=recipe_name, part_number="TEMPORARY") + recipe_is_new = True + + # Update recipe attributes + recipe.client = row.get("cliente", defaults["cliente"]) + recipe.part_number = row.get(part_number_field, defaults["part_number"]) + recipe.description = row.get(description_field, defaults["descrizione"]) + + # Recipe specifications + steps = {} + for step_name, step_spec in steps_specs.items(): + if unsupported_steps is None or step_name not in unsupported_steps: + steps[step_name] = step_spec + + recipe.spec = { + "count": len( + row.get("dimensione_lotto_abilitata", defaults["dimensione_lotto_abilitata"])) and "count" not in ( + unsupported_steps or []), + "connector": len(row.get("verifica_connettore_abilitata", + defaults["verifica_connettore_abilitata"])) and "connector" not in ( + unsupported_steps or []), + "barcodes": len(row.get(barcode_enable_field, + defaults["verifica_codice_a_barre_abilitata"])) and "barcodes" not in ( + unsupported_steps or []), + "resistance": len(row.get("verifica_resistenza_connettore_abilitata", defaults[ + "verifica_resistenza_connettore_abilitata"])) and "resistance" not in (unsupported_steps or []), + "screws": len(row.get("avvitatura_abilitata", defaults["avvitatura_abilitata"])) and "screws" not in ( + unsupported_steps or []), + "instruction": len( + row.get("istruzione_abilitata", defaults["istruzione_abilitata"])) and "instruction" not in ( + unsupported_steps or []), + "instruction_extra": len(row.get("istruzione_abilitata_extra", defaults[ + "istruzione_abilitata_extra"])) and "instruction_extra" not in (unsupported_steps or []), + "leak_1": len( + row.get("prova_tenuta_abilitata", defaults["prova_tenuta_abilitata"])) and "leak_1" not in ( + unsupported_steps or []), + "leak_2": len( + row.get("prova_tenuta_abilitata_2", defaults["prova_tenuta_abilitata_2"])) and "leak_2" not in ( + unsupported_steps or []), + "vision": len( + row.get("test_visione_abilitato", defaults["test_visione_abilitato"])) and "vision" not in ( + unsupported_steps or []), + "print": len( + row.get("stampa_etichetta_abilitata", defaults["stampa_etichetta_abilitata"])) and "print" not in ( + unsupported_steps or []), + "steps": steps_specs, + } + + if recipe_is_new: + recipe.save(force_insert=True) # Insert new recipe + else: + recipe.save() # Update existing recipe + + count += 1 # Increment imported recipe count + + db.commit() # Commit all changes to the database + + if logger: + logger.info(f"Imported {count} recipes.") + + + +def export_recipes(config, csv_path=None, logger=None): + if csv_path is None: + options = QFileDialog.Options() + options |= QFileDialog.DontUseNativeDialog + csv_path, _ = QFileDialog.getSaveFileName( + None, + "Export Recipes", + "recipes.csv", + "CSV files (*.csv);;All Files (*)", + options=options, + ) + csv_path = str(csv_path) + if not len(csv_path): + return + + if not csv_path.lower().endswith(".csv"): + csv_path += ".csv" + os.makedirs(os.path.dirname(csv_path), exist_ok=True) + + recipe_name_field = config.get("recipe", {}).get("recipe_name_field", "codice_ricetta").strip() + barcode_enable_field = config.get("recipe", {}).get("barcode_enable_field", + "verifica_codice_a_barre_abilitata").strip() + barcode_serial_field = config.get("recipe", {}).get("barcode_serial_field", "codice_a_barre").strip() + print_template_field = config.get("recipe", {}).get("label_template_field", "modello_etichetta").strip() + data = [] + fieldnames = set() # Use a set to avoid duplicates + + # Iterate over all recipes in the database + for recipe in Recipes.select(): + steps = recipe.get_steps_map() + exportable = { + # Base fields + recipe_name_field: recipe.name, + "cliente": recipe.client, + "part_number": recipe.part_number, + } + + # Add base fields to the fieldnames + fieldnames.update([recipe_name_field, "cliente", "part_number"]) + + # Check and add steps conditionally + if "connector" in steps: + exportable.update({ + "verifica_connettore_abilitata": "x", + "connettore": steps["connector"].spec["connector"] + }) + fieldnames.update(["verifica_connettore_abilitata", "connettore"]) + + if "resistance" in steps: + exportable.update({ + "verifica_resistenza_connettore_abilitata": "x", + "scala_resistenza": steps["resistance"].spec["scale"], + "r nominale": steps["resistance"].spec["expected"], + "tolleranza_resistenza_pos": steps["resistance"].spec["tolerance_pos"], + "tolleranza_resistenza_neg": steps["resistance"].spec["tolerance_neg"], + }) + fieldnames.update(["verifica_resistenza_connettore_abilitata", "scala_resistenza", "r nominale", + "tolleranza_resistenza_pos", "tolleranza_resistenza_neg"]) + + if "barcodes" in steps: + exportable.update({ + barcode_enable_field: "x", + barcode_serial_field: steps["barcodes"].spec["serial"] + }) + fieldnames.update([barcode_enable_field, barcode_serial_field]) + + if "screws" in steps: + exportable.update({ + "avvitatura_abilitata": "x", + "viti": steps["screws"].spec["quantity"] + }) + fieldnames.update(["avvitatura_abilitata", "viti"]) + + if "leak_1" in steps: + exportable.update({ + "prova_tenuta_abilitata": "x", + "tempo_pre_riempimento": steps["leak_1"].spec["pre_filling_time"], + "pressione_pre_riempimento": steps["leak_1"].spec["pre_filling_pressure"], + "tempo_di_test": steps["leak_1"].spec["test_time"], + "pressione_di_test": steps["leak_1"].spec["test_pressure"], + }) + fieldnames.update(["prova_tenuta_abilitata", "tempo_pre_riempimento", "pressione_pre_riempimento", + "tempo_di_test", "pressione_di_test"]) + + if "leak_2" in steps: + exportable.update({ + "prova_tenuta_abilitata_2": "x", + "tempo_pre_riempimento_2": steps["leak_2"].spec["pre_filling_time"], + "pressione_pre_riempimento_2": steps["leak_2"].spec["pre_filling_pressure"], + "tempo_di_test_2": steps["leak_2"].spec["test_time"], + "pressione_di_test_2": steps["leak_2"].spec["test_pressure"], + }) + fieldnames.update(["prova_tenuta_abilitata_2", "tempo_pre_riempimento_2", "pressione_pre_riempimento_2", + "tempo_di_test_2", "pressione_di_test_2"]) + + if "vision" in steps: + exportable.update({ + "test_visione_abilitato": steps["vision"].spec.get("enabled", ""), + "ricetta_visione": steps["vision"].spec["recipe"] + }) + fieldnames.update(["test_visione_abilitato", "ricetta_visione"]) + + if "print" in steps: + exportable.update({ + "stampa_etichetta_abilitata": "x", + print_template_field: steps["print"].spec["template"], + }) + fieldnames.update(["stampa_etichetta_abilitata", print_template_field]) + + # Append the exportable row to the data + data.append(exportable) + + # Export data to CSV if there is any data + if len(data): + if logger: + logger.info(f"Exporting recipes to {csv_path}") + with open(csv_path, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=list(fieldnames)) + writer.writeheader() + writer.writerows(data) + if logger: + logger.info(f"Exported {len(data)} recipes to {csv_path}.") + + +def backup_current_recipes(config, logger=None): + """ + Back up current recipes to a timestamped CSV file in the predefined backup directory. + """ + # Define the backup directory and file name + backup_dir = os.path.join('config', 'csv_import', 'backup_csv') + timestamp = datetime.now().strftime("%d%m%y%H%M%S") + backup_file = f"backup_{timestamp}.csv" + backup_path = os.path.join(backup_dir, backup_file) + + # Ensure the backup directory exists + os.makedirs(backup_dir, exist_ok=True) + + # Export current recipes to the backup path + export_recipes(config=config, csv_path=backup_path, logger=logger) + + if logger: + logger.info(f"Backup created at: {backup_path}") + + return backup_path # Return the backup path for reference if needed + +