revert busevent pointer change

This commit is contained in:
saji 2024-02-13 10:03:39 -06:00
parent 0c8a25a2f4
commit 3ae157a3de
6 changed files with 120 additions and 29 deletions

View file

@ -52,7 +52,7 @@ var importCmd = &cli.Command{
&cli.UintFlag{ &cli.UintFlag{
Name: "batch-size", Name: "batch-size",
Usage: "the maximum size of each SQL transaction", Usage: "the maximum size of each SQL transaction",
Value: 50, Value: 800,
}, },
}, },
Action: importAction, Action: importAction,
@ -84,12 +84,12 @@ func importAction(ctx *cli.Context) error {
// we should batch data, avoiding individual transactions to the database. // we should batch data, avoiding individual transactions to the database.
bSize := ctx.Uint("batch-size") bSize := ctx.Uint("batch-size")
eventsBatch := make([]*skylab.BusEvent, bSize) eventsBatch := make([]skylab.BusEvent, bSize)
batchIdx := 0 batchIdx := 0
// stats for imports // stats for imports
n_packets := 0 var n_packets int64 = 0
n_unknown := 0 n_unknown := 0
n_error := 0 n_error := 0
for { for {
@ -109,29 +109,30 @@ func importAction(ctx *cli.Context) error {
continue continue
} else if err != nil { } else if err != nil {
// TODO: we should consider absorbing all errors. // TODO: we should consider absorbing all errors.
fmt.Printf("got an error %v\n", err) fmt.Printf("got an error processing %s: %v\n", line, err)
n_error++ n_error++
continue continue
} }
n_packets++
eventsBatch[batchIdx] = f eventsBatch[batchIdx] = f
batchIdx++ batchIdx++
if batchIdx >= int(bSize) { if batchIdx >= int(bSize) {
// flush it!!!! // flush it!!!!
err = db.AddEventsCtx(ctx.Context, eventsBatch...) n, err := db.AddEventsCtx(ctx.Context, eventsBatch...)
if err != nil { if err != nil {
fmt.Printf("error adding to database %v\n", err) fmt.Printf("error adding to database %v\n", err)
} }
n_packets += n
batchIdx = 0 // reset the batch batchIdx = 0 // reset the batch
} }
} }
// check if we have remaining packets and flush them // check if we have remaining packets and flush them
if batchIdx > 0 { if batchIdx > 0 {
err = db.AddEventsCtx(ctx.Context, eventsBatch[:batchIdx]...) // note the slice here! n, err := db.AddEventsCtx(ctx.Context, eventsBatch[:batchIdx]...) // note the slice here!
if err != nil { if err != nil {
fmt.Printf("error adding to database %v\n", err) fmt.Printf("error adding to database %v\n", err)
} }
n_packets += n
} }
fmt.Printf("import status: %d successful, %d unknown, %d errors\n", n_packets, n_unknown, n_error) fmt.Printf("import status: %d successful, %d unknown, %d errors\n", n_packets, n_unknown, n_error)

View file

@ -1,8 +1,11 @@
package cli package cli
import ( import (
"context"
"log" "log"
"os" "os"
"os/signal"
"runtime/pprof"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
) )
@ -12,15 +15,42 @@ var subCmds = []*cli.Command{
xbeeCmd, xbeeCmd,
} }
var f os.File
func Execute() { func Execute() {
app := &cli.App{ app := &cli.App{
Name: "gotelem", Name: "gotelem",
Usage: "see everything", Usage: "see everything",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "profile",
Usage: "enable profiling",
},
},
Before: func(ctx *cli.Context) error {
if ctx.Bool("profile") {
f, err := os.Create("cpuprofile")
if err != nil {
return err
}
pprof.StartCPUProfile(f)
}
return nil
},
After: func(ctx *cli.Context) error {
if ctx.Bool("profile") {
pprof.StopCPUProfile()
}
return nil
},
Commands: subCmds, Commands: subCmds,
} }
if err := app.Run(os.Args); err != nil { // setup context for cancellation.
ctx := context.Background()
ctx, _ = signal.NotifyContext(ctx, os.Interrupt)
if err := app.RunContext(ctx, os.Args); err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }

View file

