st-ten-1/src/components/usb_586x.py
2023-11-07 16:57:58 +01:00

259 lines
9.6 KiB
Python

import ctypes
import sys
import platform
import time
from enum import Enum
from PyQt5.QtCore import QMutex, Qt, QTimer, pyqtSlot, pyqtSignal
from PyQt5.QtWidgets import QMessageBox, QApplication
from .component import Component
is_win = platform.system().lower() == "windows"
if "--sim-io" not in sys.argv:
if is_win:
from components.Automation.BDaq import *
from components.Automation.BDaq.InstantDoCtrl import InstantDoCtrl
from components.Automation.BDaq.InstantDiCtrl import InstantDiCtrl
else:
libbiodaq = ctypes.CDLL("/opt/advantech/libs/libbiodaq.so")
else:
from components.dummies.Automation.BDaq import *
from components.dummies.Automation.BDaq.InstantDoCtrl import InstantDoCtrl
from components.dummies.Automation.BDaq.InstantDiCtrl import InstantDiCtrl
#is_win=False
#import components.dummies.libbiodaq as libbiodaq
from components.dummies.libbiodaq import ErrorCode
class USB_586x(Component):
class DeviceInformation(ctypes.Structure):
_fields_ = [
('DeviceNumber', ctypes.c_uint32),
('DeviceMode', ctypes.c_uint32),
('ModuleIndex', ctypes.c_uint32),
('Description', ctypes.c_wchar_p)
]
update=pyqtSignal(object)
masks = [1 << n for n in range(8)]
def __init__(self, config=None, name=None, period=1, lazy=True, paused=False, threaded=True):
super().__init__(config=config, name=name, period=period, lazy=lazy, paused=paused, threaded=threaded)
self.buffer = None
self.io_ok = False
self.mutex = QMutex()
self.simulate="--sim-io" in sys.argv
# DEVICE INFORMATION
self.id=config[name]["id"]
if "5860" in self.id:
self.type = "5860"
self.in_size = 1
self.out_size = 1
if "5862" in self.id:
self.type = "5862"
self.in_size = 2
self.out_size = 2
self.info = self.DeviceInformation()
self.info.Description = self.id
self.info.DeviceMode = 1
self.info.ModuleIndex = 0
self.open_device()
# SET ALL RELAYS OFF
for bit in range(0, self.out_size * 8):
self.set_bit(int(bit/8), bit % 8, False)
self.state_delay = 1
self.last_get = None
self.state_count = None
self.last_out = None
def open_device(self):
# DIGITAL I/O CLASS
if not self.simulate:
self.log.info("OPENING USB MODULE...")
if is_win:
try:
self.di_ctrl = InstantDiCtrl(self.info.Description)
self.do_ctrl = InstantDoCtrl(self.info.Description)
self.di_read = self.di_ctrl.readAny
self.do_write_bit = self.do_ctrl.writeBit
self.buffer = ctypes.create_string_buffer(2)
self.io_ok=True
except ValueError:
QMessageBox.critical(None, "ERRORE", f"ERRORE I/O DIGITALE - VERIFICARE CONNESSIONE USB")
exit(-1)
self.io_ok = False
time.sleep(1)
else:
self.di_create = libbiodaq.AdxInstantDiCtrlCreate
self.di_create.restype = ctypes.c_void_p
self.di_setSelectedDevice = libbiodaq.InstantDiCtrl_setSelectedDevice
self.di_setSelectedDevice.argtypes = [ctypes.c_void_p, ctypes.POINTER(self.DeviceInformation)]
self.di_setSelectedDevice.restype = ctypes.c_uint32
# DIGITAL INPUTS READ FUNCTION
self.di_read = libbiodaq.InstantDiCtrl_ReadAny
self.di_read.argtypes = [ctypes.c_void_p, ctypes.c_int32, ctypes.c_int32, ctypes.c_char_p]
self.di_read.restype = ctypes.c_int32
# DIGITAL OUTPUTS CLASS
self.do_create = libbiodaq.AdxInstantDoCtrlCreate
self.do_create.restype = ctypes.c_void_p
# SET SELECTED DEVICE
self.do_setSelectedDevice = libbiodaq.InstantDoCtrl_setSelectedDevice
self.do_setSelectedDevice.argtypes = [ctypes.c_void_p, ctypes.POINTER(self.DeviceInformation)]
self.do_setSelectedDevice.restype = ctypes.c_uint32
# get ports
self.get_ports = libbiodaq.InstantDoCtrl_getPortDirection
self.get_ports.argtypes = [ctypes.c_void_p]
self.get_ports.restype = ctypes.POINTER(ctypes.c_void_p)
# DIGITAL OUTPUTS WRITE FUNCTION
self.do_write_bit = libbiodaq.InstantDoCtrl_WriteBit
self.do_write_bit.argtypes = [ctypes.c_void_p, ctypes.c_int32, ctypes.c_int32, ctypes.c_char]
self.do_write_bit.restype = ctypes.c_int32
self.buffer = ctypes.create_string_buffer(2)
# INIT OBJECTS
self.di_ctrl = self.di_create()
self.do_ctrl = self.do_create()
self.di_init_status = self.di_setSelectedDevice(self.di_ctrl, ctypes.byref(self.info))
self.do_init_status = self.do_setSelectedDevice(self.do_ctrl, ctypes.byref(self.info))
self.io_ok = True
else:
self.di_ctrl = InstantDiCtrl(self.info.Description)
self.do_ctrl = InstantDoCtrl(self.info.Description)
self.di_read = self.di_ctrl.readAny
self.do_write_bit = self.do_ctrl.writeBit
self.buffer = ctypes.create_string_buffer(2)
self.io_ok = True
def close_device(self):
pass
@pyqtSlot()
def start(self):
# ACQUISITION TIMER
self.timer = QTimer()
self.timer.setTimerType(Qt.PreciseTimer)
self.timer.setInterval(int(1000 / 20))
self.timer.timeout.connect(self._get)
self.timer.start()
super().start()
# Read data for buffer
@pyqtSlot()
def _get(self):
get = self.get()
# print(self.last_get)
if self.state_count is None or self.last_get is None:
self.last_out = get
self.state_count = [[1 for bit in byte] for byte in get]
else:
for byte_n, byte in enumerate(get):
for bit_n, bit in enumerate(byte):
if bit == self.last_get[byte_n][bit_n]:
self.state_count[byte_n][bit_n] += 1
else:
self.state_count[byte_n][bit_n] = 1
if self.state_count[byte_n][bit_n] > self.state_delay:
self.last_out[byte_n][bit_n] = bit
self.last_get = get
#self.update.emit([time.time(), self.last_out])
super()._get([self.last_out])
def get_all(self):
return self.get()
# Read input bytes
def get(self):
self.mutex.lock()
read = []
retry=0
max_retry = 3
while retry < max_retry:
if is_win or self.simulate:
if self.simulate:
read=[1,0,0,0,0,0,0,0],[0,0,0,0,0,0,0,0]
break
else:
if self.io_ok:
ret = self.di_read(0, self.in_size)
if ret[0].value == ErrorCode.Success.value:
self.buffer = ret[1]
for byte_num in range(len(self.buffer)):
byte = self.buffer[byte_num]
read.append([bool(byte & m) for m in self.masks])
else:
self.buffer = None
self.log.error(f"READ ERROR")
self.di_ctrl.dispose()
self.do_ctrl.dispose()
self.io_ok = False
if self.io_ok:
break
else:
time.sleep(1)
self.open_device()
else:
self.di_read(self.di_ctrl, 0, self.in_size, self.buffer)
for byte_num in range(len(self.buffer)):
byte = int.from_bytes(self.buffer[byte_num], "little")
read.append([bool(byte & m) for m in self.masks])
self.mutex.unlock()
return read
# Read data bit
def get_bit(self, byte, bit):
return self.get()[byte][bit]
# Send data byte
def set(self, start_byte, val):
for byte, v in enumerate(val, start=start_byte):
for bit, bv in enumerate([bool(v & m) for m in self.masks]):
self.set_bit(byte, bit, bv)
# Write single bit
def set_bit(self, byte, bit, val):
self.mutex.lock()
# print("set", byte, bit, not val, flush=True)
if self.io_ok:
if not self.simulate:
if is_win:
ret=self.do_write_bit(byte, bit, int(val))
else:
ret=self.do_write_bit(self.do_ctrl, byte, bit, int(val))
else:
ret = ErrorCode.Success
else:
ret =False
self.mutex.unlock()
return ret
def set_bit_verify(self, byte, bit, val):
ok = False
retry=0
max_retry = 3
while not ok and retry < max_retry:
ret = self.set_bit(byte, bit, val)
if ret.value != ErrorCode.Success.value and not self.simulate:
self.log.error(f"SET BIT ERROR")
time.sleep(1)
self.open_device()
retry += 1
else:
ok = True
return ok
# val from buffer channel
def vfbc(self, buffer, channel):
return buffer[channel[0]][channel[1]]