Commit a633783d authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into jg/tx_manager_owns_cli_setup

parents d03c20ce 54494814
......@@ -365,7 +365,7 @@ jobs:
push: true
tags: ethereumoptimism/wd-mon:${{ needs.release.outputs.wd-mon }},ethereumoptimism/wd-mon:latest
drippie-mon:
balance-mon:
name: Publish Balance Monitor Version ${{ needs.release.outputs.balance-mon }}
needs: release
if: needs.release.outputs.balance-mon != ''
......
......@@ -51,7 +51,7 @@ func Main(version string, cliCtx *cli.Context) error {
return err
}
}
defer batchSubmitter.StopIfRunning()
defer batchSubmitter.StopIfRunning(context.Background())
ctx, cancel := context.WithCancel(context.Background())
......
......@@ -18,6 +18,7 @@ var (
ErrMaxDurationReached = errors.New("max channel duration reached")
ErrChannelTimeoutClose = errors.New("close to channel timeout")
ErrSeqWindowClose = errors.New("close to sequencer window timeout")
ErrTerminated = errors.New("channel terminated")
)
type ChannelFullError struct {
......@@ -188,7 +189,7 @@ func (c *channelBuilder) Reset() error {
}
// AddBlock adds a block to the channel compression pipeline. IsFull should be
// called aftewards to test whether the channel is full. If full, a new channel
// called afterwards to test whether the channel is full. If full, a new channel
// must be started.
//
// AddBlock returns a ChannelFullError if called even though the channel is
......@@ -307,16 +308,17 @@ func (c *channelBuilder) IsFull() bool {
// FullErr returns the reason why the channel is full. If not full yet, it
// returns nil.
//
// It returns a ChannelFullError wrapping one of six possible reasons for the
// channel being full:
// It returns a ChannelFullError wrapping one of the following possible reasons
// for the channel being full:
// - ErrInputTargetReached if the target amount of input data has been reached,
// - derive.MaxRLPBytesPerChannel if the general maximum amount of input data
// would have been exceeded by the latest AddBlock call,
// - ErrMaxFrameIndex if the maximum number of frames has been generated
// (uint16),
// - ErrMaxDurationReached if the max channel duration got reached.
// - ErrChannelTimeoutClose if the consensus channel timeout got too close.
// - ErrSeqWindowClose if the end of the sequencer window got too close.
// - ErrMaxDurationReached if the max channel duration got reached,
// - ErrChannelTimeoutClose if the consensus channel timeout got too close,
// - ErrSeqWindowClose if the end of the sequencer window got too close,
// - ErrTerminated if the channel was explicitly terminated.
func (c *channelBuilder) FullErr() error {
return c.fullErr
}
......@@ -402,6 +404,14 @@ func (c *channelBuilder) outputFrame() error {
return err // possibly io.EOF (last frame)
}
// Close immediately marks the channel as full with an ErrTerminated
// if the channel is not already full.
func (c *channelBuilder) Close() {
if !c.IsFull() {
c.setFullErr(ErrTerminated)
}
}
// HasFrame returns whether there's any available frame. If true, it can be
// popped using NextFrame().
//
......
......@@ -41,6 +41,9 @@ type channelManager struct {
pendingTransactions map[txID]txData
// Set of confirmed txID -> inclusion block. For determining if the channel is timed out
confirmedTransactions map[txID]eth.BlockID
// if set to true, prevents production of any new channel frames
closed bool
}
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager {
......@@ -60,6 +63,7 @@ func (s *channelManager) Clear() {
s.log.Trace("clearing channel manager state")
s.blocks = s.blocks[:0]
s.tip = common.Hash{}
s.closed = false
s.clearPendingChannel()
}
......@@ -78,6 +82,10 @@ func (s *channelManager) TxFailed(id txID) {
}
s.metr.RecordBatchTxFailed()
if s.closed && len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 {
s.log.Info("Channel has no submitted transactions, clearing for shutdown", "chID", s.pendingChannel.ID())
s.clearPendingChannel()
}
}
// TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in
......@@ -179,8 +187,8 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
dataPending := s.pendingChannel != nil && s.pendingChannel.HasFrame()
s.log.Debug("Requested tx data", "l1Head", l1Head, "data_pending", dataPending, "blocks_pending", len(s.blocks))
// Short circuit if there is a pending frame.
if dataPending {
// Short circuit if there is a pending frame or the channel manager is closed.
if dataPending || s.closed {
return s.nextTxData()
}
......@@ -344,3 +352,27 @@ func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo)
SequenceNumber: l1info.SequenceNumber,
}
}
// Close closes the current pending channel, if one exists, outputs any remaining frames,
// and prevents the creation of any new channels.
// Any outputted frames still need to be published.
func (s *channelManager) Close() error {
if s.closed {
return nil
}
s.closed = true
// Any pending state can be proactively cleared if there are no submitted transactions
if len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 {
s.clearPendingChannel()
}
if s.pendingChannel == nil {
return nil
}
s.pendingChannel.Close()
return s.outputFrames()
}
......@@ -363,3 +363,145 @@ func TestChannelManager_TxResend(t *testing.T) {
require.NoError(err)
require.Len(fs, 1)
}
// TestChannelManagerCloseBeforeFirstUse ensures that the channel manager
// will not produce any frames if closed immediately.
func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
TargetFrameSize: 0,
MaxFrameSize: 100,
ApproxComprRatio: 1.0,
ChannelTimeout: 1000,
})
a, _ := derivetest.RandomL2Block(rng, 4)
m.Close()
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to contain no tx data")
}
// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
// can gracefully close with no pending channels, and will not emit any new
// channel frames.
func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
require := require.New(t)
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
TargetFrameSize: 0,
MaxFrameSize: 100,
ApproxComprRatio: 1.0,
ChannelTimeout: 1000,
})
a := newMiniL2Block(0)
b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
txdata, err := m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to return valid tx data")
m.TxConfirmed(txdata.ID(), eth.BlockID{})
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected channel manager to EOF")
m.Close()
err = m.AddL2Block(b)
require.NoError(err, "Failed to add L2 block")
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to return no new tx data")
}
// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
// can gracefully close with a pending channel, and will not produce any
// new channel frames after this point.
func TestChannelManagerClosePendingChannel(t *testing.T) {
require := require.New(t)
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
TargetNumFrames: 100,
TargetFrameSize: 1000,
MaxFrameSize: 1000,
ApproxComprRatio: 1.0,
ChannelTimeout: 1000,
})
a := newMiniL2Block(50_000)
b := newMiniL2BlockWithNumberParent(10, big.NewInt(1), a.Hash())
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
txdata, err := m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce valid tx data")
m.TxConfirmed(txdata.ID(), eth.BlockID{})
m.Close()
txdata, err = m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce tx data from remaining L2 block data")
m.TxConfirmed(txdata.ID(), eth.BlockID{})
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected channel manager to have no more tx data")
err = m.AddL2Block(b)
require.NoError(err, "Failed to add L2 block")
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
}
// TestChannelManagerCloseAllTxsFailed ensures that the channel manager
// can gracefully close after producing transaction frames if none of these
// have successfully landed on chain.
func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
require := require.New(t)
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
TargetNumFrames: 100,
TargetFrameSize: 1000,
MaxFrameSize: 1000,
ApproxComprRatio: 1.0,
ChannelTimeout: 1000,
})
a := newMiniL2Block(50_000)
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
txdata, err := m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to produce valid tx data")
m.TxFailed(txdata.ID())
// Show that this data will continue to be emitted as long as the transaction
// fails and the channel manager is not closed
txdata, err = m.TxData(eth.BlockID{})
require.NoError(err, "Expected channel manager to re-attempt the failed transaction")
m.TxFailed(txdata.ID())
m.Close()
_, err = m.TxData(eth.BlockID{})
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
}
......@@ -26,10 +26,11 @@ type BatchSubmitter struct {
txMgr txmgr.TxManager
wg sync.WaitGroup
done chan struct{}
ctx context.Context
cancel context.CancelFunc
shutdownCtx context.Context
cancelShutdownCtx context.CancelFunc
killCtx context.Context
cancelKillCtx context.CancelFunc
mutex sync.Mutex
running bool
......@@ -133,10 +134,8 @@ func (l *BatchSubmitter) Start() error {
}
l.running = true
l.done = make(chan struct{})
// TODO: this context only exists because the event loop doesn't reach done
// if the tx manager is blocking forever due to e.g. insufficient balance.
l.ctx, l.cancel = context.WithCancel(context.Background())
l.shutdownCtx, l.cancelShutdownCtx = context.WithCancel(context.Background())
l.killCtx, l.cancelKillCtx = context.WithCancel(context.Background())
l.state.Clear()
l.lastStoredBlock = eth.BlockID{}
......@@ -148,11 +147,11 @@ func (l *BatchSubmitter) Start() error {
return nil
}
func (l *BatchSubmitter) StopIfRunning() {
_ = l.Stop()
func (l *BatchSubmitter) StopIfRunning(ctx context.Context) {
_ = l.Stop(ctx)
}
func (l *BatchSubmitter) Stop() error {
func (l *BatchSubmitter) Stop(ctx context.Context) error {
l.log.Info("Stopping Batch Submitter")
l.mutex.Lock()
......@@ -163,9 +162,18 @@ func (l *BatchSubmitter) Stop() error {
}
l.running = false
l.cancel()
close(l.done)
// go routine will call cancelKill() if the passed in ctx is ever Done
cancelKill := l.cancelKillCtx
wrapped, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
<-wrapped.Done()
cancelKill()
}()
l.cancelShutdownCtx()
l.wg.Wait()
l.cancelKillCtx()
l.log.Info("Batch Submitter stopped")
......@@ -281,14 +289,39 @@ func (l *BatchSubmitter) loop() {
for {
select {
case <-ticker.C:
l.loadBlocksIntoState(l.ctx)
l.loadBlocksIntoState(l.shutdownCtx)
l.publishStateToL1(l.killCtx)
case <-l.shutdownCtx.Done():
l.publishStateToL1(l.killCtx)
return
}
}
}
blockLoop:
// publishStateToL1 loops through the block data loaded into `state` and
// submits the associated data to the L1 in the form of channel frames.
func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
for {
l1tip, err := l.l1Tip(l.ctx)
// Attempt to gracefully terminate the current channel, ensuring that no new frames will be
// produced. Any remaining frames must still be published to the L1 to prevent stalling.
select {
case <-ctx.Done():
err := l.state.Close()
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
}
case <-l.shutdownCtx.Done():
err := l.state.Close()
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
}
default:
}
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.log.Error("Failed to query L1 tip", "error", err)
break
return
}
l.recordL1Tip(l1tip)
......@@ -296,32 +329,17 @@ func (l *BatchSubmitter) loop() {
txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
l.log.Trace("no transaction data available")
break // local for loop
break
} else if err != nil {
l.log.Error("unable to get tx data", "err", err)
break
}
// Record TX Status
if receipt, err := l.sendTransaction(l.ctx, txdata.Bytes()); err != nil {
if receipt, err := l.sendTransaction(ctx, txdata.Bytes()); err != nil {
l.recordFailedTx(txdata.ID(), err)
} else {
l.recordConfirmedTx(txdata.ID(), receipt)
}
// hack to exit this loop. Proper fix is to do request another send tx or parallel tx sending
// from the channel manager rather than sending the channel in a loop. This stalls b/c if the
// context is cancelled while sending, it will never fully clear the pending txns.
select {
case <-l.ctx.Done():
break blockLoop
default:
}
}
case <-l.done:
return
}
}
}
......
......@@ -6,7 +6,7 @@ import (
type batcherClient interface {
Start() error
Stop() error
Stop(ctx context.Context) error
}
type adminAPI struct {
......@@ -23,6 +23,6 @@ func (a *adminAPI) StartBatcher(_ context.Context) error {
return a.b.Start()
}
func (a *adminAPI) StopBatcher(_ context.Context) error {
return a.b.Stop()
func (a *adminAPI) StopBatcher(ctx context.Context) error {
return a.b.Stop(ctx)
}
......@@ -354,7 +354,9 @@ func TestMigration(t *testing.T) {
}, lgr.New("module", "batcher"), batchermetrics.NoopMetrics)
require.NoError(t, err)
t.Cleanup(func() {
batcher.StopIfRunning()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
batcher.StopIfRunning(ctx)
})
proposer, err := l2os.NewL2OutputSubmitterFromCLIConfig(l2os.CLIConfig{
......
......@@ -220,7 +220,9 @@ func (sys *System) Close() {
sys.L2OutputSubmitter.Stop()
}
if sys.BatchSubmitter != nil {
sys.BatchSubmitter.StopIfRunning()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
sys.BatchSubmitter.StopIfRunning(ctx)
}
for _, node := range sys.RollupNodes {
......
......@@ -1449,7 +1449,7 @@ func TestStopStartBatcher(t *testing.T) {
require.Greater(t, newSeqStatus.SafeL2.Number, seqStatus.SafeL2.Number, "Safe chain did not advance")
// stop the batch submission
err = sys.BatchSubmitter.Stop()
err = sys.BatchSubmitter.Stop(context.Background())
require.Nil(t, err)
// wait for any old safe blocks being submitted / derived
......
......@@ -118,6 +118,13 @@ var (
Usage: "Initialize the sequencer in a stopped state. The sequencer can be started using the admin_startSequencer RPC",
EnvVar: prefixEnvVar("SEQUENCER_STOPPED"),
}
SequencerMaxSafeLagFlag = cli.Uint64Flag{
Name: "sequencer.max-safe-lag",
Usage: "Maximum number of L2 blocks for restricting the distance between L2 safe and unsafe. Disabled if 0.",
EnvVar: prefixEnvVar("SEQUENCER_MAX_SAFE_LAG"),
Required: false,
Value: 0,
}
SequencerL1Confs = cli.Uint64Flag{
Name: "sequencer.l1-confs",
Usage: "Number of L1 blocks to keep distance from the L1 head as a sequencer for picking an L1 origin.",
......@@ -221,6 +228,7 @@ var optionalFlags = []cli.Flag{
VerifierL1Confs,
SequencerEnabledFlag,
SequencerStoppedFlag,
SequencerMaxSafeLagFlag,
SequencerL1Confs,
L1EpochPollIntervalFlag,
RPCEnableAdmin,
......
......@@ -142,6 +142,20 @@ func (_m *ConnectionGater) InterceptUpgraded(_a0 network.Conn) (bool, control.Di
return r0, r1
}
// IsBlocked provides a mock function with given fields: p
func (_m *ConnectionGater) IsBlocked(p peer.ID) bool {
ret := _m.Called(p)
var r0 bool
if rf, ok := ret.Get(0).(func(peer.ID) bool); ok {
r0 = rf(p)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// ListBlockedAddrs provides a mock function with given fields:
func (_m *ConnectionGater) ListBlockedAddrs() []net.IP {
ret := _m.Called()
......
......@@ -13,6 +13,20 @@ type PeerGater struct {
mock.Mock
}
// IsBlocked provides a mock function with given fields: _a0
func (_m *PeerGater) IsBlocked(_a0 peer.ID) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(peer.ID) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// Update provides a mock function with given fields: _a0, _a1
func (_m *PeerGater) Update(_a0 peer.ID, _a1 float64) {
_m.Called(_a0, _a1)
......
......@@ -3,7 +3,6 @@ package p2p
import (
log "github.com/ethereum/go-ethereum/log"
peer "github.com/libp2p/go-libp2p/core/peer"
slices "golang.org/x/exp/slices"
)
// ConnectionFactor is the factor by which we multiply the connection score.
......@@ -15,6 +14,7 @@ const PeerScoreThreshold = -100
// gater is an internal implementation of the [PeerGater] interface.
type gater struct {
connGater ConnectionGater
blockedMap map[peer.ID]bool
log log.Logger
banEnabled bool
}
......@@ -25,33 +25,51 @@ type gater struct {
type PeerGater interface {
// Update handles a peer score update and blocks/unblocks the peer if necessary.
Update(peer.ID, float64)
// IsBlocked returns true if the given [peer.ID] is blocked.
IsBlocked(peer.ID) bool
}
// NewPeerGater returns a new peer gater.
func NewPeerGater(connGater ConnectionGater, log log.Logger, banEnabled bool) PeerGater {
return &gater{
connGater: connGater,
blockedMap: make(map[peer.ID]bool),
log: log,
banEnabled: banEnabled,
}
}
// IsBlocked returns true if the given [peer.ID] is blocked.
func (s *gater) IsBlocked(peerID peer.ID) bool {
return s.blockedMap[peerID]
}
// setBlocked sets the blocked status of the given [peer.ID].
func (s *gater) setBlocked(peerID peer.ID, blocked bool) {
s.blockedMap[peerID] = blocked
}
// Update handles a peer score update and blocks/unblocks the peer if necessary.
func (s *gater) Update(id peer.ID, score float64) {
// Check if the peer score is below the threshold
// If so, we need to block the peer
if score < PeerScoreThreshold && s.banEnabled {
isAlreadyBlocked := s.IsBlocked(id)
if score < PeerScoreThreshold && s.banEnabled && !isAlreadyBlocked {
s.log.Warn("peer blocking enabled, blocking peer", "id", id.String(), "score", score)
err := s.connGater.BlockPeer(id)
if err != nil {
s.log.Warn("connection gater failed to block peer", "id", id.String(), "err", err)
}
// Set the peer as blocked in the blocked map
s.setBlocked(id, true)
}
// Unblock peers whose score has recovered to an acceptable level
if (score > PeerScoreThreshold) && slices.Contains(s.connGater.ListBlockedPeers(), id) {
if (score > PeerScoreThreshold) && isAlreadyBlocked {
err := s.connGater.UnblockPeer(id)
if err != nil {
s.log.Warn("connection gater failed to unblock peer", "id", id.String(), "err", err)
}
// Set the peer as unblocked in the blocked map
s.setBlocked(id, false)
}
}
......@@ -37,30 +37,59 @@ func (testSuite *PeerGaterTestSuite) TestPeerScoreConstants() {
}
// TestPeerGaterUpdate tests the peer gater update hook.
func (testSuite *PeerGaterTestSuite) TestPeerGaterUpdate() {
func (testSuite *PeerGaterTestSuite) TestPeerGater_UpdateBansPeers() {
gater := p2p.NewPeerGater(
testSuite.mockGater,
testSuite.logger,
true,
)
// Return an empty list of already blocked peers
testSuite.mockGater.On("ListBlockedPeers").Return([]peer.ID{}).Once()
// Mock a connection gater peer block call
// Since the peer score is below the [PeerScoreThreshold] of -100,
// the [BlockPeer] method should be called
testSuite.mockGater.On("BlockPeer", peer.ID("peer1")).Return(nil)
testSuite.mockGater.On("BlockPeer", peer.ID("peer1")).Return(nil).Once()
// The peer should initially be unblocked
testSuite.False(gater.IsBlocked(peer.ID("peer1")))
// Apply the peer gater update
gater.Update(peer.ID("peer1"), float64(-100))
gater.Update(peer.ID("peer1"), float64(-101))
// The peer should be considered blocked
testSuite.True(gater.IsBlocked(peer.ID("peer1")))
// Now let's unblock the peer
testSuite.mockGater.On("UnblockPeer", peer.ID("peer1")).Return(nil).Once()
gater.Update(peer.ID("peer1"), float64(0))
// The peer should be considered unblocked
testSuite.False(gater.IsBlocked(peer.ID("peer1")))
}
// TestPeerGaterUpdateNoBanning tests the peer gater update hook without banning set
func (testSuite *PeerGaterTestSuite) TestPeerGaterUpdateNoBanning() {
func (testSuite *PeerGaterTestSuite) TestPeerGater_UpdateNoBanning() {
gater := p2p.NewPeerGater(
testSuite.mockGater,
testSuite.logger,
false,
)
// Return an empty list of already blocked peers
testSuite.mockGater.On("ListBlockedPeers").Return([]peer.ID{})
// Notice: [BlockPeer] should not be called since banning is not enabled
// even though the peer score is way below the [PeerScoreThreshold] of -100
gater.Update(peer.ID("peer1"), float64(-100000))
// The peer should be unblocked
testSuite.False(gater.IsBlocked(peer.ID("peer1")))
// Make sure that if we then "unblock" the peer, nothing happens
gater.Update(peer.ID("peer1"), float64(0))
// The peer should still be unblocked
testSuite.False(gater.IsBlocked(peer.ID("peer1")))
}
package derive
import (
"math/big"
"testing"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)
var (
// ABI encoding helpers
dynBytes, _ = abi.NewType("bytes", "", nil)
address, _ = abi.NewType("address", "", nil)
uint256T, _ = abi.NewType("uint256", "", nil)
addressArgs = abi.Arguments{
{Type: address},
}
bytesArgs = abi.Arguments{
{Type: dynBytes},
}
twoUint256 = abi.Arguments{
{Type: uint256T},
{Type: uint256T},
}
oneUint256 = abi.Arguments{
{Type: uint256T},
}
)
// TestProcessSystemConfigUpdateLogEvent tests the parsing of an event and mutating the
// SystemConfig. The hook will build the ABI encoded data dynamically. All tests create
// a new SystemConfig and apply a log against it and then assert that the mutated system
// config is equal to the defined system config in the test.
func TestProcessSystemConfigUpdateLogEvent(t *testing.T) {
tests := []struct {
name string
log *types.Log
config eth.SystemConfig
hook func(*testing.T, *types.Log) *types.Log
err bool
}{
{
// The log data is ignored by consensus and no modifications to the
// system config occur.
name: "SystemConfigUpdateUnsafeBlockSigner",
log: &types.Log{
Topics: []common.Hash{
ConfigUpdateEventABIHash,
ConfigUpdateEventVersion0,
SystemConfigUpdateUnsafeBlockSigner,
},
},
hook: func(t *testing.T, log *types.Log) *types.Log {
addr := common.Address{}
data, err := addressArgs.Pack(&addr)
require.NoError(t, err)
log.Data = data
return log
},
config: eth.SystemConfig{},
err: false,
},
{
// The batcher address should be updated.
name: "SystemConfigUpdateBatcher",
log: &types.Log{
Topics: []common.Hash{
ConfigUpdateEventABIHash,
ConfigUpdateEventVersion0,
SystemConfigUpdateBatcher,
},
},
hook: func(t *testing.T, log *types.Log) *types.Log {
addr := common.Address{19: 0xaa}
addrData, err := addressArgs.Pack(&addr)
require.NoError(t, err)
data, err := bytesArgs.Pack(addrData)
require.NoError(t, err)
log.Data = data
return log
},
config: eth.SystemConfig{
BatcherAddr: common.Address{19: 0xaa},
},
err: false,
},
{
// The overhead and the scalar should be updated.
name: "SystemConfigUpdateGasConfig",
log: &types.Log{
Topics: []common.Hash{
ConfigUpdateEventABIHash,
ConfigUpdateEventVersion0,
SystemConfigUpdateGasConfig,
},
},
hook: func(t *testing.T, log *types.Log) *types.Log {
overhead := big.NewInt(0xff)
scalar := big.NewInt(0xaa)
numberData, err := twoUint256.Pack(overhead, scalar)
require.NoError(t, err)
data, err := bytesArgs.Pack(numberData)
require.NoError(t, err)
log.Data = data
return log
},
config: eth.SystemConfig{
Overhead: eth.Bytes32{31: 0xff},
Scalar: eth.Bytes32{31: 0xaa},
},
err: false,
},
{
// The gas limit should be updated.
name: "SystemConfigUpdateGasLimit",
log: &types.Log{
Topics: []common.Hash{
ConfigUpdateEventABIHash,
ConfigUpdateEventVersion0,
SystemConfigUpdateGasLimit,
},
},
hook: func(t *testing.T, log *types.Log) *types.Log {
gasLimit := big.NewInt(0xbb)
numberData, err := oneUint256.Pack(gasLimit)
require.NoError(t, err)
data, err := bytesArgs.Pack(numberData)
require.NoError(t, err)
log.Data = data
return log
},
config: eth.SystemConfig{
GasLimit: 0xbb,
},
err: false,
},
{
name: "SystemConfigOneTopic",
log: &types.Log{
Topics: []common.Hash{
ConfigUpdateEventABIHash,
},
},
hook: func(t *testing.T, log *types.Log) *types.Log {
return log
},
config: eth.SystemConfig{},
err: true,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
config := eth.SystemConfig{}
err := ProcessSystemConfigUpdateLogEvent(&config, test.hook(t, test.log))
if test.err {
require.Error(t, err)
} else {
require.NoError(t, err)
}
require.Equal(t, config, test.config)
})
}
}
......@@ -16,4 +16,8 @@ type Config struct {
// SequencerStopped is false when the driver should sequence new blocks.
SequencerStopped bool `json:"sequencer_stopped"`
// SequencerMaxSafeLag is the maximum number of L2 blocks for restricting the distance between L2 safe and unsafe.
// Disabled if 0.
SequencerMaxSafeLag uint64 `json:"sequencer_max_safe_lag"`
}
......@@ -212,8 +212,22 @@ func (s *Driver) eventLoop() {
// And avoid sequencing if the derivation pipeline indicates the engine is not ready.
if s.driverConfig.SequencerEnabled && !s.driverConfig.SequencerStopped &&
s.l1State.L1Head() != (eth.L1BlockRef{}) && s.derivation.EngineReady() {
if s.driverConfig.SequencerMaxSafeLag > 0 && s.derivation.SafeL2Head().Number+s.driverConfig.SequencerMaxSafeLag <= s.derivation.UnsafeL2Head().Number {
// If the safe head has fallen behind by a significant number of blocks, delay creating new blocks
// until the safe lag is below SequencerMaxSafeLag.
if sequencerCh != nil {
s.log.Warn(
"Delay creating new block since safe lag exceeds limit",
"safe_l2", s.derivation.SafeL2Head(),
"unsafe_l2", s.derivation.UnsafeL2Head(),
)
sequencerCh = nil
}
} else if s.sequencer.BuildingOnto().ID() != s.derivation.UnsafeL2Head().ID() {
// If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action.
// This may adjust at any time based on fork-choice changes or previous errors.
//
// update sequencer time if the head changed
if s.sequencer.BuildingOnto().ID() != s.derivation.UnsafeL2Head().ID() {
planSequencerAction()
}
} else {
......
......@@ -149,6 +149,7 @@ func NewDriverConfig(ctx *cli.Context) *driver.Config {
SequencerConfDepth: ctx.GlobalUint64(flags.SequencerL1Confs.Name),
SequencerEnabled: ctx.GlobalBool(flags.SequencerEnabledFlag.Name),
SequencerStopped: ctx.GlobalBool(flags.SequencerStoppedFlag.Name),
SequencerMaxSafeLag: ctx.GlobalUint64(flags.SequencerMaxSafeLagFlag.Name),
}
}
......
......@@ -182,6 +182,8 @@ const deployFn: DeployFunction = async (hre) => {
false // do not pause the the OptimismPortal when initializing
)
} else {
// pause the OptimismPortal when initializing
const optimismPortalPaused = true
const tx = await SystemDictator.populateTransaction.updateDynamicConfig(
{
l2OutputOracleStartingBlockNumber:
......@@ -189,9 +191,22 @@ const deployFn: DeployFunction = async (hre) => {
l2OutputOracleStartingTimestamp:
hre.deployConfig.l2OutputOracleStartingTimestamp,
},
true
optimismPortalPaused
)
console.log(`Please update dynamic oracle config...`)
console.log(
JSON.stringify(
{
l2OutputOracleStartingBlockNumber:
hre.deployConfig.l2OutputOracleStartingBlockNumber,
l2OutputOracleStartingTimestamp:
hre.deployConfig.l2OutputOracleStartingTimestamp,
optimismPortalPaused,
},
null,
2
)
)
console.log(`MSD address: ${SystemDictator.address}`)
console.log(`JSON:`)
console.log(jsonifyTransaction(tx))
......
......@@ -23,7 +23,7 @@ make devnet-down # stops the devnet
make devnet-clean # removes the devnet by deleting images and persistent volumes
```
L1 is accessible at `http://localhost:8545`, and L2 is accessible at `http://localhost:8546`.
L1 is accessible at `http://localhost:8545`, and L2 is accessible at `http://localhost:9545`.
Any Ethereum tool - Metamask, `seth`, etc. - can use these endpoints.
Note that you will need to specify the L2 chain ID manually if you use Metamask. The devnet's L2 chain ID is 901.
......@@ -43,7 +43,7 @@ You'll need a `.env` with the following contents:
```bash
L1_PROVIDER_URL=http://localhost:8545
L2_PROVIDER_URL=http://localhost:8546
L2_PROVIDER_URL=http://localhost:9545
PRIVATE_KEY=bf7604d9d3a1c7748642b1b7b05c2bd219c9faa91458b370f85e5a40f3b03af7
```
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment