diff --git a/cmd/gotelem/cli/client.go b/cmd/gotelem/cli/client.go index bf8fa23..e55f0c8 100644 --- a/cmd/gotelem/cli/client.go +++ b/cmd/gotelem/cli/client.go @@ -7,13 +7,13 @@ import ( "io" "os" "strings" - "sync" "sync/atomic" "github.com/kschamplin/gotelem/internal/db" "github.com/kschamplin/gotelem/internal/logparsers" "github.com/kschamplin/gotelem/skylab" "github.com/urfave/cli/v2" + "golang.org/x/sync/errgroup" ) var parsersString string @@ -49,7 +49,7 @@ var importCmd = &cli.Command{ Name: "database", Aliases: []string{"d", "db"}, Usage: "the path of the database", - Value: "./gotelem.db", + Value: "gotelem.db", }, &cli.UintFlag{ Name: "batch-size", @@ -99,11 +99,16 @@ func importAction(ctx *cli.Context) error { n, err := db.AddEventsCtx(ctx.Context, events...) if err != nil { fmt.Printf("%v", err) + return } n_pkt.Add(n) } - var wg sync.WaitGroup + // we use this errorgroup to limit the number of + // running goroutines to a normal value. This way + // we don't thrash the system, + eg := new(errgroup.Group) + eg.SetLimit(5) var linenum int64 = 0 n_unknown := 0 n_error := 0 @@ -130,31 +135,27 @@ func importAction(ctx *cli.Context) error { linenum++ batchIdx++ if batchIdx >= int(bSize) { - // flush it!!!! - wg.Add(1) e := make([]skylab.BusEvent, bSize) copy(e, eventsBatch) - go func(e []skylab.BusEvent) { + eg.Go(func() error { delegateInsert(e) - wg.Done() - }(e) + return nil + }) batchIdx = 0 // reset the batch } } // check if we have remaining packets and flush them if batchIdx > 0 { - wg.Add(1) - - go func() { + eg.Go(func() error { // since we don't do any modification // we can avoid the copy delegateInsert(eventsBatch[:batchIdx]) - wg.Done() - }() + return nil + }) } // wait for any goroutines. - wg.Wait() + eg.Wait() fmt.Printf("import status: %d successful, %d unknown, %d errors\n", n_pkt.Load(), n_unknown, n_error) return nil diff --git a/internal/db/db.go b/internal/db/db.go index 8eddb32..4266882 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -28,14 +28,19 @@ type TelemDb struct { // TelemDbOption lets you customize the behavior of the sqlite database type TelemDbOption func(*TelemDb) error +// this string is used to open the read-write db. +// the extra options improve performance significantly. +const rwDbPathFmt = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000" + + func OpenTelemDb(path string, options ...TelemDbOption) (tdb *TelemDb, err error) { tdb = &TelemDb{} - tdb.db, err = sqlx.Connect("sqlite3", path) + + dbStr := fmt.Sprintf(rwDbPathFmt, path) + tdb.db, err = sqlx.Connect("sqlite3", dbStr) if err != nil { return } - // TODO: add options support. - for _, fn := range options { err = fn(tdb) if err != nil { @@ -43,13 +48,12 @@ func OpenTelemDb(path string, options ...TelemDbOption) (tdb *TelemDb, err error } } - var version int - err = tdb.db.Get(&version, "PRAGMA user_version") + // perform any database migrations + version, err := tdb.GetVersion() if err != nil { return } - - // get latest version of migrations - then run the SQL in order to perform them + // TODO: use logging instead of printf fmt.Printf("starting version %d\n", version) version, err = RunMigrations(tdb)