import logging
import time

from PyQt5.QtCore import QByteArray
from PyQt5.QtGui import QMovie
from PyQt5.QtWidgets import QMessageBox, QDialog, QApplication

from src.ui.test_test import Test_Test

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Test_Pipe_Cutter(Test_Test):
    """
    A class to manage and operate a pipe cutter machine in a test environment.

    The widget drives the ``"pipe_cutter"`` entry of ``components``: it reads
    the machine status, starts/stops/resets the cut and forwards the data
    emitted by the component to the parent test logic through :meth:`get`.
    """

    # Status codes read from register 766 of the pipe cutter.
    STATUS_NEED_RESET = 100
    STATUS_READY = 102
    STATUS_RUNNING = 103
    STATUS_EMERGENCY = 105

    # Human-readable description of each known status code (console output).
    _STATUS_DESCRIPTIONS = {
        STATUS_READY: "ready for operation",
        STATUS_RUNNING: "running",
        STATUS_EMERGENCY: "emergency",
        STATUS_NEED_RESET: "need to reset",
    }

    def __init__(
        self,
        config,
        components,
        recipe=None,
        run_once=False,
        reset_on_start=True,
        enable_override=False,
        bench_name="generic",
        step=None,
    ):
        """
        Build the test widget and start the instruction animation.

        Args:
            config: Configuration object, stored as ``self.config``.
            components: Mapping of component names to component objects; must
                contain a ``"pipe_cutter"`` entry for the other methods to work.
            recipe: Forwarded to the parent constructor.
            run_once: Forwarded to the parent constructor.
            reset_on_start: Forwarded to the parent constructor.
            enable_override: Forwarded to the parent constructor.
            bench_name: Name of the bench, stored as ``self.bench_name``.
            step: Step being executed; forwarded to the parent constructor and
                stored as ``self.step``.
        """
        super().__init__(
            components=components,
            recipe=recipe,
            step=step,
            run_once=run_once,
            reset_on_start=reset_on_start,
            enable_override=enable_override,
        )
        self.end_cut = None
        self.config = config
        self.bench_name = bench_name
        self.step = step
        self.current_cut_length = 0
        # Handle of the ``out`` -> ``get`` signal connection; None while disconnected.
        self.get_connection = None

        gif_path = "config/instruction_images/st-ten-10/pipe.gif"
        self.movie = QMovie(gif_path, QByteArray(), self)
        # ``pipe_gif`` is not created here; presumably the parent UI provides it.
        self.pipe_gif.setMovie(self.movie)
        self.movie.start()

    def start(self, recipe=None, step=None, pieces=None):
        """
        Start the test: connect to the pipe cutter and act on its current status.

        Args:
            recipe: Forwarded to the parent ``start`` and to :meth:`start_cutting`.
            step: Forwarded to the parent ``start`` and to :meth:`start_cutting`.
                When it is not ``None`` the cut and the test are stopped again
                before returning.
            pieces: Forwarded to the parent ``start``.

        Returns:
            Whatever the parent ``start`` returned. When that value is ``False``
            the machine status is not read and no action is taken.
        """
        # Drop a connection left over from a previous start() so the ``get``
        # slot is never invoked more than once per emitted sample.
        if self.get_connection is not None:
            self.disconnect(self.get_connection)
        self.get_connection = self.components["pipe_cutter"].out.connect(self.get)
        self.components["pipe_cutter"].resume()

        # Call the parent start process
        show = super().start(recipe=recipe, step=step, pieces=pieces)
        if show is False:
            return show

        # Read the current status of the pipe cutter
        self.current_status = self.components["pipe_cutter"].read(register=766)
        status = self.current_status

        # A single status is read per call, hence the constant index in the message.
        description = self._STATUS_DESCRIPTIONS.get(status, "unknown")
        print(f"Processing status {description} at index 0")

        if status == self.STATUS_READY:
            print(f"Status {status}: Starting the cutting process.")
            self.start_cutting(recipe=recipe, step=step)
        elif status == self.STATUS_RUNNING:
            print(f"Status {status}: The pipe cutter is running. Monitoring the system.")
            self.display_text(text="TAGLIO CORRUGATO IN CORSO", bg_color="yellow")
        elif status == self.STATUS_EMERGENCY:
            print(f"Status {status}: Emergency detected! Resetting the system.")
            try:
                # NOTE(review): "INTERNEVUTA" looks like a typo for "INTERVENUTA";
                # left untouched because it is user-facing text - confirm.
                self.display_text(text="EMERGENZA INTERNEVUTA", bg_color="red")
            except Exception as e:
                logger.error("Failed to handle the emergency state: %s", e)
        elif status == self.STATUS_NEED_RESET:
            print(f"Status {status}: The system requires resetting. Performing reset.")
            try:
                # NOTE(review): this calls reset(), not reset_machine() defined
                # below; reset() is presumably inherited - confirm which is meant.
                self.reset()
                self.display_text(text="RESET IN CORSO", bg_color="yellow")
            except Exception as e:
                logger.error("Failed to reset the system: %s", e)
        else:
            # Handle unknown statuses
            print(f"Unknown status: {status}. Skipping operation.")
            logger.warning("Unknown pipe cutter status %s detected.", status)

        # Final steps after the status has been processed.
        # NOTE(review): both calls are assumed to belong to this branch - confirm.
        if step is not None:
            self.stop_cutting()
            self.stop()

        return show

    def start_cutting(self, recipe=None, step=None):
        """
        Perform the pipe cutting process when in the 'ready for operation' state (102).

        Reads ``"lenght"`` and ``"diameter"`` from the step specification, writes
        them to the pipe cutter and then issues the start command. Failures while
        talking to the machine are logged, not raised.

        Args:
            recipe: Unused; accepted for signature compatibility.
            step: Used as the source of the specification only when
                ``self.step`` is ``None``.
        """
        # Prefer the step stored on the instance; fall back to the argument so
        # a caller-supplied step is not ignored when ``self.step`` is unset.
        active_step = self.step if self.step is not None else step
        # "lenght" (sic) is the key used by the step specification.
        self.lenght = active_step.spec["lenght"]
        self.diameter = active_step.spec["diameter"]
        try:
            cutter = self.components["pipe_cutter"]
            cutter.write_total_length(self.lenght)
            cutter.write_od_of_pipe(self.diameter)
            # Pause between writing the parameters and issuing the command
            # (presumably to let the machine latch the values - confirm).
            time.sleep(1)
            cutter.write_bit_with_delay(600, 0, 100)
            print("Status 102: Pipe cutting process started successfully.")
        except Exception as e:
            logger.error("Failed to start the pipe cutting process: %s", e)

    def stop_cutting(self):
        """
        Stop the pipe cutting process and mark the cut as finished.

        On success ``self.end_cut`` is set to ``True``, which makes :meth:`get`
        report a positive result. Failures are logged, not raised.
        """
        try:
            self.components["pipe_cutter"].write_bit_with_delay(600, 2, 100)
            self.end_cut = True
        except Exception as e:
            logger.error("Failed to stop the pipe cutting process: %s", e)

    def stop(self):
        """
        Pause the pipe cutter, disconnect from its data signal and stop the test.
        """
        self.components["pipe_cutter"].pause()
        # stop() may run before start() ever connected the signal; disconnecting
        # a missing handle would raise, so only disconnect when one exists.
        if self.get_connection is not None:
            self.disconnect(self.get_connection)
            self.get_connection = None
        self.display_text(text="TAGLIO INTERROTTO", bg_color="yellow")
        super().stop()

    def reset_machine(self):
        """
        Reset the pipe cutter machine to its initial state.

        Failures are logged, not raised.
        """
        try:
            self.components["pipe_cutter"].write_bit_with_delay(600, 1, 100)
        except Exception as e:
            logger.error("Failed to reset the pipe cutter machine: %s", e)

    def get(self, data=None, override=False):
        """
        Receive data emitted by the pipe cutter and forward a result to the parent.

        Args:
            data: Sequence of samples; only the last one is used. ``None``, an
                empty sequence or a trailing ``None`` is forwarded as ``None``.
            override: Forwarded to the parent ``get``.
        """
        if self.done:  # avoid processing if completed
            return

        if data is None or len(data) == 0 or data[-1] is None:
            super().get(None, override=override)
            return

        latest = data[-1]
        # The result is positive once the cut has been stopped, undetermined before.
        ok = True if self.end_cut is True else None
        payload = [{"time": latest.get("time", None), "results": {"ok": ok}}]
        super().get(payload, override=override, fail=ok is False)

    def display_text(self, text="", bg_color=None, text_color=None):
        """
        Show a message on the parent assembly widget, if there is one.

        Args:
            text: Message to display.
            bg_color: Background colour passed to the widget's ``set_text``.
            text_color: Text colour passed to the widget's ``set_text``.
        """
        if self.parent_assembly_widget is None:
            return

        # ``parent_assembly_widget`` is called to obtain the widget (presumably
        # a weak reference); the call may yield None if the widget is gone.
        widget = self.parent_assembly_widget()
        if widget is None:
            return

        widget.set_text(text=text, bg_color=bg_color, text_color=text_color)
        # Pump the event loop around a short pause so the message is painted
        # even though the caller keeps the GUI thread busy.
        QApplication.processEvents()
        time.sleep(0.3)
        QApplication.processEvents()