From 356857bb58f04ea14f289c2868e52bb430e32c78 Mon Sep 17 00:00:00 2001 From: edo-neo Date: Tue, 14 Jan 2025 10:13:26 +0100 Subject: [PATCH] dev --- src/test/modbus_read_data.py | 72 ++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 src/test/modbus_read_data.py diff --git a/src/test/modbus_read_data.py b/src/test/modbus_read_data.py new file mode 100644 index 0000000..18e68a8 --- /dev/null +++ b/src/test/modbus_read_data.py @@ -0,0 +1,72 @@ +import serial +from pymodbus.client.sync import ModbusSerialClient as ModbusClient +from pymodbus.exceptions import ModbusIOException +import time + + +def read_modbus_data(client, starting_register, register_count): + """ + Reads data from the Modbus server. + + :param client: An instance of ModbusClient + :param starting_register: The address of the register to start reading from + :param register_count: Number of registers to read + :return: List of register values or None if there was an error + """ + try: + read_data = client.read_holding_registers(starting_register, count=register_count) + if isinstance(read_data, ModbusIOException): + print("Error: ModbusIOException occurred during read operation.") + return None + else: + return read_data.registers + except Exception as e: + print(f"Error: {e}") + return None + + +def main(): + # Modbus client configuration + client = ModbusClient( + method="rtu", + port="/dev/ttyUSB0", # Specify the correct port for your serial connection + stopbits=serial.STOPBITS_ONE, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + baudrate=38400, + timeout=1, # Timeout in seconds + strict=False + ) + + # Attempt to connect to the Modbus server + if client.connect(): + print("Modbus client connected successfully.") + else: + print("Failed to connect to Modbus client.") + return + + # Starting the data read loop + try: + starting_register = 1 # Define the starting register address + register_count = 10 # Define the number of registers to read + + print("Starting data read loop...") + while True: + data = read_modbus_data(client, starting_register, register_count) + if data: + # Print read data + for i, value in enumerate(data): + print(f"Register {starting_register + i}: {value}") + else: + print("Failed to read data from Modbus server.") + + time.sleep(1) # Wait 1 second before the next read (adjust as necessary) + except KeyboardInterrupt: + print("\nStopping read loop...") + finally: + client.close() + print("Modbus client connection closed.") + + +if __name__ == "__main__": + main()