import sys
import traceback

if "--sim-serial" in sys.argv:
    from components.dummies.serial import serial
else:
    import serial

from pymodbus.constants import Endian

# from pymodbus.exceptions import ModbusIOException
from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder

if "--sim-modbus" not in sys.argv:
    from pymodbus.client import ModbusSerialClient as ModbusClient
else:
    from components.dummies.pymodbus import ModbusClient

from PyQt5.QtCore import QMutex

from .component import Component


class ModbusComponent(Component):
    """Component that talks to a Modbus device over a serial line.

    Every access to the underlying client is serialized through ``self.lock``
    (a ``QMutex``).  The connection parameters are read from
    ``self.config[self.name]`` in :meth:`config_changed`.
    """

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True, registers=None):
        """Initialise the base component and the client lock.

        Args:
            config, name, period, lazy, paused, threaded: forwarded unchanged
                to ``Component.__init__``.
            registers: optional register map; an empty dict is used when
                ``None`` is given.
        """
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
        self.registers = registers if registers is not None else {}
        self.lock = QMutex()

    def config_changed(self):
        """(Re)load the connection settings and open the Modbus client.

        Reads these keys from ``self.config[self.name]``: ``method``
        (default ``"rtu"``), ``port``, ``baudrate``, ``stopbits`` (default
        ``"stopbits_one"``), ``parity`` (default ``"parity_none"``),
        ``bytesize`` (default ``"eightbits"``), ``byteorder`` (default
        ``"Big"``), ``wordorder`` (default ``"Little"``) and ``timeout``
        (default ``1``).

        Raises:
            ConnectionError: if the client cannot connect or its socket is
                not open afterwards.
        """
        section = self.config[self.name]

        self.method = section.get("method", "rtu").lower()
        self.port = section["port"]
        self.baudrate = int(section["baudrate"])
        # Serial framing options are looked up as upper-case constants on the
        # ``serial`` module (e.g. ``serial.STOPBITS_ONE``).
        self.stopbits = getattr(serial, section.get("stopbits", "stopbits_one").upper())
        self.parity = getattr(serial, section.get("parity", "parity_none").upper())
        self.bytesize = getattr(serial, section.get("bytesize", "eightbits").upper())
        # Byte/word order are looked up as title-case attributes on ``Endian``.
        self.byteorder = getattr(Endian, section.get("byteorder", "Big").title())
        self.wordorder = getattr(Endian, section.get("wordorder", "Little").title())
        self.timeout = int(section.get("timeout", 1))

        self.lock.lock()
        try:
            # Close a connection left over from an earlier configuration so
            # the port is not leaked when the settings are reloaded.
            previous = getattr(self, "client", None)
            if previous is not None and previous.is_socket_open():
                previous.close()

            self.client = ModbusClient(
                method=self.method,
                port=self.port,
                stopbits=self.stopbits,
                bytesize=self.bytesize,
                parity=self.parity,
                baudrate=self.baudrate,
                timeout=self.timeout,
                strict=False,
            )
            if not self.client.connect():
                raise ConnectionError(
                    "device not reachable (could not connect): {} ({})".format(self.name, self.port)
                )
            if not self.client.is_socket_open():
                raise ConnectionError(
                    "device not reachable (socket not open): {} ({})".format(self.name, self.port)
                )
        finally:
            # Always release the mutex; leaving it held after a failed
            # connection would block every later read, write and __del__.
            self.lock.unlock()

    @staticmethod
    def _describe_failure(response):
        """Return a printable description of an error response.

        Error responses are not guaranteed to be exception instances, so a
        traceback is only formatted when one actually is; anything else is
        described by its ``repr``.
        """
        if isinstance(response, BaseException):
            return "".join(traceback.format_exception(type(response), response, response.__traceback__))
        return repr(response)

    @staticmethod
    def _to_integer(value, data_type):
        """Coerce ``value`` to the integer form implied by ``data_type``.

        ``*uint`` types yield ``int(abs(value))`` and ``*int`` types yield
        ``int(value)``.

        Raises:
            NotImplementedError: for any other ``data_type``.
        """
        # "uint" must be tested first because it also ends with "int".
        if data_type.endswith("uint"):
            return int(abs(value))
        if data_type.endswith("int"):
            return int(value)
        raise NotImplementedError(f"data_type {data_type!r} is not supported")

    def _read(self, register, count=1):
        """Read ``count`` holding registers starting at ``register``.

        Error responses are logged and the read is retried until a
        non-error response is obtained, which is then returned.
        """
        while True:
            self.lock.lock()
            try:
                response = self.client.read_holding_registers(register, count=count)
            finally:
                self.lock.unlock()
            if not response.isError():
                return response
            self.log.exception(self._describe_failure(response))
            # raise read

    def _write(self, register, value):
        """Write the already-encoded ``value`` to ``register``.

        Error responses are logged and the write is retried until it
        succeeds.
        """
        while True:
            self.lock.lock()
            try:
                response = self.client.write_registers(register, value, skip_encode=True)
            finally:
                self.lock.unlock()
            if not response.isError():
                return
            self.log.exception(self._describe_failure(response))
            # raise wrote

    def _decode(self, read, *args, data_type="16bit_uint", gain=1, offset=0, **kwargs):
        """Decode a register response into a scaled integer.

        The raw value is decoded with ``decode_<data_type>`` and then scaled
        as ``(raw - offset) / gain`` before integer coercion.

        Raises:
            NotImplementedError: if ``data_type`` is not an int/uint type.
        """
        decoder = BinaryPayloadDecoder.fromRegisters(
            read.registers, byteorder=self.byteorder, wordorder=self.wordorder
        )
        raw = getattr(decoder, f"decode_{data_type}")(*args, **kwargs)
        return self._to_integer((raw - offset) / gain, data_type)

    def _encode(self, data, *args, data_type="16bit_uint", gain=1, offset=0, **kwargs):
        """Encode ``data`` into a register payload.

        The value is scaled as ``data * gain + offset``, coerced to an
        integer and added with ``add_<data_type>``.

        Raises:
            NotImplementedError: if ``data_type`` is not an int/uint type.
        """
        builder = BinaryPayloadBuilder(byteorder=self.byteorder, wordorder=self.wordorder)
        scaled = self._to_integer(data * gain + offset, data_type)
        getattr(builder, f"add_{data_type}")(scaled, *args, **kwargs)
        return builder.build()

    def read(self, register, *args, data_type="16bit_uint", gain=1, offset=0, **kwargs):
        """Read and decode one value from ``register``.

        One register is read for ``16bit_*`` types and two for ``32bit_*``.

        Raises:
            NotImplementedError: for any other ``data_type`` prefix.
        """
        if data_type.startswith("16bit_"):
            count = 1
        elif data_type.startswith("32bit_"):
            count = 2
        else:
            raise NotImplementedError(f"data_type {data_type!r} is not supported")
        response = self._read(register, count=count)
        return self._decode(response, *args, data_type=data_type, gain=gain, offset=offset, **kwargs)

    def write(self, register, data, *args, data_type="16bit_uint", gain=1, offset=0, **kwargs):
        """Encode ``data`` and write it to ``register``."""
        payload = self._encode(data, *args, data_type=data_type, gain=gain, offset=offset, **kwargs)
        self._write(register, payload)

    # def _get(self, data):
    #     # print("MODBUS", str(int(QThread.currentThreadId())), flush=True)
    #     super()._get(data)

    def __del__(self, event=None):
        """Close the client connection, if one was ever opened."""
        # Either attribute can be missing when construction or configuration
        # failed part-way; a finalizer must not raise in that case.
        lock = getattr(self, "lock", None)
        client = getattr(self, "client", None)
        if lock is None or client is None:
            return
        lock.lock()
        try:
            if client.is_socket_open():
                client.close()
        finally:
            lock.unlock()