package source

import (
	"context"
	"fmt"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-service/sources"
	"github.com/ethereum-optimism/optimism/op-service/sources/caching"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
	"github.com/ethereum/go-ethereum/log"
)

// TODO(optimism#11032) Make these configurable and a sensible default
const (
	// epochPollInterval is the interval handed to NewHeadMonitor.
	epochPollInterval = 30 * time.Second
	// pollInterval is the poll rate handed to the RPC client created in newClient.
	pollInterval = 2 * time.Second
	// trustRpc is forwarded to the L1 client configuration as its trust-RPC flag.
	trustRpc = false
	// rpcKind is the RPC provider kind forwarded to the L1 client configuration.
	rpcKind = sources.RPCKindStandard
)

type Metrics interface {
23
	caching.Metrics
24 25
}

type Storage interface {
27 28
	LogStorage
	DatabaseRewinder
29
	LatestBlockNum(chainID types.ChainID) uint64
30 31
}

// ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform
// interop consolidation. It detects and notifies when reorgs occur.
type ChainMonitor struct {
35
	log         log.Logger
36 37 38
	headMonitor *HeadMonitor
}

func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID types.ChainID, rpc string, client client.RPC, store Storage) (*ChainMonitor, error) {
40
	logger = logger.New("chainID", chainID)
41
	cl, err := newClient(ctx, logger, m, rpc, client, pollInterval, trustRpc, rpcKind)
42 43 44
	if err != nil {
		return nil, err
	}
45

46
	startingHead := eth.L1BlockRef{
47
		Number: store.LatestBlockNum(chainID),
48
	}
49

50
	processLogs := newLogProcessor(chainID, store)
51
	fetchReceipts := newLogFetcher(cl, processLogs)
52
	unsafeBlockProcessor := NewChainProcessor(logger, cl, chainID, startingHead, fetchReceipts, store)
53 54 55 56 57

	unsafeProcessors := []HeadProcessor{unsafeBlockProcessor}
	callback := newHeadUpdateProcessor(logger, unsafeProcessors, nil, nil)
	headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, callback)

58
	return &ChainMonitor{
59
		log:         logger,
60 61 62 63 64
		headMonitor: headMonitor,
	}, nil
}

func (c *ChainMonitor) Start() error {
65
	c.log.Info("Started monitoring chain")
66 67 68 69 70 71 72
	return c.headMonitor.Start()
}

// Stop stops the underlying head monitor and returns whatever error it reports.
func (c *ChainMonitor) Stop() error {
	monitor := c.headMonitor
	return monitor.Stop()
}

func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient client.RPC, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) {
	c, err := client.NewRPCWithClient(ctx, logger, rpc, rpcClient, pollRate)
75 76 77 78 79 80 81 82 83 84
	if err != nil {
		return nil, fmt.Errorf("failed to create new RPC client: %w", err)
	}

	l1Client, err := sources.NewL1Client(c, logger, m, sources.L1ClientSimpleConfig(trustRPC, kind, 100))
	if err != nil {
		return nil, fmt.Errorf("failed to connect client: %w", err)
	}
	return l1Client, nil
}