rework DB getters
Some checks failed
Go / build (1.21) (push) Failing after 1m6s
Go / build (1.22) (push) Failing after 1m6s

abandon generic query frag for common structures
Instead of using the QueryFrag struct, which was too generic to be
generally useful, we have moved to a BusEventFilter type, which
contains things we may filter on when we're searching for bus events.
At the moment it just contains names, and start/stop times.
Then in each function we can accept this filter struct and convert
it to fit the query.

We also support general modifiers, and currently have one implemented:
the LimitOffsetModifier. This adds a LIMIT and OFFSET clause to any
statement. these are all applied at the end and receive a stringbuilder
which may prevent certain operations from being structured.
We need to work on this one more, potentially abandoning.
This commit is contained in:
saji 2024-03-01 16:25:33 -06:00
parent 4092fdba6f
commit 68347e8b95
4 changed files with 169 additions and 141 deletions

View file

@ -132,93 +132,6 @@ func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) {
return tdb.AddEventsCtx(context.Background(), events...) return tdb.AddEventsCtx(context.Background(), events...)
} }
/// Query fragment guide:
/// We need to be able to easily construct safe(!) and meaningful queries programatically
/// so we make some new types that can be turned into SQL fragments that go inside the where clause.
/// These all implement the QueryFrag interface, meaning the actual query function (that acts on the DB)
/// can deal with them agnostically. The Query function joins all the fragments it is given with AND.
/// to get OR,
// QueryFrag is anything that can be turned into a Query WHERE clause
type QueryFrag interface {
Query() string
}
// QueryTimeRange represents a query of a specific time range. For "before" or "after" queries,
// use time.Unix(0,0) or time.Now() in start and end respectively.
type QueryTimeRange struct {
Start time.Time
End time.Time
}
func (q *QueryTimeRange) Query() string {
startUnix := q.Start.UnixMilli()
endUnix := q.End.UnixMilli()
return fmt.Sprintf("ts BETWEEN %d AND %d", startUnix, endUnix)
}
type QueryNames []string
func (q QueryNames) Query() string {
return fmt.Sprintf("name IN (%s)", strings.Join(q, ", "))
}
type QueryOr []QueryFrag
func (q QueryOr) Query() string {
var qStrings []string
for _, frag := range q {
qStrings = append(qStrings, frag.Query())
}
return fmt.Sprintf("(%s)", strings.Join(qStrings, " OR "))
}
// GetEvents is the mechanism to request underlying event data.
// it takes functions (which are defined in db.go) that modify the query,
// and then return the results.
func (tdb *TelemDb) GetEvents(limit int, where ...QueryFrag) (events []skylab.BusEvent, err error) {
// Simple mechanism for combining query frags:
// join with " AND ". To join expressions with or, use QueryOr
var fragStr []string
for _, f := range where {
fragStr = append(fragStr, f.Query())
}
qString := fmt.Sprintf(`SELECT * FROM "bus_events" WHERE %s LIMIT %d`, strings.Join(fragStr, " AND "), limit)
rows, err := tdb.db.Queryx(qString)
if err != nil {
return
}
defer rows.Close()
if limit < 0 { // special case: limit negative means unrestricted.
events = make([]skylab.BusEvent, 0, 20)
} else {
events = make([]skylab.BusEvent, 0, limit)
}
// scan rows into busevent list...
for rows.Next() {
var ev skylab.RawJsonEvent
err = rows.StructScan(&ev)
if err != nil {
return
}
BusEv := skylab.BusEvent{
Timestamp: time.UnixMilli(int64(ev.Timestamp)),
Name: ev.Name,
}
BusEv.Data, err = skylab.FromJson(ev.Name, ev.Data)
// FIXME: this is slow!
events = append(events, BusEv)
}
err = rows.Err()
return
}
// GetActiveDrive finds the non-null drive and returns it, if any. // GetActiveDrive finds the non-null drive and returns it, if any.
func (tdb *TelemDb) GetActiveDrive() (res int, err error) { func (tdb *TelemDb) GetActiveDrive() (res int, err error) {

View file

@ -1,12 +1,14 @@
package db package db
import ( import (
"bufio"
"context"
"fmt" "fmt"
"reflect" "strings"
"testing" "testing"
"time" "time"
"github.com/jmoiron/sqlx" "github.com/kschamplin/gotelem/internal/logparsers"
"github.com/kschamplin/gotelem/skylab" "github.com/kschamplin/gotelem/skylab"
) )
@ -24,29 +26,74 @@ func GetRandomBusEvent() skylab.BusEvent {
return ev return ev
} }
// exampleData is a telemetry log data snippet that
// we use to seed the database.
const exampleData = `1698013005.164 1455ED8FDBDFF4FC3BD
1698013005.168 1460000000000000000
1698013005.170 1470000000000000000
1698013005.172 1610000000000000000
1698013005.175 1210000000000000000
1698013005.177 157FFFFC74200000000
1698013005.181 1030000000000000000
1698013005.184 1430000000000000000
1698013005.187 04020D281405EA8FB41
1698013005.210 0413BDF81406AF70042
1698013005.212 042569F81408EF0FF41
1698013005.215 04358A8814041060242
1698013005.219 04481958140D2A40342
1698013005.221 0452DB2814042990442
1698013005.224 047AF948140C031FD41
1698013005.226 04B27A081401ACD0B42
1698013005.229 04DCEAA81403C8C0A42
1698013005.283 04E0378814024580142
1698013005.286 04F97908140BFBC0142
1698013005.289 050098A81402F0F0A42
1698013005.293 051E6AE81402AF20842
1698013005.297 0521AC081403A970742
1698013005.300 0535BB181403CEB0542
1698013005.304 054ECC0814088FE0142
1698013005.307 0554ED181401F44F341
1698013005.309 05726E48140D42BEB41
1698013005.312 059EFC98140EC400142
`
// MakeMockDatabase creates a new dummy database.
func MakeMockDatabase(name string) *TelemDb {
fstring := fmt.Sprintf("file:%s?mode=memory&cache=shared", name)
tdb, err := openRawDb(fstring)
if err != nil {
panic(err)
}
// seed the database now.
scanner := bufio.NewScanner(strings.NewReader(exampleData))
for scanner.Scan() {
str := scanner.Text()
bev, err := logparsers.ParsersMap["telem"](str)
if err != nil {
panic(err)
}
_, err = tdb.AddEvents(bev)
if err != nil {
panic(err)
}
}
return tdb
}
func TestTelemDb(t *testing.T) { func TestTelemDb(t *testing.T) {
var tdb *TelemDb
t.Run("test opening database", func(t *testing.T) { t.Run("test opening database", func(t *testing.T) {
var err error // create our mock
// we use the underlying raw database to avoid the options. tdb := MakeMockDatabase(t.Name())
tdb, err = openRawDb("file::memory:?cache=shared")
if err != nil {
t.Errorf("could not open db: %v", err)
}
tdb.db.Ping() tdb.db.Ping()
res, _ := tdb.db.Query("SELECT name FROM sqlite_master WHERE type='table'")
var table string
for res.Next() {
res.Scan(&table)
fmt.Println(table)
}
}) })
t.Run("test inserting bus event", func(t *testing.T) { t.Run("test inserting bus event", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
type args struct { type args struct {
events []skylab.BusEvent events []skylab.BusEvent
} }
@ -69,6 +116,13 @@ func TestTelemDb(t *testing.T) {
}, },
wantErr: false, wantErr: false,
}, },
{
name: "add multiple packet",
args: args{
events: []skylab.BusEvent{GetRandomBusEvent(), GetRandomBusEvent()},
},
wantErr: false,
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
@ -79,35 +133,21 @@ func TestTelemDb(t *testing.T) {
} }
}) })
type fields struct {
db *sqlx.DB t.Run("test getting packets", func(t *testing.T) {
} tdb := MakeMockDatabase(t.Name())
type args struct {
limit int ctx := context.Background()
where []QueryFrag f := BusEventFilter{}
} limitMod := LimitOffsetModifier{Limit: 1}
tests := []struct { pkt, err := tdb.GetPackets(ctx, f, limitMod)
name string if err != nil {
fields fields t.Fatalf("error getting packets: %v", err)
args args
wantEvents []skylab.BusEvent
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tdb := &TelemDb{
db: tt.fields.db,
}
gotEvents, err := tdb.GetEvents(tt.args.limit, tt.args.where...)
if (err != nil) != tt.wantErr {
t.Errorf("TelemDb.GetEvents() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotEvents, tt.wantEvents) {
t.Errorf("TelemDb.GetEvents() = %v, want %v", gotEvents, tt.wantEvents)
} }
t.Log(pkt)
})
t.Run("test read-write packet", func(t *testing.T) {
}) })
}
} }

View file

@ -3,20 +3,29 @@ package db
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/kschamplin/gotelem/skylab"
) )
// Modifier augments SQL strings. // Modifier augments SQL strings.
type Modifier interface { type Modifier interface {
ModifyStatement(string) string ModifyStatement(*strings.Builder) error
} }
// LimitOffsetModifier is a modifier to support pagniation.
type LimitOffsetModifier struct { type LimitOffsetModifier struct {
Limit int Limit int
Offset int Offset int
} }
func (l LimitOffsetModifier) ModifyStatement(sb *strings.Builder) error {
clause := fmt.Sprintf(" LIMIT %d OFFSET %d", l.Limit, l.Offset)
sb.WriteString(clause)
return nil
}
// BusEventFilter is a filter for bus events. // BusEventFilter is a filter for bus events.
type BusEventFilter struct { type BusEventFilter struct {
Names []string Names []string
@ -26,6 +35,72 @@ type BusEventFilter struct {
// now we can optionally add a limit. // now we can optionally add a limit.
func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, options ...Modifier) ([]skylab.BusEvent, error) {
// construct a simple
var whereFrags = make([]string, 0)
// if we're filtering by names, add a where clause for it.
if len(filter.Names) > 0 {
names := strings.Join(filter.Names, ", ")
qString := fmt.Sprintf("name IN (%s)", names)
whereFrags = append(whereFrags, qString)
}
// TODO: identify if we need a special case for both time ranges
// using BETWEEN since apparenlty that can be better?
// next, check if we have a start/end time, add constraints
if !filter.TimerangeEnd.IsZero() {
qString := fmt.Sprintf("ts <= %d", filter.TimerangeEnd.UnixMilli())
whereFrags = append(whereFrags, qString)
}
if !filter.TimerangeStart.IsZero() {
// we have an end range
qString := fmt.Sprintf("ts >= %d", filter.TimerangeStart.UnixMilli())
whereFrags = append(whereFrags, qString)
}
sb := strings.Builder{}
sb.WriteString("SELECT * from \"bus_events\"")
// construct the full statement.
if len(whereFrags) > 0 {
// use the where clauses.
sb.WriteString(" WHERE ")
sb.WriteString(strings.Join(whereFrags, " AND "))
}
// Augment our data further if there's i.e a limit modifier.
// TODO: factor this out maybe?
for _, m := range options {
m.ModifyStatement(&sb)
}
rows, err := tdb.db.QueryxContext(ctx, sb.String())
if err != nil {
return nil, err
}
defer rows.Close()
var events = make([]skylab.BusEvent, 0, 10)
for rows.Next() {
var ev skylab.RawJsonEvent
err := rows.Scan(&ev.Timestamp, &ev.Name, (*[]byte)(&ev.Data))
if err != nil {
return nil, err
}
BusEv := skylab.BusEvent {
Timestamp: time.UnixMilli(int64(ev.Timestamp)),
Name: ev.Name,
}
BusEv.Data, err = skylab.FromJson(ev.Name, ev.Data)
events = append(events, BusEv)
}
err = rows.Err()
return events, err
}
// Datum is a single measurement - it is more granular than a packet. // Datum is a single measurement - it is more granular than a packet.
// the classic example is bms_measurement.current // the classic example is bms_measurement.current
type Datum struct { type Datum struct {

View file

@ -1,7 +1,7 @@
CREATE TABLE "bus_events" ( CREATE TABLE "bus_events" (
"ts" INTEGER NOT NULL, -- timestamp, unix milliseconds "ts" INTEGER NOT NULL, -- timestamp, unix milliseconds
"name" TEXT NOT NULL, -- name of base packet "name" TEXT NOT NULL, -- name of base packet
"data" TEXT NOT NULL CHECK(json_valid(data)) -- JSON object describing the data, including index if any "data" JSON NOT NULL CHECK(json_valid(data)) -- JSON object describing the data, including index if any
); );
CREATE INDEX "ids_timestamped" ON "bus_events" ( CREATE INDEX "ids_timestamped" ON "bus_events" (