diff --git a/broker.go b/broker.go index 57e9a09..1e4e348 100644 --- a/broker.go +++ b/broker.go @@ -4,6 +4,8 @@ import ( "errors" "fmt" "sync" + + "golang.org/x/exp/slog" ) type BrokerRequest struct { @@ -23,15 +25,6 @@ type Broker struct { unsubCh chan BrokerClient } -func NewBroker(bufsize int) *Broker { - b := &Broker{ - subs: make(map[string]chan Frame), - publishCh: make(chan BrokerRequest, 3), - subsCh: make(chan BrokerClient, 3), - unsubCh: make(chan BrokerClient, 3), - } - return b -} // Start runs the broker and sends messages to the subscribers (but not the sender) func (b *Broker) Start() { @@ -90,12 +83,23 @@ func (b *Broker) Unsubscribe(name string) { type JBroker struct { - subs map[string] chan CANDumpJSON // contains the channel for each subsciber + subs map[string] chan CANDumpEntry // contains the channel for each subsciber + logger *slog.Logger lock sync.RWMutex + bufsize int // size of chan buffer in elements. } -func (b *JBroker) Subscribe(name string) (ch chan CANDumpJSON, err error) { + +func NewBroker(bufsize int, logger *slog.Logger) *JBroker { + return &JBroker{ + subs: make(map[string]chan CANDumpEntry), + logger: logger, + bufsize: bufsize, + } +} + +func (b *JBroker) Subscribe(name string) (ch chan CANDumpEntry, err error) { // get rw lock. b.lock.Lock() defer b.lock.Unlock() @@ -103,36 +107,33 @@ func (b *JBroker) Subscribe(name string) (ch chan CANDumpJSON, err error) { if ok { return nil, errors.New("name already in use") } - ch = make(chan CANDumpJSON, 10) + b.logger.Info("new subscriber", "name", name) + ch = make(chan CANDumpEntry, b.bufsize) return } func (b *JBroker) Unsubscribe(name string) { - // if the channel is in use, close it, else do nothing. + // remove the channel from the map. We don't need to close it. b.lock.Lock() defer b.lock.Unlock() - ch, ok := b.subs[name] - if ok { - close(ch) - } delete(b.subs, name) } -func (b *JBroker) Publish(sender string, message CANDumpJSON) { - go func() { - b.lock.RLock() - defer b.lock.RUnlock() - for name, ch := range b.subs { - if name == sender { - continue - } - // non blocking send. - select { - case ch <- message: - default: - } +func (b *JBroker) Publish(sender string, message CANDumpEntry) { + b.lock.RLock() + defer b.lock.RUnlock() + for name, ch := range b.subs { + if name == sender { + continue } + // non blocking send. + select { + case ch <- message: + b.logger.Debug("sent message", "dest", name, "src", sender) + default: + b.logger.Warn("recipient buffer full", "dest", name) + } + } - }() } diff --git a/cmd/gotelem/cli/server.go b/cmd/gotelem/cli/server.go index c9703a0..9b55e9f 100644 --- a/cmd/gotelem/cli/server.go +++ b/cmd/gotelem/cli/server.go @@ -2,6 +2,7 @@ package cli import ( "encoding/binary" + "encoding/json" "fmt" "net" "os" @@ -45,7 +46,7 @@ type testThing func(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logg type service interface { fmt.Stringer - Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) + Start(cCtx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) (err error) Status() } @@ -68,12 +69,11 @@ func serve(cCtx *cli.Context) error { logger := slog.New(slog.NewTextHandler(os.Stderr)) slog.SetDefault(logger) - broker := gotelem.NewBroker(3) + broker := gotelem.NewBroker(3, logger.WithGroup("broker")) done := make(chan struct{}) // start the can listener - go broker.Start() wg := sync.WaitGroup{} @@ -176,27 +176,31 @@ func (c *CanLoggerService) Status() { } -func (c *CanLoggerService) Start(cCtx *cli.Context, broker *gotelem.Broker, l *slog.Logger) (err error) { - rxCh := broker.Subscribe("candump") +func (c *CanLoggerService) Start(cCtx *cli.Context, broker *gotelem.JBroker, l *slog.Logger) (err error) { + rxCh, err := broker.Subscribe("candump") + if err != nil { + return err + } t := time.Now() fname := fmt.Sprintf("candump_%d-%02d-%02dT%02d.%02d.%02d.txt", t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) l.Info("logging to file", "filename", fname) - cw, err := gotelem.OpenCanWriter(fname) + f, err := os.Create(fname) if err != nil { l.Error("error opening file", "filename", fname, "err", err) return } + enc := json.NewEncoder(f) for { select { case msg := <-rxCh: - cw.Send(&msg) + enc.Encode(msg) case <-cCtx.Done(): - cw.Close() + f.Close() return } } @@ -216,18 +220,21 @@ func (x *XBeeService) Status() { } -func (x *XBeeService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) { +func (x *XBeeService) Start(cCtx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) (err error) { if cCtx.String("xbee") == "" { logger.Info("not using xbee") return } - transport, err := xbee.ParseDeviceString(cCtx.String("device")) + transport, err := xbee.ParseDeviceString(cCtx.String("xbee")) if err != nil { logger.Error("failed to open xbee string", "err", err) return } logger.Info("using xbee device", "transport", transport) - rxCh := broker.Subscribe("xbee") + rxCh, err := broker.Subscribe("xbee") + if err != nil { + logger.Error("failed to subscribe to broker", "err", err) + } x.session, err = xbee.NewSession(transport, logger.With("device", transport.Type())) if err != nil { @@ -245,8 +252,7 @@ func (x *XBeeService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *s logger.Info("got msg", "msg", msg) buf := make([]byte, 0) - buf = binary.BigEndian.AppendUint32(buf, msg.Id) - buf = append(buf, msg.Data...) + // FIXME: implement serialzation over xbee. _, err := x.session.Write(buf) if err != nil { diff --git a/cmd/skylabify/skylabify.go b/cmd/skylabify/skylabify.go index ba6c3a6..b1fee59 100644 --- a/cmd/skylabify/skylabify.go +++ b/cmd/skylabify/skylabify.go @@ -92,7 +92,7 @@ func run(ctx *cli.Context) (err error) { segments := strings.Split(dumpLine, " ") - var cd gotelem.CANDumpJSON + var cd gotelem.CANDumpEntry // this is cursed but easiest way to get a float from a string. fmt.Sscanf(segments[0], "(%g)", &cd.Timestamp) diff --git a/skylab_logger.go b/skylab_logger.go index fc65fe8..946d743 100644 --- a/skylab_logger.go +++ b/skylab_logger.go @@ -12,7 +12,7 @@ import ( // CanWriter type CanWriter struct { output *os.File - cd CANDumpJSON + cd CANDumpEntry jsonBuf []byte } @@ -51,7 +51,7 @@ func OpenCanWriter(name string) (*CanWriter, error) { return cw, nil } -type CANDumpJSON struct { +type CANDumpEntry struct { Timestamp float64 `json:"ts"` Id uint64 `json:"id"` Data skylab.Packet `json:"data"`