Compare commits

..

7 commits

Author SHA1 Message Date
saji 456f84b5c7 remove QueryModifiers, replace with explicit
Some checks failed
Go / build (1.21) (push) Failing after 1m4s
Go / build (1.22) (push) Failing after 1m3s
2024-03-06 17:19:16 -06:00
saji daf4fe97dc added DocumentNotFound error 2024-03-06 17:09:02 -06:00
saji 5b38daf74f wip: Telem DB Document API and Tests 2024-03-06 16:42:39 -06:00
saji 7a98f52542 hack: remove faulty table from old migration 2024-03-06 15:16:13 -06:00
saji 0a6a6bb66d add openmct domain object table and skeleton 2024-03-06 15:15:56 -06:00
saji c9b73ee006 add repeated packet support using index parameter 2024-03-06 14:53:39 -06:00
saji b266a84324 fix multiple name packet filter 2024-03-06 14:53:25 -06:00
9 changed files with 293 additions and 125 deletions

196
db.go
View file

@ -130,11 +130,6 @@ func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) {
return tdb.AddEventsCtx(context.Background(), events...) return tdb.AddEventsCtx(context.Background(), events...)
} }
// QueryModifier augments SQL strings.
type QueryModifier interface {
ModifyStatement(*strings.Builder) error
}
// LimitOffsetModifier is a modifier to support pagniation. // LimitOffsetModifier is a modifier to support pagniation.
type LimitOffsetModifier struct { type LimitOffsetModifier struct {
Limit int Limit int
@ -147,30 +142,24 @@ func (l *LimitOffsetModifier) ModifyStatement(sb *strings.Builder) error {
return nil return nil
} }
type OrderByTimestampModifer struct {
}
func (o *OrderByTimestampModifer) ModifyStatement(sb *strings.Builder) error {
sb.WriteString(" ORDER BY ts DESC")
return nil
}
// BusEventFilter is a filter for bus events. // BusEventFilter is a filter for bus events.
type BusEventFilter struct { type BusEventFilter struct {
Names []string Names []string // The name(s) of packets to filter for
TimerangeStart time.Time StartTime time.Time // Starting time range. All packets >= StartTime
TimerangeEnd time.Time EndTime time.Time // Ending time range. All packets <= EndTime
Indexes []int // The specific index of the packets to index.
} }
// now we can optionally add a limit. // now we can optionally add a limit.
func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, options ...QueryModifier) ([]skylab.BusEvent, error) { func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, lim *LimitOffsetModifier) ([]skylab.BusEvent, error) {
// construct a simple // construct a simple
var whereFrags = make([]string, 0) var whereFrags = make([]string, 0)
// if we're filtering by names, add a where clause for it. // if we're filtering by names, add a where clause for it.
if len(filter.Names) > 0 { if len(filter.Names) > 0 {
names := strings.Join(filter.Names, ", ") // we have to quote our individual names
names := strings.Join(filter.Names, `", "`)
qString := fmt.Sprintf(`name IN ("%s")`, names) qString := fmt.Sprintf(`name IN ("%s")`, names)
whereFrags = append(whereFrags, qString) whereFrags = append(whereFrags, qString)
} }
@ -178,18 +167,27 @@ func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, optio
// using BETWEEN since apparenlty that can be better? // using BETWEEN since apparenlty that can be better?
// next, check if we have a start/end time, add constraints // next, check if we have a start/end time, add constraints
if !filter.TimerangeEnd.IsZero() { if !filter.EndTime.IsZero() {
qString := fmt.Sprintf("ts <= %d", filter.TimerangeEnd.UnixMilli()) qString := fmt.Sprintf("ts <= %d", filter.EndTime.UnixMilli())
whereFrags = append(whereFrags, qString) whereFrags = append(whereFrags, qString)
} }
if !filter.TimerangeStart.IsZero() { if !filter.StartTime.IsZero() {
// we have an end range // we have an end range
qString := fmt.Sprintf("ts >= %d", filter.TimerangeStart.UnixMilli()) qString := fmt.Sprintf("ts >= %d", filter.StartTime.UnixMilli())
whereFrags = append(whereFrags, qString)
}
if len(filter.Indexes) > 0 {
s := make([]string, 0)
for _, idx := range filter.Indexes {
s = append(s, fmt.Sprint(idx))
}
idxs := strings.Join(s, ", ")
qString := fmt.Sprintf(`idx in (%s)`, idxs)
whereFrags = append(whereFrags, qString) whereFrags = append(whereFrags, qString)
} }
sb := strings.Builder{} sb := strings.Builder{}
sb.WriteString(`SELECT * from "bus_events"`) sb.WriteString(`SELECT ts, name, data from "bus_events"`)
// construct the full statement. // construct the full statement.
if len(whereFrags) > 0 { if len(whereFrags) > 0 {
// use the where clauses. // use the where clauses.
@ -197,10 +195,12 @@ func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, optio
sb.WriteString(strings.Join(whereFrags, " AND ")) sb.WriteString(strings.Join(whereFrags, " AND "))
} }
sb.WriteString(" ORDER BY ts DESC")
// Augment our data further if there's i.e a limit modifier. // Augment our data further if there's i.e a limit modifier.
// TODO: factor this out maybe? // TODO: factor this out maybe?
for _, m := range options { if lim != nil {
m.ModifyStatement(&sb) lim.ModifyStatement(&sb)
} }
rows, err := tdb.db.QueryxContext(ctx, sb.String()) rows, err := tdb.db.QueryxContext(ctx, sb.String())
if err != nil { if err != nil {
@ -246,37 +246,45 @@ type Datum struct {
// GetValues queries the database for values in a given time range. // GetValues queries the database for values in a given time range.
// A value is a specific data point. For example, bms_measurement.current // A value is a specific data point. For example, bms_measurement.current
// would be a value. // would be a value.
func (tdb *TelemDb) GetValues(ctx context.Context, bef BusEventFilter, func (tdb *TelemDb) GetValues(ctx context.Context, filter BusEventFilter,
field string, opts ...QueryModifier) ([]Datum, error) { field string, lim *LimitOffsetModifier) ([]Datum, error) {
// this fragment uses json_extract from sqlite to get a single // this fragment uses json_extract from sqlite to get a single
// nested value. // nested value.
sb := strings.Builder{} sb := strings.Builder{}
sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `) sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `)
if len(bef.Names) != 1 { if len(filter.Names) != 1 {
return nil, errors.New("invalid number of names") return nil, errors.New("invalid number of names")
} }
whereFrags := []string{"name is ?"}
qStrings := []string{"name is ?"} if !filter.StartTime.IsZero() {
// add timestamp limit. qString := fmt.Sprintf("ts >= %d", filter.StartTime.UnixMilli())
if !bef.TimerangeStart.IsZero() { whereFrags = append(whereFrags, qString)
qString := fmt.Sprintf("ts >= %d", bef.TimerangeStart.UnixMilli())
qStrings = append(qStrings, qString)
} }
if !bef.TimerangeEnd.IsZero() { if !filter.EndTime.IsZero() {
qString := fmt.Sprintf("ts <= %d", bef.TimerangeEnd.UnixMilli()) qString := fmt.Sprintf("ts <= %d", filter.EndTime.UnixMilli())
qStrings = append(qStrings, qString) whereFrags = append(whereFrags, qString)
}
if len(filter.Indexes) > 0 {
s := make([]string, 0)
for _, idx := range filter.Indexes {
s = append(s, fmt.Sprint(idx))
}
idxs := strings.Join(s, ", ")
qString := fmt.Sprintf(`idx in (%s)`, idxs)
whereFrags = append(whereFrags, qString)
} }
// join qstrings with AND // join qstrings with AND
sb.WriteString(strings.Join(qStrings, " AND ")) sb.WriteString(strings.Join(whereFrags, " AND "))
for _, m := range opts { sb.WriteString(" ORDER BY ts DESC")
if m == nil {
continue if lim != nil {
} lim.ModifyStatement(&sb)
m.ModifyStatement(&sb)
} }
rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, bef.Names[0])
rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, filter.Names[0])
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -299,40 +307,92 @@ func (tdb *TelemDb) GetValues(ctx context.Context, bef BusEventFilter,
return data, nil return data, nil
} }
// PacketDef is a database packet model // AddDocument inserts a new document to the store if it is unique and valid.
type PacketDef struct { func (tdb *TelemDb) AddDocument(ctx context.Context, obj json.RawMessage) error {
Name string const insertStmt = `INSERT INTO openmct_objects (data) VALUES (json(?))`
Description string _, err := tdb.db.ExecContext(ctx, insertStmt, obj)
Id int return err
} }
type FieldDef struct { // DocumentNotFoundError is when the underlying document cannot be found.
Name string type DocumentNotFoundError string
SubName string
Packet string func (e DocumentNotFoundError) Error() string {
Type string return fmt.Sprintf("document could not find key: %s", string(e))
} }
// PacketNotFoundError is when a matching packet cannot be found.
type PacketNotFoundError string
func (e *PacketNotFoundError) Error() string { // UpdateDocument replaces the entire contents of a document matching
return "packet not found: " + string(*e) // the given key. Note that the key is derived from the document,
// and no checks are done to ensure that the new key is the same.
func (tdb *TelemDb) UpdateDocument(ctx context.Context, key string,
obj json.RawMessage) error {
const upd = `UPDATE openmct_objects SET data = json(?) WHERE key IS ?`
r, err := tdb.db.ExecContext(ctx, upd, obj, key)
if err != nil {
return err
}
n, err := r.RowsAffected()
if err != nil {
return err
}
if n != 1 {
return DocumentNotFoundError(key)
}
return err
} }
// GetPacketDefN retrieves a packet matching the given name, if it exists.
// returns PacketNotFoundError if a matching packet could not be found. // GetDocument gets the document matching the corresponding key.
func (tdb *TelemDb) GetPacketDefN(name string) (*PacketDef, error) { func (tdb *TelemDb) GetDocument(ctx context.Context, key string) (json.RawMessage, error) {
return nil, nil const get = `SELECT data FROM openmct_objects WHERE key IS ?`
row := tdb.db.QueryRowxContext(ctx, get, key)
var res []byte // VERY important, json.RawMessage won't work here
// since the scan function does not look at underlying types.
row.Scan(&res)
if len(res) == 0 {
return nil, DocumentNotFoundError(key)
}
return res, nil
} }
// GetPacketDefF retrieves the parent packet for a given field. // GetAllDocuments returns all documents in the database.
// This function cannot return PacketNotFoundError since we have SQL FKs enforcing. func (tdb *TelemDb) GetAllDocuments(ctx context.Context) ([]json.RawMessage, error) {
func (tdb *TelemDb) GetPacketDefF(field FieldDef) (*PacketDef, error) { const getall = `SELECT data FROM openmct_objects`;
return nil, nil
rows, err := tdb.db.QueryxContext(ctx, getall)
defer rows.Close()
if err != nil {
return nil, err
}
docs := make([]json.RawMessage, 0)
for rows.Next() {
var j json.RawMessage
rows.Scan(&j)
docs = append(docs, j)
}
return docs, nil
} }
// GetFieldDefs returns the given fields for a given packet definition. // DeleteDocument removes a document from the store, or errors
func (tdb *TelemDb) GetFieldDefs(pkt PacketDef) ([]FieldDef, error) { // if it does not exist.
return nil, nil func (tdb *TelemDb) DeleteDocument(ctx context.Context, key string) error {
const del = `DELETE FROM openmct_objects WHERE key IS ?`
res, err := tdb.db.ExecContext(ctx, del, key)
if err != nil {
return err
}
n, err := res.RowsAffected()
if err != nil {
return err
}
if n != 1 {
return DocumentNotFoundError(key)
}
return err
} }

View file

@ -3,7 +3,11 @@ package gotelem
import ( import (
"bufio" "bufio"
"context" "context"
"encoding/json"
"errors"
"fmt" "fmt"
"math/rand"
"reflect"
"strings" "strings"
"testing" "testing"
"time" "time"
@ -85,7 +89,6 @@ func MakeMockDatabase(name string) *TelemDb {
func TestTelemDb(t *testing.T) { func TestTelemDb(t *testing.T) {
t.Run("test opening database", func(t *testing.T) { t.Run("test opening database", func(t *testing.T) {
// create our mock // create our mock
tdb := MakeMockDatabase(t.Name()) tdb := MakeMockDatabase(t.Name())
@ -151,6 +154,131 @@ func TestTelemDb(t *testing.T) {
}) })
t.Run("test read-write packet", func(t *testing.T) { t.Run("test read-write packet", func(t *testing.T) {
}) })
} }
func MockDocument(key string) json.RawMessage {
var v = make(map[string]interface{})
v["identifier"] = map[string]string{"key": key}
v["randomdata"] = rand.Int()
res, err := json.Marshal(v)
if err != nil {
panic(err)
}
return res
}
func TestDbDocuments(t *testing.T) {
t.Run("test inserting a document", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
err := tdb.AddDocument(ctx, MockDocument("hi"))
if err != nil {
t.Fatalf("AddDocument expected no error, got err=%v", err)
}
})
t.Run("test inserting duplicate documents", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
doc := MockDocument("hi")
err := tdb.AddDocument(ctx, doc)
if err != nil {
t.Fatalf("AddDocument expected no error, got err=%v", err)
}
err = tdb.AddDocument(ctx, doc)
if err == nil {
t.Fatalf("AddDocument expected duplicate key error, got nil")
}
})
t.Run("test inserting bad document", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
var badDoc = map[string]string{"bad":"duh"}
msg, err := json.Marshal(badDoc)
if err != nil {
panic(err)
}
err = tdb.AddDocument(ctx, msg)
if err == nil {
t.Fatalf("AddDocument expected error, got nil")
}
})
t.Run("test getting document", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
doc := MockDocument("hi")
err := tdb.AddDocument(ctx, doc)
if err != nil {
t.Fatalf("AddDocument expected no error, got err=%v", err)
}
res, err := tdb.GetDocument(ctx, "hi")
if err != nil {
t.Fatalf("GetDocument expected no error, got err=%v", err)
}
if !reflect.DeepEqual(res, doc) {
t.Fatalf("GetDocument did not return identical document")
}
})
t.Run("test getting nonexistent document", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
res, err := tdb.GetDocument(ctx, "hi")
if err == nil || !errors.Is(err, DocumentNotFoundError("hi")){
t.Fatalf("GetDocument expected DocumentNotFoundError, got %v", err)
}
if res != nil {
t.Fatalf("GetDocument expected nil result, got %v", res)
}
})
t.Run("test update document", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
doc1 := MockDocument("hi")
doc2 := MockDocument("hi") // same key, we want to update.
tdb.AddDocument(ctx, doc1)
err := tdb.UpdateDocument(ctx, "hi", doc2)
if err != nil {
t.Fatalf("UpdateDocument expected no error, got err=%v", err)
}
// compare.
res, _ := tdb.GetDocument(ctx, "hi")
if !reflect.DeepEqual(res, doc2) {
t.Fatalf("UpdateDocument did not return new doc, got %s", res)
}
})
t.Run("test update nonexistent document", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
doc := MockDocument("hi")
err := tdb.UpdateDocument(ctx, "badKey", doc)
if err == nil {
t.Fatalf("UpdateDocument expected error, got nil")
}
})
}

