2023-10-11 07:46:38 +00:00
|
|
|
import sys
|
|
|
|
|
|
2023-10-11 13:15:42 +00:00
|
|
|
from components.component import Component
|
|
|
|
|
from components.furness_controls_fco730_registers import registers as fco730_registers
|
|
|
|
|
from components.furness_controls_fco780_registers import registers as fco780_registers
|
|
|
|
|
from components.furness_controls_fco730_registers import settings as fco730_settings
|
|
|
|
|
from components.furness_controls_fco780_registers import settings as fco780_settings
|
2023-10-11 07:46:38 +00:00
|
|
|
if "--sim-furness-controls" in sys.argv:
|
|
|
|
|
from components.dummies.serial import serial
|
|
|
|
|
else:
|
|
|
|
|
import serial
|
|
|
|
|
|
2023-10-11 14:26:34 +00:00
|
|
|
ETX=b'\x03'
|
|
|
|
|
EOT=b'\x04'
|
|
|
|
|
ENQ=b'\x05'
|
|
|
|
|
ACK=b'\x06'
|
|
|
|
|
NACK=b'\x15'
|
2023-10-07 16:14:30 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class FurnessControlsLeakTester(Component):
|
|
|
|
|
|
|
|
|
|
def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
|
2023-10-11 14:26:34 +00:00
|
|
|
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
|
2023-10-11 07:46:38 +00:00
|
|
|
self.bytesize = None
|
|
|
|
|
self.timeout = None
|
|
|
|
|
self.parity = None
|
|
|
|
|
self.conn = None
|
|
|
|
|
self.stopbits = None
|
|
|
|
|
self.baudrate = None
|
|
|
|
|
self.port = None
|
|
|
|
|
self.model = None
|
|
|
|
|
self.settings = None
|
|
|
|
|
self.registers = None
|
2023-10-11 14:26:34 +00:00
|
|
|
self.id1=b'0'
|
|
|
|
|
self.id2=b'1'
|
2023-10-07 16:14:30 +00:00
|
|
|
|
|
|
|
|
def config_changed(self):
|
|
|
|
|
super().config_changed()
|
|
|
|
|
self.model = self.config[self.name]["model"].lower()
|
|
|
|
|
if self.model == "fco730":
|
|
|
|
|
self.registers = fco730_registers
|
2023-10-11 07:46:38 +00:00
|
|
|
self.settings = fco730_settings
|
2023-10-07 16:14:30 +00:00
|
|
|
elif self.model == "fco730":
|
|
|
|
|
self.registers = fco780_registers
|
2023-10-11 07:46:38 +00:00
|
|
|
self.settings = fco780_settings
|
2023-10-07 16:14:30 +00:00
|
|
|
else:
|
|
|
|
|
raise NotImplementedError(f"Furness Controls model {self.model!r} not implemented.")
|
2023-10-11 07:46:38 +00:00
|
|
|
self.port = self.config[self.name]["port"]
|
2023-10-11 14:26:34 +00:00
|
|
|
self.baudrate = 9600
|
|
|
|
|
self.stopbits = 1
|
|
|
|
|
self.parity = serial.PARITY_NONE
|
|
|
|
|
self.bytesize = serial.EIGHTBITS
|
|
|
|
|
self.timeout = 0.5
|
2023-10-11 07:46:38 +00:00
|
|
|
if self.conn is not None:
|
|
|
|
|
self.conn.close()
|
|
|
|
|
self.conn = serial.Serial(
|
|
|
|
|
self.port,
|
|
|
|
|
baudrate=self.baudrate,
|
|
|
|
|
stopbits=self.stopbits,
|
|
|
|
|
parity=self.parity,
|
|
|
|
|
bytesize=self.bytesize,
|
|
|
|
|
timeout=self.timeout,
|
|
|
|
|
write_timeout=self.timeout,
|
|
|
|
|
#inter_byte_timeout=self.timeout,
|
|
|
|
|
)
|
2023-10-07 16:14:30 +00:00
|
|
|
|
|
|
|
|
def _convert_from_format(self, data, formatting=None, decoding_map=None):
|
|
|
|
|
if decoding_map is not None and data in decoding_map:
|
|
|
|
|
data = decoding_map[data]
|
|
|
|
|
if formatting is not None:
|
|
|
|
|
# units = self.units[formatting]
|
|
|
|
|
# data = [data * units[0], units[1]]
|
|
|
|
|
data = data * self.units[formatting][0]
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
|
def _convert_to_format(self, data, formatting=None, encoding_map=None):
|
|
|
|
|
if formatting is not None:
|
|
|
|
|
data = int(data / self.units[formatting][0])
|
|
|
|
|
if encoding_map is not None and data in encoding_map:
|
|
|
|
|
data = encoding_map[data]
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
|
@Component.reconfig_on_error
|
|
|
|
|
def read(self, register, *args, data_type=None, gain=None, offset=None, formatting=None, decoding_map=None, **kwargs):
|
|
|
|
|
if type(register) is str:
|
|
|
|
|
register, s = self.registers[register]
|
|
|
|
|
if data_type is None:
|
|
|
|
|
data_type = s.get("dt", None)
|
|
|
|
|
if gain is None:
|
|
|
|
|
gain = s.get("g", None)
|
|
|
|
|
if offset is None:
|
|
|
|
|
offset = s.get("o", None)
|
|
|
|
|
if formatting is None:
|
|
|
|
|
formatting = s.get("f", None)
|
|
|
|
|
if decoding_map is None:
|
|
|
|
|
decoding_map = s.get("decoding", None)
|
|
|
|
|
if not len(args):
|
|
|
|
|
args = s.get("a", [])
|
|
|
|
|
if not len(kwargs):
|
|
|
|
|
kwargs = s.get("k", {})
|
|
|
|
|
if data_type is None:
|
|
|
|
|
data_type = "16bit_uint"
|
|
|
|
|
if gain is None:
|
|
|
|
|
gain = 1
|
|
|
|
|
if offset is None:
|
|
|
|
|
offset = 0
|
|
|
|
|
return self._convert_from_format(
|
|
|
|
|
super().read(
|
|
|
|
|
register,
|
|
|
|
|
*args,
|
|
|
|
|
data_type=data_type,
|
|
|
|
|
gain=gain,
|
|
|
|
|
offset=offset,
|
|
|
|
|
**kwargs,
|
|
|
|
|
),
|
|
|
|
|
formatting=formatting,
|
|
|
|
|
decoding_map=decoding_map,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@Component.reconfig_on_error
|
|
|
|
|
def write(self, register, data, *args, data_type=None, gain=None, offset=None, formatting=None, encoding_map=None, **kwargs):
|
|
|
|
|
if type(register) is str:
|
|
|
|
|
register, s = self.registers[register]
|
|
|
|
|
if data_type is None:
|
|
|
|
|
data_type = s.get("dt", None)
|
|
|
|
|
if gain is None:
|
|
|
|
|
gain = s.get("g", None)
|
|
|
|
|
if offset is None:
|
|
|
|
|
offset = s.get("o", None)
|
|
|
|
|
if formatting is None:
|
|
|
|
|
formatting = s.get("f", None)
|
|
|
|
|
if encoding_map is None:
|
|
|
|
|
encoding_map = s.get("encoding", None)
|
|
|
|
|
if not len(args):
|
|
|
|
|
args = s.get("a", [])
|
|
|
|
|
if not len(kwargs):
|
|
|
|
|
kwargs = s.get("k", {})
|
|
|
|
|
if data_type is None:
|
|
|
|
|
data_type = "16bit_uint"
|
|
|
|
|
if gain is None:
|
|
|
|
|
gain = 1
|
|
|
|
|
if offset is None:
|
|
|
|
|
offset = 0
|
|
|
|
|
return super().write(
|
|
|
|
|
register,
|
|
|
|
|
self._convert_to_format(
|
|
|
|
|
data,
|
|
|
|
|
formatting=formatting,
|
|
|
|
|
encoding_map=encoding_map,
|
|
|
|
|
),
|
|
|
|
|
*args,
|
|
|
|
|
data_type=data_type,
|
|
|
|
|
gain=gain,
|
|
|
|
|
offset=offset,
|
|
|
|
|
**kwargs,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@Component.reconfig_on_error
|
|
|
|
|
def _get(self):
|
|
|
|
|
# READ INFO
|
|
|
|
|
info = {r: self.read(r) for r in [
|
|
|
|
|
"Real time test pressure output",
|
|
|
|
|
"Real time differential pressure output",
|
|
|
|
|
"Real time pressure line regulator",
|
|
|
|
|
"Active alarm flags",
|
|
|
|
|
"Active test program number",
|
|
|
|
|
"Running test: active phase",
|
|
|
|
|
"Running test: test type",
|
|
|
|
|
"Running test: sequence index",
|
|
|
|
|
"Digital inputs status (mask)",
|
|
|
|
|
# "Digital outputs status (mask)",
|
|
|
|
|
]}
|
|
|
|
|
if self.model == "t3p":
|
|
|
|
|
pass
|
|
|
|
|
elif self.model == "t3l":
|
|
|
|
|
info.update({r: self.read(r) for r in [
|
|
|
|
|
"Active not severe alarm flags",
|
|
|
|
|
]})
|
|
|
|
|
else:
|
|
|
|
|
raise NotImplementedError(f"Tecna t3 model {self.model!r} not implemented.")
|
|
|
|
|
if info["Running test: active phase"] == "FINE TEST": # "END TEST, WAITING THE START OF A NEW TEST":
|
|
|
|
|
info.update(self.get_test_results())
|
|
|
|
|
for round_me in ["measured leak"]:
|
|
|
|
|
if round_me in info.keys():
|
|
|
|
|
info.update({round_me:float(f"{info[round_me]:.2f}")})
|
|
|
|
|
self.log.debug(str(info))
|
|
|
|
|
super()._get([info])
|
|
|
|
|
|
|
|
|
|
@Component.reconfig_on_error
|
|
|
|
|
def _set(self, val=None):
|
|
|
|
|
if val is not None: # handle request:
|
|
|
|
|
for register, value in val.items():
|
|
|
|
|
print(register, value)
|
|
|
|
|
self.write(register, value)
|
|
|
|
|
super()._set(val)
|
|
|
|
|
|
|
|
|
|
def start_test(self, table=None):
|
|
|
|
|
if table is None:
|
|
|
|
|
table = self.max_program_number
|
|
|
|
|
self.log.info(f"starting test table {table!r}")
|
|
|
|
|
self.write("Source of test program number selection", "FROM PARAMETER (SET BY LCD OR SERIAL LINE)")
|
|
|
|
|
self.write("Selected program", table)
|
|
|
|
|
self.write("Start test", table)
|
|
|
|
|
|
|
|
|
|
def stop_test(self):
|
|
|
|
|
self.log.warning("stopping test")
|
2023-10-11 14:40:46 +00:00
|
|
|
pass
|
2023-10-07 16:14:30 +00:00
|
|
|
|
|
|
|
|
def get_test_results(self):
|
|
|
|
|
self.log.info("getting test results")
|
|
|
|
|
return {r: self.read(r) for r in [
|
|
|
|
|
#"Running test: phase backwards time",
|
|
|
|
|
"Running test: filling pressure",
|
|
|
|
|
"Running test: pressure at the end of settling",
|
|
|
|
|
#"Running test: burst pressure",
|
|
|
|
|
"Running test: measured leak",
|
|
|
|
|
#"Running test: calculated leak flow rate",
|
|
|
|
|
#"Running test: calculate RVP%",
|
|
|
|
|
"Running test: result",
|
|
|
|
|
]}
|
|
|
|
|
|
|
|
|
|
def write_recipe(self, recipe, step, table=None):
|
|
|
|
|
if table is None:
|
|
|
|
|
table = self.max_program_number
|
|
|
|
|
recipe_name = recipe.part_number[:16].encode("ascii")
|
|
|
|
|
recipe_name += b"\x00" * (16 - len(recipe_name))
|
|
|
|
|
recipe_barcode = f"j{recipe.part_number}"[:16].encode("ascii")
|
|
|
|
|
recipe_barcode += b"\x00" * (24 - len(recipe_barcode))
|
|
|
|
|
test_flags = 0b0110100001010100 if (step.spec.get("autotest", False) in ["ko_check"]) else 0b0110000001010100
|
|
|
|
|
pid_mode = int(self.config["recipes_defaults"]["pid_mode"])<<4
|
|
|
|
|
test_flags = test_flags | pid_mode
|
|
|
|
|
pid_ramps=0b0000000000000000 | int(self.config["recipes_defaults"]["pid_level"])<<8 | int(self.config["recipes_defaults"]["pid_speed"])<<12
|
|
|
|
|
spec = {
|
|
|
|
|
"Flag: Instrument settings": 0b0000000000000000,
|
|
|
|
|
"Test program for read/write operation": table,
|
|
|
|
|
**{719 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)}, # program name
|
|
|
|
|
**{727 - 1 + i: (recipe_barcode[i * 2 + 1] << 8) + recipe_barcode[i * 2] for i in range(12)}, # program associated bar-code
|
|
|
|
|
**{761 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)}, # print field 1
|
|
|
|
|
# **{769 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)}, # print field 2
|
|
|
|
|
"Print options": 0b0000000000000000 | self.saver_label_count << 12 | self.saver_print_on_fail << 8 | self.saver_label_template,
|
|
|
|
|
"Test type": "Leak Test",
|
|
|
|
|
"Test flags": test_flags,
|
|
|
|
|
"T0 - Pre-filling time": step.spec["pre_filling_time"],
|
|
|
|
|
"P0 - Pre-filling pressure": step.spec["pre_filling_pressure"],
|
|
|
|
|
"T1 - Filling time": step.spec["filling_time"],
|
|
|
|
|
"T2 - Settling time": step.spec["settling_time"],
|
|
|
|
|
"PR- - Min pressure tolerance %": step.spec["settling_pressure_min_percent"],
|
|
|
|
|
"PR+ - Max pressure tolerance % (P+)": step.spec["settling_pressure_max_percent"],
|
|
|
|
|
"T3 - Measure time": step.spec["test_time"],
|
|
|
|
|
"Q- Lower test leak limit": step.spec["test_pressure_qneg"],
|
|
|
|
|
"PREL - Nominal test pressure": step.spec["test_pressure"],
|
|
|
|
|
"Q+ Upper test leak limit": step.spec["test_pressure_qpos"],
|
|
|
|
|
"FST - Discharge time": step.spec["flush_time"],
|
|
|
|
|
"FSL - Discharge limit": step.spec["flush_pressure"],
|
|
|
|
|
"PSQ - Next sequence program PSOUT mode": 0,
|
|
|
|
|
"RAMPS: T1 configuration": pid_ramps,
|
|
|
|
|
"PID: pressure correction": 100,
|
|
|
|
|
"Various flags": 0b0000000000010000 if self.config["recipes_defaults"]["tecna_discharge_enable"]=="yes" else 0b0000000000000000
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
self.log.debug(str(spec))
|
|
|
|
|
for register, value in spec.items():
|
|
|
|
|
self.write(register, value)
|
2023-10-11 07:46:38 +00:00
|
|
|
|
|
|
|
|
def send_command(self,command):
|
|
|
|
|
out_bytes= bytearray()
|
2023-10-11 14:26:34 +00:00
|
|
|
out_bytes.extend(EOT)
|
|
|
|
|
out_bytes.extend(self.id1)
|
|
|
|
|
out_bytes.extend(self.id2)
|
|
|
|
|
out_bytes.extend(command)
|
|
|
|
|
out_bytes.extend(ETX)
|
|
|
|
|
checksum=self.calc_checksum(out_bytes)
|
|
|
|
|
out_bytes.append(checksum)
|
2023-10-11 07:46:38 +00:00
|
|
|
|
|
|
|
|
self.conn.write(out_bytes)
|
2023-10-11 14:26:34 +00:00
|
|
|
response = self.conn.read(100)
|
|
|
|
|
if response == ACK:
|
|
|
|
|
return True
|
|
|
|
|
else:
|
2023-10-11 07:46:38 +00:00
|
|
|
self.log.error(f"SEND COMMAND:{response}")
|
2023-10-11 14:26:34 +00:00
|
|
|
return None
|
2023-10-11 07:46:38 +00:00
|
|
|
|
|
|
|
|
def send_enquiry(self,enquiry):
|
|
|
|
|
out_bytes=bytearray(EOT)
|
2023-10-11 14:26:34 +00:00
|
|
|
out_bytes.extend(self.id1)
|
|
|
|
|
out_bytes.extend(self.id2)
|
|
|
|
|
out_bytes.extend(enquiry)
|
|
|
|
|
out_bytes.extend(ENQ)
|
2023-10-11 07:46:38 +00:00
|
|
|
checksum=self.calc_checksum(out_bytes)
|
|
|
|
|
out_bytes.append(checksum)
|
|
|
|
|
|
|
|
|
|
self.conn.write(out_bytes)
|
2023-10-11 14:26:34 +00:00
|
|
|
response = self.conn.read(100)
|
|
|
|
|
read_checksum = response[-1]
|
|
|
|
|
calculated_checksum = self.calc_checksum(response[0:-1],start_idx=0)
|
|
|
|
|
if read_checksum != calculated_checksum:
|
2023-10-11 07:46:38 +00:00
|
|
|
self.log.error(f"SEND COMMAND:{response}")
|
2023-10-11 14:26:34 +00:00
|
|
|
else:
|
|
|
|
|
return response
|
2023-10-11 07:46:38 +00:00
|
|
|
|
2023-10-11 14:26:34 +00:00
|
|
|
def calc_checksum(self,data,start_idx=1):
|
2023-10-11 07:46:38 +00:00
|
|
|
checksum=0
|
|
|
|
|
for i,data_byte in enumerate(data):
|
2023-10-11 14:26:34 +00:00
|
|
|
if i<start_idx:
|
2023-10-11 07:46:38 +00:00
|
|
|
continue # skip EOT
|
|
|
|
|
checksum=checksum^data_byte
|
|
|
|
|
return checksum
|