import sys

from components.component import Component
from components.furness_controls_fco730_registers import registers as fco730_registers
from components.furness_controls_fco780_registers import registers as fco780_registers
from components.furness_controls_fco730_registers import settings as fco730_settings
from components.furness_controls_fco780_registers import settings as fco780_settings

if "--sim-furness-controls" in sys.argv:
    from components.dummies.serial import serial
else:
    import serial

# ASCII control characters used to frame serial messages.
ETX = 0x03
EOT = 0x04
ENQ = 0x05
ACK = 0x06
NACK = 0x15


class FurnessControlsLeakTester(Component):
    """Component driving a Furness Controls leak tester over a serial port.

    The model ("fco730" or "fco780") is read from the component's config
    section and selects the register and settings tables to use.

    NOTE(review): several methods below (``_get``, ``write_recipe``, the
    register names) still look like they were carried over from a Tecna T3
    driver and have not been adapted yet; see the inline notes.
    """

    def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
        """Initialise the component; connection parameters are filled in by
        ``config_changed``."""
        super().__init__(
            config=config,
            name=name,
            period=period,
            lazy=lazy,
            paused=paused,
            threaded=threaded,
            registers=None,
        )
        self.bytesize = None
        self.timeout = None
        self.parity = None
        self.conn = None
        self.stopbits = None
        self.baudrate = None
        self.port = None
        self.model = None
        self.settings = None
        self.registers = None
        # Two address bytes sent right after EOT in every frame.
        self.id1 = 0
        self.id2 = 1

    def config_changed(self):
        """Re-read the config section and (re)open the serial connection.

        Raises:
            NotImplementedError: if the configured model is neither
                "fco730" nor "fco780".
        """
        super().config_changed()
        section = self.config[self.name]

        self.model = section["model"].lower()
        if self.model == "fco730":
            self.registers = fco730_registers
            self.settings = fco730_settings
        elif self.model == "fco780":
            # Was a duplicated "fco730" test, which made this branch
            # unreachable and rejected every FCO780 configuration.
            self.registers = fco780_registers
            self.settings = fco780_settings
        else:
            raise NotImplementedError(f"Furness Controls model {self.model!r} not implemented.")

        self.port = section["port"]
        self.baudrate = int(section["baudrate"])
        # Config values name pyserial constants (e.g. "stopbits_one" ->
        # serial.STOPBITS_ONE), resolved by upper-casing the string.
        self.stopbits = getattr(serial, section.get("stopbits", "stopbits_one").upper())
        self.parity = getattr(serial, section.get("parity", "parity_none").upper())
        self.bytesize = getattr(serial, section.get("bytesize", "eightbits").upper())
        self.timeout = float(section.get("read_timeout", 1))

        # Close a previously opened port before opening the new one.
        if self.conn is not None:
            self.conn.close()
        self.conn = serial.Serial(
            self.port,
            baudrate=self.baudrate,
            stopbits=self.stopbits,
            parity=self.parity,
            bytesize=self.bytesize,
            timeout=self.timeout,
            write_timeout=self.timeout,
            #inter_byte_timeout=self.timeout,
        )

    def _convert_from_format(self, data, formatting=None, decoding_map=None):
        """Translate a raw register value into its user-facing form.

        ``decoding_map`` (if given and containing ``data``) is applied first,
        then the value is multiplied by ``self.units[formatting][0]``.

        NOTE(review): ``self.units`` is not defined in this class —
        presumably provided elsewhere; confirm.
        """
        if decoding_map is not None and data in decoding_map:
            data = decoding_map[data]
        if formatting is not None:
            # units = self.units[formatting]
            # data = [data * units[0], units[1]]
            data = data * self.units[formatting][0]
        return data

    def _convert_to_format(self, data, formatting=None, encoding_map=None):
        """Translate a user-facing value into its raw register form.

        The value is divided by ``self.units[formatting][0]`` and truncated
        to ``int`` first, then ``encoding_map`` (if given and containing the
        value) is applied.
        """
        if formatting is not None:
            data = int(data / self.units[formatting][0])
        if encoding_map is not None and data in encoding_map:
            data = encoding_map[data]
        return data

    @Component.reconfig_on_error
    def read(self, register, *args, data_type=None, gain=None, offset=None,
             formatting=None, decoding_map=None, **kwargs):
        """Read a register and return its converted value.

        When ``register`` is a string it is looked up in ``self.registers``,
        whose entries unpack to ``(address, settings)``; any argument left at
        ``None``/empty is then filled from the settings dict (keys "dt", "g",
        "o", "f", "decoding", "a", "k"). Remaining defaults are
        ``"16bit_uint"``, gain 1 and offset 0.
        """
        if isinstance(register, str):
            register, s = self.registers[register]
            if data_type is None:
                data_type = s.get("dt", None)
            if gain is None:
                gain = s.get("g", None)
            if offset is None:
                offset = s.get("o", None)
            if formatting is None:
                formatting = s.get("f", None)
            if decoding_map is None:
                decoding_map = s.get("decoding", None)
            if not args:
                args = s.get("a", [])
            if not kwargs:
                kwargs = s.get("k", {})
        if data_type is None:
            data_type = "16bit_uint"
        if gain is None:
            gain = 1
        if offset is None:
            offset = 0
        raw = super().read(
            register,
            *args,
            data_type=data_type,
            gain=gain,
            offset=offset,
            **kwargs,
        )
        return self._convert_from_format(
            raw,
            formatting=formatting,
            decoding_map=decoding_map,
        )

    @Component.reconfig_on_error
    def write(self, register, data, *args, data_type=None, gain=None, offset=None,
              formatting=None, encoding_map=None, **kwargs):
        """Convert ``data`` and write it to a register.

        Register-name lookup and default handling mirror ``read``; the
        settings key for the map is "encoding". Returns whatever the base
        class ``write`` returns.
        """
        if isinstance(register, str):
            register, s = self.registers[register]
            if data_type is None:
                data_type = s.get("dt", None)
            if gain is None:
                gain = s.get("g", None)
            if offset is None:
                offset = s.get("o", None)
            if formatting is None:
                formatting = s.get("f", None)
            if encoding_map is None:
                encoding_map = s.get("encoding", None)
            if not args:
                args = s.get("a", [])
            if not kwargs:
                kwargs = s.get("k", {})
        if data_type is None:
            data_type = "16bit_uint"
        if gain is None:
            gain = 1
        if offset is None:
            offset = 0
        return super().write(
            register,
            self._convert_to_format(
                data,
                formatting=formatting,
                encoding_map=encoding_map,
            ),
            *args,
            data_type=data_type,
            gain=gain,
            offset=offset,
            **kwargs,
        )

    @Component.reconfig_on_error
    def _get(self):
        """Read the live status registers and pass them to the base ``_get``.

        NOTE(review): the model branches below only know "t3p"/"t3l", while
        ``config_changed`` only accepts "fco730"/"fco780" — so as written
        this method always raises NotImplementedError after the first batch
        of reads. Left unchanged until the intended behaviour is confirmed.
        """
        # READ INFO
        info = {r: self.read(r) for r in [
            "Real time test pressure output",
            "Real time differential pressure output",
            "Real time pressure line regulator",
            "Active alarm flags",
            "Active test program number",
            "Running test: active phase",
            "Running test: test type",
            "Running test: sequence index",
            "Digital inputs status (mask)",
            # "Digital outputs status (mask)",
        ]}
        if self.model == "t3p":
            pass
        elif self.model == "t3l":
            info.update({r: self.read(r) for r in [
                "Active not severe alarm flags",
            ]})
        else:
            raise NotImplementedError(f"Tecna t3 model {self.model!r} not implemented.")
        if info["Running test: active phase"] == "FINE TEST":  # "END TEST, WAITING THE START OF A NEW TEST":
            info.update(self.get_test_results())
        # NOTE(review): get_test_results() stores the leak under
        # "Running test: measured leak", so this key never matches — confirm.
        for round_me in ["measured leak"]:
            if round_me in info.keys():
                info.update({round_me: float(f"{info[round_me]:.2f}")})
        self.log.debug(str(info))
        super()._get([info])

    @Component.reconfig_on_error
    def _set(self, val=None):
        """Write every ``register -> value`` pair in ``val``, then delegate
        to the base ``_set``."""
        if val is not None:
            # handle request:
            for register, value in val.items():
                # Uses the component logger instead of a bare print().
                self.log.debug(f"{register} {value}")
                self.write(register, value)
        super()._set(val)

    def start_test(self, table=None):
        """Select test program ``table`` and start it.

        ``table`` defaults to ``self.max_program_number`` (not defined in
        this class — presumably set elsewhere).
        """
        if table is None:
            table = self.max_program_number
        self.log.info(f"starting test table {table!r}")
        self.write("Source of test program number selection", "FROM PARAMETER (SET BY LCD OR SERIAL LINE)")
        self.write("Selected program", table)
        self.write("Start test", table)

    def stop_test(self):
        """Abort the running test by writing 0 to "Reset running test"."""
        self.log.warning("stopping test")
        self.write("Reset running test", 0)

    def get_test_results(self):
        """Return a dict of the result registers of the last test."""
        self.log.info("getting test results")
        return {r: self.read(r) for r in [
            #"Running test: phase backwards time",
            "Running test: filling pressure",
            "Running test: pressure at the end of settling",
            #"Running test: burst pressure",
            "Running test: measured leak",
            #"Running test: calculated leak flow rate",
            #"Running test: calculate RVP%",
            "Running test: result",
        ]}

    def write_recipe(self, recipe, step, table=None):
        """Write a test program built from ``recipe`` and ``step``.

        Args:
            recipe: object with a ``part_number`` string, used for the
                program name and bar-code fields.
            step: object whose ``spec`` dict supplies timings, pressures and
                limits (see the keys read below).
            table: program slot to write; defaults to
                ``self.max_program_number``.

        Raises:
            UnicodeEncodeError: if ``recipe.part_number`` contains non-ASCII
                characters within the encoded prefix.
        """
        if table is None:
            table = self.max_program_number
        defaults = self.config["recipes_defaults"]

        # Name: 16 bytes, NUL padded. Bar-code: "j" + part number cut to 16
        # characters, NUL padded to 24 bytes.
        recipe_name = recipe.part_number[:16].encode("ascii")
        recipe_name += b"\x00" * (16 - len(recipe_name))
        recipe_barcode = f"j{recipe.part_number}"[:16].encode("ascii")
        recipe_barcode += b"\x00" * (24 - len(recipe_barcode))

        if step.spec.get("autotest", False) in ["ko_check"]:
            test_flags = 0b0110100001010100
        else:
            test_flags = 0b0110000001010100
        pid_mode = int(defaults["pid_mode"]) << 4
        test_flags = test_flags | pid_mode
        pid_ramps = 0b0000000000000000 | int(defaults["pid_level"]) << 8 | int(defaults["pid_speed"]) << 12

        spec = {
            "Flag: Instrument settings": 0b0000000000000000,
            "Test program for read/write operation": table,
            # Strings are packed two bytes per register, low byte first.
            **{719 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)},  # program name
            **{727 - 1 + i: (recipe_barcode[i * 2 + 1] << 8) + recipe_barcode[i * 2] for i in range(12)},  # program associated bar-code
            **{761 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)},  # print field 1
            # **{769 - 1 + i: (recipe_name[i * 2 + 1] << 8) + recipe_name[i * 2] for i in range(8)},  # print field 2
            "Print options": 0b0000000000000000 | self.saver_label_count << 12 | self.saver_print_on_fail << 8 | self.saver_label_template,
            "Test type": "Leak Test",
            "Test flags": test_flags,
            "T0 - Pre-filling time": step.spec["pre_filling_time"],
            "P0 - Pre-filling pressure": step.spec["pre_filling_pressure"],
            "T1 - Filling time": step.spec["filling_time"],
            "T2 - Settling time": step.spec["settling_time"],
            "PR- - Min pressure tolerance %": step.spec["settling_pressure_min_percent"],
            "PR+ - Max pressure tolerance % (P+)": step.spec["settling_pressure_max_percent"],
            "T3 - Measure time": step.spec["test_time"],
            "Q- Lower test leak limit": step.spec["test_pressure_qneg"],
            "PREL - Nominal test pressure": step.spec["test_pressure"],
            "Q+ Upper test leak limit": step.spec["test_pressure_qpos"],
            "FST - Discharge time": step.spec["flush_time"],
            "FSL - Discharge limit": step.spec["flush_pressure"],
            "PSQ - Next sequence program PSOUT mode": 0,
            "RAMPS: T1 configuration": pid_ramps,
            "PID: pressure correction": 100,
            "Various flags": 0b0000000000010000 if defaults["tecna_discharge_enable"] == "yes" else 0b0000000000000000,
        }
        self.log.debug(str(spec))
        for register, value in spec.items():
            self.write(register, value)

    def send_command(self, command):
        """Send the EOT/address header and check for an ACK reply.

        NOTE(review): ``command`` is currently not transmitted — only
        EOT, id1, id2 are written. The command framing is not visible in
        this file, so the payload is left unchanged; confirm against the
        instrument's protocol documentation.
        """
        out_bytes = bytearray([EOT, self.id1, self.id2])
        self.conn.write(out_bytes)
        response = self.conn.read(1)
        # read() returns bytes, so compare against a one-byte bytes object;
        # comparing with the int ACK was always unequal.
        if response != bytes([ACK]):
            self.log.error(f"SEND COMMAND:{response}")

    def send_enquiry(self, enquiry):
        """Send an enquiry frame and check for an ACK reply.

        Frame layout: EOT, id1, id2, ``enquiry``, ENQ, checksum, where the
        checksum is ``calc_checksum`` over the frame built so far.

        Args:
            enquiry: single byte value (int in range 0-255).
        """
        # bytearray([EOT]) is one EOT byte; bytearray(EOT) would be four
        # zero bytes.
        out_bytes = bytearray([EOT])
        out_bytes.append(self.id1)
        out_bytes.append(self.id2)
        out_bytes.append(enquiry)
        out_bytes.append(ENQ)
        out_bytes.append(self.calc_checksum(out_bytes))
        self.conn.write(out_bytes)
        response = self.conn.read(1)
        if response != bytes([ACK]):
            self.log.error(f"SEND ENQUIRY:{response}")

    def calc_checksum(self, data):
        """Return the XOR of every byte in ``data`` except the first (EOT)."""
        checksum = 0
        for data_byte in data[1:]:  # skip EOT
            checksum ^= data_byte
        return checksum