add xbee session command line tools

This commit is contained in:
saji 2023-05-07 00:00:35 -05:00
parent 86c9d8d492
commit b1bebae325
4 changed files with 109 additions and 111 deletions

View file

@ -3,14 +3,15 @@ package cmd
import ( import (
"fmt" "fmt"
"net" "net"
"os"
"time" "time"
"github.com/kschamplin/gotelem/can" "github.com/kschamplin/gotelem/can"
"github.com/kschamplin/gotelem/socketcan" "github.com/kschamplin/gotelem/socketcan"
"github.com/kschamplin/gotelem/xbee" "github.com/kschamplin/gotelem/xbee"
"github.com/tinylib/msgp/msgp"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"go.bug.st/serial" "go.bug.st/serial"
"golang.org/x/exp/slog"
) )
const xbeeCategory = "XBee settings" const xbeeCategory = "XBee settings"
@ -35,9 +36,6 @@ func serve(useXbee bool) {
go vcanTest() go vcanTest()
go canHandler(broker) go canHandler(broker)
go broker.Start() go broker.Start()
if useXbee {
go xbeeSvc()
}
ln, err := net.Listen("tcp", ":8082") ln, err := net.Listen("tcp", ":8082")
if err != nil { if err != nil {
fmt.Printf("Error listening: %v\n", err) fmt.Printf("Error listening: %v\n", err)
@ -55,48 +53,7 @@ func serve(useXbee bool) {
func handleCon(conn net.Conn, broker *Broker) { func handleCon(conn net.Conn, broker *Broker) {
// reader := msgp.NewReader(conn) // reader := msgp.NewReader(conn)
rxPkts := make(chan gotelem.Data)
done := make(chan bool)
go func() {
// setpu our msgp reader.
scann := msgp.NewReader(conn)
data := gotelem.Data{}
for {
err := data.DecodeMsg(scann)
if err != nil {
break
}
rxPkts <- data
}
done <- true // if we got here, it means the connction was closed.
}()
// subscribe to can packets
// TODO: make this unique since remote addr could be non-unique
canCh := broker.Subscribe(conn.RemoteAddr().String())
writer := msgp.NewWriter(conn)
mainloop:
for {
select {
case canFrame := <-canCh:
cf := gotelem.CanBody{
Id: canFrame.Id,
Payload: canFrame.Data,
Source: "me",
}
cf.EncodeMsg(writer)
case rxBody := <-rxPkts:
// do nothing for now.
fmt.Printf("got a body %v\n", rxBody)
case <-time.After(1 * time.Second): // time out.
writer.Flush()
case <-done:
break mainloop
}
}
// unsubscribe and close the conn.
broker.Unsubscribe(conn.RemoteAddr().String())
conn.Close() conn.Close()
} }
@ -107,27 +64,12 @@ func xbeeSvc(b *Broker) {
BaudRate: 115200, BaudRate: 115200,
} }
sess, err := xbee.NewSerialXBee("/dev/ttyACM0", mode) logger := slog.New(slog.NewTextHandler(os.Stderr))
_, err := xbee.NewSerialXBee("/dev/ttyACM0", mode, logger)
if err != nil { if err != nil {
fmt.Printf("got error %v", err) fmt.Printf("got error %v", err)
panic(err) panic(err)
} }
receivedData := make(chan gotelem.Data)
// make a simple reader that goes in the background.
go func() {
msgpReader := msgp.NewReader(sess)
var msgData gotelem.Data
msgData.DecodeMsg(msgpReader)
receivedData <- msgData
}()
for {
select {
case data := <-receivedData:
fmt.Printf("Got a data %v\n", data)
}
}
} }
// this spins up a new can socket on vcan0 and broadcasts a packet every second. for testing. // this spins up a new can socket on vcan0 and broadcasts a packet every second. for testing.

View file

@ -4,7 +4,14 @@ package cmd
// we can do network discovery and netcat-like things. // we can do network discovery and netcat-like things.
import ( import (
"fmt"
"io"
"os"
"github.com/kschamplin/gotelem/xbee"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"go.bug.st/serial"
"golang.org/x/exp/slog"
) )
var xbeeCmd = &cli.Command{ var xbeeCmd = &cli.Command{
@ -21,25 +28,85 @@ For serial devices (COM1 and /dev/ttyUSB0), you can specify the baud rate
using a ':'. If excluded the baud rate will default to 9600. Note that using a ':'. If excluded the baud rate will default to 9600. Note that
if using the native USB of the XLR Pro, the baud rate setting has no effect. if using the native USB of the XLR Pro, the baud rate setting has no effect.
TCP/UDP connections require a port. TCP/UDP connections require a port and will fail if one is not provided.
`, `,
Flags: []cli.Flag{ Flags: []cli.Flag{
&cli.StringFlag{ &cli.StringFlag{
Name: "device", Name: "device",
Aliases: []string{"d"}, Aliases: []string{"d"},
Usage: "The XBee to connect to", Usage: "The XBee to connect to",
Required: true,
EnvVars: []string{"XBEE_DEVICE"},
}, },
}, },
Subcommands: []*cli.Command{ Subcommands: []*cli.Command{
{ {
Name: "info", Name: "info",
Usage: "get information about an xbee device", Usage: "get information about an xbee device",
Action: xbeeInfo,
HideHelpCommand: true,
}, },
{ {
Name: "netcat", Name: "netcat",
Aliases: []string{"nc"}, Aliases: []string{"nc"},
ArgsUsage: "[addr]", ArgsUsage: "[addr]",
Usage: "send data from stdio over the xbee", Usage: "send data from stdio over the xbee",
Description: `
netcat emulates the nc command. It reads data from stdin and transmits it to
[addr] on the XBee network. If [addr] is FFFF or not present, it will broadcast
the data to all listening devices. Data received from the network will be
writtend to stdout.
`,
Action: netcat,
HideHelpCommand: true,
}, },
}, },
} }
func xbeeInfo(ctx *cli.Context) error {
logger := slog.New(slog.NewTextHandler(os.Stderr))
xb, err := xbee.NewSerialXBee("/dev/ttyACM0", &serial.Mode{}, logger)
if err != nil {
return cli.Exit(err, 1)
}
b, err := xb.ATCommand([2]rune{'I', 'D'}, nil, false)
if err != nil {
return cli.Exit(err, 1)
}
fmt.Println(b)
return nil
}
func netcat(ctx *cli.Context) error {
if ctx.Args().Len() < 1 {
cli.ShowSubcommandHelp(ctx)
return cli.Exit("missing [addr] argument", 1)
}
// basically create two pipes.
logger := slog.New(slog.NewTextHandler(os.Stderr))
xb, _ := xbee.NewSerialXBee("/dev/ttyACM0", &serial.Mode{}, logger)
sent := make(chan int64)
streamCopy := func(r io.ReadCloser, w io.WriteCloser) {
defer r.Close()
defer w.Close()
n, err := io.Copy(w, r)
if err != nil {
logger.Warn("got error copying", "err", err)
}
sent <- n
}
go streamCopy(os.Stdin, xb)
go streamCopy(xb, os.Stdout)
<-sent
return nil
}

View file

@ -16,6 +16,12 @@ type connTrack struct {
// the map is set when writing a frame, and deleted when recieving a matching frame. // the map is set when writing a frame, and deleted when recieving a matching frame.
} }
func NewConnTrack() *connTrack {
return &connTrack{
internal: make(map[uint8]chan []byte),
}
}
// GetMark finds the next available marker and takes it, returning the value of // GetMark finds the next available marker and takes it, returning the value of
// the mark as well as a channel to use as a semaphore when the mark is cleared. // the mark as well as a channel to use as a semaphore when the mark is cleared.
// If no mark can be acquired, it returns an error. // If no mark can be acquired, it returns an error.
@ -49,8 +55,13 @@ func (ct *connTrack) GetMark() (uint8, <-chan []byte, error) {
// tracked. // tracked.
func (ct *connTrack) ClearMark(mark uint8, data []byte) error { func (ct *connTrack) ClearMark(mark uint8, data []byte) error {
ct.mu.RLock() ct.mu.RLock()
// FIXME: should this be the other way around (swap if and normal execution
if val, ok := ct.internal[mark]; ok { val, ok := ct.internal[mark]
if !ok {
ct.mu.RUnlock()
return errors.New("mark was not set")
}
ct.mu.RUnlock() ct.mu.RUnlock()
ct.mu.Lock() ct.mu.Lock()
val <- data val <- data
@ -58,7 +69,4 @@ func (ct *connTrack) ClearMark(mark uint8, data []byte) error {
delete(ct.internal, mark) delete(ct.internal, mark)
ct.mu.Unlock() ct.mu.Unlock()
return nil return nil
}
ct.mu.RUnlock()
return errors.New("mark was not set")
} }

View file

@ -38,7 +38,7 @@ type SerialSession struct {
writeLock sync.Mutex // prevents multiple writers from accessing the port at once. writeLock sync.Mutex // prevents multiple writers from accessing the port at once.
} }
func NewSerialXBee(portName string, mode *serial.Mode) (*SerialSession, error) { func NewSerialXBee(portName string, mode *serial.Mode, baseLog *slog.Logger) (*SerialSession, error) {
// make the session with the port/mode given, and set up the conntrack. // make the session with the port/mode given, and set up the conntrack.
sess := &SerialSession{} sess := &SerialSession{}
@ -48,7 +48,8 @@ func NewSerialXBee(portName string, mode *serial.Mode) (*SerialSession, error) {
} }
sess.port = port sess.port = port
sess.ct = connTrack{} sess.ct = *NewConnTrack()
sess.Logger = *baseLog.With("portname", portName)
// setup io readwriter with a pipe. // setup io readwriter with a pipe.
rd, wr := io.Pipe() rd, wr := io.Pipe()
@ -76,19 +77,16 @@ func (sess *SerialSession) rxHandler() {
scan.Split(xbeeFrameSplit) scan.Split(xbeeFrameSplit)
for scan.Scan() { for scan.Scan() {
// TODO: check for errors?
// data is a frame payload - not a full frame.
data, err := parseFrame(scan.Bytes()) data, err := parseFrame(scan.Bytes())
if err != nil { if err != nil {
sess.Logger.Warn("error parsing frame", "error", err, "data", data) sess.Logger.Warn("error parsing frame", "error", err, "data", data)
continue continue
} }
// data is good, lets parse the frame - using the first byte as the identifier.
switch XBeeCmd(data[0]) { switch XBeeCmd(data[0]) {
case RxPktType: case RxPktType:
// we parse the data, and push it to the rx buffer. // we parse the data, and push it to the rx buffer.
//TODO: if we have multiple sources, we need to track them here. //TODO: if we have multiple remotes on the network, we need to track them here.
frame, err := ParseRxFrame(data) frame, err := ParseRxFrame(data)
if err != nil { if err != nil {
sess.Logger.Warn("error parsing rx packet", "error", err, "data", data) sess.Logger.Warn("error parsing rx packet", "error", err, "data", data)
@ -100,10 +98,8 @@ func (sess *SerialSession) rxHandler() {
sess.Logger.Warn("error writing data", "error", err, "payload", frame.Payload) sess.Logger.Warn("error writing data", "error", err, "payload", frame.Payload)
} }
// the "callback"-style handler. Any received packet with a frame ID should case TxStatusType, ATCmdResponseType, RemoteCmdRespType:
// be handled here. // we hand the frame back via the channel. we directly find the ID since it's always
case TxStatusType, ATCmdResponseType, RemoteCmdRespType: // these take the frame bytes and parse it themselves.
// we hand it back via the channel. we directly find the ID since it's always
// the second byte. // the second byte.
idx := data[1] idx := data[1]
@ -130,12 +126,13 @@ func (sess *SerialSession) Read(p []byte) (int, error) {
} }
func (sess *SerialSession) Write(p []byte) (n int, err error) { func (sess *SerialSession) Write(p []byte) (n int, err error) {
// we construct a packet - using the conntrack to ensure that the packet is okay. sess.Warn("hello")
// we block - this is more correct.
idx, ch, err := sess.ct.GetMark() idx, ch, err := sess.ct.GetMark()
if err != nil { if err != nil {
return return
} }
n = len(p)
wf := &TxFrame{ wf := &TxFrame{
Id: idx, Id: idx,
Destination: BroadcastAddr, Destination: BroadcastAddr,
@ -144,7 +141,7 @@ func (sess *SerialSession) Write(p []byte) (n int, err error) {
// write the actual packet // write the actual packet
sess.writeLock.Lock() sess.writeLock.Lock()
n, err = writeXBeeFrame(sess.port, wf.Bytes()) _, err = writeXBeeFrame(sess.port, wf.Bytes())
sess.writeLock.Unlock() sess.writeLock.Unlock()
if err != nil { if err != nil {
return return
@ -180,7 +177,6 @@ func (sess *SerialSession) ATCommand(cmd [2]rune, data []byte, queued bool) ([]b
// get a mark for the frame // get a mark for the frame
isQuery := len(data) > 0
idx, ch, err := sess.ct.GetMark() idx, ch, err := sess.ct.GetMark()
if err != nil { if err != nil {
return nil, err return nil, err
@ -195,12 +191,8 @@ func (sess *SerialSession) ATCommand(cmd [2]rune, data []byte, queued bool) ([]b
return nil, fmt.Errorf("error writing xbee frame: %w", err) return nil, fmt.Errorf("error writing xbee frame: %w", err)
} }
// we use the AT command that was provided to decode the frame.
// Parse stores the response result locally.
// we parse the base frame ourselves, and if it's okay we pass it
// to the provided ATCommand
// TODO: add timeout. // TODO: add timeout.
resp, err := ParseATCmdResponse(<-ch) resp, err := ParseATCmdResponse(<-ch)
if err != nil { if err != nil {
return nil, err return nil, err
@ -208,20 +200,10 @@ func (sess *SerialSession) ATCommand(cmd [2]rune, data []byte, queued bool) ([]b
if resp.Status != 0 { if resp.Status != 0 {
// sinec ATCmdStatus is a stringer thanks to the generator // sinec ATCmdStatus is a stringer thanks to the generator
return nil, fmt.Errorf("AT command failed: %v", resp.Status) return resp.Data, fmt.Errorf("AT command failed: %v", resp.Status)
} }
// finally, we use the provided ATCmd interface to unpack the data.
// this overwrites the values provided, but this should only happen
// during a query, so this is fine.
// TODO: skip if not a query command?
if isQuery {
return resp.Data, nil return resp.Data, nil
}
// it's not a query, and there was no error, so we just plain return
return nil, nil
} }
@ -235,6 +217,5 @@ func (sess *SerialSession) Close() error {
return sess.port.Close() return sess.port.Close()
} }
func (sess *SerialSession) DiscoverNodes() { // next, we define AT commands. These are functions that take in a Session and
panic("TODO: implement") // provide wrappers around AT command information, like type checking.
}