Commit abf5967b authored by inphi's avatar inphi

use ethclient; fixes

parent daf8db0b
This diff is collapsed.
package proxyd package proxyd
import ( import (
"bytes" "context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"sync" "sync"
"time" "time"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -17,22 +13,29 @@ const blockHeadSyncPeriod = 1 * time.Second ...@@ -17,22 +13,29 @@ const blockHeadSyncPeriod = 1 * time.Second
type LatestBlockHead struct { type LatestBlockHead struct {
url string url string
client *http.Client client *ethclient.Client
quit chan struct{} quit chan struct{}
done chan struct{}
mutex sync.RWMutex mutex sync.RWMutex
blockNum uint64 blockNum uint64
} }
func newLatestBlockHead(url string) *LatestBlockHead { func newLatestBlockHead(url string) (*LatestBlockHead, error) {
client, err := ethclient.Dial(url)
if err != nil {
return nil, err
}
return &LatestBlockHead{ return &LatestBlockHead{
url: url, url: url,
client: &http.Client{Timeout: 5 * time.Second}, client: client,
quit: make(chan struct{}), quit: make(chan struct{}),
} done: make(chan struct{}),
}, nil
} }
func (h *LatestBlockHead) Start() error { func (h *LatestBlockHead) Start() {
go func() { go func() {
ticker := time.NewTicker(blockHeadSyncPeriod) ticker := time.NewTicker(blockHeadSyncPeriod)
defer ticker.Stop() defer ticker.Stop()
...@@ -43,6 +46,7 @@ func (h *LatestBlockHead) Start() error { ...@@ -43,6 +46,7 @@ func (h *LatestBlockHead) Start() error {
blockNum, err := h.getBlockNum() blockNum, err := h.getBlockNum()
if err != nil { if err != nil {
log.Error("error retrieving latest block number", "error", err) log.Error("error retrieving latest block number", "error", err)
continue
} }
log.Trace("polling block number", "blockNum", blockNum) log.Trace("polling block number", "blockNum", blockNum)
h.mutex.Lock() h.mutex.Lock()
...@@ -50,57 +54,25 @@ func (h *LatestBlockHead) Start() error { ...@@ -50,57 +54,25 @@ func (h *LatestBlockHead) Start() error {
h.mutex.Unlock() h.mutex.Unlock()
case <-h.quit: case <-h.quit:
close(h.done)
return return
} }
} }
}() }()
return nil
} }
func (h *LatestBlockHead) getBlockNum() (uint64, error) { func (h *LatestBlockHead) getBlockNum() (uint64, error) {
rpcReq := RPCReq{
JSONRPC: "2.0",
Method: "eth_blockNumber",
ID: []byte(strconv.Itoa(1)),
}
body := mustMarshalJSON(&rpcReq)
const maxRetries = 5 const maxRetries = 5
var httpErr error var httpErr error
for i := 0; i <= maxRetries; i++ { for i := 0; i <= maxRetries; i++ {
httpReq, err := http.NewRequest("POST", h.url, bytes.NewReader(body)) blockNum, err := h.client.BlockNumber(context.Background())
if err != nil { if err != nil {
return 0, err
}
httpReq.Header.Set("Content-Type", "application/json")
httpRes, httpErr := h.client.Do(httpReq)
if httpErr != nil {
backoff := calcBackoff(i) backoff := calcBackoff(i)
log.Warn("http operation failed. retrying...", "error", err, "backoff", backoff) log.Warn("http operation failed. retrying...", "error", err, "backoff", backoff)
time.Sleep(backoff) time.Sleep(backoff)
continue continue
} }
if httpRes.StatusCode != 200 {
return 0, fmt.Errorf("resposne code %d", httpRes.StatusCode)
}
defer httpRes.Body.Close()
res := new(RPCRes)
if err := json.NewDecoder(httpRes.Body).Decode(res); err != nil {
return 0, err
}
blockNumHex, ok := res.Result.(string)
if !ok {
return 0, fmt.Errorf("invalid eth_blockNumber result")
}
blockNum, err := hexutil.DecodeUint64(blockNumHex)
if err != nil {
return 0, err
}
return blockNum, nil return blockNum, nil
} }
...@@ -109,6 +81,8 @@ func (h *LatestBlockHead) getBlockNum() (uint64, error) { ...@@ -109,6 +81,8 @@ func (h *LatestBlockHead) getBlockNum() (uint64, error) {
func (h *LatestBlockHead) Stop() { func (h *LatestBlockHead) Stop() {
close(h.quit) close(h.quit)
<-h.done
h.client.Close()
} }
func (h *LatestBlockHead) GetBlockNum() uint64 { func (h *LatestBlockHead) GetBlockNum() uint64 {
......
...@@ -171,10 +171,11 @@ func Start(config *Config) error { ...@@ -171,10 +171,11 @@ func Start(config *Config) error {
if config.Cache.BlockSyncRPCURL == "" { if config.Cache.BlockSyncRPCURL == "" {
return fmt.Errorf("block sync node required for caching") return fmt.Errorf("block sync node required for caching")
} }
latestHead := newLatestBlockHead(config.Cache.BlockSyncRPCURL) latestHead, err := newLatestBlockHead(config.Cache.BlockSyncRPCURL)
if err := latestHead.Start(); err != nil { if err != nil {
return err return err
} }
latestHead.Start()
defer latestHead.Stop() defer latestHead.Stop()
getLatestBlockNumFn = func(ctx context.Context) (uint64, error) { getLatestBlockNumFn = func(ctx context.Context) (uint64, error) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment