Compare commits

...

3 commits

Author SHA1 Message Date
saji 7a0f59c9f6 add more tests to display data driver
All checks were successful
Unit Tests / Test (push) Successful in 2m25s
fix timing issues with display driver
2024-09-29 14:35:45 -05:00
saji 245108a07a move swapbuffer test to hub75.py 2024-09-28 17:43:22 -05:00
saji 8477698d77 add rgbview with channel_slice operator 2024-09-28 17:42:48 -05:00
6 changed files with 189 additions and 169 deletions

View file

@ -1,4 +1,4 @@
from amaranth import Array, unsigned
from amaranth import unsigned, Cat
from amaranth.lib import wiring, data
from amaranth.lib.wiring import Out
@ -13,6 +13,9 @@ class RGBLayout(data.StructLayout):
}
)
def __call__(self, value):
return RGBView(self, value)
Rgb888Layout = RGBLayout(8, 8, 8)
Rgb666Layout = RGBLayout(6, 6, 6)
@ -20,6 +23,23 @@ Rgb666Layout = RGBLayout(6, 6, 6)
Rgb111Layout = RGBLayout(1, 1, 1)
class RGBView(data.View):
def channel_size(self) -> int:
return self.red.shape()
def channel_slice(self, bit: int) -> Rgb111Layout:
"""Select bits from each channel and use it to form an Rgb111Layout.
This is useful for BCM stuff, since the bits are sliced to form a bitplane.
"""
return Rgb111Layout(
Cat(
self.red.bit_select(bit, 1),
self.green.bit_select(bit, 1),
self.blue.bit_select(bit, 1),
)
)
class Hub75Stream(wiring.Signature):
"""A Hub75E Driver for a single string of panels."""

View file

