From 7a0f59c9f6dcce25a8785bcbb0337e32640bce21 Mon Sep 17 00:00:00 2001 From: saji Date: Sat, 28 Sep 2024 19:38:22 -0500 Subject: [PATCH] add more tests to display data driver fix timing issues with display driver --- src/groovylight/hub75.py | 153 +++++++++++++--------------- src/groovylight/tests/__init__.py | 7 ++ src/groovylight/tests/test_hub75.py | 67 ++++++------ 3 files changed, 112 insertions(+), 115 deletions(-) diff --git a/src/groovylight/hub75.py b/src/groovylight/hub75.py index 4b6096f..a0278a5 100644 --- a/src/groovylight/hub75.py +++ b/src/groovylight/hub75.py @@ -1,4 +1,4 @@ -from amaranth import Module, Cat, Mux, ShapeLike, Signal, Assert, Array +from amaranth import Module, Cat, Mux, Print, ShapeLike, Signal, Assert, Array from amaranth.build import Platform from amaranth.lib import wiring, data from amaranth.lib.wiring import In, Out @@ -80,6 +80,63 @@ class SwapBuffer(wiring.Component): return m +class DisplayClock(wiring.Component): + """Generates the display clock automatically with the correct delay + when start is asserted. Stops when `done` is strobed. + + Can either be used as a /2 clock divider or a /4 with the `double_fetch` parameter. + The startup delay is chosen based on the mode or can be set with `startup_delay` + """ + + def __init__(self, *, double_fetch: bool = True, startup_delay=None, src_loc_at=0): + self.double_fetch = double_fetch + if startup_delay is None: + self.startup_delay = 4 if double_fetch else 1 # FIXME: choose right values. + else: + self.startup_delay = startup_delay + + super().__init__( + { + "start": In(1), + "done": In(1), + "clk": Out(1), + }, + src_loc_at=src_loc_at, + ) + + def elaborate(self, platform: Platform) -> Module: + m = Module() + + counter = Signal(range(max(self.startup_delay, 2) + 1)) + + with m.FSM(): + with m.State("init"): + m.d.sync += [counter.eq(0), self.clk.eq(0)] + with m.If(self.start == 1): + m.next = "warmup" + with m.State("warmup"): + m.d.sync += counter.eq(counter + 1) + with m.If(counter == self.startup_delay - 1): + m.next = "run" + m.d.sync += counter.eq(0) + + with m.State("run"): + with m.If(self.done == 1): + m.next = "init" + if self.double_fetch: + m.d.sync += [ + self.clk.eq(~counter[1]), + counter.eq(counter + 1), + ] + else: + m.d.sync += [ + self.clk.eq(~counter), + counter.eq(~counter), + ] + + return m + + class Hub75DataDriver(wiring.Component): def __init__( self, @@ -117,6 +174,7 @@ class Hub75DataDriver(wiring.Component): m = Module() counter = Signal(32) + m.d.sync += counter.eq(counter + 1) pixnum = Signal(range(self.panel_length), init=self.panel_length - 1) if self.double_fetch: @@ -137,7 +195,7 @@ class Hub75DataDriver(wiring.Component): m.d.sync += [ self.done.eq(0), counter.eq(0), - pixnum.eq(pixnum.reset), + pixnum.eq(pixnum.init), ] if self.double_fetch: m.d.sync += pixrow.eq(0) @@ -145,30 +203,31 @@ class Hub75DataDriver(wiring.Component): m.d.sync += self.bram.en.eq(1) m.next = "prefetch" if self.double_fetch else "writerow" with m.State("prefetch"): - # TODO: do we need this + # Allow the BRAM to settle after being enabled. + m.d.sync += counter.eq(0) m.next = "writerow" with m.State("writerow"): if self.double_fetch: - c = counter[0:1] - with m.If(c == 0): + c = counter[0:2] + with m.If(c == 0b0): m.d.sync += [ self.data.rgb0.eq(ram_rgb_slice), pixrow.eq(1), ] - with m.If(c == 1): - m.d.sync += self.data.rgb1.eq(ram_rgb_slice) - with m.If(c == 2): + with m.If(c == 0b01): pass - - with m.If(c == 3): + with m.If(c == 0b10): + m.d.sync += self.data.rgb1.eq(ram_rgb_slice) m.d.sync += [ - counter.eq(0), pixnum.eq(pixnum - 1), pixrow.eq(0), ] with m.If(pixnum == 0): m.next = "done" + + with m.If(c == 0b11): + m.d.sync += counter.eq(0) else: with m.If(counter[0] == 0): m.d.sync += [ @@ -189,74 +248,6 @@ class Hub75DataDriver(wiring.Component): return m -class Hub75StringDriver(wiring.Component): - """A data driver for Hub75 panels. This accesses the line memory and feeds out the data. - It is controlled by a Hub75Coordinator to signal when it should run and what bit of the data - it should send. - """ - - def __init__(self, panel_length: int = 128, *, src_loc_at=0): - self.panel_length = panel_length - super().__init__( - { - "bcm_select": In(3), - "done": Out(1), - "start": In(1), - "bram": In( - ReadPort.Signature( - addr_width=ceil_log2(panel_length), - shape=data.ArrayLayout(Rgb666Layout, 2), - ) - ), - "display_out": Out(Hub75Data()), - }, - src_loc_at=src_loc_at, - ) - - def elaborate(self, platform: Platform) -> Module: - m = Module() - - self._counter = counter = Signal(32) # unused count is optimized out - m.d.sync += counter.eq(counter + 1) - - ram_rgb0_slice = self.bram.data[0].channel_slice(self.bcm_select) - ram_rgb1_slice = self.bram.data[1].channel_slice(self.bcm_select) - m.d.comb += [ - self.display_out.rgb0.eq(ram_rgb0_slice), - self.display_out.rgb1.eq(ram_rgb1_slice), - ] - m.d.sync += Assert(self.bcm_select < 6) - - pixnum = Signal(range(self.panel_length), init=self.panel_length - 1) - m.d.comb += self.bram.addr.eq(pixnum) - - with m.FSM(): - with m.State("init"): - m.d.sync += [ - self.done.eq(0), - counter.eq(0), - pixnum.eq(self.panel_length - 1), - ] - with m.If(self.start == 1): - m.d.sync += self.bram.en.eq(1) - m.next = "writerow" - with m.State("writerow"): - with m.If(counter[0] == 0): - # do nothing - pass - - with m.If((counter[0] == 1) & (pixnum != 0)): - m.d.sync += pixnum.eq(pixnum - 1) - - with m.Else(): - m.next = "done" - with m.State("done"): - m.d.sync += [self.done.eq(1), self.bram.en.eq(0)] - m.next = "init" - - return m - - class Hub75Coordinator(wiring.Component): """A shared-control hub75 driver""" @@ -285,7 +276,9 @@ class Hub75Coordinator(wiring.Component): for i in range(self.n_strings): sb = SwapBuffer(depth=128, shape=data.ArrayLayout(Rgb666Layout, 2)) bufs.append(sb) - stringdriver = Hub75StringDriver(128) + stringdriver = Hub75DataDriver( + 128, data_shape=Rgb666Layout, double_fetch=False + ) strings.append(stringdriver) wiring.connect(m, sb.read_port, stringdriver.bram_port) m.d.comb += [ diff --git a/src/groovylight/tests/__init__.py b/src/groovylight/tests/__init__.py index e69de29..443e36b 100644 --- a/src/groovylight/tests/__init__.py +++ b/src/groovylight/tests/__init__.py @@ -0,0 +1,7 @@ + +import pytest + + +@pytest.fixture() +def simfixture(request: pytest.FixtureRequest, tmp_path): + pass diff --git a/src/groovylight/tests/test_hub75.py b/src/groovylight/tests/test_hub75.py index f4406a3..3601330 100644 --- a/src/groovylight/tests/test_hub75.py +++ b/src/groovylight/tests/test_hub75.py @@ -7,9 +7,9 @@ import pytest from groovylight.common import Rgb888Layout, Rgb666Layout from groovylight.hub75 import ( + DisplayClock, Hub75Coordinator, Hub75DataDriver, - Hub75StringDriver, SwapBuffer, ) @@ -42,65 +42,62 @@ def test_swapbuffer(): sim.run_until(1e-6 * 1000) -def test_stringdriver(): - # the string driver test must - # 1. finish - # 2. strobe through all of the data in the array - # 3. slice the correct bit from the data. - m = Module() - m.submodules.dut = dut = Hub75StringDriver() - m.submodules.mem = mem = Memory( - shape=data.ArrayLayout(Rgb666Layout, 2), depth=128, init=[] - ) - port = mem.read_port() - - wiring.connect(m, port, dut.bram) - - async def testbench(ctx): - # select a bit, strobe start, read values, test against known. - ctx.set(dut.bcm_select, 5) - ctx.set(dut.start, 1) - await ctx.tick() - ctx.set(dut.start, 0) - assert ctx.get(dut.bram.en) == 1 - pass - - sim = Simulator(m) - sim.add_clock(1e-6) - - with sim.write_vcd("output.vcd"): - sim.run_until(1e-6 * 1000) - - def test_datadriver(): # the string driver test must # 1. finish # 2. strobe through all of the data in the array # 3. slice the correct bit from the data. + memdata = [{"red": x, "green": x, "blue": x} for x in range(256)] m = Module() m.submodules.dut = dut = Hub75DataDriver() - m.submodules.mem = mem = Memory(shape=Rgb888Layout, depth=256, init=[]) + m.submodules.mem = mem = Memory(shape=Rgb888Layout, depth=256, init=memdata) + m.submodules.clocker = clocker = DisplayClock() + m.d.comb += [ + clocker.start.eq(dut.start), + clocker.done.eq(dut.done), + ] + port = mem.read_port() wiring.connect(m, port, dut.bram) async def testbench(ctx): # select a bit, strobe start, read values, test against known. - ctx.set(dut.bcm_select, 5) + ctx.set(dut.bcm_select, 7) ctx.set(dut.start, 1) await ctx.tick() ctx.set(dut.start, 0) assert ctx.get(dut.bram.en) == 1 + await ctx.tick().until(dut.done == 1) + + async def rgbtest(ctx): + await ctx.tick().until(dut.start == 1) + counter = 127 + bitslice = 7 + async for _, rgb0, rgb1 in ctx.posedge(clocker.clk).sample(dut.data.rgb0, dut.data.rgb1): + assert counter >= 0, "should not do more than 128 clocks" + e0 = ctx.get(mem.data[counter << 1]) + e1 = ctx.get(mem.data[(counter << 1) + 1]) + print(counter) + for r, e in [(rgb0, e0), (rgb1, e1)]: + assert r.red == (e.red >> bitslice) & 1 + assert r.green == (e.green >> bitslice) & 1 + assert r.blue == (e.blue >> bitslice) & 1 + counter = counter - 1 + sim = Simulator(m) sim.add_clock(1e-6) + sim.add_testbench(testbench) + sim.add_testbench(rgbtest, background=True) with sim.write_vcd("output.vcd"): - sim.run_until(1e-6 * 1000) + sim.run() + # sim.run_until(1e-6 * 4000) @pytest.mark.skip() -def test_hub75(): +def test_hub75_coordinator(): m = Module() m.submodules.dut = dut = Hub75Coordinator(1)