diff --git a/src/components/hikrobot_sc/hikrobot_sc.py b/src/components/hikrobot_sc/hikrobot_sc.py index 9da9830..9f0632e 100644 --- a/src/components/hikrobot_sc/hikrobot_sc.py +++ b/src/components/hikrobot_sc/hikrobot_sc.py @@ -33,6 +33,97 @@ class HikrobotSmartCamera(Component): self.progress_timer.timeout.connect(self.check_progress) self.current_operation = None + def get_push_status(self): + """ + Get the current push status information from the camera. + + As described in section 3.3.2 of the documentation, this retrieves: + - Push Type (ScPushType) + - Push Status (ScPushState) + - Push Progress (ScPushRate) + + Returns: + dict: A dictionary containing the push status information or None if there was an error + """ + if not self.connected: + self.log.error("Cannot get push status: Camera not connected") + return None + + # Get the first camera handle + if len(self.cam_list) == 0: + self.log.error("Cannot get push status: No camera handles available") + return None + + handle = self.cam_list[0]["handle"] + + try: + # Get push type, state and rate + push_type = c_uint() + push_state = c_uint() + push_rate = c_uint() + + nRet1 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushType", byref(push_type)) + nRet2 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushState", byref(push_state)) + nRet3 = mv_lib.MV_VS_GetIntValue(handle, b"ScPushRate", byref(push_rate)) + + if nRet1 == MV_VS_OK and nRet2 == MV_VS_OK and nRet3 == MV_VS_OK: + # Map push type to a readable name + push_type_names = { + 0: "UDMR", # Update individual module results + 1: "CTRP", # Create Template Progress + 2: "LPRP", # Protocol loading progress + 3: "SPRP", # Protocol save progress + 4: "FMUP", # Firmware update progress + 5: "DFCS", # Format the verification state + 6: "LTRP", # Upload Template Progress + 7: "LTIP", # Upload Template Image Progress + 8: "UDMS", # Update all module results + 9: "LLGP", # Load log progress + 10: "CMIS", # Detects module integrity + 11: "UPRP", # Upload scenario progress + 12: "DRTP", # Device reset progress + 13: "UPMP", # Upload Teaching Model progress + 14: "UPLI", # Upload local image progress + 15: "ISTP", # Export stored images to test image progress + 16: "NTPS", # NTP Time Check Status + 17: "DCLP", # DI or COMMUNITION switch protocol load progress + 18: "SRSP", # Insufficient system resources prompt + 19: "UPUS", # Upload package progress + 20: "UPUR", # Upload resource file progress + 21: "CPRP", # New Scenario Progress + 22: "EDRP", # Modify the scheme name + 23: "TIMG", # Test map run aborted + 24: "GLOG" # Get the device log + } + + # Map push state to a readable name + push_state_names = { + 0: "Success", + 1: "Failed" + } + + push_type_name = push_type_names.get(push_type.value, f"Unknown ({push_type.value})") + push_state_name = push_state_names.get(push_state.value, f"Unknown ({push_state.value})") + + return { + "type": push_type.value, + "type_name": push_type_name, + "state": push_state.value, + "state_name": push_state_name, + "rate": push_rate.value + } + else: + if nRet1 != MV_VS_OK: + self.log.error(f"Failed to get ScPushType, error code: 0x{nRet1&0xFFFFFFFF:x}") + if nRet2 != MV_VS_OK: + self.log.error(f"Failed to get ScPushState, error code: 0x{nRet2&0xFFFFFFFF:x}") + if nRet3 != MV_VS_OK: + self.log.error(f"Failed to get ScPushRate, error code: 0x{nRet3&0xFFFFFFFF:x}") + return None + except Exception as e: + self.log.error(f"Error getting push status: {e}") + return None + def check_progress(self): """ Check the progress of the current operation by monitoring push status and progress. @@ -49,97 +140,100 @@ class HikrobotSmartCamera(Component): self.progress_timer.stop() return - handle = self.cam_list[0]["handle"] + # Get push status + push_status = self.get_push_status() - # Get push type, state and rate - push_type = c_uint() - push_state = c_uint() - push_rate = c_uint() - - try: - self.log.debug("Getting push status information") - nRet1 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushType", byref(push_type)) - nRet2 = mv_lib.MV_VS_GetEnumValue(handle, b"ScPushState", byref(push_state)) - nRet3 = mv_lib.MV_VS_GetIntValue(handle, b"ScPushRate", byref(push_rate)) + if push_status is None: + self.log.warning("Could not get push status information") + # After 10 seconds (20 checks at 500ms), force completion if we're still waiting + if not hasattr(self, '_progress_check_count'): + self._progress_check_count = 0 - if nRet1 != MV_VS_OK: - self.log.error(f"Failed to get ScPushType, error code: 0x{nRet1&0xFFFFFFFF:x}") - if nRet2 != MV_VS_OK: - self.log.error(f"Failed to get ScPushState, error code: 0x{nRet2&0xFFFFFFFF:x}") - if nRet3 != MV_VS_OK: - self.log.error(f"Failed to get ScPushRate, error code: 0x{nRet3&0xFFFFFFFF:x}") - - if nRet1 == MV_VS_OK and nRet2 == MV_VS_OK and nRet3 == MV_VS_OK: - self.log.debug(f"Push status - Type: {push_type.value}, State: {push_state.value}, Rate: {push_rate.value}") + self._progress_check_count += 1 + if self._progress_check_count >= 20: + self.log.warning(f"Force completing operation after timeout: {self.current_operation}") + self.progress_timer.stop() + self.current_operation = None + self._progress_check_count = 0 - # Check which operation we're monitoring + # If we were switching schemes, try to refresh the module list anyway if self.current_operation == "switch_scheme": - if push_type.value == 2: # LPRP 2 protocol loading progress - progress = push_rate.value - status = "Success" if push_state.value == 0 else "Failed" - - self.log.info(f"Scheme switching progress: {progress}%, Status: {status}") - self.progress_signal.emit(progress, status) - - # If progress is complete or failed, stop the timer - if progress >= 100 or push_state.value == 1: - self.progress_timer.stop() - self.current_operation = None - - if push_state.value == 0: # Success - self.log.info("Scheme switching completed successfully") - # Refresh module list after successful scheme switching - self.refresh_module_list() - else: # Failed - self.log.error("Scheme switching failed") - else: - self.log.debug(f"Waiting for push type 2 (protocol loading), current type: {push_type.value}") - - elif self.current_operation == "ntp_settings": - if push_type.value == 16: # NTPS 16 NTP Time Check Status - progress = push_rate.value - status = "Success" if push_state.value == 0 else "Failed" - - self.log.info(f"NTP settings progress: {progress}%, Status: {status}") - self.progress_signal.emit(progress, status) - - # If progress is complete or failed, stop the timer - if progress >= 100 or push_state.value == 1: - self.progress_timer.stop() - self.current_operation = None - - if push_state.value == 0: # Success - self.log.info("NTP settings completed successfully") - else: # Failed - self.log.error("NTP settings failed") - else: - self.log.debug(f"Waiting for push type 16 (NTP settings), current type: {push_type.value}") - else: - self.log.warning(f"Unknown operation type: {self.current_operation}") - else: - # If we can't get the push status after multiple attempts, try to proceed anyway - self.log.warning("Could not get push status information, continuing with operation") + self.log.info("Attempting to refresh module list after timeout") + self.refresh_module_list() + return + + # Log push status + self.log.debug(f"Push status - Type: {push_status['type_name']} ({push_status['type']}), " + + f"State: {push_status['state_name']} ({push_status['state']}), " + + f"Rate: {push_status['rate']}%") + + # Check which operation we're monitoring + if self.current_operation == "switch_scheme": + if push_status['type'] == 2: # LPRP 2 protocol loading progress + progress = push_status['rate'] + status = push_status['state_name'] - # After 10 seconds (20 checks at 500ms), force completion if we're still waiting - if not hasattr(self, '_progress_check_count'): - self._progress_check_count = 0 + self.log.info(f"Scheme switching progress: {progress}%, Status: {status}") + self.progress_signal.emit(progress, status) - self._progress_check_count += 1 - if self._progress_check_count >= 20: - self.log.warning(f"Force completing operation after timeout: {self.current_operation}") + # If progress is complete or failed, stop the timer + if progress >= 100 or push_status['state'] == 1: self.progress_timer.stop() self.current_operation = None - self._progress_check_count = 0 - # If we were switching schemes, try to refresh the module list anyway - if self.current_operation == "switch_scheme": - self.log.info("Attempting to refresh module list after timeout") + if push_status['state'] == 0: # Success + self.log.info("Scheme switching completed successfully") + # Refresh module list after successful scheme switching self.refresh_module_list() + else: # Failed + self.log.error("Scheme switching failed") + else: + self.log.debug(f"Waiting for push type 2 (protocol loading), current type: {push_status['type']}") + + elif self.current_operation == "ntp_settings": + if push_status['type'] == 16: # NTPS 16 NTP Time Check Status + progress = push_status['rate'] + status = push_status['state_name'] - except Exception as e: - self.log.error(f"Error checking progress: {e}") - self.progress_timer.stop() - self.current_operation = None + self.log.info(f"NTP settings progress: {progress}%, Status: {status}") + self.progress_signal.emit(progress, status) + + # If progress is complete or failed, stop the timer + if progress >= 100 or push_status['state'] == 1: + self.progress_timer.stop() + self.current_operation = None + + if push_status['state'] == 0: # Success + self.log.info("NTP settings completed successfully") + else: # Failed + self.log.error("NTP settings failed") + else: + self.log.debug(f"Waiting for push type 16 (NTP settings), current type: {push_status['type']}") + + elif self.current_operation == "switch_scheme_communications": + if push_status['type'] == 17: # DCLP 17 DI or COMMUNITION switch protocol load progress + progress = push_status['rate'] + status = push_status['state_name'] + + self.log.info(f"Communications scheme switching progress: {progress}%, Status: {status}") + self.progress_signal.emit(progress, status) + + # If progress is complete or failed, stop the timer + if progress >= 100 or push_status['state'] == 1: + self.progress_timer.stop() + self.current_operation = None + + if push_status['state'] == 0: # Success + self.log.info("Communications scheme switching completed successfully") + # Refresh module list after successful scheme switching + self.refresh_module_list() + else: # Failed + self.log.error("Communications scheme switching failed") + else: + self.log.debug(f"Waiting for push type 17 (communications protocol loading), current type: {push_status['type']}") + + else: + self.log.warning(f"Unknown operation type: {self.current_operation}") def refresh_module_list(self): """ @@ -176,8 +270,17 @@ class HikrobotSmartCamera(Component): """ Switch to a different scheme/solution using the API as described in the documentation. + As described in section 5.11 of the documentation: + 1. Set the SrcOperateSolutionName node as the target schema name + 2. Set the Load Scenario command + 3. Get Switch Protocol Progress (monitor ScPushType 2) + 4. Refresh Module List + Args: solution_name: The name of the solution to switch to + + Returns: + bool: True if the scheme switching process was started successfully, False otherwise """ if not self.connected: self.log.error("Cannot switch scheme: Camera not connected") @@ -212,16 +315,50 @@ class HikrobotSmartCamera(Component): else: self.log.info("Successfully sent project load command") - # Start monitoring progress + # 3. Start monitoring progress (ScPushType 2 - protocol loading progress) self.log.info("Starting progress monitoring for scheme switching") self.current_operation = "switch_scheme" self.progress_timer.start(500) # Check every 500ms + # 4. Refresh Module List will be done automatically after the progress is complete + # in the check_progress method + return True except Exception as e: self.log.error(f"Error switching scheme: {e}") return False + def switch_scheme_communications(self, solution_name, comm_str, comm_ret_str): + """ + Switch to a different scheme/solution using the Communications switching mode. + + This method combines setting the Communications switching mode and then monitoring + the progress of the switch. + + Args: + solution_name: The name of the solution to switch to + comm_str: The communication string + comm_ret_str: The communication return string + + Returns: + bool: True if the scheme switching process was started successfully, False otherwise + """ + if not self.connected: + self.log.error("Cannot switch scheme: Camera not connected") + return False + + # First set the Communications switching mode + if not self.set_scheme_switch_mode_communications(solution_name, comm_str, comm_ret_str): + self.log.error("Failed to set Communications switching mode") + return False + + # Start monitoring progress for Communications scheme switching + self.log.info("Starting progress monitoring for Communications scheme switching") + self.current_operation = "switch_scheme_communications" + self.progress_timer.start(500) # Check every 500ms + + return True + def get_ntp_parameters(self): """ Get the current NTP parameters from the camera. @@ -544,6 +681,132 @@ class HikrobotSmartCamera(Component): self.log.error(f"Error getting current scheme: {e}") return None + def set_scheme_switch_mode_off(self, solution_name): + """ + Set the scheme switching mode to OFF. + + As described in section 5.12.1 of the documentation: + 1. Set the DstOperateSolutionName node as the target schema name + 2. Set the protocol switch mode to off + 3. Set protocol parameters command + + Args: + solution_name: The name of the solution to set + + Returns: + bool: True if successful, False otherwise + """ + if not self.connected: + self.log.error("Cannot set scheme switch mode: Camera not connected") + return False + + # Get the first camera handle + if len(self.cam_list) == 0: + self.log.error("Cannot set scheme switch mode: No camera handles available") + return False + + handle = self.cam_list[0]["handle"] + + try: + self.log.info(f"Setting scheme switch mode to OFF for solution: {solution_name}") + + # 1. Set the DstOperateSolutionName node as the target schema name + solution_name_bytes = solution_name.encode('utf-8') + nRet = mv_lib.MV_VS_SetStringValue(handle, b"SrcOperateSolutionName", solution_name_bytes) + if nRet != MV_VS_OK: + self.log.error(f"Failed to set solution name: {solution_name}, error code: 0x{nRet&0xFFFFFFFF:x}") + return False + + # 2. Set the protocol switch mode to off + nRet = mv_lib.MV_VS_SetEnumValueByString(handle, b"SolutionSwitchMode", b"off") + if nRet != MV_VS_OK: + self.log.error(f"Failed to set solution switch mode to OFF, error code: 0x{nRet&0xFFFFFFFF:x}") + return False + + # 3. Set protocol parameters command + nRet = mv_lib.MV_VS_SetCommandValue(handle, b"CommandSolutionSwitchParam") + if nRet != MV_VS_OK: + self.log.error(f"Failed to set solution switch parameters, error code: 0x{nRet&0xFFFFFFFF:x}") + return False + + self.log.info(f"Successfully set scheme switch mode to OFF for solution: {solution_name}") + return True + except Exception as e: + self.log.error(f"Error setting scheme switch mode to OFF: {e}") + return False + + def set_scheme_switch_mode_communications(self, solution_name, comm_str, comm_ret_str): + """ + Set the scheme switching mode to Communications switching. + + As described in section 5.12.2 of the documentation: + 1. Set the DstOperateSolutionName node as the target schema name + 2. Set the protocol switch mode to TriggerCommunication + 3. Set the communication string + 4. Set the communication return value + 5. Set the protocol parameter command + + Args: + solution_name: The name of the solution to set + comm_str: The communication string + comm_ret_str: The communication return string + + Returns: + bool: True if successful, False otherwise + """ + if not self.connected: + self.log.error("Cannot set scheme switch mode: Camera not connected") + return False + + # Get the first camera handle + if len(self.cam_list) == 0: + self.log.error("Cannot set scheme switch mode: No camera handles available") + return False + + handle = self.cam_list[0]["handle"] + + try: + self.log.info(f"Setting scheme switch mode to Communications for solution: {solution_name}") + + # 1. Set the DstOperateSolutionName node as the target schema name + solution_name_bytes = solution_name.encode('utf-8') + nRet = mv_lib.MV_VS_SetStringValue(handle, b"DstOperateSolutionName", solution_name_bytes) + if nRet != MV_VS_OK: + self.log.error(f"Failed to set destination solution name: {solution_name}, error code: 0x{nRet&0xFFFFFFFF:x}") + return False + + # 2. Set the protocol switch mode to TriggerCommunication + nRet = mv_lib.MV_VS_SetEnumValueByString(handle, b"SolutionSwitchMode", b"TriggerCommunication") + if nRet != MV_VS_OK: + self.log.error(f"Failed to set solution switch mode to TriggerCommunication, error code: 0x{nRet&0xFFFFFFFF:x}") + return False + + # 3. Set the communication string + comm_str_bytes = comm_str.encode('utf-8') + nRet = mv_lib.MV_VS_SetStringValue(handle, b"SolutionSwitchString", comm_str_bytes) + if nRet != MV_VS_OK: + self.log.error(f"Failed to set solution switch string: {comm_str}, error code: 0x{nRet&0xFFFFFFFF:x}") + return False + + # 4. Set the communication return value + comm_ret_str_bytes = comm_ret_str.encode('utf-8') + nRet = mv_lib.MV_VS_SetStringValue(handle, b"SolutionSwitchRetString", comm_ret_str_bytes) + if nRet != MV_VS_OK: + self.log.error(f"Failed to set solution switch return string: {comm_ret_str}, error code: 0x{nRet&0xFFFFFFFF:x}") + return False + + # 5. Set the protocol parameter command + nRet = mv_lib.MV_VS_SetCommandValue(handle, b"CommandSolutionSwitchParam") + if nRet != MV_VS_OK: + self.log.error(f"Failed to set solution switch parameters, error code: 0x{nRet&0xFFFFFFFF:x}") + return False + + self.log.info(f"Successfully set scheme switch mode to Communications for solution: {solution_name}") + return True + except Exception as e: + self.log.error(f"Error setting scheme switch mode to Communications: {e}") + return False + def switch_scheme_sync(self, solution_name, timeout=None): """ Synchronously switch to a different scheme/solution and wait for completion.