From f0db7da230a472bd004fee9f4601c48282a1dafc Mon Sep 17 00:00:00 2001 From: saji <9110284+kschamplin@users.noreply.github.com> Date: Sat, 29 Apr 2023 11:40:44 -0500 Subject: [PATCH] integrate broker into server --- cmd/server.go | 85 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 59 insertions(+), 26 deletions(-) diff --git a/cmd/server.go b/cmd/server.go index 701dbff..11a8e22 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -37,11 +37,11 @@ type session struct { } func serve() { + + broker := NewBroker(3) // start the can listener go vcanTest() - fromCanBus := make(chan can.Frame, 1) - toCanBus := make(chan can.Frame) - go canHandler(toCanBus, fromCanBus) + go canHandler(broker) ln, err := net.Listen("tcp", ":8082") if err != nil { fmt.Printf("Error listening: %v\n", err) @@ -53,21 +53,53 @@ func serve() { if err != nil { fmt.Printf("error accepting: %v\n", err) } - go handleCon(conn) + go handleCon(conn, broker) } } -func handleCon(conn net.Conn) { +func handleCon(conn net.Conn, broker *Broker) { // reader := msgp.NewReader(conn) + rxPkts := make(chan gotelem.Data) + go func() { + // setpu our msgp reader. + scann := msgp.NewReader(conn) + data := gotelem.Data{} + for { + err := data.DecodeMsg(scann) + if err != nil { + break + } + rxPkts <- data + } + + }() + + // subscribe to can packets + // TODO: make this unique since remote addr could be non-unique + canCh := broker.Subscribe(conn.RemoteAddr().String()) writer := msgp.NewWriter(conn) for { - data := gotelem.StatusBody{ - BatteryPct: 1.2, - ErrCode: 0, + + select { + case canFrame := <-canCh: + cf := gotelem.CanBody{ + Id: canFrame.Id, + Payload: canFrame.Data, + Source: "me", + } + cf.EncodeMsg(writer) + case rxBody := <-rxPkts: + // do nothing for now. + fmt.Printf("got a body %v\n", rxBody) + case <-time.NewTimer(1 * time.Second).C: // time out. + fmt.Printf("timeout\n") + data := gotelem.StatusBody{ + BatteryPct: 1.2, + ErrCode: 0, + } + data.EncodeMsg(writer) + writer.Flush() } - data.EncodeMsg(writer) - writer.Flush() - time.Sleep(1 * time.Second) } } @@ -102,7 +134,7 @@ func xbeeSvc(packets <-chan can.Frame, device string, quit <-chan struct{}) { } } -// this spins up a new can socket on vcan0 and broadcasts a packet every second. +// this spins up a new can socket on vcan0 and broadcasts a packet every second. for testing. func vcanTest() { sock, _ := socketcan.NewCanSocket("vcan0") testFrame := &can.Frame{ @@ -118,34 +150,35 @@ func vcanTest() { } } -func canHandler(pktToSend <-chan can.Frame, pktRecv chan<- can.Frame) { +func canHandler(broker *Broker) { + rxCh := broker.Subscribe("socketcan") sock, _ := socketcan.NewCanSocket("vcan0") // start a simple dispatcher that just relays can frames. - r := make(chan can.Frame) + rxCan := make(chan can.Frame) go func() { for { pkt, _ := sock.Recv() - r <- *pkt + rxCan <- *pkt } }() for { select { - case msg := <-pktToSend: + case msg := <-rxCh: sock.Send(&msg) - case msg := <-r: + case msg := <-rxCan: fmt.Printf("got a packet from the can %v\n", msg) - pktRecv <- msg + broker.Publish("socketcan", msg) } } } type BrokerRequest struct { - Source string // the name of the sender + Source string // the name of the sender Msg can.Frame // the message to send } type BrokerClient struct { - Name string // the name of the client + Name string // the name of the client Ch chan can.Frame // the channel to send frames to this client } type Broker struct { @@ -157,15 +190,15 @@ type Broker struct { unsubCh chan BrokerClient } -func NewBroker(bufsize int) *Broker{ +func NewBroker(bufsize int) *Broker { b := &Broker{ - subs: make(map[string]chan can.Frame), + subs: make(map[string]chan can.Frame), publishCh: make(chan BrokerRequest, 3), - subsCh: make(chan BrokerClient, 3), - unsubCh: make(chan BrokerClient, 3), + subsCh: make(chan BrokerClient, 3), + unsubCh: make(chan BrokerClient, 3), } return b -} +} func (b *Broker) Start() { @@ -206,7 +239,7 @@ func (b *Broker) Subscribe(name string) <-chan can.Frame { bc := BrokerClient{ Name: name, - Ch: ch, + Ch: ch, } b.subsCh <- bc return ch