From 0e8904ebecd7bc0ef99c07858fe2041990529438 Mon Sep 17 00:00:00 2001 From: saji Date: Tue, 27 Jun 2023 18:22:24 -0500 Subject: [PATCH] wip: http subscribe endpoint --- db.go | 5 ++-- go.mod | 1 + go.sum | 2 ++ http.go | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 83 insertions(+), 5 deletions(-) diff --git a/db.go b/db.go index 1dfc43b..50af08d 100644 --- a/db.go +++ b/db.go @@ -49,6 +49,7 @@ CREATE TABLE IF NOT EXISTS "bus_events" ( "ts" REAL NOT NULL, -- timestamp "id" INTEGER NOT NULL, -- can ID "name" TEXT NOT NULL, -- name of base packet + "index" INTEGER, -- index of the repeated packet (base_id = id - index) "packet" TEXT NOT NULL CHECK(json_valid(packet)) -- JSON object describing the data ); @@ -74,7 +75,7 @@ DROP INDEX "times"; // sql expression to insert a bus event into the packets database.1 const sqlInsertEvent = ` -INSERT INTO "bus_events" (time, can_id, name, packet) VALUES ($1, $2, $3, json($4)); +INSERT INTO "bus_events" (time, can_id, name, index, packet) VALUES ($1, $2, $3, json($4)); ` // AddEvent adds the bus event to the database. @@ -157,7 +158,7 @@ type EventsQuery struct { Names []QueryNameString - Limit uint // asdf + Limit uint // max number of results. } // GetEvents is the mechanism to request underlying event data. diff --git a/go.mod b/go.mod index 70007cd..f36a42e 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.20 require ( github.com/go-chi/chi/v5 v5.0.8 + github.com/google/uuid v1.3.0 github.com/jmoiron/sqlx v1.3.5 github.com/mattn/go-sqlite3 v1.14.16 github.com/tinylib/msgp v1.1.8 diff --git a/go.sum b/go.sum index 12e2f96..1981130 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgj github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= diff --git a/http.go b/http.go index d7150c8..d8302d0 100644 --- a/http.go +++ b/http.go @@ -3,20 +3,24 @@ package gotelem // this file defines the HTTP handlers and routes. import ( + "encoding/json" + "fmt" "net/http" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" + "github.com/google/uuid" "github.com/kschamplin/gotelem/skylab" "golang.org/x/exp/slog" "nhooyr.io/websocket" ) + type slogHttpLogger struct { slog.Logger } -func TelemRouter(log *slog.Logger, broker *JBroker) http.Handler { +func TelemRouter(log *slog.Logger, broker *JBroker, db *TelemDb) http.Handler { r := chi.NewRouter() r.Use(middleware.Logger) // TODO: integrate with slog @@ -34,7 +38,7 @@ func TelemRouter(log *slog.Logger, broker *JBroker) http.Handler { w.Write([]byte("pong")) }) - r.Mount("/api/v1", apiV1(broker)) + r.Mount("/api/v1", apiV1(broker, db)) // To future residents - you can add new API calls/systems in /api/v2 // Don't break anything in api v1! keep legacy code working! @@ -45,7 +49,7 @@ func TelemRouter(log *slog.Logger, broker *JBroker) http.Handler { } // define API version 1 routes. -func apiV1(broker *JBroker) chi.Router { +func apiV1(broker *JBroker, db *TelemDb) chi.Router { r := chi.NewRouter() r.Get("/schema", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") @@ -62,6 +66,76 @@ func apiV1(broker *JBroker) chi.Router { }) + r.Route("/packets", func(r chi.Router) { + r.Get("/subscribe", apiV1PacketSubscribe(broker, db)) + r.Post("/", func(w http.ResponseWriter, r *http.Request) { + var pkgs []skylab.BusEvent + decoder := json.NewDecoder(r.Body) + if err := decoder.Decode(&pkgs); err != nil{ + w.WriteHeader(http.StatusTeapot) + return + } + // we have a list of packets now. let's commit them. + db.AddEvents(pkgs...) + return + }) + }) + + + return r } + +// apiV1Subscriber is a websocket session for the v1 api. +type apiV1Subscriber struct { + idFilter []uint64 // list of Ids to subscribe to. If it's empty, subscribes to all. +} + +func apiV1PacketSubscribe(broker *JBroker, db *TelemDb) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + conn_id := r.RemoteAddr + uuid.New().String() + sub, err := broker.Subscribe(conn_id) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "error subscribing: %s", err) + return + } + defer broker.Unsubscribe(conn_id) + // attempt to upgrade. + c, err := websocket.Accept(w, r, nil) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "error ws handshake: %s", err) + return + } + + sess := &apiV1Subscriber{} + + for { + select { + case <- r.Context().Done(): + return + case msgIn := <-sub: + if len(sess.idFilter) == 0 { + // send it. + goto escapeFilter + } + for _, id := range sess.idFilter { + if id == msgIn.Id { + // send it + } + } + escapeFilter: + return + + } + + + } + + + + } +} +