integrate broker into server

This commit is contained in:
saji 2023-04-29 11:40:44 -05:00
parent 5d42f8693a
commit f0db7da230

View file

@ -37,11 +37,11 @@ type session struct {
}
func serve() {
broker := NewBroker(3)
// start the can listener
go vcanTest()
fromCanBus := make(chan can.Frame, 1)
toCanBus := make(chan can.Frame)
go canHandler(toCanBus, fromCanBus)
go canHandler(broker)
ln, err := net.Listen("tcp", ":8082")
if err != nil {
fmt.Printf("Error listening: %v\n", err)
@ -53,21 +53,53 @@ func serve() {
if err != nil {
fmt.Printf("error accepting: %v\n", err)
}
go handleCon(conn)
go handleCon(conn, broker)
}
}
func handleCon(conn net.Conn) {
func handleCon(conn net.Conn, broker *Broker) {
// reader := msgp.NewReader(conn)
rxPkts := make(chan gotelem.Data)
go func() {
// setpu our msgp reader.
scann := msgp.NewReader(conn)
data := gotelem.Data{}
for {
err := data.DecodeMsg(scann)
if err != nil {
break
}
rxPkts <- data
}
}()
// subscribe to can packets
// TODO: make this unique since remote addr could be non-unique
canCh := broker.Subscribe(conn.RemoteAddr().String())
writer := msgp.NewWriter(conn)
for {
data := gotelem.StatusBody{
BatteryPct: 1.2,
ErrCode: 0,
select {
case canFrame := <-canCh:
cf := gotelem.CanBody{
Id: canFrame.Id,
Payload: canFrame.Data,
Source: "me",
}
cf.EncodeMsg(writer)
case rxBody := <-rxPkts:
// do nothing for now.
fmt.Printf("got a body %v\n", rxBody)
case <-time.NewTimer(1 * time.Second).C: // time out.
fmt.Printf("timeout\n")
data := gotelem.StatusBody{
BatteryPct: 1.2,
ErrCode: 0,
}
data.EncodeMsg(writer)
writer.Flush()
}
data.EncodeMsg(writer)
writer.Flush()
time.Sleep(1 * time.Second)
}
}
@ -102,7 +134,7 @@ func xbeeSvc(packets <-chan can.Frame, device string, quit <-chan struct{}) {
}
}
// this spins up a new can socket on vcan0 and broadcasts a packet every second.
// this spins up a new can socket on vcan0 and broadcasts a packet every second. for testing.
func vcanTest() {
sock, _ := socketcan.NewCanSocket("vcan0")
testFrame := &can.Frame{
@ -118,34 +150,35 @@ func vcanTest() {
}
}
func canHandler(pktToSend <-chan can.Frame, pktRecv chan<- can.Frame) {
func canHandler(broker *Broker) {
rxCh := broker.Subscribe("socketcan")
sock, _ := socketcan.NewCanSocket("vcan0")
// start a simple dispatcher that just relays can frames.
r := make(chan can.Frame)
rxCan := make(chan can.Frame)
go func() {
for {
pkt, _ := sock.Recv()
r <- *pkt
rxCan <- *pkt
}
}()
for {
select {
case msg := <-pktToSend:
case msg := <-rxCh:
sock.Send(&msg)
case msg := <-r:
case msg := <-rxCan:
fmt.Printf("got a packet from the can %v\n", msg)
pktRecv <- msg
broker.Publish("socketcan", msg)
}
}
}
type BrokerRequest struct {
Source string // the name of the sender
Source string // the name of the sender
Msg can.Frame // the message to send
}
type BrokerClient struct {
Name string // the name of the client
Name string // the name of the client
Ch chan can.Frame // the channel to send frames to this client
}
type Broker struct {
@ -157,15 +190,15 @@ type Broker struct {
unsubCh chan BrokerClient
}
func NewBroker(bufsize int) *Broker{
func NewBroker(bufsize int) *Broker {
b := &Broker{
subs: make(map[string]chan can.Frame),
subs: make(map[string]chan can.Frame),
publishCh: make(chan BrokerRequest, 3),
subsCh: make(chan BrokerClient, 3),
unsubCh: make(chan BrokerClient, 3),
subsCh: make(chan BrokerClient, 3),
unsubCh: make(chan BrokerClient, 3),
}
return b
}
}
func (b *Broker) Start() {
@ -206,7 +239,7 @@ func (b *Broker) Subscribe(name string) <-chan can.Frame {
bc := BrokerClient{
Name: name,
Ch: ch,
Ch: ch,
}
b.subsCh <- bc
return ch