wip: db migration
This commit is contained in:
parent
ec02284657
commit
63087deffb
|
@ -68,6 +68,7 @@ var serveThings = []service{
|
||||||
&xBeeService{},
|
&xBeeService{},
|
||||||
&canLoggerService{},
|
&canLoggerService{},
|
||||||
&rpcService{},
|
&rpcService{},
|
||||||
|
&dbLoggingService{},
|
||||||
&httpService{},
|
&httpService{},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,6 +97,7 @@ func serve(cCtx *cli.Context) error {
|
||||||
if cCtx.IsSet("db") {
|
if cCtx.IsSet("db") {
|
||||||
dbPath = cCtx.Path("db")
|
dbPath = cCtx.Path("db")
|
||||||
}
|
}
|
||||||
|
logger.Info("opening database", "path", dbPath)
|
||||||
db, err := db.OpenTelemDb(dbPath)
|
db, err := db.OpenTelemDb(dbPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -310,3 +312,33 @@ func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
http.ListenAndServe(":8080", r)
|
http.ListenAndServe(":8080", r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// dbLoggingService listens to the CAN packet broker and saves packets to the database.
|
||||||
|
type dbLoggingService struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dbLoggingService) Status() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dbLoggingService) String() string {
|
||||||
|
return "db logger"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dbLoggingService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
|
|
||||||
|
// put CAN packets from the broker into the database.
|
||||||
|
tdb := deps.Db
|
||||||
|
rxCh, err := deps.Broker.Subscribe("dbRecorder")
|
||||||
|
defer deps.Broker.Unsubscribe("dbRecorder")
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case msg := <-rxCh:
|
||||||
|
deps.Logger.Info("boop", "msg", msg)
|
||||||
|
tdb.AddEventsCtx(cCtx.Context, msg)
|
||||||
|
case <-cCtx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
7
http.go
7
http.go
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/kschamplin/gotelem/skylab"
|
"github.com/kschamplin/gotelem/skylab"
|
||||||
"golang.org/x/exp/slog"
|
"golang.org/x/exp/slog"
|
||||||
"nhooyr.io/websocket"
|
"nhooyr.io/websocket"
|
||||||
|
"nhooyr.io/websocket/wsjson"
|
||||||
)
|
)
|
||||||
|
|
||||||
type slogHttpLogger struct {
|
type slogHttpLogger struct {
|
||||||
|
@ -75,7 +76,6 @@ func apiV1(broker *Broker, db *db.TelemDb) chi.Router {
|
||||||
}
|
}
|
||||||
// we have a list of packets now. let's commit them.
|
// we have a list of packets now. let's commit them.
|
||||||
db.AddEvents(pkgs...)
|
db.AddEvents(pkgs...)
|
||||||
return
|
|
||||||
})
|
})
|
||||||
r.Get("/", func(w http.ResponseWriter, r *http.Request) {
|
r.Get("/", func(w http.ResponseWriter, r *http.Request) {
|
||||||
// this should use http query params o return a list of packets.
|
// this should use http query params o return a list of packets.
|
||||||
|
@ -141,15 +141,14 @@ func apiV1PacketSubscribe(broker *Broker, db *db.TelemDb) http.HandlerFunc {
|
||||||
case msgIn := <-sub:
|
case msgIn := <-sub:
|
||||||
if len(sess.idFilter) == 0 {
|
if len(sess.idFilter) == 0 {
|
||||||
// send it.
|
// send it.
|
||||||
goto escapeFilter
|
wsjson.Write(r.Context(), c, msgIn)
|
||||||
}
|
}
|
||||||
for _, id := range sess.idFilter {
|
for _, id := range sess.idFilter {
|
||||||
if id == msgIn.Id {
|
if id == msgIn.Id {
|
||||||
// send it
|
// send it
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
escapeFilter:
|
|
||||||
return
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,8 +5,11 @@ package db
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"embed"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/fs"
|
||||||
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
@ -22,6 +25,51 @@ func init() {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// embed the migrations into applications so they can update databases.
|
||||||
|
|
||||||
|
//go:embed migrations
|
||||||
|
var migrations embed.FS
|
||||||
|
|
||||||
|
var migrationRegex = regexp.MustCompile(`^([0-9]+)_(.*)_(down|up)\.sql$`)
|
||||||
|
|
||||||
|
type Migration struct {
|
||||||
|
Name string
|
||||||
|
Version uint
|
||||||
|
FileName string
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetMigrations returns a list of migrations, which are correctly index. zero is nil.
|
||||||
|
|
||||||
|
// use len to get the highest number migration.
|
||||||
|
func RunMigrations(currentVer int) (finalVer int) {
|
||||||
|
|
||||||
|
res := make(map[int]map[string]Migration) // version number -> direction -> migration.
|
||||||
|
|
||||||
|
fs.WalkDir(migrations, ".", func(path string, d fs.DirEntry, err error) error {
|
||||||
|
|
||||||
|
if d.IsDir() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
m := migrationRegex.FindStringSubmatch(d.Name())
|
||||||
|
if len(m) != 5 {
|
||||||
|
panic("error parsing migration name")
|
||||||
|
}
|
||||||
|
migrationVer, _ := strconv.ParseInt(m[1], 10, 64)
|
||||||
|
|
||||||
|
mig := Migration{
|
||||||
|
Name: m[2],
|
||||||
|
Version: uint(migrationVer),
|
||||||
|
FileName: d.Name(),
|
||||||
|
}
|
||||||
|
|
||||||
|
res[int(migrationVer)][m]
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
type TelemDb struct {
|
type TelemDb struct {
|
||||||
db *sqlx.DB
|
db *sqlx.DB
|
||||||
}
|
}
|
||||||
|
@ -46,6 +94,15 @@ func OpenTelemDb(path string, options ...TelemDbOption) (tdb *TelemDb, err error
|
||||||
// execute database up statement (better hope it is idempotent!)
|
// execute database up statement (better hope it is idempotent!)
|
||||||
// FIXME: only do this when it's a new database (instead warn the user about potential version mismatches)
|
// FIXME: only do this when it's a new database (instead warn the user about potential version mismatches)
|
||||||
// TODO: store gotelem version (commit hash?) in DB (PRAGMA user_version)
|
// TODO: store gotelem version (commit hash?) in DB (PRAGMA user_version)
|
||||||
|
|
||||||
|
var version int
|
||||||
|
err = tdb.db.Get(&version, "PRAGMA user_version")
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// get latest version of migrations - then run the SQL in order.
|
||||||
|
|
||||||
_, err = tdb.db.Exec(sqlDbUp)
|
_, err = tdb.db.Exec(sqlDbUp)
|
||||||
|
|
||||||
return tdb, err
|
return tdb, err
|
||||||
|
@ -79,6 +136,8 @@ CREATE TABLE "drive_records" (
|
||||||
CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time)
|
CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- gps logs TODO: use GEOJSON/Spatialite tracks instead?
|
-- gps logs TODO: use GEOJSON/Spatialite tracks instead?
|
||||||
CREATE TABLE "position_logs" (
|
CREATE TABLE "position_logs" (
|
||||||
"ts" INTEGER NOT NULL,
|
"ts" INTEGER NOT NULL,
|
||||||
|
@ -106,7 +165,7 @@ DROP TABLE "position_logs";
|
||||||
|
|
||||||
// sql expression to insert a bus event into the packets database.1
|
// sql expression to insert a bus event into the packets database.1
|
||||||
const sqlInsertEvent = `
|
const sqlInsertEvent = `
|
||||||
INSERT INTO "bus_events" (time, can_id, name, packet) VALUES ($1, $2, $3, json($4));
|
INSERT INTO "bus_events" (ts, id, name, data) VALUES ($1, $2, $3, json($4));
|
||||||
`
|
`
|
||||||
|
|
||||||
// AddEvent adds the bus event to the database.
|
// AddEvent adds the bus event to the database.
|
||||||
|
@ -126,7 +185,13 @@ func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent)
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
tx.ExecContext(ctx, sqlInsertEvent, b.Timestamp.UnixMilli(), b.Id, b.Data.String(), j)
|
_, err = tx.ExecContext(ctx, sqlInsertEvent, b.Timestamp.UnixMilli(), b.Id, b.Data.String(), j)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
tx.Rollback()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
tx.Commit()
|
tx.Commit()
|
||||||
return
|
return
|
||||||
|
@ -200,8 +265,6 @@ func (q QueryOr) Query() string {
|
||||||
return fmt.Sprintf("(%s)", strings.Join(qStrings, " OR "))
|
return fmt.Sprintf("(%s)", strings.Join(qStrings, " OR "))
|
||||||
}
|
}
|
||||||
|
|
||||||
const eventQueryFmtString = `SELECT * FROM "bus_events" WHERE %s LIMIT %d`
|
|
||||||
|
|
||||||
// GetEvents is the mechanism to request underlying event data.
|
// GetEvents is the mechanism to request underlying event data.
|
||||||
// it takes functions (which are defined in db.go) that modify the query,
|
// it takes functions (which are defined in db.go) that modify the query,
|
||||||
// and then return the results.
|
// and then return the results.
|
||||||
|
@ -212,7 +275,7 @@ func (tdb *TelemDb) GetEvents(limit int, where ...QueryFrag) (events []skylab.Bu
|
||||||
for _, f := range where {
|
for _, f := range where {
|
||||||
fragStr = append(fragStr, f.Query())
|
fragStr = append(fragStr, f.Query())
|
||||||
}
|
}
|
||||||
qString := fmt.Sprintf("SELECT * FROM \"bus_events\" WHERE %s LIMIT %d", strings.Join(fragStr, " AND "), limit)
|
qString := fmt.Sprintf(`SELECT * FROM "bus_events" WHERE %s LIMIT %d`, strings.Join(fragStr, " AND "), limit)
|
||||||
rows, err := tdb.db.Queryx(qString)
|
rows, err := tdb.db.Queryx(qString)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -247,3 +310,21 @@ func (tdb *TelemDb) GetEvents(limit int, where ...QueryFrag) (events []skylab.Bu
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetActiveDrive finds the non-null drive and returns it, if any.
|
||||||
|
func (tdb *TelemDb) GetActiveDrive() (res int, err error) {
|
||||||
|
err = tdb.db.Get(&res, "SELECT id FROM drive_records WHERE end_time IS NULL LIMIT 1")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tdb *TelemDb) NewDrive(start time.Time, note string) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tdb *TelemDb) EndDrive() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tdb *TelemDb) UpdateDrive(id int, note string) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
3
internal/db/migrations/1_initial_down.sql
Normal file
3
internal/db/migrations/1_initial_down.sql
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
DROP TABLE "bus_events";
|
||||||
|
DROP INDEX "ids_timestamped";
|
||||||
|
DROP INDEX "times";
|
15
internal/db/migrations/1_initial_up.sql
Normal file
15
internal/db/migrations/1_initial_up.sql
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
CREATE TABLE "bus_events" (
|
||||||
|
"ts" INTEGER NOT NULL, -- timestamp, unix milliseconds
|
||||||
|
"id" INTEGER NOT NULL, -- can ID
|
||||||
|
"name" TEXT NOT NULL, -- name of base packet
|
||||||
|
"data" TEXT NOT NULL CHECK(json_valid(data)) -- JSON object describing the data, including index if any
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX "ids_timestamped" ON "bus_events" (
|
||||||
|
"id",
|
||||||
|
"ts" DESC
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX "times" ON "bus_events" (
|
||||||
|
"ts" DESC
|
||||||
|
);
|
2
internal/db/migrations/2_addl_tables_down.sql
Normal file
2
internal/db/migrations/2_addl_tables_down.sql
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
DROP TABLE "drive_records";
|
||||||
|
DROP TABLE "position_logs";
|
18
internal/db/migrations/2_addl_tables_up.sql
Normal file
18
internal/db/migrations/2_addl_tables_up.sql
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
-- this table shows when we started/stopped logging.
|
||||||
|
CREATE TABLE "drive_records" (
|
||||||
|
"id" INTEGER NOT NULL UNIQUE, -- unique ID of the drive.
|
||||||
|
"start_time" INTEGER NOT NULL, -- when the drive started
|
||||||
|
"end_time" INTEGER, -- when it ended, or NULL if it's ongoing.
|
||||||
|
"note" TEXT, -- optional description of the segment/experiment/drive
|
||||||
|
PRIMARY KEY("id" AUTOINCREMENT),
|
||||||
|
CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE "position_logs" (
|
||||||
|
"ts" INTEGER NOT NULL,
|
||||||
|
"source" TEXT NOT NULL,
|
||||||
|
"lat" REAL NOT NULL,
|
||||||
|
"lon" REAL NOT NULL,
|
||||||
|
"elevation" REAL,
|
||||||
|
CONSTRAINT "no_empty_source" CHECK(source is not "")
|
||||||
|
);
|
Loading…
Reference in a new issue