Skip to content

Commit

Permalink
Merge branch 'hotfix_submitter_hang'
Browse files Browse the repository at this point in the history
* hotfix_submitter_hang:
  choose simple strategy, drop current claim if a tx is not verified in time
  setting version to 0.2.1
  working rebroadcast tx
  add timeout to tx watcher
  • Loading branch information
tranvictor committed Apr 3, 2017
2 parents bc8a75f + a60f58d commit b607b9c
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 10 deletions.
2 changes: 1 addition & 1 deletion compile.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ mustrun build/env.sh go get -v github.com/ethereum/go-ethereum
mustrun build/env.sh go get -v golang.org/x/crypto/ssh/terminal
mustrun build/env.sh go get -v gopkg.in/urfave/cli.v1
echo "Compiling SmartPool client..."
mustrun build/env.sh go build -o smartpool cmd/ropsten/ropsten.go
mustrun build/env.sh go build -ldflags -s -o smartpool cmd/ropsten/ropsten.go
echo "Done. You can run SmartPool by ./smartpool --help"
24 changes: 20 additions & 4 deletions ethereum/geth/geth_contract_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,13 @@ func (cc *GethContractClient) Register(paymentAddress common.Address) error {
return err
}
smartpool.Output.Printf("Registering address %s to SmartPool contract by tx: %s\n", paymentAddress.Hex(), tx.Hash().Hex())
errCode, errInfo := NewTxWatcher(
errCode, errInfo, err := NewTxWatcher(
tx, cc.node, blockNo, RegisterEventTopic,
cc.sender.Big()).Wait()
if err != nil {
smartpool.Output.Printf("Tx: %s was not approved by the network in time.\n", tx.Hash().Hex())
return err
}
if errCode.Cmp(common.Big0) != 0 {
smartpool.Output.Printf("Error code: 0x%s - Error info: 0x%s\n", errCode.Text(16), errInfo.Text(16))
return errors.New(ErrorMsg(errCode, errInfo))
Expand Down Expand Up @@ -94,9 +98,13 @@ func (cc *GethContractClient) SubmitClaim(
smartpool.Output.Printf("Submitting claim failed. Error: %s\n", err)
return err
}
errCode, errInfo := NewTxWatcher(
errCode, errInfo, err := NewTxWatcher(
tx, cc.node, blockNo, SubmitClaimEventTopic,
cc.sender.Big()).Wait()
if err != nil {
smartpool.Output.Printf("Tx: %s was not approved by the network in time.\n", tx.Hash().Hex())
return err
}
if errCode.Cmp(common.Big0) != 0 {
smartpool.Output.Printf("Error code: 0x%s - Error info: 0x%s\n", errCode.Text(16), errInfo.Text(16))
return errors.New(ErrorMsg(errCode, errInfo))
Expand Down Expand Up @@ -124,9 +132,13 @@ func (cc *GethContractClient) VerifyClaim(
smartpool.Output.Printf("Verifying claim failed. Error: %s\n", err)
return err
}
errCode, errInfo := NewTxWatcher(
errCode, errInfo, err := NewTxWatcher(
tx, cc.node, blockNo, VerifyClaimEventTopic,
cc.sender.Big()).Wait()
if err != nil {
smartpool.Output.Printf("Tx: %s was not approved by the network in time.\n", tx.Hash().Hex())
return err
}
if errCode.Cmp(common.Big0) != 0 {
smartpool.Output.Printf("Error code: 0x%s - Error info: 0x%s\n", errCode.Text(16), errInfo.Text(16))
return errors.New(ErrorMsg(errCode, errInfo))
Expand All @@ -146,9 +158,13 @@ func (cc *GethContractClient) SetEpochData(merkleRoot []*big.Int, fullSizeIn128R
smartpool.Output.Printf("Setting epoch data. Error: %s\n", err)
return err
}
errCode, errInfo := NewTxWatcher(
errCode, errInfo, err := NewTxWatcher(
tx, cc.node, blockNo, SetEpochDataEventTopic,
cc.sender.Big()).Wait()
if err != nil {
smartpool.Output.Printf("Tx: %s was not approved by the network in time.\n", tx.Hash().Hex())
return err
}
if errCode.Cmp(common.Big0) != 0 {
smartpool.Output.Printf("Error code: 0x%s - Error info: 0x%s\n", errCode.Text(16), errInfo.Text(16))
return errors.New(ErrorMsg(errCode, errInfo))
Expand Down
7 changes: 7 additions & 0 deletions ethereum/geth/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,13 @@ func (g *GethRPC) SetExtradata(extradata string) error {
return err
}

func (g *GethRPC) Broadcast(data []byte) (common.Hash, error) {
hash := common.Hash{}
err := g.client.Call(&hash, "eth_sendRawTransaction",
fmt.Sprintf("0x%s", common.Bytes2Hex(data)))
return hash, err
}

func NewGethRPC(endpoint, contractAddr, extraData string, diff *big.Int) (*GethRPC, error) {
client, err := rpc.DialHTTP(endpoint)
if err != nil {
Expand Down
44 changes: 41 additions & 3 deletions ethereum/geth/txwatcher.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package geth

import (
"bytes"
"errors"
"github.com/SmartPool/smartpool-client"
"github.com/SmartPool/smartpool-client/ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"math/big"
"time"
Expand Down Expand Up @@ -36,10 +40,44 @@ func (tw *TxWatcher) loop() {
}
}

func (tw *TxWatcher) Wait() (*big.Int, *big.Int) {
func (tw *TxWatcher) WaitAndRetry() (*big.Int, *big.Int, error) {
errCode, errInfo, err := tw.Wait()
if err != nil {
smartpool.Output.Printf("Rebroadcast tx: %s...\n", tw.tx.Hash().Hex())
buff := bytes.NewBuffer([]byte{})
err = tw.tx.EncodeRLP(buff)
if err != nil {
return nil, nil, err
}
hash, err := tw.node.Broadcast(buff.Bytes())
if err != nil {
smartpool.Output.Printf("Broadcast error: %s\n", err)
return nil, nil, err
}
if hash.Big().Cmp(common.Big0) == 0 {
return nil, nil, errors.New("Rebroadcast tx got 0 tx hash. This is not supposed to happend.")
}
return tw.Wait()
} else {
return errCode, errInfo, err
}
}

func (tw *TxWatcher) Wait() (*big.Int, *big.Int, error) {
timeout := make(chan bool, 1)
go tw.loop()
<-tw.verChan
return tw.node.GetLog(tw.block, tw.event, tw.sender)
go func() {
time.Sleep(10 * time.Minute)
timeout <- true
}()
select {
case <-tw.verChan:
break
case <-timeout:
return nil, nil, errors.New("timeout error")
}
errCode, errInfo := tw.node.GetLog(tw.block, tw.event, tw.sender)
return errCode, errInfo, nil
}

func NewTxWatcher(
Expand Down
1 change: 1 addition & 0 deletions ethereum/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ type RPCClient interface {
GetLog(from *big.Int, event *big.Int, sender *big.Int) (*big.Int, *big.Int)
SetEtherbase(etherbase common.Address) error
SetExtradata(extradata string) error
Broadcast(raw []byte) (common.Hash, error)
}
16 changes: 15 additions & 1 deletion protocol/smartpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,20 @@ func (sp *SmartPool) Submit() (bool, error) {
return true, nil
}

func (sp *SmartPool) shouldStop(err error) bool {
if err == nil {
return false
}
if err.Error() == "timeout error" {
smartpool.Output.Printf("The tx might not be verified. Current claim is dropped. Continue with next claim.\n")
return false
} else if sp.HotStop {
return true
} else {
return false
}
}

func (sp *SmartPool) actOnTick() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -183,7 +197,7 @@ func (sp *SmartPool) actOnTick() {
sp.PoolMonitor.ContractAddress().Hex(),
sp.PoolMonitor.ContractAddress().Hex())
}
if err != nil && sp.HotStop {
if sp.shouldStop(err) {
smartpool.Output.Printf("SmartPool stopped. If you want SmartPool to keep running, please use \"--no-hot-stop\" to disable Hot Stop mode.\n")
break
}
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package smartpool

const VERSION = "0.2.0"
const VERSION = "0.2.1"

0 comments on commit b607b9c

Please sign in to comment.