70
http.go
View file

@ -31,7 +31,7 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
if err != nil { if err != nil {
return bef, err return bef, err
} }
bef.TimerangeStart = t bef.StartTime = t
} }
if el := v.Get("end"); el != "" { if el := v.Get("end"); el != "" {
// parse the start time query. // parse the start time query.
@ -39,7 +39,15 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
if err != nil { if err != nil {
return bef, err return bef, err
} }
bef.TimerangeEnd = t bef.EndTime = t
}
bef.Indexes = make([]int, 0)
for _, strIdx := range v["idx"] {
idx, err := strconv.ParseInt(strIdx, 10, 32)
if err != nil {
return nil, err
}
bef.Indexes = append(bef.Indexes, int(idx))
} }
return bef, nil return bef, nil
} }
@ -132,16 +140,21 @@ func apiV1(broker *Broker, tdb *TelemDb) chi.Router {
}) })
// records are driving segments/runs. // OpenMCT domain object storage. Basically an arbitrary JSON document store
r.Route("/records", func(r chi.Router) {
r.Get("/", apiV1GetRecords(tdb)) // get all runs
r.Get("/active", apiV1GetActiveRecord(tdb)) // get current run (no end time)
r.Post("/", apiV1StartRecord(tdb)) // create a new run (with note). Ends active run if any, and creates new active run (no end time)
r.Get("/{id}", apiV1GetRecord(tdb)) // get details on a specific run
r.Put("/{id}", apiV1UpdateRecord(tdb)) // update a specific run. Can only be used to add notes/metadata, and not to change time/id.
r.Route("/openmct", func(r chi.Router) {
// key is a column on our json store, it's nested under identifier.key
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
// create a new object.
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
// subscribe to object updates.
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
}) })
// records are driving segments/runs.
r.Get("/stats", func(w http.ResponseWriter, r *http.Request) { r.Get("/stats", func(w http.ResponseWriter, r *http.Request) {
}) // v1 api stats (calls, clients, xbee connected, meta health ok) }) // v1 api stats (calls, clients, xbee connected, meta health ok)
@ -215,16 +228,13 @@ func apiV1GetPackets(tdb *TelemDb) http.HandlerFunc {
lim, err := extractLimitModifier(r) lim, err := extractLimitModifier(r)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
fmt.Print(lim)
return return
} }
// TODO: is the following check needed? // TODO: is the following check needed?
var res []skylab.BusEvent var res []skylab.BusEvent
if lim != nil { res, err = tdb.GetPackets(r.Context(), *bef, lim)
res, err = tdb.GetPackets(r.Context(), *bef, lim)
} else {
res, err = tdb.GetPackets(r.Context(), *bef)
}
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -264,15 +274,10 @@ func apiV1GetValues(db *TelemDb) http.HandlerFunc {
// override the bus event filter name option // override the bus event filter name option
bef.Names = []string{name} bef.Names = []string{name}
var order = &OrderByTimestampModifer{}
var res []Datum var res []Datum
// make the call, skip the limit modifier if it's nil. // make the call, skip the limit modifier if it's nil.
if lim == nil { res, err = db.GetValues(r.Context(), *bef, field, lim)
res, err = db.GetValues(r.Context(), *bef, field, order)
} else {
res, err = db.GetValues(r.Context(), *bef, field, lim, order)
}
if err != nil { if err != nil {
// 500 server error: // 500 server error:
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
@ -287,28 +292,3 @@ func apiV1GetValues(db *TelemDb) http.HandlerFunc {
} }
} }
// TODO: rename. record is not a clear name. Runs? drives? segments?
func apiV1GetRecords(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
}
}
func apiV1GetActiveRecord(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
}
}
func apiV1StartRecord(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {}
}
func apiV1GetRecord(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {}
}
func apiV1UpdateRecord(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {}
}

