Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: whitequark/glasgow
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 05de04395af7
Choose a base ref
...
head repository: whitequark/glasgow
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: fed42691247f
Choose a head ref
  • 2 commits
  • 2 files changed
  • 1 contributor

Commits on Nov 29, 2018

  1. support.pyrepl: do not crash if _ is not defined.

    This happens e.g. if the first entered statement is `import foo`.
    whitequark committed Nov 29, 2018
    Copy the full SHA
    c39afbf View commit details
  2. support.chunked_fifo: implement.

    Required for GlasgowEmbedded#79.
    whitequark committed Nov 29, 2018
    Copy the full SHA
    fed4269 View commit details
Showing with 129 additions and 3 deletions.
  1. +125 −0 software/glasgow/support/chunked_fifo.py
  2. +4 −3 software/glasgow/support/pyrepl.py
125 changes: 125 additions & 0 deletions software/glasgow/support/chunked_fifo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from collections import deque


class ChunkedFIFO:
    """
    A first-in first-out byte buffer that uses discontiguous storage to operate without copying.

    Every written object is wrapped in a ``memoryview`` and queued as its own chunk. Reads
    return slices of those views, so chunks are never concatenated with each other.
    """
    def __init__(self):
        # Chunks that were written but have not been touched by any read yet.
        self._queue = deque()
        # The chunk that has been partially read, or ``None`` if there is no such chunk.
        self._chunk = None
        # Index into ``self._chunk`` of the first element that has not been read yet.
        self._offset = 0

    def write(self, data):
        """
        Enqueue ``data``.

        ``data`` is wrapped in a ``memoryview`` rather than copied. Falsy ``data`` (such as
        ``b""``) is ignored, so the queue never contains an empty chunk.
        """
        if not data:
            return

        self._queue.append(memoryview(data))

    def read(self, max_length=None):
        """
        Dequeue at most ``max_length`` bytes. If ``max_length`` is not specified, dequeue
        the maximum possible contiguous amount of bytes (at least one).

        A single call never returns data from more than one written chunk, so the result may
        be shorter than ``max_length`` even if more data is queued.

        Regardless of what was written into the FIFO, ``read`` always returns a ``memoryview``
        object.

        Raises ``IndexError`` if the FIFO is empty, unless ``max_length`` is ``0``, in which case
        an empty ``memoryview`` is returned.
        """
        if max_length is None and self._chunk is None:
            # Fast path: no partially read chunk, so hand out the next chunk as a whole.
            return self._queue.popleft()

        if max_length == 0:
            return memoryview(b"")

        if self._chunk is None:
            # Start consuming the next queued chunk piece by piece.
            self._chunk = self._queue.popleft()
            self._offset = 0

        if max_length is None:
            result = self._chunk[self._offset:]
        else:
            result = self._chunk[self._offset:self._offset + max_length]

        if self._offset + len(result) == len(self._chunk):
            # The current chunk is exhausted; the next read starts from the queue again.
            self._chunk = None
        else:
            self._offset += len(result)

        return result

    def __bool__(self):
        """Return ``True`` if any unread data remains in the FIFO."""
        # A partially read chunk lives outside of the queue, so the queue alone is not enough
        # to tell whether the FIFO is empty; without this check, ``bool(fifo)`` would disagree
        # with ``len(fifo) > 0`` after a partial read of the last chunk.
        return self._chunk is not None or bool(self._queue)

    def __len__(self):
        """Return the total amount of unread data, including the partially read chunk."""
        length = sum(len(chunk) for chunk in self._queue)
        if self._chunk is not None:
            length += len(self._chunk) - self._offset
        return length

# -------------------------------------------------------------------------------------------------

import unittest


class ChunkedFIFOTestCase(unittest.TestCase):
    """Exercises ``ChunkedFIFO`` reads, writes, truthiness and length."""

    def setUp(self):
        # Each test starts from its own empty FIFO.
        self.fifo = ChunkedFIFO()

    def _fill(self, *chunks):
        """Write every chunk into the FIFO, in order."""
        for chunk in chunks:
            self.fifo.write(chunk)

    def _expect_reads(self, *steps):
        """Run each ``(read_args, expected)`` step in order and check the returned data."""
        for read_args, expected in steps:
            self.assertEqual(self.fifo.read(*read_args), expected)

    def test_zero_write(self):
        # An empty write enqueues nothing, so the read has nothing to dequeue.
        self._fill(b"")
        self.assertRaises(IndexError, self.fifo.read)

    def test_zero_read(self):
        # Zero-length reads return empty data and do not consume anything.
        self._expect_reads(((0,), b""))
        self._fill(b"A")
        self._expect_reads(
            ((0,), b""),
            ((), b"A"),
        )

    def test_fast(self):
        # Unbounded reads return whole chunks, one per call.
        self._fill(b"AB", b"CD")
        self._expect_reads(
            ((), b"AB"),
            ((), b"CD"),
        )

    def test_chunked(self):
        # Bounded reads drain the first chunk, then an unbounded read takes the second.
        self._fill(b"ABCD", b"EF")
        self._expect_reads(
            ((1,), b"A"),
            ((1,), b"B"),
            ((2,), b"CD"),
            ((), b"EF"),
        )

    def test_chunked_chunked(self):
        # Bounded reads drain both chunks.
        self._fill(b"ABCD", b"EF")
        self._expect_reads(
            ((1,), b"A"),
            ((1,), b"B"),
            ((2,), b"CD"),
            ((1,), b"E"),
            ((1,), b"F"),
        )

    def test_chunked_fast(self):
        # An unbounded read after bounded reads returns the rest of the current chunk only.
        self._fill(b"ABCD", b"EF")
        self._expect_reads(
            ((1,), b"A"),
            ((1,), b"B"),
            ((), b"CD"),
            ((), b"EF"),
        )

    def test_bool(self):
        self.assertFalse(self.fifo)
        self._fill(b"A")
        self.assertTrue(self.fifo)

    def test_len(self):
        fifo = self.fifo
        self.assertEqual(len(fifo), 0)
        # Length grows with every write...
        for chunk, total in ((b"ABCD", 4), (b"EF", 6)):
            fifo.write(chunk)
            self.assertEqual(len(fifo), total)
        # ...and shrinks by the amount each read returns.
        for count, remaining in ((1, 5), (3, 2)):
            fifo.read(count)
            self.assertEqual(len(fifo), remaining)
7 changes: 4 additions & 3 deletions software/glasgow/support/pyrepl.py
Original file line number Diff line number Diff line change
@@ -32,9 +32,10 @@ def save_history(self):
def runcode(self, code):
try:
exec(code, self.locals)
result = self.locals["__builtins__"]["_"]
if asyncio.iscoroutine(result):
self._fut = self.locals["__builtins__"]["_"] = asyncio.ensure_future(result)
builtins = self.locals["__builtins__"]
if "_" in builtins:
if asyncio.iscoroutine(builtins["_"]):
self._fut = builtins["_"] = asyncio.ensure_future(builtins["_"])
except SystemExit:
raise
except: