From 00fa67a67d587e77f6cf9355f23ed53c448ff9af Mon Sep 17 00:00:00 2001 From: saji Date: Sat, 2 Mar 2024 21:23:35 -0600 Subject: [PATCH] migrate websocket api to buseventfilter --- internal/api/http.go | 30 +++++++++++++++++++----------- internal/db/db.go | 8 ++++---- internal/db/db_test.go | 3 ++- internal/db/packets.go | 40 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 65 insertions(+), 16 deletions(-) diff --git a/internal/api/http.go b/internal/api/http.go index 7cc8b91..f5d21ba 100644 --- a/internal/api/http.go +++ b/internal/api/http.go @@ -91,14 +91,16 @@ func apiV1(broker *gotelem.Broker, tdb *db.TelemDb) chi.Router { return r } -// apiV1Subscriber is a websocket session for the v1 api. -type apiV1Subscriber struct { - nameFilter []string // names of packets we care about. -} // this is a websocket stream. func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + // pull filter from url query params. + bef, err := extractBusEventFilter(r) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + // setup connection conn_id := r.RemoteAddr + uuid.New().String() sub, err := broker.Subscribe(conn_id) if err != nil { @@ -107,29 +109,35 @@ func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFu return } defer broker.Unsubscribe(conn_id) + + // setup websocket c, err := websocket.Accept(w, r, nil) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } c.Ping(r.Context()) + // closeread handles protocol/status messages, + // also handles clients closing the connection. + // we get a context to use from it. + ctx := c.CloseRead(r.Context()) - // TODO: use K/V with session token? - sess := &apiV1Subscriber{} + for { select { - case <-r.Context().Done(): + case <-ctx.Done(): return case msgIn := <-sub: - if len(sess.nameFilter) == 0 { - // send it. + // short circuit if there's no names - send everything + if len(bef.Names) == 0 { wsjson.Write(r.Context(), c, msgIn) } - for _, name := range sess.nameFilter { + // otherwise, send it if it matches one of our names. + for _, name := range bef.Names { if name == msgIn.Name { // send it - wsjson.Write(r.Context(), c, msgIn) + wsjson.Write(ctx, c, msgIn) break } } diff --git a/internal/db/db.go b/internal/db/db.go index 27f47a9..cf60081 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -25,7 +25,7 @@ type TelemDbOption func(*TelemDb) error // this function is internal use. It actually opens the database, but uses // a raw path string instead of formatting one like the exported functions. -func openRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err error) { +func OpenRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err error) { tdb = &TelemDb{} tdb.db, err = sqlx.Connect("sqlite3", rawpath) if err != nil { @@ -55,12 +55,12 @@ func openRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err erro // this string is used to open the read-write db. // the extra options improve performance significantly. -const rwDbPathFmt = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000" +const ProductionDbURI = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000" // OpenTelemDb opens a new telemetry database at the given path. func OpenTelemDb(path string, options ...TelemDbOption) (*TelemDb, error) { - dbStr := fmt.Sprintf(rwDbPathFmt, path) - return openRawDb(dbStr, options...) + dbStr := fmt.Sprintf(ProductionDbURI, path) + return OpenRawDb(dbStr, options...) } func (tdb *TelemDb) GetVersion() (int, error) { diff --git a/internal/db/db_test.go b/internal/db/db_test.go index 6e018ec..35d9520 100644 --- a/internal/db/db_test.go +++ b/internal/db/db_test.go @@ -60,7 +60,7 @@ const exampleData = `1698013005.164 1455ED8FDBDFF4FC3BD // MakeMockDatabase creates a new dummy database. func MakeMockDatabase(name string) *TelemDb { fstring := fmt.Sprintf("file:%s?mode=memory&cache=shared", name) - tdb, err := openRawDb(fstring) + tdb, err := OpenRawDb(fstring) if err != nil { panic(err) } @@ -147,6 +147,7 @@ func TestTelemDb(t *testing.T) { if len(pkt) != 1 { t.Fatalf("expected exactly one response, got %d", len(pkt)) } + // todo - validate what this should be. }) t.Run("test read-write packet", func(t *testing.T) { diff --git a/internal/db/packets.go b/internal/db/packets.go index 3e17c0f..0dc8440 100644 --- a/internal/db/packets.go +++ b/internal/db/packets.go @@ -1,3 +1,43 @@ package db // This file implements Packet modelling, which allows us to look up fields by name +type PacketDef struct { + Name string + Description string + Id int +} + +type FieldDef struct { + Name string + SubName string + Packet string + Type string +} + +// PacketNotFoundError is when a matching packet cannot be found. +type PacketNotFoundError string + +func (e *PacketNotFoundError) Error() string { + return "packet not found: " + string(*e) +} + + +// GetPacketDefN retrieves a packet matching the given name, if it exists. +// returns PacketNotFoundError if a matching packet could not be found. +func (tdb *TelemDb) GetPacketDefN(name string) (*PacketDef, error) { + return nil, nil +} + +// GetPacketDefF retrieves the parent packet for a given field. +// This function cannot return PacketNotFoundError since we have SQL FKs enforcing. +func (tdb *TelemDb) GetPacketDefF(field FieldDef) (*PacketDef, error) { + return nil, nil +} + + +// GetFieldDefs returns the given fields for a given packet definition. +func (tdb *TelemDb) GetFieldDefs(pkt PacketDef) ([]FieldDef, error) { + return nil, nil +} + +