View file

@ -1,2 +1 @@
DROP TABLE "drive_records"; DROP TABLE "drive_records";
DROP TABLE "position_logs";

View file

@ -7,12 +7,3 @@ CREATE TABLE "drive_records" (
PRIMARY KEY("id" AUTOINCREMENT), PRIMARY KEY("id" AUTOINCREMENT),
CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time) CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time)
); );
CREATE TABLE "position_logs" (
"ts" INTEGER NOT NULL,
"source" TEXT NOT NULL,
"lat" REAL NOT NULL,
"lon" REAL NOT NULL,
"elevation" REAL,
CONSTRAINT "no_empty_source" CHECK(source is not "")
);

View file

@ -0,0 +1 @@
ALTER TABLE "bus_events" DROP COLUMN idx;

View file

@ -0,0 +1 @@
ALTER TABLE "bus_events" ADD COLUMN idx GENERATED ALWAYS AS (json_extract(data, '$.idx')) VIRTUAL;

View file

@ -0,0 +1,2 @@
DROP TABLE openmct_objects;
DROP INDEX openmct_key;

View file

@ -0,0 +1,6 @@
CREATE TABLE openmct_objects (
data TEXT,
key TEXT GENERATED ALWAYS AS (json_extract(data, '$.identifier.key')) VIRTUAL UNIQUE NOT NULL
);
-- fast key-lookup
CREATE INDEX openmct_key on openmct_objects(key);