package source

import (
	"context"
	"errors"
	"fmt"
	"testing"

	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-service/testlog"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"
	"github.com/stretchr/testify/require"
)

// processorChainID is the chain ID handed to NewChainProcessor by the tests
// below; stubRewinder.Rewind returns an error when called with any other
// chain ID.
var processorChainID = types.ChainIDFromUInt64(4)

func TestUnsafeBlocksStage(t *testing.T) {
	t.Run("IgnoreEventsAtOrPriorToStartingHead", func(t *testing.T) {
		ctx := context.Background()
		logger := testlog.Logger(t, log.LvlInfo)
		client := &stubBlockByNumberSource{}
		processor := &stubBlockProcessor{}
25
		stage := NewChainProcessor(logger, client, processorChainID, eth.L1BlockRef{Number: 100}, processor, &stubRewinder{})
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
		stage.OnNewHead(ctx, eth.L1BlockRef{Number: 100})
		stage.OnNewHead(ctx, eth.L1BlockRef{Number: 99})

		require.Empty(t, processor.processed)
		require.Zero(t, client.calls)
	})

	t.Run("OutputNewHeadsWithNoMissedBlocks", func(t *testing.T) {
		ctx := context.Background()
		logger := testlog.Logger(t, log.LvlInfo)
		client := &stubBlockByNumberSource{}
		block0 := eth.L1BlockRef{Number: 100}
		block1 := eth.L1BlockRef{Number: 101}
		block2 := eth.L1BlockRef{Number: 102}
		block3 := eth.L1BlockRef{Number: 103}
		processor := &stubBlockProcessor{}
42
		stage := NewChainProcessor(logger, client, processorChainID, block0, processor, &stubRewinder{})
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
		stage.OnNewHead(ctx, block1)
		require.Equal(t, []eth.L1BlockRef{block1}, processor.processed)
		stage.OnNewHead(ctx, block2)
		require.Equal(t, []eth.L1BlockRef{block1, block2}, processor.processed)
		stage.OnNewHead(ctx, block3)
		require.Equal(t, []eth.L1BlockRef{block1, block2, block3}, processor.processed)

		require.Zero(t, client.calls, "should not need to request block info")
	})

	t.Run("IgnoreEventsAtOrPriorToPreviousHead", func(t *testing.T) {
		ctx := context.Background()
		logger := testlog.Logger(t, log.LvlInfo)
		client := &stubBlockByNumberSource{}
		block0 := eth.L1BlockRef{Number: 100}
		block1 := eth.L1BlockRef{Number: 101}
		processor := &stubBlockProcessor{}
60
		stage := NewChainProcessor(logger, client, processorChainID, block0, processor, &stubRewinder{})
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
		stage.OnNewHead(ctx, block1)
		require.NotEmpty(t, processor.processed)
		require.Equal(t, []eth.L1BlockRef{block1}, processor.processed)

		stage.OnNewHead(ctx, block0)
		stage.OnNewHead(ctx, block1)
		require.Equal(t, []eth.L1BlockRef{block1}, processor.processed)

		require.Zero(t, client.calls, "should not need to request block info")
	})

	t.Run("OutputSkippedBlocks", func(t *testing.T) {
		ctx := context.Background()
		logger := testlog.Logger(t, log.LvlInfo)
		client := &stubBlockByNumberSource{}
		block0 := eth.L1BlockRef{Number: 100}
		block3 := eth.L1BlockRef{Number: 103}
		processor := &stubBlockProcessor{}
79
		stage := NewChainProcessor(logger, client, processorChainID, block0, processor, &stubRewinder{})
80 81 82 83 84 85 86 87 88 89 90 91 92 93

		stage.OnNewHead(ctx, block3)
		require.Equal(t, []eth.L1BlockRef{makeBlockRef(101), makeBlockRef(102), block3}, processor.processed)

		require.Equal(t, 2, client.calls, "should only request the two missing blocks")
	})

	t.Run("DoNotUpdateLastBlockOnFetchError", func(t *testing.T) {
		ctx := context.Background()
		logger := testlog.Logger(t, log.LvlInfo)
		client := &stubBlockByNumberSource{err: errors.New("boom")}
		block0 := eth.L1BlockRef{Number: 100}
		block3 := eth.L1BlockRef{Number: 103}
		processor := &stubBlockProcessor{}
94
		rewinder := &stubRewinder{}
95
		stage := NewChainProcessor(logger, client, processorChainID, block0, processor, rewinder)
96 97 98 99 100 101 102

		stage.OnNewHead(ctx, block3)
		require.Empty(t, processor.processed, "should not update any blocks because backfill failed")

		client.err = nil
		stage.OnNewHead(ctx, block3)
		require.Equal(t, []eth.L1BlockRef{makeBlockRef(101), makeBlockRef(102), block3}, processor.processed)
103
		require.False(t, rewinder.rewindCalled, "should not rewind because no logs could have been written")
104 105 106 107 108 109 110 111 112
	})

	t.Run("DoNotUpdateLastBlockOnProcessorError", func(t *testing.T) {
		ctx := context.Background()
		logger := testlog.Logger(t, log.LvlInfo)
		client := &stubBlockByNumberSource{}
		block0 := eth.L1BlockRef{Number: 100}
		block3 := eth.L1BlockRef{Number: 103}
		processor := &stubBlockProcessor{err: errors.New("boom")}
113
		rewinder := &stubRewinder{}
114
		stage := NewChainProcessor(logger, client, processorChainID, block0, processor, rewinder)
115 116 117

		stage.OnNewHead(ctx, block3)
		require.Equal(t, []eth.L1BlockRef{makeBlockRef(101)}, processor.processed, "Attempted to process block 101")
118
		require.Equal(t, block0.Number, rewinder.rewoundTo, "should rewind to block before error")
119 120 121 122 123 124

		processor.err = nil
		stage.OnNewHead(ctx, block3)
		// Attempts to process block 101 again, then carries on
		require.Equal(t, []eth.L1BlockRef{makeBlockRef(101), makeBlockRef(101), makeBlockRef(102), block3}, processor.processed)
	})
125 126 127 128 129 130 131 132 133

	t.Run("RewindWhenNewHeadProcessingFails", func(t *testing.T) {
		ctx := context.Background()
		logger := testlog.Logger(t, log.LvlInfo)
		client := &stubBlockByNumberSource{}
		block0 := eth.L1BlockRef{Number: 100}
		block1 := eth.L1BlockRef{Number: 101}
		processor := &stubBlockProcessor{err: errors.New("boom")}
		rewinder := &stubRewinder{}
134
		stage := NewChainProcessor(logger, client, processorChainID, block0, processor, rewinder)
135 136 137 138 139 140

		// No skipped blocks
		stage.OnNewHead(ctx, block1)
		require.Equal(t, []eth.L1BlockRef{block1}, processor.processed, "Attempted to process block 101")
		require.Equal(t, block0.Number, rewinder.rewoundTo, "should rewind to block before error")
	})
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
}

