Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: GlasgowEmbedded/glasgow
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 4df5b8056bae^
Choose a base ref
...
head repository: GlasgowEmbedded/glasgow
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: fa0e8339d57b
Choose a head ref
  • 7 commits
  • 4 files changed
  • 1 contributor

Commits on Jul 23, 2019

  1. access.direct.demultiplexer: introduce soft_flush

    flush() ensures that all written data has been sent to the device.
    soft_flush() on the other hand ensures that all written data has been
    queued as USB URBs, but does not wait for them to complete.
    
    Use soft_flush() to ensure that every last byte of queued data will
    make it to the device in a timely manner, without stalling the entire
    USB transfer pipeline.
    marcan committed Jul 23, 2019

    Verified

    This commit was signed with the committer’s verified signature.
    dariakp Daria Pardue
    Copy the full SHA
    4df5b80 View commit details
  2. applet.video.ws2812_output: use soft_flush

    We need to make sure that all video data will make it to the Glasgow in
    a timely manner, to avoid "tearing" the frame (the LEDs cannot handle
    gaps longer than a few microseconds in the data stream without
    triggering a premature latch/reset), but we don't necessarily want to
    block for the data to actually be sent.
    marcan committed Jul 23, 2019

    Verified

    This commit was signed with the committer’s verified signature.
    dariakp Daria Pardue
    Copy the full SHA
    44f2dcb View commit details
  3. access.direct.demultiplexer: introduce software side queue length limits

    This implements a configurable threshold for FIFO data queueing on the
    Python side, with pushback. Previously, any data was buffered eagerly,
    e.g. an applet sending data on the hardware side and never receiving
    it on the software side (or vice versa) would cause the data to pile up
    in Python buffers at full speed, with no memory usage limit.
    
    With this patch, the user may specify queue size limits. These limits
    are not strict, but rather just thresholds for pushback. If the user
    configures a 1MB FIFO limit and writes 100MB, the write will complete
    immediately, but a subsequent write will block until 99MB of data has
    been successfully queued with the USB subsystem (URB buffers do not
    count towards the limit). Similarly, configuring a 1MB read limit
    will allow a URB completion that crosses that threshold to complete,
    but no further URBs will be re-queued until the application reads data
    from the FIFO, eventually causing the on-device FIFO to fill up and
    stop accepting writes from the gateware.
    marcan committed Jul 23, 2019
    Copy the full SHA
    0f71684 View commit details
  4. support.endpoint: introduce queue_size argument

    This limits the receive queue size and pushes back on the underlying
    transport when the receive queue is full.
    marcan committed Jul 23, 2019
    Copy the full SHA
    8d20486 View commit details
  5. Copy the full SHA
    0e46d11 View commit details
  6. Copy the full SHA
    94091a2 View commit details

Commits on Jul 26, 2019

  1. Copy the full SHA
    fa0e833 View commit details
Showing with 77 additions and 17 deletions.
  1. +27 −6 software/glasgow/access/direct/demultiplexer.py
  2. +15 −5 software/glasgow/applet/video/ws2812_output/__init__.py
  3. +9 −5 software/glasgow/cli.py
  4. +26 −1 software/glasgow/support/endpoint.py
33 changes: 27 additions & 6 deletions software/glasgow/access/direct/demultiplexer.py
Original file line number Diff line number Diff line change
@@ -76,11 +76,13 @@ def __init__(self, device, pipe_count):
else:
assert False

async def claim_interface(self, applet, mux_interface, args, pull_low=set(), pull_high=set()):
async def claim_interface(self, applet, mux_interface, args, pull_low=set(), pull_high=set(),
read_buffer_size=None, write_buffer_size=None):
assert mux_interface._pipe_num not in self._claimed
self._claimed.add(mux_interface._pipe_num)

iface = DirectDemultiplexerInterface(self.device, applet, mux_interface)
iface = DirectDemultiplexerInterface(self.device, applet, mux_interface,
read_buffer_size, write_buffer_size)
self._interfaces.append(iface)

