remote import recipes under testing

This commit is contained in:
edo-neo 2024-12-18 14:46:25 +01:00
parent 434450656a
commit b7a09646a9
2 changed files with 69 additions and 256 deletions

View File

@ -19,6 +19,8 @@ from PyQt5.QtCore import QThread
from requests.adapters import HTTPAdapter, Retry
from urllib3.exceptions import InsecureRequestWarning
from lib.helpers.recipe_manager import import_recipes, backup_current_recipes
from .component import Component
from ui.helpers import get_main_window
# Suppress insecure request warning
@ -135,6 +137,9 @@ class ArchiveSynchronizer(Component):
return True
def parse_response_and_execute(self, response):
"""
Parse the response and execute actions based on the `ACTIONS_TO_DO` received.
"""
try:
data = response.json()
if not isinstance(data, dict):
@ -147,11 +152,53 @@ class ArchiveSynchronizer(Component):
actions = [actions]
for action in actions:
remote_path = action.get("remote_path")
local_path = action.get("local_path")
self.log.info(f"Executing remote fetch with remote_path: {remote_path} and local_path: {local_path}")
result = self.remote_fetch(remote_path=remote_path, local_path=local_path)
self.log.info(f"Remote fetch result: {result}")
action_type = action.get("action") # Determine which type of action to perform
if action_type == "import": # Handle import action
remote_path = action.get("remote_path")
if not remote_path:
self.log.warning("Import action received without a remote_path.")
continue
# Use remote_fetch to download the recipe file from the server
fetch_result = self.remote_fetch(remote_path=remote_path, local_path="tmp")
if 'downloaded_file' in fetch_result:
downloaded_file = fetch_result['downloaded_file']
self.log.info(f"Recipe file downloaded successfully to {downloaded_file}.")
# Perform the import action
try:
# Backup current recipes before importing
backup_path = backup_current_recipes(
config=self.config, # Backup configuration object
logger=self.log # Logger for backup messages
)
self.log.info(f"Backup created successfully at {backup_path}.")
# Proceed with importing recipes
import_recipes(
config=self.config,
csv_path=downloaded_file, # Use the downloaded file path
logger=self.log
)
self.log.info(f"Imported recipes successfully from {downloaded_file}.")
except Exception as e:
self.log.error(f"Failed to import recipes: {str(e)}")
continue
else:
self.log.warning(f"Failed to fetch the recipe file: {fetch_result.get('error')}.")
elif action_type == "download": # Handle fetch action
remote_path = action.get("remote_path")
local_path = action.get("local_path", "tmp") # Use "tmp" as a fallback local path
self.log.info(
f"Executing remote fetch with remote_path: {remote_path} and local_path: {local_path}")
result = self.remote_fetch(remote_path=remote_path, local_path=local_path)
self.log.info(f"Remote fetch result: {result}")
else:
self.log.warning(f"Unhandled action type: {action_type}")
except json.JSONDecodeError:
self.log.error("Failed to decode JSON response")
@ -283,8 +330,10 @@ class ArchiveSynchronizer(Component):
self.log_to_db(log_time, log_info_type, log_info)
return {"error": "Unexpected HTTP response status", "last_update_info": last_update_info}
# Make sure the local path exists
os.makedirs(local_path, exist_ok=True)
# Construct the correct file path
local_file_path = os.path.join(local_path, os.path.basename(remote_path))
with open(local_file_path, "wb") as f:
f.write(response.content)
@ -292,6 +341,7 @@ class ArchiveSynchronizer(Component):
log_info += f" - File downloaded successfully: {local_file_path}"
self.log.info(log_info)
self.log_to_db(log_time, log_info_type, log_info)
return {"downloaded_file": local_file_path, "last_update_info": last_update_info}
except requests.ConnectionError as e:

View File

