Compare commits

..

No commits in common. "54b7427428074c9bfcd6401a9793a54a3613b319" and "cf112ef561151c848cbf59890e0c127a60ddf674" have entirely different histories.

3 changed files with 25 additions and 43 deletions

View file

@ -52,6 +52,7 @@ var serveCmd = &cli.Command{
type service interface { type service interface {
fmt.Stringer fmt.Stringer
Start(cCtx *cli.Context, deps svcDeps) (err error) Start(cCtx *cli.Context, deps svcDeps) (err error)
Status()
} }
type svcDeps struct { type svcDeps struct {
@ -110,17 +111,17 @@ func serve(cCtx *cli.Context) error {
} }
for _, svc := range serveThings { for _, svc := range serveThings {
logger.Info("starting service", "service", svc.String()) logger.Info("starting service", "svc", svc.String())
wg.Add(1) wg.Add(1)
go func(mySvc service, baseLogger *slog.Logger) { go func(mySvc service, baseLogger *slog.Logger) {
svcLogger := logger.With("service", mySvc.String()) svcLogger := logger.With("svc", mySvc.String())
s := deps s := deps
s.Logger = svcLogger s.Logger = svcLogger
defer wg.Done() defer wg.Done()
// TODO: recover // TODO: recover
err := mySvc.Start(cCtx, s) err := mySvc.Start(cCtx, s)
if err != nil { if err != nil {
logger.Error("service stopped!", "err", err, "service", mySvc.String()) logger.Error("service stopped!", "err", err, "svc", mySvc.String())
} }
}(svc, logger) }(svc, logger)
} }

View file

@ -3,8 +3,6 @@
package cli package cli
import ( import (
"errors"
"io"
"time" "time"
"github.com/kschamplin/gotelem/internal/can" "github.com/kschamplin/gotelem/internal/can"
@ -59,7 +57,7 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
broker := deps.Broker broker := deps.Broker
if !cCtx.IsSet("can") { if !cCtx.IsSet("can") {
logger.Debug("no can device provided, skip") logger.Info("no can device provided")
return return
} }
@ -84,9 +82,6 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
go func() { go func() {
for { for {
pkt, err := s.sock.Recv() pkt, err := s.sock.Recv()
if errors.Is(err, io.EOF) {
return
}
if err != nil { if err != nil {
logger.Warn("error receiving CAN packet", "err", err) logger.Warn("error receiving CAN packet", "err", err)
} }
@ -100,17 +95,13 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
case msg := <-rxCh: case msg := <-rxCh:
frame, err = skylab.ToCanFrame(msg.Data) frame, err = skylab.ToCanFrame(msg.Data)
if err != nil {
logger.Warn("error encoding can frame", "name", msg.Name, "err", err)
continue
}
s.sock.Send(&frame) s.sock.Send(&frame)
case msg := <-rxCan: case msg := <-rxCan:
p, err := skylab.FromCanFrame(msg) p, err := skylab.FromCanFrame(msg)
if err != nil { if err != nil {
logger.Warn("error parsing can packet", "id", msg.Id, "err", err) logger.Warn("error parsing can packet", "id", msg.Id)
continue continue
} }
cde := skylab.BusEvent{ cde := skylab.BusEvent{
@ -120,8 +111,6 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
} }
broker.Publish("socketCAN", cde) broker.Publish("socketCAN", cde)
case <-cCtx.Done(): case <-cCtx.Done():
// close the socket.
s.sock.Close()
return return
} }
} }

46
http.go
View file

@ -24,10 +24,7 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
bef := &BusEventFilter{} bef := &BusEventFilter{}
v := r.URL.Query() v := r.URL.Query()
if v.Has("name") { bef.Names = v["name"] // put all the names in.
bef.Names = v["name"]
}
if el := v.Get("start"); el != "" { if el := v.Get("start"); el != "" {
// parse the start time query. // parse the start time query.
t, err := time.Parse(time.RFC3339, el) t, err := time.Parse(time.RFC3339, el)
@ -44,16 +41,13 @@ func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
} }
bef.EndTime = t bef.EndTime = t
} }
if v.Has("idx") { bef.Indexes = make([]int, 0)
for _, strIdx := range v["idx"] {
bef.Indexes = make([]int, 0) idx, err := strconv.ParseInt(strIdx, 10, 32)
for _, strIdx := range v["idx"] { if err != nil {
idx, err := strconv.ParseInt(strIdx, 10, 32) return nil, err
if err != nil {
return nil, err
}
bef.Indexes = append(bef.Indexes, int(idx))
} }
bef.Indexes = append(bef.Indexes, int(idx))
} }
return bef, nil return bef, nil
} }
@ -147,7 +141,17 @@ func apiV1(broker *Broker, tdb *TelemDb) chi.Router {
}) })
// OpenMCT domain object storage. Basically an arbitrary JSON document store // OpenMCT domain object storage. Basically an arbitrary JSON document store
r.Route("/openmct", apiV1OpenMCTStore(tdb))
r.Route("/openmct", func(r chi.Router) {
// key is a column on our json store, it's nested under identifier.key
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
// create a new object.
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
// subscribe to object updates.
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
})
// records are driving segments/runs. // records are driving segments/runs.
@ -182,6 +186,7 @@ func apiV1PacketSubscribe(broker *Broker, db *TelemDb) http.HandlerFunc {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
c.Ping(r.Context())
// closeread handles protocol/status messages, // closeread handles protocol/status messages,
// also handles clients closing the connection. // also handles clients closing the connection.
// we get a context to use from it. // we get a context to use from it.
@ -286,16 +291,3 @@ func apiV1GetValues(db *TelemDb) http.HandlerFunc {
} }
} }
func apiV1OpenMCTStore(db *TelemDb) func(chi.Router) {
return func(r chi.Router) {
// key is a column on our json store, it's nested under identifier.key
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
// create a new object.
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
// subscribe to object updates.
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
}
}