migrate websocket api to buseventfilter
All checks were successful
Go / build (1.22) (push) Successful in 1m13s
Go / build (1.21) (push) Successful in 2m31s

This commit is contained in:
saji 2024-03-02 21:23:35 -06:00
parent 7b48dd0d1c
commit 00fa67a67d
4 changed files with 65 additions and 16 deletions

View file

@ -91,14 +91,16 @@ func apiV1(broker *gotelem.Broker, tdb *db.TelemDb) chi.Router {
return r return r
} }
// apiV1Subscriber is a websocket session for the v1 api.
type apiV1Subscriber struct {
nameFilter []string // names of packets we care about.
}
// this is a websocket stream. // this is a websocket stream.
func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFunc { func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
// pull filter from url query params.
bef, err := extractBusEventFilter(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
// setup connection
conn_id := r.RemoteAddr + uuid.New().String() conn_id := r.RemoteAddr + uuid.New().String()
sub, err := broker.Subscribe(conn_id) sub, err := broker.Subscribe(conn_id)
if err != nil { if err != nil {
@ -107,29 +109,35 @@ func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFu
return return
} }
defer broker.Unsubscribe(conn_id) defer broker.Unsubscribe(conn_id)
// setup websocket
c, err := websocket.Accept(w, r, nil) c, err := websocket.Accept(w, r, nil)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
c.Ping(r.Context()) c.Ping(r.Context())
// closeread handles protocol/status messages,
// also handles clients closing the connection.
// we get a context to use from it.
ctx := c.CloseRead(r.Context())
// TODO: use K/V with session token?
sess := &apiV1Subscriber{}
for { for {
select { select {
case <-r.Context().Done(): case <-ctx.Done():
return return
case msgIn := <-sub: case msgIn := <-sub:
if len(sess.nameFilter) == 0 { // short circuit if there's no names - send everything
// send it. if len(bef.Names) == 0 {
wsjson.Write(r.Context(), c, msgIn) wsjson.Write(r.Context(), c, msgIn)
} }
for _, name := range sess.nameFilter { // otherwise, send it if it matches one of our names.
for _, name := range bef.Names {
if name == msgIn.Name { if name == msgIn.Name {
// send it // send it
wsjson.Write(r.Context(), c, msgIn) wsjson.Write(ctx, c, msgIn)
break break
} }
} }

View file

@ -25,7 +25,7 @@ type TelemDbOption func(*TelemDb) error
// this function is internal use. It actually opens the database, but uses // this function is internal use. It actually opens the database, but uses
// a raw path string instead of formatting one like the exported functions. // a raw path string instead of formatting one like the exported functions.
func openRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err error) { func OpenRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err error) {
tdb = &TelemDb{} tdb = &TelemDb{}
tdb.db, err = sqlx.Connect("sqlite3", rawpath) tdb.db, err = sqlx.Connect("sqlite3", rawpath)
if err != nil { if err != nil {
@ -55,12 +55,12 @@ func openRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err erro
// this string is used to open the read-write db. // this string is used to open the read-write db.
// the extra options improve performance significantly. // the extra options improve performance significantly.
const rwDbPathFmt = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000" const ProductionDbURI = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000"
// OpenTelemDb opens a new telemetry database at the given path. // OpenTelemDb opens a new telemetry database at the given path.
func OpenTelemDb(path string, options ...TelemDbOption) (*TelemDb, error) { func OpenTelemDb(path string, options ...TelemDbOption) (*TelemDb, error) {
dbStr := fmt.Sprintf(rwDbPathFmt, path) dbStr := fmt.Sprintf(ProductionDbURI, path)
return openRawDb(dbStr, options...) return OpenRawDb(dbStr, options...)
} }
func (tdb *TelemDb) GetVersion() (int, error) { func (tdb *TelemDb) GetVersion() (int, error) {

View file

@ -60,7 +60,7 @@ const exampleData = `1698013005.164 1455ED8FDBDFF4FC3BD
// MakeMockDatabase creates a new dummy database. // MakeMockDatabase creates a new dummy database.
func MakeMockDatabase(name string) *TelemDb { func MakeMockDatabase(name string) *TelemDb {
fstring := fmt.Sprintf("file:%s?mode=memory&cache=shared", name) fstring := fmt.Sprintf("file:%s?mode=memory&cache=shared", name)
tdb, err := openRawDb(fstring) tdb, err := OpenRawDb(fstring)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -147,6 +147,7 @@ func TestTelemDb(t *testing.T) {
if len(pkt) != 1 { if len(pkt) != 1 {
t.Fatalf("expected exactly one response, got %d", len(pkt)) t.Fatalf("expected exactly one response, got %d", len(pkt))
} }
// todo - validate what this should be.
}) })
t.Run("test read-write packet", func(t *testing.T) { t.Run("test read-write packet", func(t *testing.T) {

View file

@ -1,3 +1,43 @@
package db package db
// This file implements Packet modelling, which allows us to look up fields by name // This file implements Packet modelling, which allows us to look up fields by name
type PacketDef struct {
Name string
Description string
Id int
}
type FieldDef struct {
Name string
SubName string
Packet string
Type string
}
// PacketNotFoundError is when a matching packet cannot be found.
type PacketNotFoundError string
func (e *PacketNotFoundError) Error() string {
return "packet not found: " + string(*e)
}
// GetPacketDefN retrieves a packet matching the given name, if it exists.
// returns PacketNotFoundError if a matching packet could not be found.
func (tdb *TelemDb) GetPacketDefN(name string) (*PacketDef, error) {
return nil, nil
}
// GetPacketDefF retrieves the parent packet for a given field.
// This function cannot return PacketNotFoundError since we have SQL FKs enforcing.
func (tdb *TelemDb) GetPacketDefF(field FieldDef) (*PacketDef, error) {
return nil, nil
}
// GetFieldDefs returns the given fields for a given packet definition.
func (tdb *TelemDb) GetFieldDefs(pkt PacketDef) ([]FieldDef, error) {
return nil, nil
}