Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/nmigen
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: f22106e5ef00
Choose a base ref
...
head repository: m-labs/nmigen
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: ce1eff5464e5
Choose a head ref
  • 1 commit
  • 2 files changed
  • 1 contributor

Commits on Apr 21, 2019

  1. hdl.rec: implement Record.connect.

    Fixes #31.
    whitequark committed Apr 21, 2019
    Copy the full SHA
    ce1eff5 View commit details
Showing with 169 additions and 0 deletions.
  1. +51 −0 nmigen/hdl/rec.py
  2. +118 −0 nmigen/test/test_hdl_rec.py
51 changes: 51 additions & 0 deletions nmigen/hdl/rec.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from enum import Enum
from collections import OrderedDict
from functools import reduce

from .. import tracer
from ..tools import union
@@ -119,3 +120,53 @@ def __repr__(self):
if name is None:
name = "<unnamed>"
return "(rec {} {})".format(name, " ".join(fields))

def connect(self, *subordinates, include=None, exclude=None):
    """Return the list of statements that wire this record to ``subordinates``.

    Each selected field is handled according to the ``(shape, direction)``
    pair stored for it in ``self.layout``:

    * ``DIR_FANOUT``: the same-named field of every subordinate is assigned
      from this record's field;
    * ``DIR_FANIN``: this record's field is assigned the bitwise OR of the
      same-named fields of all subordinates;
    * a field whose shape is a ``Layout`` is connected recursively through
      its own ``connect``.

    Arguments:
        *subordinates: records to connect to; each must have every selected
            field of this record in its ``fields``.
        include: optional container of field names; when not ``None``, only
            the fields it contains are connected. For a nested field, the
            value stored under the field's name is forwarded as the nested
            ``include``.
        exclude: optional container of field names; when not ``None``, the
            fields it contains are skipped.

    Returns:
        A list of assignment statements. It is empty when no subordinates
        are given.

    Raises:
        AttributeError: if ``include`` or ``exclude`` names a field this
            record does not have, or a subordinate lacks a selected field.
        TypeError: if a selected non-``Layout`` field has direction
            ``DIR_NONE``.
    """
    def rec_name(record):
        # Human-readable designation of a record for error messages.
        if record.name is None:
            return "unnamed record"
        else:
            return "record '{}'".format(record.name)

    # Reject filters that mention unknown fields before generating anything.
    for field in include or {}:
        if field not in self.fields:
            raise AttributeError("Cannot include field '{}' because it is not present in {}"
                                 .format(field, rec_name(self)))
    for field in exclude or {}:
        if field not in self.fields:
            raise AttributeError("Cannot exclude field '{}' because it is not present in {}"
                                 .format(field, rec_name(self)))

    stmts = []
    for field in self.fields:
        if include is not None and field not in include:
            continue
        if exclude is not None and field in exclude:
            continue

        shape, direction = self.layout[field]
        if not isinstance(shape, Layout) and direction == DIR_NONE:
            raise TypeError("Cannot connect field '{}' of {} because it does not have "
                            "a direction"
                            .format(field, rec_name(self)))

        item = self.fields[field]
        subord_items = []
        for subord in subordinates:
            if field not in subord.fields:
                raise AttributeError("Cannot connect field '{}' of {} to subordinate {} "
                                     "because the subordinate record does not have this field"
                                     .format(field, rec_name(self), rec_name(subord)))
            subord_items.append(subord.fields[field])

        if isinstance(shape, Layout):
            # NOTE(review): the value forwarded as the nested `include` is
            # iterated by the recursive call, so it has to be a container of
            # field names itself -- confirm the intended usage for nested
            # fields.
            sub_include = include[field] if include and field in include else None
            # NOTE(review): a field present in `exclude` was already skipped
            # by the `continue` above, so this is always None here; a nested
            # record can currently only be excluded as a whole.
            sub_exclude = exclude[field] if exclude and field in exclude else None
            stmts += item.connect(*subord_items, include=sub_include, exclude=sub_exclude)
        else:
            if direction == DIR_FANOUT:
                stmts += [sub_item.eq(item) for sub_item in subord_items]
            # With no subordinates there is nothing to OR together, and
            # reduce() without an initial value raises TypeError on an empty
            # sequence, so emit no statement in that case.
            if direction == DIR_FANIN and subord_items:
                stmts += [item.eq(reduce(lambda a, b: a | b, subord_items))]

    return stmts
118 changes: 118 additions & 0 deletions nmigen/test/test_hdl_rec.py
Original file line number Diff line number Diff line change
@@ -105,3 +105,121 @@ def test_wrong_field_unnamed(self):
with self.assertRaises(NameError,
msg="Unnamed record does not have a field 'en'. Did you mean one of: stb, ack?"):
r.en


