Commit 1fedc1ff authored by Hamdi Allam's avatar Hamdi Allam

remove buffered_header_traversal. processor can just keep reference to the...

remove buffered_header_traversal. processor can just keep reference to the unprocessed headers in memory
parent 8ad6f38a
package node
import (
"errors"
"math/big"
"github.com/ethereum/go-ethereum/core/types"
)
var (
ErrBufferedHeaderTraversalInvalidAdvance = errors.New("invalid advancement based on the BufferedHeaderTraversal's internal state")
)
// BufferedHeaderTraversal is a wrapper over HeaderTraversal which buffers traversed headers and only
// expands the buffer on a requested size increase or advancements are made which reduces the buffer
type BufferedHeaderTraversal struct {
*HeaderTraversal
bufferedHeaders []*types.Header
}
// NewBufferedHeaderTraversal creates a new instance of BufferedHeaderTraversal
func NewBufferedHeaderTraversal(ethClient EthClient, fromHeader *types.Header) *BufferedHeaderTraversal {
return &BufferedHeaderTraversal{HeaderTraversal: NewHeaderTraversal(ethClient, fromHeader)}
}
// NextFinalizedHeaders returns the buffered set of headers bounded by the supplied size
func (bf *BufferedHeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]*types.Header, error) {
numBuffered := uint64(len(bf.bufferedHeaders))
if maxSize <= numBuffered {
return bf.bufferedHeaders[:maxSize], nil
}
headers, err := bf.HeaderTraversal.NextFinalizedHeaders(maxSize - uint64(numBuffered))
if err != nil {
if numBuffered == 0 {
return nil, err
}
// swallow the error and return existing buffered headers since we have some
return bf.bufferedHeaders, nil
}
// No need to check the integrity of this new batch since the underlying HeaderTraversal ensures this
bf.bufferedHeaders = append(bf.bufferedHeaders, headers...)
return bf.bufferedHeaders, nil
}
// Advance reduces the internal buffer by marking the supplied header as the new base for the buffer
func (bf *BufferedHeaderTraversal) Advance(header *types.Header) error {
numBuffered := uint64(len(bf.bufferedHeaders))
if numBuffered == 0 {
return ErrBufferedHeaderTraversalInvalidAdvance
}
firstBuffered := bf.bufferedHeaders[0]
if firstBuffered.Number.Cmp(header.Number) > 0 {
return ErrBufferedHeaderTraversalInvalidAdvance
}
step := new(big.Int).Sub(header.Number, firstBuffered.Number).Uint64()
if step > numBuffered-1 || header.Hash() != bf.bufferedHeaders[step].Hash() {
// too large a step or the supplied header does not match the buffered header
return ErrBufferedHeaderTraversalInvalidAdvance
}
if step < numBuffered-1 {
// partial advancement
bf.bufferedHeaders = bf.bufferedHeaders[step+1:]
} else {
// throw away the entire buffer
bf.bufferedHeaders = nil
}
return nil
}
package node
import (
"errors"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestBufferedHeaderTraversalNextFinalizedHeaders(t *testing.T) {
client := new(MockEthClient)
bufferedHeaderTraversal := NewBufferedHeaderTraversal(client, nil)
// buffer 10 blocks
headers := makeHeaders(10, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(10), nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(9))).Return(headers, nil)
headers, err := bufferedHeaderTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.Len(t, headers, 10)
// next call returns the same headers
sameHeaders, err := bufferedHeaderTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.ElementsMatch(t, sameHeaders, headers)
// subset reuses the same internal buffer
subsetHeaders, err := bufferedHeaderTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err)
assert.ElementsMatch(t, subsetHeaders[:5], headers[:5])
}
func TestBufferedHeaderTraversalExpandingBuffer(t *testing.T) {
client := new(MockEthClient)
bufferedHeaderTraversal := NewBufferedHeaderTraversal(client, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(100), nil)
headers := makeHeaders(20, nil)
// buffer 10 blocks
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(9))).Return(headers[:10], nil)
headerBatch, err := bufferedHeaderTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.Len(t, headerBatch, 10)
// expand buffer to 20 blocks
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(10)), mock.MatchedBy(bigIntMatcher(19))).Return(headers[10:], nil)
headerBatch, err = bufferedHeaderTraversal.NextFinalizedHeaders(20)
assert.NoError(t, err)
assert.Len(t, headerBatch, 20)
// covers the full list of headers
assert.ElementsMatch(t, headers, headerBatch)
}
func TestBufferedHeaderTraversalExpandingBufferFailures(t *testing.T) {
client := new(MockEthClient)
bufferedHeaderTraversal := NewBufferedHeaderTraversal(client, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(100), nil)
headers := makeHeaders(20, nil)
// buffer 10 blocks
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(9))).Return(headers[:10], nil)
headerBatch, err := bufferedHeaderTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.Len(t, headerBatch, 10)
// buffer expansion fails. Returns current buffer as is
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(10)), mock.MatchedBy(bigIntMatcher(19))).Return([]*types.Header{}, errors.New("boom"))
headerBatch, err = bufferedHeaderTraversal.NextFinalizedHeaders(20)
assert.NoError(t, err)
assert.Len(t, headerBatch, 10)
}
func TestBufferedHeaderTraversalAdvance(t *testing.T) {
client := new(MockEthClient)
bufferedHeaderTraversal := NewBufferedHeaderTraversal(client, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(100), nil)
headers := makeHeaders(20, nil)
// observe & buffer 10 blocks
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(9))).Return(headers[:10], nil)
headerBatch, err := bufferedHeaderTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.Len(t, headerBatch, 10)
assert.ElementsMatch(t, headers[:10], headerBatch)
// advance past the first 5
err = bufferedHeaderTraversal.Advance(headers[4])
assert.NoError(t, err)
// 5 remaining headers that are internally buffered
headerBatch, err = bufferedHeaderTraversal.NextFinalizedHeaders(5)
assert.NoError(t, err)
assert.Len(t, headerBatch, 5)
assert.ElementsMatch(t, headers[5:10], headerBatch)
// empty the buffer by advancing to the last (10th) buffer
err = bufferedHeaderTraversal.Advance(headers[9])
assert.NoError(t, err)
// Next 10 requires an expansion of an empty internal buffer
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(10)), mock.MatchedBy(bigIntMatcher(19))).Return(headers[10:], nil)
headerBatch, err = bufferedHeaderTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.Len(t, headerBatch, 10)
assert.ElementsMatch(t, headers[10:], headerBatch)
}
func TestBufferedHeaderTraversalInvalidAdvance(t *testing.T) {
client := new(MockEthClient)
bufferedHeaderTraversal := NewBufferedHeaderTraversal(client, nil)
client.On("FinalizedBlockHeight").Return(big.NewInt(100), nil)
headers := makeHeaders(20, nil)
// observe & buffer 10 blocks
client.On("BlockHeadersByRange", mock.MatchedBy(bigIntMatcher(0)), mock.MatchedBy(bigIntMatcher(9))).Return(headers[:10], nil)
headerBatch, err := bufferedHeaderTraversal.NextFinalizedHeaders(10)
assert.NoError(t, err)
assert.Len(t, headerBatch, 10)
assert.ElementsMatch(t, headers[:10], headerBatch)
// advance past the first 5
err = bufferedHeaderTraversal.Advance(headers[4])
assert.NoError(t, err)
// advance to the same header
err = bufferedHeaderTraversal.Advance(headers[4])
assert.Error(t, err)
assert.Equal(t, ErrBufferedHeaderTraversalInvalidAdvance, err)
// advance to a past header
err = bufferedHeaderTraversal.Advance(headers[0])
assert.Error(t, err)
assert.Equal(t, ErrBufferedHeaderTraversalInvalidAdvance, err)
// non-buffered header
err = bufferedHeaderTraversal.Advance(headers[10])
assert.Error(t, err)
assert.Equal(t, ErrBufferedHeaderTraversalInvalidAdvance, err)
// valid header number but a different block hash
fakeHeader := &types.Header{Number: big.NewInt(6)}
assert.Equal(t, headers[6].Number, fakeHeader.Number)
assert.NotEqual(t, headers[6].Hash(), fakeHeader.Hash())
err = bufferedHeaderTraversal.Advance(fakeHeader)
assert.Error(t, err)
assert.Equal(t, ErrBufferedHeaderTraversalInvalidAdvance, err)
}
...@@ -96,7 +96,7 @@ func NewL1Processor(ethClient node.EthClient, db *database.DB, l1Contracts L1Con ...@@ -96,7 +96,7 @@ func NewL1Processor(ethClient node.EthClient, db *database.DB, l1Contracts L1Con
l1Processor := &L1Processor{ l1Processor := &L1Processor{
processor: processor{ processor: processor{
headerTraversal: node.NewBufferedHeaderTraversal(ethClient, fromL1Header), headerTraversal: node.NewHeaderTraversal(ethClient, fromL1Header),
db: db, db: db,
processFn: l1ProcessFn(l1ProcessLog, ethClient, l1Contracts, checkpointAbi), processFn: l1ProcessFn(l1ProcessLog, ethClient, l1Contracts, checkpointAbi),
processLog: l1ProcessLog, processLog: l1ProcessLog,
...@@ -130,7 +130,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1 ...@@ -130,7 +130,7 @@ func l1ProcessFn(processLog log.Logger, ethClient node.EthClient, l1Contracts L1
return err return err
} }
// L2 blocks posted on L1 // L2 checkpoitns posted on L1
outputProposals := []*database.OutputProposal{} outputProposals := []*database.OutputProposal{}
legacyStateBatches := []*database.LegacyStateBatch{} legacyStateBatches := []*database.LegacyStateBatch{}
......
...@@ -80,7 +80,7 @@ func NewL2Processor(ethClient node.EthClient, db *database.DB, l2Contracts L2Con ...@@ -80,7 +80,7 @@ func NewL2Processor(ethClient node.EthClient, db *database.DB, l2Contracts L2Con
l2Processor := &L2Processor{ l2Processor := &L2Processor{
processor: processor{ processor: processor{
headerTraversal: node.NewBufferedHeaderTraversal(ethClient, fromL2Header), headerTraversal: node.NewHeaderTraversal(ethClient, fromL2Header),
db: db, db: db,
processFn: l2ProcessFn(l2ProcessLog, ethClient, l2Contracts), processFn: l2ProcessFn(l2ProcessLog, ethClient, l2Contracts),
processLog: l2ProcessLog, processLog: l2ProcessLog,
......
...@@ -20,7 +20,7 @@ const ( ...@@ -20,7 +20,7 @@ const (
type ProcessFn func(*database.DB, []*types.Header) error type ProcessFn func(*database.DB, []*types.Header) error
type processor struct { type processor struct {
headerTraversal *node.BufferedHeaderTraversal headerTraversal *node.HeaderTraversal
db *database.DB db *database.DB
processFn ProcessFn processFn ProcessFn
...@@ -33,32 +33,38 @@ func (p processor) Start() { ...@@ -33,32 +33,38 @@ func (p processor) Start() {
defer pollTicker.Stop() defer pollTicker.Stop()
p.processLog.Info("starting processor...") p.processLog.Info("starting processor...")
var unprocessedHeaders []*types.Header
for range pollTicker.C { for range pollTicker.C {
headers, err := p.headerTraversal.NextFinalizedHeaders(defaultHeaderBufferSize) if len(unprocessedHeaders) == 0 {
if err != nil { newHeaders, err := p.headerTraversal.NextFinalizedHeaders(defaultHeaderBufferSize)
p.processLog.Error("error querying for headers", "err", err) if err != nil {
continue p.processLog.Error("error querying for headers", "err", err)
} else if len(headers) == 0 { continue
// Logged as an error since this loop should be operating at a longer interval than the provider } else if len(newHeaders) == 0 {
p.processLog.Error("no new headers. processor unexpectadly at head...") // Logged as an error since this loop should be operating at a longer interval than the provider
continue p.processLog.Error("no new headers. processor unexpectedly at head...")
continue
}
unprocessedHeaders = newHeaders
} else {
p.processLog.Info("retrying previous batch")
} }
batchLog := p.processLog.New("batch_start_block_number", headers[0].Number, "batch_end_block_number", headers[len(headers)-1].Number) firstHeader := unprocessedHeaders[0]
lastHeader := unprocessedHeaders[len(unprocessedHeaders)-1]
batchLog := p.processLog.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
batchLog.Info("processing batch") batchLog.Info("processing batch")
err := p.db.Transaction(func(db *database.DB) error {
err = p.db.Transaction(func(db *database.DB) error { return p.processFn(db, unprocessedHeaders)
err := p.processFn(db, headers)
if err != nil {
return err
}
return p.headerTraversal.Advance(headers[len(headers)-1])
}) })
if err != nil { if err != nil {
batchLog.Warn("error processing batch. no operations committed", "err", err) batchLog.Warn("error processing batch. no operations committed", "err", err)
} else { } else {
batchLog.Info("fully committed batch") batchLog.Info("fully committed batch")
unprocessedHeaders = nil
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment