move broker to separate file

This commit is contained in:
saji 2023-05-09 10:25:01 -05:00
parent 2b7426903a
commit 344353b0d6
2 changed files with 93 additions and 89 deletions

85
broker.go Normal file
View file

@ -0,0 +1,85 @@
package gotelem
import "fmt"
type BrokerRequest struct {
Source string // the name of the sender
Msg Frame // the message to send
}
type BrokerClient struct {
Name string // the name of the client
Ch chan Frame // the channel to send frames to this client
}
type Broker struct {
subs map[string]chan Frame
publishCh chan BrokerRequest
subsCh chan BrokerClient
unsubCh chan BrokerClient
}
func NewBroker(bufsize int) *Broker {
b := &Broker{
subs: make(map[string]chan Frame),
publishCh: make(chan BrokerRequest, 3),
subsCh: make(chan BrokerClient, 3),
unsubCh: make(chan BrokerClient, 3),
}
return b
}
// Start runs the broker and sends messages to the subscribers (but not the sender)
func (b *Broker) Start() {
for {
select {
case newClient := <-b.subsCh:
b.subs[newClient.Name] = newClient.Ch
case req := <-b.publishCh:
for name, ch := range b.subs {
if name == req.Source {
continue // don't send to ourselves.
}
// a kinda-inelegant non-blocking push.
// if we can't do it, we just drop it. this should ideally never happen.
select {
case ch <- req.Msg:
default:
fmt.Printf("we dropped a packet to dest %s", name)
}
}
case clientToRemove := <-b.unsubCh:
close(b.subs[clientToRemove.Name])
delete(b.subs, clientToRemove.Name)
}
}
}
func (b *Broker) Publish(name string, msg Frame) {
breq := BrokerRequest{
Source: name,
Msg: msg,
}
b.publishCh <- breq
}
func (b *Broker) Subscribe(name string) <-chan Frame {
ch := make(chan Frame, 3)
bc := BrokerClient{
Name: name,
Ch: ch,
}
b.subsCh <- bc
return ch
}
func (b *Broker) Unsubscribe(name string) {
bc := BrokerClient{
Name: name,
}
b.unsubCh <- bc
}
// TODO: don't use channels for everything to avoid using a mutex

View file

@ -5,13 +5,11 @@ import (
"net" "net"
"time" "time"
"github.com/kschamplin/gotelem/can" "github.com/kschamplin/gotelem"
"github.com/kschamplin/gotelem/socketcan" "github.com/kschamplin/gotelem/socketcan"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
) )
const xbeeCategory = "XBee settings"
var serveCmd = &cli.Command{ var serveCmd = &cli.Command{
Name: "serve", Name: "serve",
Aliases: []string{"server", "s"}, Aliases: []string{"server", "s"},
@ -27,7 +25,7 @@ var serveCmd = &cli.Command{
func serve(useXbee bool) { func serve(useXbee bool) {
broker := NewBroker(3) broker := gotelem.NewBroker(3)
// start the can listener // start the can listener
go vcanTest() go vcanTest()
go canHandler(broker) go canHandler(broker)
@ -47,22 +45,22 @@ func serve(useXbee bool) {
} }
} }
func handleCon(conn net.Conn, broker *Broker) { func handleCon(conn net.Conn, broker *gotelem.Broker) {
// reader := msgp.NewReader(conn) // reader := msgp.NewReader(conn)
conn.Close() conn.Close()
} }
func xbeeSvc(b *Broker) { func xbeeSvc(b *gotelem.Broker) {
} }
// this spins up a new can socket on vcan0 and broadcasts a packet every second. for testing. // this spins up a new can socket on vcan0 and broadcasts a packet every second. for testing.
func vcanTest() { func vcanTest() {
sock, _ := socketcan.NewCanSocket("vcan0") sock, _ := socketcan.NewCanSocket("vcan0")
testFrame := &can.Frame{ testFrame := &gotelem.Frame{
Id: 0x234, Id: 0x234,
Kind: can.SFF, Kind: gotelem.CanSFFFrame,
Data: []byte{0, 1, 2, 3, 4, 5, 6, 7}, Data: []byte{0, 1, 2, 3, 4, 5, 6, 7},
} }
for { for {
@ -73,12 +71,12 @@ func vcanTest() {
} }
} }
func canHandler(broker *Broker) { func canHandler(broker *gotelem.Broker) {
rxCh := broker.Subscribe("socketcan") rxCh := broker.Subscribe("socketcan")
sock, _ := socketcan.NewCanSocket("vcan0") sock, _ := socketcan.NewCanSocket("vcan0")
// start a simple dispatcher that just relays can frames. // start a simple dispatcher that just relays can frames.
rxCan := make(chan can.Frame) rxCan := make(chan gotelem.Frame)
go func() { go func() {
for { for {
pkt, _ := sock.Recv() pkt, _ := sock.Recv()
@ -95,82 +93,3 @@ func canHandler(broker *Broker) {
} }
} }
} }
type BrokerRequest struct {
Source string // the name of the sender
Msg can.Frame // the message to send
}
type BrokerClient struct {
Name string // the name of the client
Ch chan can.Frame // the channel to send frames to this client
}
type Broker struct {
subs map[string]chan can.Frame
publishCh chan BrokerRequest
subsCh chan BrokerClient
unsubCh chan BrokerClient
}
func NewBroker(bufsize int) *Broker {
b := &Broker{
subs: make(map[string]chan can.Frame),
publishCh: make(chan BrokerRequest, 3),
subsCh: make(chan BrokerClient, 3),
unsubCh: make(chan BrokerClient, 3),
}
return b
}
func (b *Broker) Start() {
for {
select {
case newClient := <-b.subsCh:
b.subs[newClient.Name] = newClient.Ch
case req := <-b.publishCh:
for name, ch := range b.subs {
if name == req.Source {
continue // don't send to ourselves.
}
// a kinda-inelegant non-blocking push.
// if we can't do it, we just drop it. this should ideally never happen.
select {
case ch <- req.Msg:
default:
fmt.Printf("we dropped a packet to dest %s", name)
}
}
case clientToRemove := <-b.unsubCh:
close(b.subs[clientToRemove.Name])
delete(b.subs, clientToRemove.Name)
}
}
}
func (b *Broker) Publish(name string, msg can.Frame) {
breq := BrokerRequest{
Source: name,
Msg: msg,
}
b.publishCh <- breq
}
func (b *Broker) Subscribe(name string) <-chan can.Frame {
ch := make(chan can.Frame, 3)
bc := BrokerClient{
Name: name,
Ch: ch,
}
b.subsCh <- bc
return ch
}
func (b *Broker) Unsubscribe(name string) {
bc := BrokerClient{
Name: name,
}
b.unsubCh <- bc
}