diff --git a/cmd/gotelem/cli/server.go b/cmd/gotelem/cli/server.go index de2cd1e..4cafb1d 100644 --- a/cmd/gotelem/cli/server.go +++ b/cmd/gotelem/cli/server.go @@ -68,6 +68,7 @@ var serveThings = []service{ &xBeeService{}, &canLoggerService{}, &rpcService{}, + &dbLoggingService{}, &httpService{}, } @@ -96,6 +97,7 @@ func serve(cCtx *cli.Context) error { if cCtx.IsSet("db") { dbPath = cCtx.Path("db") } + logger.Info("opening database", "path", dbPath) db, err := db.OpenTelemDb(dbPath) if err != nil { return err @@ -310,3 +312,33 @@ func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) { http.ListenAndServe(":8080", r) return } + +// dbLoggingService listens to the CAN packet broker and saves packets to the database. +type dbLoggingService struct { +} + +func (d *dbLoggingService) Status() { + +} + +func (d *dbLoggingService) String() string { + return "db logger" +} + +func (d *dbLoggingService) Start(cCtx *cli.Context, deps svcDeps) (err error) { + + // put CAN packets from the broker into the database. + tdb := deps.Db + rxCh, err := deps.Broker.Subscribe("dbRecorder") + defer deps.Broker.Unsubscribe("dbRecorder") + + for { + select { + case msg := <-rxCh: + deps.Logger.Info("boop", "msg", msg) + tdb.AddEventsCtx(cCtx.Context, msg) + case <-cCtx.Done(): + return + } + } +} diff --git a/http.go b/http.go index 281b13c..55e036f 100644 --- a/http.go +++ b/http.go @@ -14,6 +14,7 @@ import ( "github.com/kschamplin/gotelem/skylab" "golang.org/x/exp/slog" "nhooyr.io/websocket" + "nhooyr.io/websocket/wsjson" ) type slogHttpLogger struct { @@ -75,7 +76,6 @@ func apiV1(broker *Broker, db *db.TelemDb) chi.Router { } // we have a list of packets now. let's commit them. db.AddEvents(pkgs...) - return }) r.Get("/", func(w http.ResponseWriter, r *http.Request) { // this should use http query params o return a list of packets. @@ -141,15 +141,14 @@ func apiV1PacketSubscribe(broker *Broker, db *db.TelemDb) http.HandlerFunc { case msgIn := <-sub: if len(sess.idFilter) == 0 { // send it. - goto escapeFilter + wsjson.Write(r.Context(), c, msgIn) } for _, id := range sess.idFilter { if id == msgIn.Id { // send it + break } } - escapeFilter: - return } diff --git a/internal/db/db.go b/internal/db/db.go index 3c4e1bb..a6b39de 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -5,8 +5,11 @@ package db import ( "context" "database/sql" + "embed" "encoding/json" "fmt" + "io/fs" + "regexp" "strconv" "strings" "time" @@ -22,6 +25,51 @@ func init() { }) } +// embed the migrations into applications so they can update databases. + +//go:embed migrations +var migrations embed.FS + +var migrationRegex = regexp.MustCompile(`^([0-9]+)_(.*)_(down|up)\.sql$`) + +type Migration struct { + Name string + Version uint + FileName string +} + +// GetMigrations returns a list of migrations, which are correctly index. zero is nil. + +// use len to get the highest number migration. +func RunMigrations(currentVer int) (finalVer int) { + + res := make(map[int]map[string]Migration) // version number -> direction -> migration. + + fs.WalkDir(migrations, ".", func(path string, d fs.DirEntry, err error) error { + + if d.IsDir() { + return nil + } + m := migrationRegex.FindStringSubmatch(d.Name()) + if len(m) != 5 { + panic("error parsing migration name") + } + migrationVer, _ := strconv.ParseInt(m[1], 10, 64) + + mig := Migration{ + Name: m[2], + Version: uint(migrationVer), + FileName: d.Name(), + } + + res[int(migrationVer)][m] + + return nil + }) + + return res +} + type TelemDb struct { db *sqlx.DB } @@ -46,6 +94,15 @@ func OpenTelemDb(path string, options ...TelemDbOption) (tdb *TelemDb, err error // execute database up statement (better hope it is idempotent!) // FIXME: only do this when it's a new database (instead warn the user about potential version mismatches) // TODO: store gotelem version (commit hash?) in DB (PRAGMA user_version) + + var version int + err = tdb.db.Get(&version, "PRAGMA user_version") + if err != nil { + return + } + + // get latest version of migrations - then run the SQL in order. + _, err = tdb.db.Exec(sqlDbUp) return tdb, err @@ -78,6 +135,8 @@ CREATE TABLE "drive_records" ( PRIMARY KEY("id" AUTOINCREMENT), CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time) ); + + -- gps logs TODO: use GEOJSON/Spatialite tracks instead? CREATE TABLE "position_logs" ( @@ -106,7 +165,7 @@ DROP TABLE "position_logs"; // sql expression to insert a bus event into the packets database.1 const sqlInsertEvent = ` -INSERT INTO "bus_events" (time, can_id, name, packet) VALUES ($1, $2, $3, json($4)); +INSERT INTO "bus_events" (ts, id, name, data) VALUES ($1, $2, $3, json($4)); ` // AddEvent adds the bus event to the database. @@ -126,7 +185,13 @@ func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) tx.Rollback() return } - tx.ExecContext(ctx, sqlInsertEvent, b.Timestamp.UnixMilli(), b.Id, b.Data.String(), j) + _, err = tx.ExecContext(ctx, sqlInsertEvent, b.Timestamp.UnixMilli(), b.Id, b.Data.String(), j) + + if err != nil { + tx.Rollback() + return + } + } tx.Commit() return @@ -200,8 +265,6 @@ func (q QueryOr) Query() string { return fmt.Sprintf("(%s)", strings.Join(qStrings, " OR ")) } -const eventQueryFmtString = `SELECT * FROM "bus_events" WHERE %s LIMIT %d` - // GetEvents is the mechanism to request underlying event data. // it takes functions (which are defined in db.go) that modify the query, // and then return the results. @@ -212,7 +275,7 @@ func (tdb *TelemDb) GetEvents(limit int, where ...QueryFrag) (events []skylab.Bu for _, f := range where { fragStr = append(fragStr, f.Query()) } - qString := fmt.Sprintf("SELECT * FROM \"bus_events\" WHERE %s LIMIT %d", strings.Join(fragStr, " AND "), limit) + qString := fmt.Sprintf(`SELECT * FROM "bus_events" WHERE %s LIMIT %d`, strings.Join(fragStr, " AND "), limit) rows, err := tdb.db.Queryx(qString) if err != nil { return @@ -247,3 +310,21 @@ func (tdb *TelemDb) GetEvents(limit int, where ...QueryFrag) (events []skylab.Bu return } + +// GetActiveDrive finds the non-null drive and returns it, if any. +func (tdb *TelemDb) GetActiveDrive() (res int, err error) { + err = tdb.db.Get(&res, "SELECT id FROM drive_records WHERE end_time IS NULL LIMIT 1") + return +} + +func (tdb *TelemDb) NewDrive(start time.Time, note string) { + +} + +func (tdb *TelemDb) EndDrive() { + +} + +func (tdb *TelemDb) UpdateDrive(id int, note string) { + +} diff --git a/internal/db/migrations/1_initial_down.sql b/internal/db/migrations/1_initial_down.sql new file mode 100644 index 0000000..577cc25 --- /dev/null +++ b/internal/db/migrations/1_initial_down.sql @@ -0,0 +1,3 @@ +DROP TABLE "bus_events"; +DROP INDEX "ids_timestamped"; +DROP INDEX "times"; \ No newline at end of file diff --git a/internal/db/migrations/1_initial_up.sql b/internal/db/migrations/1_initial_up.sql new file mode 100644 index 0000000..f699ab2 --- /dev/null +++ b/internal/db/migrations/1_initial_up.sql @@ -0,0 +1,15 @@ +CREATE TABLE "bus_events" ( + "ts" INTEGER NOT NULL, -- timestamp, unix milliseconds + "id" INTEGER NOT NULL, -- can ID + "name" TEXT NOT NULL, -- name of base packet + "data" TEXT NOT NULL CHECK(json_valid(data)) -- JSON object describing the data, including index if any +); + +CREATE INDEX "ids_timestamped" ON "bus_events" ( + "id", + "ts" DESC +); + +CREATE INDEX "times" ON "bus_events" ( + "ts" DESC +); \ No newline at end of file diff --git a/internal/db/migrations/2_addl_tables_down.sql b/internal/db/migrations/2_addl_tables_down.sql new file mode 100644 index 0000000..37f1d85 --- /dev/null +++ b/internal/db/migrations/2_addl_tables_down.sql @@ -0,0 +1,2 @@ +DROP TABLE "drive_records"; +DROP TABLE "position_logs"; \ No newline at end of file diff --git a/internal/db/migrations/2_addl_tables_up.sql b/internal/db/migrations/2_addl_tables_up.sql new file mode 100644 index 0000000..0169b71 --- /dev/null +++ b/internal/db/migrations/2_addl_tables_up.sql @@ -0,0 +1,18 @@ +-- this table shows when we started/stopped logging. +CREATE TABLE "drive_records" ( + "id" INTEGER NOT NULL UNIQUE, -- unique ID of the drive. + "start_time" INTEGER NOT NULL, -- when the drive started + "end_time" INTEGER, -- when it ended, or NULL if it's ongoing. + "note" TEXT, -- optional description of the segment/experiment/drive + PRIMARY KEY("id" AUTOINCREMENT), + CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time) +); + +CREATE TABLE "position_logs" ( + "ts" INTEGER NOT NULL, + "source" TEXT NOT NULL, + "lat" REAL NOT NULL, + "lon" REAL NOT NULL, + "elevation" REAL, + CONSTRAINT "no_empty_source" CHECK(source is not "") +); \ No newline at end of file