diff --git a/cmd/gotelem/cli/client.go b/cmd/gotelem/cli/client.go index a32222d..03626fe 100644 --- a/cmd/gotelem/cli/client.go +++ b/cmd/gotelem/cli/client.go @@ -7,6 +7,8 @@ import ( "io" "os" "strings" + "sync" + "sync/atomic" "github.com/kschamplin/gotelem/internal/db" "github.com/kschamplin/gotelem/internal/logparsers" @@ -89,7 +91,17 @@ func importAction(ctx *cli.Context) error { batchIdx := 0 // stats for imports - var n_packets int64 = 0 + var n_pkt atomic.Int64 + + delegateInsert := func(events []skylab.BusEvent) { + n, err := db.AddEventsCtx(ctx.Context, events...) + if err != nil { + fmt.Printf("%v", err) + } + n_pkt.Add(n) + } + + var wg sync.WaitGroup n_unknown := 0 n_error := 0 for { @@ -109,7 +121,7 @@ func importAction(ctx *cli.Context) error { continue } else if err != nil { // TODO: we should consider absorbing all errors. - fmt.Printf("got an error processing %s: %v\n", line, err) + fmt.Printf("got an error processing '%s': %v\n", strings.TrimSpace(line), err) n_error++ continue } @@ -117,24 +129,29 @@ func importAction(ctx *cli.Context) error { batchIdx++ if batchIdx >= int(bSize) { // flush it!!!! - n, err := db.AddEventsCtx(ctx.Context, eventsBatch...) - if err != nil { - fmt.Printf("error adding to database %v\n", err) - } - n_packets += n + wg.Add(1) + e := make([]skylab.BusEvent, 0, bSize) + copy(e, eventsBatch) + go func(e []skylab.BusEvent) { + delegateInsert(e) + wg.Done() + }(e) batchIdx = 0 // reset the batch } } // check if we have remaining packets and flush them if batchIdx > 0 { - n, err := db.AddEventsCtx(ctx.Context, eventsBatch[:batchIdx]...) // note the slice here! - if err != nil { - fmt.Printf("error adding to database %v\n", err) - } - n_packets += n + wg.Add(1) + + go func() { + // n, err := db.AddEventsCtx(ctx.Context, eventsBatch[:batchIdx]...) // note the slice here! + delegateInsert(eventsBatch[:batchIdx]) + wg.Done() + }() } - fmt.Printf("import status: %d successful, %d unknown, %d errors\n", n_packets, n_unknown, n_error) + wg.Wait() + fmt.Printf("import status: %d successful, %d unknown, %d errors\n", n_pkt.Load(), n_unknown, n_error) return nil } diff --git a/internal/db/db.go b/internal/db/db.go index 6867b87..4ca105f 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -72,8 +72,7 @@ func (tdb *TelemDb) SetVersion(version int) error { // sql expression to insert a bus event into the packets database.1 const sqlInsertEvent = ` -INSERT INTO "bus_events" (ts, name, data) VALUES -` +INSERT INTO "bus_events" (ts, name, data) VALUES ` // AddEvent adds the bus event to the database. func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) { @@ -150,7 +149,7 @@ func (tdb *TelemDb) AddEventStreamCtx(ctx context.Context, events <-chan skylab. bulkStmt, err := makePreparedStmt(ctx, tx, BatchSize) // this is the list of values that we use. - valBatch := make([]interface{}, 0, BatchSize * 3) + valBatch := make([]interface{}, 0, BatchSize*3) batchIdx := 0 for { e, more := <-events