Commit 8abd5df5 authored by Adrian Sutton's avatar Adrian Sutton

Merge branch 'develop' into aj/multicaller

parents 194c739e a31e727a
......@@ -770,9 +770,10 @@ jobs:
key: golang-build-cache-<<parameters.module>>
paths:
- "/root/.cache/go-build"
- run:
name: upload coverage
command: codecov --verbose --clean --flags bedrock-go-tests
# TODO(CLI-148): Fix codecov
#- run:
#name: upload coverage
#command: codecov --verbose --clean --flags bedrock-go-tests
- store_test_results:
path: /tmp/test-results
......@@ -791,6 +792,12 @@ jobs:
docker:
- image: us-docker.pkg.dev/oplabs-tools-artifacts/images/ci-builder:latest
resource_class: xlarge
# Note: Tests are split between runs manually.
# Tests default to run on the first executor but can be moved to the second with:
# InitParallel(t, UseExecutor(1))
# Any tests assigned to an executor greater than the number available automatically use the last executor.
# Executor indexes start from 0
parallelism: 2
steps:
- checkout
- check-changed:
......@@ -1231,6 +1238,28 @@ jobs:
name: check-generated-mocks
command: make generate-mocks-op-service && git diff --exit-code
check-values-match:
parameters:
pattern_file1:
type: string
default: ""
pattern_file2:
type: string
default: ""
file1_path:
type: string
default: ""
file2_path:
type: string
default: ""
docker:
- image: us-docker.pkg.dev/oplabs-tools-artifacts/images/ci-builder:latest
steps:
- checkout
- run:
name: Verify Values Match
command: |
./ops/scripts/ci-match-values-between-files.sh "<< parameters.file1_path >>" "<< parameters.pattern_file1 >>" "<< parameters.file2_path >>" "<< parameters.pattern_file2 >>"
workflows:
main:
when:
......@@ -1477,7 +1506,11 @@ workflows:
- check-generated-mocks-op-service
- cannon-go-lint-and-test
- cannon-build-test-vectors
- check-values-match:
pattern_file1: "uint8 internal constant INITIALIZER ="
pattern_file2: "const initializedValue ="
file1_path: "packages/contracts-bedrock/src/libraries/Constants.sol"
file2_path: "op-chain-ops/genesis/config.go"
release:
when:
not:
......
......@@ -31,7 +31,7 @@ golang-docker:
# We don't use a buildx builder here, and just load directly into regular docker, for convenience.
GIT_COMMIT=$$(git rev-parse HEAD) \
GIT_DATE=$$(git show -s --format='%ct') \
IMAGE_TAGS=$$GIT_COMMIT,latest \
IMAGE_TAGS=$$(git rev-parse HEAD),latest \
docker buildx bake \
--progress plain \
--load \
......@@ -189,4 +189,3 @@ install-geth:
go install -v github.com/ethereum/go-ethereum/cmd/geth@$(shell cat .gethrc); \
echo "Installed geth!"; true)
.PHONY: install-geth
......@@ -3,7 +3,7 @@ codecov:
comment:
layout: "diff, flags, files"
behavior: default
require_changes: false
require_changes: true
flags:
- contracts-bedrock-tests
......
......@@ -8,7 +8,7 @@ require (
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0
github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3
github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20231026175037-2cff0d130e74
github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20231030223232-e16eae11e492
github.com/ethereum/go-ethereum v1.13.1
github.com/fsnotify/fsnotify v1.7.0
github.com/go-chi/chi/v5 v5.0.10
......@@ -209,7 +209,7 @@ require (
rsc.io/tmplfunc v0.0.3 // indirect
)
replace github.com/ethereum/go-ethereum v1.13.1 => github.com/ethereum-optimism/op-geth v1.101303.0-rc.2.0.20231026180835-94fbbd04522e
replace github.com/ethereum/go-ethereum v1.13.1 => github.com/ethereum-optimism/op-geth v1.101304.0-rc.2.0.20231030225546-cd491fa3b588
//replace github.com/ethereum-optimism/superchain-registry/superchain => ../superchain-registry/superchain
//replace github.com/ethereum/go-ethereum v1.13.1 => ../go-ethereum
......@@ -151,10 +151,10 @@ github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/
github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3 h1:RWHKLhCrQThMfch+QJ1Z8veEq5ZO3DfIhZ7xgRP9WTc=
github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3/go.mod h1:QziizLAiF0KqyLdNJYD7O5cpDlaFMNZzlxYNcWsJUxs=
github.com/ethereum-optimism/op-geth v1.101303.0-rc.2.0.20231026180835-94fbbd04522e h1:5ucLyIBCwo07ejZOKFY+6QbCqbLgITHWVqkmLoO6604=
github.com/ethereum-optimism/op-geth v1.101303.0-rc.2.0.20231026180835-94fbbd04522e/go.mod h1:m6GrpSyAe1zdFLJlSctgYKSXUdHwj/yfq2WSOc5vs2A=
github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20231026175037-2cff0d130e74 h1:02gXBD+Cas7xj9rpkke5wD1+vpfYxyF/+31M5tosP9A=
github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20231026175037-2cff0d130e74/go.mod h1:/70H/KqrtKcvWvNGVj6S3rAcLC+kUPr3t2aDmYIS+Xk=
github.com/ethereum-optimism/op-geth v1.101304.0-rc.2.0.20231030225546-cd491fa3b588 h1:jrvFoV3aSGJcTT8Pyo8R2Sp7CZ0v3hqrdhfSmyZbJVw=
github.com/ethereum-optimism/op-geth v1.101304.0-rc.2.0.20231030225546-cd491fa3b588/go.mod h1:12W+vBetjYbDj5D2+V8hizke5yWuLrUDf7UmVkXTnCQ=
github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20231030223232-e16eae11e492 h1:FyzLzMLKMc9zcDYcSxbrLDglIRrGQJE9juFzIO35RmE=
github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20231030223232-e16eae11e492/go.mod h1:/70H/KqrtKcvWvNGVj6S3rAcLC+kUPr3t2aDmYIS+Xk=
github.com/ethereum/c-kzg-4844 v0.3.1 h1:sR65+68+WdnMKxseNWxSJuAv2tsUrihTpVBTfM/U5Zg=
github.com/ethereum/c-kzg-4844 v0.3.1/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
......
......@@ -67,14 +67,18 @@ func (etl *ETL) Start(ctx context.Context) error {
if len(headers) > 0 {
etl.log.Info("retrying previous batch")
} else {
newHeaders, err := etl.headerTraversal.NextFinalizedHeaders(etl.headerBufferSize)
newHeaders, err := etl.headerTraversal.NextHeaders(etl.headerBufferSize)
if err != nil {
etl.log.Error("error querying for headers", "err", err)
} else if len(newHeaders) == 0 {
etl.log.Warn("no new headers. processor unexpectedly at head...")
etl.log.Warn("no new headers. etl at head?")
} else {
headers = newHeaders
etl.metrics.RecordBatchHeaders(len(newHeaders))
}
latestHeader := etl.headerTraversal.LatestHeader()
if latestHeader != nil {
etl.metrics.RecordLatestHeight(latestHeader.Number)
}
}
......@@ -98,7 +102,6 @@ func (etl *ETL) processBatch(headers []types.Header) error {
batchLog := etl.log.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
batchLog.Info("extracting batch", "size", len(headers))
etl.metrics.RecordBatchLatestHeight(lastHeader.Number)
headerMap := make(map[common.Hash]*types.Header, len(headers))
for i := range headers {
header := headers[i]
......@@ -128,6 +131,7 @@ func (etl *ETL) processBatch(headers []types.Header) error {
for i := range logs.Logs {
log := logs.Logs[i]
headersWithLog[log.BlockHash] = true
if _, ok := headerMap[log.BlockHash]; !ok {
// NOTE. Definitely an error state if the none of the headers were re-orged out in between
// the blocks and logs retrieval operations. Unlikely as long as the confirmation depth has
......@@ -135,9 +139,6 @@ func (etl *ETL) processBatch(headers []types.Header) error {
batchLog.Error("log found with block hash not in the batch", "block_hash", logs.Logs[i].BlockHash, "log_index", logs.Logs[i].Index)
return errors.New("parsed log with a block hash not in the batch")
}
etl.metrics.RecordBatchLog(log.Address)
headersWithLog[log.BlockHash] = true
}
// ensure we use unique downstream references for the etl batch
......
......@@ -108,6 +108,7 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
l1BlockHeaders = append(l1BlockHeaders, database.L1BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])})
}
}
if len(l1BlockHeaders) == 0 {
batch.Logger.Info("no l1 blocks with logs in batch")
continue
......@@ -117,6 +118,7 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
for i := range batch.Logs {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
l1ContractEvents[i] = database.L1ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
l1Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
}
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
......@@ -138,7 +140,6 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
l1Etl.ETL.metrics.RecordIndexedLatestHeight(l1BlockHeaders[len(l1BlockHeaders)-1].Number)
l1Etl.ETL.metrics.RecordIndexedLogs(len(l1ContractEvents))
// a-ok!
return nil, nil
......
......@@ -62,7 +62,7 @@ func TestL1ETLConstruction(t *testing.T) {
},
assertion: func(etl *L1ETL, err error) {
require.NoError(t, err)
require.Equal(t, etl.headerTraversal.LastHeader().ParentHash, common.HexToHash("0x69"))
require.Equal(t, etl.headerTraversal.LastTraversedHeader().ParentHash, common.HexToHash("0x69"))
},
},
{
......@@ -94,7 +94,7 @@ func TestL1ETLConstruction(t *testing.T) {
},
assertion: func(etl *L1ETL, err error) {
require.NoError(t, err)
header := etl.headerTraversal.LastHeader()
header := etl.headerTraversal.LastTraversedHeader()
require.True(t, header.Number.Cmp(big.NewInt(69)) == 0)
},
......
......@@ -93,6 +93,7 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
for i := range batch.Logs {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
l2ContractEvents[i] = database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
l2Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
}
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
......@@ -115,9 +116,6 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number)
if len(l2ContractEvents) > 0 {
l2Etl.ETL.metrics.RecordIndexedLogs(len(l2ContractEvents))
}
// a-ok!
return nil, nil
......
......@@ -9,35 +9,28 @@ import (
)
var (
MetricsNamespace string = "etl"
MetricsNamespace string = "op_indexer_etl"
)
type Metricer interface {
RecordInterval() (done func(err error))
// Batch Extraction
RecordBatchLatestHeight(height *big.Int)
RecordBatchHeaders(size int)
RecordBatchLog(contractAddress common.Address)
RecordLatestHeight(height *big.Int)
// Indexed Batches
RecordIndexedLatestHeight(height *big.Int)
RecordIndexedHeaders(size int)
RecordIndexedLogs(size int)
RecordIndexedLog(contractAddress common.Address)
}
type etlMetrics struct {
intervalTick prometheus.Counter
intervalDuration prometheus.Histogram
batchFailures prometheus.Counter
batchLatestHeight prometheus.Gauge
batchHeaders prometheus.Counter
batchLogs *prometheus.CounterVec
intervalFailures prometheus.Counter
latestHeight prometheus.Gauge
indexedLatestHeight prometheus.Gauge
indexedHeaders prometheus.Counter
indexedLogs prometheus.Counter
indexedLogs *prometheus.CounterVec
}
func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
......@@ -55,31 +48,17 @@ func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
Name: "interval_seconds",
Help: "duration elapsed for during the processing loop",
}),
batchFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "failures_total",
Help: "number of times the etl encountered a failure to extract a batch",
}),
batchLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
intervalFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "height",
Help: "the latest block height observed by an etl interval",
Name: "interval_failures_total",
Help: "number of times the etl encountered a failure during the processing loop",
}),
batchHeaders: factory.NewCounter(prometheus.CounterOpts{
latestHeight: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "headers_total",
Help: "number of headers observed by the etl",
}),
batchLogs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "logs_total",
Help: "number of logs observed by the etl",
}, []string{
"contract",
Name: "latest_height",
Help: "the latest height reported by the connected client",
}),
indexedLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
......@@ -93,11 +72,13 @@ func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
Name: "indexed_headers_total",
Help: "number of headers indexed by the etl",
}),
indexedLogs: factory.NewCounter(prometheus.CounterOpts{
indexedLogs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "indexed_logs_total",
Help: "number of logs indexed by the etl",
}, []string{
"contract",
}),
}
}
......@@ -107,23 +88,14 @@ func (m *etlMetrics) RecordInterval() func(error) {
timer := prometheus.NewTimer(m.intervalDuration)
return func(err error) {
if err != nil {
m.batchFailures.Inc()
m.intervalFailures.Inc()
}
timer.ObserveDuration()
}
}
func (m *etlMetrics) RecordBatchLatestHeight(height *big.Int) {
m.batchLatestHeight.Set(float64(height.Uint64()))
}
func (m *etlMetrics) RecordBatchHeaders(size int) {
m.batchHeaders.Add(float64(size))
}
func (m *etlMetrics) RecordBatchLog(contractAddress common.Address) {
m.batchLogs.WithLabelValues(contractAddress.String()).Inc()
func (m *etlMetrics) RecordLatestHeight(height *big.Int) {
m.latestHeight.Set(float64(height.Uint64()))
}
func (m *etlMetrics) RecordIndexedLatestHeight(height *big.Int) {
......@@ -134,6 +106,6 @@ func (m *etlMetrics) RecordIndexedHeaders(size int) {
m.indexedHeaders.Add(float64(size))
}
func (m *etlMetrics) RecordIndexedLogs(size int) {
m.indexedLogs.Add(float64(size))
func (m *etlMetrics) RecordIndexedLog(addr common.Address) {
m.indexedLogs.WithLabelValues(addr.String()).Inc()
}
......@@ -17,38 +17,56 @@ var (
type HeaderTraversal struct {
ethClient EthClient
lastHeader *types.Header
latestHeader *types.Header
lastTraversedHeader *types.Header
blockConfirmationDepth *big.Int
}
// NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client.
// The HeaderTraversal will start fetching blocks starting from the supplied header unless nil, indicating genesis.
func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header, confDepth *big.Int) *HeaderTraversal {
return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader, blockConfirmationDepth: confDepth}
return &HeaderTraversal{
ethClient: ethClient,
lastTraversedHeader: fromHeader,
blockConfirmationDepth: confDepth,
}
}
// LatestHeader returns the latest header reported by underlying eth client
// as headers are traversed via `NextHeaders`.
func (f *HeaderTraversal) LatestHeader() *types.Header {
return f.latestHeader
}
// LastHeader returns the last header that was fetched by the HeaderTraversal
// This is useful for testing the state of the HeaderTraversal
func (f *HeaderTraversal) LastHeader() *types.Header {
return f.lastHeader
// LastTraversedHeader returns the last header traversed.
// - This is useful for testing the state of the HeaderTraversal
// - LastTraversedHeader may be << LatestHeader depending on the number
// headers traversed via `NextHeaders`.
func (f *HeaderTraversal) LastTraversedHeader() *types.Header {
return f.lastTraversedHeader
}
// NextFinalizedHeaders retrieves the next set of headers that have been
// NextHeaders retrieves the next set of headers that have been
// marked as finalized by the connected client, bounded by the supplied size
func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) {
latestBlockHeader, err := f.ethClient.BlockHeaderByNumber(nil)
func (f *HeaderTraversal) NextHeaders(maxSize uint64) ([]types.Header, error) {
latestHeader, err := f.ethClient.BlockHeaderByNumber(nil)
if err != nil {
return nil, fmt.Errorf("unable to query latest block: %w", err)
} else if latestHeader == nil {
return nil, fmt.Errorf("latest header unreported")
} else {
f.latestHeader = latestHeader
}
endHeight := new(big.Int).Sub(latestBlockHeader.Number, f.blockConfirmationDepth)
endHeight := new(big.Int).Sub(latestHeader.Number, f.blockConfirmationDepth)
if endHeight.Sign() < 0 {
// No blocks with the provided confirmation depth available
return nil, nil
}
if f.lastHeader != nil {
cmp := f.lastHeader.Number.Cmp(endHeight)
if f.lastTraversedHeader != nil {
cmp := f.lastTraversedHeader.Number.Cmp(endHeight)
if cmp == 0 { // We're synced to head and there are no new headers
return nil, nil
} else if cmp > 0 {
......@@ -57,8 +75,8 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
}
nextHeight := bigint.Zero
if f.lastHeader != nil {
nextHeight = new(big.Int).Add(f.lastHeader.Number, bigint.One)
if f.lastTraversedHeader != nil {
nextHeight = new(big.Int).Add(f.lastTraversedHeader.Number, bigint.One)
}
// endHeight = (nextHeight - endHeight) <= maxSize
......@@ -71,12 +89,12 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
numHeaders := len(headers)
if numHeaders == 0 {
return nil, nil
} else if f.lastHeader != nil && headers[0].ParentHash != f.lastHeader.Hash() {
} else if f.lastTraversedHeader != nil && headers[0].ParentHash != f.lastTraversedHeader.Hash() {
// The indexer's state is in an irrecoverable state relative to the provider. This
// should never happen since the indexer is dealing with only finalized blocks.
return nil, ErrHeaderTraversalAndProviderMismatchedState
}
f.lastHeader = &headers[numHeaders-1]
f.lastTraversedHeader = &headers[numHeaders-1]
return headers, nil
}
......@@ -33,44 +33,55 @@ func makeHeaders(numHeaders uint64, prevHeader *types.Header) []types.Header {
return headers
}
func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) {
func TestHeaderTraversalNextHeadersNoOp(t *testing.T) {
client := new(MockEthClient)
// start from block 10 as the latest fetched block
lastHeader := &types.Header{Number: big.NewInt(10)}
headerTraversal := NewHeaderTraversal(client, lastHeader, bigint.Zero)
LastTraversedHeader := &types.Header{Number: big.NewInt(10)}
headerTraversal := NewHeaderTraversal(client, LastTraversedHeader, bigint.Zero)
require.Nil(t, headerTraversal.LatestHeader())
require.NotNil(t, headerTraversal.LastTraversedHeader())
// no new headers when matched with head
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(lastHeader, nil)
headers, err := headerTraversal.NextFinalizedHeaders(100)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(LastTraversedHeader, nil)
headers, err := headerTraversal.NextHeaders(100)
require.NoError(t, err)
require.Empty(t, headers)
require.NotNil(t, headerTraversal.LatestHeader())
require.NotNil(t, headerTraversal.LastTraversedHeader())
require.Equal(t, LastTraversedHeader.Number.Uint64(), headerTraversal.LatestHeader().Number.Uint64())
}
func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
func TestHeaderTraversalNextHeadersCursored(t *testing.T) {
client := new(MockEthClient)
// start from genesis
headerTraversal := NewHeaderTraversal(client, nil, bigint.Zero)
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
headers := makeHeaders(10, nil)
// blocks [0..4]. Latest reported is 7
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[7], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers[:5], nil)
_, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
// blocks [5..9]
headers = makeHeaders(5, &headers[len(headers)-1])
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5)
require.Equal(t, uint64(7), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(4), headerTraversal.LastTraversedHeader().Number.Uint64())
// blocks [5..9]. Latest Reported is 9
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[9], nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers[5:], nil)
_, err = headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
require.Equal(t, uint64(9), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(9), headerTraversal.LastTraversedHeader().Number.Uint64())
}
func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
func TestHeaderTraversalNextHeadersMaxSize(t *testing.T) {
client := new(MockEthClient)
// start from genesis
......@@ -82,16 +93,22 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
// clamped by the supplied size
headers := makeHeaders(5, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
headers, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
require.Equal(t, uint64(100), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(4), headerTraversal.LastTraversedHeader().Number.Uint64())
// clamped by the supplied size. FinalizedHeight == 100
headers = makeHeaders(10, &headers[len(headers)-1])
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(14))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(10)
headers, err = headerTraversal.NextHeaders(10)
require.NoError(t, err)
require.Len(t, headers, 10)
require.Equal(t, uint64(100), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(14), headerTraversal.LastTraversedHeader().Number.Uint64())
}
func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
......@@ -104,7 +121,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
headers := makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
headers, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
......@@ -112,7 +129,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
headers = makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(9)}, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5)
headers, err = headerTraversal.NextHeaders(5)
require.Nil(t, headers)
require.Equal(t, ErrHeaderTraversalAndProviderMismatchedState, err)
}
......@@ -12,7 +12,7 @@ import (
)
var (
MetricsNamespace = "rpc"
MetricsNamespace = "op_indexer_rpc"
batchMethod = "<batch>"
)
......
......@@ -231,6 +231,9 @@ func (b *BridgeProcessor) run() error {
batchLog.Info("indexed bridge events", "latest_l1_block_number", toL1Height, "latest_l2_block_number", toL2Height)
b.LatestL1Header = latestEpoch.L1BlockHeader.RLPHeader.Header()
b.metrics.RecordLatestIndexedL1Height(b.LatestL1Header.Number)
b.LatestL2Header = latestEpoch.L2BlockHeader.RLPHeader.Header()
b.metrics.RecordLatestIndexedL2Height(b.LatestL2Header.Number)
return nil
}
......@@ -10,7 +10,7 @@ import (
)
var (
MetricsNamespace string = "bridge"
MetricsNamespace string = "op_indexer_bridge"
)
type L1Metricer interface {
......@@ -83,7 +83,7 @@ func NewMetrics(registry *prometheus.Registry) Metricer {
}),
intervalFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "failures_total",
Name: "interval_failures_total",
Help: "number of failures encountered",
}),
latestL1Height: factory.NewGauge(prometheus.GaugeOpts{
......
......@@ -5,6 +5,7 @@ import (
"math"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/core/types"
......@@ -26,8 +27,8 @@ type channel struct {
confirmedTransactions map[txID]eth.BlockID
}
func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) (*channel, error) {
cb, err := newChannelBuilder(cfg)
func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rcfg *rollup.Config) (*channel, error) {
cb, err := newChannelBuilder(cfg, rcfg)
if err != nil {
return nil, fmt.Errorf("creating new channel: %w", err)
}
......
......@@ -8,6 +8,7 @@ import (
"math"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types"
)
......@@ -58,6 +59,9 @@ type ChannelConfig struct {
// CompressorConfig contains the configuration for creating new compressors.
CompressorConfig compressor.Config
// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
BatchType uint
}
// Check validates the [ChannelConfig] parameters.
......@@ -83,6 +87,10 @@ func (cc *ChannelConfig) Check() error {
return fmt.Errorf("max frame size %d is less than the minimum 23", cc.MaxFrameSize)
}
if cc.BatchType > derive.SpanBatchType {
return fmt.Errorf("unrecognized batch type: %d", cc.BatchType)
}
return nil
}
......@@ -114,7 +122,7 @@ type channelBuilder struct {
// guaranteed to be a ChannelFullError wrapping the specific reason.
fullErr error
// current channel
co *derive.ChannelOut
co derive.ChannelOut
// list of blocks in the channel. Saved in case the channel must be rebuilt
blocks []*types.Block
// frames data queue, to be send as txs
......@@ -127,12 +135,16 @@ type channelBuilder struct {
// newChannelBuilder creates a new channel builder or returns an error if the
// channel out could not be created.
func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
func newChannelBuilder(cfg ChannelConfig, rcfg *rollup.Config) (*channelBuilder, error) {
c, err := cfg.CompressorConfig.NewCompressor()
if err != nil {
return nil, err
}
co, err := derive.NewChannelOut(c)
var spanBatchBuilder *derive.SpanBatchBuilder
if cfg.BatchType == derive.SpanBatchType {
spanBatchBuilder = derive.NewSpanBatchBuilder(rcfg.Genesis.L2Time, rcfg.L2ChainID)
}
co, err := derive.NewChannelOut(cfg.BatchType, c, spanBatchBuilder)
if err != nil {
return nil, err
}
......@@ -194,12 +206,12 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error
return derive.L1BlockInfo{}, c.FullErr()
}
batch, l1info, err := derive.BlockToBatch(block)
batch, l1info, err := derive.BlockToSingularBatch(block)
if err != nil {
return l1info, fmt.Errorf("converting block to batch: %w", err)
}
if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
if _, err = c.co.AddSingularBatch(batch, l1info.SequenceNumber); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
c.setFullErr(err)
return l1info, c.FullErr()
} else if err != nil {
......@@ -252,7 +264,7 @@ func (c *channelBuilder) updateDurationTimeout(l1BlockNum uint64) {
// derived from the batch's origin L1 block. The timeout is only moved forward
// if the derived sequencer window timeout is earlier than the currently set
// timeout.
func (c *channelBuilder) updateSwTimeout(batch *derive.BatchData) {
func (c *channelBuilder) updateSwTimeout(batch *derive.SingularBatch) {
timeout := uint64(batch.EpochNum) + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin
c.updateTimeout(timeout, ErrSeqWindowClose)
}
......
This diff is collapsed.
......@@ -7,6 +7,7 @@ import (
"sync"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
......@@ -28,6 +29,7 @@ type channelManager struct {
log log.Logger
metr metrics.Metricer
cfg ChannelConfig
rcfg *rollup.Config
// All blocks since the last request for new tx data.
blocks []*types.Block
......@@ -45,17 +47,18 @@ type channelManager struct {
closed bool
}
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager {
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rcfg *rollup.Config) *channelManager {
return &channelManager{
log: log,
metr: metr,
cfg: cfg,
rcfg: rcfg,
txChannels: make(map[txID]*channel),
}
}
// Clear clears the entire state of the channel manager.
// It is intended to be used after an L2 reorg.
// It is intended to be used before launching op-batcher and after an L2 reorg.
func (s *channelManager) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
......@@ -195,7 +198,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil
}
pc, err := newChannel(s.log, s.metr, s.cfg)
pc, err := newChannel(s.log, s.metr, s.cfg, s.rcfg)
if err != nil {
return fmt.Errorf("creating new channel: %w", err)
}
......
......@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
derivetest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test"
"github.com/ethereum-optimism/optimism/op-service/eth"
......@@ -19,11 +20,41 @@ import (
"github.com/stretchr/testify/require"
)
// TestChannelManagerReturnsErrReorg ensures that the channel manager
func TestChannelManagerBatchType(t *testing.T) {
tests := []struct {
name string
f func(t *testing.T, batchType uint)
}{
{"ChannelManagerReturnsErrReorg", ChannelManagerReturnsErrReorg},
{"ChannelManagerReturnsErrReorgWhenDrained", ChannelManagerReturnsErrReorgWhenDrained},
{"ChannelManager_Clear", ChannelManager_Clear},
{"ChannelManager_TxResend", ChannelManager_TxResend},
{"ChannelManagerCloseBeforeFirstUse", ChannelManagerCloseBeforeFirstUse},
{"ChannelManagerCloseNoPendingChannel", ChannelManagerCloseNoPendingChannel},
{"ChannelManagerClosePendingChannel", ChannelManagerClosePendingChannel},
{"ChannelManagerCloseAllTxsFailed", ChannelManagerCloseAllTxsFailed},
}
for _, test := range tests {
test := test
t.Run(test.name+"_SingularBatch", func(t *testing.T) {
test.f(t, derive.SingularBatchType)
})
}
for _, test := range tests {
test := test
t.Run(test.name+"_SpanBatch", func(t *testing.T) {
test.f(t, derive.SpanBatchType)
})
}
}
// ChannelManagerReturnsErrReorg ensures that the channel manager
// detects a reorg when it has cached L1 blocks.
func TestChannelManagerReturnsErrReorg(t *testing.T) {
func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{BatchType: batchType}, &rollup.Config{})
m.Clear()
a := types.NewBlock(&types.Header{
Number: big.NewInt(0),
......@@ -49,9 +80,9 @@ func TestChannelManagerReturnsErrReorg(t *testing.T) {
require.Equal(t, []*types.Block{a, b, c}, m.blocks)
}
// TestChannelManagerReturnsErrReorgWhenDrained ensures that the channel manager
// ChannelManagerReturnsErrReorgWhenDrained ensures that the channel manager
// detects a reorg even if it does not have any blocks inside it.
func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
......@@ -61,7 +92,11 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&rollup.Config{},
)
m.Clear()
a := newMiniL2Block(0)
x := newMiniL2BlockWithNumberParent(0, big.NewInt(1), common.Hash{0xff})
......@@ -76,8 +111,8 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
require.ErrorIs(t, m.AddL2Block(x), ErrReorg)
}
// TestChannelManager_Clear tests clearing the channel manager.
func TestChannelManager_Clear(t *testing.T) {
// ChannelManager_Clear tests clearing the channel manager.
func ChannelManager_Clear(t *testing.T, batchType uint) {
require := require.New(t)
// Create a channel manager
......@@ -96,7 +131,10 @@ func TestChannelManager_Clear(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
// Channel Manager state should be empty by default
require.Empty(m.blocks)
......@@ -104,9 +142,11 @@ func TestChannelManager_Clear(t *testing.T) {
require.Nil(m.currentChannel)
require.Empty(m.channelQueue)
require.Empty(m.txChannels)
// Set the last block
m.Clear()
// Add a block to the channel manager
a, _ := derivetest.RandomL2Block(rng, 4)
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
newL1Tip := a.Hash()
l1BlockID := eth.BlockID{
Hash: a.Hash(),
......@@ -153,7 +193,7 @@ func TestChannelManager_Clear(t *testing.T) {
require.Empty(m.txChannels)
}
func TestChannelManager_TxResend(t *testing.T) {
func ChannelManager_TxResend(t *testing.T, batchType uint) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlError)
......@@ -165,9 +205,13 @@ func TestChannelManager_TxResend(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a, _ := derivetest.RandomL2Block(rng, 4)
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
require.NoError(m.AddL2Block(a))
......@@ -195,9 +239,9 @@ func TestChannelManager_TxResend(t *testing.T) {
require.Len(fs, 1)
}
// TestChannelManagerCloseBeforeFirstUse ensures that the channel manager
// ChannelManagerCloseBeforeFirstUse ensures that the channel manager
// will not produce any frames if closed immediately.
func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlCrit)
......@@ -209,9 +253,13 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
TargetFrameSize: 0,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a, _ := derivetest.RandomL2Block(rng, 4)
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
m.Close()
......@@ -222,10 +270,10 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to contain no tx data")
}
// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
// ChannelManagerCloseNoPendingChannel ensures that the channel manager
// can gracefully close with no pending channels, and will not emit any new
// channel frames.
func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) {
require := require.New(t)
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
......@@ -237,7 +285,11 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a := newMiniL2Block(0)
b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
......@@ -261,25 +313,37 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to return no new tx data")
}
// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
// ChannelManagerCloseNoPendingChannel ensures that the channel manager
// can gracefully close with a pending channel, and will not produce any
// new channel frames after this point.
func TestChannelManagerClosePendingChannel(t *testing.T) {
func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
require := require.New(t)
// The number of batch txs depends on compression of the random data, hence the static test RNG seed.
// Example of different RNG seed that creates less than 2 frames: 1698700588902821588
rng := rand.New(rand.NewSource(123))
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
MaxFrameSize: 1000,
MaxFrameSize: 10000,
ChannelTimeout: 1000,
CompressorConfig: compressor.Config{
TargetNumFrames: 100,
TargetFrameSize: 1000,
TargetNumFrames: 1,
TargetFrameSize: 10000,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a := newMiniL2Block(50_000)
b := newMiniL2BlockWithNumberParent(10, big.NewInt(1), a.Hash())
numTx := 20 // Adjust number of txs to make 2 frames
a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
b := derivetest.RandomL2BlockWithChainId(rng, 10, defaultTestRollupConfig.L2ChainID)
bHeader := b.Header()
bHeader.Number = new(big.Int).Add(a.Number(), big.NewInt(1))
bHeader.ParentHash = a.Hash()
b = b.WithSeal(bHeader)
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
......@@ -306,11 +370,12 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
}
// TestChannelManagerCloseAllTxsFailed ensures that the channel manager
// ChannelManagerCloseAllTxsFailed ensures that the channel manager
// can gracefully close after producing transaction frames if none of these
// have successfully landed on chain.
func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
......@@ -321,9 +386,12 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
TargetFrameSize: 1000,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
}, &defaultTestRollupConfig,
)
m.Clear()
a := newMiniL2Block(50_000)
a := derivetest.RandomL2BlockWithChainId(rng, 50000, defaultTestRollupConfig.L2ChainID)
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
......
......@@ -5,6 +5,7 @@ import (
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
......@@ -20,7 +21,8 @@ func TestChannelTimeout(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
ChannelTimeout: 100,
})
}, &rollup.Config{})
m.Clear()
// Pending channel is nil so is cannot be timed out
require.Nil(t, m.currentChannel)
......@@ -61,7 +63,8 @@ func TestChannelTimeout(t *testing.T) {
// TestChannelNextTxData checks the nextTxData function.
func TestChannelNextTxData(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear()
// Nil pending channel should return EOF
returnedTxData, err := m.nextTxData(nil)
......@@ -109,7 +112,8 @@ func TestChannelTxConfirmed(t *testing.T) {
// channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and reseting the pendingChannels map
ChannelTimeout: 10,
})
}, &rollup.Config{})
m.Clear()
// Let's add a valid pending transaction to the channel manager
// So we can demonstrate that TxConfirmed's correctness
......@@ -157,7 +161,8 @@ func TestChannelTxConfirmed(t *testing.T) {
func TestChannelTxFailed(t *testing.T) {
// Create a channel manager
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear()
// Let's add a valid pending transaction to the channel
// manager so we can demonstrate correctness
......
......@@ -52,6 +52,8 @@ type CLIConfig struct {
Stopped bool
BatchType uint
TxMgrConfig txmgr.CLIConfig
LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig
......@@ -93,6 +95,7 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name),
Stopped: ctx.Bool(flags.StoppedFlag.Name),
BatchType: ctx.Uint(flags.BatchTypeFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
......
......@@ -74,7 +74,7 @@ type BatchSubmitter struct {
func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
return &BatchSubmitter{
DriverSetup: setup,
state: NewChannelManager(setup.Log, setup.Metr, setup.Channel),
state: NewChannelManager(setup.Log, setup.Metr, setup.Channel, setup.RollupCfg),
}
}
......
......@@ -173,6 +173,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
CompressorConfig: cfg.CompressorConfig.Config(),
BatchType: cfg.BatchType,
}
if err := bs.Channel.Check(); err != nil {
return fmt.Errorf("invalid channel configuration: %w", err)
......@@ -289,9 +290,11 @@ func (bs *BatcherService) Stop(ctx context.Context) error {
bs.Log.Info("Stopping batcher")
var result error
if bs.driver != nil {
if err := bs.driver.StopBatchSubmittingIfRunning(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop batch submitting: %w", err))
}
}
if bs.rpcServer != nil {
// TODO(7685): the op-service RPC server is not built on top of op-service httputil Server, and has poor shutdown
......@@ -327,7 +330,7 @@ func (bs *BatcherService) Stop(ctx context.Context) error {
if result == nil {
bs.stopped.Store(true)
bs.driver.Log.Info("Batch Submitter stopped")
bs.Log.Info("Batch Submitter stopped")
}
return result
}
......
......@@ -76,6 +76,12 @@ var (
Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC",
EnvVars: prefixEnvVars("STOPPED"),
}
BatchTypeFlag = &cli.UintFlag{
Name: "batch-type",
Usage: "The batch type. 0 for SingularBatch and 1 for SpanBatch.",
Value: 0,
EnvVars: prefixEnvVars("BATCH_TYPE"),
}
// Legacy Flags
SequencerHDPathFlag = txmgr.SequencerHDPathFlag
)
......@@ -94,6 +100,7 @@ var optionalFlags = []cli.Flag{
MaxL1TxSizeBytesFlag,
StoppedFlag,
SequencerHDPathFlag,
BatchTypeFlag,
}
func init() {
......
......@@ -9,7 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-bindings/solc"
)
const AlphabetVMStorageLayoutJSON = "{\"storage\":[{\"astId\":1000,\"contract\":\"test/FaultDisputeGame.t.sol:AlphabetVM\",\"label\":\"oracle\",\"offset\":0,\"slot\":\"0\",\"type\":\"t_contract(IPreimageOracle)1001\"}],\"types\":{\"t_contract(IPreimageOracle)1001\":{\"encoding\":\"inplace\",\"label\":\"contract IPreimageOracle\",\"numberOfBytes\":\"20\"}}}"
const AlphabetVMStorageLayoutJSON = "{\"storage\":[{\"astId\":1000,\"contract\":\"test/mocks/AlphabetVM.sol:AlphabetVM\",\"label\":\"oracle\",\"offset\":0,\"slot\":\"0\",\"type\":\"t_contract(IPreimageOracle)1001\"}],\"types\":{\"t_contract(IPreimageOracle)1001\":{\"encoding\":\"inplace\",\"label\":\"contract IPreimageOracle\",\"numberOfBytes\":\"20\"}}}"
var AlphabetVMStorageLayout = new(solc.StorageLayout)
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -7,7 +7,6 @@ import (
"time"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
......@@ -17,8 +16,8 @@ import (
var (
l1EthRpc = "http://example.com:8545"
gameFactoryAddressValue = "0xbb00000000000000000000000000000000000000"
cannonNetwork = chaincfg.AvailableNetworks()[0]
otherCannonNetwork = chaincfg.AvailableNetworks()[1]
cannonNetwork = "op-mainnet"
otherCannonNetwork = "op-goerli"
cannonBin = "./bin/cannon"
cannonServer = "./bin/op-program"
cannonPreState = "./pre.json"
......
......@@ -61,8 +61,11 @@ type ChannelOutIface interface {
OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error)
}
// Compile-time check for ChannelOutIface interface implementation for the ChannelOut type.
var _ ChannelOutIface = (*derive.ChannelOut)(nil)
// Compile-time check for ChannelOutIface interface implementation for the SingularChannelOut type.
var _ ChannelOutIface = (*derive.SingularChannelOut)(nil)
// Compile-time check for ChannelOutIface interface implementation for the SpanChannelOut type.
var _ ChannelOutIface = (*derive.SpanChannelOut)(nil)
// Compile-time check for ChannelOutIface interface implementation for the GarbageChannelOut type.
var _ ChannelOutIface = (*GarbageChannelOut)(nil)
......@@ -252,13 +255,13 @@ func blockToBatch(block *types.Block) (*derive.BatchData, error) {
return nil, fmt.Errorf("could not parse the L1 Info deposit: %w", err)
}
return &derive.BatchData{
SingularBatch: derive.SingularBatch{
singularBatch := &derive.SingularBatch{
ParentHash: block.ParentHash(),
EpochNum: rollup.Epoch(l1Info.Number),
EpochHash: l1Info.BlockHash,
Timestamp: block.Time(),
Transactions: opaqueTxs,
},
}, nil
}
return derive.NewBatchData(singularBatch), nil
}
......@@ -140,7 +140,7 @@ func (s *L2Batcher) Buffer(t Testing) error {
ApproxComprRatio: 1,
})
require.NoError(t, e, "failed to create compressor")
ch, err = derive.NewChannelOut(c)
ch, err = derive.NewChannelOut(derive.SingularBatchType, c, nil)
}
require.NoError(t, err, "failed to create channel")
s.l2ChannelOut = ch
......
......@@ -18,7 +18,7 @@ import (
)
func TestMultipleCannonGames(t *testing.T) {
InitParallel(t, UsesCannon)
InitParallel(t, UsesCannon, UseExecutor(0))
ctx := context.Background()
sys, l1Client := startFaultDisputeSystem(t)
......@@ -78,7 +78,7 @@ func TestMultipleCannonGames(t *testing.T) {
}
func TestMultipleGameTypes(t *testing.T) {
InitParallel(t, UsesCannon)
InitParallel(t, UsesCannon, UseExecutor(0))
ctx := context.Background()
sys, l1Client := startFaultDisputeSystem(t)
......@@ -113,7 +113,7 @@ func TestMultipleGameTypes(t *testing.T) {
}
func TestChallengerCompleteDisputeGame(t *testing.T) {
InitParallel(t)
InitParallel(t, UseExecutor(1))
tests := []struct {
name string
......@@ -182,7 +182,7 @@ func TestChallengerCompleteDisputeGame(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
InitParallel(t)
InitParallel(t, UseExecutor(1))
ctx := context.Background()
sys, l1Client := startFaultDisputeSystem(t)
......@@ -219,7 +219,7 @@ func TestChallengerCompleteDisputeGame(t *testing.T) {
}
func TestChallengerCompleteExhaustiveDisputeGame(t *testing.T) {
InitParallel(t)
InitParallel(t, UseExecutor(1))
testCase := func(t *testing.T, isRootCorrect bool) {
ctx := context.Background()
......@@ -267,17 +267,17 @@ func TestChallengerCompleteExhaustiveDisputeGame(t *testing.T) {
}
t.Run("RootCorrect", func(t *testing.T) {
InitParallel(t)
InitParallel(t, UseExecutor(1))
testCase(t, true)
})
t.Run("RootIncorrect", func(t *testing.T) {
InitParallel(t)
InitParallel(t, UseExecutor(1))
testCase(t, false)
})
}
func TestCannonDisputeGame(t *testing.T) {
InitParallel(t, UsesCannon)
InitParallel(t, UsesCannon, UseExecutor(1))
tests := []struct {
name string
......@@ -290,7 +290,7 @@ func TestCannonDisputeGame(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
InitParallel(t)
InitParallel(t, UseExecutor(1))
ctx := context.Background()
sys, l1Client := startFaultDisputeSystem(t)
......@@ -328,7 +328,7 @@ func TestCannonDisputeGame(t *testing.T) {
}
func TestCannonDefendStep(t *testing.T) {
InitParallel(t, UsesCannon)
InitParallel(t, UsesCannon, UseExecutor(1))
ctx := context.Background()
sys, l1Client := startFaultDisputeSystem(t)
......@@ -370,7 +370,7 @@ func TestCannonDefendStep(t *testing.T) {
}
func TestCannonProposedOutputRootInvalid(t *testing.T) {
InitParallel(t, UsesCannon)
InitParallel(t, UsesCannon, UseExecutor(0))
// honestStepsFail attempts to perform both an attack and defend step using the correct trace.
honestStepsFail := func(ctx context.Context, game *disputegame.CannonGameHelper, correctTrace *disputegame.HonestHelper, parentClaimIdx int64) {
// Attack step should fail
......@@ -421,7 +421,7 @@ func TestCannonProposedOutputRootInvalid(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
InitParallel(t)
InitParallel(t, UseExecutor(0))
ctx := context.Background()
sys, l1Client, game, correctTrace := setupDisputeGameForInvalidOutputRoot(t, test.outputRoot)
......@@ -448,7 +448,7 @@ func TestCannonProposedOutputRootInvalid(t *testing.T) {
}
func TestCannonPoisonedPostState(t *testing.T) {
InitParallel(t, UsesCannon)
InitParallel(t, UsesCannon, UseExecutor(0))
ctx := context.Background()
sys, l1Client := startFaultDisputeSystem(t)
......@@ -558,8 +558,7 @@ func setupDisputeGameForInvalidOutputRoot(t *testing.T, outputRoot common.Hash)
}
func TestCannonChallengeWithCorrectRoot(t *testing.T) {
InitParallel(t, UsesCannon)
InitParallel(t, UsesCannon, UseExecutor(0))
ctx := context.Background()
sys, l1Client := startFaultDisputeSystem(t)
t.Cleanup(sys.Close)
......
......@@ -2,23 +2,70 @@ package op_e2e
import (
"os"
"strconv"
"testing"
)
var enableParallelTesting bool = os.Getenv("OP_E2E_DISABLE_PARALLEL") != "true"
func InitParallel(t *testing.T, opts ...func(t *testing.T)) {
type testopts struct {
executor uint64
}
func InitParallel(t *testing.T, args ...func(t *testing.T, opts *testopts)) {
t.Helper()
if enableParallelTesting {
t.Parallel()
}
for _, opt := range opts {
opt(t)
opts := &testopts{}
for _, arg := range args {
arg(t, opts)
}
checkExecutor(t, opts.executor)
}
func UsesCannon(t *testing.T) {
func UsesCannon(t *testing.T, opts *testopts) {
if os.Getenv("OP_E2E_CANNON_ENABLED") == "false" {
t.Skip("Skipping cannon test")
}
}
// UseExecutor allows manually splitting tests between circleci executors
//
// Tests default to run on the first executor but can be moved to the second with:
// InitParallel(t, UseExecutor(1))
// Any tests assigned to an executor greater than the number available automatically use the last executor.
// Executor indexes start from 0
func UseExecutor(assignedIdx uint64) func(t *testing.T, opts *testopts) {
return func(t *testing.T, opts *testopts) {
opts.executor = assignedIdx
}
}
func checkExecutor(t *testing.T, assignedIdx uint64) {
envTotal := os.Getenv("CIRCLE_NODE_TOTAL")
envIdx := os.Getenv("CIRCLE_NODE_INDEX")
if envTotal == "" || envIdx == "" {
// Not using test splitting, so ignore assigned executor
t.Logf("Running test. Test splitting not in use.")
return
}
total, err := strconv.ParseUint(envTotal, 10, 0)
if err != nil {
t.Fatalf("Could not parse CIRCLE_NODE_TOTAL env var %v: %v", envTotal, err)
}
idx, err := strconv.ParseUint(envIdx, 10, 0)
if err != nil {
t.Fatalf("Could not parse CIRCLE_NODE_INDEX env var %v: %v", envIdx, err)
}
if assignedIdx >= total && idx == total-1 {
t.Logf("Running test. Current executor (%v) is the last executor and assigned executor (%v) >= total executors (%v).", idx, assignedIdx, total)
return
}
if idx == assignedIdx {
t.Logf("Running test. Assigned executor (%v) matches current executor (%v) of total (%v)", assignedIdx, idx, total)
return
}
t.Skipf("Skipping test. Assigned executor %v, current executor %v of total %v", assignedIdx, idx, total)
}
......@@ -49,6 +49,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
proposermetrics "github.com/ethereum-optimism/optimism/op-proposer/metrics"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
......@@ -202,6 +203,9 @@ type SystemConfig struct {
// Target L1 tx size for the batcher transactions
BatcherTargetL1TxSizeBytes uint64
// Max L1 tx size for the batcher transactions
BatcherMaxL1TxSizeBytes uint64
// SupportL1TimeTravel determines if the L1 node supports quickly skipping forward in time
SupportL1TimeTravel bool
}
......@@ -679,13 +683,21 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
return nil, fmt.Errorf("unable to start l2 output submitter: %w", err)
}
batchType := derive.SingularBatchType
if os.Getenv("OP_E2E_USE_SPAN_BATCH") == "true" {
batchType = derive.SpanBatchType
}
batcherMaxL1TxSizeBytes := cfg.BatcherMaxL1TxSizeBytes
if batcherMaxL1TxSizeBytes == 0 {
batcherMaxL1TxSizeBytes = 240_000
}
batcherCLIConfig := &bss.CLIConfig{
L1EthRpc: sys.EthInstances["l1"].WSEndpoint(),
L2EthRpc: sys.EthInstances["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
MaxPendingTransactions: 0,
MaxChannelDuration: 1,
MaxL1TxSize: 240_000,
MaxL1TxSize: batcherMaxL1TxSizeBytes,
CompressorConfig: compressor.CLIConfig{
TargetL1TxSizeBytes: cfg.BatcherTargetL1TxSizeBytes,
TargetNumFrames: 1,
......@@ -699,6 +711,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
Format: oplog.FormatText,
},
Stopped: sys.cfg.DisableBatcher, // Batch submitter may be enabled later
BatchType: uint(batchType),
}
// Batch Submitter
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.cfg.Loggers["batcher"])
......
......@@ -1553,3 +1553,14 @@ func TestRequiredProtocolVersionChangeAndHalt(t *testing.T) {
require.NoError(t, err)
t.Log("verified that op-geth closed!")
}
func TestIncorrectBatcherConfiguration(t *testing.T) {
InitParallel(t)
cfg := DefaultSystemConfig(t)
// make the batcher configuration invalid
cfg.BatcherMaxL1TxSizeBytes = 1
_, err := cfg.Start(t)
require.Error(t, err, "Expected error on invalid batcher configuration")
}
......@@ -33,14 +33,7 @@ var L2ChainIDToNetworkDisplayName = func() map[string]string {
}()
// AvailableNetworks returns the selection of network configurations that is available by default.
// Other configurations that are part of the superchain-registry can be used with the --beta.network flag.
func AvailableNetworks() []string {
return []string{"op-mainnet", "op-goerli", "op-sepolia"}
}
// BetaAvailableNetworks returns all available network configurations in the superchain-registry.
// This set of configurations is experimental, and may change at any time.
func BetaAvailableNetworks() []string {
var networks []string
for _, cfg := range superchain.OPChains {
networks = append(networks, cfg.Chain+"-"+cfg.Superchain)
......@@ -48,20 +41,6 @@ func BetaAvailableNetworks() []string {
return networks
}
func IsAvailableNetwork(name string, beta bool) bool {
name = handleLegacyName(name)
available := AvailableNetworks()
if beta {
available = BetaAvailableNetworks()
}
for _, v := range available {
if v == name {
return true
}
}
return false
}
func handleLegacyName(name string) string {
switch name {
case "goerli":
......@@ -91,7 +70,7 @@ func ChainByName(name string) *superchain.ChainConfig {
func GetRollupConfig(name string) (*rollup.Config, error) {
chainCfg := ChainByName(name)
if chainCfg == nil {
return nil, fmt.Errorf("invalid network %s", name)
return nil, fmt.Errorf("invalid network: %q", name)
}
rollupCfg, err := rollup.LoadOPStackRollupConfig(chainCfg.ChainID)
if err != nil {
......
......@@ -27,8 +27,6 @@ func TestGetRollupConfig(t *testing.T) {
}
for name, expectedCfg := range configsByName {
require.True(t, IsAvailableNetwork(name, false))
gotCfg, err := GetRollupConfig(name)
require.NoError(t, err)
......@@ -95,6 +93,7 @@ var goerliCfg = rollup.Config{
DepositContractAddress: common.HexToAddress("0x5b47E1A08Ea6d985D6649300584e6722Ec4B1383"),
L1SystemConfigAddress: common.HexToAddress("0xAe851f927Ee40dE99aaBb7461C00f9622ab91d60"),
RegolithTime: u64Ptr(1679079600),
CanyonTime: u64Ptr(1699981200),
ProtocolVersionsAddress: common.HexToAddress("0x0C24F5098774aA366827D667494e9F889f7cFc08"),
}
......@@ -126,6 +125,7 @@ var sepoliaCfg = rollup.Config{
DepositContractAddress: common.HexToAddress("0x16fc5058f25648194471939df75cf27a2fdc48bc"),
L1SystemConfigAddress: common.HexToAddress("0x034edd2a225f7f429a63e0f1d2084b9e0a93b538"),
RegolithTime: u64Ptr(0),
CanyonTime: u64Ptr(1699981200),
ProtocolVersionsAddress: common.HexToAddress("0x79ADD5713B383DAa0a138d3C4780C7A1804a8090"),
}
......
......@@ -24,7 +24,7 @@ type ChannelWithMetadata struct {
InvalidFrames bool `json:"invalid_frames"`
InvalidBatches bool `json:"invalid_batches"`
Frames []FrameWithMetadata `json:"frames"`
Batches []derive.SingularBatch `json:"batches"`
Batches []derive.BatchData `json:"batches"`
}
type FrameWithMetadata struct {
......@@ -104,17 +104,17 @@ func processFrames(cfg *rollup.Config, id derive.ChannelID, frames []FrameWithMe
}
}
var batches []derive.SingularBatch
var batches []derive.BatchData
invalidBatches := false
if ch.IsReady() {
br, err := derive.BatchReader(cfg, ch.Reader(), eth.L1BlockRef{})
br, err := derive.BatchReader(ch.Reader())
if err == nil {
for batch, err := br(); err != io.EOF; batch, err = br() {
if err != nil {
fmt.Printf("Error reading batch for channel %v. Err: %v\n", id.String(), err)
invalidBatches = true
} else {
batches = append(batches, batch.Batch.SingularBatch)
batches = append(batches, *batch)
}
}
} else {
......
......@@ -87,6 +87,7 @@ var (
Usage: "The L1 RethDB path, used to fetch receipts for L1 blocks. Only applicable when using the `reth_db` RPC kind with `l1.rpckind`.",
EnvVars: prefixEnvVars("L1_RETHDB"),
Required: false,
Hidden: true,
}
L1RPCRateLimit = &cli.Float64Flag{
Name: "l1.rpc-rate-limit",
......@@ -244,10 +245,9 @@ var (
}
BetaExtraNetworks = &cli.BoolFlag{
Name: "beta.extra-networks",
Usage: fmt.Sprintf("Beta feature: enable selection of a predefined-network from the superchain-registry. "+
"The superchain-registry is experimental, and the availability of configurations may change."+
"Available networks: %s", strings.Join(chaincfg.BetaAvailableNetworks(), ", ")),
Usage: "Legacy flag, ignored, all superchain-registry networks are enabled by default.",
EnvVars: prefixEnvVars("BETA_EXTRA_NETWORKS"),
Hidden: true, // hidden, this is deprecated, the flag is not used anymore.
}
RollupHalt = &cli.StringFlag{
Name: "rollup.halt",
......
......@@ -32,7 +32,7 @@ type AttributesQueue struct {
config *rollup.Config
builder AttributesBuilder
prev *BatchQueue
batch *BatchData
batch *SingularBatch
}
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, builder AttributesBuilder, prev *BatchQueue) *AttributesQueue {
......@@ -71,7 +71,7 @@ func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2
// createNextAttributes transforms a batch into a payload attributes. This sets `NoTxPool` and appends the batched transactions
// to the attributes transaction list
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *SingularBatch, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// sanity check parent hash
if batch.ParentHash != l2SafeHead.Hash {
return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash))
......
......@@ -42,13 +42,13 @@ func TestAttributesQueue(t *testing.T) {
safeHead.L1Origin = l1Info.ID()
safeHead.Time = l1Info.InfoTime
batch := NewSingularBatchData(SingularBatch{
batch := SingularBatch{
ParentHash: safeHead.Hash,
EpochNum: rollup.Epoch(l1Info.InfoNum),
EpochHash: l1Info.InfoHash,
Timestamp: safeHead.Time + cfg.BlockTime,
Transactions: []eth.Data{eth.Data("foobar"), eth.Data("example")},
})
}
parentL1Cfg := eth.SystemConfig{
BatcherAddr: common.Address{42},
......@@ -80,7 +80,7 @@ func TestAttributesQueue(t *testing.T) {
aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, attrBuilder, nil)
actual, err := aq.createNextAttributes(context.Background(), batch, safeHead)
actual, err := aq.createNextAttributes(context.Background(), &batch, safeHead)
require.NoError(t, err)
require.Equal(t, attrs, *actual)
......
......@@ -25,9 +25,9 @@ var encodeBufferPool = sync.Pool{
const (
// SingularBatchType is the first version of Batch format, representing a single L2 block.
SingularBatchType = iota
SingularBatchType = 0
// SpanBatchType is the Batch version used after SpanBatch hard fork, representing a span of L2 blocks.
SpanBatchType
SpanBatchType = 1
)
// Batch contains information to build one or multiple L2 blocks.
......@@ -39,12 +39,20 @@ type Batch interface {
LogContext(log.Logger) log.Logger
}
// BatchData is a composition type that contains raw data of each batch version.
// It has encoding & decoding methods to implement typed encoding.
// BatchData is used to represent the typed encoding & decoding.
// and wraps around a single interface InnerBatchData.
// Further fields such as cache can be added in the future, without embedding each type of InnerBatchData.
// Similar design with op-geth's types.Transaction struct.
type BatchData struct {
BatchType int
SingularBatch
RawSpanBatch
inner InnerBatchData
}
// InnerBatchData is the underlying data of a BatchData.
// This is implemented by SingularBatch and RawSpanBatch.
type InnerBatchData interface {
GetBatchType() int
encode(w io.Writer) error
decode(r *bytes.Reader) error
}
// EncodeRLP implements rlp.Encoder
......@@ -58,6 +66,10 @@ func (b *BatchData) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, buf.Bytes())
}
func (bd *BatchData) GetBatchType() uint8 {
return uint8(bd.inner.GetBatchType())
}
// MarshalBinary returns the canonical encoding of the batch.
func (b *BatchData) MarshalBinary() ([]byte, error) {
var buf bytes.Buffer
......@@ -67,16 +79,10 @@ func (b *BatchData) MarshalBinary() ([]byte, error) {
// encodeTyped encodes batch type and payload for each batch type.
func (b *BatchData) encodeTyped(buf *bytes.Buffer) error {
switch b.BatchType {
case SingularBatchType:
buf.WriteByte(SingularBatchType)
return rlp.Encode(buf, &b.SingularBatch)
case SpanBatchType:
buf.WriteByte(SpanBatchType)
return b.RawSpanBatch.encode(buf)
default:
return fmt.Errorf("unrecognized batch type: %d", b.BatchType)
if err := buf.WriteByte(b.GetBatchType()); err != nil {
return err
}
return b.inner.encode(buf)
}
// DecodeRLP implements rlp.Decoder
......@@ -99,35 +105,28 @@ func (b *BatchData) UnmarshalBinary(data []byte) error {
return b.decodeTyped(data)
}
// decodeTyped decodes batch type and payload for each batch type.
// decodeTyped decodes a typed batchData
func (b *BatchData) decodeTyped(data []byte) error {
if len(data) == 0 {
return fmt.Errorf("batch too short")
return errors.New("batch too short")
}
var inner InnerBatchData
switch data[0] {
case SingularBatchType:
b.BatchType = SingularBatchType
return rlp.DecodeBytes(data[1:], &b.SingularBatch)
inner = new(SingularBatch)
case SpanBatchType:
b.BatchType = int(data[0])
return b.RawSpanBatch.decodeBytes(data[1:])
inner = new(RawSpanBatch)
default:
return fmt.Errorf("unrecognized batch type: %d", data[0])
}
}
// NewSingularBatchData creates new BatchData with SingularBatch
func NewSingularBatchData(singularBatch SingularBatch) *BatchData {
return &BatchData{
BatchType: SingularBatchType,
SingularBatch: singularBatch,
if err := inner.decode(bytes.NewReader(data[1:])); err != nil {
return err
}
b.inner = inner
return nil
}
// NewSpanBatchData creates new BatchData with SpanBatch
func NewSpanBatchData(spanBatch RawSpanBatch) *BatchData {
return &BatchData{
BatchType: SpanBatchType,
RawSpanBatch: spanBatch,
}
// NewBatchData creates a new BatchData
func NewBatchData(inner InnerBatchData) *BatchData {
return &BatchData{inner: inner}
}
This diff is collapsed.
This diff is collapsed.
......@@ -6,15 +6,16 @@ import (
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testutils"
)
func RandomRawSpanBatch(rng *rand.Rand, chainId *big.Int) *RawSpanBatch {
......@@ -52,8 +53,8 @@ func RandomRawSpanBatch(rng *rand.Rand, chainId *big.Int) *RawSpanBatch {
spanBatchPrefix: spanBatchPrefix{
relTimestamp: uint64(rng.Uint32()),
l1OriginNum: rng.Uint64(),
parentCheck: testutils.RandomData(rng, 20),
l1OriginCheck: testutils.RandomData(rng, 20),
parentCheck: [20]byte(testutils.RandomData(rng, 20)),
l1OriginCheck: [20]byte(testutils.RandomData(rng, 20)),
},
spanBatchPayload: spanBatchPayload{
blockCount: blockCount,
......@@ -141,40 +142,42 @@ func TestBatchRoundTrip(t *testing.T) {
chainID := new(big.Int).SetUint64(rng.Uint64())
batches := []*BatchData{
{
SingularBatch: SingularBatch{
NewBatchData(
&SingularBatch{
ParentHash: common.Hash{},
EpochNum: 0,
Timestamp: 0,
Transactions: []hexutil.Bytes{},
},
},
{
SingularBatch: SingularBatch{
),
NewBatchData(
&SingularBatch{
ParentHash: common.Hash{31: 0x42},
EpochNum: 1,
Timestamp: 1647026951,
Transactions: []hexutil.Bytes{[]byte{0, 0, 0}, []byte{0x76, 0xfd, 0x7c}},
},
},
NewSingularBatchData(*RandomSingularBatch(rng, 5, chainID)),
NewSingularBatchData(*RandomSingularBatch(rng, 7, chainID)),
NewSpanBatchData(*RandomRawSpanBatch(rng, chainID)),
NewSpanBatchData(*RandomRawSpanBatch(rng, chainID)),
NewSpanBatchData(*RandomRawSpanBatch(rng, chainID)),
),
NewBatchData(RandomSingularBatch(rng, 5, chainID)),
NewBatchData(RandomSingularBatch(rng, 7, chainID)),
NewBatchData(RandomRawSpanBatch(rng, chainID)),
NewBatchData(RandomRawSpanBatch(rng, chainID)),
NewBatchData(RandomRawSpanBatch(rng, chainID)),
}
for i, batch := range batches {
enc, err := batch.MarshalBinary()
assert.NoError(t, err)
require.NoError(t, err)
var dec BatchData
err = dec.UnmarshalBinary(enc)
assert.NoError(t, err)
if dec.BatchType == SpanBatchType {
_, err := dec.RawSpanBatch.derive(blockTime, genesisTimestamp, chainID)
assert.NoError(t, err)
require.NoError(t, err)
if dec.GetBatchType() == SpanBatchType {
rawSpanBatch, ok := dec.inner.(*RawSpanBatch)
require.True(t, ok)
_, err := rawSpanBatch.derive(blockTime, genesisTimestamp, chainID)
require.NoError(t, err)
}
assert.Equal(t, batch, &dec, "Batch not equal test case %v", i)
require.Equal(t, batch, &dec, "Batch not equal test case %v", i)
}
}
......@@ -185,43 +188,45 @@ func TestBatchRoundTripRLP(t *testing.T) {
chainID := new(big.Int).SetUint64(rng.Uint64())
batches := []*BatchData{
{
SingularBatch: SingularBatch{
NewBatchData(
&SingularBatch{
ParentHash: common.Hash{},
EpochNum: 0,
Timestamp: 0,
Transactions: []hexutil.Bytes{},
},
},
{
SingularBatch: SingularBatch{
),
NewBatchData(
&SingularBatch{
ParentHash: common.Hash{31: 0x42},
EpochNum: 1,
Timestamp: 1647026951,
Transactions: []hexutil.Bytes{[]byte{0, 0, 0}, []byte{0x76, 0xfd, 0x7c}},
},
},
NewSingularBatchData(*RandomSingularBatch(rng, 5, chainID)),
NewSingularBatchData(*RandomSingularBatch(rng, 7, chainID)),
NewSpanBatchData(*RandomRawSpanBatch(rng, chainID)),
NewSpanBatchData(*RandomRawSpanBatch(rng, chainID)),
NewSpanBatchData(*RandomRawSpanBatch(rng, chainID)),
),
NewBatchData(RandomSingularBatch(rng, 5, chainID)),
NewBatchData(RandomSingularBatch(rng, 7, chainID)),
NewBatchData(RandomRawSpanBatch(rng, chainID)),
NewBatchData(RandomRawSpanBatch(rng, chainID)),
NewBatchData(RandomRawSpanBatch(rng, chainID)),
}
for i, batch := range batches {
var buf bytes.Buffer
err := batch.EncodeRLP(&buf)
assert.NoError(t, err)
require.NoError(t, err)
result := buf.Bytes()
var dec BatchData
r := bytes.NewReader(result)
s := rlp.NewStream(r, 0)
err = dec.DecodeRLP(s)
assert.NoError(t, err)
if dec.BatchType == SpanBatchType {
_, err := dec.RawSpanBatch.derive(blockTime, genesisTimestamp, chainID)
assert.NoError(t, err)
}
assert.Equal(t, batch, &dec, "Batch not equal test case %v", i)
require.NoError(t, err)
if dec.GetBatchType() == SpanBatchType {
rawSpanBatch, ok := dec.inner.(*RawSpanBatch)
require.True(t, ok)
_, err := rawSpanBatch.derive(blockTime, genesisTimestamp, chainID)
require.NoError(t, err)
}
require.Equal(t, batch, &dec, "Batch not equal test case %v", i)
}
}
......@@ -17,13 +17,13 @@ func FuzzBatchRoundTrip(f *testing.F) {
typeProvider := fuzz.NewFromGoFuzz(fuzzedData).NilChance(0).MaxDepth(10000).NumElements(0, 0x100).AllowUnexportedFields(true)
fuzzerutils.AddFuzzerFunctions(typeProvider)
var singularBatch SingularBatch
typeProvider.Fuzz(&singularBatch)
// Create our batch data from fuzzed data
var batchData BatchData
typeProvider.Fuzz(&batchData)
// force batchdata to only contain singular batch
batchData.BatchType = SingularBatchType
batchData.RawSpanBatch = RawSpanBatch{}
batchData.inner = &singularBatch
// Encode our batch data
enc, err := batchData.MarshalBinary()
......
This diff is collapsed.
This diff is collapsed.
......@@ -6,8 +6,6 @@ import (
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/rlp"
)
......@@ -146,7 +144,9 @@ func (ch *Channel) Reader() io.Reader {
// BatchReader provides a function that iteratively consumes batches from the reader.
// The L1Inclusion block is also provided at creation time.
func BatchReader(cfg *rollup.Config, r io.Reader, l1InclusionBlock eth.L1BlockRef) (func() (BatchWithL1InclusionBlock, error), error) {
// Warning: the batch reader can read every batch-type.
// The caller of the batch-reader should filter the results.
func BatchReader(r io.Reader) (func() (*BatchData, error), error) {
// Setup decompressor stage + RLP reader
zr, err := zlib.NewReader(r)
if err != nil {
......@@ -154,17 +154,11 @@ func BatchReader(cfg *rollup.Config, r io.Reader, l1InclusionBlock eth.L1BlockRe
}
rlpReader := rlp.NewStream(zr, MaxRLPBytesPerChannel)
// Read each batch iteratively
return func() (BatchWithL1InclusionBlock, error) {
ret := BatchWithL1InclusionBlock{
L1InclusionBlock: l1InclusionBlock,
}
err := rlpReader.Decode(&ret.Batch)
if err != nil {
return ret, err
}
if ret.Batch.BatchType == SpanBatchType && !cfg.IsSpanBatch(ret.L1InclusionBlock.Time) {
return ret, fmt.Errorf("cannot accept span-batch in L1 block with time %d", ret.L1InclusionBlock.Time)
return func() (*BatchData, error) {
var batchData BatchData
if err = rlpReader.Decode(&batchData); err != nil {
return nil, err
}
return ret, nil
return &batchData, nil
}, nil
}
......@@ -3,12 +3,13 @@ package derive
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
......@@ -21,7 +22,7 @@ type ChannelInReader struct {
cfg *rollup.Config
nextBatchFn func() (BatchWithL1InclusionBlock, error)
nextBatchFn func() (*BatchData, error)
prev *ChannelBank
......@@ -46,7 +47,7 @@ func (cr *ChannelInReader) Origin() eth.L1BlockRef {
// TODO: Take full channel for better logging
func (cr *ChannelInReader) WriteChannel(data []byte) error {
if f, err := BatchReader(cr.cfg, bytes.NewBuffer(data), cr.Origin()); err == nil {
if f, err := BatchReader(bytes.NewBuffer(data)); err == nil {
cr.nextBatchFn = f
cr.metrics.RecordChannelInputBytes(len(data))
return nil
......@@ -65,7 +66,7 @@ func (cr *ChannelInReader) NextChannel() {
// NextBatch pulls out the next batch from the channel if it has it.
// It returns io.EOF when it cannot make any more progress.
// It will return a temporary error if it needs to be called again to advance some internal state.
func (cr *ChannelInReader) NextBatch(ctx context.Context) (*BatchData, error) {
func (cr *ChannelInReader) NextBatch(ctx context.Context) (Batch, error) {
if cr.nextBatchFn == nil {
if data, err := cr.prev.NextData(ctx); err == io.EOF {
return nil, io.EOF
......@@ -80,7 +81,7 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (*BatchData, error) {
// TODO: can batch be non nil while err == io.EOF
// This depends on the behavior of rlp.Stream
batch, err := cr.nextBatchFn()
batchData, err := cr.nextBatchFn()
if err == io.EOF {
cr.NextChannel()
return nil, NotEnoughData
......@@ -89,7 +90,31 @@ func (cr *ChannelInReader) NextBatch(ctx context.Context) (*BatchData, error) {
cr.NextChannel()
return nil, NotEnoughData
}
return batch.Batch, nil
switch batchData.GetBatchType() {
case SingularBatchType:
singularBatch, ok := batchData.inner.(*SingularBatch)
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SingularBatch"))
}
return singularBatch, nil
case SpanBatchType:
if origin := cr.Origin(); !cr.cfg.IsSpanBatch(origin.Time) {
return nil, NewTemporaryError(fmt.Errorf("cannot accept span batch in L1 block %s at time %d", origin, origin.Time))
}
rawSpanBatch, ok := batchData.inner.(*RawSpanBatch)
if !ok {
return nil, NewCriticalError(errors.New("failed type assertion to SpanBatch"))
}
// If the batch type is Span batch, derive block inputs from RawSpanBatch.
spanBatch, err := rawSpanBatch.derive(cr.cfg.BlockTime, cr.cfg.Genesis.L2Time, cr.cfg.L2ChainID)
if err != nil {
return nil, err
}
return spanBatch, nil
default:
// error is bubbled up to user, but pipeline can skip the batch and continue after.
return nil, NewTemporaryError(fmt.Errorf("unrecognized batch type: %d", batchData.GetBatchType()))
}
}
func (cr *ChannelInReader) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.SystemConfig) error {
......
......@@ -48,7 +48,31 @@ type Compressor interface {
FullErr() error
}
type ChannelOut struct {
type ChannelOut interface {
ID() ChannelID
Reset() error
AddBlock(*types.Block) (uint64, error)
AddSingularBatch(*SingularBatch, uint64) (uint64, error)
InputBytes() int
ReadyBytes() int
Flush() error
FullErr() error
Close() error
OutputFrame(*bytes.Buffer, uint64) (uint16, error)
}
func NewChannelOut(batchType uint, compress Compressor, spanBatchBuilder *SpanBatchBuilder) (ChannelOut, error) {
switch batchType {
case SingularBatchType:
return NewSingularChannelOut(compress)
case SpanBatchType:
return NewSpanChannelOut(compress, spanBatchBuilder)
default:
return nil, fmt.Errorf("unrecognized batch type: %d", batchType)
}
}
type SingularChannelOut struct {
id ChannelID
// Frame ID of the next frame to emit. Increment after emitting
frame uint64
......@@ -61,13 +85,13 @@ type ChannelOut struct {
closed bool
}
func (co *ChannelOut) ID() ChannelID {
func (co *SingularChannelOut) ID() ChannelID {
return co.id
}
func NewChannelOut(compress Compressor) (*ChannelOut, error) {
c := &ChannelOut{
id: ChannelID{}, // TODO: use GUID here instead of fully random data
func NewSingularChannelOut(compress Compressor) (*SingularChannelOut, error) {
c := &SingularChannelOut{
id: ChannelID{},
frame: 0,
rlpLength: 0,
compress: compress,
......@@ -80,8 +104,7 @@ func NewChannelOut(compress Compressor) (*ChannelOut, error) {
return c, nil
}
// TODO: reuse ChannelOut for performance
func (co *ChannelOut) Reset() error {
func (co *SingularChannelOut) Reset() error {
co.frame = 0
co.rlpLength = 0
co.compress.Reset()
......@@ -94,27 +117,27 @@ func (co *ChannelOut) Reset() error {
// and an error if there is a problem adding the block. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
func (co *ChannelOut) AddBlock(block *types.Block) (uint64, error) {
func (co *SingularChannelOut) AddBlock(block *types.Block) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
}
batch, _, err := BlockToBatch(block)
batch, l1Info, err := BlockToSingularBatch(block)
if err != nil {
return 0, err
}
return co.AddBatch(batch)
return co.AddSingularBatch(batch, l1Info.SequenceNumber)
}
// AddBatch adds a batch to the channel. It returns the RLP encoded byte size
// AddSingularBatch adds a batch to the channel. It returns the RLP encoded byte size
// and an error if there is a problem adding the batch. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
//
// AddBatch should be used together with BlockToBatch if you need to access the
// AddSingularBatch should be used together with BlockToBatch if you need to access the
// BatchData before adding a block to the channel. It isn't possible to access
// the batch data with AddBlock.
func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) {
func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch, _ uint64) (uint64, error) {
if co.closed {
return 0, errors.New("already closed")
}
......@@ -122,7 +145,7 @@ func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) {
// We encode to a temporary buffer to determine the encoded length to
// ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
var buf bytes.Buffer
if err := rlp.Encode(&buf, batch); err != nil {
if err := rlp.Encode(&buf, NewBatchData(batch)); err != nil {
return 0, err
}
if co.rlpLength+buf.Len() > MaxRLPBytesPerChannel {
......@@ -137,28 +160,28 @@ func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) {
}
// InputBytes returns the total amount of RLP-encoded input bytes.
func (co *ChannelOut) InputBytes() int {
func (co *SingularChannelOut) InputBytes() int {
return co.rlpLength
}
// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
// Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes
// are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage.
func (co *ChannelOut) ReadyBytes() int {
func (co *SingularChannelOut) ReadyBytes() int {
return co.compress.Len()
}
// Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more
// complete frame. It reduces the compression efficiency.
func (co *ChannelOut) Flush() error {
func (co *SingularChannelOut) Flush() error {
return co.compress.Flush()
}
func (co *ChannelOut) FullErr() error {
func (co *SingularChannelOut) FullErr() error {
return co.compress.FullErr()
}
func (co *ChannelOut) Close() error {
func (co *SingularChannelOut) Close() error {
if co.closed {
return errors.New("already closed")
}
......@@ -173,28 +196,13 @@ func (co *ChannelOut) Close() error {
// Returns io.EOF when the channel is closed & there are no more frames.
// Returns nil if there is still more buffered data.
// Returns an error if it ran into an error during processing.
func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) {
f := Frame{
ID: co.id,
FrameNumber: uint16(co.frame),
}
func (co *SingularChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) {
// Check that the maxSize is large enough for the frame overhead size.
if maxSize < FrameV0OverHeadSize {
return 0, ErrMaxFrameSizeTooSmall
}
// Copy data from the local buffer into the frame data buffer
maxDataSize := maxSize - FrameV0OverHeadSize
if maxDataSize > uint64(co.compress.Len()) {
maxDataSize = uint64(co.compress.Len())
// If we are closed & will not spill past the current frame
// mark it is the final frame of the channel.
if co.closed {
f.IsLast = true
}
}
f.Data = make([]byte, maxDataSize)
f := createEmptyFrame(co.id, co.frame, co.ReadyBytes(), co.closed, maxSize)
if _, err := io.ReadFull(co.compress, f.Data); err != nil {
return 0, err
......@@ -213,8 +221,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro
}
}
// BlockToBatch transforms a block into a batch object that can easily be RLP encoded.
func BlockToBatch(block *types.Block) (*BatchData, L1BlockInfo, error) {
// BlockToSingularBatch transforms a block into a batch object that can easily be RLP encoded.
func BlockToSingularBatch(block *types.Block) (*SingularBatch, L1BlockInfo, error) {
opaqueTxs := make([]hexutil.Bytes, 0, len(block.Transactions()))
for i, tx := range block.Transactions() {
if tx.Type() == types.DepositTxType {
......@@ -238,15 +246,13 @@ func BlockToBatch(block *types.Block) (*BatchData, L1BlockInfo, error) {
return nil, l1Info, fmt.Errorf("could not parse the L1 Info deposit: %w", err)
}
return NewSingularBatchData(
SingularBatch{
return &SingularBatch{
ParentHash: block.ParentHash(),
EpochNum: rollup.Epoch(l1Info.Number),
EpochHash: l1Info.BlockHash,
Timestamp: block.Time(),
Transactions: opaqueTxs,
},
), l1Info, nil
}, l1Info, nil
}
// ForceCloseTxData generates the transaction data for a transaction which will force close
......@@ -303,3 +309,24 @@ func ForceCloseTxData(frames []Frame) ([]byte, error) {
return out.Bytes(), nil
}
// createEmptyFrame creates new empty Frame with given information. Frame data must be copied from ChannelOut.
func createEmptyFrame(id ChannelID, frame uint64, readyBytes int, closed bool, maxSize uint64) *Frame {
f := Frame{
ID: id,
FrameNumber: uint16(frame),
}
// Copy data from the local buffer into the frame data buffer
maxDataSize := maxSize - FrameV0OverHeadSize
if maxDataSize > uint64(readyBytes) {
maxDataSize = uint64(readyBytes)
// If we are closed & will not spill past the current frame
// mark it is the final frame of the channel.
if closed {
f.IsLast = true
}
}
f.Data = make([]byte, maxDataSize)
return &f
}
......@@ -29,7 +29,7 @@ func (s *nonCompressor) FullErr() error {
}
func TestChannelOutAddBlock(t *testing.T) {
cout, err := NewChannelOut(&nonCompressor{})
cout, err := NewChannelOut(SingularBatchType, &nonCompressor{}, nil)
require.NoError(t, err)
t.Run("returns err if first tx is not an l1info tx", func(t *testing.T) {
......@@ -50,7 +50,7 @@ func TestChannelOutAddBlock(t *testing.T) {
// max size that is below the fixed frame size overhead of 23, will return
// an error.
func TestOutputFrameSmallMaxSize(t *testing.T) {
cout, err := NewChannelOut(&nonCompressor{})
cout, err := NewChannelOut(SingularBatchType, &nonCompressor{}, nil)
require.NoError(t, err)
// Call OutputFrame with the range of small max size values that err
......@@ -97,42 +97,42 @@ func TestForceCloseTxData(t *testing.T) {
output: "",
},
{
frames: []Frame{Frame{FrameNumber: 0, IsLast: false}, Frame{ID: id, FrameNumber: 1, IsLast: true}},
frames: []Frame{{FrameNumber: 0, IsLast: false}, {ID: id, FrameNumber: 1, IsLast: true}},
errors: true,
output: "",
},
{
frames: []Frame{Frame{ID: id, FrameNumber: 0, IsLast: false}},
frames: []Frame{{ID: id, FrameNumber: 0, IsLast: false}},
errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000001",
},
{
frames: []Frame{Frame{ID: id, FrameNumber: 0, IsLast: true}},
frames: []Frame{{ID: id, FrameNumber: 0, IsLast: true}},
errors: false,
output: "00",
},
{
frames: []Frame{Frame{ID: id, FrameNumber: 1, IsLast: false}},
frames: []Frame{{ID: id, FrameNumber: 1, IsLast: false}},
errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000001",
},
{
frames: []Frame{Frame{ID: id, FrameNumber: 1, IsLast: true}},
frames: []Frame{{ID: id, FrameNumber: 1, IsLast: true}},
errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000",
},
{
frames: []Frame{Frame{ID: id, FrameNumber: 2, IsLast: true}},
frames: []Frame{{ID: id, FrameNumber: 2, IsLast: true}},
errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000deadbeefdeadbeefdeadbeefdeadbeef00010000000000",
},
{
frames: []Frame{Frame{ID: id, FrameNumber: 1, IsLast: false}, Frame{ID: id, FrameNumber: 3, IsLast: true}},
frames: []Frame{{ID: id, FrameNumber: 1, IsLast: false}, {ID: id, FrameNumber: 3, IsLast: true}},
errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000deadbeefdeadbeefdeadbeefdeadbeef00020000000000",
},
{
frames: []Frame{Frame{ID: id, FrameNumber: 1, IsLast: false}, Frame{ID: id, FrameNumber: 3, IsLast: true}, Frame{ID: id, FrameNumber: 5, IsLast: true}},
frames: []Frame{{ID: id, FrameNumber: 1, IsLast: false}, {ID: id, FrameNumber: 3, IsLast: true}, {ID: id, FrameNumber: 5, IsLast: true}},
errors: false,
output: "00deadbeefdeadbeefdeadbeefdeadbeef00000000000000deadbeefdeadbeefdeadbeefdeadbeef00020000000000",
},
......@@ -152,6 +152,6 @@ func TestForceCloseTxData(t *testing.T) {
func TestBlockToBatchValidity(t *testing.T) {
block := new(types.Block)
_, _, err := BlockToBatch(block)
_, _, err := BlockToSingularBatch(block)
require.ErrorContains(t, err, "has no transactions")
}
......@@ -35,6 +35,7 @@ type Engine interface {
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayload, error)
L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error)
L2BlockRefByHash(ctx context.Context, l2Hash common.Hash) (eth.L2BlockRef, error)
L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error)
SystemConfigL2Fetcher
}
......
......@@ -19,10 +19,10 @@ func frameSize(frame Frame) uint64 {
const DerivationVersion0 = 0
// MaxSpanBatchFieldSize is the maximum amount of bytes that will be read from
// a span batch to decode span batch field. This value cannot be larger than
// MaxSpanBatchSize is the maximum amount of bytes that will be needed
// to decode every span batch field. This value cannot be larger than
// MaxRLPBytesPerChannel because single batch cannot be larger than channel size.
const MaxSpanBatchFieldSize = 10_000_000
const MaxSpanBatchSize = MaxRLPBytesPerChannel
// MaxChannelBankSize is the amount of memory space, in number of bytes,
// till the bank is pruned by removing channels,
......
......@@ -90,7 +90,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
frameQueue := NewFrameQueue(log, l1Src)
bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher, metrics)
chInReader := NewChannelInReader(cfg, log, bank, metrics)
batchQueue := NewBatchQueue(log, cfg, chInReader)
batchQueue := NewBatchQueue(log, cfg, chInReader, engine)
attrBuilder := NewFetchingAttributesBuilder(cfg, l1Fetcher, engine)
attributesQueue := NewAttributesQueue(log, cfg, attrBuilder, batchQueue)
......
package derive
import (
"bytes"
"io"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
// Batch format
......@@ -51,3 +55,13 @@ func (b *SingularBatch) LogContext(log log.Logger) log.Logger {
func (b *SingularBatch) Epoch() eth.BlockID {
return eth.BlockID{Hash: b.EpochHash, Number: uint64(b.EpochNum)}
}
// encode writes the byte encoding of SingularBatch to Writer stream
func (b *SingularBatch) encode(w io.Writer) error {
return rlp.Encode(w, b)
}
// decode reads the byte encoding of SingularBatch from Reader stream
func (b *SingularBatch) decode(r *bytes.Reader) error {
return rlp.Decode(r, b)
}
......@@ -5,7 +5,7 @@ import (
"math/rand"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSingularBatchForBatchInterface(t *testing.T) {
......@@ -15,7 +15,7 @@ func TestSingularBatchForBatchInterface(t *testing.T) {
singularBatch := RandomSingularBatch(rng, txCount, chainID)
assert.Equal(t, SingularBatchType, singularBatch.GetBatchType())
assert.Equal(t, singularBatch.Timestamp, singularBatch.GetTimestamp())
assert.Equal(t, singularBatch.EpochNum, singularBatch.GetEpochNum())
require.Equal(t, SingularBatchType, singularBatch.GetBatchType())
require.Equal(t, singularBatch.Timestamp, singularBatch.GetTimestamp())
require.Equal(t, singularBatch.EpochNum, singularBatch.GetEpochNum())
}
This diff is collapsed.
This diff is collapsed.
......@@ -4,7 +4,6 @@ import (
"bytes"
"errors"
"fmt"
"io"
"math/big"
"github.com/ethereum/go-ethereum/common"
......@@ -70,21 +69,6 @@ func (tx *spanBatchTx) MarshalBinary() ([]byte, error) {
return buf.Bytes(), err
}
// EncodeRLP implements rlp.Encoder
func (tx *spanBatchTx) EncodeRLP(w io.Writer) error {
if tx.Type() == types.LegacyTxType {
return rlp.Encode(w, tx.inner)
}
// It's an EIP-2718 typed TX envelope.
buf := encodeBufferPool.Get().(*bytes.Buffer)
defer encodeBufferPool.Put(buf)
buf.Reset()
if err := tx.encodeTyped(buf); err != nil {
return err
}
return rlp.Encode(w, buf.Bytes())
}
// setDecoded sets the inner transaction after decoding.
func (tx *spanBatchTx) setDecoded(inner spanBatchTxData, size uint64) {
tx.inner = inner
......@@ -115,36 +99,6 @@ func (tx *spanBatchTx) decodeTyped(b []byte) (spanBatchTxData, error) {
}
}
// DecodeRLP implements rlp.Decoder
func (tx *spanBatchTx) DecodeRLP(s *rlp.Stream) error {
kind, size, err := s.Kind()
switch {
case err != nil:
return err
case kind == rlp.List:
// It's a legacy transaction.
var inner spanBatchLegacyTxData
err = s.Decode(&inner)
if err != nil {
return fmt.Errorf("failed to decode spanBatchLegacyTxData: %w", err)
}
tx.setDecoded(&inner, rlp.ListSize(size))
return nil
default:
// It's an EIP-2718 typed TX envelope.
var b []byte
if b, err = s.Bytes(); err != nil {
return err
}
inner, err := tx.decodeTyped(b)
if err != nil {
return err
}
tx.setDecoded(inner, uint64(len(b)))
return nil
}
}
// UnmarshalBinary decodes the canonical encoding of transactions.
// It supports legacy RLP transactions and EIP2718 typed transactions.
func (tx *spanBatchTx) UnmarshalBinary(b []byte) error {
......
package derive
import (
"bytes"
"math/big"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type spanBatchTxTest struct {
name string
trials int
mkTx func(rng *rand.Rand, signer types.Signer) *types.Transaction
}
func TestSpanBatchTxConvert(t *testing.T) {
rng := rand.New(rand.NewSource(0x1331))
cases := []spanBatchTxTest{
{"legacy tx", 32, testutils.RandomLegacyTx},
{"access list tx", 32, testutils.RandomAccessListTx},
{"dynamic fee tx", 32, testutils.RandomDynamicFeeTx},
}
for i, testCase := range cases {
t.Run(testCase.name, func(t *testing.T) {
rng := rand.New(rand.NewSource(int64(0x1331 + i)))
chainID := big.NewInt(rng.Int63n(1000))
signer := types.NewLondonSigner(chainID)
m := make(map[byte]int)
for i := 0; i < 32; i++ {
tx := testutils.RandomTx(rng, new(big.Int).SetUint64(rng.Uint64()), signer)
m[tx.Type()] += 1
for txIdx := 0; txIdx < testCase.trials; txIdx++ {
tx := testCase.mkTx(rng, signer)
v, r, s := tx.RawSignatureValues()
sbtx, err := newSpanBatchTx(*tx)
assert.NoError(t, err)
require.NoError(t, err)
tx2, err := sbtx.convertToFullTx(tx.Nonce(), tx.Gas(), tx.To(), chainID, v, r, s)
assert.NoError(t, err)
require.NoError(t, err)
// compare after marshal because we only need inner field of transaction
txEncoded, err := tx.MarshalBinary()
assert.NoError(t, err)
require.NoError(t, err)
tx2Encoded, err := tx2.MarshalBinary()
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, txEncoded, tx2Encoded)
}
// make sure every tx type is tested
assert.Positive(t, m[types.LegacyTxType])
assert.Positive(t, m[types.AccessListTxType])
assert.Positive(t, m[types.DynamicFeeTxType])
})
}
}
func TestSpanBatchTxRoundTrip(t *testing.T) {
rng := rand.New(rand.NewSource(0x1332))
cases := []spanBatchTxTest{
{"legacy tx", 32, testutils.RandomLegacyTx},
{"access list tx", 32, testutils.RandomAccessListTx},
{"dynamic fee tx", 32, testutils.RandomDynamicFeeTx},
}
for i, testCase := range cases {
t.Run(testCase.name, func(t *testing.T) {
rng := rand.New(rand.NewSource(int64(0x1332 + i)))
chainID := big.NewInt(rng.Int63n(1000))
signer := types.NewLondonSigner(chainID)
m := make(map[byte]int)
for i := 0; i < 32; i++ {
tx := testutils.RandomTx(rng, new(big.Int).SetUint64(rng.Uint64()), signer)
m[tx.Type()] += 1
for txIdx := 0; txIdx < testCase.trials; txIdx++ {
tx := testCase.mkTx(rng, signer)
sbtx, err := newSpanBatchTx(*tx)
assert.NoError(t, err)
require.NoError(t, err)
sbtxEncoded, err := sbtx.MarshalBinary()
assert.NoError(t, err)
require.NoError(t, err)
var sbtx2 spanBatchTx
err = sbtx2.UnmarshalBinary(sbtxEncoded)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, sbtx, &sbtx2)
}
// make sure every tx type is tested
assert.Positive(t, m[types.LegacyTxType])
assert.Positive(t, m[types.AccessListTxType])
assert.Positive(t, m[types.DynamicFeeTxType])
}
func TestSpanBatchTxRoundTripRLP(t *testing.T) {
rng := rand.New(rand.NewSource(0x1333))
chainID := big.NewInt(rng.Int63n(1000))
signer := types.NewLondonSigner(chainID)
m := make(map[byte]int)
for i := 0; i < 32; i++ {
tx := testutils.RandomTx(rng, new(big.Int).SetUint64(rng.Uint64()), signer)
m[tx.Type()] += 1
sbtx, err := newSpanBatchTx(*tx)
assert.NoError(t, err)
var buf bytes.Buffer
err = sbtx.EncodeRLP(&buf)
assert.NoError(t, err)
result := buf.Bytes()
var sbtx2 spanBatchTx
r := bytes.NewReader(result)
rlpReader := rlp.NewStream(r, 0)
err = sbtx2.DecodeRLP(rlpReader)
assert.NoError(t, err)
assert.Equal(t, sbtx, &sbtx2)
})
}
// make sure every tx type is tested
assert.Positive(t, m[types.LegacyTxType])
assert.Positive(t, m[types.AccessListTxType])
assert.Positive(t, m[types.DynamicFeeTxType])
}
type spanBatchDummyTxData struct{}
......@@ -107,44 +91,44 @@ func TestSpanBatchTxInvalidTxType(t *testing.T) {
// span batch never contain deposit tx
depositTx := types.NewTx(&types.DepositTx{})
_, err := newSpanBatchTx(*depositTx)
assert.ErrorContains(t, err, "invalid tx type")
require.ErrorContains(t, err, "invalid tx type")
var sbtx spanBatchTx
sbtx.inner = &spanBatchDummyTxData{}
_, err = sbtx.convertToFullTx(0, 0, nil, nil, nil, nil, nil)
assert.ErrorContains(t, err, "invalid tx type")
require.ErrorContains(t, err, "invalid tx type")
}
func TestSpanBatchTxDecodeInvalid(t *testing.T) {
var sbtx spanBatchTx
_, err := sbtx.decodeTyped([]byte{})
assert.EqualError(t, err, "typed transaction too short")
require.EqualError(t, err, "typed transaction too short")
tx := types.NewTx(&types.LegacyTx{})
txEncoded, err := tx.MarshalBinary()
assert.NoError(t, err)
require.NoError(t, err)
// legacy tx is not typed tx
_, err = sbtx.decodeTyped(txEncoded)
assert.EqualError(t, err, types.ErrTxTypeNotSupported.Error())
require.EqualError(t, err, types.ErrTxTypeNotSupported.Error())
tx2 := types.NewTx(&types.AccessListTx{})
tx2Encoded, err := tx2.MarshalBinary()
assert.NoError(t, err)
require.NoError(t, err)
tx2Encoded[0] = types.DynamicFeeTxType
_, err = sbtx.decodeTyped(tx2Encoded)
assert.ErrorContains(t, err, "failed to decode spanBatchDynamicFeeTxData")
require.ErrorContains(t, err, "failed to decode spanBatchDynamicFeeTxData")
tx3 := types.NewTx(&types.DynamicFeeTx{})
tx3Encoded, err := tx3.MarshalBinary()
assert.NoError(t, err)
require.NoError(t, err)
tx3Encoded[0] = types.AccessListTxType
_, err = sbtx.decodeTyped(tx3Encoded)
assert.ErrorContains(t, err, "failed to decode spanBatchAccessListTxData")
require.ErrorContains(t, err, "failed to decode spanBatchAccessListTxData")
invalidLegacyTxDecoded := []byte{0xFF, 0xFF}
err = sbtx.UnmarshalBinary(invalidLegacyTxDecoded)
assert.ErrorContains(t, err, "failed to decode spanBatchLegacyTxData")
require.ErrorContains(t, err, "failed to decode spanBatchLegacyTxData")
}
......@@ -67,8 +67,8 @@ func (btx *spanBatchTxs) decodeContractCreationBits(r *bytes.Reader) error {
contractCreationBitBufferLen++
}
// avoid out of memory before allocation
if contractCreationBitBufferLen > MaxSpanBatchFieldSize {
return ErrTooBigSpanBatchFieldSize
if contractCreationBitBufferLen > MaxSpanBatchSize {
return ErrTooBigSpanBatchSize
}
contractCreationBitBuffer := make([]byte, contractCreationBitBufferLen)
_, err := io.ReadFull(r, contractCreationBitBuffer)
......@@ -190,8 +190,8 @@ func (btx *spanBatchTxs) decodeYParityBits(r *bytes.Reader) error {
yParityBitBufferLen++
}
// avoid out of memory before allocation
if yParityBitBufferLen > MaxSpanBatchFieldSize {
return ErrTooBigSpanBatchFieldSize
if yParityBitBufferLen > MaxSpanBatchSize {
return ErrTooBigSpanBatchSize
}
yParityBitBuffer := make([]byte, yParityBitBufferLen)
_, err := io.ReadFull(r, yParityBitBuffer)
......
This diff is collapsed.
package test
import (
"math/big"
"math/rand"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
......@@ -21,3 +22,13 @@ func RandomL2Block(rng *rand.Rand, txCount int) (*types.Block, []*types.Receipt)
}
return testutils.RandomBlockPrependTxs(rng, txCount, types.NewTx(l1InfoTx))
}
func RandomL2BlockWithChainId(rng *rand.Rand, txCount int, chainId *big.Int) *types.Block {
signer := types.NewLondonSigner(chainId)
block, _ := RandomL2Block(rng, 0)
txs := []*types.Transaction{block.Transactions()[0]} // L1 info deposit TX
for i := 0; i < txCount; i++ {
txs = append(txs, testutils.RandomTx(rng, big.NewInt(int64(rng.Uint32())), signer))
}
return block.WithBody(txs, nil)
}
......@@ -188,6 +188,9 @@ func NewDriverConfig(ctx *cli.Context) *driver.Config {
func NewRollupConfig(log log.Logger, ctx *cli.Context) (*rollup.Config, error) {
network := ctx.String(flags.Network.Name)
rollupConfigPath := ctx.String(flags.RollupConfig.Name)
if ctx.Bool(flags.BetaExtraNetworks.Name) {
log.Warn("The beta.extra-networks flag is deprecated and can be omitted safely.")
}
if network != "" {
if rollupConfigPath != "" {
log.Error(`Cannot configure network and rollup-config at the same time.
......@@ -195,10 +198,6 @@ Startup will proceed to use the network-parameter and ignore the rollup config.
Conflicting configuration is deprecated, and will stop the op-node from starting in the future.
`, "network", network, "rollup_config", rollupConfigPath)
}
// check that the network is available
if !chaincfg.IsAvailableNetwork(network, ctx.Bool(flags.BetaExtraNetworks.Name)) {
return nil, fmt.Errorf("unavailable network: %q", network)
}
config, err := chaincfg.GetRollupConfig(network)
if err != nil {
return nil, err
......
......@@ -109,6 +109,14 @@ func (o *OracleEngine) L2BlockRefByHash(ctx context.Context, l2Hash common.Hash)
return derive.L2BlockToBlockRef(block, &o.rollupCfg.Genesis)
}
func (o *OracleEngine) L2BlockRefByNumber(ctx context.Context, n uint64) (eth.L2BlockRef, error) {
hash := o.backend.GetCanonicalHash(n)
if hash == (common.Hash{}) {
return eth.L2BlockRef{}, ErrNotFound
}
return o.L2BlockRefByHash(ctx, hash)
}
func (o *OracleEngine) SystemConfigByL2Hash(ctx context.Context, hash common.Hash) (eth.SystemConfig, error) {
payload, err := o.PayloadByHash(ctx, hash)
if err != nil {
......
......@@ -75,7 +75,7 @@ func TestDefaultCLIOptionsMatchDefaultConfig(t *testing.T) {
func TestNetwork(t *testing.T) {
t.Run("Unknown", func(t *testing.T) {
verifyArgsInvalid(t, "unavailable network: \"bar\"", replaceRequiredArg("--network", "bar"))
verifyArgsInvalid(t, "invalid network: \"bar\"", replaceRequiredArg("--network", "bar"))
})
t.Run("Required", func(t *testing.T) {
......
......@@ -139,9 +139,13 @@ var RPCProviderKinds = []RPCProviderKind{
RPCKindBasic,
RPCKindAny,
RPCKindStandard,
RPCKindRethDB,
}
// Copy of RPCProviderKinds with RethDB added to all RethDB to be used but to hide it from the flags
var validRPCProviderKinds = func() []RPCProviderKind {
return append(RPCProviderKinds, RPCKindRethDB)
}()
func (kind RPCProviderKind) String() string {
return string(kind)
}
......@@ -160,7 +164,7 @@ func (kind *RPCProviderKind) Clone() any {
}
func ValidRPCProviderKind(value RPCProviderKind) bool {
for _, k := range RPCProviderKinds {
for _, k := range validRPCProviderKinds {
if k == value {
return true
}
......
This diff is collapsed.
This diff is collapsed.
......@@ -159,6 +159,7 @@ services:
OP_BATCHER_PPROF_ENABLED: "true"
OP_BATCHER_METRICS_ENABLED: "true"
OP_BATCHER_RPC_ENABLE_ADMIN: "true"
OP_BATCHER_BATCH_TYPE: 0
artifact-server:
depends_on:
......
This diff is collapsed.
This diff is collapsed.
......@@ -35,7 +35,7 @@
},
"dependencies": {
"@eth-optimism/core-utils": "workspace:*",
"@sentry/node": "^7.75.0",
"@sentry/node": "^7.77.0",
"bcfg": "^0.2.1",
"body-parser": "^1.20.2",
"commander": "^11.1.0",
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment