got realtime working
Some checks failed
Go / build (1.21) (push) Failing after 1m6s
Go / build (1.22) (push) Failing after 1m5s

added demo livestream for testing
added openMCT realtime plugin
fixed websocket cross-origin fail
This commit is contained in:
saji 2024-03-08 11:51:59 -06:00
parent fe4cdfa0a4
commit 13205c1668
5 changed files with 182 additions and 3 deletions

View file

@ -2,6 +2,7 @@ package gotelem
import ( import (
"log/slog" "log/slog"
"math"
"os" "os"
"sync" "sync"
"testing" "testing"
@ -23,6 +24,55 @@ func makeEvent() skylab.BusEvent {
} }
} }
// makeLiveSystem starts a process that is used to continuously stream
// data into a Broker. Every 100ms it will send either a BmsMeasurement
// or WslVelocity. The values will be static for WslVelocity (to
// make comparison easier) but will be dynamic for BmsMeasurement.
//
func liveStream(done chan bool, broker *Broker) {
bmsPkt := &skylab.BmsMeasurement{
Current: 1.23,
BatteryVoltage: 11111,
AuxVoltage: 22222,
}
wslPkt := &skylab.WslVelocity{
MotorVelocity: 0,
VehicleVelocity: 100.0,
}
var next skylab.Packet = bmsPkt
for {
select {
case <-done:
return
case <-time.After(100 * time.Millisecond):
// send the next packet.
if next == bmsPkt {
bmsPkt.Current = float32(math.Sin(float64(time.Now().Unix()) / 2.0))
ev := skylab.BusEvent{
Timestamp: time.Now(),
Name: next.String(),
Data: next,
}
broker.Publish("livestream", ev)
next = wslPkt
} else {
// send the wsl
ev := skylab.BusEvent{
Timestamp: time.Now(),
Name: next.String(),
Data: next,
}
broker.Publish("livestream", ev)
next = bmsPkt
}
}
}
}
func TestBroker(t *testing.T) { func TestBroker(t *testing.T) {
t.Parallel() t.Parallel()

View file

@ -5,6 +5,8 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"math"
"time"
"os" "os"
"sync" "sync"
@ -35,6 +37,10 @@ var serveFlags = []cli.Flag{
DefaultText: "gotelem.db", DefaultText: "gotelem.db",
Usage: "database to serve, if not specified will use memory", Usage: "database to serve, if not specified will use memory",
}, },
&cli.BoolFlag{
Name: "demo",
Usage: "enable the demo packet stream",
},
} }
var serveCmd = &cli.Command{ var serveCmd = &cli.Command{
@ -65,8 +71,8 @@ type svcDeps struct {
// or if certain features are present (see cli/sqlite.go) // or if certain features are present (see cli/sqlite.go)
var serveThings = []service{ var serveThings = []service{
&xBeeService{}, &xBeeService{},
// &canLoggerService{},
&httpService{}, &httpService{},
&DemoService{},
} }
func serve(cCtx *cli.Context) error { func serve(cCtx *cli.Context) error {
@ -235,3 +241,59 @@ func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
} }
return return
} }
type DemoService struct {
}
func (d *DemoService) String() string {
return "demo service"
}
func (d *DemoService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
if !cCtx.Bool("demo") {
return
}
broker := deps.Broker
bmsPkt := &skylab.BmsMeasurement{
Current: 1.23,
BatteryVoltage: 11111,
AuxVoltage: 22222,
}
wslPkt := &skylab.WslVelocity{
MotorVelocity: 0,
VehicleVelocity: 100.0,
}
var next skylab.Packet = bmsPkt
for {
select {
case <-cCtx.Done():
return
case <-time.After(100 * time.Millisecond):
// send the next packet.
if next == bmsPkt {
bmsPkt.Current = float32(math.Sin(float64(time.Now().UnixMilli()) / 2000.0))
ev := skylab.BusEvent{
Timestamp: time.Now(),
Name: next.String(),
Data: next,
}
broker.Publish("livestream", ev)
next = wslPkt
} else {
// send the wsl
ev := skylab.BusEvent{
Timestamp: time.Now(),
Name: next.String(),
Data: next,
}
broker.Publish("livestream", ev)
next = bmsPkt
}
}
}
}

View file

@ -181,7 +181,9 @@ func apiV1PacketSubscribe(broker *Broker) http.HandlerFunc {
defer broker.Unsubscribe(conn_id) defer broker.Unsubscribe(conn_id)
// setup websocket // setup websocket
c, err := websocket.Accept(w, r, nil) c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
InsecureSkipVerify: true,
})
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return

View file

@ -5,6 +5,7 @@
"checkJs": true, "checkJs": true,
"allowJs": true, "allowJs": true,
"moduleResolution": "nodenext", "moduleResolution": "nodenext",
"module": "nodenext",
"allowSyntheticDefaultImports": true, "allowSyntheticDefaultImports": true,
"paths": { "paths": {
"openmct": ["./node_modules/openmct/dist/openmct.d.ts"] "openmct": ["./node_modules/openmct/dist/openmct.d.ts"]

View file

@ -142,7 +142,7 @@ const TelemHistoryProvider = {
start: new Date(opt.start).toISOString(), start: new Date(opt.start).toISOString(),
end: new Date(opt.end).toISOString(), end: new Date(opt.end).toISOString(),
}) })
console.log((opt.end - opt.start)/opt.size) console.log((opt.end - opt.start) / opt.size)
return fetch(url + params).then((resp) => { return fetch(url + params).then((resp) => {
return resp.json() return resp.json()
}) })
@ -151,6 +151,69 @@ const TelemHistoryProvider = {
} }
function TelemRealtimeProvider() {
return function (openmct) {
const url = `${process.env.BASE_URL.replace(/^http/, 'ws')}/api/v1/packets/subscribe?`
// we put our websocket connection here.
let connection = new WebSocket(url)
// connections contains name: callback mapping
let callbacks = {}
// names contains a set of *packet names*
let names = new Set()
function handleMessage(event) {
const data = JSON.parse(event.data)
for (const [key, value] of Object.entries(data.data)) {
const id = `${data.name}.${key}`
if (id in callbacks) {
// we should construct a telem point and make a callback.
callbacks[id]({
"ts": data.ts,
"val": value
})
}
}
}
function updateWebsocket() {
const params = new URLSearchParams({
name: Array.from(names)
})
connection = new WebSocket(url + params)
connection.onmessage = handleMessage
}
let provider = {
supportsSubscribe: function (dObj) {
return dObj.type === "umnsvp-datum"
},
subscribe: function (dObj, callback) {
console.log("subscribe called %s", JSON.stringify(dObj))
// identifier is packetname.fieldname. we add the packet name to the set.
const key = dObj.identifier.key
const [pktName, _] = key.split('.')
// add our callback to the dictionary,
// add the packet name to the set
callbacks[key] = callback
names.add(pktName)
// update the websocket URL with the new name.
updateWebsocket()
return function unsubscribe() {
names.delete(pktName)
delete callbacks[key]
}
}
}
openmct.telemetry.addProvider(provider)
}
}
function GotelemPlugin() { function GotelemPlugin() {
return function install(openmct) { return function install(openmct) {
@ -170,5 +233,6 @@ function GotelemPlugin() {
} }
openmct.install(GotelemPlugin()) openmct.install(GotelemPlugin())
openmct.install(TelemRealtimeProvider())
openmct.start(); openmct.start();