From 344353b0d60a80d9ccab43f6aa3917e2363e33a3 Mon Sep 17 00:00:00 2001 From: saji Date: Tue, 9 May 2023 10:25:01 -0500 Subject: [PATCH] move broker to separate file --- broker.go | 85 ++++++++++++++++++++++++++++++++++ cmd/gotelem/cli/server.go | 97 ++++----------------------------------- 2 files changed, 93 insertions(+), 89 deletions(-) create mode 100644 broker.go diff --git a/broker.go b/broker.go new file mode 100644 index 0000000..a6285e8 --- /dev/null +++ b/broker.go @@ -0,0 +1,85 @@ +package gotelem + +import "fmt" + +type BrokerRequest struct { + Source string // the name of the sender + Msg Frame // the message to send +} +type BrokerClient struct { + Name string // the name of the client + Ch chan Frame // the channel to send frames to this client +} +type Broker struct { + subs map[string]chan Frame + + publishCh chan BrokerRequest + + subsCh chan BrokerClient + unsubCh chan BrokerClient +} + +func NewBroker(bufsize int) *Broker { + b := &Broker{ + subs: make(map[string]chan Frame), + publishCh: make(chan BrokerRequest, 3), + subsCh: make(chan BrokerClient, 3), + unsubCh: make(chan BrokerClient, 3), + } + return b +} + +// Start runs the broker and sends messages to the subscribers (but not the sender) +func (b *Broker) Start() { + + for { + select { + case newClient := <-b.subsCh: + b.subs[newClient.Name] = newClient.Ch + case req := <-b.publishCh: + for name, ch := range b.subs { + if name == req.Source { + continue // don't send to ourselves. + } + // a kinda-inelegant non-blocking push. + // if we can't do it, we just drop it. this should ideally never happen. + select { + case ch <- req.Msg: + default: + fmt.Printf("we dropped a packet to dest %s", name) + } + } + case clientToRemove := <-b.unsubCh: + close(b.subs[clientToRemove.Name]) + delete(b.subs, clientToRemove.Name) + } + } +} + +func (b *Broker) Publish(name string, msg Frame) { + breq := BrokerRequest{ + Source: name, + Msg: msg, + } + b.publishCh <- breq +} + +func (b *Broker) Subscribe(name string) <-chan Frame { + ch := make(chan Frame, 3) + + bc := BrokerClient{ + Name: name, + Ch: ch, + } + b.subsCh <- bc + return ch +} + +func (b *Broker) Unsubscribe(name string) { + bc := BrokerClient{ + Name: name, + } + b.unsubCh <- bc +} + +// TODO: don't use channels for everything to avoid using a mutex diff --git a/cmd/gotelem/cli/server.go b/cmd/gotelem/cli/server.go index ebeed86..4204e38 100644 --- a/cmd/gotelem/cli/server.go +++ b/cmd/gotelem/cli/server.go @@ -5,13 +5,11 @@ import ( "net" "time" - "github.com/kschamplin/gotelem/can" + "github.com/kschamplin/gotelem" "github.com/kschamplin/gotelem/socketcan" "github.com/urfave/cli/v2" ) -const xbeeCategory = "XBee settings" - var serveCmd = &cli.Command{ Name: "serve", Aliases: []string{"server", "s"}, @@ -27,7 +25,7 @@ var serveCmd = &cli.Command{ func serve(useXbee bool) { - broker := NewBroker(3) + broker := gotelem.NewBroker(3) // start the can listener go vcanTest() go canHandler(broker) @@ -47,22 +45,22 @@ func serve(useXbee bool) { } } -func handleCon(conn net.Conn, broker *Broker) { +func handleCon(conn net.Conn, broker *gotelem.Broker) { // reader := msgp.NewReader(conn) conn.Close() } -func xbeeSvc(b *Broker) { +func xbeeSvc(b *gotelem.Broker) { } // this spins up a new can socket on vcan0 and broadcasts a packet every second. for testing. func vcanTest() { sock, _ := socketcan.NewCanSocket("vcan0") - testFrame := &can.Frame{ + testFrame := &gotelem.Frame{ Id: 0x234, - Kind: can.SFF, + Kind: gotelem.CanSFFFrame, Data: []byte{0, 1, 2, 3, 4, 5, 6, 7}, } for { @@ -73,12 +71,12 @@ func vcanTest() { } } -func canHandler(broker *Broker) { +func canHandler(broker *gotelem.Broker) { rxCh := broker.Subscribe("socketcan") sock, _ := socketcan.NewCanSocket("vcan0") // start a simple dispatcher that just relays can frames. - rxCan := make(chan can.Frame) + rxCan := make(chan gotelem.Frame) go func() { for { pkt, _ := sock.Recv() @@ -95,82 +93,3 @@ func canHandler(broker *Broker) { } } } - -type BrokerRequest struct { - Source string // the name of the sender - Msg can.Frame // the message to send -} -type BrokerClient struct { - Name string // the name of the client - Ch chan can.Frame // the channel to send frames to this client -} -type Broker struct { - subs map[string]chan can.Frame - - publishCh chan BrokerRequest - - subsCh chan BrokerClient - unsubCh chan BrokerClient -} - -func NewBroker(bufsize int) *Broker { - b := &Broker{ - subs: make(map[string]chan can.Frame), - publishCh: make(chan BrokerRequest, 3), - subsCh: make(chan BrokerClient, 3), - unsubCh: make(chan BrokerClient, 3), - } - return b -} - -func (b *Broker) Start() { - - for { - select { - case newClient := <-b.subsCh: - b.subs[newClient.Name] = newClient.Ch - case req := <-b.publishCh: - for name, ch := range b.subs { - if name == req.Source { - continue // don't send to ourselves. - } - // a kinda-inelegant non-blocking push. - // if we can't do it, we just drop it. this should ideally never happen. - select { - case ch <- req.Msg: - default: - fmt.Printf("we dropped a packet to dest %s", name) - } - } - case clientToRemove := <-b.unsubCh: - close(b.subs[clientToRemove.Name]) - delete(b.subs, clientToRemove.Name) - } - } -} - -func (b *Broker) Publish(name string, msg can.Frame) { - breq := BrokerRequest{ - Source: name, - Msg: msg, - } - b.publishCh <- breq -} - -func (b *Broker) Subscribe(name string) <-chan can.Frame { - ch := make(chan can.Frame, 3) - - bc := BrokerClient{ - Name: name, - Ch: ch, - } - b.subsCh <- bc - return ch -} - -func (b *Broker) Unsubscribe(name string) { - bc := BrokerClient{ - Name: name, - } - b.unsubCh <- bc -}