@ -353,7 +353,7 @@ func (d *dbLoggingService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
for { for {
select { select {
case msg := <-rxCh: case msg := <-rxCh:
tdb.AddEventsCtx(cCtx.Context, &msg) tdb.AddEventsCtx(cCtx.Context, msg)
case <-cCtx.Done(): case <-cCtx.Done():
return return
} }

View file

@ -67,7 +67,7 @@ func apiV1(broker *Broker, db *db.TelemDb) chi.Router {
r.Route("/packets", func(r chi.Router) { r.Route("/packets", func(r chi.Router) {
r.Get("/subscribe", apiV1PacketSubscribe(broker, db)) r.Get("/subscribe", apiV1PacketSubscribe(broker, db))
r.Post("/", func(w http.ResponseWriter, r *http.Request) { r.Post("/", func(w http.ResponseWriter, r *http.Request) {
var pkgs []*skylab.BusEvent var pkgs []skylab.BusEvent
decoder := json.NewDecoder(r.Body) decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&pkgs); err != nil { if err := decoder.Decode(&pkgs); err != nil {
w.WriteHeader(http.StatusTeapot) w.WriteHeader(http.StatusTeapot)

View file

@ -17,7 +17,7 @@ import (
func init() { func init() {
sql.Register("custom_sqlite3", &sqlite3.SQLiteDriver{ sql.Register("custom_sqlite3", &sqlite3.SQLiteDriver{
// TODO: add functions that convert between unix milliseconds and ISO 8601 // TODO: add helper that convert between unix milliseconds and sqlite times?
}) })
} }
@ -25,6 +25,7 @@ type TelemDb struct {
db *sqlx.DB db *sqlx.DB
} }
// TelemDbOption lets you customize the behavior of the sqlite database
type TelemDbOption func(*TelemDb) error type TelemDbOption func(*TelemDb) error
func OpenTelemDb(path string, options ...TelemDbOption) (tdb *TelemDb, err error) { func OpenTelemDb(path string, options ...TelemDbOption) (tdb *TelemDb, err error) {
@ -75,11 +76,11 @@ INSERT INTO "bus_events" (ts, name, data) VALUES
` `
// AddEvent adds the bus event to the database. // AddEvent adds the bus event to the database.
func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...*skylab.BusEvent) (err error) { func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) {
// //
n = 0
tx, err := tdb.db.BeginTx(ctx, nil) tx, err := tdb.db.BeginTx(ctx, nil)
if err != nil { if err != nil {
tx.Rollback()
return return
} }
@ -87,41 +88,101 @@ func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...*skylab.BusEvent
const rowSql = "(?, ?, json(?))" const rowSql = "(?, ?, json(?))"
inserts := make([]string, len(events)) inserts := make([]string, len(events))
vals := []interface{}{} vals := []interface{}{}
for idx, b := range events { idx := 0 // we have to manually increment, because sometimes we don't insert.
for _, b := range events {
inserts[idx] = rowSql inserts[idx] = rowSql
var j []byte var j []byte
j, err = json.Marshal(b.Data) j, err = json.Marshal(b.Data)
if err != nil { if err != nil {
tx.Rollback() // we had some error turning the packet into json.
return continue // we silently skip.
} }
vals = append(vals, b.Timestamp.UnixMilli(), b.Data.String(), j) vals = append(vals, b.Timestamp.UnixMilli(), b.Data.String(), j)
idx++
} }
// construct the full statement now // construct the full statement now
sqlStmt = sqlStmt + strings.Join(inserts, ",") sqlStmt = sqlStmt + strings.Join(inserts[:idx], ",")
stmt, err := tx.PrepareContext(ctx, sqlStmt) stmt, err := tx.PrepareContext(ctx, sqlStmt)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
return return
} }
//TODO: log the number of rows modified/inserted res, err := stmt.ExecContext(ctx, vals...)
_, err = stmt.ExecContext(ctx, vals...)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
return return
} }
n, err = res.RowsAffected()
tx.Commit() tx.Commit()
return return
} }
func (tdb *TelemDb) AddEvents(events ...*skylab.BusEvent) (err error) { func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) {
return tdb.AddEventsCtx(context.Background(), events...) return tdb.AddEventsCtx(context.Background(), events...)
} }
// Streaming logger.
func (tdb *TelemDb) AddEventStreamCtx(ctx context.Context, events <-chan skylab.BusEvent, done chan<- bool) error {
const BatchSize = 500
tx, err := tdb.db.BeginTx(ctx, nil)
defer tx.Commit()
if err != nil {
return err
}
makePreparedStmt := func(ctx context.Context, tx *sql.Tx, n int) (*sql.Stmt, error) {
sqlStmt := sqlInsertEvent
const rowSql = "(?, ?, json(?))"
inserts := make([]string, n)
for i := 0; i < n; i++ {
inserts[n] = rowSql
}
sqlStmt = sqlStmt + strings.Join(inserts, ",")
return tx.PrepareContext(ctx, sqlStmt)
}
bulkStmt, err := makePreparedStmt(ctx, tx, BatchSize)
// this is the list of values that we use.
valBatch := make([]interface{}, 0, BatchSize * 3)
batchIdx := 0
for {
e, more := <-events
if more {
j, err := json.Marshal(e.Data)
if err != nil {
continue // skip things that couldn't be marshalled
}
valBatch = append(valBatch, e.Timestamp.UnixMilli(), e.Data.String(), j)
batchIdx++
if batchIdx >= BatchSize {
_, err := bulkStmt.ExecContext(ctx, valBatch...)
if err != nil {
continue
}
}
} else {
break
}
}
// create a statement for the remaining items.
lastStmt, err := makePreparedStmt(ctx, tx, batchIdx)
if err != nil {
return nil
}
_, err = lastStmt.ExecContext(ctx, valBatch...)
done <- true
return nil
}
/// Query fragment guide: /// Query fragment guide:
/// We need to be able to easily construct safe(!) and meaningful queries programatically /// We need to be able to easily construct safe(!) and meaningful queries programatically
/// so we make some new types that can be turned into SQL fragments that go inside the where clause. /// so we make some new types that can be turned into SQL fragments that go inside the where clause.

View file

@ -37,9 +37,10 @@ func NewFormatError(msg string, err error) error {
// A Parser takes a string containing one line of a particular log file // A Parser takes a string containing one line of a particular log file
// and returns an associated skylab.BusEvent representing the packet. // and returns an associated skylab.BusEvent representing the packet.
// if no packet is found, an error is returned instead. // if no packet is found, an error is returned instead.
type ParserFunc func(string) (*skylab.BusEvent, error) type ParserFunc func(string) (skylab.BusEvent, error)
func parseCanDumpLine(dumpLine string) (b *skylab.BusEvent, err error) { func parseCanDumpLine(dumpLine string) (b skylab.BusEvent, err error) {
b = skylab.BusEvent{}
// dumpline looks like this: // dumpline looks like this:
// (1684538768.521889) can0 200#8D643546 // (1684538768.521889) can0 200#8D643546
// remove trailing newline // remove trailing newline
@ -74,9 +75,7 @@ func parseCanDumpLine(dumpLine string) (b *skylab.BusEvent, err error) {
Kind: can.CanDataFrame, Kind: can.CanDataFrame,
} }
b = &skylab.BusEvent{ b.Timestamp = time.Unix(unixSeconds, unixMicros)
Timestamp: time.Unix(unixSeconds, unixMicros),
}
b.Data, err = skylab.FromCanFrame(frame) b.Data, err = skylab.FromCanFrame(frame)
@ -91,7 +90,8 @@ func parseCanDumpLine(dumpLine string) (b *skylab.BusEvent, err error) {
return return
} }
func parseTelemLogLine(line string) (b *skylab.BusEvent, err error) { func parseTelemLogLine(line string) (b skylab.BusEvent, err error) {
b = skylab.BusEvent{}
// strip trailng newline since we rely on it being gone // strip trailng newline since we rely on it being gone
line = strings.TrimSpace(line) line = strings.TrimSpace(line)
// data is of the form // data is of the form
@ -149,9 +149,8 @@ func parseTelemLogLine(line string) (b *skylab.BusEvent, err error) {
Kind: can.CanDataFrame, Kind: can.CanDataFrame,
} }
b = &skylab.BusEvent{ b.Timestamp = ts
Timestamp: ts,
}
b.Data, err = skylab.FromCanFrame(frame) b.Data, err = skylab.FromCanFrame(frame)
if err != nil { if err != nil {
err = NewFormatError("failed to parse can frame", err) err = NewFormatError("failed to parse can frame", err)