working address stream generator

This commit is contained in:
saji 2024-10-21 12:42:35 -05:00
parent 2893262f87
commit 6b63b17bb8
2 changed files with 116 additions and 10 deletions

View file

@ -19,7 +19,8 @@ from .geom import DisplayRotation, DisplayString
logger = logging.getLogger(__name__)
CoordLayout = data.StructLayout({"x": unsigned(32), "y": unsigned(32)})
# FIXME: sizing should be based off of screen size.
CoordLayout = data.StructLayout({"x": unsigned(10), "y": unsigned(10)})
class AddressConverter(wiring.Component):
@ -31,7 +32,7 @@ class AddressConverter(wiring.Component):
{
"input_x": In(geom.dimensions.length),
"addr": In(unsigned(geom.dimensions.addr_bits)),
"output": Out(CoordLayout).array(geom.dimensions.mux),
"output": Out(data.ArrayLayout(CoordLayout, geom.dimensions.mux)),
},
src_loc_at=src_loc_at,
)
@ -40,9 +41,9 @@ class AddressConverter(wiring.Component):
m = Module()
for mux in range(self.geom.dimensions.mux):
m.d.comb += self.output[mux].eq(
self.geom.translate_coord(self.input_x, self.addr, mux)
)
o = self.geom.translate_coord(self.input_x, self.addr, mux)
m.d.comb += self.output[mux]["x"].eq(o["x"])
m.d.comb += self.output[mux]["y"].eq(o["y"])
return m
@ -54,7 +55,8 @@ class AddressGenerator(wiring.Component):
self.geom = geom
super().__init__(
{
"coordstream": Out( stream.Signature(CoordLayout.array(geom.dimensions.mux))
"coordstream": Out(
stream.Signature(data.ArrayLayout(CoordLayout, geom.dimensions.mux))
),
"start": In(1),
"done": Out(1),
@ -68,22 +70,36 @@ class AddressGenerator(wiring.Component):
counter = Signal(self.geom.dimensions.length)
addr = Signal(self.addr.shape())
# based on the geometry we generate x,y pairs.
m.submodules.translate = translate = AddressConverter(self.geom)
m.d.comb += translate.input_x.eq(counter)
m.d.comb += translate.addr.eq(addr)
m.d.comb += self.coordstream.payload.eq(translate.output)
with m.FSM():
with m.State("init"):
m.d.comb += self.done.eq(0)
m.d.sync += counter.eq(0)
m.d.comb += self.coordstream.valid.eq(0)
m.d.comb += [self.done.eq(0), self.coordstream.valid.eq(0)]
m.d.sync += [counter.eq(0), addr.eq(self.addr)]
with m.If(self.start):
m.next = "run"
with m.State("run"):
m.d.comb += self.coordstream.valid.eq(1)
# stream data out as long as it's valid.
with m.If(
self.coordstream.ready & (counter == self.geom.dimensions.length - 1)
):
m.next = "done"
with m.Elif(self.coordstream.ready):
m.d.sync += counter.eq(counter + 1)
pass
with m.State("done"):
m.d.comb += self.coordstream.valid.eq(0)
m.d.comb += self.done.eq(1)
m.next = "init"
@ -99,7 +115,7 @@ class BasicFetcher(wiring.Component):
self.dfunc = dfunc
super().__init__(
{
"pixstream": Out(stream.Signature(Rgb888Layout)),
"pixstream": Out(stream.Signature(data.ArrayLayout(Rgb888Layout, 2))),
"start": In(1),
"addr": In(geom.dimensions.addr_bits),
},

View file

@ -0,0 +1,90 @@
from amaranth.lib import wiring, data
from amaranth.sim import Simulator
import random
from random import randrange
import pytest
from groovylight.fetcher import AddressConverter, AddressGenerator
from groovylight.geom import DisplayString, Coord, DisplayDimensions, DisplayRotation
ds_testdata = [
(DisplayRotation.R0, (0, 0), {"x": 3, "y": 0}),
(DisplayRotation.R0, (40, 2), {"x": 43, "y": 2}),
(DisplayRotation.R90, (40, 0), {"x": 66, "y": 40}),
(DisplayRotation.R90, (120, 2), {"x": 64, "y": 120}),
]
@pytest.mark.parametrize("rot, inp, expected", ds_testdata)
def test_converter(rot, inp, expected):
ds = DisplayString(Coord(3, 0), DisplayDimensions(128, 64), rot)
dut = AddressConverter(ds)
sim = Simulator(dut)
async def testbench(ctx):
await ctx.delay(1e-6)
ctx.set(dut.input_x, inp[0])
ctx.set(dut.addr, inp[1])
await ctx.delay(1e-6)
assert ctx.get(dut.output)[0]["x"] == expected["x"]
assert ctx.get(dut.output)[0]["y"] == expected["y"]
sim.add_testbench(testbench)
with sim.write_vcd("output.vcd"):
sim.run()
# Helper functions for stream management
async def stream_get(ctx, stream):
ctx.set(stream.ready, 1)
(payload,) = await ctx.tick().sample(stream.payload).until(stream.valid)
ctx.set(stream.ready, 0)
return payload
async def stream_put(ctx, stream, payload):
ctx.set(stream.valid, 1)
ctx.set(stream.payload, payload)
await ctx.tick().until(stream.ready)
ctx.set(stream.valid, 0)
generator_tests = [
(0, DisplayRotation.R0),
(0, DisplayRotation.R90),
(4, DisplayRotation.R90),
]
@pytest.mark.parametrize("addr, rot", generator_tests)
def test_generator(addr, rot):
ds = DisplayString(Coord(3, 0), DisplayDimensions(128, 64), rot)
dut = AddressGenerator(ds)
sim = Simulator(dut)
sim.add_clock(1e-6)
async def runner(ctx):
# TODO: set inputs
ctx.set(dut.addr, addr)
await ctx.tick()
ctx.set(dut.start, 1)
await ctx.tick()
ctx.set(dut.start, 0)
await ctx.tick().until(dut.done == 1)
expected = [
[ds.translate_coord(x, addr, 0), ds.translate_coord(x, addr, 1)] for x in range(128)
]
expected.reverse()
async def stream_checker(ctx):
while ctx.get(dut.done) == 0:
payload = await stream_get(ctx, dut.coordstream)
assert expected.pop() == payload
sim.add_testbench(runner)
sim.add_testbench(stream_checker)
with sim.write_vcd("generator.vcd"):
sim.run()