Compare commits

...

3 commits

Author SHA1 Message Date
saji 7a0f59c9f6 add more tests to display data driver
All checks were successful
Unit Tests / Test (push) Successful in 2m25s
fix timing issues with display driver
2024-09-29 14:35:45 -05:00
saji 245108a07a move swapbuffer test to hub75.py 2024-09-28 17:43:22 -05:00
saji 8477698d77 add rgbview with channel_slice operator 2024-09-28 17:42:48 -05:00
6 changed files with 189 additions and 169 deletions

View file

@ -1,4 +1,4 @@
from amaranth import Array, unsigned from amaranth import unsigned, Cat
from amaranth.lib import wiring, data from amaranth.lib import wiring, data
from amaranth.lib.wiring import Out from amaranth.lib.wiring import Out
@ -13,6 +13,9 @@ class RGBLayout(data.StructLayout):
} }
) )
def __call__(self, value):
return RGBView(self, value)
Rgb888Layout = RGBLayout(8, 8, 8) Rgb888Layout = RGBLayout(8, 8, 8)
Rgb666Layout = RGBLayout(6, 6, 6) Rgb666Layout = RGBLayout(6, 6, 6)
@ -20,6 +23,23 @@ Rgb666Layout = RGBLayout(6, 6, 6)
Rgb111Layout = RGBLayout(1, 1, 1) Rgb111Layout = RGBLayout(1, 1, 1)
class RGBView(data.View):
def channel_size(self) -> int:
return self.red.shape()
def channel_slice(self, bit: int) -> Rgb111Layout:
"""Select bits from each channel and use it to form an Rgb111Layout.
This is useful for BCM stuff, since the bits are sliced to form a bitplane.
"""
return Rgb111Layout(
Cat(
self.red.bit_select(bit, 1),
self.green.bit_select(bit, 1),
self.blue.bit_select(bit, 1),
)
)
class Hub75Stream(wiring.Signature): class Hub75Stream(wiring.Signature):
"""A Hub75E Driver for a single string of panels.""" """A Hub75E Driver for a single string of panels."""

View file

