Compare commits

..

2 commits

Author SHA1 Message Date
saji 5f54b8acd8 fetcher basic
All checks were successful
Unit Tests / Test (push) Successful in 2m8s
doesn't support variable functions
2024-10-28 16:54:00 -05:00
saji 6b63b17bb8 working address stream generator 2024-10-21 18:23:23 -05:00
3 changed files with 177 additions and 19 deletions

View file

@ -3,23 +3,22 @@
# during operation, it is given a row index, and responds with the data. # during operation, it is given a row index, and responds with the data.
from amaranth import Module, Cat, Mux, ShapeLike, Signal, Assert, Array, unsigned from amaranth import Module, Signal, unsigned, Cat
from amaranth.build import Platform from amaranth.build import Platform
from amaranth.lib import wiring, data from amaranth.lib import wiring, data
from amaranth.lib.wiring import In, Out from amaranth.lib.wiring import In, Out
from amaranth.lib.memory import Memory, ReadPort, WritePort
from amaranth.lib import stream from amaranth.lib import stream
from amaranth.utils import ceil_log2
import logging import logging
from .common import Rgb666Layout, Hub75Stream, Hub75Ctrl, Hub75Data, Rgb888Layout from .common import Rgb888Layout
from .geom import DisplayRotation, DisplayString from .geom import DisplayString
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
CoordLayout = data.StructLayout({"x": unsigned(32), "y": unsigned(32)}) # FIXME: sizing should be based off of screen size.
CoordLayout = data.StructLayout({"x": unsigned(10), "y": unsigned(10)})
class AddressConverter(wiring.Component): class AddressConverter(wiring.Component):
@ -31,7 +30,7 @@ class AddressConverter(wiring.Component):
{ {
"input_x": In(geom.dimensions.length), "input_x": In(geom.dimensions.length),
"addr": In(unsigned(geom.dimensions.addr_bits)), "addr": In(unsigned(geom.dimensions.addr_bits)),
"output": Out(CoordLayout).array(geom.dimensions.mux), "output": Out(data.ArrayLayout(CoordLayout, geom.dimensions.mux)),
}, },
src_loc_at=src_loc_at, src_loc_at=src_loc_at,
) )
@ -40,9 +39,9 @@ class AddressConverter(wiring.Component):
m = Module() m = Module()
for mux in range(self.geom.dimensions.mux): for mux in range(self.geom.dimensions.mux):
m.d.comb += self.output[mux].eq( o = self.geom.translate_coord(self.input_x, self.addr, mux)
self.geom.translate_coord(self.input_x, self.addr, mux) m.d.comb += self.output[mux]["x"].eq(o["x"])
) m.d.comb += self.output[mux]["y"].eq(o["y"])
return m return m
@ -54,7 +53,8 @@ class AddressGenerator(wiring.Component):
self.geom = geom self.geom = geom
super().__init__( super().__init__(
{ {
"coordstream": Out( stream.Signature(CoordLayout.array(geom.dimensions.mux)) "coordstream": Out(
stream.Signature(data.ArrayLayout(CoordLayout, geom.dimensions.mux))
), ),
"start": In(1), "start": In(1),
"done": Out(1), "done": Out(1),
@ -68,22 +68,37 @@ class AddressGenerator(wiring.Component):
counter = Signal(self.geom.dimensions.length) counter = Signal(self.geom.dimensions.length)
addr = Signal(self.addr.shape())
# based on the geometry we generate x,y pairs. # based on the geometry we generate x,y pairs.
m.submodules.translate = translate = AddressConverter(self.geom) m.submodules.translate = translate = AddressConverter(self.geom)
m.d.comb += translate.input_x.eq(counter)
m.d.comb += translate.addr.eq(addr)
m.d.comb += self.coordstream.payload.eq(translate.output)
with m.FSM(): with m.FSM():
with m.State("init"): with m.State("init"):
m.d.comb += self.done.eq(0) m.d.comb += [self.done.eq(0), self.coordstream.valid.eq(0)]
m.d.sync += counter.eq(0) m.d.sync += [counter.eq(0), addr.eq(self.addr)]
m.d.comb += self.coordstream.valid.eq(0)
with m.If(self.start): with m.If(self.start):
m.next = "run" m.next = "run"
with m.State("run"): with m.State("run"):
m.d.comb += self.coordstream.valid.eq(1)
# stream data out as long as it's valid. # stream data out as long as it's valid.
with m.If(
self.coordstream.ready
& (counter == self.geom.dimensions.length - 1)
):
m.next = "done"
with m.Elif(self.coordstream.ready):
m.d.sync += counter.eq(counter + 1)
pass pass
with m.State("done"): with m.State("done"):
m.d.comb += self.coordstream.valid.eq(0)
m.d.comb += self.done.eq(1) m.d.comb += self.done.eq(1)
m.next = "init" m.next = "init"
@ -91,17 +106,42 @@ class AddressGenerator(wiring.Component):
class BasicFetcher(wiring.Component): class BasicFetcher(wiring.Component):
"""A generic fetcher. Takes a function of the form f(x,y: int) -> RGB.""" """A generic function-based fetcher. Takes a function of the form f(x,y: int) -> RGB."""
def __init__( def __init__(
self, geom: DisplayString, dfunc, data_shape=Rgb888Layout, *, src_loc_at=0 self, geom: DisplayString, dfunc, data_shape=Rgb888Layout, *, src_loc_at=0
): ):
self.geom = geom
self.dfunc = dfunc self.dfunc = dfunc
super().__init__( super().__init__(
{ {
"pixstream": Out(stream.Signature(Rgb888Layout)), "input": In(
"start": In(1), stream.Signature(data.ArrayLayout(CoordLayout, geom.dimensions.mux))
"addr": In(geom.dimensions.addr_bits), ),
"pixstream": Out(
stream.Signature(data.ArrayLayout(data_shape, geom.dimensions.mux))
),
}, },
src_loc_at=src_loc_at, src_loc_at=src_loc_at,
) )
def elaborate(self, platform: Platform) -> Module:
m = Module()
# test mode - pass through, r = x + y, g = x - y, b = {y,x}
colors = self.pixstream.payload
m.d.comb += [
self.input.valid.eq(self.pixstream.valid),
self.input.ready.eq(self.pixstream.ready),
]
for i in range(self.geom.dimensions.mux):
inp = self.input.payload[i]
m.d.comb += [
colors[i].red.eq(inp.x + inp.y),
colors[i].green.eq(inp.x - inp.y),
colors[i].blue.eq(inp.x ^ inp.y),
]
return m

View file

@ -0,0 +1,119 @@
from amaranth.lib import wiring, data
from amaranth.sim import Simulator
import random
from random import randrange
import pytest
from groovylight.fetcher import AddressConverter, AddressGenerator, BasicFetcher
from groovylight.geom import DisplayString, Coord, DisplayDimensions, DisplayRotation
ds_testdata = [
(DisplayRotation.R0, (0, 0), {"x": 3, "y": 0}),
(DisplayRotation.R0, (40, 2), {"x": 43, "y": 2}),
(DisplayRotation.R90, (40, 0), {"x": 66, "y": 40}),
(DisplayRotation.R90, (120, 2), {"x": 64, "y": 120}),
]
@pytest.mark.parametrize("rot, inp, expected", ds_testdata)
def test_converter(rot, inp, expected):
ds = DisplayString(Coord(3, 0), DisplayDimensions(128, 64), rot)
dut = AddressConverter(ds)
sim = Simulator(dut)
async def testbench(ctx):
await ctx.delay(1e-6)
ctx.set(dut.input_x, inp[0])
ctx.set(dut.addr, inp[1])
await ctx.delay(1e-6)
assert ctx.get(dut.output)[0]["x"] == expected["x"]
assert ctx.get(dut.output)[0]["y"] == expected["y"]
sim.add_testbench(testbench)
with sim.write_vcd("output.vcd"):
sim.run()
# Helper functions for stream management
async def stream_get(ctx, stream):
ctx.set(stream.ready, 1)
(payload,) = await ctx.tick().sample(stream.payload).until(stream.valid)
ctx.set(stream.ready, 0)
return payload
async def stream_put(ctx, stream, payload):
ctx.set(stream.valid, 1)
ctx.set(stream.payload, payload)
await ctx.tick().until(stream.ready)
ctx.set(stream.valid, 0)
generator_tests = [
(0, DisplayRotation.R0),
(0, DisplayRotation.R90),
(4, DisplayRotation.R90),
]
@pytest.mark.parametrize("addr, rot", generator_tests)
def test_generator(addr, rot):
ds = DisplayString(Coord(3, 0), DisplayDimensions(128, 64), rot)
dut = AddressGenerator(ds)
sim = Simulator(dut)
sim.add_clock(1e-6)
async def runner(ctx):
# TODO: set inputs
ctx.set(dut.addr, addr)
await ctx.tick()
ctx.set(dut.start, 1)
await ctx.tick()
ctx.set(dut.start, 0)
await ctx.tick().until(dut.done == 1)
expected = [
[ds.translate_coord(x, addr, 0), ds.translate_coord(x, addr, 1)]
for x in range(128)
]
expected.reverse()
async def stream_checker(ctx):
while ctx.get(dut.done) == 0:
payload = await stream_get(ctx, dut.coordstream)
assert expected.pop() == payload
sim.add_testbench(runner)
sim.add_testbench(stream_checker)
with sim.write_vcd("generator.vcd"):
sim.run()
basicFetcher_tests = [
({"x": 0, "y": 0}, {"red": 0, "green": 0, "blue": 0}),
({"x": 1, "y": 1}, {"red": 2, "green": 0, "blue": 0}),
({"x": 9, "y": 1}, {"red": 10, "green": 8, "blue": 8}),
]
@pytest.mark.parametrize("inp, expected", basicFetcher_tests)
def test_basic_fetcher(inp, expected):
ds = DisplayString(
Coord(3, 0), DisplayDimensions(128, 64, mux=1), DisplayRotation.R0
)
dut = BasicFetcher(ds, None)
sim = Simulator(dut)
async def test(ctx):
ctx.set(dut.input.payload[0], inp)
res = ctx.get(dut.pixstream.payload)[0]
assert res["red"] == expected["red"]
assert res["green"] == expected["green"]
assert res["blue"] == expected["blue"]
sim.add_testbench(test)
with sim.write_vcd("fetcher.vcd"):
sim.run()

View file

@ -146,7 +146,6 @@ def test_datadriver_single(bcm):
with sim.write_vcd("output.vcd"): with sim.write_vcd("output.vcd"):
sim.run() sim.run()
@pytest.mark.skip() @pytest.mark.skip()
def test_hub75_coordinator(): def test_hub75_coordinator():
m = Module() m = Module()