From 40f70acd14192313d25e3a9ed16390a2b00493b7 Mon Sep 17 00:00:00 2001 From: saji <9110284+kschamplin@users.noreply.github.com> Date: Fri, 26 May 2023 16:49:34 -0500 Subject: [PATCH] migrate to jbroker --- cmd/gotelem/cli/root.go | 3 -- cmd/gotelem/cli/server.go | 61 ++++++++++++++---------------------- cmd/gotelem/cli/socketcan.go | 50 +++++++++++++++++++++++------ 3 files changed, 64 insertions(+), 50 deletions(-) diff --git a/cmd/gotelem/cli/root.go b/cmd/gotelem/cli/root.go index 02f30d3..973e74a 100644 --- a/cmd/gotelem/cli/root.go +++ b/cmd/gotelem/cli/root.go @@ -1,7 +1,6 @@ package cli import ( - "fmt" "log" "os" @@ -21,8 +20,6 @@ func Execute() { Commands: subCmds, } - fmt.Println(serveFlags) - if err := app.Run(os.Args); err != nil { log.Fatal(err) } diff --git a/cmd/gotelem/cli/server.go b/cmd/gotelem/cli/server.go index 9b55e9f..9e2142f 100644 --- a/cmd/gotelem/cli/server.go +++ b/cmd/gotelem/cli/server.go @@ -1,7 +1,6 @@ package cli import ( - "encoding/binary" "encoding/json" "fmt" "net" @@ -56,6 +55,7 @@ type service interface { var serveThings = []service{ &XBeeService{}, &CanLoggerService{}, + &rpcService{}, } @@ -71,50 +71,38 @@ func serve(cCtx *cli.Context) error { slog.SetDefault(logger) broker := gotelem.NewBroker(3, logger.WithGroup("broker")) - done := make(chan struct{}) - // start the can listener - - - wg := sync.WaitGroup{} for _, svc := range serveThings { - svcLogger := deriveLogger(logger, svc) logger.Info("starting service", "svc", svc.String()) - go func(mySvc service) { - wg.Add(1) + wg.Add(1) + go func(mySvc service, baseLogger *slog.Logger) { + svcLogger := logger.With("svc", mySvc.String()) defer wg.Done() err := mySvc.Start(cCtx, broker, svcLogger) if err != nil { logger.Error("service stopped!", "err", err, "svc", mySvc.String()) } - }(svc) + }(svc, logger) } wg.Wait() - // tcp listener server. - ln, err := net.Listen("tcp", ":8082") - if err != nil { - fmt.Printf("Error listening: %v\n", err) - } - logger.Info("TCP listener started", "addr", ln.Addr().String()) - - for { - conn, err := ln.Accept() - if err != nil { - fmt.Printf("error accepting: %v\n", err) - } - go handleCon(conn, broker, logger.WithGroup("tcp"), done) - } + return nil } type rpcService struct { } -func tcpSvc(ctx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) error { +func (r *rpcService) Status() { +} +func (r *rpcService) String() string { + return "rpcService" +} + +func (r *rpcService) Start(ctx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) error { // TODO: extract port/ip from cli context. ln, err := net.Listen("tcp", ":8082") if err != nil { @@ -130,30 +118,30 @@ func tcpSvc(ctx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) error } } -func handleCon(conn net.Conn, broker *gotelem.Broker, l *slog.Logger, done <-chan struct{}) { +func handleCon(conn net.Conn, broker *gotelem.JBroker, l *slog.Logger, done <-chan struct{}) { // reader := msgp.NewReader(conn) subname := fmt.Sprint("tcp", conn.RemoteAddr().String()) l.Info("started handling", "name", subname) - - rxCh := broker.Subscribe(subname) - defer broker.Unsubscribe(subname) defer conn.Close() + rxCh, err := broker.Subscribe(subname) + if err != nil { + l.Error("error subscribing to connection", "err", err) + return + } + defer broker.Unsubscribe(subname) + + jEncode := json.NewEncoder(conn) for { select { case msg := <-rxCh: l.Info("got packet") // FIXME: poorly optimized - buf := make([]byte, 0) - buf = binary.BigEndian.AppendUint32(buf, msg.Id) - buf = append(buf, msg.Data...) - - _, err := conn.Write(buf) + err := jEncode.Encode(msg) if err != nil { - l.Error("error writing tcp packet", "err", err) - return + l.Warn("error encoding json", "err", err) } case <-done: return @@ -165,7 +153,6 @@ func handleCon(conn net.Conn, broker *gotelem.Broker, l *slog.Logger, done <-cha // this spins up a new can socket on vcan0 and broadcasts a packet every second. for testing. type CanLoggerService struct { - cw gotelem.CanWriter } func (c *CanLoggerService) String() string { diff --git a/cmd/gotelem/cli/socketcan.go b/cmd/gotelem/cli/socketcan.go index ffdfb25..e88988f 100644 --- a/cmd/gotelem/cli/socketcan.go +++ b/cmd/gotelem/cli/socketcan.go @@ -7,6 +7,7 @@ import ( "time" "github.com/kschamplin/gotelem" + "github.com/kschamplin/gotelem/skylab" "github.com/kschamplin/gotelem/socketcan" "github.com/urfave/cli/v2" "golang.org/x/exp/slog" @@ -53,10 +54,11 @@ func (s *socketCANService) String() string { return s.name } -func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) { +func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) (err error) { // vcan0 demo if cCtx.String("can") == "" { + logger.Info("no can device provided") return } @@ -73,7 +75,10 @@ func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.Broker, logg s.name = sock.Name() // connect to the broker - rxCh := broker.Subscribe("socketCAN") + rxCh, err := broker.Subscribe("socketCAN") + if err != nil { + return err + } defer broker.Unsubscribe("socketCAN") @@ -90,12 +95,30 @@ func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.Broker, logg } }() + var frame gotelem.Frame for { select { case msg := <-rxCh: - sock.Send(&msg) + + id, d, _ := skylab.CanSend(msg.Data) + + frame.Id = id + frame.Data = d + + sock.Send(&frame) + case msg := <-rxCan: - broker.Publish("socketCAN", msg) + p, err := skylab.FromCanFrame(msg.Id, msg.Data) + if err != nil { + logger.Warn("error parsing can packet", "id", msg.Id) + continue + } + cde := gotelem.CANDumpEntry{ + Timestamp: float64(time.Now().Unix()), + Id: uint64(msg.Id), + Data: p, + } + broker.Publish("socketCAN", cde) case <-cCtx.Done(): return } @@ -133,15 +156,22 @@ func vcanTest(devname string) { slog.Error("error opening socket", "err", err) return } - testFrame := &gotelem.Frame{ - Id: 0x234, - Kind: gotelem.CanSFFFrame, - Data: []byte{0, 1, 2, 3, 4, 5, 6, 7}, + testPkt := skylab.WslMotorCurrentVector{ + Iq: 0.1, + Id: 0.2, } - for { + id, data, err := skylab.CanSend(&testPkt) + testFrame := gotelem.Frame{ + Id: id, + Data: data, + Kind: gotelem.CanSFFFrame, + } + + + for { slog.Info("sending test packet") - sock.Send(testFrame) + sock.Send(&testFrame) time.Sleep(1 * time.Second) } }