From 699cfb5e3dd4a6b0eecfbff598b7999cccfba078 Mon Sep 17 00:00:00 2001 From: saji Date: Fri, 30 Jun 2023 11:51:06 -0500 Subject: [PATCH] big update --- cmd/gotelem/cli/server.go | 82 +++++++++++++++++++++++----- cmd/gotelem/cli/socketcan.go | 9 ++- http.go | 18 +++--- db.go => internal/db/db.go | 2 +- db_test.go => internal/db/db_test.go | 2 +- internal/middleware/recoverer2.go | 3 + internal/middleware/slogger.go | 8 +-- skylab/skylab.go | 2 + socketcan/socketcan.go | 1 + 9 files changed, 96 insertions(+), 31 deletions(-) rename db.go => internal/db/db.go (99%) rename db_test.go => internal/db/db_test.go (99%) create mode 100644 internal/middleware/recoverer2.go diff --git a/cmd/gotelem/cli/server.go b/cmd/gotelem/cli/server.go index 4e4963d..fd7eba2 100644 --- a/cmd/gotelem/cli/server.go +++ b/cmd/gotelem/cli/server.go @@ -3,6 +3,7 @@ package cli import ( "encoding/json" "fmt" + "io" "net" "net/http" "os" @@ -10,6 +11,7 @@ import ( "time" "github.com/kschamplin/gotelem" + "github.com/kschamplin/gotelem/internal/db" "github.com/kschamplin/gotelem/xbee" "github.com/urfave/cli/v2" "golang.org/x/exp/slog" @@ -22,11 +24,16 @@ var serveFlags = []cli.Flag{ Usage: "The XBee to connect to. Leave blank to not use XBee", EnvVars: []string{"XBEE_DEVICE"}, }, - &cli.StringFlag{ - Name: "logfile", - Aliases: []string{"l"}, - Value: "log.txt", - Usage: "file to store log to", + &cli.PathFlag{ + Name: "logfile", + Aliases: []string{"l"}, + DefaultText: "log.txt", + Usage: "file to store log to", + }, + &cli.PathFlag{ + Name: "db", + Value: "gotelem.db", + Usage: "database to serve", }, } @@ -44,10 +51,16 @@ var serveCmd = &cli.Command{ type service interface { fmt.Stringer - Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) + Start(cCtx *cli.Context, deps svcDeps) (err error) Status() } +type svcDeps struct { + Broker *gotelem.Broker + Db *db.TelemDb + Logger *slog.Logger +} + // this variable stores all the hanlders. It has some basic ones, but also // can be extended on certain platforms (see cli/socketcan.go) // or if certain features are present (see cli/sqlite.go) @@ -60,19 +73,52 @@ var serveThings = []service{ func serve(cCtx *cli.Context) error { // TODO: output both to stderr and a file. - logger := slog.New(slog.NewTextHandler(os.Stderr)) + var output io.Writer = os.Stderr + + if cCtx.IsSet("logfile") { + // open the file. + p := cCtx.Path("logfile") + f, err := os.OpenFile(p, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + output = io.MultiWriter(os.Stderr, f) + } + // create a new logger + logger := slog.New(slog.NewTextHandler(output)) slog.SetDefault(logger) + broker := gotelem.NewBroker(3, logger.WithGroup("broker")) + // open database + dbPath := "file::memory:?cache=shared" + if cCtx.IsSet("db") { + dbPath = cCtx.Path("db") + } + db, err := db.OpenTelemDb(dbPath) + if err != nil { + return err + } + wg := sync.WaitGroup{} + + deps := svcDeps{ + Logger: logger, + Broker: broker, + Db: db, + } + for _, svc := range serveThings { logger.Info("starting service", "svc", svc.String()) wg.Add(1) go func(mySvc service, baseLogger *slog.Logger) { svcLogger := logger.With("svc", mySvc.String()) + s := deps + s.Logger = svcLogger defer wg.Done() - err := mySvc.Start(cCtx, broker, svcLogger) + // TODO: recover + err := mySvc.Start(cCtx, s) if err != nil { logger.Error("service stopped!", "err", err, "svc", mySvc.String()) } @@ -93,7 +139,9 @@ func (r *rpcService) String() string { return "rpcService" } -func (r *rpcService) Start(ctx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) error { +func (r *rpcService) Start(ctx *cli.Context, deps svcDeps) error { + logger := deps.Logger + broker := deps.Broker // TODO: extract port/ip from cli context. ln, err := net.Listen("tcp", "0.0.0.0:8082") if err != nil { @@ -153,7 +201,9 @@ func (c *canLoggerService) String() string { func (c *canLoggerService) Status() { } -func (c *canLoggerService) Start(cCtx *cli.Context, broker *gotelem.Broker, l *slog.Logger) (err error) { +func (c *canLoggerService) Start(cCtx *cli.Context, deps svcDeps) (err error) { + broker := deps.Broker + l := deps.Logger rxCh, err := broker.Subscribe("canDump") if err != nil { return err @@ -196,7 +246,9 @@ func (x *xBeeService) String() string { func (x *xBeeService) Status() { } -func (x *xBeeService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) { +func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) { + logger := deps.Logger + broker := deps.Broker if cCtx.String("xbee") == "" { logger.Info("not using xbee") return @@ -247,9 +299,13 @@ func (h *httpService) Status() { } -func (h *httpService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) { +func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) { - r := gotelem.TelemRouter(logger) + logger := deps.Logger + broker := deps.Broker + db := deps.Db + + r := gotelem.TelemRouter(logger, broker, db) http.ListenAndServe(":8080", r) return diff --git a/cmd/gotelem/cli/socketcan.go b/cmd/gotelem/cli/socketcan.go index 39a9e84..c16cdbd 100644 --- a/cmd/gotelem/cli/socketcan.go +++ b/cmd/gotelem/cli/socketcan.go @@ -54,14 +54,17 @@ func (s *socketCANService) String() string { } // Start starts the socketCAN service - emitting packets sent from the broker. -func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) { - // vcan0 demo +func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) { + + logger := deps.Logger + broker := deps.Broker if cCtx.String("can") == "" { logger.Info("no can device provided") return } + // vcan demo system - make fake packets. if strings.HasPrefix(cCtx.String("can"), "v") { go vcanTest(cCtx.String("can")) } @@ -160,7 +163,7 @@ func vcanTest(devname string) { Id: 0.2, } - id, data, err := skylab.ToCanFrame(&testPkt) + id, data, _ := skylab.ToCanFrame(&testPkt) testFrame := gotelem.Frame{ Id: id, Data: data, diff --git a/http.go b/http.go index fd9717c..fd9ac7a 100644 --- a/http.go +++ b/http.go @@ -10,6 +10,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/google/uuid" + "github.com/kschamplin/gotelem/internal/db" "github.com/kschamplin/gotelem/skylab" "golang.org/x/exp/slog" "nhooyr.io/websocket" @@ -19,7 +20,7 @@ type slogHttpLogger struct { slog.Logger } -func TelemRouter(log *slog.Logger, broker *Broker, db *TelemDb) http.Handler { +func TelemRouter(log *slog.Logger, broker *Broker, db *db.TelemDb) http.Handler { r := chi.NewRouter() r.Use(middleware.RequestID) @@ -50,7 +51,7 @@ func TelemRouter(log *slog.Logger, broker *Broker, db *TelemDb) http.Handler { } // define API version 1 routes. -func apiV1(broker *Broker, db *TelemDb) chi.Router { +func apiV1(broker *Broker, db *db.TelemDb) chi.Router { r := chi.NewRouter() // this API only accepts JSON. r.Use(middleware.AllowContentType("application/json")) @@ -110,7 +111,7 @@ type apiV1Subscriber struct { idFilter []uint32 // list of Ids to subscribe to. If it's empty, subscribes to all. } -func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc { +func apiV1PacketSubscribe(broker *Broker, db *db.TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { conn_id := r.RemoteAddr + uuid.New().String() sub, err := broker.Subscribe(conn_id) @@ -122,7 +123,6 @@ func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc { defer broker.Unsubscribe(conn_id) // attempt to upgrade. c, err := websocket.Accept(w, r, nil) - c.Ping(r.Context()) if err != nil { // TODO: is this the correct option? w.WriteHeader(http.StatusInternalServerError) @@ -158,26 +158,26 @@ func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc { } // TODO: rename. record is not a clear name. Runs? drives? segments? -func apiV1GetRecords(db *TelemDb) http.HandlerFunc { +func apiV1GetRecords(db *db.TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { } } -func apiV1GetActiveRecord(db *TelemDb) http.HandlerFunc { +func apiV1GetActiveRecord(db *db.TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { } } -func apiV1StartRecord(db *TelemDb) http.HandlerFunc { +func apiV1StartRecord(db *db.TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) {} } -func apiV1GetRecord(db *TelemDb) http.HandlerFunc { +func apiV1GetRecord(db *db.TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) {} } -func apiV1UpdateRecord(db *TelemDb) http.HandlerFunc { +func apiV1UpdateRecord(db *db.TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) {} } diff --git a/db.go b/internal/db/db.go similarity index 99% rename from db.go rename to internal/db/db.go index 640e98e..3c4e1bb 100644 --- a/db.go +++ b/internal/db/db.go @@ -1,4 +1,4 @@ -package gotelem +package db // this file implements the database functions to load/store/read from a sql database. diff --git a/db_test.go b/internal/db/db_test.go similarity index 99% rename from db_test.go rename to internal/db/db_test.go index be7ece3..038cfee 100644 --- a/db_test.go +++ b/internal/db/db_test.go @@ -1,4 +1,4 @@ -package gotelem +package db import ( "reflect" diff --git a/internal/middleware/recoverer2.go b/internal/middleware/recoverer2.go new file mode 100644 index 0000000..9aab6da --- /dev/null +++ b/internal/middleware/recoverer2.go @@ -0,0 +1,3 @@ +package middleware + +// Recoverer2 is a reimplementation of recoverer but using slog as the backend. diff --git a/internal/middleware/slogger.go b/internal/middleware/slogger.go index 0f90af2..fa25340 100644 --- a/internal/middleware/slogger.go +++ b/internal/middleware/slogger.go @@ -1,6 +1,7 @@ package middleware import ( + "context" "net/http" "time" @@ -39,7 +40,7 @@ func Slogger(sl *slog.Logger) func(next http.Handler) http.Handler { }() // embed the logger and the attrs for later items in the chain. - + r = r.WithContext(context.WithValue(r.Context(), SloggerAttrsKey, attrs)) next.ServeHTTP(ww, r) } @@ -54,13 +55,12 @@ const ( SloggerAttrsKey ) -func addSlogAttr(r *http.Request, attr slog.Attr) { +func AddSlogAttr(r *http.Request, attr slog.Attr) { ctx := r.Context() attrs, ok := ctx.Value(SloggerAttrsKey).([]slog.Attr) if !ok { return } attrs = append(attrs, attr) - - + } diff --git a/skylab/skylab.go b/skylab/skylab.go index a76b4ea..3bfde31 100644 --- a/skylab/skylab.go +++ b/skylab/skylab.go @@ -1,3 +1,5 @@ +// Package skylab provides CAN packet encoding and decoding information based off +// of skylab.yaml. It can convert packets to/from CAN raw bytes and JSON objects. package skylab import ( diff --git a/socketcan/socketcan.go b/socketcan/socketcan.go index 25217d3..1048756 100644 --- a/socketcan/socketcan.go +++ b/socketcan/socketcan.go @@ -23,6 +23,7 @@ type CanSocket struct { fd int } +// CanFilter is a filter for an interface. type CanFilter interface { Inverted() bool Mask() uint32