-
Notifications
You must be signed in to change notification settings - Fork 201
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
1 changed file
with
10 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,9 @@ | |
|
||
from ctypes import byref, c_ulong | ||
import numpy as np | ||
import logging | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class DAQmxSim: | ||
|
@@ -14,12 +17,6 @@ def close(self): | |
def ping(self):
    """Liveness check: unconditionally report success by returning True."""
    return True
|
||
def string_to_bytes(string, name):
    """Return ``string`` as a ``bytes`` object.

    :param string: the value to convert; a ``str`` is encoded as ASCII,
        a ``bytes`` object is returned unchanged.
    :param name: label for the value, used only in the error message.
    :return: the ``bytes`` form of ``string``.
    :raises ValueError: if ``string`` is neither ``str`` nor ``bytes``.
        A ``str`` containing non-ASCII characters raises
        ``UnicodeEncodeError`` (itself a ``ValueError`` subclass) from
        the encoding step.
    """
    if isinstance(string, bytes):
        # Already in the target type: hand it back untouched.
        return string
    if isinstance(string, str):
        return bytes(string, encoding="ascii")
    raise ValueError("{} must be of either str or bytes type".format(name))
|
||
class DAQmx: | ||
"""NI PXI6733 DAQ interface.""" | ||
|
@@ -38,13 +35,15 @@ def __init__(self, channels, clock): | |
|
||
import PyDAQmx as daq | ||
|
||
self.channels = string_to_bytes(channels, "channels") | ||
self.clock = string_to_bytes(clock, "clock") | ||
self.channels = channels.encode() | ||
self.clock = clock.encode() | ||
self.task = None | ||
self.daq = daq | ||
|
||
def done_callback_py(self, taskhandle, status, callback_data):
    """Handle a task-done notification.

    When ``taskhandle`` is the task currently stored in ``self.task``,
    the pending task is cleared via ``self.clear_pending_task()``.
    A notification for any other handle is logged as a warning and
    otherwise ignored.

    :param taskhandle: handle of the task the notification refers to.
    :param status: not used by this method.
    :param callback_data: not used by this method.
    """
    # NOTE(review): the ``_py`` suffix presumably follows the PyDAQmx
    # convention of naming the plain-Python function ``*_py`` and the
    # ctypes-wrapped pointer without the suffix -- confirm.
    # NOTE(review): if this method is registered directly as a DAQmx
    # done-event callback, the C API expects an int32 return value;
    # confirm whether an explicit ``return 0`` is needed.
    if taskhandle != self.task:
        logger.warning("done callback called with unexpected task")
    else:
        self.clear_pending_task()
|
||
def ping(self): | ||
|
@@ -60,7 +59,7 @@ def load_sample_values(self, sampling_freq, values): | |
This loads sample values into the PXI 6733 device. | ||
The device will output samples at each clock rising edge. | ||
This comment has been minimized.
Sorry, something went wrong. |
||
The first sample is output at the first clock rising edge. | ||
The device waits for a clock rising edge to output the first sample. | ||
When using several channels simultaneously, you must concatenate the | ||
values for the different channels in the ``values`` array. | ||
|
@@ -92,7 +91,7 @@ def load_sample_values(self, sampling_freq, values): | |
channel_number = self.daq.int32() | ||
t.GetTaskNumChans(byref(channel_number)) | ||
nb_values = len(values) | ||
if nb_values % channel_number.value > 0: | ||
if nb_values % channel_number.value: | ||
self.daq.DAQmxClearTask(t.taskHandle) | ||
raise ValueError("The size of the values array must be a multiple " | ||
"of the number of channels ({})" | ||
|
What does _py mean?