Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/nmigen-stdio
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 2919626302e5
Choose a base ref
...
head repository: m-labs/nmigen-stdio
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 39cd61c33d6b
Choose a head ref
  • 1 commit
  • 3 files changed
  • 1 contributor

Commits on Sep 20, 2019

  1. serial: WIP

    whitequark committed Sep 20, 2019
    Copy the full SHA
    39cd61c View commit details
Showing with 288 additions and 0 deletions.
  1. +3 −0 .gitignore
  2. +177 −0 nmigen_stdio/serial.py
  3. +108 −0 nmigen_stdio/test/test_serial.py
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -2,3 +2,6 @@
*.pyc
/*.egg-info
/.eggs

# nMigen
*.vcd
177 changes: 177 additions & 0 deletions nmigen_stdio/serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
from nmigen import *
from nmigen.lib.cdc import MultiReg
from nmigen.tools import bits_for


# Names exported by ``from nmigen_stdio.serial import *``.
__all__ = ["AsyncSerialRX", "AsyncSerialTX", "AsyncSerial"]


def _check_parity(parity):
choices = ("none", "mark", "space", "even", "odd")
if parity not in choices:
raise ValueError("Invalid parity {!r}; must be one of {}"
.format(parity, ", ".join(choices)))


def _compute_parity_bit(data, parity):
if parity == "none":
return C(0, 0)
if parity == "mark":
return C(1, 1)
if parity == "space":
return C(0, 1)
if parity == "even":
return data.xor()
if parity == "odd":
return ~data.xor()
assert False


def _wire_layout(data_bits, parity="none"):
return [
("stop", 1),
("parity", 0 if parity == "none" else 1),
("data", data_bits),
("start", 1),
]


class AsyncSerialRX(Elaboratable):
    """Asynchronous serial receiver.

    Waits for the ``i`` input to go low, then samples it once per bit period
    into a shift register laid out by ``_wire_layout``. When the whole frame
    has been captured, the data and error flags are reported.

    Parameters
    ----------
    divisor : int
        Reset value of the ``divisor`` signal. The bit timer is reloaded with
        ``divisor`` and counts down to zero between samples.
    divisor_bits : int or None
        Width of the ``divisor`` signal; defaults to ``bits_for(divisor)``.
    data_bits : int
        Width of the ``data`` signal.
    parity : str
        One of ``"none"``, ``"mark"``, ``"space"``, ``"even"``, ``"odd"``.
    pins : object or None
        If not ``None``, ``pins.rx.i`` is synchronized into ``i``.

    Attributes
    ----------
    divisor : Signal
        Bit timer reload value.
    data : Signal(data_bits)
        Data field of the last frame latched while ``ack`` was high.
    err : Record
        ``overflow``, ``frame`` and ``parity`` error flags.
    rdy : Signal
        Updated only while ``ack`` is high; set while the FSM is in ``DONE``.
    ack : Signal
        Asserted by the consumer to accept received frames.
    i : Signal
        Serial input; resets to 1.

    Raises
    ------
    ValueError
        If ``parity`` is not a supported mode.
    """

    def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pins=None):
        _check_parity(parity)
        self._parity = parity

        self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor)

        self.data = Signal(data_bits)
        self.err = Record([
            ("overflow", 1),
            ("frame", 1),
            ("parity", 1),
        ])
        self.rdy = Signal()
        self.ack = Signal()

        self.i = Signal(reset=1)

        self._pins = pins

    def elaborate(self, platform):
        m = Module()

        timer = Signal.like(self.divisor)
        shreg = Record(_wire_layout(len(self.data), self._parity))
        bitno = Signal.range(len(shreg))

        if self._pins is not None:
            # The synchronizer is a submodule, so it is registered through
            # ``m.submodules``; ``m.d.<name>`` only accepts statements for a
            # clock domain called <name>.
            m.submodules += MultiReg(self._pins.rx.i, self.i, reset=1)

        with m.FSM() as fsm:
            with m.State("IDLE"):
                # A low level on the input starts a frame.
                with m.If(~self.i):
                    m.d.sync += [
                        bitno.eq(len(shreg) - 1),
                        # First sample is taken after half of ``divisor``;
                        # presumably to land near the middle of the start bit.
                        timer.eq(self.divisor >> 1),
                    ]
                    m.next = "BUSY"

            with m.State("BUSY"):
                with m.If(timer != 0):
                    m.d.sync += timer.eq(timer - 1)
                with m.Else():
                    m.d.sync += [
                        # The new sample enters bit 0 of the shift register and
                        # earlier samples move up by one position.
                        shreg.eq(Cat(self.i, shreg)),
                        bitno.eq(bitno - 1),
                        timer.eq(self.divisor),
                    ]
                    with m.If(bitno == 0):
                        m.next = "DONE"

            with m.State("DONE"):
                # Results are only latched if the consumer is accepting data.
                with m.If(self.ack):
                    m.d.sync += [
                        self.data.eq(shreg.data),
                        self.err.frame .eq(~((shreg.start == 0) & (shreg.stop == 1))),
                        self.err.parity.eq(~(shreg.parity ==
                                             _compute_parity_bit(shreg.data, self._parity))),
                    ]
                # Reaching DONE with ``ack`` low flags an overflow.
                m.d.sync += self.err.overflow.eq(~self.ack)
                # DONE always lasts a single cycle.
                m.next = "IDLE"

        # ``rdy`` only changes while ``ack`` is high.
        with m.If(self.ack):
            m.d.sync += self.rdy.eq(fsm.ongoing("DONE"))

        return m


class AsyncSerialTX(Elaboratable):
    """Asynchronous serial transmitter.

    While idle, ``rdy`` is driven high. When ``ack`` is asserted in the idle
    state, a frame (start bit 0, ``data``, parity bit, stop bit 1) is loaded
    into a shift register laid out by ``_wire_layout`` and then shifted onto
    ``o``, one position every time the bit timer reaches zero.

    Parameters
    ----------
    divisor : int
        Reset value of the ``divisor`` signal; the bit timer is reloaded with
        ``divisor`` and counts down to zero between shifts.
    divisor_bits : int or None
        Width of the ``divisor`` signal; defaults to ``bits_for(divisor)``.
    data_bits : int
        Width of the ``data`` signal.
    parity : str
        One of ``"none"``, ``"mark"``, ``"space"``, ``"even"``, ``"odd"``.
    pins : object or None
        If not ``None``, ``pins.tx.o`` is driven from ``o``.

    Raises
    ------
    ValueError
        If ``parity`` is not a supported mode.
    """
    def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pins=None):
        _check_parity(parity)
        self._parity = parity

        self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor)

        # Word to transmit; sampled when ``ack`` is high in the idle state.
        self.data = Signal(data_bits)
        # High while the transmitter is idle.
        self.rdy = Signal()
        # Asserted by the producer to start transmitting ``data``.
        self.ack = Signal()

        # Serial output.
        # NOTE(review): resets to 0, whereas the receiver's input resets to 1;
        # confirm whether the line is meant to idle high.
        self.o = Signal()

        self._pins = pins

    def elaborate(self, platform):
        m = Module()

        timer = Signal.like(self.divisor)
        shreg = Record(_wire_layout(len(self.data), self._parity))
        bitno = Signal.range(len(shreg))

        if self._pins is not None:
            m.d.comb += self._pins.tx.o.eq(self.o)

        with m.FSM():
            with m.State("IDLE"):
                m.d.comb += self.rdy.eq(1)
                # NOTE(review): while idle the output follows bit 0 of the
                # shift register, which is 0 until a frame has been loaded;
                # confirm this is intended.
                m.d.sync += self.o.eq(shreg[0])
                with m.If(self.ack):
                    m.d.sync += [
                        shreg.start .eq(0),
                        shreg.data .eq(self.data),
                        shreg.parity.eq(_compute_parity_bit(self.data, self._parity)),
                        shreg.stop .eq(1),
                        bitno.eq(len(shreg) - 1),
                        timer.eq(self.divisor),
                    ]
                    m.next = "BUSY"

            with m.State("BUSY"):
                with m.If(timer != 0):
                    m.d.sync += timer.eq(timer - 1)
                with m.Else():
                    m.d.sync += [
                        # ``o`` takes bit 0 of the shift register and the
                        # remaining bits move down by one position.
                        # NOTE(review): ``_wire_layout`` lists ``stop`` first, so
                        # this appears to emit the stop bit before the start
                        # bit -- verify the intended bit order.
                        Cat(self.o, shreg).eq(shreg),
                        bitno.eq(bitno - 1),
                        timer.eq(self.divisor),
                    ]
                    with m.If(bitno == 0):
                        m.next = "IDLE"

        return m


class AsyncSerial(Elaboratable):
    """Combined asynchronous serial receiver and transmitter.

    Instantiates one ``AsyncSerialRX`` and one ``AsyncSerialTX`` (available as
    ``rx`` and ``tx``) and drives both of their ``divisor`` signals from a
    single shared ``divisor`` signal.

    Parameters
    ----------
    divisor : int
        Reset value of the shared ``divisor`` signal; also forwarded to the
        receiver and the transmitter.
    divisor_bits : int or None
        Width of the ``divisor`` signal; defaults to ``bits_for(divisor)``.
        Also forwarded to the receiver and the transmitter.
    **kwargs
        Remaining keyword arguments, passed unchanged to both
        ``AsyncSerialRX`` and ``AsyncSerialTX``.
    """

    def __init__(self, *, divisor, divisor_bits=None, **kwargs):
        self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor)

        # ``divisor`` is a required keyword of both halves, so it has to be
        # forwarded explicitly; it is not part of ``kwargs``. Forwarding
        # ``divisor_bits`` as well keeps all three divisor signals equally wide.
        self.rx = AsyncSerialRX(divisor=divisor, divisor_bits=divisor_bits, **kwargs)
        self.tx = AsyncSerialTX(divisor=divisor, divisor_bits=divisor_bits, **kwargs)

    def elaborate(self, platform):
        m = Module()
        m.submodules.rx = self.rx
        m.submodules.tx = self.tx
        # Both halves always run from the shared divisor.
        m.d.comb += [
            self.rx.divisor.eq(self.divisor),
            self.tx.divisor.eq(self.divisor),
        ]
        return m
108 changes: 108 additions & 0 deletions nmigen_stdio/test/test_serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import unittest
from nmigen import *
from nmigen.lib.fifo import SyncFIFO
from nmigen.back.pysim import *

from ..serial import *


def simulation_test(dut, process):
    """Simulate ``dut`` with ``process`` attached as a synchronous process.

    A clock with a period of 1e-6 is added, the simulation is run to
    completion, and the waveform is written to ``test.vcd`` in the current
    working directory.
    """
    # The dump file gets its own ``with`` so that it is closed even when the
    # testbench raises; the simulator context exits first, then the file.
    with open("test.vcd", "w") as vcd_file:
        with Simulator(dut, vcd_file=vcd_file) as sim:
            sim.add_clock(1e-6)
            sim.add_sync_process(process)
            sim.run()


class AsyncSerialRXTestCase(unittest.TestCase):
    """Simulation tests for ``AsyncSerialRX``.

    Every test stores the receiver under test in ``self.dut`` and drives its
    ``i`` input one bit per bit period.
    """

    # Payload bit patterns, in the order in which they are driven onto ``i``.
    _PAYLOAD_AE = [1, 0, 1, 0, 1, 1, 1, 0]
    _PAYLOAD_AC = [1, 0, 1, 0, 1, 1, 0, 0]

    @staticmethod
    def _frame(payload, *trailer):
        """Return the bit list ``[0, *payload, *trailer]`` (start bit first)."""
        return [0, *payload, *trailer]

    def tx_period(self):
        """Wait for ``divisor + 1`` clock cycles."""
        cycles = (yield self.dut.divisor) + 1
        for _ in range(cycles):
            yield

    def tx_bits(self, bits):
        """Drive each value of ``bits`` onto ``i``, waiting one period before each."""
        for level in bits:
            yield from self.tx_period()
            yield self.dut.i.eq(level)

    def rx_test(self, bits, *, data=None, errors=None):
        """Send ``bits`` with ``ack`` held high and check the outcome.

        With ``data`` given, no error may be flagged and the received word must
        equal ``data``. With ``errors`` given, ``err`` must be non-zero and every
        named flag must be set.
        """
        def testbench():
            rx = self.dut
            self.assertFalse((yield rx.rdy))
            yield rx.ack.eq(1)
            yield from self.tx_bits(bits)
            while True:
                if (yield rx.rdy):
                    break
                yield
            if data is not None:
                self.assertFalse((yield rx.err))
                self.assertEqual((yield rx.data), data)
            if errors is not None:
                self.assertTrue((yield rx.err))
                for flag in errors:
                    self.assertTrue((yield getattr(rx.err, flag)))

        simulation_test(self.dut, testbench)

    def test_8n1(self):
        self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="none")
        self.rx_test(self._frame(self._PAYLOAD_AE, 1), data=0b10101110)

    def test_16n1(self):
        self.dut = AsyncSerialRX(divisor=7, data_bits=16, parity="none")
        payload = self._PAYLOAD_AE + [1, 1, 1, 1, 0, 0, 0, 0]
        self.rx_test(self._frame(payload, 1), data=0b1010111011110000)

    def test_8m1(self):
        self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="mark")
        self.rx_test(self._frame(self._PAYLOAD_AE, 1, 1), data=0b10101110)
        self.rx_test(self._frame(self._PAYLOAD_AC, 1, 1), data=0b10101100)
        self.rx_test(self._frame(self._PAYLOAD_AE, 0, 1), errors={"parity"})

    def test_8s1(self):
        self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="space")
        self.rx_test(self._frame(self._PAYLOAD_AE, 0, 1), data=0b10101110)
        self.rx_test(self._frame(self._PAYLOAD_AC, 0, 1), data=0b10101100)
        self.rx_test(self._frame(self._PAYLOAD_AE, 1, 1), errors={"parity"})

    def test_8e1(self):
        self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="even")
        self.rx_test(self._frame(self._PAYLOAD_AE, 1, 1), data=0b10101110)
        self.rx_test(self._frame(self._PAYLOAD_AC, 0, 1), data=0b10101100)
        self.rx_test(self._frame(self._PAYLOAD_AE, 0, 1), errors={"parity"})

    def test_8o1(self):
        self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="odd")
        self.rx_test(self._frame(self._PAYLOAD_AE, 0, 1), data=0b10101110)
        self.rx_test(self._frame(self._PAYLOAD_AC, 1, 1), data=0b10101100)
        self.rx_test(self._frame(self._PAYLOAD_AE, 1, 1), errors={"parity"})

    def test_err_frame(self):
        self.dut = AsyncSerialRX(divisor=7)
        # The final bit is 0 where the stop bit would be.
        self.rx_test(self._frame([0] * 8, 0), errors={"frame"})

    def test_err_overflow(self):
        self.dut = AsyncSerialRX(divisor=7)

        def testbench():
            # ``ack`` is never raised in this test.
            self.assertFalse((yield self.dut.rdy))
            yield from self.tx_bits(self._frame([0] * 8, 1))
            yield from self.tx_period()
            self.assertFalse((yield self.dut.rdy))
            self.assertTrue((yield self.dut.err.overflow))

        simulation_test(self.dut, testbench)

    def test_fifo(self):
        self.dut = AsyncSerialRX(divisor=7)
        self.fifo = SyncFIFO(width=8, depth=4)

        top = Module()
        top.submodules.rx = self.dut
        top.submodules.fifo = self.fifo
        # The receiver writes straight into the FIFO.
        top.d.comb += [
            self.dut.ack.eq(self.fifo.w_rdy),
            self.fifo.w_en.eq(self.dut.rdy),
            self.fifo.w_data.eq(self.dut.data),
        ]

        def testbench():
            first = self._frame([1, 0] * 4, 1)
            second = self._frame([0, 1] * 4, 1)
            yield from self.tx_bits(first + second)
            yield from self.tx_period()
            self.assertEqual((yield from self.fifo.read()), 0xAA)
            self.assertEqual((yield from self.fifo.read()), 0x55)
            yield
            self.assertFalse((yield self.fifo.r_rdy))

        simulation_test(top, testbench)