From 13205c166828a178d337b2b7858ec20011353c7a Mon Sep 17 00:00:00 2001
From: saji <champ189@umn.edu>
Date: Fri, 8 Mar 2024 11:51:59 -0600
Subject: [PATCH] got realtime working added demo livestream for testing added
 openMCT realtime plugin fixed websocket cross-origin fail

---
 broker_test.go            | 50 +++++++++++++++++++++++++++++
 cmd/gotelem/cli/server.go | 64 ++++++++++++++++++++++++++++++++++++-
 http.go                   |  4 ++-
 web/jsconfig.json         |  1 +
 web/src/app.js            | 66 ++++++++++++++++++++++++++++++++++++++-
 5 files changed, 182 insertions(+), 3 deletions(-)

diff --git a/broker_test.go b/broker_test.go
index ff2bf35..f984593 100644
--- a/broker_test.go
+++ b/broker_test.go
@@ -2,6 +2,7 @@ package gotelem
 
 import (
 	"log/slog"
+	"math"
 	"os"
 	"sync"
 	"testing"
@@ -23,6 +24,55 @@ func makeEvent() skylab.BusEvent {
 	}
 }
 
+
+// makeLiveSystem starts a process that is used to continuously stream
+// data into a Broker. Every 100ms it will send either a BmsMeasurement
+// or WslVelocity. The values will be static for WslVelocity (to
+// make comparison easier) but will be dynamic for BmsMeasurement.
+//
+func liveStream(done chan bool, broker *Broker) {
+	bmsPkt := &skylab.BmsMeasurement{
+		Current: 1.23,
+		BatteryVoltage: 11111,
+		AuxVoltage: 22222,
+	}
+	wslPkt := &skylab.WslVelocity{
+		MotorVelocity: 0,
+		VehicleVelocity: 100.0,
+	}
+	var next skylab.Packet = bmsPkt
+	for {
+		select {
+		case <-done:
+			return
+		case <-time.After(100 * time.Millisecond):
+			// send the next packet.
+			if next == bmsPkt {
+				bmsPkt.Current = float32(math.Sin(float64(time.Now().Unix()) / 2.0))
+				ev := skylab.BusEvent{
+					Timestamp: time.Now(),
+					Name: next.String(),
+					Data: next,
+				}
+				broker.Publish("livestream", ev)
+				next = wslPkt
+			} else {
+				// send the wsl
+				ev := skylab.BusEvent{
+					Timestamp: time.Now(),
+					Name: next.String(),
+					Data: next,
+				}
+				broker.Publish("livestream", ev)
+				next = bmsPkt
+			}
+
+
+
+		}
+	}
+}
+
 func TestBroker(t *testing.T) {
 	t.Parallel()
 
diff --git a/cmd/gotelem/cli/server.go b/cmd/gotelem/cli/server.go
index 965ffbc..e1e8388 100644
--- a/cmd/gotelem/cli/server.go
+++ b/cmd/gotelem/cli/server.go
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"math"
+	"time"
 	"os"
 	"sync"
 
@@ -35,6 +37,10 @@ var serveFlags = []cli.Flag{
 		DefaultText: "gotelem.db",
 		Usage:       "database to serve, if not specified will use memory",
 	},
+	&cli.BoolFlag{
+		Name: "demo",
+		Usage: "enable the demo packet stream",
+	},
 }
 
 var serveCmd = &cli.Command{
@@ -65,8 +71,8 @@ type svcDeps struct {
 // or if certain features are present (see cli/sqlite.go)
 var serveThings = []service{
 	&xBeeService{},
-	// &canLoggerService{},
 	&httpService{},
+	&DemoService{},
 }
 
 func serve(cCtx *cli.Context) error {
@@ -235,3 +241,59 @@ func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
 	}
 	return
 }
+
+
+type DemoService struct {
+}
+
+func (d *DemoService) String() string {
+	return "demo service"
+}
+
+func (d *DemoService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
+	if !cCtx.Bool("demo") {
+		return 
+	}
+
+	broker := deps.Broker
+	bmsPkt := &skylab.BmsMeasurement{
+		Current: 1.23,
+		BatteryVoltage: 11111,
+		AuxVoltage: 22222,
+	}
+	wslPkt := &skylab.WslVelocity{
+		MotorVelocity: 0,
+		VehicleVelocity: 100.0,
+	}
+	var next skylab.Packet = bmsPkt
+	for {
+		select {
+		case <-cCtx.Done():
+			return
+		case <-time.After(100 * time.Millisecond):
+			// send the next packet.
+			if next == bmsPkt {
+				bmsPkt.Current = float32(math.Sin(float64(time.Now().UnixMilli()) / 2000.0))
+				ev := skylab.BusEvent{
+					Timestamp: time.Now(),
+					Name: next.String(),
+					Data: next,
+				}
+				broker.Publish("livestream", ev)
+				next = wslPkt
+			} else {
+				// send the wsl
+				ev := skylab.BusEvent{
+					Timestamp: time.Now(),
+					Name: next.String(),
+					Data: next,
+				}
+				broker.Publish("livestream", ev)
+				next = bmsPkt
+			}
+
+
+
+		}
+	}
+}
diff --git a/http.go b/http.go
index ec36a41..ca05d53 100644
--- a/http.go
+++ b/http.go
@@ -181,7 +181,9 @@ func apiV1PacketSubscribe(broker *Broker) http.HandlerFunc {
 		defer broker.Unsubscribe(conn_id)
 
 		// setup websocket
-		c, err := websocket.Accept(w, r, nil)
+		c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
+			InsecureSkipVerify: true,
+		})
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return
diff --git a/web/jsconfig.json b/web/jsconfig.json
index 6cf3f0d..ed1f1bd 100644
--- a/web/jsconfig.json
+++ b/web/jsconfig.json
@@ -5,6 +5,7 @@
     "checkJs": true,
     "allowJs": true,
     "moduleResolution": "nodenext",
