Commit a78c074e authored by mergify[bot], committed by GitHub

Merge branch 'develop' into dependabot/npm_and_yarn/eslint-8.50.0

parents cabc86df fde87ff8
......@@ -25,35 +25,3 @@ func Clamp(start, end *big.Int, size uint64) *big.Int {
func Matcher(num int64) func(*big.Int) bool {
return func(bi *big.Int) bool { return bi.Int64() == num }
}
type Range struct {
Start *big.Int
End *big.Int
}
// Grouped returns a slice of inclusive ranges covering (start, end),
// each capped to the supplied size.
func Grouped(start, end *big.Int, size uint64) []Range {
if end.Cmp(start) < 0 || size == 0 {
return nil
}
bigMaxDiff := big.NewInt(int64(size - 1))
groups := []Range{}
for start.Cmp(end) <= 0 {
diff := new(big.Int).Sub(end, start)
switch {
case diff.Uint64()+1 <= size:
// re-use allocated diff as the next start
groups = append(groups, Range{start, end})
start = diff.Add(end, One)
default:
// re-use allocated diff as the next start
end := new(big.Int).Add(start, bigMaxDiff)
groups = append(groups, Range{start, end})
start = diff.Add(end, One)
}
}
return groups
}
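For reference, the `Grouped` helper removed here chunked an inclusive range into size-capped sub-ranges; the bounded search now lives in `LatestObservedEpoch` instead. A minimal sketch of the removed helper's behavior (import path taken from this diff; the printed output mirrors the assertions in TestGrouped below):

package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum-optimism/optimism/indexer/bigint"
)

func main() {
	// Split the inclusive range [1, 5] into chunks of at most 3 blocks.
	for _, r := range bigint.Grouped(big.NewInt(1), big.NewInt(5), 3) {
		fmt.Printf("[%s, %s]\n", r.Start, r.End) // prints [1, 3] then [4, 5]
	}
}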
......@@ -27,46 +27,3 @@ func TestClamp(t *testing.T) {
require.False(t, end == result)
require.Equal(t, uint64(5), result.Uint64())
}
func TestGrouped(t *testing.T) {
// base cases
require.Nil(t, Grouped(One, Zero, 1))
require.Nil(t, Grouped(Zero, One, 0))
// Same Start/End
group := Grouped(One, One, 1)
require.Len(t, group, 1)
require.Equal(t, One, group[0].Start)
require.Equal(t, One, group[0].End)
Three, Five := big.NewInt(3), big.NewInt(5)
// One at a time
group = Grouped(One, Three, 1)
require.Equal(t, One, group[0].End)
require.Equal(t, int64(1), group[0].End.Int64())
require.Equal(t, int64(2), group[1].Start.Int64())
require.Equal(t, int64(2), group[1].End.Int64())
require.Equal(t, int64(3), group[2].Start.Int64())
require.Equal(t, int64(3), group[2].End.Int64())
// Split groups
group = Grouped(One, Five, 3)
require.Len(t, group, 2)
require.Equal(t, One, group[0].Start)
require.Equal(t, int64(3), group[0].End.Int64())
require.Equal(t, int64(4), group[1].Start.Int64())
require.Equal(t, Five, group[1].End)
// Encompasses the range
group = Grouped(One, Five, 5)
require.Len(t, group, 1)
require.Equal(t, One, group[0].Start, Zero)
require.Equal(t, Five, group[0].End)
// Size larger than the entire range
group = Grouped(One, Five, 100)
require.Len(t, group, 1)
require.Equal(t, One, group[0].Start, Zero)
require.Equal(t, Five, group[0].End)
}
......@@ -2,8 +2,10 @@ package database
import (
"errors"
"fmt"
"math/big"
"github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
......@@ -51,7 +53,7 @@ type BlocksView interface {
L2BlockHeaderWithFilter(BlockHeader) (*L2BlockHeader, error)
L2LatestBlockHeader() (*L2BlockHeader, error)
LatestEpoch() (*Epoch, error)
LatestObservedEpoch(*big.Int, uint64) (*Epoch, error)
}
type BlocksDB interface {
......@@ -155,36 +157,74 @@ type Epoch struct {
L2BlockHeader L2BlockHeader `gorm:"embedded"`
}
// LatestEpoch return the latest epoch, seen on L1 & L2. In other words
// this returns the latest indexed L1 block that has a corresponding
// indexed L2 block with a matching L1Origin (equal timestamps).
// LatestObservedEpoch returns the marker for the latest epoch, observed on both L1 & L2, within
// the specified bounds. In other words, this returns the latest indexed L1 block that has
// a corresponding indexed L2 block with a matching L1Origin (equal timestamps).
//
// If `fromL1Height` (inclusive) is not specified, the search starts from genesis; if
// `maxL1Range == 0`, it continues all the way to the latest indexed heights.
//
// For more, see the protocol spec:
// - https://github.com/ethereum-optimism/optimism/blob/develop/specs/derivation.md
func (db *blocksDB) LatestEpoch() (*Epoch, error) {
latestL1Header, err := db.L1LatestBlockHeader()
if err != nil {
return nil, err
} else if latestL1Header == nil {
return nil, nil
func (db *blocksDB) LatestObservedEpoch(fromL1Height *big.Int, maxL1Range uint64) (*Epoch, error) {
// We use timestamps since that translates to both L1 & L2
var fromTimestamp, toTimestamp uint64
if fromL1Height == nil {
fromL1Height = bigint.Zero
}
latestL2Header, err := db.L2LatestBlockHeader()
if err != nil {
return nil, err
} else if latestL2Header == nil {
return nil, nil
// Lower Bound (the default `fromTimestamp = 0` suffices to represent genesis)
if fromL1Height.BitLen() > 0 {
var header L1BlockHeader
result := db.gorm.Where("number = ?", fromL1Height).Take(&header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
fromTimestamp = header.Timestamp
}
minTime := latestL1Header.Timestamp
if latestL2Header.Timestamp < minTime {
minTime = latestL2Header.Timestamp
// Upper Bound (lowest timestamp indexed between L1/L2 bounded by `maxL1Range`)
{
l1QueryFilter := fmt.Sprintf("timestamp >= %d", fromTimestamp)
if maxL1Range > 0 {
maxHeight := new(big.Int).Add(fromL1Height, big.NewInt(int64(maxL1Range)))
l1QueryFilter = fmt.Sprintf("%s AND number <= %d", l1QueryFilter, maxHeight)
}
var l1Header L1BlockHeader
result := db.gorm.Where(l1QueryFilter).Order("timestamp DESC").Take(&l1Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
toTimestamp = l1Header.Timestamp
var l2Header L2BlockHeader
result = db.gorm.Where("timestamp <= ?", toTimestamp).Order("timestamp DESC").Take(&l2Header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
if l2Header.Timestamp < toTimestamp {
toTimestamp = l2Header.Timestamp
}
}
// This is a faster query than doing an INNER JOIN between l1_block_headers and l2_block_headers
// which requires a full table scan to compute the resulting table.
l1Query := db.gorm.Table("l1_block_headers").Where("timestamp <= ?", minTime)
l2Query := db.gorm.Table("l2_block_headers").Where("timestamp <= ?", minTime)
// Search for the latest indexed epoch within range. This is a faster query than doing an INNER JOIN between
// l1_block_headers and l2_block_headers which requires a full table scan to compute the resulting table.
l1Query := db.gorm.Table("l1_block_headers").Where("timestamp >= ? AND timestamp <= ?", fromTimestamp, toTimestamp)
l2Query := db.gorm.Table("l2_block_headers").Where("timestamp >= ? AND timestamp <= ?", fromTimestamp, toTimestamp)
query := db.gorm.Raw(`SELECT * FROM (?) AS l1_block_headers, (?) AS l2_block_headers
WHERE l1_block_headers.timestamp = l2_block_headers.timestamp
ORDER BY l2_block_headers.number DESC LIMIT 1`, l1Query, l2Query)
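For orientation, a hedged sketch of how a caller would drive this query; the nil-means-genesis behavior and the 10k cap mirror the bridge processor change further down (types and the `Blocks` field are taken from this diff):

package sketch

import (
	"math/big"

	"github.com/ethereum-optimism/optimism/indexer/database"
)

// latestObservedEpoch shows the intended call pattern: pass the last processed
// L1 header number (nil to start from genesis) plus a cap on how many L1 blocks
// to scan past that height. A nil Epoch with a nil error means nothing has been
// observed on both chains within those bounds yet.
func latestObservedEpoch(db *database.DB, lastL1Number *big.Int) (*database.Epoch, error) {
	return db.Blocks.LatestObservedEpoch(lastL1Number, 10_000)
}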
......
package database
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/mock"
......@@ -51,7 +53,7 @@ func (m *MockBlocksView) L2LatestBlockHeader() (*L2BlockHeader, error) {
return args.Get(0).(*L2BlockHeader), args.Error(1)
}
func (m *MockBlocksView) LatestEpoch() (*Epoch, error) {
func (m *MockBlocksView) LatestObservedEpoch(*big.Int, uint64) (*Epoch, error) {
args := m.Called()
return args.Get(0).(*Epoch), args.Error(1)
}
......
......@@ -41,21 +41,24 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
dbUser := os.Getenv("DB_USER")
dbName := setupTestDatabase(t)
// Discard the Global Logger as each component
// has its own configured logger
// Rollup System Configuration. Unless specified,
// omit logs emitted by the various components. Maybe
// we can eventually dump these logs to a temp file
log.Root().SetHandler(log.DiscardHandler())
// Rollup System Configuration and Start
opCfg := op_e2e.DefaultSystemConfig(t)
opCfg.DeployConfig.FinalizationPeriodSeconds = 2
if len(os.Getenv("ENABLE_ROLLUP_LOGS")) == 0 {
t.Log("set env 'ENABLE_ROLLUP_LOGS' to show rollup logs")
for name, logger := range opCfg.Loggers {
t.Logf("discarding logs for %s", name)
logger.SetHandler(log.DiscardHandler())
}
}
// Rollup Start
opSys, err := opCfg.Start(t)
require.NoError(t, err)
t.Cleanup(func() { opSys.Close() })
// E2E tests can run on the order of magnitude of minutes. Once
// the system is running, mark this test for Parallel execution
t.Parallel()
// Indexer Configuration and Start
indexerCfg := config.Config{
DB: config.DBConfig{
......@@ -86,8 +89,14 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
MetricsServer: config.ServerConfig{Host: "127.0.0.1", Port: 0},
}
// Emit debug log levels
db, err := database.NewDB(testlog.Logger(t, log.LvlDebug).New("role", "db"), indexerCfg.DB)
// E2E tests can run on the order of magnitude of minutes. Once
// the system is running, mark this test for Parallel execution
t.Parallel()
// provide a DB for the unit test. disable logging
silentLog := testlog.Logger(t, log.LvlInfo)
silentLog.SetHandler(log.DiscardHandler())
db, err := database.NewDB(silentLog, indexerCfg.DB)
require.NoError(t, err)
t.Cleanup(func() { db.Close() })
......@@ -138,7 +147,6 @@ func setupTestDatabase(t *testing.T) string {
User: user,
Password: "",
}
// NewDB will create the database schema
silentLog := log.New()
silentLog.SetHandler(log.DiscardHandler())
......
......@@ -67,6 +67,8 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
// sequencing epoch and corresponding L1 origin that has also been indexed
// serves as this shared marker.
// Fire off independently on startup to check for
// new data or if we've indexed new L1 data.
l1EtlUpdates := b.l1Etl.Notify()
startup := make(chan interface{}, 1)
startup <- nil
......@@ -78,32 +80,45 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
b.log.Info("stopping bridge processor")
return nil
// Fire off independently on startup to check for any
// new data or if we've indexed new L1 data.
// Tickers
case <-startup:
case <-l1EtlUpdates:
}
latestEpoch, err := b.db.Blocks.LatestEpoch()
// In the event that we have a large number of un-observed epochs, we cap the epoch
// search at 10k epochs. If this turns out to be a bottleneck, we can parallelize the
// processing of epochs to significantly speed up sync times.
maxEpochRange := uint64(10_000)
var lastEpoch *big.Int
if b.LatestL1Header != nil {
lastEpoch = b.LatestL1Header.Number
}
latestEpoch, err := b.db.Blocks.LatestObservedEpoch(lastEpoch, maxEpochRange)
if err != nil {
return err
} else if latestEpoch == nil {
if b.LatestL1Header != nil || b.LatestL2Header != nil {
// Once we have some indexed state `latestEpoch` can never return nil
b.log.Error("bridge events indexed, but no indexed epoch returned", "latest_bridge_l1_block_number", b.LatestL1Header.Number)
return errors.New("bridge events indexed, but no indexed epoch returned")
// Once we have some indexed state, `latestEpoch != nil`, since `LatestObservedEpoch` includes the last provided epoch in its search.
b.log.Error("bridge events indexed, but no observed epoch returned", "latest_bridge_l1_block_number", b.LatestL1Header.Number)
return errors.New("bridge events indexed, but no observed epoch returned")
}
b.log.Warn("no indexed epochs available. waiting...")
b.log.Warn("no observed epochs available. waiting...")
continue
}
// Integrity Checks
if b.LatestL1Header != nil && latestEpoch.L1BlockHeader.Hash == b.LatestL1Header.Hash() {
b.log.Warn("all available epochs indexed", "latest_bridge_l1_block_number", b.LatestL1Header.Number)
continue
}
// Integrity Checks
genesisL1Height := big.NewInt(int64(b.chainConfig.L1StartingHeight))
if latestEpoch.L1BlockHeader.Number.Cmp(genesisL1Height) < 0 {
b.log.Error("L1 epoch less than starting L1 height observed", "l1_starting_number", genesisL1Height, "latest_epoch_number", latestEpoch.L1BlockHeader.Number)
return errors.New("L1 epoch less than starting L1 height observed")
}
if b.LatestL1Header != nil && latestEpoch.L1BlockHeader.Number.Cmp(b.LatestL1Header.Number) <= 0 {
b.log.Error("decreasing l1 block height observed", "latest_bridge_l1_block_number", b.LatestL1Header.Number, "latest_epoch_number", latestEpoch.L1BlockHeader.Number)
return errors.New("decreasing l1 block heght observed")
......@@ -116,7 +131,7 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
// Process Bridge Events
toL1Height, toL2Height := latestEpoch.L1BlockHeader.Number, latestEpoch.L2BlockHeader.Number
fromL1Height, fromL2Height := big.NewInt(int64(b.chainConfig.L1StartingHeight)), bigint.Zero
fromL1Height, fromL2Height := genesisL1Height, bigint.Zero
if b.LatestL1Header != nil {
fromL1Height = new(big.Int).Add(b.LatestL1Header.Number, bigint.One)
}
......@@ -128,59 +143,40 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
l2BedrockStartingHeight := big.NewInt(int64(b.chainConfig.L2BedrockStartingHeight))
batchLog := b.log.New("epoch_start_number", fromL1Height, "epoch_end_number", toL1Height)
batchLog.Info("unobserved epochs")
err = b.db.Transaction(func(tx *database.DB) error {
// In the event where we have a large number of un-observed blocks, group the block range
// on the order of 10k blocks at a time. If this turns out to be a bottleneck, we can
// parallelize these operations
maxBlockRange := uint64(10_000)
batchLog.Info("unobserved epochs", "latest_l1_block_number", fromL1Height, "latest_l2_block_number", fromL2Height)
if err := b.db.Transaction(func(tx *database.DB) error {
l1BridgeLog := b.log.New("bridge", "l1")
l2BridgeLog := b.log.New("bridge", "l2")
// FOR OP-MAINNET, OP-GOERLI ONLY! Specially handle the existence of pre-bedrock blocks
if l1BedrockStartingHeight.Cmp(fromL1Height) > 0 {
l1BridgeLog := l1BridgeLog.New("mode", "legacy")
l2BridgeLog := l2BridgeLog.New("mode", "legacy")
legacyFromL1Height, legacyToL1Height := fromL1Height, toL1Height
legacyFromL2Height, legacyToL2Height := fromL2Height, toL2Height
if l1BedrockStartingHeight.Cmp(toL1Height) <= 0 {
legacyToL1Height = new(big.Int).Sub(l1BedrockStartingHeight, big.NewInt(1))
legacyToL2Height = new(big.Int).Sub(l2BedrockStartingHeight, big.NewInt(1))
legacyToL1Height = new(big.Int).Sub(l1BedrockStartingHeight, bigint.One)
legacyToL2Height = new(big.Int).Sub(l2BedrockStartingHeight, bigint.One)
}
l1BridgeLog = l1BridgeLog.New("mode", "legacy", "from_l1_block_number", legacyFromL1Height, "to_l1_block_number", legacyToL1Height)
l1BridgeLog.Info("scanning for bridge events")
l2BridgeLog = l2BridgeLog.New("mode", "legacy", "from_l2_block_number", legacyFromL2Height, "to_l2_block_number", legacyToL2Height)
l2BridgeLog.Info("scanning for bridge events")
// First, find all possible initiated bridge events
l1BlockGroups := bigint.Grouped(legacyFromL1Height, legacyToL1Height, maxBlockRange)
l2BlockGroups := bigint.Grouped(legacyFromL2Height, legacyToL2Height, maxBlockRange)
for _, group := range l1BlockGroups {
log := l1BridgeLog.New("from_l1_block_number", group.Start, "to_l1_block_number", group.End)
log.Info("scanning for initiated bridge events")
if err := bridge.LegacyL1ProcessInitiatedBridgeEvents(log, tx, b.chainConfig.L1Contracts, group.Start, group.End); err != nil {
return err
}
if err := bridge.LegacyL1ProcessInitiatedBridgeEvents(l1BridgeLog, tx, b.chainConfig.L1Contracts, legacyFromL1Height, legacyToL1Height); err != nil {
return err
}
for _, group := range l2BlockGroups {
log := l2BridgeLog.New("from_l2_block_number", group.Start, "to_l2_block_number", group.End)
log.Info("scanning for initiated bridge events")
if err := bridge.LegacyL2ProcessInitiatedBridgeEvents(log, tx, b.chainConfig.L2Contracts, group.Start, group.End); err != nil {
return err
}
if err := bridge.LegacyL2ProcessInitiatedBridgeEvents(l2BridgeLog, tx, b.chainConfig.L2Contracts, legacyFromL2Height, legacyToL2Height); err != nil {
return err
}
// Now that all initiated events have been indexed, all finalization events are guaranteed to find their counterparts.
for _, group := range l1BlockGroups {
log := l1BridgeLog.New("from_l1_block_number", group.Start, "to_l1_block_number", group.End)
log.Info("scanning for finalized bridge events")
if err := bridge.LegacyL1ProcessFinalizedBridgeEvents(log, tx, b.l1Etl.EthClient, b.chainConfig.L1Contracts, group.Start, group.End); err != nil {
return err
}
if err := bridge.LegacyL1ProcessFinalizedBridgeEvents(l1BridgeLog, tx, b.l1Etl.EthClient, b.chainConfig.L1Contracts, legacyFromL1Height, legacyToL1Height); err != nil {
return err
}
for _, group := range l2BlockGroups {
log := l2BridgeLog.New("from_l2_block_number", group.Start, "to_l2_block_number", group.End)
log.Info("scanning for finalized bridge events")
if err := bridge.LegacyL2ProcessFinalizedBridgeEvents(log, tx, b.chainConfig.L2Contracts, group.Start, group.End); err != nil {
return err
}
if err := bridge.LegacyL2ProcessFinalizedBridgeEvents(l2BridgeLog, tx, b.chainConfig.L2Contracts, legacyFromL2Height, legacyToL2Height); err != nil {
return err
}
if legacyToL1Height.Cmp(toL1Height) == 0 {
......@@ -193,52 +189,39 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
fromL2Height = l2BedrockStartingHeight
}
l1BridgeLog = l1BridgeLog.New("from_l1_block_number", fromL1Height, "to_l1_block_number", toL1Height)
l1BridgeLog.Info("scanning for bridge events")
l2BridgeLog = l2BridgeLog.New("from_l2_block_number", fromL2Height, "to_l2_block_number", toL2Height)
l2BridgeLog.Info("scanning for bridge events")
// First, find all possible initiated bridge events
l1BlockGroups := bigint.Grouped(fromL1Height, toL1Height, maxBlockRange)
l2BlockGroups := bigint.Grouped(fromL2Height, toL2Height, maxBlockRange)
for _, group := range l1BlockGroups {
log := l1BridgeLog.New("from_block_number", group.Start, "to_block_number", group.End)
log.Info("scanning for initiated bridge events")
if err := bridge.L1ProcessInitiatedBridgeEvents(log, tx, b.chainConfig.L1Contracts, group.Start, group.End); err != nil {
return err
}
if err := bridge.L1ProcessInitiatedBridgeEvents(l1BridgeLog, tx, b.chainConfig.L1Contracts, fromL1Height, toL1Height); err != nil {
return err
}
for _, group := range l2BlockGroups {
log := l2BridgeLog.New("from_block_number", group.Start, "to_block_number", group.End)
log.Info("scanning for initiated bridge events")
if err := bridge.L2ProcessInitiatedBridgeEvents(log, tx, b.chainConfig.L2Contracts, group.Start, group.End); err != nil {
return err
}
if err := bridge.L2ProcessInitiatedBridgeEvents(l2BridgeLog, tx, b.chainConfig.L2Contracts, fromL2Height, toL2Height); err != nil {
return err
}
// Now all finalization events can find their counterpart.
for _, group := range l1BlockGroups {
log := l1BridgeLog.New("from_block_number", group.Start, "to_block_number", group.End)
log.Info("scanning for finalized bridge events")
if err := bridge.L1ProcessFinalizedBridgeEvents(log, tx, b.chainConfig.L1Contracts, group.Start, group.End); err != nil {
return err
}
if err := bridge.L1ProcessFinalizedBridgeEvents(l1BridgeLog, tx, b.chainConfig.L1Contracts, fromL1Height, toL1Height); err != nil {
return err
}
for _, group := range l2BlockGroups {
log := l2BridgeLog.New("from_block_number", group.Start, "to_block_number", group.End)
log.Info("scanning for finalized bridge events")
if err := bridge.L2ProcessFinalizedBridgeEvents(log, tx, b.chainConfig.L2Contracts, group.Start, group.End); err != nil {
return err
}
if err := bridge.L2ProcessFinalizedBridgeEvents(l2BridgeLog, tx, b.chainConfig.L2Contracts, fromL2Height, toL2Height); err != nil {
return err
}
// a-ok
return nil
})
if err != nil {
}); err != nil {
// Try again on a subsequent interval
batchLog.Error("failed to index bridge events", "err", err)
} else {
batchLog.Info("indexed bridge events", "latest_l1_block_number", toL1Height, "latest_l2_block_number", toL2Height)
b.LatestL1Header = latestEpoch.L1BlockHeader.RLPHeader.Header()
b.LatestL2Header = latestEpoch.L2BlockHeader.RLPHeader.Header()
return err
}
batchLog.Info("indexed bridge events", "latest_l1_block_number", toL1Height, "latest_l2_block_number", toL2Height)
b.LatestL1Header = latestEpoch.L1BlockHeader.RLPHeader.Header()
b.LatestL2Header = latestEpoch.L2BlockHeader.RLPHeader.Header()
}
}
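As a self-contained illustration of the wake-up pattern used by this loop: the buffered `startup` channel is pre-filled so the first pass runs immediately, after which only ETL notifications trigger further passes. The channel names below are illustrative stand-ins:

package main

import (
	"fmt"
	"time"
)

func main() {
	notify := make(chan struct{}, 1) // stand-in for b.l1Etl.Notify()
	startup := make(chan interface{}, 1)
	startup <- nil // pre-filled: guarantees one pass on startup

	go func() {
		time.Sleep(100 * time.Millisecond)
		notify <- struct{}{} // simulate newly indexed L1 data
	}()

	for i := 0; i < 2; i++ {
		select {
		case <-startup:
			fmt.Println("initial pass on startup")
		case <-notify:
			fmt.Println("pass triggered by new L1 data")
		}
	}
}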
......@@ -58,22 +58,27 @@ func TestMonitorGames(t *testing.T) {
go func() {
headerNotSent := true
waitErr := wait.For(context.Background(), 100*time.Millisecond, func() (bool, error) {
for {
if len(sched.scheduled) >= 1 {
return true, nil
break
}
if mockHeadSource.sub == nil {
return false, nil
continue
}
if headerNotSent {
mockHeadSource.sub.headers <- &ethtypes.Header{
select {
case mockHeadSource.sub.headers <- &ethtypes.Header{
Number: big.NewInt(1),
}:
headerNotSent = false
case <-ctx.Done():
break
default:
}
headerNotSent = false
}
return false, nil
})
require.NoError(t, waitErr)
// Just to avoid a tight loop
time.Sleep(100 * time.Millisecond)
}
mockHeadSource.err = fmt.Errorf("eth subscribe test error")
cancel()
}()
......@@ -94,27 +99,29 @@ func TestMonitorGames(t *testing.T) {
defer cancel()
go func() {
headerNotSent := true
waitErr := wait.For(context.Background(), 100*time.Millisecond, func() (bool, error) {
return mockHeadSource.sub != nil, nil
})
require.NoError(t, waitErr)
mockHeadSource.sub.errChan <- fmt.Errorf("test error")
waitErr = wait.For(context.Background(), 100*time.Millisecond, func() (bool, error) {
for {
if len(sched.scheduled) >= 1 {
return true, nil
break
}
if mockHeadSource.sub == nil {
return false, nil
continue
}
if headerNotSent {
mockHeadSource.sub.headers <- &ethtypes.Header{
Number: big.NewInt(1),
}
headerNotSent = false
select {
case mockHeadSource.sub.headers <- &ethtypes.Header{
Number: big.NewInt(1),
}:
case <-ctx.Done():
break
default:
}
return false, nil
})
// Just to avoid a tight loop
time.Sleep(100 * time.Millisecond)
}
require.NoError(t, waitErr)
mockHeadSource.err = fmt.Errorf("eth subscribe test error")
cancel()
......@@ -122,7 +129,7 @@ func TestMonitorGames(t *testing.T) {
err := monitor.MonitorGames(ctx)
require.NoError(t, err)
require.Len(t, sched.scheduled, 1)
require.NotEmpty(t, sched.scheduled) // We might get more than one update scheduled.
require.Equal(t, []common.Address{addr1, addr2}, sched.scheduled[0])
})
}
......
......@@ -504,7 +504,7 @@ func TestBigL2Txs(gt *testing.T) {
if miner.l1GasPool.Gas() < tx.Gas() { // fill the L1 block with batcher txs until we run out of gas
break
}
log.Info("including batcher tx", "nonce", tx)
log.Info("including batcher tx", "nonce", tx.Nonce())
miner.IncludeTx(t, tx)
txs = txs[1:]
}
......
package actions
import (
"context"
"errors"
"time"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait"
"github.com/ethereum-optimism/optimism/op-program/client/l2/engineapi"
"github.com/stretchr/testify/require"
......@@ -176,12 +179,22 @@ func (e *L2Engine) ActL2IncludeTx(from common.Address) Action {
return
}
i := e.engineApi.PendingIndices(from)
txs, q := e.eth.TxPool().ContentFrom(from)
require.Greaterf(t, uint64(len(txs)), i,
"no pending txs from %s, and have %d unprocessable queued txs from this account", from, len(q))
var i uint64
var txs []*types.Transaction
var q []*types.Transaction
// Wait for the tx to be in the pending tx queue
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := wait.For(ctx, time.Second, func() (bool, error) {
i = e.engineApi.PendingIndices(from)
txs, q = e.eth.TxPool().ContentFrom(from)
return uint64(len(txs)) > i, nil
})
require.NoError(t, err,
"no pending txs from %s, and have %d unprocessable queued txs from this account: %w", from, len(q), err)
tx := txs[i]
err := e.engineApi.IncludeTx(tx, from)
err = e.engineApi.IncludeTx(tx, from)
if errors.Is(err, engineapi.ErrNotBuildingBlock) {
t.InvalidAction(err.Error())
} else if errors.Is(err, engineapi.ErrUsesTooMuchGas) {
......
......@@ -197,7 +197,7 @@ func (h *Helper) WaitForGameDataDeletion(ctx context.Context, games ...GameAddr)
if err != nil {
return false, fmt.Errorf("failed to check dir %v is deleted: %w", dir, err)
}
h.t.Errorf("Game data directory %v not yet deleted", dir)
h.t.Logf("Game data directory %v not yet deleted", dir)
return false, nil
}
return true, nil
......
package wait
import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/core/types"
)
// BlockCaller is a subset of the [ethclient.Client] interface
// encompassing methods that query for block information.
type BlockCaller interface {
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
BlockNumber(ctx context.Context) (uint64, error)
}
func ForBlock(ctx context.Context, client BlockCaller, n uint64) error {
for {
if err := ctx.Err(); err != nil {
return err
}
height, err := client.BlockNumber(ctx)
if err != nil {
return err
}
if height < n {
time.Sleep(500 * time.Millisecond)
continue
}
break
}
return nil
}
func ForBlockWithTimestamp(ctx context.Context, client BlockCaller, target uint64) error {
_, err := AndGet(ctx, time.Second, func() (uint64, error) {
head, err := client.BlockByNumber(ctx, nil)
if err != nil {
return 0, err
}
return head.Time(), nil
}, func(actual uint64) bool {
return actual >= target
})
return err
}
func ForNextBlock(ctx context.Context, client BlockCaller) error {
current, err := client.BlockNumber(ctx)
// Long timeout so we don't have to care what the block time is. If the test passes this will complete early anyway.
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
if err != nil {
return fmt.Errorf("get starting block number: %w", err)
}
return ForBlock(ctx, client, current+1)
}
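Since *ethclient.Client already exposes BlockNumber and BlockByNumber with exactly these signatures, it satisfies BlockCaller directly. A minimal usage sketch (the endpoint URL is illustrative):

package main

import (
	"context"
	"log"
	"time"

	"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	client, err := ethclient.Dial("http://127.0.0.1:8545") // illustrative endpoint
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	// *ethclient.Client satisfies BlockCaller, so it can be passed straight through.
	if err := wait.ForNextBlock(ctx, client); err != nil {
		log.Fatal(err)
	}
}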
......@@ -69,59 +69,25 @@ func printDebugTrace(ctx context.Context, client *ethclient.Client, txHash commo
fmt.Printf("TxTrace: %v\n", trace)
}
func ForBlock(ctx context.Context, client *ethclient.Client, n uint64) error {
func For(ctx context.Context, rate time.Duration, cb func() (bool, error)) error {
tick := time.NewTicker(rate)
defer tick.Stop()
for {
height, err := client.BlockNumber(ctx)
// Perform the first check before any waiting.
done, err := cb()
if err != nil {
return err
}
if height < n {
time.Sleep(500 * time.Millisecond)
continue
}
break
}
return nil
}
func ForBlockWithTimestamp(ctx context.Context, client *ethclient.Client, target uint64) error {
_, err := AndGet(ctx, time.Second, func() (uint64, error) {
head, err := client.BlockByNumber(ctx, nil)
if err != nil {
return 0, err
if done {
return nil
}
return head.Time(), nil
}, func(actual uint64) bool {
return actual >= target
})
return err
}
func ForNextBlock(ctx context.Context, client *ethclient.Client) error {
current, err := client.BlockNumber(ctx)
if err != nil {
return fmt.Errorf("get starting block number: %w", err)
}
return ForBlock(ctx, client, current+1)
}
func For(ctx context.Context, rate time.Duration, cb func() (bool, error)) error {
tick := time.NewTicker(rate)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-tick.C:
done, err := cb()
if err != nil {
return err
}
if done {
return nil
}
// Allow loop to continue for next retry
}
}
}
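A short usage sketch of the refactored helper; note the callback now runs once immediately before the first tick, so fast conditions return without waiting a full interval:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	start := time.Now()
	// Checked immediately, then once per second until it reports done (or ctx expires).
	err := wait.For(ctx, time.Second, func() (bool, error) {
		return time.Since(start) > 2*time.Second, nil
	})
	fmt.Println("wait.For returned:", err)
}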
......
......@@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait"
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/node"
"github.com/ethereum-optimism/optimism/op-node/sources"
......@@ -34,10 +35,11 @@ func TestStopStartSequencer(t *testing.T) {
require.NoError(t, err)
require.True(t, active, "sequencer should be active")
blockBefore := latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter := latestBlock(t, l2Seq)
require.Greaterf(t, blockAfter, blockBefore, "Chain did not advance")
require.NoError(
t,
wait.ForNextBlock(ctx, l2Seq),
"Chain did not advance after starting sequencer",
)
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
......@@ -50,9 +52,9 @@ func TestStopStartSequencer(t *testing.T) {
require.NoError(t, err)
require.False(t, active, "sequencer should be inactive")
blockBefore = latestBlock(t, l2Seq)
blockBefore := latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter = latestBlock(t, l2Seq)
blockAfter := latestBlock(t, l2Seq)
require.Equal(t, blockAfter, blockBefore, "Chain advanced after stopping sequencer")
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
......@@ -66,10 +68,11 @@ func TestStopStartSequencer(t *testing.T) {
require.NoError(t, err)
require.True(t, active, "sequencer should be active again")
blockBefore = latestBlock(t, l2Seq)
time.Sleep(time.Duration(cfg.DeployConfig.L2BlockTime+1) * time.Second)
blockAfter = latestBlock(t, l2Seq)
require.Greater(t, blockAfter, blockBefore, "Chain did not advance after starting sequencer")
require.NoError(
t,
wait.ForNextBlock(ctx, l2Seq),
"Chain did not advance after starting sequencer",
)
}
func TestPersistSequencerStateWhenChanged(t *testing.T) {
......