combine separate packages
All checks were successful
Go / build (1.21) (push) Successful in 1m16s
Go / build (1.22) (push) Successful in 1m15s

This commit is contained in:
saji 2024-03-02 21:48:55 -06:00
parent 00fa67a67d
commit 4e6f8db7ed
21 changed files with 406 additions and 452 deletions

View file

@ -9,9 +9,9 @@ import (
"strings" "strings"
"sync/atomic" "sync/atomic"
"github.com/kschamplin/gotelem/internal/db"
"github.com/kschamplin/gotelem/internal/logparsers" "github.com/kschamplin/gotelem/internal/logparsers"
"github.com/kschamplin/gotelem/skylab" "github.com/kschamplin/gotelem/skylab"
"github.com/kschamplin/gotelem"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@ -81,7 +81,7 @@ func importAction(ctx *cli.Context) error {
} }
dbPath := ctx.Path("database") dbPath := ctx.Path("database")
db, err := db.OpenTelemDb(dbPath) db, err := gotelem.OpenTelemDb(dbPath)
if err != nil { if err != nil {
return fmt.Errorf("error opening database: %w", err) return fmt.Errorf("error opening database: %w", err)
} }

View file

@ -11,8 +11,6 @@ import (
"log/slog" "log/slog"
"github.com/kschamplin/gotelem" "github.com/kschamplin/gotelem"
"github.com/kschamplin/gotelem/internal/api"
"github.com/kschamplin/gotelem/internal/db"
"github.com/kschamplin/gotelem/skylab" "github.com/kschamplin/gotelem/skylab"
"github.com/kschamplin/gotelem/xbee" "github.com/kschamplin/gotelem/xbee"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
@ -59,7 +57,7 @@ type service interface {
type svcDeps struct { type svcDeps struct {
Broker *gotelem.Broker Broker *gotelem.Broker
Db *db.TelemDb Db *gotelem.TelemDb
Logger *slog.Logger Logger *slog.Logger
} }
@ -99,7 +97,7 @@ func serve(cCtx *cli.Context) error {
dbPath = cCtx.Path("db") dbPath = cCtx.Path("db")
} }
logger.Info("opening database", "path", dbPath) logger.Info("opening database", "path", dbPath)
db, err := db.OpenTelemDb(dbPath) db, err := gotelem.OpenTelemDb(dbPath)
if err != nil { if err != nil {
return err return err
} }
@ -220,7 +218,7 @@ func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
broker := deps.Broker broker := deps.Broker
db := deps.Db db := deps.Db
r := api.TelemRouter(logger, broker, db) r := gotelem.TelemRouter(logger, broker, db)
// //

335
db.go Normal file
View file

@ -0,0 +1,335 @@
package gotelem
// this file implements the database functions to load/store/read from a sql database.
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/jmoiron/sqlx"
"github.com/kschamplin/gotelem/skylab"
_ "github.com/mattn/go-sqlite3"
)
type TelemDb struct {
db *sqlx.DB
}
// TelemDbOption lets you customize the behavior of the sqlite database
type TelemDbOption func(*TelemDb) error
// this function is internal use. It actually opens the database, but uses
// a raw path string instead of formatting one like the exported functions.
func OpenRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err error) {
tdb = &TelemDb{}
tdb.db, err = sqlx.Connect("sqlite3", rawpath)
if err != nil {
return
}
for _, fn := range options {
err = fn(tdb)
if err != nil {
return
}
}
// perform any database migrations
version, err := tdb.GetVersion()
if err != nil {
return
}
// TODO: use logging instead of printf
fmt.Printf("starting version %d\n", version)
version, err = RunMigrations(tdb)
fmt.Printf("ending version %d\n", version)
return tdb, err
}
// this string is used to open the read-write db.
// the extra options improve performance significantly.
const ProductionDbURI = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000"
// OpenTelemDb opens a new telemetry database at the given path.
func OpenTelemDb(path string, options ...TelemDbOption) (*TelemDb, error) {
dbStr := fmt.Sprintf(ProductionDbURI, path)
return OpenRawDb(dbStr, options...)
}
func (tdb *TelemDb) GetVersion() (int, error) {
var version int
err := tdb.db.Get(&version, "PRAGMA user_version")
return version, err
}
func (tdb *TelemDb) SetVersion(version int) error {
stmt := fmt.Sprintf("PRAGMA user_version = %d", version)
_, err := tdb.db.Exec(stmt)
return err
}
// sql expression to insert a bus event into the packets database.1
const sqlInsertEvent = `INSERT INTO "bus_events" (ts, name, data) VALUES `
// AddEvent adds the bus event to the database.
func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) {
// edge case - zero events.
if len(events) == 0 {
return 0, nil
}
n = 0
tx, err := tdb.db.BeginTx(ctx, nil)
defer tx.Rollback()
if err != nil {
return
}
sqlStmt := sqlInsertEvent
const rowSql = "(?, ?, json(?))"
inserts := make([]string, len(events))
vals := []interface{}{}
idx := 0 // we have to manually increment, because sometimes we don't insert.
for _, b := range events {
inserts[idx] = rowSql
var j []byte
j, err = json.Marshal(b.Data)
if err != nil {
// we had some error turning the packet into json.
continue // we silently skip.
}
vals = append(vals, b.Timestamp.UnixMilli(), b.Data.String(), j)
idx++
}
// construct the full statement now
sqlStmt = sqlStmt + strings.Join(inserts[:idx], ",")
stmt, err := tx.PrepareContext(ctx, sqlStmt)
// defer stmt.Close()
if err != nil {
return
}
res, err := stmt.ExecContext(ctx, vals...)
if err != nil {
return
}
n, err = res.RowsAffected()
tx.Commit()
return
}
func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) {
return tdb.AddEventsCtx(context.Background(), events...)
}
// QueryModifier augments SQL strings.
type QueryModifier interface {
ModifyStatement(*strings.Builder) error
}
// LimitOffsetModifier is a modifier to support pagniation.
type LimitOffsetModifier struct {
Limit int
Offset int
}
func (l LimitOffsetModifier) ModifyStatement(sb *strings.Builder) error {
clause := fmt.Sprintf(" LIMIT %d OFFSET %d", l.Limit, l.Offset)
sb.WriteString(clause)
return nil
}
// BusEventFilter is a filter for bus events.
type BusEventFilter struct {
Names []string
TimerangeStart time.Time
TimerangeEnd time.Time
}
// now we can optionally add a limit.
func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, options ...QueryModifier) ([]skylab.BusEvent, error) {
// construct a simple
var whereFrags = make([]string, 0)
// if we're filtering by names, add a where clause for it.
if len(filter.Names) > 0 {
names := strings.Join(filter.Names, ", ")
qString := fmt.Sprintf("name IN (%s)", names)
whereFrags = append(whereFrags, qString)
}
// TODO: identify if we need a special case for both time ranges
// using BETWEEN since apparenlty that can be better?
// next, check if we have a start/end time, add constraints
if !filter.TimerangeEnd.IsZero() {
qString := fmt.Sprintf("ts <= %d", filter.TimerangeEnd.UnixMilli())
whereFrags = append(whereFrags, qString)
}
if !filter.TimerangeStart.IsZero() {
// we have an end range
qString := fmt.Sprintf("ts >= %d", filter.TimerangeStart.UnixMilli())
whereFrags = append(whereFrags, qString)
}
sb := strings.Builder{}
sb.WriteString("SELECT * from \"bus_events\"")
// construct the full statement.
if len(whereFrags) > 0 {
// use the where clauses.
sb.WriteString(" WHERE ")
sb.WriteString(strings.Join(whereFrags, " AND "))
}
// Augment our data further if there's i.e a limit modifier.
// TODO: factor this out maybe?
for _, m := range options {
m.ModifyStatement(&sb)
}
rows, err := tdb.db.QueryxContext(ctx, sb.String())
if err != nil {
return nil, err
}
defer rows.Close()
var events = make([]skylab.BusEvent, 0, 10)
for rows.Next() {
var ev skylab.RawJsonEvent
err := rows.Scan(&ev.Timestamp, &ev.Name, (*[]byte)(&ev.Data))
if err != nil {
return nil, err
}
BusEv := skylab.BusEvent{
Timestamp: time.UnixMilli(int64(ev.Timestamp)),
Name: ev.Name,
}
BusEv.Data, err = skylab.FromJson(ev.Name, ev.Data)
if err != nil {
return events, nil
}
events = append(events, BusEv)
}
err = rows.Err()
return events, err
}
// We now need a different use-case: we would like to extract a value from
// a specific packet.
// Datum is a single measurement - it is more granular than a packet.
// the classic example is bms_measurement.current
type Datum struct {
Timestamp time.Time `db:"timestamp"`
Value any `db:"val"`
}
// GetValues queries the database for values in a given time range.
// A value is a specific data point. For example, bms_measurement.current
// would be a value.
func (tdb *TelemDb) GetValues(ctx context.Context, bef BusEventFilter,
field string, opts ...QueryModifier) ([]Datum, error) {
// this fragment uses json_extract from sqlite to get a single
// nested value.
sb := strings.Builder{}
sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `)
if len(bef.Names) != 1 {
return nil, errors.New("invalid number of names")
}
qStrings := []string{"name is ?"}
// add timestamp limit.
if !bef.TimerangeStart.IsZero() {
qString := fmt.Sprintf("ts >= %d", bef.TimerangeStart.UnixMilli())
qStrings = append(qStrings, qString)
}
if !bef.TimerangeEnd.IsZero() {
qString := fmt.Sprintf("ts <= %d", bef.TimerangeEnd.UnixMilli())
qStrings = append(qStrings, qString)
}
// join qstrings with AND
sb.WriteString(strings.Join(qStrings, " AND "))
for _, m := range opts {
if m == nil {
continue
}
m.ModifyStatement(&sb)
}
rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, bef.Names[0])
if err != nil {
return nil, err
}
defer rows.Close()
data := make([]Datum, 0, 10)
for rows.Next() {
var d Datum = Datum{}
var ts int64
err = rows.Scan(&ts, &d.Value)
d.Timestamp = time.UnixMilli(ts)
if err != nil {
fmt.Print(err)
return data, err
}
data = append(data, d)
}
fmt.Print(rows.Err())
return data, nil
}
// PacketDef is a database packet model
type PacketDef struct {
Name string
Description string
Id int
}
type FieldDef struct {
Name string
SubName string
Packet string
Type string
}
// PacketNotFoundError is when a matching packet cannot be found.
type PacketNotFoundError string
func (e *PacketNotFoundError) Error() string {
return "packet not found: " + string(*e)
}
// GetPacketDefN retrieves a packet matching the given name, if it exists.
// returns PacketNotFoundError if a matching packet could not be found.
func (tdb *TelemDb) GetPacketDefN(name string) (*PacketDef, error) {
return nil, nil
}
// GetPacketDefF retrieves the parent packet for a given field.
// This function cannot return PacketNotFoundError since we have SQL FKs enforcing.
func (tdb *TelemDb) GetPacketDefF(field FieldDef) (*PacketDef, error) {
return nil, nil
}
// GetFieldDefs returns the given fields for a given packet definition.
func (tdb *TelemDb) GetFieldDefs(pkt PacketDef) ([]FieldDef, error) {
return nil, nil
}

View file

@ -1,4 +1,4 @@
package db package gotelem
import ( import (
"bufio" "bufio"

View file

@ -1,4 +1,4 @@
package api package gotelem
// this file defines the HTTP handlers and routes. // this file defines the HTTP handlers and routes.
@ -6,20 +6,70 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"strconv"
"time"
"log/slog" "log/slog"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware" "github.com/go-chi/chi/v5/middleware"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/kschamplin/gotelem"
"github.com/kschamplin/gotelem/internal/db"
"github.com/kschamplin/gotelem/skylab" "github.com/kschamplin/gotelem/skylab"
"nhooyr.io/websocket" "nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson" "nhooyr.io/websocket/wsjson"
) )
func TelemRouter(log *slog.Logger, broker *gotelem.Broker, db *db.TelemDb) http.Handler { func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
bef := &BusEventFilter{}
v := r.URL.Query()
bef.Names = v["name"] // put all the names in.
if el := v.Get("start"); el != "" {
// parse the start time query.
t, err := time.Parse(time.RFC3339, el)
if err != nil {
return bef, err
}
bef.TimerangeStart = t
}
if el := v.Get("end"); el != "" {
// parse the start time query.
t, err := time.Parse(time.RFC3339, el)
if err != nil {
return bef, err
}
bef.TimerangeStart = t
}
return bef, nil
}
func extractLimitModifier(r *http.Request) (*LimitOffsetModifier, error) {
lim := &LimitOffsetModifier{}
v := r.URL.Query()
if el := v.Get("limit"); el != "" {
val, err := strconv.ParseInt(el, 10, 64)
if err != nil {
return nil, err
}
lim.Limit = int(val)
// next, we check if we have an offset.
// we only check offset if we also have a limit.
// offset without limit isn't valid and is ignored.
if el := v.Get("offset"); el != "" {
val, err := strconv.ParseInt(el, 10, 64)
if err != nil {
return nil, err
}
lim.Offset = int(val)
}
return lim, nil
}
// we use the nil case to indicate that no limit was provided.
return nil, nil
}
func TelemRouter(log *slog.Logger, broker *Broker, db *TelemDb) http.Handler {
r := chi.NewRouter() r := chi.NewRouter()
r.Use(middleware.RequestID) r.Use(middleware.RequestID)
@ -41,7 +91,7 @@ func TelemRouter(log *slog.Logger, broker *gotelem.Broker, db *db.TelemDb) http.
} }
// define API version 1 routes. // define API version 1 routes.
func apiV1(broker *gotelem.Broker, tdb *db.TelemDb) chi.Router { func apiV1(broker *Broker, tdb *TelemDb) chi.Router {
r := chi.NewRouter() r := chi.NewRouter()
// this API only accepts JSON. // this API only accepts JSON.
r.Use(middleware.AllowContentType("application/json")) r.Use(middleware.AllowContentType("application/json"))
@ -91,9 +141,8 @@ func apiV1(broker *gotelem.Broker, tdb *db.TelemDb) chi.Router {
return r return r
} }
// this is a websocket stream. // this is a websocket stream.
func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFunc { func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
// pull filter from url query params. // pull filter from url query params.
bef, err := extractBusEventFilter(r) bef, err := extractBusEventFilter(r)
@ -122,8 +171,6 @@ func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFu
// we get a context to use from it. // we get a context to use from it.
ctx := c.CloseRead(r.Context()) ctx := c.CloseRead(r.Context())
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -148,7 +195,7 @@ func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFu
} }
} }
func apiV1GetPackets(tdb *db.TelemDb) http.HandlerFunc { func apiV1GetPackets(tdb *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
// this should use http query params to return a list of packets. // this should use http query params to return a list of packets.
bef, err := extractBusEventFilter(r) bef, err := extractBusEventFilter(r)
@ -192,7 +239,7 @@ func apiV1GetPackets(tdb *db.TelemDb) http.HandlerFunc {
// apiV1GetValues is a function that creates a handler for // apiV1GetValues is a function that creates a handler for
// getting the specific value from a packet. // getting the specific value from a packet.
// this is useful for OpenMCT or other viewer APIs // this is useful for OpenMCT or other viewer APIs
func apiV1GetValues(db *db.TelemDb) http.HandlerFunc { func apiV1GetValues(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
var err error var err error
@ -231,26 +278,26 @@ func apiV1GetValues(db *db.TelemDb) http.HandlerFunc {
} }
// TODO: rename. record is not a clear name. Runs? drives? segments? // TODO: rename. record is not a clear name. Runs? drives? segments?
func apiV1GetRecords(db *db.TelemDb) http.HandlerFunc { func apiV1GetRecords(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
} }
} }
func apiV1GetActiveRecord(db *db.TelemDb) http.HandlerFunc { func apiV1GetActiveRecord(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
} }
} }
func apiV1StartRecord(db *db.TelemDb) http.HandlerFunc { func apiV1StartRecord(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {} return func(w http.ResponseWriter, r *http.Request) {}
} }
func apiV1GetRecord(db *db.TelemDb) http.HandlerFunc { func apiV1GetRecord(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {} return func(w http.ResponseWriter, r *http.Request) {}
} }
func apiV1UpdateRecord(db *db.TelemDb) http.HandlerFunc { func apiV1UpdateRecord(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {} return func(w http.ResponseWriter, r *http.Request) {}
} }

View file

@ -1,59 +0,0 @@
package api
// This file contains common behaviors that are used across various requests
import (
"net/http"
"strconv"
"time"
"github.com/kschamplin/gotelem/internal/db"
)
func extractBusEventFilter(r *http.Request) (*db.BusEventFilter, error) {
bef := &db.BusEventFilter{}
v := r.URL.Query()
bef.Names = v["name"] // put all the names in.
if el := v.Get("start"); el != "" {
// parse the start time query.
t, err := time.Parse(time.RFC3339, el)
if err != nil {
return bef, err
}
bef.TimerangeStart = t
}
if el := v.Get("end"); el != "" {
// parse the start time query.
t, err := time.Parse(time.RFC3339, el)
if err != nil {
return bef, err
}
bef.TimerangeStart = t
}
return bef, nil
}
func extractLimitModifier(r *http.Request) (*db.LimitOffsetModifier, error) {
lim := &db.LimitOffsetModifier{}
v := r.URL.Query()
if el := v.Get("limit"); el != "" {
val, err := strconv.ParseInt(el, 10, 64)
if err != nil {
return nil, err
}
lim.Limit = int(val)
// next, we check if we have an offset.
// we only check offset if we also have a limit.
// offset without limit isn't valid and is ignored.
if el := v.Get("offset"); el != "" {
val, err := strconv.ParseInt(el, 10, 64)
if err != nil {
return nil, err
}
lim.Offset = int(val)
}
return lim, nil
}
// we use the nil case to indicate that no limit was provided.
return nil, nil
}

View file

@ -1,152 +0,0 @@
package db
// this file implements the database functions to load/store/read from a sql database.
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/jmoiron/sqlx"
"github.com/kschamplin/gotelem/skylab"
_ "github.com/mattn/go-sqlite3"
)
type TelemDb struct {
db *sqlx.DB
}
// TelemDbOption lets you customize the behavior of the sqlite database
type TelemDbOption func(*TelemDb) error
// this function is internal use. It actually opens the database, but uses
// a raw path string instead of formatting one like the exported functions.
func OpenRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err error) {
tdb = &TelemDb{}
tdb.db, err = sqlx.Connect("sqlite3", rawpath)
if err != nil {
return
}
for _, fn := range options {
err = fn(tdb)
if err != nil {
return
}
}
// perform any database migrations
version, err := tdb.GetVersion()
if err != nil {
return
}
// TODO: use logging instead of printf
fmt.Printf("starting version %d\n", version)
version, err = RunMigrations(tdb)
fmt.Printf("ending version %d\n", version)
return tdb, err
}
// this string is used to open the read-write db.
// the extra options improve performance significantly.
const ProductionDbURI = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000"
// OpenTelemDb opens a new telemetry database at the given path.
func OpenTelemDb(path string, options ...TelemDbOption) (*TelemDb, error) {
dbStr := fmt.Sprintf(ProductionDbURI, path)
return OpenRawDb(dbStr, options...)
}
func (tdb *TelemDb) GetVersion() (int, error) {
var version int
err := tdb.db.Get(&version, "PRAGMA user_version")
return version, err
}
func (tdb *TelemDb) SetVersion(version int) error {
stmt := fmt.Sprintf("PRAGMA user_version = %d", version)
_, err := tdb.db.Exec(stmt)
return err
}
// sql expression to insert a bus event into the packets database.1
const sqlInsertEvent =`INSERT INTO "bus_events" (ts, name, data) VALUES `
// AddEvent adds the bus event to the database.
func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) {
// edge case - zero events.
if len(events) == 0 {
return 0, nil
}
n = 0
tx, err := tdb.db.BeginTx(ctx, nil)
defer tx.Rollback()
if err != nil {
return
}
sqlStmt := sqlInsertEvent
const rowSql = "(?, ?, json(?))"
inserts := make([]string, len(events))
vals := []interface{}{}
idx := 0 // we have to manually increment, because sometimes we don't insert.
for _, b := range events {
inserts[idx] = rowSql
var j []byte
j, err = json.Marshal(b.Data)
if err != nil {
// we had some error turning the packet into json.
continue // we silently skip.
}
vals = append(vals, b.Timestamp.UnixMilli(), b.Data.String(), j)
idx++
}
// construct the full statement now
sqlStmt = sqlStmt + strings.Join(inserts[:idx], ",")
stmt, err := tx.PrepareContext(ctx, sqlStmt)
// defer stmt.Close()
if err != nil {
return
}
res, err := stmt.ExecContext(ctx, vals...)
if err != nil {
return
}
n, err = res.RowsAffected()
tx.Commit()
return
}
func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) {
return tdb.AddEventsCtx(context.Background(), events...)
}
// GetActiveDrive finds the non-null drive and returns it, if any.
func (tdb *TelemDb) GetActiveDrive() (res int, err error) {
err = tdb.db.Get(&res, "SELECT id FROM drive_records WHERE end_time IS NULL LIMIT 1")
return
}
func (tdb *TelemDb) NewDrive(start time.Time, note string) {
}
func (tdb *TelemDb) EndDrive() {
}
func (tdb *TelemDb) UpdateDrive(id int, note string) {
}

View file

@ -1,172 +0,0 @@
package db
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/kschamplin/gotelem/skylab"
)
// Modifier augments SQL strings.
type Modifier interface {
ModifyStatement(*strings.Builder) error
}
// LimitOffsetModifier is a modifier to support pagniation.
type LimitOffsetModifier struct {
Limit int
Offset int
}
func (l LimitOffsetModifier) ModifyStatement(sb *strings.Builder) error {
clause := fmt.Sprintf(" LIMIT %d OFFSET %d", l.Limit, l.Offset)
sb.WriteString(clause)
return nil
}
// BusEventFilter is a filter for bus events.
type BusEventFilter struct {
Names []string
TimerangeStart time.Time
TimerangeEnd time.Time
}
// now we can optionally add a limit.
func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, options ...Modifier) ([]skylab.BusEvent, error) {
// construct a simple
var whereFrags = make([]string, 0)
// if we're filtering by names, add a where clause for it.
if len(filter.Names) > 0 {
names := strings.Join(filter.Names, ", ")
qString := fmt.Sprintf("name IN (%s)", names)
whereFrags = append(whereFrags, qString)
}
// TODO: identify if we need a special case for both time ranges
// using BETWEEN since apparenlty that can be better?
// next, check if we have a start/end time, add constraints
if !filter.TimerangeEnd.IsZero() {
qString := fmt.Sprintf("ts <= %d", filter.TimerangeEnd.UnixMilli())
whereFrags = append(whereFrags, qString)
}
if !filter.TimerangeStart.IsZero() {
// we have an end range
qString := fmt.Sprintf("ts >= %d", filter.TimerangeStart.UnixMilli())
whereFrags = append(whereFrags, qString)
}
sb := strings.Builder{}
sb.WriteString("SELECT * from \"bus_events\"")
// construct the full statement.
if len(whereFrags) > 0 {
// use the where clauses.
sb.WriteString(" WHERE ")
sb.WriteString(strings.Join(whereFrags, " AND "))
}
// Augment our data further if there's i.e a limit modifier.
// TODO: factor this out maybe?
for _, m := range options {
m.ModifyStatement(&sb)
}
rows, err := tdb.db.QueryxContext(ctx, sb.String())
if err != nil {
return nil, err
}
defer rows.Close()
var events = make([]skylab.BusEvent, 0, 10)
for rows.Next() {
var ev skylab.RawJsonEvent
err := rows.Scan(&ev.Timestamp, &ev.Name, (*[]byte)(&ev.Data))
if err != nil {
return nil, err
}
BusEv := skylab.BusEvent{
Timestamp: time.UnixMilli(int64(ev.Timestamp)),
Name: ev.Name,
}
BusEv.Data, err = skylab.FromJson(ev.Name, ev.Data)
if err != nil {
return events, nil
}
events = append(events, BusEv)
}
err = rows.Err()
return events, err
}
// We now need a different use-case: we would like to extract a value from
// a specific packet.
// Datum is a single measurement - it is more granular than a packet.
// the classic example is bms_measurement.current
type Datum struct {
Timestamp time.Time `db:"timestamp"`
Value any `db:"val"`
}
// GetValues queries the database for values in a given time range.
// A value is a specific data point. For example, bms_measurement.current
// would be a value.
func (tdb *TelemDb) GetValues(ctx context.Context, bef BusEventFilter,
field string, opts ...Modifier) ([]Datum, error) {
// this fragment uses json_extract from sqlite to get a single
// nested value.
sb := strings.Builder{}
sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `)
if len(bef.Names) != 1 {
return nil, errors.New("invalid number of names")
}
qStrings := []string{"name is ?"}
// add timestamp limit.
if !bef.TimerangeStart.IsZero() {
qString := fmt.Sprintf("ts >= %d", bef.TimerangeStart.UnixMilli())
qStrings = append(qStrings, qString)
}
if !bef.TimerangeEnd.IsZero() {
qString := fmt.Sprintf("ts <= %d", bef.TimerangeEnd.UnixMilli())
qStrings = append(qStrings, qString)
}
// join qstrings with AND
sb.WriteString(strings.Join(qStrings, " AND "))
for _, m := range opts {
if m == nil {
continue
}
m.ModifyStatement(&sb)
}
rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, bef.Names[0])
if err != nil {
return nil, err
}
defer rows.Close()
data := make([]Datum, 0, 10)
for rows.Next() {
var d Datum = Datum{}
var ts int64
err = rows.Scan(&ts, &d.Value)
d.Timestamp = time.UnixMilli(ts)
if err != nil {
fmt.Print(err)
return data, err
}
data = append(data, d)
}
fmt.Print(rows.Err())
return data, nil
}

