diff --git a/cmd/gotelem/cli/client.go b/cmd/gotelem/cli/client.go index 7d97ef6..a32222d 100644 --- a/cmd/gotelem/cli/client.go +++ b/cmd/gotelem/cli/client.go @@ -52,7 +52,7 @@ var importCmd = &cli.Command{ &cli.UintFlag{ Name: "batch-size", Usage: "the maximum size of each SQL transaction", - Value: 50, + Value: 800, }, }, Action: importAction, @@ -84,12 +84,12 @@ func importAction(ctx *cli.Context) error { // we should batch data, avoiding individual transactions to the database. bSize := ctx.Uint("batch-size") - eventsBatch := make([]*skylab.BusEvent, bSize) + eventsBatch := make([]skylab.BusEvent, bSize) batchIdx := 0 // stats for imports - n_packets := 0 + var n_packets int64 = 0 n_unknown := 0 n_error := 0 for { @@ -109,29 +109,30 @@ func importAction(ctx *cli.Context) error { continue } else if err != nil { // TODO: we should consider absorbing all errors. - fmt.Printf("got an error %v\n", err) + fmt.Printf("got an error processing %s: %v\n", line, err) n_error++ continue } - n_packets++ eventsBatch[batchIdx] = f batchIdx++ if batchIdx >= int(bSize) { // flush it!!!! - err = db.AddEventsCtx(ctx.Context, eventsBatch...) + n, err := db.AddEventsCtx(ctx.Context, eventsBatch...) if err != nil { fmt.Printf("error adding to database %v\n", err) } + n_packets += n batchIdx = 0 // reset the batch } } // check if we have remaining packets and flush them if batchIdx > 0 { - err = db.AddEventsCtx(ctx.Context, eventsBatch[:batchIdx]...) // note the slice here! + n, err := db.AddEventsCtx(ctx.Context, eventsBatch[:batchIdx]...) // note the slice here! if err != nil { fmt.Printf("error adding to database %v\n", err) } + n_packets += n } fmt.Printf("import status: %d successful, %d unknown, %d errors\n", n_packets, n_unknown, n_error) diff --git a/cmd/gotelem/cli/root.go b/cmd/gotelem/cli/root.go index 973e74a..a6cea41 100644 --- a/cmd/gotelem/cli/root.go +++ b/cmd/gotelem/cli/root.go @@ -1,8 +1,11 @@ package cli import ( + "context" "log" "os" + "os/signal" + "runtime/pprof" "github.com/urfave/cli/v2" ) @@ -12,15 +15,42 @@ var subCmds = []*cli.Command{ xbeeCmd, } +var f os.File func Execute() { app := &cli.App{ Name: "gotelem", Usage: "see everything", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "profile", + Usage: "enable profiling", + }, + }, + Before: func(ctx *cli.Context) error { + if ctx.Bool("profile") { + f, err := os.Create("cpuprofile") + if err != nil { + return err + } + pprof.StartCPUProfile(f) + } + return nil + }, + After: func(ctx *cli.Context) error { + if ctx.Bool("profile") { + pprof.StopCPUProfile() + } + return nil + }, Commands: subCmds, } - if err := app.Run(os.Args); err != nil { + // setup context for cancellation. + ctx := context.Background() + ctx, _ = signal.NotifyContext(ctx, os.Interrupt) + + if err := app.RunContext(ctx, os.Args); err != nil { log.Fatal(err) } } diff --git a/cmd/gotelem/cli/server.go b/cmd/gotelem/cli/server.go index 3437c30..d06283e 100644 --- a/cmd/gotelem/cli/server.go +++ b/cmd/gotelem/cli/server.go @@ -353,7 +353,7 @@ func (d *dbLoggingService) Start(cCtx *cli.Context, deps svcDeps) (err error) { for { select { case msg := <-rxCh: - tdb.AddEventsCtx(cCtx.Context, &msg) + tdb.AddEventsCtx(cCtx.Context, msg) case <-cCtx.Done(): return } diff --git a/http.go b/http.go index a762296..20a75fc 100644 --- a/http.go +++ b/http.go @@ -67,7 +67,7 @@ func apiV1(broker *Broker, db *db.TelemDb) chi.Router { r.Route("/packets", func(r chi.Router) { r.Get("/subscribe", apiV1PacketSubscribe(broker, db)) r.Post("/", func(w http.ResponseWriter, r *http.Request) { - var pkgs []*skylab.BusEvent + var pkgs []skylab.BusEvent decoder := json.NewDecoder(r.Body) if err := decoder.Decode(&pkgs); err != nil { w.WriteHeader(http.StatusTeapot) diff --git a/internal/db/db.go b/internal/db/db.go index 7f98916..6867b87 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -17,7 +17,7 @@ import ( func init() { sql.Register("custom_sqlite3", &sqlite3.SQLiteDriver{ - // TODO: add functions that convert between unix milliseconds and ISO 8601 + // TODO: add helper that convert between unix milliseconds and sqlite times? }) } @@ -25,6 +25,7 @@ type TelemDb struct { db *sqlx.DB } +// TelemDbOption lets you customize the behavior of the sqlite database type TelemDbOption func(*TelemDb) error func OpenTelemDb(path string, options ...TelemDbOption) (tdb *TelemDb, err error) { @@ -75,11 +76,11 @@ INSERT INTO "bus_events" (ts, name, data) VALUES ` // AddEvent adds the bus event to the database. -func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...*skylab.BusEvent) (err error) { +func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) { // + n = 0 tx, err := tdb.db.BeginTx(ctx, nil) if err != nil { - tx.Rollback() return } @@ -87,41 +88,101 @@ func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...*skylab.BusEvent const rowSql = "(?, ?, json(?))" inserts := make([]string, len(events)) vals := []interface{}{} - for idx, b := range events { + idx := 0 // we have to manually increment, because sometimes we don't insert. + for _, b := range events { inserts[idx] = rowSql var j []byte j, err = json.Marshal(b.Data) if err != nil { - tx.Rollback() - return + // we had some error turning the packet into json. + continue // we silently skip. } + vals = append(vals, b.Timestamp.UnixMilli(), b.Data.String(), j) + idx++ } // construct the full statement now - sqlStmt = sqlStmt + strings.Join(inserts, ",") + sqlStmt = sqlStmt + strings.Join(inserts[:idx], ",") stmt, err := tx.PrepareContext(ctx, sqlStmt) if err != nil { tx.Rollback() return } - //TODO: log the number of rows modified/inserted - _, err = stmt.ExecContext(ctx, vals...) + res, err := stmt.ExecContext(ctx, vals...) if err != nil { tx.Rollback() return } + n, err = res.RowsAffected() tx.Commit() return } -func (tdb *TelemDb) AddEvents(events ...*skylab.BusEvent) (err error) { +func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) { return tdb.AddEventsCtx(context.Background(), events...) } +// Streaming logger. +func (tdb *TelemDb) AddEventStreamCtx(ctx context.Context, events <-chan skylab.BusEvent, done chan<- bool) error { + const BatchSize = 500 + + tx, err := tdb.db.BeginTx(ctx, nil) + defer tx.Commit() + if err != nil { + return err + } + + makePreparedStmt := func(ctx context.Context, tx *sql.Tx, n int) (*sql.Stmt, error) { + sqlStmt := sqlInsertEvent + const rowSql = "(?, ?, json(?))" + inserts := make([]string, n) + for i := 0; i < n; i++ { + inserts[n] = rowSql + } + sqlStmt = sqlStmt + strings.Join(inserts, ",") + return tx.PrepareContext(ctx, sqlStmt) + } + + bulkStmt, err := makePreparedStmt(ctx, tx, BatchSize) + + // this is the list of values that we use. + valBatch := make([]interface{}, 0, BatchSize * 3) + batchIdx := 0 + for { + e, more := <-events + if more { + j, err := json.Marshal(e.Data) + if err != nil { + continue // skip things that couldn't be marshalled + } + valBatch = append(valBatch, e.Timestamp.UnixMilli(), e.Data.String(), j) + batchIdx++ + if batchIdx >= BatchSize { + _, err := bulkStmt.ExecContext(ctx, valBatch...) + if err != nil { + continue + } + } + } else { + break + } + + } + // create a statement for the remaining items. + lastStmt, err := makePreparedStmt(ctx, tx, batchIdx) + if err != nil { + return nil + } + _, err = lastStmt.ExecContext(ctx, valBatch...) + done <- true + + return nil +} + /// Query fragment guide: /// We need to be able to easily construct safe(!) and meaningful queries programatically /// so we make some new types that can be turned into SQL fragments that go inside the where clause. diff --git a/internal/logparsers/parsers.go b/internal/logparsers/parsers.go index e4b47cc..17515b9 100644 --- a/internal/logparsers/parsers.go +++ b/internal/logparsers/parsers.go @@ -37,9 +37,10 @@ func NewFormatError(msg string, err error) error { // A Parser takes a string containing one line of a particular log file // and returns an associated skylab.BusEvent representing the packet. // if no packet is found, an error is returned instead. -type ParserFunc func(string) (*skylab.BusEvent, error) +type ParserFunc func(string) (skylab.BusEvent, error) -func parseCanDumpLine(dumpLine string) (b *skylab.BusEvent, err error) { +func parseCanDumpLine(dumpLine string) (b skylab.BusEvent, err error) { + b = skylab.BusEvent{} // dumpline looks like this: // (1684538768.521889) can0 200#8D643546 // remove trailing newline @@ -74,9 +75,7 @@ func parseCanDumpLine(dumpLine string) (b *skylab.BusEvent, err error) { Kind: can.CanDataFrame, } - b = &skylab.BusEvent{ - Timestamp: time.Unix(unixSeconds, unixMicros), - } + b.Timestamp = time.Unix(unixSeconds, unixMicros) b.Data, err = skylab.FromCanFrame(frame) @@ -91,7 +90,8 @@ func parseCanDumpLine(dumpLine string) (b *skylab.BusEvent, err error) { return } -func parseTelemLogLine(line string) (b *skylab.BusEvent, err error) { +func parseTelemLogLine(line string) (b skylab.BusEvent, err error) { + b = skylab.BusEvent{} // strip trailng newline since we rely on it being gone line = strings.TrimSpace(line) // data is of the form @@ -149,9 +149,8 @@ func parseTelemLogLine(line string) (b *skylab.BusEvent, err error) { Kind: can.CanDataFrame, } - b = &skylab.BusEvent{ - Timestamp: ts, - } + b.Timestamp = ts + b.Data, err = skylab.FromCanFrame(frame) if err != nil { err = NewFormatError("failed to parse can frame", err)