Compare commits
2 commits
cf112ef561
...
54b7427428
Author | SHA1 | Date | |
---|---|---|---|
54b7427428 | |||
e08ab050ef |
|
@ -52,7 +52,6 @@ var serveCmd = &cli.Command{
|
||||||
type service interface {
|
type service interface {
|
||||||
fmt.Stringer
|
fmt.Stringer
|
||||||
Start(cCtx *cli.Context, deps svcDeps) (err error)
|
Start(cCtx *cli.Context, deps svcDeps) (err error)
|
||||||
Status()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type svcDeps struct {
|
type svcDeps struct {
|
||||||
|
@ -111,17 +110,17 @@ func serve(cCtx *cli.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, svc := range serveThings {
|
for _, svc := range serveThings {
|
||||||
logger.Info("starting service", "svc", svc.String())
|
logger.Info("starting service", "service", svc.String())
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(mySvc service, baseLogger *slog.Logger) {
|
go func(mySvc service, baseLogger *slog.Logger) {
|
||||||
svcLogger := logger.With("svc", mySvc.String())
|
svcLogger := logger.With("service", mySvc.String())
|
||||||
s := deps
|
s := deps
|
||||||
s.Logger = svcLogger
|
s.Logger = svcLogger
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
// TODO: recover
|
// TODO: recover
|
||||||
err := mySvc.Start(cCtx, s)
|
err := mySvc.Start(cCtx, s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("service stopped!", "err", err, "svc", mySvc.String())
|
logger.Error("service stopped!", "err", err, "service", mySvc.String())
|
||||||
}
|
}
|
||||||
}(svc, logger)
|
}(svc, logger)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,8 @@
|
||||||
package cli
|
package cli
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/kschamplin/gotelem/internal/can"
|
"github.com/kschamplin/gotelem/internal/can"
|
||||||
|
@ -57,7 +59,7 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
broker := deps.Broker
|
broker := deps.Broker
|
||||||
|
|
||||||
if !cCtx.IsSet("can") {
|
if !cCtx.IsSet("can") {
|
||||||
logger.Info("no can device provided")
|
logger.Debug("no can device provided, skip")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,6 +84,9 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
pkt, err := s.sock.Recv()
|
pkt, err := s.sock.Recv()
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
return
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warn("error receiving CAN packet", "err", err)
|
logger.Warn("error receiving CAN packet", "err", err)
|
||||||
}
|
}
|
||||||
|
@ -95,13 +100,17 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
case msg := <-rxCh:
|
case msg := <-rxCh:
|
||||||
|
|
||||||
frame, err = skylab.ToCanFrame(msg.Data)
|
frame, err = skylab.ToCanFrame(msg.Data)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warn("error encoding can frame", "name", msg.Name, "err", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
s.sock.Send(&frame)
|
s.sock.Send(&frame)
|
||||||
|
|
||||||
case msg := <-rxCan:
|
case msg := <-rxCan:
|
||||||
p, err := skylab.FromCanFrame(msg)
|
p, err := skylab.FromCanFrame(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warn("error parsing can packet", "id", msg.Id)
|
logger.Warn("error parsing can packet", "id", msg.Id, "err", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
cde := skylab.BusEvent{
|
cde := skylab.BusEvent{
|
||||||
|
@ -111,6 +120,8 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||||
}
|
}
|
||||||
broker.Publish("socketCAN", cde)
|
broker.Publish("socketCAN", cde)
|
||||||
case <-cCtx.Done():
|
case <-cCtx.Done():
|
||||||
|
// close the socket.
|
||||||
|
s.sock.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
46
http.go
46
http.go
|
@ -24,7 +24,10 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
|
||||||
bef := &BusEventFilter{}
|
bef := &BusEventFilter{}
|
||||||
|
|
||||||
v := r.URL.Query()
|
v := r.URL.Query()
|
||||||
bef.Names = v["name"] // put all the names in.
|
if v.Has("name") {
|
||||||
|
bef.Names = v["name"]
|
||||||
|
}
|
||||||
|
|
||||||
if el := v.Get("start"); el != "" {
|
if el := v.Get("start"); el != "" {
|
||||||
// parse the start time query.
|
// parse the start time query.
|
||||||
t, err := time.Parse(time.RFC3339, el)
|
t, err := time.Parse(time.RFC3339, el)
|
||||||
|
@ -41,13 +44,16 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
|
||||||
}
|
}
|
||||||
bef.EndTime = t
|
bef.EndTime = t
|
||||||
}
|
}
|
||||||
bef.Indexes = make([]int, 0)
|
if v.Has("idx") {
|
||||||
for _, strIdx := range v["idx"] {
|
|
||||||
idx, err := strconv.ParseInt(strIdx, 10, 32)
|
bef.Indexes = make([]int, 0)
|
||||||
if err != nil {
|
for _, strIdx := range v["idx"] {
|
||||||
return nil, err
|
idx, err := strconv.ParseInt(strIdx, 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
bef.Indexes = append(bef.Indexes, int(idx))
|
||||||
}
|
}
|
||||||
bef.Indexes = append(bef.Indexes, int(idx))
|
|
||||||
}
|
}
|
||||||
return bef, nil
|
return bef, nil
|
||||||
}
|
}
|
||||||
|
@ -141,17 +147,7 @@ func apiV1(broker *Broker, tdb *TelemDb) chi.Router {
|
||||||
})
|
})
|
||||||
|
|
||||||
// OpenMCT domain object storage. Basically an arbitrary JSON document store
|
// OpenMCT domain object storage. Basically an arbitrary JSON document store
|
||||||
|
r.Route("/openmct", apiV1OpenMCTStore(tdb))
|
||||||
r.Route("/openmct", func(r chi.Router) {
|
|
||||||
// key is a column on our json store, it's nested under identifier.key
|
|
||||||
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
|
||||||
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
|
||||||
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
|
||||||
// create a new object.
|
|
||||||
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
|
|
||||||
// subscribe to object updates.
|
|
||||||
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
|
|
||||||
})
|
|
||||||
|
|
||||||
// records are driving segments/runs.
|
// records are driving segments/runs.
|
||||||
|
|
||||||
|
@ -186,7 +182,6 @@ func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.Ping(r.Context())
|
|
||||||
// closeread handles protocol/status messages,
|
// closeread handles protocol/status messages,
|
||||||
// also handles clients closing the connection.
|
// also handles clients closing the connection.
|
||||||
// we get a context to use from it.
|
// we get a context to use from it.
|
||||||
|
@ -291,3 +286,16 @@ func apiV1GetValues(db *TelemDb) http.HandlerFunc {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func apiV1OpenMCTStore(db *TelemDb) func(chi.Router) {
|
||||||
|
return func(r chi.Router) {
|
||||||
|
// key is a column on our json store, it's nested under identifier.key
|
||||||
|
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||||
|
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||||
|
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||||
|
// create a new object.
|
||||||
|
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
|
||||||
|
// subscribe to object updates.
|
||||||
|
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue