make dual-mode data driver for hub75

add tests
This commit is contained in:
Saji 2024-09-28 14:34:38 -05:00
parent 2c88d73ded
commit 5ae230ac00
3 changed files with 200 additions and 11 deletions

View file

@ -14,6 +14,7 @@ class RGBLayout(data.StructLayout):
) )
Rgb888Layout = RGBLayout(8, 8, 8)
Rgb666Layout = RGBLayout(6, 6, 6) Rgb666Layout = RGBLayout(6, 6, 6)
Rgb111Layout = RGBLayout(1, 1, 1) Rgb111Layout = RGBLayout(1, 1, 1)

View file

@ -1,11 +1,11 @@
from amaranth import Module, Cat, Mux, ShapeLike, Signal, Assert from amaranth import Module, Cat, Mux, ShapeLike, Signal, Assert, Array
from amaranth.build import Platform from amaranth.build import Platform
from amaranth.lib import wiring, data from amaranth.lib import wiring, data
from amaranth.lib.wiring import In, Out from amaranth.lib.wiring import In, Out
from amaranth.lib.memory import Memory, ReadPort, WritePort from amaranth.lib.memory import Memory, ReadPort, WritePort
from amaranth.utils import ceil_log2 from amaranth.utils import ceil_log2
from .common import Rgb666Layout, Hub75Stream, Hub75Ctrl, Hub75Data from .common import Rgb666Layout, Hub75Stream, Hub75Ctrl, Hub75Data, Rgb888Layout
class SwapBuffer(wiring.Component): class SwapBuffer(wiring.Component):
@ -80,6 +80,128 @@ class SwapBuffer(wiring.Component):
return m return m
class Hub75DataDriver(wiring.Component):
def __init__(
self,
panel_length: int = 128,
data_shape=Rgb888Layout,
double_fetch=True,
*,
src_loc_at=0,
):
self.panel_length = panel_length
self.double_fetch = double_fetch
# if we're using double_fetch then each bram cell is exactly one color,
# if we're doing single fetch, it's 2 colors stacked.
bram_shape = data_shape if double_fetch else data.ArrayLayout(data_shape, 2)
addr_width = (
ceil_log2(panel_length * 2) if double_fetch else ceil_log2(panel_length)
)
super().__init__(
{
"bcm_select": In(ceil_log2(data_shape["red"].shape.width)),
"done": Out(1),
"start": In(1),
"bram": In(
ReadPort.Signature(
addr_width=addr_width,
shape=bram_shape,
)
),
"data": Out(Hub75Data()),
},
src_loc_at=src_loc_at,
)
def elaborate(self, platform: Platform) -> Module:
m = Module()
counter = Signal(32)
pixnum = Signal(range(self.panel_length), init=self.panel_length - 1)
if self.double_fetch:
pixrow = Signal(1)
m.d.comb += self.bram.addr.eq(Cat(pixrow, pixnum))
ram_rgb_slice = Cat(
self.bram.data["red"].bit_select(self.bcm_select, 1),
self.bram.data["blue"].bit_select(self.bcm_select, 1),
self.bram.data["green"].bit_select(self.bcm_select, 1),
)
else:
ram_rgb_slice = Array(
[
Cat(
self.bram.data[0]["red"].bit_select(self.bcm_select, 1),
self.bram.data[0]["blue"].bit_select(self.bcm_select, 1),
self.bram.data[0]["green"].bit_select(self.bcm_select, 1),
),
Cat(
self.bram.data[1]["red"].bit_select(self.bcm_select, 1),
self.bram.data[1]["blue"].bit_select(self.bcm_select, 1),
self.bram.data[1]["green"].bit_select(self.bcm_select, 1),
),
]
)
m.d.comb += self.bram.addr.eq(pixnum)
with m.FSM():
with m.State("init"):
m.d.sync += [
self.done.eq(0),
counter.eq(0),
pixnum.eq(pixnum.reset),
]
if self.double_fetch:
m.d.sync += pixrow.eq(0)
with m.If(self.start == 1):
m.d.sync += self.bram.en.eq(1)
m.next = "prefetch" if self.double_fetch else "writerow"
with m.State("prefetch"):
# TODO: do we need this
m.next = "writerow"
with m.State("writerow"):
if self.double_fetch:
c = counter[0:1]
with m.If(c == 0):
m.d.sync += [
self.data.rgb0.eq(ram_rgb_slice),
pixrow.eq(1),
]
with m.If(c == 1):
m.d.sync += self.data.rgb1.eq(ram_rgb_slice)
with m.If(c == 2):
pass
with m.If(c == 3):
m.d.sync += [
counter.eq(0),
pixnum.eq(pixnum - 1),
pixrow.eq(0),
]
with m.If(pixnum == 0):
m.next = "done"
else:
with m.If(counter[0] == 0):
m.d.sync += [
self.data.rgb0.eq(ram_rgb_slice[0]),
self.data.rgb1.eq(ram_rgb_slice[1]),
]
with m.If(counter[0] == 1):
m.d.sync += [
pixnum.eq(pixnum - 1),
counter.eq(0),
]
with m.If(pixnum == 0):
m.next = "done"
with m.State("done"):
m.d.sync += [self.done.eq(1), self.bram.en.eq(0)]
m.next = "init"
return m
class Hub75StringDriver(wiring.Component): class Hub75StringDriver(wiring.Component):
"""A data driver for Hub75 panels. This accesses the line memory and feeds out the data. """A data driver for Hub75 panels. This accesses the line memory and feeds out the data.
It is controlled by a Hub75Coordinator to signal when it should run and what bit of the data It is controlled by a Hub75Coordinator to signal when it should run and what bit of the data
@ -126,7 +248,7 @@ class Hub75StringDriver(wiring.Component):
] ]
m.d.sync += Assert(self.bcm_select < 6) m.d.sync += Assert(self.bcm_select < 6)
pixnum = Signal(ceil_log2(self.panel_length), init=self.panel_length - 1) pixnum = Signal(range(self.panel_length), init=self.panel_length - 1)
m.d.comb += self.bram_port.addr.eq(pixnum) m.d.comb += self.bram_port.addr.eq(pixnum)
with m.FSM(): with m.FSM():
@ -180,11 +302,12 @@ class Hub75Coordinator(wiring.Component):
donearr = [] donearr = []
startStrings = Signal(1) startStrings = Signal(1)
stringsDone = Signal(1) stringsDone = Signal(1)
for i in range(self.n_strings): for i in range(self.n_strings):
sb = SwapBuffer(depth=128, shape=data.ArrayLayout(Rgb666Layout, 2)) sb = SwapBuffer(depth=128, shape=data.ArrayLayout(Rgb666Layout, 2))
bufs += sb bufs.append(sb)
stringdriver = Hub75StringDriver(128) stringdriver = Hub75StringDriver(128)
strings += stringdriver strings.append(stringdriver)
wiring.connect(m, sb.read_port, stringdriver.bram_port) wiring.connect(m, sb.read_port, stringdriver.bram_port)
m.d.comb += [ m.d.comb += [
self.data[i].eq(stringdriver.display_out), self.data[i].eq(stringdriver.display_out),
@ -192,12 +315,41 @@ class Hub75Coordinator(wiring.Component):
sb.selector.eq(swapline), sb.selector.eq(swapline),
] ]
m.submodules += [sb, stringdriver] m.submodules += [sb, stringdriver]
donearr += stringdriver.done donearr.append(stringdriver.done)
# combine the done signals into one signal with AND-reduction
m.d.comb += stringsDone.eq(Cat(*donearr).all()) m.d.comb += stringsDone.eq(Cat(*donearr).all())
self.addr = Signal(5)
# handle the fetch side.
# WIP: pass in fetcher/pixgen/geometry.
# right now we assume that it's just one panel,
# address is (string_number, hi/lo, addr)
for i in range(self.n_strings):
lookup_addr = Cat(i, self.addr, 0)
# generate a sequence of transfers.
with m.FSM():
with m.State("init"):
# go to preload.
pass
with m.State("preload"):
pass
with m.State("run"):
pass
with m.State("swap"):
pass
return m return m
# fetch line
#
class Hub75EDriver(wiring.Component): class Hub75EDriver(wiring.Component):
"""An optimized driver for hub75 strings. """An optimized driver for hub75 strings.
This version is faster than most implementations by merging the exposure This version is faster than most implementations by merging the exposure

View file

@ -1,11 +1,12 @@
from amaranth import Array, Module, Cat, Signal, Assert, unsigned from amaranth import Module
from amaranth.build import Platform
from amaranth.lib import wiring, data from amaranth.lib import wiring, data
from amaranth.lib.wiring import In, Out from amaranth.lib.memory import Memory
from amaranth.lib.memory import Memory, WritePort
from amaranth.sim import Simulator from amaranth.sim import Simulator
import pytest
from ..hub75 import Hub75Coordinator, Hub75StringDriver, Rgb666Layout from groovylight.common import Rgb888Layout
from ..hub75 import Hub75Coordinator, Hub75DataDriver, Hub75StringDriver, Rgb666Layout
def test_stringdriver(): def test_stringdriver():
@ -38,6 +39,41 @@ def test_stringdriver():
sim.run_until(1e-6 * 1000) sim.run_until(1e-6 * 1000)
def test_datadriver():
# the string driver test must
# 1. finish
# 2. strobe through all of the data in the array
# 3. slice the correct bit from the data.
m = Module()
m.submodules.dut = dut = Hub75DataDriver()
m.submodules.mem = mem = Memory(shape=Rgb888Layout, depth=256, init=[])
port = mem.read_port()
wiring.connect(m, port, dut.bram)
async def testbench(ctx):
# select a bit, strobe start, read values, test against known.
ctx.set(dut.bcm_select, 5)
ctx.set(dut.start, 1)
await ctx.tick()
ctx.set(dut.start, 0)
assert ctx.get(dut.bram_port.en) == 1
pass
sim = Simulator(m)
sim.add_clock(1e-6)
with sim.write_vcd("output.vcd"):
sim.run_until(1e-6 * 1000)
@pytest.mark.skip()
def test_hub75(): def test_hub75():
m = Module() m = Module()
m.submodules.dut = dut = Hub75Coordinator(1) m.submodules.dut = dut = Hub75Coordinator(1)
sim = Simulator(m)
sim.add_clock(1e-6)
with sim.write_vcd("output.vcd"):
sim.run_until(1e-6 * 1000)