diff --git a/internal/gotelem/rpc_msg_test.go b/internal/gotelem/rpc_msg_test.go deleted file mode 100644 index 2c8c1cb..0000000 --- a/internal/gotelem/rpc_msg_test.go +++ /dev/null @@ -1,54 +0,0 @@ -package gotelem - -import ( - "reflect" - "testing" - - "github.com/tinylib/msgp/msgp" -) - -func Test_parseRPC(t *testing.T) { - type args struct { - raw msgp.Raw - } - tests := []struct { - name string - args args - want interface{} - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := parseRPC(tt.args.raw) - if (err != nil) != tt.wantErr { - t.Errorf("parseRPC() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("parseRPC() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_getMsgType(t *testing.T) { - type args struct { - b []byte - } - tests := []struct { - name string - args args - want RPCType - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := getMsgType(tt.args.b); got != tt.want { - t.Errorf("getMsgType() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/internal/gotelem/rpc.go b/internal/mprpc/rpc.go similarity index 72% rename from internal/gotelem/rpc.go rename to internal/mprpc/rpc.go index 1b8e6c2..82cea36 100644 --- a/internal/gotelem/rpc.go +++ b/internal/mprpc/rpc.go @@ -1,36 +1,63 @@ -package gotelem +/* +mprpc is a simple bidirectional RPC library using the MessagePack-RPC spec. + +It fully implements the spec and additionally provides Go `error“ handling by +converting the error to a standard format for other clients. + +mprpc does not have a typical server/client designation - both use "handlers", +which expose methods to be called over the network. A "client" would be an +RPCConn which doesn't expose any services, and a "server" would be an RPCConn +that doesn't make any `Call`s to the other side. + +This lack of discrete server and client enables mprpc to implement a basic +"streaming" architecture on top of the MessagePack-RPC spec, which does not +include streaming primitives. Instead, we can provide simple "service handlers" +as a callback/destination for streaming data. + +For example, a "client" could subscribe to events from the "server", by +providing a callback service to point events to. Then, the "server" would +Notify() the callback service with the new event as an argument every time it +occured. While this may be less optimal than protocol-level streaming, it is +far simpler. + +The idiomatic way to use mprpc is to use the generic functions that are provided +as helpers. They allow the programmer to easily wrap functions in a closure that +automatically encodes and decodes the parameters and results to their +MessagePack representations. See the Make* generic functions for more information. + + // Assume myParam and myResult are MessagePack-enabled structs. + // Use `msgp` to generate the required functions for them. + + // this is our plain function - we can call it locally to test. + func myPlainFunction(p myParam) (r myResult, err error) + + // wrapped is a ServiceFunc that can be passed to rpcConn.RegisterHandler + var wrapped := MakeService(myPlainFunction) + +The generic functions allow for flexiblity and elegant code while still keeping +the underlying implementation reflect-free. For more complex functions (i.e +multiple parameters or return types), a second layer of indirection can be used. +*/ +package mprpc import ( - "errors" - "math/rand" "net" - "sync" "github.com/tinylib/msgp/msgp" "golang.org/x/exp/slog" ) -// the target architecture is a subscribe function that -// takes a can FILTER. Then the server will emit notifications. -// that contain new can packets as they come in. - -// this means that the client should be able to handle -// notify packets on top of response packets. - -// we should register handlers. They should handle serialization -// and deserialization on their own. This way we avoid reflect. -// since reflected code can be more complex under the hood. -// to make writing services easier, we can use generic functions -// that convert a normal go function to a serviceFunc - -// ServiceFunc is a RPC service handler. It can be created manually, -// or by using the generic MakeService function on a -// `func(msgp.Encoder) (msgp.Deocder, error)` +// ServiceFunc is a RPC service handler. +// It can be created manually, or by using the generic MakeService function on a +// +// func(msgp.Encoder) (msgp.Deocder, error) +// +// type. type ServiceFunc func(params msgp.Raw) (res msgp.Raw, err error) -// RPCConn is a single RPC communication pair. It is used by both the -// "server" aka listener, and client. Dynamic registration of service -// handlers is supported. +// RPCConn is a single RPC communication pair. +// It is used by both the +// "server" aka listener, and client. type RPCConn struct { // TODO: use io.readwritecloser? conn net.Conn @@ -42,8 +69,8 @@ type RPCConn struct { } // Call intiates an RPC call to a remote method and returns the -// response, or the error, if any. -// TODO: determine signature. Should params be msgp.Raw? +// response, or the error, if any. To make calling easier, you can +// construct a "Caller" with MakeCaller func (rpc *RPCConn) Call(method string, params msgp.Raw) (msgp.Raw, error) { // TODO: error handling. @@ -155,59 +182,6 @@ func (rpc *RPCConn) dispatchNotif(req Notification) { } } -// RPCConntrack is a request-response tracker that is used to connect -// the response to the appropriate caller. -type rpcConnTrack struct { - ct map[uint32]chan Response // TODO: change the values of the map for callbacks. - mu sync.RWMutex -} - -// Get attempts to get a random mark from the mutex. -func (c *rpcConnTrack) Claim() (uint32, chan Response) { - // TODO: make this threadsafe. - var val uint32 - for { - - newVal := rand.Uint32() - // collision is *rare* - so we just try again. - // I hope to god you don't saturate this tracker. - c.mu.RLock() - if _, exist := c.ct[newVal]; !exist { - val = newVal - c.mu.RUnlock() - break - } - c.mu.RUnlock() - } - - // claim it - // the channel should be buffered. We only expect one value to go through. - // so the size is fixed. - ch := make(chan Response, 1) - c.mu.Lock() - c.ct[val] = ch - c.mu.Unlock() - - return val, ch -} - -// Clear deletes the connection from the tracker and returns the channel -// associated with it. The caller can use the channel afterwards -// to send the response. -func (c *rpcConnTrack) Clear(val uint32) (chan Response, error) { - // TODO: get a lock - c.mu.RLock() - ch, ok := c.ct[val] - c.mu.RUnlock() - if !ok { - return nil, errors.New("invalid msg id") - } - c.mu.Lock() - delete(c.ct, val) - c.mu.Unlock() - return ch, nil -} - // Next, we define some helper generic functions that can be used to make // implementing a msg wrapper easier. diff --git a/internal/gotelem/rpc_msg.go b/internal/mprpc/rpc_msg.go similarity index 70% rename from internal/gotelem/rpc_msg.go rename to internal/mprpc/rpc_msg.go index 13d6ec0..49ad03b 100644 --- a/internal/gotelem/rpc_msg.go +++ b/internal/mprpc/rpc_msg.go @@ -1,4 +1,4 @@ -package gotelem +package mprpc import ( "errors" @@ -6,10 +6,9 @@ import ( "github.com/tinylib/msgp/msgp" ) -// this file is a simple implementation of the msgpack-rpc data formato. -// it also contains an RPC server and client. -// We can port this to python rather easily too. +// this file is a simple implementation of the msgpack-rpc data formats. +// RPCType is the message type that is being sent. type RPCType int const ( @@ -28,13 +27,17 @@ const ( //msgp:tuple Response //msgp:tuple Notification -// A request is a function call that expects a Response. +// Request represents a function call that expects a Response. type Request struct { // should always be zero. - msgtype RPCType `msg:"type"` - MsgId uint32 `msg:"msgid"` - Method string `msg:"method"` - Params msgp.Raw `msg:"params,allownil"` + msgtype RPCType `msg:"type"` + // MsgId is used to match a Response with a Request + MsgId uint32 `msg:"msgid"` + // Method is the name of the method/service to execute on the remote + Method string `msg:"method"` + // Params is the arguments of the method/service. It can be any + // MessagePack-serializable type. + Params msgp.Raw `msg:"params,allownil"` } func NewRequest(msgid uint32, method string, params msgp.Raw) *Request { @@ -46,13 +49,17 @@ func NewRequest(msgid uint32, method string, params msgp.Raw) *Request { } } -// A response is the result of a function call, or an error. +// A Response is the result and error given from calling a service. type Response struct { // should always be one. - msgtype RPCType `msg:"type"` - MsgId uint32 `msg:"msgid"` - Error RPCError `msg:"error,allownil"` - Result msgp.Raw `msg:"result,allownil"` + msgtype RPCType `msg:"type"` + // MsgId is an identifier used to match this Response with the Request that created it. + MsgId uint32 `msg:"msgid"` + // Error is the error encountered while attempting to execute the method, if any. + Error RPCError `msg:"error,allownil"` + // Result is the raw object that was returned by the calling method. It + // can be any MessagePack-serializable object. + Result msgp.Raw `msg:"result,allownil"` } func NewResponse(msgid uint32, respErr RPCError, res msgp.Raw) *Response { @@ -81,9 +88,9 @@ func NewNotification(method string, params msgp.Raw) *Notification { } } -// todo: should these be functions instead, since they're arrays? and we need to determine the type beforehand. - -func getMsgType(b []byte) RPCType { +// getMsgType uses raw messagpack RPC to return the underlying message type from +// the raw array given by b. +func getMsgType(b msgp.Raw) RPCType { size, next, err := msgp.ReadArrayHeaderBytes(b) if err != nil { panic(err) @@ -129,16 +136,16 @@ func parseRPC(raw msgp.Raw) (interface{}, error) { } } +//msgp:tuple RPCError + // RPCError is a common RPC error format. It is basically a clone of the // JSON-RPC error format. We use it so we know what to expect there. - -//msgp:tuple RPCError type RPCError struct { Code int Desc string } -// Converts a go error into a RPC error. +// Converts a Go error into a RPC error. func MakeRPCError(err error) *RPCError { if err == nil { return nil @@ -149,6 +156,7 @@ func MakeRPCError(err error) *RPCError { } } +// Implements the Error interface for RPCError func (r *RPCError) Error() string { return r.Desc } diff --git a/internal/gotelem/rpc_msg_gen.go b/internal/mprpc/rpc_msg_gen.go similarity index 99% rename from internal/gotelem/rpc_msg_gen.go rename to internal/mprpc/rpc_msg_gen.go index 1121327..0524814 100644 --- a/internal/gotelem/rpc_msg_gen.go +++ b/internal/mprpc/rpc_msg_gen.go @@ -1,4 +1,4 @@ -package gotelem +package mprpc // Code generated by github.com/tinylib/msgp DO NOT EDIT. diff --git a/internal/gotelem/rpc_msg_gen_test.go b/internal/mprpc/rpc_msg_gen_test.go similarity index 99% rename from internal/gotelem/rpc_msg_gen_test.go rename to internal/mprpc/rpc_msg_gen_test.go index b82e70b..f451535 100644 --- a/internal/gotelem/rpc_msg_gen_test.go +++ b/internal/mprpc/rpc_msg_gen_test.go @@ -1,4 +1,4 @@ -package gotelem +package mprpc // Code generated by github.com/tinylib/msgp DO NOT EDIT. diff --git a/internal/mprpc/rpcconntrack.go b/internal/mprpc/rpcconntrack.go new file mode 100644 index 0000000..e5f9fea --- /dev/null +++ b/internal/mprpc/rpcconntrack.go @@ -0,0 +1,64 @@ +package mprpc + +import ( + "errors" + "math/rand" + "sync" +) + +// RPCConntrack is a request-response tracker that is used to connect +// the response to the appropriate caller. +type rpcConnTrack struct { + ct map[uint32]chan Response // TODO: change the values of the map for callbacks. + mu sync.RWMutex +} + +// Get attempts to get a random mark from the mutex. +func (c *rpcConnTrack) Claim() (uint32, chan Response) { + // TODO: make this threadsafe. + var val uint32 + for { + + // + newVal := rand.Uint32() + + // BUG(saji): rpcConnTrack collisions are inefficient. + + // collision is *rare* - so we just try again. + // I hope to god you don't saturate this tracker. + c.mu.RLock() + if _, exist := c.ct[newVal]; !exist { + val = newVal + c.mu.RUnlock() + break + } + c.mu.RUnlock() + } + + // claim it + // the channel should be buffered. We only expect one value to go through. + // so the size is fixed to 1. + ch := make(chan Response, 1) + c.mu.Lock() + c.ct[val] = ch + c.mu.Unlock() + + return val, ch +} + +// Clear deletes the connection from the tracker and returns the channel +// associated with it. The caller can use the channel afterwards +// to send the response. +func (c *rpcConnTrack) Clear(val uint32) (chan Response, error) { + // TODO: get a lock + c.mu.RLock() + ch, ok := c.ct[val] + c.mu.RUnlock() + if !ok { + return nil, errors.New("invalid msg id") + } + c.mu.Lock() + delete(c.ct, val) + c.mu.Unlock() + return ch, nil +}