Compare commits

..

2 commits

Author SHA1 Message Date
saji 54b7427428 misc cleanup
All checks were successful
Go / build (1.21) (push) Successful in 1m7s
Go / build (1.22) (push) Successful in 1m6s
2024-03-07 16:42:49 -06:00
saji e08ab050ef fix websocket endpoint 2024-03-07 16:42:27 -06:00
3 changed files with 43 additions and 25 deletions

View file

@ -52,7 +52,6 @@ var serveCmd = &cli.Command{
type service interface {
fmt.Stringer
Start(cCtx *cli.Context, deps svcDeps) (err error)
Status()
}
type svcDeps struct {
@ -111,17 +110,17 @@ func serve(cCtx *cli.Context) error {
}
for _, svc := range serveThings {
logger.Info("starting service", "svc", svc.String())
logger.Info("starting service", "service", svc.String())
wg.Add(1)
go func(mySvc service, baseLogger *slog.Logger) {
svcLogger := logger.With("svc", mySvc.String())
svcLogger := logger.With("service", mySvc.String())
s := deps
s.Logger = svcLogger
defer wg.Done()
// TODO: recover
err := mySvc.Start(cCtx, s)
if err != nil {
logger.Error("service stopped!", "err", err, "svc", mySvc.String())
logger.Error("service stopped!", "err", err, "service", mySvc.String())
}
}(svc, logger)
}

View file

@ -3,6 +3,8 @@
package cli
import (
"errors"
"io"
"time"
"github.com/kschamplin/gotelem/internal/can"
@ -57,7 +59,7 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
broker := deps.Broker
if !cCtx.IsSet("can") {
logger.Info("no can device provided")
logger.Debug("no can device provided, skip")
return
}
@ -82,6 +84,9 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
go func() {
for {
pkt, err := s.sock.Recv()
if errors.Is(err, io.EOF) {
return
}
if err != nil {
logger.Warn("error receiving CAN packet", "err", err)
}
@ -95,13 +100,17 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
case msg := <-rxCh:
frame, err = skylab.ToCanFrame(msg.Data)
if err != nil {
logger.Warn("error encoding can frame", "name", msg.Name, "err", err)
continue
}
s.sock.Send(&frame)
case msg := <-rxCan:
p, err := skylab.FromCanFrame(msg)
if err != nil {
logger.Warn("error parsing can packet", "id", msg.Id)
logger.Warn("error parsing can packet", "id", msg.Id, "err", err)
continue
}
cde := skylab.BusEvent{
@ -111,6 +120,8 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
}
broker.Publish("socketCAN", cde)
case <-cCtx.Done():
// close the socket.
s.sock.Close()
return
}
}

34
http.go
View file

@ -24,7 +24,10 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
bef := &BusEventFilter{}
v := r.URL.Query()
bef.Names = v["name"] // put all the names in.
if v.Has("name") {
bef.Names = v["name"]
}
if el := v.Get("start"); el != "" {
// parse the start time query.
t, err := time.Parse(time.RFC3339, el)
@ -41,6 +44,8 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
}
bef.EndTime = t
}
if v.Has("idx") {
bef.Indexes = make([]int, 0)
for _, strIdx := range v["idx"] {
idx, err := strconv.ParseInt(strIdx, 10, 32)
@ -49,6 +54,7 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
}
bef.Indexes = append(bef.Indexes, int(idx))
}
}
return bef, nil
}
@ -141,17 +147,7 @@ func apiV1(broker *Broker, tdb *TelemDb) chi.Router {
})
// OpenMCT domain object storage. Basically an arbitrary JSON document store
r.Route("/openmct", func(r chi.Router) {
// key is a column on our json store, it's nested under identifier.key
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
// create a new object.
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
// subscribe to object updates.
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
})
r.Route("/openmct", apiV1OpenMCTStore(tdb))
// records are driving segments/runs.
@ -186,7 +182,6 @@ func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
c.Ping(r.Context())
// closeread handles protocol/status messages,
// also handles clients closing the connection.
// we get a context to use from it.
@ -291,3 +286,16 @@ func apiV1GetValues(db *TelemDb) http.HandlerFunc {
}
}
func apiV1OpenMCTStore(db *TelemDb) func(chi.Router) {
return func(r chi.Router) {
// key is a column on our json store, it's nested under identifier.key
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
// create a new object.
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
// subscribe to object updates.
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
}
}