more xbee work

This commit is contained in:
saji 2023-04-29 10:58:56 -05:00
parent 67dbb74f77
commit 5d42f8693a
10 changed files with 624 additions and 122 deletions

View file

@ -5,9 +5,13 @@ import (
"net" "net"
"time" "time"
"github.com/kschamplin/gotelem/internal/can"
"github.com/kschamplin/gotelem/internal/gotelem" "github.com/kschamplin/gotelem/internal/gotelem"
"github.com/kschamplin/gotelem/internal/socketcan"
"github.com/kschamplin/gotelem/internal/xbee"
"github.com/tinylib/msgp/msgp" "github.com/tinylib/msgp/msgp"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"go.bug.st/serial"
) )
const xbeeCategory = "XBee settings" const xbeeCategory = "XBee settings"
@ -25,7 +29,6 @@ var serveCmd = &cli.Command{
}, },
} }
type session struct { type session struct {
conn net.Conn conn net.Conn
send chan gotelem.Body send chan gotelem.Body
@ -34,12 +37,17 @@ type session struct {
} }
func serve() { func serve() {
// start the can listener
go vcanTest()
fromCanBus := make(chan can.Frame, 1)
toCanBus := make(chan can.Frame)
go canHandler(toCanBus, fromCanBus)
ln, err := net.Listen("tcp", ":8082") ln, err := net.Listen("tcp", ":8082")
if err != nil { if err != nil {
fmt.Printf("Error listening: %v\n", err) fmt.Printf("Error listening: %v\n", err)
} }
fmt.Printf("Listening on :8082\n") fmt.Printf("Listening on :8082\n")
for { for {
conn, err := ln.Accept() conn, err := ln.Accept()
if err != nil { if err != nil {
@ -53,11 +61,6 @@ func handleCon(conn net.Conn) {
// reader := msgp.NewReader(conn) // reader := msgp.NewReader(conn)
writer := msgp.NewWriter(conn) writer := msgp.NewWriter(conn)
for { for {
// data := telemnet.StatusBody{
// BatteryPct: 1.2,
// ErrCode: 0,
// }
// data.EncodeMsg(writer)
data := gotelem.StatusBody{ data := gotelem.StatusBody{
BatteryPct: 1.2, BatteryPct: 1.2,
ErrCode: 0, ErrCode: 0,
@ -67,3 +70,144 @@ func handleCon(conn net.Conn) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
} }
func xbeeSvc(packets <-chan can.Frame, device string, quit <-chan struct{}) {
// open the session.
mode := &serial.Mode{
BaudRate: 115200,
}
sess, err := xbee.NewSerialXBee("/dev/ttyUSB1", mode)
if err != nil {
fmt.Printf("got error %v", err)
panic(err)
}
receivedData := make(chan gotelem.Data)
// make a simple reader that goes in the background.
go func() {
msgpReader := msgp.NewReader(sess)
var msgData gotelem.Data
msgData.DecodeMsg(msgpReader)
receivedData <- msgData
}()
for {
select {
case data := <-receivedData:
fmt.Printf("Got a data %v\n", data)
case packet := <-packets:
fmt.Printf("Got a packet, %v\n", packet)
}
}
}
// this spins up a new can socket on vcan0 and broadcasts a packet every second.
func vcanTest() {
sock, _ := socketcan.NewCanSocket("vcan0")
testFrame := &can.Frame{
Id: 0x234,
Kind: can.SFF,
Data: []byte{0, 1, 2, 3, 4, 5, 6, 7},
}
for {
fmt.Printf("sending test packet\n")
sock.Send(testFrame)
time.Sleep(1 * time.Second)
}
}
func canHandler(pktToSend <-chan can.Frame, pktRecv chan<- can.Frame) {
sock, _ := socketcan.NewCanSocket("vcan0")
// start a simple dispatcher that just relays can frames.
r := make(chan can.Frame)
go func() {
for {
pkt, _ := sock.Recv()
r <- *pkt
}
}()
for {
select {
case msg := <-pktToSend:
sock.Send(&msg)
case msg := <-r:
fmt.Printf("got a packet from the can %v\n", msg)
pktRecv <- msg
}
}
}
type BrokerRequest struct {
Source string // the name of the sender
Msg can.Frame // the message to send
}
type BrokerClient struct {
Name string // the name of the client
Ch chan can.Frame // the channel to send frames to this client
}
type Broker struct {
subs map[string]chan can.Frame
publishCh chan BrokerRequest
subsCh chan BrokerClient
unsubCh chan BrokerClient
}
func NewBroker(bufsize int) *Broker{
b := &Broker{
subs: make(map[string]chan can.Frame),
publishCh: make(chan BrokerRequest, 3),
subsCh: make(chan BrokerClient, 3),
unsubCh: make(chan BrokerClient, 3),
}
return b
}
func (b *Broker) Start() {
for {
select {
case newClient := <-b.subsCh:
b.subs[newClient.Name] = newClient.Ch
case req := <-b.publishCh:
for name, ch := range b.subs {
if name == req.Source {
continue // don't send to ourselves.
}
// a kinda-inelegant non-blocking push.
// if we can't do it, we just drop it. this should ideally never happen.
select {
case ch <- req.Msg:
default:
fmt.Printf("we dropped a packet to dest %s", name)
}
}
case clientToRemove := <-b.unsubCh:
close(b.subs[clientToRemove.Name])
delete(b.subs, clientToRemove.Name)
}
}
}
func (b *Broker) Publish(name string, msg can.Frame) {
breq := BrokerRequest{
Source: name,
Msg: msg,
}
b.publishCh <- breq
}
func (b *Broker) Subscribe(name string) <-chan can.Frame {
ch := make(chan can.Frame, 3)
bc := BrokerClient{
Name: name,
Ch: ch,
}
b.subsCh <- bc
return ch
}

5
go.mod
View file

@ -9,12 +9,17 @@ require (
require ( require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/creack/goselect v0.1.2 // indirect
github.com/jmoiron/sqlx v1.3.5 // indirect github.com/jmoiron/sqlx v1.3.5 // indirect
github.com/mattn/go-sqlite3 v1.14.16 // indirect github.com/mattn/go-sqlite3 v1.14.16 // indirect
github.com/philhofer/fwd v1.1.2 // indirect github.com/philhofer/fwd v1.1.2 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/tinylib/msgp v1.1.8 // indirect github.com/tinylib/msgp v1.1.8 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.bug.st/serial v1.5.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/mod v0.10.0 // indirect golang.org/x/mod v0.10.0 // indirect
golang.org/x/tools v0.8.0 // indirect golang.org/x/tools v0.8.0 // indirect
) )

10
go.sum
View file

@ -1,5 +1,7 @@
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/goselect v0.1.2 h1:2DNy14+JPjRBgPzAd1thbQp4BSIihxcBf0IXhQXDRa0=
github.com/creack/goselect v0.1.2/go.mod h1:a/NhLweNvqIYMuxcMOuWY516Cimucms3DglDzQP3hKY=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
@ -18,6 +20,14 @@ github.com/urfave/cli/v2 v2.25.1/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6f
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.bug.st/serial v1.5.0 h1:ThuUkHpOEmCVXxGEfpoExjQCS2WBVV4ZcUKVYInM9T4=
go.bug.st/serial v1.5.0/go.mod h1:UABfsluHAiaNI+La2iESysd9Vetq7VRdpxvjx7CmmOE=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=

View file

@ -22,13 +22,10 @@ import (
type Frameable interface { type Frameable interface {
// returns the API identifier for this frame. // returns the API identifier for this frame.
GetId() byte
// encodes this frame correctly. // encodes this frame correctly.
Bytes() ([]byte, error) Bytes() []byte
} }
// calculateChecksum is a helper function to calculate the 1-byte checksum of a data range. // calculateChecksum is a helper function to calculate the 1-byte checksum of a data range.
// the data range does not include the start delimiter, or the length uint16 (only the frame payload) // the data range does not include the start delimiter, or the length uint16 (only the frame payload)
func calculateChecksum(data []byte) byte { func calculateChecksum(data []byte) byte {
@ -39,21 +36,20 @@ func calculateChecksum(data []byte) byte {
return 0xFF - sum return 0xFF - sum
} }
// WriteFrame takes a frameable and writes it out to the given writer. // writeXBeeFrame takes some bytes and wraps it in an XBee frame.
func WriteFrame(w io.Writer, cmd Frameable) (n int, err error) { //
frame_data, err := cmd.Bytes() // An XBee frame has a start delimiter, followed by the length of the payload,
// then the payload itself, and finally a checksum.
func writeXBeeFrame(w io.Writer, data []byte) (n int, err error) {
if err != nil { frame := make([]byte, len(data)+4)
return
}
frame := make([]byte, len(frame_data)+4)
frame[0] = 0x7E frame[0] = 0x7E
binary.BigEndian.PutUint16(frame[1:], uint16(len(frame_data))) binary.BigEndian.PutUint16(frame[1:], uint16(len(data)))
copy(frame[3:], frame_data) copy(frame[3:], data)
chk := calculateChecksum(frame_data) chk := calculateChecksum(data)
frame[len(frame)-1] = chk frame[len(frame)-1] = chk
return w.Write(frame) return w.Write(frame)
@ -69,23 +65,23 @@ type XBeeCmd byte
const ( const (
// commands sent to the xbee s3b // commands sent to the xbee s3b
ATCmd XBeeCmd = 0x08 // AT Command ATCmdType XBeeCmd = 0x08 // AT Command
ATCmdQueue XBeeCmd = 0x09 // AT Command - Queue Parameter Value ATCmdQueueType XBeeCmd = 0x09 // AT Command - Queue Parameter Value
TxReq XBeeCmd = 0x10 // TX Request TxReqType XBeeCmd = 0x10 // TX Request
TxReqExpl XBeeCmd = 0x11 // Explicit TX Request TxReqExplType XBeeCmd = 0x11 // Explicit TX Request
RemoteCmdReq XBeeCmd = 0x17 // Remote Command Request RemoteCmdReqType XBeeCmd = 0x17 // Remote Command Request
// commands recieved from the xbee // commands recieved from the xbee
ATCmdResponse XBeeCmd = 0x88 // AT Command Response ATCmdResponseType XBeeCmd = 0x88 // AT Command Response
ModemStatus XBeeCmd = 0x8A // Modem Status ModemStatusType XBeeCmd = 0x8A // Modem Status
TxStatus XBeeCmd = 0x8B // Transmit Status TxStatusType XBeeCmd = 0x8B // Transmit Status
RouteInfoPkt XBeeCmd = 0x8D // Route information packet RouteInfoType XBeeCmd = 0x8D // Route information packet
AddrUpdate XBeeCmd = 0x8E // Aggregate Addressing Update AddrUpdateType XBeeCmd = 0x8E // Aggregate Addressing Update
RxPkt XBeeCmd = 0x90 // RX Indicator (AO=0) RxPktType XBeeCmd = 0x90 // RX Indicator (AO=0)
RxPktExpl XBeeCmd = 0x91 // Explicit RX Indicator (AO=1) RxPktExplType XBeeCmd = 0x91 // Explicit RX Indicator (AO=1)
IOSample XBeeCmd = 0x92 // Data Sample RX Indicator IOSampleType XBeeCmd = 0x92 // Data Sample RX Indicator
NodeId XBeeCmd = 0x95 // Note Identification Indicator NodeIdType XBeeCmd = 0x95 // Note Identification Indicator
RemoteCmdResp XBeeCmd = 0x97 // Remote Command Response RemoteCmdRespType XBeeCmd = 0x97 // Remote Command Response
) )
// AT commands are hard, so let's write out all the major ones here // AT commands are hard, so let's write out all the major ones here
@ -115,42 +111,34 @@ func xbeeFrameSplit(data []byte, atEOF bool) (advance int, token []byte, err err
return startIdx, nil, nil return startIdx, nil, nil
} }
// FIXME: add bounds checking! this can panic. // FIXME: add bounds checking! this can panic.
var frameLen = binary.BigEndian.Uint16(data[startIdx+1:startIdx+3]) + 4 var frameLen = int(binary.BigEndian.Uint16(data[startIdx+1:startIdx+3])) + 4
if len(data[startIdx:]) < int(frameLen) { if len(data[startIdx:]) < frameLen {
// we got the length, but there's not enough data for the frame. we can trim the // we got the length, but there's not enough data for the frame. we can trim the
// data that came before the start, but not return a token. // data that came before the start, but not return a token.
return startIdx, nil, nil return startIdx, nil, nil
} }
// there is enough data to pull a frame. // there is enough data to pull a frame.
// todo: check checksum here? we can return an error. // todo: check checksum here? we can return an error.
return startIdx + int(frameLen), data[startIdx : startIdx+int(frameLen)], nil return startIdx + frameLen, data[startIdx : startIdx+frameLen], nil
} }
// we didn't find a start character in our data, so request more. trash everythign given to us // we didn't find a start character in our data, so request more. trash everythign given to us
return len(data), nil, nil return len(data), nil, nil
} }
func parseFrame(frame []byte) ([]byte, error) { func parseFrame(frame []byte) ([]byte, error) {
if frame[0] != 0x7E { if frame[0] != 0x7E {
return nil, errors.New("incorrect start delimiter") return nil, errors.New("incorrect start delimiter")
} }
fsize := len(frame) fsize := len(frame)
if calculateChecksum(frame[3:fsize - 1]) != frame[fsize] { if calculateChecksum(frame[3:fsize-1]) != frame[fsize] {
return nil, errors.New("checksum mismatch") return nil, errors.New("checksum mismatch")
} }
return frame[3:fsize - 1], nil return frame[3 : fsize-1], nil
} }
// stackup // stackup
// low level readwriter (serial or IP socket) // low level readwriter (serial or IP socket)
// XBee library layer (frame encoding/decoding to/from structs) // XBee library layer (frame encoding/decoding to/from structs)
// xbee conn-like layer (ReadWriter + custom control functions) // xbee conn-like layer (ReadWriter + custom control functions)
// application marshalling format (msgpack or json or gob) // application marshalling format (msgpack or json or gob)
// application // application
type XBeeConn interface {
io.ReadWriter
WriteFrame(Frameable, bool) Frameable
}

View file

@ -1,7 +1,6 @@
package xbee package xbee
import ( import (
"bytes"
"reflect" "reflect"
"testing" "testing"
) )
@ -107,32 +106,35 @@ func Test_xbeeFrameSplit(t *testing.T) {
} }
} }
func TestWriteFrame(t *testing.T) { func Test_parseFrame(t *testing.T) {
type args struct { type args struct {
cmd Frameable frame []byte
} }
tests := []struct { tests := []struct {
name string name string
args args args args
wantN int want []byte
wantW string
wantErr bool wantErr bool
}{ }{
// TODO: Add test cases. // TODO: Add test cases.
{
name: "missing start delimiter",
args: args{
frame: []byte{0x00, 0x02, 0x03, 0x00, 0x3},
},
want: nil,
wantErr: true,
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
w := &bytes.Buffer{} got, err := parseFrame(tt.args.frame)
gotN, err := WriteFrame(w, tt.args.cmd)
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("WriteFrame() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("parseFrame() error = %v, wantErr %v", err, tt.wantErr)
return return
} }
if gotN != tt.wantN { if !reflect.DeepEqual(got, tt.want) {
t.Errorf("WriteFrame() = %v, want %v", gotN, tt.wantN) t.Errorf("parseFrame() = %v, want %v", got, tt.want)
}
if gotW := w.String(); gotW != tt.wantW {
t.Errorf("WriteFrame() = %v, want %v", gotW, tt.wantW)
} }
}) })
} }

View file

@ -2,72 +2,137 @@ package xbee
import ( import (
"bytes" "bytes"
"fmt"
"encoding/binary" "encoding/binary"
"fmt"
) )
// this file contains some AT command constants and // This code is handled slightly differently. Rather than use structs to represent
type ATCmdFrame struct { // the frame data, we instead use an interface that represents an AT command, and
Id byte // standalone functions that convert the AT command to frame data.
Cmd string // this way, we can add command support without needing to compose structs.
Param []byte // The downside is that it's more difficult to deal with.
Queued bool //
// AT command responses are handled with a struct to get the response status code.
//
// an ATCmd is anything that has a Payload function that returns the given payload
// pre-formatted to be send over the wire, and a Cmd command that returns the 2-character
// ATcommand itself. It must also have a parse response function that takes the response struct.
type ATCmd interface {
Payload() []byte // converts the AT command options to the binary argument format
Cmd() [2]rune // returns the AT command string.
Parse(*ATCmdResponse) error // takes a command response and parses the data into itself.
} }
// implement the frame stuff for us. // The AT command, in its raw format. we don't handle parameters since it's dealt with by
func (atFrame *ATCmdFrame) Bytes() ([]byte, error) { // the interface.
type RawATCmd []byte
// implements frameable. this is kinda stupid but makes it more safe.
func (b RawATCmd) Bytes() []byte {
return b
}
// EncodeATCommand takes an AT command and encodes it in the payload format.
// it takes the frame index (which can be zero) as well as if it should be queued or
// not. It encodes the AT command to be framed and sent over the wire and returns the packet
func encodeATCommand(at ATCmd, idx uint8, queued bool) RawATCmd {
// we encode a new byte slice that contains the cmd + payload concatenated correclty.
// this is then used to make the command frame, which contains ID/Type/Queued or not.
// the ATCmdFrame can be converted to bytes to be sent over the wire once framed.
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
if atFrame.Queued { // we change the frame type based on if it's queued.
// queued (batched) at comamnds have different Frame type if queued {
buf.WriteByte(byte(ATCmdQueue)) buf.WriteByte(byte(ATCmdQueueType))
} else { } else {
// normal frame type buf.WriteByte(byte(ATCmdType))
buf.WriteByte(byte(ATCmd))
} }
buf.WriteByte(atFrame.Id) // next is the provided frame identifier, used for callbacks.
buf.WriteByte(idx)
// write cmd, if it's the right length. cmd := at.Cmd()
if cmdLen := len(atFrame.Cmd); cmdLen != 2 { buf.WriteByte(byte(cmd[0]))
return nil, fmt.Errorf("AT command incorrect length: %d", cmdLen) buf.WriteByte(byte(cmd[1]))
} // the payload can be empty. This would make it a query.
buf.Write([]byte(atFrame.Cmd)) p := at.Payload()
buf.Write(p)
// write param. return buf.Bytes()
buf.Write(atFrame.Param)
return buf.Bytes(), nil
} }
type ATCmdResponse struct {
Cmd string
Status ATCmdStatus
Data []byte
}
func ParseATCmdResponse(p []byte) (*ATCmdResponse, error) {
if p[0] != 0x88 {
return nil, fmt.Errorf("invalid frame type 0x%x", p[0])
}
resp := &ATCmdResponse{
Cmd: string(p[2:4]),
Status: ATCmdStatus(p[4]),
// TODO: check if this overflows when there's no payload.
Data: p[5:],
}
return resp, nil
}
//go:generate stringer -output=at_cmd_status.go -type ATCmdStatus
type ATCmdStatus uint8
const (
ATCmdStatusOK ATCmdStatus = 0
ATCmdStatusErr ATCmdStatus = 1
ATCmdStatusInvalidCmd ATCmdStatus = 2
ATCmdStatusInvalidParam ATCmdStatus = 3
)
type RemoteATCmdReq struct { type RemoteATCmdReq struct {
ATCmdFrame
Destination uint64 Destination uint64
Options uint8 Options uint8
} }
func (remoteAT *RemoteATCmdReq) Bytes() ([]byte, error) { func encodeRemoteATCommand(at ATCmd, idx uint8, queued bool, destination uint64) RawATCmd {
// sizing take from
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
buf.WriteByte(byte(RemoteCmdReq))
buf.WriteByte(remoteAT.Id) buf.WriteByte(byte(RemoteCmdReqType))
a := make([]byte, 8) buf.WriteByte(idx)
binary.LittleEndian.PutUint64(a, remoteAT.Destination)
buf.Write(a)
// write the reserved part. binary.Write(buf, binary.BigEndian, destination)
buf.Write([]byte{0xFF, 0xFE})
// write options
buf.WriteByte(remoteAT.Options)
// now, write the AT command and the data. binary.Write(buf, binary.BigEndian, uint16(0xFFFE))
buf.Write([]byte(remoteAT.Cmd))
buf.Write(remoteAT.Param) // set remote command options. if idx = 0, we set bit zero (disable ack)
// if queued is true, we clear bit one (if false we set it)
return buf.Bytes(), nil var options uint8 = 0
if idx == 0 {
options = options | 0x1
}
if !queued {
options = options | 0x2
}
// write AT command
cmd := at.Cmd()
buf.WriteByte(byte(cmd[0]))
buf.WriteByte(byte(cmd[1]))
// write payload.
buf.Write(at.Payload())
return buf.Bytes()
} }
// let's actually define some AT commands now.

View file

@ -1,50 +1,60 @@
package xbee package xbee
import ( import (
"sync"
"errors" "errors"
"sync"
) )
// A connTrack is a simple frame mark utility for xbee packets. The xbee api frame // A connTrack is a simple frame mark utility for xbee packets. The xbee api frame
// takes a mark that is used when sending the response - this allows to coordinate // takes a mark that is used when sending the response - this allows to coordinate
// the sent packet and the response, since there may be other packets emitted // the sent packet and the response, since there may be other packets emitted
// between them. // between them. The data that is stored in the tag is a channel that contains an error - this
// is sent by the reader.
type connTrack struct { type connTrack struct {
mu sync.RWMutex // use RW mutex to allow for multiple readers mu sync.RWMutex // use RW mutex to allow for multiple readers
internal map[uint8]bool // map frame tag to if it's been used. internal map[uint8]chan []byte
// the map is set when writing a frame, and deleted when recieving a matching frame. // the map is set when writing a frame, and deleted when recieving a matching frame.
} }
// GetMark finds the next available marker and takes it, returning the value of // GetMark finds the next available marker and takes it, returning the value of
// the mark. If no mark can be acquired, it returns an error. // the mark as well as a channel to use as a semaphore when the mark is cleared.
func (ct *connTrack) GetMark() (uint8, error) { // If no mark can be acquired, it returns an error.
func (ct *connTrack) GetMark() (uint8, <-chan []byte, error) {
// get a read lock. // get a read lock.
ct.mu.RLock() ct.mu.RLock()
// NOTE: we start at one. This is because 0 will not return a frame ever - it's // NOTE: we start at one. This is because 0 will not return a frame ever - it's
// the "silent mode" mark // the "silent mode" mark
for i := 1; i < 256; i++ { for i := 1; i < 256; i++ {
if !ct.internal[uint8(i)] { if _, ok := ct.internal[uint8(i)]; !ok {
// it's free. // it's free.
// discard our read lock and lock for write. // discard our read lock and lock for write.
ct.mu.RUnlock() ct.mu.RUnlock()
ct.mu.Lock() ct.mu.Lock()
// update the value to true. // create the channel, makeit buffered so that we don't
ct.internal[uint8(i)] = true // block when we write the error when freeing the mark later.
ct.internal[uint8(i)] = make(chan []byte, 1)
ct.mu.Unlock() ct.mu.Unlock()
return uint8(i), nil return uint8(i), ct.internal[uint8(i)], nil
} }
} }
ct.mu.RUnlock() ct.mu.RUnlock()
return 0, errors.New("no available marks") return 0, nil, errors.New("no available marks")
} }
// ClearMark removes a given mark from the set if it exists, or returns an error. // ClearMark removes a given mark from the set if it exists, or returns an error.
func (ct *connTrack) ClearMark(mark uint8) error { // it takes an error (which can be nil) to send to the channel. this is used to free
// whatever command wrote that packet - be it a write() call or a custom AT command that is
// tracked.
func (ct *connTrack) ClearMark(mark uint8, data []byte) error {
ct.mu.RLock() ct.mu.RLock()
// FIXME: should this be the other way around (swap if and normal execution // FIXME: should this be the other way around (swap if and normal execution
if ct.internal[mark] { if val, ok := ct.internal[mark]; ok {
ct.mu.RUnlock() ct.mu.RUnlock()
ct.mu.Lock() ct.mu.Lock()
val <- data
close(val)
delete(ct.internal, mark) delete(ct.internal, mark)
ct.mu.Unlock() ct.mu.Unlock()
return nil return nil

View file

@ -17,7 +17,7 @@ func ParseRxFrame(data []byte) (*RxFrame, error) {
// i.e it excludes start delimiter, length, and checksum. // i.e it excludes start delimiter, length, and checksum.
// check the frame type (data[0]) // check the frame type (data[0])
if data[0] != byte(RxPkt) && data[0] != byte(RxPktExpl) { if data[0] != byte(RxPktType) {
return nil, fmt.Errorf("incorrect frame type 0x%x", data[0]) return nil, fmt.Errorf("incorrect frame type 0x%x", data[0])
} }

View file

@ -0,0 +1,242 @@
package xbee
import (
"bufio"
"fmt"
"io"
"sync"
"go.bug.st/serial"
"go.uber.org/zap"
)
// todo: make transport-agnostic (serial port or TCP/IP)
// A session is a simple way to manage an xbee device.
// it provides io.Reader and io.Writer, as well as some extra functions to handle
// custom Xbee frames.
type Session interface {
io.ReadWriteCloser
GetStatus() // todo: figure out signature for this
// Dial takes an address and allows direct communication with that
// device, without using broadcast.
Dial(addr uint64) io.ReadWriteCloser
// AT command related functions - query, set on local, query, set on remote.
ATCommand(cmd ATCmd, queued bool) (resp ATCmd, err error)
RemoteATCommand(cmd ATCmd, addr uint64) (resp ATCmd, err error)
}
type SerialSession struct {
port serial.Port
ct connTrack
log *zap.SugaredLogger
// todo: add queuing structures here for reliable transport and tracking.
// this buffer is used for storing data that must be read at some point.
rxBuf *bufio.ReadWriter
writeLock sync.Mutex // prevents multiple writers from accessing the port at once.
}
func NewSerialXBee(portName string, mode *serial.Mode) (*SerialSession, error) {
// make the session with the port/mode given, and set up the conntrack.
sess := &SerialSession{}
port, err := serial.Open(portName, mode)
if err != nil {
return sess, err
}
sess.port = port
sess.ct = connTrack{}
// setup io readwriter with a pipe.
rd, wr := io.Pipe()
// this is for reading data *only* - writes are different! it's a
// readWriter because the goroutine that runs scan continuously (and handles all other packets)
// will write to the buffer when new Rx packets come in, and we can read out from application code.
sess.rxBuf = bufio.NewReadWriter(bufio.NewReader(rd), bufio.NewWriter(wr))
logger, err := zap.NewDevelopment()
if err != nil {
return sess, err
}
sess.log = logger.Sugar()
// start the rx handler in the background. we close it later by closing the serial port.
go sess.rxHandler()
return sess, nil
}
// before we can write `Read(p []byte)` we have to have a goroutine that takes the input from
// the serial port and parses it out - if it's data, we push the data to a buffer for
// the application to read the bytes on its own.
//
// if it's a different kind of packet, we do custom functionality (free the conntrack, update
// local status, etc)
func (sess *SerialSession) rxHandler() {
// we wrap the serial port read line in a bufio.scanner using our custom split function.
scan := bufio.NewScanner(sess.port)
scan.Split(xbeeFrameSplit)
for scan.Scan() {
// TODO: check for errors?
// data is a frame payload - not a full frame.
data, err := parseFrame(scan.Bytes())
if err != nil {
sess.log.Warnw("error parsing frame", "error", err, "data", data)
continue
}
// data is good, lets parse the frame - using the first byte as the identifier.
switch XBeeCmd(data[0]) {
case RxPktType:
// we parse the data, and push it to the rx buffer.
//TODO: if we have multiple sources, we need to track them here.
frame, err := ParseRxFrame(data)
if err != nil {
sess.log.Warnw("error parsing rx packet", "error", err, "data", data)
break //continue?
}
// take the data and write it to our internal rx packet buffer.
_, err = sess.rxBuf.Write(frame.Payload)
if err != nil {
sess.log.Warnw("error writing data", "error", err, "payload", frame.Payload)
}
// the "callback"-style handler. Any received packet with a frame ID should
// be handled here.
case TxStatusType, ATCmdResponseType, RemoteCmdRespType: // these take the frame bytes and parse it themselves.
// we hand it back via the channel. we directly find the ID since it's always
// the second byte.
idx := data[1]
err := sess.ct.ClearMark(idx, data)
if err != nil {
// we got a rogue packet lol
sess.log.Warnw("rogue frame ID", "id", idx, "error", err)
}
default:
// we don't know what to do with it.
sess.log.Infow("unhandled packet type", "type", data[0], "id", data[1])
}
}
// if we get here, the serial port has closed. this is fine, usually.
}
// This implements io.Reader for the UART Session.
func (sess *SerialSession) Read(p []byte) (int, error) {
// Since we have an rx buffer, we just read from that and return the results.
return sess.rxBuf.Read(p)
}
func (sess *SerialSession) Write(p []byte) (n int, err error) {
// we construct a packet - using the conntrack to ensure that the packet is okay.
// we block - this is more correct.
idx, ch, err := sess.ct.GetMark()
if err != nil {
return
}
wf := &TxFrame{
Id: idx,
Destination: BroadcastAddr,
Payload: p,
}
// write the actual packet
sess.writeLock.Lock()
n, err = writeXBeeFrame(sess.port, wf.Bytes())
if err != nil {
return
}
sess.writeLock.Unlock()
// finally, wait for the channel we got to return. this means that
// the matching response frame was received, so we can parse it.
// TODO: add timeout.
responseFrame := <-ch
// this is a tx status frame bytes, so lets parse it out.
status, err := ParseTxStatusFrame(responseFrame)
if err != nil {
return
}
if status.Status != 0 {
err = fmt.Errorf("tx failed 0x%x", status.Status)
}
return
}
// sends a local AT command. If `queued` is true, the command is not immediately applied;
// instead, an AC command must be set to apply the queued changes. `queued` does not
// affect query-type commands, which always return right away.
// the AT command is an interface.
func (sess *SerialSession) ATCommand(at ATCmd, queued bool) error {
// we must encode the command, and then create the actual packet.
// then we send the packet, and wait for the response
// TODO: how to handle multiple-response-packet AT commands?
// (mainly Node Discovery ND)
// get a mark for the frame
isQuery := len(at.Payload()) > 0
idx, ch, err := sess.ct.GetMark()
if err != nil {
return err
}
rawData := encodeATCommand(at, idx, queued)
sess.writeLock.Lock()
_, err = writeXBeeFrame(sess.port, rawData)
sess.writeLock.Unlock()
if err != nil {
return fmt.Errorf("error writing xbee frame: %w", err)
}
// we use the AT command that was provided to decode the frame.
// Parse stores the response result locally.
// we parse the base frame ourselves, and if it's okay we pass it
// to the provided ATCommand
// TODO: add timeout.
resp, err := ParseATCmdResponse(<-ch)
if err != nil {
return err
}
if resp.Status != 0 {
// sinec ATCmdStatus is a stringer thanks to the generator
return fmt.Errorf("AT command failed: %s", resp.Status)
}
// finally, we use the provided ATCmd interface to unpack the data.
// this overwrites the values provided, but this should only happen
// during a query, so this is fine.
// TODO: skip if not a query command?
if isQuery {
return at.Parse(resp)
}
// it's not a query, and there was no error, so we just plain return
return nil
}
// Does this need to exist?
func (sess *SerialSession) GetStatus() {
panic("TODO: implement")
}
// Implement the io.Closer.
func (sess *SerialSession) Close() error {
return sess.port.Close()
}

View file

@ -3,7 +3,9 @@ package xbee
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"fmt"
) )
// transmissions to this address are instead broadcast // transmissions to this address are instead broadcast
const BroadcastAddr = 0xFFFF const BroadcastAddr = 0xFFFF
@ -15,15 +17,15 @@ type TxFrame struct {
Payload []byte Payload []byte
} }
func (txFrame *TxFrame) Bytes() ([]byte, error) { func (txFrame *TxFrame) Bytes() []byte {
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
buf.WriteByte(byte(TxReq)) buf.WriteByte(byte(TxReqType))
buf.WriteByte(txFrame.Id) buf.WriteByte(txFrame.Id)
a := make([]byte, 8) a := make([]byte, 8)
binary.LittleEndian.PutUint64(a, txFrame.Destination) binary.BigEndian.PutUint64(a, txFrame.Destination)
buf.Write(a) buf.Write(a)
// write the reserved part. // write the reserved part.
@ -36,5 +38,39 @@ func (txFrame *TxFrame) Bytes() ([]byte, error) {
buf.Write(txFrame.Payload) buf.Write(txFrame.Payload)
return buf.Bytes(), nil return buf.Bytes()
}
// we also handle transmit status response frames here.
// these are emitted by the xbee when the status of the tx packet is known.
// it has an Id that matches it to the corressponding transmit request.
type TxStatus uint8
//go:generate stringer -output=txStatus.go -type TxStatus
const (
TxStatusSuccess TxStatus = 0x00
TxStatusNoACK TxStatus = 0x01
TxStatusCCAFail TxStatus = 0x02
TxStatusIndirect TxStatus = 0x03
TxStatusTxFail TxStatus = 0x04
TxStatusACKFail TxStatus = 0x21
// TODO: finish this
)
type TxStatusFrame struct {
Id uint8 // the Frame identifier that this status frame represents.
Status TxStatus // the status itself - TxStatus is a stringable.
}
func ParseTxStatusFrame(data []byte) (*TxStatusFrame, error) {
if data[0] != byte(TxStatusType) {
return nil, fmt.Errorf("incorrect frame type for Tx status frame 0x%x", data[0])
}
status := &TxStatusFrame{
Id: data[1],
Status: TxStatus(data[2]),
}
return status, nil
} }