@ -1,4 +1,4 @@
from amaranth import Module, Cat, Mux, ShapeLike, Signal, Assert, Array from amaranth import Module, Cat, Mux, Print, ShapeLike, Signal, Assert, Array
from amaranth.build import Platform from amaranth.build import Platform
from amaranth.lib import wiring, data from amaranth.lib import wiring, data
from amaranth.lib.wiring import In, Out from amaranth.lib.wiring import In, Out
@ -80,6 +80,63 @@ class SwapBuffer(wiring.Component):
return m return m
class DisplayClock(wiring.Component):
"""Generates the display clock automatically with the correct delay
when start is asserted. Stops when `done` is strobed.
Can either be used as a /2 clock divider or a /4 with the `double_fetch` parameter.
The startup delay is chosen based on the mode or can be set with `startup_delay`
"""
def __init__(self, *, double_fetch: bool = True, startup_delay=None, src_loc_at=0):
self.double_fetch = double_fetch
if startup_delay is None:
self.startup_delay = 4 if double_fetch else 1 # FIXME: choose right values.
else:
self.startup_delay = startup_delay
super().__init__(
{
"start": In(1),
"done": In(1),
"clk": Out(1),
},
src_loc_at=src_loc_at,
)
def elaborate(self, platform: Platform) -> Module:
m = Module()
counter = Signal(range(max(self.startup_delay, 2) + 1))
with m.FSM():
with m.State("init"):
m.d.sync += [counter.eq(0), self.clk.eq(0)]
with m.If(self.start == 1):
m.next = "warmup"
with m.State("warmup"):
m.d.sync += counter.eq(counter + 1)
with m.If(counter == self.startup_delay - 1):
m.next = "run"
m.d.sync += counter.eq(0)
with m.State("run"):
with m.If(self.done == 1):
m.next = "init"
if self.double_fetch:
m.d.sync += [
self.clk.eq(~counter[1]),
counter.eq(counter + 1),
]
else:
m.d.sync += [
self.clk.eq(~counter),
counter.eq(~counter),
]
return m
class Hub75DataDriver(wiring.Component): class Hub75DataDriver(wiring.Component):
def __init__( def __init__(
self, self,
@ -117,40 +174,28 @@ class Hub75DataDriver(wiring.Component):
m = Module() m = Module()
counter = Signal(32) counter = Signal(32)
m.d.sync += counter.eq(counter + 1)
pixnum = Signal(range(self.panel_length), init=self.panel_length - 1) pixnum = Signal(range(self.panel_length), init=self.panel_length - 1)
if self.double_fetch: if self.double_fetch:
pixrow = Signal(1) pixrow = Signal(1)
m.d.comb += self.bram.addr.eq(Cat(pixrow, pixnum)) m.d.comb += self.bram.addr.eq(Cat(pixrow, pixnum))
ram_rgb_slice = Cat( ram_rgb_slice = self.bram.data.channel_slice(self.bcm_select)
self.bram.data["red"].bit_select(self.bcm_select, 1),
self.bram.data["blue"].bit_select(self.bcm_select, 1),
self.bram.data["green"].bit_select(self.bcm_select, 1),
)
else: else:
m.d.comb += self.bram.addr.eq(pixnum)
ram_rgb_slice = Array( ram_rgb_slice = Array(
[ [
Cat( self.bram.data[0].channel_slice(self.bcm_select),
self.bram.data[0]["red"].bit_select(self.bcm_select, 1), self.bram.data[1].channel_slice(self.bcm_select),
self.bram.data[0]["blue"].bit_select(self.bcm_select, 1),
self.bram.data[0]["green"].bit_select(self.bcm_select, 1),
),
Cat(
self.bram.data[1]["red"].bit_select(self.bcm_select, 1),
self.bram.data[1]["blue"].bit_select(self.bcm_select, 1),
self.bram.data[1]["green"].bit_select(self.bcm_select, 1),
),
] ]
) )
m.d.comb += self.bram.addr.eq(pixnum)
with m.FSM(): with m.FSM():
with m.State("init"): with m.State("init"):
m.d.sync += [ m.d.sync += [
self.done.eq(0), self.done.eq(0),
counter.eq(0), counter.eq(0),
pixnum.eq(pixnum.reset), pixnum.eq(pixnum.init),
] ]
if self.double_fetch: if self.double_fetch:
m.d.sync += pixrow.eq(0) m.d.sync += pixrow.eq(0)
@ -158,30 +203,31 @@ class Hub75DataDriver(wiring.Component):
m.d.sync += self.bram.en.eq(1) m.d.sync += self.bram.en.eq(1)
m.next = "prefetch" if self.double_fetch else "writerow" m.next = "prefetch" if self.double_fetch else "writerow"
with m.State("prefetch"): with m.State("prefetch"):
# TODO: do we need this # Allow the BRAM to settle after being enabled.
m.d.sync += counter.eq(0)
m.next = "writerow" m.next = "writerow"
with m.State("writerow"): with m.State("writerow"):
if self.double_fetch: if self.double_fetch:
c = counter[0:1] c = counter[0:2]
with m.If(c == 0): with m.If(c == 0b0):
m.d.sync += [ m.d.sync += [
self.data.rgb0.eq(ram_rgb_slice), self.data.rgb0.eq(ram_rgb_slice),
pixrow.eq(1), pixrow.eq(1),
] ]
with m.If(c == 1): with m.If(c == 0b01):
m.d.sync += self.data.rgb1.eq(ram_rgb_slice)
with m.If(c == 2):
pass pass
with m.If(c == 0b10):
with m.If(c == 3): m.d.sync += self.data.rgb1.eq(ram_rgb_slice)
m.d.sync += [ m.d.sync += [
counter.eq(0),
pixnum.eq(pixnum - 1), pixnum.eq(pixnum - 1),
pixrow.eq(0), pixrow.eq(0),
] ]
with m.If(pixnum == 0): with m.If(pixnum == 0):
m.next = "done" m.next = "done"
with m.If(c == 0b11):
m.d.sync += counter.eq(0)
else: else:
with m.If(counter[0] == 0): with m.If(counter[0] == 0):
m.d.sync += [ m.d.sync += [
@ -202,82 +248,6 @@ class Hub75DataDriver(wiring.Component):
return m return m
class Hub75StringDriver(wiring.Component):
"""A data driver for Hub75 panels. This accesses the line memory and feeds out the data.
It is controlled by a Hub75Coordinator to signal when it should run and what bit of the data
it should send.
"""
def __init__(self, panel_length: int = 128, *, src_loc_at=0):
self.panel_length = panel_length
super().__init__(
{
"bcm_select": In(3),
"done": Out(1),
"start": In(1),
"bram_port": In(
ReadPort.Signature(
addr_width=ceil_log2(panel_length),
shape=data.ArrayLayout(Rgb666Layout, 2),
)
),
"display_out": Out(Hub75Data()),
},
src_loc_at=src_loc_at,
)
def elaborate(self, platform: Platform) -> Module:
m = Module()
self._counter = counter = Signal(32) # unused count is optimized out
m.d.sync += counter.eq(counter + 1)
ram_rgb0_slice = Cat(
self.bram_port.data[0]["red"].bit_select(self.bcm_select, 1),
self.bram_port.data[0]["blue"].bit_select(self.bcm_select, 1),
self.bram_port.data[0]["green"].bit_select(self.bcm_select, 1),
)
ram_rgb1_slice = Cat(
self.bram_port.data[1]["red"].bit_select(self.bcm_select, 1),
self.bram_port.data[1]["blue"].bit_select(self.bcm_select, 1),
self.bram_port.data[1]["green"].bit_select(self.bcm_select, 1),
)
m.d.comb += [
self.display_out.rgb0.eq(ram_rgb0_slice),
self.display_out.rgb1.eq(ram_rgb1_slice),
]
m.d.sync += Assert(self.bcm_select < 6)
pixnum = Signal(range(self.panel_length), init=self.panel_length - 1)
m.d.comb += self.bram_port.addr.eq(pixnum)
with m.FSM():
with m.State("init"):
m.d.sync += [
self.done.eq(0),
counter.eq(0),
pixnum.eq(self.panel_length - 1),
]
with m.If(self.start == 1):
m.d.sync += self.bram_port.en.eq(1)
m.next = "writerow"
with m.State("writerow"):
with m.If(counter[0] == 0):
# do nothing
pass
with m.If((counter[0] == 1) & (pixnum != 0)):
m.d.sync += pixnum.eq(pixnum - 1)
with m.Else():
m.next = "done"
with m.State("done"):
m.d.sync += [self.done.eq(1), self.bram_port.en.eq(0)]
m.next = "init"
return m
class Hub75Coordinator(wiring.Component): class Hub75Coordinator(wiring.Component):
"""A shared-control hub75 driver""" """A shared-control hub75 driver"""
@ -306,7 +276,9 @@ class Hub75Coordinator(wiring.Component):
for i in range(self.n_strings): for i in range(self.n_strings):
sb = SwapBuffer(depth=128, shape=data.ArrayLayout(Rgb666Layout, 2)) sb = SwapBuffer(depth=128, shape=data.ArrayLayout(Rgb666Layout, 2))
bufs.append(sb) bufs.append(sb)
stringdriver = Hub75StringDriver(128) stringdriver = Hub75DataDriver(
128, data_shape=Rgb666Layout, double_fetch=False
)
strings.append(stringdriver) strings.append(stringdriver)
wiring.connect(m, sb.read_port, stringdriver.bram_port) wiring.connect(m, sb.read_port, stringdriver.bram_port)
m.d.comb += [ m.d.comb += [

View file

@ -0,0 +1,7 @@
import pytest
@pytest.fixture()
def simfixture(request: pytest.FixtureRequest, tmp_path):
pass

View file

@ -0,0 +1,23 @@
from amaranth import unsigned
import pytest
from groovylight.common import Rgb888Layout, Rgb666Layout, RGBView
def test_rgbview():
rgb = Rgb888Layout(0xAABBCC)
assert rgb.channel_size() == unsigned(8)
rgb18 = Rgb666Layout(0x2DEFD)
slice = rgb.channel_slice(1)
assert isinstance(slice, RGBView), "channel_slice should return another rgbview"
assert slice.channel_size() == unsigned(1), "channel_slice channel size should be 1"
assert isinstance(
rgb18.channel_slice(5), RGBView
), "channel_slice should return another rgbview"
with pytest.raises(ValueError, match="Target of a view is 0 bit"):
rgb.channel_slice(8)

View file

@ -4,37 +4,40 @@ from amaranth.lib.memory import Memory
from amaranth.sim import Simulator from amaranth.sim import Simulator
import pytest import pytest
from groovylight.common import Rgb888Layout from groovylight.common import Rgb888Layout, Rgb666Layout
from ..hub75 import Hub75Coordinator, Hub75DataDriver, Hub75StringDriver, Rgb666Layout from groovylight.hub75 import (
DisplayClock,
Hub75Coordinator,
Hub75DataDriver,
SwapBuffer,
)
def test_stringdriver(): def test_swapbuffer():
# the string driver test must dut = SwapBuffer(Rgb666Layout, 512)
# 1. finish sim = Simulator(dut)
# 2. strobe through all of the data in the array
# 3. slice the correct bit from the data.
m = Module()
m.submodules.dut = dut = Hub75StringDriver()
m.submodules.mem = mem = Memory(
shape=data.ArrayLayout(Rgb666Layout, 2), depth=128, init=[]
)
port = mem.read_port()
wiring.connect(m, port, dut.bram_port)
async def testbench(ctx):
# select a bit, strobe start, read values, test against known.
ctx.set(dut.bcm_select, 5)
ctx.set(dut.start, 1)
await ctx.tick()
ctx.set(dut.start, 0)
assert ctx.get(dut.bram_port.en) == 1
pass
sim = Simulator(m)
sim.add_clock(1e-6) sim.add_clock(1e-6)
async def testbench(ctx):
init_color = {"red": 0, "green": 0, "blue": 0}
test_color = {"red": 8, "green": 8, "blue": 8}
ctx.set(dut.selector, 0)
ctx.set(dut.write_port.addr, 1)
ctx.set(dut.read_port.addr, 1)
ctx.set(dut.write_port.data, test_color)
await ctx.tick()
# assert that the read port addr 1 = 0
assert ctx.get(dut.read_port.data) == init_color
# swap buffer
ctx.set(dut.selector, 1)
await ctx.tick().repeat(
2
) # takes two clocks after switching selector to output data.
assert ctx.get(dut.read_port.data) == test_color
# TODO: add more assertions/verification
sim.add_testbench(testbench)
with sim.write_vcd("output.vcd"): with sim.write_vcd("output.vcd"):
sim.run_until(1e-6 * 1000) sim.run_until(1e-6 * 1000)
@ -44,31 +47,57 @@ def test_datadriver():
# 1. finish # 1. finish
# 2. strobe through all of the data in the array # 2. strobe through all of the data in the array
# 3. slice the correct bit from the data. # 3. slice the correct bit from the data.
memdata = [{"red": x, "green": x, "blue": x} for x in range(256)]
m = Module() m = Module()
m.submodules.dut = dut = Hub75DataDriver() m.submodules.dut = dut = Hub75DataDriver()
m.submodules.mem = mem = Memory(shape=Rgb888Layout, depth=256, init=[]) m.submodules.mem = mem = Memory(shape=Rgb888Layout, depth=256, init=memdata)
m.submodules.clocker = clocker = DisplayClock()
m.d.comb += [
clocker.start.eq(dut.start),
clocker.done.eq(dut.done),
]
port = mem.read_port() port = mem.read_port()
wiring.connect(m, port, dut.bram) wiring.connect(m, port, dut.bram)
async def testbench(ctx): async def testbench(ctx):
# select a bit, strobe start, read values, test against known. # select a bit, strobe start, read values, test against known.
ctx.set(dut.bcm_select, 5) ctx.set(dut.bcm_select, 7)
ctx.set(dut.start, 1) ctx.set(dut.start, 1)
await ctx.tick() await ctx.tick()
ctx.set(dut.start, 0) ctx.set(dut.start, 0)
assert ctx.get(dut.bram_port.en) == 1 assert ctx.get(dut.bram.en) == 1
pass await ctx.tick().until(dut.done == 1)
async def rgbtest(ctx):
await ctx.tick().until(dut.start == 1)
counter = 127
bitslice = 7
async for _, rgb0, rgb1 in ctx.posedge(clocker.clk).sample(dut.data.rgb0, dut.data.rgb1):
assert counter >= 0, "should not do more than 128 clocks"
e0 = ctx.get(mem.data[counter << 1])
e1 = ctx.get(mem.data[(counter << 1) + 1])
print(counter)
for r, e in [(rgb0, e0), (rgb1, e1)]:
assert r.red == (e.red >> bitslice) & 1
assert r.green == (e.green >> bitslice) & 1
assert r.blue == (e.blue >> bitslice) & 1
counter = counter - 1
sim = Simulator(m) sim = Simulator(m)
sim.add_clock(1e-6) sim.add_clock(1e-6)
sim.add_testbench(testbench)
sim.add_testbench(rgbtest, background=True)
with sim.write_vcd("output.vcd"): with sim.write_vcd("output.vcd"):
sim.run_until(1e-6 * 1000) sim.run()
# sim.run_until(1e-6 * 4000)
@pytest.mark.skip() @pytest.mark.skip()
def test_hub75(): def test_hub75_coordinator():
m = Module() m = Module()
m.submodules.dut = dut = Hub75Coordinator(1) m.submodules.dut = dut = Hub75Coordinator(1)

View file

@ -1,31 +0,0 @@
from amaranth.sim import Simulator
from ..hub75 import Hub75StringDriver, Rgb666Layout, SwapBuffer
def test_swapbuffer():
dut = SwapBuffer(Rgb666Layout, 512)
sim = Simulator(dut)
sim.add_clock(1e-6)
async def testbench(ctx):
init_color = {"red": 0, "green": 0, "blue": 0}
test_color = {"red": 8, "green": 8, "blue": 8}
ctx.set(dut.selector, 0)
ctx.set(dut.write_port.addr, 1)
ctx.set(dut.read_port.addr, 1)
ctx.set(dut.write_port.data, test_color)
await ctx.tick()
# assert that the read port addr 1 = 0
assert ctx.get(dut.read_port.data) == init_color
# swap buffer
ctx.set(dut.selector, 1)
await ctx.tick().repeat(
2
) # takes two clocks after switching selector to output data.
assert ctx.get(dut.read_port.data) == test_color
# TODO: add more assertions/verification
sim.add_testbench(testbench)
with sim.write_vcd("output.vcd"):
sim.run_until(1e-6 * 1000)