name in json

This commit is contained in:
saji 2023-06-29 23:59:16 -05:00
parent 04308611ff
commit 5ceaa7bf9d
5 changed files with 37 additions and 36 deletions

View file

@ -8,7 +8,7 @@ import (
"golang.org/x/exp/slog" "golang.org/x/exp/slog"
) )
type JBroker struct { type Broker struct {
subs map[string]chan skylab.BusEvent // contains the channel for each subsciber subs map[string]chan skylab.BusEvent // contains the channel for each subsciber
logger *slog.Logger logger *slog.Logger
@ -16,15 +16,15 @@ type JBroker struct {
bufsize int // size of chan buffer in elements. bufsize int // size of chan buffer in elements.
} }
func NewBroker(bufsize int, logger *slog.Logger) *JBroker { func NewBroker(bufsize int, logger *slog.Logger) *Broker {
return &JBroker{ return &Broker{
subs: make(map[string]chan skylab.BusEvent), subs: make(map[string]chan skylab.BusEvent),
logger: logger, logger: logger,
bufsize: bufsize, bufsize: bufsize,
} }
} }
func (b *JBroker) Subscribe(name string) (ch chan skylab.BusEvent, err error) { func (b *Broker) Subscribe(name string) (ch chan skylab.BusEvent, err error) {
// get rw lock. // get rw lock.
b.lock.Lock() b.lock.Lock()
defer b.lock.Unlock() defer b.lock.Unlock()
@ -39,14 +39,14 @@ func (b *JBroker) Subscribe(name string) (ch chan skylab.BusEvent, err error) {
return return
} }
func (b *JBroker) Unsubscribe(name string) { func (b *Broker) Unsubscribe(name string) {
// remove the channel from the map. We don't need to close it. // remove the channel from the map. We don't need to close it.
b.lock.Lock() b.lock.Lock()
defer b.lock.Unlock() defer b.lock.Unlock()
delete(b.subs, name) delete(b.subs, name)
} }
func (b *JBroker) Publish(sender string, message skylab.BusEvent) { func (b *Broker) Publish(sender string, message skylab.BusEvent) {
b.lock.RLock() b.lock.RLock()
defer b.lock.RUnlock() defer b.lock.RUnlock()
for name, ch := range b.subs { for name, ch := range b.subs {

View file

@ -44,7 +44,7 @@ var serveCmd = &cli.Command{
type service interface { type service interface {
fmt.Stringer fmt.Stringer
Start(cCtx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) (err error) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error)
Status() Status()
} }
@ -93,7 +93,7 @@ func (r *rpcService) String() string {
return "rpcService" return "rpcService"
} }
func (r *rpcService) Start(ctx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) error { func (r *rpcService) Start(ctx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) error {
// TODO: extract port/ip from cli context. // TODO: extract port/ip from cli context.
ln, err := net.Listen("tcp", "0.0.0.0:8082") ln, err := net.Listen("tcp", "0.0.0.0:8082")
if err != nil { if err != nil {
@ -109,7 +109,7 @@ func (r *rpcService) Start(ctx *cli.Context, broker *gotelem.JBroker, logger *sl
} }
} }
func handleCon(conn net.Conn, broker *gotelem.JBroker, l *slog.Logger, done <-chan struct{}) { func handleCon(conn net.Conn, broker *gotelem.Broker, l *slog.Logger, done <-chan struct{}) {
// reader := msgp.NewReader(conn) // reader := msgp.NewReader(conn)
subname := fmt.Sprint("tcp", conn.RemoteAddr().String()) subname := fmt.Sprint("tcp", conn.RemoteAddr().String())
@ -153,7 +153,7 @@ func (c *canLoggerService) String() string {
func (c *canLoggerService) Status() { func (c *canLoggerService) Status() {
} }
func (c *canLoggerService) Start(cCtx *cli.Context, broker *gotelem.JBroker, l *slog.Logger) (err error) { func (c *canLoggerService) Start(cCtx *cli.Context, broker *gotelem.Broker, l *slog.Logger) (err error) {
rxCh, err := broker.Subscribe("canDump") rxCh, err := broker.Subscribe("canDump")
if err != nil { if err != nil {
return err return err
@ -196,7 +196,7 @@ func (x *xBeeService) String() string {
func (x *xBeeService) Status() { func (x *xBeeService) Status() {
} }
func (x *xBeeService) Start(cCtx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) (err error) { func (x *xBeeService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) {
if cCtx.String("xbee") == "" { if cCtx.String("xbee") == "" {
logger.Info("not using xbee") logger.Info("not using xbee")
return return
@ -247,7 +247,7 @@ func (h *httpService) Status() {
} }
func (h *httpService) Start(cCtx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) (err error) { func (h *httpService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) {
r := gotelem.TelemRouter(logger) r := gotelem.TelemRouter(logger)

View file

@ -53,7 +53,8 @@ func (s *socketCANService) String() string {
return s.name return s.name
} }
func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) (err error) { // Start starts the socketCAN service - emitting packets sent from the broker.
func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) {
// vcan0 demo // vcan0 demo
if cCtx.String("can") == "" { if cCtx.String("can") == "" {
@ -112,8 +113,8 @@ func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.JBroker, log
continue continue
} }
cde := skylab.BusEvent{ cde := skylab.BusEvent{
Timestamp: float64(time.Now().UnixNano()) / 1e9, Timestamp: time.Now(),
Id: uint64(msg.Id), Id: msg.Id,
Data: p, Data: p,
} }
broker.Publish("socketCAN", cde) broker.Publish("socketCAN", cde)

View file

@ -100,10 +100,11 @@ func run(ctx *cli.Context) (err error) {
hexes := strings.Split(segments[2], "#") hexes := strings.Split(segments[2], "#")
// get the id // get the id
cd.Id, err = strconv.ParseUint(hexes[0], 16, 64) id, err := strconv.ParseUint(hexes[0], 16, 64)
if err != nil { if err != nil {
return err return err
} }
cd.Id = uint32(id)
// get the data to a []byte // get the data to a []byte
rawData, err := hex.DecodeString(hexes[1]) rawData, err := hex.DecodeString(hexes[1])

View file

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"math" "math"
"time"
// this is needed so that we can run make_skylab.go // this is needed so that we can run make_skylab.go
// without this, the yaml library will be removed // without this, the yaml library will be removed
@ -81,25 +82,25 @@ func ToCanFrame(p Packet) (id uint32, data []byte, err error) {
// ---- other wire encoding business ---- // ---- other wire encoding business ----
// internal structure for partially decoding json object. // internal structure for partially decoding json object.
type jsonRawEvent struct { // includes
Timestamp uint32 type RawJsonEvent struct {
Id uint32 Timestamp uint32 `json:"ts" db:"ts"`
Name string Id uint32 `json:"id"`
Data json.RawMessage Name string `json:"name"`
Data json.RawMessage `json:"data"`
} }
// BusEvent is a timestamped Skylab packet // BusEvent is a timestamped Skylab packet
type BusEvent struct { type BusEvent struct {
Timestamp uint32 `json:"ts"` Timestamp time.Time `json:"ts"`
Id uint32 `json:"id"` Id uint32 `json:"id"`
Name string `json:"name"`
Data Packet `json:"data"` Data Packet `json:"data"`
} }
func (e *BusEvent) MarshalJSON() (b []byte, err error) { func (e *BusEvent) MarshalJSON() (b []byte, err error) {
// create the underlying raw event // create the underlying raw event
j := &jsonRawEvent{ j := &RawJsonEvent{
Timestamp: e.Timestamp, Timestamp: uint32(e.Timestamp.UnixMilli()),
Id: uint32(e.Id), Id: uint32(e.Id),
Name: e.Data.String(), Name: e.Data.String(),
} }
@ -114,18 +115,17 @@ func (e *BusEvent) MarshalJSON() (b []byte, err error) {
} }
func (e *BusEvent) UnmarshalJSON(b []byte) error { func (e *BusEvent) UnmarshalJSON(b []byte) error {
var jRaw *jsonRawEvent j := &RawJsonEvent{}
err := json.Unmarshal(b, jRaw) err := json.Unmarshal(b, j)
if err != nil { if err != nil {
return err return err
} }
e.Timestamp = jRaw.Timestamp e.Timestamp = time.UnixMilli(int64(j.Timestamp))
e.Id = jRaw.Id e.Id = j.Id
e.Data, err = FromJson(jRaw.Id, jRaw.Data) e.Data, err = FromJson(j.Id, j.Data)
e.Name = e.Data.String()
return err return err
} }
@ -139,7 +139,7 @@ func (e *BusEvent) MarshalMsg(b []byte) ([]byte, error) {
return nil, err return nil, err
} }
rawEv := &msgpRawEvent{ rawEv := &msgpRawEvent{
Timestamp: e.Timestamp, Timestamp: uint32(e.Timestamp.UnixMilli()),
Id: uint32(e.Id), Id: uint32(e.Id),
Data: data, Data: data,
} }
@ -153,10 +153,9 @@ func (e *BusEvent) UnmarshalMsg(b []byte) ([]byte, error) {
if err != nil { if err != nil {
return remain, err return remain, err
} }
e.Timestamp = rawEv.Timestamp e.Timestamp = time.UnixMilli(int64(rawEv.Timestamp))
e.Id = rawEv.Id e.Id = rawEv.Id
e.Data, err = FromCanFrame(rawEv.Id, rawEv.Data) e.Data, err = FromCanFrame(rawEv.Id, rawEv.Data)
e.Name = e.Data.String()
return remain, err return remain, err
} }