package derive

import (
	"context"
	"errors"
	"fmt"

	plasma "github.com/ethereum-optimism/optimism/op-plasma"
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum/go-ethereum/log"
)

// PlasmaDataSource is a data source that fetches inputs from a plasma DA provider given
// their onchain commitments. Same as CalldataSource it will keep attempting to fetch.
type PlasmaDataSource struct {
	log     log.Logger
	src     DataIter
	fetcher PlasmaInputFetcher
19
	l1      L1Fetcher
20 21
	id      eth.BlockID
	// keep track of a pending commitment so we can keep trying to fetch the input.
22
	comm plasma.Keccak256Commitment
23 24
}

25
func NewPlasmaDataSource(log log.Logger, src DataIter, l1 L1Fetcher, fetcher PlasmaInputFetcher, id eth.BlockID) *PlasmaDataSource {
26 27 28 29
	return &PlasmaDataSource{
		log:     log,
		src:     src,
		fetcher: fetcher,
30
		l1:      l1,
31 32 33 34 35
		id:      id,
	}
}

func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) {
36 37 38 39 40 41 42 43 44 45 46
	// Process origin syncs the challenge contract events and updates the local challenge states
	// before we can proceed to fetch the input data. This function can be called multiple times
	// for the same origin and noop if the origin was already processed. It is also called if
	// there is not commitment in the current origin.
	if err := s.fetcher.AdvanceL1Origin(ctx, s.l1, s.id); err != nil {
		if errors.Is(err, plasma.ErrReorgRequired) {
			return nil, NewResetError(fmt.Errorf("new expired challenge"))
		}
		return nil, NewTemporaryError(fmt.Errorf("failed to advance plasma L1 origin: %w", err))
	}

47 48
	if s.comm == nil {
		// the l1 source returns the input commitment for the batch.
49
		data, err := s.src.Next(ctx)
50 51 52
		if err != nil {
			return nil, err
		}
53

54 55 56 57 58 59 60 61 62
		if len(data) == 0 {
			return nil, NotEnoughData
		}
		// If the tx data type is not plasma, we forward it downstream to let the next
		// steps validate and potentially parse it as L1 DA inputs.
		if data[0] != plasma.TxDataVersion1 {
			return data, nil
		}

63
		// validate batcher inbox data is a commitment.
64
		comm, err := plasma.DecodeKeccak256(data[1:])
65 66 67 68 69
		if err != nil {
			s.log.Warn("invalid commitment", "commitment", data, "err", err)
			return s.Next(ctx)
		}
		s.comm = comm
70 71
	}
	// use the commitment to fetch the input from the plasma DA provider.
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
	data, err := s.fetcher.GetInput(ctx, s.l1, s.comm, s.id)
	// GetInput may call for a reorg if the pipeline is stalled and the plasma DA manager
	// continued syncing origins detached from the pipeline origin.
	if errors.Is(err, plasma.ErrReorgRequired) {
		// challenge for a new previously derived commitment expired.
		return nil, NewResetError(err)
	} else if errors.Is(err, plasma.ErrExpiredChallenge) {
		// this commitment was challenged and the challenge expired.
		s.log.Warn("challenge expired, skipping batch", "comm", s.comm)
		s.comm = nil
		// skip the input
		return s.Next(ctx)
	} else if errors.Is(err, plasma.ErrMissingPastWindow) {
		return nil, NewCriticalError(fmt.Errorf("data for comm %x not available: %w", s.comm, err))
	} else if errors.Is(err, plasma.ErrPendingChallenge) {
		// continue stepping without slowing down.
		return nil, NotEnoughData
	} else if err != nil {
90 91 92
		// return temporary error so we can keep retrying.
		return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %x from da service: %w", s.comm, err))
	}
93 94 95 96 97 98
	// inputs are limited to a max size to ensure they can be challenged in the DA contract.
	if len(data) > plasma.MaxInputSize {
		s.log.Warn("input data exceeds max size", "size", len(data), "max", plasma.MaxInputSize)
		s.comm = nil
		return s.Next(ctx)
	}
99 100
	// reset the commitment so we can fetch the next one from the source at the next iteration.
	s.comm = nil
101
	return data, nil
102
}