92 lines
3.4 KiB
Python
92 lines
3.4 KiB
Python
import sys
|
|
|
|
if "--sim-multicomp" in sys.argv:
|
|
from components.dummies.serial import serial
|
|
else:
|
|
import serial
|
|
|
|
from .component import Component
|
|
|
|
|
|
class Multicomp730424(Component):
    """Serial driver that uses a Multicomp 730424 meter for resistance readings.

    The serial parameters and measurement options are read from the config
    section named after this component (``self.config[self.name]``):

    * ``port`` (required), ``baudrate`` (required)
    * ``stopbits``, ``parity``, ``bytesize``: names of ``serial`` module
      constants, case-insensitive (defaults: ``stopbits_one``,
      ``parity_none``, ``eightbits``)
    * ``read_timeout``, ``write_timeout``: seconds (default ``1``)
    * ``inter_byte_timeout``: seconds, optional
    * ``4wire`` (required): ``"enabled"`` selects the ``CONF:FRES`` command,
      any other value selects ``CONF:RES``
    * ``cable_resistance``: subtracted from readings when ``4wire`` is not
      ``"enabled"`` (default ``0``)
    """

    # Range argument sent with the CONF command right after (re)connecting.
    _DEFAULT_RANGE = 500

    # Readings at or above this value are passed through without the cable
    # correction. NOTE(review): presumably the meter's overload/open-circuit
    # value -- confirm against the instrument manual.
    _CORRECTION_LIMIT = 1e9

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        """Create the component; all arguments are forwarded to ``Component``."""
        # Set before calling the base initialiser because config_changed()
        # inspects self.conn. NOTE(review): presumably the base class may call
        # config_changed() during construction -- confirm.
        self.conn = None
        super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)

    def config_changed(self):
        """Re-read the settings, reopen the serial port and set up the meter.

        Raises:
            KeyError: if ``port``, ``baudrate`` or ``4wire`` is missing.
            AttributeError: if ``stopbits``, ``parity`` or ``bytesize`` does
                not name an attribute of the ``serial`` module.
        """
        section = self.config[self.name]

        self.cable_resistance = float(section.get("cable_resistance", 0))
        self.port = section["port"]
        self.baudrate = int(section["baudrate"])

        # The config stores the names of serial constants (e.g. "stopbits_one");
        # resolve them to the actual values on the serial module.
        self.stopbits = getattr(serial, section.get("stopbits", "stopbits_one").upper())
        self.parity = getattr(serial, section.get("parity", "parity_none").upper())
        self.bytesize = getattr(serial, section.get("bytesize", "eightbits").upper())

        self.read_timeout = float(section.get("read_timeout", 1))
        self.write_timeout = float(section.get("write_timeout", 1))
        self.inter_byte_timeout = section.get("inter_byte_timeout", None)
        if self.inter_byte_timeout is not None:
            self.inter_byte_timeout = float(self.inter_byte_timeout)

        # Close the previous connection, if any, before opening a new one.
        if self.conn is not None:
            self.conn.close()
        self.conn = serial.Serial(
            self.port,
            baudrate=self.baudrate,
            stopbits=self.stopbits,
            parity=self.parity,
            bytesize=self.bytesize,
            timeout=self.read_timeout,
            write_timeout=self.write_timeout,
            inter_byte_timeout=self.inter_byte_timeout,
        )

        # Initial instrument setup: resistance function, "F" rate,
        # auto-ranging off, then an explicit range.
        self.write("FUNC1 \"RES\"")
        self.write("RATE F")
        self.write("AUTO 0")
        self._configure_range(self._DEFAULT_RANGE)

    def _four_wire_enabled(self):
        """Return True when the ``4wire`` config option equals ``"enabled"``."""
        return self.config[self.name]["4wire"] == "enabled"

    def _configure_range(self, resistance):
        """Send ``CONF:FRES`` (4-wire) or ``CONF:RES`` with the given range."""
        command = "CONF:FRES" if self._four_wire_enabled() else "CONF:RES"
        self.write(f"{command} {resistance}")

    def set_resistance_scale(self, resistance=None):
        """Select the measurement range.

        Args:
            resistance: Range value to send with the ``CONF`` command. With
                ``None`` the meter is switched to auto-ranging (``AUTO 1``)
                instead.

        The requested value is now forwarded to the instrument; previously it
        was ignored and ``500`` was always sent.
        """
        if resistance is None:
            self.write("AUTO 1")
            return
        self._configure_range(resistance)

    @Component.reconfig_on_error
    def read(self):
        """Read one line from the meter.

        Returns:
            The reply as a ``float`` when it parses as a number, otherwise the
            stripped ASCII text (non-ASCII bytes are dropped while decoding).
        """
        response = self.conn.readline()
        # errors="ignore" never raises UnicodeDecodeError, so no handler is
        # needed around the decode.
        decoded = response.decode("ascii", errors="ignore").strip()
        try:
            return float(decoded)
        except ValueError:
            return decoded

    @Component.reconfig_on_error
    def write(self, command):
        """Send ``command`` to the meter as ASCII, terminated by a newline."""
        self.conn.write(f"{command}\n".encode("ascii"))

    @Component.reconfig_on_error
    def _get(self):
        """Query function, range and measurement and hand them to the base class.

        The result is a dict with keys ``function``, ``scale`` and ``measure``,
        passed to ``Component._get`` wrapped in a single-element list.
        """
        # Start from clean buffers so each reply below pairs with its query.
        self.conn.flush()
        self.conn.reset_output_buffer()
        self.conn.reset_input_buffer()

        queries = {
            "function": "FUNC1?",
            "scale": "RANGE?",
            "measure": "MEAS1?",
        }
        info = {}
        for value_name, command in queries.items():
            self.write(command)
            info[value_name] = self.read()

        # Only numeric readings below the limit are corrected, and only when
        # 4-wire mode is not enabled.
        measure = info["measure"]
        if isinstance(measure, float) and measure < self._CORRECTION_LIMIT:
            if not self._four_wire_enabled():
                info["measure"] -= self.cable_resistance

        self.log.debug(str(info))
        super()._get([info])
|