generated from saji/ecp5-template
Compare commits
No commits in common. "5f54b8acd8978fd8281b3dc8359041049b94c747" and "2893262f87dbfc7d6af9bdf308cd4ad0f529b437" have entirely different histories.
5f54b8acd8
...
2893262f87
|
@ -3,22 +3,23 @@
|
||||||
# during operation, it is given a row index, and responds with the data.
|
# during operation, it is given a row index, and responds with the data.
|
||||||
|
|
||||||
|
|
||||||
from amaranth import Module, Signal, unsigned, Cat
|
from amaranth import Module, Cat, Mux, ShapeLike, Signal, Assert, Array, unsigned
|
||||||
from amaranth.build import Platform
|
from amaranth.build import Platform
|
||||||
from amaranth.lib import wiring, data
|
from amaranth.lib import wiring, data
|
||||||
from amaranth.lib.wiring import In, Out
|
from amaranth.lib.wiring import In, Out
|
||||||
|
from amaranth.lib.memory import Memory, ReadPort, WritePort
|
||||||
from amaranth.lib import stream
|
from amaranth.lib import stream
|
||||||
|
from amaranth.utils import ceil_log2
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from .common import Rgb888Layout
|
from .common import Rgb666Layout, Hub75Stream, Hub75Ctrl, Hub75Data, Rgb888Layout
|
||||||
from .geom import DisplayString
|
from .geom import DisplayRotation, DisplayString
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
# FIXME: sizing should be based off of screen size.
|
CoordLayout = data.StructLayout({"x": unsigned(32), "y": unsigned(32)})
|
||||||
CoordLayout = data.StructLayout({"x": unsigned(10), "y": unsigned(10)})
|
|
||||||
|
|
||||||
|
|
||||||
class AddressConverter(wiring.Component):
|
class AddressConverter(wiring.Component):
|
||||||
|
@ -30,7 +31,7 @@ class AddressConverter(wiring.Component):
|
||||||
{
|
{
|
||||||
"input_x": In(geom.dimensions.length),
|
"input_x": In(geom.dimensions.length),
|
||||||
"addr": In(unsigned(geom.dimensions.addr_bits)),
|
"addr": In(unsigned(geom.dimensions.addr_bits)),
|
||||||
"output": Out(data.ArrayLayout(CoordLayout, geom.dimensions.mux)),
|
"output": Out(CoordLayout).array(geom.dimensions.mux),
|
||||||
},
|
},
|
||||||
src_loc_at=src_loc_at,
|
src_loc_at=src_loc_at,
|
||||||
)
|
)
|
||||||
|
@ -39,9 +40,9 @@ class AddressConverter(wiring.Component):
|
||||||
m = Module()
|
m = Module()
|
||||||
|
|
||||||
for mux in range(self.geom.dimensions.mux):
|
for mux in range(self.geom.dimensions.mux):
|
||||||
o = self.geom.translate_coord(self.input_x, self.addr, mux)
|
m.d.comb += self.output[mux].eq(
|
||||||
m.d.comb += self.output[mux]["x"].eq(o["x"])
|
self.geom.translate_coord(self.input_x, self.addr, mux)
|
||||||
m.d.comb += self.output[mux]["y"].eq(o["y"])
|
)
|
||||||
|
|
||||||
return m
|
return m
|
||||||
|
|
||||||
|
@ -53,8 +54,7 @@ class AddressGenerator(wiring.Component):
|
||||||
self.geom = geom
|
self.geom = geom
|
||||||
super().__init__(
|
super().__init__(
|
||||||
{
|
{
|
||||||
"coordstream": Out(
|
"coordstream": Out( stream.Signature(CoordLayout.array(geom.dimensions.mux))
|
||||||
stream.Signature(data.ArrayLayout(CoordLayout, geom.dimensions.mux))
|
|
||||||
),
|
),
|
||||||
"start": In(1),
|
"start": In(1),
|
||||||
"done": Out(1),
|
"done": Out(1),
|
||||||
|
@ -68,37 +68,22 @@ class AddressGenerator(wiring.Component):
|
||||||
|
|
||||||
counter = Signal(self.geom.dimensions.length)
|
counter = Signal(self.geom.dimensions.length)
|
||||||
|
|
||||||
addr = Signal(self.addr.shape())
|
|
||||||
|
|
||||||
# based on the geometry we generate x,y pairs.
|
# based on the geometry we generate x,y pairs.
|
||||||
m.submodules.translate = translate = AddressConverter(self.geom)
|
m.submodules.translate = translate = AddressConverter(self.geom)
|
||||||
m.d.comb += translate.input_x.eq(counter)
|
|
||||||
m.d.comb += translate.addr.eq(addr)
|
|
||||||
|
|
||||||
m.d.comb += self.coordstream.payload.eq(translate.output)
|
|
||||||
|
|
||||||
with m.FSM():
|
with m.FSM():
|
||||||
with m.State("init"):
|
with m.State("init"):
|
||||||
m.d.comb += [self.done.eq(0), self.coordstream.valid.eq(0)]
|
m.d.comb += self.done.eq(0)
|
||||||
m.d.sync += [counter.eq(0), addr.eq(self.addr)]
|
m.d.sync += counter.eq(0)
|
||||||
|
m.d.comb += self.coordstream.valid.eq(0)
|
||||||
with m.If(self.start):
|
with m.If(self.start):
|
||||||
m.next = "run"
|
m.next = "run"
|
||||||
|
|
||||||
with m.State("run"):
|
with m.State("run"):
|
||||||
m.d.comb += self.coordstream.valid.eq(1)
|
|
||||||
# stream data out as long as it's valid.
|
# stream data out as long as it's valid.
|
||||||
|
|
||||||
with m.If(
|
|
||||||
self.coordstream.ready
|
|
||||||
& (counter == self.geom.dimensions.length - 1)
|
|
||||||
):
|
|
||||||
m.next = "done"
|
|
||||||
with m.Elif(self.coordstream.ready):
|
|
||||||
m.d.sync += counter.eq(counter + 1)
|
|
||||||
|
|
||||||
pass
|
pass
|
||||||
with m.State("done"):
|
with m.State("done"):
|
||||||
m.d.comb += self.coordstream.valid.eq(0)
|
|
||||||
m.d.comb += self.done.eq(1)
|
m.d.comb += self.done.eq(1)
|
||||||
m.next = "init"
|
m.next = "init"
|
||||||
|
|
||||||
|
@ -106,42 +91,17 @@ class AddressGenerator(wiring.Component):
|
||||||
|
|
||||||
|
|
||||||
class BasicFetcher(wiring.Component):
|
class BasicFetcher(wiring.Component):
|
||||||
"""A generic function-based fetcher. Takes a function of the form f(x,y: int) -> RGB."""
|
"""A generic fetcher. Takes a function of the form f(x,y: int) -> RGB."""
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self, geom: DisplayString, dfunc, data_shape=Rgb888Layout, *, src_loc_at=0
|
self, geom: DisplayString, dfunc, data_shape=Rgb888Layout, *, src_loc_at=0
|
||||||
):
|
):
|
||||||
self.geom = geom
|
|
||||||
self.dfunc = dfunc
|
self.dfunc = dfunc
|
||||||
super().__init__(
|
super().__init__(
|
||||||
{
|
{
|
||||||
"input": In(
|
"pixstream": Out(stream.Signature(Rgb888Layout)),
|
||||||
stream.Signature(data.ArrayLayout(CoordLayout, geom.dimensions.mux))
|
"start": In(1),
|
||||||
),
|
"addr": In(geom.dimensions.addr_bits),
|
||||||
"pixstream": Out(
|
|
||||||
stream.Signature(data.ArrayLayout(data_shape, geom.dimensions.mux))
|
|
||||||
),
|
|
||||||
},
|
},
|
||||||
src_loc_at=src_loc_at,
|
src_loc_at=src_loc_at,
|
||||||
)
|
)
|
||||||
|
|
||||||
def elaborate(self, platform: Platform) -> Module:
|
|
||||||
m = Module()
|
|
||||||
|
|
||||||
# test mode - pass through, r = x + y, g = x - y, b = {y,x}
|
|
||||||
|
|
||||||
colors = self.pixstream.payload
|
|
||||||
m.d.comb += [
|
|
||||||
self.input.valid.eq(self.pixstream.valid),
|
|
||||||
self.input.ready.eq(self.pixstream.ready),
|
|
||||||
]
|
|
||||||
|
|
||||||
for i in range(self.geom.dimensions.mux):
|
|
||||||
inp = self.input.payload[i]
|
|
||||||
m.d.comb += [
|
|
||||||
colors[i].red.eq(inp.x + inp.y),
|
|
||||||
colors[i].green.eq(inp.x - inp.y),
|
|
||||||
colors[i].blue.eq(inp.x ^ inp.y),
|
|
||||||
]
|
|
||||||
|
|
||||||
return m
|
|
||||||
|
|
|
@ -1,119 +0,0 @@
|
||||||
from amaranth.lib import wiring, data
|
|
||||||
from amaranth.sim import Simulator
|
|
||||||
import random
|
|
||||||
from random import randrange
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from groovylight.fetcher import AddressConverter, AddressGenerator, BasicFetcher
|
|
||||||
from groovylight.geom import DisplayString, Coord, DisplayDimensions, DisplayRotation
|
|
||||||
|
|
||||||
ds_testdata = [
|
|
||||||
(DisplayRotation.R0, (0, 0), {"x": 3, "y": 0}),
|
|
||||||
(DisplayRotation.R0, (40, 2), {"x": 43, "y": 2}),
|
|
||||||
(DisplayRotation.R90, (40, 0), {"x": 66, "y": 40}),
|
|
||||||
(DisplayRotation.R90, (120, 2), {"x": 64, "y": 120}),
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("rot, inp, expected", ds_testdata)
|
|
||||||
def test_converter(rot, inp, expected):
|
|
||||||
ds = DisplayString(Coord(3, 0), DisplayDimensions(128, 64), rot)
|
|
||||||
dut = AddressConverter(ds)
|
|
||||||
sim = Simulator(dut)
|
|
||||||
|
|
||||||
async def testbench(ctx):
|
|
||||||
await ctx.delay(1e-6)
|
|
||||||
ctx.set(dut.input_x, inp[0])
|
|
||||||
ctx.set(dut.addr, inp[1])
|
|
||||||
await ctx.delay(1e-6)
|
|
||||||
assert ctx.get(dut.output)[0]["x"] == expected["x"]
|
|
||||||
assert ctx.get(dut.output)[0]["y"] == expected["y"]
|
|
||||||
|
|
||||||
sim.add_testbench(testbench)
|
|
||||||
|
|
||||||
with sim.write_vcd("output.vcd"):
|
|
||||||
sim.run()
|
|
||||||
|
|
||||||
|
|
||||||
# Helper functions for stream management
|
|
||||||
async def stream_get(ctx, stream):
|
|
||||||
ctx.set(stream.ready, 1)
|
|
||||||
(payload,) = await ctx.tick().sample(stream.payload).until(stream.valid)
|
|
||||||
ctx.set(stream.ready, 0)
|
|
||||||
return payload
|
|
||||||
|
|
||||||
|
|
||||||
async def stream_put(ctx, stream, payload):
|
|
||||||
ctx.set(stream.valid, 1)
|
|
||||||
ctx.set(stream.payload, payload)
|
|
||||||
await ctx.tick().until(stream.ready)
|
|
||||||
ctx.set(stream.valid, 0)
|
|
||||||
|
|
||||||
|
|
||||||
generator_tests = [
|
|
||||||
(0, DisplayRotation.R0),
|
|
||||||
(0, DisplayRotation.R90),
|
|
||||||
(4, DisplayRotation.R90),
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("addr, rot", generator_tests)
|
|
||||||
def test_generator(addr, rot):
|
|
||||||
ds = DisplayString(Coord(3, 0), DisplayDimensions(128, 64), rot)
|
|
||||||
dut = AddressGenerator(ds)
|
|
||||||
sim = Simulator(dut)
|
|
||||||
sim.add_clock(1e-6)
|
|
||||||
|
|
||||||
async def runner(ctx):
|
|
||||||
# TODO: set inputs
|
|
||||||
ctx.set(dut.addr, addr)
|
|
||||||
await ctx.tick()
|
|
||||||
ctx.set(dut.start, 1)
|
|
||||||
await ctx.tick()
|
|
||||||
ctx.set(dut.start, 0)
|
|
||||||
await ctx.tick().until(dut.done == 1)
|
|
||||||
|
|
||||||
expected = [
|
|
||||||
[ds.translate_coord(x, addr, 0), ds.translate_coord(x, addr, 1)]
|
|
||||||
for x in range(128)
|
|
||||||
]
|
|
||||||
|
|
||||||
expected.reverse()
|
|
||||||
|
|
||||||
async def stream_checker(ctx):
|
|
||||||
while ctx.get(dut.done) == 0:
|
|
||||||
payload = await stream_get(ctx, dut.coordstream)
|
|
||||||
assert expected.pop() == payload
|
|
||||||
|
|
||||||
sim.add_testbench(runner)
|
|
||||||
sim.add_testbench(stream_checker)
|
|
||||||
with sim.write_vcd("generator.vcd"):
|
|
||||||
sim.run()
|
|
||||||
|
|
||||||
|
|
||||||
basicFetcher_tests = [
|
|
||||||
({"x": 0, "y": 0}, {"red": 0, "green": 0, "blue": 0}),
|
|
||||||
({"x": 1, "y": 1}, {"red": 2, "green": 0, "blue": 0}),
|
|
||||||
({"x": 9, "y": 1}, {"red": 10, "green": 8, "blue": 8}),
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("inp, expected", basicFetcher_tests)
|
|
||||||
def test_basic_fetcher(inp, expected):
|
|
||||||
ds = DisplayString(
|
|
||||||
Coord(3, 0), DisplayDimensions(128, 64, mux=1), DisplayRotation.R0
|
|
||||||
)
|
|
||||||
dut = BasicFetcher(ds, None)
|
|
||||||
sim = Simulator(dut)
|
|
||||||
|
|
||||||
async def test(ctx):
|
|
||||||
ctx.set(dut.input.payload[0], inp)
|
|
||||||
res = ctx.get(dut.pixstream.payload)[0]
|
|
||||||
assert res["red"] == expected["red"]
|
|
||||||
assert res["green"] == expected["green"]
|
|
||||||
assert res["blue"] == expected["blue"]
|
|
||||||
|
|
||||||
sim.add_testbench(test)
|
|
||||||
|
|
||||||
with sim.write_vcd("fetcher.vcd"):
|
|
||||||
sim.run()
|
|
|
@ -146,6 +146,7 @@ def test_datadriver_single(bcm):
|
||||||
with sim.write_vcd("output.vcd"):
|
with sim.write_vcd("output.vcd"):
|
||||||
sim.run()
|
sim.run()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skip()
|
@pytest.mark.skip()
|
||||||
def test_hub75_coordinator():
|
def test_hub75_coordinator():
|
||||||
m = Module()
|
m = Module()
|
||||||
|
|
Loading…
Reference in a new issue