From 709b1f0bac39e6bd0141d17aa25372b2d49af131 Mon Sep 17 00:00:00 2001 From: saji Date: Sun, 7 May 2023 21:00:34 -0500 Subject: [PATCH] move cli, session cleanup --- cmd/{ => gotelem/cli}/root.go | 2 +- cmd/{ => gotelem/cli}/server.go | 2 +- cmd/{ => gotelem/cli}/xbee.go | 30 +++++++++-- cmd/gotelem/gotelem.go | 9 ++++ xbee/api_frame.go | 7 --- xbee/rxframe_test.go | 1 - xbee/session.go | 93 ++++++++++++++++++++------------- 7 files changed, 92 insertions(+), 52 deletions(-) rename cmd/{ => gotelem/cli}/root.go (95%) rename cmd/{ => gotelem/cli}/server.go (99%) rename cmd/{ => gotelem/cli}/xbee.go (86%) create mode 100644 cmd/gotelem/gotelem.go diff --git a/cmd/root.go b/cmd/gotelem/cli/root.go similarity index 95% rename from cmd/root.go rename to cmd/gotelem/cli/root.go index f26c0a8..d39c26d 100644 --- a/cmd/root.go +++ b/cmd/gotelem/cli/root.go @@ -1,4 +1,4 @@ -package cmd +package cli import ( "log" diff --git a/cmd/server.go b/cmd/gotelem/cli/server.go similarity index 99% rename from cmd/server.go rename to cmd/gotelem/cli/server.go index f326dbd..ebeed86 100644 --- a/cmd/server.go +++ b/cmd/gotelem/cli/server.go @@ -1,4 +1,4 @@ -package cmd +package cli import ( "fmt" diff --git a/cmd/xbee.go b/cmd/gotelem/cli/xbee.go similarity index 86% rename from cmd/xbee.go rename to cmd/gotelem/cli/xbee.go index 64474f6..104dc56 100644 --- a/cmd/xbee.go +++ b/cmd/gotelem/cli/xbee.go @@ -1,9 +1,10 @@ -package cmd +package cli // this file contains xbee utilities. // we can do network discovery and netcat-like things. import ( + "context" "errors" "fmt" "io" @@ -19,6 +20,13 @@ import ( "golang.org/x/exp/slog" ) +// context key stuff to prevent collisions +type ctxKey int + +const ( + keyIODevice ctxKey = iota +) + var xbeeCmd = &cli.Command{ Name: "xbee", Aliases: []string{"x"}, @@ -45,6 +53,17 @@ TCP/UDP connections require a port and will fail if one is not provided. EnvVars: []string{"XBEE_DEVICE"}, }, }, + // this parses the device string and creates the io device. + // TODO: should we create the session here instead? + Before: func(ctx *cli.Context) error { + transport, err := parseDeviceString(ctx.String("device")) + if err != nil { + return err + } + + ctx.Context = context.WithValue(ctx.Context, keyIODevice, transport) + return nil + }, Subcommands: []*cli.Command{ { Name: "info", @@ -72,7 +91,7 @@ writtend to stdout. func xbeeInfo(ctx *cli.Context) error { logger := slog.New(slog.NewTextHandler(os.Stderr)) - transport, _ := parseDeviceString(ctx.String("device")) + transport := ctx.Context.Value(keyIODevice).(xbeeTransport) xb, err := xbee.NewSession(transport, logger.With("device", transport.Type())) if err != nil { return cli.Exit(err, 1) @@ -94,10 +113,9 @@ func netcat(ctx *cli.Context) error { return cli.Exit("missing [addr] argument", 1) } - // basically create two pipes. logger := slog.New(slog.NewTextHandler(os.Stderr)) - transport, _ := parseDeviceString(ctx.String("device")) + transport := ctx.Context.Value(keyIODevice).(xbeeTransport) xb, _ := xbee.NewSession(transport, logger.With("devtype", transport.Type())) sent := make(chan int64) @@ -114,7 +132,9 @@ func netcat(ctx *cli.Context) error { go streamCopy(os.Stdin, xb) go streamCopy(xb, os.Stdout) - <-sent + n := <-sent + + fmt.Printf("Sent %d\n", n) return nil } diff --git a/cmd/gotelem/gotelem.go b/cmd/gotelem/gotelem.go new file mode 100644 index 0000000..7af7c9d --- /dev/null +++ b/cmd/gotelem/gotelem.go @@ -0,0 +1,9 @@ +package main + +import ( + "github.com/kschamplin/gotelem/cmd/gotelem/cli" +) + +func main() { + cli.Execute() +} diff --git a/xbee/api_frame.go b/xbee/api_frame.go index 11b0719..0243603 100644 --- a/xbee/api_frame.go +++ b/xbee/api_frame.go @@ -1,10 +1,3 @@ -// Package xbee implements xbee API encoding and decoding. - -// It encodes and decodes -// API frames from io.Writer and io.Reader by providing a WriteFrame function and -// a scanner.split function. It also includes internal packets for using the API. -// For end-users, it provides a simple net.Conn-like interface that can write -// and read arbitrary bytes (to be used by a higher level protocol) package xbee import ( diff --git a/xbee/rxframe_test.go b/xbee/rxframe_test.go index 39c32d5..327058a 100644 --- a/xbee/rxframe_test.go +++ b/xbee/rxframe_test.go @@ -15,7 +15,6 @@ func TestParseRxFrame(t *testing.T) { want *RxFrame wantErr bool }{ - // TODO: Add test cases. { name: "64-bit unicast", args: args{ diff --git a/xbee/session.go b/xbee/session.go index b50ef43..edcd00b 100644 --- a/xbee/session.go +++ b/xbee/session.go @@ -1,3 +1,10 @@ +/* +Package xbee provides communication and configuration of Digi XBee products + +(and other Digi products that are similar such as the XLR Pro). It provides +a net.Conn-like interface as well as AT commands for configuration. The most +common usage of the package is with a Session, which provides +*/ package xbee import ( @@ -9,39 +16,32 @@ import ( "golang.org/x/exp/slog" ) -// todo: make transport-agnostic (serial port or TCP/IP) - -// A session is a simple way to manage an xbee device. -// it provides io.Reader and io.Writer, as well as some extra functions to handle -// custom Xbee frames. -// type Session interface { -// io.ReadWriteCloser -// GetStatus() // todo: figure out signature for this - -// // Dial takes an address and allows direct communication with that -// // device, without using broadcast. -// Dial(addr uint64) io.ReadWriteCloser -// // AT command related functions - query, set on local, query, set on remote. - -// ATCommand(cmd ATCmd, queued bool) (resp ATCmd, err error) -// RemoteATCommand(cmd ATCmd, addr uint64) (resp ATCmd, err error) -// } +// TODO: implement net.Conn for Session/Conn. We are missing LocalAddr, RemoteAddr, +// and Deadline related methods. +// Session represents a connection to a locally-attached XBee. The connection can be through +// serial/USB or TCP/IP depending on what is supported by the device. type Session struct { ioDev io.ReadWriteCloser ct connTrack - slog.Logger + log slog.Logger // this buffer is used for storing data that must be read at some point. - rxBuf *bufio.ReadWriter + rxBuf *bufio.ReadWriter + writeLock sync.Mutex // prevents multiple writers from accessing the port at once. + + // conns contain a map of addresses to connections. This means that there + // can only be one direct connection to a device. This is pretty reasonable IMO. + // but needs to be documented very clearly. + conns map[uint64]*Conn } +// NewSession takes an IO device and a logger and returns a new XBee session. func NewSession(dev io.ReadWriteCloser, baseLog *slog.Logger) (*Session, error) { - // make the session with the port/mode given, and set up the conntrack. sess := &Session{ - ioDev: dev, - Logger: *baseLog, - ct: *NewConnTrack(), + ioDev: dev, + log: *baseLog, + ct: *NewConnTrack(), } // setup io readwriter with a pipe. @@ -69,10 +69,12 @@ func (sess *Session) rxHandler() { scan := bufio.NewScanner(sess.ioDev) scan.Split(xbeeFrameSplit) + // scan.Scan() will return false when there's EOF, i.e the io device is closed. + // this is activated by sess.Close() for scan.Scan() { data, err := parseFrame(scan.Bytes()) if err != nil { - sess.Logger.Warn("error parsing frame", "error", err, "data", data) + sess.log.Warn("error parsing frame", "error", err, "data", data) continue } @@ -82,13 +84,13 @@ func (sess *Session) rxHandler() { //TODO: if we have multiple remotes on the network, we need to track them here. frame, err := ParseRxFrame(data) if err != nil { - sess.Logger.Warn("error parsing rx packet", "error", err, "data", data) + sess.log.Warn("error parsing rx packet", "error", err, "data", data) break //continue? } // take the data and write it to our internal rx packet buffer. _, err = sess.rxBuf.Write(frame.Payload) if err != nil { - sess.Logger.Warn("error writing data", "error", err, "payload", frame.Payload) + sess.log.Warn("error writing data", "error", err, "payload", frame.Payload) } case TxStatusType, ATCmdResponseType, RemoteCmdRespType: @@ -99,17 +101,18 @@ func (sess *Session) rxHandler() { err := sess.ct.ClearMark(idx, data) if err != nil { // we got a rogue packet lol - sess.Logger.Warn("rogue frame ID", "id", idx, "error", err) + sess.log.Warn("rogue frame ID", "id", idx, "error", err) } default: // we don't know what to do with it. - sess.Logger.Info("unhandled packet type", "type", data[0], "id", data[1]) + sess.log.Info("unhandled packet type", "type", data[0], "id", data[1]) } } // if we get here, the serial port has closed. this is fine. + sess.log.Debug("closing rx handler", "err", scan.Err()) } // This implements io.Reader for the UART Session. @@ -118,27 +121,34 @@ func (sess *Session) Read(p []byte) (int, error) { return sess.rxBuf.Read(p) } -func (sess *Session) Write(p []byte) (n int, err error) { - sess.Warn("hello") +// Write sends a message to all XBees listening on the network. To send a message to a specific +// XBee, use Dial() to get a Conn +func (sess *Session) Write(p []byte) (int, error) { + + return sess.writeAddr(p, 0xFFFF) + +} + +func (sess *Session) writeAddr(p []byte, dest uint64) (n int, err error) { + idx, ch, err := sess.ct.GetMark() if err != nil { return } - n = len(p) wf := &TxFrame{ Id: idx, - Destination: BroadcastAddr, + Destination: dest, Payload: p, } - // write the actual packet sess.writeLock.Lock() - _, err = writeXBeeFrame(sess.ioDev, wf.Bytes()) + n, err = writeXBeeFrame(sess.ioDev, wf.Bytes()) sess.writeLock.Unlock() if err != nil { return } + n = n - 4 // finally, wait for the channel we got to return. this means that // the matching response frame was received, so we can parse it. @@ -210,6 +220,15 @@ func (sess *Session) Close() error { return sess.ioDev.Close() } -// next, we define AT commands. These are functions that take in a Session and -// provide wrappers around AT command information, like type checking. -// +// Conn is a connection to a specific remote XBee. Conn allows for the user to +// contact one Xbee for point-to-point communications. This enables ACK packets +// for reliable transmission. +type Conn struct { + parent *Session + log slog.Logger + addr uint64 +} + +func (c *Conn) Write(p []byte) (int, error) { + return c.parent.writeAddr(p, c.addr) +}