Compare commits
3 commits
c8034066c9
...
d5381a3c33
Author | SHA1 | Date | |
---|---|---|---|
d5381a3c33 | |||
0b5a917e40 | |||
1ff4adf5e4 |
|
@ -9,6 +9,8 @@ import (
|
||||||
"github.com/kschamplin/gotelem/skylab"
|
"github.com/kschamplin/gotelem/skylab"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Broker is a Bus Event broadcast system. You can subscribe to events,
|
||||||
|
// and send events.
|
||||||
type Broker struct {
|
type Broker struct {
|
||||||
subs map[string]chan skylab.BusEvent // contains the channel for each subsciber
|
subs map[string]chan skylab.BusEvent // contains the channel for each subsciber
|
||||||
|
|
||||||
|
@ -17,6 +19,7 @@ type Broker struct {
|
||||||
bufsize int // size of chan buffer in elements.
|
bufsize int // size of chan buffer in elements.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewBroker creates a new broker with a given logger.
|
||||||
func NewBroker(bufsize int, logger *slog.Logger) *Broker {
|
func NewBroker(bufsize int, logger *slog.Logger) *Broker {
|
||||||
return &Broker{
|
return &Broker{
|
||||||
subs: make(map[string]chan skylab.BusEvent),
|
subs: make(map[string]chan skylab.BusEvent),
|
||||||
|
@ -25,6 +28,7 @@ func NewBroker(bufsize int, logger *slog.Logger) *Broker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Subscribe joins the broker with the given name. The name must be unique.
|
||||||
func (b *Broker) Subscribe(name string) (ch chan skylab.BusEvent, err error) {
|
func (b *Broker) Subscribe(name string) (ch chan skylab.BusEvent, err error) {
|
||||||
// get rw lock.
|
// get rw lock.
|
||||||
b.lock.Lock()
|
b.lock.Lock()
|
||||||
|
@ -40,6 +44,9 @@ func (b *Broker) Subscribe(name string) (ch chan skylab.BusEvent, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Unsubscribe removes a subscriber matching the name. It doesn't do anything
|
||||||
|
// if there's nobody subscribed with that name
|
||||||
func (b *Broker) Unsubscribe(name string) {
|
func (b *Broker) Unsubscribe(name string) {
|
||||||
// remove the channel from the map. We don't need to close it.
|
// remove the channel from the map. We don't need to close it.
|
||||||
b.lock.Lock()
|
b.lock.Lock()
|
||||||
|
@ -47,6 +54,8 @@ func (b *Broker) Unsubscribe(name string) {
|
||||||
delete(b.subs, name)
|
delete(b.subs, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Publish sends a bus event to all subscribers. It includes a sender
|
||||||
|
// string which prevents loopback.
|
||||||
func (b *Broker) Publish(sender string, message skylab.BusEvent) {
|
func (b *Broker) Publish(sender string, message skylab.BusEvent) {
|
||||||
b.lock.RLock()
|
b.lock.RLock()
|
||||||
defer b.lock.RUnlock()
|
defer b.lock.RUnlock()
|
||||||
|
|
|
@ -92,7 +92,7 @@ func run(ctx *cli.Context) (err error) {
|
||||||
|
|
||||||
fileReader := bufio.NewReader(istream)
|
fileReader := bufio.NewReader(istream)
|
||||||
|
|
||||||
var pfun logparsers.BusParserFunc
|
var pfun logparsers.BusEventParser
|
||||||
|
|
||||||
pfun, ok := logparsers.ParsersMap[ctx.String("format")]
|
pfun, ok := logparsers.ParsersMap[ctx.String("format")]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
|
@ -2,6 +2,7 @@ package logparsers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -35,10 +36,10 @@ func NewFormatError(msg string, err error) error {
|
||||||
return &FormatError{msg: msg, err: err}
|
return &FormatError{msg: msg, err: err}
|
||||||
}
|
}
|
||||||
|
|
||||||
// type LineParserFunc is a function that takes a string
|
// type CanFrameParser is a function that takes a string
|
||||||
// and returns a can frame. This is useful for common
|
// and returns a can frame. This is useful for common
|
||||||
// can dump formats.
|
// can dump formats.
|
||||||
type LineParserFunc func(string) (can.Frame, time.Time, error)
|
type CanFrameParser func(string) (can.Frame, time.Time, error)
|
||||||
|
|
||||||
var candumpRegex = regexp.MustCompile(`^\((\d+)\.(\d{6})\) \w+ (\w+)#(\w+)$`)
|
var candumpRegex = regexp.MustCompile(`^\((\d+)\.(\d{6})\) \w+ (\w+)#(\w+)$`)
|
||||||
|
|
||||||
|
@ -157,20 +158,27 @@ func parseTelemLogLine(line string) (frame can.Frame, ts time.Time, err error) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// BusParserFunc is a function that takes a string and returns a busevent.
|
// BusEventParser is a function that takes a string and returns a busevent.
|
||||||
type BusParserFunc func(string) (skylab.BusEvent, error)
|
type BusEventParser func(string) (skylab.BusEvent, error)
|
||||||
|
|
||||||
// parserBusEventMapper takes a line parser (that returns a can frame)
|
// skylabify JSON parser.
|
||||||
|
func parseSkylabifyLogLine(input string) (skylab.BusEvent, error) {
|
||||||
|
var b = skylab.BusEvent{}
|
||||||
|
err := json.Unmarshal([]byte(input), &b)
|
||||||
|
return b, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// frameParseToBusEvent takes a line parser (that returns a can frame)
|
||||||
// and makes it return a busEvent instead.
|
// and makes it return a busEvent instead.
|
||||||
func parserBusEventMapper(f LineParserFunc) BusParserFunc {
|
func frameParseToBusEvent(fun CanFrameParser) BusEventParser {
|
||||||
return func(s string) (skylab.BusEvent, error) {
|
return func(s string) (skylab.BusEvent, error) {
|
||||||
var b = skylab.BusEvent{}
|
var b = skylab.BusEvent{}
|
||||||
f, ts, err := f(s)
|
frame, ts, err := fun(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return b, err
|
return b, err
|
||||||
}
|
}
|
||||||
b.Timestamp = ts
|
b.Timestamp = ts
|
||||||
b.Data, err = skylab.FromCanFrame(f)
|
b.Data, err = skylab.FromCanFrame(frame)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return b, err
|
return b, err
|
||||||
}
|
}
|
||||||
|
@ -179,7 +187,8 @@ func parserBusEventMapper(f LineParserFunc) BusParserFunc {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var ParsersMap = map[string]BusParserFunc{
|
var ParsersMap = map[string]BusEventParser{
|
||||||
"telem": parserBusEventMapper(parseTelemLogLine),
|
"telem": frameParseToBusEvent(parseTelemLogLine),
|
||||||
"candump": parserBusEventMapper(parseCanDumpLine),
|
"candump": frameParseToBusEvent(parseCanDumpLine),
|
||||||
|
"json": parseSkylabifyLogLine,
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/kschamplin/gotelem/internal/can"
|
"github.com/kschamplin/gotelem/internal/can"
|
||||||
|
"github.com/kschamplin/gotelem/skylab"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test_parseCanDumpLine(t *testing.T) {
|
func Test_parseCanDumpLine(t *testing.T) {
|
||||||
|
@ -170,3 +171,42 @@ func Test_parseTelemLogLine_errors(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_parseSkylabifyLogLine(t *testing.T) {
|
||||||
|
type args struct {
|
||||||
|
input string
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
want skylab.BusEvent
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "basic test",
|
||||||
|
args: args{
|
||||||
|
input: `{"ts":1685141873612,"id":259,"name":"wsl_velocity","data":{"motor_velocity":89.97547,"vehicle_velocity":2.38853}}`},
|
||||||
|
want: skylab.BusEvent{
|
||||||
|
Timestamp: time.UnixMilli(1685141873612),
|
||||||
|
Name: "wsl_velocity",
|
||||||
|
Data: &skylab.WslVelocity{
|
||||||
|
MotorVelocity: 89.97547,
|
||||||
|
VehicleVelocity: 2.38853,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got, err := parseSkylabifyLogLine(tt.args.input)
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("parseSkylabifyLogLine() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(got, tt.want) {
|
||||||
|
t.Errorf("parseSkylabifyLogLine() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
1275
skylab/skylab_gen.go
1275
skylab/skylab_gen.go
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large
Load diff
|
@ -3,6 +3,7 @@ package skylab
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
"reflect"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -44,4 +45,20 @@ func TestJSON{{$structName}}(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCanFrame{{$structName}}(t *testing.T) {
|
||||||
|
v := &{{$structName}}{}
|
||||||
|
frame, err := ToCanFrame(v)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected no error, got %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
retpkt, err := FromCanFrame(frame)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected no error, got %v", err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(v, retpkt) {
|
||||||
|
t.Fatalf("decoded packet did not match sent %v got %v", v, retpkt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
{{- end }}
|
{{- end }}
|
||||||
|
|
|
@ -29,7 +29,7 @@ func TestCanSocket(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("test name", func(t *testing.T) {
|
t.Run("test interface name", func(t *testing.T) {
|
||||||
sock, _ := NewCanSocket("vcan0")
|
sock, _ := NewCanSocket("vcan0")
|
||||||
defer sock.Close()
|
defer sock.Close()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue