Commit b9ae70a0 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into qbzzt/230321-stop-start

parents 2ecfb76e f088859a
---
'@eth-optimism/sdk': patch
---
Have SDK automatically create Standard and ETH bridges when L1StandardBridge is provided.
---
'@eth-optimism/batch-submitter-service': patch
---
Allow deposit only batches
---
'@eth-optimism/contracts-bedrock': patch
---
Added a contsructor to the System Dictator
...@@ -518,6 +518,10 @@ jobs: ...@@ -518,6 +518,10 @@ jobs:
patterns: packages patterns: packages
# Note: The below needs to be manually configured whenever we # Note: The below needs to be manually configured whenever we
# add a new package to CI. # add a new package to CI.
- run:
name: Check common-ts
command: npx depcheck
working_directory: packages/common-ts
- run: - run:
name: Check contracts name: Check contracts
command: npx depcheck command: npx depcheck
......
...@@ -66,6 +66,7 @@ You'll need the following: ...@@ -66,6 +66,7 @@ You'll need the following:
* [Yarn](https://classic.yarnpkg.com/en/docs/install) * [Yarn](https://classic.yarnpkg.com/en/docs/install)
* [Docker](https://docs.docker.com/get-docker/) * [Docker](https://docs.docker.com/get-docker/)
* [Docker Compose](https://docs.docker.com/compose/install/) * [Docker Compose](https://docs.docker.com/compose/install/)
* [Go](https://go.dev/dl/)
* [Foundry](https://getfoundry.sh) * [Foundry](https://getfoundry.sh)
### Setup ### Setup
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"github.com/ethereum-optimism/optimism/batch-submitter/drivers/sequencer" "github.com/ethereum-optimism/optimism/batch-submitter/drivers/sequencer"
l2common "github.com/ethereum-optimism/optimism/l2geth/common" l2common "github.com/ethereum-optimism/optimism/l2geth/common"
"github.com/ethereum-optimism/optimism/l2geth/core/types"
l2types "github.com/ethereum-optimism/optimism/l2geth/core/types" l2types "github.com/ethereum-optimism/optimism/l2geth/core/types"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -47,3 +48,76 @@ func TestBatchElementFromBlock(t *testing.T) { ...@@ -47,3 +48,76 @@ func TestBatchElementFromBlock(t *testing.T) {
require.False(t, element.IsSequencerTx()) require.False(t, element.IsSequencerTx())
require.Nil(t, element.Tx) require.Nil(t, element.Tx)
} }
func TestGenSequencerParams(t *testing.T) {
tx := types.NewTransaction(0, l2common.Address{}, big.NewInt(0), 0, big.NewInt(0), []byte{})
shouldStartAtElement := uint64(1)
blockOffset := uint64(1)
batches := []sequencer.BatchElement{
{Timestamp: 1, BlockNumber: 1},
{Timestamp: 1, BlockNumber: 1, Tx: sequencer.NewCachedTx(tx)},
}
params, err := sequencer.GenSequencerBatchParams(shouldStartAtElement, blockOffset, batches)
require.NoError(t, err)
require.Equal(t, uint64(0), params.ShouldStartAtElement)
require.Equal(t, uint64(len(batches)), params.TotalElementsToAppend)
require.Equal(t, len(batches), len(params.Contexts))
// There is only 1 sequencer tx
require.Equal(t, 1, len(params.Txs))
// There are 2 contexts
// The first context contains the deposit
context1 := params.Contexts[0]
require.Equal(t, uint64(0), context1.NumSequencedTxs)
require.Equal(t, uint64(1), context1.NumSubsequentQueueTxs)
require.Equal(t, uint64(1), context1.Timestamp)
require.Equal(t, uint64(1), context1.BlockNumber)
// The second context contains the sequencer tx
context2 := params.Contexts[1]
require.Equal(t, uint64(1), context2.NumSequencedTxs)
require.Equal(t, uint64(0), context2.NumSubsequentQueueTxs)
require.Equal(t, uint64(1), context2.Timestamp)
require.Equal(t, uint64(1), context2.BlockNumber)
}
func TestGenSequencerParamsOnlyDeposits(t *testing.T) {
shouldStartAtElement := uint64(1)
blockOffset := uint64(1)
batches := []sequencer.BatchElement{
{Timestamp: 1, BlockNumber: 1},
{Timestamp: 1, BlockNumber: 1},
{Timestamp: 2, BlockNumber: 2},
}
params, err := sequencer.GenSequencerBatchParams(shouldStartAtElement, blockOffset, batches)
require.NoError(t, err)
// The batches will pack deposits into the same context when their
// timestamps and blocknumbers are the same
require.Equal(t, uint64(0), params.ShouldStartAtElement)
require.Equal(t, uint64(len(batches)), params.TotalElementsToAppend)
// 2 deposits have the same timestamp + blocknumber, they go in the
// same context. 1 deposit has a different timestamp + blocknumber,
// it goes into a different context. Therefore there are 2 contexts
require.Equal(t, 2, len(params.Contexts))
// No sequencer txs
require.Equal(t, 0, len(params.Txs))
// There are 2 contexts
// The first context contains the deposit
context1 := params.Contexts[0]
require.Equal(t, uint64(0), context1.NumSequencedTxs)
require.Equal(t, uint64(2), context1.NumSubsequentQueueTxs)
require.Equal(t, uint64(1), context1.Timestamp)
require.Equal(t, uint64(1), context1.BlockNumber)
context2 := params.Contexts[1]
require.Equal(t, uint64(0), context2.NumSequencedTxs)
require.Equal(t, uint64(1), context2.NumSubsequentQueueTxs)
require.Equal(t, uint64(2), context2.Timestamp)
require.Equal(t, uint64(2), context2.BlockNumber)
}
...@@ -222,11 +222,6 @@ func (p *AppendSequencerBatchParams) Write( ...@@ -222,11 +222,6 @@ func (p *AppendSequencerBatchParams) Write(
return ErrMalformedBatch return ErrMalformedBatch
} }
// There must be transactions if there are contexts
if len(p.Txs) == 0 && len(p.Contexts) != 0 {
return ErrMalformedBatch
}
// copy the contexts as to not malleate the struct // copy the contexts as to not malleate the struct
// when it is a typed batch // when it is a typed batch
contexts := make([]BatchContext, 0, len(p.Contexts)+1) contexts := make([]BatchContext, 0, len(p.Contexts)+1)
...@@ -361,9 +356,6 @@ func (p *AppendSequencerBatchParams) Read(r io.Reader) error { ...@@ -361,9 +356,6 @@ func (p *AppendSequencerBatchParams) Read(r io.Reader) error {
if len(p.Contexts) == 0 && len(p.Txs) != 0 { if len(p.Contexts) == 0 && len(p.Txs) != 0 {
return ErrMalformedBatch return ErrMalformedBatch
} }
if len(p.Txs) == 0 && len(p.Contexts) != 0 {
return ErrMalformedBatch
}
return closeReader() return closeReader()
} else if err != nil { } else if err != nil {
return err return err
......
...@@ -46,7 +46,7 @@ ...@@ -46,7 +46,7 @@
} }
], ],
"txs": [], "txs": [],
"error": true "error": false
}, },
{ {
"name": "multiple-contexts-no-txs", "name": "multiple-contexts-no-txs",
...@@ -80,7 +80,7 @@ ...@@ -80,7 +80,7 @@
} }
], ],
"txs": [], "txs": [],
"error": true "error": false
}, },
{ {
"name": "complex", "name": "complex",
......
...@@ -166,6 +166,7 @@ module.exports = { ...@@ -166,6 +166,7 @@ module.exports = {
children: [ children: [
"/docs/build/tutorials/add-attr.md", "/docs/build/tutorials/add-attr.md",
"/docs/build/tutorials/new-precomp.md", "/docs/build/tutorials/new-precomp.md",
"/docs/build/tutorials/predeploys.md"
] ]
} // End of tutorials } // End of tutorials
], ],
......
...@@ -32,6 +32,11 @@ ...@@ -32,6 +32,11 @@
<i class="fab fa-discord"></i> Discord community <i class="fab fa-discord"></i> Discord community
</a> </a>
</li> </li>
<li>
<a href="https://wkf.ms/3XTdpLl" target="_blank" rel="noopener noreferrer">
<i class="far fa-comment-dots"></i> Get support for going live
</a>
</li>
</ul> </ul>
</div> </div>
</div> </div>
......
...@@ -201,7 +201,7 @@ Once you’ve built both repositories, you’ll need head back to the Optimism M ...@@ -201,7 +201,7 @@ Once you’ve built both repositories, you’ll need head back to the Optimism M
- Replace `"BATCHER"` with the address of the Batcher account you generated earlier. - Replace `"BATCHER"` with the address of the Batcher account you generated earlier.
- Replace `"SEQUENCER"` with the address of the Sequencer account you generated earlier. - Replace `"SEQUENCER"` with the address of the Sequencer account you generated earlier.
- Replace `"BLOCKHASH"` with the blockhash you got from the `cast` command. - Replace `"BLOCKHASH"` with the blockhash you got from the `cast` command.
- Replace `"TIMESTAMP"` with the timestamp you got from the `cast` command. Note that although all the other fields are strings, this field is a number! Don’t include the quotation marks. - Replace `TIMESTAMP` with the timestamp you got from the `cast` command. Note that although all the other fields are strings, this field is a number! Don’t include the quotation marks.
## Deploy the L1 contracts ## Deploy the L1 contracts
......
---
title: Modifying Predeployed Contracts
lang: en-US
---
::: warning 🚧 OP Stack Hacks are explicitly things that you can do with the OP Stack that are *not* currently intended for production use
OP Stack Hacks are not for the faint of heart. You will not be able to receive significant developer support for OP Stack Hacks — be prepared to get your hands dirty and to work without support.
:::
OP Stack blockchains have a number of [predeployed contracts](https://github.com/ethereum-optimism/optimism/blob/develop/packages/contracts-bedrock/src/constants.ts) that provide important functionality.
Most of those contracts are proxies that can be upgraded using the `proxyAdminOwner` which was configured when the network was initially deployed.
The predeploys are controlled from a predeploy called [`ProxyAdmin`](https://github.com/ethereum-optimism/optimism/blob/develop/packages/contracts-bedrock/contracts/universal/ProxyAdmin.sol), whose address is `0x4200000000000000000000000000000000000018`.
The function to call is [`upgrade(address,address)`](https://github.com/ethereum-optimism/optimism/blob/develop/packages/contracts-bedrock/contracts/universal/ProxyAdmin.sol#L211-L229).
The first parameter is the proxy to upgrade, and the second is the address of a new implementation.
For example, the legacy `L1BlockNumber` contract is at `0x420...013`.
To disable this function, we'll set the implementation to `0x00...00`.
We do this using the [Foundry](https://book.getfoundry.sh/) command `cast`.
1. We'll need several constants.
- Set these addresses as variables in your terminal.
```sh
L1BLOCKNUM=0x4200000000000000000000000000000000000013
PROXY_ADMIN=0x4200000000000000000000000000000000000018
ZERO_ADDR=0x0000000000000000000000000000000000000000
```
- Set `PRIVKEY` to the private key of your ADMIN account.
- Set `ETH_RPC_URL`. If you're on the computer that runs the blockchain, use this command.
```sh
export ETH_RPC_URL=http://localhost:8545
```
1. Verify `L1BlockNumber` works correctly.
See that when you call the contract you get a block number, and twelve seconds later you get the next one (block time on L1 is twelve seconds).
```sh
cast call $L1BLOCKNUM 'number()' | cast --to-dec
sleep 12 && cast call $L1BLOCKNUM 'number()' | cast --to-dec
```
1. Get the current implementation for the contract.
```sh
L1BLOCKNUM_IMPLEMENTATION=`cast call $L1BLOCKNUM "implementation()" | sed 's/000000000000000000000000//'`
echo $L1BLOCKNUM_IMPLEMENTATION
```
1. Change the implementation to the zero address
```sh
cast send --private-key $PRIVKEY $PROXY_ADMIN "upgrade(address,address)" $L1BLOCKNUM $ZERO_ADDR
```
1. See that the implementation is address zero, and that calling it fails.
```sh
cast call $L1BLOCKNUM 'implementation()'
cast call $L1BLOCKNUM 'number()'
```
1. Fix the predeploy by returning it to the previous implementation, and verify it works.
```sh
cast send --private-key $PRIVKEY $PROXY_ADMIN "upgrade(address,address)" $L1BLOCKNUM $L1BLOCKNUM_IMPLEMENTATION
cast call $L1BLOCKNUM 'number()' | cast --to-dec
```
\ No newline at end of file
...@@ -20,6 +20,7 @@ lint: ...@@ -20,6 +20,7 @@ lint:
golangci-lint run -E goimports,sqlclosecheck,bodyclose,asciicheck,misspell,errorlint -e "errors.As" -e "errors.Is" golangci-lint run -E goimports,sqlclosecheck,bodyclose,asciicheck,misspell,errorlint -e "errors.As" -e "errors.Is"
fuzz: fuzz:
go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzChannelConfig_CheckTimeout ./batcher
go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzDurationZero ./batcher go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzDurationZero ./batcher
go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzDurationTimeoutMaxChannelDuration ./batcher go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzDurationTimeoutMaxChannelDuration ./batcher
go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzDurationTimeoutZeroMaxChannelDuration ./batcher go test -run NOTAREALTEST -v -fuzztime 10s -fuzz FuzzDurationTimeoutZeroMaxChannelDuration ./batcher
......
...@@ -12,8 +12,6 @@ import ( ...@@ -12,8 +12,6 @@ import (
) )
var ( var (
ErrZeroMaxFrameSize = errors.New("max frame size cannot be zero")
ErrSmallMaxFrameSize = errors.New("max frame size cannot be less than 23")
ErrInvalidChannelTimeout = errors.New("channel timeout is less than the safety margin") ErrInvalidChannelTimeout = errors.New("channel timeout is less than the safety margin")
ErrInputTargetReached = errors.New("target amount of input data reached") ErrInputTargetReached = errors.New("target amount of input data reached")
ErrMaxFrameIndex = errors.New("max frame index reached (uint16)") ErrMaxFrameIndex = errors.New("max frame index reached (uint16)")
...@@ -83,15 +81,15 @@ func (cc *ChannelConfig) Check() error { ...@@ -83,15 +81,15 @@ func (cc *ChannelConfig) Check() error {
// will infinitely loop when trying to create frames in the // will infinitely loop when trying to create frames in the
// [channelBuilder.OutputFrames] function. // [channelBuilder.OutputFrames] function.
if cc.MaxFrameSize == 0 { if cc.MaxFrameSize == 0 {
return ErrZeroMaxFrameSize return errors.New("max frame size cannot be zero")
} }
// If the [MaxFrameSize] is set to < 23, the channel out // If the [MaxFrameSize] is less than [FrameV0OverHeadSize], the channel
// will underflow the maxSize variable in the [derive.ChannelOut]. // out will underflow the maxSize variable in the [derive.ChannelOut].
// Since it is of type uint64, it will wrap around to a very large // Since it is of type uint64, it will wrap around to a very large
// number, making the frame size extremely large. // number, making the frame size extremely large.
if cc.MaxFrameSize < 23 { if cc.MaxFrameSize < derive.FrameV0OverHeadSize {
return ErrSmallMaxFrameSize return fmt.Errorf("max frame size %d is less than the minimum 23", cc.MaxFrameSize)
} }
return nil return nil
......
This diff is collapsed.
...@@ -32,8 +32,7 @@ func TestPendingChannelTimeout(t *testing.T) { ...@@ -32,8 +32,7 @@ func TestPendingChannelTimeout(t *testing.T) {
require.False(t, timeout) require.False(t, timeout)
// Set the pending channel // Set the pending channel
err := m.ensurePendingChannel(eth.BlockID{}) require.NoError(t, m.ensurePendingChannel(eth.BlockID{}))
require.NoError(t, err)
// There are no confirmed transactions so // There are no confirmed transactions so
// the pending channel cannot be timed out // the pending channel cannot be timed out
...@@ -85,14 +84,10 @@ func TestChannelManagerReturnsErrReorg(t *testing.T) { ...@@ -85,14 +84,10 @@ func TestChannelManagerReturnsErrReorg(t *testing.T) {
ParentHash: common.Hash{0xff}, ParentHash: common.Hash{0xff},
}, nil, nil, nil, nil) }, nil, nil, nil, nil)
err := m.AddL2Block(a) require.NoError(t, m.AddL2Block(a))
require.NoError(t, err) require.NoError(t, m.AddL2Block(b))
err = m.AddL2Block(b) require.NoError(t, m.AddL2Block(c))
require.NoError(t, err) require.ErrorIs(t, m.AddL2Block(x), ErrReorg)
err = m.AddL2Block(c)
require.NoError(t, err)
err = m.AddL2Block(x)
require.ErrorIs(t, err, ErrReorg)
require.Equal(t, []*types.Block{a, b, c}, m.blocks) require.Equal(t, []*types.Block{a, b, c}, m.blocks)
} }
...@@ -111,16 +106,14 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { ...@@ -111,16 +106,14 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
a := newMiniL2Block(0) a := newMiniL2Block(0)
x := newMiniL2BlockWithNumberParent(0, big.NewInt(1), common.Hash{0xff}) x := newMiniL2BlockWithNumberParent(0, big.NewInt(1), common.Hash{0xff})
err := m.AddL2Block(a) require.NoError(t, m.AddL2Block(a))
require.NoError(t, err)
_, err = m.TxData(eth.BlockID{}) _, err := m.TxData(eth.BlockID{})
require.NoError(t, err) require.NoError(t, err)
_, err = m.TxData(eth.BlockID{}) _, err = m.TxData(eth.BlockID{})
require.ErrorIs(t, err, io.EOF) require.ErrorIs(t, err, io.EOF)
err = m.AddL2Block(x) require.ErrorIs(t, m.AddL2Block(x), ErrReorg)
require.ErrorIs(t, err, ErrReorg)
} }
// TestChannelManagerNextTxData checks the nextTxData function. // TestChannelManagerNextTxData checks the nextTxData function.
...@@ -136,8 +129,7 @@ func TestChannelManagerNextTxData(t *testing.T) { ...@@ -136,8 +129,7 @@ func TestChannelManagerNextTxData(t *testing.T) {
// Set the pending channel // Set the pending channel
// The nextTxData function should still return EOF // The nextTxData function should still return EOF
// since the pending channel has no frames // since the pending channel has no frames
err = m.ensurePendingChannel(eth.BlockID{}) require.NoError(t, m.ensurePendingChannel(eth.BlockID{}))
require.NoError(t, err)
returnedTxData, err = m.nextTxData() returnedTxData, err = m.nextTxData()
require.ErrorIs(t, err, io.EOF) require.ErrorIs(t, err, io.EOF)
require.Equal(t, txData{}, returnedTxData) require.Equal(t, txData{}, returnedTxData)
...@@ -164,8 +156,10 @@ func TestChannelManagerNextTxData(t *testing.T) { ...@@ -164,8 +156,10 @@ func TestChannelManagerNextTxData(t *testing.T) {
require.Equal(t, expectedTxData, m.pendingTransactions[expectedChannelID]) require.Equal(t, expectedTxData, m.pendingTransactions[expectedChannelID])
} }
// TestClearChannelManager tests clearing the channel manager. // TestChannelManager_Clear tests clearing the channel manager.
func TestClearChannelManager(t *testing.T) { func TestChannelManager_Clear(t *testing.T) {
require := require.New(t)
// Create a channel manager // Create a channel manager
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
rng := rand.New(rand.NewSource(time.Now().UnixNano())) rng := rand.New(rand.NewSource(time.Now().UnixNano()))
...@@ -176,15 +170,17 @@ func TestClearChannelManager(t *testing.T) { ...@@ -176,15 +170,17 @@ func TestClearChannelManager(t *testing.T) {
ChannelTimeout: 10, ChannelTimeout: 10,
// Have to set the max frame size here otherwise the channel builder would not // Have to set the max frame size here otherwise the channel builder would not
// be able to output any frames // be able to output any frames
MaxFrameSize: 1, MaxFrameSize: 24,
TargetFrameSize: 24,
ApproxComprRatio: 1.0,
}) })
// Channel Manager state should be empty by default // Channel Manager state should be empty by default
require.Empty(t, m.blocks) require.Empty(m.blocks)
require.Equal(t, common.Hash{}, m.tip) require.Equal(common.Hash{}, m.tip)
require.Nil(t, m.pendingChannel) require.Nil(m.pendingChannel)
require.Empty(t, m.pendingTransactions) require.Empty(m.pendingTransactions)
require.Empty(t, m.confirmedTransactions) require.Empty(m.confirmedTransactions)
// Add a block to the channel manager // Add a block to the channel manager
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
...@@ -193,28 +189,25 @@ func TestClearChannelManager(t *testing.T) { ...@@ -193,28 +189,25 @@ func TestClearChannelManager(t *testing.T) {
Hash: a.Hash(), Hash: a.Hash(),
Number: a.NumberU64(), Number: a.NumberU64(),
} }
err := m.AddL2Block(a) require.NoError(m.AddL2Block(a))
require.NoError(t, err)
// Make sure there is a channel builder // Make sure there is a channel builder
err = m.ensurePendingChannel(l1BlockID) require.NoError(m.ensurePendingChannel(l1BlockID))
require.NoError(t, err) require.NotNil(m.pendingChannel)
require.NotNil(t, m.pendingChannel) require.Len(m.confirmedTransactions, 0)
require.Equal(t, 0, len(m.confirmedTransactions))
// Process the blocks // Process the blocks
// We should have a pending channel with 1 frame // We should have a pending channel with 1 frame
// and no more blocks since processBlocks consumes // and no more blocks since processBlocks consumes
// the list // the list
err = m.processBlocks() require.NoError(m.processBlocks())
require.NoError(t, err) require.NoError(m.pendingChannel.co.Flush())
err = m.pendingChannel.OutputFrames() require.NoError(m.pendingChannel.OutputFrames())
require.NoError(t, err) _, err := m.nextTxData()
_, err = m.nextTxData() require.NoError(err)
require.NoError(t, err) require.Len(m.blocks, 0)
require.Equal(t, 0, len(m.blocks)) require.Equal(newL1Tip, m.tip)
require.Equal(t, newL1Tip, m.tip) require.Len(m.pendingTransactions, 1)
require.Equal(t, 1, len(m.pendingTransactions))
// Add a new block so we can test clearing // Add a new block so we can test clearing
// the channel manager with a full state // the channel manager with a full state
...@@ -222,20 +215,19 @@ func TestClearChannelManager(t *testing.T) { ...@@ -222,20 +215,19 @@ func TestClearChannelManager(t *testing.T) {
Number: big.NewInt(1), Number: big.NewInt(1),
ParentHash: a.Hash(), ParentHash: a.Hash(),
}, nil, nil, nil, nil) }, nil, nil, nil, nil)
err = m.AddL2Block(b) require.NoError(m.AddL2Block(b))
require.NoError(t, err) require.Len(m.blocks, 1)
require.Equal(t, 1, len(m.blocks)) require.Equal(b.Hash(), m.tip)
require.Equal(t, b.Hash(), m.tip)
// Clear the channel manager // Clear the channel manager
m.Clear() m.Clear()
// Check that the entire channel manager state cleared // Check that the entire channel manager state cleared
require.Empty(t, m.blocks) require.Empty(m.blocks)
require.Equal(t, common.Hash{}, m.tip) require.Equal(common.Hash{}, m.tip)
require.Nil(t, m.pendingChannel) require.Nil(m.pendingChannel)
require.Empty(t, m.pendingTransactions) require.Empty(m.pendingTransactions)
require.Empty(t, m.confirmedTransactions) require.Empty(m.confirmedTransactions)
} }
// TestChannelManagerTxConfirmed checks the [ChannelManager.TxConfirmed] function. // TestChannelManagerTxConfirmed checks the [ChannelManager.TxConfirmed] function.
...@@ -251,8 +243,7 @@ func TestChannelManagerTxConfirmed(t *testing.T) { ...@@ -251,8 +243,7 @@ func TestChannelManagerTxConfirmed(t *testing.T) {
// Let's add a valid pending transaction to the channel manager // Let's add a valid pending transaction to the channel manager
// So we can demonstrate that TxConfirmed's correctness // So we can demonstrate that TxConfirmed's correctness
err := m.ensurePendingChannel(eth.BlockID{}) require.NoError(t, m.ensurePendingChannel(eth.BlockID{}))
require.NoError(t, err)
channelID := m.pendingChannel.ID() channelID := m.pendingChannel.ID()
frame := frameData{ frame := frameData{
data: []byte{}, data: []byte{},
...@@ -270,7 +261,7 @@ func TestChannelManagerTxConfirmed(t *testing.T) { ...@@ -270,7 +261,7 @@ func TestChannelManagerTxConfirmed(t *testing.T) {
require.Equal(t, expectedTxData, returnedTxData) require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, m.pendingChannel.NumFrames()) require.Equal(t, 0, m.pendingChannel.NumFrames())
require.Equal(t, expectedTxData, m.pendingTransactions[expectedChannelID]) require.Equal(t, expectedTxData, m.pendingTransactions[expectedChannelID])
require.Equal(t, 1, len(m.pendingTransactions)) require.Len(t, m.pendingTransactions, 1)
// An unknown pending transaction should not be marked as confirmed // An unknown pending transaction should not be marked as confirmed
// and should not be removed from the pending transactions map // and should not be removed from the pending transactions map
...@@ -281,14 +272,14 @@ func TestChannelManagerTxConfirmed(t *testing.T) { ...@@ -281,14 +272,14 @@ func TestChannelManagerTxConfirmed(t *testing.T) {
blockID := eth.BlockID{Number: 0, Hash: common.Hash{0x69}} blockID := eth.BlockID{Number: 0, Hash: common.Hash{0x69}}
m.TxConfirmed(unknownTxID, blockID) m.TxConfirmed(unknownTxID, blockID)
require.Empty(t, m.confirmedTransactions) require.Empty(t, m.confirmedTransactions)
require.Equal(t, 1, len(m.pendingTransactions)) require.Len(t, m.pendingTransactions, 1)
// Now let's mark the pending transaction as confirmed // Now let's mark the pending transaction as confirmed
// and check that it is removed from the pending transactions map // and check that it is removed from the pending transactions map
// and added to the confirmed transactions map // and added to the confirmed transactions map
m.TxConfirmed(expectedChannelID, blockID) m.TxConfirmed(expectedChannelID, blockID)
require.Empty(t, m.pendingTransactions) require.Empty(t, m.pendingTransactions)
require.Equal(t, 1, len(m.confirmedTransactions)) require.Len(t, m.confirmedTransactions, 1)
require.Equal(t, blockID, m.confirmedTransactions[expectedChannelID]) require.Equal(t, blockID, m.confirmedTransactions[expectedChannelID])
} }
...@@ -300,8 +291,7 @@ func TestChannelManagerTxFailed(t *testing.T) { ...@@ -300,8 +291,7 @@ func TestChannelManagerTxFailed(t *testing.T) {
// Let's add a valid pending transaction to the channel // Let's add a valid pending transaction to the channel
// manager so we can demonstrate correctness // manager so we can demonstrate correctness
err := m.ensurePendingChannel(eth.BlockID{}) require.NoError(t, m.ensurePendingChannel(eth.BlockID{}))
require.NoError(t, err)
channelID := m.pendingChannel.ID() channelID := m.pendingChannel.ID()
frame := frameData{ frame := frameData{
data: []byte{}, data: []byte{},
...@@ -319,7 +309,7 @@ func TestChannelManagerTxFailed(t *testing.T) { ...@@ -319,7 +309,7 @@ func TestChannelManagerTxFailed(t *testing.T) {
require.Equal(t, expectedTxData, returnedTxData) require.Equal(t, expectedTxData, returnedTxData)
require.Equal(t, 0, m.pendingChannel.NumFrames()) require.Equal(t, 0, m.pendingChannel.NumFrames())
require.Equal(t, expectedTxData, m.pendingTransactions[expectedChannelID]) require.Equal(t, expectedTxData, m.pendingTransactions[expectedChannelID])
require.Equal(t, 1, len(m.pendingTransactions)) require.Len(t, m.pendingTransactions, 1)
// Trying to mark an unknown pending transaction as failed // Trying to mark an unknown pending transaction as failed
// shouldn't modify state // shouldn't modify state
...@@ -348,8 +338,7 @@ func TestChannelManager_TxResend(t *testing.T) { ...@@ -348,8 +338,7 @@ func TestChannelManager_TxResend(t *testing.T) {
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
err := m.AddL2Block(a) require.NoError(m.AddL2Block(a))
require.NoError(err)
txdata0, err := m.TxData(eth.BlockID{}) txdata0, err := m.TxData(eth.BlockID{})
require.NoError(err) require.NoError(err)
......
...@@ -110,6 +110,14 @@ type CLIConfig struct { ...@@ -110,6 +110,14 @@ type CLIConfig struct {
/* Optional Params */ /* Optional Params */
// TxManagerTimeout is the max amount of time to wait for the [txmgr].
// This will default to: 10 * time.Minute.
TxManagerTimeout time.Duration
// OfflineGasEstimation specifies whether the batcher should calculate
// gas estimations offline using the [core.IntrinsicGas] function.
OfflineGasEstimation bool
// MaxL1TxSize is the maximum size of a batch tx submitted to L1. // MaxL1TxSize is the maximum size of a batch tx submitted to L1.
MaxL1TxSize uint64 MaxL1TxSize uint64
...@@ -168,6 +176,8 @@ func NewConfig(ctx *cli.Context) CLIConfig { ...@@ -168,6 +176,8 @@ func NewConfig(ctx *cli.Context) CLIConfig {
ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name), ResubmissionTimeout: ctx.GlobalDuration(flags.ResubmissionTimeoutFlag.Name),
/* Optional Flags */ /* Optional Flags */
OfflineGasEstimation: ctx.GlobalBool(flags.OfflineGasEstimationFlag.Name),
TxManagerTimeout: ctx.GlobalDuration(flags.TxManagerTimeoutFlag.Name),
MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name), MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name), MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name), TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
......
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -24,7 +25,7 @@ import ( ...@@ -24,7 +25,7 @@ import (
type BatchSubmitter struct { type BatchSubmitter struct {
Config // directly embed the config + sources Config // directly embed the config + sources
txMgr *TransactionManager txMgr txmgr.TxManager
wg sync.WaitGroup wg sync.WaitGroup
done chan struct{} done chan struct{}
...@@ -79,6 +80,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -79,6 +80,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
NumConfirmations: cfg.NumConfirmations, NumConfirmations: cfg.NumConfirmations,
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount, SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
From: fromAddress, From: fromAddress,
ChainID: rcfg.L1ChainID,
Signer: signer(rcfg.L1ChainID), Signer: signer(rcfg.L1ChainID),
} }
...@@ -125,9 +127,7 @@ func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger, m metrics. ...@@ -125,9 +127,7 @@ func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger, m metrics.
return &BatchSubmitter{ return &BatchSubmitter{
Config: cfg, Config: cfg,
txMgr: NewTransactionManager(l, txMgr: txmgr.NewSimpleTxManager("batcher", l, cfg.TxManagerConfig, cfg.L1Client),
cfg.TxManagerConfig, cfg.Rollup.BatchInboxAddress, cfg.Rollup.L1ChainID,
cfg.From, cfg.L1Client),
state: NewChannelManager(l, m, cfg.Channel), state: NewChannelManager(l, m, cfg.Channel),
}, nil }, nil
...@@ -226,7 +226,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) { ...@@ -226,7 +226,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) {
// loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded. // loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded.
func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) { func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) {
ctx, cancel := context.WithTimeout(ctx, networkTimeout) ctx, cancel := context.WithTimeout(ctx, txManagerTimeout)
defer cancel() defer cancel()
block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber)) block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber))
if err != nil { if err != nil {
...@@ -244,7 +244,7 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin ...@@ -244,7 +244,7 @@ func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uin
// calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state. // calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state.
// It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions) // It also takes care of initializing some local state (i.e. will modify l.lastStoredBlock in certain conditions)
func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) { func (l *BatchSubmitter) calculateL2BlockRangeToStore(ctx context.Context) (eth.BlockID, eth.BlockID, error) {
childCtx, cancel := context.WithTimeout(ctx, networkTimeout) childCtx, cancel := context.WithTimeout(ctx, txManagerTimeout)
defer cancel() defer cancel()
syncStatus, err := l.RollupNode.SyncStatus(childCtx) syncStatus, err := l.RollupNode.SyncStatus(childCtx)
// Ensure that we have the sync status // Ensure that we have the sync status
...@@ -312,8 +312,9 @@ func (l *BatchSubmitter) loop() { ...@@ -312,8 +312,9 @@ func (l *BatchSubmitter) loop() {
l.log.Error("unable to get tx data", "err", err) l.log.Error("unable to get tx data", "err", err)
break break
} }
// Record TX Status // Record TX Status
if receipt, err := l.txMgr.SendTransaction(l.ctx, txdata.Bytes()); err != nil { if receipt, err := l.SendTransaction(l.ctx, txdata.Bytes()); err != nil {
l.recordFailedTx(txdata.ID(), err) l.recordFailedTx(txdata.ID(), err)
} else { } else {
l.recordConfirmedTx(txdata.ID(), receipt) l.recordConfirmedTx(txdata.ID(), receipt)
...@@ -335,6 +336,46 @@ func (l *BatchSubmitter) loop() { ...@@ -335,6 +336,46 @@ func (l *BatchSubmitter) loop() {
} }
} }
const networkTimeout = 2 * time.Second // How long a single network request can take. TODO: put in a config somewhere
// fix(refcell):
// combined with above, these config variables should also be replicated in the op-proposer
// along with op-proposer changes to include the updated tx manager
const txManagerTimeout = 2 * time.Minute // How long the tx manager can take to send a transaction.
// SendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently.
func (l *BatchSubmitter) SendTransaction(ctx context.Context, data []byte) (*types.Receipt, error) {
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
if err != nil {
return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
}
// Create the transaction
tx, err := l.txMgr.CraftTx(ctx, txmgr.TxCandidate{
To: l.Rollup.BatchInboxAddress,
TxData: data,
From: l.From,
GasLimit: intrinsicGas,
})
if err != nil {
return nil, fmt.Errorf("failed to create tx: %w", err)
}
// Send the transaction through the txmgr
ctx, cancel := context.WithTimeout(ctx, txManagerTimeout)
defer cancel()
if receipt, err := l.txMgr.Send(ctx, tx); err != nil {
l.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
return nil, err
} else {
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
return receipt, nil
}
}
func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) { func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
if l.lastL1Tip == l1tip { if l.lastL1Tip == l1tip {
return return
......
package batcher
import (
"context"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum-optimism/optimism/op-service/txmgr/mocks"
)
// TestBatchSubmitter_SendTransaction tests the driver's
// [SendTransaction] external facing function.
func TestBatchSubmitter_SendTransaction(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
txMgr := mocks.TxManager{}
batcherInboxAddress := common.HexToAddress("0x42000000000000000000000000000000000000ff")
chainID := big.NewInt(1)
sender := common.HexToAddress("0xdeadbeef")
bs := BatchSubmitter{
Config: Config{
log: log,
From: sender,
Rollup: &rollup.Config{
L1ChainID: chainID,
BatchInboxAddress: batcherInboxAddress,
},
},
txMgr: &txMgr,
}
txData := []byte{0x00, 0x01, 0x02}
gasTipCap := big.NewInt(136)
gasFeeCap := big.NewInt(137)
gas := uint64(1337)
// Candidate gas should be calculated with [core.IntrinsicGas]
intrinsicGas, err := core.IntrinsicGas(txData, nil, false, true, true, false)
require.NoError(t, err)
candidate := txmgr.TxCandidate{
To: batcherInboxAddress,
TxData: txData,
From: sender,
GasLimit: intrinsicGas,
}
tx := types.NewTx(&types.DynamicFeeTx{
ChainID: chainID,
Nonce: 0,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Gas: gas,
To: &batcherInboxAddress,
Data: txData,
})
txHash := tx.Hash()
expectedReceipt := types.Receipt{
Type: 1,
PostState: []byte{},
Status: uint64(1),
CumulativeGasUsed: gas,
TxHash: txHash,
GasUsed: gas,
}
txMgr.On("CraftTx", mock.Anything, candidate).Return(tx, nil)
txMgr.On("Send", mock.Anything, tx).Return(&expectedReceipt, nil)
receipt, err := bs.SendTransaction(context.Background(), tx.Data())
require.NoError(t, err)
require.Equal(t, receipt, &expectedReceipt)
}
package batcher
import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
)
const networkTimeout = 2 * time.Second // How long a single network request can take. TODO: put in a config somewhere
// TransactionManager wraps the simple txmgr package to make it easy to send & wait for transactions
type TransactionManager struct {
// Config
batchInboxAddress common.Address
senderAddress common.Address
chainID *big.Int
// Outside world
txMgr txmgr.TxManager
l1Client *ethclient.Client
signerFn opcrypto.SignerFn
log log.Logger
}
func NewTransactionManager(log log.Logger, txMgrConfg txmgr.Config, batchInboxAddress common.Address, chainID *big.Int, senderAddress common.Address, l1Client *ethclient.Client) *TransactionManager {
t := &TransactionManager{
batchInboxAddress: batchInboxAddress,
senderAddress: senderAddress,
chainID: chainID,
txMgr: txmgr.NewSimpleTxManager("batcher", log, txMgrConfg, l1Client),
l1Client: l1Client,
signerFn: txMgrConfg.Signer,
log: log,
}
return t
}
// SendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently.
// TODO: where to put concurrent transaction handling logic.
func (t *TransactionManager) SendTransaction(ctx context.Context, data []byte) (*types.Receipt, error) {
tx, err := t.CraftTx(ctx, data)
if err != nil {
return nil, fmt.Errorf("failed to create tx: %w", err)
}
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) // TODO: Select a timeout that makes sense here.
defer cancel()
if receipt, err := t.txMgr.Send(ctx, tx); err != nil {
t.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
return nil, err
} else {
t.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
return receipt, nil
}
}
// calcGasTipAndFeeCap queries L1 to determine what a suitable miner tip & basefee limit would be for timely inclusion
func (t *TransactionManager) calcGasTipAndFeeCap(ctx context.Context) (gasTipCap *big.Int, gasFeeCap *big.Int, err error) {
childCtx, cancel := context.WithTimeout(ctx, networkTimeout)
gasTipCap, err = t.l1Client.SuggestGasTipCap(childCtx)
cancel()
if err != nil {
return nil, nil, fmt.Errorf("failed to get suggested gas tip cap: %w", err)
}
if gasTipCap == nil {
t.log.Warn("unexpected unset gasTipCap, using default 2 gwei")
gasTipCap = new(big.Int).SetUint64(params.GWei * 2)
}
childCtx, cancel = context.WithTimeout(ctx, networkTimeout)
head, err := t.l1Client.HeaderByNumber(childCtx, nil)
cancel()
if err != nil || head == nil {
return nil, nil, fmt.Errorf("failed to get L1 head block for fee cap: %w", err)
}
if head.BaseFee == nil {
return nil, nil, fmt.Errorf("failed to get L1 basefee in block %d for fee cap", head.Number)
}
gasFeeCap = txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)
return gasTipCap, gasFeeCap, nil
}
// CraftTx creates the signed transaction to the batchInboxAddress.
// It queries L1 for the current fee market conditions as well as for the nonce.
// NOTE: This method SHOULD NOT publish the resulting transaction.
func (t *TransactionManager) CraftTx(ctx context.Context, data []byte) (*types.Transaction, error) {
gasTipCap, gasFeeCap, err := t.calcGasTipAndFeeCap(ctx)
if err != nil {
return nil, err
}
childCtx, cancel := context.WithTimeout(ctx, networkTimeout)
nonce, err := t.l1Client.NonceAt(childCtx, t.senderAddress, nil)
cancel()
if err != nil {
return nil, fmt.Errorf("failed to get nonce: %w", err)
}
rawTx := &types.DynamicFeeTx{
ChainID: t.chainID,
Nonce: nonce,
To: &t.batchInboxAddress,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Data: data,
}
t.log.Info("creating tx", "to", rawTx.To, "from", t.senderAddress)
gas, err := core.IntrinsicGas(rawTx.Data, nil, false, true, true, false)
if err != nil {
return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
}
rawTx.Gas = gas
ctx, cancel = context.WithTimeout(ctx, networkTimeout)
defer cancel()
tx := types.NewTx(rawTx)
return t.signerFn(ctx, t.senderAddress, tx)
}
package flags package flags
import ( import (
"time"
"github.com/urfave/cli" "github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-batcher/rpc" "github.com/ethereum-optimism/optimism/op-batcher/rpc"
...@@ -74,7 +76,17 @@ var ( ...@@ -74,7 +76,17 @@ var (
} }
/* Optional flags */ /* Optional flags */
OfflineGasEstimationFlag = cli.BoolFlag{
Name: "offline-gas-estimation",
Usage: "Whether to use offline gas estimation",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "OFFLINE_GAS_ESTIMATION"),
}
TxManagerTimeoutFlag = cli.DurationFlag{
Name: "tx-manager-timeout",
Usage: "Maximum duration to wait for L1 transactions, including resubmissions",
Value: 10 * time.Minute,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "TX_MANAGER_TIMEOUT"),
}
MaxChannelDurationFlag = cli.Uint64Flag{ MaxChannelDurationFlag = cli.Uint64Flag{
Name: "max-channel-duration", Name: "max-channel-duration",
Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.", Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.",
...@@ -141,6 +153,8 @@ var requiredFlags = []cli.Flag{ ...@@ -141,6 +153,8 @@ var requiredFlags = []cli.Flag{
} }
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
OfflineGasEstimationFlag,
TxManagerTimeoutFlag,
MaxChannelDurationFlag, MaxChannelDurationFlag,
MaxL1TxSizeBytesFlag, MaxL1TxSizeBytesFlag,
TargetL1TxSizeBytesFlag, TargetL1TxSizeBytesFlag,
......
...@@ -185,7 +185,7 @@ func (m *Metrics) RecordLatestL1Block(l1ref eth.L1BlockRef) { ...@@ -185,7 +185,7 @@ func (m *Metrics) RecordLatestL1Block(l1ref eth.L1BlockRef) {
m.RecordL1Ref("latest", l1ref) m.RecordL1Ref("latest", l1ref)
} }
// RecordL2BlockLoaded should be called when a new L2 block was loaded into the // RecordL2BlocksLoaded should be called when a new L2 block was loaded into the
// channel manager (but not processed yet). // channel manager (but not processed yet).
func (m *Metrics) RecordL2BlocksLoaded(l2ref eth.L2BlockRef) { func (m *Metrics) RecordL2BlocksLoaded(l2ref eth.L2BlockRef) {
m.RecordL2Ref(StageLoaded, l2ref) m.RecordL2Ref(StageLoaded, l2ref)
......
This diff is collapsed.
This diff is collapsed.
package ether package ether
import ( import (
"fmt"
"math/big" "math/big"
"math/rand" "math/rand"
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-chain-ops/util"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys" "github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum-optimism/optimism/op-chain-ops/crossdomain" "github.com/ethereum-optimism/optimism/op-chain-ops/crossdomain"
...@@ -190,7 +191,7 @@ func TestMigrateBalances(t *testing.T) { ...@@ -190,7 +191,7 @@ func TestMigrateBalances(t *testing.T) {
} }
} }
func makeLegacyETH(t *testing.T, totalSupply *big.Int, balances map[common.Address]*big.Int, allowances map[common.Address]common.Address) (*state.StateDB, DBFactory) { func makeLegacyETH(t *testing.T, totalSupply *big.Int, balances map[common.Address]*big.Int, allowances map[common.Address]common.Address) (*state.StateDB, util.DBFactory) {
memDB := rawdb.NewMemoryDatabase() memDB := rawdb.NewMemoryDatabase()
db, err := state.New(common.Hash{}, state.NewDatabaseWithConfig(memDB, &trie.Config{ db, err := state.New(common.Hash{}, state.NewDatabaseWithConfig(memDB, &trie.Config{
Preimages: true, Preimages: true,
...@@ -283,85 +284,6 @@ func TestMigrateBalancesRandomMissing(t *testing.T) { ...@@ -283,85 +284,6 @@ func TestMigrateBalancesRandomMissing(t *testing.T) {
} }
} }
func TestPartitionKeyspace(t *testing.T) {
tests := []struct {
i int
count int
expected [2]common.Hash
}{
{
i: 0,
count: 1,
expected: [2]common.Hash{
common.HexToHash("0x00"),
common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
},
},
{
i: 0,
count: 2,
expected: [2]common.Hash{
common.HexToHash("0x00"),
common.HexToHash("0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
},
},
{
i: 1,
count: 2,
expected: [2]common.Hash{
common.HexToHash("0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
},
},
{
i: 0,
count: 3,
expected: [2]common.Hash{
common.HexToHash("0x00"),
common.HexToHash("0x5555555555555555555555555555555555555555555555555555555555555555"),
},
},
{
i: 1,
count: 3,
expected: [2]common.Hash{
common.HexToHash("0x5555555555555555555555555555555555555555555555555555555555555555"),
common.HexToHash("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
},
},
{
i: 2,
count: 3,
expected: [2]common.Hash{
common.HexToHash("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
},
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("i %d, count %d", tt.i, tt.count), func(t *testing.T) {
start, end := PartitionKeyspace(tt.i, tt.count)
require.Equal(t, tt.expected[0], start)
require.Equal(t, tt.expected[1], end)
})
}
t.Run("panics on invalid i or count", func(t *testing.T) {
require.Panics(t, func() {
PartitionKeyspace(1, 1)
})
require.Panics(t, func() {
PartitionKeyspace(-1, 1)
})
require.Panics(t, func() {
PartitionKeyspace(0, -1)
})
require.Panics(t, func() {
PartitionKeyspace(-1, -1)
})
})
}
func randAddr(t *testing.T) common.Address { func randAddr(t *testing.T) common.Address {
var addr common.Address var addr common.Address
_, err := rand.Read(addr[:]) _, err := rand.Read(addr[:])
......
package util
import (
"fmt"
"math/big"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
var (
// maxSlot is the maximum possible storage slot.
maxSlot = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
)
type DBFactory func() (*state.StateDB, error)
type StateCallback func(db *state.StateDB, key, value common.Hash) error
func IterateState(dbFactory DBFactory, address common.Address, cb StateCallback, workers int) error {
if workers <= 0 {
panic("workers must be greater than 0")
}
// WaitGroup to wait for all workers to finish.
var wg sync.WaitGroup
// Channel to receive errors from each iteration job.
errCh := make(chan error, workers)
// Channel to cancel all iteration jobs.
cancelCh := make(chan struct{})
worker := func(start, end common.Hash) {
// Decrement the WaitGroup when the function returns.
defer wg.Done()
db, err := dbFactory()
if err != nil {
// Should never happen, so explode if it does.
log.Crit("cannot create state db", "err", err)
}
st, err := db.StorageTrie(address)
if err != nil {
// Should never happen, so explode if it does.
log.Crit("cannot get storage trie", "address", address, "err", err)
}
// st can be nil if the account doesn't exist.
if st == nil {
errCh <- fmt.Errorf("account does not exist: %s", address.Hex())
return
}
it := trie.NewIterator(st.NodeIterator(start.Bytes()))
// Below code is largely based on db.ForEachStorage. We can't use that
// because it doesn't allow us to specify a start and end key.
for it.Next() {
select {
case <-cancelCh:
// If one of the workers encounters an error, cancel all of them.
return
default:
break
}
// Use the raw (i.e., secure hashed) key to check if we've reached
// the end of the partition. Use > rather than >= here to account for
// the fact that the values returned by PartitionKeys are inclusive.
// Duplicate addresses that may be returned by this iteration are
// filtered out in the collector.
if new(big.Int).SetBytes(it.Key).Cmp(end.Big()) > 0 {
return
}
// Skip if the value is empty.
rawValue := it.Value
if len(rawValue) == 0 {
continue
}
// Get the preimage.
rawKey := st.GetKey(it.Key)
if rawKey == nil {
// Should never happen, so explode if it does.
log.Crit("cannot get preimage for storage key", "key", it.Key)
}
key := common.BytesToHash(rawKey)
// Parse the raw value.
_, content, _, err := rlp.Split(rawValue)
if err != nil {
// Should never happen, so explode if it does.
log.Crit("mal-formed data in state: %v", err)
}
value := common.BytesToHash(content)
// Call the callback with the DB, key, and value. Errors get
// bubbled up to the errCh.
if err := cb(db, key, value); err != nil {
errCh <- err
return
}
}
}
for i := 0; i < workers; i++ {
wg.Add(1)
// Partition the keyspace per worker.
start, end := PartitionKeyspace(i, workers)
// Kick off our worker.
go worker(start, end)
}
wg.Wait()
for len(errCh) > 0 {
err := <-errCh
if err != nil {
return err
}
}
return nil
}
// PartitionKeyspace divides the key space into partitions by dividing the maximum keyspace
// by count then multiplying by i. This will leave some slots left over, which we handle below. It
// returns the start and end keys for the partition as a common.Hash. Note that the returned range
// of keys is inclusive, i.e., [start, end] NOT [start, end).
func PartitionKeyspace(i int, count int) (common.Hash, common.Hash) {
if i < 0 || count < 0 {
panic("i and count must be greater than 0")
}
if i > count-1 {
panic("i must be less than count - 1")
}
// Divide the key space into partitions by dividing the key space by the number
// of jobs. This will leave some slots left over, which we handle below.
partSize := new(big.Int).Div(maxSlot.Big(), big.NewInt(int64(count)))
start := common.BigToHash(new(big.Int).Mul(big.NewInt(int64(i)), partSize))
var end common.Hash
if i < count-1 {
// If this is not the last partition, use the next partition's start key as the end.
end = common.BigToHash(new(big.Int).Mul(big.NewInt(int64(i+1)), partSize))
} else {
// If this is the last partition, use the max slot as the end.
end = maxSlot
}
return start, end
}
package util
import (
crand "crypto/rand"
"fmt"
"math/rand"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/trie"
"github.com/stretchr/testify/require"
)
var testAddr = common.Address{0: 0xff}
func TestStateIteratorWorkers(t *testing.T) {
_, factory, _ := setupRandTest(t)
for i := -1; i <= 0; i++ {
require.Panics(t, func() {
_ = IterateState(factory, testAddr, func(db *state.StateDB, key, value common.Hash) error {
return nil
}, i)
})
}
}
func TestStateIteratorNonexistentAccount(t *testing.T) {
_, factory, _ := setupRandTest(t)
require.ErrorContains(t, IterateState(factory, common.Address{}, func(db *state.StateDB, key, value common.Hash) error {
return nil
}, 1), "account does not exist")
}
func TestStateIteratorRandomOK(t *testing.T) {
for i := 0; i < 100; i++ {
hashes, factory, workerCount := setupRandTest(t)
seenHashes := make(map[common.Hash]bool)
hashCh := make(chan common.Hash)
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
for hash := range hashCh {
seenHashes[hash] = true
}
}()
require.NoError(t, IterateState(factory, testAddr, func(db *state.StateDB, key, value common.Hash) error {
hashCh <- key
return nil
}, workerCount))
close(hashCh)
<-doneCh
// Perform a less or equal check here in case of duplicates. The map check below will assert
// that all of the hashes are accounted for.
require.LessOrEqual(t, len(seenHashes), len(hashes))
// Every hash we put into state should have been iterated over.
for _, hash := range hashes {
require.Contains(t, seenHashes, hash)
}
}
}
func TestStateIteratorRandomError(t *testing.T) {
for i := 0; i < 100; i++ {
hashes, factory, workerCount := setupRandTest(t)
failHash := hashes[rand.Intn(len(hashes))]
require.ErrorContains(t, IterateState(factory, testAddr, func(db *state.StateDB, key, value common.Hash) error {
if key == failHash {
return fmt.Errorf("test error")
}
return nil
}, workerCount), "test error")
}
}
func TestPartitionKeyspace(t *testing.T) {
tests := []struct {
i int
count int
expected [2]common.Hash
}{
{
i: 0,
count: 1,
expected: [2]common.Hash{
common.HexToHash("0x00"),
common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
},
},
{
i: 0,
count: 2,
expected: [2]common.Hash{
common.HexToHash("0x00"),
common.HexToHash("0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
},
},
{
i: 1,
count: 2,
expected: [2]common.Hash{
common.HexToHash("0x7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
},
},
{
i: 0,
count: 3,
expected: [2]common.Hash{
common.HexToHash("0x00"),
common.HexToHash("0x5555555555555555555555555555555555555555555555555555555555555555"),
},
},
{
i: 1,
count: 3,
expected: [2]common.Hash{
common.HexToHash("0x5555555555555555555555555555555555555555555555555555555555555555"),
common.HexToHash("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
},
},
{
i: 2,
count: 3,
expected: [2]common.Hash{
common.HexToHash("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"),
},
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("i %d, count %d", tt.i, tt.count), func(t *testing.T) {
start, end := PartitionKeyspace(tt.i, tt.count)
require.Equal(t, tt.expected[0], start)
require.Equal(t, tt.expected[1], end)
})
}
t.Run("panics on invalid i or count", func(t *testing.T) {
require.Panics(t, func() {
PartitionKeyspace(1, 1)
})
require.Panics(t, func() {
PartitionKeyspace(-1, 1)
})
require.Panics(t, func() {
PartitionKeyspace(0, -1)
})
require.Panics(t, func() {
PartitionKeyspace(-1, -1)
})
})
}
func setupRandTest(t *testing.T) ([]common.Hash, DBFactory, int) {
memDB := rawdb.NewMemoryDatabase()
db, err := state.New(common.Hash{}, state.NewDatabaseWithConfig(memDB, &trie.Config{
Preimages: true,
Cache: 1024,
}), nil)
require.NoError(t, err)
hashCount := rand.Intn(100)
if hashCount == 0 {
hashCount = 1
}
hashes := make([]common.Hash, hashCount)
db.CreateAccount(testAddr)
for j := 0; j < hashCount; j++ {
hashes[j] = randHash(t)
db.SetState(testAddr, hashes[j], hashes[j])
}
root, err := db.Commit(false)
require.NoError(t, err)
err = db.Database().TrieDB().Commit(root, true)
require.NoError(t, err)
factory := func() (*state.StateDB, error) {
return state.New(root, state.NewDatabaseWithConfig(memDB, &trie.Config{
Preimages: true,
Cache: 1024,
}), nil)
}
workerCount := rand.Intn(64)
if workerCount == 0 {
workerCount = 1
}
return hashes, factory, workerCount
}
func randHash(t *testing.T) common.Hash {
var h common.Hash
_, err := crand.Read(h[:])
require.NoError(t, err)
return h
}
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-proposer/metrics"
"github.com/ethereum-optimism/optimism/op-proposer/proposer" "github.com/ethereum-optimism/optimism/op-proposer/proposer"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
"github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum-optimism/optimism/op-service/txmgr"
...@@ -51,6 +52,7 @@ func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Cl ...@@ -51,6 +52,7 @@ func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Cl
NumConfirmations: 1, NumConfirmations: 1,
SafeAbortNonceTooLowCount: 4, SafeAbortNonceTooLowCount: 4,
From: from, From: from,
ChainID: big.NewInt(420),
// Signer is loaded in `proposer.NewL2OutputSubmitter` // Signer is loaded in `proposer.NewL2OutputSubmitter`
}, },
L1Client: l1, L1Client: l1,
...@@ -60,7 +62,7 @@ func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Cl ...@@ -60,7 +62,7 @@ func NewL2Proposer(t Testing, log log.Logger, cfg *ProposerCfg, l1 *ethclient.Cl
SignerFnFactory: signer, SignerFnFactory: signer,
} }
dr, err := proposer.NewL2OutputSubmitter(proposerCfg, log) dr, err := proposer.NewL2OutputSubmitter(proposerCfg, log, metrics.NoopMetrics)
require.NoError(t, err) require.NoError(t, err)
return &L2Proposer{ return &L2Proposer{
......
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics" batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/chaincfg" "github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
proposermetrics "github.com/ethereum-optimism/optimism/op-proposer/metrics"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer" l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
...@@ -277,6 +278,7 @@ func TestMigration(t *testing.T) { ...@@ -277,6 +278,7 @@ func TestMigration(t *testing.T) {
L2EngineAddr: gethNode.HTTPAuthEndpoint(), L2EngineAddr: gethNode.HTTPAuthEndpoint(),
L2EngineJWTSecret: testingJWTSecret, L2EngineJWTSecret: testingJWTSecret,
}, },
L2Sync: &node.PreparedL2SyncEndpoint{Client: nil, TrustRPC: false},
Driver: driver.Config{ Driver: driver.Config{
VerifierConfDepth: 0, VerifierConfDepth: 0,
SequencerConfDepth: 0, SequencerConfDepth: 0,
...@@ -327,6 +329,8 @@ func TestMigration(t *testing.T) { ...@@ -327,6 +329,8 @@ func TestMigration(t *testing.T) {
L1EthRpc: forkedL1URL, L1EthRpc: forkedL1URL,
L2EthRpc: gethNode.WSEndpoint(), L2EthRpc: gethNode.WSEndpoint(),
RollupRpc: rollupNode.HTTPEndpoint(), RollupRpc: rollupNode.HTTPEndpoint(),
TxManagerTimeout: 10 * time.Minute,
OfflineGasEstimation: true,
MaxChannelDuration: 1, MaxChannelDuration: 1,
MaxL1TxSize: 120_000, MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000, TargetL1TxSize: 100_000,
...@@ -362,7 +366,7 @@ func TestMigration(t *testing.T) { ...@@ -362,7 +366,7 @@ func TestMigration(t *testing.T) {
Format: "text", Format: "text",
}, },
PrivateKey: hexPriv(secrets.Proposer), PrivateKey: hexPriv(secrets.Proposer),
}, lgr.New("module", "proposer")) }, lgr.New("module", "proposer"), proposermetrics.NoopMetrics)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
proposer.Stop() proposer.Stop()
......
...@@ -37,6 +37,7 @@ import ( ...@@ -37,6 +37,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/driver" "github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testlog"
proposermetrics "github.com/ethereum-optimism/optimism/op-proposer/metrics"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer" l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
) )
...@@ -193,6 +194,9 @@ type SystemConfig struct { ...@@ -193,6 +194,9 @@ type SystemConfig struct {
// If the proposer can make proposals for L2 blocks derived from L1 blocks which are not finalized on L1 yet. // If the proposer can make proposals for L2 blocks derived from L1 blocks which are not finalized on L1 yet.
NonFinalizedProposals bool NonFinalizedProposals bool
// Explicitly disable batcher, for tests that rely on unsafe L2 payloads
DisableBatcher bool
} }
type System struct { type System struct {
...@@ -417,6 +421,10 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -417,6 +421,10 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
L2EngineAddr: l2EndpointConfig, L2EngineAddr: l2EndpointConfig,
L2EngineJWTSecret: cfg.JWTSecret, L2EngineJWTSecret: cfg.JWTSecret,
} }
rollupCfg.L2Sync = &rollupNode.PreparedL2SyncEndpoint{
Client: nil,
TrustRPC: false,
}
} }
// Geth Clients // Geth Clients
...@@ -572,7 +580,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -572,7 +580,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
Format: "text", Format: "text",
}, },
PrivateKey: hexPriv(cfg.Secrets.Proposer), PrivateKey: hexPriv(cfg.Secrets.Proposer),
}, sys.cfg.Loggers["proposer"]) }, sys.cfg.Loggers["proposer"], proposermetrics.NoopMetrics)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to setup l2 output submitter: %w", err) return nil, fmt.Errorf("unable to setup l2 output submitter: %w", err)
} }
...@@ -582,10 +590,13 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -582,10 +590,13 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
} }
// Batch Submitter // Batch Submitter
txManagerTimeout := 10 * time.Minute
sys.BatchSubmitter, err = bss.NewBatchSubmitterFromCLIConfig(bss.CLIConfig{ sys.BatchSubmitter, err = bss.NewBatchSubmitterFromCLIConfig(bss.CLIConfig{
L1EthRpc: sys.Nodes["l1"].WSEndpoint(), L1EthRpc: sys.Nodes["l1"].WSEndpoint(),
L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(), L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(), RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
TxManagerTimeout: txManagerTimeout,
OfflineGasEstimation: true,
MaxChannelDuration: 1, MaxChannelDuration: 1,
MaxL1TxSize: 120_000, MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000, TargetL1TxSize: 100_000,
...@@ -606,9 +617,12 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -606,9 +617,12 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
return nil, fmt.Errorf("failed to setup batch submitter: %w", err) return nil, fmt.Errorf("failed to setup batch submitter: %w", err)
} }
// Batcher may be enabled later
if !sys.cfg.DisableBatcher {
if err := sys.BatchSubmitter.Start(); err != nil { if err := sys.BatchSubmitter.Start(); err != nil {
return nil, fmt.Errorf("unable to start batch submitter: %w", err) return nil, fmt.Errorf("unable to start batch submitter: %w", err)
} }
}
return sys, nil return sys, nil
} }
......
...@@ -649,7 +649,7 @@ func TestSystemMockP2P(t *testing.T) { ...@@ -649,7 +649,7 @@ func TestSystemMockP2P(t *testing.T) {
require.Contains(t, received, receiptVerif.BlockHash) require.Contains(t, received, receiptVerif.BlockHash)
} }
// TestSystemMockP2P sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that // TestSystemRPCAltSync sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that
// the nodes can sync L2 blocks before they are confirmed on L1. // the nodes can sync L2 blocks before they are confirmed on L1.
// //
// Test steps: // Test steps:
...@@ -660,24 +660,28 @@ func TestSystemMockP2P(t *testing.T) { ...@@ -660,24 +660,28 @@ func TestSystemMockP2P(t *testing.T) {
// 6. Wait for the RPC sync method to grab the block from the sequencer over RPC and insert it into the verifier's unsafe chain. // 6. Wait for the RPC sync method to grab the block from the sequencer over RPC and insert it into the verifier's unsafe chain.
// 7. Wait for the verifier to sync the unsafe chain into the safe chain. // 7. Wait for the verifier to sync the unsafe chain into the safe chain.
// 8. Verify that the TX is included in the verifier's safe chain. // 8. Verify that the TX is included in the verifier's safe chain.
func TestSystemMockAltSync(t *testing.T) { func TestSystemRPCAltSync(t *testing.T) {
parallel(t) parallel(t)
if !verboseGethNodes { if !verboseGethNodes {
log.Root().SetHandler(log.DiscardHandler()) log.Root().SetHandler(log.DiscardHandler())
} }
cfg := DefaultSystemConfig(t) cfg := DefaultSystemConfig(t)
// slow down L1 blocks so we can see the L2 blocks arrive well before the L1 blocks do. // the default is nil, but this may change in the future.
// Keep the seq window small so the L2 chain is started quick // This test must ensure the blocks are not synced via Gossip, but instead via the alt RPC based sync.
cfg.DeployConfig.L1BlockTime = 10 cfg.P2PTopology = nil
// Disable batcher, so there will not be any L1 data to sync from
cfg.DisableBatcher = true
var published, received []common.Hash var published, received []string
seqTracer, verifTracer := new(FnTracer), new(FnTracer) seqTracer, verifTracer := new(FnTracer), new(FnTracer)
// The sequencer still publishes the blocks to the tracer, even if they do not reach the network due to disabled P2P
seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) { seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) {
published = append(published, payload.BlockHash) published = append(published, payload.ID().String())
} }
// Blocks are now received via the RPC based alt-sync method
verifTracer.OnUnsafeL2PayloadFn = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) { verifTracer.OnUnsafeL2PayloadFn = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) {
received = append(received, payload.BlockHash) received = append(received, payload.ID().String())
} }
cfg.Nodes["sequencer"].Tracer = seqTracer cfg.Nodes["sequencer"].Tracer = seqTracer
cfg.Nodes["verifier"].Tracer = verifTracer cfg.Nodes["verifier"].Tracer = verifTracer
...@@ -687,8 +691,8 @@ func TestSystemMockAltSync(t *testing.T) { ...@@ -687,8 +691,8 @@ func TestSystemMockAltSync(t *testing.T) {
role: "sequencer", role: "sequencer",
action: func(sCfg *SystemConfig, system *System) { action: func(sCfg *SystemConfig, system *System) {
rpc, _ := system.Nodes["sequencer"].Attach() // never errors rpc, _ := system.Nodes["sequencer"].Attach() // never errors
cfg.Nodes["verifier"].L2Sync = &rollupNode.L2SyncRPCConfig{ cfg.Nodes["verifier"].L2Sync = &rollupNode.PreparedL2SyncEndpoint{
Rpc: client.NewBaseRPCClient(rpc), Client: client.NewBaseRPCClient(rpc),
} }
}, },
}) })
...@@ -726,7 +730,7 @@ func TestSystemMockAltSync(t *testing.T) { ...@@ -726,7 +730,7 @@ func TestSystemMockAltSync(t *testing.T) {
require.Equal(t, receiptSeq, receiptVerif) require.Equal(t, receiptSeq, receiptVerif)
// Verify that the tx was received via RPC sync (P2P is disabled) // Verify that the tx was received via RPC sync (P2P is disabled)
require.Contains(t, received, receiptVerif.BlockHash) require.Contains(t, received, eth.BlockID{Hash: receiptVerif.BlockHash, Number: receiptVerif.BlockNumber.Uint64()}.String())
// Verify that everything that was received was published // Verify that everything that was received was published
require.GreaterOrEqual(t, len(published), len(received)) require.GreaterOrEqual(t, len(published), len(received))
......
...@@ -52,6 +52,12 @@ jq "select(.valid_data == false)|.tx.hash" $TX_DIR ...@@ -52,6 +52,12 @@ jq "select(.valid_data == false)|.tx.hash" $TX_DIR
# Select all channels that are not ready and then get the id and inclusion block & tx hash of the first frame. # Select all channels that are not ready and then get the id and inclusion block & tx hash of the first frame.
jq "select(.is_ready == false)|[.id, .frames[0].inclusion_block, .frames[0].transaction_hash]" $CHANNEL_DIR jq "select(.is_ready == false)|[.id, .frames[0].inclusion_block, .frames[0].transaction_hash]" $CHANNEL_DIR
# Show all of the frames in a channel without seeing the batches or frame data
jq 'del(.batches)|del(.frames[]|.frame.data)' $CHANNEL_FILE
# Show all batches (without timestamps) in a channel
jq '.batches|del(.[]|.Transactions)' $CHANNEL_FILE
``` ```
......
...@@ -17,3 +17,9 @@ const ( ...@@ -17,3 +17,9 @@ const (
// - L2: Derived chain tip from finalized L1 data // - L2: Derived chain tip from finalized L1 data
Finalized = "finalized" Finalized = "finalized"
) )
func (label BlockLabel) Arg() any { return string(label) }
func (BlockLabel) CheckID(id BlockID) error {
return nil
}
...@@ -14,10 +14,10 @@ import ( ...@@ -14,10 +14,10 @@ import (
// Flags // Flags
const envVarPrefix = "OP_NODE_" const envVarPrefix = "OP_NODE"
func prefixEnvVar(name string) string { func prefixEnvVar(name string) string {
return envVarPrefix + name return envVarPrefix + "_" + name
} }
var ( var (
...@@ -175,6 +175,13 @@ var ( ...@@ -175,6 +175,13 @@ var (
EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC"), EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC"),
Required: false, Required: false,
} }
BackupL2UnsafeSyncRPCTrustRPC = cli.StringFlag{
Name: "l2.backup-unsafe-sync-rpc.trustrpc",
Usage: "Like l1.trustrpc, configure if response data from the RPC needs to be verified, e.g. blockhash computation." +
"This does not include checks if the blockhash is part of the canonical chain.",
EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"),
Required: false,
}
) )
var requiredFlags = []cli.Flag{ var requiredFlags = []cli.Flag{
...@@ -207,6 +214,7 @@ var optionalFlags = []cli.Flag{ ...@@ -207,6 +214,7 @@ var optionalFlags = []cli.Flag{
HeartbeatMonikerFlag, HeartbeatMonikerFlag,
HeartbeatURLFlag, HeartbeatURLFlag,
BackupL2UnsafeSyncRPC, BackupL2UnsafeSyncRPC,
BackupL2UnsafeSyncRPCTrustRPC,
} }
// Flags contains the list of configuration options available to the binary. // Flags contains the list of configuration options available to the binary.
......
...@@ -34,6 +34,15 @@ var ( ...@@ -34,6 +34,15 @@ var (
Value: "none", Value: "none",
EnvVar: p2pEnv("PEER_SCORING"), EnvVar: p2pEnv("PEER_SCORING"),
} }
PeerScoreBands = cli.StringFlag{
Name: "p2p.score.bands",
Usage: "Sets the peer score bands used primarily for peer score metrics. " +
"Should be provided in following format: <threshold>:<label>;<threshold>:<label>;..." +
"For example: -40:graylist;-20:restricted;0:nopx;20:friend;",
Required: false,
Value: "-40:graylist;-20:restricted;0:nopx;20:friend;",
EnvVar: p2pEnv("SCORE_BANDS"),
}
// Banning Flag - whether or not we want to act on the scoring // Banning Flag - whether or not we want to act on the scoring
Banning = cli.BoolFlag{ Banning = cli.BoolFlag{
...@@ -276,6 +285,10 @@ var p2pFlags = []cli.Flag{ ...@@ -276,6 +285,10 @@ var p2pFlags = []cli.Flag{
NoDiscovery, NoDiscovery,
P2PPrivPath, P2PPrivPath,
P2PPrivRaw, P2PPrivRaw,
PeerScoring,
PeerScoreBands,
Banning,
TopicScoring,
ListenIP, ListenIP,
ListenTCPPort, ListenTCPPort,
ListenUDPPort, ListenUDPPort,
......
...@@ -15,7 +15,6 @@ import ( ...@@ -15,7 +15,6 @@ import (
pb "github.com/libp2p/go-libp2p-pubsub/pb" pb "github.com/libp2p/go-libp2p-pubsub/pb"
libp2pmetrics "github.com/libp2p/go-libp2p/core/metrics" libp2pmetrics "github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
...@@ -66,7 +65,7 @@ type Metricer interface { ...@@ -66,7 +65,7 @@ type Metricer interface {
RecordSequencerSealingTime(duration time.Duration) RecordSequencerSealingTime(duration time.Duration)
Document() []metrics.DocumentedMetric Document() []metrics.DocumentedMetric
// P2P Metrics // P2P Metrics
RecordPeerScoring(peerID peer.ID, score float64) SetPeerScores(scores map[string]float64)
} }
// Metrics tracks all the metrics for the op-node. // Metrics tracks all the metrics for the op-node.
...@@ -287,21 +286,24 @@ func NewMetrics(procName string) *Metrics { ...@@ -287,21 +286,24 @@ func NewMetrics(procName string) *Metrics {
Name: "peer_count", Name: "peer_count",
Help: "Count of currently connected p2p peers", Help: "Count of currently connected p2p peers",
}), }),
StreamCount: factory.NewGauge(prometheus.GaugeOpts{ // Notice: We cannot use peer ids as [Labels] in the GaugeVec
Namespace: ns, // since peer ids would open a service attack vector.
Subsystem: "p2p", // Each peer id would be a separate metric, flooding prometheus.
Name: "stream_count", //
Help: "Count of currently connected p2p streams", // [Labels]: https://prometheus.io/docs/practices/naming/#labels
}),
PeerScores: factory.NewGaugeVec(prometheus.GaugeOpts{ PeerScores: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
Subsystem: "p2p", Subsystem: "p2p",
Name: "peer_scores", Name: "peer_scores",
Help: "Peer scoring", Help: "Count of peer scores grouped by score",
}, []string{ }, []string{
// No label names here since peer ids would open a service attack vector. "band",
// Each peer id would be a separate metric, flooding prometheus. }),
// See: https://prometheus.io/docs/practices/naming/#labels StreamCount: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Subsystem: "p2p",
Name: "stream_count",
Help: "Count of currently connected p2p streams",
}), }),
GossipEventsTotal: factory.NewCounterVec(prometheus.CounterOpts{ GossipEventsTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
...@@ -350,6 +352,14 @@ func NewMetrics(procName string) *Metrics { ...@@ -350,6 +352,14 @@ func NewMetrics(procName string) *Metrics {
} }
} }
// SetPeerScores updates the peer score [prometheus.GaugeVec].
// This takes a map of labels to scores.
func (m *Metrics) SetPeerScores(scores map[string]float64) {
for label, score := range scores {
m.PeerScores.WithLabelValues(label).Set(score)
}
}
// RecordInfo sets a pseudo-metric that contains versioning and // RecordInfo sets a pseudo-metric that contains versioning and
// config info for the opnode. // config info for the opnode.
func (m *Metrics) RecordInfo(version string) { func (m *Metrics) RecordInfo(version string) {
...@@ -491,10 +501,6 @@ func (m *Metrics) RecordGossipEvent(evType int32) { ...@@ -491,10 +501,6 @@ func (m *Metrics) RecordGossipEvent(evType int32) {
m.GossipEventsTotal.WithLabelValues(pb.TraceEvent_Type_name[evType]).Inc() m.GossipEventsTotal.WithLabelValues(pb.TraceEvent_Type_name[evType]).Inc()
} }
func (m *Metrics) RecordPeerScoring(peerID peer.ID, score float64) {
m.PeerScores.WithLabelValues(peerID.String()).Set(score)
}
func (m *Metrics) IncPeerCount() { func (m *Metrics) IncPeerCount() {
m.PeerCount.Inc() m.PeerCount.Inc()
} }
...@@ -627,7 +633,7 @@ func (n *noopMetricer) RecordSequencerReset() { ...@@ -627,7 +633,7 @@ func (n *noopMetricer) RecordSequencerReset() {
func (n *noopMetricer) RecordGossipEvent(evType int32) { func (n *noopMetricer) RecordGossipEvent(evType int32) {
} }
func (n *noopMetricer) RecordPeerScoring(peerID peer.ID, score float64) { func (n *noopMetricer) SetPeerScores(scores map[string]float64) {
} }
func (n *noopMetricer) IncPeerCount() { func (n *noopMetricer) IncPeerCount() {
......
...@@ -20,7 +20,9 @@ type L2EndpointSetup interface { ...@@ -20,7 +20,9 @@ type L2EndpointSetup interface {
} }
type L2SyncEndpointSetup interface { type L2SyncEndpointSetup interface {
Setup(ctx context.Context, log log.Logger) (cl client.RPC, err error) // Setup a RPC client to another L2 node to sync L2 blocks from.
// It may return a nil client with nil error if RPC based sync is not enabled.
Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error)
Check() error Check() error
} }
...@@ -82,45 +84,45 @@ func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client ...@@ -82,45 +84,45 @@ func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client
// L2SyncEndpointConfig contains configuration for the fallback sync endpoint // L2SyncEndpointConfig contains configuration for the fallback sync endpoint
type L2SyncEndpointConfig struct { type L2SyncEndpointConfig struct {
// Address of the L2 RPC to use for backup sync // Address of the L2 RPC to use for backup sync, may be empty if RPC alt-sync is disabled.
L2NodeAddr string L2NodeAddr string
TrustRPC bool
} }
var _ L2SyncEndpointSetup = (*L2SyncEndpointConfig)(nil) var _ L2SyncEndpointSetup = (*L2SyncEndpointConfig)(nil)
func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) { // Setup creates an RPC client to sync from.
// It will return nil without error if no sync method is configured.
func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) {
if cfg.L2NodeAddr == "" {
return nil, false, nil
}
l2Node, err := client.NewRPC(ctx, log, cfg.L2NodeAddr) l2Node, err := client.NewRPC(ctx, log, cfg.L2NodeAddr)
if err != nil { if err != nil {
return nil, err return nil, false, err
} }
return l2Node, nil return l2Node, cfg.TrustRPC, nil
} }
func (cfg *L2SyncEndpointConfig) Check() error { func (cfg *L2SyncEndpointConfig) Check() error {
if cfg.L2NodeAddr == "" { // empty addr is valid, as it is optional.
return errors.New("empty L2 Node Address")
}
return nil return nil
} }
type L2SyncRPCConfig struct { type PreparedL2SyncEndpoint struct {
// RPC endpoint to use for syncing // RPC endpoint to use for syncing, may be nil if RPC alt-sync is disabled.
Rpc client.RPC Client client.RPC
TrustRPC bool
} }
var _ L2SyncEndpointSetup = (*L2SyncRPCConfig)(nil) var _ L2SyncEndpointSetup = (*PreparedL2SyncEndpoint)(nil)
func (cfg *L2SyncRPCConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) { func (cfg *PreparedL2SyncEndpoint) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) {
return cfg.Rpc, nil return cfg.Client, cfg.TrustRPC, nil
} }
func (cfg *L2SyncRPCConfig) Check() error { func (cfg *PreparedL2SyncEndpoint) Check() error {
if cfg.Rpc == nil {
return errors.New("rpc cannot be nil")
}
return nil return nil
} }
......
...@@ -80,6 +80,9 @@ func (cfg *Config) Check() error { ...@@ -80,6 +80,9 @@ func (cfg *Config) Check() error {
if err := cfg.L2.Check(); err != nil { if err := cfg.L2.Check(); err != nil {
return fmt.Errorf("l2 endpoint config error: %w", err) return fmt.Errorf("l2 endpoint config error: %w", err)
} }
if err := cfg.L2Sync.Check(); err != nil {
return fmt.Errorf("sync config error: %w", err)
}
if err := cfg.Rollup.Check(); err != nil { if err := cfg.Rollup.Check(); err != nil {
return fmt.Errorf("rollup config error: %w", err) return fmt.Errorf("rollup config error: %w", err)
} }
......
...@@ -33,6 +33,7 @@ type OpNode struct { ...@@ -33,6 +33,7 @@ type OpNode struct {
l1Source *sources.L1Client // L1 Client to fetch data from l1Source *sources.L1Client // L1 Client to fetch data from
l2Driver *driver.Driver // L2 Engine to Sync l2Driver *driver.Driver // L2 Engine to Sync
l2Source *sources.EngineClient // L2 Execution Engine RPC bindings l2Source *sources.EngineClient // L2 Execution Engine RPC bindings
rpcSync *sources.SyncClient // Alt-sync RPC client, optional (may be nil)
server *rpcServer // RPC server hosting the rollup-node API server *rpcServer // RPC server hosting the rollup-node API
p2pNode *p2p.NodeP2P // P2P node functionality p2pNode *p2p.NodeP2P // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
...@@ -86,6 +87,9 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) ...@@ -86,6 +87,9 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initL2(ctx, cfg, snapshotLog); err != nil { if err := n.initL2(ctx, cfg, snapshotLog); err != nil {
return err return err
} }
if err := n.initRPCSync(ctx, cfg); err != nil {
return err
}
if err := n.initP2PSigner(ctx, cfg); err != nil { if err := n.initP2PSigner(ctx, cfg); err != nil {
return err return err
} }
...@@ -197,29 +201,27 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger ...@@ -197,29 +201,27 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger
return err return err
} }
var syncClient *sources.SyncClient n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics)
// If the L2 sync config is present, use it to create a sync client
if cfg.L2Sync != nil { return nil
if err := cfg.L2Sync.Check(); err != nil { }
log.Info("L2 sync config is not present, skipping L2 sync client setup", "err", err)
} else { func (n *OpNode) initRPCSync(ctx context.Context, cfg *Config) error {
rpcSyncClient, err := cfg.L2Sync.Setup(ctx, n.log) rpcSyncClient, trustRPC, err := cfg.L2Sync.Setup(ctx, n.log)
if err != nil { if err != nil {
return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err) return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err)
} }
if rpcSyncClient == nil { // if no RPC client is configured to sync from, then don't add the RPC sync client
return nil
}
// The sync client's RPC is always trusted config := sources.SyncClientDefaultConfig(&cfg.Rollup, trustRPC)
config := sources.SyncClientDefaultConfig(&cfg.Rollup, true)
syncClient, err = sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config) syncClient, err := sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config)
if err != nil { if err != nil {
return fmt.Errorf("failed to create sync client: %w", err) return fmt.Errorf("failed to create sync client: %w", err)
} }
} n.rpcSync = syncClient
}
n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, syncClient, n, n.log, snapshotLog, n.metrics)
return nil return nil
} }
...@@ -292,11 +294,12 @@ func (n *OpNode) Start(ctx context.Context) error { ...@@ -292,11 +294,12 @@ func (n *OpNode) Start(ctx context.Context) error {
} }
// If the backup unsafe sync client is enabled, start its event loop // If the backup unsafe sync client is enabled, start its event loop
if n.l2Driver.L2SyncCl != nil { if n.rpcSync != nil {
if err := n.l2Driver.L2SyncCl.Start(); err != nil { if err := n.rpcSync.Start(); err != nil {
n.log.Error("Could not start the backup sync client", "err", err) n.log.Error("Could not start the backup sync client", "err", err)
return err return err
} }
n.log.Info("Started L2-RPC sync service")
} }
return nil return nil
...@@ -375,6 +378,14 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *e ...@@ -375,6 +378,14 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *e
return nil return nil
} }
func (n *OpNode) RequestL2Range(ctx context.Context, start, end uint64) error {
if n.rpcSync != nil {
return n.rpcSync.RequestL2Range(ctx, start, end)
}
n.log.Debug("ignoring request to sync L2 range, no sync method available")
return nil
}
func (n *OpNode) P2P() p2p.Node { func (n *OpNode) P2P() p2p.Node {
return n.p2pNode return n.p2pNode
} }
...@@ -413,8 +424,8 @@ func (n *OpNode) Close() error { ...@@ -413,8 +424,8 @@ func (n *OpNode) Close() error {
} }
// If the L2 sync client is present & running, close it. // If the L2 sync client is present & running, close it.
if n.l2Driver.L2SyncCl != nil { if n.rpcSync != nil {
if err := n.l2Driver.L2SyncCl.Close(); err != nil { if err := n.rpcSync.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close L2 engine backup sync client cleanly: %w", err)) result = multierror.Append(result, fmt.Errorf("failed to close L2 engine backup sync client cleanly: %w", err))
} }
} }
......
package p2p
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestBandScorer_ParseDefault tests the [BandScorer.Parse] function
// on the default band scores cli flag value.
func TestBandScorer_ParseDefault(t *testing.T) {
// Create a new band scorer.
bandScorer, err := NewBandScorer("-40:graylist;-20:restricted;0:nopx;20:friend;")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.ElementsMatch(t, bandScorer.bands, []scorePair{
{band: "graylist", threshold: -40},
{band: "restricted", threshold: -20},
{band: "nopx", threshold: 0},
{band: "friend", threshold: 20},
})
}
// TestBandScorer_BucketCorrectly tests the [BandScorer.Bucket] function
// on a variety of scores.
func TestBandScorer_BucketCorrectly(t *testing.T) {
// Create a new band scorer.
bandScorer, err := NewBandScorer("-40:graylist;-20:restricted;0:nopx;20:friend;")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.Equal(t, bandScorer.Bucket(-100), "graylist")
require.Equal(t, bandScorer.Bucket(-40), "graylist")
require.Equal(t, bandScorer.Bucket(-39), "restricted")
require.Equal(t, bandScorer.Bucket(-20), "restricted")
require.Equal(t, bandScorer.Bucket(-19), "nopx")
require.Equal(t, bandScorer.Bucket(0), "nopx")
require.Equal(t, bandScorer.Bucket(1), "friend")
require.Equal(t, bandScorer.Bucket(20), "friend")
require.Equal(t, bandScorer.Bucket(21), "friend")
}
// TestBandScorer_BucketInverted tests the [BandScorer.Bucket] function
// on a variety of scores, in descending order.
func TestBandScorer_BucketInverted(t *testing.T) {
// Create a new band scorer.
bandScorer, err := NewBandScorer("20:friend;0:nopx;-20:restricted;-40:graylist;")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.Equal(t, bandScorer.Bucket(-100), "graylist")
require.Equal(t, bandScorer.Bucket(-40), "graylist")
require.Equal(t, bandScorer.Bucket(-39), "restricted")
require.Equal(t, bandScorer.Bucket(-20), "restricted")
require.Equal(t, bandScorer.Bucket(-19), "nopx")
require.Equal(t, bandScorer.Bucket(0), "nopx")
require.Equal(t, bandScorer.Bucket(1), "friend")
require.Equal(t, bandScorer.Bucket(20), "friend")
require.Equal(t, bandScorer.Bucket(21), "friend")
}
// TestBandScorer_ParseEmpty tests the [BandScorer.Parse] function
// on an empty string.
func TestBandScorer_ParseEmpty(t *testing.T) {
// Create a band scorer on an empty string.
bandScorer, err := NewBandScorer("")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.Len(t, bandScorer.bands, 0)
}
// TestBandScorer_ParseWhitespace tests the [BandScorer.Parse] function
// on a variety of whitespaced strings.
func TestBandScorer_ParseWhitespace(t *testing.T) {
// Create a band scorer on an empty string.
bandScorer, err := NewBandScorer(" ; ; ; ")
require.NoError(t, err)
// Validate the [BandScorer] internals.
require.Len(t, bandScorer.bands, 0)
}
...@@ -58,6 +58,10 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) { ...@@ -58,6 +58,10 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) {
return nil, fmt.Errorf("failed to load p2p peer scoring options: %w", err) return nil, fmt.Errorf("failed to load p2p peer scoring options: %w", err)
} }
if err := loadPeerScoreBands(conf, ctx); err != nil {
return nil, fmt.Errorf("failed to load p2p peer score bands: %w", err)
}
if err := loadBanningOption(conf, ctx); err != nil { if err := loadBanningOption(conf, ctx); err != nil {
return nil, fmt.Errorf("failed to load banning option: %w", err) return nil, fmt.Errorf("failed to load banning option: %w", err)
} }
...@@ -121,6 +125,17 @@ func loadPeerScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64) ...@@ -121,6 +125,17 @@ func loadPeerScoringParams(conf *p2p.Config, ctx *cli.Context, blockTime uint64)
return nil return nil
} }
// loadPeerScoreBands loads [p2p.BandScorer] from the CLI context.
func loadPeerScoreBands(conf *p2p.Config, ctx *cli.Context) error {
scoreBands := ctx.GlobalString(flags.PeerScoreBands.Name)
bandScorer, err := p2p.NewBandScorer(scoreBands)
if err != nil {
return err
}
conf.BandScoreThresholds = *bandScorer
return nil
}
// loadBanningOption loads whether or not to ban peers from the CLI context. // loadBanningOption loads whether or not to ban peers from the CLI context.
func loadBanningOption(conf *p2p.Config, ctx *cli.Context) error { func loadBanningOption(conf *p2p.Config, ctx *cli.Context) error {
ban := ctx.GlobalBool(flags.Banning.Name) ban := ctx.GlobalBool(flags.Banning.Name)
......
...@@ -54,6 +54,9 @@ type Config struct { ...@@ -54,6 +54,9 @@ type Config struct {
PeerScoring pubsub.PeerScoreParams PeerScoring pubsub.PeerScoreParams
TopicScoring pubsub.TopicScoreParams TopicScoring pubsub.TopicScoreParams
// Peer Score Band Thresholds
BandScoreThresholds BandScoreThresholds
// Whether to ban peers based on their [PeerScoring] score. // Whether to ban peers based on their [PeerScoring] score.
BanningEnabled bool BanningEnabled bool
...@@ -151,6 +154,10 @@ func (conf *Config) PeerScoringParams() *pubsub.PeerScoreParams { ...@@ -151,6 +154,10 @@ func (conf *Config) PeerScoringParams() *pubsub.PeerScoreParams {
return &conf.PeerScoring return &conf.PeerScoring
} }
func (conf *Config) PeerBandScorer() *BandScoreThresholds {
return &conf.BandScoreThresholds
}
func (conf *Config) BanPeers() bool { func (conf *Config) BanPeers() bool {
return conf.BanningEnabled return conf.BanningEnabled
} }
......
...@@ -55,6 +55,7 @@ type GossipSetupConfigurables interface { ...@@ -55,6 +55,7 @@ type GossipSetupConfigurables interface {
TopicScoringParams() *pubsub.TopicScoreParams TopicScoringParams() *pubsub.TopicScoreParams
BanPeers() bool BanPeers() bool
ConfigureGossip(params *pubsub.GossipSubParams) []pubsub.Option ConfigureGossip(params *pubsub.GossipSubParams) []pubsub.Option
PeerBandScorer() *BandScoreThresholds
} }
type GossipRuntimeConfig interface { type GossipRuntimeConfig interface {
...@@ -64,7 +65,8 @@ type GossipRuntimeConfig interface { ...@@ -64,7 +65,8 @@ type GossipRuntimeConfig interface {
//go:generate mockery --name GossipMetricer //go:generate mockery --name GossipMetricer
type GossipMetricer interface { type GossipMetricer interface {
RecordGossipEvent(evType int32) RecordGossipEvent(evType int32)
RecordPeerScoring(peerID peer.ID, score float64) // Peer Scoring Metric Funcs
SetPeerScores(map[string]float64)
} }
func blocksTopicV1(cfg *rollup.Config) string { func blocksTopicV1(cfg *rollup.Config) string {
......
// Code generated by mockery v2.14.0. DO NOT EDIT. // Code generated by mockery v2.22.1. DO NOT EDIT.
package mocks package mocks
...@@ -123,13 +123,16 @@ func (_m *ConnectionGater) InterceptUpgraded(_a0 network.Conn) (bool, control.Di ...@@ -123,13 +123,16 @@ func (_m *ConnectionGater) InterceptUpgraded(_a0 network.Conn) (bool, control.Di
ret := _m.Called(_a0) ret := _m.Called(_a0)
var r0 bool var r0 bool
var r1 control.DisconnectReason
if rf, ok := ret.Get(0).(func(network.Conn) (bool, control.DisconnectReason)); ok {
return rf(_a0)
}
if rf, ok := ret.Get(0).(func(network.Conn) bool); ok { if rf, ok := ret.Get(0).(func(network.Conn) bool); ok {
r0 = rf(_a0) r0 = rf(_a0)
} else { } else {
r0 = ret.Get(0).(bool) r0 = ret.Get(0).(bool)
} }
var r1 control.DisconnectReason
if rf, ok := ret.Get(1).(func(network.Conn) control.DisconnectReason); ok { if rf, ok := ret.Get(1).(func(network.Conn) control.DisconnectReason); ok {
r1 = rf(_a0) r1 = rf(_a0)
} else { } else {
......
// Code generated by mockery v2.14.0. DO NOT EDIT. // Code generated by mockery v2.22.1. DO NOT EDIT.
package mocks package mocks
import ( import mock "github.com/stretchr/testify/mock"
mock "github.com/stretchr/testify/mock"
peer "github.com/libp2p/go-libp2p/core/peer"
)
// GossipMetricer is an autogenerated mock type for the GossipMetricer type // GossipMetricer is an autogenerated mock type for the GossipMetricer type
type GossipMetricer struct { type GossipMetricer struct {
...@@ -18,9 +14,9 @@ func (_m *GossipMetricer) RecordGossipEvent(evType int32) { ...@@ -18,9 +14,9 @@ func (_m *GossipMetricer) RecordGossipEvent(evType int32) {
_m.Called(evType) _m.Called(evType)
} }
// RecordPeerScoring provides a mock function with given fields: peerID, score // SetPeerScores provides a mock function with given fields: _a0
func (_m *GossipMetricer) RecordPeerScoring(peerID peer.ID, score float64) { func (_m *GossipMetricer) SetPeerScores(_a0 map[string]float64) {
_m.Called(peerID, score) _m.Called(_a0)
} }
type mockConstructorTestingTNewGossipMetricer interface { type mockConstructorTestingTNewGossipMetricer interface {
......
// Code generated by mockery v2.14.0. DO NOT EDIT. // Code generated by mockery v2.22.1. DO NOT EDIT.
package mocks package mocks
......
// Code generated by mockery v2.14.0. DO NOT EDIT. // Code generated by mockery v2.22.1. DO NOT EDIT.
package mocks package mocks
......
...@@ -43,11 +43,15 @@ func (s *gater) Update(id peer.ID, score float64) { ...@@ -43,11 +43,15 @@ func (s *gater) Update(id peer.ID, score float64) {
if score < PeerScoreThreshold && s.banEnabled { if score < PeerScoreThreshold && s.banEnabled {
s.log.Warn("peer blocking enabled, blocking peer", "id", id.String(), "score", score) s.log.Warn("peer blocking enabled, blocking peer", "id", id.String(), "score", score)
err := s.connGater.BlockPeer(id) err := s.connGater.BlockPeer(id)
s.log.Warn("connection gater failed to block peer", id.String(), "err", err) if err != nil {
s.log.Warn("connection gater failed to block peer", "id", id.String(), "err", err)
}
} }
// Unblock peers whose score has recovered to an acceptable level // Unblock peers whose score has recovered to an acceptable level
if (score > PeerScoreThreshold) && slices.Contains(s.connGater.ListBlockedPeers(), id) { if (score > PeerScoreThreshold) && slices.Contains(s.connGater.ListBlockedPeers(), id) {
err := s.connGater.UnblockPeer(id) err := s.connGater.UnblockPeer(id)
s.log.Warn("connection gater failed to unblock peer", id.String(), "err", err) if err != nil {
s.log.Warn("connection gater failed to unblock peer", "id", id.String(), "err", err)
}
} }
} }
package p2p package p2p
import ( import (
"fmt"
"sort"
"strconv"
"strings"
log "github.com/ethereum/go-ethereum/log" log "github.com/ethereum/go-ethereum/log"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
peer "github.com/libp2p/go-libp2p/core/peer" peer "github.com/libp2p/go-libp2p/core/peer"
...@@ -11,6 +16,68 @@ type scorer struct { ...@@ -11,6 +16,68 @@ type scorer struct {
metricer GossipMetricer metricer GossipMetricer
log log.Logger log log.Logger
gater PeerGater gater PeerGater
bandScoreThresholds *BandScoreThresholds
}
// scorePair holds a band and its corresponding threshold.
type scorePair struct {
band string
threshold float64
}
// BandScoreThresholds holds the thresholds for classifying peers
// into different score bands.
type BandScoreThresholds struct {
bands []scorePair
}
// NewBandScorer constructs a new [BandScoreThresholds] instance.
func NewBandScorer(str string) (*BandScoreThresholds, error) {
s := &BandScoreThresholds{
bands: make([]scorePair, 0),
}
for _, band := range strings.Split(str, ";") {
// Skip empty band strings.
band := strings.TrimSpace(band)
if band == "" {
continue
}
split := strings.Split(band, ":")
if len(split) != 2 {
return nil, fmt.Errorf("invalid score band: %s", band)
}
threshold, err := strconv.ParseFloat(split[0], 64)
if err != nil {
return nil, err
}
s.bands = append(s.bands, scorePair{
band: split[1],
threshold: threshold,
})
}
// Order the bands by threshold in ascending order.
sort.Slice(s.bands, func(i, j int) bool {
return s.bands[i].threshold < s.bands[j].threshold
})
return s, nil
}
// Bucket returns the appropriate band for a given score.
func (s *BandScoreThresholds) Bucket(score float64) string {
for _, pair := range s.bands {
if score <= pair.threshold {
return pair.band
}
}
// If there is no band threshold higher than the score,
// the peer must be placed in the highest bucket.
if len(s.bands) > 0 {
return s.bands[len(s.bands)-1].band
}
return ""
} }
// Peerstore is a subset of the libp2p peerstore.Peerstore interface. // Peerstore is a subset of the libp2p peerstore.Peerstore interface.
...@@ -34,12 +101,13 @@ type Scorer interface { ...@@ -34,12 +101,13 @@ type Scorer interface {
} }
// NewScorer returns a new peer scorer. // NewScorer returns a new peer scorer.
func NewScorer(peerGater PeerGater, peerStore Peerstore, metricer GossipMetricer, log log.Logger) Scorer { func NewScorer(peerGater PeerGater, peerStore Peerstore, metricer GossipMetricer, bandScoreThresholds *BandScoreThresholds, log log.Logger) Scorer {
return &scorer{ return &scorer{
peerStore: peerStore, peerStore: peerStore,
metricer: metricer, metricer: metricer,
log: log, log: log,
gater: peerGater, gater: peerGater,
bandScoreThresholds: bandScoreThresholds,
} }
} }
...@@ -48,13 +116,13 @@ func NewScorer(peerGater PeerGater, peerStore Peerstore, metricer GossipMetricer ...@@ -48,13 +116,13 @@ func NewScorer(peerGater PeerGater, peerStore Peerstore, metricer GossipMetricer
// The returned [pubsub.ExtendedPeerScoreInspectFn] is called with a mapping of peer IDs to peer score snapshots. // The returned [pubsub.ExtendedPeerScoreInspectFn] is called with a mapping of peer IDs to peer score snapshots.
func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn { func (s *scorer) SnapshotHook() pubsub.ExtendedPeerScoreInspectFn {
return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) { return func(m map[peer.ID]*pubsub.PeerScoreSnapshot) {
scoreMap := make(map[string]float64)
for id, snap := range m { for id, snap := range m {
// Record peer score in the metricer band := s.bandScoreThresholds.Bucket(snap.Score)
s.metricer.RecordPeerScoring(id, snap.Score) scoreMap[band] += 1
// Update with the peer gater
s.gater.Update(id, snap.Score) s.gater.Update(id, snap.Score)
} }
s.metricer.SetPeerScores(scoreMap)
} }
} }
......
...@@ -20,15 +20,18 @@ type PeerScorerTestSuite struct { ...@@ -20,15 +20,18 @@ type PeerScorerTestSuite struct {
mockGater *p2pMocks.PeerGater mockGater *p2pMocks.PeerGater
mockStore *p2pMocks.Peerstore mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer mockMetricer *p2pMocks.GossipMetricer
bandScorer *p2p.BandScoreThresholds
logger log.Logger logger log.Logger
} }
// SetupTest sets up the test suite. // SetupTest sets up the test suite.
func (testSuite *PeerScorerTestSuite) SetupTest() { func (testSuite *PeerScorerTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.PeerGater{} testSuite.mockGater = &p2pMocks.PeerGater{}
// testSuite.mockConnGater = &p2pMocks.ConnectionGater{}
testSuite.mockStore = &p2pMocks.Peerstore{} testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{} testSuite.mockMetricer = &p2pMocks.GossipMetricer{}
bandScorer, err := p2p.NewBandScorer("0:graylist;")
testSuite.NoError(err)
testSuite.bandScorer = bandScorer
testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError) testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError)
} }
...@@ -37,45 +40,49 @@ func TestPeerScorer(t *testing.T) { ...@@ -37,45 +40,49 @@ func TestPeerScorer(t *testing.T) {
suite.Run(t, new(PeerScorerTestSuite)) suite.Run(t, new(PeerScorerTestSuite))
} }
// TestPeerScorerOnConnect ensures we can call the OnConnect method on the peer scorer. // TestScorer_OnConnect ensures we can call the OnConnect method on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestPeerScorerOnConnect() { func (testSuite *PeerScorerTestSuite) TestScorer_OnConnect() {
scorer := p2p.NewScorer( scorer := p2p.NewScorer(
testSuite.mockGater, testSuite.mockGater,
testSuite.mockStore, testSuite.mockStore,
testSuite.mockMetricer, testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger, testSuite.logger,
) )
scorer.OnConnect() scorer.OnConnect()
} }
// TestPeerScorerOnDisconnect ensures we can call the OnDisconnect method on the peer scorer. // TestScorer_OnDisconnect ensures we can call the OnDisconnect method on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestPeerScorerOnDisconnect() { func (testSuite *PeerScorerTestSuite) TestScorer_OnDisconnect() {
scorer := p2p.NewScorer( scorer := p2p.NewScorer(
testSuite.mockGater, testSuite.mockGater,
testSuite.mockStore, testSuite.mockStore,
testSuite.mockMetricer, testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger, testSuite.logger,
) )
scorer.OnDisconnect() scorer.OnDisconnect()
} }
// TestSnapshotHook tests running the snapshot hook on the peer scorer. // TestScorer_SnapshotHook tests running the snapshot hook on the peer scorer.
func (testSuite *PeerScorerTestSuite) TestSnapshotHook() { func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHook() {
scorer := p2p.NewScorer( scorer := p2p.NewScorer(
testSuite.mockGater, testSuite.mockGater,
testSuite.mockStore, testSuite.mockStore,
testSuite.mockMetricer, testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger, testSuite.logger,
) )
inspectFn := scorer.SnapshotHook() inspectFn := scorer.SnapshotHook()
// Mock the snapshot updates
// This doesn't return anything
testSuite.mockMetricer.On("RecordPeerScoring", peer.ID("peer1"), float64(-100)).Return(nil)
// Mock the peer gater call // Mock the peer gater call
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-100)).Return(nil) testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-100)).Return(nil)
// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
"graylist": 1,
}).Return(nil)
// Apply the snapshot // Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{ snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
peer.ID("peer1"): { peer.ID("peer1"): {
...@@ -85,24 +92,26 @@ func (testSuite *PeerScorerTestSuite) TestSnapshotHook() { ...@@ -85,24 +92,26 @@ func (testSuite *PeerScorerTestSuite) TestSnapshotHook() {
inspectFn(snapshotMap) inspectFn(snapshotMap)
} }
// TestSnapshotHookBlockPeer tests running the snapshot hook on the peer scorer with a peer score below the threshold. // TestScorer_SnapshotHookBlocksPeer tests running the snapshot hook on the peer scorer with a peer score below the threshold.
// This implies that the peer should be blocked. // This implies that the peer should be blocked.
func (testSuite *PeerScorerTestSuite) TestSnapshotHookBlockPeer() { func (testSuite *PeerScorerTestSuite) TestScorer_SnapshotHookBlocksPeer() {
scorer := p2p.NewScorer( scorer := p2p.NewScorer(
testSuite.mockGater, testSuite.mockGater,
testSuite.mockStore, testSuite.mockStore,
testSuite.mockMetricer, testSuite.mockMetricer,
testSuite.bandScorer,
testSuite.logger, testSuite.logger,
) )
inspectFn := scorer.SnapshotHook() inspectFn := scorer.SnapshotHook()
// Mock the snapshot updates
// This doesn't return anything
testSuite.mockMetricer.On("RecordPeerScoring", peer.ID("peer1"), float64(-101)).Return(nil)
// Mock the peer gater call // Mock the peer gater call
testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-101)).Return(nil) testSuite.mockGater.On("Update", peer.ID("peer1"), float64(-101)).Return(nil)
// The metricer should then be called with the peer score band map
testSuite.mockMetricer.On("SetPeerScores", map[string]float64{
"graylist": 1,
}).Return(nil)
// Apply the snapshot // Apply the snapshot
snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{ snapshotMap := map[peer.ID]*pubsub.PeerScoreSnapshot{
peer.ID("peer1"): { peer.ID("peer1"): {
......
...@@ -14,7 +14,7 @@ func ConfigurePeerScoring(h host.Host, g ConnectionGater, gossipConf GossipSetup ...@@ -14,7 +14,7 @@ func ConfigurePeerScoring(h host.Host, g ConnectionGater, gossipConf GossipSetup
peerScoreThresholds := NewPeerScoreThresholds() peerScoreThresholds := NewPeerScoreThresholds()
banEnabled := gossipConf.BanPeers() banEnabled := gossipConf.BanPeers()
peerGater := NewPeerGater(g, log, banEnabled) peerGater := NewPeerGater(g, log, banEnabled)
scorer := NewScorer(peerGater, h.Peerstore(), m, log) scorer := NewScorer(peerGater, h.Peerstore(), m, gossipConf.PeerBandScorer(), log)
opts := []pubsub.Option{} opts := []pubsub.Option{}
// Check the app specific score since libp2p doesn't export it's [validate] function :/ // Check the app specific score since libp2p doesn't export it's [validate] function :/
if peerScoreParams != nil && peerScoreParams.AppSpecificScore != nil { if peerScoreParams != nil && peerScoreParams.AppSpecificScore != nil {
......
...@@ -11,7 +11,7 @@ import ( ...@@ -11,7 +11,7 @@ import (
p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks" p2pMocks "github.com/ethereum-optimism/optimism/op-node/p2p/mocks"
testlog "github.com/ethereum-optimism/optimism/op-node/testlog" testlog "github.com/ethereum-optimism/optimism/op-node/testlog"
mock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
suite "github.com/stretchr/testify/suite" suite "github.com/stretchr/testify/suite"
log "github.com/ethereum/go-ethereum/log" log "github.com/ethereum/go-ethereum/log"
...@@ -30,6 +30,7 @@ type PeerScoresTestSuite struct { ...@@ -30,6 +30,7 @@ type PeerScoresTestSuite struct {
mockGater *p2pMocks.ConnectionGater mockGater *p2pMocks.ConnectionGater
mockStore *p2pMocks.Peerstore mockStore *p2pMocks.Peerstore
mockMetricer *p2pMocks.GossipMetricer mockMetricer *p2pMocks.GossipMetricer
bandScorer p2p.BandScoreThresholds
logger log.Logger logger log.Logger
} }
...@@ -38,6 +39,9 @@ func (testSuite *PeerScoresTestSuite) SetupTest() { ...@@ -38,6 +39,9 @@ func (testSuite *PeerScoresTestSuite) SetupTest() {
testSuite.mockGater = &p2pMocks.ConnectionGater{} testSuite.mockGater = &p2pMocks.ConnectionGater{}
testSuite.mockStore = &p2pMocks.Peerstore{} testSuite.mockStore = &p2pMocks.Peerstore{}
testSuite.mockMetricer = &p2pMocks.GossipMetricer{} testSuite.mockMetricer = &p2pMocks.GossipMetricer{}
bandScorer, err := p2p.NewBandScorer("0:graylist;")
testSuite.NoError(err)
testSuite.bandScorer = *bandScorer
testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError) testSuite.logger = testlog.Logger(testSuite.T(), log.LvlError)
} }
...@@ -68,6 +72,7 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts [] ...@@ -68,6 +72,7 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
rt := pubsub.DefaultGossipSubRouter(h) rt := pubsub.DefaultGossipSubRouter(h)
opts := []pubsub.Option{} opts := []pubsub.Option{}
opts = append(opts, p2p.ConfigurePeerScoring(h, testSuite.mockGater, &p2p.Config{ opts = append(opts, p2p.ConfigurePeerScoring(h, testSuite.mockGater, &p2p.Config{
BandScoreThresholds: testSuite.bandScorer,
PeerScoring: pubsub.PeerScoreParams{ PeerScoring: pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 { AppSpecificScore: func(p peer.ID) float64 {
if p == hosts[0].ID() { if p == hosts[0].ID() {
...@@ -118,8 +123,7 @@ func (testSuite *PeerScoresTestSuite) TestNegativeScores() { ...@@ -118,8 +123,7 @@ func (testSuite *PeerScoresTestSuite) TestNegativeScores() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
testSuite.mockMetricer.On("RecordPeerScoring", mock.Anything, float64(0)).Return(nil) testSuite.mockMetricer.On("SetPeerScores", mock.Anything).Return(nil)
testSuite.mockMetricer.On("RecordPeerScoring", mock.Anything, float64(-1000)).Return(nil)
testSuite.mockGater.On("ListBlockedPeers").Return([]peer.ID{}) testSuite.mockGater.On("ListBlockedPeers").Return([]peer.ID{})
......
...@@ -68,6 +68,10 @@ func (p *Prepared) PeerScoringParams() *pubsub.PeerScoreParams { ...@@ -68,6 +68,10 @@ func (p *Prepared) PeerScoringParams() *pubsub.PeerScoreParams {
return nil return nil
} }
func (p *Prepared) PeerBandScorer() *BandScoreThresholds {
return nil
}
func (p *Prepared) BanPeers() bool { func (p *Prepared) BanPeers() bool {
return false return false
} }
......
...@@ -14,9 +14,17 @@ import ( ...@@ -14,9 +14,17 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
) )
var ErrMaxFrameSizeTooSmall = errors.New("maxSize is too small to fit the fixed frame overhead")
var ErrNotDepositTx = errors.New("first transaction in block is not a deposit tx") var ErrNotDepositTx = errors.New("first transaction in block is not a deposit tx")
var ErrTooManyRLPBytes = errors.New("batch would cause RLP bytes to go over limit") var ErrTooManyRLPBytes = errors.New("batch would cause RLP bytes to go over limit")
// FrameV0OverHeadSize is the absolute minimum size of a frame.
// This is the fixed overhead frame size, calculated as specified
// in the [Frame Format] specs: 16 + 2 + 4 + 1 = 23 bytes.
//
// [Frame Format]: https://github.com/ethereum-optimism/optimism/blob/develop/specs/derivation.md#frame-format
const FrameV0OverHeadSize = 23
type ChannelOut struct { type ChannelOut struct {
id ChannelID id ChannelID
// Frame ID of the next frame to emit. Increment after emitting // Frame ID of the next frame to emit. Increment after emitting
...@@ -141,19 +149,23 @@ func (co *ChannelOut) Close() error { ...@@ -141,19 +149,23 @@ func (co *ChannelOut) Close() error {
// OutputFrame writes a frame to w with a given max size and returns the frame // OutputFrame writes a frame to w with a given max size and returns the frame
// number. // number.
// Use `ReadyBytes`, `Flush`, and `Close` to modify the ready buffer. // Use `ReadyBytes`, `Flush`, and `Close` to modify the ready buffer.
// Returns io.EOF when the channel is closed & there are no more frames // Returns an error if the `maxSize` < FrameV0OverHeadSize.
// Returns io.EOF when the channel is closed & there are no more frames.
// Returns nil if there is still more buffered data. // Returns nil if there is still more buffered data.
// Returns and error if it ran into an error during processing. // Returns an error if it ran into an error during processing.
func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) { func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error) {
f := Frame{ f := Frame{
ID: co.id, ID: co.id,
FrameNumber: uint16(co.frame), FrameNumber: uint16(co.frame),
} }
// Check that the maxSize is large enough for the frame overhead size.
if maxSize < FrameV0OverHeadSize {
return 0, ErrMaxFrameSizeTooSmall
}
// Copy data from the local buffer into the frame data buffer // Copy data from the local buffer into the frame data buffer
// Don't go past the maxSize with the fixed frame overhead. maxDataSize := maxSize - FrameV0OverHeadSize
// Fixed overhead: 16 + 2 + 4 + 1 = 23 bytes.
maxDataSize := maxSize - 23
if maxDataSize > uint64(co.buf.Len()) { if maxDataSize > uint64(co.buf.Len()) {
maxDataSize = uint64(co.buf.Len()) maxDataSize = uint64(co.buf.Len())
// If we are closed & will not spill past the current frame // If we are closed & will not spill past the current frame
......
...@@ -29,6 +29,22 @@ func TestChannelOutAddBlock(t *testing.T) { ...@@ -29,6 +29,22 @@ func TestChannelOutAddBlock(t *testing.T) {
}) })
} }
// TestOutputFrameSmallMaxSize tests that calling [OutputFrame] with a small
// max size that is below the fixed frame size overhead of 23, will return
// an error.
func TestOutputFrameSmallMaxSize(t *testing.T) {
cout, err := NewChannelOut()
require.NoError(t, err)
// Call OutputFrame with the range of small max size values that err
var w bytes.Buffer
for i := 0; i < 23; i++ {
fid, err := cout.OutputFrame(&w, uint64(i))
require.ErrorIs(t, err, ErrMaxFrameSizeTooSmall)
require.Zero(t, fid)
}
}
// TestRLPByteLimit ensures that stream encoder is properly limiting the length. // TestRLPByteLimit ensures that stream encoder is properly limiting the length.
// It will decode the input if `len(input) <= inputLimit`. // It will decode the input if `len(input) <= inputLimit`.
func TestRLPByteLimit(t *testing.T) { func TestRLPByteLimit(t *testing.T) {
......
...@@ -107,7 +107,7 @@ type EngineQueue struct { ...@@ -107,7 +107,7 @@ type EngineQueue struct {
// The queued-up attributes // The queued-up attributes
safeAttributesParent eth.L2BlockRef safeAttributesParent eth.L2BlockRef
safeAttributes *eth.PayloadAttributes safeAttributes *eth.PayloadAttributes
unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates
// Tracks which L2 blocks where last derived from which L1 block. At most finalityLookback large. // Tracks which L2 blocks where last derived from which L1 block. At most finalityLookback large.
finalityData []FinalityData finalityData []FinalityData
...@@ -132,11 +132,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M ...@@ -132,11 +132,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
engine: engine, engine: engine,
metrics: metrics, metrics: metrics,
finalityData: make([]FinalityData, 0, finalityLookback), finalityData: make([]FinalityData, 0, finalityLookback),
unsafePayloads: PayloadsQueue{ unsafePayloads: NewPayloadsQueue(maxUnsafePayloadsMemory, payloadMemSize),
MaxSize: maxUnsafePayloadsMemory,
SizeFn: payloadMemSize,
blockNos: make(map[uint64]bool),
},
prev: prev, prev: prev,
l1Fetcher: l1Fetcher, l1Fetcher: l1Fetcher,
} }
...@@ -682,7 +678,8 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System ...@@ -682,7 +678,8 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
return io.EOF return io.EOF
} }
// GetUnsafeQueueGap retrieves the current [start, end] range of the gap between the tip of the unsafe priority queue and the unsafe head. // GetUnsafeQueueGap retrieves the current [start, end) range (incl. start, excl. end)
// of the gap between the tip of the unsafe priority queue and the unsafe head.
// If there is no gap, the difference between end and start will be 0. // If there is no gap, the difference between end and start will be 0.
func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, end uint64) { func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, end uint64) {
// The start of the gap is always the unsafe head + 1 // The start of the gap is always the unsafe head + 1
...@@ -691,9 +688,11 @@ func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, e ...@@ -691,9 +688,11 @@ func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, e
// If the priority queue is empty, the end is the first block number at the top of the priority queue // If the priority queue is empty, the end is the first block number at the top of the priority queue
// Otherwise, the end is the expected block number // Otherwise, the end is the expected block number
if first := eq.unsafePayloads.Peek(); first != nil { if first := eq.unsafePayloads.Peek(); first != nil {
// Don't include the payload we already have in the sync range
end = first.ID().Number end = first.ID().Number
} else { } else {
end = expectedNumber // Include the expected payload in the sync range
end = expectedNumber + 1
} }
return start, end return start, end
......
...@@ -5,6 +5,8 @@ import ( ...@@ -5,6 +5,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
) )
...@@ -48,8 +50,8 @@ func (pq *payloadsByNumber) Pop() any { ...@@ -48,8 +50,8 @@ func (pq *payloadsByNumber) Pop() any {
} }
const ( const (
// ~580 bytes per payload, with some margin for overhead // ~580 bytes per payload, with some margin for overhead like map data
payloadMemFixedCost uint64 = 600 payloadMemFixedCost uint64 = 800
// 24 bytes per tx overhead (size of slice header in memory) // 24 bytes per tx overhead (size of slice header in memory)
payloadTxMemOverhead uint64 = 24 payloadTxMemOverhead uint64 = 24
) )
...@@ -72,15 +74,25 @@ func payloadMemSize(p *eth.ExecutionPayload) uint64 { ...@@ -72,15 +74,25 @@ func payloadMemSize(p *eth.ExecutionPayload) uint64 {
// without the need to use heap.Push/heap.Pop as caller. // without the need to use heap.Push/heap.Pop as caller.
// PayloadsQueue maintains a MaxSize by counting and tracking sizes of added eth.ExecutionPayload entries. // PayloadsQueue maintains a MaxSize by counting and tracking sizes of added eth.ExecutionPayload entries.
// When the size grows too large, the first (lowest block-number) payload is removed from the queue. // When the size grows too large, the first (lowest block-number) payload is removed from the queue.
// PayloadsQueue allows entries with same block number, or even full duplicates. // PayloadsQueue allows entries with same block number, but does not allow duplicate blocks
type PayloadsQueue struct { type PayloadsQueue struct {
pq payloadsByNumber pq payloadsByNumber
currentSize uint64 currentSize uint64
MaxSize uint64 MaxSize uint64
blockNos map[uint64]bool blockHashes map[common.Hash]struct{}
SizeFn func(p *eth.ExecutionPayload) uint64 SizeFn func(p *eth.ExecutionPayload) uint64
} }
func NewPayloadsQueue(maxSize uint64, sizeFn func(p *eth.ExecutionPayload) uint64) *PayloadsQueue {
return &PayloadsQueue{
pq: nil,
currentSize: 0,
MaxSize: maxSize,
blockHashes: make(map[common.Hash]struct{}),
SizeFn: sizeFn,
}
}
func (upq *PayloadsQueue) Len() int { func (upq *PayloadsQueue) Len() int {
return len(upq.pq) return len(upq.pq)
} }
...@@ -100,8 +112,8 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error { ...@@ -100,8 +112,8 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error {
if p == nil { if p == nil {
return errors.New("cannot add nil payload") return errors.New("cannot add nil payload")
} }
if upq.blockNos[p.ID().Number] { if _, ok := upq.blockHashes[p.BlockHash]; ok {
return errors.New("cannot add duplicate payload") return fmt.Errorf("cannot add duplicate payload %s", p.ID())
} }
size := upq.SizeFn(p) size := upq.SizeFn(p)
if size > upq.MaxSize { if size > upq.MaxSize {
...@@ -115,7 +127,7 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error { ...@@ -115,7 +127,7 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error {
for upq.currentSize > upq.MaxSize { for upq.currentSize > upq.MaxSize {
upq.Pop() upq.Pop()
} }
upq.blockNos[p.ID().Number] = true upq.blockHashes[p.BlockHash] = struct{}{}
return nil return nil
} }
...@@ -137,7 +149,7 @@ func (upq *PayloadsQueue) Pop() *eth.ExecutionPayload { ...@@ -137,7 +149,7 @@ func (upq *PayloadsQueue) Pop() *eth.ExecutionPayload {
} }
ps := heap.Pop(&upq.pq).(payloadAndSize) // nosemgrep ps := heap.Pop(&upq.pq).(payloadAndSize) // nosemgrep
upq.currentSize -= ps.size upq.currentSize -= ps.size
// remove the key from the blockNos map // remove the key from the block hashes map
delete(upq.blockNos, ps.payload.ID().Number) delete(upq.blockHashes, ps.payload.BlockHash)
return ps.payload return ps.payload
} }
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"container/heap" "container/heap"
"testing" "testing"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
...@@ -74,20 +75,17 @@ func TestPayloadMemSize(t *testing.T) { ...@@ -74,20 +75,17 @@ func TestPayloadMemSize(t *testing.T) {
} }
func TestPayloadsQueue(t *testing.T) { func TestPayloadsQueue(t *testing.T) {
pq := PayloadsQueue{ pq := NewPayloadsQueue(payloadMemFixedCost*3, payloadMemSize)
MaxSize: payloadMemFixedCost * 3,
SizeFn: payloadMemSize,
blockNos: make(map[uint64]bool),
}
require.Equal(t, 0, pq.Len()) require.Equal(t, 0, pq.Len())
require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Peek()) require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Peek())
require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Pop()) require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Pop())
a := &eth.ExecutionPayload{BlockNumber: 3} a := &eth.ExecutionPayload{BlockNumber: 3, BlockHash: common.Hash{3}}
b := &eth.ExecutionPayload{BlockNumber: 4} b := &eth.ExecutionPayload{BlockNumber: 4, BlockHash: common.Hash{4}}
c := &eth.ExecutionPayload{BlockNumber: 5} c := &eth.ExecutionPayload{BlockNumber: 5, BlockHash: common.Hash{5}}
d := &eth.ExecutionPayload{BlockNumber: 6} d := &eth.ExecutionPayload{BlockNumber: 6, BlockHash: common.Hash{6}}
bAlt := &eth.ExecutionPayload{BlockNumber: 4} bAlt := &eth.ExecutionPayload{BlockNumber: 4, BlockHash: common.Hash{0xff}}
bDup := &eth.ExecutionPayload{BlockNumber: 4, BlockHash: common.Hash{4}}
require.NoError(t, pq.Push(b)) require.NoError(t, pq.Push(b))
require.Equal(t, pq.Len(), 1) require.Equal(t, pq.Len(), 1)
require.Equal(t, pq.Peek(), b) require.Equal(t, pq.Peek(), b)
...@@ -130,7 +128,9 @@ func TestPayloadsQueue(t *testing.T) { ...@@ -130,7 +128,9 @@ func TestPayloadsQueue(t *testing.T) {
require.Equal(t, pq.Peek(), a) require.Equal(t, pq.Peek(), a)
// No duplicates allowed // No duplicates allowed
require.Error(t, pq.Push(bAlt)) require.Error(t, pq.Push(bDup))
// But reorg data allowed
require.NoError(t, pq.Push(bAlt))
require.NoError(t, pq.Push(d)) require.NoError(t, pq.Push(d))
require.Equal(t, pq.Len(), 3) require.Equal(t, pq.Len(), 3)
......
...@@ -10,7 +10,6 @@ import ( ...@@ -10,7 +10,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/sources"
) )
type Metrics interface { type Metrics interface {
...@@ -82,8 +81,19 @@ type Network interface { ...@@ -82,8 +81,19 @@ type Network interface {
PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error
} }
type AltSync interface {
// RequestL2Range informs the sync source that the given range of L2 blocks is missing,
// and should be retrieved from any available alternative syncing source.
// The start of the range is inclusive, the end is exclusive.
// The sync results should be returned back to the driver via the OnUnsafeL2Payload(ctx, payload) method.
// The latest requested range should always take priority over previous requests.
// There may be overlaps in requested ranges.
// An error may be returned if the scheduling fails immediately, e.g. a context timeout.
RequestL2Range(ctx context.Context, start, end uint64) error
}
// NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks.
func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, syncClient *sources.SyncClient, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver { func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver {
l1State := NewL1State(log, metrics) l1State := NewL1State(log, metrics)
sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1) sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1)
findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth) findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth)
...@@ -115,6 +125,6 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, sy ...@@ -115,6 +125,6 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, sy
l1SafeSig: make(chan eth.L1BlockRef, 10), l1SafeSig: make(chan eth.L1BlockRef, 10),
l1FinalizedSig: make(chan eth.L1BlockRef, 10), l1FinalizedSig: make(chan eth.L1BlockRef, 10),
unsafeL2Payloads: make(chan *eth.ExecutionPayload, 10), unsafeL2Payloads: make(chan *eth.ExecutionPayload, 10),
L2SyncCl: syncClient, altSync: altSync,
} }
} }
...@@ -16,7 +16,6 @@ import ( ...@@ -16,7 +16,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-service/backoff" "github.com/ethereum-optimism/optimism/op-service/backoff"
) )
...@@ -64,8 +63,8 @@ type Driver struct { ...@@ -64,8 +63,8 @@ type Driver struct {
l1SafeSig chan eth.L1BlockRef l1SafeSig chan eth.L1BlockRef
l1FinalizedSig chan eth.L1BlockRef l1FinalizedSig chan eth.L1BlockRef
// Backup unsafe sync client // Interface to signal the L2 block range to sync.
L2SyncCl *sources.SyncClient altSync AltSync
// L2 Signals: // L2 Signals:
...@@ -200,11 +199,12 @@ func (s *Driver) eventLoop() { ...@@ -200,11 +199,12 @@ func (s *Driver) eventLoop() {
sequencerTimer.Reset(delay) sequencerTimer.Reset(delay)
} }
// Create a ticker to check if there is a gap in the engine queue every 15 seconds // Create a ticker to check if there is a gap in the engine queue. Whenever
// If there is, we send requests to the backup RPC to retrieve the missing payloads // there is, we send requests to sync source to retrieve the missing payloads.
// and add them to the unsafe queue. syncCheckInterval := time.Duration(s.config.BlockTime) * time.Second * 2
altSyncTicker := time.NewTicker(15 * time.Second) altSyncTicker := time.NewTicker(syncCheckInterval)
defer altSyncTicker.Stop() defer altSyncTicker.Stop()
lastUnsafeL2 := s.derivation.UnsafeL2Head()
for { for {
// If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action. // If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action.
...@@ -220,6 +220,13 @@ func (s *Driver) eventLoop() { ...@@ -220,6 +220,13 @@ func (s *Driver) eventLoop() {
sequencerCh = nil sequencerCh = nil
} }
// If the engine is not ready, or if the L2 head is actively changing, then reset the alt-sync:
// there is no need to request L2 blocks when we are syncing already.
if head := s.derivation.UnsafeL2Head(); head != lastUnsafeL2 || !s.derivation.EngineReady() {
lastUnsafeL2 = head
altSyncTicker.Reset(syncCheckInterval)
}
select { select {
case <-sequencerCh: case <-sequencerCh:
payload, err := s.sequencer.RunNextSequencerAction(ctx) payload, err := s.sequencer.RunNextSequencerAction(ctx)
...@@ -237,10 +244,12 @@ func (s *Driver) eventLoop() { ...@@ -237,10 +244,12 @@ func (s *Driver) eventLoop() {
} }
planSequencerAction() // schedule the next sequencer action to keep the sequencing looping planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
case <-altSyncTicker.C: case <-altSyncTicker.C:
// Check if there is a gap in the current unsafe payload queue. If there is, attempt to fetch // Check if there is a gap in the current unsafe payload queue.
// missing payloads from the backup RPC (if it is configured). ctx, cancel := context.WithTimeout(ctx, time.Second*2)
if s.L2SyncCl != nil { err := s.checkForGapInUnsafeQueue(ctx)
s.checkForGapInUnsafeQueue(ctx) cancel()
if err != nil {
s.log.Warn("failed to check for unsafe L2 blocks to sync", "err", err)
} }
case payload := <-s.unsafeL2Payloads: case payload := <-s.unsafeL2Payloads:
s.snapshot("New unsafe payload") s.snapshot("New unsafe payload")
...@@ -462,35 +471,29 @@ type hashAndErrorChannel struct { ...@@ -462,35 +471,29 @@ type hashAndErrorChannel struct {
err chan error err chan error
} }
// checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from the backup RPC. // checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from an alt-sync method.
// WARNING: The sync client's attempt to retrieve the missing payloads is not guaranteed to succeed, and it will fail silently (besides // WARNING: This is only an outgoing signal, the blocks are not guaranteed to be retrieved.
// emitting warning logs) if the requests fail. // Results are received through OnUnsafeL2Payload.
func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) { func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error {
// subtract genesis time from wall clock to get the time elapsed since genesis, and then divide that // subtract genesis time from wall clock to get the time elapsed since genesis, and then divide that
// difference by the block time to get the expected L2 block number at the current time. If the // difference by the block time to get the expected L2 block number at the current time. If the
// unsafe head does not have this block number, then there is a gap in the queue. // unsafe head does not have this block number, then there is a gap in the queue.
wallClock := uint64(time.Now().Unix()) wallClock := uint64(time.Now().Unix())
genesisTimestamp := s.config.Genesis.L2Time genesisTimestamp := s.config.Genesis.L2Time
if wallClock < genesisTimestamp {
s.log.Debug("nothing to sync, did not reach genesis L2 time yet", "genesis", genesisTimestamp)
return nil
}
wallClockGenesisDiff := wallClock - genesisTimestamp wallClockGenesisDiff := wallClock - genesisTimestamp
expectedL2Block := wallClockGenesisDiff / s.config.BlockTime // Note: round down, we should not request blocks into the future.
blocksSinceGenesis := wallClockGenesisDiff / s.config.BlockTime
expectedL2Block := s.config.Genesis.L2.Number + blocksSinceGenesis
start, end := s.derivation.GetUnsafeQueueGap(expectedL2Block) start, end := s.derivation.GetUnsafeQueueGap(expectedL2Block)
size := end - start
// Check if there is a gap between the unsafe head and the expected L2 block number at the current time. // Check if there is a gap between the unsafe head and the expected L2 block number at the current time.
if size > 0 { if end > start {
s.log.Warn("Gap in payload queue tip and expected unsafe chain detected", "start", start, "end", end, "size", size) s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end-start)
s.log.Info("Attempting to fetch missing payloads from backup RPC", "start", start, "end", end, "size", size) return s.altSync.RequestL2Range(ctx, start, end)
// Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently.
// Concurrent requests are safe here due to the engine queue being a priority queue.
for blockNumber := start; blockNumber <= end; blockNumber++ {
select {
case s.L2SyncCl.FetchUnsafeBlock <- blockNumber:
// Do nothing- the block number was successfully sent into the channel
default:
return // If the channel is full, return and wait for the next iteration of the event loop
}
}
} }
return nil
} }
...@@ -109,6 +109,9 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain ...@@ -109,6 +109,9 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain
return nil, fmt.Errorf("failed to fetch current L2 forkchoice state: %w", err) return nil, fmt.Errorf("failed to fetch current L2 forkchoice state: %w", err)
} }
lgr.Info("Loaded current L2 heads", "unsafe", result.Unsafe, "safe", result.Safe, "finalized", result.Finalized,
"unsafe_origin", result.Unsafe.L1Origin, "unsafe_origin", result.Safe.L1Origin)
// Remember original unsafe block to determine reorg depth // Remember original unsafe block to determine reorg depth
prevUnsafe := result.Unsafe prevUnsafe := result.Unsafe
...@@ -134,6 +137,7 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain ...@@ -134,6 +137,7 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain
// Exit, find-sync start should start over, to move to an available L1 chain with block-by-number / not-found case. // Exit, find-sync start should start over, to move to an available L1 chain with block-by-number / not-found case.
return nil, fmt.Errorf("failed to retrieve L1 block: %w", err) return nil, fmt.Errorf("failed to retrieve L1 block: %w", err)
} }
lgr.Info("Walking back L1Block by hash", "curr", l1Block, "next", b, "l2block", n)
l1Block = b l1Block = b
ahead = false ahead = false
} else if l1Block == (eth.L1BlockRef{}) || n.L1Origin.Hash != l1Block.Hash { } else if l1Block == (eth.L1BlockRef{}) || n.L1Origin.Hash != l1Block.Hash {
...@@ -145,9 +149,10 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain ...@@ -145,9 +149,10 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain
} }
l1Block = b l1Block = b
ahead = notFound ahead = notFound
lgr.Info("Walking back L1Block by number", "curr", l1Block, "next", b, "l2block", n)
} }
lgr.Trace("walking sync start", "number", n.Number) lgr.Trace("walking sync start", "l2block", n)
// Don't walk past genesis. If we were at the L2 genesis, but could not find its L1 origin, // Don't walk past genesis. If we were at the L2 genesis, but could not find its L1 origin,
// the L2 chain is building on the wrong L1 branch. // the L2 chain is building on the wrong L1 branch.
...@@ -201,6 +206,8 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain ...@@ -201,6 +206,8 @@ func FindL2Heads(ctx context.Context, cfg *rollup.Config, l1 L1Chain, l2 L2Chain
// Don't traverse further than the finalized head to find a safe head // Don't traverse further than the finalized head to find a safe head
if n.Number == result.Finalized.Number { if n.Number == result.Finalized.Number {
lgr.Info("Hit finalized L2 head, returning immediately", "unsafe", result.Unsafe, "safe", result.Safe,
"finalized", result.Finalized, "unsafe_origin", result.Unsafe.L1Origin, "unsafe_origin", result.Safe.L1Origin)
result.Safe = n result.Safe = n
return result, nil return result, nil
} }
......
...@@ -136,6 +136,7 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf ...@@ -136,6 +136,7 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf
func NewL2SyncEndpointConfig(ctx *cli.Context) *node.L2SyncEndpointConfig { func NewL2SyncEndpointConfig(ctx *cli.Context) *node.L2SyncEndpointConfig {
return &node.L2SyncEndpointConfig{ return &node.L2SyncEndpointConfig{
L2NodeAddr: ctx.GlobalString(flags.BackupL2UnsafeSyncRPC.Name), L2NodeAddr: ctx.GlobalString(flags.BackupL2UnsafeSyncRPC.Name),
TrustRPC: ctx.GlobalBool(flags.BackupL2UnsafeSyncRPCTrustRPC.Name),
} }
} }
......
This diff is collapsed.
...@@ -140,3 +140,40 @@ func TestEthClient_InfoByNumber(t *testing.T) { ...@@ -140,3 +140,40 @@ func TestEthClient_InfoByNumber(t *testing.T) {
require.Equal(t, info, expectedInfo) require.Equal(t, info, expectedInfo)
m.Mock.AssertExpectations(t) m.Mock.AssertExpectations(t)
} }
func TestEthClient_WrongInfoByNumber(t *testing.T) {
m := new(mockRPC)
_, rhdr := randHeader()
rhdr2 := *rhdr
rhdr2.Number += 1
n := rhdr.Number
ctx := context.Background()
m.On("CallContext", ctx, new(*rpcHeader),
"eth_getBlockByNumber", []any{n.String(), false}).Run(func(args mock.Arguments) {
*args[1].(**rpcHeader) = &rhdr2
}).Return([]error{nil})
s, err := NewL1Client(m, nil, nil, L1ClientDefaultConfig(&rollup.Config{SeqWindowSize: 10}, true, RPCKindBasic))
require.NoError(t, err)
_, err = s.InfoByNumber(ctx, uint64(n))
require.Error(t, err, "cannot accept the wrong block")
m.Mock.AssertExpectations(t)
}
func TestEthClient_WrongInfoByHash(t *testing.T) {
m := new(mockRPC)
_, rhdr := randHeader()
rhdr2 := *rhdr
rhdr2.Root[0] += 1
rhdr2.Hash = rhdr2.computeBlockHash()
k := rhdr.Hash
ctx := context.Background()
m.On("CallContext", ctx, new(*rpcHeader),
"eth_getBlockByHash", []any{k, false}).Run(func(args mock.Arguments) {
*args[1].(**rpcHeader) = &rhdr2
}).Return([]error{nil})
s, err := NewL1Client(m, nil, nil, L1ClientDefaultConfig(&rollup.Config{SeqWindowSize: 10}, true, RPCKindBasic))
require.NoError(t, err)
_, err = s.InfoByHash(ctx, k)
require.Error(t, err, "cannot accept the wrong block")
m.Mock.AssertExpectations(t)
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment