make xbee session generic

This commit is contained in:
saji 2023-05-07 08:39:10 -05:00
parent 73fec064c5
commit ddad848127
3 changed files with 65 additions and 52 deletions

View file

@ -3,15 +3,11 @@ package cmd
import ( import (
"fmt" "fmt"
"net" "net"
"os"
"time" "time"
"github.com/kschamplin/gotelem/can" "github.com/kschamplin/gotelem/can"
"github.com/kschamplin/gotelem/socketcan" "github.com/kschamplin/gotelem/socketcan"
"github.com/kschamplin/gotelem/xbee"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"go.bug.st/serial"
"golang.org/x/exp/slog"
) )
const xbeeCategory = "XBee settings" const xbeeCategory = "XBee settings"
@ -59,17 +55,6 @@ func handleCon(conn net.Conn, broker *Broker) {
func xbeeSvc(b *Broker) { func xbeeSvc(b *Broker) {
// open the session.
mode := &serial.Mode{
BaudRate: 115200,
}
logger := slog.New(slog.NewTextHandler(os.Stderr))
_, err := xbee.NewSerialXBee("/dev/ttyACM0", mode, logger)
if err != nil {
fmt.Printf("got error %v", err)
panic(err)
}
} }
// this spins up a new can socket on vcan0 and broadcasts a packet every second. for testing. // this spins up a new can socket on vcan0 and broadcasts a packet every second. for testing.

View file

@ -7,6 +7,8 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"runtime"
"strings"
"github.com/kschamplin/gotelem/xbee" "github.com/kschamplin/gotelem/xbee"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
@ -67,7 +69,8 @@ writtend to stdout.
func xbeeInfo(ctx *cli.Context) error { func xbeeInfo(ctx *cli.Context) error {
logger := slog.New(slog.NewTextHandler(os.Stderr)) logger := slog.New(slog.NewTextHandler(os.Stderr))
xb, err := xbee.NewSerialXBee("/dev/ttyACM0", &serial.Mode{}, logger) serialDevice, _ := serial.Open("/dev/ttyACM0", &serial.Mode{})
xb, err := xbee.NewSession(serialDevice, logger.With("name", ""))
if err != nil { if err != nil {
return cli.Exit(err, 1) return cli.Exit(err, 1)
} }
@ -90,7 +93,9 @@ func netcat(ctx *cli.Context) error {
} }
// basically create two pipes. // basically create two pipes.
logger := slog.New(slog.NewTextHandler(os.Stderr)) logger := slog.New(slog.NewTextHandler(os.Stderr))
xb, _ := xbee.NewSerialXBee("/dev/ttyACM0", &serial.Mode{}, logger)
transport, _ := parseDeviceString("/dev/ttyUSB0")
xb, _ := xbee.NewSession(transport, logger.With("devtype", transport.Type()))
sent := make(chan int64) sent := make(chan int64)
streamCopy := func(r io.ReadCloser, w io.WriteCloser) { streamCopy := func(r io.ReadCloser, w io.WriteCloser) {
@ -110,3 +115,32 @@ func netcat(ctx *cli.Context) error {
return nil return nil
} }
type xbeeTransport struct {
io.ReadWriteCloser
devType string
}
func (xbt *xbeeTransport) Type() string {
return xbt.devType
}
// parseDeviceString parses the device parameter and sets up the associated
// device. The device is returned in an xbeeTransport which also stores
// the underlying type of the device with Type() string
func parseDeviceString(dev string) (*xbeeTransport, error) {
// FIXME: implement properly
serialDevice, _ := serial.Open(dev, &serial.Mode{})
xbt := &xbeeTransport{
ReadWriteCloser: serialDevice,
devType: "serial",
}
if strings.HasPrefix(dev, "tcp://") {
} else if strings.HasPrefix(dev, "COM") && runtime.GOOS == "windows" {
} else if strings.HasPrefix(dev, "/") && runtime.GOOS != "windows" {
}
return xbt, nil
}

View file

@ -6,7 +6,6 @@ import (
"io" "io"
"sync" "sync"
"go.bug.st/serial"
"golang.org/x/exp/slog" "golang.org/x/exp/slog"
) )
@ -15,41 +14,35 @@ import (
// A session is a simple way to manage an xbee device. // A session is a simple way to manage an xbee device.
// it provides io.Reader and io.Writer, as well as some extra functions to handle // it provides io.Reader and io.Writer, as well as some extra functions to handle
// custom Xbee frames. // custom Xbee frames.
type Session interface { // type Session interface {
io.ReadWriteCloser // io.ReadWriteCloser
GetStatus() // todo: figure out signature for this // GetStatus() // todo: figure out signature for this
// Dial takes an address and allows direct communication with that // // Dial takes an address and allows direct communication with that
// device, without using broadcast. // // device, without using broadcast.
Dial(addr uint64) io.ReadWriteCloser // Dial(addr uint64) io.ReadWriteCloser
// AT command related functions - query, set on local, query, set on remote. // // AT command related functions - query, set on local, query, set on remote.
ATCommand(cmd ATCmd, queued bool) (resp ATCmd, err error) // ATCommand(cmd ATCmd, queued bool) (resp ATCmd, err error)
RemoteATCommand(cmd ATCmd, addr uint64) (resp ATCmd, err error) // RemoteATCommand(cmd ATCmd, addr uint64) (resp ATCmd, err error)
} // }
type SerialSession struct { type Session struct {
port serial.Port ioDev io.ReadWriteCloser
ct connTrack ct connTrack
slog.Logger slog.Logger
// todo: add queuing structures here for reliable transport and tracking.
// this buffer is used for storing data that must be read at some point. // this buffer is used for storing data that must be read at some point.
rxBuf *bufio.ReadWriter rxBuf *bufio.ReadWriter
writeLock sync.Mutex // prevents multiple writers from accessing the port at once. writeLock sync.Mutex // prevents multiple writers from accessing the port at once.
} }
func NewSerialXBee(portName string, mode *serial.Mode, baseLog *slog.Logger) (*SerialSession, error) { func NewSession(dev io.ReadWriteCloser, baseLog *slog.Logger) (*Session, error) {
// make the session with the port/mode given, and set up the conntrack. // make the session with the port/mode given, and set up the conntrack.
sess := &SerialSession{} sess := &Session{
ioDev: dev,
port, err := serial.Open(portName, mode) Logger: *baseLog,
if err != nil { ct: *NewConnTrack(),
return sess, err
} }
sess.port = port
sess.ct = *NewConnTrack()
sess.Logger = *baseLog.With("portname", portName)
// setup io readwriter with a pipe. // setup io readwriter with a pipe.
rd, wr := io.Pipe() rd, wr := io.Pipe()
@ -71,9 +64,9 @@ func NewSerialXBee(portName string, mode *serial.Mode, baseLog *slog.Logger) (*S
// //
// if it's a different kind of packet, we do custom functionality (free the conntrack, update // if it's a different kind of packet, we do custom functionality (free the conntrack, update
// local status, etc) // local status, etc)
func (sess *SerialSession) rxHandler() { func (sess *Session) rxHandler() {
// we wrap the serial port read line in a bufio.scanner using our custom split function. // we wrap the serial port read line in a bufio.scanner using our custom split function.
scan := bufio.NewScanner(sess.port) scan := bufio.NewScanner(sess.ioDev)
scan.Split(xbeeFrameSplit) scan.Split(xbeeFrameSplit)
for scan.Scan() { for scan.Scan() {
@ -120,12 +113,12 @@ func (sess *SerialSession) rxHandler() {
} }
// This implements io.Reader for the UART Session. // This implements io.Reader for the UART Session.
func (sess *SerialSession) Read(p []byte) (int, error) { func (sess *Session) Read(p []byte) (int, error) {
// Since we have an rx buffer, we just read from that and return the results. // Since we have an rx buffer, we just read from that and return the results.
return sess.rxBuf.Read(p) return sess.rxBuf.Read(p)
} }
func (sess *SerialSession) Write(p []byte) (n int, err error) { func (sess *Session) Write(p []byte) (n int, err error) {
sess.Warn("hello") sess.Warn("hello")
idx, ch, err := sess.ct.GetMark() idx, ch, err := sess.ct.GetMark()
if err != nil { if err != nil {
@ -141,7 +134,7 @@ func (sess *SerialSession) Write(p []byte) (n int, err error) {
// write the actual packet // write the actual packet
sess.writeLock.Lock() sess.writeLock.Lock()
_, err = writeXBeeFrame(sess.port, wf.Bytes()) _, err = writeXBeeFrame(sess.ioDev, wf.Bytes())
sess.writeLock.Unlock() sess.writeLock.Unlock()
if err != nil { if err != nil {
return return
@ -169,7 +162,7 @@ func (sess *SerialSession) Write(p []byte) (n int, err error) {
// instead, an AC command must be set to apply the queued changes. `queued` does not // instead, an AC command must be set to apply the queued changes. `queued` does not
// affect query-type commands, which always return right away. // affect query-type commands, which always return right away.
// the AT command is an interface. // the AT command is an interface.
func (sess *SerialSession) ATCommand(cmd [2]rune, data []byte, queued bool) ([]byte, error) { func (sess *Session) ATCommand(cmd [2]rune, data []byte, queued bool) ([]byte, error) {
// we must encode the command, and then create the actual packet. // we must encode the command, and then create the actual packet.
// then we send the packet, and wait for the response // then we send the packet, and wait for the response
// TODO: how to handle multiple-response-packet AT commands? // TODO: how to handle multiple-response-packet AT commands?
@ -184,7 +177,7 @@ func (sess *SerialSession) ATCommand(cmd [2]rune, data []byte, queued bool) ([]b
rawData := encodeATCommand(cmd, data, idx, queued) rawData := encodeATCommand(cmd, data, idx, queued)
sess.writeLock.Lock() sess.writeLock.Lock()
_, err = writeXBeeFrame(sess.port, rawData) _, err = writeXBeeFrame(sess.ioDev, rawData)
sess.writeLock.Unlock() sess.writeLock.Unlock()
if err != nil { if err != nil {
@ -208,14 +201,15 @@ func (sess *SerialSession) ATCommand(cmd [2]rune, data []byte, queued bool) ([]b
} }
// Does this need to exist? // Does this need to exist?
func (sess *SerialSession) GetStatus() { func (sess *Session) GetStatus() {
panic("TODO: implement") panic("TODO: implement")
} }
// Implement the io.Closer. // Implement the io.Closer.
func (sess *SerialSession) Close() error { func (sess *Session) Close() error {
return sess.port.Close() return sess.ioDev.Close()
} }
// next, we define AT commands. These are functions that take in a Session and // next, we define AT commands. These are functions that take in a Session and
// provide wrappers around AT command information, like type checking. // provide wrappers around AT command information, like type checking.
//