add repeated packet support using index parameter
This commit is contained in:
parent
b266a84324
commit
c9b73ee006
70
db.go
70
db.go
|
@ -147,19 +147,12 @@ func (l *LimitOffsetModifier) ModifyStatement(sb *strings.Builder) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type OrderByTimestampModifer struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (o *OrderByTimestampModifer) ModifyStatement(sb *strings.Builder) error {
|
|
||||||
sb.WriteString(" ORDER BY ts DESC")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// BusEventFilter is a filter for bus events.
|
// BusEventFilter is a filter for bus events.
|
||||||
type BusEventFilter struct {
|
type BusEventFilter struct {
|
||||||
Names []string
|
Names []string // The name(s) of packets to filter for
|
||||||
TimerangeStart time.Time
|
StartTime time.Time // Starting time range. All packets >= StartTime
|
||||||
TimerangeEnd time.Time
|
EndTime time.Time // Ending time range. All packets <= EndTime
|
||||||
|
Indexes []int // The specific index of the packets to index.
|
||||||
}
|
}
|
||||||
|
|
||||||
// now we can optionally add a limit.
|
// now we can optionally add a limit.
|
||||||
|
@ -179,18 +172,27 @@ func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, optio
|
||||||
// using BETWEEN since apparenlty that can be better?
|
// using BETWEEN since apparenlty that can be better?
|
||||||
|
|
||||||
// next, check if we have a start/end time, add constraints
|
// next, check if we have a start/end time, add constraints
|
||||||
if !filter.TimerangeEnd.IsZero() {
|
if !filter.EndTime.IsZero() {
|
||||||
qString := fmt.Sprintf("ts <= %d", filter.TimerangeEnd.UnixMilli())
|
qString := fmt.Sprintf("ts <= %d", filter.EndTime.UnixMilli())
|
||||||
whereFrags = append(whereFrags, qString)
|
whereFrags = append(whereFrags, qString)
|
||||||
}
|
}
|
||||||
if !filter.TimerangeStart.IsZero() {
|
if !filter.StartTime.IsZero() {
|
||||||
// we have an end range
|
// we have an end range
|
||||||
qString := fmt.Sprintf("ts >= %d", filter.TimerangeStart.UnixMilli())
|
qString := fmt.Sprintf("ts >= %d", filter.StartTime.UnixMilli())
|
||||||
|
whereFrags = append(whereFrags, qString)
|
||||||
|
}
|
||||||
|
if len(filter.Indexes) > 0 {
|
||||||
|
s := make([]string, 0)
|
||||||
|
for _, idx := range filter.Indexes {
|
||||||
|
s = append(s, fmt.Sprint(idx))
|
||||||
|
}
|
||||||
|
idxs := strings.Join(s, ", ")
|
||||||
|
qString := fmt.Sprintf(`idx in (%s)`, idxs)
|
||||||
whereFrags = append(whereFrags, qString)
|
whereFrags = append(whereFrags, qString)
|
||||||
}
|
}
|
||||||
|
|
||||||
sb := strings.Builder{}
|
sb := strings.Builder{}
|
||||||
sb.WriteString(`SELECT * from "bus_events"`)
|
sb.WriteString(`SELECT ts, name, data from "bus_events"`)
|
||||||
// construct the full statement.
|
// construct the full statement.
|
||||||
if len(whereFrags) > 0 {
|
if len(whereFrags) > 0 {
|
||||||
// use the where clauses.
|
// use the where clauses.
|
||||||
|
@ -198,6 +200,8 @@ func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, optio
|
||||||
sb.WriteString(strings.Join(whereFrags, " AND "))
|
sb.WriteString(strings.Join(whereFrags, " AND "))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sb.WriteString(" ORDER BY ts DESC")
|
||||||
|
|
||||||
// Augment our data further if there's i.e a limit modifier.
|
// Augment our data further if there's i.e a limit modifier.
|
||||||
// TODO: factor this out maybe?
|
// TODO: factor this out maybe?
|
||||||
for _, m := range options {
|
for _, m := range options {
|
||||||
|
@ -247,29 +251,39 @@ type Datum struct {
|
||||||
// GetValues queries the database for values in a given time range.
|
// GetValues queries the database for values in a given time range.
|
||||||
// A value is a specific data point. For example, bms_measurement.current
|
// A value is a specific data point. For example, bms_measurement.current
|
||||||
// would be a value.
|
// would be a value.
|
||||||
func (tdb *TelemDb) GetValues(ctx context.Context, bef BusEventFilter,
|
func (tdb *TelemDb) GetValues(ctx context.Context, filter BusEventFilter,
|
||||||
field string, opts ...QueryModifier) ([]Datum, error) {
|
field string, opts ...QueryModifier) ([]Datum, error) {
|
||||||
// this fragment uses json_extract from sqlite to get a single
|
// this fragment uses json_extract from sqlite to get a single
|
||||||
// nested value.
|
// nested value.
|
||||||
sb := strings.Builder{}
|
sb := strings.Builder{}
|
||||||
sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `)
|
sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `)
|
||||||
if len(bef.Names) != 1 {
|
if len(filter.Names) != 1 {
|
||||||
return nil, errors.New("invalid number of names")
|
return nil, errors.New("invalid number of names")
|
||||||
}
|
}
|
||||||
|
whereFrags := []string{"name is ?"}
|
||||||
|
|
||||||
qStrings := []string{"name is ?"}
|
if !filter.StartTime.IsZero() {
|
||||||
// add timestamp limit.
|
qString := fmt.Sprintf("ts >= %d", filter.StartTime.UnixMilli())
|
||||||
if !bef.TimerangeStart.IsZero() {
|
whereFrags = append(whereFrags, qString)
|
||||||
qString := fmt.Sprintf("ts >= %d", bef.TimerangeStart.UnixMilli())
|
|
||||||
qStrings = append(qStrings, qString)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !bef.TimerangeEnd.IsZero() {
|
if !filter.EndTime.IsZero() {
|
||||||
qString := fmt.Sprintf("ts <= %d", bef.TimerangeEnd.UnixMilli())
|
qString := fmt.Sprintf("ts <= %d", filter.EndTime.UnixMilli())
|
||||||
qStrings = append(qStrings, qString)
|
whereFrags = append(whereFrags, qString)
|
||||||
|
}
|
||||||
|
if len(filter.Indexes) > 0 {
|
||||||
|
s := make([]string, 0)
|
||||||
|
for _, idx := range filter.Indexes {
|
||||||
|
s = append(s, fmt.Sprint(idx))
|
||||||
|
}
|
||||||
|
idxs := strings.Join(s, ", ")
|
||||||
|
qString := fmt.Sprintf(`idx in (%s)`, idxs)
|
||||||
|
whereFrags = append(whereFrags, qString)
|
||||||
}
|
}
|
||||||
// join qstrings with AND
|
// join qstrings with AND
|
||||||
sb.WriteString(strings.Join(qStrings, " AND "))
|
sb.WriteString(strings.Join(whereFrags, " AND "))
|
||||||
|
|
||||||
|
sb.WriteString(" ORDER BY ts DESC")
|
||||||
|
|
||||||
for _, m := range opts {
|
for _, m := range opts {
|
||||||
if m == nil {
|
if m == nil {
|
||||||
|
@ -277,7 +291,7 @@ func (tdb *TelemDb) GetValues(ctx context.Context, bef BusEventFilter,
|
||||||
}
|
}
|
||||||
m.ModifyStatement(&sb)
|
m.ModifyStatement(&sb)
|
||||||
}
|
}
|
||||||
rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, bef.Names[0])
|
rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, filter.Names[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
62
http.go
62
http.go
|
@ -31,7 +31,7 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return bef, err
|
return bef, err
|
||||||
}
|
}
|
||||||
bef.TimerangeStart = t
|
bef.StartTime = t
|
||||||
}
|
}
|
||||||
if el := v.Get("end"); el != "" {
|
if el := v.Get("end"); el != "" {
|
||||||
// parse the start time query.
|
// parse the start time query.
|
||||||
|
@ -39,7 +39,15 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return bef, err
|
return bef, err
|
||||||
}
|
}
|
||||||
bef.TimerangeEnd = t
|
bef.EndTime = t
|
||||||
|
}
|
||||||
|
bef.Indexes = make([]int, 0)
|
||||||
|
for _, strIdx := range v["idx"] {
|
||||||
|
idx, err := strconv.ParseInt(strIdx, 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
bef.Indexes = append(bef.Indexes, int(idx))
|
||||||
}
|
}
|
||||||
return bef, nil
|
return bef, nil
|
||||||
}
|
}
|
||||||
|
@ -132,16 +140,21 @@ func apiV1(broker *Broker, tdb *TelemDb) chi.Router {
|
||||||
|
|
||||||
})
|
})
|
||||||
|
|
||||||
// records are driving segments/runs.
|
// OpenMCT domain object storage. Basically an arbitrary JSON document store
|
||||||
r.Route("/records", func(r chi.Router) {
|
|
||||||
r.Get("/", apiV1GetRecords(tdb)) // get all runs
|
|
||||||
r.Get("/active", apiV1GetActiveRecord(tdb)) // get current run (no end time)
|
|
||||||
r.Post("/", apiV1StartRecord(tdb)) // create a new run (with note). Ends active run if any, and creates new active run (no end time)
|
|
||||||
r.Get("/{id}", apiV1GetRecord(tdb)) // get details on a specific run
|
|
||||||
r.Put("/{id}", apiV1UpdateRecord(tdb)) // update a specific run. Can only be used to add notes/metadata, and not to change time/id.
|
|
||||||
|
|
||||||
|
r.Route("/openmct", func(r chi.Router) {
|
||||||
|
// key is a column on our json store, it's nested under identifier.key
|
||||||
|
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||||
|
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||||
|
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||||
|
// create a new object.
|
||||||
|
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
|
||||||
|
// subscribe to object updates.
|
||||||
|
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// records are driving segments/runs.
|
||||||
|
|
||||||
r.Get("/stats", func(w http.ResponseWriter, r *http.Request) {
|
r.Get("/stats", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
}) // v1 api stats (calls, clients, xbee connected, meta health ok)
|
}) // v1 api stats (calls, clients, xbee connected, meta health ok)
|
||||||
|
@ -215,6 +228,7 @@ func apiV1GetPackets(tdb *TelemDb) http.HandlerFunc {
|
||||||
lim, err := extractLimitModifier(r)
|
lim, err := extractLimitModifier(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
fmt.Print(lim)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,14 +278,13 @@ func apiV1GetValues(db *TelemDb) http.HandlerFunc {
|
||||||
// override the bus event filter name option
|
// override the bus event filter name option
|
||||||
bef.Names = []string{name}
|
bef.Names = []string{name}
|
||||||
|
|
||||||
var order = &OrderByTimestampModifer{}
|
|
||||||
|
|
||||||
var res []Datum
|
var res []Datum
|
||||||
// make the call, skip the limit modifier if it's nil.
|
// make the call, skip the limit modifier if it's nil.
|
||||||
if lim == nil {
|
if lim == nil {
|
||||||
res, err = db.GetValues(r.Context(), *bef, field, order)
|
res, err = db.GetValues(r.Context(), *bef, field)
|
||||||
} else {
|
} else {
|
||||||
res, err = db.GetValues(r.Context(), *bef, field, lim, order)
|
res, err = db.GetValues(r.Context(), *bef, field, lim)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// 500 server error:
|
// 500 server error:
|
||||||
|
@ -287,28 +300,3 @@ func apiV1GetValues(db *TelemDb) http.HandlerFunc {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: rename. record is not a clear name. Runs? drives? segments?
|
|
||||||
func apiV1GetRecords(db *TelemDb) http.HandlerFunc {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func apiV1GetActiveRecord(db *TelemDb) http.HandlerFunc {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func apiV1StartRecord(db *TelemDb) http.HandlerFunc {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {}
|
|
||||||
}
|
|
||||||
|
|
||||||
func apiV1GetRecord(db *TelemDb) http.HandlerFunc {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {}
|
|
||||||
}
|
|
||||||
|
|
||||||
func apiV1UpdateRecord(db *TelemDb) http.HandlerFunc {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {}
|
|
||||||
}
|
|
||||||
|
|
1
migrations/6_bus_index_column_down.sql
Normal file
1
migrations/6_bus_index_column_down.sql
Normal file
|
@ -0,0 +1 @@
|
||||||
|
ALTER TABLE "bus_events" DROP COLUMN idx;
|
1
migrations/6_bus_index_column_up.sql
Normal file
1
migrations/6_bus_index_column_up.sql
Normal file
|
@ -0,0 +1 @@
|
||||||
|
ALTER TABLE "bus_events" ADD COLUMN idx GENERATED ALWAYS AS (json_extract(data, '$.idx')) VIRTUAL;
|
Loading…
Reference in a new issue