gotelem/internal/xbee/session.go
2023-05-01 09:49:47 -05:00

243 lines
7 KiB
Go

package xbee
import (
"bufio"
"fmt"
"io"
"sync"
"go.bug.st/serial"
"go.uber.org/zap"
)
// todo: make transport-agnostic (serial port or TCP/IP)
// A session is a simple way to manage an xbee device.
// it provides io.Reader and io.Writer, as well as some extra functions to handle
// custom Xbee frames.
type Session interface {
io.ReadWriteCloser
GetStatus() // todo: figure out signature for this
// Dial takes an address and allows direct communication with that
// device, without using broadcast.
Dial(addr uint64) io.ReadWriteCloser
// AT command related functions - query, set on local, query, set on remote.
ATCommand(cmd ATCmd, queued bool) (resp ATCmd, err error)
RemoteATCommand(cmd ATCmd, addr uint64) (resp ATCmd, err error)
}
type SerialSession struct {
port serial.Port
ct connTrack
log *zap.SugaredLogger
// todo: add queuing structures here for reliable transport and tracking.
// this buffer is used for storing data that must be read at some point.
rxBuf *bufio.ReadWriter
writeLock sync.Mutex // prevents multiple writers from accessing the port at once.
}
func NewSerialXBee(portName string, mode *serial.Mode) (*SerialSession, error) {
// make the session with the port/mode given, and set up the conntrack.
sess := &SerialSession{}
port, err := serial.Open(portName, mode)
if err != nil {
return sess, err
}
sess.port = port
sess.ct = connTrack{}
// setup io readwriter with a pipe.
rd, wr := io.Pipe()
// this is for reading data *only* - writes are different! it's a
// readWriter because the goroutine that runs scan continuously (and handles all other packets)
// will write to the buffer when new Rx packets come in, and we can read out from application code.
sess.rxBuf = bufio.NewReadWriter(bufio.NewReader(rd), bufio.NewWriter(wr))
logger, err := zap.NewDevelopment()
if err != nil {
return sess, err
}
sess.log = logger.Sugar()
// start the rx handler in the background. we close it later by closing the serial port.
go sess.rxHandler()
return sess, nil
}
// before we can write `Read(p []byte)` we have to have a goroutine that takes the input from
// the serial port and parses it out - if it's data, we push the data to a buffer for
// the application to read the bytes on its own.
//
// if it's a different kind of packet, we do custom functionality (free the conntrack, update
// local status, etc)
func (sess *SerialSession) rxHandler() {
// we wrap the serial port read line in a bufio.scanner using our custom split function.
scan := bufio.NewScanner(sess.port)
scan.Split(xbeeFrameSplit)
for scan.Scan() {
// TODO: check for errors?
// data is a frame payload - not a full frame.
data, err := parseFrame(scan.Bytes())
if err != nil {
sess.log.Warnw("error parsing frame", "error", err, "data", data)
continue
}
// data is good, lets parse the frame - using the first byte as the identifier.
switch XBeeCmd(data[0]) {
case RxPktType:
// we parse the data, and push it to the rx buffer.
//TODO: if we have multiple sources, we need to track them here.
frame, err := ParseRxFrame(data)
if err != nil {
sess.log.Warnw("error parsing rx packet", "error", err, "data", data)
break //continue?
}
// take the data and write it to our internal rx packet buffer.
_, err = sess.rxBuf.Write(frame.Payload)
if err != nil {
sess.log.Warnw("error writing data", "error", err, "payload", frame.Payload)
}
// the "callback"-style handler. Any received packet with a frame ID should
// be handled here.
case TxStatusType, ATCmdResponseType, RemoteCmdRespType: // these take the frame bytes and parse it themselves.
// we hand it back via the channel. we directly find the ID since it's always
// the second byte.
idx := data[1]
err := sess.ct.ClearMark(idx, data)
if err != nil {
// we got a rogue packet lol
sess.log.Warnw("rogue frame ID", "id", idx, "error", err)
}
default:
// we don't know what to do with it.
sess.log.Infow("unhandled packet type", "type", data[0], "id", data[1])
}
}
// if we get here, the serial port has closed. this is fine.
}
// This implements io.Reader for the UART Session.
func (sess *SerialSession) Read(p []byte) (int, error) {
// Since we have an rx buffer, we just read from that and return the results.
return sess.rxBuf.Read(p)
}
func (sess *SerialSession) Write(p []byte) (n int, err error) {
// we construct a packet - using the conntrack to ensure that the packet is okay.
// we block - this is more correct.
idx, ch, err := sess.ct.GetMark()
if err != nil {
return
}
wf := &TxFrame{
Id: idx,
Destination: BroadcastAddr,
Payload: p,
}
// write the actual packet
sess.writeLock.Lock()
n, err = writeXBeeFrame(sess.port, wf.Bytes())
if err != nil {
return
}
sess.writeLock.Unlock()
// finally, wait for the channel we got to return. this means that
// the matching response frame was received, so we can parse it.
// TODO: add timeout.
responseFrame := <-ch
// this is a tx status frame bytes, so lets parse it out.
status, err := ParseTxStatusFrame(responseFrame)
if err != nil {
return
}
if status.Status != 0 {
err = fmt.Errorf("tx failed 0x%x", status.Status)
}
return
}
// sends a local AT command. If `queued` is true, the command is not immediately applied;
// instead, an AC command must be set to apply the queued changes. `queued` does not
// affect query-type commands, which always return right away.
// the AT command is an interface.
func (sess *SerialSession) ATCommand(at ATCmd, queued bool) error {
// we must encode the command, and then create the actual packet.
// then we send the packet, and wait for the response
// TODO: how to handle multiple-response-packet AT commands?
// (mainly Node Discovery ND)
// get a mark for the frame
isQuery := len(at.Payload()) > 0
idx, ch, err := sess.ct.GetMark()
if err != nil {
return err
}
rawData := encodeATCommand(at, idx, queued)
sess.writeLock.Lock()
_, err = writeXBeeFrame(sess.port, rawData)
sess.writeLock.Unlock()
if err != nil {
return fmt.Errorf("error writing xbee frame: %w", err)
}
// we use the AT command that was provided to decode the frame.
// Parse stores the response result locally.
// we parse the base frame ourselves, and if it's okay we pass it
// to the provided ATCommand
// TODO: add timeout.
resp, err := ParseATCmdResponse(<-ch)
if err != nil {
return err
}
if resp.Status != 0 {
// sinec ATCmdStatus is a stringer thanks to the generator
return fmt.Errorf("AT command failed: %v", resp.Status)
}
// finally, we use the provided ATCmd interface to unpack the data.
// this overwrites the values provided, but this should only happen
// during a query, so this is fine.
// TODO: skip if not a query command?
if isQuery {
return at.Parse(resp)
}
// it's not a query, and there was no error, so we just plain return
return nil
}
// Does this need to exist?
func (sess *SerialSession) GetStatus() {
panic("TODO: implement")
}
// Implement the io.Closer.
func (sess *SerialSession) Close() error {
return sess.port.Close()
}