diff --git a/broker.go b/broker.go index 4c44fb1..90da398 100644 --- a/broker.go +++ b/broker.go @@ -8,7 +8,7 @@ import ( "golang.org/x/exp/slog" ) -type JBroker struct { +type Broker struct { subs map[string]chan skylab.BusEvent // contains the channel for each subsciber logger *slog.Logger @@ -16,15 +16,15 @@ type JBroker struct { bufsize int // size of chan buffer in elements. } -func NewBroker(bufsize int, logger *slog.Logger) *JBroker { - return &JBroker{ +func NewBroker(bufsize int, logger *slog.Logger) *Broker { + return &Broker{ subs: make(map[string]chan skylab.BusEvent), logger: logger, bufsize: bufsize, } } -func (b *JBroker) Subscribe(name string) (ch chan skylab.BusEvent, err error) { +func (b *Broker) Subscribe(name string) (ch chan skylab.BusEvent, err error) { // get rw lock. b.lock.Lock() defer b.lock.Unlock() @@ -39,14 +39,14 @@ func (b *JBroker) Subscribe(name string) (ch chan skylab.BusEvent, err error) { return } -func (b *JBroker) Unsubscribe(name string) { +func (b *Broker) Unsubscribe(name string) { // remove the channel from the map. We don't need to close it. b.lock.Lock() defer b.lock.Unlock() delete(b.subs, name) } -func (b *JBroker) Publish(sender string, message skylab.BusEvent) { +func (b *Broker) Publish(sender string, message skylab.BusEvent) { b.lock.RLock() defer b.lock.RUnlock() for name, ch := range b.subs { diff --git a/cmd/gotelem/cli/server.go b/cmd/gotelem/cli/server.go index 89c44f7..4e4963d 100644 --- a/cmd/gotelem/cli/server.go +++ b/cmd/gotelem/cli/server.go @@ -44,7 +44,7 @@ var serveCmd = &cli.Command{ type service interface { fmt.Stringer - Start(cCtx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) (err error) + Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) Status() } @@ -93,7 +93,7 @@ func (r *rpcService) String() string { return "rpcService" } -func (r *rpcService) Start(ctx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) error { +func (r *rpcService) Start(ctx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) error { // TODO: extract port/ip from cli context. ln, err := net.Listen("tcp", "0.0.0.0:8082") if err != nil { @@ -109,7 +109,7 @@ func (r *rpcService) Start(ctx *cli.Context, broker *gotelem.JBroker, logger *sl } } -func handleCon(conn net.Conn, broker *gotelem.JBroker, l *slog.Logger, done <-chan struct{}) { +func handleCon(conn net.Conn, broker *gotelem.Broker, l *slog.Logger, done <-chan struct{}) { // reader := msgp.NewReader(conn) subname := fmt.Sprint("tcp", conn.RemoteAddr().String()) @@ -153,7 +153,7 @@ func (c *canLoggerService) String() string { func (c *canLoggerService) Status() { } -func (c *canLoggerService) Start(cCtx *cli.Context, broker *gotelem.JBroker, l *slog.Logger) (err error) { +func (c *canLoggerService) Start(cCtx *cli.Context, broker *gotelem.Broker, l *slog.Logger) (err error) { rxCh, err := broker.Subscribe("canDump") if err != nil { return err @@ -196,7 +196,7 @@ func (x *xBeeService) String() string { func (x *xBeeService) Status() { } -func (x *xBeeService) Start(cCtx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) (err error) { +func (x *xBeeService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) { if cCtx.String("xbee") == "" { logger.Info("not using xbee") return @@ -247,7 +247,7 @@ func (h *httpService) Status() { } -func (h *httpService) Start(cCtx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) (err error) { +func (h *httpService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) { r := gotelem.TelemRouter(logger) diff --git a/cmd/gotelem/cli/socketcan.go b/cmd/gotelem/cli/socketcan.go index b138a18..39a9e84 100644 --- a/cmd/gotelem/cli/socketcan.go +++ b/cmd/gotelem/cli/socketcan.go @@ -53,7 +53,8 @@ func (s *socketCANService) String() string { return s.name } -func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) (err error) { +// Start starts the socketCAN service - emitting packets sent from the broker. +func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) { // vcan0 demo if cCtx.String("can") == "" { @@ -112,8 +113,8 @@ func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.JBroker, log continue } cde := skylab.BusEvent{ - Timestamp: float64(time.Now().UnixNano()) / 1e9, - Id: uint64(msg.Id), + Timestamp: time.Now(), + Id: msg.Id, Data: p, } broker.Publish("socketCAN", cde) diff --git a/cmd/skylabify/skylabify.go b/cmd/skylabify/skylabify.go index 1e708a8..bc6fb17 100644 --- a/cmd/skylabify/skylabify.go +++ b/cmd/skylabify/skylabify.go @@ -100,10 +100,11 @@ func run(ctx *cli.Context) (err error) { hexes := strings.Split(segments[2], "#") // get the id - cd.Id, err = strconv.ParseUint(hexes[0], 16, 64) + id, err := strconv.ParseUint(hexes[0], 16, 64) if err != nil { return err } + cd.Id = uint32(id) // get the data to a []byte rawData, err := hex.DecodeString(hexes[1]) diff --git a/skylab/skylab.go b/skylab/skylab.go index e10f8f7..41e457b 100644 --- a/skylab/skylab.go +++ b/skylab/skylab.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "math" + "time" // this is needed so that we can run make_skylab.go // without this, the yaml library will be removed @@ -81,25 +82,25 @@ func ToCanFrame(p Packet) (id uint32, data []byte, err error) { // ---- other wire encoding business ---- // internal structure for partially decoding json object. -type jsonRawEvent struct { - Timestamp uint32 - Id uint32 - Name string - Data json.RawMessage +// includes +type RawJsonEvent struct { + Timestamp uint32 `json:"ts" db:"ts"` + Id uint32 `json:"id"` + Name string `json:"name"` + Data json.RawMessage `json:"data"` } // BusEvent is a timestamped Skylab packet type BusEvent struct { - Timestamp uint32 `json:"ts"` - Id uint32 `json:"id"` - Name string `json:"name"` - Data Packet `json:"data"` + Timestamp time.Time `json:"ts"` + Id uint32 `json:"id"` + Data Packet `json:"data"` } func (e *BusEvent) MarshalJSON() (b []byte, err error) { // create the underlying raw event - j := &jsonRawEvent{ - Timestamp: e.Timestamp, + j := &RawJsonEvent{ + Timestamp: uint32(e.Timestamp.UnixMilli()), Id: uint32(e.Id), Name: e.Data.String(), } @@ -114,18 +115,17 @@ func (e *BusEvent) MarshalJSON() (b []byte, err error) { } func (e *BusEvent) UnmarshalJSON(b []byte) error { - var jRaw *jsonRawEvent + j := &RawJsonEvent{} - err := json.Unmarshal(b, jRaw) + err := json.Unmarshal(b, j) if err != nil { return err } - e.Timestamp = jRaw.Timestamp - e.Id = jRaw.Id - e.Data, err = FromJson(jRaw.Id, jRaw.Data) - e.Name = e.Data.String() + e.Timestamp = time.UnixMilli(int64(j.Timestamp)) + e.Id = j.Id + e.Data, err = FromJson(j.Id, j.Data) return err } @@ -139,7 +139,7 @@ func (e *BusEvent) MarshalMsg(b []byte) ([]byte, error) { return nil, err } rawEv := &msgpRawEvent{ - Timestamp: e.Timestamp, + Timestamp: uint32(e.Timestamp.UnixMilli()), Id: uint32(e.Id), Data: data, } @@ -153,10 +153,9 @@ func (e *BusEvent) UnmarshalMsg(b []byte) ([]byte, error) { if err != nil { return remain, err } - e.Timestamp = rawEv.Timestamp + e.Timestamp = time.UnixMilli(int64(rawEv.Timestamp)) e.Id = rawEv.Id e.Data, err = FromCanFrame(rawEv.Id, rawEv.Data) - e.Name = e.Data.String() return remain, err }