Compare commits
No commits in common. "90e8c3f101fdbc13500b8bad8a581e956c4d0d0b" and "54b7427428074c9bfcd6401a9793a54a3613b319" have entirely different histories.
90e8c3f101
...
54b7427428
|
@ -52,10 +52,7 @@ func (b *Broker) Unsubscribe(name string) {
|
||||||
b.lock.Lock()
|
b.lock.Lock()
|
||||||
defer b.lock.Unlock()
|
defer b.lock.Unlock()
|
||||||
b.logger.Debug("unsubscribe", "name", name)
|
b.logger.Debug("unsubscribe", "name", name)
|
||||||
if _, ok := b.subs[name]; ok {
|
|
||||||
close(b.subs[name])
|
|
||||||
delete(b.subs, name)
|
delete(b.subs, name)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish sends a bus event to all subscribers. It includes a sender
|
// Publish sends a bus event to all subscribers. It includes a sender
|
||||||
|
|
120
broker_test.go
120
broker_test.go
|
@ -1,120 +0,0 @@
|
||||||
package gotelem
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log/slog"
|
|
||||||
"os"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/kschamplin/gotelem/skylab"
|
|
||||||
)
|
|
||||||
|
|
||||||
func makeEvent() skylab.BusEvent {
|
|
||||||
var pkt skylab.Packet = &skylab.BmsMeasurement{
|
|
||||||
BatteryVoltage: 12000,
|
|
||||||
AuxVoltage: 24000,
|
|
||||||
Current: 1.23,
|
|
||||||
}
|
|
||||||
return skylab.BusEvent{
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
Name: pkt.String(),
|
|
||||||
Data: pkt,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBroker(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
t.Run("test send", func(t *testing.T) {
|
|
||||||
flog := slog.New(slog.NewTextHandler(os.Stderr, nil))
|
|
||||||
broker := NewBroker(10, flog)
|
|
||||||
|
|
||||||
sub, err := broker.Subscribe("testSub")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("error subscribing: %v", err)
|
|
||||||
}
|
|
||||||
testEvent := makeEvent()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
time.Sleep(time.Millisecond * 1)
|
|
||||||
broker.Publish("other", testEvent)
|
|
||||||
}()
|
|
||||||
|
|
||||||
var recvEvent skylab.BusEvent
|
|
||||||
select {
|
|
||||||
case recvEvent = <-sub:
|
|
||||||
if !testEvent.Equals(&recvEvent) {
|
|
||||||
t.Fatalf("events not equal, want %v got %v", testEvent, recvEvent)
|
|
||||||
}
|
|
||||||
case <-time.After(1 * time.Second):
|
|
||||||
t.Fatalf("timeout waiting for packet")
|
|
||||||
}
|
|
||||||
|
|
||||||
})
|
|
||||||
t.Run("multiple broadcast", func(t *testing.T) {
|
|
||||||
flog := slog.New(slog.NewTextHandler(os.Stderr, nil))
|
|
||||||
broker := NewBroker(10, flog)
|
|
||||||
testEvent := makeEvent()
|
|
||||||
wg := sync.WaitGroup{}
|
|
||||||
|
|
||||||
clientFn := func(name string) {
|
|
||||||
sub, err := broker.Subscribe(name)
|
|
||||||
if err != nil {
|
|
||||||
t.Log(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
<-sub
|
|
||||||
wg.Done()
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Add(2)
|
|
||||||
go clientFn("client1")
|
|
||||||
go clientFn("client2")
|
|
||||||
|
|
||||||
// yes this is stupid. otherwise we race.
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
|
|
||||||
broker.Publish("sender", testEvent)
|
|
||||||
|
|
||||||
done := make(chan bool)
|
|
||||||
go func() {
|
|
||||||
wg.Wait()
|
|
||||||
done <- true
|
|
||||||
}()
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
|
|
||||||
case <-time.After(1 * time.Second):
|
|
||||||
t.Fatal("timeout waiting for clients")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("name collision", func(t *testing.T) {
|
|
||||||
flog := slog.New(slog.NewTextHandler(os.Stderr, nil))
|
|
||||||
broker := NewBroker(10, flog)
|
|
||||||
_, err := broker.Subscribe("collide")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
_, err = broker.Subscribe("collide")
|
|
||||||
if err == nil {
|
|
||||||
t.Fatal("expected error, got nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("unsubscribe", func(t *testing.T) {
|
|
||||||
flog := slog.New(slog.NewTextHandler(os.Stderr, nil))
|
|
||||||
broker := NewBroker(10, flog)
|
|
||||||
ch, err := broker.Subscribe("test")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
broker.Unsubscribe("test")
|
|
||||||
_, ok := <-ch
|
|
||||||
if ok {
|
|
||||||
t.Fatal("expected dead channel, but channel returned result")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
4
db.go
4
db.go
|
@ -195,7 +195,7 @@ func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, lim *
|
||||||
sb.WriteString(strings.Join(whereFrags, " AND "))
|
sb.WriteString(strings.Join(whereFrags, " AND "))
|
||||||
}
|
}
|
||||||
|
|
||||||
sb.WriteString(" ORDER BY ts ASC")
|
sb.WriteString(" ORDER BY ts DESC")
|
||||||
|
|
||||||
// Augment our data further if there's i.e a limit modifier.
|
// Augment our data further if there's i.e a limit modifier.
|
||||||
// TODO: factor this out maybe?
|
// TODO: factor this out maybe?
|
||||||
|
@ -278,7 +278,7 @@ func (tdb *TelemDb) GetValues(ctx context.Context, filter BusEventFilter,
|
||||||
// join qstrings with AND
|
// join qstrings with AND
|
||||||
sb.WriteString(strings.Join(whereFrags, " AND "))
|
sb.WriteString(strings.Join(whereFrags, " AND "))
|
||||||
|
|
||||||
sb.WriteString(" ORDER BY ts ASC")
|
sb.WriteString(" ORDER BY ts DESC")
|
||||||
|
|
||||||
if lim != nil {
|
if lim != nil {
|
||||||
lim.ModifyStatement(&sb)
|
lim.ModifyStatement(&sb)
|
||||||
|
|
21
db_test.go
21
db_test.go
|
@ -68,10 +68,6 @@ func MakeMockDatabase(name string) *TelemDb {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
return tdb
|
|
||||||
}
|
|
||||||
|
|
||||||
func SeedMockDatabase(tdb *TelemDb) {
|
|
||||||
// seed the database now.
|
// seed the database now.
|
||||||
scanner := bufio.NewScanner(strings.NewReader(exampleData))
|
scanner := bufio.NewScanner(strings.NewReader(exampleData))
|
||||||
|
|
||||||
|
@ -87,22 +83,8 @@ func SeedMockDatabase(tdb *TelemDb) {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
func GetSeedEvents() []skylab.BusEvent {
|
return tdb
|
||||||
evs := make([]skylab.BusEvent, 0)
|
|
||||||
scanner := bufio.NewScanner(strings.NewReader(exampleData))
|
|
||||||
|
|
||||||
for scanner.Scan() {
|
|
||||||
str := scanner.Text()
|
|
||||||
|
|
||||||
bev, err := logparsers.ParsersMap["telem"](str)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
evs = append(evs, bev)
|
|
||||||
}
|
|
||||||
return evs
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTelemDb(t *testing.T) {
|
func TestTelemDb(t *testing.T) {
|
||||||
|
@ -157,7 +139,6 @@ func TestTelemDb(t *testing.T) {
|
||||||
|
|
||||||
t.Run("test getting packets", func(t *testing.T) {
|
t.Run("test getting packets", func(t *testing.T) {
|
||||||
tdb := MakeMockDatabase(t.Name())
|
tdb := MakeMockDatabase(t.Name())
|
||||||
SeedMockDatabase(tdb)
|
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
f := BusEventFilter{}
|
f := BusEventFilter{}
|
||||||
|
|
2
http.go
2
http.go
|
@ -223,9 +223,11 @@ func apiV1GetPackets(tdb *TelemDb) http.HandlerFunc {
|
||||||
lim, err := extractLimitModifier(r)
|
lim, err := extractLimitModifier(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
fmt.Print(lim)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: is the following check needed?
|
||||||
var res []skylab.BusEvent
|
var res []skylab.BusEvent
|
||||||
res, err = tdb.GetPackets(r.Context(), *bef, lim)
|
res, err = tdb.GetPackets(r.Context(), *bef, lim)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
216
http_test.go
216
http_test.go
|
@ -1,216 +0,0 @@
|
||||||
package gotelem
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"net/url"
|
|
||||||
"reflect"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/kschamplin/gotelem/skylab"
|
|
||||||
)
|
|
||||||
|
|
||||||
func Test_extractBusEventFilter(t *testing.T) {
|
|
||||||
makeReq := func(path string) *http.Request {
|
|
||||||
return httptest.NewRequest(http.MethodGet, path, nil)
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
req *http.Request
|
|
||||||
want *BusEventFilter
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "test no extractions",
|
|
||||||
req: makeReq("http://localhost/"),
|
|
||||||
want: &BusEventFilter{},
|
|
||||||
wantErr: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "test single name extract",
|
|
||||||
req: makeReq("http://localhost/?name=hi"),
|
|
||||||
want: &BusEventFilter{
|
|
||||||
Names: []string{"hi"},
|
|
||||||
},
|
|
||||||
wantErr: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "test multi name extract",
|
|
||||||
req: makeReq("http://localhost/?name=hi1&name=hi2"),
|
|
||||||
want: &BusEventFilter{
|
|
||||||
Names: []string{"hi1", "hi2"},
|
|
||||||
},
|
|
||||||
wantErr: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "test start time valid extract",
|
|
||||||
req: makeReq(fmt.Sprintf("http://localhost/?start=%s", url.QueryEscape(time.Unix(160000000, 0).Format(time.RFC3339)))),
|
|
||||||
want: &BusEventFilter{
|
|
||||||
StartTime: time.Unix(160000000, 0),
|
|
||||||
},
|
|
||||||
wantErr: false,
|
|
||||||
},
|
|
||||||
// {
|
|
||||||
// name: "test start time invalid extract",
|
|
||||||
// req: makeReq(fmt.Sprintf("http://localhost/?start=%s", url.QueryEscape("ajlaskdj"))),
|
|
||||||
// wantErr: true,
|
|
||||||
// },
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
t.Logf("Testing URL %s", tt.req.URL.String())
|
|
||||||
got, err := extractBusEventFilter(tt.req)
|
|
||||||
if (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("extractBusEventFilter() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// we have to manually compare fields because timestamps can't be deeply compared.
|
|
||||||
if !reflect.DeepEqual(got.Names, tt.want.Names) {
|
|
||||||
t.Errorf("extractBusEventFilter() Names bad = %v, want %v", got.Names, tt.want.Names)
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(got.Indexes, tt.want.Indexes) {
|
|
||||||
t.Errorf("extractBusEventFilter() Indexes bad = %v, want %v", got.Indexes, tt.want.Indexes)
|
|
||||||
}
|
|
||||||
if !got.StartTime.Equal(tt.want.StartTime) {
|
|
||||||
t.Errorf("extractBusEventFilter() StartTime mismatch = %v, want %v", got.StartTime, tt.want.StartTime)
|
|
||||||
}
|
|
||||||
if !got.EndTime.Equal(tt.want.EndTime) {
|
|
||||||
t.Errorf("extractBusEventFilter() EndTime mismatch = %v, want %v", got.EndTime, tt.want.EndTime)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_extractLimitModifier(t *testing.T) {
|
|
||||||
makeReq := func(path string) *http.Request {
|
|
||||||
return httptest.NewRequest(http.MethodGet, path, nil)
|
|
||||||
}
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
req *http.Request
|
|
||||||
want *LimitOffsetModifier
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "test no limit/offset",
|
|
||||||
req: makeReq("http://localhost/"),
|
|
||||||
want: nil,
|
|
||||||
wantErr: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "test limit, no offset",
|
|
||||||
req: makeReq("http://localhost/?limit=10"),
|
|
||||||
want: &LimitOffsetModifier{Limit: 10},
|
|
||||||
wantErr: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "test limit and offset",
|
|
||||||
req: makeReq("http://localhost/?limit=100&offset=200"),
|
|
||||||
want: &LimitOffsetModifier{Limit: 100, Offset: 200},
|
|
||||||
wantErr: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "test only offset",
|
|
||||||
req: makeReq("http://localhost/?&offset=200"),
|
|
||||||
want: nil,
|
|
||||||
wantErr: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "test bad limit",
|
|
||||||
req: makeReq("http://localhost/?limit=aaaa"),
|
|
||||||
want: nil,
|
|
||||||
wantErr: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "test good limit, bad offset",
|
|
||||||
req: makeReq("http://localhost/?limit=10&offset=jjjj"),
|
|
||||||
want: nil,
|
|
||||||
wantErr: true,
|
|
||||||
},
|
|
||||||
|
|
||||||
}
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
got, err := extractLimitModifier(tt.req)
|
|
||||||
if (err != nil) != tt.wantErr {
|
|
||||||
t.Errorf("extractLimitModifier() error = %v, wantErr %v", err, tt.wantErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(got, tt.want) {
|
|
||||||
t.Errorf("extractLimitModifier() = %v, want %v", got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_ApiV1GetPackets(t *testing.T) {
|
|
||||||
tdb := MakeMockDatabase(t.Name())
|
|
||||||
SeedMockDatabase(tdb)
|
|
||||||
evs := GetSeedEvents()
|
|
||||||
handler := apiV1GetPackets(tdb)
|
|
||||||
|
|
||||||
tests := []struct{
|
|
||||||
name string
|
|
||||||
req *http.Request
|
|
||||||
statusCode int
|
|
||||||
expectedResults []skylab.BusEvent
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "get all packets test",
|
|
||||||
req: httptest.NewRequest(http.MethodGet, "http://localhost/", nil),
|
|
||||||
statusCode: http.StatusOK,
|
|
||||||
expectedResults: evs,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "filter name test",
|
|
||||||
req: httptest.NewRequest(http.MethodGet, "http://localhost/?name=bms_module", nil),
|
|
||||||
statusCode: http.StatusOK,
|
|
||||||
expectedResults: func() []skylab.BusEvent {
|
|
||||||
filtered := make([]skylab.BusEvent, 0)
|
|
||||||
for _, pkt := range evs {
|
|
||||||
if pkt.Name == "bms_module" {
|
|
||||||
filtered = append(filtered, pkt)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return filtered
|
|
||||||
}(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
// construct the recorder
|
|
||||||
w := httptest.NewRecorder()
|
|
||||||
handler(w, tt.req)
|
|
||||||
|
|
||||||
resp := w.Result()
|
|
||||||
|
|
||||||
if tt.statusCode != resp.StatusCode {
|
|
||||||
t.Errorf("incorrect status code: expected %d got %d", tt.statusCode, resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
decoder := json.NewDecoder(resp.Body)
|
|
||||||
var resultEvents []skylab.BusEvent
|
|
||||||
err := decoder.Decode(&resultEvents)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("could not parse JSON response: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(resultEvents) != len(tt.expectedResults) {
|
|
||||||
t.Fatalf("response length did not match, want %d got %d", len(tt.expectedResults), len(resultEvents))
|
|
||||||
}
|
|
||||||
|
|
||||||
for idx := range tt.expectedResults {
|
|
||||||
expected := tt.expectedResults[idx]
|
|
||||||
actual := resultEvents[idx]
|
|
||||||
if !expected.Equals(&actual) {
|
|
||||||
t.Errorf("packet did not match, want %v got %v", expected, actual)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -3,7 +3,6 @@
|
||||||
package skylab
|
package skylab
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -95,9 +94,9 @@ type RawJsonEvent struct {
|
||||||
|
|
||||||
// BusEvent is a timestamped Skylab packet - it contains
|
// BusEvent is a timestamped Skylab packet - it contains
|
||||||
type BusEvent struct {
|
type BusEvent struct {
|
||||||
Timestamp time.Time
|
Timestamp time.Time `json:"ts"`
|
||||||
Name string
|
Name string `json:"id"`
|
||||||
Data Packet
|
Data Packet `json:"data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e BusEvent) MarshalJSON() (b []byte, err error) {
|
func (e BusEvent) MarshalJSON() (b []byte, err error) {
|
||||||
|
@ -117,9 +116,6 @@ func (e BusEvent) MarshalJSON() (b []byte, err error) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalJSON implements JSON unmarshalling. Note that this
|
|
||||||
// uses RawJSON events, which are formatted differently.
|
|
||||||
// also it uses int64 milliseconds instead of times.
|
|
||||||
func (e *BusEvent) UnmarshalJSON(b []byte) error {
|
func (e *BusEvent) UnmarshalJSON(b []byte) error {
|
||||||
j := &RawJsonEvent{}
|
j := &RawJsonEvent{}
|
||||||
|
|
||||||
|
@ -136,22 +132,6 @@ func (e *BusEvent) UnmarshalJSON(b []byte) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Equals compares two bus events deeply.
|
|
||||||
func (e *BusEvent) Equals(other *BusEvent) bool {
|
|
||||||
if e.Name != other.Name {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if !e.Timestamp.Equal(other.Timestamp) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
pkt1, _ := e.Data.MarshalPacket()
|
|
||||||
pkt2, _ := e.Data.MarshalPacket()
|
|
||||||
if !bytes.Equal(pkt1, pkt2) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// we need to be able to parse the JSON as well. this is done using the
|
// we need to be able to parse the JSON as well. this is done using the
|
||||||
// generator since we can use the switch/case thing since it's the fastest
|
// generator since we can use the switch/case thing since it's the fastest
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue