Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/nmigen-soc
Failed to load repositories. Confirm that selected base ref is valid, then try again.
base: 8662815e1ea6
Choose a base ref
head repository: m-labs/nmigen-soc
Failed to load repositories. Confirm that selected head ref is valid, then try again.
compare: 8a54b574be3f
Choose a head ref
  • 3 commits
  • 7 files changed
  • 1 contributor

Commits on Oct 26, 2019

  1. csr.bus.Multiplexer: fix element w_stb getting stuck.

    Also, don't clear shadow; this would break e.g. reading a 64-bit
    CSR register through a 32-bit Wishbone bus if a code fetch happens
    between the halves. Instead, clear shadow enable flag driving OR-mux.
    whitequark committed Oct 26, 2019
    Copy the full SHA
    d52d0b9 View commit details
  2. csr.wishbone: add WishboneCSRBridge.

    whitequark committed Oct 26, 2019
    Copy the full SHA
    4f59a0b View commit details
  3. Add missing tests (100% branch coverage!)

    Found several bugs, too.
    whitequark committed Oct 26, 2019
    Copy the full SHA
    8a54b57 View commit details
Showing with 357 additions and 19 deletions.
  1. +12 −12 nmigen_soc/csr/
  2. +88 −0 nmigen_soc/csr/
  3. +4 −3 nmigen_soc/
  4. +13 −3 nmigen_soc/test/
  5. +216 −0 nmigen_soc/test/
  6. +23 −0 nmigen_soc/test/
  7. +1 −1 nmigen_soc/test/
24 changes: 12 additions & 12 deletions nmigen_soc/csr/
Original file line number Diff line number Diff line change
@@ -201,7 +201,8 @@ class Multiplexer(Elaboratable):
CSR bus providing access to registers.
def __init__(self, *, addr_width, data_width, alignment=0):
self.bus = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment)
self.bus = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment,
self._map = self.bus.memory_map

def align_to(self, alignment):
@@ -235,29 +236,30 @@ def elaborate(self, platform):

