diff --git a/cmd/server.go b/cmd/server.go index 4152c33..b221e70 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -3,14 +3,15 @@ package cmd import ( "fmt" "net" + "os" "time" "github.com/kschamplin/gotelem/can" "github.com/kschamplin/gotelem/socketcan" "github.com/kschamplin/gotelem/xbee" - "github.com/tinylib/msgp/msgp" "github.com/urfave/cli/v2" "go.bug.st/serial" + "golang.org/x/exp/slog" ) const xbeeCategory = "XBee settings" @@ -35,9 +36,6 @@ func serve(useXbee bool) { go vcanTest() go canHandler(broker) go broker.Start() - if useXbee { - go xbeeSvc() - } ln, err := net.Listen("tcp", ":8082") if err != nil { fmt.Printf("Error listening: %v\n", err) @@ -55,48 +53,7 @@ func serve(useXbee bool) { func handleCon(conn net.Conn, broker *Broker) { // reader := msgp.NewReader(conn) - rxPkts := make(chan gotelem.Data) - done := make(chan bool) - go func() { - // setpu our msgp reader. - scann := msgp.NewReader(conn) - data := gotelem.Data{} - for { - err := data.DecodeMsg(scann) - if err != nil { - break - } - rxPkts <- data - } - done <- true // if we got here, it means the connction was closed. - }() - - // subscribe to can packets - // TODO: make this unique since remote addr could be non-unique - canCh := broker.Subscribe(conn.RemoteAddr().String()) - writer := msgp.NewWriter(conn) -mainloop: - for { - select { - case canFrame := <-canCh: - cf := gotelem.CanBody{ - Id: canFrame.Id, - Payload: canFrame.Data, - Source: "me", - } - cf.EncodeMsg(writer) - case rxBody := <-rxPkts: - // do nothing for now. - fmt.Printf("got a body %v\n", rxBody) - case <-time.After(1 * time.Second): // time out. - writer.Flush() - case <-done: - break mainloop - } - } - // unsubscribe and close the conn. - broker.Unsubscribe(conn.RemoteAddr().String()) conn.Close() } @@ -107,27 +64,12 @@ func xbeeSvc(b *Broker) { BaudRate: 115200, } - sess, err := xbee.NewSerialXBee("/dev/ttyACM0", mode) + logger := slog.New(slog.NewTextHandler(os.Stderr)) + _, err := xbee.NewSerialXBee("/dev/ttyACM0", mode, logger) if err != nil { fmt.Printf("got error %v", err) panic(err) } - - receivedData := make(chan gotelem.Data) - // make a simple reader that goes in the background. - go func() { - msgpReader := msgp.NewReader(sess) - var msgData gotelem.Data - msgData.DecodeMsg(msgpReader) - receivedData <- msgData - }() - for { - select { - case data := <-receivedData: - fmt.Printf("Got a data %v\n", data) - } - - } } // this spins up a new can socket on vcan0 and broadcasts a packet every second. for testing. diff --git a/cmd/xbee.go b/cmd/xbee.go index 1097604..6d63df9 100644 --- a/cmd/xbee.go +++ b/cmd/xbee.go @@ -4,7 +4,14 @@ package cmd // we can do network discovery and netcat-like things. import ( + "fmt" + "io" + "os" + + "github.com/kschamplin/gotelem/xbee" "github.com/urfave/cli/v2" + "go.bug.st/serial" + "golang.org/x/exp/slog" ) var xbeeCmd = &cli.Command{ @@ -21,25 +28,85 @@ For serial devices (COM1 and /dev/ttyUSB0), you can specify the baud rate using a ':'. If excluded the baud rate will default to 9600. Note that if using the native USB of the XLR Pro, the baud rate setting has no effect. -TCP/UDP connections require a port. +TCP/UDP connections require a port and will fail if one is not provided. + `, Flags: []cli.Flag{ &cli.StringFlag{ - Name: "device", - Aliases: []string{"d"}, - Usage: "The XBee to connect to", + Name: "device", + Aliases: []string{"d"}, + Usage: "The XBee to connect to", + Required: true, + EnvVars: []string{"XBEE_DEVICE"}, }, }, Subcommands: []*cli.Command{ { - Name: "info", - Usage: "get information about an xbee device", + Name: "info", + Usage: "get information about an xbee device", + Action: xbeeInfo, + HideHelpCommand: true, }, { Name: "netcat", Aliases: []string{"nc"}, ArgsUsage: "[addr]", Usage: "send data from stdio over the xbee", + Description: ` +netcat emulates the nc command. It reads data from stdin and transmits it to +[addr] on the XBee network. If [addr] is FFFF or not present, it will broadcast +the data to all listening devices. Data received from the network will be +writtend to stdout. + `, + Action: netcat, + HideHelpCommand: true, }, }, } + +func xbeeInfo(ctx *cli.Context) error { + + logger := slog.New(slog.NewTextHandler(os.Stderr)) + xb, err := xbee.NewSerialXBee("/dev/ttyACM0", &serial.Mode{}, logger) + if err != nil { + return cli.Exit(err, 1) + } + + b, err := xb.ATCommand([2]rune{'I', 'D'}, nil, false) + if err != nil { + return cli.Exit(err, 1) + } + fmt.Println(b) + return nil + +} +func netcat(ctx *cli.Context) error { + if ctx.Args().Len() < 1 { + + cli.ShowSubcommandHelp(ctx) + + return cli.Exit("missing [addr] argument", 1) + + } + // basically create two pipes. + logger := slog.New(slog.NewTextHandler(os.Stderr)) + xb, _ := xbee.NewSerialXBee("/dev/ttyACM0", &serial.Mode{}, logger) + + sent := make(chan int64) + streamCopy := func(r io.ReadCloser, w io.WriteCloser) { + defer r.Close() + defer w.Close() + n, err := io.Copy(w, r) + if err != nil { + logger.Warn("got error copying", "err", err) + } + sent <- n + } + + go streamCopy(os.Stdin, xb) + go streamCopy(xb, os.Stdout) + + <-sent + + return nil +} diff --git a/xbee/conntrack.go b/xbee/conntrack.go index 44f4e50..741d1b4 100644 --- a/xbee/conntrack.go +++ b/xbee/conntrack.go @@ -16,6 +16,12 @@ type connTrack struct { // the map is set when writing a frame, and deleted when recieving a matching frame. } +func NewConnTrack() *connTrack { + return &connTrack{ + internal: make(map[uint8]chan []byte), + } +} + // GetMark finds the next available marker and takes it, returning the value of // the mark as well as a channel to use as a semaphore when the mark is cleared. // If no mark can be acquired, it returns an error. @@ -49,16 +55,18 @@ func (ct *connTrack) GetMark() (uint8, <-chan []byte, error) { // tracked. func (ct *connTrack) ClearMark(mark uint8, data []byte) error { ct.mu.RLock() - // FIXME: should this be the other way around (swap if and normal execution - if val, ok := ct.internal[mark]; ok { + + val, ok := ct.internal[mark] + if !ok { ct.mu.RUnlock() - ct.mu.Lock() - val <- data - close(val) - delete(ct.internal, mark) - ct.mu.Unlock() - return nil + return errors.New("mark was not set") } + ct.mu.RUnlock() - return errors.New("mark was not set") + ct.mu.Lock() + val <- data + close(val) + delete(ct.internal, mark) + ct.mu.Unlock() + return nil } diff --git a/xbee/session.go b/xbee/session.go index fb41c4a..09b313e 100644 --- a/xbee/session.go +++ b/xbee/session.go @@ -38,7 +38,7 @@ type SerialSession struct { writeLock sync.Mutex // prevents multiple writers from accessing the port at once. } -func NewSerialXBee(portName string, mode *serial.Mode) (*SerialSession, error) { +func NewSerialXBee(portName string, mode *serial.Mode, baseLog *slog.Logger) (*SerialSession, error) { // make the session with the port/mode given, and set up the conntrack. sess := &SerialSession{} @@ -48,7 +48,8 @@ func NewSerialXBee(portName string, mode *serial.Mode) (*SerialSession, error) { } sess.port = port - sess.ct = connTrack{} + sess.ct = *NewConnTrack() + sess.Logger = *baseLog.With("portname", portName) // setup io readwriter with a pipe. rd, wr := io.Pipe() @@ -76,19 +77,16 @@ func (sess *SerialSession) rxHandler() { scan.Split(xbeeFrameSplit) for scan.Scan() { - // TODO: check for errors? - // data is a frame payload - not a full frame. data, err := parseFrame(scan.Bytes()) if err != nil { sess.Logger.Warn("error parsing frame", "error", err, "data", data) continue } - // data is good, lets parse the frame - using the first byte as the identifier. switch XBeeCmd(data[0]) { case RxPktType: // we parse the data, and push it to the rx buffer. - //TODO: if we have multiple sources, we need to track them here. + //TODO: if we have multiple remotes on the network, we need to track them here. frame, err := ParseRxFrame(data) if err != nil { sess.Logger.Warn("error parsing rx packet", "error", err, "data", data) @@ -100,10 +98,8 @@ func (sess *SerialSession) rxHandler() { sess.Logger.Warn("error writing data", "error", err, "payload", frame.Payload) } - // the "callback"-style handler. Any received packet with a frame ID should - // be handled here. - case TxStatusType, ATCmdResponseType, RemoteCmdRespType: // these take the frame bytes and parse it themselves. - // we hand it back via the channel. we directly find the ID since it's always + case TxStatusType, ATCmdResponseType, RemoteCmdRespType: + // we hand the frame back via the channel. we directly find the ID since it's always // the second byte. idx := data[1] @@ -130,12 +126,13 @@ func (sess *SerialSession) Read(p []byte) (int, error) { } func (sess *SerialSession) Write(p []byte) (n int, err error) { - // we construct a packet - using the conntrack to ensure that the packet is okay. - // we block - this is more correct. + sess.Warn("hello") idx, ch, err := sess.ct.GetMark() if err != nil { return } + + n = len(p) wf := &TxFrame{ Id: idx, Destination: BroadcastAddr, @@ -144,7 +141,7 @@ func (sess *SerialSession) Write(p []byte) (n int, err error) { // write the actual packet sess.writeLock.Lock() - n, err = writeXBeeFrame(sess.port, wf.Bytes()) + _, err = writeXBeeFrame(sess.port, wf.Bytes()) sess.writeLock.Unlock() if err != nil { return @@ -180,7 +177,6 @@ func (sess *SerialSession) ATCommand(cmd [2]rune, data []byte, queued bool) ([]b // get a mark for the frame - isQuery := len(data) > 0 idx, ch, err := sess.ct.GetMark() if err != nil { return nil, err @@ -195,12 +191,8 @@ func (sess *SerialSession) ATCommand(cmd [2]rune, data []byte, queued bool) ([]b return nil, fmt.Errorf("error writing xbee frame: %w", err) } - // we use the AT command that was provided to decode the frame. - // Parse stores the response result locally. - // we parse the base frame ourselves, and if it's okay we pass it - // to the provided ATCommand - // TODO: add timeout. + resp, err := ParseATCmdResponse(<-ch) if err != nil { return nil, err @@ -208,20 +200,10 @@ func (sess *SerialSession) ATCommand(cmd [2]rune, data []byte, queued bool) ([]b if resp.Status != 0 { // sinec ATCmdStatus is a stringer thanks to the generator - return nil, fmt.Errorf("AT command failed: %v", resp.Status) + return resp.Data, fmt.Errorf("AT command failed: %v", resp.Status) } - // finally, we use the provided ATCmd interface to unpack the data. - // this overwrites the values provided, but this should only happen - // during a query, so this is fine. - // TODO: skip if not a query command? - - if isQuery { - return resp.Data, nil - } - - // it's not a query, and there was no error, so we just plain return - return nil, nil + return resp.Data, nil } @@ -235,6 +217,5 @@ func (sess *SerialSession) Close() error { return sess.port.Close() } -func (sess *SerialSession) DiscoverNodes() { - panic("TODO: implement") -} +// next, we define AT commands. These are functions that take in a Session and +// provide wrappers around AT command information, like type checking.