if hasattr(args, "mirror_voltage") and args.mirror_voltage:
@@ -136,9 +138,13 @@ async def claim_interface(self, applet, mux_interface, args, pull_low=set(), pul


class DirectDemultiplexerInterface(AccessDemultiplexerInterface):
def __init__(self, device, applet, mux_interface):
def __init__(self, device, applet, mux_interface, read_buffer_size=None, write_buffer_size=None):
super().__init__(device, applet)

self._write_buffer_size = write_buffer_size
self._read_buffer_size = read_buffer_size
self._in_pushback = asyncio.Condition()

self._pipe_num = mux_interface._pipe_num
self._addr_reset = mux_interface._addr_reset

@@ -198,6 +204,10 @@ async def reset(self):

async def _in_task(self):
size = self._in_packet_size * _packets_per_xfer
if self._read_buffer_size is not None:
async with self._in_pushback:
while len(self._in_buffer) > self._read_buffer_size:
await self._in_pushback.wait()
data = await self.device.bulk_read(self._endpoint_in, size)
self._in_buffer.write(data)

@@ -223,12 +233,16 @@ async def read(self, length=None):
self.logger.trace("FIFO: need %d bytes", length - len(self._in_buffer))
await self._in_tasks.wait_one()

result = self._in_buffer.read(length)
async with self._in_pushback:
result = self._in_buffer.read(length)
self._in_pushback.notify_all()
if len(result) < length:
chunks = [result]
length -= len(result)
while length > 0:
chunk = self._in_buffer.read(length)
async with self._in_pushback:
chunk = self._in_buffer.read(length)
self._in_pushback.notify_all()
chunks.append(chunk)
length -= len(chunk)
# Always return a memoryview object, to avoid hard to detect edge cases downstream.
@@ -269,6 +283,11 @@ async def write(self, data):
# Eagerly check if any of our previous queued writes errored out.
await self._out_tasks.poll()

if self._write_buffer_size is not None:
self.logger.trace("FIFO: buffer full, pushing back")
while len(self._out_buffer) > self._write_buffer_size:
await self._out_tasks.wait_one()

self.logger.trace("FIFO: write <%s>", dump_hex(data))
self._out_buffer.write(data)

@@ -297,7 +316,7 @@ async def write(self, data):
len(self._out_tasks) < _xfers_per_queue:
self._out_tasks.submit(self._out_task(self._out_slice()))

async def flush(self):
async def soft_flush(self, soft=False):
self.logger.trace("FIFO: flush")

# First, we ensure we can submit one more task. (There can be more tasks than
@@ -316,4 +335,6 @@ async def flush(self):
data += self._out_buffer.read()
self._out_tasks.submit(self._out_task(data))

async def flush(self, soft=False):
    """Flush buffered output data towards the device.

    Always calls ``soft_flush()`` first, which queues any remaining buffered
    output as USB transfers. Unless ``soft`` is true, it then waits for every
    queued OUT transfer to complete via ``self._out_tasks.wait_all()``.

    :param soft: if true, return as soon as the data has been queued, without
        waiting for the queued transfers to complete (same effect as calling
        ``soft_flush()``). Defaults to ``False``, which keeps the blocking
        behavior.
    """
    await self.soft_flush()
    # A soft flush stops once the data has been handed off as transfers;
    # only a hard flush waits for those transfers to finish.
    if not soft:
        await self._out_tasks.wait_all()
20 changes: 15 additions & 5 deletions software/glasgow/applet/video/ws2812_output/__init__.py
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ def __init__(self, pads, sys_clk_freq, count, out_fifo):
assert t_period / sys_clk_freq < 7000e-9
t_zero = int(1 + sys_clk_freq * 100e-9)
assert t_zero < sys_clk_freq * 500e-9
t_reset = int(1 + sys_clk_freq * 1000e-6)
t_reset = int(1 + sys_clk_freq * 300e-6)

self.submodules.output = output = VideoWS2812Output(pads)

@@ -133,6 +133,9 @@ def add_build_arguments(cls, parser, access):
parser.add_argument(
"-c", "--count", metavar="N", type=int, required=True,
help="set the number of LEDs per string")
parser.add_argument(
"-b", "--buffer", metavar="N", type=int, default=16,
help="set the number of frames to buffer internally (buffered twice)")
ServerEndpoint.add_argument(parser, "endpoint")

def build(self, target, args):
@@ -147,15 +150,22 @@ def build(self, target, args):
return subtarget

async def run(self, device, args):
return await device.demultiplexer.claim_interface(self, self.mux_interface, args)
buffer_size = len(args.pin_set_out) * args.count * 3 * args.buffer
return await device.demultiplexer.claim_interface(self, self.mux_interface, args, write_buffer_size=buffer_size)

async def interact(self, device, args, leds):
endpoint = await ServerEndpoint("socket", self.logger, args.endpoint)
frame_size = len(args.pin_set_out) * args.count * 3
buffer_size = frame_size * args.buffer
endpoint = await ServerEndpoint("socket", self.logger, args.endpoint, queue_size=buffer_size)
while True:
try:
data = await asyncio.shield(endpoint.recv())
data = await asyncio.shield(endpoint.recv(buffer_size))
partial = len(data) % frame_size
while partial:
data += await asyncio.shield(endpoint.recv(frame_size - partial))
partial = len(data) % frame_size
await leds.write(data)
await leds.flush()
await leds.soft_flush()
except asyncio.CancelledError:
pass

14 changes: 9 additions & 5 deletions software/glasgow/cli.py
Original file line number Diff line number Diff line change
@@ -200,11 +200,10 @@ def add_toolchain_args(parser):
"--trace", metavar="FILENAME", type=argparse.FileType("wt"), default=None,
help="trace applet I/O to FILENAME")

