chain_processor.go 5.91 KB
Newer Older
1
package processors
2 3 4

import (
	"context"
5
	"errors"
6 7 8 9 10
	"fmt"
	"sync"
	"sync/atomic"
	"time"

11
	"github.com/ethereum/go-ethereum"
12 13 14
	"github.com/ethereum/go-ethereum/common"
	gethtypes "github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
15 16

	"github.com/ethereum-optimism/optimism/op-service/eth"
17
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
18 19
)

20
type Source interface {
21
	L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
22
	FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, gethtypes.Receipts, error)
23 24
}

25
type LogProcessor interface {
Axel Kingsley's avatar
Axel Kingsley committed
26
	ProcessLogs(ctx context.Context, block eth.BlockRef, receipts gethtypes.Receipts) error
27 28
}

29
type DatabaseRewinder interface {
30
	Rewind(chain types.ChainID, headBlockNum uint64) error
31
	LatestBlockNum(chain types.ChainID) (num uint64, ok bool)
32 33
}

Axel Kingsley's avatar
Axel Kingsley committed
34
type BlockProcessorFn func(ctx context.Context, block eth.BlockRef) error
35

Axel Kingsley's avatar
Axel Kingsley committed
36
func (fn BlockProcessorFn) ProcessBlock(ctx context.Context, block eth.BlockRef) error {
37 38 39 40 41 42
	return fn(ctx, block)
}

// ChainProcessor is a HeadProcessor that fills in any skipped blocks between head update events.
// It ensures that, absent reorgs, every block in the chain is processed even if some head advancements are skipped.
type ChainProcessor struct {
43 44 45 46
	log log.Logger

	client     Source
	clientLock sync.Mutex
47 48 49 50

	chain types.ChainID

	processor LogProcessor
51
	rewinder  DatabaseRewinder
52 53 54 55 56 57

	// the last known head. May be 0 if not known.
	lastHead atomic.Uint64
	// channel with capacity of 1, full if there is work to do
	newHead chan struct{}

58 59
	// to signal to the other services that new indexed data is available
	onIndexed func()
60 61 62 63 64

	// lifetime management of the chain processor
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
65 66
}

67
func NewChainProcessor(log log.Logger, chain types.ChainID, processor LogProcessor, rewinder DatabaseRewinder, onIndexed func()) *ChainProcessor {
68 69
	ctx, cancel := context.WithCancel(context.Background())
	out := &ChainProcessor{
70 71
		log:       log.New("chain", chain),
		client:    nil,
72
		chain:     chain,
73
		processor: processor,
74
		rewinder:  rewinder,
75
		newHead:   make(chan struct{}, 1),
76
		onIndexed: onIndexed,
77 78
		ctx:       ctx,
		cancel:    cancel,
79
	}
80
	return out
81 82
}

83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
func (s *ChainProcessor) SetSource(cl Source) {
	s.clientLock.Lock()
	defer s.clientLock.Unlock()
	s.client = cl
}

func (s *ChainProcessor) StartBackground() {
	s.wg.Add(1)
	go s.worker()
}

func (s *ChainProcessor) ProcessToHead() {
	s.work()
}

98 99 100 101
func (s *ChainProcessor) nextNum() uint64 {
	headNum, ok := s.rewinder.LatestBlockNum(s.chain)
	if !ok {
		return 0 // genesis. We could change this to start at a later block.
102
	}
103 104 105
	return headNum + 1
}

106 107
// worker is the main loop of the chain processor's worker
// it manages work by request or on a timer, and watches for shutdown
108 109 110 111
func (s *ChainProcessor) worker() {
	defer s.wg.Done()

	delay := time.NewTicker(time.Second * 5)
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
	for {
		// await next time we process, or detect shutdown
		select {
		case <-s.ctx.Done():
			delay.Stop()
			return
		case <-s.newHead:
			s.log.Debug("Responding to new head signal")
			s.work()
		case <-delay.C:
			s.log.Debug("Checking for updates")
			s.work()
		}
	}
}

// work processes the next block in the chain repeatedly until it reaches the head
func (s *ChainProcessor) work() {
130 131
	for {
		if s.ctx.Err() != nil { // check if we are closing down
132
			return
133
		}
134 135
		target := s.nextNum()
		if err := s.update(target); err != nil {
136
			if errors.Is(err, ethereum.NotFound) {
137
				s.log.Debug("Event-indexer cannot find next block yet", "target", target, "err", err)
138
			} else if errors.Is(err, types.ErrNoRPCSource) {
139 140 141 142 143
				s.log.Warn("No RPC source configured, cannot process new blocks")
			} else {
				s.log.Error("Failed to process new block", "err", err)
				// idle until next update trigger
			}
144
		} else if x := s.lastHead.Load(); target+1 <= x {
145
			s.log.Debug("Continuing with next block", "newTarget", target+1, "lastHead", x)
146 147 148 149
			continue // instantly continue processing, no need to idle
		} else {
			s.log.Debug("Idling block-processing, reached latest block", "head", target)
		}
150
		return
151
	}
152 153
}

154
func (s *ChainProcessor) update(nextNum uint64) error {
155 156 157 158
	s.clientLock.Lock()
	defer s.clientLock.Unlock()

	if s.client == nil {
159
		return types.ErrNoRPCSource
160 161
	}

162
	ctx, cancel := context.WithTimeout(s.ctx, time.Second*10)
163
	nextL1, err := s.client.L1BlockRefByNumber(ctx, nextNum)
Axel Kingsley's avatar
Axel Kingsley committed
164
	next := eth.BlockRef{
165 166 167 168 169
		Hash:       nextL1.Hash,
		ParentHash: nextL1.ParentHash,
		Number:     nextL1.Number,
		Time:       nextL1.Time,
	}
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
	cancel()
	if err != nil {
		return fmt.Errorf("failed to fetch next block: %w", err)
	}

	// Try and fetch the receipts
	ctx, cancel = context.WithTimeout(s.ctx, time.Second*10)
	_, receipts, err := s.client.FetchReceipts(ctx, next.Hash)
	cancel()
	if err != nil {
		return fmt.Errorf("failed to fetch receipts of block: %w", err)
	}
	if err := s.processor.ProcessLogs(ctx, next, receipts); err != nil {
		s.log.Error("Failed to process block", "block", next, "err", err)

		if next.Number == 0 { // cannot rewind genesis
			return nil
		}

189
		// Try to rewind the database to the previous block to remove any logs from this block that were written
190
		if err := s.rewinder.Rewind(s.chain, nextNum-1); err != nil {
191 192
			// If any logs were written, our next attempt to write will fail and we'll retry this rewind.
			// If no logs were written successfully then the rewind wouldn't have done anything anyway.
193
			s.log.Error("Failed to rewind after error processing block", "block", next, "err", err)
194
		}
195
		return err
196
	}
197 198
	s.log.Info("Indexed block events", "block", next, "txs", len(receipts))
	s.onIndexed()
199 200 201
	return nil
}

202
func (s *ChainProcessor) OnNewHead(head eth.BlockRef) error {
203 204 205 206 207 208 209 210 211 212 213 214 215 216
	// update the latest target
	s.lastHead.Store(head.Number)
	// signal that we have something to process
	select {
	case s.newHead <- struct{}{}:
	default:
		// already requested an update
	}
	return nil
}

func (s *ChainProcessor) Close() {
	s.cancel()
	s.wg.Wait()
217
}