gotelem/cmd/server.go

251 lines
5.1 KiB
Go
Raw Normal View History

2023-04-14 20:42:16 +00:00
package cmd
import (
2023-04-20 20:26:29 +00:00
"fmt"
"net"
"time"
2023-04-29 15:58:56 +00:00
"github.com/kschamplin/gotelem/internal/can"
2023-04-20 20:26:29 +00:00
"github.com/kschamplin/gotelem/internal/gotelem"
2023-04-29 15:58:56 +00:00
"github.com/kschamplin/gotelem/internal/socketcan"
"github.com/kschamplin/gotelem/internal/xbee"
2023-04-20 20:26:29 +00:00
"github.com/tinylib/msgp/msgp"
2023-04-14 20:42:16 +00:00
"github.com/urfave/cli/v2"
2023-04-29 15:58:56 +00:00
"go.bug.st/serial"
2023-04-14 20:42:16 +00:00
)
2023-04-20 20:26:29 +00:00
const xbeeCategory = "XBee settings"
2023-04-14 20:42:16 +00:00
var serveCmd = &cli.Command{
Name: "serve",
Aliases: []string{"server", "s"},
Usage: "Start a telemetry server",
2023-04-20 20:26:29 +00:00
Flags: []cli.Flag{
&cli.BoolFlag{Name: "xbee", Aliases: []string{"x"}, Usage: "Find and connect to an XBee"},
},
Action: func(ctx *cli.Context) error {
2023-05-01 14:49:47 +00:00
serve(ctx.Bool("xbee"))
2023-04-20 20:26:29 +00:00
return nil
},
}
2023-05-01 14:49:47 +00:00
func serve(useXbee bool) {
2023-04-29 16:40:44 +00:00
broker := NewBroker(3)
2023-04-29 15:58:56 +00:00
// start the can listener
go vcanTest()
2023-04-29 16:40:44 +00:00
go canHandler(broker)
2023-04-29 22:34:35 +00:00
go broker.Start()
2023-05-01 14:49:47 +00:00
if useXbee {
go xbeeSvc()
}
2023-04-20 20:26:29 +00:00
ln, err := net.Listen("tcp", ":8082")
if err != nil {
fmt.Printf("Error listening: %v\n", err)
}
fmt.Printf("Listening on :8082\n")
2023-04-29 15:58:56 +00:00
2023-04-20 20:26:29 +00:00
for {
conn, err := ln.Accept()
if err != nil {
fmt.Printf("error accepting: %v\n", err)
}
2023-04-29 16:40:44 +00:00
go handleCon(conn, broker)
2023-04-20 20:26:29 +00:00
}
}
2023-04-29 16:40:44 +00:00
func handleCon(conn net.Conn, broker *Broker) {
2023-04-20 20:26:29 +00:00
// reader := msgp.NewReader(conn)
2023-04-29 16:40:44 +00:00
rxPkts := make(chan gotelem.Data)
2023-05-01 14:49:47 +00:00
done := make(chan bool)
2023-04-29 16:40:44 +00:00
go func() {
// setpu our msgp reader.
scann := msgp.NewReader(conn)
data := gotelem.Data{}
for {
err := data.DecodeMsg(scann)
if err != nil {
break
}
rxPkts <- data
}
2023-05-01 14:49:47 +00:00
done <- true // if we got here, it means the connction was closed.
2023-04-29 16:40:44 +00:00
}()
// subscribe to can packets
// TODO: make this unique since remote addr could be non-unique
canCh := broker.Subscribe(conn.RemoteAddr().String())
2023-04-20 20:26:29 +00:00
writer := msgp.NewWriter(conn)
2023-05-01 14:49:47 +00:00
mainloop:
2023-04-20 20:26:29 +00:00
for {
2023-04-29 16:40:44 +00:00
select {
case canFrame := <-canCh:
cf := gotelem.CanBody{
Id: canFrame.Id,
Payload: canFrame.Data,
Source: "me",
}
cf.EncodeMsg(writer)
case rxBody := <-rxPkts:
// do nothing for now.
fmt.Printf("got a body %v\n", rxBody)
2023-04-29 22:34:35 +00:00
case <-time.After(1 * time.Second): // time out.
2023-04-29 16:40:44 +00:00
writer.Flush()
2023-05-01 14:49:47 +00:00
case <-done:
break mainloop
2023-04-20 20:26:29 +00:00
}
}
2023-05-01 14:49:47 +00:00
// unsubscribe and close the conn.
broker.Unsubscribe(conn.RemoteAddr().String())
conn.Close()
2023-04-14 20:42:16 +00:00
}
2023-04-29 15:58:56 +00:00
2023-05-01 14:49:47 +00:00
func xbeeSvc(b *Broker) {
2023-04-29 15:58:56 +00:00
// open the session.
mode := &serial.Mode{
BaudRate: 115200,
}
2023-05-01 14:49:47 +00:00
sess, err := xbee.NewSerialXBee("/dev/ttyACM0", mode)
2023-04-29 15:58:56 +00:00
if err != nil {
fmt.Printf("got error %v", err)
panic(err)
}
receivedData := make(chan gotelem.Data)
// make a simple reader that goes in the background.
go func() {
msgpReader := msgp.NewReader(sess)
var msgData gotelem.Data
msgData.DecodeMsg(msgpReader)
receivedData <- msgData
}()
for {
select {
case data := <-receivedData:
fmt.Printf("Got a data %v\n", data)
}
}
}
2023-04-29 16:40:44 +00:00
// this spins up a new can socket on vcan0 and broadcasts a packet every second. for testing.
2023-04-29 15:58:56 +00:00
func vcanTest() {
sock, _ := socketcan.NewCanSocket("vcan0")
testFrame := &can.Frame{
Id: 0x234,
Kind: can.SFF,
Data: []byte{0, 1, 2, 3, 4, 5, 6, 7},
}
for {
fmt.Printf("sending test packet\n")
sock.Send(testFrame)
time.Sleep(1 * time.Second)
}
}
2023-04-29 16:40:44 +00:00
func canHandler(broker *Broker) {
rxCh := broker.Subscribe("socketcan")
2023-04-29 15:58:56 +00:00
sock, _ := socketcan.NewCanSocket("vcan0")
// start a simple dispatcher that just relays can frames.
2023-04-29 16:40:44 +00:00
rxCan := make(chan can.Frame)
2023-04-29 15:58:56 +00:00
go func() {
for {
pkt, _ := sock.Recv()
2023-04-29 16:40:44 +00:00
rxCan <- *pkt
2023-04-29 15:58:56 +00:00
}
}()
for {
select {
2023-04-29 16:40:44 +00:00
case msg := <-rxCh:
2023-04-29 15:58:56 +00:00
sock.Send(&msg)
2023-04-29 16:40:44 +00:00
case msg := <-rxCan:
2023-04-29 15:58:56 +00:00
fmt.Printf("got a packet from the can %v\n", msg)
2023-04-29 16:40:44 +00:00
broker.Publish("socketcan", msg)
2023-04-29 15:58:56 +00:00
}
}
}
type BrokerRequest struct {
2023-04-29 16:40:44 +00:00
Source string // the name of the sender
2023-04-29 15:58:56 +00:00
Msg can.Frame // the message to send
}
type BrokerClient struct {
2023-04-29 16:40:44 +00:00
Name string // the name of the client
2023-04-29 15:58:56 +00:00
Ch chan can.Frame // the channel to send frames to this client
}
type Broker struct {
subs map[string]chan can.Frame
publishCh chan BrokerRequest
subsCh chan BrokerClient
unsubCh chan BrokerClient
}
2023-04-29 16:40:44 +00:00
func NewBroker(bufsize int) *Broker {
2023-04-29 15:58:56 +00:00
b := &Broker{
2023-04-29 16:40:44 +00:00
subs: make(map[string]chan can.Frame),
2023-04-29 15:58:56 +00:00
publishCh: make(chan BrokerRequest, 3),
2023-04-29 16:40:44 +00:00
subsCh: make(chan BrokerClient, 3),
unsubCh: make(chan BrokerClient, 3),
2023-04-29 15:58:56 +00:00
}
return b
2023-04-29 16:40:44 +00:00
}
2023-04-29 15:58:56 +00:00
func (b *Broker) Start() {
for {
select {
case newClient := <-b.subsCh:
b.subs[newClient.Name] = newClient.Ch
case req := <-b.publishCh:
for name, ch := range b.subs {
if name == req.Source {
continue // don't send to ourselves.
}
// a kinda-inelegant non-blocking push.
// if we can't do it, we just drop it. this should ideally never happen.
select {
case ch <- req.Msg:
default:
fmt.Printf("we dropped a packet to dest %s", name)
}
}
case clientToRemove := <-b.unsubCh:
close(b.subs[clientToRemove.Name])
delete(b.subs, clientToRemove.Name)
}
}
}
func (b *Broker) Publish(name string, msg can.Frame) {
breq := BrokerRequest{
Source: name,
Msg: msg,
}
b.publishCh <- breq
}
func (b *Broker) Subscribe(name string) <-chan can.Frame {
ch := make(chan can.Frame, 3)
bc := BrokerClient{
Name: name,
2023-04-29 16:40:44 +00:00
Ch: ch,
2023-04-29 15:58:56 +00:00
}
b.subsCh <- bc
return ch
}
2023-05-01 14:49:47 +00:00
func (b *Broker) Unsubscribe(name string) {
bc := BrokerClient{
Name: name,
}
b.unsubCh <- bc
}