@ -1,4 +1,4 @@
from amaranth import Module, Cat, Mux, ShapeLike, Signal, Assert, Array
from amaranth import Module, Cat, Mux, Print, ShapeLike, Signal, Assert, Array
from amaranth.build import Platform
from amaranth.lib import wiring, data
from amaranth.lib.wiring import In, Out
@ -80,6 +80,63 @@ class SwapBuffer(wiring.Component):
return m
class DisplayClock(wiring.Component):
"""Generates the display clock automatically with the correct delay
when start is asserted. Stops when `done` is strobed.
Can either be used as a /2 clock divider or a /4 with the `double_fetch` parameter.
The startup delay is chosen based on the mode or can be set with `startup_delay`
"""
def __init__(self, *, double_fetch: bool = True, startup_delay=None, src_loc_at=0):
self.double_fetch = double_fetch
if startup_delay is None:
self.startup_delay = 4 if double_fetch else 1 # FIXME: choose right values.
else:
self.startup_delay = startup_delay
super().__init__(
{
"start": In(1),
"done": In(1),
"clk": Out(1),
},
src_loc_at=src_loc_at,
)
def elaborate(self, platform: Platform) -> Module:
m = Module()
counter = Signal(range(max(self.startup_delay, 2) + 1))
with m.FSM():
with m.State("init"):
m.d.sync += [counter.eq(0), self.clk.eq(0)]
with m.If(self.start == 1):
m.next = "warmup"
with m.State("warmup"):
m.d.sync += counter.eq(counter + 1)
with m.If(counter == self.startup_delay - 1):
m.next = "run"
m.d.sync += counter.eq(0)
with m.State("run"):
with m.If(self.done == 1):
m.next = "init"
if self.double_fetch:
m.d.sync += [
self.clk.eq(~counter[1]),
counter.eq(counter + 1),
]
else:
m.d.sync += [
self.clk.eq(~counter),
counter.eq(~counter),
]
return m
class Hub75DataDriver(wiring.Component):
def __init__(
self,
@ -117,40 +174,28 @@ class Hub75DataDriver(wiring.Component):
m = Module()
counter = Signal(32)
m.d.sync += counter.eq(counter + 1)
pixnum = Signal(range(self.panel_length), init=self.panel_length - 1)
if self.double_fetch:
pixrow = Signal(1)
m.d.comb += self.bram.addr.eq(Cat(pixrow, pixnum))
ram_rgb_slice = Cat(
self.bram.data["red"].bit_select(self.bcm_select, 1),
self.bram.data["blue"].bit_select(self.bcm_select, 1),
self.bram.data["green"].bit_select(self.bcm_select, 1),
)
ram_rgb_slice = self.bram.data.channel_slice(self.bcm_select)
else:
m.d.comb += self.bram.addr.eq(pixnum)
ram_rgb_slice = Array(
[
Cat(
self.bram.data[0]["red"].bit_select(self.bcm_select, 1),
self.bram.data[0]["blue"].bit_select(self.bcm_select, 1),
self.bram.data[0]["green"].bit_select(self.bcm_select, 1),
),
Cat(
self.bram.data[1]["red"].bit_select(self.bcm_select, 1),
self.bram.data[1]["blue"].bit_select(self.bcm_select, 1),
self.bram.data[1]["green"].bit_select(self.bcm_select, 1),
),
self.bram.data[0].channel_slice(self.bcm_select),
self.bram.data[1].channel_slice(self.bcm_select),
]
)
m.d.comb += self.bram.addr.eq(pixnum)
with m.FSM():
with m.State("init"):
m.d.sync += [
self.done.eq(0),
counter.eq(0),
pixnum.eq(pixnum.reset),
pixnum.eq(pixnum.init),
]
if self.double_fetch:
m.d.sync += pixrow.eq(0)
@ -158,30 +203,31 @@ class Hub75DataDriver(wiring.Component):
m.d.sync += self.bram.en.eq(1)
m.next = "prefetch" if self.double_fetch else "writerow"
with m.State("prefetch"):
# TODO: do we need this
# Allow the BRAM to settle after being enabled.
m.d.sync += counter.eq(0)
m.next = "writerow"
with m.State("writerow"):
if self.double_fetch:
c = counter[0:1]
with m.If(c == 0):
c = counter[0:2]
with m.If(c == 0b0):
m.d.sync += [
self.data.rgb0.eq(ram_rgb_slice),
pixrow.eq(1),
]
with m.If(c == 1):
m.d.sync += self.data.rgb1.eq(ram_rgb_slice)
with m.If(c == 2):
with m.If(c == 0b01):
pass
with m.If(c == 3):
with m.If(c == 0b10):
m.d.sync += self.data.rgb1.eq(ram_rgb_slice)
m.d.sync += [
counter.eq(0),
pixnum.eq(pixnum - 1),
pixrow.eq(0),
]
with m.If(pixnum == 0):
m.next = "done"
with m.If(c == 0b11):
m.d.sync += counter.eq(0)
else:
with m.If(counter[0] == 0):
m.d.sync += [
@ -202,82 +248,6 @@ class Hub75DataDriver(wiring.Component):
return m
class Hub75StringDriver(wiring.Component):
"""A data driver for Hub75 panels. This accesses the line memory and feeds out the data.
It is controlled by a Hub75Coordinator to signal when it should run and what bit of the data
it should send.
"""
def __init__(self, panel_length: int = 128, *, src_loc_at=0):
self.panel_length = panel_length
super().__init__(
{
"bcm_select": In(3),
"done": Out(1),
"start": In(1),
"bram_port": In(
ReadPort.Signature(
addr_width=ceil_log2(panel_length),
shape=data.ArrayLayout(Rgb666Layout, 2),
)
),
"display_out": Out(Hub75Data()),
},
src_loc_at=src_loc_at,
)
def elaborate(self, platform: Platform) -> Module:
m = Module()
self._counter = counter = Signal(32) # unused count is optimized out
m.d.sync += counter.eq(counter + 1)
ram_rgb0_slice = Cat(
self.bram_port.data[0]["red"].bit_select(self.bcm_select, 1),
self.bram_port.data[0]["blue"].bit_select(self.bcm_select, 1),
self.bram_port.data[0]["green"].bit_select(self.bcm_select, 1),
)
ram_rgb1_slice = Cat(
self.bram_port.data[1]["red"].bit_select(self.bcm_select, 1),
self.bram_port.data[1]["blue"].bit_select(self.bcm_select, 1),
self.bram_port.data[1]["green"].bit_select(self.bcm_select, 1),
)
m.d.comb += [
self.display_out.rgb0.eq(ram_rgb0_slice),
self.display_out.rgb1.eq(ram_rgb1_slice),
]
m.d.sync += Assert(self.bcm_select < 6)
pixnum = Signal(range(self.panel_length), init=self.panel_length - 1)
m.d.comb += self.bram_port.addr.eq(pixnum)
with m.FSM():
with m.State("init"):
m.d.sync += [
self.done.eq(0),
counter.eq(0),
pixnum.eq(self.panel_length - 1),
]
with m.If(self.start == 1):
m.d.sync += self.bram_port.en.eq(1)
m.next = "writerow"
with m.State("writerow"):
with m.If(counter[0] == 0):
# do nothing
pass
with m.If((counter[0] == 1) & (pixnum != 0)):
m.d.sync += pixnum.eq(pixnum - 1)
with m.Else():
m.next = "done"
with m.State("done"):
m.d.sync += [self.done.eq(1), self.bram_port.en.eq(0)]
m.next = "init"
return m
class Hub75Coordinator(wiring.Component):
"""A shared-control hub75 driver"""
@ -306,7 +276,9 @@ class Hub75Coordinator(wiring.Component):
for i in range(self.n_strings):
sb = SwapBuffer(depth=128, shape=data.ArrayLayout(Rgb666Layout, 2))
bufs.append(sb)
stringdriver = Hub75StringDriver(128)
stringdriver = Hub75DataDriver(
128, data_shape=Rgb666Layout, double_fetch=False
)
strings.append(stringdriver)
wiring.connect(m, sb.read_port, stringdriver.bram_port)
m.d.comb += [

View file

@ -0,0 +1,7 @@
import pytest
@pytest.fixture()
def simfixture(request: pytest.FixtureRequest, tmp_path):
pass

View file

@ -0,0 +1,23 @@
from amaranth import unsigned
import pytest
from groovylight.common import Rgb888Layout, Rgb666Layout, RGBView
def test_rgbview():
rgb = Rgb888Layout(0xAABBCC)
assert rgb.channel_size() == unsigned(8)
rgb18 = Rgb666Layout(0x2DEFD)
slice = rgb.channel_slice(1)
assert isinstance(slice, RGBView), "channel_slice should return another rgbview"
assert slice.channel_size() == unsigned(1), "channel_slice channel size should be 1"
assert isinstance(
rgb18.channel_slice(5), RGBView
), "channel_slice should return another rgbview"
with pytest.raises(ValueError, match="Target of a view is 0 bit"):
rgb.channel_slice(8)

View file

@ -4,37 +4,40 @@ from amaranth.lib.memory import Memory
from amaranth.sim import Simulator
import pytest
from groovylight.common import Rgb888Layout
from groovylight.common import Rgb888Layout, Rgb666Layout
from ..hub75 import Hub75Coordinator, Hub75DataDriver, Hub75StringDriver, Rgb666Layout
from groovylight.hub75 import (
DisplayClock,
Hub75Coordinator,
Hub75DataDriver,
SwapBuffer,
)
def test_stringdriver():
# the string driver test must
# 1. finish
# 2. strobe through all of the data in the array
# 3. slice the correct bit from the data.
m = Module()
m.submodules.dut = dut = Hub75StringDriver()
m.submodules.mem = mem = Memory(
shape=data.ArrayLayout(Rgb666Layout, 2), depth=128, init=[]
)
port = mem.read_port()
wiring.connect(m, port, dut.bram_port)
async def testbench(ctx):
# select a bit, strobe start, read values, test against known.
ctx.set(dut.bcm_select, 5)
ctx.set(dut.start, 1)
await ctx.tick()
ctx.set(dut.start, 0)
assert ctx.get(dut.bram_port.en) == 1
pass
sim = Simulator(m)
def test_swapbuffer():
dut = SwapBuffer(Rgb666Layout, 512)
sim = Simulator(dut)
sim.add_clock(1e-6)
async def testbench(ctx):
init_color = {"red": 0, "green": 0, "blue": 0}
test_color = {"red": 8, "green": 8, "blue": 8}
ctx.set(dut.selector, 0)
ctx.set(dut.write_port.addr, 1)
ctx.set(dut.read_port.addr, 1)
ctx.set(dut.write_port.data, test_color)
await ctx.tick()
# assert that the read port addr 1 = 0
assert ctx.get(dut.read_port.data) == init_color
# swap buffer
ctx.set(dut.selector, 1)
await ctx.tick().repeat(
2
) # takes two clocks after switching selector to output data.
assert ctx.get(dut.read_port.data) == test_color
# TODO: add more assertions/verification
sim.add_testbench(testbench)
with sim.write_vcd("output.vcd"):
sim.run_until(1e-6 * 1000)
@ -44,31 +47,57 @@ def test_datadriver():
# 1. finish
# 2. strobe through all of the data in the array
# 3. slice the correct bit from the data.
memdata = [{"red": x, "green": x, "blue": x} for x in range(256)]
m = Module()
m.submodules.dut = dut = Hub75DataDriver()
m.submodules.mem = mem = Memory(shape=Rgb888Layout, depth=256, init=[])
m.submodules.mem = mem = Memory(shape=Rgb888Layout, depth=256, init=memdata)
m.submodules.clocker = clocker = DisplayClock()
m.d.comb += [
clocker.start.eq(dut.start),
clocker.done.eq(dut.done),
]
port = mem.read_port()
wiring.connect(m, port, dut.bram)
async def testbench(ctx):
# select a bit, strobe start, read values, test against known.
ctx.set(dut.bcm_select, 5)
ctx.set(dut.bcm_select, 7)
ctx.set(dut.start, 1)
await ctx.tick()
ctx.set(dut.start, 0)
assert ctx.get(dut.bram_port.en) == 1
pass
assert ctx.get(dut.bram.en) == 1
await ctx.tick().until(dut.done == 1)
async def rgbtest(ctx):
await ctx.tick().until(dut.start == 1)
counter = 127
bitslice = 7
async for _, rgb0, rgb1 in ctx.posedge(clocker.clk).sample(dut.data.rgb0, dut.data.rgb1):
assert counter >= 0, "should not do more than 128 clocks"
e0 = ctx.get(mem.data[counter << 1])
e1 = ctx.get(mem.data[(counter << 1) + 1])
print(counter)
for r, e in [(rgb0, e0), (rgb1, e1)]:
assert r.red == (e.red >> bitslice) & 1
assert r.green == (e.green >> bitslice) & 1
assert r.blue == (e.blue >> bitslice) & 1
counter = counter - 1
sim = Simulator(m)
sim.add_clock(1e-6)
sim.add_testbench(testbench)
sim.add_testbench(rgbtest, background=True)
with sim.write_vcd("output.vcd"):
sim.run_until(1e-6 * 1000)
sim.run()
# sim.run_until(1e-6 * 4000)
@pytest.mark.skip()
def test_hub75():
def test_hub75_coordinator():
m = Module()
m.submodules.dut = dut = Hub75Coordinator(1)

View file

@ -1,31 +0,0 @@
from amaranth.sim import Simulator
from ..hub75 import Hub75StringDriver, Rgb666Layout, SwapBuffer
def test_swapbuffer():
dut = SwapBuffer(Rgb666Layout, 512)
sim = Simulator(dut)
sim.add_clock(1e-6)
async def testbench(ctx):
init_color = {"red": 0, "green": 0, "blue": 0}
test_color = {"red": 8, "green": 8, "blue": 8}
ctx.set(dut.selector, 0)
ctx.set(dut.write_port.addr, 1)
ctx.set(dut.read_port.addr, 1)
ctx.set(dut.write_port.data, test_color)
await ctx.tick()
# assert that the read port addr 1 = 0
assert ctx.get(dut.read_port.data) == init_color
# swap buffer
ctx.set(dut.selector, 1)
await ctx.tick().repeat(
2
) # takes two clocks after switching selector to output data.
assert ctx.get(dut.read_port.data) == test_color
# TODO: add more assertions/verification
sim.add_testbench(testbench)
with sim.write_vcd("output.vcd"):
sim.run_until(1e-6 * 1000)