Compare commits
2 commits
cf112ef561
...
54b7427428
Author | SHA1 | Date | |
---|---|---|---|
54b7427428 | |||
e08ab050ef |
|
@ -52,7 +52,6 @@ var serveCmd = &cli.Command{
|
|||
type service interface {
|
||||
fmt.Stringer
|
||||
Start(cCtx *cli.Context, deps svcDeps) (err error)
|
||||
Status()
|
||||
}
|
||||
|
||||
type svcDeps struct {
|
||||
|
@ -111,17 +110,17 @@ func serve(cCtx *cli.Context) error {
|
|||
}
|
||||
|
||||
for _, svc := range serveThings {
|
||||
logger.Info("starting service", "svc", svc.String())
|
||||
logger.Info("starting service", "service", svc.String())
|
||||
wg.Add(1)
|
||||
go func(mySvc service, baseLogger *slog.Logger) {
|
||||
svcLogger := logger.With("svc", mySvc.String())
|
||||
svcLogger := logger.With("service", mySvc.String())
|
||||
s := deps
|
||||
s.Logger = svcLogger
|
||||
defer wg.Done()
|
||||
// TODO: recover
|
||||
err := mySvc.Start(cCtx, s)
|
||||
if err != nil {
|
||||
logger.Error("service stopped!", "err", err, "svc", mySvc.String())
|
||||
logger.Error("service stopped!", "err", err, "service", mySvc.String())
|
||||
}
|
||||
}(svc, logger)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,8 @@
|
|||
package cli
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/kschamplin/gotelem/internal/can"
|
||||
|
@ -57,7 +59,7 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
|||
broker := deps.Broker
|
||||
|
||||
if !cCtx.IsSet("can") {
|
||||
logger.Info("no can device provided")
|
||||
logger.Debug("no can device provided, skip")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -82,6 +84,9 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
|||
go func() {
|
||||
for {
|
||||
pkt, err := s.sock.Recv()
|
||||
if errors.Is(err, io.EOF) {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
logger.Warn("error receiving CAN packet", "err", err)
|
||||
}
|
||||
|
@ -95,13 +100,17 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
|||
case msg := <-rxCh:
|
||||
|
||||
frame, err = skylab.ToCanFrame(msg.Data)
|
||||
if err != nil {
|
||||
logger.Warn("error encoding can frame", "name", msg.Name, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
s.sock.Send(&frame)
|
||||
|
||||
case msg := <-rxCan:
|
||||
p, err := skylab.FromCanFrame(msg)
|
||||
if err != nil {
|
||||
logger.Warn("error parsing can packet", "id", msg.Id)
|
||||
logger.Warn("error parsing can packet", "id", msg.Id, "err", err)
|
||||
continue
|
||||
}
|
||||
cde := skylab.BusEvent{
|
||||
|
@ -111,6 +120,8 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
|||
}
|
||||
broker.Publish("socketCAN", cde)
|
||||
case <-cCtx.Done():
|
||||
// close the socket.
|
||||
s.sock.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
46
http.go
46
http.go
|
@ -24,7 +24,10 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
|
|||
bef := &BusEventFilter{}
|
||||
|
||||
v := r.URL.Query()
|
||||
bef.Names = v["name"] // put all the names in.
|
||||
if v.Has("name") {
|
||||
bef.Names = v["name"]
|
||||
}
|
||||
|
||||
if el := v.Get("start"); el != "" {
|
||||
// parse the start time query.
|
||||
t, err := time.Parse(time.RFC3339, el)
|
||||
|
@ -41,13 +44,16 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
|
|||
}
|
||||
bef.EndTime = t
|
||||
}
|
||||
bef.Indexes = make([]int, 0)
|
||||
for _, strIdx := range v["idx"] {
|
||||
idx, err := strconv.ParseInt(strIdx, 10, 32)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if v.Has("idx") {
|
||||
|
||||
bef.Indexes = make([]int, 0)
|
||||
for _, strIdx := range v["idx"] {
|
||||
idx, err := strconv.ParseInt(strIdx, 10, 32)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bef.Indexes = append(bef.Indexes, int(idx))
|
||||
}
|
||||
bef.Indexes = append(bef.Indexes, int(idx))
|
||||
}
|
||||
return bef, nil
|
||||
}
|
||||
|
@ -141,17 +147,7 @@ func apiV1(broker *Broker, tdb *TelemDb) chi.Router {
|
|||
})
|
||||
|
||||
// OpenMCT domain object storage. Basically an arbitrary JSON document store
|
||||
|
||||
r.Route("/openmct", func(r chi.Router) {
|
||||
// key is a column on our json store, it's nested under identifier.key
|
||||
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||
// create a new object.
|
||||
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
|
||||
// subscribe to object updates.
|
||||
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
|
||||
})
|
||||
r.Route("/openmct", apiV1OpenMCTStore(tdb))
|
||||
|
||||
// records are driving segments/runs.
|
||||
|
||||
|
@ -186,7 +182,6 @@ func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc {
|
|||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
c.Ping(r.Context())
|
||||
// closeread handles protocol/status messages,
|
||||
// also handles clients closing the connection.
|
||||
// we get a context to use from it.
|
||||
|
@ -291,3 +286,16 @@ func apiV1GetValues(db *TelemDb) http.HandlerFunc {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
func apiV1OpenMCTStore(db *TelemDb) func(chi.Router) {
|
||||
return func(r chi.Router) {
|
||||
// key is a column on our json store, it's nested under identifier.key
|
||||
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||
// create a new object.
|
||||
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
|
||||
// subscribe to object updates.
|
||||
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue