diff --git a/internal/gotelem/rpc.go b/internal/gotelem/rpc.go index 825e8d3..9c8feac 100644 --- a/internal/gotelem/rpc.go +++ b/internal/gotelem/rpc.go @@ -1,12 +1,14 @@ package gotelem import ( + "errors" + "math/rand" "net" + "sync" "github.com/tinylib/msgp/msgp" ) - // the target architecture is a subscribe function that // takes a can FILTER. Then the server will emit notifications. // that contain new can packets as they come in. @@ -18,9 +20,8 @@ import ( // and deserialization on their own. This way we avoid reflect. // since reflected code can be more complex under the hood. -// ServiceFunc is a RPC service handler. -type ServiceFunc func(params msgp.Raw) (res msgp.MarshalSizer, err error) - +// ServiceFunc is a RPC service handler. +type ServiceFunc func(params msgp.Raw) (res msgp.Raw, err error) // RPCConn is a single RPC communication pair. type RPCConn struct { @@ -31,15 +32,29 @@ type RPCConn struct { // indicates what messages we've used. // TODO: use a channel to return a response? // TODO: lock with mutex - ct map[uint32]struct{} + ct rpcConnTrack } // Call intiates an RPC call to a remote method and returns the // response, or the error, if any. // TODO: determine signature // TODO: this should block? -func (rpc *RPCConn) Call(method string, params msgp.Marshaler) { +func (rpc *RPCConn) Call(method string, params msgp.Marshaler) (msgp.Raw, error) { + // TODO: error handling. + rawParam, _ := params.MarshalMsg([]byte{}) + + id, cb := rpc.ct.Claim() + + req := NewRequest(id, method, rawParam) + + w := msgp.NewWriter(rpc.conn) + req.EncodeMsg(w) + + // block and wait for response. + resp := <-cb + + return resp.Result, &resp.Error } // Notify initiates a notification to a remote method. It does not @@ -48,6 +63,12 @@ func (rpc *RPCConn) Call(method string, params msgp.Marshaler) { // problem. func (rpc *RPCConn) Notify(method string, params msgp.Marshaler) { // TODO: return an error if there's a local problem? + rawParam, _ := params.MarshalMsg([]byte{}) + + req := NewNotification(method, rawParam) + + w := msgp.NewWriter(rpc.conn) + req.EncodeMsg(w) } @@ -73,19 +94,25 @@ func (rpc *RPCConn) Serve() { var rawmsg msgp.Raw = make(msgp.Raw, 0, 4) - rawmsg.DecodeMsg(msgReader) + for { + rawmsg.DecodeMsg(msgReader) - rpcIntf, err := parseRPC(rawmsg) + rpcIntf, _ := parseRPC(rawmsg) - switch rpcObject := rpcIntf.(type) { - case Request: - // the object is a request - we must dispatch a goroutine - // that will call the handler and also send a return value. - go rpc.dispatch(rpcObject) - case Notification: - go rpc.dispatchNotif(rpcObject) - case Response: - // TODO: return response to caller. + switch rpcObject := rpcIntf.(type) { + case Request: + // the object is a request - we must dispatch a goroutine + // that will call the handler and also send a return value. + go rpc.dispatch(rpcObject) + case Notification: + go rpc.dispatchNotif(rpcObject) + case Response: + cbCh, err := rpc.ct.Clear(rpcObject.MsgId) + if err != nil { + // TODO: scream + } + cbCh <- rpcObject + } } } @@ -118,3 +145,94 @@ func (rpc *RPCConn) dispatchNotif(req Notification) { } // no need for response. } + +// RPCConntrack is a request-response tracker that is used to connect +// the response to the appropriate caller. +type rpcConnTrack struct { + ct map[uint32]chan Response // TODO: change the values of the map for callbacks. + mu sync.RWMutex +} + +// Get attempts to get a random mark from the mutex. +func (c *rpcConnTrack) Claim() (uint32, chan Response) { + // TODO: make this threadsafe. + var val uint32 + for { + + newVal := rand.Uint32() + // collision is *rare* - so we just try again. + // I hope to god you don't saturate this tracker. + c.mu.RLock() + if _, exist := c.ct[newVal]; !exist { + val = newVal + c.mu.RUnlock() + break + } + c.mu.RUnlock() + } + + // claim it + // the channel should be buffered. We only expect one value to go through. + // so the size is fixed. + ch := make(chan Response, 1) + c.mu.Lock() + c.ct[val] = ch + c.mu.Unlock() + + return val, ch +} + +// Clear deletes the connection from the tracker and returns the channel +// associated with it. The caller can use the channel afterwards +// to send the response. +func (c *rpcConnTrack) Clear(val uint32) (chan Response, error) { + // TODO: get a lock + c.mu.RLock() + ch, ok := c.ct[val] + c.mu.RUnlock() + if !ok { + return nil, errors.New("invalid msg id") + } + c.mu.Lock() + delete(c.ct, val) + c.mu.Unlock() + return ch, nil +} + +// Next, we define some helper generic functions that can be used to make +// implementing a msg wrapper easier. + +type msgpackObject interface { + msgp.Decodable + msgp.Encodable + msgp.MarshalSizer + msgp.Unmarshaler +} + +// MakeService is a generic wrapper function. It takes a function with the signature +// of func(T msgpObject)(R msgpObject, error) where T and R can be *concrete* types. +// and returns a new function that handles conversion to/from msgp.Raw. +// the function returned can be used by the RPCConn as a handler function. +func MakeService[T, R msgpackObject](fn func(T) (R, error)) func(msgp.Raw) (msgp.Raw, error) { + return func(p msgp.Raw) (msgp.Raw, error) { + // decode the raw data into a new underlying type. + var params T + // TODO: handler errors + _, err := params.UnmarshalMsg(p) + + if err != nil { + return nil, err + } + + // now, call the function fn with the given params, and record the value. + + resp, err := fn(params) + + if err != nil { + return nil, err + } + + return resp.MarshalMsg([]byte{}) + + } +} diff --git a/internal/gotelem/rpc_msg.go b/internal/gotelem/rpc_msg.go index 55e32c0..13d6ec0 100644 --- a/internal/gotelem/rpc_msg.go +++ b/internal/gotelem/rpc_msg.go @@ -18,7 +18,6 @@ const ( NotificationType RPCType = 2 ) - // the messagepack RPC spec requires that the RPC wire formts are ordered arrays, // aka tuples. we can use msgp options to make them tuple automatically, // based on the order they are declared. This makes the order of these @@ -50,8 +49,8 @@ func NewRequest(msgid uint32, method string, params msgp.Raw) *Request { // A response is the result of a function call, or an error. type Response struct { // should always be one. - msgtype RPCType `msg:"type"` - MsgId uint32 `msg:"msgid"` + msgtype RPCType `msg:"type"` + MsgId uint32 `msg:"msgid"` Error RPCError `msg:"error,allownil"` Result msgp.Raw `msg:"result,allownil"` } @@ -59,9 +58,9 @@ type Response struct { func NewResponse(msgid uint32, respErr RPCError, res msgp.Raw) *Response { return &Response{ msgtype: 1, - MsgId: msgid, - Error: respErr, - Result: res, + MsgId: msgid, + Error: respErr, + Result: res, } } @@ -69,11 +68,19 @@ func NewResponse(msgid uint32, respErr RPCError, res msgp.Raw) *Response { // succeeds and ignores responses. type Notification struct { // should always be *2* - msgtype RPCType `msg:"type"` - Method string `msg:"method"` + msgtype RPCType `msg:"type"` + Method string `msg:"method"` Params msgp.Raw `msg:"params,allownil"` } +func NewNotification(method string, params msgp.Raw) *Notification { + return &Notification{ + msgtype: 2, + Method: method, + Params: params, + } +} + // todo: should these be functions instead, since they're arrays? and we need to determine the type beforehand. func getMsgType(b []byte) RPCType { @@ -95,7 +102,6 @@ func getMsgType(b []byte) RPCType { return RPCType(vtype) } - // parseRPC takes a raw message and decodes it based on the first value // of the array (the type). It returns the decoded object. Callers // can use a type-switch to determine the type of the data. @@ -126,14 +132,12 @@ func parseRPC(raw msgp.Raw) (interface{}, error) { // RPCError is a common RPC error format. It is basically a clone of the // JSON-RPC error format. We use it so we know what to expect there. - //msgp:tuple RPCError type RPCError struct { Code int Desc string } - // Converts a go error into a RPC error. func MakeRPCError(err error) *RPCError { if err == nil { @@ -144,3 +148,7 @@ func MakeRPCError(err error) *RPCError { Desc: err.Error(), } } + +func (r *RPCError) Error() string { + return r.Desc +} diff --git a/internal/gotelem/rpc_msg_gen.go b/internal/gotelem/rpc_msg_gen.go new file mode 100644 index 0000000..1121327 --- /dev/null +++ b/internal/gotelem/rpc_msg_gen.go @@ -0,0 +1,494 @@ +package gotelem + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *Notification) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0001 uint32 + zb0001, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 2 { + err = msgp.ArrayError{Wanted: 2, Got: zb0001} + return + } + z.Method, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Method") + return + } + err = z.Params.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Params") + return + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *Notification) EncodeMsg(en *msgp.Writer) (err error) { + // array header, size 2 + err = en.Append(0x92) + if err != nil { + return + } + err = en.WriteString(z.Method) + if err != nil { + err = msgp.WrapError(err, "Method") + return + } + err = z.Params.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Params") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *Notification) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // array header, size 2 + o = append(o, 0x92) + o = msgp.AppendString(o, z.Method) + o, err = z.Params.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Params") + return + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *Notification) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0001 uint32 + zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 2 { + err = msgp.ArrayError{Wanted: 2, Got: zb0001} + return + } + z.Method, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Method") + return + } + bts, err = z.Params.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Params") + return + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *Notification) Msgsize() (s int) { + s = 1 + msgp.StringPrefixSize + len(z.Method) + z.Params.Msgsize() + return +} + +// DecodeMsg implements msgp.Decodable +func (z *RPCError) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0001 uint32 + zb0001, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 2 { + err = msgp.ArrayError{Wanted: 2, Got: zb0001} + return + } + z.Code, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "Code") + return + } + z.Desc, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Desc") + return + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z RPCError) EncodeMsg(en *msgp.Writer) (err error) { + // array header, size 2 + err = en.Append(0x92) + if err != nil { + return + } + err = en.WriteInt(z.Code) + if err != nil { + err = msgp.WrapError(err, "Code") + return + } + err = en.WriteString(z.Desc) + if err != nil { + err = msgp.WrapError(err, "Desc") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z RPCError) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // array header, size 2 + o = append(o, 0x92) + o = msgp.AppendInt(o, z.Code) + o = msgp.AppendString(o, z.Desc) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *RPCError) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0001 uint32 + zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 2 { + err = msgp.ArrayError{Wanted: 2, Got: zb0001} + return + } + z.Code, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Code") + return + } + z.Desc, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Desc") + return + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z RPCError) Msgsize() (s int) { + s = 1 + msgp.IntSize + msgp.StringPrefixSize + len(z.Desc) + return +} + +// DecodeMsg implements msgp.Decodable +func (z *RPCType) DecodeMsg(dc *msgp.Reader) (err error) { + { + var zb0001 int + zb0001, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = RPCType(zb0001) + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z RPCType) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteInt(int(z)) + if err != nil { + err = msgp.WrapError(err) + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z RPCType) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendInt(o, int(z)) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *RPCType) UnmarshalMsg(bts []byte) (o []byte, err error) { + { + var zb0001 int + zb0001, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = RPCType(zb0001) + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z RPCType) Msgsize() (s int) { + s = msgp.IntSize + return +} + +// DecodeMsg implements msgp.Decodable +func (z *Request) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0001 uint32 + zb0001, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 3 { + err = msgp.ArrayError{Wanted: 3, Got: zb0001} + return + } + z.MsgId, err = dc.ReadUint32() + if err != nil { + err = msgp.WrapError(err, "MsgId") + return + } + z.Method, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Method") + return + } + err = z.Params.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Params") + return + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *Request) EncodeMsg(en *msgp.Writer) (err error) { + // array header, size 3 + err = en.Append(0x93) + if err != nil { + return + } + err = en.WriteUint32(z.MsgId) + if err != nil { + err = msgp.WrapError(err, "MsgId") + return + } + err = en.WriteString(z.Method) + if err != nil { + err = msgp.WrapError(err, "Method") + return + } + err = z.Params.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Params") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *Request) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // array header, size 3 + o = append(o, 0x93) + o = msgp.AppendUint32(o, z.MsgId) + o = msgp.AppendString(o, z.Method) + o, err = z.Params.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Params") + return + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *Request) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0001 uint32 + zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 3 { + err = msgp.ArrayError{Wanted: 3, Got: zb0001} + return + } + z.MsgId, bts, err = msgp.ReadUint32Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "MsgId") + return + } + z.Method, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Method") + return + } + bts, err = z.Params.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Params") + return + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *Request) Msgsize() (s int) { + s = 1 + msgp.Uint32Size + msgp.StringPrefixSize + len(z.Method) + z.Params.Msgsize() + return +} + +// DecodeMsg implements msgp.Decodable +func (z *Response) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0001 uint32 + zb0001, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 3 { + err = msgp.ArrayError{Wanted: 3, Got: zb0001} + return + } + z.MsgId, err = dc.ReadUint32() + if err != nil { + err = msgp.WrapError(err, "MsgId") + return + } + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Error") + return + } + if zb0002 != 2 { + err = msgp.ArrayError{Wanted: 2, Got: zb0002} + return + } + z.Error.Code, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "Error", "Code") + return + } + z.Error.Desc, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Error", "Desc") + return + } + err = z.Result.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Result") + return + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *Response) EncodeMsg(en *msgp.Writer) (err error) { + // array header, size 3 + err = en.Append(0x93) + if err != nil { + return + } + err = en.WriteUint32(z.MsgId) + if err != nil { + err = msgp.WrapError(err, "MsgId") + return + } + // array header, size 2 + err = en.Append(0x92) + if err != nil { + return + } + err = en.WriteInt(z.Error.Code) + if err != nil { + err = msgp.WrapError(err, "Error", "Code") + return + } + err = en.WriteString(z.Error.Desc) + if err != nil { + err = msgp.WrapError(err, "Error", "Desc") + return + } + err = z.Result.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Result") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *Response) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // array header, size 3 + o = append(o, 0x93) + o = msgp.AppendUint32(o, z.MsgId) + // array header, size 2 + o = append(o, 0x92) + o = msgp.AppendInt(o, z.Error.Code) + o = msgp.AppendString(o, z.Error.Desc) + o, err = z.Result.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Result") + return + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *Response) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0001 uint32 + zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != 3 { + err = msgp.ArrayError{Wanted: 3, Got: zb0001} + return + } + z.MsgId, bts, err = msgp.ReadUint32Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "MsgId") + return + } + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Error") + return + } + if zb0002 != 2 { + err = msgp.ArrayError{Wanted: 2, Got: zb0002} + return + } + z.Error.Code, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Error", "Code") + return + } + z.Error.Desc, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Error", "Desc") + return + } + bts, err = z.Result.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Result") + return + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *Response) Msgsize() (s int) { + s = 1 + msgp.Uint32Size + 1 + msgp.IntSize + msgp.StringPrefixSize + len(z.Error.Desc) + z.Result.Msgsize() + return +} diff --git a/internal/gotelem/rpc_msg_gen_test.go b/internal/gotelem/rpc_msg_gen_test.go new file mode 100644 index 0000000..b82e70b --- /dev/null +++ b/internal/gotelem/rpc_msg_gen_test.go @@ -0,0 +1,462 @@ +package gotelem + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalNotification(t *testing.T) { + v := Notification{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgNotification(b *testing.B) { + v := Notification{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgNotification(b *testing.B) { + v := Notification{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalNotification(b *testing.B) { + v := Notification{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeNotification(t *testing.T) { + v := Notification{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeNotification Msgsize() is inaccurate") + } + + vn := Notification{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeNotification(b *testing.B) { + v := Notification{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeNotification(b *testing.B) { + v := Notification{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalRPCError(t *testing.T) { + v := RPCError{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgRPCError(b *testing.B) { + v := RPCError{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgRPCError(b *testing.B) { + v := RPCError{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalRPCError(b *testing.B) { + v := RPCError{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeRPCError(t *testing.T) { + v := RPCError{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeRPCError Msgsize() is inaccurate") + } + + vn := RPCError{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeRPCError(b *testing.B) { + v := RPCError{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeRPCError(b *testing.B) { + v := RPCError{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalRequest(t *testing.T) { + v := Request{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgRequest(b *testing.B) { + v := Request{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgRequest(b *testing.B) { + v := Request{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalRequest(b *testing.B) { + v := Request{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeRequest(t *testing.T) { + v := Request{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeRequest Msgsize() is inaccurate") + } + + vn := Request{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeRequest(b *testing.B) { + v := Request{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeRequest(b *testing.B) { + v := Request{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalResponse(t *testing.T) { + v := Response{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgResponse(b *testing.B) { + v := Response{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgResponse(b *testing.B) { + v := Response{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalResponse(b *testing.B) { + v := Response{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeResponse(t *testing.T) { + v := Response{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeResponse Msgsize() is inaccurate") + } + + vn := Response{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeResponse(b *testing.B) { + v := Response{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeResponse(b *testing.B) { + v := Response{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/internal/gotelem/rpc_msg_test.go b/internal/gotelem/rpc_msg_test.go new file mode 100644 index 0000000..2c8c1cb --- /dev/null +++ b/internal/gotelem/rpc_msg_test.go @@ -0,0 +1,54 @@ +package gotelem + +import ( + "reflect" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func Test_parseRPC(t *testing.T) { + type args struct { + raw msgp.Raw + } + tests := []struct { + name string + args args + want interface{} + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseRPC(tt.args.raw) + if (err != nil) != tt.wantErr { + t.Errorf("parseRPC() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("parseRPC() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_getMsgType(t *testing.T) { + type args struct { + b []byte + } + tests := []struct { + name string + args args + want RPCType + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := getMsgType(tt.args.b); got != tt.want { + t.Errorf("getMsgType() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/xbee/at.go b/internal/xbee/at.go index d4dcadd..33545fb 100644 --- a/internal/xbee/at.go +++ b/internal/xbee/at.go @@ -138,6 +138,7 @@ func encodeRemoteATCommand(at ATCmd, idx uint8, queued bool, destination uint64) // let's actually define some AT commands now. +// TODO: should we just use a function. // the AT command for the ID (Network ID). // the network identifier is used to communicate with other devices. It must match. type ATCmdID struct {