View file

@ -1,43 +0,0 @@
package db
// This file implements Packet modelling, which allows us to look up fields by name
type PacketDef struct {
Name string
Description string
Id int
}
type FieldDef struct {
Name string
SubName string
Packet string
Type string
}
// PacketNotFoundError is when a matching packet cannot be found.
type PacketNotFoundError string
func (e *PacketNotFoundError) Error() string {
return "packet not found: " + string(*e)
}
// GetPacketDefN retrieves a packet matching the given name, if it exists.
// returns PacketNotFoundError if a matching packet could not be found.
func (tdb *TelemDb) GetPacketDefN(name string) (*PacketDef, error) {
return nil, nil
}
// GetPacketDefF retrieves the parent packet for a given field.
// This function cannot return PacketNotFoundError since we have SQL FKs enforcing.
func (tdb *TelemDb) GetPacketDefF(field FieldDef) (*PacketDef, error) {
return nil, nil
}
// GetFieldDefs returns the given fields for a given packet definition.
func (tdb *TelemDb) GetFieldDefs(pkt PacketDef) ([]FieldDef, error) {
return nil, nil
}

View file

@ -1,4 +1,4 @@
package db package gotelem
import ( import (
"embed" "embed"

View file

@ -1,4 +1,4 @@
package db package gotelem
import ( import (
"embed" "embed"