This commit is contained in:
saji 2023-06-28 19:23:08 -05:00
parent 8262d42332
commit 61dbc7765c
5 changed files with 121 additions and 32 deletions

11
db.go
View file

@ -62,6 +62,15 @@ CREATE INDEX IF NOT EXISTS "times" ON "bus_events" (
"ts" DESC
);
-- this table shows when we started/stopped logging.
CREATE TABLE "bus_records" (
"id" INTEGER NOT NULL UNIQUE,
"start_time" INTEGER NOT NULL,
"end_time" INTEGER,
"note" TEXT,
PRIMARY KEY("id" AUTOINCREMENT),
CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time)
);
`
// sql sequence to tear down the database.
@ -71,6 +80,8 @@ const sqlDbDown = `
DROP TABLE "bus_events";
DROP INDEX "ids_timestamped";
DROP INDEX "times";
DROP TABLE "bus_records";
`
// sql expression to insert a bus event into the packets database.1

48
http.go
View file

@ -15,7 +15,6 @@ import (
"nhooyr.io/websocket"
)
type slogHttpLogger struct {
slog.Logger
}
@ -23,6 +22,8 @@ type slogHttpLogger struct {
func TelemRouter(log *slog.Logger, broker *JBroker, db *TelemDb) http.Handler {
r := chi.NewRouter()
r.Use(middleware.RequestID)
r.Use(middleware.RealIP)
r.Use(middleware.Logger) // TODO: integrate with slog
r.Use(middleware.Recoverer)
@ -58,35 +59,48 @@ func apiV1(broker *JBroker, db *TelemDb) chi.Router {
w.Write([]byte(skylab.SkylabDefinitions))
})
r.Get("/ws", func(w http.ResponseWriter, r *http.Request) {
c, err := websocket.Accept(w, r, nil)
if err != nil {
return
}
})
r.Route("/packets", func(r chi.Router) {
r.Get("/subscribe", apiV1PacketSubscribe(broker, db))
r.Post("/", func(w http.ResponseWriter, r *http.Request) {
var pkgs []skylab.BusEvent
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&pkgs); err != nil{
if err := decoder.Decode(&pkgs); err != nil {
w.WriteHeader(http.StatusTeapot)
return
}
// we have a list of packets now. let's commit them.
db.AddEvents(pkgs...)
return
return
})
r.Get("/", func(w http.ResponseWriter, r *http.Request) {
// this should use query params to return a list of packets.
})
// this is to get packets by a name.
r.Get("/{name:[a-z_]+}", func(w http.ResponseWriter, r *http.Request) {
})
})
// records are driving segments/runs.
r.Route("/records", func(r chi.Router) {
r.Get("/") // get all runs
r.Get("/active") // get current run (no end time)
r.Post("/") // create a new run (with note). Ends active run if any, and creates new active run (no end time)
r.Get("/{id}") // get details on a specific run
r.Put("/{id}") // update a specific run. Can only be used to add notes/metadata, and not to change time/id.
})
r.Get("/stats") // v1 api stats (calls, clients, xbee connected, meta health ok)
r.
return r
}
// apiV1Subscriber is a websocket session for the v1 api.
type apiV1Subscriber struct {
idFilter []uint64 // list of Ids to subscribe to. If it's empty, subscribes to all.
@ -105,16 +119,18 @@ func apiV1PacketSubscribe(broker *JBroker, db *TelemDb) http.HandlerFunc {
// attempt to upgrade.
c, err := websocket.Accept(w, r, nil)
if err != nil {
// TODO: is this the correct option?
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "error ws handshake: %s", err)
return
}
// TODO: use K/V with session token?
sess := &apiV1Subscriber{}
for {
select {
case <- r.Context().Done():
case <-r.Context().Done():
return
case msgIn := <-sub:
if len(sess.idFilter) == 0 {
@ -126,16 +142,12 @@ func apiV1PacketSubscribe(broker *JBroker, db *TelemDb) http.HandlerFunc {
// send it
}
}
escapeFilter:
escapeFilter:
return
}
}
}
}

3
internal/badger/db.go Normal file
View file

@ -0,0 +1,3 @@
package badger
// this file has a global internal K/V database used for sessions/stats/???

View file

@ -0,0 +1,66 @@
package middleware
import (
"net/http"
"time"
chi_middleware "github.com/go-chi/chi/v5/middleware"
"golang.org/x/exp/slog"
)
// Slogger is a slog-enabled logging middleware.
// It logs the start and end of the request, and logs info
// about the request itself, response status, and response time.
// Slogger returns a log handler that uses the given slog logger as the base.
func Slogger(sl *slog.Logger) func(next http.Handler) http.Handler {
logger := sl.WithGroup("http")
return func(next http.Handler) http.Handler {
// this triple-nested function is strange, but basically the Slogger() call makes a new middleware function (above)
// the middleware function returns a handler that calls the next handler in the chain(wrapping it)
fn := func(w http.ResponseWriter, r *http.Request) {
// wrap writer allows us to get info on the response from further handlers.
ww := chi_middleware.NewWrapResponseWriter(w, r.ProtoMajor)
t1 := time.Now()
// attrs is stored to allow for the helpers to add additional elements to the main record.
attrs := make([]slog.Attr, 0)
// This function runs at the end and adds all the response details to the attrs before logging them.
defer func() {
attrs = append(attrs, slog.Int("status_code", ww.Status()))
attrs = append(attrs, slog.Int("resp_size", ww.BytesWritten()))
attrs = append(attrs, slog.Duration("duration", time.Since(t1)))
attrs = append(attrs, slog.String("method", r.Method))
logger.LogAttrs(r.Context(), slog.LevelInfo, r.RequestURI, attrs...)
}()
// embed the logger and the attrs for later items in the chain.
next.ServeHTTP(ww, r)
}
return http.HandlerFunc(fn)
}
}
type slogKeyType int
const (
SloggerLogKey slogKeyType = iota
SloggerAttrsKey
)
func addSlogAttr(r *http.Request, attr slog.Attr) {
ctx := r.Context()
attrs, ok := ctx.Value(SloggerAttrsKey).([]slog.Attr)
if !ok {
return
}
attrs = append(attrs, attr)
}

View file

@ -67,7 +67,7 @@ type ServiceFunc func(params msgp.Raw) (res msgp.Raw, err error)
// "server" aka listener, and client.
type RPCConn struct {
// TODO: use io.readwritecloser?
rwc io.ReadWriteCloser
rwc io.ReadWriteCloser
handlers map[string]ServiceFunc
ct rpcConnTrack
@ -75,23 +75,21 @@ type RPCConn struct {
logger slog.Logger
}
// creates a new RPC connection on top of an io.ReadWriteCloser. Can be
// pre-seeded with handlers.
func NewRPC(rwc io.ReadWriteCloser, logger *slog.Logger, initialHandlers map[string]ServiceFunc) (rpc *RPCConn, err error) {
rpc = &RPCConn{
rwc: rwc,
rwc: rwc,
handlers: make(map[string]ServiceFunc),
ct: NewRPCConnTrack(),
ct: NewRPCConnTrack(),
}
if initialHandlers != nil {
for k,v := range initialHandlers {
for k, v := range initialHandlers {
rpc.handlers[k] = v
}
}
return
}
@ -141,7 +139,6 @@ func (rpc *RPCConn) RegisterHandler(name string, fn ServiceFunc) error {
return nil
}
// Removes a handler, if it exists. Never errors. No-op if the name
// is not a registered handler.
func (rpc *RPCConn) RemoveHandler(name string) error {
@ -199,9 +196,6 @@ func (rpc *RPCConn) Serve() {
}
}
// INTERNAL functions for rpcConn
// dispatch is an internal method used to execute a Request sent by the remote:w
@ -238,6 +232,7 @@ func (rpc *RPCConn) dispatchNotif(req Notification) {
// Next, we define some helper generic functions that can be used to make
// implementing a msg wrapper easier.
// msgpackObject is anything that has implemented all the msgpack interfaces.
type msgpackObject interface {
msgp.Decodable
msgp.Encodable
@ -248,7 +243,7 @@ type msgpackObject interface {
// MakeService is a generic wrapper function. It takes a function with the signature
// of func(T msgpObject)(R msgpObject, error) where T and R can be *concrete* types.
// and returns a new function that handles conversion to/from msgp.Raw.
// the function returned can be used by the RPCConn as a handler function.
// The function returned can be used by the RPCConn as a handler function.
// This function can typically have it's paramters inferred.
func MakeService[T, R msgpackObject](fn func(T) (R, error)) ServiceFunc {
return func(p msgp.Raw) (msgp.Raw, error) {
@ -339,9 +334,11 @@ func MakeBoundCaller[T, R msgpackObject](rpc *RPCConn, method string) func(T) (R
}
}
func MakeNotifier[T msgpackObject]() func(string, T, *RPCConn) {
return func(method string, param T, rpc *RPCConn) {
rawParam, _ := param.MarshalMsg([]byte{})
// MakeNotifier creates a new notification function that notifies the remote
func MakeNotifier[T msgpackObject](method string) func(T, *RPCConn) error {
return func(param T, rpc *RPCConn) error {
rawParam, err := param.MarshalMsg([]byte{})
rpc.Notify(method, rawParam)
return err
}
}