import sys from components.component import Component from components.furness_controls_fco730_registers import registers as fco730_registers from components.furness_controls_fco780_registers import registers as fco780_registers from components.furness_controls_fco730_registers import product_tags as fco730_product_tags from components.furness_controls_fco780_registers import product_tags as fco780_product_tags if "--sim-furness-controls" in sys.argv: from components.dummies.serial import serial else: import serial ETX=b'\x03' EOT=b'\x04' ENQ=b'\x05' ACK=b'\x06' NACK=b'\x15' class FurnessControlsLeakTester(Component): def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True): super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded) self.product_tags = None self.bytesize = None self.timeout = None self.parity = None self.conn = None self.stopbits = None self.baudrate = None self.port = None self.model = None self.settings = None self.registers = None self.id1=b'0' self.id2=b'1' def config_changed(self): super().config_changed() self.model = self.config[self.name]["model"].lower() if self.model == "fco730": self.registers = fco730_registers self.product_tags = fco730_product_tags elif self.model == "fco780": self.registers = fco780_registers self.product_tags = fco780_product_tags else: raise NotImplementedError(f"Furness Controls model {self.model!r} not implemented.") self.port = self.config[self.name]["port"] self.baudrate = 9600 self.stopbits = 1 self.parity = serial.PARITY_NONE self.bytesize = serial.EIGHTBITS self.timeout = 0.2 if self.conn is not None: self.conn.close() self.conn = serial.Serial( self.port, baudrate=self.baudrate, stopbits=self.stopbits, parity=self.parity, bytesize=self.bytesize, timeout=self.timeout, write_timeout=self.timeout, #inter_byte_timeout=self.timeout, ) def _convert_from_format(self, data, formatting=None, decoding_map=None): if decoding_map is not None and data in decoding_map: data = decoding_map[data] if formatting is not None: # units = self.units[formatting] # data = [data * units[0], units[1]] data = data * self.units[formatting][0] return data @Component.reconfig_on_error def _get(self): # READ INFO current_status=self.send_enquiry("current_status") self.log.info(str(current_status)) #super()._get([]) def start_test(self, table=None): if table is None: table = self.max_program_number self.log.info(f"starting test table {table!r}") self.write("Source of test program number selection", "FROM PARAMETER (SET BY LCD OR SERIAL LINE)") self.write("Selected program", table) self.write("Start test", table) def stop_test(self): self.log.warning("stopping test") self.current_status = self.send_enquiry("current_status") status = self.get_status_param("state") if status =='0': self.log.info("ready to start") else: self.log.warning("error state, performing self check to reset") self.send_command("self_check") def get_status_param(self,param): par_len = 1 if param=="state": tag="f" tag_idx=self.current_status.find(b'f') data=self.current_status[tag_idx+1:tag_idx+1+par_len] return str(data,encoding="ascii") def get_test_results(self): self.log.info("getting test results") def write_recipe(self, recipe, step, table=None): # PREPARE DATA product_id='"'+recipe.part_number[:16]+'"' product_id=product_id.encode("ascii") test_press_bar=step.spec["test_pressure"]/1000 prefill_press_bar=step.spec["pre_filling_pressure"]/1000 tolerance=int(step.spec["settling_pressure_min_percent"]) fail_pos=test_press_bar+(int(step.spec["test_pressure_qpos"])/1000) fail_neg=test_press_bar+(int(step.spec["test_pressure_qneg"])/1000) fill_time=float(step.spec["filling_time"]) stab_time=float(step.spec["settling_time"]) test_time=float(step.spec["test_time"]) prefill_time=float(step.spec["pre_filling_time"]) vent_time=float(step.spec["flush_time"]) # SEND RECIPE PARAMETERS self.send_command("change_cur_prod_50") self.send_product_tag("product_id", product_id) self.send_product_tag("prefill_pressure", f"{prefill_press_bar:2.3f}") self.send_product_tag("test_pressure", f"{test_press_bar:2.3f}") self.send_product_tag("tolerance", f"{tolerance:3.1f}") self.send_product_tag("+fail", f"{fail_pos:3.1f}") self.send_product_tag("-fail", f"{fail_neg:3.1f}") self.send_product_tag("fill_time", f"{fill_time:3.1f}") self.send_product_tag("stab_time", f"{stab_time:3.1f}") self.send_product_tag("test_time", f"{test_time:3.1f}") self.send_product_tag("prefill_time", f"{prefill_time:3.1f}") self.send_product_tag("vent_time", f"{vent_time:3.1f}") self.send_product_tag("fail_high", f"{fail_pos:3.1f}") self.send_product_tag("fail_low", f"{fail_neg:3.1f}") self.send_product_tag("outputs_a_h", f"{int(0b01000000)}") def send_command(self,command): if type(command) is str: command=self.registers[command] out_bytes= bytearray() out_bytes.extend(EOT) out_bytes.extend(self.id1) out_bytes.extend(self.id2) out_bytes.extend(command) out_bytes.extend(ETX) checksum=self.calc_checksum(out_bytes) out_bytes.append(checksum) self.conn.write(out_bytes) response = self.conn.read(100) if response == ACK: return True else: self.log.error(f"SEND COMMAND({command}):{response}") return None def send_enquiry(self,enquiry): if type(enquiry) is str: enquiry=self.registers[enquiry] out_bytes=bytearray(EOT) out_bytes.extend(self.id1) out_bytes.extend(self.id2) out_bytes.extend(enquiry) out_bytes.extend(ENQ) checksum = self.calc_checksum(out_bytes) out_bytes.append(checksum) self.conn.write(out_bytes) response = self.conn.read(100) if len(response): read_checksum = response[-1] else: read_checksum = None calculated_checksum = self.calc_checksum(response[0:-1],start_idx=0) if read_checksum != calculated_checksum: self.log.error(f"SEND COMMAND:{response}") else: return response def send_product_tag(self,tag,tag_data): self.log.info(f"Sending tag:{tag}={tag_data}") command=bytearray(self.registers["product_data"]) if type(tag_data) is str: tag_data=bytearray(tag_data,encoding="ascii") command.extend(self.product_tags[tag]) command.extend(tag_data) self.send_command(command) @staticmethod def calc_checksum(data, start_idx=1): checksum = 0 for i, data_byte in enumerate(data): if i < start_idx: continue # skip EOT checksum = checksum ^ data_byte return checksum