rpc: add logging, refactor call signature

This commit is contained in:
saji 2023-05-02 19:16:20 -05:00
parent bbe84c865d
commit 62ed745169
3 changed files with 47 additions and 25 deletions

1
go.mod
View file

@ -20,6 +20,7 @@ require (
go.uber.org/atomic v1.10.0 // indirect go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect go.uber.org/zap v1.24.0 // indirect
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 // indirect
golang.org/x/mod v0.10.0 // indirect golang.org/x/mod v0.10.0 // indirect
golang.org/x/tools v0.8.0 // indirect golang.org/x/tools v0.8.0 // indirect
) )

2
go.sum
View file

@ -30,6 +30,8 @@ go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 h1:5llv2sWeaMSnA3w2kS57ouQQ4pudlXrR0dCgw51QK9o=
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk=

View file

@ -7,6 +7,7 @@ import (
"sync" "sync"
"github.com/tinylib/msgp/msgp" "github.com/tinylib/msgp/msgp"
"golang.org/x/exp/slog"
) )
// the target architecture is a subscribe function that // the target architecture is a subscribe function that
@ -19,34 +20,37 @@ import (
// we should register handlers. They should handle serialization // we should register handlers. They should handle serialization
// and deserialization on their own. This way we avoid reflect. // and deserialization on their own. This way we avoid reflect.
// since reflected code can be more complex under the hood. // since reflected code can be more complex under the hood.
// to make writing services easier, we can use generic functions
// that convert a normal go function to a serviceFunc
// ServiceFunc is a RPC service handler. // ServiceFunc is a RPC service handler. It can be created manually,
// or by using the generic MakeService function on a
// `func(msgp.Encoder) (msgp.Deocder, error)`
type ServiceFunc func(params msgp.Raw) (res msgp.Raw, err error) type ServiceFunc func(params msgp.Raw) (res msgp.Raw, err error)
// RPCConn is a single RPC communication pair. // RPCConn is a single RPC communication pair. It is used by both the
// "server" aka listener, and client. Dynamic registration of service
// handlers is supported.
type RPCConn struct { type RPCConn struct {
// TODO: use io.readwritecloser? // TODO: use io.readwritecloser?
conn net.Conn conn net.Conn
handlers map[string]ServiceFunc handlers map[string]ServiceFunc
// indicates what messages we've used.
// TODO: use a channel to return a response?
// TODO: lock with mutex
ct rpcConnTrack ct rpcConnTrack
slog.Logger
} }
// Call intiates an RPC call to a remote method and returns the // Call intiates an RPC call to a remote method and returns the
// response, or the error, if any. // response, or the error, if any.
// TODO: determine signature // TODO: determine signature. Should params be msgp.Raw?
// TODO: this should block? func (rpc *RPCConn) Call(method string, params msgp.Raw) (msgp.Raw, error) {
func (rpc *RPCConn) Call(method string, params msgp.Marshaler) (msgp.Raw, error) {
// TODO: error handling. // TODO: error handling.
rawParam, _ := params.MarshalMsg([]byte{})
id, cb := rpc.ct.Claim() id, cb := rpc.ct.Claim()
req := NewRequest(id, method, rawParam) req := NewRequest(id, method, params)
w := msgp.NewWriter(rpc.conn) w := msgp.NewWriter(rpc.conn)
req.EncodeMsg(w) req.EncodeMsg(w)
@ -61,11 +65,10 @@ func (rpc *RPCConn) Call(method string, params msgp.Marshaler) (msgp.Raw, error)
// return any information. There is no response from the server. // return any information. There is no response from the server.
// This method will not block. An error is returned if there is a local // This method will not block. An error is returned if there is a local
// problem. // problem.
func (rpc *RPCConn) Notify(method string, params msgp.Marshaler) { func (rpc *RPCConn) Notify(method string, params msgp.Raw) {
// TODO: return an error if there's a local problem? // TODO: return an error if there's a local problem?
rawParam, _ := params.MarshalMsg([]byte{})
req := NewNotification(method, rawParam) req := NewNotification(method, params)
w := msgp.NewWriter(rpc.conn) w := msgp.NewWriter(rpc.conn)
req.EncodeMsg(w) req.EncodeMsg(w)
@ -79,6 +82,8 @@ func (rpc *RPCConn) RegisterHandler(name string, fn ServiceFunc) error {
// TODO: mutex lock for sync (or use sync.map? // TODO: mutex lock for sync (or use sync.map?
rpc.handlers[name] = fn rpc.handlers[name] = fn
rpc.Logger.Info("registered a new handler", "name", name, "fn", fn)
return nil return nil
} }
@ -97,7 +102,12 @@ func (rpc *RPCConn) Serve() {
for { for {
rawmsg.DecodeMsg(msgReader) rawmsg.DecodeMsg(msgReader)
rpcIntf, _ := parseRPC(rawmsg) rpcIntf, err := parseRPC(rawmsg)
if err != nil {
rpc.Logger.Warn("Could not parse RPC message", "err", err)
continue
}
switch rpcObject := rpcIntf.(type) { switch rpcObject := rpcIntf.(type) {
case Request: case Request:
@ -110,6 +120,8 @@ func (rpc *RPCConn) Serve() {
cbCh, err := rpc.ct.Clear(rpcObject.MsgId) cbCh, err := rpc.ct.Clear(rpcObject.MsgId)
if err != nil { if err != nil {
// TODO: scream // TODO: scream
rpc.Logger.Warn("could not get rpc callback", "msgid", rpcObject.MsgId, "err", err)
continue
} }
cbCh <- rpcObject cbCh <- rpcObject
} }
@ -121,17 +133,14 @@ func (rpc *RPCConn) dispatch(req Request) {
result, err := rpc.handlers[req.Method](req.Params) result, err := rpc.handlers[req.Method](req.Params)
if err != nil { if err != nil {
// log the error. rpc.Logger.Warn("error dispatching rpc function", "method", req.Method, "err", err)
} }
// construct the response frame. // construct the response frame.
var rpcE *RPCError = MakeRPCError(err) var rpcE *RPCError = MakeRPCError(err)
w := msgp.NewWriter(rpc.conn) w := msgp.NewWriter(rpc.conn)
resBuf := make(msgp.Raw, result.Msgsize())
result.MarshalMsg(resBuf) response := NewResponse(req.MsgId, *rpcE, result)
response := NewResponse(req.MsgId, *rpcE, resBuf)
response.EncodeMsg(w) response.EncodeMsg(w)
@ -141,9 +150,9 @@ func (rpc *RPCConn) dispatchNotif(req Notification) {
_, err := rpc.handlers[req.Method](req.Params) _, err := rpc.handlers[req.Method](req.Params)
if err != nil { if err != nil {
// log the error. // log the error, but don't do anything about it.
rpc.Logger.Warn("error dispatching rpc function", "method", req.Method, "err", err)
} }
// no need for response.
} }
// RPCConntrack is a request-response tracker that is used to connect // RPCConntrack is a request-response tracker that is used to connect
@ -214,7 +223,7 @@ type msgpackObject interface {
// and returns a new function that handles conversion to/from msgp.Raw. // and returns a new function that handles conversion to/from msgp.Raw.
// the function returned can be used by the RPCConn as a handler function. // the function returned can be used by the RPCConn as a handler function.
// This function can typically have it's paramters inferred. // This function can typically have it's paramters inferred.
func MakeService[T, R msgpackObject](fn func(T) (R, error)) func(msgp.Raw) (msgp.Raw, error) { func MakeService[T, R msgpackObject](fn func(T) (R, error)) ServiceFunc {
return func(p msgp.Raw) (msgp.Raw, error) { return func(p msgp.Raw) (msgp.Raw, error) {
// decode the raw data into a new underlying type. // decode the raw data into a new underlying type.
var params T var params T
@ -252,7 +261,9 @@ func MakeService[T, R msgpackObject](fn func(T) (R, error)) func(msgp.Raw) (msgp
// They cannot be inferred from the given parameters. // They cannot be inferred from the given parameters.
func MakeCaller[T, R msgpackObject]() func(string, T, *RPCConn) (R, error) { func MakeCaller[T, R msgpackObject]() func(string, T, *RPCConn) (R, error) {
return func(method string, param T, rpc *RPCConn) (R, error) { return func(method string, param T, rpc *RPCConn) (R, error) {
rawResponse, err := rpc.Call(method, param)
rawParam, _ := param.MarshalMsg([]byte{})
rawResponse, err := rpc.Call(method, rawParam)
if err != nil { if err != nil {
var emtpyR R var emtpyR R
return emtpyR, err return emtpyR, err
@ -268,7 +279,7 @@ func MakeCaller[T, R msgpackObject]() func(string, T, *RPCConn) (R, error) {
// MakeBoundCaller is like MakeCaller, except the RPC connection and method name are // MakeBoundCaller is like MakeCaller, except the RPC connection and method name are
// fixed and cannot be adjusted later. This function is more elegant but less flexible // fixed and cannot be adjusted later. This function is more elegant but less flexible
// than MakeCaller // than MakeCaller and should be used when performance is not critical.
// //
// This generic function must always have it's type paratmers declared explicitly. // This generic function must always have it's type paratmers declared explicitly.
// They cannot be inferred from the given parameters. // They cannot be inferred from the given parameters.
@ -279,8 +290,9 @@ func MakeBoundCaller[T, R msgpackObject](rpc *RPCConn, method string) func(T) (R
// invoke rpc.Call // invoke rpc.Call
// await response // await response
// unpack values. // unpack values.
rawParam, _ := param.MarshalMsg([]byte{})
rawResponse, err := rpc.Call(method, param) rawResponse, err := rpc.Call(method, rawParam)
if err != nil { if err != nil {
var emtpyR R var emtpyR R
return emtpyR, err return emtpyR, err
@ -294,3 +306,10 @@ func MakeBoundCaller[T, R msgpackObject](rpc *RPCConn, method string) func(T) (R
} }
} }
func MakeNotifier[T msgpackObject]() func(string, T, *RPCConn) {
return func(method string, param T, rpc *RPCConn) {
rawParam, _ := param.MarshalMsg([]byte{})
rpc.Notify(method, rawParam)
}
}