Commit 1df934a1 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

indexer: Don't spam the backend (#2441)

parent e12e8976
---
'@eth-optimism/indexer': patch
---
Don't spam the backend
...@@ -9,6 +9,8 @@ import ( ...@@ -9,6 +9,8 @@ import (
"strconv" "strconv"
"time" "time"
l2rpc "github.com/ethereum-optimism/optimism/l2geth/rpc"
"github.com/ethereum-optimism/optimism/go/indexer/metrics" "github.com/ethereum-optimism/optimism/go/indexer/metrics"
"github.com/ethereum-optimism/optimism/go/indexer/server" "github.com/ethereum-optimism/optimism/go/indexer/server"
"github.com/rs/cors" "github.com/rs/cors"
...@@ -128,7 +130,7 @@ func NewIndexer(cfg Config, gitVersion string) (*Indexer, error) { ...@@ -128,7 +130,7 @@ func NewIndexer(cfg Config, gitVersion string) (*Indexer, error) {
return nil, err return nil, err
} }
l2Client, err := dialL2EthClientWithTimeout(ctx, cfg.L2EthRpc) l2Client, l2RPC, err := dialL2EthClientWithTimeout(ctx, cfg.L2EthRpc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -180,6 +182,7 @@ func NewIndexer(cfg Config, gitVersion string) (*Indexer, error) { ...@@ -180,6 +182,7 @@ func NewIndexer(cfg Config, gitVersion string) (*Indexer, error) {
l2IndexingService, err := l2.NewService(l2.ServiceConfig{ l2IndexingService, err := l2.NewService(l2.ServiceConfig{
Context: ctx, Context: ctx,
Metrics: m, Metrics: m,
L2RPC: l2RPC,
L2Client: l2Client, L2Client: l2Client,
DB: db, DB: db,
ConfDepth: cfg.ConfDepth, ConfDepth: cfg.ConfDepth,
...@@ -277,12 +280,17 @@ func dialL1EthClientWithTimeout(ctx context.Context, url string) ( ...@@ -277,12 +280,17 @@ func dialL1EthClientWithTimeout(ctx context.Context, url string) (
// provided URL. If the dial doesn't complete within defaultDialTimeout seconds, // provided URL. If the dial doesn't complete within defaultDialTimeout seconds,
// this method will return an error. // this method will return an error.
func dialL2EthClientWithTimeout(ctx context.Context, url string) ( func dialL2EthClientWithTimeout(ctx context.Context, url string) (
*l2ethclient.Client, error) { *l2ethclient.Client, *l2rpc.Client, error) {
ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout) ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel() defer cancel()
return l2ethclient.DialContext(ctxt, url) rpc, err := l2rpc.DialContext(ctxt, url)
if err != nil {
return nil, nil, err
}
return l2ethclient.NewClient(rpc), rpc, nil
} }
// traceRateToFloat64 converts a time.Duration into a valid float64 for the // traceRateToFloat64 converts a time.Duration into a valid float64 for the
......
...@@ -5,10 +5,10 @@ import ( ...@@ -5,10 +5,10 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"math/big" "math/big"
"sync"
"time" "time"
"github.com/ethereum/go-ethereum" "github.com/ethereum-optimism/optimism/go/indexer/services/util"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
...@@ -17,9 +17,9 @@ import ( ...@@ -17,9 +17,9 @@ import (
) )
const ( const (
DefaultConnectionTimeout = 20 * time.Second DefaultConnectionTimeout = 30 * time.Second
DefaultConfDepth uint64 = 20 DefaultConfDepth uint64 = 20
DefaultMaxBatchSize uint64 = 100 DefaultMaxBatchSize = 100
) )
type NewHeader struct { type NewHeader struct {
...@@ -128,24 +128,34 @@ type ConfirmedHeaderSelector struct { ...@@ -128,24 +128,34 @@ type ConfirmedHeaderSelector struct {
cfg HeaderSelectorConfig cfg HeaderSelectorConfig
} }
func toBlockNumArg(number *big.Int) string { func HeadersByRange(ctx context.Context, client *rpc.Client, startHeight uint64, count int) ([]*NewHeader, error) {
if number == nil { height := startHeight
return "latest" batchElems := make([]rpc.BatchElem, count)
for i := 0; i < count; i++ {
batchElems[i] = rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{
util.ToBlockNumArg(new(big.Int).SetUint64(height + uint64(i))),
false,
},
Result: new(NewHeader),
Error: nil,
}
} }
pending := big.NewInt(-1)
if number.Cmp(pending) == 0 { if err := client.BatchCallContext(ctx, batchElems); err != nil {
return "pending" return nil, err
} }
return hexutil.EncodeBig(number)
}
func HeaderByNumber(ctx context.Context, client *rpc.Client, height *big.Int) (*NewHeader, error) { out := make([]*NewHeader, count)
var head *NewHeader for i := 0; i < len(batchElems); i++ {
err := client.CallContext(ctx, &head, "eth_getBlockByNumber", toBlockNumArg(height), false) if batchElems[i].Error != nil {
if err == nil && head == nil { return nil, batchElems[i].Error
err = ethereum.NotFound }
out[i] = batchElems[i].Result.(*NewHeader)
} }
return head, err
return out, nil
} }
func (f *ConfirmedHeaderSelector) NewHead( func (f *ConfirmedHeaderSelector) NewHead(
...@@ -153,7 +163,7 @@ func (f *ConfirmedHeaderSelector) NewHead( ...@@ -153,7 +163,7 @@ func (f *ConfirmedHeaderSelector) NewHead(
lowest uint64, lowest uint64,
header *types.Header, header *types.Header,
client *rpc.Client, client *rpc.Client,
) []*NewHeader { ) ([]*NewHeader, error) {
number := header.Number.Uint64() number := header.Number.Uint64()
blockHash := header.Hash blockHash := header.Hash
...@@ -161,14 +171,14 @@ func (f *ConfirmedHeaderSelector) NewHead( ...@@ -161,14 +171,14 @@ func (f *ConfirmedHeaderSelector) NewHead(
logger.Info("New block", "block", number, "hash", blockHash) logger.Info("New block", "block", number, "hash", blockHash)
if number < f.cfg.ConfDepth { if number < f.cfg.ConfDepth {
return nil return nil, nil
} }
endHeight := number - f.cfg.ConfDepth + 1 endHeight := number - f.cfg.ConfDepth + 1
minNextHeight := lowest + f.cfg.ConfDepth minNextHeight := lowest + f.cfg.ConfDepth
if minNextHeight > number { if minNextHeight > number {
log.Info("Fork block ", "block", number, "hash", blockHash) log.Info("Fork block ", "block", number, "hash", blockHash)
return nil return nil, nil
} }
startHeight := lowest + 1 startHeight := lowest + 1
...@@ -177,34 +187,35 @@ func (f *ConfirmedHeaderSelector) NewHead( ...@@ -177,34 +187,35 @@ func (f *ConfirmedHeaderSelector) NewHead(
endHeight = startHeight + f.cfg.MaxBatchSize - 1 endHeight = startHeight + f.cfg.MaxBatchSize - 1
} }
nHeaders := endHeight - startHeight + 1 nHeaders := int(endHeight - startHeight + 1)
if nHeaders > 1 { if nHeaders > 1 {
logger.Info("Loading block batch ", logger.Info("Loading blocks",
"startHeight", startHeight, "endHeight", endHeight) "startHeight", startHeight, "endHeight", endHeight)
} }
headers := make([]*NewHeader, nHeaders) headers := make([]*NewHeader, 0)
var wg sync.WaitGroup height := startHeight
for i := uint64(0); i < nHeaders; i++ { left := nHeaders - len(headers)
wg.Add(1) for left > 0 {
go func(ii uint64) { count := DefaultMaxBatchSize
defer wg.Done() if count > left {
count = left
}
ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout) logger.Info("Loading block batch",
defer cancel() "height", height, "count", count)
height := startHeight + ii ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout)
bigHeight := new(big.Int).SetUint64(height) fetched, err := HeadersByRange(ctxt, client, height, count)
header, err := HeaderByNumber(ctxt, client, bigHeight) cancel()
if err != nil { if err != nil {
log.Error("Unable to load block ", "block", height, "err", err) return nil, err
return }
}
headers[ii] = header headers = append(headers, fetched...)
}(i) left = nHeaders - len(headers)
height += uint64(count)
} }
wg.Wait()
logger.Debug("Verifying block range ", logger.Debug("Verifying block range ",
"startHeight", startHeight, "endHeight", endHeight) "startHeight", startHeight, "endHeight", endHeight)
...@@ -233,7 +244,7 @@ func (f *ConfirmedHeaderSelector) NewHead( ...@@ -233,7 +244,7 @@ func (f *ConfirmedHeaderSelector) NewHead(
"block", header.Number.Uint64(), "hash", header.Hash) "block", header.Number.Uint64(), "hash", header.Hash)
} }
return headers return headers, nil
} }
func NewConfirmedHeaderSelector(cfg HeaderSelectorConfig) (*ConfirmedHeaderSelector, func NewConfirmedHeaderSelector(cfg HeaderSelectorConfig) (*ConfirmedHeaderSelector,
......
...@@ -215,7 +215,10 @@ func (s *Service) Update(newHeader *types.Header) error { ...@@ -215,7 +215,10 @@ func (s *Service) Update(newHeader *types.Header) error {
lowest = *highestConfirmed lowest = *highestConfirmed
} }
headers := s.headerSelector.NewHead(s.ctx, lowest.Number, newHeader, s.cfg.RawL1Client) headers, err := s.headerSelector.NewHead(s.ctx, lowest.Number, newHeader, s.cfg.RawL1Client)
if err != nil {
return err
}
if len(headers) == 0 { if len(headers) == 0 {
return errNoNewBlocks return errNoNewBlocks
} }
......
...@@ -4,18 +4,20 @@ import ( ...@@ -4,18 +4,20 @@ import (
"context" "context"
"errors" "errors"
"math/big" "math/big"
"sync"
"time" "time"
"github.com/ethereum-optimism/optimism/go/indexer/services/util"
"github.com/ethereum-optimism/optimism/l2geth/rpc"
"github.com/ethereum-optimism/optimism/l2geth/core/types" "github.com/ethereum-optimism/optimism/l2geth/core/types"
l2ethclient "github.com/ethereum-optimism/optimism/l2geth/ethclient"
"github.com/ethereum-optimism/optimism/l2geth/log" "github.com/ethereum-optimism/optimism/l2geth/log"
l2rpc "github.com/ethereum-optimism/optimism/l2geth/rpc"
) )
const ( const (
DefaultConnectionTimeout = 20 * time.Second DefaultConnectionTimeout = 20 * time.Second
DefaultConfDepth uint64 = 20 DefaultConfDepth uint64 = 20
DefaultMaxBatchSize uint64 = 100 DefaultMaxBatchSize = 50
) )
type HeaderSelectorConfig struct { type HeaderSelectorConfig struct {
...@@ -27,12 +29,42 @@ type ConfirmedHeaderSelector struct { ...@@ -27,12 +29,42 @@ type ConfirmedHeaderSelector struct {
cfg HeaderSelectorConfig cfg HeaderSelectorConfig
} }
func HeadersByRange(ctx context.Context, client *l2rpc.Client, startHeight uint64, count int) ([]*types.Header, error) {
height := startHeight
batchElems := make([]rpc.BatchElem, count)
for i := 0; i < count; i++ {
batchElems[i] = rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{
util.ToBlockNumArg(new(big.Int).SetUint64(height + uint64(i))),
false,
},
Result: new(types.Header),
Error: nil,
}
}
if err := client.BatchCallContext(ctx, batchElems); err != nil {
return nil, err
}
out := make([]*types.Header, count)
for i := 0; i < len(batchElems); i++ {
if batchElems[i].Error != nil {
return nil, batchElems[i].Error
}
out[i] = batchElems[i].Result.(*types.Header)
}
return out, nil
}
func (f *ConfirmedHeaderSelector) NewHead( func (f *ConfirmedHeaderSelector) NewHead(
ctx context.Context, ctx context.Context,
lowest uint64, lowest uint64,
header *types.Header, header *types.Header,
client *l2ethclient.Client, client *l2rpc.Client,
) []*types.Header { ) ([]*types.Header, error) {
number := header.Number.Uint64() number := header.Number.Uint64()
blockHash := header.Hash() blockHash := header.Hash()
...@@ -40,14 +72,14 @@ func (f *ConfirmedHeaderSelector) NewHead( ...@@ -40,14 +72,14 @@ func (f *ConfirmedHeaderSelector) NewHead(
logger.Info("New block", "block", number, "hash", blockHash) logger.Info("New block", "block", number, "hash", blockHash)
if number < f.cfg.ConfDepth { if number < f.cfg.ConfDepth {
return nil return nil, nil
} }
endHeight := number - f.cfg.ConfDepth + 1 endHeight := number - f.cfg.ConfDepth + 1
minNextHeight := lowest + f.cfg.ConfDepth minNextHeight := lowest + f.cfg.ConfDepth
if minNextHeight > number { if minNextHeight > number {
log.Info("Fork block=%d hash=%s", number, blockHash) log.Info("Fork block=%d hash=%s", number, blockHash)
return nil return nil, nil
} }
startHeight := lowest + 1 startHeight := lowest + 1
...@@ -56,34 +88,35 @@ func (f *ConfirmedHeaderSelector) NewHead( ...@@ -56,34 +88,35 @@ func (f *ConfirmedHeaderSelector) NewHead(
endHeight = startHeight + f.cfg.MaxBatchSize - 1 endHeight = startHeight + f.cfg.MaxBatchSize - 1
} }
nHeaders := endHeight - startHeight + 1 nHeaders := int(endHeight - startHeight + 1)
if nHeaders > 1 { if nHeaders > 1 {
logger.Info("Loading block batch ", logger.Info("Loading blocks",
"startHeight", startHeight, "endHeight", endHeight) "startHeight", startHeight, "endHeight", endHeight)
} }
headers := make([]*types.Header, nHeaders) headers := make([]*types.Header, 0)
var wg sync.WaitGroup height := startHeight
for i := uint64(0); i < nHeaders; i++ { left := nHeaders - len(headers)
wg.Add(1) for left > 0 {
go func(ii uint64) { count := DefaultMaxBatchSize
defer wg.Done() if count > left {
count = left
ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout) }
defer cancel()
logger.Info("Loading block batch",
height := startHeight + ii "height", height, "count", count)
bigHeight := new(big.Int).SetUint64(height)
header, err := client.HeaderByNumber(ctxt, bigHeight) ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout)
if err != nil { fetched, err := HeadersByRange(ctxt, client, height, count)
log.Error("Unable to load block ", "block", height, "err", err) cancel()
return if err != nil {
} return nil, err
}
headers[ii] = header headers = append(headers, fetched...)
}(i) left = nHeaders - len(headers)
height += uint64(count)
} }
wg.Wait()
logger.Debug("Verifying block range ", logger.Debug("Verifying block range ",
"startHeight", startHeight, "endHeight", endHeight) "startHeight", startHeight, "endHeight", endHeight)
...@@ -112,7 +145,7 @@ func (f *ConfirmedHeaderSelector) NewHead( ...@@ -112,7 +145,7 @@ func (f *ConfirmedHeaderSelector) NewHead(
"block", header.Number.Uint64(), "hash", header.Hash()) "block", header.Number.Uint64(), "hash", header.Hash())
} }
return headers return headers, nil
} }
func NewConfirmedHeaderSelector(cfg HeaderSelectorConfig) (*ConfirmedHeaderSelector, func NewConfirmedHeaderSelector(cfg HeaderSelectorConfig) (*ConfirmedHeaderSelector,
......
...@@ -10,6 +10,8 @@ import ( ...@@ -10,6 +10,8 @@ import (
"sync" "sync"
"time" "time"
l2rpc "github.com/ethereum-optimism/optimism/l2geth/rpc"
"github.com/ethereum-optimism/optimism/go/indexer/metrics" "github.com/ethereum-optimism/optimism/go/indexer/metrics"
"github.com/ethereum-optimism/optimism/go/indexer/server" "github.com/ethereum-optimism/optimism/go/indexer/server"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
...@@ -58,6 +60,7 @@ func HeaderByNumberWithRetry(ctx context.Context, ...@@ -58,6 +60,7 @@ func HeaderByNumberWithRetry(ctx context.Context,
type ServiceConfig struct { type ServiceConfig struct {
Context context.Context Context context.Context
Metrics *metrics.Metrics Metrics *metrics.Metrics
L2RPC *l2rpc.Client
L2Client *l2ethclient.Client L2Client *l2ethclient.Client
ChainID *big.Int ChainID *big.Int
ConfDepth uint64 ConfDepth uint64
...@@ -190,7 +193,10 @@ func (s *Service) Update(newHeader *types.Header) error { ...@@ -190,7 +193,10 @@ func (s *Service) Update(newHeader *types.Header) error {
lowest = *highestConfirmed lowest = *highestConfirmed
} }
headers := s.headerSelector.NewHead(s.ctx, lowest.Number, newHeader, s.cfg.L2Client) headers, err := s.headerSelector.NewHead(s.ctx, lowest.Number, newHeader, s.cfg.L2RPC)
if err != nil {
return err
}
if len(headers) == 0 { if len(headers) == 0 {
return errNoNewBlocks return errNoNewBlocks
} }
......
package util
import (
"math/big"
"github.com/ethereum/go-ethereum/common/hexutil"
)
func ToBlockNumArg(number *big.Int) string {
if number == nil {
return "latest"
}
pending := big.NewInt(-1)
if number.Cmp(pending) == 0 {
return "pending"
}
return hexutil.EncodeBig(number)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment