Commit bda36504 authored by Adrian Sutton's avatar Adrian Sutton Committed by GitHub

op-supervisor: Fix startup from empty database. (#11106)

Add tests for finding the block to start from.
parent b0b8d188
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"math"
"sync/atomic" "sync/atomic"
"time" "time"
...@@ -21,12 +20,18 @@ import ( ...@@ -21,12 +20,18 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
type LogStore interface {
io.Closer
ClosestBlockInfo(blockNum uint64) (uint64, db.TruncatedHash, error)
Rewind(headBlockNum uint64) error
}
type SupervisorBackend struct { type SupervisorBackend struct {
started atomic.Bool started atomic.Bool
logger log.Logger logger log.Logger
chainMonitors []*source.ChainMonitor chainMonitors []*source.ChainMonitor
logDBs []*db.DB logDBs []LogStore
} }
var _ frontend.Backend = (*SupervisorBackend)(nil) var _ frontend.Backend = (*SupervisorBackend)(nil)
...@@ -35,7 +40,7 @@ var _ io.Closer = (*SupervisorBackend)(nil) ...@@ -35,7 +40,7 @@ var _ io.Closer = (*SupervisorBackend)(nil)
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) { func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
chainMonitors := make([]*source.ChainMonitor, len(cfg.L2RPCs)) chainMonitors := make([]*source.ChainMonitor, len(cfg.L2RPCs))
logDBs := make([]*db.DB, len(cfg.L2RPCs)) logDBs := make([]LogStore, len(cfg.L2RPCs))
for i, rpc := range cfg.L2RPCs { for i, rpc := range cfg.L2RPCs {
rpcClient, chainID, err := createRpcClient(ctx, logger, rpc) rpcClient, chainID, err := createRpcClient(ctx, logger, rpc)
if err != nil { if err != nil {
...@@ -52,18 +57,9 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg ...@@ -52,18 +57,9 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
} }
logDBs[i] = logDB logDBs[i] = logDB
// Get the last checkpoint that was written then Rewind the db block, err := Resume(logDB)
// to the block prior to that block and start from there.
// Guarantees we will always roll back at least one block
// so we know we're always starting from a fully written block.
checkPointBlock, _, err := logDB.ClosestBlockInfo(math.MaxUint64)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get block from checkpoint: %w", err) return nil, err
}
block := checkPointBlock - 1
err = logDB.Rewind(block)
if err != nil {
return nil, fmt.Errorf("failed to 'Rewind' the database: %w", err)
} }
monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, rpcClient, logDB, block) monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, rpcClient, logDB, block)
if err != nil { if err != nil {
......
package backend
import (
"fmt"
"io"
"testing"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/stretchr/testify/require"
)
func TestRecover(t *testing.T) {
tests := []struct {
name string
stubDB *stubLogStore
expectedBlockNum uint64
expectRewoundTo uint64
}{
{
name: "emptydb",
stubDB: &stubLogStore{closestBlockErr: fmt.Errorf("no entries: %w", io.EOF)},
expectedBlockNum: 0,
expectRewoundTo: 0,
},
{
name: "genesis",
stubDB: &stubLogStore{},
expectedBlockNum: 0,
expectRewoundTo: 0,
},
{
name: "with_blocks",
stubDB: &stubLogStore{closestBlockNumber: 15},
expectedBlockNum: 14,
expectRewoundTo: 14,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
block, err := Resume(test.stubDB)
require.NoError(t, err)
require.Equal(t, test.expectedBlockNum, block)
require.Equal(t, test.expectRewoundTo, test.stubDB.rewoundTo)
})
}
}
type stubLogStore struct {
closestBlockNumber uint64
closestBlockErr error
rewoundTo uint64
}
func (s *stubLogStore) Close() error {
return nil
}
func (s *stubLogStore) ClosestBlockInfo(blockNum uint64) (uint64, db.TruncatedHash, error) {
if s.closestBlockErr != nil {
return 0, db.TruncatedHash{}, s.closestBlockErr
}
return s.closestBlockNumber, db.TruncatedHash{}, nil
}
func (s *stubLogStore) Rewind(headBlockNum uint64) error {
s.rewoundTo = headBlockNum
return nil
}
package backend
import (
"errors"
"fmt"
"io"
"math"
)
// Resume prepares the given LogStore to resume recording events.
// It returns the block number of the last block that is guaranteed to have been fully recorded to the database
// and rewinds the database to ensure it can resume recording from the first log of the next block.
func Resume(logDB LogStore) (uint64, error) {
// Get the last checkpoint that was written then Rewind the db
// to the block prior to that block and start from there.
// Guarantees we will always roll back at least one block
// so we know we're always starting from a fully written block.
checkPointBlock, _, err := logDB.ClosestBlockInfo(math.MaxUint64)
if errors.Is(err, io.EOF) {
// No blocks recorded in the database, start from genesis
return 0, nil
} else if err != nil {
return 0, fmt.Errorf("failed to get block from checkpoint: %w", err)
}
if checkPointBlock == 0 {
return 0, nil
}
block := checkPointBlock - 1
err = logDB.Rewind(block)
if err != nil {
return 0, fmt.Errorf("failed to 'Rewind' the database: %w", err)
}
return block, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment