Commit b6131611 authored by Axel Kingsley, committed by GitHub

supervisor: L1 Processor (#13206)

* supervisor: L1 Processor

* comments ; test fixes

* Make L1 source separate from RPC Addr

* fix test

* Add atomic bool for singleton processor routine
parent 53034d28
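The last bullet refers to guarding the processor's worker goroutine with an atomic flag so that Start and Stop are idempotent. Below is a minimal, self-contained sketch of that pattern; it is illustrative only, not the PR's code (it uses CompareAndSwap, while the L1Processor in this diff uses a plain Load/Store pair).

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// proc is a stand-in for the real L1Processor: Start spawns at most one worker
// goroutine, and Stop tears it down only if one is running.
type proc struct {
	running atomic.Bool
	done    chan struct{}
	wg      sync.WaitGroup
}

func (p *proc) Start() {
	if !p.running.CompareAndSwap(false, true) {
		return // already running: keep the routine a singleton
	}
	p.done = make(chan struct{})
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		<-p.done // stand-in for the real polling loop
	}()
}

func (p *proc) Stop() {
	if !p.running.CompareAndSwap(true, false) {
		return // nothing to stop
	}
	close(p.done)
	p.wg.Wait()
}

func main() {
	p := &proc{}
	p.Start()
	p.Start() // no-op: the atomic flag already reports running
	p.Stop()
	fmt.Println("worker started and stopped exactly once")
}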
......@@ -480,6 +480,7 @@ func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService {
EnableAdmin: true,
},
L2RPCs: []string{},
L1RPC: s.l1.UserRPC().RPC(),
Datadir: path.Join(s.t.TempDir(), "supervisor"),
}
depSet := make(map[supervisortypes.ChainID]*depset.StaticConfigDependency)
......@@ -536,10 +537,11 @@ func (s *interopE2ESystem) prepare(t *testing.T, w worldResourcePaths) {
s.hdWallet = s.prepareHDWallet()
s.worldDeployment, s.worldOutput = s.prepareWorld(w)
// the supervisor and client are created first so that the L2s can use the supervisor
// L1 first so that the Supervisor and L2s can connect to it
s.beacon, s.l1 = s.prepareL1()
s.supervisor = s.prepareSupervisor()
s.beacon, s.l1 = s.prepareL1()
s.l2s = s.prepareL2s()
s.prepareContracts()
......
......@@ -16,6 +16,7 @@ import (
)
var (
ValidL1RPC = "http://localhost:8545"
ValidL2RPCs = []string{"http://localhost:8545"}
ValidDatadir = "./supervisor_test_datadir"
)
......@@ -38,7 +39,7 @@ func TestLogLevel(t *testing.T) {
func TestDefaultCLIOptionsMatchDefaultConfig(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs())
depSet := &depset.JsonDependencySetLoader{Path: "test"}
defaultCfgTempl := config.NewConfig(ValidL2RPCs, depSet, ValidDatadir)
defaultCfgTempl := config.NewConfig(ValidL1RPC, ValidL2RPCs, depSet, ValidDatadir)
defaultCfg := *defaultCfgTempl
defaultCfg.Version = Version
require.Equal(t, defaultCfg, *cfg)
......@@ -125,6 +126,7 @@ func toArgList(req map[string]string) []string {
func requiredArgs() map[string]string {
args := map[string]string{
"--l1-rpc": ValidL1RPC,
"--l2-rpcs": ValidL2RPCs[0],
"--dependency-set": "test",
"--datadir": ValidDatadir,
......
......@@ -33,6 +33,8 @@ type Config struct {
// requiring manual triggers for the backend to process anything.
SynchronousProcessors bool
L1RPC string
L2RPCs []string
Datadir string
}
......@@ -56,7 +58,7 @@ func (c *Config) Check() error {
// NewConfig creates a new config using default values whenever possible.
// Required options with no suitable default are passed as parameters.
func NewConfig(l2RPCs []string, depSet depset.DependencySetSource, datadir string) *Config {
func NewConfig(l1RPC string, l2RPCs []string, depSet depset.DependencySetSource, datadir string) *Config {
return &Config{
LogConfig: oplog.DefaultCLIConfig(),
MetricsConfig: opmetrics.DefaultCLIConfig(),
......@@ -64,6 +66,7 @@ func NewConfig(l2RPCs []string, depSet depset.DependencySetSource, datadir strin
RPC: oprpc.DefaultCLIConfig(),
DependencySetSource: depSet,
MockRun: false,
L1RPC: l1RPC,
L2RPCs: l2RPCs,
Datadir: datadir,
}
......
......@@ -67,5 +67,5 @@ func validConfig() *Config {
panic(err)
}
// Should be valid using only the required arguments passed in via the constructor.
return NewConfig([]string{"http://localhost:8545"}, depSet, "./supervisor_testdir")
return NewConfig("http://localhost:8545", []string{"http://localhost:8545"}, depSet, "./supervisor_testdir")
}
......@@ -21,6 +21,11 @@ func prefixEnvVars(name string) []string {
}
var (
L1RPCFlag = &cli.StringFlag{
Name: "l1-rpc",
Usage: "L1 RPC source.",
EnvVars: prefixEnvVars("L1_RPC"),
}
L2RPCsFlag = &cli.StringSliceFlag{
Name: "l2-rpcs",
Usage: "L2 RPC sources.",
......@@ -46,6 +51,7 @@ var (
)
var requiredFlags = []cli.Flag{
L1RPCFlag,
L2RPCsFlag,
DataDirFlag,
DependencySetFlag,
......@@ -86,6 +92,7 @@ func ConfigFromCLI(ctx *cli.Context, version string) *config.Config {
RPC: oprpc.ReadCLIConfig(ctx),
DependencySetSource: &depset.JsonDependencySetLoader{Path: ctx.Path(DependencySetFlag.Name)},
MockRun: ctx.Bool(MockRunFlag.Name),
L1RPC: ctx.String(L1RPCFlag.Name),
L2RPCs: ctx.StringSlice(L2RPCsFlag.Name),
Datadir: ctx.Path(DataDirFlag.Name),
}
......
......@@ -36,6 +36,9 @@ type SupervisorBackend struct {
// chainDBs holds on to the DB indices for each chain
chainDBs *db.ChainsDB
// l1Processor watches for new L1 blocks, updates the local-safe DB, and kicks off derivation orchestration
l1Processor *processors.L1Processor
// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
chainProcessors locks.RWMap[types.ChainID, *processors.ChainProcessor]
......@@ -125,6 +128,14 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
su.chainProcessors.Set(chainID, chainProcessor)
}
if cfg.L1RPC != "" {
if err := su.attachL1RPC(ctx, cfg.L1RPC); err != nil {
return fmt.Errorf("failed to create L1 processor: %w", err)
}
} else {
su.logger.Warn("No L1 RPC configured, L1 processor will not be started")
}
// the config has some RPC connections to attach to the chain-processors
for _, rpc := range cfg.L2RPCs {
err := su.attachRPC(ctx, rpc)
......@@ -230,6 +241,38 @@ func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src pr
return nil
}
func (su *SupervisorBackend) attachL1RPC(ctx context.Context, l1RPCAddr string) error {
su.logger.Info("attaching L1 RPC to L1 processor", "rpc", l1RPCAddr)
logger := su.logger.New("l1-rpc", l1RPCAddr)
l1RPC, err := client.NewRPC(ctx, logger, l1RPCAddr)
if err != nil {
return fmt.Errorf("failed to setup L1 RPC: %w", err)
}
l1Client, err := sources.NewL1Client(
l1RPC,
su.logger,
nil,
// placeholder config for the L1
sources.L1ClientSimpleConfig(true, sources.RPCKindBasic, 100))
if err != nil {
return fmt.Errorf("failed to setup L1 Client: %w", err)
}
su.AttachL1Source(l1Client)
return nil
}
// AttachL1Source attaches an L1 source to the L1 processor.
// If the L1 processor does not exist, it is created and started.
func (su *SupervisorBackend) AttachL1Source(source processors.L1Source) {
if su.l1Processor == nil {
su.l1Processor = processors.NewL1Processor(su.logger, su.chainDBs, source)
su.l1Processor.Start()
} else {
su.l1Processor.AttachClient(source)
}
}
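In other words, the first AttachL1Source call constructs and starts the processor, and later calls only swap the client on the running instance. A small self-contained sketch of that create-or-swap shape, using illustrative stand-in types rather than the repo's real ones:

package main

import (
	"fmt"
	"sync"
)

type source interface{ Name() string }

type namedSource string

func (s namedSource) Name() string { return string(s) }

// processor is a stand-in for L1Processor: AttachClient swaps the source under a
// mutex so a running worker picks up the new client on its next iteration.
type processor struct {
	mu     sync.Mutex
	client source
}

func (p *processor) AttachClient(c source) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.client = c
}

// backend is a stand-in for SupervisorBackend.
type backend struct{ proc *processor }

func (b *backend) AttachL1Source(c source) {
	if b.proc == nil {
		b.proc = &processor{client: c} // first attach: create (and, in the real code, start)
		return
	}
	b.proc.AttachClient(c) // later attaches: hot-swap the client only
}

func main() {
	b := &backend{}
	b.AttachL1Source(namedSource("primary-l1-rpc"))
	b.AttachL1Source(namedSource("fallback-l1-rpc"))
	fmt.Println("active client:", b.proc.client.Name()) // fallback-l1-rpc
}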
func clientForL2(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
if err != nil {
......@@ -254,6 +297,11 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
return fmt.Errorf("failed to resume chains db: %w", err)
}
// start the L1 processor if it exists
if su.l1Processor != nil {
su.l1Processor.Start()
}
if !su.synchronousProcessors {
// Make all the chain-processors run automatic background processing
su.chainProcessors.Range(func(_ types.ChainID, processor *processors.ChainProcessor) bool {
......@@ -278,6 +326,12 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
return errAlreadyStopped
}
su.logger.Info("Closing supervisor backend")
// stop the L1 processor
if su.l1Processor != nil {
su.l1Processor.Stop()
}
// close all processors
su.chainProcessors.Range(func(id types.ChainID, processor *processors.ChainProcessor) bool {
su.logger.Info("stopping chain processor", "chainID", id)
......
......@@ -62,6 +62,7 @@ func TestBackendLifetime(t *testing.T) {
require.NoError(t, err)
t.Log("initialized!")
l1Src := &testutils.MockL1Source{}
src := &testutils.MockL1Source{}
blockX := eth.BlockRef{
......@@ -77,6 +78,7 @@ func TestBackendLifetime(t *testing.T) {
Time: blockX.Time + 2,
}
b.AttachL1Source(l1Src)
require.NoError(t, b.AttachProcessorSource(chainA, src))
require.FileExists(t, filepath.Join(cfg.Datadir, "900", "log.db"), "must have logs DB 900")
......
......@@ -30,6 +30,32 @@ func (db *ChainsDB) LatestBlockNum(chain types.ChainID) (num uint64, ok bool) {
return logDB.LatestSealedBlockNum()
}
// LastCommonL1 returns the latest common L1 block between all chains in the database.
// It only considers block numbers, not hashes, because the L1 source is the same for all chains.
// This data can be used to determine the starting point for L1 processing.
func (db *ChainsDB) LastCommonL1() (types.BlockSeal, error) {
common := types.BlockSeal{}
for _, chain := range db.depSet.Chains() {
ldb, ok := db.localDBs.Get(chain)
if !ok {
return types.BlockSeal{}, types.ErrUnknownChain
}
derivedFrom, _, err := ldb.Latest()
if err != nil {
return types.BlockSeal{}, fmt.Errorf("failed to determine Last Common L1: %w", err)
}
// if the common block isn't yet set,
// or if this chain's derived-from block is older than the current common block,
// take it as the new common block
if common == (types.BlockSeal{}) ||
derivedFrom.Number < common.Number {
common = derivedFrom
}
}
return common, nil
}
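A minimal, self-contained illustration of the semantics above, with made-up chain names and numbers: the last common L1 is simply the lowest "derived from" number reported by any chain's local-safe DB, since every chain has processed at least that far.

package main

import "fmt"

func main() {
	latestDerivedFrom := map[string]uint64{
		"chain-900": 120, // chain 900 last derived from L1 block 120
		"chain-901": 118, // chain 901 is slightly behind, at L1 block 118
	}
	common, first := uint64(0), true
	for _, n := range latestDerivedFrom {
		if first || n < common {
			common, first = n, false
		}
	}
	fmt.Println("last common L1 number:", common) // 118
}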
func (db *ChainsDB) IsCrossUnsafe(chainID types.ChainID, block eth.BlockID) error {
v, ok := db.crossUnsafe.Get(chainID)
if !ok {
......
package db
import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
......@@ -86,3 +87,40 @@ func (db *ChainsDB) UpdateFinalizedL1(finalized eth.BlockRef) error {
db.logger.Info("Updated finalized L1", "finalizedL1", finalized)
return nil
}
// RecordNewL1 records a new L1 block in the database.
// it uses the latest derived L2 block as the derived block for the new L1 block.
func (db *ChainsDB) RecordNewL1(ref eth.BlockRef) error {
for _, chain := range db.depSet.Chains() {
// get local derivation database
ldb, ok := db.localDBs.Get(chain)
if !ok {
return fmt.Errorf("cannot RecordNewL1 to chain %s: %w", chain, types.ErrUnknownChain)
}
// get the latest derived and derivedFrom blocks
derivedFrom, derived, err := ldb.Latest()
if err != nil {
return fmt.Errorf("failed to get latest derivedFrom for chain %s: %w", chain, err)
}
// make a ref from the latest derived block
derivedParent, err := ldb.PreviousDerived(derived.ID())
if errors.Is(err, types.ErrFuture) {
db.logger.Warn("Empty DB, Recording first L1 block", "chain", chain, "err", err)
} else if err != nil {
db.logger.Warn("Failed to get latest derivedfrom to insert new L1 block", "chain", chain, "err", err)
return err
}
derivedRef := derived.MustWithParent(derivedParent.ID())
// don't push the new L1 block if it isn't newer than the latest recorded derived-from (L1) block
if derivedFrom.Number >= ref.Number {
db.logger.Warn("L1 block has already been processed for this height", "chain", chain, "block", ref, "latest", derivedFrom)
continue
}
// the database is extended with the new L1 and the existing L2
if err = db.UpdateLocalSafe(chain, ref, derivedRef); err != nil {
db.logger.Error("Failed to update local safe", "chain", chain, "block", ref, "derived", derived, "err", err)
return err
}
}
return nil
}
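A small self-contained sketch of the bookkeeping RecordNewL1 performs, with made-up numbers and simplified types: when L1 block 101 arrives and no new L2 blocks have been derived yet, the local-safe record is extended by pairing the new L1 block with the chain's existing latest derived L2 block.

package main

import "fmt"

// pair models one local-safe entry: an L1 block and the L2 block derived from it.
type pair struct{ l1, derivedL2 uint64 }

func main() {
	localSafe := []pair{{l1: 100, derivedL2: 500}} // latest entry: L1 #100 -> L2 #500
	newL1 := uint64(101)

	latest := localSafe[len(localSafe)-1]
	// skip L1 blocks at or below the height already recorded, mirroring the
	// derivedFrom.Number >= ref.Number check above
	if newL1 > latest.l1 {
		localSafe = append(localSafe, pair{l1: newL1, derivedL2: latest.derivedL2})
	}
	fmt.Println(localSafe) // [{100 500} {101 500}]
}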
package processors
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/log"
)
type chainsDB interface {
RecordNewL1(ref eth.BlockRef) error
LastCommonL1() (types.BlockSeal, error)
}
type L1Source interface {
L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
}
type L1Processor struct {
log log.Logger
client L1Source // L1 source used to fetch block refs by number
clientMu sync.Mutex // guards client so it can be swapped while the worker is running
running atomic.Bool // ensures only a single worker routine is started per processor
currentNumber uint64 // highest L1 block number recorded so far (the worker fetches currentNumber+1 next)
tickDuration time.Duration // polling interval for the worker loop
db chainsDB
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewL1Processor(log log.Logger, cdb chainsDB, client L1Source) *L1Processor {
ctx, cancel := context.WithCancel(context.Background())
return &L1Processor{
client: client,
db: cdb,
log: log.New("service", "l1-processor"),
tickDuration: 6 * time.Second,
ctx: ctx,
cancel: cancel,
}
}
func (p *L1Processor) AttachClient(client L1Source) {
p.clientMu.Lock()
defer p.clientMu.Unlock()
p.client = client
}
func (p *L1Processor) Start() {
// if already running, do nothing
if p.running.Load() {
return
}
p.running.Store(true)
p.currentNumber = 0
// if there is an issue getting the last common L1, default to starting from 0
// consider making this a fatal error in the future once initialization is more robust
if lastL1, err := p.db.LastCommonL1(); err == nil {
p.currentNumber = lastL1.Number
}
p.wg.Add(1)
go p.worker()
}
func (p *L1Processor) Stop() {
// if not running, do nothing
if !p.running.Load() {
return
}
p.cancel()
p.wg.Wait()
p.running.Store(false)
}
// worker runs a loop that checks for new L1 blocks at a regular interval
func (p *L1Processor) worker() {
defer p.wg.Done()
delay := time.NewTicker(p.tickDuration)
for {
select {
case <-p.ctx.Done():
return
case <-delay.C:
p.log.Debug("Checking for new L1 block", "current", p.currentNumber)
err := p.work()
if err != nil {
p.log.Warn("Failed to process L1", "err", err)
}
}
}
}
// work checks for a new L1 block and processes it if found.
// The starting point is set when Start is called, and blocks are searched incrementally from there.
// If a new block is found, it is recorded in the database and the target number is updated.
// In the future it will also kick off derivation management for the sync nodes.
func (p *L1Processor) work() error {
p.clientMu.Lock()
defer p.clientMu.Unlock()
nextNumber := p.currentNumber + 1
ref, err := p.client.L1BlockRefByNumber(p.ctx, nextNumber)
if err != nil {
return err
}
// record the new L1 block
p.log.Debug("Processing new L1 block", "block", ref)
err = p.db.RecordNewL1(ref)
if err != nil {
return err
}
// go drive derivation on this new L1 input
// only possible once bidirectional RPC and new derivers are in place
// could do this as a function callback to a more appropriate driver
// update the target number
p.currentNumber = nextNumber
return nil
}
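The worker/work pair above boils down to a ticker-driven poll that advances one block number per successful fetch and stops via context cancellation. A stripped-down, self-contained sketch of that shape, with illustrative names rather than the real types:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	next := uint64(1)

	// fetch stands in for L1BlockRefByNumber; here it always succeeds.
	fetch := func(n uint64) (string, error) {
		return fmt.Sprintf("block-%d", n), nil
	}

	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				ref, err := fetch(next)
				if err != nil {
					continue // e.g. the block does not exist yet; retry on the next tick
				}
				fmt.Println("recorded", ref)
				next++ // only advance after the block has been recorded
			}
		}
	}()

	time.Sleep(200 * time.Millisecond)
	cancel()
	<-done
}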
package processors
import (
"context"
"fmt"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
type mockChainsDB struct {
recordNewL1Fn func(ref eth.BlockRef) error
lastCommonL1Fn func() (types.BlockSeal, error)
}
func (m *mockChainsDB) RecordNewL1(ref eth.BlockRef) error {
if m.recordNewL1Fn != nil {
return m.recordNewL1Fn(ref)
}
return nil
}
func (m *mockChainsDB) LastCommonL1() (types.BlockSeal, error) {
if m.lastCommonL1Fn != nil {
return m.lastCommonL1Fn()
}
return types.BlockSeal{}, nil
}
type mockL1BlockRefByNumberFetcher struct {
l1BlockByNumberFn func() (eth.L1BlockRef, error)
}
func (m *mockL1BlockRefByNumberFetcher) L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) {
if m.l1BlockByNumberFn != nil {
return m.l1BlockByNumberFn()
}
return eth.L1BlockRef{}, nil
}
func TestL1Processor(t *testing.T) {
processorForTesting := func() *L1Processor {
ctx, cancel := context.WithCancel(context.Background())
proc := &L1Processor{
log: testlog.Logger(t, log.LvlInfo),
client: &mockL1BlockRefByNumberFetcher{},
currentNumber: 0,
tickDuration: 1 * time.Second,
db: &mockChainsDB{},
ctx: ctx,
cancel: cancel,
}
return proc
}
t.Run("Initializes LastCommonL1", func(t *testing.T) {
proc := processorForTesting()
proc.db.(*mockChainsDB).lastCommonL1Fn = func() (types.BlockSeal, error) {
return types.BlockSeal{Number: 10}, nil
}
// before starting, the current number should be 0
require.Equal(t, uint64(0), proc.currentNumber)
proc.Start()
defer proc.Stop()
// after starting, the current number should be 10, taken from LastCommonL1
require.Equal(t, uint64(10), proc.currentNumber)
})
t.Run("Initializes LastCommonL1 at 0 if error", func(t *testing.T) {
proc := processorForTesting()
proc.db.(*mockChainsDB).lastCommonL1Fn = func() (types.BlockSeal, error) {
return types.BlockSeal{Number: 10}, fmt.Errorf("error")
}
// before starting, the current number should be 0
require.Equal(t, uint64(0), proc.currentNumber)
proc.Start()
defer proc.Stop()
// the error means the current number should still be 0
require.Equal(t, uint64(0), proc.currentNumber)
})
t.Run("Records new L1", func(t *testing.T) {
proc := processorForTesting()
// return a new block number each time
num := uint64(0)
proc.client.(*mockL1BlockRefByNumberFetcher).l1BlockByNumberFn = func() (eth.L1BlockRef, error) {
defer func() { num++ }()
return eth.L1BlockRef{Number: num}, nil
}
// confirm that recordNewL1 is called for each block number received
called := uint64(0)
proc.db.(*mockChainsDB).recordNewL1Fn = func(ref eth.BlockRef) error {
require.Equal(t, called, ref.Number)
called++
return nil
}
proc.Start()
defer proc.Stop()
require.Eventually(t, func() bool {
return called >= 1 && proc.currentNumber >= 1
}, 10*time.Second, 100*time.Millisecond)
})
}