diff --git a/broker_test.go b/broker_test.go index ff2bf35..f984593 100644 --- a/broker_test.go +++ b/broker_test.go @@ -2,6 +2,7 @@ package gotelem import ( "log/slog" + "math" "os" "sync" "testing" @@ -23,6 +24,55 @@ func makeEvent() skylab.BusEvent { } } + +// makeLiveSystem starts a process that is used to continuously stream +// data into a Broker. Every 100ms it will send either a BmsMeasurement +// or WslVelocity. The values will be static for WslVelocity (to +// make comparison easier) but will be dynamic for BmsMeasurement. +// +func liveStream(done chan bool, broker *Broker) { + bmsPkt := &skylab.BmsMeasurement{ + Current: 1.23, + BatteryVoltage: 11111, + AuxVoltage: 22222, + } + wslPkt := &skylab.WslVelocity{ + MotorVelocity: 0, + VehicleVelocity: 100.0, + } + var next skylab.Packet = bmsPkt + for { + select { + case <-done: + return + case <-time.After(100 * time.Millisecond): + // send the next packet. + if next == bmsPkt { + bmsPkt.Current = float32(math.Sin(float64(time.Now().Unix()) / 2.0)) + ev := skylab.BusEvent{ + Timestamp: time.Now(), + Name: next.String(), + Data: next, + } + broker.Publish("livestream", ev) + next = wslPkt + } else { + // send the wsl + ev := skylab.BusEvent{ + Timestamp: time.Now(), + Name: next.String(), + Data: next, + } + broker.Publish("livestream", ev) + next = bmsPkt + } + + + + } + } +} + func TestBroker(t *testing.T) { t.Parallel() diff --git a/cmd/gotelem/cli/server.go b/cmd/gotelem/cli/server.go index 965ffbc..e1e8388 100644 --- a/cmd/gotelem/cli/server.go +++ b/cmd/gotelem/cli/server.go @@ -5,6 +5,8 @@ import ( "fmt" "io" "net/http" + "math" + "time" "os" "sync" @@ -35,6 +37,10 @@ var serveFlags = []cli.Flag{ DefaultText: "gotelem.db", Usage: "database to serve, if not specified will use memory", }, + &cli.BoolFlag{ + Name: "demo", + Usage: "enable the demo packet stream", + }, } var serveCmd = &cli.Command{ @@ -65,8 +71,8 @@ type svcDeps struct { // or if certain features are present (see cli/sqlite.go) var serveThings = []service{ &xBeeService{}, - // &canLoggerService{}, &httpService{}, + &DemoService{}, } func serve(cCtx *cli.Context) error { @@ -235,3 +241,59 @@ func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) { } return } + + +type DemoService struct { +} + +func (d *DemoService) String() string { + return "demo service" +} + +func (d *DemoService) Start(cCtx *cli.Context, deps svcDeps) (err error) { + if !cCtx.Bool("demo") { + return + } + + broker := deps.Broker + bmsPkt := &skylab.BmsMeasurement{ + Current: 1.23, + BatteryVoltage: 11111, + AuxVoltage: 22222, + } + wslPkt := &skylab.WslVelocity{ + MotorVelocity: 0, + VehicleVelocity: 100.0, + } + var next skylab.Packet = bmsPkt + for { + select { + case <-cCtx.Done(): + return + case <-time.After(100 * time.Millisecond): + // send the next packet. + if next == bmsPkt { + bmsPkt.Current = float32(math.Sin(float64(time.Now().UnixMilli()) / 2000.0)) + ev := skylab.BusEvent{ + Timestamp: time.Now(), + Name: next.String(), + Data: next, + } + broker.Publish("livestream", ev) + next = wslPkt + } else { + // send the wsl + ev := skylab.BusEvent{ + Timestamp: time.Now(), + Name: next.String(), + Data: next, + } + broker.Publish("livestream", ev) + next = bmsPkt + } + + + + } + } +} diff --git a/http.go b/http.go index ec36a41..ca05d53 100644 --- a/http.go +++ b/http.go @@ -181,7 +181,9 @@ func apiV1PacketSubscribe(broker *Broker) http.HandlerFunc { defer broker.Unsubscribe(conn_id) // setup websocket - c, err := websocket.Accept(w, r, nil) + c, err := websocket.Accept(w, r, &websocket.AcceptOptions{ + InsecureSkipVerify: true, + }) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/web/jsconfig.json b/web/jsconfig.json index 6cf3f0d..ed1f1bd 100644 --- a/web/jsconfig.json +++ b/web/jsconfig.json @@ -5,6 +5,7 @@ "checkJs": true, "allowJs": true, "moduleResolution": "nodenext", + "module": "nodenext", "allowSyntheticDefaultImports": true, "paths": { "openmct": ["./node_modules/openmct/dist/openmct.d.ts"] diff --git a/web/src/app.js b/web/src/app.js index 6be1494..7d51a06 100644 --- a/web/src/app.js +++ b/web/src/app.js @@ -142,7 +142,7 @@ const TelemHistoryProvider = { start: new Date(opt.start).toISOString(), end: new Date(opt.end).toISOString(), }) - console.log((opt.end - opt.start)/opt.size) + console.log((opt.end - opt.start) / opt.size) return fetch(url + params).then((resp) => { return resp.json() }) @@ -151,6 +151,69 @@ const TelemHistoryProvider = { } + +function TelemRealtimeProvider() { + + + return function (openmct) { + + const url = `${process.env.BASE_URL.replace(/^http/, 'ws')}/api/v1/packets/subscribe?` + // we put our websocket connection here. + let connection = new WebSocket(url) + // connections contains name: callback mapping + let callbacks = {} + // names contains a set of *packet names* + let names = new Set() + + function handleMessage(event) { + const data = JSON.parse(event.data) + for (const [key, value] of Object.entries(data.data)) { + const id = `${data.name}.${key}` + if (id in callbacks) { + // we should construct a telem point and make a callback. + callbacks[id]({ + "ts": data.ts, + "val": value + }) + } + } + } + + function updateWebsocket() { + const params = new URLSearchParams({ + name: Array.from(names) + }) + connection = new WebSocket(url + params) + + connection.onmessage = handleMessage + } + + let provider = { + supportsSubscribe: function (dObj) { + return dObj.type === "umnsvp-datum" + }, + subscribe: function (dObj, callback) { + console.log("subscribe called %s", JSON.stringify(dObj)) + // identifier is packetname.fieldname. we add the packet name to the set. + const key = dObj.identifier.key + const [pktName, _] = key.split('.') + // add our callback to the dictionary, + // add the packet name to the set + callbacks[key] = callback + names.add(pktName) + // update the websocket URL with the new name. + updateWebsocket() + return function unsubscribe() { + names.delete(pktName) + delete callbacks[key] + } + } + } + openmct.telemetry.addProvider(provider) + } +} + + function GotelemPlugin() { return function install(openmct) { @@ -170,5 +233,6 @@ function GotelemPlugin() { } openmct.install(GotelemPlugin()) +openmct.install(TelemRealtimeProvider()) openmct.start();