diff --git a/db.go b/db.go index 843e8c7..a467f74 100644 --- a/db.go +++ b/db.go @@ -147,19 +147,12 @@ func (l *LimitOffsetModifier) ModifyStatement(sb *strings.Builder) error { return nil } -type OrderByTimestampModifer struct { -} - -func (o *OrderByTimestampModifer) ModifyStatement(sb *strings.Builder) error { - sb.WriteString(" ORDER BY ts DESC") - return nil -} - // BusEventFilter is a filter for bus events. type BusEventFilter struct { - Names []string - TimerangeStart time.Time - TimerangeEnd time.Time + Names []string // The name(s) of packets to filter for + StartTime time.Time // Starting time range. All packets >= StartTime + EndTime time.Time // Ending time range. All packets <= EndTime + Indexes []int // The specific index of the packets to index. } // now we can optionally add a limit. @@ -179,18 +172,27 @@ func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, optio // using BETWEEN since apparenlty that can be better? // next, check if we have a start/end time, add constraints - if !filter.TimerangeEnd.IsZero() { - qString := fmt.Sprintf("ts <= %d", filter.TimerangeEnd.UnixMilli()) + if !filter.EndTime.IsZero() { + qString := fmt.Sprintf("ts <= %d", filter.EndTime.UnixMilli()) whereFrags = append(whereFrags, qString) } - if !filter.TimerangeStart.IsZero() { + if !filter.StartTime.IsZero() { // we have an end range - qString := fmt.Sprintf("ts >= %d", filter.TimerangeStart.UnixMilli()) + qString := fmt.Sprintf("ts >= %d", filter.StartTime.UnixMilli()) + whereFrags = append(whereFrags, qString) + } + if len(filter.Indexes) > 0 { + s := make([]string, 0) + for _, idx := range filter.Indexes { + s = append(s, fmt.Sprint(idx)) + } + idxs := strings.Join(s, ", ") + qString := fmt.Sprintf(`idx in (%s)`, idxs) whereFrags = append(whereFrags, qString) } sb := strings.Builder{} - sb.WriteString(`SELECT * from "bus_events"`) + sb.WriteString(`SELECT ts, name, data from "bus_events"`) // construct the full statement. if len(whereFrags) > 0 { // use the where clauses. @@ -198,6 +200,8 @@ func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, optio sb.WriteString(strings.Join(whereFrags, " AND ")) } + sb.WriteString(" ORDER BY ts DESC") + // Augment our data further if there's i.e a limit modifier. // TODO: factor this out maybe? for _, m := range options { @@ -247,29 +251,39 @@ type Datum struct { // GetValues queries the database for values in a given time range. // A value is a specific data point. For example, bms_measurement.current // would be a value. -func (tdb *TelemDb) GetValues(ctx context.Context, bef BusEventFilter, +func (tdb *TelemDb) GetValues(ctx context.Context, filter BusEventFilter, field string, opts ...QueryModifier) ([]Datum, error) { // this fragment uses json_extract from sqlite to get a single // nested value. sb := strings.Builder{} sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `) - if len(bef.Names) != 1 { + if len(filter.Names) != 1 { return nil, errors.New("invalid number of names") } + whereFrags := []string{"name is ?"} - qStrings := []string{"name is ?"} - // add timestamp limit. - if !bef.TimerangeStart.IsZero() { - qString := fmt.Sprintf("ts >= %d", bef.TimerangeStart.UnixMilli()) - qStrings = append(qStrings, qString) + if !filter.StartTime.IsZero() { + qString := fmt.Sprintf("ts >= %d", filter.StartTime.UnixMilli()) + whereFrags = append(whereFrags, qString) } - if !bef.TimerangeEnd.IsZero() { - qString := fmt.Sprintf("ts <= %d", bef.TimerangeEnd.UnixMilli()) - qStrings = append(qStrings, qString) + if !filter.EndTime.IsZero() { + qString := fmt.Sprintf("ts <= %d", filter.EndTime.UnixMilli()) + whereFrags = append(whereFrags, qString) + } + if len(filter.Indexes) > 0 { + s := make([]string, 0) + for _, idx := range filter.Indexes { + s = append(s, fmt.Sprint(idx)) + } + idxs := strings.Join(s, ", ") + qString := fmt.Sprintf(`idx in (%s)`, idxs) + whereFrags = append(whereFrags, qString) } // join qstrings with AND - sb.WriteString(strings.Join(qStrings, " AND ")) + sb.WriteString(strings.Join(whereFrags, " AND ")) + + sb.WriteString(" ORDER BY ts DESC") for _, m := range opts { if m == nil { @@ -277,7 +291,7 @@ func (tdb *TelemDb) GetValues(ctx context.Context, bef BusEventFilter, } m.ModifyStatement(&sb) } - rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, bef.Names[0]) + rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, filter.Names[0]) if err != nil { return nil, err } diff --git a/http.go b/http.go index 388a4d3..aa79613 100644 --- a/http.go +++ b/http.go @@ -31,7 +31,7 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) { if err != nil { return bef, err } - bef.TimerangeStart = t + bef.StartTime = t } if el := v.Get("end"); el != "" { // parse the start time query. @@ -39,7 +39,15 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) { if err != nil { return bef, err } - bef.TimerangeEnd = t + bef.EndTime = t + } + bef.Indexes = make([]int, 0) + for _, strIdx := range v["idx"] { + idx, err := strconv.ParseInt(strIdx, 10, 32) + if err != nil { + return nil, err + } + bef.Indexes = append(bef.Indexes, int(idx)) } return bef, nil } @@ -132,16 +140,21 @@ func apiV1(broker *Broker, tdb *TelemDb) chi.Router { }) - // records are driving segments/runs. - r.Route("/records", func(r chi.Router) { - r.Get("/", apiV1GetRecords(tdb)) // get all runs - r.Get("/active", apiV1GetActiveRecord(tdb)) // get current run (no end time) - r.Post("/", apiV1StartRecord(tdb)) // create a new run (with note). Ends active run if any, and creates new active run (no end time) - r.Get("/{id}", apiV1GetRecord(tdb)) // get details on a specific run - r.Put("/{id}", apiV1UpdateRecord(tdb)) // update a specific run. Can only be used to add notes/metadata, and not to change time/id. + // OpenMCT domain object storage. Basically an arbitrary JSON document store + r.Route("/openmct", func(r chi.Router) { + // key is a column on our json store, it's nested under identifier.key + r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {}) + r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {}) + r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {}) + // create a new object. + r.Post("/", func(w http.ResponseWriter, r *http.Request) {}) + // subscribe to object updates. + r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {}) }) + // records are driving segments/runs. + r.Get("/stats", func(w http.ResponseWriter, r *http.Request) { }) // v1 api stats (calls, clients, xbee connected, meta health ok) @@ -215,6 +228,7 @@ func apiV1GetPackets(tdb *TelemDb) http.HandlerFunc { lim, err := extractLimitModifier(r) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) + fmt.Print(lim) return } @@ -264,14 +278,13 @@ func apiV1GetValues(db *TelemDb) http.HandlerFunc { // override the bus event filter name option bef.Names = []string{name} - var order = &OrderByTimestampModifer{} var res []Datum // make the call, skip the limit modifier if it's nil. if lim == nil { - res, err = db.GetValues(r.Context(), *bef, field, order) + res, err = db.GetValues(r.Context(), *bef, field) } else { - res, err = db.GetValues(r.Context(), *bef, field, lim, order) + res, err = db.GetValues(r.Context(), *bef, field, lim) } if err != nil { // 500 server error: @@ -287,28 +300,3 @@ func apiV1GetValues(db *TelemDb) http.HandlerFunc { } } - -// TODO: rename. record is not a clear name. Runs? drives? segments? -func apiV1GetRecords(db *TelemDb) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - - } -} - -func apiV1GetActiveRecord(db *TelemDb) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - - } -} - -func apiV1StartRecord(db *TelemDb) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) {} -} - -func apiV1GetRecord(db *TelemDb) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) {} -} - -func apiV1UpdateRecord(db *TelemDb) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) {} -} diff --git a/migrations/6_bus_index_column_down.sql b/migrations/6_bus_index_column_down.sql new file mode 100644 index 0000000..b37fb42 --- /dev/null +++ b/migrations/6_bus_index_column_down.sql @@ -0,0 +1 @@ +ALTER TABLE "bus_events" DROP COLUMN idx; diff --git a/migrations/6_bus_index_column_up.sql b/migrations/6_bus_index_column_up.sql new file mode 100644 index 0000000..19090e6 --- /dev/null +++ b/migrations/6_bus_index_column_up.sql @@ -0,0 +1 @@ +ALTER TABLE "bus_events" ADD COLUMN idx GENERATED ALWAYS AS (json_extract(data, '$.idx')) VIRTUAL;