Performance optimizations for import

note that you need to limit the number of writing threads.
This commit is contained in:
saji 2024-02-14 10:31:41 -06:00
parent a4ca71d0ad
commit 245a654164
2 changed files with 26 additions and 21 deletions

View file

@ -7,13 +7,13 @@ import (
"io" "io"
"os" "os"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"github.com/kschamplin/gotelem/internal/db" "github.com/kschamplin/gotelem/internal/db"
"github.com/kschamplin/gotelem/internal/logparsers" "github.com/kschamplin/gotelem/internal/logparsers"
"github.com/kschamplin/gotelem/skylab" "github.com/kschamplin/gotelem/skylab"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
) )
var parsersString string var parsersString string
@ -49,7 +49,7 @@ var importCmd = &cli.Command{
Name: "database", Name: "database",
Aliases: []string{"d", "db"}, Aliases: []string{"d", "db"},
Usage: "the path of the database", Usage: "the path of the database",
Value: "./gotelem.db", Value: "gotelem.db",
}, },
&cli.UintFlag{ &cli.UintFlag{
Name: "batch-size", Name: "batch-size",
@ -99,11 +99,16 @@ func importAction(ctx *cli.Context) error {
n, err := db.AddEventsCtx(ctx.Context, events...) n, err := db.AddEventsCtx(ctx.Context, events...)
if err != nil { if err != nil {
fmt.Printf("%v", err) fmt.Printf("%v", err)
return
} }
n_pkt.Add(n) n_pkt.Add(n)
} }
var wg sync.WaitGroup // we use this errorgroup to limit the number of
// running goroutines to a normal value. This way
// we don't thrash the system,
eg := new(errgroup.Group)
eg.SetLimit(5)
var linenum int64 = 0 var linenum int64 = 0
n_unknown := 0 n_unknown := 0
n_error := 0 n_error := 0
@ -130,31 +135,27 @@ func importAction(ctx *cli.Context) error {
linenum++ linenum++
batchIdx++ batchIdx++
if batchIdx >= int(bSize) { if batchIdx >= int(bSize) {
// flush it!!!!
wg.Add(1)
e := make([]skylab.BusEvent, bSize) e := make([]skylab.BusEvent, bSize)
copy(e, eventsBatch) copy(e, eventsBatch)
go func(e []skylab.BusEvent) { eg.Go(func() error {
delegateInsert(e) delegateInsert(e)
wg.Done() return nil
}(e) })
batchIdx = 0 // reset the batch batchIdx = 0 // reset the batch
} }
} }
// check if we have remaining packets and flush them // check if we have remaining packets and flush them
if batchIdx > 0 { if batchIdx > 0 {
wg.Add(1) eg.Go(func() error {
go func() {
// since we don't do any modification // since we don't do any modification
// we can avoid the copy // we can avoid the copy
delegateInsert(eventsBatch[:batchIdx]) delegateInsert(eventsBatch[:batchIdx])
wg.Done() return nil
}() })
} }
// wait for any goroutines. // wait for any goroutines.
wg.Wait() eg.Wait()
fmt.Printf("import status: %d successful, %d unknown, %d errors\n", n_pkt.Load(), n_unknown, n_error) fmt.Printf("import status: %d successful, %d unknown, %d errors\n", n_pkt.Load(), n_unknown, n_error)
return nil return nil

View file

@ -28,14 +28,19 @@ type TelemDb struct {
// TelemDbOption lets you customize the behavior of the sqlite database // TelemDbOption lets you customize the behavior of the sqlite database
type TelemDbOption func(*TelemDb) error type TelemDbOption func(*TelemDb) error
// this string is used to open the read-write db.
// the extra options improve performance significantly.
const rwDbPathFmt = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000"
func OpenTelemDb(path string, options ...TelemDbOption) (tdb *TelemDb, err error) { func OpenTelemDb(path string, options ...TelemDbOption) (tdb *TelemDb, err error) {
tdb = &TelemDb{} tdb = &TelemDb{}
tdb.db, err = sqlx.Connect("sqlite3", path)
dbStr := fmt.Sprintf(rwDbPathFmt, path)
tdb.db, err = sqlx.Connect("sqlite3", dbStr)
if err != nil { if err != nil {
return return
} }
// TODO: add options support.
for _, fn := range options { for _, fn := range options {
err = fn(tdb) err = fn(tdb)
if err != nil { if err != nil {
@ -43,13 +48,12 @@ func OpenTelemDb(path string, options ...TelemDbOption) (tdb *TelemDb, err error
} }
} }
var version int // perform any database migrations
err = tdb.db.Get(&version, "PRAGMA user_version") version, err := tdb.GetVersion()
if err != nil { if err != nil {
return return
} }
// TODO: use logging instead of printf
// get latest version of migrations - then run the SQL in order to perform them
fmt.Printf("starting version %d\n", version) fmt.Printf("starting version %d\n", version)
version, err = RunMigrations(tdb) version, err = RunMigrations(tdb)