diff --git a/src/components/hikrobot_sc/hikrobot_sc.py b/src/components/hikrobot_sc/hikrobot_sc.py index 5b1905b..784fd65 100644 --- a/src/components/hikrobot_sc/hikrobot_sc.py +++ b/src/components/hikrobot_sc/hikrobot_sc.py @@ -5,528 +5,29 @@ import time from ctypes import * import sys -from PyQt5.QtCore import pyqtSignal, QMutex, QTimer +from PyQt5.QtCore import pyqtSignal, QMutex from PyQt5.QtWidgets import QMessageBox -from ..component import Component +from components.component import Component from .hikrobot_dll import * class HikrobotSmartCamera(Component): - # Signal to notify about progress updates - progress_signal = pyqtSignal(int, str) - def __init__(self, config=None, name=None, period=0.2, lazy=True, paused=False, threaded=True): super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded) self.simulate = "--sim-camera" in sys.argv - self.num_cameras=1 + self.num_cameras = 1 self.connected = False self.ok_memory = True self.ok_frames = [] - self.ok_results =[] - self.solution_name = None - self.rotations = "0" # Default rotation value - - # Progress monitoring - self.progress_timer = QTimer() - self.progress_timer.timeout.connect(self.check_progress) - self.current_operation = None - - def check_progress(self): - """ - Check the progress of the current operation by monitoring push status and progress. - This is called periodically by the timer when an operation is in progress. - """ - if not self.connected or not self.current_operation: - self.log.debug("Progress check stopped: not connected or no operation in progress") - self.progress_timer.stop() - return - - # Get the first camera handle - if len(self.cam_list) == 0: - self.log.error("Progress check stopped: no camera handles available") - self.progress_timer.stop() - return - - handle = self.cam_list[0]["handle"] - - # Get push type, state and rate - push_type = c_uint() - push_state = c_uint() - push_rate = c_uint() - - try: - self.log.info("Getting push status information for operation: " + str(self.current_operation)) - - # Get additional camera status information for debugging - self.log.info("Current camera state before getting push status:") - self._debug_camera_state(handle) - - nRet1 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushType", byref(push_type)) - nRet2 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushState", byref(push_state)) - nRet3 = mv_lib.MV_VS_GetIntValue(handle, b"ScPushRate", byref(push_rate)) - - # Check for specific error codes - has_generic_error = False - - if nRet1 != MV_VS_OK: - error_code = nRet1 & 0xFFFFFFFF - self.log.error(f"Failed to get ScPushType, error code: 0x{error_code:x}") - if error_code == 0x80030100: # MV_VS_E_GC_GENERIC - has_generic_error = True - - if nRet2 != MV_VS_OK: - error_code = nRet2 & 0xFFFFFFFF - self.log.error(f"Failed to get ScPushState, error code: 0x{error_code:x}") - if error_code == 0x80030100: # MV_VS_E_GC_GENERIC - has_generic_error = True - - if nRet3 != MV_VS_OK: - error_code = nRet3 & 0xFFFFFFFF - self.log.error(f"Failed to get ScPushRate, error code: 0x{error_code:x}") - if error_code == 0x80030100: # MV_VS_E_GC_GENERIC - has_generic_error = True - - # If we encountered the generic error, log additional information - if has_generic_error: - self.log.warning("Detected MV_VS_E_GC_GENERIC error (0x80030100): General camera error") - self.log.info("This error often occurs during scheme switching when the camera is busy") - - if nRet1 == MV_VS_OK and nRet2 == MV_VS_OK and nRet3 == MV_VS_OK: - self.log.info(f"Push status - Type: {push_type.value}, State: {push_state.value}, Rate: {push_rate.value}") - - # Check which operation we're monitoring - if self.current_operation == "switch_scheme": - # For scheme switching, we expect push_type.value == 2 - if push_type.value == 2: # LPRP 2 protocol loading progress - progress = push_rate.value - status = "Success" if push_state.value == 0 else "Failed" - - self.log.info(f"Scheme switching progress: {progress}%, Status: {status}") - - # Always emit progress updates, even if no progress is made - self.progress_signal.emit(progress, status) - - # If progress is complete or failed, stop the timer - if progress >= 100 or push_state.value == 1: - self.progress_timer.stop() - self.current_operation = None - - if push_state.value == 0: # Success - self.log.info("Scheme switching completed successfully") - # Refresh module list after successful scheme switching - self.refresh_module_list() - else: # Failed - self.log.error("Scheme switching failed") - else: - # If we're not getting the expected push type, log it and still update the UI - self.log.info(f"Waiting for push type 2 (protocol loading), current type: {push_type.value}") - - # Send a progress update with the current status - status_message = f"Initializing (Type: {push_type.value})" - self.progress_signal.emit(10, status_message) # Use 10% as a placeholder for initialization - - elif self.current_operation == "ntp_settings": - if push_type.value == 16: # NTPS 16 NTP Time Check Status - progress = push_rate.value - status = "Success" if push_state.value == 0 else "Failed" - - self.log.info(f"NTP settings progress: {progress}%, Status: {status}") - self.progress_signal.emit(progress, status) - - # If progress is complete or failed, stop the timer - if progress >= 100 or push_state.value == 1: - self.progress_timer.stop() - self.current_operation = None - - if push_state.value == 0: # Success - self.log.info("NTP settings completed successfully") - else: # Failed - self.log.error("NTP settings failed") - else: - self.log.info(f"Waiting for push type 16 (NTP settings), current type: {push_type.value}") - # Send a progress update with the current status - status_message = f"Initializing (Type: {push_type.value})" - self.progress_signal.emit(10, status_message) - else: - self.log.warning(f"Unknown operation type: {self.current_operation}") - else: - # If we can't get the push status, log detailed error information - self.log.warning(f"Could not get push status information, error codes: 0x{nRet1&0xFFFFFFFF:x}, 0x{nRet2&0xFFFFFFFF:x}, 0x{nRet3&0xFFFFFFFF:x}") - - # Try to get additional camera status for debugging - self._debug_camera_state(handle) - - # Check if we have the generic error - if has_generic_error: - # For the generic error, we'll send a more specific status message - status_message = "Initializing (Camera busy)" - self.progress_signal.emit(15, status_message) - else: - # Send a progress update to the UI - status_message = "Initializing (Status retrieval failed)" - self.progress_signal.emit(5, status_message) - - # After 10 seconds (20 checks at 500ms), force completion if we're still waiting - if not hasattr(self, '_progress_check_count'): - self._progress_check_count = 0 - - self._progress_check_count += 1 - self.log.info(f"Progress check count: {self._progress_check_count}/20") - - if self._progress_check_count >= 20: - current_op = self.current_operation - self.log.warning(f"Force completing operation after timeout: {current_op}") - self.progress_timer.stop() - self.current_operation = None - self._progress_check_count = 0 - - # If we were switching schemes, try to refresh the module list anyway - if current_op == "switch_scheme": - self.log.info("Attempting to refresh module list after timeout") - self.refresh_module_list() - - except Exception as e: - self.log.error(f"Error checking progress: {e}") - self.progress_timer.stop() - self.current_operation = None - - def _debug_camera_state(self, handle): - """ - Get additional camera state information for debugging purposes. - - Args: - handle: Camera handle - """ - try: - # Try to get the current solution name - solution_name = create_string_buffer(256) - nRet = mv_lib.MV_VS_GetStringValue(handle, b"SrcOperateSolutionName", solution_name, 256) - if nRet == MV_VS_OK: - self.log.info(f"Current solution name: {solution_name.value.decode('utf-8')}") - else: - self.log.info(f"Failed to get solution name, error code: 0x{nRet&0xFFFFFFFF:x}") - - # Try to get the device status - device_status = c_uint() - nRet = mv_lib.MV_VS_GetEnumValue(handle, b"DeviceStatus", byref(device_status)) - if nRet == MV_VS_OK: - self.log.info(f"Device status: {device_status.value}") - else: - error_code = nRet & 0xFFFFFFFF - self.log.info(f"Failed to get device status, error code: 0x{error_code:x}") - - # Special handling for 0x80030100 (MV_VS_E_GC_GENERIC) - if error_code == 0x80030100: - self.log.warning("Detected MV_VS_E_GC_GENERIC error (0x80030100): General camera error") - self.log.info("This error often occurs during scheme switching when the camera is busy") - - # Try to get the acquisition status - acquisition_status = c_uint() - nRet = mv_lib.MV_VS_GetEnumValue(handle, b"AcquisitionStatus", byref(acquisition_status)) - if nRet == MV_VS_OK: - self.log.info(f"Acquisition status: {acquisition_status.value}") - else: - error_code = nRet & 0xFFFFFFFF - self.log.info(f"Failed to get acquisition status, error code: 0x{error_code:x}") - - # Special handling for 0x80030100 (MV_VS_E_GC_GENERIC) - if error_code == 0x80030100: - self.log.warning("Detected MV_VS_E_GC_GENERIC error (0x80030100): General camera error") - self.log.info("This error often occurs during scheme switching when the camera is busy") - - except Exception as e: - self.log.error(f"Error getting debug camera state: {e}") - - def refresh_module_list(self): - """ - Refresh the module list after switching schemes. - - Returns: - bool: True if successful, False otherwise - """ - if not self.connected: - self.log.error("Cannot refresh module list: Camera not connected") - return False - - # Get the first camera handle - if len(self.cam_list) == 0: - self.log.error("Cannot refresh module list: No camera handles available") - return False - - handle = self.cam_list[0]["handle"] - - try: - # Command to refresh module list - nRet = mv_lib.MV_VS_SetCommandValue(handle, b"CommandRefreshModuleList") - if nRet != MV_VS_OK: - error_code = nRet & 0xFFFFFFFF - self.log.error(f"Failed to refresh module list, error code: 0x{error_code:x}") - - # Special handling for 0x80030100 (MV_VS_E_GC_GENERIC) - if error_code == 0x80030100: - self.log.warning("Detected MV_VS_E_GC_GENERIC error (0x80030100) during module list refresh") - self.log.info("This is a non-critical error, continuing operation") - # We'll return True here to avoid treating this as a fatal error - return True - - return False - - self.log.info("Module list refreshed successfully") - return True - except Exception as e: - self.log.error(f"Error refreshing module list: {e}") - # Log the full exception traceback for debugging - import traceback - self.log.error(f"Exception traceback: {traceback.format_exc()}") - return False - - def switch_scheme(self, solution_name, retry_count=0, max_retries=2): - """ - Switch to a different scheme/solution using the API as described in the documentation. - - Args: - solution_name: The name of the solution to switch to - retry_count: Current retry attempt (used internally for recursion) - max_retries: Maximum number of retry attempts for error recovery - - Returns: - bool: True if the scheme switching process was successfully initiated, False otherwise - """ - if not self.connected: - self.log.error("Cannot switch scheme: Camera not connected") - return False - - # Get the first camera handle - if len(self.cam_list) == 0: - self.log.error("Cannot switch scheme: No camera handles available") - return False - - # Check if an operation is already in progress - if self.current_operation is not None: - self.log.warning(f"Cannot switch scheme: Another operation is already in progress: {self.current_operation}") - # Try to stop the current operation - if self.progress_timer.isActive(): - self.log.warning(f"Stopping ongoing operation: {self.current_operation}") - self.progress_timer.stop() - self.current_operation = None - - handle = self.cam_list[0]["handle"] - - # Get current camera state for debugging - self.log.info("Getting camera state before scheme switch:") - self._debug_camera_state(handle) - - try: - - - # Check if the solution name is valid (not empty) - if not solution_name or solution_name.strip() == "": - self.log.error("Cannot switch scheme: Solution name is empty") - return False - - # 1. Set the SrcOperateSolutionName node as the target schema name - solution_name_bytes = solution_name.encode('utf-8') - self.log.info(f"Setting solution name: {solution_name} (bytes: {solution_name_bytes})") - nRet = mv_lib.MV_VS_SetStringValue(handle, b"SrcOperateSolutionName", solution_name_bytes) - if nRet != MV_VS_OK: - error_code = nRet & 0xFFFFFFFF - self.log.error(f"Failed to set solution name: {solution_name}, error code: 0x{error_code:x}") - - # Try to get more information about the error - self._debug_camera_state(handle) - - - else: - self.log.info(f"Successfully set solution name: {solution_name}") - - # 2. Set the Load Scenario command - self.log.info("Sending CommandProjectLoad command") - nRet = mv_lib.MV_VS_SetCommandValue(handle, b"CommandProjectLoad") - if nRet != MV_VS_OK: - error_code = nRet & 0xFFFFFFFF - self.log.error(f"Failed to load project, error code: 0x{error_code:x}") - - # Try to get more information about the error - self._debug_camera_state(handle) - - - - return False - else: - self.log.info("Successfully sent project load command") - - # Reset progress check count - self._progress_check_count = 0 - - # Start monitoring progress - self.log.info("Starting progress monitoring for scheme switching") - self.current_operation = "switch_scheme" - self.progress_timer.start(500) # Check every 500ms - - return True - except Exception as e: - self.log.error(f"Error switching scheme: {e}") - # Log the full exception traceback for debugging - import traceback - self.log.error(f"Exception traceback: {traceback.format_exc()}") - - - - return False - - def get_ntp_parameters(self): - """ - Get the current NTP parameters from the camera. - - Returns: - dict: A dictionary containing the NTP parameters (server_ip, port, interval) - or None if there was an error - """ - if not self.connected: - self.log.error("Cannot get NTP parameters: Camera not connected") - return None - - # Get the first camera handle - if len(self.cam_list) == 0: - self.log.error("Cannot get NTP parameters: No camera handles available") - return None - - handle = self.cam_list[0]["handle"] - - try: - # Get NTP server IP - server_ip = c_uint() - nRet1 = mv_lib.MV_VS_GetIntValue(handle, b"ScDeviceNTPServerIP", byref(server_ip)) - - # Get NTP port - port = c_uint() - nRet2 = mv_lib.MV_VS_GetIntValue(handle, b"ScDeviceNTPPort", byref(port)) - - # Get NTP interval - interval = c_uint() - nRet3 = mv_lib.MV_VS_GetIntValue(handle, b"ScDeviceNTPInterval", byref(interval)) - - if nRet1 == MV_VS_OK and nRet2 == MV_VS_OK and nRet3 == MV_VS_OK: - # Convert IP from integer to string format - ip_str = f"{(server_ip.value >> 24) & 0xFF}.{(server_ip.value >> 16) & 0xFF}.{(server_ip.value >> 8) & 0xFF}.{server_ip.value & 0xFF}" - - return { - "server_ip": ip_str, - "port": port.value, - "interval": interval.value - } - else: - self.log.error("Failed to get NTP parameters") - return None - except Exception as e: - self.log.error(f"Error getting NTP parameters: {e}") - return None - - def set_ntp_parameters(self, server_ip, port, interval): - """ - Set the NTP parameters on the camera. - - Args: - server_ip (str): The NTP server IP address in format "xxx.xxx.xxx.xxx" - port (int): The NTP server port - interval (int): The NTP sync interval - - Returns: - bool: True if successful, False otherwise - """ - if not self.connected: - self.log.error("Cannot set NTP parameters: Camera not connected") - return False - - # Get the first camera handle - if len(self.cam_list) == 0: - self.log.error("Cannot set NTP parameters: No camera handles available") - return False - - handle = self.cam_list[0]["handle"] - - try: - # Convert IP string to integer format - ip_parts = server_ip.split('.') - if len(ip_parts) != 4: - self.log.error(f"Invalid IP address format: {server_ip}") - return False - - ip_int = (int(ip_parts[0]) << 24) + (int(ip_parts[1]) << 16) + (int(ip_parts[2]) << 8) + int(ip_parts[3]) - - # Set NTP server IP - nRet1 = mv_lib.MV_VS_SetIntValue(handle, b"ScDeviceNTPServerIP", ip_int) - - # Set NTP port - nRet2 = mv_lib.MV_VS_SetIntValue(handle, b"ScDeviceNTPPort", port) - - # Set NTP interval - nRet3 = mv_lib.MV_VS_SetIntValue(handle, b"ScDeviceNTPInterval", interval) - - if nRet1 != MV_VS_OK or nRet2 != MV_VS_OK or nRet3 != MV_VS_OK: - self.log.error("Failed to set NTP parameters") - return False - - # Confirm the settings - nRet = mv_lib.MV_VS_SetCommandValue(handle, b"ScDeviceSettingTimeCommand") - if nRet != MV_VS_OK: - self.log.error("Failed to confirm NTP settings") - return False - - # Start monitoring progress for NTP settings - self.current_operation = "ntp_settings" - self.progress_timer.start(500) # Check every 500ms - - return True - except Exception as e: - self.log.error(f"Error setting NTP parameters: {e}") - return False - - def update_solution_name(self, recipe_name): - """ - This method is called when the vision component's recipe changes. - It updates the solution_name attribute and triggers a scheme switch. - - It also attempts to get the rotations value from the vision component if available. - """ - self.log.info(f"Updating solution name to: {recipe_name}") - self.solution_name = recipe_name - - # Try to get rotations from vision component if available - try: - if hasattr(self, "components") and "vision" in self.components and hasattr(self.components["vision"], "vision_config"): - if self.components["vision"].vision_config is not None and "rotations" in self.components["vision"].vision_config: - self.rotations = self.components["vision"].vision_config["rotations"] - self.log.info(f"Updated rotations to: {self.rotations}") - except Exception as e: - self.log.warning(f"Could not update rotations: {e}") - - # Store the solution name for later use, but don't switch schemes yet - # The actual scheme switching will be handled by the Test_Vision component - # when it calls start() and checks if a scheme switch is needed - if not self.connected: - self.log.info("Camera not connected, will switch scheme after connection") - self.reconfigure() # This will connect and then switch scheme - else: - # Stop any ongoing operation - if self.current_operation: - self.log.warning(f"Stopping ongoing operation: {self.current_operation}") - self.progress_timer.stop() - self.current_operation = None - - # Start the scheme switching process (asynchronously) - # The Test_Vision component will wait for this to complete - self.switch_scheme(recipe_name) + self.ok_results = [] def config_changed(self): self.connected = False global MV_VS_OK device_info_list = MV_VS_DEVICE_INFO_LIST() - nRet = mv_lib.MV_VS_EnumDevices(ctypes.byref(device_info_list)) - enum_ok = False for n in range(3): if nRet != MV_VS_OK: @@ -534,7 +35,6 @@ class HikrobotSmartCamera(Component): return -2 else: print("Enum devices OK") - if device_info_list.nDeviceNum > 0: print_device_info(device_info_list) if device_info_list.nDeviceNum >= self.num_cameras: @@ -572,6 +72,79 @@ class HikrobotSmartCamera(Component): if not login_ok: return -2 time.sleep(0.2) + + # --- Start of Added Code: Switch Scheme --- + # As per SDK PDF Chapter 5.11, this section switches the camera's active scheme. + # It reads a comma-separated list of scheme names from the config file. + try: + solution_name=self.config.vision_config.get("solution_name", None) + if solution_name is not None: + scheme_name = solution_name + self.log.info(f"CAM{cam_idx} Attempting to switch to scheme: {scheme_name.decode()}") + + # Step 1: Set the scheme name to be switched using MV_VS_SetStringValue. + # The node name is "SolutionName". + nRet = mv_lib.MV_VS_SetStringValue(pHandle, b"SolutionName", c_char_p(scheme_name)) + if nRet != MV_VS_OK: + self.log.error(f"CAM{cam_idx} Set SolutionName failed! nRet [{nRet & 0xFFFFFFFF:#x}]") + # Clean up resources before returning + mv_lib.MV_VS_Logout(pHandle) + mv_lib.MV_VS_DestroyHandle(pHandle) + return -2 + + # Step 2: Call the scheme switch command using MV_VS_SetCommandValue. + # The command is "CommandSolutionSwitch". + nRet = mv_lib.MV_VS_SetCommandValue(pHandle, b"CommandSolutionSwitch") + if nRet != MV_VS_OK: + self.log.error(f"CAM{cam_idx} CommandSolutionSwitch failed! nRet [{nRet & 0xFFFFFFFF:#x}]") + # Clean up resources before returning + mv_lib.MV_VS_Logout(pHandle) + mv_lib.MV_VS_DestroyHandle(pHandle) + return -2 + + self.log.info(f"CAM{cam_idx} Switched to scheme '{scheme_name.decode()}' successfully.") + + # Step 3: Wait for the scheme switch to complete. + # As per SDK PDF Chapter 3.3.2, we poll "SolutionSwitchStatus". + # A value of 0 indicates the switch is complete. + self.log.info(f"CAM{cam_idx} Waiting for scheme switch to complete...") + switch_timeout = 30 # 30-second timeout for the switch + start_time = time.time() + while True: + if time.time() - start_time > switch_timeout: + self.log.error(f"CAM{cam_idx} Timed out waiting for scheme switch.") + mv_lib.MV_VS_Logout(pHandle) + mv_lib.MV_VS_DestroyHandle(pHandle) + return -2 + + # Create a variable to store the status + status = ctypes.c_int64() + nRet = mv_lib.MV_VS_GetIntValue(pHandle, b"SolutionSwitchStatus", byref(status)) + if nRet != MV_VS_OK: + self.log.error(f"CAM{cam_idx} Get SolutionSwitchStatus failed! nRet [{nRet & 0xFFFFFFFF:#x}]") + mv_lib.MV_VS_Logout(pHandle) + mv_lib.MV_VS_DestroyHandle(pHandle) + return -2 + + if status.value == 0: + self.log.info(f"CAM{cam_idx} Scheme switch completed successfully.") + break + else: + self.log.info(f"CAM{cam_idx} Scheme switch not yet complete. Waiting...") + time.sleep(1) + + else: + self.log.warning(f"CAM{cam_idx} No scheme name defined in config for this camera index. Using default scheme.") + except KeyError: + self.log.warning("'schemes' key not found in vision_config. Using camera's default/current scheme.") + except Exception as e: + self.log.error(f"CAM{cam_idx} An unexpected error occurred during scheme switching: {e}") + # Clean up resources before returning + mv_lib.MV_VS_Logout(pHandle) + mv_lib.MV_VS_DestroyHandle(pHandle) + return -2 + # --- End of Added Code --- + nRet = mv_lib.MV_VS_SetBoolValue(pHandle, b"CommandImageMode", 0) if nRet != MV_VS_OK: self.log.error(f"CAM{cam_idx} CommandImageMode failed! [{nRet&0xFFFFFFFF:#x}]") @@ -581,17 +154,12 @@ class HikrobotSmartCamera(Component): self.log.error(f"CAM{cam_idx} AcquisitionMode failed! [{nRet&0xFFFFFFFF:#x}]") return -2 - # Set the camera recipe - we'll use the switch_scheme method after connection is established - # This will be handled after the camera is fully initialized - # Start the camera test nRet = mv_lib.MV_VS_StartRun(pHandle) if nRet != MV_VS_OK: self.log.error(f"CAM{cam_idx} Start run failed! [{nRet & 0xFFFFFFFF:#x}]") - self.connected = True - @Component.reconfig_on_error def _get(self): if not self.connected: @@ -600,142 +168,47 @@ class HikrobotSmartCamera(Component): return concat_frame = None concat_results = [] - - try: - rot = self.rotations.split(",") - for cam_idx in range(self.num_cameras): - try: - cam = self.cam_list[cam_idx] - self.log.info(f"GET FRAME CAMERA # {cam_idx}") - - # Initialize default values in case of error - frame = None - res = True # Default to error condition - - if self.ok_results[cam_idx]: - # keep last good result - frame = self.ok_frames[cam_idx] - res = False - else: - try: - # Trigger acquisition & test execution - nRet = mv_lib.MV_VS_SetCommandValue(cam["handle"], b"AcquisitionStart") - if nRet != MV_VS_OK: - error_code = nRet & 0xFFFFFFFF - self.log.error(f"CAM{cam_idx} AcquisitionStart failed! [0x{error_code:x}]") - - # Special handling for 0x80030100 (MV_VS_E_GC_GENERIC) - if error_code == 0x80030100: - self.log.warning("Detected MV_VS_E_GC_GENERIC error (0x80030100) during acquisition") - self.log.info("This is a non-critical error, using last good frame if available") - - # Use last good frame if available - if self.ok_frames[cam_idx] is not None: - frame = self.ok_frames[cam_idx] - res = False - else: - # No good frame available, mark as disconnected - self.log.error("No previous good frame available") - self.connected = False - frame_res = None - else: - # For other errors, mark as disconnected - self.connected = False - frame_res = None - else: - # get frame - frame_res = MV_VS_GetFrame(cam["handle"]) + rot = self.config.vision_config["rotations"].split(",") + for cam_idx in range(self.num_cameras): + cam = self.cam_list[cam_idx] + self.log.info(f"GET FRAME CAMERA # {cam_idx}") + if self.ok_results[cam_idx]: + # keep last good result + frame = self.ok_frames[cam_idx] + res = False + else: + # Trigger acquisition & test execution + nRet = mv_lib.MV_VS_SetCommandValue(cam["handle"], b"AcquisitionStart") + if nRet != MV_VS_OK: + self.log.error(f"CAM{cam_idx} AcquisitionStart failed! [{nRet&0xFFFFFFFF:#x}]") + self.connected = False + frame_res = None + else: + # get frame + frame_res = MV_VS_GetFrame(cam["handle"]) - if frame_res is not None: - try: - # Safely access the result data with error checking - if "res" in frame_res and "ScDeviceSolutionRunningResult" in frame_res["res"]: - res = frame_res["res"]["ScDeviceSolutionRunningResult"] - frame = frame_res["frame"] - - # rotate frame - if len(rot) > cam_idx: # Check if rotation info exists for this camera - if rot[cam_idx] == "1": - frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) - if rot[cam_idx] == "2": - frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE) - if rot[cam_idx] == "3": - frame = cv2.rotate(frame, cv2.ROTATE_180) - else: - self.log.warning(f"CAM{cam_idx} Invalid frame result format") - # Use last good frame if available - if self.ok_frames[cam_idx] is not None: - frame = self.ok_frames[cam_idx] - res = False - else: - self.connected = False - except Exception as e: - self.log.error(f"Error processing frame for CAM{cam_idx}: {e}") - # Use last good frame if available - if self.ok_frames[cam_idx] is not None: - frame = self.ok_frames[cam_idx] - res = False - else: - self.connected = False - else: - self.log.warning(f"CAM{cam_idx} Failed to get frame") - # Use last good frame if available - if self.ok_frames[cam_idx] is not None: - frame = self.ok_frames[cam_idx] - res = False - else: - self.connected = False - except Exception as e: - self.log.error(f"Error during frame acquisition for CAM{cam_idx}: {e}") - # Log the full exception traceback for debugging - import traceback - self.log.error(f"Exception traceback: {traceback.format_exc()}") - - # Use last good frame if available - if self.ok_frames[cam_idx] is not None: - frame = self.ok_frames[cam_idx] - res = False - else: - self.connected = False - - # Skip this camera if we couldn't get a valid frame - if frame is None: - self.log.warning(f"Skipping CAM{cam_idx} due to missing frame") - continue - - # Process the frame - if concat_frame is None: - concat_frame = copy.deepcopy(frame) - else: - try: - concat_frame = cv2.hconcat([concat_frame, frame]) - except Exception as e: - self.log.error(f"Error concatenating frames: {e}") - # Use only the first frame if concatenation fails - if concat_frame is None and frame is not None: - concat_frame = copy.deepcopy(frame) - - concat_results.append(not res) - if not res: - self.ok_results[cam_idx] = True - self.ok_frames[cam_idx] = copy.deepcopy(frame) - except Exception as e: - self.log.error(f"Error processing camera {cam_idx}: {e}") - # Continue with next camera - continue - - # If we couldn't get any frames, return + if frame_res is not None: + res = frame_res["res"]["ScDeviceSolutionRunningResult"] + frame = frame_res["frame"] + # rotate frame + if rot[cam_idx] == "1": + frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE) + if rot[cam_idx] == "2": + frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE) + if rot[cam_idx] == "3": + frame = cv2.rotate(frame, cv2.ROTATE_180) + else: + self.connected = False if concat_frame is None: - self.log.error("No valid frames obtained from any camera") - return - - super()._get([concat_frame, self.ok_results]) - except Exception as e: - self.log.error(f"Error in _get method: {e}") - # Log the full exception traceback for debugging - import traceback - self.log.error(f"Exception traceback: {traceback.format_exc()}") - # Don't propagate the exception to avoid crashing the application + concat_frame = copy.deepcopy(frame) + else: + concat_frame = cv2.hconcat([concat_frame, frame]) + concat_results.append(not res) + if not res: + self.ok_results[cam_idx] = True + self.ok_frames[cam_idx] = copy.deepcopy(frame) + + super()._get([concat_frame, self.ok_results]) def resume(self): self.log.info(f"RESUMING") @@ -749,122 +222,9 @@ class HikrobotSmartCamera(Component): if len(self.cam_list) == self.num_cameras: # Dummy acquisition for each camera to avoid reading past images for cam_idx in range(self.num_cameras): - try: - cam = self.cam_list[cam_idx] - nRet = mv_lib.MV_VS_SetCommandValue(cam["handle"], b"AcquisitionStart") - - if nRet != MV_VS_OK: - error_code = nRet & 0xFFFFFFFF - self.log.warning(f"CAM{cam_idx} AcquisitionStart warning, error code: 0x{error_code:x}") - - # Special handling for 0x80030100 (MV_VS_E_GC_GENERIC) - if error_code == 0x80030100: - self.log.warning("Detected MV_VS_E_GC_GENERIC error (0x80030100) during acquisition start") - self.log.info("This is a non-critical error, continuing operation") - # Skip frame acquisition for this camera - continue - - # Only try to get a frame if AcquisitionStart was successful - frame_res = MV_VS_GetFrame(cam["handle"]) - - # Check if frame acquisition was successful - if frame_res is None: - self.log.warning(f"CAM{cam_idx} Failed to get initial frame, but continuing operation") - except Exception as e: - self.log.error(f"Error during camera {cam_idx} initialization: {e}") - # Log the full exception traceback for debugging - import traceback - self.log.error(f"Exception traceback: {traceback.format_exc()}") - # Continue with other cameras - continue + cam = self.cam_list[cam_idx] + nRet = mv_lib.MV_VS_SetCommandValue(cam["handle"], b"AcquisitionStart") + frame_res = MV_VS_GetFrame(cam["handle"]) else: QMessageBox.critical(None, "Errore", f"ERRORE CONNESSIONE TELECAMERE\n") super().resume() - - def get_current_scheme(self): - """ - Get the current scheme/solution name from the camera. - - Returns: - str: The current scheme name or None if there was an error - """ - if not self.connected: - self.log.error("Cannot get current scheme: Camera not connected") - return None - - # Get the first camera handle - if len(self.cam_list) == 0: - self.log.error("Cannot get current scheme: No camera handles available") - return None - - handle = self.cam_list[0]["handle"] - - try: - # Get the current solution name - solution_name = create_string_buffer(256) - nRet = mv_lib.MV_VS_GetStringValue(handle, b"SrcOperateSolutionName", solution_name, 256) - if nRet != MV_VS_OK: - self.log.error(f"Failed to get current solution name, error code: 0x{nRet&0xFFFFFFFF:x}") - return None - - current_scheme = solution_name.value.decode('utf-8') - self.log.info(f"Current scheme: {current_scheme}") - return current_scheme - except Exception as e: - self.log.error(f"Error getting current scheme: {e}") - return None - - def switch_scheme_sync(self, solution_name, timeout=30): - """ - Synchronously switch to a different scheme/solution and wait for completion. - - Args: - solution_name: The name of the solution to switch to - timeout: Maximum time in seconds to wait for scheme switching to complete (default: 30) - - Returns: - bool: True if successful, False otherwise - """ - if not self.connected: - self.log.error("Cannot switch scheme: Camera not connected") - return False - - # Get current scheme for comparison - current_scheme = self.get_current_scheme() - if current_scheme == solution_name: - self.log.info(f"Already using scheme: {solution_name}") - return True - - # Start the scheme switching process - success = self.switch_scheme(solution_name) - if not success: - self.log.error(f"Failed to initiate scheme switch to: {solution_name}") - return False - - # Wait for the scheme switching to complete with timeout - self.log.info(f"Waiting for scheme switch to complete (timeout: {timeout}s)") - - start_time = time.time() - while self.current_operation == "switch_scheme": - # Sleep a bit to avoid busy waiting - time.sleep(0.5) - - # Check if we've exceeded the timeout - if timeout is not None and (time.time() - start_time) > timeout: - self.log.warning(f"Scheme switch timeout after {timeout} seconds") - # Force stop the progress monitoring - if self.progress_timer.isActive(): - self.progress_timer.stop() - self.current_operation = None - break - - # Verify the switch was successful by checking the current scheme - new_scheme = self.get_current_scheme() - if new_scheme == solution_name: - self.log.info(f"Successfully switched to scheme: {solution_name}") - return True - else: - self.log.warning(f"Scheme switch verification failed. Expected: {solution_name}, Got: {new_scheme}") - return False - -