Commit 42dd604b authored by Adrian Sutton's avatar Adrian Sutton Committed by GitHub

op-supervisor: Fetch receipts for each block as head updates (#11035)

* op-supervisor: Introduce pipeline concept and stage to handle all blocks on head update

* op-supervisor: Simplify to just call a block processor directly

* op-supervisor: Simplify further to remove pipeline entirely and hook up processor

* op-supervisor: Separate out and test the head update callback.

* op-supervisor: Fetch receipts for each block
parent 379973e9
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/sources/caching" "github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
...@@ -29,6 +30,7 @@ type Metrics interface { ...@@ -29,6 +30,7 @@ type Metrics interface {
// ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform // ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform
// interop consolidation. It detects and notifies when reorgs occur. // interop consolidation. It detects and notifies when reorgs occur.
type ChainMonitor struct { type ChainMonitor struct {
log log.Logger
headMonitor *HeadMonitor headMonitor *HeadMonitor
} }
...@@ -48,14 +50,25 @@ func NewChainMonitor(ctx context.Context, logger log.Logger, genericMetrics Metr ...@@ -48,14 +50,25 @@ func NewChainMonitor(ctx context.Context, logger log.Logger, genericMetrics Metr
if err != nil { if err != nil {
return nil, err return nil, err
} }
logger.Info("Monitoring chain")
headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, &loggingCallback{logger}) // TODO(optimism#11023): Load the starting block from log db
startingHead := eth.L1BlockRef{}
fetchReceipts := newLogFetcher(cl, &loggingReceiptProcessor{logger})
unsafeBlockProcessor := NewChainProcessor(logger, cl, startingHead, fetchReceipts)
unsafeProcessors := []HeadProcessor{unsafeBlockProcessor}
callback := newHeadUpdateProcessor(logger, unsafeProcessors, nil, nil)
headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, callback)
return &ChainMonitor{ return &ChainMonitor{
log: logger,
headMonitor: headMonitor, headMonitor: headMonitor,
}, nil }, nil
} }
func (c *ChainMonitor) Start() error { func (c *ChainMonitor) Start() error {
c.log.Info("Started monitoring chain")
return c.headMonitor.Start() return c.headMonitor.Start()
} }
...@@ -63,21 +76,13 @@ func (c *ChainMonitor) Stop() error { ...@@ -63,21 +76,13 @@ func (c *ChainMonitor) Stop() error {
return c.headMonitor.Stop() return c.headMonitor.Stop()
} }
// loggingCallback is a temporary implementation of the head monitor callback that just logs the events. type loggingReceiptProcessor struct {
type loggingCallback struct {
log log.Logger log log.Logger
} }
func (n *loggingCallback) OnNewUnsafeHead(_ context.Context, block eth.L1BlockRef) { func (n *loggingReceiptProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpts types.Receipts) error {
n.log.Info("New unsafe head", "block", block) n.log.Info("Process unsafe block", "block", block, "rcpts", len(rcpts))
} return nil
func (n *loggingCallback) OnNewSafeHead(_ context.Context, block eth.L1BlockRef) {
n.log.Info("New safe head", "block", block)
}
func (n *loggingCallback) OnNewFinalizedHead(_ context.Context, block eth.L1BlockRef) {
n.log.Info("New finalized head", "block", block)
} }
func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient *rpc.Client, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) { func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient *rpc.Client, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) {
......
package source
import (
"context"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
)
type BlockByNumberSource interface {
L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
}
type BlockProcessor interface {
ProcessBlock(ctx context.Context, block eth.L1BlockRef) error
}
type BlockProcessorFn func(ctx context.Context, block eth.L1BlockRef) error
func (fn BlockProcessorFn) ProcessBlock(ctx context.Context, block eth.L1BlockRef) error {
return fn(ctx, block)
}
// ChainProcessor is a HeadProcessor that fills in any skipped blocks between head update events.
// It ensures that, absent reorgs, every block in the chain is processed even if some head advancements are skipped.
type ChainProcessor struct {
log log.Logger
client BlockByNumberSource
lastBlock eth.L1BlockRef
processor BlockProcessor
}
func NewChainProcessor(log log.Logger, client BlockByNumberSource, startingHead eth.L1BlockRef, processor BlockProcessor) *ChainProcessor {
return &ChainProcessor{
log: log,
client: client,
lastBlock: startingHead,
processor: processor,
}
}
func (s *ChainProcessor) OnNewHead(ctx context.Context, head eth.L1BlockRef) {
if head.Number <= s.lastBlock.Number {
return
}
for s.lastBlock.Number+1 < head.Number {
blockNum := s.lastBlock.Number + 1
nextBlock, err := s.client.L1BlockRefByNumber(ctx, blockNum)
if err != nil {
s.log.Error("Failed to fetch block info", "number", blockNum, "err", err)
return // Don't update the last processed block so we will retry fetching this block on next head update
}
if err := s.processor.ProcessBlock(ctx, nextBlock); err != nil {
s.log.Error("Failed to process block", "block", nextBlock, "err", err)
return // Don't update the last processed block so we will retry on next update
}
s.lastBlock = nextBlock
}
if err := s.processor.ProcessBlock(ctx, head); err != nil {
s.log.Error("Failed to process block", "block", head, "err", err)
return // Don't update the last processed block so we will retry on next update
}
s.lastBlock = head
}
package source
import (
"context"
"errors"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestUnsafeBlocksStage(t *testing.T) {
t.Run("IgnoreEventsAtOrPriorToStartingHead", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{}
processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, eth.L1BlockRef{Number: 100}, processor)
stage.OnNewHead(ctx, eth.L1BlockRef{Number: 100})
stage.OnNewHead(ctx, eth.L1BlockRef{Number: 99})
require.Empty(t, processor.processed)
require.Zero(t, client.calls)
})
t.Run("OutputNewHeadsWithNoMissedBlocks", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{}
block0 := eth.L1BlockRef{Number: 100}
block1 := eth.L1BlockRef{Number: 101}
block2 := eth.L1BlockRef{Number: 102}
block3 := eth.L1BlockRef{Number: 103}
processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, block0, processor)
stage.OnNewHead(ctx, block1)
require.Equal(t, []eth.L1BlockRef{block1}, processor.processed)
stage.OnNewHead(ctx, block2)
require.Equal(t, []eth.L1BlockRef{block1, block2}, processor.processed)
stage.OnNewHead(ctx, block3)
require.Equal(t, []eth.L1BlockRef{block1, block2, block3}, processor.processed)
require.Zero(t, client.calls, "should not need to request block info")
})
t.Run("IgnoreEventsAtOrPriorToPreviousHead", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{}
block0 := eth.L1BlockRef{Number: 100}
block1 := eth.L1BlockRef{Number: 101}
processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, block0, processor)
stage.OnNewHead(ctx, block1)
require.NotEmpty(t, processor.processed)
require.Equal(t, []eth.L1BlockRef{block1}, processor.processed)
stage.OnNewHead(ctx, block0)
stage.OnNewHead(ctx, block1)
require.Equal(t, []eth.L1BlockRef{block1}, processor.processed)
require.Zero(t, client.calls, "should not need to request block info")
})
t.Run("OutputSkippedBlocks", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{}
block0 := eth.L1BlockRef{Number: 100}
block3 := eth.L1BlockRef{Number: 103}
processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, block0, processor)
stage.OnNewHead(ctx, block3)
require.Equal(t, []eth.L1BlockRef{makeBlockRef(101), makeBlockRef(102), block3}, processor.processed)
require.Equal(t, 2, client.calls, "should only request the two missing blocks")
})
t.Run("DoNotUpdateLastBlockOnFetchError", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{err: errors.New("boom")}
block0 := eth.L1BlockRef{Number: 100}
block3 := eth.L1BlockRef{Number: 103}
processor := &stubBlockProcessor{}
stage := NewChainProcessor(logger, client, block0, processor)
stage.OnNewHead(ctx, block3)
require.Empty(t, processor.processed, "should not update any blocks because backfill failed")
client.err = nil
stage.OnNewHead(ctx, block3)
require.Equal(t, []eth.L1BlockRef{makeBlockRef(101), makeBlockRef(102), block3}, processor.processed)
})
t.Run("DoNotUpdateLastBlockOnProcessorError", func(t *testing.T) {
ctx := context.Background()
logger := testlog.Logger(t, log.LvlInfo)
client := &stubBlockByNumberSource{}
block0 := eth.L1BlockRef{Number: 100}
block3 := eth.L1BlockRef{Number: 103}
processor := &stubBlockProcessor{err: errors.New("boom")}
stage := NewChainProcessor(logger, client, block0, processor)
stage.OnNewHead(ctx, block3)
require.Equal(t, []eth.L1BlockRef{makeBlockRef(101)}, processor.processed, "Attempted to process block 101")
processor.err = nil
stage.OnNewHead(ctx, block3)
// Attempts to process block 101 again, then carries on
require.Equal(t, []eth.L1BlockRef{makeBlockRef(101), makeBlockRef(101), makeBlockRef(102), block3}, processor.processed)
})
}
type stubBlockByNumberSource struct {
calls int
err error
}
func (s *stubBlockByNumberSource) L1BlockRefByNumber(_ context.Context, number uint64) (eth.L1BlockRef, error) {
s.calls++
if s.err != nil {
return eth.L1BlockRef{}, s.err
}
return makeBlockRef(number), nil
}
type stubBlockProcessor struct {
processed []eth.L1BlockRef
err error
}
func (s *stubBlockProcessor) ProcessBlock(_ context.Context, block eth.L1BlockRef) error {
s.processed = append(s.processed, block)
return s.err
}
func makeBlockRef(number uint64) eth.L1BlockRef {
return eth.L1BlockRef{
Number: number,
Hash: common.Hash{byte(number)},
ParentHash: common.Hash{byte(number - 1)},
Time: number * 1000,
}
}
package source
import (
"context"
"fmt"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
type LogSource interface {
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
}
type ReceiptProcessor interface {
ProcessLogs(ctx context.Context, block eth.L1BlockRef, rcpts types.Receipts) error
}
type ReceiptProcessorFn func(ctx context.Context, block eth.L1BlockRef, rcpts types.Receipts) error
func (r ReceiptProcessorFn) ProcessLogs(ctx context.Context, block eth.L1BlockRef, rcpts types.Receipts) error {
return r(ctx, block, rcpts)
}
type logFetcher struct {
client LogSource
processor ReceiptProcessor
}
func newLogFetcher(client LogSource, processor ReceiptProcessor) *logFetcher {
return &logFetcher{
client: client,
processor: processor,
}
}
var _ BlockProcessor = (*logFetcher)(nil)
func (l *logFetcher) ProcessBlock(ctx context.Context, block eth.L1BlockRef) error {
_, rcpts, err := l.client.FetchReceipts(ctx, block.Hash)
if err != nil {
return fmt.Errorf("failed to fetch receipts for block %v: %w", block, err)
}
return l.processor.ProcessLogs(ctx, block, rcpts)
}
package source
import (
"context"
"errors"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)
func TestFetchLogs(t *testing.T) {
ctx := context.Background()
rcpts := types.Receipts{&types.Receipt{Type: 3}, &types.Receipt{Type: 4}}
t.Run("Success", func(t *testing.T) {
client := &stubLogSource{
rcpts: rcpts,
}
var processed []types.Receipts
processor := ReceiptProcessorFn(func(ctx context.Context, block eth.L1BlockRef, rcpts types.Receipts) error {
processed = append(processed, rcpts)
return nil
})
fetcher := newLogFetcher(client, processor)
block := eth.L1BlockRef{Number: 11, Hash: common.Hash{0xaa}}
err := fetcher.ProcessBlock(ctx, block)
require.NoError(t, err)
require.Equal(t, []types.Receipts{rcpts}, processed)
})
t.Run("ReceiptFetcherError", func(t *testing.T) {
client := &stubLogSource{
err: errors.New("boom"),
}
processor := ReceiptProcessorFn(func(ctx context.Context, block eth.L1BlockRef, rcpts types.Receipts) error {
t.Fatal("should not be called")
return nil
})
fetcher := newLogFetcher(client, processor)
block := eth.L1BlockRef{Number: 11, Hash: common.Hash{0xaa}}
err := fetcher.ProcessBlock(ctx, block)
require.ErrorIs(t, err, client.err)
})
t.Run("ProcessorError", func(t *testing.T) {
expectedErr := errors.New("boom")
client := &stubLogSource{
rcpts: rcpts,
}
processor := ReceiptProcessorFn(func(ctx context.Context, block eth.L1BlockRef, rcpts types.Receipts) error {
return expectedErr
})
fetcher := newLogFetcher(client, processor)
block := eth.L1BlockRef{Number: 11, Hash: common.Hash{0xaa}}
err := fetcher.ProcessBlock(ctx, block)
require.ErrorIs(t, err, expectedErr)
})
}
type stubLogSource struct {
err error
rcpts types.Receipts
}
func (s *stubLogSource) FetchReceipts(_ context.Context, _ common.Hash) (eth.BlockInfo, types.Receipts, error) {
if s.err != nil {
return nil, nil, s.err
}
return nil, s.rcpts, nil
}
package source
import (
"context"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/log"
)
type HeadProcessor interface {
OnNewHead(ctx context.Context, head eth.L1BlockRef)
}
type HeadProcessorFn func(ctx context.Context, head eth.L1BlockRef)
func (f HeadProcessorFn) OnNewHead(ctx context.Context, head eth.L1BlockRef) {
f(ctx, head)
}
// headUpdateProcessor handles head update events and routes them to the appropriate handlers
type headUpdateProcessor struct {
log log.Logger
unsafeProcessors []HeadProcessor
safeProcessors []HeadProcessor
finalizedProcessors []HeadProcessor
}
func newHeadUpdateProcessor(log log.Logger, unsafeProcessors []HeadProcessor, safeProcessors []HeadProcessor, finalizedProcessors []HeadProcessor) *headUpdateProcessor {
return &headUpdateProcessor{
log: log,
unsafeProcessors: unsafeProcessors,
safeProcessors: safeProcessors,
finalizedProcessors: finalizedProcessors,
}
}
func (n *headUpdateProcessor) OnNewUnsafeHead(ctx context.Context, block eth.L1BlockRef) {
n.log.Debug("New unsafe head", "block", block)
for _, processor := range n.unsafeProcessors {
processor.OnNewHead(ctx, block)
}
}
func (n *headUpdateProcessor) OnNewSafeHead(ctx context.Context, block eth.L1BlockRef) {
n.log.Debug("New safe head", "block", block)
for _, processor := range n.safeProcessors {
processor.OnNewHead(ctx, block)
}
}
func (n *headUpdateProcessor) OnNewFinalizedHead(ctx context.Context, block eth.L1BlockRef) {
n.log.Debug("New finalized head", "block", block)
for _, processor := range n.finalizedProcessors {
processor.OnNewHead(ctx, block)
}
}
package source
import (
"context"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestHeadUpdateProcessor(t *testing.T) {
t.Run("NotifyUnsafeHeadProcessors", func(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
processed := make([]eth.L1BlockRef, 3)
makeProcessor := func(idx int) HeadProcessor {
return HeadProcessorFn(func(_ context.Context, head eth.L1BlockRef) {
processed[idx] = head
})
}
headUpdates := newHeadUpdateProcessor(logger, []HeadProcessor{makeProcessor(0), makeProcessor(1), makeProcessor(2)}, nil, nil)
block := eth.L1BlockRef{Number: 110, Hash: common.Hash{0xaa}}
headUpdates.OnNewUnsafeHead(context.Background(), block)
require.Equal(t, []eth.L1BlockRef{block, block, block}, processed)
})
t.Run("NotifySafeHeadProcessors", func(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
processed := make([]eth.L1BlockRef, 3)
makeProcessor := func(idx int) HeadProcessor {
return HeadProcessorFn(func(_ context.Context, head eth.L1BlockRef) {
processed[idx] = head
})
}
headUpdates := newHeadUpdateProcessor(logger, nil, []HeadProcessor{makeProcessor(0), makeProcessor(1), makeProcessor(2)}, nil)
block := eth.L1BlockRef{Number: 110, Hash: common.Hash{0xaa}}
headUpdates.OnNewSafeHead(context.Background(), block)
require.Equal(t, []eth.L1BlockRef{block, block, block}, processed)
})
t.Run("NotifyFinalizedHeadProcessors", func(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
processed := make([]eth.L1BlockRef, 3)
makeProcessor := func(idx int) HeadProcessor {
return HeadProcessorFn(func(_ context.Context, head eth.L1BlockRef) {
processed[idx] = head
})
}
headUpdates := newHeadUpdateProcessor(logger, nil, nil, []HeadProcessor{makeProcessor(0), makeProcessor(1), makeProcessor(2)})
block := eth.L1BlockRef{Number: 110, Hash: common.Hash{0xaa}}
headUpdates.OnNewFinalizedHead(context.Background(), block)
require.Equal(t, []eth.L1BlockRef{block, block, block}, processed)
})
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment