diff --git a/cmd/server.go b/cmd/server.go index 2ded0a2..636cb38 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -24,18 +24,21 @@ var serveCmd = &cli.Command{ &cli.BoolFlag{Name: "xbee", Aliases: []string{"x"}, Usage: "Find and connect to an XBee"}, }, Action: func(ctx *cli.Context) error { - serve() + serve(ctx.Bool("xbee")) return nil }, } -func serve() { +func serve(useXbee bool) { broker := NewBroker(3) // start the can listener go vcanTest() go canHandler(broker) go broker.Start() + if useXbee { + go xbeeSvc() + } ln, err := net.Listen("tcp", ":8082") if err != nil { fmt.Printf("Error listening: %v\n", err) @@ -54,6 +57,7 @@ func serve() { func handleCon(conn net.Conn, broker *Broker) { // reader := msgp.NewReader(conn) rxPkts := make(chan gotelem.Data) + done := make(chan bool) go func() { // setpu our msgp reader. scann := msgp.NewReader(conn) @@ -66,14 +70,15 @@ func handleCon(conn net.Conn, broker *Broker) { rxPkts <- data } + done <- true // if we got here, it means the connction was closed. }() // subscribe to can packets // TODO: make this unique since remote addr could be non-unique canCh := broker.Subscribe(conn.RemoteAddr().String()) writer := msgp.NewWriter(conn) +mainloop: for { - select { case canFrame := <-canCh: cf := gotelem.CanBody{ @@ -86,24 +91,24 @@ func handleCon(conn net.Conn, broker *Broker) { // do nothing for now. fmt.Printf("got a body %v\n", rxBody) case <-time.After(1 * time.Second): // time out. - fmt.Printf("timeout\n") - data := gotelem.StatusBody{ - BatteryPct: 1.2, - ErrCode: 0, - } - data.EncodeMsg(writer) writer.Flush() + case <-done: + break mainloop } } + // unsubscribe and close the conn. + broker.Unsubscribe(conn.RemoteAddr().String()) + conn.Close() } -func xbeeSvc(packets <-chan can.Frame, device string, quit <-chan struct{}) { +func xbeeSvc(b *Broker) { // open the session. mode := &serial.Mode{ BaudRate: 115200, } - sess, err := xbee.NewSerialXBee("/dev/ttyUSB1", mode) + + sess, err := xbee.NewSerialXBee("/dev/ttyACM0", mode) if err != nil { fmt.Printf("got error %v", err) panic(err) @@ -121,8 +126,6 @@ func xbeeSvc(packets <-chan can.Frame, device string, quit <-chan struct{}) { select { case data := <-receivedData: fmt.Printf("Got a data %v\n", data) - case packet := <-packets: - fmt.Printf("Got a packet, %v\n", packet) } } @@ -238,3 +241,10 @@ func (b *Broker) Subscribe(name string) <-chan can.Frame { b.subsCh <- bc return ch } + +func (b *Broker) Unsubscribe(name string) { + bc := BrokerClient{ + Name: name, + } + b.unsubCh <- bc +} diff --git a/internal/gotelem/rpc.go b/internal/gotelem/rpc.go index 6a24aae..825e8d3 100644 --- a/internal/gotelem/rpc.go +++ b/internal/gotelem/rpc.go @@ -1,84 +1,120 @@ package gotelem -import "github.com/tinylib/msgp/msgp" +import ( + "net" -// this file is a simple implementation of the msgpack-rpc data format. - -type RPCType int - -const ( - RequestType RPCType = 0 - ResponseType RPCType = 1 - NotificationType RPCType = 2 + "github.com/tinylib/msgp/msgp" ) -//go:generate msgp -//msgp:tuple Request -//msgp:tuple Response -//msgp:tuple Notification -// A request is a function call that expects a Response. -type Request struct { - // should always be zero. - msgtype int `msg:"type"` - MsgId uint32 `msg:"msgid"` - Method string `msg:"method"` - Params interface{} `msg:"params,allownil"` +// the target architecture is a subscribe function that +// takes a can FILTER. Then the server will emit notifications. +// that contain new can packets as they come in. + +// this means that the client should be able to handle +// notify packets on top of response packets. + +// we should register handlers. They should handle serialization +// and deserialization on their own. This way we avoid reflect. +// since reflected code can be more complex under the hood. + +// ServiceFunc is a RPC service handler. +type ServiceFunc func(params msgp.Raw) (res msgp.MarshalSizer, err error) + + +// RPCConn is a single RPC communication pair. +type RPCConn struct { + // TODO: use io.readwritecloser? + conn net.Conn + handlers map[string]ServiceFunc + + // indicates what messages we've used. + // TODO: use a channel to return a response? + // TODO: lock with mutex + ct map[uint32]struct{} } -func NewRequest(msgid uint32, method string, params interface{}) *Request { - return &Request{ - msgtype: 0, - MsgId: msgid, - Method: method, - Params: params, - } +// Call intiates an RPC call to a remote method and returns the +// response, or the error, if any. +// TODO: determine signature +// TODO: this should block? +func (rpc *RPCConn) Call(method string, params msgp.Marshaler) { + } -// A response is the result of a function call, or an error. -type Response struct { - // should always be one. - msgtype int `msg:"type"` - MsgId uint32 `msg:"msgid"` - Error interface{} `msg:"error,allownil"` - Result interface{} `msg:"result,allownil"` +// Notify initiates a notification to a remote method. It does not +// return any information. There is no response from the server. +// This method will not block. An error is returned if there is a local +// problem. +func (rpc *RPCConn) Notify(method string, params msgp.Marshaler) { + // TODO: return an error if there's a local problem? + } -// A notification is a function call that does not care if the call -// succeeds and ignores responses. -type Notification struct { - // should always be *2* - msgtype int `msg:"type"` - Method string `msg:"method"` - Params interface{} `msg:"params,allownil"` -} +// Register a new handler to be called by the remote side. An error +// is returned if the handler name is already in use. +func (rpc *RPCConn) RegisterHandler(name string, fn ServiceFunc) error { + // TODO: check if name in use. + // TODO: mutex lock for sync (or use sync.map? + rpc.handlers[name] = fn -// todo: should these be functions instead, since they're arrays? and we need to determine the type beforehand. - -func getMsgType(b []byte) RPCType { - size, next, err := msgp.ReadArrayHeaderBytes(b) - if err != nil { - panic(err) - } - if size == 3 { // hot path for notifications. - return NotificationType - } - - vtype, _, err := msgp.ReadIntBytes(next) - - if err != nil { - panic(err) - } - - // todo: use readIntf instead? returns a []interface{} and we can map it ourselves... - return RPCType(vtype) -} - -func parseRPC(raw msgp.Raw) interface{} { - t := getMsgType(raw) - - if t == RequestType { - - } return nil } + +// Serve runs the server. It will dispatch goroutines to handle each +// method call. This can (and should in most cases) be run in the background to allow for +// sending and receving on the same connection. +func (rpc *RPCConn) Serve() { + + // construct a stream reader. + msgReader := msgp.NewReader(rpc.conn) + + // read a request/notification from the connection. + + var rawmsg msgp.Raw = make(msgp.Raw, 0, 4) + + rawmsg.DecodeMsg(msgReader) + + rpcIntf, err := parseRPC(rawmsg) + + switch rpcObject := rpcIntf.(type) { + case Request: + // the object is a request - we must dispatch a goroutine + // that will call the handler and also send a return value. + go rpc.dispatch(rpcObject) + case Notification: + go rpc.dispatchNotif(rpcObject) + case Response: + // TODO: return response to caller. + } +} + +func (rpc *RPCConn) dispatch(req Request) { + + result, err := rpc.handlers[req.Method](req.Params) + + if err != nil { + // log the error. + } + // construct the response frame. + var rpcE *RPCError = MakeRPCError(err) + + w := msgp.NewWriter(rpc.conn) + resBuf := make(msgp.Raw, result.Msgsize()) + + result.MarshalMsg(resBuf) + + response := NewResponse(req.MsgId, *rpcE, resBuf) + + response.EncodeMsg(w) + +} +func (rpc *RPCConn) dispatchNotif(req Notification) { + + _, err := rpc.handlers[req.Method](req.Params) + + if err != nil { + // log the error. + } + // no need for response. +} diff --git a/internal/gotelem/rpc_msg.go b/internal/gotelem/rpc_msg.go new file mode 100644 index 0000000..55e32c0 --- /dev/null +++ b/internal/gotelem/rpc_msg.go @@ -0,0 +1,146 @@ +package gotelem + +import ( + "errors" + + "github.com/tinylib/msgp/msgp" +) + +// this file is a simple implementation of the msgpack-rpc data formato. +// it also contains an RPC server and client. +// We can port this to python rather easily too. + +type RPCType int + +const ( + RequestType RPCType = 0 + ResponseType RPCType = 1 + NotificationType RPCType = 2 +) + + +// the messagepack RPC spec requires that the RPC wire formts are ordered arrays, +// aka tuples. we can use msgp options to make them tuple automatically, +// based on the order they are declared. This makes the order of these +// structs *critical*! Do not touch! + +//go:generate msgp +//msgp:tuple Request +//msgp:tuple Response +//msgp:tuple Notification + +// A request is a function call that expects a Response. +type Request struct { + // should always be zero. + msgtype RPCType `msg:"type"` + MsgId uint32 `msg:"msgid"` + Method string `msg:"method"` + Params msgp.Raw `msg:"params,allownil"` +} + +func NewRequest(msgid uint32, method string, params msgp.Raw) *Request { + return &Request{ + msgtype: 0, + MsgId: msgid, + Method: method, + Params: params, + } +} + +// A response is the result of a function call, or an error. +type Response struct { + // should always be one. + msgtype RPCType `msg:"type"` + MsgId uint32 `msg:"msgid"` + Error RPCError `msg:"error,allownil"` + Result msgp.Raw `msg:"result,allownil"` +} + +func NewResponse(msgid uint32, respErr RPCError, res msgp.Raw) *Response { + return &Response{ + msgtype: 1, + MsgId: msgid, + Error: respErr, + Result: res, + } +} + +// A notification is a function call that does not care if the call +// succeeds and ignores responses. +type Notification struct { + // should always be *2* + msgtype RPCType `msg:"type"` + Method string `msg:"method"` + Params msgp.Raw `msg:"params,allownil"` +} + +// todo: should these be functions instead, since they're arrays? and we need to determine the type beforehand. + +func getMsgType(b []byte) RPCType { + size, next, err := msgp.ReadArrayHeaderBytes(b) + if err != nil { + panic(err) + } + if size == 3 { // hot path for notifications. + return NotificationType + } + + vtype, _, err := msgp.ReadIntBytes(next) + + if err != nil { + panic(err) + } + + // todo: use readIntf instead? returns a []interface{} and we can map it ourselves... + return RPCType(vtype) +} + + +// parseRPC takes a raw message and decodes it based on the first value +// of the array (the type). It returns the decoded object. Callers +// can use a type-switch to determine the type of the data. +func parseRPC(raw msgp.Raw) (interface{}, error) { + t := getMsgType(raw) + + switch RPCType(t) { + + case RequestType: + // create and return a request struct. + req := &Request{} + _, err := req.UnmarshalMsg(raw) + return req, err + case ResponseType: + res := &Response{} + _, err := res.UnmarshalMsg(raw) + return res, err + case NotificationType: + notif := &Notification{} + _, err := notif.UnmarshalMsg(raw) + return notif, err + default: + // uh oh. + return nil, errors.New("unmatched RPC type") + } +} + +// RPCError is a common RPC error format. It is basically a clone of the +// JSON-RPC error format. We use it so we know what to expect there. + + +//msgp:tuple RPCError +type RPCError struct { + Code int + Desc string +} + + +// Converts a go error into a RPC error. +func MakeRPCError(err error) *RPCError { + if err == nil { + return nil + } + return &RPCError{ + Code: -1, + Desc: err.Error(), + } +} diff --git a/internal/xbee/at.go b/internal/xbee/at.go index 53b89c0..d4dcadd 100644 --- a/internal/xbee/at.go +++ b/internal/xbee/at.go @@ -123,6 +123,7 @@ func encodeRemoteATCommand(at ATCmd, idx uint8, queued bool, destination uint64) if !queued { options = options | 0x2 } + buf.WriteByte(options) // write AT command cmd := at.Cmd() @@ -136,3 +137,23 @@ func encodeRemoteATCommand(at ATCmd, idx uint8, queued bool, destination uint64) } // let's actually define some AT commands now. + +// the AT command for the ID (Network ID). +// the network identifier is used to communicate with other devices. It must match. +type ATCmdID struct { + id uint32 + isQuery bool +} + +func (cmd ATCmdID) Cmd() [2]rune { + return [2]rune{'I', 'D'} +} + +func (cmd ATCmdID) Payload() []byte { + if cmd.isQuery { + return []byte{} + } + res := make([]byte, 0) + res = binary.BigEndian.AppendUint32(res, cmd.id) + return res +} diff --git a/internal/xbee/session.go b/internal/xbee/session.go index b4aab9b..8b524f1 100644 --- a/internal/xbee/session.go +++ b/internal/xbee/session.go @@ -126,7 +126,7 @@ func (sess *SerialSession) rxHandler() { } } - // if we get here, the serial port has closed. this is fine, usually. + // if we get here, the serial port has closed. this is fine. } // This implements io.Reader for the UART Session. @@ -214,7 +214,7 @@ func (sess *SerialSession) ATCommand(at ATCmd, queued bool) error { if resp.Status != 0 { // sinec ATCmdStatus is a stringer thanks to the generator - return fmt.Errorf("AT command failed: %s", resp.Status) + return fmt.Errorf("AT command failed: %v", resp.Status) } // finally, we use the provided ATCmd interface to unpack the data.