st-ten-1/src/components/furness_controls_leak_tester.py
2023-10-19 09:37:45 +02:00

292 lines
11 KiB
Python

import re
import sys
import time
from collections import OrderedDict
from PyQt5.QtCore import QMutex
from components.component import Component
from components.furness_controls_fco730_registers import registers as fco730_registers
from components.furness_controls_fco780_registers import registers as fco780_registers
if "--sim-furness-controls" in sys.argv:
from components.dummies.serial import serial
else:
import serial
ETX = b'\x03'
EOT = b'\x04'
ENQ = b'\x05'
ACK = b'\x06'
NACK = b'\x15'
class FurnessControlsLeakTester(Component):
def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
self.enums = None
self.current_status = None
self.product_tags = None
self.bytesize = None
self.timeout = None
self.parity = None
self.conn = None
self.lock = QMutex()
self.stopbits = None
self.baudrate = None
self.port = None
self.model = None
self.settings = None
self.commands = None
self.id1 = b'0'
self.id2 = b'1'
def config_changed(self):
super().config_changed()
self.model = self.config[self.name]["model"].lower()
if self.model == "fco730":
self.commands = fco730_registers["commands"]
self.product_tags = fco730_registers["product_tags"]
self.enums = fco730_registers["enums"]
else:
raise NotImplementedError(f"Furness Controls model {self.model!r} not implemented.")
self.port = self.config[self.name]["port"]
self.baudrate = 9600
self.stopbits = 1
self.parity = serial.PARITY_NONE
self.bytesize = serial.EIGHTBITS
self.timeout = 0.1
if self.conn is not None:
self.conn.close()
self.conn = serial.Serial(
self.port,
baudrate=self.baudrate,
stopbits=self.stopbits,
parity=self.parity,
bytesize=self.bytesize,
timeout=self.timeout,
write_timeout=self.timeout,
)
def _convert_from_format(self, data, formatting=None, decoding_map=None):
if decoding_map is not None and data in decoding_map:
data = decoding_map[data]
if formatting is not None:
# units = self.units[formatting]
# data = [data * units[0], units[1]]
data = data * self.units[formatting][0]
return data
@Component.reconfig_on_error
def _get(self):
# READ INFO
current_status = self.get_status()
info = {
"Real time test pressure output": current_status["pressure_reading"],
"Real time differential pressure output": 0,
"Real time pressure line regulator": 0,
"Active alarm flags": 0,
"Active test program number": 0,
"Running test: active phase": current_status["current_stage"],
"Running test: measured leak": current_status["leak_reading"],
"Running test: test type": 0,
"Running test: sequence index": 0,
"Digital inputs status (mask)": 0,
}
if current_status['new_result_available'] == '1': # NEW TEST RESULT AVAILABLE
last_test_result = self.get_last_result() # READ RESULT TO RESET AVAILABLE FLAG
info.update({"Running test: result": last_test_result['status'],
"Running test: filling pressure": current_status["pressure_reading"],
"Running test: pressure at the end of settling": current_status["pressure_reading"],
})
for round_me in ["measured leak"]:
if round_me in info.keys():
info.update({round_me: float(f"{info[round_me]:.2f}")})
self.log.debug(str(info))
super()._get([info])
def start_test(self, table=None):
self.log.info(f"starting test")
self.send_command("start_test")
def stop_test(self):
self.log.warning("resetting state...")
current_status = self.get_status()
if current_status['status'] in ('ready_to_start',):
self.log.info("ready to start")
elif current_status['status'] in ('pressure_high', 'pressure_low', 'fault'):
self.log.info(f"{current_status['status']}, performing self check to reset")
elif current_status['status'] == "testing":
self.log.info("stop running test")
self.send_command("reset_test")
else:
self.log.error(f"unknown state {current_status['status']}")
time.sleep(2)
def get_status(self):
status_str = str(self.send_enquiry("current_status"), encoding="ascii")
status_str += 'z' # dummy terminator
status_vars = OrderedDict(
{
'a': 'counter',
'b': 'product_number',
'c': 'step_number',
'd': 'new_result_available',
'e': 'mode',
'f': 'status',
'g': 'pressure_reading',
'h': 'pressure_units',
'i': 'leak_reading',
'j': 'leak_offset',
'k': 'current_stage',
'z': 'dummy'
})
status_decoded = {}
for tag, param in status_vars.items():
next_tag = list(status_vars)[list(status_vars.keys()).index(tag) + 1]
match = re.search(f"{tag}([0-9.-]+){next_tag}", status_str)
value = None
if match is not None:
value = match.group(1)
if param == 'status':
value = self.enums['status_status'][value]
elif param == 'current_stage':
value = self.enums['status_current_stage'][value]
elif param == 'pressure_reading':
value = f"{float(value) * 1000:.1f}"
status_decoded[param] = value
if next_tag == 'z':
break
return status_decoded
def get_last_result(self):
res_str = str(self.send_enquiry("last_test_result"), encoding="ascii")
res_str += 'z' # dummy terminator
status_vars = OrderedDict(
{
'a': 'counter',
'b': 'step_number',
'c': 'product_number',
'd': 'status',
'z': 'dummy'
})
res_decoded = {}
for tag, param in status_vars.items():
next_tag = list(status_vars)[list(status_vars.keys()).index(tag) + 1]
match = re.search(f"{tag}([0-9.-]+){next_tag}", res_str)
value = None
if match is not None:
value = match.group(1)
if param == 'status':
value = self.enums['status_result'].get(value, 'NOK')
res_decoded[param] = value
if next_tag == 'z':
break
return res_decoded
def write_recipe(self, recipe, step, table=None):
# PREPARE DATA
product_id = '"' + recipe.part_number[:16] + '"'
product_id = product_id.encode("ascii")
test_press_bar = step.spec["test_pressure"] / 1000
prefill_press_bar = step.spec["pre_filling_pressure"] / 1000
tolerance = int(step.spec["settling_pressure_min_percent"])
fail_pos = (int(step.spec["test_pressure_qpos"]))
fail_neg = (int(step.spec["test_pressure_qneg"]))
fill_time = float(step.spec["filling_time"])
stab_time = float(step.spec["settling_time"])
test_time = float(step.spec["test_time"])
prefill_time = float(step.spec["pre_filling_time"])
vent_time = float(step.spec["flush_time"])
# SEND RECIPE PARAMETERS
self.send_command("change_cur_prod_50")
self.send_product_tag("product_id", product_id)
self.send_product_tag("prefill_pressure", f"{prefill_press_bar:2.3f}")
self.send_product_tag("test_pressure", f"{test_press_bar:2.3f}")
self.send_product_tag("tolerance", f"{tolerance:3.1f}")
self.send_product_tag("+fail", f"{fail_pos:3.1f}")
self.send_product_tag("-fail", f"{fail_neg:3.1f}")
self.send_product_tag("fill_time", f"{fill_time:3.1f}")
self.send_product_tag("stab_time", f"{stab_time:3.1f}")
self.send_product_tag("test_time", f"{test_time:3.1f}")
self.send_product_tag("prefill_time", f"{prefill_time:3.1f}")
self.send_product_tag("vent_time", f"{vent_time:3.1f}")
self.send_product_tag("fail_high", f"{fail_pos:3.1f}")
self.send_product_tag("fail_low", f"{fail_neg:3.1f}")
self.send_product_tag("outputs_a_h", f"{int(0b01000000)}")
self.get_last_result() # CLEAR POSSIBLE NEW RESULT AVAILABLE FLAG
def send_command(self, command):
if type(command) is str:
command = self.commands[command]
out_bytes = bytearray()
out_bytes.extend(EOT)
out_bytes.extend(self.id1)
out_bytes.extend(self.id2)
out_bytes.extend(command)
out_bytes.extend(ETX)
checksum = self.calc_checksum(out_bytes)
out_bytes.append(checksum)
self.lock.lock()
self.conn.write(out_bytes)
response = self.conn.read(1)
self.lock.unlock()
if response == ACK:
return True
else:
self.log.error(f"SEND COMMAND({command}):{response}")
return None
def send_enquiry(self, enquiry):
if type(enquiry) is str:
enquiry = self.commands[enquiry]
out_bytes = bytearray(EOT)
out_bytes.extend(self.id1)
out_bytes.extend(self.id2)
out_bytes.extend(enquiry)
out_bytes.extend(ENQ)
checksum = self.calc_checksum(out_bytes)
out_bytes.append(checksum)
self.lock.lock()
self.conn.write(out_bytes)
response = self.conn.read(100)
self.lock.unlock()
if len(response):
read_checksum = response[-1]
else:
read_checksum = None
calculated_checksum = self.calc_checksum(response[0:-1], start_idx=0)
if read_checksum != calculated_checksum:
self.log.error(f"ENQUIRY RESPONSE CHECKSUM:{read_checksum}!={calculated_checksum}")
return None
else:
response = response[:-2] # strip checksum & ETX
return response
def send_product_tag(self, tag, tag_data):
self.log.info(f"Sending tag:{tag}={tag_data}")
command = bytearray(self.commands["product_data"])
if type(tag_data) is str:
tag_data = bytearray(tag_data, encoding="ascii")
command.extend(self.product_tags[tag])
command.extend(tag_data)
self.send_command(command)
@staticmethod
def calc_checksum(data, start_idx=1):
checksum = 0
for i, data_byte in enumerate(data):
if i < start_idx:
continue # skip EOT
checksum = checksum ^ data_byte
return checksum