st-ten-1/src/components/tecna_marposs_provaset_t3.py

121 lines
4.3 KiB
Python
Raw Normal View History

2022-06-01 16:37:19 +00:00
import sys
import traceback
import serial
from lib.helpers import timing
from pymodbus.constants import Endian
from pymodbus.exceptions import ModbusIOException
from PyQt5.QtCore import QMutex, Qt, QThread, QTimer, pyqtSlot
from .atv320_registers import registers
if "--sim-inverter" not in sys.argv:
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
else:
from lib.dummies.pymodbus import ModbusClient
from random import random
from .component import Component
# from pymodbus.client.sync import ModbusSerialClient as ModbusClient
# import serial
# client = ModbusClient(method="rtu", port="COM3", stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, baudrate=115200, timeout=1, strict=False)
# client.connect()
# client.read_holding_registers(1, count=1)
class TecnaMarpossProvasetT3(Component):
def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
self.lock = QMutex()
def config_changed(self):
self.address = self.config["tecna_marposs_provaset_t3"]["address"]
self.baudrate = int(self.config["tecna_marposs_provaset_t3"]["baudrate"])
self.stopbits = getattr(serial, self.config["tecna_marposs_provaset_t3"].get("stopbits", "stopbits_one").upper())
self.parity = getattr(serial, self.config["tecna_marposs_provaset_t3"].get("parity", "parity_none").upper())
self.bytesize = getattr(serial, self.config["tecna_marposs_provaset_t3"].get("bytesize", "eightbits").upper())
self.timeout = int(self.config["tecna_marposs_provaset_t3"].get("timeout", 1))
self.lock.lock()
self.client = ModbusClient(
method="rtu",
port=self.address,
stopbits=self.stopbits,
bytesize=self.bytesize,
parity=self.parity,
baudrate=self.baudrate,
timeout=self.timeout,
strict=False
)
if not self.client.connect():
raise ConnectionError("device not reachable (could not connect): {} ({})".format(self.name, self.port))
if not self.client.is_socket_open():
raise ConnectionError("device not reachable (socket not open): {} ({})".format(self.name, self.port))
self.lock.unlock()
self.registers = registers
self.last_get = {}
self.last_set = {}
self.last_error = 0
def _read(self, register):
if type(register) is str:
r = self.registers[register]
else:
r = register
self.lock.lock()
read = self.client.read_holding_registers(r, count=1)
self.lock.unlock()
if read.isError():
self.log.exception(traceback.format_exception(read))
else:
return read.registers[0]
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, byteorder=Endian.Big, wordorder=Endian.Big)
@staticmethod
def tob(r, n=16):
if r is None:
return None
return "{0:0{n}b}".format(r, n=n)
@pyqtSlot()
def _get(self):
# print("ATV320", str(int(QThread.currentThreadId())), flush=True)
# READ INFO
info = {
"motor speed": self._read("RFRD"),
}
self.last_get = info
self.last_get = self.last_set
self.update.emit([{"time": timing(), self.name: self.last_get}])
self._timer.start()
def set(self, register, value):
if type(register) is str:
r = self.registers[register]
else:
r = register
builder = BinaryPayloadBuilder(byteorder=Endian.Big, wordorder=Endian.Big)
self.lock.lock()
wrote = self.client.write_register(r, value)
self.lock.unlock()
if wrote.isError():
self.log.exception(traceback.format_exception(wrote))
def run_motor(self, rpm):
self.set(rpm, "LFRD")
self.set("on")
self.set("enable")
def pause_motor(self):
self.set("disable")
self.set("off")
self.set("stop")
def __del__(self, event=None):
self.lock.lock()
if self.client.is_socket_open():
self.client.close()
self.lock.unlock()