wip: threaded import, doesn't work

This commit is contained in:
saji 2024-02-13 10:42:28 -06:00
parent 938b5b7000
commit 6c5162a8be
2 changed files with 32 additions and 16 deletions

View file

@ -7,6 +7,8 @@ import (
"io" "io"
"os" "os"
"strings" "strings"
"sync"
"sync/atomic"
"github.com/kschamplin/gotelem/internal/db" "github.com/kschamplin/gotelem/internal/db"
"github.com/kschamplin/gotelem/internal/logparsers" "github.com/kschamplin/gotelem/internal/logparsers"
@ -89,7 +91,17 @@ func importAction(ctx *cli.Context) error {
batchIdx := 0 batchIdx := 0
// stats for imports // stats for imports
var n_packets int64 = 0 var n_pkt atomic.Int64
delegateInsert := func(events []skylab.BusEvent) {
n, err := db.AddEventsCtx(ctx.Context, events...)
if err != nil {
fmt.Printf("%v", err)
}
n_pkt.Add(n)
}
var wg sync.WaitGroup
n_unknown := 0 n_unknown := 0
n_error := 0 n_error := 0
for { for {
@ -109,7 +121,7 @@ func importAction(ctx *cli.Context) error {
continue continue
} else if err != nil { } else if err != nil {
// TODO: we should consider absorbing all errors. // TODO: we should consider absorbing all errors.
fmt.Printf("got an error processing %s: %v\n", line, err) fmt.Printf("got an error processing '%s': %v\n", strings.TrimSpace(line), err)
n_error++ n_error++
continue continue
} }
@ -117,24 +129,29 @@ func importAction(ctx *cli.Context) error {
batchIdx++ batchIdx++
if batchIdx >= int(bSize) { if batchIdx >= int(bSize) {
// flush it!!!! // flush it!!!!
n, err := db.AddEventsCtx(ctx.Context, eventsBatch...) wg.Add(1)
if err != nil { e := make([]skylab.BusEvent, 0, bSize)
fmt.Printf("error adding to database %v\n", err) copy(e, eventsBatch)
} go func(e []skylab.BusEvent) {
n_packets += n delegateInsert(e)
wg.Done()
}(e)
batchIdx = 0 // reset the batch batchIdx = 0 // reset the batch
} }
} }
// check if we have remaining packets and flush them // check if we have remaining packets and flush them
if batchIdx > 0 { if batchIdx > 0 {
n, err := db.AddEventsCtx(ctx.Context, eventsBatch[:batchIdx]...) // note the slice here! wg.Add(1)
if err != nil {
fmt.Printf("error adding to database %v\n", err) go func() {
} // n, err := db.AddEventsCtx(ctx.Context, eventsBatch[:batchIdx]...) // note the slice here!
n_packets += n delegateInsert(eventsBatch[:batchIdx])
wg.Done()
}()
} }
fmt.Printf("import status: %d successful, %d unknown, %d errors\n", n_packets, n_unknown, n_error) wg.Wait()
fmt.Printf("import status: %d successful, %d unknown, %d errors\n", n_pkt.Load(), n_unknown, n_error)
return nil return nil
} }

View file

@ -72,8 +72,7 @@ func (tdb *TelemDb) SetVersion(version int) error {
// sql expression to insert a bus event into the packets database.1 // sql expression to insert a bus event into the packets database.1
const sqlInsertEvent = ` const sqlInsertEvent = `
INSERT INTO "bus_events" (ts, name, data) VALUES INSERT INTO "bus_events" (ts, name, data) VALUES `
`
// AddEvent adds the bus event to the database. // AddEvent adds the bus event to the database.
func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) { func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) {
@ -150,7 +149,7 @@ func (tdb *TelemDb) AddEventStreamCtx(ctx context.Context, events <-chan skylab.
bulkStmt, err := makePreparedStmt(ctx, tx, BatchSize) bulkStmt, err := makePreparedStmt(ctx, tx, BatchSize)
// this is the list of values that we use. // this is the list of values that we use.
valBatch := make([]interface{}, 0, BatchSize * 3) valBatch := make([]interface{}, 0, BatchSize*3)
batchIdx := 0 batchIdx := 0
for { for {
e, more := <-events e, more := <-events