st-ten-1/src/components/modbus_component.py

110 lines
4.4 KiB
Python
Raw Normal View History

2022-06-08 07:11:38 +00:00
import sys
import traceback
import serial
from pymodbus.constants import Endian
# from pymodbus.exceptions import ModbusIOException
from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder
from PyQt5.QtCore import QMutex
if "--sim-modbus" not in sys.argv:
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
else:
from components.dummies.pymodbus import ModbusClient
from .component import Component
class ModbusComponent(Component):
def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True, registers=None):
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
self.registers = registers if registers is not None else {}
self.lock = QMutex()
def config_changed(self):
self.address = self.config[self.name]["address"]
self.baudrate = int(self.config[self.name]["baudrate"])
self.stopbits = getattr(serial, self.config[self.name].get("stopbits", "stopbits_one").upper())
self.parity = getattr(serial, self.config[self.name].get("parity", "parity_none").upper())
self.bytesize = getattr(serial, self.config[self.name].get("bytesize", "eightbits").upper())
self.byteorder = getattr(Endian, self.config[self.name].get("byteorder", "Big").title())
self.wordorder = getattr(Endian, self.config[self.name].get("wordorder", "Big").title())
self.timeout = int(self.config[self.name].get("timeout", 1))
self.lock.lock()
self.client = ModbusClient(
method="rtu",
port=self.address,
stopbits=self.stopbits,
bytesize=self.bytesize,
parity=self.parity,
baudrate=self.baudrate,
timeout=self.timeout,
strict=False
)
if not self.client.connect():
raise ConnectionError("device not reachable (could not connect): {} ({})".format(self.name, self.port))
if not self.client.is_socket_open():
raise ConnectionError("device not reachable (socket not open): {} ({})".format(self.name, self.port))
self.lock.unlock()
def _read(self, register, count=1):
self.lock.lock()
read = self.client.read_holding_registers(register, count=count)
self.lock.unlock()
if read.isError():
self.log.exception(traceback.format_exception(read))
return None
return read
def _write(self, register, value):
self.lock.lock()
2022-07-04 10:36:51 +00:00
wrote = self.client.write_registers(register, value, skip_encode=True)
2022-06-08 07:11:38 +00:00
self.lock.unlock()
if wrote.isError():
self.log.exception(traceback.format_exception(wrote))
return False
return True
def _decode(self, read, data_type, *args, **kwargs):
decoder = BinaryPayloadDecoder.fromRegisters(read.registers, byteorder=self.byteorder, wordorder=self.wordorder)
return getattr(decoder, f"decode_{data_type}")(*args, **kwargs)
def _encode(self, data, data_type, *args, **kwargs):
builder = BinaryPayloadBuilder(byteorder=self.byteorder, wordorder=self.wordorder)
getattr(builder, f"add_{data_type}")(data, *args, **kwargs)
return builder.build()
def read(self, register, *args, **kwargs):
if type(register) is str:
register, s = self.registers[register]
if not len(args):
args = [s["dt"], *s.get("a", [])]
if not len(kwargs):
kwargs = s.get("k", {})
if args[0].startswith("16bit_"):
count = 1
elif args[0].startswith("32bit_"):
count = 2
else:
raise NotImplementedError(f"data_type {args[0]!r} is not supported")
return self._decode(self._read(register, count=count), *args, **kwargs)
def write(self, register, data, *args, **kwargs):
if type(register) is str:
register, s = self.registers[register]
if not len(args):
args = [s["dt"], *s.get("a", [])]
if not len(kwargs):
kwargs = s.get("k", {})
return self._write(register, self._encode(data, *args, **kwargs))
# def _get(self, data):
# # print("MODBUS", str(int(QThread.currentThreadId())), flush=True)
# super()._get(data)
def __del__(self, event=None):
self.lock.lock()
if self.client.is_socket_open():
self.client.close()
self.lock.unlock()