remove broker-db listener, db options
This commit is contained in:
parent
d702395d5b
commit
e9d40ce466
|
@ -66,7 +66,6 @@ type svcDeps struct {
|
||||||
var serveThings = []service{
|
var serveThings = []service{
|
||||||
&xBeeService{},
|
&xBeeService{},
|
||||||
// &canLoggerService{},
|
// &canLoggerService{},
|
||||||
&dbWriterService{},
|
|
||||||
&httpService{},
|
&httpService{},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,6 +144,7 @@ func (x *xBeeService) Status() {
|
||||||
func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
logger := deps.Logger
|
logger := deps.Logger
|
||||||
broker := deps.Broker
|
broker := deps.Broker
|
||||||
|
tdb := deps.Db
|
||||||
if cCtx.String("xbee") == "" {
|
if cCtx.String("xbee") == "" {
|
||||||
logger.Info("not using xbee")
|
logger.Info("not using xbee")
|
||||||
return
|
return
|
||||||
|
@ -172,8 +172,6 @@ func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
xbeeTxer := json.NewEncoder(x.session)
|
xbeeTxer := json.NewEncoder(x.session)
|
||||||
xbeeRxer := json.NewDecoder(x.session)
|
xbeeRxer := json.NewDecoder(x.session)
|
||||||
|
|
||||||
// xbeePackets := make(chan skylab.BusEvent)
|
|
||||||
// background task to read json packets off of the xbee and send them to the
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
var p skylab.BusEvent
|
var p skylab.BusEvent
|
||||||
|
@ -182,6 +180,7 @@ func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
logger.Error("failed to decode xbee packet")
|
logger.Error("failed to decode xbee packet")
|
||||||
}
|
}
|
||||||
broker.Publish("xbee", p)
|
broker.Publish("xbee", p)
|
||||||
|
tdb.AddEventsCtx(cCtx.Context, p)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
for {
|
for {
|
||||||
|
@ -191,7 +190,7 @@ func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
return
|
return
|
||||||
case msg := <-rxCh:
|
case msg := <-rxCh:
|
||||||
logger.Info("got msg", "msg", msg)
|
logger.Info("got msg", "msg", msg)
|
||||||
xbeeTxer.Encode(msg)
|
err := xbeeTxer.Encode(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warn("error writing to xbee", "err", err)
|
logger.Warn("error writing to xbee", "err", err)
|
||||||
}
|
}
|
||||||
|
@ -236,34 +235,3 @@ func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// dbWriterService listens to the CAN packet broker and saves packets to the database.
|
|
||||||
type dbWriterService struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *dbWriterService) Status() {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *dbWriterService) String() string {
|
|
||||||
return "db logger"
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *dbWriterService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
|
||||||
|
|
||||||
// put CAN packets from the broker into the database.
|
|
||||||
tdb := deps.Db
|
|
||||||
rxCh, err := deps.Broker.Subscribe("dbWriter")
|
|
||||||
defer deps.Broker.Unsubscribe("dbWriter")
|
|
||||||
|
|
||||||
// TODO: add buffering + timeout/backpressure
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case msg := <-rxCh:
|
|
||||||
tdb.AddEventsCtx(cCtx.Context, msg)
|
|
||||||
case <-cCtx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -57,6 +57,7 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
|
|
||||||
logger := deps.Logger
|
logger := deps.Logger
|
||||||
broker := deps.Broker
|
broker := deps.Broker
|
||||||
|
tdb := deps.Db
|
||||||
|
|
||||||
if !cCtx.IsSet("can") {
|
if !cCtx.IsSet("can") {
|
||||||
logger.Debug("no can device provided, skip")
|
logger.Debug("no can device provided, skip")
|
||||||
|
@ -113,12 +114,13 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
logger.Warn("error parsing can packet", "id", msg.Id, "err", err)
|
logger.Warn("error parsing can packet", "id", msg.Id, "err", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
cde := skylab.BusEvent{
|
event := skylab.BusEvent{
|
||||||
Timestamp: time.Now(),
|
Timestamp: time.Now(),
|
||||||
Name: p.String(),
|
Name: p.String(),
|
||||||
Data: p,
|
Data: p,
|
||||||
}
|
}
|
||||||
broker.Publish("socketCAN", cde)
|
broker.Publish("socketCAN", event)
|
||||||
|
tdb.AddEventsCtx(cCtx.Context, event)
|
||||||
case <-cCtx.Done():
|
case <-cCtx.Done():
|
||||||
// close the socket.
|
// close the socket.
|
||||||
s.sock.Close()
|
s.sock.Close()
|
||||||
|
|
14
db.go
14
db.go
|
@ -19,23 +19,15 @@ type TelemDb struct {
|
||||||
db *sqlx.DB
|
db *sqlx.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
// TelemDbOption lets you customize the behavior of the sqlite database
|
|
||||||
type TelemDbOption func(*TelemDb) error
|
|
||||||
|
|
||||||
// this function is internal use. It actually opens the database, but uses
|
// this function is internal use. It actually opens the database, but uses
|
||||||
// a raw path string instead of formatting one like the exported functions.
|
// a raw path string instead of formatting one like the exported functions.
|
||||||
func OpenRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err error) {
|
func OpenRawDb(rawpath string) (tdb *TelemDb, err error) {
|
||||||
tdb = &TelemDb{}
|
tdb = &TelemDb{}
|
||||||
tdb.db, err = sqlx.Connect("sqlite3", rawpath)
|
tdb.db, err = sqlx.Connect("sqlite3", rawpath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, fn := range options {
|
|
||||||
err = fn(tdb)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// perform any database migrations
|
// perform any database migrations
|
||||||
version, err := tdb.GetVersion()
|
version, err := tdb.GetVersion()
|
||||||
|
@ -56,9 +48,9 @@ func OpenRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err erro
|
||||||
const ProductionDbURI = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000"
|
const ProductionDbURI = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000"
|
||||||
|
|
||||||
// OpenTelemDb opens a new telemetry database at the given path.
|
// OpenTelemDb opens a new telemetry database at the given path.
|
||||||
func OpenTelemDb(path string, options ...TelemDbOption) (*TelemDb, error) {
|
func OpenTelemDb(path string) (*TelemDb, error) {
|
||||||
dbStr := fmt.Sprintf(ProductionDbURI, path)
|
dbStr := fmt.Sprintf(ProductionDbURI, path)
|
||||||
return OpenRawDb(dbStr, options...)
|
return OpenRawDb(dbStr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tdb *TelemDb) GetVersion() (int, error) {
|
func (tdb *TelemDb) GetVersion() (int, error) {
|
||||||
|
|
16
http.go
16
http.go
|
@ -128,15 +128,19 @@ func apiV1(broker *Broker, tdb *TelemDb) chi.Router {
|
||||||
})
|
})
|
||||||
|
|
||||||
r.Route("/packets", func(r chi.Router) {
|
r.Route("/packets", func(r chi.Router) {
|
||||||
r.Get("/subscribe", apiV1PacketSubscribe(broker, tdb))
|
r.Get("/subscribe", apiV1PacketSubscribe(broker))
|
||||||
r.Post("/", func(w http.ResponseWriter, r *http.Request) {
|
r.Post("/", func(w http.ResponseWriter, r *http.Request) {
|
||||||
var pkgs []skylab.BusEvent
|
var pkts []skylab.BusEvent
|
||||||
decoder := json.NewDecoder(r.Body)
|
decoder := json.NewDecoder(r.Body)
|
||||||
if err := decoder.Decode(&pkgs); err != nil {
|
if err := decoder.Decode(&pkts); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
tdb.AddEvents(pkgs...)
|
conn_id := r.RemoteAddr + uuid.NewString()
|
||||||
|
for _, pkt := range pkts {
|
||||||
|
broker.Publish(conn_id, pkt)
|
||||||
|
}
|
||||||
|
tdb.AddEventsCtx(r.Context(), pkts...)
|
||||||
})
|
})
|
||||||
// general packet history get.
|
// general packet history get.
|
||||||
r.Get("/", apiV1GetPackets(tdb))
|
r.Get("/", apiV1GetPackets(tdb))
|
||||||
|
@ -159,7 +163,7 @@ func apiV1(broker *Broker, tdb *TelemDb) chi.Router {
|
||||||
}
|
}
|
||||||
|
|
||||||
// this is a websocket stream.
|
// this is a websocket stream.
|
||||||
func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc {
|
func apiV1PacketSubscribe(broker *Broker) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
// pull filter from url query params.
|
// pull filter from url query params.
|
||||||
bef, err := extractBusEventFilter(r)
|
bef, err := extractBusEventFilter(r)
|
||||||
|
@ -167,7 +171,7 @@ func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
}
|
}
|
||||||
// setup connection
|
// setup connection
|
||||||
conn_id := r.RemoteAddr + uuid.New().String()
|
conn_id := r.RemoteAddr + uuid.NewString()
|
||||||
sub, err := broker.Subscribe(conn_id)
|
sub, err := broker.Subscribe(conn_id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
|
Loading…
Reference in a new issue