usb-5860 test ok
This commit is contained in:
parent
46deb81013
commit
bbad2f28d0
154
src/components/usb_5860.py
Normal file
154
src/components/usb_5860.py
Normal file
|
|
@ -0,0 +1,154 @@
|
|||
import ctypes
|
||||
import sys
|
||||
import time
|
||||
from enum import Enum
|
||||
|
||||
from PyQt5.QtCore import QMutex, Qt, QTimer, pyqtSlot
|
||||
|
||||
from .component import Component
|
||||
|
||||
if "--sim-io" not in sys.argv:
|
||||
libbiodaq = ctypes.CDLL("/opt/advantech/libs/libbiodaq.so")
|
||||
else:
|
||||
import lib.dummies.libbiodaq as libbiodaq
|
||||
|
||||
|
||||
class USB_5860(Component):
|
||||
class DioPortDir(Enum):
|
||||
Input = 0x00
|
||||
LoutHin = 0x0F
|
||||
LinHout = 0xF0
|
||||
Output = 0xFF
|
||||
|
||||
class DeviceInformation(ctypes.Structure):
|
||||
_fields_ = [
|
||||
('DeviceNumber', ctypes.c_uint32),
|
||||
('DeviceMode', ctypes.c_uint32),
|
||||
('ModuleIndex', ctypes.c_uint32),
|
||||
('Description', ctypes.c_wchar_p)
|
||||
]
|
||||
|
||||
def __init__(self, bench, name, config):
|
||||
super().__init__(bench, name, config)
|
||||
self.mutex = QMutex()
|
||||
# DEVICE INFORMATION
|
||||
self.info = self.DeviceInformation()
|
||||
self.info.Description = "USB-5860,BID#0"
|
||||
#self.info.DeviceNumber = -1
|
||||
self.info.DeviceMode = 1
|
||||
self.info.ModuleIndex = 0
|
||||
# DIGITAL INPUTS CLASS
|
||||
self.di_create = libbiodaq.AdxInstantDiCtrlCreate
|
||||
self.di_create.restype = ctypes.c_void_p
|
||||
self.di_setSelectedDevice = libbiodaq.InstantDiCtrl_setSelectedDevice
|
||||
self.di_setSelectedDevice.argtypes = [ctypes.c_void_p, ctypes.POINTER(self.DeviceInformation)]
|
||||
self.di_setSelectedDevice.restype = ctypes.c_uint32
|
||||
# DIGITAL INPUTS READ FUNCTION
|
||||
self.di_read = libbiodaq.InstantDiCtrl_ReadAny
|
||||
self.di_read.argtypes = [ctypes.c_void_p, ctypes.c_int32, ctypes.c_int32, ctypes.c_char_p]
|
||||
self.di_read.restype = ctypes.c_int32
|
||||
# DIGITAL OUTPUTS CLASS
|
||||
self.do_create = libbiodaq.AdxInstantDoCtrlCreate
|
||||
self.do_create.restype = ctypes.c_void_p
|
||||
# SET SELECTED DEVICE
|
||||
self.do_setSelectedDevice = libbiodaq.InstantDoCtrl_setSelectedDevice
|
||||
self.do_setSelectedDevice.argtypes = [ctypes.c_void_p, ctypes.POINTER(self.DeviceInformation)]
|
||||
self.do_setSelectedDevice.restype = ctypes.c_uint32
|
||||
# get ports
|
||||
self.get_ports = libbiodaq.InstantDoCtrl_getPortDirection
|
||||
self.get_ports.argtypes = [ctypes.c_void_p]
|
||||
self.get_ports.restype = ctypes.POINTER(ctypes.c_void_p)
|
||||
# # Array_getItem
|
||||
# self.array_getItem = libbiodaq.TArray_getItem
|
||||
# self.array_getItem.argtypes = [ctypes.c_void_p, ctypes.c_int32]
|
||||
# self.get_ports.restype = ctypes.POINTER(ctypes.c_void_p)
|
||||
# PortDirection_setDirection
|
||||
self.port_direction_set_direction = libbiodaq.PortDirection_setDirection
|
||||
self.port_direction_set_direction.argtypes = [ctypes.c_void_p, ctypes.c_uint8]
|
||||
self.port_direction_set_direction.restype = ctypes.c_int32
|
||||
# DIGITAL OUTPUTS WRITE FUNCTION
|
||||
self.do_write_bit = libbiodaq.InstantDoCtrl_WriteBit
|
||||
self.do_write_bit.argtypes = [ctypes.c_void_p, ctypes.c_int32, ctypes.c_int32, ctypes.c_char]
|
||||
self.do_write_bit.restype = ctypes.c_int32
|
||||
self.buffer = ctypes.create_string_buffer(5)
|
||||
# INIT OBJECTS
|
||||
self.di_ctrl = self.di_create()
|
||||
self.do_ctrl = self.do_create()
|
||||
self.di_init_status = self.di_setSelectedDevice(self.di_ctrl, ctypes.byref(self.info))
|
||||
self.do_init_status = self.do_setSelectedDevice(self.do_ctrl, ctypes.byref(self.info))
|
||||
if "--sim-io" not in sys.argv:
|
||||
self.dio_ports = self.get_ports(self.do_ctrl)
|
||||
# SET ALL RELAYS OFF
|
||||
for bit in range(0, 8):
|
||||
self.set_bit(0, bit, False)
|
||||
self.state_delay = 1
|
||||
self.last_get = None
|
||||
self.state_count = None
|
||||
self.last_out = None
|
||||
|
||||
@pyqtSlot()
|
||||
def start(self):
|
||||
# ACQUISITION TIMER
|
||||
self.timer = QTimer()
|
||||
self.timer.setTimerType(Qt.PreciseTimer)
|
||||
self.timer.setInterval(int(1000 / 20))
|
||||
self.timer.timeout.connect(self._get)
|
||||
self.timer.start()
|
||||
super().start()
|
||||
|
||||
# Read data for buffer
|
||||
@pyqtSlot()
|
||||
def _get(self):
|
||||
get = self.get()
|
||||
# print(self.last_get)
|
||||
if self.state_count is None or self.last_get is None:
|
||||
self.last_out = get
|
||||
self.state_count = [[1 for bit in byte] for byte in get]
|
||||
else:
|
||||
for byte_n, byte in enumerate(get):
|
||||
for bit_n, bit in enumerate(byte):
|
||||
if bit == self.last_get[byte_n][bit_n]:
|
||||
self.state_count[byte_n][bit_n] += 1
|
||||
else:
|
||||
self.state_count[byte_n][bit_n] = 1
|
||||
if self.state_count[byte_n][bit_n] > self.state_delay:
|
||||
self.last_out[byte_n][bit_n] = bit
|
||||
self.last_get = get
|
||||
self.update.emit([time.time(), self.last_out])
|
||||
|
||||
masks = [1 << n for n in range(8)]
|
||||
|
||||
def get_all(self):
|
||||
return self.get()
|
||||
|
||||
# Read all data bytes
|
||||
def get(self):
|
||||
self.mutex.lock()
|
||||
self.di_read(self.di_ctrl, 0, 2, self.buffer)
|
||||
read = []
|
||||
for byte in range(len(self.buffer)):
|
||||
byte = ~int.from_bytes(self.buffer[byte], "little")
|
||||
read.append([bool(byte & m) for m in self.masks])
|
||||
self.mutex.unlock()
|
||||
return read
|
||||
|
||||
# Read data bit
|
||||
def get_bit(self, byte, bit):
|
||||
return self.get()[byte][bit]
|
||||
|
||||
# Send data byte
|
||||
def set(self, start_byte, val):
|
||||
for byte, v in enumerate(val, start=start_byte):
|
||||
for bit, bv in enumerate([bool(v & m) for m in self.masks]):
|
||||
self.set_bit(byte, bit, bv)
|
||||
|
||||
# Write single bit
|
||||
def set_bit(self, byte, bit, val):
|
||||
self.mutex.lock()
|
||||
# print("set", byte, bit, not val, flush=True)
|
||||
self.do_write_bit(self.do_ctrl, byte, bit, int(val))
|
||||
self.mutex.unlock()
|
||||
|
||||
# val from buffer channel
|
||||
def vfbc(self, buffer, channel):
|
||||
return buffer[channel[0]][channel[1]]
|
||||
18
src/test/test_usb5860.py
Normal file
18
src/test/test_usb5860.py
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
import time
|
||||
|
||||
from src.components.usb_5860 import USB_5860
|
||||
|
||||
digital_io = USB_5860(None, None, None)
|
||||
digital_io.start()
|
||||
|
||||
out_val = 0
|
||||
while True:
|
||||
res = digital_io.set_bit(0, 0, out_val)
|
||||
input_data = digital_io.get()
|
||||
print("in:{} out:{}".format(bytes(input_data[0]), out_val))
|
||||
|
||||
time.sleep(1)
|
||||
if out_val == 0:
|
||||
out_val = 1
|
||||
else:
|
||||
out_val = 0
|
||||
Loading…
Reference in New Issue
Block a user