This commit is contained in:
edo-neo 2024-12-24 11:44:57 +01:00
parent e827089e31
commit 533d010881

View File

@ -0,0 +1,405 @@
import os
import csv
import locale
from datetime import datetime
import shutil
from PyQt5.QtWidgets import QFileDialog
from lib.db import Recipes, db # Assuming these are part of your project structure
def read_steps(row, config, defaults=None, unsupported_steps=None):
if defaults is None:
defaults = config.get("recipes_defaults", lambda k: None)
# Configurable fields from the config object
barcode_serial_field = config.get("recipe", {}).get("barcode_serial_field", "codice_a_barre").strip()
warning_image_field = config.get("recipe", {}).get("warning_image_field", "warning_img").strip()
print_template_field = config.get("recipe", {}).get("label_template_field", "modello_etichetta").strip()
decsep = locale.localeconv()["decimal_point"]
# Extract and clean "r nominale" value
rcsv = (
row.get("r nominale", defaults["r nominale"])
.replace(" ", "").replace(",", decsep).replace("Ω", "").replace("?", "")
)
if rcsv == "":
rcsv = "999" # Default fallback for "r nominale" if empty
# Helper functions
def get_default_value(field, key):
value = field.get(key, defaults[key])
return value if value != "" else defaults[key]
def safe_parse(value):
try:
return int(float(value))
except ValueError:
return 0 # Default to 0 if parsing fails
# Define the steps dictionary
steps = {
"count": {
"amount": row.get("dimensione_lotto", defaults["dimensione_lotto"]),
"warning_img": row.get(warning_image_field, defaults["warning_img"]),
"require_discard_piece": row.get("richiedi_inserimento_scarto", defaults["richiedi_inserimento_scarto"]),
},
"connector": {
"connector": row.get("connettore", defaults["connettore"]),
},
"barcodes": {
"serial": row.get(barcode_serial_field, defaults["codice_a_barre"]),
"n_pieces": row.get("n_componenti", defaults["n_componenti"]),
"barcode_input_2": row.get("barcode_input_2", "-"),
"barcode_input_3": row.get("barcode_input_3", "-"),
"barcode_input_4": row.get("barcode_input_4", "-"),
"barcode_input_5": row.get("barcode_input_5", "-"),
},
"resistance": {
"scale": locale.atof(row.get("scala_resistenza", defaults["scala_resistenza"])),
"expected": locale.atof(rcsv),
"tolerance_pos": locale.atof(get_default_value(row, "tolleranza_resistenza_pos")),
"tolerance_neg": locale.atof(get_default_value(row, "tolleranza_resistenza_neg")),
},
"screws": {
"quantity": row.get("viti", defaults["viti"]),
},
"instruction": {}, # Empty placeholder for future extensions
"leak_1": {
"pre_filling_time": safe_parse(row.get("tempo_pre_riempimento", defaults["tempo_pre_riempimento"])),
"pre_filling_pressure": safe_parse(
row.get("pressione_pre_riempimento", defaults["pressione_pre_riempimento"])),
"filling_time": safe_parse(row.get("tempo_riempimento", defaults["tempo_riempimento"])),
"settling_time": safe_parse(get_default_value(row, "tempo_assestamento")),
"settling_pressure_min_percent": safe_parse(
row.get("percentuale_minima_pressione_assestamento",
defaults["percentuale_minima_pressione_assestamento"])
),
"settling_pressure_max_percent": safe_parse(
row.get("percentuale_massima_pressione_assestamento",
defaults["percentuale_massima_pressione_assestamento"])
),
"test_time": safe_parse(row.get("tempo_di_test", defaults["tempo_di_test"])),
"test_pressure_qneg": safe_parse(
row.get("pressione_di_test_delta_minimo", defaults["pressione_di_test_delta_minimo"])),
"test_pressure": safe_parse(row.get("pressione_di_test", defaults["pressione_di_test"])),
"test_pressure_qpos": safe_parse(
row.get("pressione_di_test_delta_massimo", defaults["pressione_di_test_delta_massimo"])),
"flush_time": safe_parse(row.get("tempo_svuotamento", defaults["tempo_svuotamento"])),
"flush_pressure": safe_parse(row.get("pressione_svuotamento", defaults["pressione_svuotamento"])),
"chan_sel": safe_parse(row.get("canale_di_prova", defaults["canale_di_prova"])),
"ext_flush_time": safe_parse(row.get("tempo_svuotamento_esterno", defaults["tempo_svuotamento_esterno"])),
"ext_blow_time": safe_parse(row.get("tempo_soffiaggio_esterno", defaults["tempo_soffiaggio_esterno"])),
},
"leak_2": {
"pre_filling_time": safe_parse(row.get("tempo_pre_riempimento_2", defaults["tempo_pre_riempimento_2"])),
"pre_filling_pressure": safe_parse(
row.get("pressione_pre_riempimento_2", defaults["pressione_pre_riempimento_2"])),
"filling_time": safe_parse(row.get("tempo_riempimento_2", defaults["tempo_riempimento_2"])),
"settling_time": safe_parse(row.get("tempo_assestamento_2", defaults["tempo_assestamento_2"])),
"settling_pressure_min_percent": safe_parse(
row.get("percentuale_minima_pressione_assestamento_2",
defaults["percentuale_minima_pressione_assestamento_2"])
),
"settling_pressure_max_percent": safe_parse(
row.get("percentuale_massima_pressione_assestamento_2",
defaults["percentuale_massima_pressione_assestamento_2"])
),
"test_time": safe_parse(row.get("tempo_di_test_2", defaults["tempo_di_test_2"])),
"test_pressure_qneg": safe_parse(
row.get("pressione_di_test_delta_minimo_2", defaults["pressione_di_test_delta_minimo_2"])),
"test_pressure": safe_parse(row.get("pressione_di_test_2", defaults["pressione_di_test_2"])),
"test_pressure_qpos": safe_parse(
row.get("pressione_di_test_delta_massimo_2", defaults["pressione_di_test_delta_massimo_2"])),
"flush_time": safe_parse(row.get("tempo_svuotamento_2", defaults["tempo_svuotamento_2"])),
"flush_pressure": safe_parse(row.get("pressione_svuotamento_2", defaults["pressione_svuotamento_2"])),
"chan_sel": safe_parse(row.get("canale_di_prova_2", defaults["canale_di_prova_2"])),
"ext_flush_time": safe_parse(row.get("tempo_svuotamento_esterno_2", defaults["tempo_svuotamento_esterno"])),
"ext_blow_time": safe_parse(row.get("tempo_soffiaggio_esterno_2", defaults["tempo_soffiaggio_esterno"])),
},
"vision": {
"recipe": row.get("ricetta_visione", defaults["ricetta_visione"]),
},
"print": {
"template": row.get(print_template_field, defaults["modello_etichetta"]),
"labeltxt_1": row.get("testo_etich_1", ""),
"labeltxt_2": row.get("testo_etich_2", ""),
"labeltxt_3": row.get("testo_etich_3", ""),
"labeltxt_4": row.get("testo_etich_4", ""),
"labeltxt_5": row.get("barcode_input_finelinea", ""),
"extra_label": row.get("etichette_supplementari", ""),
},
}
# Remove unsupported steps if specified
if unsupported_steps:
for step in unsupported_steps:
steps.pop(step, None)
return steps
def import_recipes(config, csv_path=None, defaults=None, unsupported_steps=None, logger=None):
"""
Import recipes from CSV and update or create new ones in the database.
:param config: Configuration object with recipe settings.
:param csv_path: Path to the CSV file (optional). If None, a file dialog will open.
:param defaults: Default values to use for missing fields in the CSV.
:param unsupported_steps: A list of unsupported step names to exclude.
:param logger: Logger object for logging messages (optional).
"""
if defaults is None:
defaults = config.get("recipes_defaults", lambda k: None)
# Open file dialog if csv_path is not provided
if csv_path is None:
options = QFileDialog.Options()
options |= QFileDialog.DontUseNativeDialog
csv_path, _ = QFileDialog.getOpenFileName(
None,
"Import Recipes",
"recipes.csv",
"CSV files (*.csv);;All Files (*)",
options=options,
)
csv_path = str(csv_path)
if not len(csv_path):
return
if logger:
logger.info(f"Importing recipes from: {csv_path}.")
# Get field mappings from the config
recipe_name_field = config.get("recipe", {}).get("recipe_name_field", "codice_ricetta").strip()
part_number_field = config.get("recipe", {}).get("part_number_field", "part_number").strip()
description_field = config.get("recipe", {}).get("description_field", "descrizione").strip()
barcode_enable_field = config.get(
"recipe", {}
).get("barcode_enable_field", "verifica_codice_a_barre_abilitata").strip()
with open(csv_path, "r", encoding="utf-8-sig") as file:
reader = csv.DictReader(file)
count = 0
for ucrow in reader:
# Normalize row keys to lowercase for consistency
row = dict((k.lower(), v) for k, v in ucrow.items())
recipe_name = row.get(recipe_name_field, defaults["codice_ricetta"])
steps_specs = read_steps(row, config, defaults=defaults, unsupported_steps=unsupported_steps)
# Create or update recipe in the database
try:
# Try to fetch existing recipe
recipe = Recipes.get_by_id(recipe_name)
recipe_is_new = False
except Recipes.DoesNotExist:
# Create a new recipe if it doesn't exist
recipe = Recipes(name=recipe_name, part_number="TEMPORARY")
recipe_is_new = True
# Update recipe attributes
recipe.client = row.get("cliente", defaults["cliente"])
recipe.part_number = row.get(part_number_field, defaults["part_number"])
recipe.description = row.get(description_field, defaults["descrizione"])
# Recipe specifications
steps = {}
for step_name, step_spec in steps_specs.items():
if unsupported_steps is None or step_name not in unsupported_steps:
steps[step_name] = step_spec
recipe.spec = {
"count": len(
row.get("dimensione_lotto_abilitata", defaults["dimensione_lotto_abilitata"])) and "count" not in (
unsupported_steps or []),
"connector": len(row.get("verifica_connettore_abilitata",
defaults["verifica_connettore_abilitata"])) and "connector" not in (
unsupported_steps or []),
"barcodes": len(row.get(barcode_enable_field,
defaults["verifica_codice_a_barre_abilitata"])) and "barcodes" not in (
unsupported_steps or []),
"resistance": len(row.get("verifica_resistenza_connettore_abilitata", defaults[
"verifica_resistenza_connettore_abilitata"])) and "resistance" not in (unsupported_steps or []),
"screws": len(row.get("avvitatura_abilitata", defaults["avvitatura_abilitata"])) and "screws" not in (
unsupported_steps or []),
"instruction": len(
row.get("istruzione_abilitata", defaults["istruzione_abilitata"])) and "instruction" not in (
unsupported_steps or []),
"instruction_extra": len(row.get("istruzione_abilitata_extra", defaults[
"istruzione_abilitata_extra"])) and "instruction_extra" not in (unsupported_steps or []),
"leak_1": len(
row.get("prova_tenuta_abilitata", defaults["prova_tenuta_abilitata"])) and "leak_1" not in (
unsupported_steps or []),
"leak_2": len(
row.get("prova_tenuta_abilitata_2", defaults["prova_tenuta_abilitata_2"])) and "leak_2" not in (
unsupported_steps or []),
"vision": len(
row.get("test_visione_abilitato", defaults["test_visione_abilitato"])) and "vision" not in (
unsupported_steps or []),
"print": len(
row.get("stampa_etichetta_abilitata", defaults["stampa_etichetta_abilitata"])) and "print" not in (
unsupported_steps or []),
"steps": steps_specs,
}
if recipe_is_new:
recipe.save(force_insert=True) # Insert new recipe
else:
recipe.save() # Update existing recipe
count += 1 # Increment imported recipe count
db.commit() # Commit all changes to the database
if logger:
logger.info(f"Imported {count} recipes.")
def export_recipes(config, csv_path=None, logger=None):
if csv_path is None:
options = QFileDialog.Options()
options |= QFileDialog.DontUseNativeDialog
csv_path, _ = QFileDialog.getSaveFileName(
None,
"Export Recipes",
"recipes.csv",
"CSV files (*.csv);;All Files (*)",
options=options,
)
csv_path = str(csv_path)
if not len(csv_path):
return
if not csv_path.lower().endswith(".csv"):
csv_path += ".csv"
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
recipe_name_field = config.get("recipe", {}).get("recipe_name_field", "codice_ricetta").strip()
barcode_enable_field = config.get("recipe", {}).get("barcode_enable_field",
"verifica_codice_a_barre_abilitata").strip()
barcode_serial_field = config.get("recipe", {}).get("barcode_serial_field", "codice_a_barre").strip()
print_template_field = config.get("recipe", {}).get("label_template_field", "modello_etichetta").strip()
data = []
fieldnames = set() # Use a set to avoid duplicates
# Iterate over all recipes in the database
for recipe in Recipes.select():
steps = recipe.get_steps_map()
exportable = {
# Base fields
recipe_name_field: recipe.name,
"cliente": recipe.client,
"part_number": recipe.part_number,
}
# Add base fields to the fieldnames
fieldnames.update([recipe_name_field, "cliente", "part_number"])
# Check and add steps conditionally
if "connector" in steps:
exportable.update({
"verifica_connettore_abilitata": "x",
"connettore": steps["connector"].spec["connector"]
})
fieldnames.update(["verifica_connettore_abilitata", "connettore"])
if "resistance" in steps:
exportable.update({
"verifica_resistenza_connettore_abilitata": "x",
"scala_resistenza": steps["resistance"].spec["scale"],
"r nominale": steps["resistance"].spec["expected"],
"tolleranza_resistenza_pos": steps["resistance"].spec["tolerance_pos"],
"tolleranza_resistenza_neg": steps["resistance"].spec["tolerance_neg"],
})
fieldnames.update(["verifica_resistenza_connettore_abilitata", "scala_resistenza", "r nominale",
"tolleranza_resistenza_pos", "tolleranza_resistenza_neg"])
if "barcodes" in steps:
exportable.update({
barcode_enable_field: "x",
barcode_serial_field: steps["barcodes"].spec["serial"]
})
fieldnames.update([barcode_enable_field, barcode_serial_field])
if "screws" in steps:
exportable.update({
"avvitatura_abilitata": "x",
"viti": steps["screws"].spec["quantity"]
})
fieldnames.update(["avvitatura_abilitata", "viti"])
if "leak_1" in steps:
exportable.update({
"prova_tenuta_abilitata": "x",
"tempo_pre_riempimento": steps["leak_1"].spec["pre_filling_time"],
"pressione_pre_riempimento": steps["leak_1"].spec["pre_filling_pressure"],
"tempo_di_test": steps["leak_1"].spec["test_time"],
"pressione_di_test": steps["leak_1"].spec["test_pressure"],
})
fieldnames.update(["prova_tenuta_abilitata", "tempo_pre_riempimento", "pressione_pre_riempimento",
"tempo_di_test", "pressione_di_test"])
if "leak_2" in steps:
exportable.update({
"prova_tenuta_abilitata_2": "x",
"tempo_pre_riempimento_2": steps["leak_2"].spec["pre_filling_time"],
"pressione_pre_riempimento_2": steps["leak_2"].spec["pre_filling_pressure"],
"tempo_di_test_2": steps["leak_2"].spec["test_time"],
"pressione_di_test_2": steps["leak_2"].spec["test_pressure"],
})
fieldnames.update(["prova_tenuta_abilitata_2", "tempo_pre_riempimento_2", "pressione_pre_riempimento_2",
"tempo_di_test_2", "pressione_di_test_2"])
if "vision" in steps:
exportable.update({
"test_visione_abilitato": steps["vision"].spec.get("enabled", ""),
"ricetta_visione": steps["vision"].spec["recipe"]
})
fieldnames.update(["test_visione_abilitato", "ricetta_visione"])
if "print" in steps:
exportable.update({
"stampa_etichetta_abilitata": "x",
print_template_field: steps["print"].spec["template"],
})
fieldnames.update(["stampa_etichetta_abilitata", print_template_field])
# Append the exportable row to the data
data.append(exportable)
# Export data to CSV if there is any data
if len(data):
if logger:
logger.info(f"Exporting recipes to {csv_path}")
with open(csv_path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=list(fieldnames))
writer.writeheader()
writer.writerows(data)
if logger:
logger.info(f"Exported {len(data)} recipes to {csv_path}.")
def backup_current_recipes(config, logger=None):
"""
Back up current recipes to a timestamped CSV file in the predefined backup directory.
"""
# Define the backup directory and file name
backup_dir = os.path.join('config', 'csv_import', 'backup_csv')
timestamp = datetime.now().strftime("%d%m%y%H%M%S")
backup_file = f"backup_{timestamp}.csv"
backup_path = os.path.join(backup_dir, backup_file)
# Ensure the backup directory exists
os.makedirs(backup_dir, exist_ok=True)
# Export current recipes to the backup path
export_recipes(config=config, csv_path=backup_path, logger=logger)
if logger:
logger.info(f"Backup created at: {backup_path}")
return backup_path # Return the backup path for reference if needed