@ -10,6 +10,8 @@ from PyQt5.QtCore import QTimer, pyqtSignal
from PyQt5.QtGui import QKeySequence
from PyQt5.QtWidgets import QFileDialog, QMessageBox, QShortcut
import shutil
from lib.helpers.recipe_manager import export_recipes, import_recipes
from ui.crud import Crud, Json_External_Dialog_Editor_Cell_Widget
from ui.helpers import replace_widget
from ui.recipe_spec_and_step_editor import Recipe_Spec_And_Step_Editor
@ -275,227 +277,22 @@ class Recipe_Selection(Widget):
# IMPORT RECIPES FROM CSV FILE TO DATABASE
def import_recipes(self, csv_path=None, defaults=None):
if defaults is None:
global noner
defaults = self.config.get("recipes_defaults", noner)
if csv_path is None:
options = QFileDialog.Options()
options |= QFileDialog.DontUseNativeDialog
csv_path, _ = QFileDialog.getOpenFileName(
self,
"Importazione ricette",
"ricette.csv",
"CSV data (*.csv);;All Files (*)",
options=options,
)
csv_path = str(csv_path)
if not len(csv_path):
return
self.log.info(f"recipes: importing recipes from {csv_path}")
recipe_name_field = self.config.get("recipe", {}).get("recipe_name_field", "codice_ricetta").strip()
part_number_field = self.config.get("recipe", {}).get("part_number_field", "part_number").strip()
description_field = self.config.get("recipe", {}).get("description_field", "descrizione").strip()
barcode_enable_field = self.config.get("recipe", {}).get("barcode_enable_field", "verifica_codice_a_barre_abilitata").strip()
with open(csv_path, "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
count = 0
for ucrow in reader:
row = dict((k.lower(), v) for k, v in ucrow.items())
recipe_name = row.get(recipe_name_field, defaults["codice_ricetta"])
steps_specs = self.read_steps(row, defaults=defaults)
# create recipe or update existing one in DB
try:
recipe = Recipes.get_by_id(recipe_name)
steps = recipe.get_steps_map()
recipe_is_new = False
except Recipes.DoesNotExist:
recipe = Recipes(name=recipe_name, part_number="TEMPORARY")
steps = {}
for step_name, step_spec in steps_specs.items():
if step_name not in self.unsupported_steps:
steps[step_name] = step_spec
recipe_is_new = True
recipe.client = row.get("cliente", defaults["cliente"])
recipe.part_number = row.get(part_number_field, defaults["part_number"])
recipe.description = row.get(description_field, defaults["descrizione"])
recipe.spec = {
"count": len(row.get("dimensione_lotto_abilitata", defaults["dimensione_lotto_abilitata"])) and "count" not in self.unsupported_steps,
"connector": len(
row.get("verifica_connettore_abilitata", defaults["verifica_connettore_abilitata"])) and "connector" not in self.unsupported_steps,
"barcodes": len(row.get(barcode_enable_field, defaults["verifica_codice_a_barre_abilitata"])) and "barcodes" not in self.unsupported_steps,
"resistance": len(row.get("verifica_resistenza_connettore_abilitata",
defaults["verifica_resistenza_connettore_abilitata"])) and "resistance" not in self.unsupported_steps,
"screws": len(row.get("avvitatura_abilitata", defaults["avvitatura_abilitata"])) and "screws" not in self.unsupported_steps,
"instruction": len(row.get("istruzione_abilitata", defaults["istruzione_abilitata"])) and "instruction" not in self.unsupported_steps,
"instruction_extra": len(row.get("istruzione_abilitata_extra", defaults["istruzione_abilitata_extra"])) and "instruction_extra" not in self.unsupported_steps,
"leak_1": len(row.get("prova_tenuta_abilitata", defaults["prova_tenuta_abilitata"])) and "leak_1" not in self.unsupported_steps,
"leak_2": len(row.get("prova_tenuta_abilitata_2", defaults["prova_tenuta_abilitata_2"])) and "leak_2" not in self.unsupported_steps,
"vision": len(row.get("test_visione_abilitato", defaults["test_visione_abilitato"])) and "vision" not in self.unsupported_steps,
"print": len(row.get("stampa_etichetta_abilitata", defaults["stampa_etichetta_abilitata"])) and "print" not in self.unsupported_steps,
"steps": steps,
}
recipe.spec["steps"]=steps_specs
if recipe_is_new:
recipe.save(force_insert=True)
else:
recipe.save()
count += 1
db.commit()
self.log.info(f"recipes: imported {count} rows.")
self.crud.refresh()
import_recipes(
config=self.config,
csv_path=csv_path,
defaults=defaults,
unsupported_steps=self.unsupported_steps,
logger=self.log,
)
# EXPORT RECIPES TABLE TO CSV FILE
def export_recipes(self, csv_path=None):
if csv_path is None:
options = QFileDialog.Options()
options |= QFileDialog.DontUseNativeDialog
csv_path, _ = QFileDialog.getSaveFileName(
self,
"Esportazione ricette",
"ricette.csv",
"CSV data (*.csv);;All Files (*)",
options=options,
)
csv_path = str(csv_path)
if not len(csv_path):
return
if not csv_path.lower().endswith(".csv"):
csv_path += ".csv"
csv_dir = os.path.dirname(csv_path)
if len(csv_dir):
os.makedirs(csv_dir, exist_ok=True)
recipe_name_field = self.config.get("recipe", {}).get("recipe_name_field", "codice_ricetta").strip()
barcode_enable_field = self.config.get("recipe", {}).get("barcode_enable_field", "verifica_codice_a_barre_abilitata").strip()
barcode_serial_field = self.config.get("recipe", {}).get("barcode_serial_field", "codice_a_barre").strip()
print_template_field = self.config.get("recipe", {}).get("label_template_field", "modello_etichetta").strip()
data = []
fieldnames = set() # Use a set to avoid duplicates
for recipe in list(Recipes.select()):
steps = recipe.get_steps_map()
exportable = {
# BASE SECTION
recipe_name_field: recipe.name,
"cliente": recipe.client,
"part_number": recipe.part_number,
}
export_recipes(
config=self.config,
csv_path=csv_path,
logger=self.log,
)
# Add base fields to the fieldnames
fieldnames.update([recipe_name_field, "cliente", "part_number"])
# Check and add fields conditionally for each section
if "connector" in steps:
exportable.update({
"verifica_connettore_abilitata": "x",
"connettore": steps["connector"].spec["connector"]
})
fieldnames.update(["verifica_connettore_abilitata", "connettore"])
if "resistance" in steps:
exportable.update({
"verifica_resistenza_connettore_abilitata": "x",
"scala_resistenza": steps["resistance"].spec["scale"],
"r nominale": steps["resistance"].spec["expected"],
"tolleranza_resistenza_pos": steps["resistance"].spec["tolerance_pos"],
"tolleranza_resistenza_neg": steps["resistance"].spec["tolerance_neg"]
})
fieldnames.update(["verifica_resistenza_connettore_abilitata", "scala_resistenza", "r nominale",
"tolleranza_resistenza_pos", "tolleranza_resistenza_neg"])
if "barcodes" in steps:
exportable.update({
barcode_enable_field: "x",
barcode_serial_field: steps["barcodes"].spec["serial"]
})
fieldnames.update([barcode_enable_field, barcode_serial_field])
if recipe.spec.get("steps", {}).get("screws") and "screws" in steps:
exportable.update({
"avvitatura_abilitata": "x",
"viti": steps["screws"].spec["quantity"]
})
fieldnames.update(["avvitatura_abilitata", "viti"])
if "leak_1" in steps:
exportable.update({
"prova_tenuta_abilitata": "x",
"tempo_pre_riempimento": steps["leak_1"].spec["pre_filling_time"],
"pressione_pre_riempimento": steps["leak_1"].spec["pre_filling_pressure"],
"tempo_riempimento": steps["leak_1"].spec["filling_time"],
"tempo_assestamento": steps["leak_1"].spec["settling_time"],
"percentuale_minima_pressione_assestamento": steps["leak_1"].spec["settling_pressure_min_percent"],
"percentuale_massima_pressione_assestamento": steps["leak_1"].spec["settling_pressure_max_percent"],
"tempo_di_test": steps["leak_1"].spec["test_time"],
"pressione_di_test_delta_minimo": steps["leak_1"].spec["test_pressure_qneg"],
"pressione_di_test": steps["leak_1"].spec["test_pressure"],
"pressione_di_test_delta_massimo": steps["leak_1"].spec["test_pressure_qpos"],
"tempo_svuotamento": steps["leak_1"].spec["flush_time"],
"pressione_svuotamento": steps["leak_1"].spec["flush_pressure"],
})
fieldnames.update(["prova_tenuta_abilitata", "tempo_pre_riempimento", "pressione_pre_riempimento",
"tempo_riempimento", "tempo_assestamento",
"percentuale_minima_pressione_assestamento",
"percentuale_massima_pressione_assestamento", "tempo_di_test",
"pressione_di_test_delta_minimo",
"pressione_di_test", "pressione_di_test_delta_massimo", "tempo_svuotamento",
"pressione_svuotamento"])
if "leak_2" in steps:
exportable.update({
"prova_tenuta_abilitata_2": "x",
"tempo_pre_riempimento_2": steps["leak_2"].spec["pre_filling_time"],
"pressione_pre_riempimento_2": steps["leak_2"].spec["pre_filling_pressure"],
"tempo_riempimento_2": steps["leak_2"].spec["filling_time"],
"tempo_assestamento_2": steps["leak_2"].spec["settling_time"],
"percentuale_minima_pressione_assestamento_2": steps["leak_2"].spec[
"settling_pressure_min_percent"],
"percentuale_massima_pressione_assestamento_2": steps["leak_2"].spec[
"settling_pressure_max_percent"],
"tempo_di_test_2": steps["leak_2"].spec["test_time"],
"pressione_di_test_delta_minimo_2": steps["leak_2"].spec["test_pressure_qneg"],
"pressione_di_test_2": steps["leak_2"].spec["test_pressure"],
"pressione_di_test_delta_massimo_2": steps["leak_2"].spec["test_pressure_qpos"],
"tempo_svuotamento_2": steps["leak_2"].spec["flush_time"],
"pressione_svuotamento_2": steps["leak_2"].spec["flush_pressure"],
})
fieldnames.update(["prova_tenuta_abilitata_2", "tempo_pre_riempimento_2", "pressione_pre_riempimento_2",
"tempo_riempimento_2", "tempo_assestamento_2",
"percentuale_minima_pressione_assestamento_2",
"percentuale_massima_pressione_assestamento_2", "tempo_di_test_2",
"pressione_di_test_delta_minimo_2", "pressione_di_test_2",
"pressione_di_test_delta_massimo_2",
"tempo_svuotamento_2", "pressione_svuotamento_2"])
if "vision" in steps:
exportable.update({
"test_visione_abilitato": recipe.spec["vision"],
"ricetta_visione": steps["vision"].spec["recipe"]
})
fieldnames.update(["test_visione_abilitato", "ricetta_visione"])
if "print" in steps:
exportable.update({
"stampa_etichetta_abilitata": "x",
print_template_field: steps["print"].spec["template"],
"etichette_supplementari": steps["print"].spec["extra_label"]
})
fieldnames.update(["stampa_etichetta_abilitata", print_template_field, "etichette_supplementari"])
# Append the exportable dictionary to the data list
data.append(exportable)
# Convert the set to a list for CSV writing
fieldnames = list(fieldnames)
# Export to CSV if there is data
if len(data):
self.log.info(f"recipes: exporting recipes to {csv_path}")
with open(csv_path, "w", newline="") as f:
w = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
w.writeheader()
w.writerows(data)
self.log.info(f"recipes: exported {len(data)} rows.")
def delete_recipes(self):
ret = QMessageBox.warning(
None,
@ -507,40 +304,6 @@ class Recipe_Selection(Widget):
if ret == QMessageBox.Ok:
Recipes.delete().execute()
self.crud.refresh()
def backup_current_recipes(self):
# Define the backup directory and file name
backup_dir = os.path.join('config', 'csv_import', 'backup_csv')
timestamp = datetime.now().strftime("%d%m%y%H%M%S")
backup_file = f"backup_{timestamp}.csv"
backup_path = os.path.join(backup_dir, backup_file)
# Ensure the backup directory exists
os.makedirs(backup_dir, exist_ok=True)
# Export current recipes to backup file
self.export_recipes(csv_path=backup_path)
def move_imported_csv(self, csv_path):
# Move the imported CSV to the 'imported_csv' directory
imported_dir = os.path.join('config', 'csv_import', 'imported_csv')
os.makedirs(imported_dir, exist_ok=True)
imported_path = os.path.join(imported_dir, os.path.basename(csv_path))
shutil.move(csv_path, imported_path)
self.log.info(f"Imported CSV moved to {imported_path}")
return imported_path
def check_and_import_auto_csv(self):
# Define the directory to check
auto_import_dir = os.path.join('config', 'csv_import', 'auto_csv_import')
# Check if the directory exists and is not empty
if os.path.exists(auto_import_dir) and os.listdir(auto_import_dir):
# Perform backup
self.backup_current_recipes()
# Move and import each CSV file in the directory
for csv_file in os.listdir(auto_import_dir):
csv_path = os.path.join(auto_import_dir, csv_file)
if os.path.isfile(csv_path) and csv_file.endswith(".csv"):
self.import_recipes(csv_path=csv_path)
self.move_imported_csv(csv_path)