remove streaming db insert
This commit is contained in:
parent
33c2f3f023
commit
2d9d32dbf4
|
@ -71,8 +71,7 @@ func (tdb *TelemDb) SetVersion(version int) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// sql expression to insert a bus event into the packets database.1
|
// sql expression to insert a bus event into the packets database.1
|
||||||
const sqlInsertEvent = `
|
const sqlInsertEvent =`INSERT INTO "bus_events" (ts, name, data) VALUES `
|
||||||
INSERT INTO "bus_events" (ts, name, data) VALUES `
|
|
||||||
|
|
||||||
// AddEvent adds the bus event to the database.
|
// AddEvent adds the bus event to the database.
|
||||||
func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) {
|
func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) {
|
||||||
|
@ -125,63 +124,6 @@ func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) {
|
||||||
return tdb.AddEventsCtx(context.Background(), events...)
|
return tdb.AddEventsCtx(context.Background(), events...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Streaming logger.
|
|
||||||
func (tdb *TelemDb) AddEventStreamCtx(ctx context.Context, events <-chan skylab.BusEvent, done chan<- bool) error {
|
|
||||||
const BatchSize = 500
|
|
||||||
|
|
||||||
tx, err := tdb.db.BeginTx(ctx, nil)
|
|
||||||
defer tx.Commit()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
makePreparedStmt := func(ctx context.Context, tx *sql.Tx, n int) (*sql.Stmt, error) {
|
|
||||||
sqlStmt := sqlInsertEvent
|
|
||||||
const rowSql = "(?, ?, json(?))"
|
|
||||||
inserts := make([]string, n)
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
inserts[n] = rowSql
|
|
||||||
}
|
|
||||||
sqlStmt = sqlStmt + strings.Join(inserts, ",")
|
|
||||||
return tx.PrepareContext(ctx, sqlStmt)
|
|
||||||
}
|
|
||||||
|
|
||||||
bulkStmt, err := makePreparedStmt(ctx, tx, BatchSize)
|
|
||||||
|
|
||||||
// this is the list of values that we use.
|
|
||||||
valBatch := make([]interface{}, 0, BatchSize*3)
|
|
||||||
batchIdx := 0
|
|
||||||
for {
|
|
||||||
e, more := <-events
|
|
||||||
if more {
|
|
||||||
j, err := json.Marshal(e.Data)
|
|
||||||
if err != nil {
|
|
||||||
continue // skip things that couldn't be marshalled
|
|
||||||
}
|
|
||||||
valBatch = append(valBatch, e.Timestamp.UnixMilli(), e.Data.String(), j)
|
|
||||||
batchIdx++
|
|
||||||
if batchIdx >= BatchSize {
|
|
||||||
_, err := bulkStmt.ExecContext(ctx, valBatch...)
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
// create a statement for the remaining items.
|
|
||||||
lastStmt, err := makePreparedStmt(ctx, tx, batchIdx)
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
_, err = lastStmt.ExecContext(ctx, valBatch...)
|
|
||||||
done <- true
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Query fragment guide:
|
/// Query fragment guide:
|
||||||
/// We need to be able to easily construct safe(!) and meaningful queries programatically
|
/// We need to be able to easily construct safe(!) and meaningful queries programatically
|
||||||
/// so we make some new types that can be turned into SQL fragments that go inside the where clause.
|
/// so we make some new types that can be turned into SQL fragments that go inside the where clause.
|
||||||
|
|
Loading…
Reference in a new issue