rpc work (unfinished)

This commit is contained in:
saji 2023-05-01 09:49:47 -05:00
parent c5716de704
commit 09d1660ce0
5 changed files with 297 additions and 84 deletions

View file

@ -24,18 +24,21 @@ var serveCmd = &cli.Command{
&cli.BoolFlag{Name: "xbee", Aliases: []string{"x"}, Usage: "Find and connect to an XBee"}, &cli.BoolFlag{Name: "xbee", Aliases: []string{"x"}, Usage: "Find and connect to an XBee"},
}, },
Action: func(ctx *cli.Context) error { Action: func(ctx *cli.Context) error {
serve() serve(ctx.Bool("xbee"))
return nil return nil
}, },
} }
func serve() { func serve(useXbee bool) {
broker := NewBroker(3) broker := NewBroker(3)
// start the can listener // start the can listener
go vcanTest() go vcanTest()
go canHandler(broker) go canHandler(broker)
go broker.Start() go broker.Start()
if useXbee {
go xbeeSvc()
}
ln, err := net.Listen("tcp", ":8082") ln, err := net.Listen("tcp", ":8082")
if err != nil { if err != nil {
fmt.Printf("Error listening: %v\n", err) fmt.Printf("Error listening: %v\n", err)
@ -54,6 +57,7 @@ func serve() {
func handleCon(conn net.Conn, broker *Broker) { func handleCon(conn net.Conn, broker *Broker) {
// reader := msgp.NewReader(conn) // reader := msgp.NewReader(conn)
rxPkts := make(chan gotelem.Data) rxPkts := make(chan gotelem.Data)
done := make(chan bool)
go func() { go func() {
// setpu our msgp reader. // setpu our msgp reader.
scann := msgp.NewReader(conn) scann := msgp.NewReader(conn)
@ -66,14 +70,15 @@ func handleCon(conn net.Conn, broker *Broker) {
rxPkts <- data rxPkts <- data
} }
done <- true // if we got here, it means the connction was closed.
}() }()
// subscribe to can packets // subscribe to can packets
// TODO: make this unique since remote addr could be non-unique // TODO: make this unique since remote addr could be non-unique
canCh := broker.Subscribe(conn.RemoteAddr().String()) canCh := broker.Subscribe(conn.RemoteAddr().String())
writer := msgp.NewWriter(conn) writer := msgp.NewWriter(conn)
mainloop:
for { for {
select { select {
case canFrame := <-canCh: case canFrame := <-canCh:
cf := gotelem.CanBody{ cf := gotelem.CanBody{
@ -86,24 +91,24 @@ func handleCon(conn net.Conn, broker *Broker) {
// do nothing for now. // do nothing for now.
fmt.Printf("got a body %v\n", rxBody) fmt.Printf("got a body %v\n", rxBody)
case <-time.After(1 * time.Second): // time out. case <-time.After(1 * time.Second): // time out.
fmt.Printf("timeout\n")
data := gotelem.StatusBody{
BatteryPct: 1.2,
ErrCode: 0,
}
data.EncodeMsg(writer)
writer.Flush() writer.Flush()
case <-done:
break mainloop
} }
} }
// unsubscribe and close the conn.
broker.Unsubscribe(conn.RemoteAddr().String())
conn.Close()
} }
func xbeeSvc(packets <-chan can.Frame, device string, quit <-chan struct{}) { func xbeeSvc(b *Broker) {
// open the session. // open the session.
mode := &serial.Mode{ mode := &serial.Mode{
BaudRate: 115200, BaudRate: 115200,
} }
sess, err := xbee.NewSerialXBee("/dev/ttyUSB1", mode)
sess, err := xbee.NewSerialXBee("/dev/ttyACM0", mode)
if err != nil { if err != nil {
fmt.Printf("got error %v", err) fmt.Printf("got error %v", err)
panic(err) panic(err)
@ -121,8 +126,6 @@ func xbeeSvc(packets <-chan can.Frame, device string, quit <-chan struct{}) {
select { select {
case data := <-receivedData: case data := <-receivedData:
fmt.Printf("Got a data %v\n", data) fmt.Printf("Got a data %v\n", data)
case packet := <-packets:
fmt.Printf("Got a packet, %v\n", packet)
} }
} }
@ -238,3 +241,10 @@ func (b *Broker) Subscribe(name string) <-chan can.Frame {
b.subsCh <- bc b.subsCh <- bc
return ch return ch
} }
func (b *Broker) Unsubscribe(name string) {
bc := BrokerClient{
Name: name,
}
b.unsubCh <- bc
}

View file

@ -1,84 +1,120 @@
package gotelem package gotelem
import "github.com/tinylib/msgp/msgp" import (
"net"
// this file is a simple implementation of the msgpack-rpc data format. "github.com/tinylib/msgp/msgp"
type RPCType int
const (
RequestType RPCType = 0
ResponseType RPCType = 1
NotificationType RPCType = 2
) )
//go:generate msgp
//msgp:tuple Request
//msgp:tuple Response
//msgp:tuple Notification
// A request is a function call that expects a Response. // the target architecture is a subscribe function that
type Request struct { // takes a can FILTER. Then the server will emit notifications.
// should always be zero. // that contain new can packets as they come in.
msgtype int `msg:"type"`
MsgId uint32 `msg:"msgid"` // this means that the client should be able to handle
Method string `msg:"method"` // notify packets on top of response packets.
Params interface{} `msg:"params,allownil"`
// we should register handlers. They should handle serialization
// and deserialization on their own. This way we avoid reflect.
// since reflected code can be more complex under the hood.
// ServiceFunc is a RPC service handler.
type ServiceFunc func(params msgp.Raw) (res msgp.MarshalSizer, err error)
// RPCConn is a single RPC communication pair.
type RPCConn struct {
// TODO: use io.readwritecloser?
conn net.Conn
handlers map[string]ServiceFunc
// indicates what messages we've used.
// TODO: use a channel to return a response?
// TODO: lock with mutex
ct map[uint32]struct{}
} }
func NewRequest(msgid uint32, method string, params interface{}) *Request { // Call intiates an RPC call to a remote method and returns the
return &Request{ // response, or the error, if any.
msgtype: 0, // TODO: determine signature
MsgId: msgid, // TODO: this should block?
Method: method, func (rpc *RPCConn) Call(method string, params msgp.Marshaler) {
Params: params,
}
}
// A response is the result of a function call, or an error.
type Response struct {
// should always be one.
msgtype int `msg:"type"`
MsgId uint32 `msg:"msgid"`
Error interface{} `msg:"error,allownil"`
Result interface{} `msg:"result,allownil"`
}
// A notification is a function call that does not care if the call
// succeeds and ignores responses.
type Notification struct {
// should always be *2*
msgtype int `msg:"type"`
Method string `msg:"method"`
Params interface{} `msg:"params,allownil"`
}
// todo: should these be functions instead, since they're arrays? and we need to determine the type beforehand.
func getMsgType(b []byte) RPCType {
size, next, err := msgp.ReadArrayHeaderBytes(b)
if err != nil {
panic(err)
}
if size == 3 { // hot path for notifications.
return NotificationType
}
vtype, _, err := msgp.ReadIntBytes(next)
if err != nil {
panic(err)
}
// todo: use readIntf instead? returns a []interface{} and we can map it ourselves...
return RPCType(vtype)
}
func parseRPC(raw msgp.Raw) interface{} {
t := getMsgType(raw)
if t == RequestType {
} }
// Notify initiates a notification to a remote method. It does not
// return any information. There is no response from the server.
// This method will not block. An error is returned if there is a local
// problem.
func (rpc *RPCConn) Notify(method string, params msgp.Marshaler) {
// TODO: return an error if there's a local problem?
}
// Register a new handler to be called by the remote side. An error
// is returned if the handler name is already in use.
func (rpc *RPCConn) RegisterHandler(name string, fn ServiceFunc) error {
// TODO: check if name in use.
// TODO: mutex lock for sync (or use sync.map?
rpc.handlers[name] = fn
return nil return nil
} }
// Serve runs the server. It will dispatch goroutines to handle each
// method call. This can (and should in most cases) be run in the background to allow for
// sending and receving on the same connection.
func (rpc *RPCConn) Serve() {
// construct a stream reader.
msgReader := msgp.NewReader(rpc.conn)
// read a request/notification from the connection.
var rawmsg msgp.Raw = make(msgp.Raw, 0, 4)
rawmsg.DecodeMsg(msgReader)
rpcIntf, err := parseRPC(rawmsg)
switch rpcObject := rpcIntf.(type) {
case Request:
// the object is a request - we must dispatch a goroutine
// that will call the handler and also send a return value.
go rpc.dispatch(rpcObject)
case Notification:
go rpc.dispatchNotif(rpcObject)
case Response:
// TODO: return response to caller.
}
}
func (rpc *RPCConn) dispatch(req Request) {
result, err := rpc.handlers[req.Method](req.Params)
if err != nil {
// log the error.
}
// construct the response frame.
var rpcE *RPCError = MakeRPCError(err)
w := msgp.NewWriter(rpc.conn)
resBuf := make(msgp.Raw, result.Msgsize())
result.MarshalMsg(resBuf)
response := NewResponse(req.MsgId, *rpcE, resBuf)
response.EncodeMsg(w)
}
func (rpc *RPCConn) dispatchNotif(req Notification) {
_, err := rpc.handlers[req.Method](req.Params)
if err != nil {
// log the error.
}
// no need for response.
}

146
internal/gotelem/rpc_msg.go Normal file
View file

@ -0,0 +1,146 @@
package gotelem
import (
"errors"
"github.com/tinylib/msgp/msgp"
)
// this file is a simple implementation of the msgpack-rpc data formato.
// it also contains an RPC server and client.
// We can port this to python rather easily too.
type RPCType int
const (
RequestType RPCType = 0
ResponseType RPCType = 1
NotificationType RPCType = 2
)
// the messagepack RPC spec requires that the RPC wire formts are ordered arrays,
// aka tuples. we can use msgp options to make them tuple automatically,
// based on the order they are declared. This makes the order of these
// structs *critical*! Do not touch!
//go:generate msgp
//msgp:tuple Request
//msgp:tuple Response
//msgp:tuple Notification
// A request is a function call that expects a Response.
type Request struct {
// should always be zero.
msgtype RPCType `msg:"type"`
MsgId uint32 `msg:"msgid"`
Method string `msg:"method"`
Params msgp.Raw `msg:"params,allownil"`
}
func NewRequest(msgid uint32, method string, params msgp.Raw) *Request {
return &Request{
msgtype: 0,
MsgId: msgid,
Method: method,
Params: params,
}
}
// A response is the result of a function call, or an error.
type Response struct {
// should always be one.
msgtype RPCType `msg:"type"`
MsgId uint32 `msg:"msgid"`
Error RPCError `msg:"error,allownil"`
Result msgp.Raw `msg:"result,allownil"`
}
func NewResponse(msgid uint32, respErr RPCError, res msgp.Raw) *Response {
return &Response{
msgtype: 1,
MsgId: msgid,
Error: respErr,
Result: res,
}
}
// A notification is a function call that does not care if the call
// succeeds and ignores responses.
type Notification struct {
// should always be *2*
msgtype RPCType `msg:"type"`
Method string `msg:"method"`
Params msgp.Raw `msg:"params,allownil"`
}
// todo: should these be functions instead, since they're arrays? and we need to determine the type beforehand.
func getMsgType(b []byte) RPCType {
size, next, err := msgp.ReadArrayHeaderBytes(b)
if err != nil {
panic(err)
}
if size == 3 { // hot path for notifications.
return NotificationType
}
vtype, _, err := msgp.ReadIntBytes(next)
if err != nil {
panic(err)
}
// todo: use readIntf instead? returns a []interface{} and we can map it ourselves...
return RPCType(vtype)
}
// parseRPC takes a raw message and decodes it based on the first value
// of the array (the type). It returns the decoded object. Callers
// can use a type-switch to determine the type of the data.
func parseRPC(raw msgp.Raw) (interface{}, error) {
t := getMsgType(raw)
switch RPCType(t) {
case RequestType:
// create and return a request struct.
req := &Request{}
_, err := req.UnmarshalMsg(raw)
return req, err
case ResponseType:
res := &Response{}
_, err := res.UnmarshalMsg(raw)
return res, err
case NotificationType:
notif := &Notification{}
_, err := notif.UnmarshalMsg(raw)
return notif, err
default:
// uh oh.
return nil, errors.New("unmatched RPC type")
}
}
// RPCError is a common RPC error format. It is basically a clone of the
// JSON-RPC error format. We use it so we know what to expect there.
//msgp:tuple RPCError
type RPCError struct {
Code int
Desc string
}
// Converts a go error into a RPC error.
func MakeRPCError(err error) *RPCError {
if err == nil {
return nil
}
return &RPCError{
Code: -1,
Desc: err.Error(),
}
}

View file

@ -123,6 +123,7 @@ func encodeRemoteATCommand(at ATCmd, idx uint8, queued bool, destination uint64)
if !queued { if !queued {
options = options | 0x2 options = options | 0x2
} }
buf.WriteByte(options)
// write AT command // write AT command
cmd := at.Cmd() cmd := at.Cmd()
@ -136,3 +137,23 @@ func encodeRemoteATCommand(at ATCmd, idx uint8, queued bool, destination uint64)
} }
// let's actually define some AT commands now. // let's actually define some AT commands now.
// the AT command for the ID (Network ID).
// the network identifier is used to communicate with other devices. It must match.
type ATCmdID struct {
id uint32
isQuery bool
}
func (cmd ATCmdID) Cmd() [2]rune {
return [2]rune{'I', 'D'}
}
func (cmd ATCmdID) Payload() []byte {
if cmd.isQuery {
return []byte{}
}
res := make([]byte, 0)
res = binary.BigEndian.AppendUint32(res, cmd.id)
return res
}

View file

@ -126,7 +126,7 @@ func (sess *SerialSession) rxHandler() {
} }
} }
// if we get here, the serial port has closed. this is fine, usually. // if we get here, the serial port has closed. this is fine.
} }
// This implements io.Reader for the UART Session. // This implements io.Reader for the UART Session.
@ -214,7 +214,7 @@ func (sess *SerialSession) ATCommand(at ATCmd, queued bool) error {
if resp.Status != 0 { if resp.Status != 0 {
// sinec ATCmdStatus is a stringer thanks to the generator // sinec ATCmdStatus is a stringer thanks to the generator
return fmt.Errorf("AT command failed: %s", resp.Status) return fmt.Errorf("AT command failed: %v", resp.Status)
} }
// finally, we use the provided ATCmd interface to unpack the data. // finally, we use the provided ATCmd interface to unpack the data.