st-ten-1/src/ui/test_pipe_cutter/test_pipe_cutter.py

180 lines
6.4 KiB
Python

from PyQt5.QtGui import QMovie
import logging
from src.ui.test_test import Test_Test
import time
from PyQt5.QtWidgets import QMessageBox, QDialog, QApplication
from PyQt5.QtCore import QByteArray
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Test_Pipe_Cutter(Test_Test):
"""
A class to manage and operate a pipe cutter machine in a test environment.
"""
def __init__(self, config, components, recipe=None, run_once=False,reset_on_start=True, enable_override=False, bench_name="generic", step=None,):
super().__init__(components=components, recipe=recipe, step=step, run_once=run_once,reset_on_start=reset_on_start, enable_override=enable_override,)
self.current_status = None
self.previous_status = None
self.end_cut = None
self.config = config
self.bench_name = bench_name
self.step=step
self.current_cut_length = 0
self.get_connection = None
self.to_calibrate=False
gif_path = "config/instruction_images/st-ten-10/pipe.gif"
self.movie = QMovie(gif_path,QByteArray(), self)
self.pipe_gif.setMovie(self.movie)
self.movie.start()
def start(self, recipe=None, step=None, pieces=None):
"""
Iterate through pipe cutter statuses, handling each status appropriately.
Stop cutting only when the status transitions from 102 -> 103 -> 102.
"""
self.pipe_gif.setMovie(self.movie)
# Call the parent start process
show = super().start(recipe=recipe, step=step, pieces=pieces)
if show is False:
return show
self.current_status = None
self.get_connection = self.components["pipe_cutter"].out.connect(self.get)
self.components["pipe_cutter"].resume()
return show
def start_cutting(self):
"""
Perform the pipe cutting process when in the 'ready for operation' state (102).
"""
self.components["pipe_cutter"].set_machine_mode()
if self.to_calibrate:
self.length = 100
self.diameter = 21
self.to_calibrate=False # resetting flag
else:
self.length = self.step.spec["length"]
self.diameter = self.step.spec["diameter"]
if self.current_status == 102:
try:
self.length = int(self.length)*100
self.diameter = int(self.diameter)*100
self.components["pipe_cutter"].write_od_of_pipe(self.diameter)
time.sleep(1)
self.components["pipe_cutter"].write_od_of_pipe(self.diameter)
time.sleep(1)
self.components["pipe_cutter"].write_bit_with_delay(600, 0, 100)
except Exception as e:
logger.error(f"Failed to start the pipe cutting process: {e}")
def stop_cutting(self):
"""
Stop the pipe cutting process and deactivate the pipe cutter component.
"""
try:
self.components["pipe_cutter"].write_bit_with_delay(600, 2, 100)
if self.current_status is 105:
self.end_cut = False
else:
self.end_cut = True
except Exception as e:
logger.error(f"Failed to stop the pipe cutting process: {e}")
def stop(self):
self.components["pipe_cutter"].pause()
self.disconnect(self.get_connection)
super().stop()
def reset_machine(self):
"""
Reset the pipe cutter machine to its initial state.
"""
try:
self.components["pipe_cutter"].write_bit_with_delay(600, 1, 100)
except Exception as e:
logger.error(f"Failed to reset the pipe cutter machine: {e}")
def get(self, data=None, override=False):
if self.done: # avoid processing if completed
return
if data is None or data[-1] is None:
super().get(None, override=override)
return
data = data[-1]
self.previus_status = None
self.current_status = self.components["pipe_cutter"].read(register=766)
#print(self.current_status)
# Status: 102 (ready for operation)
if self.current_status == 102:
if self.previous_status == 103:
#print("Transition detected: 103 -> 102. Stopping the cutting process.")
self.stop_cutting()
# Start cutting if machine is ready
self.start_cutting()
# Status: 103 (running)
elif self.current_status == 103:
#print(f"Status {self.current_status}: The pipe cutter is running. Monitoring the system.")
self.display_text(text="TAGLIO CORRUGATO IN CORSO", bg_color="green")
# Status: 105 (emergency)
elif self.current_status == 105:
#print(f"Status {self.current_status}: Emergency detected! Resetting the system.")
try:
self.display_text(text="EMERGENZA INTERVENUTA RIPETERE IL TEST", bg_color="red")
self.end_cut=False
except Exception as e:
logger.error(f"Failed to handle the emergency state: {e}")
# Status: 100 (need to reset)
elif self.current_status == 100:
#print(f"Status {self.current_status}: The system requires resetting. Performing reset.")
try:
self.display_text(text="RESET IN CORSO", bg_color="yellow")
self.reset_machine()
except Exception as e:
logger.error(f"Failed to reset the system: {e}")
# Unknown status
else:
# print(f"Unknown status {self.current_status}. Skipping operation.")
logger.warning(f"Unknown pipe cutter status {self.current_status} detected.")
self.previous_status = self.current_status
if self.end_cut is True:
ok = True
else:
ok = None
results = {"ok": ok}
super().get([{
"time": data.get("time", None),
"results": results
}], override=override, fail=ok is False)
def display_text(self,text="", bg_color=None,text_color=None):
if self.parent_assembly_widget is not None:
self.parent_assembly_widget().set_text(text=text, bg_color=bg_color,text_color=text_color)
QApplication.processEvents()
time.sleep(0.3)
QApplication.processEvents()
def to_calibrate(self):
self.to_calibrate=True