chain.go 2.61 KB
Newer Older
1 2 3 4 5 6 7
package source

import (
	"context"
	"fmt"
	"time"

8 9
	"github.com/ethereum/go-ethereum/log"

10 11 12
	"github.com/ethereum-optimism/optimism/op-service/client"
	"github.com/ethereum-optimism/optimism/op-service/sources"
	"github.com/ethereum-optimism/optimism/op-service/sources/caching"
13
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
14 15 16
)

// TODO(optimism#11032) Make these configurable and a sensible default
17
const epochPollInterval = 3 * time.Second
18 19 20 21 22
const pollInterval = 2 * time.Second
const trustRpc = false
const rpcKind = sources.RPCKindStandard

type Metrics interface {
23
	caching.Metrics
24 25
}

26
type Storage interface {
27
	ChainsDBClientForLogProcessor
28
	DatabaseRewinder
29
	LatestBlockNum(chainID types.ChainID) (num uint64, ok bool)
30 31
}

32 33 34
// ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform
// interop consolidation. It detects and notifies when reorgs occur.
type ChainMonitor struct {
35 36 37
	log            log.Logger
	headMonitor    *HeadMonitor
	chainProcessor *ChainProcessor
38 39
}

40
func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID types.ChainID, rpc string, client client.RPC, store Storage) (*ChainMonitor, error) {
41
	logger = logger.New("chainID", chainID)
42
	cl, err := newClient(ctx, logger, m, rpc, client, pollInterval, trustRpc, rpcKind)
43 44 45
	if err != nil {
		return nil, err
	}
46

47 48 49
	// Create the log processor and fetcher
	processLogs := newLogProcessor(chainID, store)
	unsafeBlockProcessor := NewChainProcessor(logger, cl, chainID, processLogs, store)
50

51
	unsafeProcessors := []HeadProcessor{unsafeBlockProcessor}
52

53
	callback := newHeadUpdateProcessor(logger, unsafeProcessors, nil, nil)
54 55
	headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, callback)

56
	return &ChainMonitor{
57 58 59
		log:            logger,
		headMonitor:    headMonitor,
		chainProcessor: unsafeBlockProcessor,
60 61 62 63
	}, nil
}

func (c *ChainMonitor) Start() error {
64
	c.log.Info("Started monitoring chain")
65 66 67 68
	return c.headMonitor.Start()
}

func (c *ChainMonitor) Stop() error {
69
	c.chainProcessor.Close()
70 71 72
	return c.headMonitor.Stop()
}

73 74
func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient client.RPC, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) {
	c, err := client.NewRPCWithClient(ctx, logger, rpc, rpcClient, pollRate)
75 76 77 78 79 80 81 82 83 84
	if err != nil {
		return nil, fmt.Errorf("failed to create new RPC client: %w", err)
	}

	l1Client, err := sources.NewL1Client(c, logger, m, sources.L1ClientSimpleConfig(trustRPC, kind, 100))
	if err != nil {
		return nil, fmt.Errorf("failed to connect client: %w", err)
	}
	return l1Client, nil
}