From 6b63b17bb862770cf28dadd884a7d3d7d0e4f504 Mon Sep 17 00:00:00 2001 From: saji Date: Mon, 21 Oct 2024 12:42:35 -0500 Subject: [PATCH] working address stream generator --- src/groovylight/fetcher.py | 36 ++++++++--- src/groovylight/tests/test_fetcher.py | 90 +++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 10 deletions(-) create mode 100644 src/groovylight/tests/test_fetcher.py diff --git a/src/groovylight/fetcher.py b/src/groovylight/fetcher.py index fe9e7b8..bbd9413 100644 --- a/src/groovylight/fetcher.py +++ b/src/groovylight/fetcher.py @@ -19,7 +19,8 @@ from .geom import DisplayRotation, DisplayString logger = logging.getLogger(__name__) -CoordLayout = data.StructLayout({"x": unsigned(32), "y": unsigned(32)}) +# FIXME: sizing should be based off of screen size. +CoordLayout = data.StructLayout({"x": unsigned(10), "y": unsigned(10)}) class AddressConverter(wiring.Component): @@ -31,7 +32,7 @@ class AddressConverter(wiring.Component): { "input_x": In(geom.dimensions.length), "addr": In(unsigned(geom.dimensions.addr_bits)), - "output": Out(CoordLayout).array(geom.dimensions.mux), + "output": Out(data.ArrayLayout(CoordLayout, geom.dimensions.mux)), }, src_loc_at=src_loc_at, ) @@ -40,9 +41,9 @@ class AddressConverter(wiring.Component): m = Module() for mux in range(self.geom.dimensions.mux): - m.d.comb += self.output[mux].eq( - self.geom.translate_coord(self.input_x, self.addr, mux) - ) + o = self.geom.translate_coord(self.input_x, self.addr, mux) + m.d.comb += self.output[mux]["x"].eq(o["x"]) + m.d.comb += self.output[mux]["y"].eq(o["y"]) return m @@ -54,7 +55,8 @@ class AddressGenerator(wiring.Component): self.geom = geom super().__init__( { - "coordstream": Out( stream.Signature(CoordLayout.array(geom.dimensions.mux)) + "coordstream": Out( + stream.Signature(data.ArrayLayout(CoordLayout, geom.dimensions.mux)) ), "start": In(1), "done": Out(1), @@ -68,22 +70,36 @@ class AddressGenerator(wiring.Component): counter = Signal(self.geom.dimensions.length) + addr = Signal(self.addr.shape()) + # based on the geometry we generate x,y pairs. m.submodules.translate = translate = AddressConverter(self.geom) + m.d.comb += translate.input_x.eq(counter) + m.d.comb += translate.addr.eq(addr) + + m.d.comb += self.coordstream.payload.eq(translate.output) with m.FSM(): with m.State("init"): - m.d.comb += self.done.eq(0) - m.d.sync += counter.eq(0) - m.d.comb += self.coordstream.valid.eq(0) + m.d.comb += [self.done.eq(0), self.coordstream.valid.eq(0)] + m.d.sync += [counter.eq(0), addr.eq(self.addr)] with m.If(self.start): m.next = "run" with m.State("run"): + m.d.comb += self.coordstream.valid.eq(1) # stream data out as long as it's valid. + with m.If( + self.coordstream.ready & (counter == self.geom.dimensions.length - 1) + ): + m.next = "done" + with m.Elif(self.coordstream.ready): + m.d.sync += counter.eq(counter + 1) + pass with m.State("done"): + m.d.comb += self.coordstream.valid.eq(0) m.d.comb += self.done.eq(1) m.next = "init" @@ -99,7 +115,7 @@ class BasicFetcher(wiring.Component): self.dfunc = dfunc super().__init__( { - "pixstream": Out(stream.Signature(Rgb888Layout)), + "pixstream": Out(stream.Signature(data.ArrayLayout(Rgb888Layout, 2))), "start": In(1), "addr": In(geom.dimensions.addr_bits), }, diff --git a/src/groovylight/tests/test_fetcher.py b/src/groovylight/tests/test_fetcher.py new file mode 100644 index 0000000..e105fff --- /dev/null +++ b/src/groovylight/tests/test_fetcher.py @@ -0,0 +1,90 @@ +from amaranth.lib import wiring, data +from amaranth.sim import Simulator +import random +from random import randrange +import pytest + +from groovylight.fetcher import AddressConverter, AddressGenerator +from groovylight.geom import DisplayString, Coord, DisplayDimensions, DisplayRotation + +ds_testdata = [ + (DisplayRotation.R0, (0, 0), {"x": 3, "y": 0}), + (DisplayRotation.R0, (40, 2), {"x": 43, "y": 2}), + (DisplayRotation.R90, (40, 0), {"x": 66, "y": 40}), + (DisplayRotation.R90, (120, 2), {"x": 64, "y": 120}), +] + + +@pytest.mark.parametrize("rot, inp, expected", ds_testdata) +def test_converter(rot, inp, expected): + ds = DisplayString(Coord(3, 0), DisplayDimensions(128, 64), rot) + dut = AddressConverter(ds) + sim = Simulator(dut) + + async def testbench(ctx): + await ctx.delay(1e-6) + ctx.set(dut.input_x, inp[0]) + ctx.set(dut.addr, inp[1]) + await ctx.delay(1e-6) + assert ctx.get(dut.output)[0]["x"] == expected["x"] + assert ctx.get(dut.output)[0]["y"] == expected["y"] + + sim.add_testbench(testbench) + + with sim.write_vcd("output.vcd"): + sim.run() + + +# Helper functions for stream management +async def stream_get(ctx, stream): + ctx.set(stream.ready, 1) + (payload,) = await ctx.tick().sample(stream.payload).until(stream.valid) + ctx.set(stream.ready, 0) + return payload + + +async def stream_put(ctx, stream, payload): + ctx.set(stream.valid, 1) + ctx.set(stream.payload, payload) + await ctx.tick().until(stream.ready) + ctx.set(stream.valid, 0) + + +generator_tests = [ + (0, DisplayRotation.R0), + (0, DisplayRotation.R90), + (4, DisplayRotation.R90), + + ] + +@pytest.mark.parametrize("addr, rot", generator_tests) +def test_generator(addr, rot): + ds = DisplayString(Coord(3, 0), DisplayDimensions(128, 64), rot) + dut = AddressGenerator(ds) + sim = Simulator(dut) + sim.add_clock(1e-6) + + async def runner(ctx): + # TODO: set inputs + ctx.set(dut.addr, addr) + await ctx.tick() + ctx.set(dut.start, 1) + await ctx.tick() + ctx.set(dut.start, 0) + await ctx.tick().until(dut.done == 1) + + expected = [ + [ds.translate_coord(x, addr, 0), ds.translate_coord(x, addr, 1)] for x in range(128) + ] + + expected.reverse() + + async def stream_checker(ctx): + while ctx.get(dut.done) == 0: + payload = await stream_get(ctx, dut.coordstream) + assert expected.pop() == payload + + sim.add_testbench(runner) + sim.add_testbench(stream_checker) + with sim.write_vcd("generator.vcd"): + sim.run()