Commit acafdb9e authored by Adrian Sutton, committed by GitHub

op-supervisor: Rewind database if an error occurs while storing logs for a block. (#11122)

Ensures the database is rewound to the last block that was fully stored, so the logs for the failing block can be retried from a clean state rather than with some of them already written.
parent 69d2d47b
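
For context, here is a minimal, self-contained sketch of the pattern this change introduces: if storing a block's logs fails partway through, rewind storage to the previous block so the whole block can be retried from a clean state. The logStore, entry, and processBlock names below are illustrative stand-ins rather than op-supervisor code, and the real Rewind operates on the supervisor's log database, not an in-memory slice.

package main

import (
	"errors"
	"fmt"
)

// entry is a stored log: the block it belongs to and its index within that block.
type entry struct {
	blockNum uint64
	logIdx   uint32
}

// logStore is a toy in-memory stand-in for the supervisor's log database.
type logStore struct {
	entries []entry
	failAt  *entry // if set, AddLog fails for this (block, index) to simulate a mid-block error
}

func (s *logStore) AddLog(blockNum uint64, logIdx uint32) error {
	if s.failAt != nil && s.failAt.blockNum == blockNum && s.failAt.logIdx == logIdx {
		return errors.New("boom")
	}
	s.entries = append(s.entries, entry{blockNum, logIdx})
	return nil
}

// Rewind drops every entry after headBlockNum, mirroring the DatabaseRewinder contract.
func (s *logStore) Rewind(headBlockNum uint64) error {
	kept := s.entries[:0]
	for _, e := range s.entries {
		if e.blockNum <= headBlockNum {
			kept = append(kept, e)
		}
	}
	s.entries = kept
	return nil
}

// processBlock stores all logs for a block. On any failure it rewinds to the
// previous block so none of this block's logs are left behind, then reports false
// so the caller does not advance its last-processed block and retries later.
func processBlock(store *logStore, prevBlockNum, blockNum uint64, logCount uint32) bool {
	for i := uint32(0); i < logCount; i++ {
		if err := store.AddLog(blockNum, i); err != nil {
			fmt.Printf("failed to add log %d of block %d: %v; rewinding to %d\n", i, blockNum, err, prevBlockNum)
			if rerr := store.Rewind(prevBlockNum); rerr != nil {
				// Best effort: if the rewind fails, the next write attempt will fail
				// again and the rewind will be retried then.
				fmt.Printf("rewind failed: %v\n", rerr)
			}
			return false
		}
	}
	return true
}

func main() {
	store := &logStore{failAt: &entry{blockNum: 101, logIdx: 2}}
	fmt.Println("block 100 stored:", processBlock(store, 99, 100, 3))  // true, 3 entries stored
	fmt.Println("block 101 stored:", processBlock(store, 100, 101, 4)) // false, partial logs removed
	fmt.Println("entries remaining:", len(store.entries))              // 3: only block 100's logs remain
}

The design choice mirrored from the commit: the rewind target is the last fully-processed block (s.lastBlock.Number in ChainProcessor), and a failed rewind is only logged, because a subsequent write attempt would fail and trigger the rewind again anyway.
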
@@ -23,6 +23,11 @@ type Metrics interface {
caching.Metrics
}
type LogDB interface {
LogStorage
DatabaseRewinder
}
// ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform
// interop consolidation. It detects and notifies when reorgs occur.
type ChainMonitor struct {
@@ -30,7 +35,7 @@ type ChainMonitor struct {
headMonitor *HeadMonitor
}
func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID types.ChainID, rpc string, client client.RPC, store LogStorage, block uint64) (*ChainMonitor, error) {
func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID types.ChainID, rpc string, client client.RPC, store LogDB, block uint64) (*ChainMonitor, error) {
logger = logger.New("chainID", chainID)
cl, err := newClient(ctx, logger, m, rpc, client, pollInterval, trustRpc, rpcKind)
if err != nil {
@@ -43,7 +48,7 @@ func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID
processLogs := newLogProcessor(store)
fetchReceipts := newLogFetcher(cl, processLogs)
unsafeBlockProcessor := NewChainProcessor(logger, cl, startingHead, fetchReceipts)
unsafeBlockProcessor := NewChainProcessor(logger, cl, startingHead, fetchReceipts, store)
unsafeProcessors := []HeadProcessor{unsafeBlockProcessor}
callback := newHeadUpdateProcessor(logger, unsafeProcessors, nil, nil)
@@ -15,6 +15,10 @@ type BlockProcessor interface {
ProcessBlock(ctx context.Context, block eth.L1BlockRef) error
}
type DatabaseRewinder interface {
Rewind(headBlockNum uint64) error
}
type BlockProcessorFn func(ctx context.Context, block eth.L1BlockRef) error
func (fn BlockProcessorFn) ProcessBlock(ctx context.Context, block eth.L1BlockRef) error {
@@ -28,14 +32,16 @@ type ChainProcessor struct {
client BlockByNumberSource
lastBlock eth.L1BlockRef
processor BlockProcessor
rewinder DatabaseRewinder
}
func NewChainProcessor(log log.Logger, client BlockByNumberSource, startingHead eth.L1BlockRef, processor BlockProcessor) *ChainProcessor {
func NewChainProcessor(log log.Logger, client BlockByNumberSource, startingHead eth.L1BlockRef, processor BlockProcessor, rewinder DatabaseRewinder) *ChainProcessor {
return &ChainProcessor{
log: log,
client: client,
lastBlock: startingHead,
processor: processor,
rewinder: rewinder,
}
}
@@ -48,18 +54,27 @@ func (s *ChainProcessor) OnNewHead(ctx context.Context, head eth.L1BlockRef) {
nextBlock, err := s.client.L1BlockRefByNumber(ctx, blockNum)
if err != nil {
s.log.Error("Failed to fetch block info", "number", blockNum, "err", err)
return // Don't update the last processed block so we will retry fetching this block on next head update
return
}
if err := s.processor.ProcessBlock(ctx, nextBlock); err != nil {
s.log.Error("Failed to process block", "block", nextBlock, "err", err)
return // Don't update the last processed block so we will retry on next update
if ok := s.processBlock(ctx, nextBlock); !ok {
return
}
s.lastBlock = nextBlock
}
if err := s.processor.ProcessBlock(ctx, head); err != nil {
s.log.Error("Failed to process block", "block", head, "err", err)
return // Don't update the last processed block so we will retry on next update
s.processBlock(ctx, head)
}
func (s *ChainProcessor) processBlock(ctx context.Context, block eth.L1BlockRef) bool {
if err := s.processor.ProcessBlock(ctx, block); err != nil {
s.log.Error("Failed to process block", "block", block, "err", err)
// Try to rewind the database to the previous block to remove any logs from this block that were written
if err := s.rewinder.Rewind(s.lastBlock.Number); err != nil {
// If any logs were written, our next attempt to write will fail and we'll retry this rewind.
// If no logs were written successfully then the rewind wouldn't have done anything anyway.
s.log.Error("Failed to rewind after error processing block", "block", block, "err", err)
}
return false // Don't update the last processed block so we will retry on next update
}
s.lastBlock = head
s.lastBlock = block
return true
}
@@ -18,7 +18,7 @@ func TestUnsafeBlocksStage(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{}
processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, eth.L1BlockRef{Number: 100}, processor)
stage := NewChainProcessor(logger, client, eth.L1BlockRef{Number: 100}, processor, &stubRewinder{})
stage.OnNewHead(ctx, eth.L1BlockRef{Number: 100})
stage.OnNewHead(ctx, eth.L1BlockRef{Number: 99})
@@ -35,7 +35,7 @@ func TestUnsafeBlocksStage(t *testing.T) {
block2 := eth.L1BlockRef{Number: 102}
block3 := eth.L1BlockRef{Number: 103}
processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, block0, processor)
stage := NewChainProcessor(logger, client, block0, processor, &stubRewinder{})
stage.OnNewHead(ctx, block1)
require.Equal(t, []eth.L1BlockRef{block1}, processor.processed)
stage.OnNewHead(ctx, block2)
@@ -53,7 +53,7 @@ func TestUnsafeBlocksStage(t *testing.T) {
block0 := eth.L1BlockRef{Number: 100}
block1 := eth.L1BlockRef{Number: 101}
processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, block0, processor)
stage := NewChainProcessor(logger, client, block0, processor, &stubRewinder{})
stage.OnNewHead(ctx, block1)
require.NotEmpty(t, processor.processed)
require.Equal(t, []eth.L1BlockRef{block1}, processor.processed)
@@ -72,7 +72,7 @@ func TestUnsafeBlocksStage(t *testing.T) {
block0 := eth.L1BlockRef{Number: 100}
block3 := eth.L1BlockRef{Number: 103}
processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, block0, processor)
stage := NewChainProcessor(logger, client, block0, processor, &stubRewinder{})
stage.OnNewHead(ctx, block3)
require.Equal(t, []eth.L1BlockRef{makeBlockRef(101), makeBlockRef(102), block3}, processor.processed)
@@ -87,7 +87,8 @@ func TestUnsafeBlocksStage(t *testing.T) {
block0 := eth.L1BlockRef{Number: 100}
block3 := eth.L1BlockRef{Number: 103}
processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, block0, processor)
rewinder := &stubRewinder{}
stage := NewChainProcessor(logger, client, block0, processor, rewinder)
stage.OnNewHead(ctx, block3)
require.Empty(t, processor.processed, "should not update any blocks because backfill failed")
@@ -95,6 +96,7 @@ func TestUnsafeBlocksStage(t *testing.T) {
client.err = nil
stage.OnNewHead(ctx, block3)
require.Equal(t, []eth.L1BlockRef{makeBlockRef(101), makeBlockRef(102), block3}, processor.processed)
require.False(t, rewinder.rewindCalled, "should not rewind because no logs could have been written")
})
t.Run("DoNotUpdateLastBlockOnProcessorError", func(t *testing.T) {
@@ -104,16 +106,34 @@ func TestUnsafeBlocksStage(t *testing.T) {
block0 := eth.L1BlockRef{Number: 100}
block3 := eth.L1BlockRef{Number: 103}
processor := &stubBlockProcessor{err: errors.New("boom")}
stage := NewChainProcessor(logger, client, block0, processor)
rewinder := &stubRewinder{}
stage := NewChainProcessor(logger, client, block0, processor, rewinder)
stage.OnNewHead(ctx, block3)
require.Equal(t, []eth.L1BlockRef{makeBlockRef(101)}, processor.processed, "Attempted to process block 101")
require.Equal(t, block0.Number, rewinder.rewoundTo, "should rewind to block before error")
processor.err = nil
stage.OnNewHead(ctx, block3)
// Attempts to process block 101 again, then carries on
require.Equal(t, []eth.L1BlockRef{makeBlockRef(101), makeBlockRef(101), makeBlockRef(102), block3}, processor.processed)
})
t.Run("RewindWhenNewHeadProcessingFails", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{}
block0 := eth.L1BlockRef{Number: 100}
block1 := eth.L1BlockRef{Number: 101}
processor := &stubBlockProcessor{err: errors.New("boom")}
rewinder := &stubRewinder{}
stage := NewChainProcessor(logger, client, block0, processor, rewinder)
// No skipped blocks
stage.OnNewHead(ctx, block1)
require.Equal(t, []eth.L1BlockRef{block1}, processor.processed, "Attempted to process block 101")
require.Equal(t, block0.Number, rewinder.rewoundTo, "should rewind to block before error")
})
}
type stubBlockByNumberSource struct {
@@ -147,3 +167,14 @@ func makeBlockRef(number uint64) eth.L1BlockRef {
Time: number * 1000,
}
}
type stubRewinder struct {
rewoundTo uint64
rewindCalled bool
}
func (s *stubRewinder) Rewind(headBlockNum uint64) error {
s.rewoundTo = headBlockNum
s.rewindCalled = true
return nil
}
@@ -29,7 +29,6 @@ func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpt
logHash := logToHash(l)
err := p.logStore.AddLog(logHash, block.ID(), block.Time, uint32(l.Index), nil)
if err != nil {
// TODO(optimism#11044): Need to roll back to the start of the block....
return fmt.Errorf("failed to add log %d from block %v: %w", l.Index, block.ID(), err)
}
}