Compare commits

...

2 commits

Author SHA1 Message Date
saji 5f54b8acd8 fetcher basic
All checks were successful
Unit Tests / Test (push) Successful in 2m8s
doesn't support variable functions
2024-10-28 16:54:00 -05:00
saji 6b63b17bb8 working address stream generator 2024-10-21 18:23:23 -05:00
3 changed files with 177 additions and 19 deletions

View file

@ -3,23 +3,22 @@
# during operation, it is given a row index, and responds with the data.
from amaranth import Module, Cat, Mux, ShapeLike, Signal, Assert, Array, unsigned
from amaranth import Module, Signal, unsigned, Cat
from amaranth.build import Platform
from amaranth.lib import wiring, data
from amaranth.lib.wiring import In, Out
from amaranth.lib.memory import Memory, ReadPort, WritePort
from amaranth.lib import stream
from amaranth.utils import ceil_log2
import logging
from .common import Rgb666Layout, Hub75Stream, Hub75Ctrl, Hub75Data, Rgb888Layout
from .geom import DisplayRotation, DisplayString
from .common import Rgb888Layout
from .geom import DisplayString
logger = logging.getLogger(__name__)
CoordLayout = data.StructLayout({"x": unsigned(32), "y": unsigned(32)})
# FIXME: sizing should be based off of screen size.
CoordLayout = data.StructLayout({"x": unsigned(10), "y": unsigned(10)})
class AddressConverter(wiring.Component):
@ -31,7 +30,7 @@ class AddressConverter(wiring.Component):
{
"input_x": In(geom.dimensions.length),
"addr": In(unsigned(geom.dimensions.addr_bits)),
"output": Out(CoordLayout).array(geom.dimensions.mux),
"output": Out(data.ArrayLayout(CoordLayout, geom.dimensions.mux)),
},
src_loc_at=src_loc_at,
)
@ -40,9 +39,9 @@ class AddressConverter(wiring.Component):
m = Module()
for mux in range(self.geom.dimensions.mux):
m.d.comb += self.output[mux].eq(
self.geom.translate_coord(self.input_x, self.addr, mux)
)
o = self.geom.translate_coord(self.input_x, self.addr, mux)
m.d.comb += self.output[mux]["x"].eq(o["x"])
m.d.comb += self.output[mux]["y"].eq(o["y"])
return m
@ -54,7 +53,8 @@ class AddressGenerator(wiring.Component):
self.geom = geom
super().__init__(
{
"coordstream": Out( stream.Signature(CoordLayout.array(geom.dimensions.mux))
"coordstream": Out(
stream.Signature(data.ArrayLayout(CoordLayout, geom.dimensions.mux))
),
"start": In(1),
"done": Out(1),
@ -68,22 +68,37 @@ class AddressGenerator(wiring.Component):
counter = Signal(self.geom.dimensions.length)
addr = Signal(self.addr.shape())
# based on the geometry we generate x,y pairs.
m.submodules.translate = translate = AddressConverter(self.geom)
m.d.comb += translate.input_x.eq(counter)
m.d.comb += translate.addr.eq(addr)
m.d.comb += self.coordstream.payload.eq(translate.output)
with m.FSM():
with m.State("init"):
m.d.comb += self.done.eq(0)
m.d.sync += counter.eq(0)
m.d.comb += self.coordstream.valid.eq(0)
m.d.comb += [self.done.eq(0), self.coordstream.valid.eq(0)]
m.d.sync += [counter.eq(0), addr.eq(self.addr)]
with m.If(self.start):
m.next = "run"
with m.State("run"):
m.d.comb += self.coordstream.valid.eq(1)
# stream data out as long as it's valid.
with m.If(
self.coordstream.ready
& (counter == self.geom.dimensions.length - 1)
):
m.next = "done"
with m.Elif(self.coordstream.ready):
m.d.sync += counter.eq(counter + 1)
pass
with m.State("done"):
m.d.comb += self.coordstream.valid.eq(0)
m.d.comb += self.done.eq(1)
m.next = "init"
@ -91,17 +106,42 @@ class AddressGenerator(wiring.Component):
class BasicFetcher(wiring.Component):
"""A generic fetcher. Takes a function of the form f(x,y: int) -> RGB."""
"""A generic function-based fetcher. Takes a function of the form f(x,y: int) -> RGB."""
def __init__(
self, geom: DisplayString, dfunc, data_shape=Rgb888Layout, *, src_loc_at=0
):
self.geom = geom
self.dfunc = dfunc
super().__init__(
{
"pixstream": Out(stream.Signature(Rgb888Layout)),
"start": In(1),
"addr": In(geom.dimensions.addr_bits),
"input": In(
stream.Signature(data.ArrayLayout(CoordLayout, geom.dimensions.mux))
),
"pixstream": Out(
stream.Signature(data.ArrayLayout(data_shape, geom.dimensions.mux))
),
},
src_loc_at=src_loc_at,
)
def elaborate(self, platform: Platform) -> Module:
m = Module()
# test mode - pass through, r = x + y, g = x - y, b = {y,x}
colors = self.pixstream.payload
m.d.comb += [
self.input.valid.eq(self.pixstream.valid),
self.input.ready.eq(self.pixstream.ready),
]
for i in range(self.geom.dimensions.mux):
inp = self.input.payload[i]
m.d.comb += [
colors[i].red.eq(inp.x + inp.y),
colors[i].green.eq(inp.x - inp.y),
colors[i].blue.eq(inp.x ^ inp.y),
]
return m

View file

@ -0,0 +1,119 @@
from amaranth.lib import wiring, data
from amaranth.sim import Simulator
import random
from random import randrange
import pytest
from groovylight.fetcher import AddressConverter, AddressGenerator, BasicFetcher
from groovylight.geom import DisplayString, Coord, DisplayDimensions, DisplayRotation
ds_testdata = [
(DisplayRotation.R0, (0, 0), {"x": 3, "y": 0}),
(DisplayRotation.R0, (40, 2), {"x": 43, "y": 2}),
(DisplayRotation.R90, (40, 0), {"x": 66, "y": 40}),
(DisplayRotation.R90, (120, 2), {"x": 64, "y": 120}),
]
@pytest.mark.parametrize("rot, inp, expected", ds_testdata)
def test_converter(rot, inp, expected):
ds = DisplayString(Coord(3, 0), DisplayDimensions(128, 64), rot)
dut = AddressConverter(ds)
sim = Simulator(dut)
async def testbench(ctx):
await ctx.delay(1e-6)
ctx.set(dut.input_x, inp[0])
ctx.set(dut.addr, inp[1])
await ctx.delay(1e-6)
assert ctx.get(dut.output)[0]["x"] == expected["x"]
assert ctx.get(dut.output)[0]["y"] == expected["y"]
sim.add_testbench(testbench)
with sim.write_vcd("output.vcd"):
sim.run()
# Helper functions for stream management
async def stream_get(ctx, stream):
ctx.set(stream.ready, 1)
(payload,) = await ctx.tick().sample(stream.payload).until(stream.valid)
ctx.set(stream.ready, 0)
return payload
async def stream_put(ctx, stream, payload):
ctx.set(stream.valid, 1)
ctx.set(stream.payload, payload)
await ctx.tick().until(stream.ready)
ctx.set(stream.valid, 0)
generator_tests = [
(0, DisplayRotation.R0),
(0, DisplayRotation.R90),
(4, DisplayRotation.R90),
]
@pytest.mark.parametrize("addr, rot", generator_tests)
def test_generator(addr, rot):
ds = DisplayString(Coord(3, 0), DisplayDimensions(128, 64), rot)
dut = AddressGenerator(ds)
sim = Simulator(dut)
sim.add_clock(1e-6)
async def runner(ctx):
# TODO: set inputs
ctx.set(dut.addr, addr)
await ctx.tick()
ctx.set(dut.start, 1)
await ctx.tick()
ctx.set(dut.start, 0)
await ctx.tick().until(dut.done == 1)
expected = [
[ds.translate_coord(x, addr, 0), ds.translate_coord(x, addr, 1)]
for x in range(128)
]
expected.reverse()
async def stream_checker(ctx):
while ctx.get(dut.done) == 0:
payload = await stream_get(ctx, dut.coordstream)
assert expected.pop() == payload
sim.add_testbench(runner)
sim.add_testbench(stream_checker)
with sim.write_vcd("generator.vcd"):
sim.run()
basicFetcher_tests = [
({"x": 0, "y": 0}, {"red": 0, "green": 0, "blue": 0}),
({"x": 1, "y": 1}, {"red": 2, "green": 0, "blue": 0}),
({"x": 9, "y": 1}, {"red": 10, "green": 8, "blue": 8}),
]
@pytest.mark.parametrize("inp, expected", basicFetcher_tests)
def test_basic_fetcher(inp, expected):
ds = DisplayString(
Coord(3, 0), DisplayDimensions(128, 64, mux=1), DisplayRotation.R0
)
dut = BasicFetcher(ds, None)
sim = Simulator(dut)
async def test(ctx):
ctx.set(dut.input.payload[0], inp)
res = ctx.get(dut.pixstream.payload)[0]
assert res["red"] == expected["red"]
assert res["green"] == expected["green"]
assert res["blue"] == expected["blue"]
sim.add_testbench(test)
with sim.write_vcd("fetcher.vcd"):
sim.run()

View file

@ -146,7 +146,6 @@ def test_datadriver_single(bcm):
with sim.write_vcd("output.vcd"):
sim.run()
@pytest.mark.skip()
def test_hub75_coordinator():
m = Module()