wip: broker packet refactor

This commit is contained in:
saji 2023-05-26 15:24:51 -05:00
parent 8727dc43c7
commit 6e14bb2951
4 changed files with 54 additions and 47 deletions

View file

@ -4,6 +4,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
"golang.org/x/exp/slog"
) )
type BrokerRequest struct { type BrokerRequest struct {
@ -23,15 +25,6 @@ type Broker struct {
unsubCh chan BrokerClient unsubCh chan BrokerClient
} }
func NewBroker(bufsize int) *Broker {
b := &Broker{
subs: make(map[string]chan Frame),
publishCh: make(chan BrokerRequest, 3),
subsCh: make(chan BrokerClient, 3),
unsubCh: make(chan BrokerClient, 3),
}
return b
}
// Start runs the broker and sends messages to the subscribers (but not the sender) // Start runs the broker and sends messages to the subscribers (but not the sender)
func (b *Broker) Start() { func (b *Broker) Start() {
@ -90,12 +83,23 @@ func (b *Broker) Unsubscribe(name string) {
type JBroker struct { type JBroker struct {
subs map[string] chan CANDumpJSON // contains the channel for each subsciber subs map[string] chan CANDumpEntry // contains the channel for each subsciber
logger *slog.Logger
lock sync.RWMutex lock sync.RWMutex
bufsize int // size of chan buffer in elements.
} }
func (b *JBroker) Subscribe(name string) (ch chan CANDumpJSON, err error) {
func NewBroker(bufsize int, logger *slog.Logger) *JBroker {
return &JBroker{
subs: make(map[string]chan CANDumpEntry),
logger: logger,
bufsize: bufsize,
}
}
func (b *JBroker) Subscribe(name string) (ch chan CANDumpEntry, err error) {
// get rw lock. // get rw lock.
b.lock.Lock() b.lock.Lock()
defer b.lock.Unlock() defer b.lock.Unlock()
@ -103,36 +107,33 @@ func (b *JBroker) Subscribe(name string) (ch chan CANDumpJSON, err error) {
if ok { if ok {
return nil, errors.New("name already in use") return nil, errors.New("name already in use")
} }
ch = make(chan CANDumpJSON, 10) b.logger.Info("new subscriber", "name", name)
ch = make(chan CANDumpEntry, b.bufsize)
return return
} }
func (b *JBroker) Unsubscribe(name string) { func (b *JBroker) Unsubscribe(name string) {
// if the channel is in use, close it, else do nothing. // remove the channel from the map. We don't need to close it.
b.lock.Lock() b.lock.Lock()
defer b.lock.Unlock() defer b.lock.Unlock()
ch, ok := b.subs[name]
if ok {
close(ch)
}
delete(b.subs, name) delete(b.subs, name)
} }
func (b *JBroker) Publish(sender string, message CANDumpJSON) { func (b *JBroker) Publish(sender string, message CANDumpEntry) {
go func() { b.lock.RLock()
b.lock.RLock() defer b.lock.RUnlock()
defer b.lock.RUnlock() for name, ch := range b.subs {
for name, ch := range b.subs { if name == sender {
if name == sender { continue
continue
}
// non blocking send.
select {
case ch <- message:
default:
}
} }
// non blocking send.
select {
case ch <- message:
b.logger.Debug("sent message", "dest", name, "src", sender)
default:
b.logger.Warn("recipient buffer full", "dest", name)
}
}
}()
} }

View file

@ -2,6 +2,7 @@ package cli
import ( import (
"encoding/binary" "encoding/binary"
"encoding/json"
"fmt" "fmt"
"net" "net"
"os" "os"
@ -45,7 +46,7 @@ type testThing func(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logg
type service interface { type service interface {
fmt.Stringer fmt.Stringer
Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) Start(cCtx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) (err error)
Status() Status()
} }
@ -68,12 +69,11 @@ func serve(cCtx *cli.Context) error {
logger := slog.New(slog.NewTextHandler(os.Stderr)) logger := slog.New(slog.NewTextHandler(os.Stderr))
slog.SetDefault(logger) slog.SetDefault(logger)
broker := gotelem.NewBroker(3) broker := gotelem.NewBroker(3, logger.WithGroup("broker"))
done := make(chan struct{}) done := make(chan struct{})
// start the can listener // start the can listener
go broker.Start()
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
@ -176,27 +176,31 @@ func (c *CanLoggerService) Status() {
} }
func (c *CanLoggerService) Start(cCtx *cli.Context, broker *gotelem.Broker, l *slog.Logger) (err error) { func (c *CanLoggerService) Start(cCtx *cli.Context, broker *gotelem.JBroker, l *slog.Logger) (err error) {
rxCh := broker.Subscribe("candump") rxCh, err := broker.Subscribe("candump")
if err != nil {
return err
}
t := time.Now() t := time.Now()
fname := fmt.Sprintf("candump_%d-%02d-%02dT%02d.%02d.%02d.txt", fname := fmt.Sprintf("candump_%d-%02d-%02dT%02d.%02d.%02d.txt",
t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
l.Info("logging to file", "filename", fname) l.Info("logging to file", "filename", fname)
cw, err := gotelem.OpenCanWriter(fname) f, err := os.Create(fname)
if err != nil { if err != nil {
l.Error("error opening file", "filename", fname, "err", err) l.Error("error opening file", "filename", fname, "err", err)
return return
} }
enc := json.NewEncoder(f)
for { for {
select { select {
case msg := <-rxCh: case msg := <-rxCh:
cw.Send(&msg) enc.Encode(msg)
case <-cCtx.Done(): case <-cCtx.Done():
cw.Close() f.Close()
return return
} }
} }
@ -216,18 +220,21 @@ func (x *XBeeService) Status() {
} }
func (x *XBeeService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) { func (x *XBeeService) Start(cCtx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) (err error) {
if cCtx.String("xbee") == "" { if cCtx.String("xbee") == "" {
logger.Info("not using xbee") logger.Info("not using xbee")
return return
} }
transport, err := xbee.ParseDeviceString(cCtx.String("device")) transport, err := xbee.ParseDeviceString(cCtx.String("xbee"))
if err != nil { if err != nil {
logger.Error("failed to open xbee string", "err", err) logger.Error("failed to open xbee string", "err", err)
return return
} }
logger.Info("using xbee device", "transport", transport) logger.Info("using xbee device", "transport", transport)
rxCh := broker.Subscribe("xbee") rxCh, err := broker.Subscribe("xbee")
if err != nil {
logger.Error("failed to subscribe to broker", "err", err)
}
x.session, err = xbee.NewSession(transport, logger.With("device", transport.Type())) x.session, err = xbee.NewSession(transport, logger.With("device", transport.Type()))
if err != nil { if err != nil {
@ -245,8 +252,7 @@ func (x *XBeeService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *s
logger.Info("got msg", "msg", msg) logger.Info("got msg", "msg", msg)
buf := make([]byte, 0) buf := make([]byte, 0)
buf = binary.BigEndian.AppendUint32(buf, msg.Id) // FIXME: implement serialzation over xbee.
buf = append(buf, msg.Data...)
_, err := x.session.Write(buf) _, err := x.session.Write(buf)
if err != nil { if err != nil {

View file

@ -92,7 +92,7 @@ func run(ctx *cli.Context) (err error) {
segments := strings.Split(dumpLine, " ") segments := strings.Split(dumpLine, " ")
var cd gotelem.CANDumpJSON var cd gotelem.CANDumpEntry
// this is cursed but easiest way to get a float from a string. // this is cursed but easiest way to get a float from a string.
fmt.Sscanf(segments[0], "(%g)", &cd.Timestamp) fmt.Sscanf(segments[0], "(%g)", &cd.Timestamp)

View file

@ -12,7 +12,7 @@ import (
// CanWriter // CanWriter
type CanWriter struct { type CanWriter struct {
output *os.File output *os.File
cd CANDumpJSON cd CANDumpEntry
jsonBuf []byte jsonBuf []byte
} }
@ -51,7 +51,7 @@ func OpenCanWriter(name string) (*CanWriter, error) {
return cw, nil return cw, nil
} }
type CANDumpJSON struct { type CANDumpEntry struct {
Timestamp float64 `json:"ts"` Timestamp float64 `json:"ts"`
Id uint64 `json:"id"` Id uint64 `json:"id"`
Data skylab.Packet `json:"data"` Data skylab.Packet `json:"data"`