move cli, session cleanup

This commit is contained in:
saji 2023-05-07 21:00:34 -05:00
parent eb01bc28ba
commit 709b1f0bac
7 changed files with 92 additions and 52 deletions

View file

@ -1,4 +1,4 @@
package cmd package cli
import ( import (
"log" "log"

View file

@ -1,4 +1,4 @@
package cmd package cli
import ( import (
"fmt" "fmt"

View file

@ -1,9 +1,10 @@
package cmd package cli
// this file contains xbee utilities. // this file contains xbee utilities.
// we can do network discovery and netcat-like things. // we can do network discovery and netcat-like things.
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -19,6 +20,13 @@ import (
"golang.org/x/exp/slog" "golang.org/x/exp/slog"
) )
// context key stuff to prevent collisions
type ctxKey int
const (
keyIODevice ctxKey = iota
)
var xbeeCmd = &cli.Command{ var xbeeCmd = &cli.Command{
Name: "xbee", Name: "xbee",
Aliases: []string{"x"}, Aliases: []string{"x"},
@ -45,6 +53,17 @@ TCP/UDP connections require a port and will fail if one is not provided.
EnvVars: []string{"XBEE_DEVICE"}, EnvVars: []string{"XBEE_DEVICE"},
}, },
}, },
// this parses the device string and creates the io device.
// TODO: should we create the session here instead?
Before: func(ctx *cli.Context) error {
transport, err := parseDeviceString(ctx.String("device"))
if err != nil {
return err
}
ctx.Context = context.WithValue(ctx.Context, keyIODevice, transport)
return nil
},
Subcommands: []*cli.Command{ Subcommands: []*cli.Command{
{ {
Name: "info", Name: "info",
@ -72,7 +91,7 @@ writtend to stdout.
func xbeeInfo(ctx *cli.Context) error { func xbeeInfo(ctx *cli.Context) error {
logger := slog.New(slog.NewTextHandler(os.Stderr)) logger := slog.New(slog.NewTextHandler(os.Stderr))
transport, _ := parseDeviceString(ctx.String("device")) transport := ctx.Context.Value(keyIODevice).(xbeeTransport)
xb, err := xbee.NewSession(transport, logger.With("device", transport.Type())) xb, err := xbee.NewSession(transport, logger.With("device", transport.Type()))
if err != nil { if err != nil {
return cli.Exit(err, 1) return cli.Exit(err, 1)
@ -94,10 +113,9 @@ func netcat(ctx *cli.Context) error {
return cli.Exit("missing [addr] argument", 1) return cli.Exit("missing [addr] argument", 1)
} }
// basically create two pipes.
logger := slog.New(slog.NewTextHandler(os.Stderr)) logger := slog.New(slog.NewTextHandler(os.Stderr))
transport, _ := parseDeviceString(ctx.String("device")) transport := ctx.Context.Value(keyIODevice).(xbeeTransport)
xb, _ := xbee.NewSession(transport, logger.With("devtype", transport.Type())) xb, _ := xbee.NewSession(transport, logger.With("devtype", transport.Type()))
sent := make(chan int64) sent := make(chan int64)
@ -114,7 +132,9 @@ func netcat(ctx *cli.Context) error {
go streamCopy(os.Stdin, xb) go streamCopy(os.Stdin, xb)
go streamCopy(xb, os.Stdout) go streamCopy(xb, os.Stdout)
<-sent n := <-sent
fmt.Printf("Sent %d\n", n)
return nil return nil
} }

9
cmd/gotelem/gotelem.go Normal file
View file

@ -0,0 +1,9 @@
package main
import (
"github.com/kschamplin/gotelem/cmd/gotelem/cli"
)
func main() {
cli.Execute()
}

View file

@ -1,10 +1,3 @@
// Package xbee implements xbee API encoding and decoding.
// It encodes and decodes
// API frames from io.Writer and io.Reader by providing a WriteFrame function and
// a scanner.split function. It also includes internal packets for using the API.
// For end-users, it provides a simple net.Conn-like interface that can write
// and read arbitrary bytes (to be used by a higher level protocol)
package xbee package xbee
import ( import (

View file

@ -15,7 +15,6 @@ func TestParseRxFrame(t *testing.T) {
want *RxFrame want *RxFrame
wantErr bool wantErr bool
}{ }{
// TODO: Add test cases.
{ {
name: "64-bit unicast", name: "64-bit unicast",
args: args{ args: args{

View file

@ -1,3 +1,10 @@
/*
Package xbee provides communication and configuration of Digi XBee products
(and other Digi products that are similar such as the XLR Pro). It provides
a net.Conn-like interface as well as AT commands for configuration. The most
common usage of the package is with a Session, which provides
*/
package xbee package xbee
import ( import (
@ -9,39 +16,32 @@ import (
"golang.org/x/exp/slog" "golang.org/x/exp/slog"
) )
// todo: make transport-agnostic (serial port or TCP/IP) // TODO: implement net.Conn for Session/Conn. We are missing LocalAddr, RemoteAddr,
// and Deadline related methods.
// A session is a simple way to manage an xbee device.
// it provides io.Reader and io.Writer, as well as some extra functions to handle
// custom Xbee frames.
// type Session interface {
// io.ReadWriteCloser
// GetStatus() // todo: figure out signature for this
// // Dial takes an address and allows direct communication with that
// // device, without using broadcast.
// Dial(addr uint64) io.ReadWriteCloser
// // AT command related functions - query, set on local, query, set on remote.
// ATCommand(cmd ATCmd, queued bool) (resp ATCmd, err error)
// RemoteATCommand(cmd ATCmd, addr uint64) (resp ATCmd, err error)
// }
// Session represents a connection to a locally-attached XBee. The connection can be through
// serial/USB or TCP/IP depending on what is supported by the device.
type Session struct { type Session struct {
ioDev io.ReadWriteCloser ioDev io.ReadWriteCloser
ct connTrack ct connTrack
slog.Logger log slog.Logger
// this buffer is used for storing data that must be read at some point. // this buffer is used for storing data that must be read at some point.
rxBuf *bufio.ReadWriter rxBuf *bufio.ReadWriter
writeLock sync.Mutex // prevents multiple writers from accessing the port at once. writeLock sync.Mutex // prevents multiple writers from accessing the port at once.
// conns contain a map of addresses to connections. This means that there
// can only be one direct connection to a device. This is pretty reasonable IMO.
// but needs to be documented very clearly.
conns map[uint64]*Conn
} }
// NewSession takes an IO device and a logger and returns a new XBee session.
func NewSession(dev io.ReadWriteCloser, baseLog *slog.Logger) (*Session, error) { func NewSession(dev io.ReadWriteCloser, baseLog *slog.Logger) (*Session, error) {
// make the session with the port/mode given, and set up the conntrack.
sess := &Session{ sess := &Session{
ioDev: dev, ioDev: dev,
Logger: *baseLog, log: *baseLog,
ct: *NewConnTrack(), ct: *NewConnTrack(),
} }
// setup io readwriter with a pipe. // setup io readwriter with a pipe.
@ -69,10 +69,12 @@ func (sess *Session) rxHandler() {
scan := bufio.NewScanner(sess.ioDev) scan := bufio.NewScanner(sess.ioDev)
scan.Split(xbeeFrameSplit) scan.Split(xbeeFrameSplit)
// scan.Scan() will return false when there's EOF, i.e the io device is closed.
// this is activated by sess.Close()
for scan.Scan() { for scan.Scan() {
data, err := parseFrame(scan.Bytes()) data, err := parseFrame(scan.Bytes())
if err != nil { if err != nil {
sess.Logger.Warn("error parsing frame", "error", err, "data", data) sess.log.Warn("error parsing frame", "error", err, "data", data)
continue continue
} }
@ -82,13 +84,13 @@ func (sess *Session) rxHandler() {
//TODO: if we have multiple remotes on the network, we need to track them here. //TODO: if we have multiple remotes on the network, we need to track them here.
frame, err := ParseRxFrame(data) frame, err := ParseRxFrame(data)
if err != nil { if err != nil {
sess.Logger.Warn("error parsing rx packet", "error", err, "data", data) sess.log.Warn("error parsing rx packet", "error", err, "data", data)
break //continue? break //continue?
} }
// take the data and write it to our internal rx packet buffer. // take the data and write it to our internal rx packet buffer.
_, err = sess.rxBuf.Write(frame.Payload) _, err = sess.rxBuf.Write(frame.Payload)
if err != nil { if err != nil {
sess.Logger.Warn("error writing data", "error", err, "payload", frame.Payload) sess.log.Warn("error writing data", "error", err, "payload", frame.Payload)
} }
case TxStatusType, ATCmdResponseType, RemoteCmdRespType: case TxStatusType, ATCmdResponseType, RemoteCmdRespType:
@ -99,17 +101,18 @@ func (sess *Session) rxHandler() {
err := sess.ct.ClearMark(idx, data) err := sess.ct.ClearMark(idx, data)
if err != nil { if err != nil {
// we got a rogue packet lol // we got a rogue packet lol
sess.Logger.Warn("rogue frame ID", "id", idx, "error", err) sess.log.Warn("rogue frame ID", "id", idx, "error", err)
} }
default: default:
// we don't know what to do with it. // we don't know what to do with it.
sess.Logger.Info("unhandled packet type", "type", data[0], "id", data[1]) sess.log.Info("unhandled packet type", "type", data[0], "id", data[1])
} }
} }
// if we get here, the serial port has closed. this is fine. // if we get here, the serial port has closed. this is fine.
sess.log.Debug("closing rx handler", "err", scan.Err())
} }
// This implements io.Reader for the UART Session. // This implements io.Reader for the UART Session.
@ -118,27 +121,34 @@ func (sess *Session) Read(p []byte) (int, error) {
return sess.rxBuf.Read(p) return sess.rxBuf.Read(p)
} }
func (sess *Session) Write(p []byte) (n int, err error) { // Write sends a message to all XBees listening on the network. To send a message to a specific
sess.Warn("hello") // XBee, use Dial() to get a Conn
func (sess *Session) Write(p []byte) (int, error) {
return sess.writeAddr(p, 0xFFFF)
}
func (sess *Session) writeAddr(p []byte, dest uint64) (n int, err error) {
idx, ch, err := sess.ct.GetMark() idx, ch, err := sess.ct.GetMark()
if err != nil { if err != nil {
return return
} }
n = len(p)
wf := &TxFrame{ wf := &TxFrame{
Id: idx, Id: idx,
Destination: BroadcastAddr, Destination: dest,
Payload: p, Payload: p,
} }
// write the actual packet
sess.writeLock.Lock() sess.writeLock.Lock()
_, err = writeXBeeFrame(sess.ioDev, wf.Bytes()) n, err = writeXBeeFrame(sess.ioDev, wf.Bytes())
sess.writeLock.Unlock() sess.writeLock.Unlock()
if err != nil { if err != nil {
return return
} }
n = n - 4
// finally, wait for the channel we got to return. this means that // finally, wait for the channel we got to return. this means that
// the matching response frame was received, so we can parse it. // the matching response frame was received, so we can parse it.
@ -210,6 +220,15 @@ func (sess *Session) Close() error {
return sess.ioDev.Close() return sess.ioDev.Close()
} }
// next, we define AT commands. These are functions that take in a Session and // Conn is a connection to a specific remote XBee. Conn allows for the user to
// provide wrappers around AT command information, like type checking. // contact one Xbee for point-to-point communications. This enables ACK packets
// // for reliable transmission.
type Conn struct {
parent *Session
log slog.Logger
addr uint64
}
func (c *Conn) Write(p []byte) (int, error) {
return c.parent.writeAddr(p, c.addr)
}