Compare commits

..

No commits in common. "7a0f59c9f6dcce25a8785bcbb0337e32640bce21" and "95ca2447cb47cf824eb296ac4a2611ec66b67afc" have entirely different histories.

6 changed files with 167 additions and 187 deletions

View file

@ -1,4 +1,4 @@
from amaranth import unsigned, Cat from amaranth import Array, unsigned
from amaranth.lib import wiring, data from amaranth.lib import wiring, data
from amaranth.lib.wiring import Out from amaranth.lib.wiring import Out
@ -13,9 +13,6 @@ class RGBLayout(data.StructLayout):
} }
) )
def __call__(self, value):
return RGBView(self, value)
Rgb888Layout = RGBLayout(8, 8, 8) Rgb888Layout = RGBLayout(8, 8, 8)
Rgb666Layout = RGBLayout(6, 6, 6) Rgb666Layout = RGBLayout(6, 6, 6)
@ -23,23 +20,6 @@ Rgb666Layout = RGBLayout(6, 6, 6)
Rgb111Layout = RGBLayout(1, 1, 1) Rgb111Layout = RGBLayout(1, 1, 1)
class RGBView(data.View):
def channel_size(self) -> int:
return self.red.shape()
def channel_slice(self, bit: int) -> Rgb111Layout:
"""Select bits from each channel and use it to form an Rgb111Layout.
This is useful for BCM stuff, since the bits are sliced to form a bitplane.
"""
return Rgb111Layout(
Cat(
self.red.bit_select(bit, 1),
self.green.bit_select(bit, 1),
self.blue.bit_select(bit, 1),
)
)
class Hub75Stream(wiring.Signature): class Hub75Stream(wiring.Signature):
"""A Hub75E Driver for a single string of panels.""" """A Hub75E Driver for a single string of panels."""

View file

