diff --git a/broker.go b/broker.go index 1e4e348..1c2ba33 100644 --- a/broker.go +++ b/broker.go @@ -110,6 +110,7 @@ func (b *JBroker) Subscribe(name string) (ch chan CANDumpEntry, err error) { b.logger.Info("new subscriber", "name", name) ch = make(chan CANDumpEntry, b.bufsize) + b.subs[name] = ch return } diff --git a/cmd/gotelem/cli/server.go b/cmd/gotelem/cli/server.go index 9e2142f..c844dba 100644 --- a/cmd/gotelem/cli/server.go +++ b/cmd/gotelem/cli/server.go @@ -164,7 +164,7 @@ func (c *CanLoggerService) Status() { func (c *CanLoggerService) Start(cCtx *cli.Context, broker *gotelem.JBroker, l *slog.Logger) (err error) { - rxCh, err := broker.Subscribe("candump") + rxCh, err := broker.Subscribe("canDump") if err != nil { return err } @@ -186,6 +186,7 @@ func (c *CanLoggerService) Start(cCtx *cli.Context, broker *gotelem.JBroker, l case msg := <-rxCh: enc.Encode(msg) + case <-cCtx.Done(): f.Close() return @@ -230,6 +231,7 @@ func (x *XBeeService) Start(cCtx *cli.Context, broker *gotelem.JBroker, logger * } logger.Info("connected to local xbee", "addr", x.session.LocalAddr()) + encode := json.NewEncoder(x.session) for { select { case <-cCtx.Done(): @@ -237,11 +239,7 @@ func (x *XBeeService) Start(cCtx *cli.Context, broker *gotelem.JBroker, logger * return case msg := <-rxCh: logger.Info("got msg", "msg", msg) - buf := make([]byte, 0) - - // FIXME: implement serialzation over xbee. - - _, err := x.session.Write(buf) + encode.Encode(msg) if err != nil { logger.Warn("error writing to xbee", "err", err) } diff --git a/cmd/gotelem/cli/socketcan.go b/cmd/gotelem/cli/socketcan.go index e88988f..710178e 100644 --- a/cmd/gotelem/cli/socketcan.go +++ b/cmd/gotelem/cli/socketcan.go @@ -114,7 +114,7 @@ func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.JBroker, log continue } cde := gotelem.CANDumpEntry{ - Timestamp: float64(time.Now().Unix()), + Timestamp: float64(time.Now().UnixNano()) / 1e9, Id: uint64(msg.Id), Data: p, }