From c52bccb140851aedc91557681dfae2a119d3e1fc Mon Sep 17 00:00:00 2001 From: saji Date: Fri, 30 Jun 2023 07:40:50 -0500 Subject: [PATCH] db fixes --- can_sqlite.go | 22 ----- db.go | 212 +++++++++++++++++++++++++++--------------- http.go | 62 ++++++++---- internal/badger/db.go | 3 - 4 files changed, 185 insertions(+), 114 deletions(-) delete mode 100644 can_sqlite.go delete mode 100644 internal/badger/db.go diff --git a/can_sqlite.go b/can_sqlite.go deleted file mode 100644 index 3d551aa..0000000 --- a/can_sqlite.go +++ /dev/null @@ -1,22 +0,0 @@ -package gotelem - -import ( - "github.com/jmoiron/sqlx" - _ "github.com/mattn/go-sqlite3" -) - -// this file implements a CAN adapter for the sqlite db. - -type CanDB struct { - Db *sqlx.DB -} - -func (cdb *CanDB) Send(_ *Frame) error { - panic("not implemented") // TODO: Implement -} - -func (cdb *CanDB) Recv() (*Frame, error) { - panic("not implemented") // TODO: Implement -} - -func NewCanDB() {} diff --git a/db.go b/db.go index 14ba60c..640e98e 100644 --- a/db.go +++ b/db.go @@ -1,17 +1,27 @@ package gotelem // this file implements the database functions to load/store/read from a sql database. + import ( + "context" + "database/sql" "encoding/json" + "fmt" "strconv" "strings" "time" "github.com/jmoiron/sqlx" "github.com/kschamplin/gotelem/skylab" - _ "github.com/mattn/go-sqlite3" + sqlite3 "github.com/mattn/go-sqlite3" ) +func init() { + sql.Register("custom_sqlite3", &sqlite3.SQLiteDriver{ + // TODO: add functions that convert between unix milliseconds and ISO 8601 + }) +} + type TelemDb struct { db *sqlx.DB } @@ -34,23 +44,20 @@ func OpenTelemDb(path string, options ...TelemDbOption) (tdb *TelemDb, err error } // execute database up statement (better hope it is idempotent!) + // FIXME: only do this when it's a new database (instead warn the user about potential version mismatches) + // TODO: store gotelem version (commit hash?) 
in DB (PRAGMA user_version) _, err = tdb.db.Exec(sqlDbUp) - if err != nil { - - } - - return tdb, nil + return tdb, err } // the sql commands to create the database. const sqlDbUp = ` CREATE TABLE IF NOT EXISTS "bus_events" ( - "ts" REAL NOT NULL, -- timestamp + "ts" INTEGER NOT NULL, -- timestamp, unix milliseconds "id" INTEGER NOT NULL, -- can ID "name" TEXT NOT NULL, -- name of base packet - "index" INTEGER, -- index of the repeated packet (base_id = id - index) - "packet" TEXT NOT NULL CHECK(json_valid(packet)) -- JSON object describing the data + "data" TEXT NOT NULL CHECK(json_valid(data)) -- JSON object describing the data, including index if any ); CREATE INDEX IF NOT EXISTS "ids_timestamped" ON "bus_events" ( @@ -63,14 +70,26 @@ CREATE INDEX IF NOT EXISTS "times" ON "bus_events" ( ); -- this table shows when we started/stopped logging. -CREATE TABLE "bus_records" ( - "id" INTEGER NOT NULL UNIQUE, - "start_time" INTEGER NOT NULL, - "end_time" INTEGER, - "note" TEXT, +CREATE TABLE "drive_records" ( + "id" INTEGER NOT NULL UNIQUE, -- unique ID of the drive. + "start_time" INTEGER NOT NULL, -- when the drive started + "end_time" INTEGER, -- when it ended, or NULL if it's ongoing. + "note" TEXT, -- optional description of the segment/experiment/drive PRIMARY KEY("id" AUTOINCREMENT), CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time) ); + +-- gps logs TODO: use GEOJSON/Spatialite tracks instead? +CREATE TABLE "position_logs" ( + "ts" INTEGER NOT NULL, + "source" TEXT NOT NULL, + "lat" REAL NOT NULL, + "lon" REAL NOT NULL, + "elevation" REAL, + CONSTRAINT "no_empty_source" CHECK(source is not "") +); + +-- TODO: ensure only one "active" (end_time == null) drive record at a time using triggers/constraints/index ` // sql sequence to tear down the database. 
@@ -81,103 +100,150 @@ DROP TABLE "bus_events"; DROP INDEX "ids_timestamped"; DROP INDEX "times"; -DROP TABLE "bus_records"; +DROP TABLE "drive_records"; +DROP TABLE "position_logs"; ` // sql expression to insert a bus event into the packets database.1 const sqlInsertEvent = ` -INSERT INTO "bus_events" (time, can_id, name, index, packet) VALUES ($1, $2, $3, json($4)); +INSERT INTO "bus_events" (time, can_id, name, packet) VALUES ($1, $2, $3, json($4)); ` // AddEvent adds the bus event to the database. -func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) { +func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (err error) { // - tx, err := tdb.db.Begin() + tx, err := tdb.db.BeginTx(ctx, nil) if err != nil { tx.Rollback() return } for _, b := range events { - j, err := json.Marshal(b.Data) + var j []byte + j, err = json.Marshal(b.Data) if err != nil { tx.Rollback() return } - tx.Exec(sqlInsertEvent, b.Timestamp, b.Id, b.Name, j) + tx.ExecContext(ctx, sqlInsertEvent, b.Timestamp.UnixMilli(), b.Id, b.Data.String(), j) } tx.Commit() + return } -// QueryIdString is a string that filters ids from the set. use ID query functions to -// create them. -type QueryIdString string +func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (err error) { -// QueryIds constructs a CAN Id filter for one or more distinct Ids. -// For a range of ids, use QueryIdRange(start, stop uint32) -func QueryIds(ids ...uint32) QueryIdString { - // FIXME: zero elements case? - var idsString []string - for _, id := range ids { - idsString = append(idsString, strconv.FormatUint(uint64(id), 10)) + return tdb.AddEventsCtx(context.Background(), events...) +} + +/// Query fragment guide: +/// We need to be able to easily construct safe(!) and meaningful queries programmatically +/// so we make some new types that can be turned into SQL fragments that go inside the where clause.
+/// These all implement the QueryFrag interface, meaning the actual query function (that acts on the DB) +/// can deal with them agnostically. The Query function joins all the fragments it is given with AND. +/// To get OR, wrap the fragments in a QueryOr. + +// QueryFrag is anything that can be turned into a Query WHERE clause +type QueryFrag interface { + Query() string +} + +// QueryIdRange represents a range of IDs to select for, inclusive. +type QueryIdRange struct { + Start uint32 + End uint32 +} + +func (q *QueryIdRange) Query() string { + return fmt.Sprintf("id BETWEEN %d AND %d", q.Start, q.End) +} + +// QueryIds selects for individual CAN ids +type QueryIds []uint32 + +func (q QueryIds) Query() string { + var idStrings []string + for _, id := range q { + idStrings = append(idStrings, strconv.FormatUint(uint64(id), 10)) } - - return QueryIdString("id IN (" + strings.Join(idsString, ",") + ")") + return fmt.Sprintf("id IN (%s)", strings.Join(idStrings, ",")) } -func QueryIdsInv(ids ...uint32) QueryIdString { - +// QueryTimeRange represents a query of a specific time range. For "before" or "after" queries, +// use time.Unix(0,0) or time.Now() in start and end respectively. +type QueryTimeRange struct { + Start time.Time + End time.Time } -// QueryIdRange selects all IDs between start and end, *inclusive*. -// This function is preferred over a generated list of IDs. -func QueryIdRange(start, end uint32) QueryIdString { - startString := strconv.FormatUint(uint64(start), 10) - endString := strconv.FormatUint(uint64(end), 10) - return QueryIdString("id BETWEEN " + startString + " AND " + endString) +func (q *QueryTimeRange) Query() string { + startUnix := q.Start.UnixMilli() + endUnix := q.End.UnixMilli() + + return fmt.Sprintf("ts BETWEEN %d AND %d", startUnix, endUnix) } -// QueryIdRangeInv removes all IDs between start and end from the results. -// See QueryIdRange for more details.
-func QueryIdRangeInv(start, end uint32) QueryIdString { - return QueryIdString("NOT ") + QueryIdRange(start, end) +type QueryNames []string + +func (q QueryNames) Query() string { + return fmt.Sprintf("name IN (%s)", strings.Join(q, ", ")) } -type QueryTimestampString string +type QueryOr []QueryFrag -// QueryDuration takes a start and end time and filters where the packets are between that time range. -func QueryDuration(start, end time.Time) QueryTimestampString { - - // the time in the database is a float, we have a time.Time so use unixNano() / 1e9 to float it. - startString := strconv.FormatFloat(float64(start.UnixNano())/1e9, 'f', -1, 64) - endString := strconv.FormatFloat(float64(start.UnixNano())/1e9, 'f', -1, 64) - return QueryTimestampString("ts BETWEEN " + startString + " AND " + endString) +func (q QueryOr) Query() string { + var qStrings []string + for _, frag := range q { + qStrings = append(qStrings, frag.Query()) + } + return fmt.Sprintf("(%s)", strings.Join(qStrings, " OR ")) } -type QueryNameString string - -func QueryNames(names ...string) QueryNameString - -func QueryNamesInv(names ...string) QueryNameString - -// Describes the parameters for an event query -type EventsQuery struct { - Ids []QueryIdString // Ids contains a list of CAN ID filters that are OR'd together. - - Times []QueryTimestampString - - Names []QueryNameString - - Limit uint // max number of results. -} +const eventQueryFmtString = `SELECT * FROM "bus_events" WHERE %s LIMIT %d` // GetEvents is the mechanism to request underlying event data. // it takes functions (which are defined in db.go) that modify the query, // and then return the results. -func (tdb *TelemDb) GetEvents(q *EventsQuery) []skylab.BusEvent { - // if function is inverse, AND and OR are switched. - // Demorgan's - // how to know if function is inverted??? 
- return nil +func (tdb *TelemDb) GetEvents(limit int, where ...QueryFrag) (events []skylab.BusEvent, err error) { + // Simple mechanism for combining query frags: + // join with " AND ". To join expressions with or, use QueryOr + var fragStr []string + for _, f := range where { + fragStr = append(fragStr, f.Query()) + } + qString := fmt.Sprintf("SELECT * FROM \"bus_events\" WHERE %s LIMIT %d", strings.Join(fragStr, " AND "), limit) + rows, err := tdb.db.Queryx(qString) + if err != nil { + return + } + defer rows.Close() + + if limit < 0 { // special case: limit negative means unrestricted. + events = make([]skylab.BusEvent, 0, 20) + } else { + events = make([]skylab.BusEvent, 0, limit) + } + // scan rows into busevent list... + for rows.Next() { + var ev skylab.RawJsonEvent + err = rows.StructScan(&ev) + if err != nil { + return + } + + BusEv := skylab.BusEvent{ + Timestamp: time.UnixMilli(int64(ev.Timestamp)), + Id: ev.Id, + } + BusEv.Data, err = skylab.FromJson(ev.Id, ev.Data) + + // FIXME: this is slow! + events = append(events, BusEv) + + } + + err = rows.Err() + + return } diff --git a/http.go b/http.go index cf2610e..fd9717c 100644 --- a/http.go +++ b/http.go @@ -19,12 +19,12 @@ type slogHttpLogger struct { slog.Logger } -func TelemRouter(log *slog.Logger, broker *JBroker, db *TelemDb) http.Handler { +func TelemRouter(log *slog.Logger, broker *Broker, db *TelemDb) http.Handler { r := chi.NewRouter() r.Use(middleware.RequestID) r.Use(middleware.RealIP) - r.Use(middleware.Logger) // TODO: integrate with slog + r.Use(middleware.Logger) // TODO: integrate with slog instead of go default logger. r.Use(middleware.Recoverer) r.Get("/schema", func(w http.ResponseWriter, r *http.Request) { @@ -50,12 +50,16 @@ func TelemRouter(log *slog.Logger, broker *JBroker, db *TelemDb) http.Handler { } // define API version 1 routes. 
-func apiV1(broker *JBroker, db *TelemDb) chi.Router { +func apiV1(broker *Broker, db *TelemDb) chi.Router { r := chi.NewRouter() + // this API only accepts JSON. + r.Use(middleware.AllowContentType("application/json")) + // no caching - always get the latest data. + r.Use(middleware.NoCache) + r.Get("/schema", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - // return the spicy json response. - w.WriteHeader(http.StatusOK) + // return the Skylab JSON definitions w.Write([]byte(skylab.SkylabDefinitions)) }) @@ -73,12 +77,14 @@ func apiV1(broker *JBroker, db *TelemDb) chi.Router { return }) r.Get("/", func(w http.ResponseWriter, r *http.Request) { - // this should use query params to return a list of packets. + // this should use http query params to return a list of packets. }) // this is to get packets by a name. r.Get("/{name:[a-z_]+}", func(w http.ResponseWriter, r *http.Request) { + // support field getting (matching too?) + // support limit }) @@ -86,27 +92,25 @@ func apiV1(broker *JBroker, db *TelemDb) chi.Router { // records are driving segments/runs. r.Route("/records", func(r chi.Router) { - r.Get("/") // get all runs - r.Get("/active") // get current run (no end time) - r.Post("/") // create a new run (with note). Ends active run if any, and creates new active run (no end time) - r.Get("/{id}") // get details on a specific run - r.Put("/{id}") // update a specific run. Can only be used to add notes/metadata, and not to change time/id. + r.Get("/", apiV1GetRecords(db)) // get all runs + r.Get("/active", apiV1GetActiveRecord(db)) // get current run (no end time) + r.Post("/", apiV1StartRecord(db)) // create a new run (with note). Ends active run if any, and creates new active run (no end time) + r.Get("/{id}", apiV1GetRecord(db)) // get details on a specific run + r.Put("/{id}", apiV1UpdateRecord(db)) // update a specific run. Can only be used to add notes/metadata, and not to change time/id.
}) - r.Get("/stats") // v1 api stats (calls, clients, xbee connected, meta health ok) - - r. + r.Get("/stats", func(w http.ResponseWriter, r *http.Request) {}) // v1 api stats (calls, clients, xbee connected, meta health ok) return r } // apiV1Subscriber is a websocket session for the v1 api. type apiV1Subscriber struct { - idFilter []uint64 // list of Ids to subscribe to. If it's empty, subscribes to all. + idFilter []uint32 // list of Ids to subscribe to. If it's empty, subscribes to all. } -func apiV1PacketSubscribe(broker *JBroker, db *TelemDb) http.HandlerFunc { +func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { conn_id := r.RemoteAddr + uuid.New().String() sub, err := broker.Subscribe(conn_id) @@ -118,6 +122,7 @@ func apiV1PacketSubscribe(broker *JBroker, db *TelemDb) http.HandlerFunc { defer broker.Unsubscribe(conn_id) // attempt to upgrade. c, err := websocket.Accept(w, r, nil) + c.Ping(r.Context()) if err != nil { // TODO: is this the correct option? w.WriteHeader(http.StatusInternalServerError) @@ -151,3 +156,28 @@ func apiV1PacketSubscribe(broker *JBroker, db *TelemDb) http.HandlerFunc { } } + +// TODO: rename. record is not a clear name. Runs? drives? segments? 
+func apiV1GetRecords(db *TelemDb) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + + } +} + +func apiV1GetActiveRecord(db *TelemDb) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + + } +} + +func apiV1StartRecord(db *TelemDb) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) {} +} + +func apiV1GetRecord(db *TelemDb) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) {} +} + +func apiV1UpdateRecord(db *TelemDb) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) {} +} diff --git a/internal/badger/db.go b/internal/badger/db.go deleted file mode 100644 index eb423d4..0000000 --- a/internal/badger/db.go +++ /dev/null @@ -1,3 +0,0 @@ -package badger - -// this file has a global internal K/V database used for sessions/stats/???