Skip to content

Commit

Permalink
Merge pull request #114 from filecoin-project/feat/custom-backend
Browse files Browse the repository at this point in the history
feat: Custom transport mode
  • Loading branch information
magik6k authored Jun 6, 2024
2 parents 81c1e3f + 938dd3e commit e75dcdc
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 0 deletions.
113 changes: 113 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,119 @@ type _ interface {

```

### Custom Transport Feature
The go-jsonrpc library supports creating clients with custom transport mechanisms (e.g. use for IPC). This allows for greater flexibility in how requests are sent and received, enabling the use of custom protocols, special handling of requests, or integration with other systems.

#### Example Usage of Custom Transport

Here is an example demonstrating how to create a custom client with a custom transport mechanism:

```go
// Setup server
serverHandler := &SimpleServerHandler{} // some type with methods

rpcServer := jsonrpc.NewServer()
rpcServer.Register("SimpleServerHandler", serverHandler)

// Custom doRequest function
doRequest := func(ctx context.Context, body []byte) (io.ReadCloser, error) {
reader := bytes.NewReader(body)
pr, pw := io.Pipe()
go func() {
defer pw.Close()
rpcServer.HandleRequest(ctx, reader, pw) // handle the rpc frame
}()
return pr, nil
}

var client struct {
Add func(int) error
}

// Create custom client
closer, err := jsonrpc.NewCustomClient("SimpleServerHandler", []interface{}{&client}, doRequest)
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}
defer closer()

// Use the client
if err := client.Add(10); err != nil {
log.Fatalf("Failed to call Add: %v", err)
}
fmt.Printf("Current value: %d\n", client.AddGet(5))
```

### Reverse Calling Feature
The go-jsonrpc library also supports reverse calling, where the server can make calls to the client. This is useful in scenarios where the server needs to notify or request data from the client.

NOTE: Reverse calling only works in websocket mode

#### Example Usage of Reverse Calling

Here is an example demonstrating how to set up reverse calling:

```go
// Define the client handler interface
type ClientHandler struct {
CallOnClient func(int) (int, error)
}

// Define the server handler
type ServerHandler struct {}

func (h *ServerHandler) Call(ctx context.Context) error {
revClient, ok := jsonrpc.ExtractReverseClient[ClientHandler](ctx)
if !ok {
return fmt.Errorf("no reverse client")
}

result, err := revClient.CallOnClient(7) // Multiply by 2 on client
if err != nil {
return fmt.Errorf("call on client: %w", err)
}

if result != 14 {
return fmt.Errorf("unexpected result: %d", result)
}

return nil
}

// Define client handler
type RevCallTestClientHandler struct {
}

func (h *RevCallTestClientHandler) CallOnClient(a int) (int, error) {
return a * 2, nil
}

// Setup server with reverse client capability
rpcServer := jsonrpc.NewServer(jsonrpc.WithReverseClient[ClientHandler]("Client"))
rpcServer.Register("ServerHandler", &ServerHandler{})

testServ := httptest.NewServer(rpcServer)
defer testServ.Close()

// Setup client with reverse call handler
var client struct {
Call func() error
}

closer, err := jsonrpc.NewMergeClient(context.Background(), "ws://"+testServ.Listener.Addr().String(), "ServerHandler", []interface{}{
&client,
}, nil, jsonrpc.WithClientHandler("Client", &RevCallTestClientHandler{}))
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}
defer closer()

// Make a call from the client to the server, which will trigger a reverse call
if err := client.Call(); err != nil {
log.Fatalf("Failed to call server: %v", err)
}
```

## Contribute

PRs are welcome!
Expand Down
61 changes: 61 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -129,6 +130,66 @@ func NewMergeClient(ctx context.Context, addr string, namespace string, outs []i

}

// NewCustomClient is like NewMergeClient in single-request (http) mode, except it allows for a custom doRequest function
func NewCustomClient(namespace string, outs []interface{}, doRequest func(ctx context.Context, body []byte) (io.ReadCloser, error), opts ...Option) (ClientCloser, error) {
config := defaultConfig()
for _, o := range opts {
o(&config)
}

c := client{
namespace: namespace,
paramEncoders: config.paramEncoders,
errors: config.errors,
}

stop := make(chan struct{})
c.exiting = stop

c.doRequest = func(ctx context.Context, cr clientRequest) (clientResponse, error) {
b, err := json.Marshal(&cr.req)
if err != nil {
return clientResponse{}, xerrors.Errorf("marshalling request: %w", err)
}

if ctx == nil {
ctx = context.Background()
}

rawResp, err := doRequest(ctx, b)
if err != nil {
return clientResponse{}, xerrors.Errorf("doRequest failed: %w", err)
}

defer rawResp.Close()

var resp clientResponse
if cr.req.ID != nil { // non-notification
if err := json.NewDecoder(rawResp).Decode(&resp); err != nil {
return clientResponse{}, xerrors.Errorf("unmarshaling response: %w", err)
}

if resp.ID, err = normalizeID(resp.ID); err != nil {
return clientResponse{}, xerrors.Errorf("failed to response ID: %w", err)
}

if resp.ID != cr.req.ID {
return clientResponse{}, xerrors.New("request and response id didn't match")
}
}

return resp, nil
}

if err := c.provide(outs); err != nil {
return nil, err
}

return func() {
close(stop)
}, nil
}

func httpClient(ctx context.Context, addr string, namespace string, outs []interface{}, requestHeader http.Header, config Config) (ClientCloser, error) {
c := client{
namespace: namespace,
Expand Down
41 changes: 41 additions & 0 deletions rpc_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package jsonrpc

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand Down Expand Up @@ -1651,3 +1652,43 @@ func TestBigResult(t *testing.T) {

fmt.Println("done")
}

func TestNewCustomClient(t *testing.T) {
// Setup server
serverHandler := &SimpleServerHandler{}
rpcServer := NewServer()
rpcServer.Register("SimpleServerHandler", serverHandler)

// Custom doRequest function
doRequest := func(ctx context.Context, body []byte) (io.ReadCloser, error) {
reader := bytes.NewReader(body)
pr, pw := io.Pipe()
go func() {
defer pw.Close()
rpcServer.HandleRequest(ctx, reader, pw)
}()
return pr, nil
}

var client struct {
Add func(int) error
AddGet func(int) int
}

// Create custom client
closer, err := NewCustomClient("SimpleServerHandler", []interface{}{&client}, doRequest)
require.NoError(t, err)
defer closer()

// Add(int) error
require.NoError(t, client.Add(10))
require.Equal(t, int32(10), serverHandler.n)

err = client.Add(-3546)
require.EqualError(t, err, "test")

// AddGet(int) int
n := client.AddGet(3)
require.Equal(t, 13, n)
require.Equal(t, int32(13), serverHandler.n)
}
4 changes: 4 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handleReader(ctx, r.Body, w, rpcError)
}

func (s *RPCServer) HandleRequest(ctx context.Context, r io.Reader, w io.Writer) {
s.handleReader(ctx, r, w, rpcError)
}

func rpcError(wf func(func(io.Writer)), req *request, code ErrorCode, err error) {
log.Errorf("RPC Error: %s", err)
wf(func(w io.Writer) {
Expand Down

0 comments on commit e75dcdc

Please sign in to comment.