diff --git a/db.go b/db.go index 50af08d..14ba60c 100644 --- a/db.go +++ b/db.go @@ -62,6 +62,15 @@ CREATE INDEX IF NOT EXISTS "times" ON "bus_events" ( "ts" DESC ); +-- this table shows when we started/stopped logging. +CREATE TABLE "bus_records" ( + "id" INTEGER NOT NULL UNIQUE, + "start_time" INTEGER NOT NULL, + "end_time" INTEGER, + "note" TEXT, + PRIMARY KEY("id" AUTOINCREMENT), + CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time) +); ` // sql sequence to tear down the database. @@ -71,6 +80,8 @@ const sqlDbDown = ` DROP TABLE "bus_events"; DROP INDEX "ids_timestamped"; DROP INDEX "times"; + +DROP TABLE "bus_records"; ` // sql expression to insert a bus event into the packets database.1 diff --git a/http.go b/http.go index d8302d0..cf2610e 100644 --- a/http.go +++ b/http.go @@ -15,7 +15,6 @@ import ( "nhooyr.io/websocket" ) - type slogHttpLogger struct { slog.Logger } @@ -23,6 +22,8 @@ type slogHttpLogger struct { func TelemRouter(log *slog.Logger, broker *JBroker, db *TelemDb) http.Handler { r := chi.NewRouter() + r.Use(middleware.RequestID) + r.Use(middleware.RealIP) r.Use(middleware.Logger) // TODO: integrate with slog r.Use(middleware.Recoverer) @@ -58,35 +59,48 @@ func apiV1(broker *JBroker, db *TelemDb) chi.Router { w.Write([]byte(skylab.SkylabDefinitions)) }) - r.Get("/ws", func(w http.ResponseWriter, r *http.Request) { - c, err := websocket.Accept(w, r, nil) - if err != nil { - return - } - - }) - r.Route("/packets", func(r chi.Router) { r.Get("/subscribe", apiV1PacketSubscribe(broker, db)) r.Post("/", func(w http.ResponseWriter, r *http.Request) { var pkgs []skylab.BusEvent decoder := json.NewDecoder(r.Body) - if err := decoder.Decode(&pkgs); err != nil{ + if err := decoder.Decode(&pkgs); err != nil { w.WriteHeader(http.StatusTeapot) return } // we have a list of packets now. let's commit them. db.AddEvents(pkgs...) - return + return }) + r.Get("/", func(w http.ResponseWriter, r *http.Request) { + // this should use query params to return a list of packets. + + }) + + // this is to get packets by a name. + r.Get("/{name:[a-z_]+}", func(w http.ResponseWriter, r *http.Request) { + + }) + }) + // records are driving segments/runs. + r.Route("/records", func(r chi.Router) { + r.Get("/") // get all runs + r.Get("/active") // get current run (no end time) + r.Post("/") // create a new run (with note). Ends active run if any, and creates new active run (no end time) + r.Get("/{id}") // get details on a specific run + r.Put("/{id}") // update a specific run. Can only be used to add notes/metadata, and not to change time/id. + }) + + r.Get("/stats") // v1 api stats (calls, clients, xbee connected, meta health ok) + + r. return r } - // apiV1Subscriber is a websocket session for the v1 api. type apiV1Subscriber struct { idFilter []uint64 // list of Ids to subscribe to. If it's empty, subscribes to all. @@ -105,16 +119,18 @@ func apiV1PacketSubscribe(broker *JBroker, db *TelemDb) http.HandlerFunc { // attempt to upgrade. c, err := websocket.Accept(w, r, nil) if err != nil { + // TODO: is this the correct option? w.WriteHeader(http.StatusInternalServerError) fmt.Fprintf(w, "error ws handshake: %s", err) return } + // TODO: use K/V with session token? sess := &apiV1Subscriber{} for { select { - case <- r.Context().Done(): + case <-r.Context().Done(): return case msgIn := <-sub: if len(sess.idFilter) == 0 { @@ -126,16 +142,12 @@ func apiV1PacketSubscribe(broker *JBroker, db *TelemDb) http.HandlerFunc { // send it } } - escapeFilter: + escapeFilter: return } - } - - } } - diff --git a/internal/badger/db.go b/internal/badger/db.go new file mode 100644 index 0000000..eb423d4 --- /dev/null +++ b/internal/badger/db.go @@ -0,0 +1,3 @@ +package badger + +// this file has a global internal K/V database used for sessions/stats/??? diff --git a/internal/middleware/slogger.go b/internal/middleware/slogger.go new file mode 100644 index 0000000..0f90af2 --- /dev/null +++ b/internal/middleware/slogger.go @@ -0,0 +1,66 @@ +package middleware + +import ( + "net/http" + "time" + + chi_middleware "github.com/go-chi/chi/v5/middleware" + "golang.org/x/exp/slog" +) + +// Slogger is a slog-enabled logging middleware. +// It logs the start and end of the request, and logs info +// about the request itself, response status, and response time. + +// Slogger returns a log handler that uses the given slog logger as the base. +func Slogger(sl *slog.Logger) func(next http.Handler) http.Handler { + + logger := sl.WithGroup("http") + return func(next http.Handler) http.Handler { + + // this triple-nested function is strange, but basically the Slogger() call makes a new middleware function (above) + // the middleware function returns a handler that calls the next handler in the chain(wrapping it) + + fn := func(w http.ResponseWriter, r *http.Request) { + // wrap writer allows us to get info on the response from further handlers. + ww := chi_middleware.NewWrapResponseWriter(w, r.ProtoMajor) + t1 := time.Now() + // attrs is stored to allow for the helpers to add additional elements to the main record. + attrs := make([]slog.Attr, 0) + + // This function runs at the end and adds all the response details to the attrs before logging them. + defer func() { + attrs = append(attrs, slog.Int("status_code", ww.Status())) + attrs = append(attrs, slog.Int("resp_size", ww.BytesWritten())) + attrs = append(attrs, slog.Duration("duration", time.Since(t1))) + attrs = append(attrs, slog.String("method", r.Method)) + logger.LogAttrs(r.Context(), slog.LevelInfo, r.RequestURI, attrs...) + + }() + + // embed the logger and the attrs for later items in the chain. + + next.ServeHTTP(ww, r) + } + + return http.HandlerFunc(fn) + } +} + +type slogKeyType int + +const ( + SloggerLogKey slogKeyType = iota + SloggerAttrsKey +) + +func addSlogAttr(r *http.Request, attr slog.Attr) { + ctx := r.Context() + attrs, ok := ctx.Value(SloggerAttrsKey).([]slog.Attr) + if !ok { + return + } + attrs = append(attrs, attr) + + +} diff --git a/mprpc/rpc.go b/mprpc/rpc.go index 49546ad..4a2249d 100644 --- a/mprpc/rpc.go +++ b/mprpc/rpc.go @@ -67,7 +67,7 @@ type ServiceFunc func(params msgp.Raw) (res msgp.Raw, err error) // "server" aka listener, and client. type RPCConn struct { // TODO: use io.readwritecloser? - rwc io.ReadWriteCloser + rwc io.ReadWriteCloser handlers map[string]ServiceFunc ct rpcConnTrack @@ -75,23 +75,21 @@ type RPCConn struct { logger slog.Logger } - // creates a new RPC connection on top of an io.ReadWriteCloser. Can be // pre-seeded with handlers. func NewRPC(rwc io.ReadWriteCloser, logger *slog.Logger, initialHandlers map[string]ServiceFunc) (rpc *RPCConn, err error) { rpc = &RPCConn{ - rwc: rwc, + rwc: rwc, handlers: make(map[string]ServiceFunc), - ct: NewRPCConnTrack(), + ct: NewRPCConnTrack(), } if initialHandlers != nil { - for k,v := range initialHandlers { + for k, v := range initialHandlers { rpc.handlers[k] = v } } - return } @@ -141,7 +139,6 @@ func (rpc *RPCConn) RegisterHandler(name string, fn ServiceFunc) error { return nil } - // Removes a handler, if it exists. Never errors. No-op if the name // is not a registered handler. func (rpc *RPCConn) RemoveHandler(name string) error { @@ -199,9 +196,6 @@ func (rpc *RPCConn) Serve() { } } - - - // INTERNAL functions for rpcConn // dispatch is an internal method used to execute a Request sent by the remote:w @@ -238,6 +232,7 @@ func (rpc *RPCConn) dispatchNotif(req Notification) { // Next, we define some helper generic functions that can be used to make // implementing a msg wrapper easier. +// msgpackObject is anything that has implemented all the msgpack interfaces. type msgpackObject interface { msgp.Decodable msgp.Encodable @@ -248,7 +243,7 @@ type msgpackObject interface { // MakeService is a generic wrapper function. It takes a function with the signature // of func(T msgpObject)(R msgpObject, error) where T and R can be *concrete* types. // and returns a new function that handles conversion to/from msgp.Raw. -// the function returned can be used by the RPCConn as a handler function. +// The function returned can be used by the RPCConn as a handler function. // This function can typically have it's paramters inferred. func MakeService[T, R msgpackObject](fn func(T) (R, error)) ServiceFunc { return func(p msgp.Raw) (msgp.Raw, error) { @@ -339,9 +334,11 @@ func MakeBoundCaller[T, R msgpackObject](rpc *RPCConn, method string) func(T) (R } } -func MakeNotifier[T msgpackObject]() func(string, T, *RPCConn) { - return func(method string, param T, rpc *RPCConn) { - rawParam, _ := param.MarshalMsg([]byte{}) +// MakeNotifier creates a new notification function that notifies the remote +func MakeNotifier[T msgpackObject](method string) func(T, *RPCConn) error { + return func(param T, rpc *RPCConn) error { + rawParam, err := param.MarshalMsg([]byte{}) rpc.Notify(method, rawParam) + return err } }