g_run_bitstream = p_run.add_mutually_exclusive_group(required=True)
g_run_bitstream.add_argument(
p_run.add_argument(
"--bitstream", metavar="FILENAME", type=argparse.FileType("rb"),
help="read bitstream from the specified file")
add_applet_arg(g_run_bitstream, mode="run")
add_applet_arg(p_run, mode="run")

p_tool = subparsers.add_parser(
"tool", formatter_class=TextHelpFormatter,
@@ -425,8 +424,13 @@ async def _main():
target, applet = _applet(device.revision, args)
device.demultiplexer = DirectDemultiplexer(device, target.multiplexer.pipe_count)

await device.download_target(target, rebuild=args.rebuild,
toolchain_opts=_toolchain_opts(args))
if args.bitstream:
with args.bitstream as f:
logger.info("downloading bitstream from %r", f.name)
await device.download_bitstream(f.read())
else:
await device.download_target(target, rebuild=args.rebuild,
toolchain_opts=_toolchain_opts(args))

if args.trace:
logger.info("starting applet analyzer")
27 changes: 26 additions & 1 deletion software/glasgow/support/endpoint.py
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ def add_argument(cls, parser, name, default=None):
name, metavar=metavar, type=endpoint, nargs=nargs, default=default,
help=help)

async def __init__(self, name, logger, sock_addr):
async def __init__(self, name, logger, sock_addr, queue_size=None):
assert isinstance(sock_addr, tuple)

self.name = name
@@ -61,11 +61,15 @@ async def __init__(self, name, logger, sock_addr):
self._send_epoch = 0
self._recv_epoch = 1
self._queue = deque()
self._queued = 0
self._queue_size = queue_size
self._future = None

self._buffer = None
self._pos = 0

self._read_paused = False

def _log(self, level, message, *args):
self._logger.log(level, self.name + ": " + message, *args)

@@ -97,9 +101,24 @@ def connection_lost(self, exc):
self._check_future()

def data_received(self, data):
    """Accept a chunk of bytes delivered by the transport.

    The chunk is appended to the receive queue and its length is added to the
    running ``_queued`` byte count. Read pushback is then re-evaluated, and
    finally a queued item is offered to ``self._future`` if one is waiting.
    """
    size = len(data)
    self._log(logging.TRACE, "endpoint received %d bytes", size)

    self._queue.append(data)
    self._queued += size

    # Pushback must see the updated byte count before any item is handed on.
    self._check_pushback()
    self._check_future()

def _check_pushback(self):
if self._queue_size is None:
return
elif not self._read_paused and self._queued >= self._queue_size:
self._log(logging.TRACE, "queue full, pausing reads")
self._transport.pause_reading()
self._read_paused = True
elif self._read_paused and self._queued < self._queue_size:
self._log(logging.TRACE, "queue not full, resuming reads")
self._transport.resume_reading()
self._read_paused = False

def _check_future(self):
if self._queue and self._future is not None:
item = self._queue.popleft()
@@ -131,6 +150,8 @@ async def recv(self, length=0):

chunk = self._buffer[:length - len(data)]
self._buffer = self._buffer[len(chunk):]
self._queued -= len(chunk)
self._check_pushback()
data += chunk

self._log(logging.TRACE, "recv <%s>", data.hex())
@@ -148,11 +169,15 @@ async def recv_until(self, separator):
index = self._buffer.index(separator)
chunk = self._buffer[:index]
self._buffer = self._buffer[index + 1:]
self._queued -= len(chunk)
self._check_pushback()
data += chunk
break

except ValueError:
data += self._buffer
self._queued -= len(self._buffer)
self._check_pushback()
self._buffer = None

self._log(logging.TRACE, "recv <%s%s>", data.hex(), separator.hex())