for elem, (elem_start, elem_end) in self._map.resources():
shadow = Signal(elem.width, name="{}__shadow".format(
if elem.access.readable():
shadow_en = Signal(elem_end - elem_start, name="{}__shadow_en".format(
m.d.sync += shadow_en.eq(0)
if elem.access.writable():
m.d.comb += elem.w_data.eq(shadow)
m.d.sync += elem.w_stb.eq(0)

# Enumerate every address used by the register explicitly, rather than using
# arithmetic comparisons, since some toolchains (e.g. Yosys) are too eager to infer
# carry chains for comparisons, even with a constant. (Register sizes don't have
# to be powers of 2.)
with m.Switch(self.bus.addr):
for chunk_offset, chunk_addr in enumerate(range(elem_start, elem_end)):
with m.Case(chunk_addr):
shadow_slice = shadow[chunk_offset * self.bus.data_width:
(chunk_offset + 1) * self.bus.data_width]
shadow_slice = shadow.word_select(chunk_offset, self.bus.data_width)

with m.Case(chunk_addr):
if elem.access.readable():
chunk_r_stb = Signal(self.bus.data_width,
name="{}__r_stb_{}".format(, chunk_offset))
r_data_fanin |= Mux(chunk_r_stb, shadow_slice, 0)
r_data_fanin |= Mux(shadow_en[chunk_offset], shadow_slice, 0)
if chunk_addr == elem_start:
m.d.comb += elem.r_stb.eq(self.bus.r_stb)
with m.If(self.bus.r_stb):
m.d.sync += shadow.eq(elem.r_data)
# Delay by 1 cycle, allowing reads to be pipelined.
m.d.sync += chunk_r_stb.eq(self.bus.r_stb)
m.d.sync += shadow_en.eq(self.bus.r_stb << chunk_offset)

if elem.access.writable():
if chunk_addr == elem_end - 1:
@@ -267,9 +269,6 @@ def elaborate(self, platform):
with m.If(self.bus.w_stb):
m.d.sync += shadow_slice.eq(self.bus.w_data)

with m.Default():
m.d.sync += shadow.eq(0)

m.d.comb += self.bus.r_data.eq(r_data_fanin)

return m
@@ -307,7 +306,8 @@ class Decoder(Elaboratable):
CSR bus providing access to subordinate buses.
def __init__(self, *, addr_width, data_width, alignment=0):
self.bus = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment)
self.bus = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment,
self._map = self.bus.memory_map
self._subs = dict()

88 changes: 88 additions & 0 deletions nmigen_soc/csr/
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from nmigen import *
from nmigen.utils import log2_int

from . import Interface as CSRInterface
from ..wishbone import Interface as WishboneInterface

__all__ = ["WishboneCSRBridge"]

class WishboneCSRBridge(Elaboratable):
"""Wishbone to CSR bridge.
A bus bridge for accessing CSR registers from Wishbone. This bridge supports any Wishbone
data width greater or equal to CSR data width and performs appropriate address translation.
Reads and writes always take ``self.data_width // csr_bus.data_width + 1`` cycles to complete,
regardless of the select inputs. Write side effects occur simultaneously with acknowledgement.
csr_bus : :class:`..csr.Interface`
CSR bus driven by the bridge.
data_width : int or None
Wishbone bus data width. If not specified, defaults to ``csr_bus.data_width``.
wb_bus : :class:`..wishbone.Interface`
Wishbone bus provided by the bridge.
def __init__(self, csr_bus, *, data_width=None):
if not isinstance(csr_bus, CSRInterface):
raise ValueError("CSR bus must be an instance of CSRInterface, not {!r}"
if csr_bus.data_width not in (8, 16, 32, 64):
raise ValueError("CSR bus data width must be one of 8, 16, 32, 64, not {!r}"
if data_width is None:
data_width = csr_bus.data_width

self.csr_bus = csr_bus
self.wb_bus = WishboneInterface(
addr_width=max(0, csr_bus.addr_width - log2_int(data_width // csr_bus.data_width)),

# Since granularity of the Wishbone interface matches the data width of the CSR bus,
# no width conversion is performed, even if the Wishbone data width is greater.

# Build the bridge logic: a Wishbone transaction is served by stepping through one CSR bus
# access per Wishbone select bit, followed by a final step that asserts `ack`.
# NOTE(review): indentation was lost in this view; the nesting described in the comments
# below is inferred from the `with`/`for`/`if` statements -- confirm against the real file.
def elaborate(self, platform):
csr_bus = self.csr_bus
wb_bus = self.wb_bus

m = Module()

# Step counter: one value per select bit, plus one extra value for the final step that is
# handled by `m.Default()` below.
cycle = Signal(range(len(wb_bus.sel) + 1))
# CSR address = low bits of the step counter concatenated with the Wishbone address
# (`Cat` places its first argument in the least significant bits).
m.d.comb += csr_bus.addr.eq(Cat(cycle[:log2_int(len(wb_bus.sel))], wb_bus.adr))

# Steps are only taken while the Wishbone side asserts both `cyc` and `stb`.
with m.If(wb_bus.cyc & wb_bus.stb):
with m.Switch(cycle):
# Bit range of `dat_r`/`dat_w` that corresponds to select bit number `index`.
def segment(index):
return slice(index * wb_bus.granularity, (index + 1) * wb_bus.granularity)

# One switch case per Wishbone select bit.
for index, sel_index in enumerate(wb_bus.sel):
with m.Case(index):
if index > 0:
# CSR reads are registered, and we need to re-register them.
# The value stored here goes into the segment of the previous step (`index - 1`).
m.d.sync += wb_bus.dat_r[segment(index - 1)].eq(csr_bus.r_data)
# Strobe a CSR read or write (selected by `we`) only if this select bit is set.
m.d.comb += csr_bus.r_stb.eq(sel_index & ~wb_bus.we)
m.d.comb += csr_bus.w_data.eq(wb_bus.dat_w[segment(index)])
m.d.comb += csr_bus.w_stb.eq(sel_index & wb_bus.we)
# Advance to the next step (registered in the sync domain).
m.d.sync += cycle.eq(index + 1)

# Final step, taken for any `cycle` value without its own case.
# NOTE(review): presumably this sits after the `for` loop, so `index` still holds the last
# value produced by the loop and the last segment's read data is stored here -- confirm.
with m.Default():
m.d.sync += wb_bus.dat_r[segment(index)].eq(csr_bus.r_data)
m.d.sync += wb_bus.ack.eq(1)

# No active transaction: reset the step counter and deassert `ack`.
with m.Else():
m.d.sync += cycle.eq(0)
m.d.sync += wb_bus.ack.eq(0)

return m
7 changes: 4 additions & 3 deletions nmigen_soc/
Original file line number Diff line number Diff line change
@@ -151,8 +151,9 @@ def _compute_addr_range(self, addr, size, step=1, *, alignment):
overlap_descrs.append("resource {!r} at {:#x}..{:#x}"
.format(overlap, resource_range.start, resource_range.stop))
if overlap in self._windows:
window_range = self._windows[overlap]
overlap_descrs.append("window {!r} at {:#x}..{:#x}"
.format(overlap, resource_range.start, resource_range.stop))
.format(overlap, window_range.start, window_range.stop))
raise ValueError("Address range {:#x}..{:#x} overlaps with {}"
.format(addr, addr + size, ", ".join(overlap_descrs)))

@@ -351,7 +352,7 @@ def all_resources(self):
for sub_resource, sub_descr in assignment.all_resources():
yield sub_resource, self._translate(*sub_descr, assignment, addr_range)
assert False
assert False # :nocov:

def find_resource(self, resource):
"""Find address range corresponding to a resource.
@@ -409,4 +410,4 @@ def decode_address(self, address):
addr_range = self._windows[assignment]
return assignment.decode_address((address - addr_range.start) // addr_range.step)
assert False
assert False # :nocov:
16 changes: 13 additions & 3 deletions nmigen_soc/test/
Original file line number Diff line number Diff line change
@@ -110,9 +110,9 @@ def test_add_two(self):
(2, 3))

def test_add_wrong(self):
with self.assertRaisesRegex(ValueError,
r"Width must be a non-negative integer, not -1"):
Element(-1, "rw")
with self.assertRaisesRegex(TypeError,
r"Element must be an instance of csr\.Element, not 'foo'"):

def test_align_to(self):
self.assertEqual(self.dut.add(Element(8, "rw")),
@@ -167,10 +167,13 @@ def sim_test():
yield bus.w_stb.eq(1)
yield bus.w_stb.eq(0)
yield bus.addr.eq(2) # change address
self.assertEqual((yield elem_8_w.w_stb), 1)
self.assertEqual((yield elem_8_w.w_data), 0x3d)
self.assertEqual((yield elem_16_rw.w_stb), 0)
self.assertEqual((yield elem_8_w.w_stb), 0)

yield bus.addr.eq(2)
yield bus.w_data.eq(0x55)
@@ -260,6 +263,13 @@ def setUp(self):
self.dut = Decoder(addr_width=16, data_width=8)
Fragment.get(self.dut, platform=None) # silence UnusedElaboratable

def test_align_to(self):
    """Exercise ``align_to`` between two ``add`` calls on the decoder under test."""
    # The first subordinate bus is expected at the start of the address space.
    first_range = self.dut.add(Interface(addr_width=10, data_width=8))
    self.assertEqual(first_range, (0, 0x400, 1))

    # Requesting 12 bits of alignment is expected to report 0x1000.
    aligned_addr = self.dut.align_to(12)
    self.assertEqual(aligned_addr, 0x1000)

    # The second subordinate bus is then expected to start at that address.
    second_range = self.dut.add(Interface(addr_width=10, data_width=8))
    self.assertEqual(second_range, (0x1000, 0x1400, 1))

def test_add_wrong_sub_bus(self):
with self.assertRaisesRegex(TypeError,
r"Subordinate bus must be an instance of csr\.Interface, not 1"):