@ -1,4 +1,4 @@
from amaranth import Module, Cat, Mux, Print, ShapeLike, Signal, Assert, Array from amaranth import Module, Cat, Mux, ShapeLike, Signal, Assert, Array
from amaranth.build import Platform from amaranth.build import Platform
from amaranth.lib import wiring, data from amaranth.lib import wiring, data
from amaranth.lib.wiring import In, Out from amaranth.lib.wiring import In, Out
@ -80,63 +80,6 @@ class SwapBuffer(wiring.Component):
return m return m
class DisplayClock(wiring.Component):
"""Generates the display clock automatically with the correct delay
when start is asserted. Stops when `done` is strobed.
Can either be used as a /2 clock divider or a /4 with the `double_fetch` parameter.
The startup delay is chosen based on the mode or can be set with `startup_delay`
"""
def __init__(self, *, double_fetch: bool = True, startup_delay=None, src_loc_at=0):
self.double_fetch = double_fetch
if startup_delay is None:
self.startup_delay = 4 if double_fetch else 1 # FIXME: choose right values.
else:
self.startup_delay = startup_delay
super().__init__(
{
"start": In(1),
"done": In(1),
"clk": Out(1),
},
src_loc_at=src_loc_at,
)
def elaborate(self, platform: Platform) -> Module:
m = Module()
counter = Signal(range(max(self.startup_delay, 2) + 1))
with m.FSM():
with m.State("init"):
m.d.sync += [counter.eq(0), self.clk.eq(0)]
with m.If(self.start == 1):
m.next = "warmup"
with m.State("warmup"):
m.d.sync += counter.eq(counter + 1)
with m.If(counter == self.startup_delay - 1):
m.next = "run"
m.d.sync += counter.eq(0)
with m.State("run"):
with m.If(self.done == 1):
m.next = "init"
if self.double_fetch:
m.d.sync += [
self.clk.eq(~counter[1]),
counter.eq(counter + 1),
]
else:
m.d.sync += [
self.clk.eq(~counter),
counter.eq(~counter),
]
return m
class Hub75DataDriver(wiring.Component): class Hub75DataDriver(wiring.Component):
def __init__( def __init__(
self, self,
@ -174,28 +117,40 @@ class Hub75DataDriver(wiring.Component):
m = Module() m = Module()
counter = Signal(32) counter = Signal(32)
m.d.sync += counter.eq(counter + 1)
pixnum = Signal(range(self.panel_length), init=self.panel_length - 1) pixnum = Signal(range(self.panel_length), init=self.panel_length - 1)
if self.double_fetch: if self.double_fetch:
pixrow = Signal(1) pixrow = Signal(1)
m.d.comb += self.bram.addr.eq(Cat(pixrow, pixnum)) m.d.comb += self.bram.addr.eq(Cat(pixrow, pixnum))
ram_rgb_slice = self.bram.data.channel_slice(self.bcm_select) ram_rgb_slice = Cat(
self.bram.data["red"].bit_select(self.bcm_select, 1),
self.bram.data["blue"].bit_select(self.bcm_select, 1),
self.bram.data["green"].bit_select(self.bcm_select, 1),
)
else: else:
m.d.comb += self.bram.addr.eq(pixnum)
ram_rgb_slice = Array( ram_rgb_slice = Array(
[ [
self.bram.data[0].channel_slice(self.bcm_select), Cat(
self.bram.data[1].channel_slice(self.bcm_select), self.bram.data[0]["red"].bit_select(self.bcm_select, 1),
self.bram.data[0]["blue"].bit_select(self.bcm_select, 1),
self.bram.data[0]["green"].bit_select(self.bcm_select, 1),
),
Cat(
self.bram.data[1]["red"].bit_select(self.bcm_select, 1),
self.bram.data[1]["blue"].bit_select(self.bcm_select, 1),
self.bram.data[1]["green"].bit_select(self.bcm_select, 1),
),
] ]
) )
m.d.comb += self.bram.addr.eq(pixnum)
with m.FSM(): with m.FSM():
with m.State("init"): with m.State("init"):
m.d.sync += [ m.d.sync += [
self.done.eq(0), self.done.eq(0),
counter.eq(0), counter.eq(0),
pixnum.eq(pixnum.init), pixnum.eq(pixnum.reset),
] ]
if self.double_fetch: if self.double_fetch:
m.d.sync += pixrow.eq(0) m.d.sync += pixrow.eq(0)
@ -203,31 +158,30 @@ class Hub75DataDriver(wiring.Component):
m.d.sync += self.bram.en.eq(1) m.d.sync += self.bram.en.eq(1)
m.next = "prefetch" if self.double_fetch else "writerow" m.next = "prefetch" if self.double_fetch else "writerow"
with m.State("prefetch"): with m.State("prefetch"):
# Allow the BRAM to settle after being enabled. # TODO: do we need this
m.d.sync += counter.eq(0)
m.next = "writerow" m.next = "writerow"
with m.State("writerow"): with m.State("writerow"):
if self.double_fetch: if self.double_fetch:
c = counter[0:2] c = counter[0:1]
with m.If(c == 0b0): with m.If(c == 0):
m.d.sync += [ m.d.sync += [
self.data.rgb0.eq(ram_rgb_slice), self.data.rgb0.eq(ram_rgb_slice),
pixrow.eq(1), pixrow.eq(1),
] ]
with m.If(c == 0b01): with m.If(c == 1):
pass
with m.If(c == 0b10):
m.d.sync += self.data.rgb1.eq(ram_rgb_slice) m.d.sync += self.data.rgb1.eq(ram_rgb_slice)
with m.If(c == 2):
pass
with m.If(c == 3):
m.d.sync += [ m.d.sync += [
counter.eq(0),
pixnum.eq(pixnum - 1), pixnum.eq(pixnum - 1),
pixrow.eq(0), pixrow.eq(0),
] ]
with m.If(pixnum == 0): with m.If(pixnum == 0):
m.next = "done" m.next = "done"
with m.If(c == 0b11):
m.d.sync += counter.eq(0)
else: else:
with m.If(counter[0] == 0): with m.If(counter[0] == 0):
m.d.sync += [ m.d.sync += [
@ -248,6 +202,82 @@ class Hub75DataDriver(wiring.Component):
return m return m
class Hub75StringDriver(wiring.Component):
"""A data driver for Hub75 panels. This accesses the line memory and feeds out the data.
It is controlled by a Hub75Coordinator to signal when it should run and what bit of the data
it should send.
"""
def __init__(self, panel_length: int = 128, *, src_loc_at=0):
self.panel_length = panel_length
super().__init__(
{
"bcm_select": In(3),
"done": Out(1),
"start": In(1),
"bram_port": In(
ReadPort.Signature(
addr_width=ceil_log2(panel_length),
shape=data.ArrayLayout(Rgb666Layout, 2),
)
),
"display_out": Out(Hub75Data()),
},
src_loc_at=src_loc_at,
)
def elaborate(self, platform: Platform) -> Module:
m = Module()
self._counter = counter = Signal(32) # unused count is optimized out
m.d.sync += counter.eq(counter + 1)
ram_rgb0_slice = Cat(
self.bram_port.data[0]["red"].bit_select(self.bcm_select, 1),
self.bram_port.data[0]["blue"].bit_select(self.bcm_select, 1),
self.bram_port.data[0]["green"].bit_select(self.bcm_select, 1),
)
ram_rgb1_slice = Cat(
self.bram_port.data[1]["red"].bit_select(self.bcm_select, 1),
self.bram_port.data[1]["blue"].bit_select(self.bcm_select, 1),
self.bram_port.data[1]["green"].bit_select(self.bcm_select, 1),
)
m.d.comb += [
self.display_out.rgb0.eq(ram_rgb0_slice),
self.display_out.rgb1.eq(ram_rgb1_slice),
]
m.d.sync += Assert(self.bcm_select < 6)
pixnum = Signal(range(self.panel_length), init=self.panel_length - 1)
m.d.comb += self.bram_port.addr.eq(pixnum)
with m.FSM():
with m.State("init"):
m.d.sync += [
self.done.eq(0),
counter.eq(0),
pixnum.eq(self.panel_length - 1),
]
with m.If(self.start == 1):
m.d.sync += self.bram_port.en.eq(1)
m.next = "writerow"
with m.State("writerow"):
with m.If(counter[0] == 0):
# do nothing
pass
with m.If((counter[0] == 1) & (pixnum != 0)):
m.d.sync += pixnum.eq(pixnum - 1)
with m.Else():
m.next = "done"
with m.State("done"):
m.d.sync += [self.done.eq(1), self.bram_port.en.eq(0)]
m.next = "init"
return m
class Hub75Coordinator(wiring.Component): class Hub75Coordinator(wiring.Component):
"""A shared-control hub75 driver""" """A shared-control hub75 driver"""
@ -276,9 +306,7 @@ class Hub75Coordinator(wiring.Component):
for i in range(self.n_strings): for i in range(self.n_strings):
sb = SwapBuffer(depth=128, shape=data.ArrayLayout(Rgb666Layout, 2)) sb = SwapBuffer(depth=128, shape=data.ArrayLayout(Rgb666Layout, 2))
bufs.append(sb) bufs.append(sb)
stringdriver = Hub75DataDriver( stringdriver = Hub75StringDriver(128)
128, data_shape=Rgb666Layout, double_fetch=False
)
strings.append(stringdriver) strings.append(stringdriver)
wiring.connect(m, sb.read_port, stringdriver.bram_port) wiring.connect(m, sb.read_port, stringdriver.bram_port)
m.d.comb += [ m.d.comb += [

View file

@ -1,7 +0,0 @@
import pytest
@pytest.fixture()
def simfixture(request: pytest.FixtureRequest, tmp_path):
pass

View file

@ -1,23 +0,0 @@
from amaranth import unsigned
import pytest
from groovylight.common import Rgb888Layout, Rgb666Layout, RGBView
def test_rgbview():
rgb = Rgb888Layout(0xAABBCC)
assert rgb.channel_size() == unsigned(8)
rgb18 = Rgb666Layout(0x2DEFD)
slice = rgb.channel_slice(1)
assert isinstance(slice, RGBView), "channel_slice should return another rgbview"
assert slice.channel_size() == unsigned(1), "channel_slice channel size should be 1"
assert isinstance(
rgb18.channel_slice(5), RGBView
), "channel_slice should return another rgbview"
with pytest.raises(ValueError, match="Target of a view is 0 bit"):
rgb.channel_slice(8)

View file

@ -4,40 +4,37 @@ from amaranth.lib.memory import Memory
from amaranth.sim import Simulator from amaranth.sim import Simulator
import pytest import pytest
from groovylight.common import Rgb888Layout, Rgb666Layout from groovylight.common import Rgb888Layout
from groovylight.hub75 import ( from ..hub75 import Hub75Coordinator, Hub75DataDriver, Hub75StringDriver, Rgb666Layout
DisplayClock,
Hub75Coordinator,
Hub75DataDriver, def test_stringdriver():
SwapBuffer, # the string driver test must
# 1. finish
# 2. strobe through all of the data in the array
# 3. slice the correct bit from the data.
m = Module()
m.submodules.dut = dut = Hub75StringDriver()
m.submodules.mem = mem = Memory(
shape=data.ArrayLayout(Rgb666Layout, 2), depth=128, init=[]
) )
port = mem.read_port()
wiring.connect(m, port, dut.bram_port)
def test_swapbuffer():
dut = SwapBuffer(Rgb666Layout, 512)
sim = Simulator(dut)
sim.add_clock(1e-6)
async def testbench(ctx): async def testbench(ctx):
init_color = {"red": 0, "green": 0, "blue": 0} # select a bit, strobe start, read values, test against known.
test_color = {"red": 8, "green": 8, "blue": 8} ctx.set(dut.bcm_select, 5)
ctx.set(dut.selector, 0) ctx.set(dut.start, 1)
ctx.set(dut.write_port.addr, 1)
ctx.set(dut.read_port.addr, 1)
ctx.set(dut.write_port.data, test_color)
await ctx.tick() await ctx.tick()
# assert that the read port addr 1 = 0 ctx.set(dut.start, 0)
assert ctx.get(dut.read_port.data) == init_color assert ctx.get(dut.bram_port.en) == 1
# swap buffer pass
ctx.set(dut.selector, 1)
await ctx.tick().repeat( sim = Simulator(m)
2 sim.add_clock(1e-6)
) # takes two clocks after switching selector to output data.
assert ctx.get(dut.read_port.data) == test_color
# TODO: add more assertions/verification
sim.add_testbench(testbench)
with sim.write_vcd("output.vcd"): with sim.write_vcd("output.vcd"):
sim.run_until(1e-6 * 1000) sim.run_until(1e-6 * 1000)
@ -47,57 +44,31 @@ def test_datadriver():
# 1. finish # 1. finish
# 2. strobe through all of the data in the array # 2. strobe through all of the data in the array
# 3. slice the correct bit from the data. # 3. slice the correct bit from the data.
memdata = [{"red": x, "green": x, "blue": x} for x in range(256)]
m = Module() m = Module()
m.submodules.dut = dut = Hub75DataDriver() m.submodules.dut = dut = Hub75DataDriver()
m.submodules.mem = mem = Memory(shape=Rgb888Layout, depth=256, init=memdata) m.submodules.mem = mem = Memory(shape=Rgb888Layout, depth=256, init=[])
m.submodules.clocker = clocker = DisplayClock()
m.d.comb += [
clocker.start.eq(dut.start),
clocker.done.eq(dut.done),
]
port = mem.read_port() port = mem.read_port()
wiring.connect(m, port, dut.bram) wiring.connect(m, port, dut.bram)
async def testbench(ctx): async def testbench(ctx):
# select a bit, strobe start, read values, test against known. # select a bit, strobe start, read values, test against known.
ctx.set(dut.bcm_select, 7) ctx.set(dut.bcm_select, 5)
ctx.set(dut.start, 1) ctx.set(dut.start, 1)
await ctx.tick() await ctx.tick()
ctx.set(dut.start, 0) ctx.set(dut.start, 0)
assert ctx.get(dut.bram.en) == 1 assert ctx.get(dut.bram_port.en) == 1
await ctx.tick().until(dut.done == 1) pass
async def rgbtest(ctx):
await ctx.tick().until(dut.start == 1)
counter = 127
bitslice = 7
async for _, rgb0, rgb1 in ctx.posedge(clocker.clk).sample(dut.data.rgb0, dut.data.rgb1):
assert counter >= 0, "should not do more than 128 clocks"
e0 = ctx.get(mem.data[counter << 1])
e1 = ctx.get(mem.data[(counter << 1) + 1])
print(counter)
for r, e in [(rgb0, e0), (rgb1, e1)]:
assert r.red == (e.red >> bitslice) & 1
assert r.green == (e.green >> bitslice) & 1
assert r.blue == (e.blue >> bitslice) & 1
counter = counter - 1
sim = Simulator(m) sim = Simulator(m)
sim.add_clock(1e-6) sim.add_clock(1e-6)
sim.add_testbench(testbench)
sim.add_testbench(rgbtest, background=True)
with sim.write_vcd("output.vcd"): with sim.write_vcd("output.vcd"):
sim.run() sim.run_until(1e-6 * 1000)
# sim.run_until(1e-6 * 4000)
@pytest.mark.skip() @pytest.mark.skip()
def test_hub75_coordinator(): def test_hub75():
m = Module() m = Module()
m.submodules.dut = dut = Hub75Coordinator(1) m.submodules.dut = dut = Hub75Coordinator(1)

View file

@ -0,0 +1,31 @@
from amaranth.sim import Simulator
from ..hub75 import Hub75StringDriver, Rgb666Layout, SwapBuffer
def test_swapbuffer():
dut = SwapBuffer(Rgb666Layout, 512)
sim = Simulator(dut)
sim.add_clock(1e-6)
async def testbench(ctx):
init_color = {"red": 0, "green": 0, "blue": 0}
test_color = {"red": 8, "green": 8, "blue": 8}
ctx.set(dut.selector, 0)
ctx.set(dut.write_port.addr, 1)
ctx.set(dut.read_port.addr, 1)
ctx.set(dut.write_port.data, test_color)
await ctx.tick()
# assert that the read port addr 1 = 0
assert ctx.get(dut.read_port.data) == init_color
# swap buffer
ctx.set(dut.selector, 1)
await ctx.tick().repeat(
2
) # takes two clocks after switching selector to output data.
assert ctx.get(dut.read_port.data) == test_color
# TODO: add more assertions/verification
sim.add_testbench(testbench)
with sim.write_vcd("output.vcd"):
sim.run_until(1e-6 * 1000)