plasma_data_source.go 3.81 KB
Newer Older
1 2 3 4
package derive

import (
	"context"
5
	"errors"
6 7
	"fmt"

8
	plasma "github.com/ethereum-optimism/optimism/op-plasma"
9 10 11 12 13 14 15 16 17 18
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum/go-ethereum/log"
)

// PlasmaDataSource is a data source that fetches inputs from a plasma DA provider given
// their onchain commitments. Same as CalldataSource it will keep attempting to fetch.
type PlasmaDataSource struct {
	log     log.Logger
	src     DataIter
	fetcher PlasmaInputFetcher
19
	l1      L1Fetcher
20
	id      eth.L1BlockRef
21
	// keep track of a pending commitment so we can keep trying to fetch the input.
22
	comm plasma.CommitmentData
23 24
}

25
func NewPlasmaDataSource(log log.Logger, src DataIter, l1 L1Fetcher, fetcher PlasmaInputFetcher, id eth.L1BlockRef) *PlasmaDataSource {
26 27 28 29
	return &PlasmaDataSource{
		log:     log,
		src:     src,
		fetcher: fetcher,
30
		l1:      l1,
31 32 33 34 35
		id:      id,
	}
}

func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) {
36 37 38 39
	// Process origin syncs the challenge contract events and updates the local challenge states
	// before we can proceed to fetch the input data. This function can be called multiple times
	// for the same origin and noop if the origin was already processed. It is also called if
	// there is not commitment in the current origin.
40
	if err := s.fetcher.AdvanceL1Origin(ctx, s.l1, s.id.ID()); err != nil {
41
		if errors.Is(err, plasma.ErrReorgRequired) {
42
			return nil, NewResetError(errors.New("new expired challenge"))
43 44 45 46
		}
		return nil, NewTemporaryError(fmt.Errorf("failed to advance plasma L1 origin: %w", err))
	}

47 48
	if s.comm == nil {
		// the l1 source returns the input commitment for the batch.
49
		data, err := s.src.Next(ctx)
50 51 52
		if err != nil {
			return nil, err
		}
53

54 55 56 57 58 59 60 61 62
		if len(data) == 0 {
			return nil, NotEnoughData
		}
		// If the tx data type is not plasma, we forward it downstream to let the next
		// steps validate and potentially parse it as L1 DA inputs.
		if data[0] != plasma.TxDataVersion1 {
			return data, nil
		}

63
		// validate batcher inbox data is a commitment.
64 65
		// strip the transaction data version byte from the data before decoding.
		comm, err := plasma.DecodeCommitmentData(data[1:])
66 67
		if err != nil {
			s.log.Warn("invalid commitment", "commitment", data, "err", err)
68 69
			return nil, NotEnoughData
		}
70
		s.comm = comm
71 72
	}
	// use the commitment to fetch the input from the plasma DA provider.
73 74 75 76 77 78 79 80 81 82 83 84 85
	data, err := s.fetcher.GetInput(ctx, s.l1, s.comm, s.id)
	// GetInput may call for a reorg if the pipeline is stalled and the plasma DA manager
	// continued syncing origins detached from the pipeline origin.
	if errors.Is(err, plasma.ErrReorgRequired) {
		// challenge for a new previously derived commitment expired.
		return nil, NewResetError(err)
	} else if errors.Is(err, plasma.ErrExpiredChallenge) {
		// this commitment was challenged and the challenge expired.
		s.log.Warn("challenge expired, skipping batch", "comm", s.comm)
		s.comm = nil
		// skip the input
		return s.Next(ctx)
	} else if errors.Is(err, plasma.ErrMissingPastWindow) {
86
		return nil, NewCriticalError(fmt.Errorf("data for comm %s not available: %w", s.comm, err))
87 88 89 90
	} else if errors.Is(err, plasma.ErrPendingChallenge) {
		// continue stepping without slowing down.
		return nil, NotEnoughData
	} else if err != nil {
91
		// return temporary error so we can keep retrying.
92
		return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %s from da service: %w", s.comm, err))
93
	}
94
	// inputs are limited to a max size to ensure they can be challenged in the DA contract.
95
	if s.comm.CommitmentType() == plasma.Keccak256CommitmentType && len(data) > plasma.MaxInputSize {
96 97 98 99
		s.log.Warn("input data exceeds max size", "size", len(data), "max", plasma.MaxInputSize)
		s.comm = nil
		return s.Next(ctx)
	}
100 101
	// reset the commitment so we can fetch the next one from the source at the next iteration.
	s.comm = nil
102
	return data, nil
103
}