diff --git a/cmd/gotelem/cli/server.go b/cmd/gotelem/cli/server.go index 4c36d6a..965ffbc 100644 --- a/cmd/gotelem/cli/server.go +++ b/cmd/gotelem/cli/server.go @@ -66,7 +66,6 @@ type svcDeps struct { var serveThings = []service{ &xBeeService{}, // &canLoggerService{}, - &dbWriterService{}, &httpService{}, } @@ -145,6 +144,7 @@ func (x *xBeeService) Status() { func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) { logger := deps.Logger broker := deps.Broker + tdb := deps.Db if cCtx.String("xbee") == "" { logger.Info("not using xbee") return @@ -172,8 +172,6 @@ func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) { xbeeTxer := json.NewEncoder(x.session) xbeeRxer := json.NewDecoder(x.session) - // xbeePackets := make(chan skylab.BusEvent) - // background task to read json packets off of the xbee and send them to the go func() { for { var p skylab.BusEvent @@ -182,6 +180,7 @@ func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) { logger.Error("failed to decode xbee packet") } broker.Publish("xbee", p) + tdb.AddEventsCtx(cCtx.Context, p) } }() for { @@ -191,7 +190,7 @@ func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) { return case msg := <-rxCh: logger.Info("got msg", "msg", msg) - xbeeTxer.Encode(msg) + err := xbeeTxer.Encode(msg) if err != nil { logger.Warn("error writing to xbee", "err", err) } @@ -236,34 +235,3 @@ func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) { } return } - -// dbWriterService listens to the CAN packet broker and saves packets to the database. -type dbWriterService struct { -} - -func (d *dbWriterService) Status() { - -} - -func (d *dbWriterService) String() string { - return "db logger" -} - -func (d *dbWriterService) Start(cCtx *cli.Context, deps svcDeps) (err error) { - - // put CAN packets from the broker into the database. - tdb := deps.Db - rxCh, err := deps.Broker.Subscribe("dbWriter") - defer deps.Broker.Unsubscribe("dbWriter") - - // TODO: add buffering + timeout/backpressure - - for { - select { - case msg := <-rxCh: - tdb.AddEventsCtx(cCtx.Context, msg) - case <-cCtx.Done(): - return - } - } -} diff --git a/cmd/gotelem/cli/socketcan.go b/cmd/gotelem/cli/socketcan.go index b99e2f8..fb5ce49 100644 --- a/cmd/gotelem/cli/socketcan.go +++ b/cmd/gotelem/cli/socketcan.go @@ -57,6 +57,7 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) { logger := deps.Logger broker := deps.Broker + tdb := deps.Db if !cCtx.IsSet("can") { logger.Debug("no can device provided, skip") @@ -113,12 +114,13 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) { logger.Warn("error parsing can packet", "id", msg.Id, "err", err) continue } - cde := skylab.BusEvent{ + event := skylab.BusEvent{ Timestamp: time.Now(), Name: p.String(), Data: p, } - broker.Publish("socketCAN", cde) + broker.Publish("socketCAN", event) + tdb.AddEventsCtx(cCtx.Context, event) case <-cCtx.Done(): // close the socket. s.sock.Close() diff --git a/db.go b/db.go index f774f43..3f9bf13 100644 --- a/db.go +++ b/db.go @@ -19,23 +19,15 @@ type TelemDb struct { db *sqlx.DB } -// TelemDbOption lets you customize the behavior of the sqlite database -type TelemDbOption func(*TelemDb) error // this function is internal use. It actually opens the database, but uses // a raw path string instead of formatting one like the exported functions. -func OpenRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err error) { +func OpenRawDb(rawpath string) (tdb *TelemDb, err error) { tdb = &TelemDb{} tdb.db, err = sqlx.Connect("sqlite3", rawpath) if err != nil { return } - for _, fn := range options { - err = fn(tdb) - if err != nil { - return - } - } // perform any database migrations version, err := tdb.GetVersion() @@ -56,9 +48,9 @@ func OpenRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err erro const ProductionDbURI = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000" // OpenTelemDb opens a new telemetry database at the given path. -func OpenTelemDb(path string, options ...TelemDbOption) (*TelemDb, error) { +func OpenTelemDb(path string) (*TelemDb, error) { dbStr := fmt.Sprintf(ProductionDbURI, path) - return OpenRawDb(dbStr, options...) + return OpenRawDb(dbStr) } func (tdb *TelemDb) GetVersion() (int, error) { diff --git a/http.go b/http.go index 20aaef4..ec36a41 100644 --- a/http.go +++ b/http.go @@ -128,15 +128,19 @@ func apiV1(broker *Broker, tdb *TelemDb) chi.Router { }) r.Route("/packets", func(r chi.Router) { - r.Get("/subscribe", apiV1PacketSubscribe(broker, tdb)) + r.Get("/subscribe", apiV1PacketSubscribe(broker)) r.Post("/", func(w http.ResponseWriter, r *http.Request) { - var pkgs []skylab.BusEvent + var pkts []skylab.BusEvent decoder := json.NewDecoder(r.Body) - if err := decoder.Decode(&pkgs); err != nil { + if err := decoder.Decode(&pkts); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - tdb.AddEvents(pkgs...) + conn_id := r.RemoteAddr + uuid.NewString() + for _, pkt := range pkts { + broker.Publish(conn_id, pkt) + } + tdb.AddEventsCtx(r.Context(), pkts...) }) // general packet history get. r.Get("/", apiV1GetPackets(tdb)) @@ -159,7 +163,7 @@ func apiV1(broker *Broker, tdb *TelemDb) chi.Router { } // this is a websocket stream. -func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc { +func apiV1PacketSubscribe(broker *Broker) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { // pull filter from url query params. bef, err := extractBusEventFilter(r) @@ -167,7 +171,7 @@ func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc { http.Error(w, err.Error(), http.StatusInternalServerError) } // setup connection - conn_id := r.RemoteAddr + uuid.New().String() + conn_id := r.RemoteAddr + uuid.NewString() sub, err := broker.Subscribe(conn_id) if err != nil { w.WriteHeader(http.StatusInternalServerError)