migrate to jbroker

This commit is contained in:
saji 2023-05-26 16:49:34 -05:00
parent 6e14bb2951
commit 40f70acd14
3 changed files with 64 additions and 50 deletions

View file

@ -1,7 +1,6 @@
package cli
import (
"fmt"
"log"
"os"
@ -21,8 +20,6 @@ func Execute() {
Commands: subCmds,
}
fmt.Println(serveFlags)
if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}

View file

@ -1,7 +1,6 @@
package cli
import (
"encoding/binary"
"encoding/json"
"fmt"
"net"
@ -56,6 +55,7 @@ type service interface {
var serveThings = []service{
&XBeeService{},
&CanLoggerService{},
&rpcService{},
}
@ -71,50 +71,38 @@ func serve(cCtx *cli.Context) error {
slog.SetDefault(logger)
broker := gotelem.NewBroker(3, logger.WithGroup("broker"))
done := make(chan struct{})
// start the can listener
wg := sync.WaitGroup{}
for _, svc := range serveThings {
svcLogger := deriveLogger(logger, svc)
logger.Info("starting service", "svc", svc.String())
go func(mySvc service) {
wg.Add(1)
wg.Add(1)
go func(mySvc service, baseLogger *slog.Logger) {
svcLogger := logger.With("svc", mySvc.String())
defer wg.Done()
err := mySvc.Start(cCtx, broker, svcLogger)
if err != nil {
logger.Error("service stopped!", "err", err, "svc", mySvc.String())
}
}(svc)
}(svc, logger)
}
wg.Wait()
// tcp listener server.
ln, err := net.Listen("tcp", ":8082")
if err != nil {
fmt.Printf("Error listening: %v\n", err)
}
logger.Info("TCP listener started", "addr", ln.Addr().String())
for {
conn, err := ln.Accept()
if err != nil {
fmt.Printf("error accepting: %v\n", err)
}
go handleCon(conn, broker, logger.WithGroup("tcp"), done)
}
return nil
}
type rpcService struct {
}
func tcpSvc(ctx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) error {
func (r *rpcService) Status() {
}
func (r *rpcService) String() string {
return "rpcService"
}
func (r *rpcService) Start(ctx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) error {
// TODO: extract port/ip from cli context.
ln, err := net.Listen("tcp", ":8082")
if err != nil {
@ -130,30 +118,30 @@ func tcpSvc(ctx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) error
}
}
func handleCon(conn net.Conn, broker *gotelem.Broker, l *slog.Logger, done <-chan struct{}) {
func handleCon(conn net.Conn, broker *gotelem.JBroker, l *slog.Logger, done <-chan struct{}) {
// reader := msgp.NewReader(conn)
subname := fmt.Sprint("tcp", conn.RemoteAddr().String())
l.Info("started handling", "name", subname)
rxCh := broker.Subscribe(subname)
defer broker.Unsubscribe(subname)
defer conn.Close()
rxCh, err := broker.Subscribe(subname)
if err != nil {
l.Error("error subscribing to connection", "err", err)
return
}
defer broker.Unsubscribe(subname)
jEncode := json.NewEncoder(conn)
for {
select {
case msg := <-rxCh:
l.Info("got packet")
// FIXME: poorly optimized
buf := make([]byte, 0)
buf = binary.BigEndian.AppendUint32(buf, msg.Id)
buf = append(buf, msg.Data...)
_, err := conn.Write(buf)
err := jEncode.Encode(msg)
if err != nil {
l.Error("error writing tcp packet", "err", err)
return
l.Warn("error encoding json", "err", err)
}
case <-done:
return
@ -165,7 +153,6 @@ func handleCon(conn net.Conn, broker *gotelem.Broker, l *slog.Logger, done <-cha
// this spins up a new can socket on vcan0 and broadcasts a packet every second. for testing.
type CanLoggerService struct {
cw gotelem.CanWriter
}
func (c *CanLoggerService) String() string {

View file

@ -7,6 +7,7 @@ import (
"time"
"github.com/kschamplin/gotelem"
"github.com/kschamplin/gotelem/skylab"
"github.com/kschamplin/gotelem/socketcan"
"github.com/urfave/cli/v2"
"golang.org/x/exp/slog"
@ -53,10 +54,11 @@ func (s *socketCANService) String() string {
return s.name
}
func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) {
func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.JBroker, logger *slog.Logger) (err error) {
// vcan0 demo
if cCtx.String("can") == "" {
logger.Info("no can device provided")
return
}
@ -73,7 +75,10 @@ func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.Broker, logg
s.name = sock.Name()
// connect to the broker
rxCh := broker.Subscribe("socketCAN")
rxCh, err := broker.Subscribe("socketCAN")
if err != nil {
return err
}
defer broker.Unsubscribe("socketCAN")
@ -90,12 +95,30 @@ func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.Broker, logg
}
}()
var frame gotelem.Frame
for {
select {
case msg := <-rxCh:
sock.Send(&msg)
id, d, _ := skylab.CanSend(msg.Data)
frame.Id = id
frame.Data = d
sock.Send(&frame)
case msg := <-rxCan:
broker.Publish("socketCAN", msg)
p, err := skylab.FromCanFrame(msg.Id, msg.Data)
if err != nil {
logger.Warn("error parsing can packet", "id", msg.Id)
continue
}
cde := gotelem.CANDumpEntry{
Timestamp: float64(time.Now().Unix()),
Id: uint64(msg.Id),
Data: p,
}
broker.Publish("socketCAN", cde)
case <-cCtx.Done():
return
}
@ -133,15 +156,22 @@ func vcanTest(devname string) {
slog.Error("error opening socket", "err", err)
return
}
testFrame := &gotelem.Frame{
Id: 0x234,
Kind: gotelem.CanSFFFrame,
Data: []byte{0, 1, 2, 3, 4, 5, 6, 7},
testPkt := skylab.WslMotorCurrentVector{
Iq: 0.1,
Id: 0.2,
}
for {
id, data, err := skylab.CanSend(&testPkt)
testFrame := gotelem.Frame{
Id: id,
Data: data,
Kind: gotelem.CanSFFFrame,
}
for {
slog.Info("sending test packet")
sock.Send(testFrame)
sock.Send(&testFrame)
time.Sleep(1 * time.Second)
}
}