diff --git a/go.mod b/go.mod index 13ce34a..d15a018 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect + golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 // indirect golang.org/x/mod v0.10.0 // indirect golang.org/x/tools v0.8.0 // indirect ) diff --git a/go.sum b/go.sum index b472da2..472bd42 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 h1:5llv2sWeaMSnA3w2kS57ouQQ4pudlXrR0dCgw51QK9o= +golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= diff --git a/internal/gotelem/rpc.go b/internal/gotelem/rpc.go index b869578..1b8e6c2 100644 --- a/internal/gotelem/rpc.go +++ b/internal/gotelem/rpc.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/tinylib/msgp/msgp" + "golang.org/x/exp/slog" ) // the target architecture is a subscribe function that @@ -19,34 +20,37 @@ import ( // we should register handlers. They should handle serialization // and deserialization on their own. This way we avoid reflect. // since reflected code can be more complex under the hood. +// to make writing services easier, we can use generic functions +// that convert a normal go function to a serviceFunc -// ServiceFunc is a RPC service handler. +// ServiceFunc is a RPC service handler. It can be created manually, +// or by using the generic MakeService function on a +// `func(msgp.Encoder) (msgp.Deocder, error)` type ServiceFunc func(params msgp.Raw) (res msgp.Raw, err error) -// RPCConn is a single RPC communication pair. +// RPCConn is a single RPC communication pair. It is used by both the +// "server" aka listener, and client. Dynamic registration of service +// handlers is supported. type RPCConn struct { // TODO: use io.readwritecloser? conn net.Conn handlers map[string]ServiceFunc - // indicates what messages we've used. - // TODO: use a channel to return a response? - // TODO: lock with mutex ct rpcConnTrack + + slog.Logger } // Call intiates an RPC call to a remote method and returns the // response, or the error, if any. -// TODO: determine signature -// TODO: this should block? -func (rpc *RPCConn) Call(method string, params msgp.Marshaler) (msgp.Raw, error) { +// TODO: determine signature. Should params be msgp.Raw? +func (rpc *RPCConn) Call(method string, params msgp.Raw) (msgp.Raw, error) { // TODO: error handling. - rawParam, _ := params.MarshalMsg([]byte{}) id, cb := rpc.ct.Claim() - req := NewRequest(id, method, rawParam) + req := NewRequest(id, method, params) w := msgp.NewWriter(rpc.conn) req.EncodeMsg(w) @@ -61,11 +65,10 @@ func (rpc *RPCConn) Call(method string, params msgp.Marshaler) (msgp.Raw, error) // return any information. There is no response from the server. // This method will not block. An error is returned if there is a local // problem. -func (rpc *RPCConn) Notify(method string, params msgp.Marshaler) { +func (rpc *RPCConn) Notify(method string, params msgp.Raw) { // TODO: return an error if there's a local problem? - rawParam, _ := params.MarshalMsg([]byte{}) - req := NewNotification(method, rawParam) + req := NewNotification(method, params) w := msgp.NewWriter(rpc.conn) req.EncodeMsg(w) @@ -79,6 +82,8 @@ func (rpc *RPCConn) RegisterHandler(name string, fn ServiceFunc) error { // TODO: mutex lock for sync (or use sync.map? rpc.handlers[name] = fn + rpc.Logger.Info("registered a new handler", "name", name, "fn", fn) + return nil } @@ -97,7 +102,12 @@ func (rpc *RPCConn) Serve() { for { rawmsg.DecodeMsg(msgReader) - rpcIntf, _ := parseRPC(rawmsg) + rpcIntf, err := parseRPC(rawmsg) + + if err != nil { + rpc.Logger.Warn("Could not parse RPC message", "err", err) + continue + } switch rpcObject := rpcIntf.(type) { case Request: @@ -110,6 +120,8 @@ func (rpc *RPCConn) Serve() { cbCh, err := rpc.ct.Clear(rpcObject.MsgId) if err != nil { // TODO: scream + rpc.Logger.Warn("could not get rpc callback", "msgid", rpcObject.MsgId, "err", err) + continue } cbCh <- rpcObject } @@ -121,17 +133,14 @@ func (rpc *RPCConn) dispatch(req Request) { result, err := rpc.handlers[req.Method](req.Params) if err != nil { - // log the error. + rpc.Logger.Warn("error dispatching rpc function", "method", req.Method, "err", err) } // construct the response frame. var rpcE *RPCError = MakeRPCError(err) w := msgp.NewWriter(rpc.conn) - resBuf := make(msgp.Raw, result.Msgsize()) - result.MarshalMsg(resBuf) - - response := NewResponse(req.MsgId, *rpcE, resBuf) + response := NewResponse(req.MsgId, *rpcE, result) response.EncodeMsg(w) @@ -141,9 +150,9 @@ func (rpc *RPCConn) dispatchNotif(req Notification) { _, err := rpc.handlers[req.Method](req.Params) if err != nil { - // log the error. + // log the error, but don't do anything about it. + rpc.Logger.Warn("error dispatching rpc function", "method", req.Method, "err", err) } - // no need for response. } // RPCConntrack is a request-response tracker that is used to connect @@ -214,7 +223,7 @@ type msgpackObject interface { // and returns a new function that handles conversion to/from msgp.Raw. // the function returned can be used by the RPCConn as a handler function. // This function can typically have it's paramters inferred. -func MakeService[T, R msgpackObject](fn func(T) (R, error)) func(msgp.Raw) (msgp.Raw, error) { +func MakeService[T, R msgpackObject](fn func(T) (R, error)) ServiceFunc { return func(p msgp.Raw) (msgp.Raw, error) { // decode the raw data into a new underlying type. var params T @@ -252,7 +261,9 @@ func MakeService[T, R msgpackObject](fn func(T) (R, error)) func(msgp.Raw) (msgp // They cannot be inferred from the given parameters. func MakeCaller[T, R msgpackObject]() func(string, T, *RPCConn) (R, error) { return func(method string, param T, rpc *RPCConn) (R, error) { - rawResponse, err := rpc.Call(method, param) + + rawParam, _ := param.MarshalMsg([]byte{}) + rawResponse, err := rpc.Call(method, rawParam) if err != nil { var emtpyR R return emtpyR, err @@ -268,7 +279,7 @@ func MakeCaller[T, R msgpackObject]() func(string, T, *RPCConn) (R, error) { // MakeBoundCaller is like MakeCaller, except the RPC connection and method name are // fixed and cannot be adjusted later. This function is more elegant but less flexible -// than MakeCaller +// than MakeCaller and should be used when performance is not critical. // // This generic function must always have it's type paratmers declared explicitly. // They cannot be inferred from the given parameters. @@ -279,8 +290,9 @@ func MakeBoundCaller[T, R msgpackObject](rpc *RPCConn, method string) func(T) (R // invoke rpc.Call // await response // unpack values. + rawParam, _ := param.MarshalMsg([]byte{}) - rawResponse, err := rpc.Call(method, param) + rawResponse, err := rpc.Call(method, rawParam) if err != nil { var emtpyR R return emtpyR, err @@ -294,3 +306,10 @@ func MakeBoundCaller[T, R msgpackObject](rpc *RPCConn, method string) func(T) (R } } + +func MakeNotifier[T msgpackObject]() func(string, T, *RPCConn) { + return func(method string, param T, rpc *RPCConn) { + rawParam, _ := param.MarshalMsg([]byte{}) + rpc.Notify(method, rawParam) + } +}