Compare commits
No commits in common. "456f84b5c711779f04568d23b9935bc2322b1918" and "d591fa21b66bc86b030c44234695a60769b7b3e4" have entirely different histories.
456f84b5c7
...
d591fa21b6
196
db.go
196
db.go
|
@ -130,6 +130,11 @@ func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) {
|
||||||
return tdb.AddEventsCtx(context.Background(), events...)
|
return tdb.AddEventsCtx(context.Background(), events...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueryModifier augments SQL strings.
|
||||||
|
type QueryModifier interface {
|
||||||
|
ModifyStatement(*strings.Builder) error
|
||||||
|
}
|
||||||
|
|
||||||
// LimitOffsetModifier is a modifier to support pagniation.
|
// LimitOffsetModifier is a modifier to support pagniation.
|
||||||
type LimitOffsetModifier struct {
|
type LimitOffsetModifier struct {
|
||||||
Limit int
|
Limit int
|
||||||
|
@ -142,24 +147,30 @@ func (l *LimitOffsetModifier) ModifyStatement(sb *strings.Builder) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type OrderByTimestampModifer struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *OrderByTimestampModifer) ModifyStatement(sb *strings.Builder) error {
|
||||||
|
sb.WriteString(" ORDER BY ts DESC")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// BusEventFilter is a filter for bus events.
|
// BusEventFilter is a filter for bus events.
|
||||||
type BusEventFilter struct {
|
type BusEventFilter struct {
|
||||||
Names []string // The name(s) of packets to filter for
|
Names []string
|
||||||
StartTime time.Time // Starting time range. All packets >= StartTime
|
TimerangeStart time.Time
|
||||||
EndTime time.Time // Ending time range. All packets <= EndTime
|
TimerangeEnd time.Time
|
||||||
Indexes []int // The specific index of the packets to index.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// now we can optionally add a limit.
|
// now we can optionally add a limit.
|
||||||
|
|
||||||
func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, lim *LimitOffsetModifier) ([]skylab.BusEvent, error) {
|
func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, options ...QueryModifier) ([]skylab.BusEvent, error) {
|
||||||
// construct a simple
|
// construct a simple
|
||||||
var whereFrags = make([]string, 0)
|
var whereFrags = make([]string, 0)
|
||||||
|
|
||||||
// if we're filtering by names, add a where clause for it.
|
// if we're filtering by names, add a where clause for it.
|
||||||
if len(filter.Names) > 0 {
|
if len(filter.Names) > 0 {
|
||||||
// we have to quote our individual names
|
names := strings.Join(filter.Names, ", ")
|
||||||
names := strings.Join(filter.Names, `", "`)
|
|
||||||
qString := fmt.Sprintf(`name IN ("%s")`, names)
|
qString := fmt.Sprintf(`name IN ("%s")`, names)
|
||||||
whereFrags = append(whereFrags, qString)
|
whereFrags = append(whereFrags, qString)
|
||||||
}
|
}
|
||||||
|
@ -167,27 +178,18 @@ func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, lim *
|
||||||
// using BETWEEN since apparenlty that can be better?
|
// using BETWEEN since apparenlty that can be better?
|
||||||
|
|
||||||
// next, check if we have a start/end time, add constraints
|
// next, check if we have a start/end time, add constraints
|
||||||
if !filter.EndTime.IsZero() {
|
if !filter.TimerangeEnd.IsZero() {
|
||||||
qString := fmt.Sprintf("ts <= %d", filter.EndTime.UnixMilli())
|
qString := fmt.Sprintf("ts <= %d", filter.TimerangeEnd.UnixMilli())
|
||||||
whereFrags = append(whereFrags, qString)
|
whereFrags = append(whereFrags, qString)
|
||||||
}
|
}
|
||||||
if !filter.StartTime.IsZero() {
|
if !filter.TimerangeStart.IsZero() {
|
||||||
// we have an end range
|
// we have an end range
|
||||||
qString := fmt.Sprintf("ts >= %d", filter.StartTime.UnixMilli())
|
qString := fmt.Sprintf("ts >= %d", filter.TimerangeStart.UnixMilli())
|
||||||
whereFrags = append(whereFrags, qString)
|
|
||||||
}
|
|
||||||
if len(filter.Indexes) > 0 {
|
|
||||||
s := make([]string, 0)
|
|
||||||
for _, idx := range filter.Indexes {
|
|
||||||
s = append(s, fmt.Sprint(idx))
|
|
||||||
}
|
|
||||||
idxs := strings.Join(s, ", ")
|
|
||||||
qString := fmt.Sprintf(`idx in (%s)`, idxs)
|
|
||||||
whereFrags = append(whereFrags, qString)
|
whereFrags = append(whereFrags, qString)
|
||||||
}
|
}
|
||||||
|
|
||||||
sb := strings.Builder{}
|
sb := strings.Builder{}
|
||||||
sb.WriteString(`SELECT ts, name, data from "bus_events"`)
|
sb.WriteString(`SELECT * from "bus_events"`)
|
||||||
// construct the full statement.
|
// construct the full statement.
|
||||||
if len(whereFrags) > 0 {
|
if len(whereFrags) > 0 {
|
||||||
// use the where clauses.
|
// use the where clauses.
|
||||||
|
@ -195,12 +197,10 @@ func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, lim *
|
||||||
sb.WriteString(strings.Join(whereFrags, " AND "))
|
sb.WriteString(strings.Join(whereFrags, " AND "))
|
||||||
}
|
}
|
||||||
|
|
||||||
sb.WriteString(" ORDER BY ts DESC")
|
|
||||||
|
|
||||||
// Augment our data further if there's i.e a limit modifier.
|
// Augment our data further if there's i.e a limit modifier.
|
||||||
// TODO: factor this out maybe?
|
// TODO: factor this out maybe?
|
||||||
if lim != nil {
|
for _, m := range options {
|
||||||
lim.ModifyStatement(&sb)
|
m.ModifyStatement(&sb)
|
||||||
}
|
}
|
||||||
rows, err := tdb.db.QueryxContext(ctx, sb.String())
|
rows, err := tdb.db.QueryxContext(ctx, sb.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -246,45 +246,37 @@ type Datum struct {
|
||||||
// GetValues queries the database for values in a given time range.
|
// GetValues queries the database for values in a given time range.
|
||||||
// A value is a specific data point. For example, bms_measurement.current
|
// A value is a specific data point. For example, bms_measurement.current
|
||||||
// would be a value.
|
// would be a value.
|
||||||
func (tdb *TelemDb) GetValues(ctx context.Context, filter BusEventFilter,
|
func (tdb *TelemDb) GetValues(ctx context.Context, bef BusEventFilter,
|
||||||
field string, lim *LimitOffsetModifier) ([]Datum, error) {
|
field string, opts ...QueryModifier) ([]Datum, error) {
|
||||||
// this fragment uses json_extract from sqlite to get a single
|
// this fragment uses json_extract from sqlite to get a single
|
||||||
// nested value.
|
// nested value.
|
||||||
sb := strings.Builder{}
|
sb := strings.Builder{}
|
||||||
sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `)
|
sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `)
|
||||||
if len(filter.Names) != 1 {
|
if len(bef.Names) != 1 {
|
||||||
return nil, errors.New("invalid number of names")
|
return nil, errors.New("invalid number of names")
|
||||||
}
|
}
|
||||||
whereFrags := []string{"name is ?"}
|
|
||||||
|
|
||||||
if !filter.StartTime.IsZero() {
|
qStrings := []string{"name is ?"}
|
||||||
qString := fmt.Sprintf("ts >= %d", filter.StartTime.UnixMilli())
|
// add timestamp limit.
|
||||||
whereFrags = append(whereFrags, qString)
|
if !bef.TimerangeStart.IsZero() {
|
||||||
|
qString := fmt.Sprintf("ts >= %d", bef.TimerangeStart.UnixMilli())
|
||||||
|
qStrings = append(qStrings, qString)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !filter.EndTime.IsZero() {
|
if !bef.TimerangeEnd.IsZero() {
|
||||||
qString := fmt.Sprintf("ts <= %d", filter.EndTime.UnixMilli())
|
qString := fmt.Sprintf("ts <= %d", bef.TimerangeEnd.UnixMilli())
|
||||||
whereFrags = append(whereFrags, qString)
|
qStrings = append(qStrings, qString)
|
||||||
}
|
|
||||||
if len(filter.Indexes) > 0 {
|
|
||||||
s := make([]string, 0)
|
|
||||||
for _, idx := range filter.Indexes {
|
|
||||||
s = append(s, fmt.Sprint(idx))
|
|
||||||
}
|
|
||||||
idxs := strings.Join(s, ", ")
|
|
||||||
qString := fmt.Sprintf(`idx in (%s)`, idxs)
|
|
||||||
whereFrags = append(whereFrags, qString)
|
|
||||||
}
|
}
|
||||||
// join qstrings with AND
|
// join qstrings with AND
|
||||||
sb.WriteString(strings.Join(whereFrags, " AND "))
|
sb.WriteString(strings.Join(qStrings, " AND "))
|
||||||
|
|
||||||
sb.WriteString(" ORDER BY ts DESC")
|
for _, m := range opts {
|
||||||
|
if m == nil {
|
||||||
if lim != nil {
|
continue
|
||||||
lim.ModifyStatement(&sb)
|
}
|
||||||
|
m.ModifyStatement(&sb)
|
||||||
}
|
}
|
||||||
|
rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, bef.Names[0])
|
||||||
rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, filter.Names[0])
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -307,92 +299,40 @@ func (tdb *TelemDb) GetValues(ctx context.Context, filter BusEventFilter,
|
||||||
return data, nil
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddDocument inserts a new document to the store if it is unique and valid.
|
// PacketDef is a database packet model
|
||||||
func (tdb *TelemDb) AddDocument(ctx context.Context, obj json.RawMessage) error {
|
type PacketDef struct {
|
||||||
const insertStmt = `INSERT INTO openmct_objects (data) VALUES (json(?))`
|
Name string
|
||||||
_, err := tdb.db.ExecContext(ctx, insertStmt, obj)
|
Description string
|
||||||
return err
|
Id int
|
||||||
}
|
}
|
||||||
|
|
||||||
// DocumentNotFoundError is when the underlying document cannot be found.
|
type FieldDef struct {
|
||||||
type DocumentNotFoundError string
|
Name string
|
||||||
|
SubName string
|
||||||
func (e DocumentNotFoundError) Error() string {
|
Packet string
|
||||||
return fmt.Sprintf("document could not find key: %s", string(e))
|
Type string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PacketNotFoundError is when a matching packet cannot be found.
|
||||||
|
type PacketNotFoundError string
|
||||||
|
|
||||||
// UpdateDocument replaces the entire contents of a document matching
|
func (e *PacketNotFoundError) Error() string {
|
||||||
// the given key. Note that the key is derived from the document,
|
return "packet not found: " + string(*e)
|
||||||
// and no checks are done to ensure that the new key is the same.
|
|
||||||
func (tdb *TelemDb) UpdateDocument(ctx context.Context, key string,
|
|
||||||
obj json.RawMessage) error {
|
|
||||||
|
|
||||||
const upd = `UPDATE openmct_objects SET data = json(?) WHERE key IS ?`
|
|
||||||
r, err := tdb.db.ExecContext(ctx, upd, obj, key)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
n, err := r.RowsAffected()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if n != 1 {
|
|
||||||
return DocumentNotFoundError(key)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetPacketDefN retrieves a packet matching the given name, if it exists.
|
||||||
// GetDocument gets the document matching the corresponding key.
|
// returns PacketNotFoundError if a matching packet could not be found.
|
||||||
func (tdb *TelemDb) GetDocument(ctx context.Context, key string) (json.RawMessage, error) {
|
func (tdb *TelemDb) GetPacketDefN(name string) (*PacketDef, error) {
|
||||||
const get = `SELECT data FROM openmct_objects WHERE key IS ?`
|
return nil, nil
|
||||||
|
|
||||||
row := tdb.db.QueryRowxContext(ctx, get, key)
|
|
||||||
|
|
||||||
var res []byte // VERY important, json.RawMessage won't work here
|
|
||||||
// since the scan function does not look at underlying types.
|
|
||||||
row.Scan(&res)
|
|
||||||
|
|
||||||
if len(res) == 0 {
|
|
||||||
return nil, DocumentNotFoundError(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
return res, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetAllDocuments returns all documents in the database.
|
// GetPacketDefF retrieves the parent packet for a given field.
|
||||||
func (tdb *TelemDb) GetAllDocuments(ctx context.Context) ([]json.RawMessage, error) {
|
// This function cannot return PacketNotFoundError since we have SQL FKs enforcing.
|
||||||
const getall = `SELECT data FROM openmct_objects`;
|
func (tdb *TelemDb) GetPacketDefF(field FieldDef) (*PacketDef, error) {
|
||||||
|
return nil, nil
|
||||||
rows, err := tdb.db.QueryxContext(ctx, getall)
|
|
||||||
defer rows.Close()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
docs := make([]json.RawMessage, 0)
|
|
||||||
for rows.Next() {
|
|
||||||
var j json.RawMessage
|
|
||||||
rows.Scan(&j)
|
|
||||||
docs = append(docs, j)
|
|
||||||
}
|
|
||||||
return docs, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteDocument removes a document from the store, or errors
|
// GetFieldDefs returns the given fields for a given packet definition.
|
||||||
// if it does not exist.
|
func (tdb *TelemDb) GetFieldDefs(pkt PacketDef) ([]FieldDef, error) {
|
||||||
func (tdb *TelemDb) DeleteDocument(ctx context.Context, key string) error {
|
return nil, nil
|
||||||
const del = `DELETE FROM openmct_objects WHERE key IS ?`
|
|
||||||
res, err := tdb.db.ExecContext(ctx, del, key)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
n, err := res.RowsAffected()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if n != 1 {
|
|
||||||
return DocumentNotFoundError(key)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
130
db_test.go
130
db_test.go
|
@ -3,11 +3,7 @@ package gotelem
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
|
||||||
"reflect"
|
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -89,6 +85,7 @@ func MakeMockDatabase(name string) *TelemDb {
|
||||||
|
|
||||||
func TestTelemDb(t *testing.T) {
|
func TestTelemDb(t *testing.T) {
|
||||||
|
|
||||||
|
|
||||||
t.Run("test opening database", func(t *testing.T) {
|
t.Run("test opening database", func(t *testing.T) {
|
||||||
// create our mock
|
// create our mock
|
||||||
tdb := MakeMockDatabase(t.Name())
|
tdb := MakeMockDatabase(t.Name())
|
||||||
|
@ -157,128 +154,3 @@ func TestTelemDb(t *testing.T) {
|
||||||
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func MockDocument(key string) json.RawMessage {
|
|
||||||
var v = make(map[string]interface{})
|
|
||||||
|
|
||||||
v["identifier"] = map[string]string{"key": key}
|
|
||||||
v["randomdata"] = rand.Int()
|
|
||||||
res, err := json.Marshal(v)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDbDocuments(t *testing.T) {
|
|
||||||
|
|
||||||
t.Run("test inserting a document", func(t *testing.T) {
|
|
||||||
tdb := MakeMockDatabase(t.Name())
|
|
||||||
tdb.db.Ping()
|
|
||||||
ctx := context.Background()
|
|
||||||
err := tdb.AddDocument(ctx, MockDocument("hi"))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("AddDocument expected no error, got err=%v", err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("test inserting duplicate documents", func(t *testing.T) {
|
|
||||||
tdb := MakeMockDatabase(t.Name())
|
|
||||||
tdb.db.Ping()
|
|
||||||
ctx := context.Background()
|
|
||||||
doc := MockDocument("hi")
|
|
||||||
err := tdb.AddDocument(ctx, doc)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("AddDocument expected no error, got err=%v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = tdb.AddDocument(ctx, doc)
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("AddDocument expected duplicate key error, got nil")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
t.Run("test inserting bad document", func(t *testing.T) {
|
|
||||||
tdb := MakeMockDatabase(t.Name())
|
|
||||||
tdb.db.Ping()
|
|
||||||
ctx := context.Background()
|
|
||||||
var badDoc = map[string]string{"bad":"duh"}
|
|
||||||
msg, err := json.Marshal(badDoc)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
err = tdb.AddDocument(ctx, msg)
|
|
||||||
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("AddDocument expected error, got nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("test getting document", func(t *testing.T) {
|
|
||||||
tdb := MakeMockDatabase(t.Name())
|
|
||||||
tdb.db.Ping()
|
|
||||||
ctx := context.Background()
|
|
||||||
doc := MockDocument("hi")
|
|
||||||
err := tdb.AddDocument(ctx, doc)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("AddDocument expected no error, got err=%v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := tdb.GetDocument(ctx, "hi")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("GetDocument expected no error, got err=%v", err)
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(res, doc) {
|
|
||||||
t.Fatalf("GetDocument did not return identical document")
|
|
||||||
}
|
|
||||||
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("test getting nonexistent document", func(t *testing.T) {
|
|
||||||
tdb := MakeMockDatabase(t.Name())
|
|
||||||
tdb.db.Ping()
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
res, err := tdb.GetDocument(ctx, "hi")
|
|
||||||
|
|
||||||
if err == nil || !errors.Is(err, DocumentNotFoundError("hi")){
|
|
||||||
t.Fatalf("GetDocument expected DocumentNotFoundError, got %v", err)
|
|
||||||
}
|
|
||||||
if res != nil {
|
|
||||||
t.Fatalf("GetDocument expected nil result, got %v", res)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("test update document", func(t *testing.T) {
|
|
||||||
tdb := MakeMockDatabase(t.Name())
|
|
||||||
tdb.db.Ping()
|
|
||||||
ctx := context.Background()
|
|
||||||
doc1 := MockDocument("hi")
|
|
||||||
doc2 := MockDocument("hi") // same key, we want to update.
|
|
||||||
|
|
||||||
tdb.AddDocument(ctx, doc1)
|
|
||||||
err := tdb.UpdateDocument(ctx, "hi", doc2)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("UpdateDocument expected no error, got err=%v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// compare.
|
|
||||||
res, _ := tdb.GetDocument(ctx, "hi")
|
|
||||||
if !reflect.DeepEqual(res, doc2) {
|
|
||||||
t.Fatalf("UpdateDocument did not return new doc, got %s", res)
|
|
||||||
}
|
|
||||||
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("test update nonexistent document", func(t *testing.T) {
|
|
||||||
tdb := MakeMockDatabase(t.Name())
|
|
||||||
tdb.db.Ping()
|
|
||||||
ctx := context.Background()
|
|
||||||
doc := MockDocument("hi")
|
|
||||||
err := tdb.UpdateDocument(ctx, "badKey", doc)
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("UpdateDocument expected error, got nil")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
72
http.go
72
http.go
|
@ -31,7 +31,7 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return bef, err
|
return bef, err
|
||||||
}
|
}
|
||||||
bef.StartTime = t
|
bef.TimerangeStart = t
|
||||||
}
|
}
|
||||||
if el := v.Get("end"); el != "" {
|
if el := v.Get("end"); el != "" {
|
||||||
// parse the start time query.
|
// parse the start time query.
|
||||||
|
@ -39,15 +39,7 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return bef, err
|
return bef, err
|
||||||
}
|
}
|
||||||
bef.EndTime = t
|
bef.TimerangeEnd = t
|
||||||
}
|
|
||||||
bef.Indexes = make([]int, 0)
|
|
||||||
for _, strIdx := range v["idx"] {
|
|
||||||
idx, err := strconv.ParseInt(strIdx, 10, 32)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
bef.Indexes = append(bef.Indexes, int(idx))
|
|
||||||
}
|
}
|
||||||
return bef, nil
|
return bef, nil
|
||||||
}
|
}
|
||||||
|
@ -140,20 +132,15 @@ func apiV1(broker *Broker, tdb *TelemDb) chi.Router {
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
// OpenMCT domain object storage. Basically an arbitrary JSON document store
|
|
||||||
|
|
||||||
r.Route("/openmct", func(r chi.Router) {
|
|
||||||
// key is a column on our json store, it's nested under identifier.key
|
|
||||||
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
|
||||||
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
|
||||||
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
|
||||||
// create a new object.
|
|
||||||
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
|
|
||||||
// subscribe to object updates.
|
|
||||||
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
|
|
||||||
})
|
|
||||||
|
|
||||||
// records are driving segments/runs.
|
// records are driving segments/runs.
|
||||||
|
r.Route("/records", func(r chi.Router) {
|
||||||
|
r.Get("/", apiV1GetRecords(tdb)) // get all runs
|
||||||
|
r.Get("/active", apiV1GetActiveRecord(tdb)) // get current run (no end time)
|
||||||
|
r.Post("/", apiV1StartRecord(tdb)) // create a new run (with note). Ends active run if any, and creates new active run (no end time)
|
||||||
|
r.Get("/{id}", apiV1GetRecord(tdb)) // get details on a specific run
|
||||||
|
r.Put("/{id}", apiV1UpdateRecord(tdb)) // update a specific run. Can only be used to add notes/metadata, and not to change time/id.
|
||||||
|
|
||||||
|
})
|
||||||
|
|
||||||
r.Get("/stats", func(w http.ResponseWriter, r *http.Request) {
|
r.Get("/stats", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
|
@ -228,13 +215,16 @@ func apiV1GetPackets(tdb *TelemDb) http.HandlerFunc {
|
||||||
lim, err := extractLimitModifier(r)
|
lim, err := extractLimitModifier(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
fmt.Print(lim)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: is the following check needed?
|
// TODO: is the following check needed?
|
||||||
var res []skylab.BusEvent
|
var res []skylab.BusEvent
|
||||||
res, err = tdb.GetPackets(r.Context(), *bef, lim)
|
if lim != nil {
|
||||||
|
res, err = tdb.GetPackets(r.Context(), *bef, lim)
|
||||||
|
} else {
|
||||||
|
res, err = tdb.GetPackets(r.Context(), *bef)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
|
@ -274,10 +264,15 @@ func apiV1GetValues(db *TelemDb) http.HandlerFunc {
|
||||||
// override the bus event filter name option
|
// override the bus event filter name option
|
||||||
bef.Names = []string{name}
|
bef.Names = []string{name}
|
||||||
|
|
||||||
|
var order = &OrderByTimestampModifer{}
|
||||||
|
|
||||||
var res []Datum
|
var res []Datum
|
||||||
// make the call, skip the limit modifier if it's nil.
|
// make the call, skip the limit modifier if it's nil.
|
||||||
res, err = db.GetValues(r.Context(), *bef, field, lim)
|
if lim == nil {
|
||||||
|
res, err = db.GetValues(r.Context(), *bef, field, order)
|
||||||
|
} else {
|
||||||
|
res, err = db.GetValues(r.Context(), *bef, field, lim, order)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// 500 server error:
|
// 500 server error:
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
@ -292,3 +287,28 @@ func apiV1GetValues(db *TelemDb) http.HandlerFunc {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: rename. record is not a clear name. Runs? drives? segments?
|
||||||
|
func apiV1GetRecords(db *TelemDb) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func apiV1GetActiveRecord(db *TelemDb) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func apiV1StartRecord(db *TelemDb) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
func apiV1GetRecord(db *TelemDb) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
func apiV1UpdateRecord(db *TelemDb) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {}
|
||||||
|
}
|
||||||
|
|
|
@ -1 +1,2 @@
|
||||||
DROP TABLE "drive_records";
|
DROP TABLE "drive_records";
|
||||||
|
DROP TABLE "position_logs";
|
|
@ -7,3 +7,12 @@ CREATE TABLE "drive_records" (
|
||||||
PRIMARY KEY("id" AUTOINCREMENT),
|
PRIMARY KEY("id" AUTOINCREMENT),
|
||||||
CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time)
|
CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE TABLE "position_logs" (
|
||||||
|
"ts" INTEGER NOT NULL,
|
||||||
|
"source" TEXT NOT NULL,
|
||||||
|
"lat" REAL NOT NULL,
|
||||||
|
"lon" REAL NOT NULL,
|
||||||
|
"elevation" REAL,
|
||||||
|
CONSTRAINT "no_empty_source" CHECK(source is not "")
|
||||||
|
);
|
|
@ -1 +0,0 @@
|
||||||
ALTER TABLE "bus_events" DROP COLUMN idx;
|
|
|
@ -1 +0,0 @@
|
||||||
ALTER TABLE "bus_events" ADD COLUMN idx GENERATED ALWAYS AS (json_extract(data, '$.idx')) VIRTUAL;
|
|
|
@ -1,2 +0,0 @@
|
||||||
DROP TABLE openmct_objects;
|
|
||||||
DROP INDEX openmct_key;
|
|
|
@ -1,6 +0,0 @@
|
||||||
CREATE TABLE openmct_objects (
|
|
||||||
data TEXT,
|
|
||||||
key TEXT GENERATED ALWAYS AS (json_extract(data, '$.identifier.key')) VIRTUAL UNIQUE NOT NULL
|
|
||||||
);
|
|
||||||
-- fast key-lookup
|
|
||||||
CREATE INDEX openmct_key on openmct_objects(key);
|
|
Loading…
Reference in a new issue