Compare commits

...

2 commits

Author SHA1 Message Date
saji 54b7427428 misc cleanup
All checks were successful
Go / build (1.21) (push) Successful in 1m7s
Go / build (1.22) (push) Successful in 1m6s
2024-03-07 16:42:49 -06:00
saji e08ab050ef fix websocket endpoint 2024-03-07 16:42:27 -06:00
3 changed files with 43 additions and 25 deletions

View file

@ -52,7 +52,6 @@ var serveCmd = &cli.Command{
type service interface { type service interface {
fmt.Stringer fmt.Stringer
Start(cCtx *cli.Context, deps svcDeps) (err error) Start(cCtx *cli.Context, deps svcDeps) (err error)
Status()
} }
type svcDeps struct { type svcDeps struct {
@ -111,17 +110,17 @@ func serve(cCtx *cli.Context) error {
} }
for _, svc := range serveThings { for _, svc := range serveThings {
logger.Info("starting service", "svc", svc.String()) logger.Info("starting service", "service", svc.String())
wg.Add(1) wg.Add(1)
go func(mySvc service, baseLogger *slog.Logger) { go func(mySvc service, baseLogger *slog.Logger) {
svcLogger := logger.With("svc", mySvc.String()) svcLogger := logger.With("service", mySvc.String())
s := deps s := deps
s.Logger = svcLogger s.Logger = svcLogger
defer wg.Done() defer wg.Done()
// TODO: recover // TODO: recover
err := mySvc.Start(cCtx, s) err := mySvc.Start(cCtx, s)
if err != nil { if err != nil {
logger.Error("service stopped!", "err", err, "svc", mySvc.String()) logger.Error("service stopped!", "err", err, "service", mySvc.String())
} }
}(svc, logger) }(svc, logger)
} }

View file

@ -3,6 +3,8 @@
package cli package cli
import ( import (
"errors"
"io"
"time" "time"
"github.com/kschamplin/gotelem/internal/can" "github.com/kschamplin/gotelem/internal/can"
@ -57,7 +59,7 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
broker := deps.Broker broker := deps.Broker
if !cCtx.IsSet("can") { if !cCtx.IsSet("can") {
logger.Info("no can device provided") logger.Debug("no can device provided, skip")
return return
} }
@ -82,6 +84,9 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
go func() { go func() {
for { for {
pkt, err := s.sock.Recv() pkt, err := s.sock.Recv()
if errors.Is(err, io.EOF) {
return
}
if err != nil { if err != nil {
logger.Warn("error receiving CAN packet", "err", err) logger.Warn("error receiving CAN packet", "err", err)
} }
@ -95,13 +100,17 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
case msg := <-rxCh: case msg := <-rxCh:
frame, err = skylab.ToCanFrame(msg.Data) frame, err = skylab.ToCanFrame(msg.Data)
if err != nil {
logger.Warn("error encoding can frame", "name", msg.Name, "err", err)
continue
}
s.sock.Send(&frame) s.sock.Send(&frame)
case msg := <-rxCan: case msg := <-rxCan:
p, err := skylab.FromCanFrame(msg) p, err := skylab.FromCanFrame(msg)
if err != nil { if err != nil {
logger.Warn("error parsing can packet", "id", msg.Id) logger.Warn("error parsing can packet", "id", msg.Id, "err", err)
continue continue
} }
cde := skylab.BusEvent{ cde := skylab.BusEvent{
@ -111,6 +120,8 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
} }
broker.Publish("socketCAN", cde) broker.Publish("socketCAN", cde)
case <-cCtx.Done(): case <-cCtx.Done():
// close the socket.
s.sock.Close()
return return
} }
} }

46
http.go
View file

@ -24,7 +24,10 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
bef := &BusEventFilter{} bef := &BusEventFilter{}
v := r.URL.Query() v := r.URL.Query()
bef.Names = v["name"] // put all the names in. if v.Has("name") {
bef.Names = v["name"]
}
if el := v.Get("start"); el != "" { if el := v.Get("start"); el != "" {
// parse the start time query. // parse the start time query.
t, err := time.Parse(time.RFC3339, el) t, err := time.Parse(time.RFC3339, el)
@ -41,13 +44,16 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
} }
bef.EndTime = t bef.EndTime = t
} }
bef.Indexes = make([]int, 0) if v.Has("idx") {
for _, strIdx := range v["idx"] {
idx, err := strconv.ParseInt(strIdx, 10, 32) bef.Indexes = make([]int, 0)
if err != nil { for _, strIdx := range v["idx"] {
return nil, err idx, err := strconv.ParseInt(strIdx, 10, 32)
if err != nil {
return nil, err
}
bef.Indexes = append(bef.Indexes, int(idx))
} }
bef.Indexes = append(bef.Indexes, int(idx))
} }
return bef, nil return bef, nil
} }
@ -141,17 +147,7 @@ func apiV1(broker *Broker, tdb *TelemDb) chi.Router {
}) })
// OpenMCT domain object storage. Basically an arbitrary JSON document store // OpenMCT domain object storage. Basically an arbitrary JSON document store
r.Route("/openmct", apiV1OpenMCTStore(tdb))
r.Route("/openmct", func(r chi.Router) {
// key is a column on our json store, it's nested under identifier.key
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
// create a new object.
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
// subscribe to object updates.
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
})
// records are driving segments/runs. // records are driving segments/runs.
@ -186,7 +182,6 @@ func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
c.Ping(r.Context())
// closeread handles protocol/status messages, // closeread handles protocol/status messages,
// also handles clients closing the connection. // also handles clients closing the connection.
// we get a context to use from it. // we get a context to use from it.
@ -291,3 +286,16 @@ func apiV1GetValues(db *TelemDb) http.HandlerFunc {
} }
} }
func apiV1OpenMCTStore(db *TelemDb) func(chi.Router) {
return func(r chi.Router) {
// key is a column on our json store, it's nested under identifier.key
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
// create a new object.
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
// subscribe to object updates.
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
}
}