diff --git a/src/components/usb_5860.py b/src/components/usb_5860.py new file mode 100644 index 0000000..04c0c3b --- /dev/null +++ b/src/components/usb_5860.py @@ -0,0 +1,154 @@ +import ctypes +import sys +import time +from enum import Enum + +from PyQt5.QtCore import QMutex, Qt, QTimer, pyqtSlot + +from .component import Component + +if "--sim-io" not in sys.argv: + libbiodaq = ctypes.CDLL("/opt/advantech/libs/libbiodaq.so") +else: + import lib.dummies.libbiodaq as libbiodaq + + +class USB_5860(Component): + class DioPortDir(Enum): + Input = 0x00 + LoutHin = 0x0F + LinHout = 0xF0 + Output = 0xFF + + class DeviceInformation(ctypes.Structure): + _fields_ = [ + ('DeviceNumber', ctypes.c_uint32), + ('DeviceMode', ctypes.c_uint32), + ('ModuleIndex', ctypes.c_uint32), + ('Description', ctypes.c_wchar_p) + ] + + def __init__(self, bench, name, config): + super().__init__(bench, name, config) + self.mutex = QMutex() + # DEVICE INFORMATION + self.info = self.DeviceInformation() + self.info.Description = "USB-5860,BID#0" + #self.info.DeviceNumber = -1 + self.info.DeviceMode = 1 + self.info.ModuleIndex = 0 + # DIGITAL INPUTS CLASS + self.di_create = libbiodaq.AdxInstantDiCtrlCreate + self.di_create.restype = ctypes.c_void_p + self.di_setSelectedDevice = libbiodaq.InstantDiCtrl_setSelectedDevice + self.di_setSelectedDevice.argtypes = [ctypes.c_void_p, ctypes.POINTER(self.DeviceInformation)] + self.di_setSelectedDevice.restype = ctypes.c_uint32 + # DIGITAL INPUTS READ FUNCTION + self.di_read = libbiodaq.InstantDiCtrl_ReadAny + self.di_read.argtypes = [ctypes.c_void_p, ctypes.c_int32, ctypes.c_int32, ctypes.c_char_p] + self.di_read.restype = ctypes.c_int32 + # DIGITAL OUTPUTS CLASS + self.do_create = libbiodaq.AdxInstantDoCtrlCreate + self.do_create.restype = ctypes.c_void_p + # SET SELECTED DEVICE + self.do_setSelectedDevice = libbiodaq.InstantDoCtrl_setSelectedDevice + self.do_setSelectedDevice.argtypes = [ctypes.c_void_p, ctypes.POINTER(self.DeviceInformation)] + self.do_setSelectedDevice.restype = ctypes.c_uint32 + # get ports + self.get_ports = libbiodaq.InstantDoCtrl_getPortDirection + self.get_ports.argtypes = [ctypes.c_void_p] + self.get_ports.restype = ctypes.POINTER(ctypes.c_void_p) + # # Array_getItem + # self.array_getItem = libbiodaq.TArray_getItem + # self.array_getItem.argtypes = [ctypes.c_void_p, ctypes.c_int32] + # self.get_ports.restype = ctypes.POINTER(ctypes.c_void_p) + # PortDirection_setDirection + self.port_direction_set_direction = libbiodaq.PortDirection_setDirection + self.port_direction_set_direction.argtypes = [ctypes.c_void_p, ctypes.c_uint8] + self.port_direction_set_direction.restype = ctypes.c_int32 + # DIGITAL OUTPUTS WRITE FUNCTION + self.do_write_bit = libbiodaq.InstantDoCtrl_WriteBit + self.do_write_bit.argtypes = [ctypes.c_void_p, ctypes.c_int32, ctypes.c_int32, ctypes.c_char] + self.do_write_bit.restype = ctypes.c_int32 + self.buffer = ctypes.create_string_buffer(5) + # INIT OBJECTS + self.di_ctrl = self.di_create() + self.do_ctrl = self.do_create() + self.di_init_status = self.di_setSelectedDevice(self.di_ctrl, ctypes.byref(self.info)) + self.do_init_status = self.do_setSelectedDevice(self.do_ctrl, ctypes.byref(self.info)) + if "--sim-io" not in sys.argv: + self.dio_ports = self.get_ports(self.do_ctrl) + # SET ALL RELAYS OFF + for bit in range(0, 8): + self.set_bit(0, bit, False) + self.state_delay = 1 + self.last_get = None + self.state_count = None + self.last_out = None + + @pyqtSlot() + def start(self): + # ACQUISITION TIMER + self.timer = QTimer() + self.timer.setTimerType(Qt.PreciseTimer) + self.timer.setInterval(int(1000 / 20)) + self.timer.timeout.connect(self._get) + self.timer.start() + super().start() + + # Read data for buffer + @pyqtSlot() + def _get(self): + get = self.get() + # print(self.last_get) + if self.state_count is None or self.last_get is None: + self.last_out = get + self.state_count = [[1 for bit in byte] for byte in get] + else: + for byte_n, byte in enumerate(get): + for bit_n, bit in enumerate(byte): + if bit == self.last_get[byte_n][bit_n]: + self.state_count[byte_n][bit_n] += 1 + else: + self.state_count[byte_n][bit_n] = 1 + if self.state_count[byte_n][bit_n] > self.state_delay: + self.last_out[byte_n][bit_n] = bit + self.last_get = get + self.update.emit([time.time(), self.last_out]) + + masks = [1 << n for n in range(8)] + + def get_all(self): + return self.get() + + # Read all data bytes + def get(self): + self.mutex.lock() + self.di_read(self.di_ctrl, 0, 2, self.buffer) + read = [] + for byte in range(len(self.buffer)): + byte = ~int.from_bytes(self.buffer[byte], "little") + read.append([bool(byte & m) for m in self.masks]) + self.mutex.unlock() + return read + + # Read data bit + def get_bit(self, byte, bit): + return self.get()[byte][bit] + + # Send data byte + def set(self, start_byte, val): + for byte, v in enumerate(val, start=start_byte): + for bit, bv in enumerate([bool(v & m) for m in self.masks]): + self.set_bit(byte, bit, bv) + + # Write single bit + def set_bit(self, byte, bit, val): + self.mutex.lock() + # print("set", byte, bit, not val, flush=True) + self.do_write_bit(self.do_ctrl, byte, bit, int(val)) + self.mutex.unlock() + + # val from buffer channel + def vfbc(self, buffer, channel): + return buffer[channel[0]][channel[1]] diff --git a/src/test/test_usb5860.py b/src/test/test_usb5860.py new file mode 100644 index 0000000..4674628 --- /dev/null +++ b/src/test/test_usb5860.py @@ -0,0 +1,18 @@ +import time + +from src.components.usb_5860 import USB_5860 + +digital_io = USB_5860(None, None, None) +digital_io.start() + +out_val = 0 +while True: + res = digital_io.set_bit(0, 0, out_val) + input_data = digital_io.get() + print("in:{} out:{}".format(bytes(input_data[0]), out_val)) + + time.sleep(1) + if out_val == 0: + out_val = 1 + else: + out_val = 0