l1_accessor.go 4.71 KB
Newer Older
1 2 3 4 5
package l1access

import (
	"context"
	"errors"
6
	"fmt"
7 8 9 10 11
	"sync"
	"time"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/log"
12 13 14 15

	"github.com/ethereum-optimism/optimism/op-node/rollup/event"
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
16 17
)

18 19
const reqTimeout = time.Second * 10

20 21 22 23 24 25 26 27 28 29 30 31 32
type L1Source interface {
	L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
	L1BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L1BlockRef, error)
}

// L1Accessor provides access to the L1 chain.
// it wraps an L1 source in order to pass calls to the L1 chain
// and manages the finality and latest block subscriptions.
// The finality subscription is hooked to a finality handler function provided by the caller.
// and the latest block subscription is used to monitor the tip height of the L1 chain.
// L1Accessor has the concept of confirmation depth, which is used to block access to requests to blocks which are too recent.
// When requests for blocks are more recent than the tip minus the confirmation depth, a NotFound error is returned.
type L1Accessor struct {
33 34
	log log.Logger

35 36 37
	client   L1Source // may be nil if no source is attached
	clientMu sync.RWMutex

38 39 40
	emitter event.Emitter

	finalitySub ethereum.Subscription
41 42 43 44 45 46

	// tipHeight is the height of the L1 chain tip
	// used to block access to requests more recent than the confirmation depth
	tipHeight uint64
	latestSub ethereum.Subscription
	confDepth uint64
47 48 49

	// to interrupt requests, so the system can shut down quickly
	sysCtx context.Context
50 51
}

52 53 54
var _ event.AttachEmitter = (*L1Accessor)(nil)

func NewL1Accessor(sysCtx context.Context, log log.Logger, client L1Source) *L1Accessor {
55
	return &L1Accessor{
56 57
		log:    log.New("service", "l1-processor"),
		client: client,
58 59
		// placeholder confirmation depth
		confDepth: 2,
60
		sysCtx:    sysCtx,
61 62 63
	}
}

64 65 66 67 68 69 70 71 72 73 74 75 76
func (p *L1Accessor) AttachEmitter(em event.Emitter) {
	p.emitter = em
}

func (p *L1Accessor) OnEvent(ev event.Event) bool {
	return false
}

// AttachClient attaches a new client to the processor.
// If an existing client was attached, the old subscriptions are unsubscribed.
// New subscriptions are created if subscribe is true.
// If subscribe is false, L1 status has to be fetched manually with PullFinalized and PullLatest.
func (p *L1Accessor) AttachClient(client L1Source, subscribe bool) {
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
	p.clientMu.Lock()
	defer p.clientMu.Unlock()

	// if we have a finality subscription, unsubscribe from it
	if p.finalitySub != nil {
		p.finalitySub.Unsubscribe()
	}

	// if we have a latest subscription, unsubscribe from it
	if p.latestSub != nil {
		p.latestSub.Unsubscribe()
	}

	p.client = client

92 93
	if client != nil && subscribe {
		p.SubscribeLatestHandler()
94 95 96 97 98 99 100 101
		p.SubscribeFinalityHandler()
	}
}

func (p *L1Accessor) SubscribeFinalityHandler() {
	p.finalitySub = eth.PollBlockChanges(
		p.log,
		p.client,
102
		p.onFinalized,
103 104
		eth.Finalized,
		3*time.Second,
105
		reqTimeout)
106 107 108 109 110 111
}

func (p *L1Accessor) SubscribeLatestHandler() {
	p.latestSub = eth.PollBlockChanges(
		p.log,
		p.client,
112
		p.onLatest,
113 114
		eth.Unsafe,
		3*time.Second,
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
		reqTimeout)
}

func (p *L1Accessor) SetConfDepth(depth uint64) {
	p.confDepth = depth
}

func (p *L1Accessor) PullFinalized() error {
	p.clientMu.RLock()
	defer p.clientMu.RUnlock()
	if p.client == nil {
		return errors.New("no L1 source configured")
	}

	ctx, cancel := context.WithTimeout(p.sysCtx, reqTimeout)
	defer cancel()
	ref, err := p.client.L1BlockRefByLabel(ctx, eth.Finalized)
	if err != nil {
		return fmt.Errorf("failed to pull finalized block ref: %w", err)
	}
	p.onFinalized(p.sysCtx, ref)
	return nil
}

func (p *L1Accessor) PullLatest() error {
	p.clientMu.RLock()
	defer p.clientMu.RUnlock()

	if p.client == nil {
		return errors.New("no L1 source configured")
	}

	ctx, cancel := context.WithTimeout(p.sysCtx, reqTimeout)
	defer cancel()
	ref, err := p.client.L1BlockRefByLabel(ctx, eth.Unsafe)
	if err != nil {
		return fmt.Errorf("failed to pull latest block ref: %w", err)
	}
	p.onLatest(p.sysCtx, ref)
	return nil
}

func (p *L1Accessor) onFinalized(ctx context.Context, ref eth.L1BlockRef) {
	p.emitter.Emit(superevents.FinalizedL1RequestEvent{FinalizedL1: ref})
159 160
}

161
func (p *L1Accessor) onLatest(ctx context.Context, ref eth.L1BlockRef) {
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
	p.tipHeight = ref.Number
}

func (p *L1Accessor) L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error) {
	p.clientMu.RLock()
	defer p.clientMu.RUnlock()
	if p.client == nil {
		return eth.L1BlockRef{}, errors.New("no L1 source available")
	}
	// block access to requests more recent than the confirmation depth
	if number > p.tipHeight-p.confDepth {
		return eth.L1BlockRef{}, ethereum.NotFound
	}
	return p.client.L1BlockRefByNumber(ctx, number)
}