This commit is contained in:
saji 2023-06-30 07:40:50 -05:00
parent c3d6c3b553
commit c52bccb140
4 changed files with 185 additions and 114 deletions

View file

@ -1,22 +0,0 @@
package gotelem
import (
"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
)
// this file implements a CAN adapter for the sqlite db.
type CanDB struct {
Db *sqlx.DB
}
func (cdb *CanDB) Send(_ *Frame) error {
panic("not implemented") // TODO: Implement
}
func (cdb *CanDB) Recv() (*Frame, error) {
panic("not implemented") // TODO: Implement
}
func NewCanDB() {}

212
db.go
View file

@ -1,17 +1,27 @@
package gotelem
// this file implements the database functions to load/store/read from a sql database.
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/jmoiron/sqlx"
"github.com/kschamplin/gotelem/skylab"
_ "github.com/mattn/go-sqlite3"
sqlite3 "github.com/mattn/go-sqlite3"
)
func init() {
sql.Register("custom_sqlite3", &sqlite3.SQLiteDriver{
// TODO: add functions that convert between unix milliseconds and ISO 8601
})
}
type TelemDb struct {
db *sqlx.DB
}
@ -34,23 +44,20 @@ func OpenTelemDb(path string, options ...TelemDbOption) (tdb *TelemDb, err error
}
// execute database up statement (better hope it is idempotent!)
// FIXME: only do this when it's a new database (instead warn the user about potential version mismatches)
// TODO: store gotelem version (commit hash?) in DB (PRAGMA user_version)
_, err = tdb.db.Exec(sqlDbUp)
if err != nil {
}
return tdb, nil
return tdb, err
}
// the sql commands to create the database.
const sqlDbUp = `
CREATE TABLE IF NOT EXISTS "bus_events" (
"ts" REAL NOT NULL, -- timestamp
"ts" INTEGER NOT NULL, -- timestamp, unix milliseconds
"id" INTEGER NOT NULL, -- can ID
"name" TEXT NOT NULL, -- name of base packet
"index" INTEGER, -- index of the repeated packet (base_id = id - index)
"packet" TEXT NOT NULL CHECK(json_valid(packet)) -- JSON object describing the data
"data" TEXT NOT NULL CHECK(json_valid(data)) -- JSON object describing the data, including index if any
);
CREATE INDEX IF NOT EXISTS "ids_timestamped" ON "bus_events" (
@ -63,14 +70,26 @@ CREATE INDEX IF NOT EXISTS "times" ON "bus_events" (
);
-- this table shows when we started/stopped logging.
CREATE TABLE "bus_records" (
"id" INTEGER NOT NULL UNIQUE,
"start_time" INTEGER NOT NULL,
"end_time" INTEGER,
"note" TEXT,
CREATE TABLE "drive_records" (
"id" INTEGER NOT NULL UNIQUE, -- unique ID of the drive.
"start_time" INTEGER NOT NULL, -- when the drive started
"end_time" INTEGER, -- when it ended, or NULL if it's ongoing.
"note" TEXT, -- optional description of the segment/experiment/drive
PRIMARY KEY("id" AUTOINCREMENT),
CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time)
);
-- gps logs TODO: use GEOJSON/Spatialite tracks instead?
CREATE TABLE "position_logs" (
"ts" INTEGER NOT NULL,
"source" TEXT NOT NULL,
"lat" REAL NOT NULL,
"lon" REAL NOT NULL,
"elevation" REAL,
CONSTRAINT "no_empty_source" CHECK(source is not "")
);
-- TODO: ensure only one "active" (end_time == null) drive record at a time using triggers/constraints/index
`
// sql sequence to tear down the database.
@ -81,103 +100,150 @@ DROP TABLE "bus_events";
DROP INDEX "ids_timestamped";
DROP INDEX "times";
DROP TABLE "bus_records";
DROP TABLE "drive_records";
DROP TABLE "position_logs";
`
// sql expression to insert a bus event into the packets database.1
const sqlInsertEvent = `
INSERT INTO "bus_events" (time, can_id, name, index, packet) VALUES ($1, $2, $3, json($4));
INSERT INTO "bus_events" (time, can_id, name, packet) VALUES ($1, $2, $3, json($4));
`
// AddEvent adds the bus event to the database.
func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) {
func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (err error) {
//
tx, err := tdb.db.Begin()
tx, err := tdb.db.BeginTx(ctx, nil)
if err != nil {
tx.Rollback()
return
}
for _, b := range events {
j, err := json.Marshal(b.Data)
var j []byte
j, err = json.Marshal(b.Data)
if err != nil {
tx.Rollback()
return
}
tx.Exec(sqlInsertEvent, b.Timestamp, b.Id, b.Name, j)
tx.ExecContext(ctx, sqlInsertEvent, b.Timestamp.UnixMilli(), b.Id, b.Data.String(), j)
}
tx.Commit()
return
}
// QueryIdString is a string that filters ids from the set. use ID query functions to
// create them.
type QueryIdString string
func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (err error) {
// QueryIds constructs a CAN Id filter for one or more distinct Ids.
// For a range of ids, use QueryIdRange(start, stop uint32)
func QueryIds(ids ...uint32) QueryIdString {
// FIXME: zero elements case?
var idsString []string
for _, id := range ids {
idsString = append(idsString, strconv.FormatUint(uint64(id), 10))
return tdb.AddEventsCtx(context.Background(), events...)
}
/// Query fragment guide:
/// We need to be able to easily construct safe(!) and meaningful queries programatically
/// so we make some new types that can be turned into SQL fragments that go inside the where clause.
/// These all implement the QueryFrag interface, meaning the actual query function (that acts on the DB)
/// can deal with them agnostically. The Query function joins all the fragments it is given with AND.
/// to get OR,
// QueryFrag is anything that can be turned into a Query WHERE clause
type QueryFrag interface {
Query() string
}
// QueryIdRange represents a range of IDs to select for, inclusive.
type QueryIdRange struct {
Start uint32
End uint32
}
func (q *QueryIdRange) Query() string {
return fmt.Sprintf("id BETWEEN %d AND %d", q.Start, q.End)
}
// QueryIds selects for individual CAN ids
type QueryIds []uint32
func (q QueryIds) Query() string {
var idStrings []string
for _, id := range q {
idStrings = append(idStrings, strconv.FormatUint(uint64(id), 10))
}
return QueryIdString("id IN (" + strings.Join(idsString, ",") + ")")
return fmt.Sprintf("id IN (%s)", strings.Join(idStrings, ","))
}
func QueryIdsInv(ids ...uint32) QueryIdString {
// QueryTimeRange represents a query of a specific time range. For "before" or "after" queries,
// use time.Unix(0,0) or time.Now() in start and end respectively.
type QueryTimeRange struct {
Start time.Time
End time.Time
}
// QueryIdRange selects all IDs between start and end, *inclusive*.
// This function is preferred over a generated list of IDs.
func QueryIdRange(start, end uint32) QueryIdString {
startString := strconv.FormatUint(uint64(start), 10)
endString := strconv.FormatUint(uint64(end), 10)
return QueryIdString("id BETWEEN " + startString + " AND " + endString)
func (q *QueryTimeRange) Query() string {
startUnix := q.Start.UnixMilli()
endUnix := q.End.UnixMilli()
return fmt.Sprintf("ts BETWEEN %d AND %d", startUnix, endUnix)
}
// QueryIdRangeInv removes all IDs between start and end from the results.
// See QueryIdRange for more details.
func QueryIdRangeInv(start, end uint32) QueryIdString {
return QueryIdString("NOT ") + QueryIdRange(start, end)
type QueryNames []string
func (q QueryNames) Query() string {
return fmt.Sprintf("name IN (%s)", strings.Join(q, ", "))
}
type QueryTimestampString string
type QueryOr []QueryFrag
// QueryDuration takes a start and end time and filters where the packets are between that time range.
func QueryDuration(start, end time.Time) QueryTimestampString {
// the time in the database is a float, we have a time.Time so use unixNano() / 1e9 to float it.
startString := strconv.FormatFloat(float64(start.UnixNano())/1e9, 'f', -1, 64)
endString := strconv.FormatFloat(float64(start.UnixNano())/1e9, 'f', -1, 64)
return QueryTimestampString("ts BETWEEN " + startString + " AND " + endString)
func (q QueryOr) Query() string {
var qStrings []string
for _, frag := range q {
qStrings = append(qStrings, frag.Query())
}
return fmt.Sprintf("(%s)", strings.Join(qStrings, " OR "))
}
type QueryNameString string
func QueryNames(names ...string) QueryNameString
func QueryNamesInv(names ...string) QueryNameString
// Describes the parameters for an event query
type EventsQuery struct {
Ids []QueryIdString // Ids contains a list of CAN ID filters that are OR'd together.
Times []QueryTimestampString
Names []QueryNameString
Limit uint // max number of results.
}
const eventQueryFmtString = `SELECT * FROM "bus_events" WHERE %s LIMIT %d`
// GetEvents is the mechanism to request underlying event data.
// it takes functions (which are defined in db.go) that modify the query,
// and then return the results.
func (tdb *TelemDb) GetEvents(q *EventsQuery) []skylab.BusEvent {
// if function is inverse, AND and OR are switched.
// Demorgan's
// how to know if function is inverted???
return nil
func (tdb *TelemDb) GetEvents(limit int, where ...QueryFrag) (events []skylab.BusEvent, err error) {
// Simple mechanism for combining query frags:
// join with " AND ". To join expressions with or, use QueryOr
var fragStr []string
for _, f := range where {
fragStr = append(fragStr, f.Query())
}
qString := fmt.Sprintf("SELECT * FROM \"bus_events\" WHERE %s LIMIT %d", strings.Join(fragStr, " AND "), limit)
rows, err := tdb.db.Queryx(qString)
if err != nil {
return
}
defer rows.Close()
if limit < 0 { // special case: limit negative means unrestricted.
events = make([]skylab.BusEvent, 0, 20)
} else {
events = make([]skylab.BusEvent, 0, limit)
}
// scan rows into busevent list...
for rows.Next() {
var ev skylab.RawJsonEvent
err = rows.StructScan(&ev)
if err != nil {
return
}
BusEv := skylab.BusEvent{
Timestamp: time.UnixMilli(int64(ev.Timestamp)),
Id: ev.Id,
}
BusEv.Data, err = skylab.FromJson(ev.Id, ev.Data)
// FIXME: this is slow!
events = append(events, BusEv)
}
err = rows.Err()
return
}

62
http.go
View file

@ -19,12 +19,12 @@ type slogHttpLogger struct {
slog.Logger
}
func TelemRouter(log *slog.Logger, broker *JBroker, db *TelemDb) http.Handler {
func TelemRouter(log *slog.Logger, broker *Broker, db *TelemDb) http.Handler {
r := chi.NewRouter()
r.Use(middleware.RequestID)
r.Use(middleware.RealIP)
r.Use(middleware.Logger) // TODO: integrate with slog
r.Use(middleware.Logger) // TODO: integrate with slog instead of go default logger.
r.Use(middleware.Recoverer)
r.Get("/schema", func(w http.ResponseWriter, r *http.Request) {
@ -50,12 +50,16 @@ func TelemRouter(log *slog.Logger, broker *JBroker, db *TelemDb) http.Handler {
}
// define API version 1 routes.
func apiV1(broker *JBroker, db *TelemDb) chi.Router {
func apiV1(broker *Broker, db *TelemDb) chi.Router {
r := chi.NewRouter()
// this API only accepts JSON.
r.Use(middleware.AllowContentType("application/json"))
// no caching - always get the latest data.
r.Use(middleware.NoCache)
r.Get("/schema", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
// return the spicy json response.
w.WriteHeader(http.StatusOK)
// return the Skylab JSON definitions
w.Write([]byte(skylab.SkylabDefinitions))
})
@ -73,12 +77,14 @@ func apiV1(broker *JBroker, db *TelemDb) chi.Router {
return
})
r.Get("/", func(w http.ResponseWriter, r *http.Request) {
// this should use query params to return a list of packets.
// this should use http query params o return a list of packets.
})
// this is to get packets by a name.
r.Get("/{name:[a-z_]+}", func(w http.ResponseWriter, r *http.Request) {
// support field getting (matching too?)
// support limit
})
@ -86,27 +92,25 @@ func apiV1(broker *JBroker, db *TelemDb) chi.Router {
// records are driving segments/runs.
r.Route("/records", func(r chi.Router) {
r.Get("/") // get all runs
r.Get("/active") // get current run (no end time)
r.Post("/") // create a new run (with note). Ends active run if any, and creates new active run (no end time)
r.Get("/{id}") // get details on a specific run
r.Put("/{id}") // update a specific run. Can only be used to add notes/metadata, and not to change time/id.
r.Get("/", apiV1GetRecords(db)) // get all runs
r.Get("/active", apiV1GetActiveRecord(db)) // get current run (no end time)
r.Post("/", apiV1StartRecord(db)) // create a new run (with note). Ends active run if any, and creates new active run (no end time)
r.Get("/{id}", apiV1GetRecord(db)) // get details on a specific run
r.Put("/{id}", apiV1UpdateRecord(db)) // update a specific run. Can only be used to add notes/metadata, and not to change time/id.
})
r.Get("/stats") // v1 api stats (calls, clients, xbee connected, meta health ok)
r.
r.Get("/stats", func(w http.ResponseWriter, r *http.Request) {}) // v1 api stats (calls, clients, xbee connected, meta health ok)
return r
}
// apiV1Subscriber is a websocket session for the v1 api.
type apiV1Subscriber struct {
idFilter []uint64 // list of Ids to subscribe to. If it's empty, subscribes to all.
idFilter []uint32 // list of Ids to subscribe to. If it's empty, subscribes to all.
}
func apiV1PacketSubscribe(broker *JBroker, db *TelemDb) http.HandlerFunc {
func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
conn_id := r.RemoteAddr + uuid.New().String()
sub, err := broker.Subscribe(conn_id)
@ -118,6 +122,7 @@ func apiV1PacketSubscribe(broker *JBroker, db *TelemDb) http.HandlerFunc {
defer broker.Unsubscribe(conn_id)
// attempt to upgrade.
c, err := websocket.Accept(w, r, nil)
c.Ping(r.Context())
if err != nil {
// TODO: is this the correct option?
w.WriteHeader(http.StatusInternalServerError)
@ -151,3 +156,28 @@ func apiV1PacketSubscribe(broker *JBroker, db *TelemDb) http.HandlerFunc {
}
}
// TODO: rename. record is not a clear name. Runs? drives? segments?
func apiV1GetRecords(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
}
}
func apiV1GetActiveRecord(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
}
}
func apiV1StartRecord(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {}
}
func apiV1GetRecord(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {}
}
func apiV1UpdateRecord(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {}
}

View file

@ -1,3 +0,0 @@
package badger
// this file has a global internal K/V database used for sessions/stats/???