diff --git a/cmd/gotelem/cli/client.go b/cmd/gotelem/cli/client.go index e55f0c8..11be76f 100644 --- a/cmd/gotelem/cli/client.go +++ b/cmd/gotelem/cli/client.go @@ -9,9 +9,9 @@ import ( "strings" "sync/atomic" - "github.com/kschamplin/gotelem/internal/db" "github.com/kschamplin/gotelem/internal/logparsers" "github.com/kschamplin/gotelem/skylab" + "github.com/kschamplin/gotelem" "github.com/urfave/cli/v2" "golang.org/x/sync/errgroup" ) @@ -81,7 +81,7 @@ func importAction(ctx *cli.Context) error { } dbPath := ctx.Path("database") - db, err := db.OpenTelemDb(dbPath) + db, err := gotelem.OpenTelemDb(dbPath) if err != nil { return fmt.Errorf("error opening database: %w", err) } diff --git a/cmd/gotelem/cli/server.go b/cmd/gotelem/cli/server.go index aff2b8a..05c4ff7 100644 --- a/cmd/gotelem/cli/server.go +++ b/cmd/gotelem/cli/server.go @@ -11,8 +11,6 @@ import ( "log/slog" "github.com/kschamplin/gotelem" - "github.com/kschamplin/gotelem/internal/api" - "github.com/kschamplin/gotelem/internal/db" "github.com/kschamplin/gotelem/skylab" "github.com/kschamplin/gotelem/xbee" "github.com/urfave/cli/v2" @@ -59,7 +57,7 @@ type service interface { type svcDeps struct { Broker *gotelem.Broker - Db *db.TelemDb + Db *gotelem.TelemDb Logger *slog.Logger } @@ -99,7 +97,7 @@ func serve(cCtx *cli.Context) error { dbPath = cCtx.Path("db") } logger.Info("opening database", "path", dbPath) - db, err := db.OpenTelemDb(dbPath) + db, err := gotelem.OpenTelemDb(dbPath) if err != nil { return err } @@ -220,7 +218,7 @@ func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) { broker := deps.Broker db := deps.Db - r := api.TelemRouter(logger, broker, db) + r := gotelem.TelemRouter(logger, broker, db) // diff --git a/db.go b/db.go new file mode 100644 index 0000000..b46c93b --- /dev/null +++ b/db.go @@ -0,0 +1,335 @@ +package gotelem + +// this file implements the database functions to load/store/read from a sql database. + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "github.com/jmoiron/sqlx" + "github.com/kschamplin/gotelem/skylab" + _ "github.com/mattn/go-sqlite3" +) + +type TelemDb struct { + db *sqlx.DB +} + +// TelemDbOption lets you customize the behavior of the sqlite database +type TelemDbOption func(*TelemDb) error + +// this function is internal use. It actually opens the database, but uses +// a raw path string instead of formatting one like the exported functions. +func OpenRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err error) { + tdb = &TelemDb{} + tdb.db, err = sqlx.Connect("sqlite3", rawpath) + if err != nil { + return + } + for _, fn := range options { + err = fn(tdb) + if err != nil { + return + } + } + + // perform any database migrations + version, err := tdb.GetVersion() + if err != nil { + return + } + // TODO: use logging instead of printf + fmt.Printf("starting version %d\n", version) + + version, err = RunMigrations(tdb) + fmt.Printf("ending version %d\n", version) + + return tdb, err +} + +// this string is used to open the read-write db. +// the extra options improve performance significantly. +const ProductionDbURI = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000" + +// OpenTelemDb opens a new telemetry database at the given path. +func OpenTelemDb(path string, options ...TelemDbOption) (*TelemDb, error) { + dbStr := fmt.Sprintf(ProductionDbURI, path) + return OpenRawDb(dbStr, options...) +} + +func (tdb *TelemDb) GetVersion() (int, error) { + var version int + err := tdb.db.Get(&version, "PRAGMA user_version") + return version, err +} + +func (tdb *TelemDb) SetVersion(version int) error { + stmt := fmt.Sprintf("PRAGMA user_version = %d", version) + _, err := tdb.db.Exec(stmt) + return err +} + +// sql expression to insert a bus event into the packets database.1 +const sqlInsertEvent = `INSERT INTO "bus_events" (ts, name, data) VALUES ` + +// AddEvent adds the bus event to the database. +func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) { + // edge case - zero events. + if len(events) == 0 { + return 0, nil + } + n = 0 + tx, err := tdb.db.BeginTx(ctx, nil) + defer tx.Rollback() + if err != nil { + return + } + + sqlStmt := sqlInsertEvent + const rowSql = "(?, ?, json(?))" + inserts := make([]string, len(events)) + vals := []interface{}{} + idx := 0 // we have to manually increment, because sometimes we don't insert. + for _, b := range events { + inserts[idx] = rowSql + var j []byte + j, err = json.Marshal(b.Data) + + if err != nil { + // we had some error turning the packet into json. + continue // we silently skip. + } + + vals = append(vals, b.Timestamp.UnixMilli(), b.Data.String(), j) + idx++ + } + + // construct the full statement now + sqlStmt = sqlStmt + strings.Join(inserts[:idx], ",") + stmt, err := tx.PrepareContext(ctx, sqlStmt) + // defer stmt.Close() + if err != nil { + return + } + res, err := stmt.ExecContext(ctx, vals...) + if err != nil { + return + } + n, err = res.RowsAffected() + + tx.Commit() + return +} + +func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) { + + return tdb.AddEventsCtx(context.Background(), events...) +} + +// QueryModifier augments SQL strings. +type QueryModifier interface { + ModifyStatement(*strings.Builder) error +} + +// LimitOffsetModifier is a modifier to support pagniation. +type LimitOffsetModifier struct { + Limit int + Offset int +} + +func (l LimitOffsetModifier) ModifyStatement(sb *strings.Builder) error { + clause := fmt.Sprintf(" LIMIT %d OFFSET %d", l.Limit, l.Offset) + sb.WriteString(clause) + return nil +} + +// BusEventFilter is a filter for bus events. +type BusEventFilter struct { + Names []string + TimerangeStart time.Time + TimerangeEnd time.Time +} + +// now we can optionally add a limit. + +func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, options ...QueryModifier) ([]skylab.BusEvent, error) { + // construct a simple + var whereFrags = make([]string, 0) + + // if we're filtering by names, add a where clause for it. + if len(filter.Names) > 0 { + names := strings.Join(filter.Names, ", ") + qString := fmt.Sprintf("name IN (%s)", names) + whereFrags = append(whereFrags, qString) + } + // TODO: identify if we need a special case for both time ranges + // using BETWEEN since apparenlty that can be better? + + // next, check if we have a start/end time, add constraints + if !filter.TimerangeEnd.IsZero() { + qString := fmt.Sprintf("ts <= %d", filter.TimerangeEnd.UnixMilli()) + whereFrags = append(whereFrags, qString) + } + if !filter.TimerangeStart.IsZero() { + // we have an end range + qString := fmt.Sprintf("ts >= %d", filter.TimerangeStart.UnixMilli()) + whereFrags = append(whereFrags, qString) + } + + sb := strings.Builder{} + sb.WriteString("SELECT * from \"bus_events\"") + // construct the full statement. + if len(whereFrags) > 0 { + // use the where clauses. + sb.WriteString(" WHERE ") + sb.WriteString(strings.Join(whereFrags, " AND ")) + } + + // Augment our data further if there's i.e a limit modifier. + // TODO: factor this out maybe? + for _, m := range options { + m.ModifyStatement(&sb) + } + rows, err := tdb.db.QueryxContext(ctx, sb.String()) + if err != nil { + return nil, err + } + defer rows.Close() + + var events = make([]skylab.BusEvent, 0, 10) + + for rows.Next() { + var ev skylab.RawJsonEvent + err := rows.Scan(&ev.Timestamp, &ev.Name, (*[]byte)(&ev.Data)) + if err != nil { + return nil, err + } + + BusEv := skylab.BusEvent{ + Timestamp: time.UnixMilli(int64(ev.Timestamp)), + Name: ev.Name, + } + BusEv.Data, err = skylab.FromJson(ev.Name, ev.Data) + if err != nil { + return events, nil + } + events = append(events, BusEv) + } + + err = rows.Err() + + return events, err +} + +// We now need a different use-case: we would like to extract a value from +// a specific packet. + +// Datum is a single measurement - it is more granular than a packet. +// the classic example is bms_measurement.current +type Datum struct { + Timestamp time.Time `db:"timestamp"` + Value any `db:"val"` +} + +// GetValues queries the database for values in a given time range. +// A value is a specific data point. For example, bms_measurement.current +// would be a value. +func (tdb *TelemDb) GetValues(ctx context.Context, bef BusEventFilter, + field string, opts ...QueryModifier) ([]Datum, error) { + // this fragment uses json_extract from sqlite to get a single + // nested value. + sb := strings.Builder{} + sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `) + if len(bef.Names) != 1 { + return nil, errors.New("invalid number of names") + } + + qStrings := []string{"name is ?"} + // add timestamp limit. + if !bef.TimerangeStart.IsZero() { + qString := fmt.Sprintf("ts >= %d", bef.TimerangeStart.UnixMilli()) + qStrings = append(qStrings, qString) + } + + if !bef.TimerangeEnd.IsZero() { + qString := fmt.Sprintf("ts <= %d", bef.TimerangeEnd.UnixMilli()) + qStrings = append(qStrings, qString) + } + // join qstrings with AND + sb.WriteString(strings.Join(qStrings, " AND ")) + + for _, m := range opts { + if m == nil { + continue + } + m.ModifyStatement(&sb) + } + rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, bef.Names[0]) + if err != nil { + return nil, err + } + defer rows.Close() + data := make([]Datum, 0, 10) + for rows.Next() { + var d Datum = Datum{} + var ts int64 + err = rows.Scan(&ts, &d.Value) + d.Timestamp = time.UnixMilli(ts) + + if err != nil { + fmt.Print(err) + return data, err + } + data = append(data, d) + } + fmt.Print(rows.Err()) + + return data, nil +} + + +// PacketDef is a database packet model +type PacketDef struct { + Name string + Description string + Id int +} + +type FieldDef struct { + Name string + SubName string + Packet string + Type string +} + +// PacketNotFoundError is when a matching packet cannot be found. +type PacketNotFoundError string + +func (e *PacketNotFoundError) Error() string { + return "packet not found: " + string(*e) +} + + +// GetPacketDefN retrieves a packet matching the given name, if it exists. +// returns PacketNotFoundError if a matching packet could not be found. +func (tdb *TelemDb) GetPacketDefN(name string) (*PacketDef, error) { + return nil, nil +} + +// GetPacketDefF retrieves the parent packet for a given field. +// This function cannot return PacketNotFoundError since we have SQL FKs enforcing. +func (tdb *TelemDb) GetPacketDefF(field FieldDef) (*PacketDef, error) { + return nil, nil +} + + +// GetFieldDefs returns the given fields for a given packet definition. +func (tdb *TelemDb) GetFieldDefs(pkt PacketDef) ([]FieldDef, error) { + return nil, nil +} + + diff --git a/internal/db/db_test.go b/db_test.go similarity index 99% rename from internal/db/db_test.go rename to db_test.go index 35d9520..6c2a382 100644 --- a/internal/db/db_test.go +++ b/db_test.go @@ -1,4 +1,4 @@ -package db +package gotelem import ( "bufio" diff --git a/internal/api/http.go b/http.go similarity index 77% rename from internal/api/http.go rename to http.go index f5d21ba..480a2d9 100644 --- a/internal/api/http.go +++ b/http.go @@ -1,4 +1,4 @@ -package api +package gotelem // this file defines the HTTP handlers and routes. @@ -6,20 +6,70 @@ import ( "encoding/json" "fmt" "net/http" + "strconv" + "time" "log/slog" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/google/uuid" - "github.com/kschamplin/gotelem" - "github.com/kschamplin/gotelem/internal/db" "github.com/kschamplin/gotelem/skylab" "nhooyr.io/websocket" "nhooyr.io/websocket/wsjson" ) -func TelemRouter(log *slog.Logger, broker *gotelem.Broker, db *db.TelemDb) http.Handler { +func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) { + + bef := &BusEventFilter{} + + v := r.URL.Query() + bef.Names = v["name"] // put all the names in. + if el := v.Get("start"); el != "" { + // parse the start time query. + t, err := time.Parse(time.RFC3339, el) + if err != nil { + return bef, err + } + bef.TimerangeStart = t + } + if el := v.Get("end"); el != "" { + // parse the start time query. + t, err := time.Parse(time.RFC3339, el) + if err != nil { + return bef, err + } + bef.TimerangeStart = t + } + return bef, nil +} + +func extractLimitModifier(r *http.Request) (*LimitOffsetModifier, error) { + lim := &LimitOffsetModifier{} + v := r.URL.Query() + if el := v.Get("limit"); el != "" { + val, err := strconv.ParseInt(el, 10, 64) + if err != nil { + return nil, err + } + lim.Limit = int(val) + // next, we check if we have an offset. + // we only check offset if we also have a limit. + // offset without limit isn't valid and is ignored. + if el := v.Get("offset"); el != "" { + val, err := strconv.ParseInt(el, 10, 64) + if err != nil { + return nil, err + } + lim.Offset = int(val) + } + return lim, nil + } + // we use the nil case to indicate that no limit was provided. + return nil, nil +} + +func TelemRouter(log *slog.Logger, broker *Broker, db *TelemDb) http.Handler { r := chi.NewRouter() r.Use(middleware.RequestID) @@ -41,7 +91,7 @@ func TelemRouter(log *slog.Logger, broker *gotelem.Broker, db *db.TelemDb) http. } // define API version 1 routes. -func apiV1(broker *gotelem.Broker, tdb *db.TelemDb) chi.Router { +func apiV1(broker *Broker, tdb *TelemDb) chi.Router { r := chi.NewRouter() // this API only accepts JSON. r.Use(middleware.AllowContentType("application/json")) @@ -91,9 +141,8 @@ func apiV1(broker *gotelem.Broker, tdb *db.TelemDb) chi.Router { return r } - // this is a websocket stream. -func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFunc { +func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { // pull filter from url query params. bef, err := extractBusEventFilter(r) @@ -122,8 +171,6 @@ func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFu // we get a context to use from it. ctx := c.CloseRead(r.Context()) - - for { select { case <-ctx.Done(): @@ -148,7 +195,7 @@ func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFu } } -func apiV1GetPackets(tdb *db.TelemDb) http.HandlerFunc { +func apiV1GetPackets(tdb *TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { // this should use http query params to return a list of packets. bef, err := extractBusEventFilter(r) @@ -192,7 +239,7 @@ func apiV1GetPackets(tdb *db.TelemDb) http.HandlerFunc { // apiV1GetValues is a function that creates a handler for // getting the specific value from a packet. // this is useful for OpenMCT or other viewer APIs -func apiV1GetValues(db *db.TelemDb) http.HandlerFunc { +func apiV1GetValues(db *TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var err error @@ -231,26 +278,26 @@ func apiV1GetValues(db *db.TelemDb) http.HandlerFunc { } // TODO: rename. record is not a clear name. Runs? drives? segments? -func apiV1GetRecords(db *db.TelemDb) http.HandlerFunc { +func apiV1GetRecords(db *TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { } } -func apiV1GetActiveRecord(db *db.TelemDb) http.HandlerFunc { +func apiV1GetActiveRecord(db *TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { } } -func apiV1StartRecord(db *db.TelemDb) http.HandlerFunc { +func apiV1StartRecord(db *TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) {} } -func apiV1GetRecord(db *db.TelemDb) http.HandlerFunc { +func apiV1GetRecord(db *TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) {} } -func apiV1UpdateRecord(db *db.TelemDb) http.HandlerFunc { +func apiV1UpdateRecord(db *TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) {} } diff --git a/internal/api/utils.go b/internal/api/utils.go deleted file mode 100644 index 76ee88d..0000000 --- a/internal/api/utils.go +++ /dev/null @@ -1,59 +0,0 @@ -package api -// This file contains common behaviors that are used across various requests -import ( - "net/http" - "strconv" - "time" - - "github.com/kschamplin/gotelem/internal/db" -) - -func extractBusEventFilter(r *http.Request) (*db.BusEventFilter, error) { - - bef := &db.BusEventFilter{} - - v := r.URL.Query() - bef.Names = v["name"] // put all the names in. - if el := v.Get("start"); el != "" { - // parse the start time query. - t, err := time.Parse(time.RFC3339, el) - if err != nil { - return bef, err - } - bef.TimerangeStart = t - } - if el := v.Get("end"); el != "" { - // parse the start time query. - t, err := time.Parse(time.RFC3339, el) - if err != nil { - return bef, err - } - bef.TimerangeStart = t - } - return bef, nil -} - -func extractLimitModifier(r *http.Request) (*db.LimitOffsetModifier, error) { - lim := &db.LimitOffsetModifier{} - v := r.URL.Query() - if el := v.Get("limit"); el != "" { - val, err := strconv.ParseInt(el, 10, 64) - if err != nil { - return nil, err - } - lim.Limit = int(val) - // next, we check if we have an offset. - // we only check offset if we also have a limit. - // offset without limit isn't valid and is ignored. - if el := v.Get("offset"); el != "" { - val, err := strconv.ParseInt(el, 10, 64) - if err != nil { - return nil, err - } - lim.Offset = int(val) - } - return lim, nil - } - // we use the nil case to indicate that no limit was provided. - return nil, nil -} diff --git a/internal/db/db.go b/internal/db/db.go deleted file mode 100644 index cf60081..0000000 --- a/internal/db/db.go +++ /dev/null @@ -1,152 +0,0 @@ -package db - -// this file implements the database functions to load/store/read from a sql database. - -import ( - "context" - "encoding/json" - "fmt" - "strings" - "time" - - "github.com/jmoiron/sqlx" - "github.com/kschamplin/gotelem/skylab" - _ "github.com/mattn/go-sqlite3" -) - - -type TelemDb struct { - db *sqlx.DB -} - -// TelemDbOption lets you customize the behavior of the sqlite database -type TelemDbOption func(*TelemDb) error - - -// this function is internal use. It actually opens the database, but uses -// a raw path string instead of formatting one like the exported functions. -func OpenRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err error) { - tdb = &TelemDb{} - tdb.db, err = sqlx.Connect("sqlite3", rawpath) - if err != nil { - return - } - for _, fn := range options { - err = fn(tdb) - if err != nil { - return - } - } - - // perform any database migrations - version, err := tdb.GetVersion() - if err != nil { - return - } - // TODO: use logging instead of printf - fmt.Printf("starting version %d\n", version) - - version, err = RunMigrations(tdb) - fmt.Printf("ending version %d\n", version) - - return tdb, err -} - - -// this string is used to open the read-write db. -// the extra options improve performance significantly. -const ProductionDbURI = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000" - -// OpenTelemDb opens a new telemetry database at the given path. -func OpenTelemDb(path string, options ...TelemDbOption) (*TelemDb, error) { - dbStr := fmt.Sprintf(ProductionDbURI, path) - return OpenRawDb(dbStr, options...) -} - -func (tdb *TelemDb) GetVersion() (int, error) { - var version int - err := tdb.db.Get(&version, "PRAGMA user_version") - return version, err -} - -func (tdb *TelemDb) SetVersion(version int) error { - stmt := fmt.Sprintf("PRAGMA user_version = %d", version) - _, err := tdb.db.Exec(stmt) - return err -} - -// sql expression to insert a bus event into the packets database.1 -const sqlInsertEvent =`INSERT INTO "bus_events" (ts, name, data) VALUES ` - -// AddEvent adds the bus event to the database. -func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) { - // edge case - zero events. - if len(events) == 0 { - return 0, nil - } - n = 0 - tx, err := tdb.db.BeginTx(ctx, nil) - defer tx.Rollback() - if err != nil { - return - } - - sqlStmt := sqlInsertEvent - const rowSql = "(?, ?, json(?))" - inserts := make([]string, len(events)) - vals := []interface{}{} - idx := 0 // we have to manually increment, because sometimes we don't insert. - for _, b := range events { - inserts[idx] = rowSql - var j []byte - j, err = json.Marshal(b.Data) - - if err != nil { - // we had some error turning the packet into json. - continue // we silently skip. - } - - vals = append(vals, b.Timestamp.UnixMilli(), b.Data.String(), j) - idx++ - } - - // construct the full statement now - sqlStmt = sqlStmt + strings.Join(inserts[:idx], ",") - stmt, err := tx.PrepareContext(ctx, sqlStmt) - // defer stmt.Close() - if err != nil { - return - } - res, err := stmt.ExecContext(ctx, vals...) - if err != nil { - return - } - n, err = res.RowsAffected() - - tx.Commit() - return -} - -func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) { - - return tdb.AddEventsCtx(context.Background(), events...) -} - - -// GetActiveDrive finds the non-null drive and returns it, if any. -func (tdb *TelemDb) GetActiveDrive() (res int, err error) { - err = tdb.db.Get(&res, "SELECT id FROM drive_records WHERE end_time IS NULL LIMIT 1") - return -} - -func (tdb *TelemDb) NewDrive(start time.Time, note string) { - -} - -func (tdb *TelemDb) EndDrive() { - -} - -func (tdb *TelemDb) UpdateDrive(id int, note string) { - -} diff --git a/internal/db/getters.go b/internal/db/getters.go deleted file mode 100644 index efedc7e..0000000 --- a/internal/db/getters.go +++ /dev/null @@ -1,172 +0,0 @@ -package db - -import ( - "context" - "errors" - "fmt" - "strings" - "time" - - "github.com/kschamplin/gotelem/skylab" -) - -// Modifier augments SQL strings. -type Modifier interface { - ModifyStatement(*strings.Builder) error -} - -// LimitOffsetModifier is a modifier to support pagniation. -type LimitOffsetModifier struct { - Limit int - Offset int -} - -func (l LimitOffsetModifier) ModifyStatement(sb *strings.Builder) error { - clause := fmt.Sprintf(" LIMIT %d OFFSET %d", l.Limit, l.Offset) - sb.WriteString(clause) - return nil -} - -// BusEventFilter is a filter for bus events. -type BusEventFilter struct { - Names []string - TimerangeStart time.Time - TimerangeEnd time.Time -} - -// now we can optionally add a limit. - -func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, options ...Modifier) ([]skylab.BusEvent, error) { - // construct a simple - var whereFrags = make([]string, 0) - - // if we're filtering by names, add a where clause for it. - if len(filter.Names) > 0 { - names := strings.Join(filter.Names, ", ") - qString := fmt.Sprintf("name IN (%s)", names) - whereFrags = append(whereFrags, qString) - } - // TODO: identify if we need a special case for both time ranges - // using BETWEEN since apparenlty that can be better? - - // next, check if we have a start/end time, add constraints - if !filter.TimerangeEnd.IsZero() { - qString := fmt.Sprintf("ts <= %d", filter.TimerangeEnd.UnixMilli()) - whereFrags = append(whereFrags, qString) - } - if !filter.TimerangeStart.IsZero() { - // we have an end range - qString := fmt.Sprintf("ts >= %d", filter.TimerangeStart.UnixMilli()) - whereFrags = append(whereFrags, qString) - } - - sb := strings.Builder{} - sb.WriteString("SELECT * from \"bus_events\"") - // construct the full statement. - if len(whereFrags) > 0 { - // use the where clauses. - sb.WriteString(" WHERE ") - sb.WriteString(strings.Join(whereFrags, " AND ")) - } - - // Augment our data further if there's i.e a limit modifier. - // TODO: factor this out maybe? - for _, m := range options { - m.ModifyStatement(&sb) - } - rows, err := tdb.db.QueryxContext(ctx, sb.String()) - if err != nil { - return nil, err - } - defer rows.Close() - - var events = make([]skylab.BusEvent, 0, 10) - - for rows.Next() { - var ev skylab.RawJsonEvent - err := rows.Scan(&ev.Timestamp, &ev.Name, (*[]byte)(&ev.Data)) - if err != nil { - return nil, err - } - - BusEv := skylab.BusEvent{ - Timestamp: time.UnixMilli(int64(ev.Timestamp)), - Name: ev.Name, - } - BusEv.Data, err = skylab.FromJson(ev.Name, ev.Data) - if err != nil { - return events, nil - } - events = append(events, BusEv) - } - - err = rows.Err() - - return events, err -} - -// We now need a different use-case: we would like to extract a value from -// a specific packet. - -// Datum is a single measurement - it is more granular than a packet. -// the classic example is bms_measurement.current -type Datum struct { - Timestamp time.Time `db:"timestamp"` - Value any `db:"val"` -} - -// GetValues queries the database for values in a given time range. -// A value is a specific data point. For example, bms_measurement.current -// would be a value. -func (tdb *TelemDb) GetValues(ctx context.Context, bef BusEventFilter, - field string, opts ...Modifier) ([]Datum, error) { - // this fragment uses json_extract from sqlite to get a single - // nested value. - sb := strings.Builder{} - sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `) - if len(bef.Names) != 1 { - return nil, errors.New("invalid number of names") - } - - qStrings := []string{"name is ?"} - // add timestamp limit. - if !bef.TimerangeStart.IsZero() { - qString := fmt.Sprintf("ts >= %d", bef.TimerangeStart.UnixMilli()) - qStrings = append(qStrings, qString) - } - - if !bef.TimerangeEnd.IsZero() { - qString := fmt.Sprintf("ts <= %d", bef.TimerangeEnd.UnixMilli()) - qStrings = append(qStrings, qString) - } - // join qstrings with AND - sb.WriteString(strings.Join(qStrings, " AND ")) - - for _, m := range opts { - if m == nil { - continue - } - m.ModifyStatement(&sb) - } - rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, bef.Names[0]) - if err != nil { - return nil, err - } - defer rows.Close() - data := make([]Datum, 0, 10) - for rows.Next() { - var d Datum = Datum{} - var ts int64 - err = rows.Scan(&ts, &d.Value) - d.Timestamp = time.UnixMilli(ts) - - if err != nil { - fmt.Print(err) - return data, err - } - data = append(data, d) - } - fmt.Print(rows.Err()) - - return data, nil -} diff --git a/internal/db/packets.go b/internal/db/packets.go deleted file mode 100644 index 0dc8440..0000000 --- a/internal/db/packets.go +++ /dev/null @@ -1,43 +0,0 @@ -package db -// This file implements Packet modelling, which allows us to look up fields by name - -type PacketDef struct { - Name string - Description string - Id int -} - -type FieldDef struct { - Name string - SubName string - Packet string - Type string -} - -// PacketNotFoundError is when a matching packet cannot be found. -type PacketNotFoundError string - -func (e *PacketNotFoundError) Error() string { - return "packet not found: " + string(*e) -} - - -// GetPacketDefN retrieves a packet matching the given name, if it exists. -// returns PacketNotFoundError if a matching packet could not be found. -func (tdb *TelemDb) GetPacketDefN(name string) (*PacketDef, error) { - return nil, nil -} - -// GetPacketDefF retrieves the parent packet for a given field. -// This function cannot return PacketNotFoundError since we have SQL FKs enforcing. -func (tdb *TelemDb) GetPacketDefF(field FieldDef) (*PacketDef, error) { - return nil, nil -} - - -// GetFieldDefs returns the given fields for a given packet definition. -func (tdb *TelemDb) GetFieldDefs(pkt PacketDef) ([]FieldDef, error) { - return nil, nil -} - - diff --git a/internal/db/migration.go b/migration.go similarity index 99% rename from internal/db/migration.go rename to migration.go index 05b8684..26c2278 100644 --- a/internal/db/migration.go +++ b/migration.go @@ -1,4 +1,4 @@ -package db +package gotelem import ( "embed" diff --git a/internal/db/migration_test.go b/migration_test.go similarity index 99% rename from internal/db/migration_test.go rename to migration_test.go index 51e018b..16c63c9 100644 --- a/internal/db/migration_test.go +++ b/migration_test.go @@ -1,4 +1,4 @@ -package db +package gotelem import ( "embed" diff --git a/internal/db/migrations/1_initial_down.sql b/migrations/1_initial_down.sql similarity index 100% rename from internal/db/migrations/1_initial_down.sql rename to migrations/1_initial_down.sql diff --git a/internal/db/migrations/1_initial_up.sql b/migrations/1_initial_up.sql similarity index 100% rename from internal/db/migrations/1_initial_up.sql rename to migrations/1_initial_up.sql diff --git a/internal/db/migrations/2_addl_tables_down.sql b/migrations/2_addl_tables_down.sql similarity index 100% rename from internal/db/migrations/2_addl_tables_down.sql rename to migrations/2_addl_tables_down.sql diff --git a/internal/db/migrations/2_addl_tables_up.sql b/migrations/2_addl_tables_up.sql similarity index 100% rename from internal/db/migrations/2_addl_tables_up.sql rename to migrations/2_addl_tables_up.sql diff --git a/internal/db/migrations/3_weather_down.sql b/migrations/3_weather_down.sql similarity index 100% rename from internal/db/migrations/3_weather_down.sql rename to migrations/3_weather_down.sql diff --git a/internal/db/migrations/3_weather_up.sql b/migrations/3_weather_up.sql similarity index 100% rename from internal/db/migrations/3_weather_up.sql rename to migrations/3_weather_up.sql diff --git a/internal/db/migrations/4_import_table_down.sql b/migrations/4_import_table_down.sql similarity index 100% rename from internal/db/migrations/4_import_table_down.sql rename to migrations/4_import_table_down.sql diff --git a/internal/db/migrations/4_import_table_up.sql b/migrations/4_import_table_up.sql similarity index 100% rename from internal/db/migrations/4_import_table_up.sql rename to migrations/4_import_table_up.sql diff --git a/internal/db/migrations/5_add_packets_down.sql b/migrations/5_add_packets_down.sql similarity index 100% rename from internal/db/migrations/5_add_packets_down.sql rename to migrations/5_add_packets_down.sql diff --git a/internal/db/migrations/5_add_packets_up.sql b/migrations/5_add_packets_up.sql similarity index 100% rename from internal/db/migrations/5_add_packets_up.sql rename to migrations/5_add_packets_up.sql