+    "module": "nodenext",
     "allowSyntheticDefaultImports": true,
     "paths": {
       "openmct": ["./node_modules/openmct/dist/openmct.d.ts"]
diff --git a/web/src/app.js b/web/src/app.js
index 6be1494..7d51a06 100644
--- a/web/src/app.js
+++ b/web/src/app.js
@@ -142,7 +142,7 @@ const TelemHistoryProvider = {
             start: new Date(opt.start).toISOString(),
             end: new Date(opt.end).toISOString(),
         })
-        console.log((opt.end - opt.start)/opt.size)
+        console.log((opt.end - opt.start) / opt.size)
         return fetch(url + params).then((resp) => {
             return resp.json()
         })
@@ -151,6 +151,69 @@ const TelemHistoryProvider = {
 }
 
 
+
+function TelemRealtimeProvider() {
+
+
+    return function (openmct) {
+
+        const url = `${process.env.BASE_URL.replace(/^http/, 'ws')}/api/v1/packets/subscribe?`
+        // we put our websocket connection here.
+        let connection = new WebSocket(url)
+        // connections contains name: callback mapping
+        let callbacks = {}
+        // names contains a set of *packet names*
+        let names = new Set()
+
+        function handleMessage(event) {
+            const data = JSON.parse(event.data)
+            for (const [key, value] of Object.entries(data.data)) {
+                const id = `${data.name}.${key}`
+                if (id in callbacks) {
+                    // we should construct a telem point and make a callback.
+                    callbacks[id]({
+                        "ts": data.ts,
+                        "val": value
+                    })
+                }
+            }
+        }
+
+        function updateWebsocket() {
+            const params = new URLSearchParams({
+                name: Array.from(names)
+            })
+            connection = new WebSocket(url + params)
+
+            connection.onmessage = handleMessage
+        }
+
+        let provider = {
+            supportsSubscribe: function (dObj) {
+                return dObj.type === "umnsvp-datum"
+            },
+            subscribe: function (dObj, callback) {
+                console.log("subscribe called %s", JSON.stringify(dObj))
+                // identifier is packetname.fieldname. we add the packet name to the set.
+                const key = dObj.identifier.key
+                const [pktName, _] = key.split('.')
+                // add our callback to the dictionary,
+                // add the packet name to the set
+                callbacks[key] = callback
+                names.add(pktName)
+                // update the websocket URL with the new name.
+                updateWebsocket()
+                return function unsubscribe() {
+                    names.delete(pktName)
+                    delete callbacks[key]
+                }
+            }
+        }
+        openmct.telemetry.addProvider(provider)
+    }
+}
+
+
 function GotelemPlugin() {
     return function install(openmct) {
 
@@ -170,5 +233,6 @@ function GotelemPlugin() {
 }
 
 openmct.install(GotelemPlugin())
+openmct.install(TelemRealtimeProvider())
 
 openmct.start();