class ConnectTestCase(FHDLTestCase):
    """Tests for ``Record.connect``: flat and nested layouts, filters, errors.

    The expected reprs and error messages contain record names such as
    ``core`` and ``periph1`` although no ``name=`` is passed to ``Record``;
    presumably the name is inferred from the assignment target, so the local
    variable names below should not be changed -- TODO confirm.
    """

    def setUp_flat(self):
        """Store identical flat layouts for the core and the peripherals."""
        self.core_layout = [
            ("addr", 32, DIR_FANOUT),
            ("data_r", 32, DIR_FANIN),
            ("data_w", 32, DIR_FANIN),
        ]
        self.periph_layout = [
            ("addr", 32, DIR_FANOUT),
            ("data_r", 32, DIR_FANIN),
            ("data_w", 32, DIR_FANIN),
        ]

    def setUp_nested(self):
        """Store identical layouts whose ``data`` field is a nested layout."""
        self.core_layout = [
            ("addr", 32, DIR_FANOUT),
            ("data", [
                ("r", 32, DIR_FANIN),
                ("w", 32, DIR_FANIN),
            ]),
        ]
        self.periph_layout = [
            ("addr", 32, DIR_FANOUT),
            ("data", [
                ("r", 32, DIR_FANIN),
                ("w", 32, DIR_FANIN),
            ]),
        ]

    def test_flat(self):
        """All fields of a flat record are connected to both peripherals."""
        self.setUp_flat()

        core = Record(self.core_layout)
        periph1 = Record(self.periph_layout)
        periph2 = Record(self.periph_layout)

        stmts = core.connect(periph1, periph2)
        self.assertRepr(stmts, """(
(eq (sig periph1__addr) (sig core__addr))
(eq (sig periph2__addr) (sig core__addr))
(eq (sig core__data_r) (| (sig periph1__data_r) (sig periph2__data_r)))
(eq (sig core__data_w) (| (sig periph1__data_w) (sig periph2__data_w)))
)""")

    def test_flat_include(self):
        """Only the fields named in ``include`` are connected."""
        self.setUp_flat()

        core = Record(self.core_layout)
        periph1 = Record(self.periph_layout)
        periph2 = Record(self.periph_layout)

        stmts = core.connect(periph1, periph2, include={"addr": True})
        self.assertRepr(stmts, """(
(eq (sig periph1__addr) (sig core__addr))
(eq (sig periph2__addr) (sig core__addr))
)""")

    def test_flat_exclude(self):
        """Fields named in ``exclude`` are left unconnected."""
        self.setUp_flat()

        core = Record(self.core_layout)
        periph1 = Record(self.periph_layout)
        periph2 = Record(self.periph_layout)

        stmts = core.connect(periph1, periph2, exclude={"addr": True})
        self.assertRepr(stmts, """(
(eq (sig core__data_r) (| (sig periph1__data_r) (sig periph2__data_r)))
(eq (sig core__data_w) (| (sig periph1__data_w) (sig periph2__data_w)))
)""")

    def test_nested(self):
        """Fields of a nested layout are connected recursively."""
        self.setUp_nested()

        core = Record(self.core_layout)
        periph1 = Record(self.periph_layout)
        periph2 = Record(self.periph_layout)

        stmts = core.connect(periph1, periph2)
        # Show the complete diff if the long expected repr does not match.
        self.maxDiff = None
        self.assertRepr(stmts, """(
(eq (sig periph1__addr) (sig core__addr))
(eq (sig periph2__addr) (sig core__addr))
(eq (sig core__data__r) (| (sig periph1__data__r) (sig periph2__data__r)))
(eq (sig core__data__w) (| (sig periph1__data__w) (sig periph2__data__w)))
)""")

    def test_wrong_include_exclude(self):
        """Unknown names in ``include``/``exclude`` raise AttributeError."""
        self.setUp_flat()

        core = Record(self.core_layout)
        periph = Record(self.periph_layout)

        # assertRaisesRegex is used because the `msg` keyword of assertRaises
        # is only the failure message and is never compared with the text of
        # the raised exception.
        with self.assertRaisesRegex(AttributeError,
                r"^Cannot include field 'foo' because it is not present in record 'core'$"):
            core.connect(periph, include={"foo": True})

        with self.assertRaisesRegex(AttributeError,
                r"^Cannot exclude field 'foo' because it is not present in record 'core'$"):
            core.connect(periph, exclude={"foo": True})

    def test_wrong_direction(self):
        """Connecting a field that has no direction raises TypeError."""
        recs = [Record([("x", 1)]) for _ in range(2)]

        with self.assertRaisesRegex(TypeError,
                (r"^Cannot connect field 'x' of unnamed record because it does not have "
                 r"a direction$")):
            recs[0].connect(recs[1])

    def test_wrong_missing_field(self):
        """A subordinate lacking a connected field raises AttributeError."""
        core = Record([("addr", 32, DIR_FANOUT)])
        periph = Record([])

        with self.assertRaisesRegex(AttributeError,
                (r"^Cannot connect field 'addr' of record 'core' to subordinate record 'periph' "
                 r"because the subordinate record does not have this field$")):
            core.connect(periph)