From e73cf12878f48dbb0b52d814a99b7f3520aebe83 Mon Sep 17 00:00:00 2001 From: edo-neo Date: Thu, 21 Aug 2025 10:39:54 +0200 Subject: [PATCH] dev --- src/components/hikrobot_sc/hikrobot_sc.py | 81 +++++++++----- src/ui/test_vision/test_vision.py | 127 ++++++---------------- 2 files changed, 84 insertions(+), 124 deletions(-) diff --git a/src/components/hikrobot_sc/hikrobot_sc.py b/src/components/hikrobot_sc/hikrobot_sc.py index b9f9107..fbef493 100644 --- a/src/components/hikrobot_sc/hikrobot_sc.py +++ b/src/components/hikrobot_sc/hikrobot_sc.py @@ -544,6 +544,57 @@ class HikrobotSmartCamera(Component): self.log.error(f"Error getting current scheme: {e}") return None + def switch_scheme_sync(self, solution_name, timeout=30): + """ + Synchronously switch to a different scheme/solution and wait for completion. + + Args: + solution_name: The name of the solution to switch to + timeout: Maximum time to wait for the switch to complete (in seconds) + + Returns: + bool: True if successful, False otherwise + """ + if not self.connected: + self.log.error("Cannot switch scheme: Camera not connected") + return False + + # Get current scheme for comparison + current_scheme = self.get_current_scheme() + if current_scheme == solution_name: + self.log.info(f"Already using scheme: {solution_name}") + return True + + # Start the scheme switching process + success = self.switch_scheme(solution_name) + if not success: + self.log.error(f"Failed to initiate scheme switch to: {solution_name}") + return False + + # Wait for the scheme switching to complete + self.log.info(f"Waiting for scheme switch to complete (timeout: {timeout}s)") + + start_time = time.time() + while self.current_operation == "switch_scheme": + # Check if we've exceeded the timeout + if time.time() - start_time > timeout: + self.log.error(f"Scheme switching timed out after {timeout} seconds") + self.progress_timer.stop() + self.current_operation = None + return False + + # Sleep a bit to avoid busy waiting + time.sleep(0.5) + + # Verify the switch was successful by checking the current scheme + new_scheme = self.get_current_scheme() + if new_scheme == solution_name: + self.log.info(f"Successfully switched to scheme: {solution_name}") + return True + else: + self.log.warning(f"Scheme switch verification failed. Expected: {solution_name}, Got: {new_scheme}") + return False + def test_scheme_switch(self, solution_name=None): """ Test method to manually trigger a scheme switch. @@ -563,12 +614,6 @@ class HikrobotSmartCamera(Component): self.log.info(f"Testing scheme switch to: {solution_name}") - # Get current scheme for comparison - current_scheme = self.get_current_scheme() - if current_scheme == solution_name: - self.log.info(f"Already using scheme: {solution_name}") - return True - if not self.connected: self.log.info("Camera not connected, connecting first...") self.config_changed() @@ -576,25 +621,5 @@ class HikrobotSmartCamera(Component): self.log.error("Failed to connect to camera") return False - # Stop any ongoing operation - if self.current_operation: - self.log.warning(f"Stopping ongoing operation: {self.current_operation}") - self.progress_timer.stop() - self.current_operation = None - - # Try to switch scheme - success = self.switch_scheme(solution_name) - - # Verify the switch was successful - if success: - # Wait a bit for the switch to complete - time.sleep(2) - new_scheme = self.get_current_scheme() - if new_scheme == solution_name: - self.log.info(f"Successfully switched to scheme: {solution_name}") - return True - else: - self.log.warning(f"Scheme switch command succeeded but verification failed. Current scheme: {new_scheme}") - return False - - return success + # Use the synchronous scheme switching method + return self.switch_scheme_sync(solution_name) diff --git a/src/ui/test_vision/test_vision.py b/src/ui/test_vision/test_vision.py index 9a4cd01..db52515 100644 --- a/src/ui/test_vision/test_vision.py +++ b/src/ui/test_vision/test_vision.py @@ -1,59 +1,15 @@ import sys import logging -from PyQt5.QtCore import pyqtSignal, QTimer +from PyQt5.QtCore import pyqtSignal from PyQt5.QtGui import QColor, QImage, QPalette, QPixmap -from PyQt5.QtWidgets import QHeaderView, QProgressBar, QTableWidgetItem, QDialog, QVBoxLayout, QLabel, QProgressBar +from PyQt5.QtWidgets import QHeaderView, QProgressBar, QTableWidgetItem from src.lib.helpers.blocking_dialog import BlockingDialog from src.ui.helpers import calc_foreground_color from src.ui.test_test import Test_Test -class SchemeProgressDialog(QDialog): - """ - Dialog to show scheme switching progress and block the UI until complete. - """ - def __init__(self, parent=None): - super().__init__(parent) - self.setWindowTitle("Switching Camera Scheme") - self.setModal(True) - self.setMinimumWidth(400) - - layout = QVBoxLayout() - self.setLayout(layout) - - self.label = QLabel("Waiting for camera scheme to switch. Please wait...") - layout.addWidget(self.label) - - self.progress_bar = QProgressBar() - self.progress_bar.setMinimum(0) - self.progress_bar.setMaximum(100) - layout.addWidget(self.progress_bar) - - self.status_label = QLabel("Status: Initializing...") - layout.addWidget(self.status_label) - - # Set a timeout in case the progress signal doesn't complete - self.timer = QTimer(self) - self.timer.setSingleShot(True) - self.timer.timeout.connect(self.handle_timeout) - self.timer.start(30000) # 30 second timeout - - def update_progress(self, progress, status): - self.progress_bar.setValue(progress) - self.status_label.setText(f"Status: {status}") - - if progress >= 100 and status == "Success": - self.accept() - elif status == "Failed": - self.reject() - - def handle_timeout(self): - self.status_label.setText("Status: Timeout - Continuing anyway") - self.accept() # Continue with the cycle even if timeout occurs - - class Test_Vision(Test_Test): request_frame = pyqtSignal() @@ -94,45 +50,20 @@ class Test_Vision(Test_Test): dialog = BlockingDialog(self.ui,wait_time=wait_time, message=f"COLLEGARE CAVO ETHERNET E ATTENDERE {wait_time} SECONDI PER IL CORRETTO AVVIO DELLE TELECAMERE") dialog.exec_() - # Always check if we need to wait for scheme switching to complete - # This ensures we don't proceed until the camera is ready with the correct scheme + # Check if we need to switch the camera scheme self.log.info("Checking if camera scheme needs to be switched...") - # Get the current scheme name from the camera - current_scheme = self.components[cam_type].get_current_scheme() + # Get the target scheme from the recipe name target_scheme = self.step.spec.get("recipe", "").replace(".ini", "") - self.log.info(f"Current camera scheme: {current_scheme}, Target scheme: {target_scheme}") + # Use the synchronous scheme switching method + self.log.info(f"Switching camera scheme to: {target_scheme}") + success = self.components[cam_type].switch_scheme_sync(target_scheme, timeout=30) - # If schemes don't match or a scheme switch is already in progress, wait for it - if current_scheme != target_scheme or self.components[cam_type].current_operation == "switch_scheme": - self.log.info(f"Waiting for camera to switch to scheme: {target_scheme}") - - # Create and show the progress dialog - progress_dialog = SchemeProgressDialog(self) - progress_dialog.label.setText(f"Switching camera to scheme: {target_scheme}\nPlease wait...") - - # Connect to the progress signal - progress_connection = self.components[cam_type].progress_signal.connect( - progress_dialog.update_progress - ) - - # If a scheme switch is not already in progress, start one - if self.components[cam_type].current_operation != "switch_scheme": - self.log.info(f"Initiating scheme switch to: {target_scheme}") - self.components[cam_type].switch_scheme(target_scheme) - - # Show the dialog and wait for it to complete - result = progress_dialog.exec_() - - # Disconnect from the progress signal - self.components[cam_type].progress_signal.disconnect(progress_connection) - - if result == QDialog.Rejected: - # Scheme switching failed, but we'll continue anyway - self.log.warning("Scheme switching failed, continuing with cycle") + if success: + self.log.info(f"Successfully switched to scheme: {target_scheme}") else: - self.log.info(f"Camera already using correct scheme: {current_scheme}") + self.log.warning(f"Failed to switch to scheme: {target_scheme}, continuing anyway") self.components[cam_type].resume() else: @@ -326,7 +257,7 @@ class Test_Vision(Test_Test): def test_scheme_switching(self): """ - Test method to verify that the scheme switching wait mechanism works correctly. + Test method to verify that the scheme switching mechanism works correctly. This can be called manually for testing purposes. """ if "vision" not in self.components or "hikrobot_sc" not in self.components: @@ -337,24 +268,28 @@ class Test_Vision(Test_Test): current_scheme = self.components["hikrobot_sc"].get_current_scheme() self.log.info(f"Current scheme: {current_scheme}") - # Set a test recipe to trigger scheme switching - test_recipe = "test_absence" # Use an existing recipe name - self.log.info(f"Setting test recipe: {test_recipe}") - self.components["vision"].set_recipe(test_recipe) + # Choose a test recipe different from the current scheme + test_recipes = ["test_absence", "test_presence"] + test_recipe = test_recipes[1] if current_scheme == test_recipes[0] else test_recipes[0] - # Wait for the scheme switching dialog to appear and complete - self.log.info("Waiting for scheme switching to complete...") + self.log.info(f"Testing scheme switch to: {test_recipe}") - # The dialog should appear automatically in the start() method - # when the hikrobot_sc component's current_operation is "switch_scheme" + # Use the synchronous scheme switching method directly + success = self.components["hikrobot_sc"].switch_scheme_sync(test_recipe, timeout=30) - # Verify the scheme was switched - new_scheme = self.components["hikrobot_sc"].get_current_scheme() - self.log.info(f"New scheme: {new_scheme}") - - if new_scheme == test_recipe: - self.log.info("Scheme switching test passed!") - return True + if success: + self.log.info(f"Successfully switched to scheme: {test_recipe}") + + # Verify the scheme was switched + new_scheme = self.components["hikrobot_sc"].get_current_scheme() + self.log.info(f"Verified current scheme: {new_scheme}") + + if new_scheme == test_recipe: + self.log.info("Scheme switching test passed!") + return True + else: + self.log.error(f"Scheme verification failed. Expected: {test_recipe}, Got: {new_scheme}") + return False else: - self.log.error("Scheme switching test failed: scheme was not switched correctly") + self.log.error(f"Failed to switch to scheme: {test_recipe}") return False