st-ten-1/src/components/pipe_cutter_component.py

183 lines
6.9 KiB
Python
Raw Normal View History

2025-01-21 14:44:47 +00:00
import time
from .component import Component
2025-01-17 14:14:28 +00:00
from .modbus_component import ModbusComponent
2025-01-21 09:51:06 +00:00
import logging
2025-01-17 14:14:28 +00:00
class PipeCutterComponent(ModbusComponent):
2025-01-21 14:44:47 +00:00
def __init__(self, config=None, name="pipe_cutter", period=1, lazy=True, paused=False, threaded=True):
2025-01-17 14:14:28 +00:00
"""Initialize the Pipe Cutter Component."""
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
self.log = logging.getLogger(self.name)
self.current_total_length = None
self.current_od_of_pipe = None
self.machine_status = None
2025-01-21 09:51:06 +00:00
def config_changed(self):
2025-01-21 14:44:47 +00:00
# Initialize client
2025-01-21 09:51:06 +00:00
super().config_changed()
2025-01-21 14:44:47 +00:00
self.log.info(f"Configuration changed for '{self.name}'. Client initialized with: {self.client}")
2025-01-21 09:51:06 +00:00
2025-01-17 14:14:28 +00:00
def read_total_length(self):
2025-01-21 09:51:06 +00:00
"""Read the total length from Register 4X30."""
2025-01-17 14:14:28 +00:00
try:
2025-01-21 09:51:06 +00:00
self.current_total_length = self._read(register=30)
self.log.info(f"Total Length Read (Register 4X30): {self.current_total_length}")
2025-01-17 14:14:28 +00:00
return self.current_total_length
except Exception as e:
2025-01-21 09:51:06 +00:00
self.log.error(f"Error reading total length: {e}")
2025-01-17 14:14:28 +00:00
raise
def read_od_of_pipe(self):
2025-01-21 09:51:06 +00:00
"""Read the OD of the pipe from Register 4X60."""
2025-01-17 14:14:28 +00:00
try:
2025-01-21 09:51:06 +00:00
self.current_od_of_pipe = self._read(register=60)
self.log.info(f"OD of Pipe Read (Register 4X60): {self.current_od_of_pipe}")
2025-01-17 14:14:28 +00:00
return self.current_od_of_pipe
except Exception as e:
2025-01-21 09:51:06 +00:00
self.log.error(f"Error reading OD of pipe: {e}")
2025-01-17 14:14:28 +00:00
raise
2025-01-21 14:44:47 +00:00
@Component.reconfig_on_error
def _get(self):
"""
Retrieve the machine status from register 1 at Modbus address 4x766.
"""
2025-01-17 14:14:28 +00:00
try:
2025-01-21 14:44:47 +00:00
# Read holding register at 4x767 (register 767) for a single register (count=1)
response = self.read(register=766, count=1,)
print(response) #DEBUG
status = response
super()._get([status])
return status
except KeyError as ke:
self.log.error(f"KeyError while accessing Pipe Cutter register 4x766: {ke}")
raise
except ValueError as ve:
self.log.error(f"ValueError: {ve}")
2025-01-17 14:14:28 +00:00
raise
2025-01-21 14:44:47 +00:00
2025-01-17 14:14:28 +00:00
except Exception as e:
2025-01-21 14:44:47 +00:00
self.log.error(f"Unexpected error in Pipe Cutter _get method at 4x766: {e}")
2025-01-17 14:14:28 +00:00
raise
2025-01-21 14:44:47 +00:00
def _convert_from_format(self, data, formatting=None, decoding_map=None):
if decoding_map is not None and data in decoding_map:
data = decoding_map[data]
if formatting is not None:
data = data * self.units[formatting][0]
return data
def _convert_to_format(self, data, formatting=None, encoding_map=None):
if formatting is not None:
data = int(data / self.units[formatting][0])
if encoding_map is not None and data in encoding_map:
data = encoding_map[data]
return data
@Component.reconfig_on_error
def read(self, register, *args, data_type=None, gain=None, offset=None, formatting=None, decoding_map=None,
**kwargs):
if type(register) is str:
register, s = self.registers[register]
if data_type is None:
data_type = s.get("dt", None)
if gain is None:
gain = s.get("g", None)
if offset is None:
offset = s.get("o", None)
if formatting is None:
formatting = s.get("f", None)
if decoding_map is None:
decoding_map = s.get("decoding", None)
if not len(args):
args = s.get("a", [])
if not len(kwargs):
kwargs = s.get("k", {})
if data_type is None:
data_type = "16bit_uint"
if gain is None:
gain = 1
if offset is None:
offset = 0
return self._convert_from_format(
super().read(
register,
*args,
data_type=data_type,
gain=gain,
offset=offset,
slave=1,
**kwargs,
),
formatting=formatting,
decoding_map=decoding_map,
)
@Component.reconfig_on_error
def write(self, register, data, *args, data_type=None, gain=None, offset=None, formatting=None, encoding_map=None,
**kwargs):
if type(register) is str:
register, s = self.registers[register]
if data_type is None:
data_type = s.get("dt", None)
if gain is None:
gain = s.get("g", None)
if offset is None:
offset = s.get("o", None)
if formatting is None:
formatting = s.get("f", None)
if encoding_map is None:
encoding_map = s.get("encoding", None)
if not len(args):
args = s.get("a", [])
if not len(kwargs):
kwargs = s.get("k", {})
if data_type is None:
data_type = "16bit_uint"
if gain is None:
gain = 1
if offset is None:
offset = 0
return super().write(
register,
self._convert_to_format(
data,
formatting=formatting,
encoding_map=encoding_map,
),
*args,
data_type=data_type,
gain=gain,
offset=offset,
**kwargs,
)
def write_bit_with_delay(self, register_address, bit_position, delay_ms):
"""
Set a specific bit in a Modbus register, wait for a specified time, and reset it.
:param register_address: The register address to write to.
:param bit_position: The bit position to modify (0-15).
:param delay_ms: The delay in milliseconds before resetting the bit to 0 .
"""
try:
# Step 1: Set the bit to 1
current_value = self.read(register_address, data_type="16bit_uint")
new_value = current_value | (1 << bit_position) # Set the bit
self.write(register_address, new_value, data_type="16bit_uint")
self.log.info(f"Set bit {bit_position} in register {register_address} to 1")
# Step 2: Wait for the specified delay
time.sleep(delay_ms / 1000) # Convert milliseconds to seconds
# Step 3: Reset the bit to 0
current_value = self.read(register_address, data_type="16bit_uint")
new_value = current_value & ~(1 << bit_position) # Clear the bit
self.write(register_address, new_value, data_type="16bit_uint")
self.log.info(f"Reset bit {bit_position} in register {register_address} to 0")
except Exception as e:
self.log.error(f"Failed to set and reset bit {bit_position} in register {register_address}: {e}")
raise