Commit 0f9897b0 authored by Matthew Slipper, committed by GitHub

op-e2e: Improve WaitForBlock timeouts (#12627)

Update WaitForBlock to maintain two timeouts: a no-change timeout, which fires if the chain's head does not change within a specified window, and an absolute timeout, which fires if the chain's head has not met or exceeded the specified block by an overall deadline.

These changes should ideally reduce the number of test flakes we're seeing. Everything takes longer when test executors are under load; by maintaining these two timeouts we can provide longer-running tests with more buffer while retaining the ability to fail fast if the chain gets stuck.

As part of this PR I also refactored the wait method to use polling rather than WebSockets. I've found WebSockets to be unreliable in tests.
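For illustration, here is a sketch of how call sites look under the new API. The option names and defaults are taken from the diff below; the wrapper function, its arguments, and the import path are assumptions for the example:

package example

import (
	"math/big"
	"testing"
	"time"

	"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/stretchr/testify/require"
)

// waitExample is a hypothetical helper showing the option-based call pattern.
func waitExample(t *testing.T, l2Seq *ethclient.Client) {
	// With no options, the defaults from the diff apply:
	// a 30s no-change timeout and a 3m absolute timeout.
	_, err := geth.WaitForBlock(big.NewInt(10), l2Seq)
	require.NoError(t, err)

	// Either window can be widened for a slow test while keeping the
	// ability to fail fast if the chain stops advancing.
	_, err = geth.WaitForBlock(big.NewInt(20), l2Seq,
		geth.WithNoChangeTimeout(time.Minute),
		geth.WithAbsoluteTimeout(5*time.Minute),
	)
	require.NoError(t, err)
}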
parent bf28a04e
@@ -291,7 +291,7 @@ func (h *FactoryHelper) WaitForBlock(l2Node string, l2BlockNumber uint64, cfg *G
 	l2Client := h.System.NodeClient(l2Node)
 	if cfg.allowUnsafe {
-		_, err := geth.WaitForBlock(new(big.Int).SetUint64(l2BlockNumber), l2Client, 1*time.Minute)
+		_, err := geth.WaitForBlock(new(big.Int).SetUint64(l2BlockNumber), l2Client, geth.WithAbsoluteTimeout(time.Minute))
 		h.Require.NoErrorf(err, "Block number %v did not become unsafe", l2BlockNumber)
 	} else {
 		_, err := geth.WaitForBlockToBeSafe(new(big.Int).SetUint64(l2BlockNumber), l2Client, 1*time.Minute)
...
@@ -17,7 +17,10 @@ import (
 	"github.com/ethereum/go-ethereum/rpc"
 )
 
-const errStrTxIdxingInProgress = "transaction indexing is in progress"
+const (
+	errStrTxIdxingInProgress = "transaction indexing is in progress"
+	waitForBlockMaxRetries   = 3
+)
 
 // errTimeout represents a timeout
 var errTimeout = errors.New("timeout")
@@ -83,25 +86,80 @@ func WaitForTransaction(hash common.Hash, client *ethclient.Client, timeout time
 	}
 }
 
-func WaitForBlock(number *big.Int, client *ethclient.Client, timeout time.Duration) (*types.Block, error) {
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
-	defer cancel()
-	headChan := make(chan *types.Header, 100)
-	headSub, err := client.SubscribeNewHead(ctx, headChan)
-	if err != nil {
-		return nil, err
-	}
-	defer headSub.Unsubscribe()
-	for {
-		select {
-		case head := <-headChan:
-			if head.Number.Cmp(number) >= 0 {
-				return client.BlockByNumber(ctx, number)
-			}
-		case err := <-headSub.Err():
-			return nil, fmt.Errorf("error in head subscription: %w", err)
-		case <-ctx.Done():
-			return nil, ctx.Err()
-		}
-	}
-}
+type waitForBlockOptions struct {
+	noChangeTimeout time.Duration
+	absoluteTimeout time.Duration
+}
+
+func WithNoChangeTimeout(timeout time.Duration) WaitForBlockOption {
+	return func(o *waitForBlockOptions) {
+		o.noChangeTimeout = timeout
+	}
+}
+
+func WithAbsoluteTimeout(timeout time.Duration) WaitForBlockOption {
+	return func(o *waitForBlockOptions) {
+		o.absoluteTimeout = timeout
+	}
+}
+
+type WaitForBlockOption func(*waitForBlockOptions)
+
+// WaitForBlock waits for the chain to advance to the provided block number. It can be configured
+// with two different timeouts: an absolute timeout, and a no-change timeout. The absolute timeout
+// caps the maximum amount of time this method will run. The no-change timeout returns an error if
+// the block number does not change within that time window. This is useful to bail out early in
+// the event of a stuck chain, but allows things to continue if the chain is still advancing.
+//
+// This function also retries fetch errors up to three times before returning an error, to protect
+// against transient network problems. It uses polling rather than WebSockets.
+func WaitForBlock(number *big.Int, client *ethclient.Client, opts ...WaitForBlockOption) (*types.Block, error) {
+	defaultOpts := &waitForBlockOptions{
+		noChangeTimeout: 30 * time.Second,
+		absoluteTimeout: 3 * time.Minute,
+	}
+	for _, opt := range opts {
+		opt(defaultOpts)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultOpts.absoluteTimeout)
+	defer cancel()
+
+	lastAdvancement := time.Now()
+	lastBlockNumber := big.NewInt(0)
+
+	pollTicker := time.NewTicker(500 * time.Millisecond)
+	defer pollTicker.Stop()
+
+	var errCount int
+
+	for {
+		head, err := client.BlockByNumber(ctx, nil)
+		if err != nil {
+			errCount++
+			if errCount >= waitForBlockMaxRetries {
+				return nil, fmt.Errorf("head fetching exceeded max retries. last error: %w", err)
+			}
+			continue
+		}
+
+		errCount = 0
+
+		if head.Number().Cmp(number) >= 0 {
+			return client.BlockByNumber(ctx, number)
+		}
+
+		if head.Number().Cmp(lastBlockNumber) != 0 {
+			lastBlockNumber = head.Number()
+			lastAdvancement = time.Now()
+		}
+
+		if time.Since(lastAdvancement) > defaultOpts.noChangeTimeout {
+			return nil, fmt.Errorf("block number %d has not changed in %s", lastBlockNumber, defaultOpts.noChangeTimeout)
+		}
+
+		select {
+		case <-pollTicker.C:
+			continue
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		}
+	}
+}
...
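The refactor uses Go's functional options pattern: exported option constructors mutate a private options struct that starts from defaults. A minimal self-contained sketch of the pattern, independent of the op-e2e code:

package main

import (
	"fmt"
	"time"
)

type options struct {
	noChangeTimeout time.Duration
	absoluteTimeout time.Duration
}

// Option mutates the private options struct.
type Option func(*options)

func WithNoChangeTimeout(d time.Duration) Option {
	return func(o *options) { o.noChangeTimeout = d }
}

func WithAbsoluteTimeout(d time.Duration) Option {
	return func(o *options) { o.absoluteTimeout = d }
}

func wait(opts ...Option) {
	// Start from defaults; each option overrides in call order.
	o := &options{noChangeTimeout: 30 * time.Second, absoluteTimeout: 3 * time.Minute}
	for _, opt := range opts {
		opt(o)
	}
	fmt.Printf("no-change: %s, absolute: %s\n", o.noChangeTimeout, o.absoluteTimeout)
}

func main() {
	wait()                                 // defaults
	wait(WithNoChangeTimeout(time.Minute)) // override a single window
}

Because the zero-argument call picks sensible defaults, most call sites in the diff below simply drop their hand-computed durations.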
@@ -43,7 +43,7 @@ func TestBatcherConcurrentAltDARequests(t *testing.T) {
 	l2Seq := sys.NodeClient("sequencer")
 
 	// we wait for numL1TxsExpected L2 blocks to have been produced, just to make sure the sequencer is working properly
-	_, err = geth.WaitForBlock(big.NewInt(numL1TxsExpected), l2Seq, time.Duration(cfg.DeployConfig.L2BlockTime*uint64(numL1TxsExpected))*time.Second)
+	_, err = geth.WaitForBlock(big.NewInt(numL1TxsExpected), l2Seq)
 	require.NoError(t, err, "Waiting for L2 blocks")
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
@@ -59,7 +59,7 @@ func TestBatcherConcurrentAltDARequests(t *testing.T) {
 	// exceed the number of blocks.
 	checkBlocks := 10
 	for i := 0; i < checkBlocks; i++ {
-		block, err := geth.WaitForBlock(big.NewInt(int64(startingL1BlockNum)+int64(i)), l1Client, time.Duration(cfg.DeployConfig.L1BlockTime*2)*time.Second)
+		block, err := geth.WaitForBlock(big.NewInt(int64(startingL1BlockNum)+int64(i)), l1Client)
 		require.NoError(t, err, "Waiting for l1 blocks")
 		// there are possibly other services (proposer/challenger) in the background sending txs
 		// so we only count the batcher txs
...
@@ -29,7 +29,7 @@ func TestBatcherMultiTx(t *testing.T) {
 	l1Client := sys.NodeClient("l1")
 	l2Seq := sys.NodeClient("sequencer")
 
-	_, err = geth.WaitForBlock(big.NewInt(10), l2Seq, time.Duration(cfg.DeployConfig.L2BlockTime*15)*time.Second)
+	_, err = geth.WaitForBlock(big.NewInt(10), l2Seq)
 	require.NoError(t, err, "Waiting for L2 blocks")
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -47,7 +47,7 @@ func TestBatcherMultiTx(t *testing.T) {
 	// possible additional L1 blocks will be created before the batcher starts,
 	// so we wait additional blocks.
 	for i := int64(0); i < 5; i++ {
-		block, err := geth.WaitForBlock(big.NewInt(int64(l1Number)+i), l1Client, time.Duration(cfg.DeployConfig.L1BlockTime*2)*time.Second)
+		block, err := geth.WaitForBlock(big.NewInt(int64(l1Number)+i), l1Client)
 		require.NoError(t, err, "Waiting for l1 blocks")
 		// there are possibly other services (proposer/challenger) in the background sending txs
 		// so we only count the batcher txs
...
@@ -72,7 +72,7 @@ func testStartStopBatcher(t *testing.T, cfgMod func(*e2esys.SystemConfig)) {
 	// wait until the block the tx was first included in shows up in the safe chain on the verifier
 	safeBlockInclusionDuration := time.Duration(6*cfg.DeployConfig.L1BlockTime) * time.Second
-	_, err = geth.WaitForBlock(receipt.BlockNumber, l2Verif, safeBlockInclusionDuration)
+	_, err = geth.WaitForBlock(receipt.BlockNumber, l2Verif)
 	require.NoError(t, err, "Waiting for block on verifier")
 	require.NoError(t, wait.ForProcessingFullBatch(context.Background(), rollupClient))
@@ -111,7 +111,7 @@ func testStartStopBatcher(t *testing.T, cfgMod func(*e2esys.SystemConfig)) {
 	receipt = sendTx()
 
 	// wait until the block the tx was first included in shows up in the safe chain on the verifier
-	_, err = geth.WaitForBlock(receipt.BlockNumber, l2Verif, safeBlockInclusionDuration)
+	_, err = geth.WaitForBlock(receipt.BlockNumber, l2Verif)
 	require.NoError(t, err, "Waiting for block on verifier")
 	require.NoError(t, wait.ForProcessingFullBatch(context.Background(), rollupClient))
...
@@ -690,7 +690,7 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System,
 	}
 
 	l1Client := sys.NodeClient(RoleL1)
-	_, err = geth.WaitForBlock(big.NewInt(2), l1Client, 6*time.Second*time.Duration(cfg.DeployConfig.L1BlockTime))
+	_, err = geth.WaitForBlock(big.NewInt(2), l1Client)
 	if err != nil {
 		return nil, fmt.Errorf("waiting for blocks: %w", err)
 	}
...
@@ -95,7 +95,7 @@ func TestEIP1559Params(t *testing.T) {
 		delta := ((gasTarget - int64(h.GasUsed)) * h.BaseFee.Int64() / gasTarget / int64(expectedDenom))
 		expectedNextFee := h.BaseFee.Int64() - delta
 
-		b, err := geth.WaitForBlock(big.NewInt(h.Number.Int64()+1), l2Seq, txTimeoutDuration)
+		b, err := geth.WaitForBlock(big.NewInt(h.Number.Int64()+1), l2Seq)
 		require.NoError(t, err, "waiting for next L2 block")
 		require.Equal(t, expectedNextFee, b.Header().BaseFee.Int64())
...
@@ -4,7 +4,6 @@ import (
 	"context"
 	"math/big"
 	"testing"
-	"time"
 
 	op_e2e "github.com/ethereum-optimism/optimism/op-e2e"
@@ -90,7 +89,7 @@ func testFees(t *testing.T, cfg e2esys.SystemConfig) {
 	l1 := sys.NodeClient("l1")
 
 	// Wait for first block after genesis. The genesis block has zero L1Block values and will throw off the GPO checks
-	_, err = geth.WaitForBlock(big.NewInt(1), l2Verif, time.Minute)
+	_, err = geth.WaitForBlock(big.NewInt(1), l2Verif)
 	require.NoError(t, err)
 
 	config := sys.L2Genesis().Config
...
@@ -113,9 +113,9 @@ func TestL1InfoContract(t *testing.T) {
 	endVerifBlockNumber := big.NewInt(4)
 	endSeqBlockNumber := big.NewInt(6)
-	endVerifBlock, err := geth.WaitForBlock(endVerifBlockNumber, l2Verif, time.Minute)
+	endVerifBlock, err := geth.WaitForBlock(endVerifBlockNumber, l2Verif)
 	require.Nil(t, err)
-	endSeqBlock, err := geth.WaitForBlock(endSeqBlockNumber, l2Seq, time.Minute)
+	endSeqBlock, err := geth.WaitForBlock(endSeqBlockNumber, l2Seq)
 	require.Nil(t, err)
 
 	seqL1Info, err := bindings.NewL1Block(cfg.L1InfoPredeployAddress, l2Seq)
...
@@ -3,7 +3,6 @@ package p2p
 import (
 	"math/big"
 	"testing"
-	"time"
 
 	op_e2e "github.com/ethereum-optimism/optimism/op-e2e"
@@ -31,7 +30,7 @@ func TestTxGossip(t *testing.T) {
 	geth.ConnectP2P(t, seqClient, verifClient)
 
 	// This prevents the below tx-sending from flaking in CI
-	_, err = geth.WaitForBlock(big.NewInt(10), verifClient, time.Minute)
+	_, err = geth.WaitForBlock(big.NewInt(10), verifClient)
 	require.NoError(t, err)
 
 	// Send a transaction to the verifier and it should be gossiped to the sequencer and included in a block.
...
@@ -40,7 +40,7 @@ func TestL2OutputSubmitterFaultProofs(t *testing.T) {
 	require.Nil(t, err)
 
 	l2Verif := sys.NodeClient("verifier")
-	_, err = geth.WaitForBlock(big.NewInt(6), l2Verif, 10*time.Duration(cfg.DeployConfig.L2BlockTime)*time.Second)
+	_, err = geth.WaitForBlock(big.NewInt(6), l2Verif)
 	require.Nil(t, err)
 
 	timeoutCh := time.After(15 * time.Second)
...
@@ -42,7 +42,7 @@ func TestL2OutputSubmitter(t *testing.T) {
 	// for that block and subsequently reorgs to match what the verifier derives when running the
 	// reconcillation process.
 	l2Verif := sys.NodeClient("verifier")
-	_, err = geth.WaitForBlock(big.NewInt(6), l2Verif, 20*time.Duration(cfg.DeployConfig.L2BlockTime)*time.Second)
+	_, err = geth.WaitForBlock(big.NewInt(6), l2Verif)
 	require.Nil(t, err)
 
 	// Wait for batch submitter to update L2 output oracle.
...
@@ -48,7 +48,7 @@ func TestMissingBatchE2E(t *testing.T) {
 	})
 
 	// Wait until the block it was first included in shows up in the safe chain on the verifier
-	_, err = geth.WaitForBlock(receipt.BlockNumber, l2Verif, time.Duration((sys.RollupConfig.SeqWindowSize+4)*cfg.DeployConfig.L1BlockTime)*time.Second)
+	_, err = geth.WaitForBlock(receipt.BlockNumber, l2Verif)
 	require.Nil(t, err, "Waiting for block on verifier")
 
 	// Assert that the transaction is not found on the verifier
...
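Note that the per-call timeouts removed above are not lost: a test that previously derived its window from the deploy config can restore an equivalent bound through the options. A hypothetical adaptation of the TestBatcherMultiTx call above, with the import path assumed from the repo layout:

package example

import (
	"math/big"
	"time"

	"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth"
	"github.com/ethereum/go-ethereum/ethclient"
)

// waitWithConfigBound is a hypothetical helper: it restores a config-derived
// absolute bound under the new option-based API.
func waitWithConfigBound(l2Seq *ethclient.Client, l2BlockTime uint64) error {
	timeout := time.Duration(l2BlockTime*15) * time.Second
	_, err := geth.WaitForBlock(big.NewInt(10), l2Seq, geth.WithAbsoluteTimeout(timeout))
	return err
}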