package xbee import ( "bufio" "fmt" "io" "sync" "go.bug.st/serial" "go.uber.org/zap" ) // todo: make transport-agnostic (serial port or TCP/IP) // A session is a simple way to manage an xbee device. // it provides io.Reader and io.Writer, as well as some extra functions to handle // custom Xbee frames. type Session interface { io.ReadWriteCloser GetStatus() // todo: figure out signature for this // Dial takes an address and allows direct communication with that // device, without using broadcast. Dial(addr uint64) io.ReadWriteCloser // AT command related functions - query, set on local, query, set on remote. ATCommand(cmd ATCmd, queued bool) (resp ATCmd, err error) RemoteATCommand(cmd ATCmd, addr uint64) (resp ATCmd, err error) } type SerialSession struct { port serial.Port ct connTrack log *zap.SugaredLogger // todo: add queuing structures here for reliable transport and tracking. // this buffer is used for storing data that must be read at some point. rxBuf *bufio.ReadWriter writeLock sync.Mutex // prevents multiple writers from accessing the port at once. } func NewSerialXBee(portName string, mode *serial.Mode) (*SerialSession, error) { // make the session with the port/mode given, and set up the conntrack. sess := &SerialSession{} port, err := serial.Open(portName, mode) if err != nil { return sess, err } sess.port = port sess.ct = connTrack{} // setup io readwriter with a pipe. rd, wr := io.Pipe() // this is for reading data *only* - writes are different! it's a // readWriter because the goroutine that runs scan continuously (and handles all other packets) // will write to the buffer when new Rx packets come in, and we can read out from application code. sess.rxBuf = bufio.NewReadWriter(bufio.NewReader(rd), bufio.NewWriter(wr)) logger, err := zap.NewDevelopment() if err != nil { return sess, err } sess.log = logger.Sugar() // start the rx handler in the background. we close it later by closing the serial port. go sess.rxHandler() return sess, nil } // before we can write `Read(p []byte)` we have to have a goroutine that takes the input from // the serial port and parses it out - if it's data, we push the data to a buffer for // the application to read the bytes on its own. // // if it's a different kind of packet, we do custom functionality (free the conntrack, update // local status, etc) func (sess *SerialSession) rxHandler() { // we wrap the serial port read line in a bufio.scanner using our custom split function. scan := bufio.NewScanner(sess.port) scan.Split(xbeeFrameSplit) for scan.Scan() { // TODO: check for errors? // data is a frame payload - not a full frame. data, err := parseFrame(scan.Bytes()) if err != nil { sess.log.Warnw("error parsing frame", "error", err, "data", data) continue } // data is good, lets parse the frame - using the first byte as the identifier. switch XBeeCmd(data[0]) { case RxPktType: // we parse the data, and push it to the rx buffer. //TODO: if we have multiple sources, we need to track them here. frame, err := ParseRxFrame(data) if err != nil { sess.log.Warnw("error parsing rx packet", "error", err, "data", data) break //continue? } // take the data and write it to our internal rx packet buffer. _, err = sess.rxBuf.Write(frame.Payload) if err != nil { sess.log.Warnw("error writing data", "error", err, "payload", frame.Payload) } // the "callback"-style handler. Any received packet with a frame ID should // be handled here. case TxStatusType, ATCmdResponseType, RemoteCmdRespType: // these take the frame bytes and parse it themselves. // we hand it back via the channel. we directly find the ID since it's always // the second byte. idx := data[1] err := sess.ct.ClearMark(idx, data) if err != nil { // we got a rogue packet lol sess.log.Warnw("rogue frame ID", "id", idx, "error", err) } default: // we don't know what to do with it. sess.log.Infow("unhandled packet type", "type", data[0], "id", data[1]) } } // if we get here, the serial port has closed. this is fine, usually. } // This implements io.Reader for the UART Session. func (sess *SerialSession) Read(p []byte) (int, error) { // Since we have an rx buffer, we just read from that and return the results. return sess.rxBuf.Read(p) } func (sess *SerialSession) Write(p []byte) (n int, err error) { // we construct a packet - using the conntrack to ensure that the packet is okay. // we block - this is more correct. idx, ch, err := sess.ct.GetMark() if err != nil { return } wf := &TxFrame{ Id: idx, Destination: BroadcastAddr, Payload: p, } // write the actual packet sess.writeLock.Lock() n, err = writeXBeeFrame(sess.port, wf.Bytes()) if err != nil { return } sess.writeLock.Unlock() // finally, wait for the channel we got to return. this means that // the matching response frame was received, so we can parse it. // TODO: add timeout. responseFrame := <-ch // this is a tx status frame bytes, so lets parse it out. status, err := ParseTxStatusFrame(responseFrame) if err != nil { return } if status.Status != 0 { err = fmt.Errorf("tx failed 0x%x", status.Status) } return } // sends a local AT command. If `queued` is true, the command is not immediately applied; // instead, an AC command must be set to apply the queued changes. `queued` does not // affect query-type commands, which always return right away. // the AT command is an interface. func (sess *SerialSession) ATCommand(at ATCmd, queued bool) error { // we must encode the command, and then create the actual packet. // then we send the packet, and wait for the response // TODO: how to handle multiple-response-packet AT commands? // (mainly Node Discovery ND) // get a mark for the frame isQuery := len(at.Payload()) > 0 idx, ch, err := sess.ct.GetMark() if err != nil { return err } rawData := encodeATCommand(at, idx, queued) sess.writeLock.Lock() _, err = writeXBeeFrame(sess.port, rawData) sess.writeLock.Unlock() if err != nil { return fmt.Errorf("error writing xbee frame: %w", err) } // we use the AT command that was provided to decode the frame. // Parse stores the response result locally. // we parse the base frame ourselves, and if it's okay we pass it // to the provided ATCommand // TODO: add timeout. resp, err := ParseATCmdResponse(<-ch) if err != nil { return err } if resp.Status != 0 { // sinec ATCmdStatus is a stringer thanks to the generator return fmt.Errorf("AT command failed: %s", resp.Status) } // finally, we use the provided ATCmd interface to unpack the data. // this overwrites the values provided, but this should only happen // during a query, so this is fine. // TODO: skip if not a query command? if isQuery { return at.Parse(resp) } // it's not a query, and there was no error, so we just plain return return nil } // Does this need to exist? func (sess *SerialSession) GetStatus() { panic("TODO: implement") } // Implement the io.Closer. func (sess *SerialSession) Close() error { return sess.port.Close() }