Skip to content

Commit

Permalink
<fix> #260 (#268)
Browse files Browse the repository at this point in the history
Co-authored-by: ken <[email protected]>
  • Loading branch information
Keyon11 and ken authored Mar 7, 2024
1 parent aa7c485 commit 6a6596c
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 106 deletions.
201 changes: 99 additions & 102 deletions v3/client/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"sync/atomic"
"time"

cache "github.com/patrickmn/go-cache"

"github.com/FISCO-BCOS/bcos-c-sdk/bindings/go/csdk"
"github.com/FISCO-BCOS/go-sdk/v3/types"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -59,19 +57,19 @@ type Error interface {

// Connection represents a connection to an RPC server.
type Connection struct {
csdk *csdk.CSDK
idCounter int64
blockNumberNotify func(int64)
notifyLock sync.Mutex
transactionHandlers *cache.Cache
closed bool
csdk *csdk.CSDK
idCounter int64
blockNumberNotify func(int64)
notifyLock sync.Mutex
closed bool
lock sync.Mutex
}

type requestOp struct {
ids []json.RawMessage
err error
respChanData *csdk.CallbackChan
handler func(*types.Receipt, error)
//handler func(*types.Receipt, error)
}

type EventLogRespResult struct {
Expand Down Expand Up @@ -143,8 +141,9 @@ func NewConnectionByFile(configFile, groupID string, privateKey []byte) (*Connec
if err != nil {
return nil, err
}
c := &Connection{csdk: sdk, transactionHandlers: cache.New(defaultTransactionTimeout, cleanupInterval)}
go c.processTransactionResponses()
c := &Connection{
csdk: sdk,
}
return c, nil
}

Expand Down Expand Up @@ -173,51 +172,42 @@ func NewConnection(config *Config) (*Connection, error) {
if err != nil {
return nil, err
}
c := &Connection{csdk: sdk, transactionHandlers: cache.New(defaultTransactionTimeout, cleanupInterval)}
go c.processTransactionResponses()
c := &Connection{
csdk: sdk,
}
return c, nil
}
func (c *Connection) GetCSDK() *csdk.CSDK {
return c.csdk
}

func (c *Connection) processTransactionResponses() {
for {
if !c.closed {
items := c.transactionHandlers.Items()
for key, item := range items {
op := item.Object.(*requestOp)
if len(op.respChanData.Data) > 0 {
go func() {
resp, _, err := op.waitRpcMessage()
if err != nil {
op.handler(nil, err)
return
}
if resp.Error != nil {
op.handler(nil, resp.Error)
return
}
if len(resp.Result) == 0 {
op.handler(nil, errors.New("result is null"))
return
}
var receipt types.Receipt
err = json.Unmarshal(resp.Result, &receipt)
if err != nil {
op.handler(nil, fmt.Errorf("unmarshal receipt error: %v", err))
return
}
op.handler(&receipt, nil)
}()
c.transactionHandlers.Delete(key)
}
}
} else {
func wrapTransactionResponsesHandler(f func(*types.Receipt, error)) func([]byte, error) {
return func(bytes []byte, err error) {
var jrm jsonrpcMessage
if err != nil {
f(nil, err)
return
}
if err = json.Unmarshal(bytes, &jrm); err != nil {
f(nil, err)
return
}
if jrm.Error != nil {
f(nil, jrm.Error)
return
}
if len(jrm.Result) == 0 {
f(nil, errors.New("result is null"))
return
}
var receipt types.Receipt
err = json.Unmarshal(jrm.Result, &receipt)
if err != nil {
f(nil, fmt.Errorf("unmarshal receipt error: %v", err))
return
}
f(&receipt, nil)
}

}

func (c *Connection) nextID() int64 {
Expand All @@ -238,6 +228,14 @@ func (c *Connection) NewMessage(method string, paramsIn ...interface{}) (*jsonrp

// Close closes the client, aborting any in-flight requests.
func (c *Connection) Close() {
if c.closed {
return
}
c.lock.Lock()
defer c.lock.Unlock()
if c.closed {
return
}
c.closed = true
c.csdk.Close()
}
Expand Down Expand Up @@ -340,7 +338,14 @@ func (c *Connection) Call(result interface{}, method string, args ...interface{}
// can also pass nil, in which case the result is ignored.
func (c *Connection) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
//logrus.Infof("CallContext method:%s\n", method)
op := &requestOp{respChanData: &csdk.CallbackChan{Data: make(chan csdk.Response, 1)}}
op := &requestOp{respChanData: &csdk.CallbackChan{Data: nil}}

switch method {
case "asyncSendTransaction", "SendEncodedTransaction":
default:
op.respChanData.Data = make(chan csdk.Response, 1)
}

switch method {
case "call":
arg := args[0].(map[string]interface{})
Expand Down Expand Up @@ -404,67 +409,59 @@ func (c *Connection) CallContext(ctx context.Context, result interface{}, method
case "getPendingTxSize":
c.csdk.GetPendingTxSize(op.respChanData)
case "asyncSendTransaction":
fallthrough
data := hexutil.Encode(args[0].([]byte))
contractAddress := args[1].(string)
op.respChanData.Handler = wrapTransactionResponsesHandler(args[2].(func(*types.Receipt, error)))
var abiStr string
if len(args) >= 4 && len(contractAddress) == 0 {
abiStr = args[3].(string)
}
_, err := c.csdk.CreateAndSendTransaction(op.respChanData, contractAddress, data, abiStr, "", false)
if err != nil {
return err
}
case "sendTransaction":
fallthrough
data := hexutil.Encode(args[0].([]byte))
contractAddress := args[1].(string)
var abiStr string
if len(args) >= 3 && len(contractAddress) == 0 {
abiStr = args[2].(string)
}
_, err := c.csdk.CreateAndSendTransaction(op.respChanData, contractAddress, data, abiStr, "", false)
if err != nil {
return err
}
case "SendEncodedTransaction":
var handler func(*types.Receipt, error)
if method == "sendTransaction" {
data := hexutil.Encode(args[0].([]byte))
contractAddress := args[1].(string)
var abiStr string
if len(args) >= 3 && len(contractAddress) == 0 {
abiStr = args[2].(string)
}
_, err := c.csdk.CreateAndSendTransaction(op.respChanData, contractAddress, data, abiStr, "", false)
if err != nil {
return err
}
} else if method == "asyncSendTransaction" {
data := hexutil.Encode(args[0].([]byte))
contractAddress := args[1].(string)
handler = args[2].(func(*types.Receipt, error))
var abiStr string
if len(args) >= 4 && len(contractAddress) == 0 {
abiStr = args[3].(string)
}
_, err := c.csdk.CreateAndSendTransaction(op.respChanData, contractAddress, data, abiStr, "", false)
if err != nil {
return err
}
} else { // SendEncodedTransaction
encodedTransaction := args[0].([]byte)
withProof := args[1].(bool)
if len(args) >= 3 {
handler = args[2].(func(*types.Receipt, error))
}
err := c.csdk.SendEncodedTransaction(op.respChanData, encodedTransaction, withProof)
if err != nil {
return err
}
encodedTransaction := args[0].([]byte)
withProof := args[1].(bool)
if len(args) >= 3 {
op.respChanData.Handler = wrapTransactionResponsesHandler(args[2].(func(*types.Receipt, error)))
} else {
op.respChanData.Data = make(chan csdk.Response, 1)
}
// async send transaction
if handler != nil {
op.handler = handler
pointer := fmt.Sprintf("%p", op.respChanData)
c.transactionHandlers.Set(pointer, op, defaultTransactionTimeout)
return nil
err := c.csdk.SendEncodedTransaction(op.respChanData, encodedTransaction, withProof)
if err != nil {
return err
}
default:
return ErrNoRpcMethod
}

// dispatch has accepted the request and will close the channel when it quits.
switch resp, _, err := op.waitRpcMessage(); {
case err != nil:
return err
case resp.Error != nil:
return resp.Error
case len(resp.Result) == 0:
logrus.Errorf("result is null, %+v, err:%+v \n", resp, err)
return ErrNoResult
default:
return json.Unmarshal(resp.Result, &result)
// async send transaction
if op.respChanData.Handler != nil {
return nil
} else {
// dispatch has accepted the request and will close the channel when it quits.
switch resp, _, err := op.waitRpcMessage(); {
case err != nil:
return err
case resp.Error != nil:
return resp.Error
case len(resp.Result) == 0:
logrus.Errorf("result is null, %+v, err:%+v \n", resp, err)
return ErrNoResult
default:
return json.Unmarshal(resp.Result, &result)
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions v3/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ go 1.21.5
// replace github.com/FISCO-BCOS/bcos-c-sdk => ../../bcos-c-sdk

require (
github.com/FISCO-BCOS/bcos-c-sdk v0.0.0-20240219081048-53240138c396
github.com/FISCO-BCOS/bcos-c-sdk v0.0.0-20240305024607-895d57002774
github.com/FISCO-BCOS/crypto v0.0.0-20200202032121-bd8ab0b5d4f1
github.com/TarsCloud/TarsGo v1.4.5
github.com/deckarep/golang-set/v2 v2.6.0
github.com/ethereum/go-ethereum v1.13.10
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/schollz/progressbar/v3 v3.14.1
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.5.0
Expand Down Expand Up @@ -81,6 +80,7 @@ require (
github.com/mmcloughlin/addchain v0.4.0 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/opentracing/opentracing-go v1.1.0 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.12.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions v3/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ github.com/CloudyKit/fastprinter v0.0.0-20170127035650-74b38d55f37a/go.mod h1:EF
github.com/CloudyKit/jet v2.1.3-0.20180809161101-62edd43e4f88+incompatible/go.mod h1:HPYO+50pSWkPoj9Q/eq0aRGByCL6ScRlUmiEX5Zgm+w=
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/FISCO-BCOS/bcos-c-sdk v0.0.0-20240219081048-53240138c396 h1:VYwt+oviUo74Hb43ml9jEXC5DLJidNFrOp7lG6t8UTU=
github.com/FISCO-BCOS/bcos-c-sdk v0.0.0-20240219081048-53240138c396/go.mod h1:n2KxbYa73MW3xdLVu2vpPpoblZMms+CwPmvFkubO9xM=
github.com/FISCO-BCOS/bcos-c-sdk v0.0.0-20240305024607-895d57002774 h1:CgNdyFyNAHeg+3jxCQU+y5CtHawLvj5p1hCLmOH4ffQ=
github.com/FISCO-BCOS/bcos-c-sdk v0.0.0-20240305024607-895d57002774/go.mod h1:n2KxbYa73MW3xdLVu2vpPpoblZMms+CwPmvFkubO9xM=
github.com/FISCO-BCOS/crypto v0.0.0-20200202032121-bd8ab0b5d4f1 h1:ThPht4qK10+cMZC5COIjHPq0INm5HAMVYqrez5zEgFI=
github.com/FISCO-BCOS/crypto v0.0.0-20200202032121-bd8ab0b5d4f1/go.mod h1:UrLdwsFrjiaCsvdcPLcH6B7s/FUmym3qfM93u2ziR+4=
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
Expand Down

0 comments on commit 6a6596c

Please sign in to comment.