From 2d9d32dbf442e5e5b7ad466b211130434969a1ed Mon Sep 17 00:00:00 2001 From: saji Date: Tue, 13 Feb 2024 13:41:32 -0600 Subject: [PATCH] remove streaming db insert --- internal/db/db.go | 60 +---------------------------------------------- 1 file changed, 1 insertion(+), 59 deletions(-) diff --git a/internal/db/db.go b/internal/db/db.go index 4ca105f..0532e03 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -71,8 +71,7 @@ func (tdb *TelemDb) SetVersion(version int) error { } // sql expression to insert a bus event into the packets database.1 -const sqlInsertEvent = ` -INSERT INTO "bus_events" (ts, name, data) VALUES ` +const sqlInsertEvent =`INSERT INTO "bus_events" (ts, name, data) VALUES ` // AddEvent adds the bus event to the database. func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) { @@ -125,63 +124,6 @@ func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) { return tdb.AddEventsCtx(context.Background(), events...) } -// Streaming logger. -func (tdb *TelemDb) AddEventStreamCtx(ctx context.Context, events <-chan skylab.BusEvent, done chan<- bool) error { - const BatchSize = 500 - - tx, err := tdb.db.BeginTx(ctx, nil) - defer tx.Commit() - if err != nil { - return err - } - - makePreparedStmt := func(ctx context.Context, tx *sql.Tx, n int) (*sql.Stmt, error) { - sqlStmt := sqlInsertEvent - const rowSql = "(?, ?, json(?))" - inserts := make([]string, n) - for i := 0; i < n; i++ { - inserts[n] = rowSql - } - sqlStmt = sqlStmt + strings.Join(inserts, ",") - return tx.PrepareContext(ctx, sqlStmt) - } - - bulkStmt, err := makePreparedStmt(ctx, tx, BatchSize) - - // this is the list of values that we use. - valBatch := make([]interface{}, 0, BatchSize*3) - batchIdx := 0 - for { - e, more := <-events - if more { - j, err := json.Marshal(e.Data) - if err != nil { - continue // skip things that couldn't be marshalled - } - valBatch = append(valBatch, e.Timestamp.UnixMilli(), e.Data.String(), j) - batchIdx++ - if batchIdx >= BatchSize { - _, err := bulkStmt.ExecContext(ctx, valBatch...) - if err != nil { - continue - } - } - } else { - break - } - - } - // create a statement for the remaining items. - lastStmt, err := makePreparedStmt(ctx, tx, batchIdx) - if err != nil { - return nil - } - _, err = lastStmt.ExecContext(ctx, valBatch...) - done <- true - - return nil -} - /// Query fragment guide: /// We need to be able to easily construct safe(!) and meaningful queries programatically /// so we make some new types that can be turned into SQL fragments that go inside the where clause.