// stubBlockByNumberSource is a test double for the block source passed to
// NewChainProcessor. It counts lookups and can be told to fail them.
type stubBlockByNumberSource struct {
	// calls is incremented on every L1BlockRefByNumber invocation, including
	// the ones that return an error.
	calls int
	// err, when non-nil, is returned from L1BlockRefByNumber instead of a block.
	err   error
}

// L1BlockRefByNumber counts the lookup and then either reports the configured
// error (with a zero-value block ref) or returns the synthetic block produced
// by makeBlockRef for the requested number. The context is ignored.
func (s *stubBlockByNumberSource) L1BlockRefByNumber(_ context.Context, number uint64) (eth.L1BlockRef, error) {
	s.calls++
	if failure := s.err; failure != nil {
		return eth.L1BlockRef{}, failure
	}
	ref := makeBlockRef(number)
	return ref, nil
}

// stubBlockProcessor is a test double for the block processor passed to
// NewChainProcessor. It records every block it is asked to process.
type stubBlockProcessor struct {
	// processed lists the blocks passed to ProcessBlock, in call order; a
	// block is recorded even when ProcessBlock returns an error.
	processed []eth.L1BlockRef
	// err is returned from every ProcessBlock call (nil means success).
	err       error
}

// ProcessBlock appends block to the processed list and then returns the
// configured error, so failed attempts are recorded as well. The context is
// ignored.
func (s *stubBlockProcessor) ProcessBlock(_ context.Context, block eth.L1BlockRef) error {
	recorded := append(s.processed, block)
	s.processed = recorded
	return s.err
}

// makeBlockRef builds a deterministic block ref for the given block number:
// the hash starts with the low byte of number, the parent hash starts with the
// low byte of number-1, and the timestamp is number*1000.
func makeBlockRef(number uint64) eth.L1BlockRef {
	// Unsigned arithmetic: for number == 0 this wraps around rather than
	// going negative.
	parentNumber := number - 1
	hash := common.Hash{byte(number)}
	parentHash := common.Hash{byte(parentNumber)}
	timestamp := number * 1000
	return eth.L1BlockRef{Hash: hash, Number: number, ParentHash: parentHash, Time: timestamp}
}

// stubRewinder is a test double for the rewinder passed to NewChainProcessor.
// It remembers whether Rewind was called and with which block number.
type stubRewinder struct {
	// rewoundTo is the headBlockNum from the most recent accepted Rewind call.
	rewoundTo    uint64
	// rewindCalled is set once Rewind has been called with the expected chain ID.
	rewindCalled bool
}

func (s *stubRewinder) Rewind(chainID types.ChainID, headBlockNum uint64) error {
	if chainID != processorChainID {
		return fmt.Errorf("chainID mismatch, expected %v but was %v", processorChainID, chainID)
	}
184 185 186 187
	s.rewoundTo = headBlockNum
	s.rewindCalled = true
	return nil
}