This commit is contained in:
edo-neo 2025-08-21 10:39:54 +02:00
parent 03db39bb41
commit e73cf12878
2 changed files with 84 additions and 124 deletions

View File

@ -544,6 +544,57 @@ class HikrobotSmartCamera(Component):
self.log.error(f"Error getting current scheme: {e}")
return None
def switch_scheme_sync(self, solution_name, timeout=30):
"""
Synchronously switch to a different scheme/solution and wait for completion.
Args:
solution_name: The name of the solution to switch to
timeout: Maximum time to wait for the switch to complete (in seconds)
Returns:
bool: True if successful, False otherwise
"""
if not self.connected:
self.log.error("Cannot switch scheme: Camera not connected")
return False
# Get current scheme for comparison
current_scheme = self.get_current_scheme()
if current_scheme == solution_name:
self.log.info(f"Already using scheme: {solution_name}")
return True
# Start the scheme switching process
success = self.switch_scheme(solution_name)
if not success:
self.log.error(f"Failed to initiate scheme switch to: {solution_name}")
return False
# Wait for the scheme switching to complete
self.log.info(f"Waiting for scheme switch to complete (timeout: {timeout}s)")
start_time = time.time()
while self.current_operation == "switch_scheme":
# Check if we've exceeded the timeout
if time.time() - start_time > timeout:
self.log.error(f"Scheme switching timed out after {timeout} seconds")
self.progress_timer.stop()
self.current_operation = None
return False
# Sleep a bit to avoid busy waiting
time.sleep(0.5)
# Verify the switch was successful by checking the current scheme
new_scheme = self.get_current_scheme()
if new_scheme == solution_name:
self.log.info(f"Successfully switched to scheme: {solution_name}")
return True
else:
self.log.warning(f"Scheme switch verification failed. Expected: {solution_name}, Got: {new_scheme}")
return False
def test_scheme_switch(self, solution_name=None):
"""
Test method to manually trigger a scheme switch.
@ -563,12 +614,6 @@ class HikrobotSmartCamera(Component):
self.log.info(f"Testing scheme switch to: {solution_name}")
# Get current scheme for comparison
current_scheme = self.get_current_scheme()
if current_scheme == solution_name:
self.log.info(f"Already using scheme: {solution_name}")
return True
if not self.connected:
self.log.info("Camera not connected, connecting first...")
self.config_changed()
@ -576,25 +621,5 @@ class HikrobotSmartCamera(Component):
self.log.error("Failed to connect to camera")
return False
# Stop any ongoing operation
if self.current_operation:
self.log.warning(f"Stopping ongoing operation: {self.current_operation}")
self.progress_timer.stop()
self.current_operation = None
# Try to switch scheme
success = self.switch_scheme(solution_name)
# Verify the switch was successful
if success:
# Wait a bit for the switch to complete
time.sleep(2)
new_scheme = self.get_current_scheme()
if new_scheme == solution_name:
self.log.info(f"Successfully switched to scheme: {solution_name}")
return True
else:
self.log.warning(f"Scheme switch command succeeded but verification failed. Current scheme: {new_scheme}")
return False
return success
# Use the synchronous scheme switching method
return self.switch_scheme_sync(solution_name)

View File

@ -1,59 +1,15 @@
import sys
import logging
from PyQt5.QtCore import pyqtSignal, QTimer
from PyQt5.QtCore import pyqtSignal
from PyQt5.QtGui import QColor, QImage, QPalette, QPixmap
from PyQt5.QtWidgets import QHeaderView, QProgressBar, QTableWidgetItem, QDialog, QVBoxLayout, QLabel, QProgressBar
from PyQt5.QtWidgets import QHeaderView, QProgressBar, QTableWidgetItem
from src.lib.helpers.blocking_dialog import BlockingDialog
from src.ui.helpers import calc_foreground_color
from src.ui.test_test import Test_Test
class SchemeProgressDialog(QDialog):
"""
Dialog to show scheme switching progress and block the UI until complete.
"""
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Switching Camera Scheme")
self.setModal(True)
self.setMinimumWidth(400)
layout = QVBoxLayout()
self.setLayout(layout)
self.label = QLabel("Waiting for camera scheme to switch. Please wait...")
layout.addWidget(self.label)
self.progress_bar = QProgressBar()
self.progress_bar.setMinimum(0)
self.progress_bar.setMaximum(100)
layout.addWidget(self.progress_bar)
self.status_label = QLabel("Status: Initializing...")
layout.addWidget(self.status_label)
# Set a timeout in case the progress signal doesn't complete
self.timer = QTimer(self)
self.timer.setSingleShot(True)
self.timer.timeout.connect(self.handle_timeout)
self.timer.start(30000) # 30 second timeout
def update_progress(self, progress, status):
self.progress_bar.setValue(progress)
self.status_label.setText(f"Status: {status}")
if progress >= 100 and status == "Success":
self.accept()
elif status == "Failed":
self.reject()
def handle_timeout(self):
self.status_label.setText("Status: Timeout - Continuing anyway")
self.accept() # Continue with the cycle even if timeout occurs
class Test_Vision(Test_Test):
request_frame = pyqtSignal()
@ -94,45 +50,20 @@ class Test_Vision(Test_Test):
dialog = BlockingDialog(self.ui,wait_time=wait_time, message=f"COLLEGARE CAVO ETHERNET E ATTENDERE {wait_time} SECONDI PER IL CORRETTO AVVIO DELLE TELECAMERE")
dialog.exec_()
# Always check if we need to wait for scheme switching to complete
# This ensures we don't proceed until the camera is ready with the correct scheme
# Check if we need to switch the camera scheme
self.log.info("Checking if camera scheme needs to be switched...")
# Get the current scheme name from the camera
current_scheme = self.components[cam_type].get_current_scheme()
# Get the target scheme from the recipe name
target_scheme = self.step.spec.get("recipe", "").replace(".ini", "")
self.log.info(f"Current camera scheme: {current_scheme}, Target scheme: {target_scheme}")
# Use the synchronous scheme switching method
self.log.info(f"Switching camera scheme to: {target_scheme}")
success = self.components[cam_type].switch_scheme_sync(target_scheme, timeout=30)
# If schemes don't match or a scheme switch is already in progress, wait for it
if current_scheme != target_scheme or self.components[cam_type].current_operation == "switch_scheme":
self.log.info(f"Waiting for camera to switch to scheme: {target_scheme}")
# Create and show the progress dialog
progress_dialog = SchemeProgressDialog(self)
progress_dialog.label.setText(f"Switching camera to scheme: {target_scheme}\nPlease wait...")
# Connect to the progress signal
progress_connection = self.components[cam_type].progress_signal.connect(
progress_dialog.update_progress
)
# If a scheme switch is not already in progress, start one
if self.components[cam_type].current_operation != "switch_scheme":
self.log.info(f"Initiating scheme switch to: {target_scheme}")
self.components[cam_type].switch_scheme(target_scheme)
# Show the dialog and wait for it to complete
result = progress_dialog.exec_()
# Disconnect from the progress signal
self.components[cam_type].progress_signal.disconnect(progress_connection)
if result == QDialog.Rejected:
# Scheme switching failed, but we'll continue anyway
self.log.warning("Scheme switching failed, continuing with cycle")
if success:
self.log.info(f"Successfully switched to scheme: {target_scheme}")
else:
self.log.info(f"Camera already using correct scheme: {current_scheme}")
self.log.warning(f"Failed to switch to scheme: {target_scheme}, continuing anyway")
self.components[cam_type].resume()
else:
@ -326,7 +257,7 @@ class Test_Vision(Test_Test):
def test_scheme_switching(self):
"""
Test method to verify that the scheme switching wait mechanism works correctly.
Test method to verify that the scheme switching mechanism works correctly.
This can be called manually for testing purposes.
"""
if "vision" not in self.components or "hikrobot_sc" not in self.components:
@ -337,24 +268,28 @@ class Test_Vision(Test_Test):
current_scheme = self.components["hikrobot_sc"].get_current_scheme()
self.log.info(f"Current scheme: {current_scheme}")
# Set a test recipe to trigger scheme switching
test_recipe = "test_absence" # Use an existing recipe name
self.log.info(f"Setting test recipe: {test_recipe}")
self.components["vision"].set_recipe(test_recipe)
# Choose a test recipe different from the current scheme
test_recipes = ["test_absence", "test_presence"]
test_recipe = test_recipes[1] if current_scheme == test_recipes[0] else test_recipes[0]
# Wait for the scheme switching dialog to appear and complete
self.log.info("Waiting for scheme switching to complete...")
self.log.info(f"Testing scheme switch to: {test_recipe}")
# The dialog should appear automatically in the start() method
# when the hikrobot_sc component's current_operation is "switch_scheme"
# Use the synchronous scheme switching method directly
success = self.components["hikrobot_sc"].switch_scheme_sync(test_recipe, timeout=30)
# Verify the scheme was switched
new_scheme = self.components["hikrobot_sc"].get_current_scheme()
self.log.info(f"New scheme: {new_scheme}")
if new_scheme == test_recipe:
self.log.info("Scheme switching test passed!")
return True
if success:
self.log.info(f"Successfully switched to scheme: {test_recipe}")
# Verify the scheme was switched
new_scheme = self.components["hikrobot_sc"].get_current_scheme()
self.log.info(f"Verified current scheme: {new_scheme}")
if new_scheme == test_recipe:
self.log.info("Scheme switching test passed!")
return True
else:
self.log.error(f"Scheme verification failed. Expected: {test_recipe}, Got: {new_scheme}")
return False
else:
self.log.error("Scheme switching test failed: scheme was not switched correctly")
self.log.error(f"Failed to switch to scheme: {test_recipe}")
return False