From 5d42f8693ae05a280e4179fe52322bafe633531b Mon Sep 17 00:00:00 2001 From: saji Date: Sat, 29 Apr 2023 10:58:56 -0500 Subject: [PATCH] more xbee work --- cmd/server.go | 158 ++++++++++++++++++++- go.mod | 5 + go.sum | 10 ++ internal/xbee/api_frame.go | 74 ++++------ internal/xbee/api_frame_test.go | 28 ++-- internal/xbee/at.go | 145 +++++++++++++------ internal/xbee/conntrack.go | 38 +++-- internal/xbee/rxframe.go | 2 +- internal/xbee/session.go | 242 ++++++++++++++++++++++++++++++++ internal/xbee/txframe.go | 44 +++++- 10 files changed, 624 insertions(+), 122 deletions(-) diff --git a/cmd/server.go b/cmd/server.go index 23e740d..701dbff 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -5,9 +5,13 @@ import ( "net" "time" + "github.com/kschamplin/gotelem/internal/can" "github.com/kschamplin/gotelem/internal/gotelem" + "github.com/kschamplin/gotelem/internal/socketcan" + "github.com/kschamplin/gotelem/internal/xbee" "github.com/tinylib/msgp/msgp" "github.com/urfave/cli/v2" + "go.bug.st/serial" ) const xbeeCategory = "XBee settings" @@ -25,7 +29,6 @@ var serveCmd = &cli.Command{ }, } - type session struct { conn net.Conn send chan gotelem.Body @@ -34,12 +37,17 @@ type session struct { } func serve() { + // start the can listener + go vcanTest() + fromCanBus := make(chan can.Frame, 1) + toCanBus := make(chan can.Frame) + go canHandler(toCanBus, fromCanBus) ln, err := net.Listen("tcp", ":8082") if err != nil { fmt.Printf("Error listening: %v\n", err) } fmt.Printf("Listening on :8082\n") - + for { conn, err := ln.Accept() if err != nil { @@ -53,11 +61,6 @@ func handleCon(conn net.Conn) { // reader := msgp.NewReader(conn) writer := msgp.NewWriter(conn) for { - // data := telemnet.StatusBody{ - // BatteryPct: 1.2, - // ErrCode: 0, - // } - // data.EncodeMsg(writer) data := gotelem.StatusBody{ BatteryPct: 1.2, ErrCode: 0, @@ -67,3 +70,144 @@ func handleCon(conn net.Conn) { time.Sleep(1 * time.Second) } } + +func xbeeSvc(packets <-chan can.Frame, device string, quit <-chan struct{}) { + + // open the session. + mode := &serial.Mode{ + BaudRate: 115200, + } + sess, err := xbee.NewSerialXBee("/dev/ttyUSB1", mode) + if err != nil { + fmt.Printf("got error %v", err) + panic(err) + } + + receivedData := make(chan gotelem.Data) + // make a simple reader that goes in the background. + go func() { + msgpReader := msgp.NewReader(sess) + var msgData gotelem.Data + msgData.DecodeMsg(msgpReader) + receivedData <- msgData + }() + for { + select { + case data := <-receivedData: + fmt.Printf("Got a data %v\n", data) + case packet := <-packets: + fmt.Printf("Got a packet, %v\n", packet) + } + + } +} + +// this spins up a new can socket on vcan0 and broadcasts a packet every second. +func vcanTest() { + sock, _ := socketcan.NewCanSocket("vcan0") + testFrame := &can.Frame{ + Id: 0x234, + Kind: can.SFF, + Data: []byte{0, 1, 2, 3, 4, 5, 6, 7}, + } + for { + + fmt.Printf("sending test packet\n") + sock.Send(testFrame) + time.Sleep(1 * time.Second) + } +} + +func canHandler(pktToSend <-chan can.Frame, pktRecv chan<- can.Frame) { + sock, _ := socketcan.NewCanSocket("vcan0") + + // start a simple dispatcher that just relays can frames. + r := make(chan can.Frame) + go func() { + for { + pkt, _ := sock.Recv() + r <- *pkt + } + }() + for { + select { + case msg := <-pktToSend: + sock.Send(&msg) + case msg := <-r: + fmt.Printf("got a packet from the can %v\n", msg) + pktRecv <- msg + } + } +} + +type BrokerRequest struct { + Source string // the name of the sender + Msg can.Frame // the message to send +} +type BrokerClient struct { + Name string // the name of the client + Ch chan can.Frame // the channel to send frames to this client +} +type Broker struct { + subs map[string]chan can.Frame + + publishCh chan BrokerRequest + + subsCh chan BrokerClient + unsubCh chan BrokerClient +} + +func NewBroker(bufsize int) *Broker{ + b := &Broker{ + subs: make(map[string]chan can.Frame), + publishCh: make(chan BrokerRequest, 3), + subsCh: make(chan BrokerClient, 3), + unsubCh: make(chan BrokerClient, 3), + } + return b +} + +func (b *Broker) Start() { + + for { + select { + case newClient := <-b.subsCh: + b.subs[newClient.Name] = newClient.Ch + case req := <-b.publishCh: + for name, ch := range b.subs { + if name == req.Source { + continue // don't send to ourselves. + } + // a kinda-inelegant non-blocking push. + // if we can't do it, we just drop it. this should ideally never happen. + select { + case ch <- req.Msg: + default: + fmt.Printf("we dropped a packet to dest %s", name) + } + } + case clientToRemove := <-b.unsubCh: + close(b.subs[clientToRemove.Name]) + delete(b.subs, clientToRemove.Name) + } + } +} + +func (b *Broker) Publish(name string, msg can.Frame) { + breq := BrokerRequest{ + Source: name, + Msg: msg, + } + b.publishCh <- breq +} + +func (b *Broker) Subscribe(name string) <-chan can.Frame { + ch := make(chan can.Frame, 3) + + bc := BrokerClient{ + Name: name, + Ch: ch, + } + b.subsCh <- bc + return ch +} diff --git a/go.mod b/go.mod index 10d8477..13ce34a 100644 --- a/go.mod +++ b/go.mod @@ -9,12 +9,17 @@ require ( require ( github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect + github.com/creack/goselect v0.1.2 // indirect github.com/jmoiron/sqlx v1.3.5 // indirect github.com/mattn/go-sqlite3 v1.14.16 // indirect github.com/philhofer/fwd v1.1.2 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/tinylib/msgp v1.1.8 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect + go.bug.st/serial v1.5.0 // indirect + go.uber.org/atomic v1.10.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.24.0 // indirect golang.org/x/mod v0.10.0 // indirect golang.org/x/tools v0.8.0 // indirect ) diff --git a/go.sum b/go.sum index c24c542..b472da2 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/goselect v0.1.2 h1:2DNy14+JPjRBgPzAd1thbQp4BSIihxcBf0IXhQXDRa0= +github.com/creack/goselect v0.1.2/go.mod h1:a/NhLweNvqIYMuxcMOuWY516Cimucms3DglDzQP3hKY= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g= github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ= @@ -18,6 +20,14 @@ github.com/urfave/cli/v2 v2.25.1/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6f github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.bug.st/serial v1.5.0 h1:ThuUkHpOEmCVXxGEfpoExjQCS2WBVV4ZcUKVYInM9T4= +go.bug.st/serial v1.5.0/go.mod h1:UABfsluHAiaNI+La2iESysd9Vetq7VRdpxvjx7CmmOE= +go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= +go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= +go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= diff --git a/internal/xbee/api_frame.go b/internal/xbee/api_frame.go index b317d3e..96baa6e 100644 --- a/internal/xbee/api_frame.go +++ b/internal/xbee/api_frame.go @@ -22,13 +22,10 @@ import ( type Frameable interface { // returns the API identifier for this frame. - GetId() byte // encodes this frame correctly. - Bytes() ([]byte, error) + Bytes() []byte } - - // calculateChecksum is a helper function to calculate the 1-byte checksum of a data range. // the data range does not include the start delimiter, or the length uint16 (only the frame payload) func calculateChecksum(data []byte) byte { @@ -39,21 +36,20 @@ func calculateChecksum(data []byte) byte { return 0xFF - sum } -// WriteFrame takes a frameable and writes it out to the given writer. -func WriteFrame(w io.Writer, cmd Frameable) (n int, err error) { - frame_data, err := cmd.Bytes() +// writeXBeeFrame takes some bytes and wraps it in an XBee frame. +// +// An XBee frame has a start delimiter, followed by the length of the payload, +// then the payload itself, and finally a checksum. +func writeXBeeFrame(w io.Writer, data []byte) (n int, err error) { - if err != nil { - return - } - frame := make([]byte, len(frame_data)+4) + frame := make([]byte, len(data)+4) frame[0] = 0x7E - binary.BigEndian.PutUint16(frame[1:], uint16(len(frame_data))) + binary.BigEndian.PutUint16(frame[1:], uint16(len(data))) - copy(frame[3:], frame_data) + copy(frame[3:], data) - chk := calculateChecksum(frame_data) + chk := calculateChecksum(data) frame[len(frame)-1] = chk return w.Write(frame) @@ -69,23 +65,23 @@ type XBeeCmd byte const ( // commands sent to the xbee s3b - ATCmd XBeeCmd = 0x08 // AT Command - ATCmdQueue XBeeCmd = 0x09 // AT Command - Queue Parameter Value - TxReq XBeeCmd = 0x10 // TX Request - TxReqExpl XBeeCmd = 0x11 // Explicit TX Request - RemoteCmdReq XBeeCmd = 0x17 // Remote Command Request + ATCmdType XBeeCmd = 0x08 // AT Command + ATCmdQueueType XBeeCmd = 0x09 // AT Command - Queue Parameter Value + TxReqType XBeeCmd = 0x10 // TX Request + TxReqExplType XBeeCmd = 0x11 // Explicit TX Request + RemoteCmdReqType XBeeCmd = 0x17 // Remote Command Request // commands recieved from the xbee - ATCmdResponse XBeeCmd = 0x88 // AT Command Response - ModemStatus XBeeCmd = 0x8A // Modem Status - TxStatus XBeeCmd = 0x8B // Transmit Status - RouteInfoPkt XBeeCmd = 0x8D // Route information packet - AddrUpdate XBeeCmd = 0x8E // Aggregate Addressing Update - RxPkt XBeeCmd = 0x90 // RX Indicator (AO=0) - RxPktExpl XBeeCmd = 0x91 // Explicit RX Indicator (AO=1) - IOSample XBeeCmd = 0x92 // Data Sample RX Indicator - NodeId XBeeCmd = 0x95 // Note Identification Indicator - RemoteCmdResp XBeeCmd = 0x97 // Remote Command Response + ATCmdResponseType XBeeCmd = 0x88 // AT Command Response + ModemStatusType XBeeCmd = 0x8A // Modem Status + TxStatusType XBeeCmd = 0x8B // Transmit Status + RouteInfoType XBeeCmd = 0x8D // Route information packet + AddrUpdateType XBeeCmd = 0x8E // Aggregate Addressing Update + RxPktType XBeeCmd = 0x90 // RX Indicator (AO=0) + RxPktExplType XBeeCmd = 0x91 // Explicit RX Indicator (AO=1) + IOSampleType XBeeCmd = 0x92 // Data Sample RX Indicator + NodeIdType XBeeCmd = 0x95 // Note Identification Indicator + RemoteCmdRespType XBeeCmd = 0x97 // Remote Command Response ) // AT commands are hard, so let's write out all the major ones here @@ -115,42 +111,34 @@ func xbeeFrameSplit(data []byte, atEOF bool) (advance int, token []byte, err err return startIdx, nil, nil } // FIXME: add bounds checking! this can panic. - var frameLen = binary.BigEndian.Uint16(data[startIdx+1:startIdx+3]) + 4 - if len(data[startIdx:]) < int(frameLen) { + var frameLen = int(binary.BigEndian.Uint16(data[startIdx+1:startIdx+3])) + 4 + if len(data[startIdx:]) < frameLen { // we got the length, but there's not enough data for the frame. we can trim the // data that came before the start, but not return a token. return startIdx, nil, nil } // there is enough data to pull a frame. // todo: check checksum here? we can return an error. - return startIdx + int(frameLen), data[startIdx : startIdx+int(frameLen)], nil + return startIdx + frameLen, data[startIdx : startIdx+frameLen], nil } // we didn't find a start character in our data, so request more. trash everythign given to us return len(data), nil, nil } - func parseFrame(frame []byte) ([]byte, error) { if frame[0] != 0x7E { return nil, errors.New("incorrect start delimiter") } fsize := len(frame) - if calculateChecksum(frame[3:fsize - 1]) != frame[fsize] { + if calculateChecksum(frame[3:fsize-1]) != frame[fsize] { return nil, errors.New("checksum mismatch") } - return frame[3:fsize - 1], nil + return frame[3 : fsize-1], nil } - // stackup // low level readwriter (serial or IP socket) // XBee library layer (frame encoding/decoding to/from structs) // xbee conn-like layer (ReadWriter + custom control functions) // application marshalling format (msgpack or json or gob) -// application - - -type XBeeConn interface { - io.ReadWriter - WriteFrame(Frameable, bool) Frameable -} +// application diff --git a/internal/xbee/api_frame_test.go b/internal/xbee/api_frame_test.go index 8ccbc88..d22feff 100644 --- a/internal/xbee/api_frame_test.go +++ b/internal/xbee/api_frame_test.go @@ -1,7 +1,6 @@ package xbee import ( - "bytes" "reflect" "testing" ) @@ -107,32 +106,35 @@ func Test_xbeeFrameSplit(t *testing.T) { } } -func TestWriteFrame(t *testing.T) { +func Test_parseFrame(t *testing.T) { type args struct { - cmd Frameable + frame []byte } tests := []struct { name string args args - wantN int - wantW string + want []byte wantErr bool }{ // TODO: Add test cases. + { + name: "missing start delimiter", + args: args{ + frame: []byte{0x00, 0x02, 0x03, 0x00, 0x3}, + }, + want: nil, + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - w := &bytes.Buffer{} - gotN, err := WriteFrame(w, tt.args.cmd) + got, err := parseFrame(tt.args.frame) if (err != nil) != tt.wantErr { - t.Errorf("WriteFrame() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("parseFrame() error = %v, wantErr %v", err, tt.wantErr) return } - if gotN != tt.wantN { - t.Errorf("WriteFrame() = %v, want %v", gotN, tt.wantN) - } - if gotW := w.String(); gotW != tt.wantW { - t.Errorf("WriteFrame() = %v, want %v", gotW, tt.wantW) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("parseFrame() = %v, want %v", got, tt.want) } }) } diff --git a/internal/xbee/at.go b/internal/xbee/at.go index b838d65..53b89c0 100644 --- a/internal/xbee/at.go +++ b/internal/xbee/at.go @@ -2,72 +2,137 @@ package xbee import ( "bytes" - "fmt" "encoding/binary" + "fmt" ) -// this file contains some AT command constants and -type ATCmdFrame struct { - Id byte - Cmd string - Param []byte - Queued bool +// This code is handled slightly differently. Rather than use structs to represent +// the frame data, we instead use an interface that represents an AT command, and +// standalone functions that convert the AT command to frame data. +// this way, we can add command support without needing to compose structs. +// The downside is that it's more difficult to deal with. +// +// AT command responses are handled with a struct to get the response status code. +// + +// an ATCmd is anything that has a Payload function that returns the given payload +// pre-formatted to be send over the wire, and a Cmd command that returns the 2-character +// ATcommand itself. It must also have a parse response function that takes the response struct. +type ATCmd interface { + Payload() []byte // converts the AT command options to the binary argument format + Cmd() [2]rune // returns the AT command string. + Parse(*ATCmdResponse) error // takes a command response and parses the data into itself. } -// implement the frame stuff for us. -func (atFrame *ATCmdFrame) Bytes() ([]byte, error) { +// The AT command, in its raw format. we don't handle parameters since it's dealt with by +// the interface. +type RawATCmd []byte + +// implements frameable. this is kinda stupid but makes it more safe. +func (b RawATCmd) Bytes() []byte { + return b +} + +// EncodeATCommand takes an AT command and encodes it in the payload format. +// it takes the frame index (which can be zero) as well as if it should be queued or +// not. It encodes the AT command to be framed and sent over the wire and returns the packet +func encodeATCommand(at ATCmd, idx uint8, queued bool) RawATCmd { + // we encode a new byte slice that contains the cmd + payload concatenated correclty. + // this is then used to make the command frame, which contains ID/Type/Queued or not. + // the ATCmdFrame can be converted to bytes to be sent over the wire once framed. + buf := new(bytes.Buffer) - if atFrame.Queued { - // queued (batched) at comamnds have different Frame type - buf.WriteByte(byte(ATCmdQueue)) - + // we change the frame type based on if it's queued. + if queued { + buf.WriteByte(byte(ATCmdQueueType)) } else { - // normal frame type - buf.WriteByte(byte(ATCmd)) - + buf.WriteByte(byte(ATCmdType)) } - buf.WriteByte(atFrame.Id) + // next is the provided frame identifier, used for callbacks. + buf.WriteByte(idx) - // write cmd, if it's the right length. - if cmdLen := len(atFrame.Cmd); cmdLen != 2 { - return nil, fmt.Errorf("AT command incorrect length: %d", cmdLen) - } - buf.Write([]byte(atFrame.Cmd)) + cmd := at.Cmd() + buf.WriteByte(byte(cmd[0])) + buf.WriteByte(byte(cmd[1])) + // the payload can be empty. This would make it a query. + p := at.Payload() + buf.Write(p) - // write param. - buf.Write(atFrame.Param) - return buf.Bytes(), nil + return buf.Bytes() } +type ATCmdResponse struct { + Cmd string + Status ATCmdStatus + Data []byte +} + +func ParseATCmdResponse(p []byte) (*ATCmdResponse, error) { + + if p[0] != 0x88 { + return nil, fmt.Errorf("invalid frame type 0x%x", p[0]) + } + resp := &ATCmdResponse{ + Cmd: string(p[2:4]), + Status: ATCmdStatus(p[4]), + // TODO: check if this overflows when there's no payload. + Data: p[5:], + } + + return resp, nil +} + +//go:generate stringer -output=at_cmd_status.go -type ATCmdStatus +type ATCmdStatus uint8 + +const ( + ATCmdStatusOK ATCmdStatus = 0 + ATCmdStatusErr ATCmdStatus = 1 + + ATCmdStatusInvalidCmd ATCmdStatus = 2 + ATCmdStatusInvalidParam ATCmdStatus = 3 +) type RemoteATCmdReq struct { - ATCmdFrame Destination uint64 Options uint8 } -func (remoteAT *RemoteATCmdReq) Bytes() ([]byte, error) { +func encodeRemoteATCommand(at ATCmd, idx uint8, queued bool, destination uint64) RawATCmd { + + // sizing take from buf := new(bytes.Buffer) - buf.WriteByte(byte(RemoteCmdReq)) - buf.WriteByte(remoteAT.Id) + buf.WriteByte(byte(RemoteCmdReqType)) - a := make([]byte, 8) - binary.LittleEndian.PutUint64(a, remoteAT.Destination) - buf.Write(a) + buf.WriteByte(idx) - // write the reserved part. - buf.Write([]byte{0xFF, 0xFE}) - // write options - buf.WriteByte(remoteAT.Options) + binary.Write(buf, binary.BigEndian, destination) - // now, write the AT command and the data. - buf.Write([]byte(remoteAT.Cmd)) + binary.Write(buf, binary.BigEndian, uint16(0xFFFE)) - buf.Write(remoteAT.Param) + // set remote command options. if idx = 0, we set bit zero (disable ack) + // if queued is true, we clear bit one (if false we set it) - return buf.Bytes(), nil + var options uint8 = 0 + if idx == 0 { + options = options | 0x1 + } + if !queued { + options = options | 0x2 + } + // write AT command + cmd := at.Cmd() + buf.WriteByte(byte(cmd[0])) + buf.WriteByte(byte(cmd[1])) + + // write payload. + buf.Write(at.Payload()) + + return buf.Bytes() } + +// let's actually define some AT commands now. diff --git a/internal/xbee/conntrack.go b/internal/xbee/conntrack.go index 1c87b26..44f4e50 100644 --- a/internal/xbee/conntrack.go +++ b/internal/xbee/conntrack.go @@ -1,50 +1,60 @@ package xbee + import ( - "sync" "errors" + "sync" ) // A connTrack is a simple frame mark utility for xbee packets. The xbee api frame // takes a mark that is used when sending the response - this allows to coordinate // the sent packet and the response, since there may be other packets emitted -// between them. +// between them. The data that is stored in the tag is a channel that contains an error - this +// is sent by the reader. type connTrack struct { - mu sync.RWMutex // use RW mutex to allow for multiple readers - internal map[uint8]bool // map frame tag to if it's been used. + mu sync.RWMutex // use RW mutex to allow for multiple readers + internal map[uint8]chan []byte // the map is set when writing a frame, and deleted when recieving a matching frame. } // GetMark finds the next available marker and takes it, returning the value of -// the mark. If no mark can be acquired, it returns an error. -func (ct *connTrack) GetMark() (uint8, error) { +// the mark as well as a channel to use as a semaphore when the mark is cleared. +// If no mark can be acquired, it returns an error. +func (ct *connTrack) GetMark() (uint8, <-chan []byte, error) { // get a read lock. ct.mu.RLock() - // NOTE: we start at one. This is because 0 will not return a frame ever - it's + // NOTE: we start at one. This is because 0 will not return a frame ever - it's // the "silent mode" mark for i := 1; i < 256; i++ { - if !ct.internal[uint8(i)] { + if _, ok := ct.internal[uint8(i)]; !ok { // it's free. // discard our read lock and lock for write. ct.mu.RUnlock() ct.mu.Lock() - // update the value to true. - ct.internal[uint8(i)] = true + // create the channel, makeit buffered so that we don't + // block when we write the error when freeing the mark later. + ct.internal[uint8(i)] = make(chan []byte, 1) ct.mu.Unlock() - return uint8(i), nil + return uint8(i), ct.internal[uint8(i)], nil } } ct.mu.RUnlock() - return 0, errors.New("no available marks") + return 0, nil, errors.New("no available marks") } + // ClearMark removes a given mark from the set if it exists, or returns an error. -func (ct *connTrack) ClearMark(mark uint8) error { +// it takes an error (which can be nil) to send to the channel. this is used to free +// whatever command wrote that packet - be it a write() call or a custom AT command that is +// tracked. +func (ct *connTrack) ClearMark(mark uint8, data []byte) error { ct.mu.RLock() // FIXME: should this be the other way around (swap if and normal execution - if ct.internal[mark] { + if val, ok := ct.internal[mark]; ok { ct.mu.RUnlock() ct.mu.Lock() + val <- data + close(val) delete(ct.internal, mark) ct.mu.Unlock() return nil diff --git a/internal/xbee/rxframe.go b/internal/xbee/rxframe.go index 525f738..5e9b800 100644 --- a/internal/xbee/rxframe.go +++ b/internal/xbee/rxframe.go @@ -17,7 +17,7 @@ func ParseRxFrame(data []byte) (*RxFrame, error) { // i.e it excludes start delimiter, length, and checksum. // check the frame type (data[0]) - if data[0] != byte(RxPkt) && data[0] != byte(RxPktExpl) { + if data[0] != byte(RxPktType) { return nil, fmt.Errorf("incorrect frame type 0x%x", data[0]) } diff --git a/internal/xbee/session.go b/internal/xbee/session.go index e69de29..b4aab9b 100644 --- a/internal/xbee/session.go +++ b/internal/xbee/session.go @@ -0,0 +1,242 @@ +package xbee + +import ( + "bufio" + "fmt" + "io" + "sync" + + "go.bug.st/serial" + "go.uber.org/zap" +) + +// todo: make transport-agnostic (serial port or TCP/IP) + +// A session is a simple way to manage an xbee device. +// it provides io.Reader and io.Writer, as well as some extra functions to handle +// custom Xbee frames. +type Session interface { + io.ReadWriteCloser + GetStatus() // todo: figure out signature for this + + // Dial takes an address and allows direct communication with that + // device, without using broadcast. + Dial(addr uint64) io.ReadWriteCloser + // AT command related functions - query, set on local, query, set on remote. + + ATCommand(cmd ATCmd, queued bool) (resp ATCmd, err error) + RemoteATCommand(cmd ATCmd, addr uint64) (resp ATCmd, err error) +} + +type SerialSession struct { + port serial.Port + ct connTrack + log *zap.SugaredLogger + // todo: add queuing structures here for reliable transport and tracking. + // this buffer is used for storing data that must be read at some point. + rxBuf *bufio.ReadWriter + writeLock sync.Mutex // prevents multiple writers from accessing the port at once. +} + +func NewSerialXBee(portName string, mode *serial.Mode) (*SerialSession, error) { + // make the session with the port/mode given, and set up the conntrack. + sess := &SerialSession{} + + port, err := serial.Open(portName, mode) + if err != nil { + return sess, err + } + sess.port = port + + sess.ct = connTrack{} + + // setup io readwriter with a pipe. + rd, wr := io.Pipe() + // this is for reading data *only* - writes are different! it's a + // readWriter because the goroutine that runs scan continuously (and handles all other packets) + // will write to the buffer when new Rx packets come in, and we can read out from application code. + sess.rxBuf = bufio.NewReadWriter(bufio.NewReader(rd), bufio.NewWriter(wr)) + + logger, err := zap.NewDevelopment() + if err != nil { + return sess, err + } + sess.log = logger.Sugar() + + // start the rx handler in the background. we close it later by closing the serial port. + + go sess.rxHandler() + + return sess, nil +} + +// before we can write `Read(p []byte)` we have to have a goroutine that takes the input from +// the serial port and parses it out - if it's data, we push the data to a buffer for +// the application to read the bytes on its own. +// +// if it's a different kind of packet, we do custom functionality (free the conntrack, update +// local status, etc) +func (sess *SerialSession) rxHandler() { + // we wrap the serial port read line in a bufio.scanner using our custom split function. + scan := bufio.NewScanner(sess.port) + scan.Split(xbeeFrameSplit) + + for scan.Scan() { + // TODO: check for errors? + // data is a frame payload - not a full frame. + data, err := parseFrame(scan.Bytes()) + if err != nil { + sess.log.Warnw("error parsing frame", "error", err, "data", data) + continue + } + // data is good, lets parse the frame - using the first byte as the identifier. + + switch XBeeCmd(data[0]) { + case RxPktType: + // we parse the data, and push it to the rx buffer. + //TODO: if we have multiple sources, we need to track them here. + frame, err := ParseRxFrame(data) + if err != nil { + sess.log.Warnw("error parsing rx packet", "error", err, "data", data) + break //continue? + } + // take the data and write it to our internal rx packet buffer. + _, err = sess.rxBuf.Write(frame.Payload) + if err != nil { + sess.log.Warnw("error writing data", "error", err, "payload", frame.Payload) + } + + // the "callback"-style handler. Any received packet with a frame ID should + // be handled here. + case TxStatusType, ATCmdResponseType, RemoteCmdRespType: // these take the frame bytes and parse it themselves. + // we hand it back via the channel. we directly find the ID since it's always + // the second byte. + idx := data[1] + + err := sess.ct.ClearMark(idx, data) + if err != nil { + // we got a rogue packet lol + sess.log.Warnw("rogue frame ID", "id", idx, "error", err) + } + + default: + // we don't know what to do with it. + sess.log.Infow("unhandled packet type", "type", data[0], "id", data[1]) + + } + + } + // if we get here, the serial port has closed. this is fine, usually. +} + +// This implements io.Reader for the UART Session. +func (sess *SerialSession) Read(p []byte) (int, error) { + // Since we have an rx buffer, we just read from that and return the results. + return sess.rxBuf.Read(p) +} + +func (sess *SerialSession) Write(p []byte) (n int, err error) { + // we construct a packet - using the conntrack to ensure that the packet is okay. + // we block - this is more correct. + idx, ch, err := sess.ct.GetMark() + if err != nil { + return + } + wf := &TxFrame{ + Id: idx, + Destination: BroadcastAddr, + Payload: p, + } + // write the actual packet + + sess.writeLock.Lock() + n, err = writeXBeeFrame(sess.port, wf.Bytes()) + if err != nil { + return + } + sess.writeLock.Unlock() + + // finally, wait for the channel we got to return. this means that + // the matching response frame was received, so we can parse it. + // TODO: add timeout. + responseFrame := <-ch + + // this is a tx status frame bytes, so lets parse it out. + status, err := ParseTxStatusFrame(responseFrame) + if err != nil { + return + } + + if status.Status != 0 { + err = fmt.Errorf("tx failed 0x%x", status.Status) + } + + return +} + +// sends a local AT command. If `queued` is true, the command is not immediately applied; +// instead, an AC command must be set to apply the queued changes. `queued` does not +// affect query-type commands, which always return right away. +// the AT command is an interface. +func (sess *SerialSession) ATCommand(at ATCmd, queued bool) error { + // we must encode the command, and then create the actual packet. + // then we send the packet, and wait for the response + // TODO: how to handle multiple-response-packet AT commands? + // (mainly Node Discovery ND) + + // get a mark for the frame + + isQuery := len(at.Payload()) > 0 + idx, ch, err := sess.ct.GetMark() + if err != nil { + return err + } + rawData := encodeATCommand(at, idx, queued) + + sess.writeLock.Lock() + _, err = writeXBeeFrame(sess.port, rawData) + sess.writeLock.Unlock() + + if err != nil { + return fmt.Errorf("error writing xbee frame: %w", err) + } + + // we use the AT command that was provided to decode the frame. + // Parse stores the response result locally. + // we parse the base frame ourselves, and if it's okay we pass it + // to the provided ATCommand + + // TODO: add timeout. + resp, err := ParseATCmdResponse(<-ch) + if err != nil { + return err + } + + if resp.Status != 0 { + // sinec ATCmdStatus is a stringer thanks to the generator + return fmt.Errorf("AT command failed: %s", resp.Status) + } + + // finally, we use the provided ATCmd interface to unpack the data. + // this overwrites the values provided, but this should only happen + // during a query, so this is fine. + // TODO: skip if not a query command? + + if isQuery { + return at.Parse(resp) + } + + // it's not a query, and there was no error, so we just plain return + return nil + +} + +// Does this need to exist? +func (sess *SerialSession) GetStatus() { + panic("TODO: implement") +} + +// Implement the io.Closer. +func (sess *SerialSession) Close() error { + return sess.port.Close() +} diff --git a/internal/xbee/txframe.go b/internal/xbee/txframe.go index 3fd0f37..7f370cd 100644 --- a/internal/xbee/txframe.go +++ b/internal/xbee/txframe.go @@ -3,7 +3,9 @@ package xbee import ( "bytes" "encoding/binary" + "fmt" ) + // transmissions to this address are instead broadcast const BroadcastAddr = 0xFFFF @@ -15,15 +17,15 @@ type TxFrame struct { Payload []byte } -func (txFrame *TxFrame) Bytes() ([]byte, error) { +func (txFrame *TxFrame) Bytes() []byte { buf := new(bytes.Buffer) - buf.WriteByte(byte(TxReq)) + buf.WriteByte(byte(TxReqType)) buf.WriteByte(txFrame.Id) a := make([]byte, 8) - binary.LittleEndian.PutUint64(a, txFrame.Destination) + binary.BigEndian.PutUint64(a, txFrame.Destination) buf.Write(a) // write the reserved part. @@ -36,5 +38,39 @@ func (txFrame *TxFrame) Bytes() ([]byte, error) { buf.Write(txFrame.Payload) - return buf.Bytes(), nil + return buf.Bytes() +} + +// we also handle transmit status response frames here. +// these are emitted by the xbee when the status of the tx packet is known. +// it has an Id that matches it to the corressponding transmit request. +type TxStatus uint8 + +//go:generate stringer -output=txStatus.go -type TxStatus +const ( + TxStatusSuccess TxStatus = 0x00 + TxStatusNoACK TxStatus = 0x01 + TxStatusCCAFail TxStatus = 0x02 + TxStatusIndirect TxStatus = 0x03 + TxStatusTxFail TxStatus = 0x04 + TxStatusACKFail TxStatus = 0x21 + // TODO: finish this +) + +type TxStatusFrame struct { + Id uint8 // the Frame identifier that this status frame represents. + Status TxStatus // the status itself - TxStatus is a stringable. +} + +func ParseTxStatusFrame(data []byte) (*TxStatusFrame, error) { + if data[0] != byte(TxStatusType) { + return nil, fmt.Errorf("incorrect frame type for Tx status frame 0x%x", data[0]) + } + + status := &TxStatusFrame{ + Id: data[1], + Status: TxStatus(data[2]), + } + + return status, nil }