Commit d4467a1f authored by Axel Kingsley's avatar Axel Kingsley Committed by GitHub

Add Logging ; Fix Broken Maintenance Context (#11931)

* Add Logging ; Fix Broken Maintenance Context

* Remove unneeded timing ; use Testlogger
parent 95765dfc
......@@ -53,6 +53,9 @@ func TestInteropTrivial(t *testing.T) {
expectedBalance, _ := big.NewInt(0).SetString("10000000000000000000000000", 10)
require.Equal(t, expectedBalance, bobBalance)
// sleep for a bit to allow the chain to start
time.Sleep(30 * time.Second)
// send a tx from Alice to Bob
s2.SendL2Tx(
chainA,
......
......@@ -413,7 +413,7 @@ func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService {
ListenEnabled: false,
},
LogConfig: oplog.CLIConfig{
Level: log.LevelTrace,
Level: log.LevelDebug,
Format: oplog.FormatText,
},
RPC: oprpc.CLIConfig{
......@@ -469,9 +469,10 @@ func (s *interopE2ESystem) prepare(t *testing.T, w worldResourcePaths) {
s.l2s = s.prepareL2s()
// add the L2 RPCs to the supervisor now that the L2s are created
ctx := context.Background()
for _, l2 := range s.l2s {
err := s.SupervisorClient().AddL2RPC(context.Background(), l2.l2Geth.UserRPC().RPC())
require.NoError(s.t, err, "failed to add L2 RPC to supervisor")
err := s.SupervisorClient().AddL2RPC(ctx, l2.l2Geth.UserRPC().RPC())
require.NoError(s.t, err, "failed to add L2 RPC to supervisor", "error", err)
}
}
......
......@@ -21,6 +21,34 @@ func NewSupervisorClient(client client.RPC) *SupervisorClient {
}
}
func (cl *SupervisorClient) Stop(
ctx context.Context,
) error {
var result error
err := cl.client.CallContext(
ctx,
&result,
"admin_stop")
if err != nil {
return fmt.Errorf("failed to stop Supervisor: %w", err)
}
return result
}
func (cl *SupervisorClient) Start(
ctx context.Context,
) error {
var result error
err := cl.client.CallContext(
ctx,
&result,
"admin_start")
if err != nil {
return fmt.Errorf("failed to start Supervisor: %w", err)
}
return result
}
func (cl *SupervisorClient) AddL2RPC(
ctx context.Context,
rpc string,
......
......@@ -53,7 +53,7 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
}
// create the chains db
db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, headTracker)
db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, headTracker, logger)
// create an empty map of chain monitors
chainMonitors := make(map[types.ChainID]*source.ChainMonitor, len(cfg.L2RPCs))
......@@ -85,6 +85,7 @@ func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger,
if err != nil {
return err
}
su.logger.Info("adding from rpc connection", "rpc", rpc, "chainID", chainID)
// create metrics and a logdb for the chain
cm := newChainMetrics(chainID, su.m)
path, err := prepLogDBPath(chainID, su.dataDir)
......@@ -135,15 +136,17 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
}
}
// start db maintenance loop
maintinenceCtx, cancel := context.WithCancel(ctx)
maintinenceCtx, cancel := context.WithCancel(context.Background())
su.db.StartCrossHeadMaintenance(maintinenceCtx)
su.maintenanceCancel = cancel
return nil
}
var errAlreadyStopped = errors.New("already stopped")
func (su *SupervisorBackend) Stop(ctx context.Context) error {
if !su.started.CompareAndSwap(true, false) {
return errors.New("already stopped")
return errAlreadyStopped
}
// signal the maintenance loop to stop
su.maintenanceCancel()
......
......@@ -43,12 +43,14 @@ type ChainsDB struct {
logDBs map[types.ChainID]LogStorage
heads HeadsStorage
maintenanceReady chan struct{}
logger log.Logger
}
func NewChainsDB(logDBs map[types.ChainID]LogStorage, heads HeadsStorage) *ChainsDB {
func NewChainsDB(logDBs map[types.ChainID]LogStorage, heads HeadsStorage, l log.Logger) *ChainsDB {
return &ChainsDB{
logDBs: logDBs,
heads: heads,
logger: l,
}
}
......@@ -76,17 +78,21 @@ func (db *ChainsDB) Resume() error {
// for now it does not prevent multiple instances of this process from running
func (db *ChainsDB) StartCrossHeadMaintenance(ctx context.Context) {
go func() {
db.logger.Info("cross-head maintenance loop started")
// run the maintenance loop every 10 seconds for now
ticker := time.NewTicker(time.Second * 10)
ticker := time.NewTicker(time.Second * 1)
for {
select {
case <-ctx.Done():
db.logger.Warn("context cancelled, stopping maintenance loop")
return
case <-ticker.C:
db.logger.Debug("regular maintenance requested")
db.RequestMaintenance()
case <-db.maintenanceReady:
db.logger.Debug("running maintenance")
if err := db.updateAllHeads(); err != nil {
log.Error("failed to update cross-heads", "err", err)
db.logger.Error("failed to update cross-heads", "err", err)
}
}
}
......@@ -184,6 +190,7 @@ func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker Safe
// this allows for the maintenance loop to handle cascading updates
// instead of waiting for the next scheduled update
if updated {
db.logger.Debug("heads were updated, requesting maintenance")
db.RequestMaintenance()
}
return nil
......
......@@ -6,17 +6,19 @@ import (
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestChainsDB_AddLog(t *testing.T) {
t.Run("UnknownChain", func(t *testing.T) {
db := NewChainsDB(nil, &stubHeadStorage{})
db := NewChainsDB(nil, &stubHeadStorage{}, testlog.Logger(t, log.LevelDebug))
err := db.AddLog(types.ChainIDFromUInt64(2), backendTypes.TruncatedHash{}, eth.BlockID{}, 1234, 33, nil)
require.ErrorIs(t, err, ErrUnknownChain)
})
......@@ -26,7 +28,8 @@ func TestChainsDB_AddLog(t *testing.T) {
logDB := &stubLogDB{}
db := NewChainsDB(map[types.ChainID]LogStorage{
chainID: logDB,
}, &stubHeadStorage{})
}, &stubHeadStorage{},
testlog.Logger(t, log.LevelDebug))
err := db.AddLog(chainID, backendTypes.TruncatedHash{}, eth.BlockID{}, 1234, 33, nil)
require.NoError(t, err, err)
require.Equal(t, 1, logDB.addLogCalls)
......@@ -35,7 +38,7 @@ func TestChainsDB_AddLog(t *testing.T) {
func TestChainsDB_Rewind(t *testing.T) {
t.Run("UnknownChain", func(t *testing.T) {
db := NewChainsDB(nil, &stubHeadStorage{})
db := NewChainsDB(nil, &stubHeadStorage{}, testlog.Logger(t, log.LevelDebug))
err := db.Rewind(types.ChainIDFromUInt64(2), 42)
require.ErrorIs(t, err, ErrUnknownChain)
})
......@@ -45,7 +48,8 @@ func TestChainsDB_Rewind(t *testing.T) {
logDB := &stubLogDB{}
db := NewChainsDB(map[types.ChainID]LogStorage{
chainID: logDB,
}, &stubHeadStorage{})
}, &stubHeadStorage{},
testlog.Logger(t, log.LevelDebug))
err := db.Rewind(chainID, 23)
require.NoError(t, err, err)
require.EqualValues(t, 23, logDB.headBlockNum)
......@@ -69,7 +73,8 @@ func TestChainsDB_LastLogInBlock(t *testing.T) {
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})
&stubHeadStorage{h},
testlog.Logger(t, log.LevelDebug))
// LastLogInBlock is expected to:
// 1. get a block iterator for block 10 (stubbed)
......@@ -98,7 +103,8 @@ func TestChainsDB_LastLogInBlockEOF(t *testing.T) {
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})
&stubHeadStorage{h},
testlog.Logger(t, log.LevelDebug))
// LastLogInBlock is expected to:
// 1. get a block iterator for block 10 (stubbed)
......@@ -127,7 +133,8 @@ func TestChainsDB_LastLogInBlockNotFound(t *testing.T) {
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})
&stubHeadStorage{h},
testlog.Logger(t, log.LevelDebug))
// LastLogInBlock is expected to:
// 1. get a block iterator for block 10 (stubbed)
......@@ -154,7 +161,8 @@ func TestChainsDB_LastLogInBlockError(t *testing.T) {
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})
&stubHeadStorage{h},
testlog.Logger(t, log.LevelDebug))
// LastLogInBlock is expected to:
// 1. get a block iterator for block 10 (stubbed)
......@@ -174,7 +182,8 @@ func TestChainsDB_UpdateCrossHeads(t *testing.T) {
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})
&stubHeadStorage{h},
testlog.Logger(t, log.LevelDebug))
// Update cross-heads is expected to:
// 1. get a last checkpoint iterator from the logDB (stubbed to be at 15)
......@@ -198,7 +207,8 @@ func TestChainsDB_UpdateCrossHeadsBeyondLocal(t *testing.T) {
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})
&stubHeadStorage{h},
testlog.Logger(t, log.LevelDebug))
// Update cross-heads is expected to:
// 1. get a last checkpoint iterator from the logDB (stubbed to be at 15)
......@@ -224,7 +234,8 @@ func TestChainsDB_UpdateCrossHeadsEOF(t *testing.T) {
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})
&stubHeadStorage{h},
testlog.Logger(t, log.LevelDebug))
// Update cross-heads is expected to:
// 1. get a last checkpoint iterator from the logDB (stubbed to be at 15)
......@@ -250,7 +261,8 @@ func TestChainsDB_UpdateCrossHeadsError(t *testing.T) {
db := NewChainsDB(
map[types.ChainID]LogStorage{
chainID: logDB},
&stubHeadStorage{h})
&stubHeadStorage{h},
testlog.Logger(t, log.LevelDebug))
// Update cross-heads is expected to:
// 1. get a last checkpoint iterator from the logDB (stubbed to be at 10)
......
......@@ -4,10 +4,12 @@ import (
"fmt"
"testing"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
......@@ -25,7 +27,7 @@ func TestHeadsForChain(t *testing.T) {
CrossFinalized: entrydb.EntryIdx(6),
}
h.Put(types.ChainIDFromUInt64(1), chainHeads)
chainsDB := NewChainsDB(nil, &stubHeadStorage{h})
chainsDB := NewChainsDB(nil, &stubHeadStorage{h}, testlog.Logger(t, log.LevelDebug))
tcases := []struct {
name string
chainID types.ChainID
......@@ -92,7 +94,7 @@ func TestCheck(t *testing.T) {
types.ChainIDFromUInt64(1): logDB,
}
chainsDB := NewChainsDB(logsStore, &stubHeadStorage{h})
chainsDB := NewChainsDB(logsStore, &stubHeadStorage{h}, testlog.Logger(t, log.LevelDebug))
tcases := []struct {
name string
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment