diff --git a/internal/db/db.go b/internal/db/db.go index da4eb43..27f47a9 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -132,93 +132,6 @@ func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) { return tdb.AddEventsCtx(context.Background(), events...) } -/// Query fragment guide: -/// We need to be able to easily construct safe(!) and meaningful queries programatically -/// so we make some new types that can be turned into SQL fragments that go inside the where clause. -/// These all implement the QueryFrag interface, meaning the actual query function (that acts on the DB) -/// can deal with them agnostically. The Query function joins all the fragments it is given with AND. -/// to get OR, - -// QueryFrag is anything that can be turned into a Query WHERE clause -type QueryFrag interface { - Query() string -} - -// QueryTimeRange represents a query of a specific time range. For "before" or "after" queries, -// use time.Unix(0,0) or time.Now() in start and end respectively. -type QueryTimeRange struct { - Start time.Time - End time.Time -} - -func (q *QueryTimeRange) Query() string { - startUnix := q.Start.UnixMilli() - endUnix := q.End.UnixMilli() - - return fmt.Sprintf("ts BETWEEN %d AND %d", startUnix, endUnix) -} - -type QueryNames []string - -func (q QueryNames) Query() string { - return fmt.Sprintf("name IN (%s)", strings.Join(q, ", ")) -} - -type QueryOr []QueryFrag - -func (q QueryOr) Query() string { - var qStrings []string - for _, frag := range q { - qStrings = append(qStrings, frag.Query()) - } - return fmt.Sprintf("(%s)", strings.Join(qStrings, " OR ")) -} - -// GetEvents is the mechanism to request underlying event data. -// it takes functions (which are defined in db.go) that modify the query, -// and then return the results. -func (tdb *TelemDb) GetEvents(limit int, where ...QueryFrag) (events []skylab.BusEvent, err error) { - // Simple mechanism for combining query frags: - // join with " AND ". To join expressions with or, use QueryOr - var fragStr []string - for _, f := range where { - fragStr = append(fragStr, f.Query()) - } - qString := fmt.Sprintf(`SELECT * FROM "bus_events" WHERE %s LIMIT %d`, strings.Join(fragStr, " AND "), limit) - rows, err := tdb.db.Queryx(qString) - if err != nil { - return - } - defer rows.Close() - - if limit < 0 { // special case: limit negative means unrestricted. - events = make([]skylab.BusEvent, 0, 20) - } else { - events = make([]skylab.BusEvent, 0, limit) - } - // scan rows into busevent list... - for rows.Next() { - var ev skylab.RawJsonEvent - err = rows.StructScan(&ev) - if err != nil { - return - } - - BusEv := skylab.BusEvent{ - Timestamp: time.UnixMilli(int64(ev.Timestamp)), - Name: ev.Name, - } - BusEv.Data, err = skylab.FromJson(ev.Name, ev.Data) - - // FIXME: this is slow! - events = append(events, BusEv) - - } - - err = rows.Err() - - return -} // GetActiveDrive finds the non-null drive and returns it, if any. func (tdb *TelemDb) GetActiveDrive() (res int, err error) { diff --git a/internal/db/db_test.go b/internal/db/db_test.go index 4785393..031f90f 100644 --- a/internal/db/db_test.go +++ b/internal/db/db_test.go @@ -1,12 +1,14 @@ package db import ( + "bufio" + "context" "fmt" - "reflect" + "strings" "testing" "time" - "github.com/jmoiron/sqlx" + "github.com/kschamplin/gotelem/internal/logparsers" "github.com/kschamplin/gotelem/skylab" ) @@ -24,29 +26,74 @@ func GetRandomBusEvent() skylab.BusEvent { return ev } +// exampleData is a telemetry log data snippet that +// we use to seed the database. +const exampleData = `1698013005.164 1455ED8FDBDFF4FC3BD +1698013005.168 1460000000000000000 +1698013005.170 1470000000000000000 +1698013005.172 1610000000000000000 +1698013005.175 1210000000000000000 +1698013005.177 157FFFFC74200000000 +1698013005.181 1030000000000000000 +1698013005.184 1430000000000000000 +1698013005.187 04020D281405EA8FB41 +1698013005.210 0413BDF81406AF70042 +1698013005.212 042569F81408EF0FF41 +1698013005.215 04358A8814041060242 +1698013005.219 04481958140D2A40342 +1698013005.221 0452DB2814042990442 +1698013005.224 047AF948140C031FD41 +1698013005.226 04B27A081401ACD0B42 +1698013005.229 04DCEAA81403C8C0A42 +1698013005.283 04E0378814024580142 +1698013005.286 04F97908140BFBC0142 +1698013005.289 050098A81402F0F0A42 +1698013005.293 051E6AE81402AF20842 +1698013005.297 0521AC081403A970742 +1698013005.300 0535BB181403CEB0542 +1698013005.304 054ECC0814088FE0142 +1698013005.307 0554ED181401F44F341 +1698013005.309 05726E48140D42BEB41 +1698013005.312 059EFC98140EC400142 +` + +// MakeMockDatabase creates a new dummy database. +func MakeMockDatabase(name string) *TelemDb { + fstring := fmt.Sprintf("file:%s?mode=memory&cache=shared", name) + tdb, err := openRawDb(fstring) + if err != nil { + panic(err) + } + // seed the database now. + scanner := bufio.NewScanner(strings.NewReader(exampleData)) + + for scanner.Scan() { + str := scanner.Text() + + bev, err := logparsers.ParsersMap["telem"](str) + if err != nil { + panic(err) + } + _, err = tdb.AddEvents(bev) + if err != nil { + panic(err) + } + } + + return tdb +} + func TestTelemDb(t *testing.T) { - var tdb *TelemDb t.Run("test opening database", func(t *testing.T) { - var err error - // we use the underlying raw database to avoid the options. - tdb, err = openRawDb("file::memory:?cache=shared") - if err != nil { - t.Errorf("could not open db: %v", err) - } + // create our mock + tdb := MakeMockDatabase(t.Name()) tdb.db.Ping() - res, _ := tdb.db.Query("SELECT name FROM sqlite_master WHERE type='table'") - - var table string - - for res.Next() { - res.Scan(&table) - fmt.Println(table) - } }) t.Run("test inserting bus event", func(t *testing.T) { + tdb := MakeMockDatabase(t.Name()) type args struct { events []skylab.BusEvent } @@ -69,6 +116,13 @@ func TestTelemDb(t *testing.T) { }, wantErr: false, }, + { + name: "add multiple packet", + args: args{ + events: []skylab.BusEvent{GetRandomBusEvent(), GetRandomBusEvent()}, + }, + wantErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -79,35 +133,21 @@ func TestTelemDb(t *testing.T) { } }) - type fields struct { - db *sqlx.DB - } - type args struct { - limit int - where []QueryFrag - } - tests := []struct { - name string - fields fields - args args - wantEvents []skylab.BusEvent - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tdb := &TelemDb{ - db: tt.fields.db, - } - gotEvents, err := tdb.GetEvents(tt.args.limit, tt.args.where...) - if (err != nil) != tt.wantErr { - t.Errorf("TelemDb.GetEvents() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(gotEvents, tt.wantEvents) { - t.Errorf("TelemDb.GetEvents() = %v, want %v", gotEvents, tt.wantEvents) - } - }) - } + + t.Run("test getting packets", func(t *testing.T) { + tdb := MakeMockDatabase(t.Name()) + + ctx := context.Background() + f := BusEventFilter{} + limitMod := LimitOffsetModifier{Limit: 1} + pkt, err := tdb.GetPackets(ctx, f, limitMod) + if err != nil { + t.Fatalf("error getting packets: %v", err) + } + t.Log(pkt) + }) + + t.Run("test read-write packet", func(t *testing.T) { + + }) } diff --git a/internal/db/getters.go b/internal/db/getters.go index 1b55d9f..8392554 100644 --- a/internal/db/getters.go +++ b/internal/db/getters.go @@ -3,29 +3,104 @@ package db import ( "context" "fmt" + "strings" "time" + + "github.com/kschamplin/gotelem/skylab" ) // Modifier augments SQL strings. type Modifier interface { - ModifyStatement(string) string + ModifyStatement(*strings.Builder) error } - +// LimitOffsetModifier is a modifier to support pagniation. type LimitOffsetModifier struct { - Limit int + Limit int Offset int } +func (l LimitOffsetModifier) ModifyStatement(sb *strings.Builder) error { + clause := fmt.Sprintf(" LIMIT %d OFFSET %d", l.Limit, l.Offset) + sb.WriteString(clause) + return nil +} + // BusEventFilter is a filter for bus events. type BusEventFilter struct { - Names []string + Names []string TimerangeStart time.Time - TimerangeEnd time.Time + TimerangeEnd time.Time } // now we can optionally add a limit. +func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, options ...Modifier) ([]skylab.BusEvent, error) { + // construct a simple + var whereFrags = make([]string, 0) + + // if we're filtering by names, add a where clause for it. + if len(filter.Names) > 0 { + names := strings.Join(filter.Names, ", ") + qString := fmt.Sprintf("name IN (%s)", names) + whereFrags = append(whereFrags, qString) + } + // TODO: identify if we need a special case for both time ranges + // using BETWEEN since apparenlty that can be better? + + // next, check if we have a start/end time, add constraints + if !filter.TimerangeEnd.IsZero() { + qString := fmt.Sprintf("ts <= %d", filter.TimerangeEnd.UnixMilli()) + whereFrags = append(whereFrags, qString) + } + if !filter.TimerangeStart.IsZero() { + // we have an end range + qString := fmt.Sprintf("ts >= %d", filter.TimerangeStart.UnixMilli()) + whereFrags = append(whereFrags, qString) + } + + sb := strings.Builder{} + sb.WriteString("SELECT * from \"bus_events\"") + // construct the full statement. + if len(whereFrags) > 0 { + // use the where clauses. + sb.WriteString(" WHERE ") + sb.WriteString(strings.Join(whereFrags, " AND ")) + } + + // Augment our data further if there's i.e a limit modifier. + // TODO: factor this out maybe? + for _, m := range options { + m.ModifyStatement(&sb) + } + rows, err := tdb.db.QueryxContext(ctx, sb.String()) + if err != nil { + return nil, err + } + defer rows.Close() + + var events = make([]skylab.BusEvent, 0, 10) + + for rows.Next() { + var ev skylab.RawJsonEvent + err := rows.Scan(&ev.Timestamp, &ev.Name, (*[]byte)(&ev.Data)) + if err != nil { + return nil, err + } + + BusEv := skylab.BusEvent { + Timestamp: time.UnixMilli(int64(ev.Timestamp)), + Name: ev.Name, + } + BusEv.Data, err = skylab.FromJson(ev.Name, ev.Data) + events = append(events, BusEv) + } + + err = rows.Err() + + return events, err +} + // Datum is a single measurement - it is more granular than a packet. // the classic example is bms_measurement.current type Datum struct { diff --git a/internal/db/migrations/1_initial_up.sql b/internal/db/migrations/1_initial_up.sql index 1e6357c..006ce07 100644 --- a/internal/db/migrations/1_initial_up.sql +++ b/internal/db/migrations/1_initial_up.sql @@ -1,7 +1,7 @@ CREATE TABLE "bus_events" ( "ts" INTEGER NOT NULL, -- timestamp, unix milliseconds "name" TEXT NOT NULL, -- name of base packet - "data" TEXT NOT NULL CHECK(json_valid(data)) -- JSON object describing the data, including index if any + "data" JSON NOT NULL CHECK(json_valid(data)) -- JSON object describing the data, including index if any ); CREATE INDEX "ids_timestamped" ON "bus_events" (