big update

This commit is contained in:
saji 2023-06-30 11:51:06 -05:00
parent 88a170825c
commit 699cfb5e3d
9 changed files with 96 additions and 31 deletions

View file

@ -3,6 +3,7 @@ package cli
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"net" "net"
"net/http" "net/http"
"os" "os"
@ -10,6 +11,7 @@ import (
"time" "time"
"github.com/kschamplin/gotelem" "github.com/kschamplin/gotelem"
"github.com/kschamplin/gotelem/internal/db"
"github.com/kschamplin/gotelem/xbee" "github.com/kschamplin/gotelem/xbee"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/exp/slog" "golang.org/x/exp/slog"
@ -22,12 +24,17 @@ var serveFlags = []cli.Flag{
Usage: "The XBee to connect to. Leave blank to not use XBee", Usage: "The XBee to connect to. Leave blank to not use XBee",
EnvVars: []string{"XBEE_DEVICE"}, EnvVars: []string{"XBEE_DEVICE"},
}, },
&cli.StringFlag{ &cli.PathFlag{
Name: "logfile", Name: "logfile",
Aliases: []string{"l"}, Aliases: []string{"l"},
Value: "log.txt", DefaultText: "log.txt",
Usage: "file to store log to", Usage: "file to store log to",
}, },
&cli.PathFlag{
Name: "db",
Value: "gotelem.db",
Usage: "database to serve",
},
} }
var serveCmd = &cli.Command{ var serveCmd = &cli.Command{
@ -44,10 +51,16 @@ var serveCmd = &cli.Command{
type service interface { type service interface {
fmt.Stringer fmt.Stringer
Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) Start(cCtx *cli.Context, deps svcDeps) (err error)
Status() Status()
} }
type svcDeps struct {
Broker *gotelem.Broker
Db *db.TelemDb
Logger *slog.Logger
}
// this variable stores all the hanlders. It has some basic ones, but also // this variable stores all the hanlders. It has some basic ones, but also
// can be extended on certain platforms (see cli/socketcan.go) // can be extended on certain platforms (see cli/socketcan.go)
// or if certain features are present (see cli/sqlite.go) // or if certain features are present (see cli/sqlite.go)
@ -60,19 +73,52 @@ var serveThings = []service{
func serve(cCtx *cli.Context) error { func serve(cCtx *cli.Context) error {
// TODO: output both to stderr and a file. // TODO: output both to stderr and a file.
logger := slog.New(slog.NewTextHandler(os.Stderr)) var output io.Writer = os.Stderr
if cCtx.IsSet("logfile") {
// open the file.
p := cCtx.Path("logfile")
f, err := os.OpenFile(p, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
output = io.MultiWriter(os.Stderr, f)
}
// create a new logger
logger := slog.New(slog.NewTextHandler(output))
slog.SetDefault(logger) slog.SetDefault(logger)
broker := gotelem.NewBroker(3, logger.WithGroup("broker")) broker := gotelem.NewBroker(3, logger.WithGroup("broker"))
// open database
dbPath := "file::memory:?cache=shared"
if cCtx.IsSet("db") {
dbPath = cCtx.Path("db")
}
db, err := db.OpenTelemDb(dbPath)
if err != nil {
return err
}
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
deps := svcDeps{
Logger: logger,
Broker: broker,
Db: db,
}
for _, svc := range serveThings { for _, svc := range serveThings {
logger.Info("starting service", "svc", svc.String()) logger.Info("starting service", "svc", svc.String())
wg.Add(1) wg.Add(1)
go func(mySvc service, baseLogger *slog.Logger) { go func(mySvc service, baseLogger *slog.Logger) {
svcLogger := logger.With("svc", mySvc.String()) svcLogger := logger.With("svc", mySvc.String())
s := deps
s.Logger = svcLogger
defer wg.Done() defer wg.Done()
err := mySvc.Start(cCtx, broker, svcLogger) // TODO: recover
err := mySvc.Start(cCtx, s)
if err != nil { if err != nil {
logger.Error("service stopped!", "err", err, "svc", mySvc.String()) logger.Error("service stopped!", "err", err, "svc", mySvc.String())
} }
@ -93,7 +139,9 @@ func (r *rpcService) String() string {
return "rpcService" return "rpcService"
} }
func (r *rpcService) Start(ctx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) error { func (r *rpcService) Start(ctx *cli.Context, deps svcDeps) error {
logger := deps.Logger
broker := deps.Broker
// TODO: extract port/ip from cli context. // TODO: extract port/ip from cli context.
ln, err := net.Listen("tcp", "0.0.0.0:8082") ln, err := net.Listen("tcp", "0.0.0.0:8082")
if err != nil { if err != nil {
@ -153,7 +201,9 @@ func (c *canLoggerService) String() string {
func (c *canLoggerService) Status() { func (c *canLoggerService) Status() {
} }
func (c *canLoggerService) Start(cCtx *cli.Context, broker *gotelem.Broker, l *slog.Logger) (err error) { func (c *canLoggerService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
broker := deps.Broker
l := deps.Logger
rxCh, err := broker.Subscribe("canDump") rxCh, err := broker.Subscribe("canDump")
if err != nil { if err != nil {
return err return err
@ -196,7 +246,9 @@ func (x *xBeeService) String() string {
func (x *xBeeService) Status() { func (x *xBeeService) Status() {
} }
func (x *xBeeService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) { func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
logger := deps.Logger
broker := deps.Broker
if cCtx.String("xbee") == "" { if cCtx.String("xbee") == "" {
logger.Info("not using xbee") logger.Info("not using xbee")
return return
@ -247,9 +299,13 @@ func (h *httpService) Status() {
} }
func (h *httpService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) { func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
r := gotelem.TelemRouter(logger) logger := deps.Logger
broker := deps.Broker
db := deps.Db
r := gotelem.TelemRouter(logger, broker, db)
http.ListenAndServe(":8080", r) http.ListenAndServe(":8080", r)
return return

View file

@ -54,14 +54,17 @@ func (s *socketCANService) String() string {
} }
// Start starts the socketCAN service - emitting packets sent from the broker. // Start starts the socketCAN service - emitting packets sent from the broker.
func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) { func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
// vcan0 demo
logger := deps.Logger
broker := deps.Broker
if cCtx.String("can") == "" { if cCtx.String("can") == "" {
logger.Info("no can device provided") logger.Info("no can device provided")
return return
} }
// vcan demo system - make fake packets.
if strings.HasPrefix(cCtx.String("can"), "v") { if strings.HasPrefix(cCtx.String("can"), "v") {
go vcanTest(cCtx.String("can")) go vcanTest(cCtx.String("can"))
} }
@ -160,7 +163,7 @@ func vcanTest(devname string) {
Id: 0.2, Id: 0.2,
} }
id, data, err := skylab.ToCanFrame(&testPkt) id, data, _ := skylab.ToCanFrame(&testPkt)
testFrame := gotelem.Frame{ testFrame := gotelem.Frame{
Id: id, Id: id,
Data: data, Data: data,

18
http.go
View file

@ -10,6 +10,7 @@ import (
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware" "github.com/go-chi/chi/v5/middleware"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/kschamplin/gotelem/internal/db"
"github.com/kschamplin/gotelem/skylab" "github.com/kschamplin/gotelem/skylab"
"golang.org/x/exp/slog" "golang.org/x/exp/slog"
"nhooyr.io/websocket" "nhooyr.io/websocket"
@ -19,7 +20,7 @@ type slogHttpLogger struct {
slog.Logger slog.Logger
} }
func TelemRouter(log *slog.Logger, broker *Broker, db *TelemDb) http.Handler { func TelemRouter(log *slog.Logger, broker *Broker, db *db.TelemDb) http.Handler {
r := chi.NewRouter() r := chi.NewRouter()
r.Use(middleware.RequestID) r.Use(middleware.RequestID)
@ -50,7 +51,7 @@ func TelemRouter(log *slog.Logger, broker *Broker, db *TelemDb) http.Handler {
} }
// define API version 1 routes. // define API version 1 routes.
func apiV1(broker *Broker, db *TelemDb) chi.Router { func apiV1(broker *Broker, db *db.TelemDb) chi.Router {
r := chi.NewRouter() r := chi.NewRouter()
// this API only accepts JSON. // this API only accepts JSON.
r.Use(middleware.AllowContentType("application/json")) r.Use(middleware.AllowContentType("application/json"))
@ -110,7 +111,7 @@ type apiV1Subscriber struct {
idFilter []uint32 // list of Ids to subscribe to. If it's empty, subscribes to all. idFilter []uint32 // list of Ids to subscribe to. If it's empty, subscribes to all.
} }
func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc { func apiV1PacketSubscribe(broker *Broker, db *db.TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
conn_id := r.RemoteAddr + uuid.New().String() conn_id := r.RemoteAddr + uuid.New().String()
sub, err := broker.Subscribe(conn_id) sub, err := broker.Subscribe(conn_id)
@ -122,7 +123,6 @@ func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc {
defer broker.Unsubscribe(conn_id) defer broker.Unsubscribe(conn_id)
// attempt to upgrade. // attempt to upgrade.
c, err := websocket.Accept(w, r, nil) c, err := websocket.Accept(w, r, nil)
c.Ping(r.Context())
if err != nil { if err != nil {
// TODO: is this the correct option? // TODO: is this the correct option?
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
@ -158,26 +158,26 @@ func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc {
} }
// TODO: rename. record is not a clear name. Runs? drives? segments? // TODO: rename. record is not a clear name. Runs? drives? segments?
func apiV1GetRecords(db *TelemDb) http.HandlerFunc { func apiV1GetRecords(db *db.TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
} }
} }
func apiV1GetActiveRecord(db *TelemDb) http.HandlerFunc { func apiV1GetActiveRecord(db *db.TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
} }
} }
func apiV1StartRecord(db *TelemDb) http.HandlerFunc { func apiV1StartRecord(db *db.TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {} return func(w http.ResponseWriter, r *http.Request) {}
} }
func apiV1GetRecord(db *TelemDb) http.HandlerFunc { func apiV1GetRecord(db *db.TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {} return func(w http.ResponseWriter, r *http.Request) {}
} }
func apiV1UpdateRecord(db *TelemDb) http.HandlerFunc { func apiV1UpdateRecord(db *db.TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {} return func(w http.ResponseWriter, r *http.Request) {}
} }

View file

@ -1,4 +1,4 @@
package gotelem package db
// this file implements the database functions to load/store/read from a sql database. // this file implements the database functions to load/store/read from a sql database.

View file

@ -1,4 +1,4 @@
package gotelem package db
import ( import (
"reflect" "reflect"

View file

@ -0,0 +1,3 @@
package middleware
// Recoverer2 is a reimplementation of recoverer but using slog as the backend.

View file

@ -1,6 +1,7 @@
package middleware package middleware
import ( import (
"context"
"net/http" "net/http"
"time" "time"
@ -39,7 +40,7 @@ func Slogger(sl *slog.Logger) func(next http.Handler) http.Handler {
}() }()
// embed the logger and the attrs for later items in the chain. // embed the logger and the attrs for later items in the chain.
r = r.WithContext(context.WithValue(r.Context(), SloggerAttrsKey, attrs))
next.ServeHTTP(ww, r) next.ServeHTTP(ww, r)
} }
@ -54,7 +55,7 @@ const (
SloggerAttrsKey SloggerAttrsKey
) )
func addSlogAttr(r *http.Request, attr slog.Attr) { func AddSlogAttr(r *http.Request, attr slog.Attr) {
ctx := r.Context() ctx := r.Context()
attrs, ok := ctx.Value(SloggerAttrsKey).([]slog.Attr) attrs, ok := ctx.Value(SloggerAttrsKey).([]slog.Attr)
if !ok { if !ok {
@ -62,5 +63,4 @@ func addSlogAttr(r *http.Request, attr slog.Attr) {
} }
attrs = append(attrs, attr) attrs = append(attrs, attr)
} }

View file

@ -1,3 +1,5 @@
// Package skylab provides CAN packet encoding and decoding information based off
// of skylab.yaml. It can convert packets to/from CAN raw bytes and JSON objects.
package skylab package skylab
import ( import (

View file

@ -23,6 +23,7 @@ type CanSocket struct {
fd int fd int
} }
// CanFilter is a filter for an interface.
type CanFilter interface { type CanFilter interface {
Inverted() bool Inverted() bool
Mask() uint32 Mask() uint32