Commit c04cefe0 authored by Evan Richard's avatar Evan Richard Committed by GitHub

op-service, op-batcher, op-proposer: Active sequencer follow mode (#8585)

* op-service: Add ActiveL2EndpointProvider.

* Fix bug in initialization, and handle case where no ethUrls are provided.

* Split active L2 provider into active rollup and active L2 provider.

* Re-duplicate some code until tests are passing.

* op-proposer: Add ability to enable active provider.

* op-batcher: Add ability to enable active provider.

* Add an empty test skeleton.

* Add an empty test skeleton.

* op-service: add, but do not yet use, RollupClientInterface and EthClientInterface.

* op-service: update mocks and interfaces for endpoint provider testing.

* op-service - WIP on Active L2 Providers: unit tests pass, design and impl contains TODOs.

* op-service: restore design in Active Endpoint Providers that only keeps one client open at a time.

* op-service: when dialing a new sequencer, close() the old connection.

* op-service: obey coderabbit suggestion around safer handling of p.currentIndex in Active L2 Providers.

* op-service, op-batcher, op-proposer: address review comments in PR#8585.

* op-service: Active L2 Provider - add test case for a sequencer returning an error.

* op-service: Active L2/Rollup Providers: improve unit testing and logging.

* op-service, op-batcher: address review comments in 8585 regarding first-startup behavior and testing.

* op-service: address review comments through adding more tests, and moving "nil client" behavior from client getter to constructor.

* op-service: minor error message change in active endpoint providers.

* Update op-service/dial/active_l2_provider.go
Co-authored-by: default avatarcoderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* op-service: obey linter in rabbit-provided error message change.

* Update op-service/dial/active_l2_provider.go
Co-authored-by: default avatarSebastian Stammler <seb@oplabs.co>

* op-service active L2 provider tests: assertAllExpectations after most tests.

* op-service: more elegantly handle startup in active l2 providers, and improve testing.

* Change remaining longDurationTests to be able to use ept.assertAllExpectations.

* use new error errSeqUnset.

* Add test for scenario where many sequencers are inactive, and only the last is active.

* Readability change: move the on-creation initialization to its own function.

* Move extra one-time dial to constructor.

* Update op-service/dial/active_rollup_provider.go
Co-authored-by: default avatarcoderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* Add nil check to active l2 provider.

* Update op-service/dial/active_rollup_provider.go
Co-authored-by: default avatarSebastian Stammler <seb@oplabs.co>

* Address review comment: change many-inactive tests to many-undialable tests.

* Add test that reproduces internal state corruption.

* op-service: Improve active seq provider

- Preserve the invariant that the index and current rollup/eth
  client match.
- Dial at the start of the loop instead of at the end.

* Fix some tests.

* Move usage of ExpectClose to MaybeClose, we don't want to enforce a particular close behavior in these tests.

* add a missing call to assertAllExpectations.

* Test even the case where the active providers are managing a list of 1 element.

* Revert experimental hunk in active_l2_provider.

---------
Co-authored-by: default avatarcoderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: default avatarSebastian Stammler <seb@oplabs.co>
parent 078214c1
......@@ -3,6 +3,7 @@ package batcher
import (
"errors"
"fmt"
"strings"
"time"
"github.com/urfave/cli/v2"
......@@ -20,10 +21,10 @@ type CLIConfig struct {
// L1EthRpc is the HTTP provider URL for L1.
L1EthRpc string
// L2EthRpc is the HTTP provider URL for the L2 execution engine.
// L2EthRpc is the HTTP provider URL for the L2 execution engine. A comma-separated list enables the active L2 provider. Such a list needs to match the number of RollupRpcs provided.
L2EthRpc string
// RollupRpc is the HTTP provider URL for the L2 rollup node.
// RollupRpc is the HTTP provider URL for the L2 rollup node. A comma-separated list enables the active L2 provider. Such a list needs to match the number of L2EthRpcs provided.
RollupRpc string
// MaxChannelDuration is the maximum duration (in #L1-blocks) to keep a
......@@ -74,6 +75,9 @@ func (c *CLIConfig) Check() error {
if c.RollupRpc == "" {
return errors.New("empty rollup RPC URL")
}
if strings.Count(c.RollupRpc, ",") != strings.Count(c.L2EthRpc, ",") {
return errors.New("number of rollup and eth URLs must match")
}
if c.PollInterval == 0 {
return errors.New("must set PollInterval")
}
......
......@@ -8,6 +8,7 @@ import (
"net"
_ "net/http/pprof"
"strconv"
"strings"
"sync/atomic"
"time"
......@@ -125,9 +126,16 @@ func (bs *BatcherService) initRPCClients(ctx context.Context, cfg *CLIConfig) er
}
bs.L1Client = l1Client
endpointProvider, err := dial.NewStaticL2EndpointProvider(ctx, bs.Log, cfg.L2EthRpc, cfg.RollupRpc)
var endpointProvider dial.L2EndpointProvider
if strings.Contains(cfg.RollupRpc, ",") && strings.Contains(cfg.L2EthRpc, ",") {
rollupUrls := strings.Split(cfg.RollupRpc, ",")
ethUrls := strings.Split(cfg.L2EthRpc, ",")
endpointProvider, err = dial.NewActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, dial.DefaultActiveSequencerFollowerCheckDuration, dial.DefaultDialTimeout, bs.Log)
} else {
endpointProvider, err = dial.NewStaticL2EndpointProvider(ctx, bs.Log, cfg.L2EthRpc, cfg.RollupRpc)
}
if err != nil {
return fmt.Errorf("failed to create L2 endpoint provider: %w", err)
return fmt.Errorf("failed to build L2 endpoint provider: %w", err)
}
bs.EndpointProvider = endpointProvider
......
......@@ -30,12 +30,12 @@ var (
}
L2EthRpcFlag = &cli.StringFlag{
Name: "l2-eth-rpc",
Usage: "HTTP provider URL for L2 execution engine",
Usage: "HTTP provider URL for L2 execution engine. A comma-separated list enables the active L2 endpoint provider. Such a list needs to match the number of rollup-rpcs provided.",
EnvVars: prefixEnvVars("L2_ETH_RPC"),
}
RollupRpcFlag = &cli.StringFlag{
Name: "rollup-rpc",
Usage: "HTTP provider URL for Rollup node",
Usage: "HTTP provider URL for Rollup node. A comma-separated list enables the active L2 endpoint provider. Such a list needs to match the number of l2-eth-rpcs provided.",
EnvVars: prefixEnvVars("ROLLUP_RPC"),
}
// Optional flags
......
......@@ -29,7 +29,7 @@ var (
}
RollupRpcFlag = &cli.StringFlag{
Name: "rollup-rpc",
Usage: "HTTP provider URL for the rollup node",
Usage: "HTTP provider URL for the rollup node. A comma-separated list enables the active rollup provider.",
EnvVars: prefixEnvVars("ROLLUP_RPC"),
}
L2OOAddressFlag = &cli.StringFlag{
......
......@@ -22,7 +22,7 @@ type CLIConfig struct {
// L1EthRpc is the HTTP provider URL for L1.
L1EthRpc string
// RollupRpc is the HTTP provider URL for the rollup node.
// RollupRpc is the HTTP provider URL for the rollup node. A comma-separated list enables the active rollup provider.
RollupRpc string
// L2OOAddress is the L2OutputOracle contract address.
......
......@@ -7,6 +7,7 @@ import (
"io"
"net"
"strconv"
"strings"
"sync/atomic"
"time"
......@@ -121,7 +122,13 @@ func (ps *ProposerService) initRPCClients(ctx context.Context, cfg *CLIConfig) e
}
ps.L1Client = l1Client
rollupProvider, err := dial.NewStaticL2RollupProvider(ctx, ps.Log, cfg.RollupRpc)
var rollupProvider dial.RollupProvider
if strings.Contains(cfg.RollupRpc, ",") {
rollupUrls := strings.Split(cfg.RollupRpc, ",")
rollupProvider, err = dial.NewActiveL2RollupProvider(ctx, rollupUrls, dial.DefaultActiveSequencerFollowerCheckDuration, dial.DefaultDialTimeout, ps.Log)
} else {
rollupProvider, err = dial.NewStaticL2RollupProvider(ctx, ps.Log, cfg.RollupRpc)
}
if err != nil {
return fmt.Errorf("failed to build L2 endpoint provider: %w", err)
}
......
package dial
import (
"context"
"errors"
"fmt"
"time"
"github.com/ethereum/go-ethereum/log"
)
const DefaultActiveSequencerFollowerCheckDuration = 2 * DefaultDialTimeout
type ethDialer func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (EthClientInterface, error)
// ActiveL2EndpointProvider is an interface for providing a RollupClient and l2 eth client
// It manages the lifecycle of the RollupClient and eth client for callers
// It does this by failing over down the list of rollupUrls if the current one is inactive or broken
type ActiveL2EndpointProvider struct {
ActiveL2RollupProvider
currentEthClient EthClientInterface
ethClientIndex int
ethDialer ethDialer
ethUrls []string
}
// NewActiveL2EndpointProvider creates a new ActiveL2EndpointProvider
// the checkDuration is the duration between checks to see if the current rollup client is active
// provide a checkDuration of 0 to check every time
func NewActiveL2EndpointProvider(ctx context.Context,
ethUrls, rollupUrls []string,
checkDuration time.Duration,
networkTimeout time.Duration,
logger log.Logger,
) (*ActiveL2EndpointProvider, error) {
ethDialer := func(ctx context.Context, timeout time.Duration,
log log.Logger, url string,
) (EthClientInterface, error) {
return DialEthClientWithTimeout(ctx, timeout, log, url)
}
rollupDialer := func(ctx context.Context, timeout time.Duration,
log log.Logger, url string,
) (RollupClientInterface, error) {
return DialRollupClientWithTimeout(ctx, timeout, log, url)
}
return newActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, checkDuration, networkTimeout, logger, ethDialer, rollupDialer)
}
func newActiveL2EndpointProvider(
ctx context.Context,
ethUrls, rollupUrls []string,
checkDuration time.Duration,
networkTimeout time.Duration,
logger log.Logger,
ethDialer ethDialer,
rollupDialer rollupDialer,
) (*ActiveL2EndpointProvider, error) {
if len(rollupUrls) == 0 {
return nil, errors.New("empty rollup urls list, expected at least one URL")
}
if len(ethUrls) != len(rollupUrls) {
return nil, fmt.Errorf("number of eth urls (%d) and rollup urls (%d) mismatch", len(ethUrls), len(rollupUrls))
}
rollupProvider, err := newActiveL2RollupProvider(ctx, rollupUrls, checkDuration, networkTimeout, logger, rollupDialer)
if err != nil {
return nil, err
}
p := &ActiveL2EndpointProvider{
ActiveL2RollupProvider: *rollupProvider,
ethDialer: ethDialer,
ethUrls: ethUrls,
}
cctx, cancel := context.WithTimeout(ctx, networkTimeout)
defer cancel()
if _, err = p.EthClient(cctx); err != nil {
return nil, fmt.Errorf("setting provider eth client: %w", err)
}
return p, nil
}
func (p *ActiveL2EndpointProvider) EthClient(ctx context.Context) (EthClientInterface, error) {
p.clientLock.Lock()
defer p.clientLock.Unlock()
err := p.ensureActiveEndpoint(ctx)
if err != nil {
return nil, err
}
if p.ethClientIndex != p.rollupIndex || p.currentEthClient == nil {
// we changed sequencers, dial a new EthClient
cctx, cancel := context.WithTimeout(ctx, p.networkTimeout)
defer cancel()
idx := p.rollupIndex
ep := p.ethUrls[idx]
log.Info("sequencer changed (or ethClient was nil due to startup), dialing new eth client", "new_index", idx, "new_url", ep)
ethClient, err := p.ethDialer(cctx, p.networkTimeout, p.log, ep)
if err != nil {
return nil, fmt.Errorf("dialing eth client: %w", err)
}
if p.currentEthClient != nil {
p.currentEthClient.Close()
}
p.ethClientIndex = idx
p.currentEthClient = ethClient
}
return p.currentEthClient, nil
}
func (p *ActiveL2EndpointProvider) Close() {
if p.currentEthClient != nil {
p.currentEthClient.Close()
}
p.ActiveL2RollupProvider.Close()
}
package dial
import (
"context"
"fmt"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
// endpointProviderTest is a test harness for setting up endpoint provider tests.
type endpointProviderTest struct {
t *testing.T
rollupClients []*testutils.MockRollupClient
ethClients []*testutils.MockEthClient
rollupDialOutcomes map[int]bool // true for success, false for failure
ethDialOutcomes map[int]bool // true for success, false for failure
}
// setupEndpointProviderTest sets up the basic structure of the endpoint provider tests.
func setupEndpointProviderTest(t *testing.T, numSequencers int) *endpointProviderTest {
ept := &endpointProviderTest{
t: t,
rollupClients: make([]*testutils.MockRollupClient, numSequencers),
ethClients: make([]*testutils.MockEthClient, numSequencers),
rollupDialOutcomes: make(map[int]bool),
ethDialOutcomes: make(map[int]bool),
}
for i := 0; i < numSequencers; i++ {
ept.rollupClients[i] = new(testutils.MockRollupClient)
ept.ethClients[i] = new(testutils.MockEthClient)
ept.rollupDialOutcomes[i] = true // by default, all dials succeed
ept.ethDialOutcomes[i] = true // by default, all dials succeed
}
return ept
}
// newActiveL2EndpointProvider constructs a new ActiveL2RollupProvider using the test harness setup.
func (et *endpointProviderTest) newActiveL2RollupProvider(checkDuration time.Duration) (*ActiveL2RollupProvider, error) {
mockRollupDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error) {
for i, client := range et.rollupClients {
if url == fmt.Sprintf("rollup%d", i) {
if !et.rollupDialOutcomes[i] {
return nil, fmt.Errorf("simulated dial failure for rollup %d", i)
}
return client, nil
}
}
return nil, fmt.Errorf("unknown test url: %s", url)
}
// make the "URLs"
rollupUrls := make([]string, len(et.rollupClients))
for i := range et.rollupClients {
rollupUrl := fmt.Sprintf("rollup%d", i)
rollupUrls[i] = rollupUrl
}
return newActiveL2RollupProvider(
context.Background(),
rollupUrls,
checkDuration,
1*time.Minute,
testlog.Logger(et.t, log.LvlDebug),
mockRollupDialer,
)
}
// newActiveL2EndpointProvider constructs a new ActiveL2EndpointProvider using the test harness setup.
func (et *endpointProviderTest) newActiveL2EndpointProvider(checkDuration time.Duration) (*ActiveL2EndpointProvider, error) {
mockRollupDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error) {
for i, client := range et.rollupClients {
if url == fmt.Sprintf("rollup%d", i) {
if !et.rollupDialOutcomes[i] {
return nil, fmt.Errorf("simulated dial failure for rollup %d", i)
}
return client, nil
}
}
return nil, fmt.Errorf("unknown test url: %s", url)
}
mockEthDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (EthClientInterface, error) {
for i, client := range et.ethClients {
if url == fmt.Sprintf("eth%d", i) {
if !et.ethDialOutcomes[i] {
return nil, fmt.Errorf("simulated dial failure for eth %d", i)
}
return client, nil
}
}
return nil, fmt.Errorf("unknown test url: %s", url)
}
// make the "URLs"
rollupUrls := make([]string, len(et.rollupClients))
for i := range et.rollupClients {
rollupUrl := fmt.Sprintf("rollup%d", i)
rollupUrls[i] = rollupUrl
}
ethUrls := make([]string, len(et.ethClients))
for i := range et.ethClients {
ethUrl := fmt.Sprintf("eth%d", i)
ethUrls[i] = ethUrl
}
return newActiveL2EndpointProvider(
context.Background(),
ethUrls,
rollupUrls,
checkDuration,
1*time.Minute,
testlog.Logger(et.t, log.LvlDebug),
mockEthDialer,
mockRollupDialer,
)
}
func (et *endpointProviderTest) assertAllExpectations(t *testing.T) {
for _, sequencer := range et.rollupClients {
sequencer.AssertExpectations(t)
}
for _, ethClient := range et.ethClients {
ethClient.AssertExpectations(t)
}
}
func (et *endpointProviderTest) setRollupDialOutcome(index int, success bool) {
et.rollupDialOutcomes[index] = success
}
// TestRollupProvider_FailoverOnInactiveSequencer verifies that the ActiveL2RollupProvider
// will switch to the next provider if the current one becomes inactive.
func TestRollupProvider_FailoverOnInactiveSequencer(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
primarySequencer, secondarySequencer := ept.rollupClients[0], ept.rollupClients[1]
primarySequencer.ExpectSequencerActive(true, nil) // respond true once on creation
primarySequencer.ExpectSequencerActive(true, nil) // respond true again when the test calls `RollupClient()` the first time
rollupProvider, err := ept.newActiveL2RollupProvider(0)
require.NoError(t, err)
firstSequencerUsed, err := rollupProvider.RollupClient(context.Background())
require.NoError(t, err)
require.Same(t, primarySequencer, firstSequencerUsed)
primarySequencer.ExpectSequencerActive(false, nil) // become inactive after that
primarySequencer.MaybeClose()
secondarySequencer.ExpectSequencerActive(true, nil)
secondSequencerUsed, err := rollupProvider.RollupClient(context.Background())
require.NoError(t, err)
require.Same(t, secondarySequencer, secondSequencerUsed)
ept.assertAllExpectations(t)
}
// TestEndpointProvider_FailoverOnInactiveSequencer verifies that the ActiveL2EndpointProvider
// will switch to the next provider if the current one becomes inactive.
func TestEndpointProvider_FailoverOnInactiveSequencer(t *testing.T) {
// as TestActiveSequencerFailoverBehavior_RollupProviders,
// but ensure the added `EthClient()` method also triggers the failover.
ept := setupEndpointProviderTest(t, 2)
primarySequencer, secondarySequencer := ept.rollupClients[0], ept.rollupClients[1]
primarySequencer.ExpectSequencerActive(true, nil) // primary sequencer gets hit once on creation: embedded call of `RollupClient()`
primarySequencer.ExpectSequencerActive(true, nil) // primary sequencer gets hit twice on creation: implicit call of `EthClient()`
primarySequencer.ExpectSequencerActive(true, nil) // respond true again when the test calls `EthClient()` the first time
activeProvider, err := ept.newActiveL2EndpointProvider(0)
require.NoError(t, err)
firstSequencerUsed, err := activeProvider.EthClient(context.Background())
require.NoError(t, err)
require.Same(t, ept.ethClients[0], firstSequencerUsed)
primarySequencer.ExpectSequencerActive(false, nil) // become inactive after that
secondarySequencer.ExpectSequencerActive(true, nil)
primarySequencer.MaybeClose()
ept.ethClients[0].MaybeClose() // we close the ethclient when we switch over to the next sequencer
secondSequencerUsed, err := activeProvider.EthClient(context.Background())
require.NoError(t, err)
require.Same(t, ept.ethClients[1], secondSequencerUsed)
ept.assertAllExpectations(t)
}
// TestRollupProvider_FailoverOnErroredSequencer verifies that the ActiveL2RollupProvider
// will switch to the next provider if the current one returns an error.
func TestRollupProvider_FailoverOnErroredSequencer(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
primarySequencer, secondarySequencer := ept.rollupClients[0], ept.rollupClients[1]
primarySequencer.ExpectSequencerActive(true, nil) // respond true once on creation
primarySequencer.ExpectSequencerActive(true, nil) // respond true again when the test calls `RollupClient()` the first time
rollupProvider, err := ept.newActiveL2RollupProvider(0)
require.NoError(t, err)
firstSequencerUsed, err := rollupProvider.RollupClient(context.Background())
require.NoError(t, err)
require.Same(t, primarySequencer, firstSequencerUsed)
primarySequencer.ExpectSequencerActive(true, fmt.Errorf("a test error")) // error-out after that
primarySequencer.MaybeClose()
secondarySequencer.ExpectSequencerActive(true, nil)
secondSequencerUsed, err := rollupProvider.RollupClient(context.Background())
require.NoError(t, err)
require.Same(t, secondarySequencer, secondSequencerUsed)
ept.assertAllExpectations(t)
}
// TestEndpointProvider_FailoverOnErroredSequencer verifies that the ActiveL2EndpointProvider
// will switch to the next provider if the current one returns an error.
func TestEndpointProvider_FailoverOnErroredSequencer(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
primarySequencer, secondarySequencer := ept.rollupClients[0], ept.rollupClients[1]
primaryEthClient, secondaryEthClient := ept.ethClients[0], ept.ethClients[1]
primarySequencer.ExpectSequencerActive(true, nil) // primary sequencer gets hit once on creation: embedded call of `RollupClient()`
primarySequencer.ExpectSequencerActive(true, nil) // primary sequencer gets hit twice on creation: implicit call of `EthClient()`
activeProvider, err := ept.newActiveL2EndpointProvider(0)
require.NoError(t, err)
primarySequencer.ExpectSequencerActive(true, nil) // respond true again when the test calls `EthClient()` the first time
firstSequencerUsed, err := activeProvider.EthClient(context.Background())
require.NoError(t, err)
require.Same(t, primaryEthClient, firstSequencerUsed)
primarySequencer.ExpectSequencerActive(true, fmt.Errorf("a test error")) // error out after that
primarySequencer.MaybeClose()
primaryEthClient.MaybeClose()
secondarySequencer.ExpectSequencerActive(true, nil)
secondSequencerUsed, err := activeProvider.EthClient(context.Background())
require.NoError(t, err)
require.Same(t, secondaryEthClient, secondSequencerUsed)
ept.assertAllExpectations(t)
}
// TestRollupProvider_NoExtraCheckOnActiveSequencer verifies that the ActiveL2RollupProvider
// does not change if the current sequencer is active.
func TestRollupProvider_NoExtraCheckOnActiveSequencer(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
primarySequencer := ept.rollupClients[0]
primarySequencer.ExpectSequencerActive(true, nil) // default test provider, which always checks, checks Active on creation
rollupProvider, err := ept.newActiveL2RollupProvider(0)
require.NoError(t, err)
require.Same(t, primarySequencer, rollupProvider.currentRollupClient)
primarySequencer.ExpectSequencerActive(true, nil) // default test provider, which always checks, checks again on RollupClient()
firstSequencerUsed, err := rollupProvider.RollupClient(context.Background())
require.NoError(t, err)
require.Same(t, primarySequencer, firstSequencerUsed)
ept.assertAllExpectations(t)
}
// TestEndpointProvider_NoExtraCheckOnActiveSequencer verifies that the ActiveL2EndpointProvider
// does not change if the current sequencer is active.
func TestEndpointProvider_NoExtraCheckOnActiveSequencer(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
primarySequencer := ept.rollupClients[0]
primarySequencer.ExpectSequencerActive(true, nil) // default test provider, which always checks, checks Active twice on creation (once for internal RollupClient() call)
primarySequencer.ExpectSequencerActive(true, nil) // default test provider, which always checks, checks Active twice on creation (once for internal EthClient() call)
endpointProvider, err := ept.newActiveL2EndpointProvider(0)
require.NoError(t, err)
require.Same(t, ept.ethClients[0], endpointProvider.currentEthClient)
primarySequencer.ExpectSequencerActive(true, nil) // default test provider, which always checks, checks again on EthClient()
firstEthClientUsed, err := endpointProvider.EthClient(context.Background())
require.NoError(t, err)
require.Same(t, ept.ethClients[0], firstEthClientUsed)
ept.assertAllExpectations(t)
}
// TestRollupProvider_FailoverAndReturn verifies the ActiveL2RollupProvider's ability to
// failover and then return to the primary sequencer once it becomes active again.
func TestRollupProvider_FailoverAndReturn(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
primarySequencer, secondarySequencer := ept.rollupClients[0], ept.rollupClients[1]
// Primary initially active
primarySequencer.ExpectSequencerActive(true, nil)
rollupProvider, err := ept.newActiveL2RollupProvider(0)
require.NoError(t, err)
// Primary becomes inactive, secondary active
primarySequencer.ExpectSequencerActive(false, nil)
primarySequencer.MaybeClose()
secondarySequencer.ExpectSequencerActive(true, nil)
// Fails over to secondary
secondSequencerUsed, err := rollupProvider.RollupClient(context.Background())
require.NoError(t, err)
require.Same(t, secondarySequencer, secondSequencerUsed)
// Primary becomes active again, secondary becomes inactive
primarySequencer.ExpectSequencerActive(true, nil)
secondarySequencer.ExpectSequencerActive(false, nil)
secondarySequencer.MaybeClose()
// Should return to primary
thirdSequencerUsed, err := rollupProvider.RollupClient(context.Background())
require.NoError(t, err)
require.Same(t, primarySequencer, thirdSequencerUsed)
ept.assertAllExpectations(t)
}
// TestEndpointProvider_FailoverAndReturn verifies the ActiveL2EndpointProvider's ability to
// failover and then return to the primary sequencer once it becomes active again.
func TestEndpointProvider_FailoverAndReturn(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
primarySequencer, secondarySequencer := ept.rollupClients[0], ept.rollupClients[1]
// Primary initially active
primarySequencer.ExpectSequencerActive(true, nil)
primarySequencer.ExpectSequencerActive(true, nil) // see comment in other tests about why we expect this twice
endpointProvider, err := ept.newActiveL2EndpointProvider(0)
require.NoError(t, err)
// Primary becomes inactive, secondary active
primarySequencer.ExpectSequencerActive(false, nil)
primarySequencer.MaybeClose()
ept.ethClients[0].MaybeClose()
secondarySequencer.ExpectSequencerActive(true, nil)
// Fails over to secondary
secondEthClient, err := endpointProvider.EthClient(context.Background())
require.NoError(t, err)
require.Same(t, ept.ethClients[1], secondEthClient)
// Primary becomes active again, secondary becomes inactive
primarySequencer.ExpectSequencerActive(true, nil)
secondarySequencer.ExpectSequencerActive(false, nil)
secondarySequencer.MaybeClose()
ept.ethClients[1].MaybeClose()
// // Should return to primary
thirdSequencerUsed, err := endpointProvider.EthClient(context.Background())
require.NoError(t, err)
require.Same(t, ept.ethClients[0], thirdSequencerUsed)
ept.assertAllExpectations(t)
}
// TestRollupProvider_InitialActiveSequencerSelection verifies that the ActiveL2RollupProvider
// selects the active sequencer correctly at the time of creation.
func TestRollupProvider_InitialActiveSequencerSelection(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
primarySequencer := ept.rollupClients[0]
// Primary active at creation
primarySequencer.ExpectSequencerActive(true, nil)
rollupProvider, err := ept.newActiveL2RollupProvider(0)
require.NoError(t, err)
// Check immediately after creation without additional Active check
require.Same(t, primarySequencer, rollupProvider.currentRollupClient)
ept.assertAllExpectations(t)
}
// TestEndpointProvider_InitialActiveSequencerSelection verifies that the ActiveL2EndpointProvider
// selects the active sequencer correctly at the time of creation.
func TestEndpointProvider_InitialActiveSequencerSelection(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
primarySequencer := ept.rollupClients[0]
// Primary active at creation
primarySequencer.ExpectSequencerActive(true, nil)
primarySequencer.ExpectSequencerActive(true, nil) // see comment in other tests about why we expect this twice
rollupProvider, err := ept.newActiveL2EndpointProvider(0)
require.NoError(t, err)
// Check immediately after creation without additional Active check
require.Same(t, primarySequencer, rollupProvider.currentRollupClient)
ept.assertAllExpectations(t)
}
// TestRollupProvider_SelectSecondSequencerIfFirstInactiveAtCreation verifies that if the first sequencer
// is inactive at the time of ActiveL2RollupProvider creation, the second active sequencer is chosen.
func TestRollupProvider_SelectSecondSequencerIfFirstInactiveAtCreation(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
// First sequencer is inactive, second sequencer is active
ept.rollupClients[0].ExpectSequencerActive(false, nil)
ept.rollupClients[0].MaybeClose()
ept.rollupClients[1].ExpectSequencerActive(true, nil)
rollupProvider, err := ept.newActiveL2RollupProvider(0)
require.NoError(t, err)
require.Same(t, ept.rollupClients[1], rollupProvider.currentRollupClient)
ept.assertAllExpectations(t)
}
// TestRollupProvider_SelectLastSequencerIfManyOfflineAtCreation verifies that if all but the last sequencer
// are offline at the time of ActiveL2RollupProvider creation, the last active sequencer is chosen.
func TestRollupProvider_SelectLastSequencerIfManyOfflineAtCreation(t *testing.T) {
ept := setupEndpointProviderTest(t, 5)
// First four sequencers are dead, last sequencer is active
for i := 0; i < 4; i++ {
ept.setRollupDialOutcome(i, false)
}
ept.rollupClients[4].ExpectSequencerActive(true, nil)
rollupProvider, err := ept.newActiveL2RollupProvider(0)
require.NoError(t, err)
require.Same(t, ept.rollupClients[4], rollupProvider.currentRollupClient)
ept.assertAllExpectations(t)
}
// TestEndpointProvider_SelectSecondSequencerIfFirstOfflineAtCreation verifies that if the first sequencer
// is inactive at the time of ActiveL2EndpointProvider creation, the second active sequencer is chosen.
func TestEndpointProvider_SelectSecondSequencerIfFirstOfflineAtCreation(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
// First sequencer is inactive, second sequencer is active
ept.rollupClients[0].ExpectSequencerActive(false, nil)
ept.rollupClients[0].MaybeClose()
ept.rollupClients[1].ExpectSequencerActive(true, nil)
ept.rollupClients[1].ExpectSequencerActive(true, nil) // see comment in other tests about why we expect this twice
endpointProvider, err := ept.newActiveL2EndpointProvider(0)
require.NoError(t, err)
require.Same(t, ept.ethClients[1], endpointProvider.currentEthClient)
ept.assertAllExpectations(t)
}
// TestEndpointProvider_SelectLastSequencerIfManyInactiveAtCreation verifies that if all but the last sequencer
// are inactive at the time of ActiveL2EndpointProvider creation, the last active sequencer is chosen.
func TestEndpointProvider_SelectLastSequencerIfManyInactiveAtCreation(t *testing.T) {
ept := setupEndpointProviderTest(t, 5)
// First four sequencers are dead, last sequencer is active
for i := 0; i < 4; i++ {
ept.setRollupDialOutcome(i, false)
}
ept.rollupClients[4].ExpectSequencerActive(true, nil)
ept.rollupClients[4].ExpectSequencerActive(true, nil) // Double check due to embedded call of `EthClient()`
endpointProvider, err := ept.newActiveL2EndpointProvider(0)
require.NoError(t, err)
require.Same(t, ept.ethClients[4], endpointProvider.currentEthClient)
ept.assertAllExpectations(t)
}
// TestRollupProvider_ConstructorErrorOnFirstSequencerOffline verifies that the ActiveL2RollupProvider
// constructor handles the case where the first sequencer (index 0) is offline at startup.
func TestRollupProvider_ConstructorErrorOnFirstSequencerOffline(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
// First sequencer is dead, second sequencer is active
ept.rollupClients[0].ExpectSequencerActive(false, fmt.Errorf("I am offline"))
ept.rollupClients[0].MaybeClose()
ept.rollupClients[1].ExpectSequencerActive(true, nil)
rollupProvider, err := ept.newActiveL2RollupProvider(0)
require.NoError(t, err)
require.Same(t, ept.rollupClients[1], rollupProvider.currentRollupClient)
ept.assertAllExpectations(t)
}
// TestEndpointProvider_ConstructorErrorOnFirstSequencerOffline verifies that the ActiveL2EndpointProvider
// constructor handles the case where the first sequencer (index 0) is offline at startup.
func TestEndpointProvider_ConstructorErrorOnFirstSequencerOffline(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
// First sequencer is dead, second sequencer is active
ept.rollupClients[0].ExpectSequencerActive(false, fmt.Errorf("I am offline"))
ept.rollupClients[0].MaybeClose()
ept.rollupClients[1].ExpectSequencerActive(true, nil)
ept.rollupClients[1].ExpectSequencerActive(true, nil) // see comment in other tests about why we expect this twice
endpointProvider, err := ept.newActiveL2EndpointProvider(0)
require.NoError(t, err)
require.Same(t, ept.ethClients[1], endpointProvider.currentEthClient)
ept.assertAllExpectations(t)
}
// TestRollupProvider_FailOnAllInactiveSequencers verifies that the ActiveL2RollupProvider
// fails to be created when all sequencers are inactive.
func TestRollupProvider_FailOnAllInactiveSequencers(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
// All sequencers are inactive
for _, sequencer := range ept.rollupClients {
sequencer.ExpectSequencerActive(false, nil)
sequencer.MaybeClose()
}
_, err := ept.newActiveL2RollupProvider(0)
require.Error(t, err) // Expect an error as all sequencers are inactive
ept.assertAllExpectations(t)
}
// TestEndpointProvider_FailOnAllInactiveSequencers verifies that the ActiveL2EndpointProvider
// fails to be created when all sequencers are inactive.
func TestEndpointProvider_FailOnAllInactiveSequencers(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
// All sequencers are inactive
for _, sequencer := range ept.rollupClients {
sequencer.ExpectSequencerActive(false, nil)
sequencer.MaybeClose()
}
_, err := ept.newActiveL2EndpointProvider(0)
require.Error(t, err) // Expect an error as all sequencers are inactive
ept.assertAllExpectations(t)
}
// TestRollupProvider_FailOnAllErroredSequencers verifies that the ActiveL2RollupProvider
// fails to create when all sequencers return an error.
func TestRollupProvider_FailOnAllErroredSequencers(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
// All sequencers are inactive
for _, sequencer := range ept.rollupClients {
sequencer.ExpectSequencerActive(true, fmt.Errorf("a test error"))
sequencer.MaybeClose()
}
_, err := ept.newActiveL2RollupProvider(0)
require.Error(t, err) // Expect an error as all sequencers are inactive
ept.assertAllExpectations(t)
}
// TestEndpointProvider_FailOnAllErroredSequencers verifies that the ActiveL2EndpointProvider
// fails to create when all sequencers return an error.
func TestEndpointProvider_FailOnAllErroredSequencers(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
// All sequencers are inactive
for _, sequencer := range ept.rollupClients {
sequencer.ExpectSequencerActive(true, fmt.Errorf("a test error"))
sequencer.MaybeClose()
}
_, err := ept.newActiveL2EndpointProvider(0)
require.Error(t, err) // Expect an error as all sequencers are inactive
ept.assertAllExpectations(t)
}
// TestRollupProvider_LongCheckDuration verifies the behavior of ActiveL2RollupProvider with a long check duration.
func TestRollupProvider_LongCheckDuration(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
primarySequencer := ept.rollupClients[0]
longCheckDuration := 1 * time.Hour
primarySequencer.ExpectSequencerActive(true, nil) // Active check on creation
rollupProvider, err := ept.newActiveL2RollupProvider(longCheckDuration)
require.NoError(t, err)
// Should return the same client without extra checks
firstSequencerUsed, err := rollupProvider.RollupClient(context.Background())
require.NoError(t, err)
require.Same(t, primarySequencer, firstSequencerUsed)
secondSequencerUsed, err := rollupProvider.RollupClient(context.Background())
require.NoError(t, err)
require.Same(t, primarySequencer, secondSequencerUsed)
ept.assertAllExpectations(t)
}
// TestEndpointProvider_LongCheckDuration verifies the behavior of ActiveL2EndpointProvider with a long check duration.
func TestEndpointProvider_LongCheckDuration(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
primarySequencer := ept.rollupClients[0]
longCheckDuration := 1 * time.Hour
primarySequencer.ExpectSequencerActive(true, nil) // Active check on creation
endpointProvider, err := ept.newActiveL2EndpointProvider(longCheckDuration)
require.NoError(t, err)
// Should return the same client without extra checks
firstEthClient, err := endpointProvider.EthClient(context.Background())
require.NoError(t, err)
require.Same(t, ept.ethClients[0], firstEthClient)
secondEthClient, err := endpointProvider.EthClient(context.Background())
require.NoError(t, err)
require.Same(t, ept.ethClients[0], secondEthClient)
ept.assertAllExpectations(t)
}
// TestRollupProvider_ErrorWhenAllSequencersInactive verifies that RollupClient() returns an error
// if all sequencers become inactive after the provider is successfully created.
func TestRollupProvider_ErrorWhenAllSequencersInactive(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
ept.rollupClients[0].ExpectSequencerActive(true, nil) // Main sequencer initially active
rollupProvider, err := ept.newActiveL2RollupProvider(0)
require.NoError(t, err)
// All sequencers become inactive
for _, sequencer := range ept.rollupClients {
sequencer.ExpectSequencerActive(false, nil)
sequencer.MaybeClose()
}
_, err = rollupProvider.RollupClient(context.Background())
require.Error(t, err) // Expect an error as all sequencers are inactive
ept.assertAllExpectations(t)
}
// TestEndpointProvider_ErrorWhenAllSequencersInactive verifies that EthClient() returns an error
// if all sequencers become inactive after the provider is successfully created.
func TestEndpointProvider_ErrorWhenAllSequencersInactive(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
ept.rollupClients[0].ExpectSequencerActive(true, nil) // Main sequencer initially active
ept.rollupClients[0].ExpectSequencerActive(true, nil) // Main sequencer initially active (double check due to embedded call of `EthClient()`)
endpointProvider, err := ept.newActiveL2EndpointProvider(0)
require.NoError(t, err)
// All sequencers become inactive
for _, sequencer := range ept.rollupClients {
sequencer.ExpectSequencerActive(false, nil)
sequencer.MaybeClose()
}
_, err = endpointProvider.EthClient(context.Background())
require.Error(t, err) // Expect an error as all sequencers are inactive
ept.assertAllExpectations(t)
}
// TestRollupProvider_ReturnsSameSequencerOnInactiveWithLongCheckDuration verifies that the ActiveL2RollupProvider
// still returns the same sequencer across calls even if it becomes inactive, due to a long check duration.
func TestRollupProvider_ReturnsSameSequencerOnInactiveWithLongCheckDuration(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
primarySequencer := ept.rollupClients[0]
longCheckDuration := 1 * time.Hour
primarySequencer.ExpectSequencerActive(true, nil) // Active on creation
rollupProvider, err := ept.newActiveL2RollupProvider(longCheckDuration)
require.NoError(t, err)
// Primary sequencer becomes inactive, but the provider won't check immediately due to longCheckDuration
primarySequencer.ExpectSequencerActive(false, nil)
firstSequencerUsed, err := rollupProvider.RollupClient(context.Background())
require.NoError(t, err)
require.Same(t, primarySequencer, firstSequencerUsed)
active, err := primarySequencer.SequencerActive(context.Background())
require.NoError(t, err)
require.False(t, active)
secondSequencerUsed, err := rollupProvider.RollupClient(context.Background())
require.NoError(t, err)
require.Same(t, primarySequencer, secondSequencerUsed)
ept.assertAllExpectations(t)
}
// TestEndpointProvider_ReturnsSameSequencerOnInactiveWithLongCheckDuration verifies that the ActiveL2EndpointProvider
// still returns the same sequencer across calls even if it becomes inactive, due to a long check duration.
func TestEndpointProvider_ReturnsSameSequencerOnInactiveWithLongCheckDuration(t *testing.T) {
ept := setupEndpointProviderTest(t, 2)
primarySequencer := ept.rollupClients[0]
longCheckDuration := 1 * time.Hour
primarySequencer.ExpectSequencerActive(true, nil) // Active on creation
endpointProvider, err := ept.newActiveL2EndpointProvider(longCheckDuration)
require.NoError(t, err)
// Primary sequencer becomes inactive, but the provider won't check immediately due to longCheckDuration
primarySequencer.ExpectSequencerActive(false, nil)
firstEthClientUsed, err := endpointProvider.EthClient(context.Background())
require.NoError(t, err)
require.Same(t, ept.ethClients[0], firstEthClientUsed)
active, err := primarySequencer.SequencerActive(context.Background())
require.NoError(t, err)
require.False(t, active)
secondEthClientUsed, err := endpointProvider.EthClient(context.Background())
require.NoError(t, err)
require.Same(t, ept.ethClients[0], secondEthClientUsed)
ept.assertAllExpectations(t)
}
// TestRollupProvider_HandlesManyIndexClientMismatch verifies that the ActiveL2RollupProvider avoids
// the case where the index of the current sequencer does not match the index of the current rollup client.
func TestRollupProvider_HandlesManyIndexClientMismatch(t *testing.T) {
ept := setupEndpointProviderTest(t, 3)
seq0, seq1, seq2 := ept.rollupClients[0], ept.rollupClients[1], ept.rollupClients[2]
// "start happy": primarySequencer is active on creation
seq0.ExpectSequencerActive(true, nil) // active on creation
rollupProvider, err := ept.newActiveL2RollupProvider(0)
require.NoError(t, err)
// primarySequencer goes down
seq0.ExpectSequencerActive(false, fmt.Errorf("I'm offline now"))
seq0.MaybeClose()
ept.setRollupDialOutcome(0, false) // primarySequencer fails to dial
// secondarySequencer is inactive, but online
seq1.ExpectSequencerActive(false, nil)
seq1.MaybeClose()
// tertiarySequencer can't even be dialed
ept.setRollupDialOutcome(2, false)
// In a prior buggy implementation, this scenario lead to an internal inconsistent state
// where the current client didn't match the index. On a subsequent try, this led to the
// active sequencer at 0 to be skipped entirely, while the sequencer at index 1
// was checked twice.
rollupClient, err := rollupProvider.RollupClient(context.Background())
require.Error(t, err)
require.Nil(t, rollupClient)
// internal state would now be inconsistent in a buggy impl.
// now seq0 is dialable and active
ept.setRollupDialOutcome(0, true)
seq0.ExpectSequencerActive(true, nil)
seq0.MaybeClose()
// now seq1 and seq2 are dialable, but inactive
ept.setRollupDialOutcome(1, true)
seq1.ExpectSequencerActive(false, nil)
seq1.MaybeClose()
ept.setRollupDialOutcome(2, true)
seq2.ExpectSequencerActive(false, nil)
seq2.MaybeClose()
// this would trigger the prior bug: request the rollup client.
rollupClient, err = rollupProvider.RollupClient(context.Background())
require.NoError(t, err)
require.Same(t, seq0, rollupClient)
ept.assertAllExpectations(t)
}
// TestRollupProvider_HandlesSingleSequencer verifies that the ActiveL2RollupProvider
// can handle being passed a single sequencer endpoint without issue.
func TestRollupProvider_HandlesSingleSequencer(t *testing.T) {
ept := setupEndpointProviderTest(t, 1)
onlySequencer := ept.rollupClients[0]
onlySequencer.ExpectSequencerActive(true, nil) // respond true once on creation
rollupProvider, err := ept.newActiveL2RollupProvider(0)
require.NoError(t, err)
onlySequencer.ExpectSequencerActive(true, nil) // respond true again when the test calls `RollupClient()` the first time
firstSequencerUsed, err := rollupProvider.RollupClient(context.Background())
require.NoError(t, err)
require.Same(t, onlySequencer, firstSequencerUsed)
onlySequencer.ExpectSequencerActive(false, nil) // become inactive after that
onlySequencer.MaybeClose()
secondSequencerUsed, err := rollupProvider.RollupClient(context.Background())
require.Error(t, err)
require.Nil(t, secondSequencerUsed)
ept.assertAllExpectations(t)
}
// TestEndpointProvider_HandlesSingleSequencer verifies that the ActiveL2EndpointProvider
// can handle being passed a single sequencer endpoint without issue.
func TestEndpointProvider_HandlesSingleSequencer(t *testing.T) {
ept := setupEndpointProviderTest(t, 1)
onlySequencer := ept.rollupClients[0]
onlySequencer.ExpectSequencerActive(true, nil) // respond true once on creation
onlySequencer.ExpectSequencerActive(true, nil) // respond true again when the constructor calls `RollupClient()`
endpointProvider, err := ept.newActiveL2EndpointProvider(0)
require.NoError(t, err)
onlySequencer.ExpectSequencerActive(true, nil) // respond true a once more on fall-through check in `EthClient()`
firstEthClientUsed, err := endpointProvider.EthClient(context.Background())
require.NoError(t, err)
require.Same(t, ept.ethClients[0], firstEthClientUsed)
onlySequencer.ExpectSequencerActive(false, nil) // become inactive after that
onlySequencer.MaybeClose()
secondEthClientUsed, err := endpointProvider.EthClient(context.Background())
require.Error(t, err)
require.Nil(t, secondEthClientUsed)
ept.assertAllExpectations(t)
}
package dial
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
)
type rollupDialer func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error)
// ActiveL2EndpointProvider is an interface for providing a RollupClient
// It manages the lifecycle of the RollupClient for callers
// It does this by failing over down the list of rollupUrls if the current one is inactive or broken
type ActiveL2RollupProvider struct {
checkDuration time.Duration
networkTimeout time.Duration
log log.Logger
activeTimeout time.Time
rollupUrls []string
rollupDialer rollupDialer
currentRollupClient RollupClientInterface
rollupIndex int
clientLock *sync.Mutex
}
// NewActiveL2RollupProvider creates a new ActiveL2RollupProvider
// the checkDuration is the duration between checks to see if the current rollup client is active
// provide a checkDuration of 0 to check every time
func NewActiveL2RollupProvider(
ctx context.Context,
rollupUrls []string,
checkDuration time.Duration,
networkTimeout time.Duration,
logger log.Logger,
) (*ActiveL2RollupProvider, error) {
rollupDialer := func(ctx context.Context, timeout time.Duration,
log log.Logger, url string,
) (RollupClientInterface, error) {
return DialRollupClientWithTimeout(ctx, timeout, log, url)
}
return newActiveL2RollupProvider(ctx, rollupUrls, checkDuration, networkTimeout, logger, rollupDialer)
}
func newActiveL2RollupProvider(
ctx context.Context,
rollupUrls []string,
checkDuration time.Duration,
networkTimeout time.Duration,
logger log.Logger,
dialer rollupDialer,
) (*ActiveL2RollupProvider, error) {
if len(rollupUrls) == 0 {
return nil, errors.New("empty rollup urls list")
}
p := &ActiveL2RollupProvider{
checkDuration: checkDuration,
networkTimeout: networkTimeout,
log: logger,
rollupUrls: rollupUrls,
rollupDialer: dialer,
clientLock: &sync.Mutex{},
}
cctx, cancel := context.WithTimeout(ctx, networkTimeout)
defer cancel()
if _, err := p.RollupClient(cctx); err != nil {
return nil, fmt.Errorf("setting provider rollup client: %w", err)
}
return p, nil
}
func (p *ActiveL2RollupProvider) RollupClient(ctx context.Context) (RollupClientInterface, error) {
p.clientLock.Lock()
defer p.clientLock.Unlock()
err := p.ensureActiveEndpoint(ctx)
if err != nil {
return nil, err
}
return p.currentRollupClient, nil
}
func (p *ActiveL2RollupProvider) ensureActiveEndpoint(ctx context.Context) error {
if !p.shouldCheck() {
return nil
}
if err := p.findActiveEndpoints(ctx); err != nil {
return err
}
p.activeTimeout = time.Now().Add(p.checkDuration)
return nil
}
func (p *ActiveL2RollupProvider) shouldCheck() bool {
return time.Now().After(p.activeTimeout)
}
func (p *ActiveL2RollupProvider) findActiveEndpoints(ctx context.Context) error {
startIdx := p.rollupIndex
var errs error
for offset := range p.rollupUrls {
idx := (startIdx + offset) % p.numEndpoints()
if offset != 0 || p.currentRollupClient == nil {
if err := p.dialSequencer(ctx, idx); err != nil {
errs = errors.Join(errs, err)
p.log.Warn("Error dialing next sequencer.", "err", err, "index", p.rollupIndex)
continue
}
}
ep := p.rollupUrls[idx]
if active, err := p.checkCurrentSequencer(ctx); err != nil {
errs = errors.Join(errs, err)
p.log.Warn("Error querying active sequencer, trying next.", "err", err, "index", idx, "url", ep)
} else if active {
if offset == 0 {
p.log.Debug("Current sequencer active.", "index", idx, "url", ep)
} else {
p.log.Info("Found new active sequencer.", "index", idx, "url", ep)
}
return nil
} else {
p.log.Info("Sequencer inactive, trying next.", "index", idx, "url", ep)
}
}
return fmt.Errorf("failed to find an active sequencer, tried following urls: %v; errs: %w", p.rollupUrls, errs)
}
func (p *ActiveL2RollupProvider) checkCurrentSequencer(ctx context.Context) (bool, error) {
cctx, cancel := context.WithTimeout(ctx, p.networkTimeout)
defer cancel()
return p.currentRollupClient.SequencerActive(cctx)
}
func (p *ActiveL2RollupProvider) numEndpoints() int {
return len(p.rollupUrls)
}
// dialSequencer dials the sequencer for the url at the given index.
// If successful, the currentRollupClient and rollupIndex are updated and the
// old rollup client is closed.
func (p *ActiveL2RollupProvider) dialSequencer(ctx context.Context, idx int) error {
cctx, cancel := context.WithTimeout(ctx, p.networkTimeout)
defer cancel()
ep := p.rollupUrls[idx]
p.log.Info("Dialing next sequencer.", "index", idx, "url", ep)
rollupClient, err := p.rollupDialer(cctx, p.networkTimeout, p.log, ep)
if err != nil {
return fmt.Errorf("dialing rollup client: %w", err)
}
if p.currentRollupClient != nil {
p.currentRollupClient.Close()
}
p.rollupIndex = idx
p.currentRollupClient = rollupClient
return nil
}
func (p *ActiveL2RollupProvider) Close() {
if p.currentRollupClient != nil {
p.currentRollupClient.Close()
}
}
package dial
import (
"context"
"math/big"
"github.com/ethereum/go-ethereum/core/types"
)
// EthClientInterface is an interface for providing an ethclient.Client
// It does not describe all of the functions an ethclient.Client has, only the ones used by callers of the L2 Providers
type EthClientInterface interface {
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
Close()
}
package dial
import (
"context"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
)
// RollupClientInterface is an interface for providing a RollupClient
// It does not describe all of the functions a RollupClient has, only the ones used by the L2 Providers and their callers
type RollupClientInterface interface {
OutputAtBlock(ctx context.Context, blockNum uint64) (*eth.OutputResponse, error)
SyncStatus(ctx context.Context) (*eth.SyncStatus, error)
RollupConfig(ctx context.Context) (*rollup.Config, error)
StartSequencer(ctx context.Context, unsafeHead common.Hash) error
SequencerActive(ctx context.Context) (bool, error)
Close()
}
......@@ -13,7 +13,7 @@ import (
type L2EndpointProvider interface {
RollupProvider
// EthClient(ctx) returns the underlying ethclient pointing to the L2 execution node
EthClient(ctx context.Context) (*ethclient.Client, error)
EthClient(ctx context.Context) (EthClientInterface, error)
}
// StaticL2EndpointProvider is a L2EndpointProvider that always returns the same static RollupClient and eth client
......@@ -38,7 +38,7 @@ func NewStaticL2EndpointProvider(ctx context.Context, log log.Logger, ethClientU
}, nil
}
func (p *StaticL2EndpointProvider) EthClient(context.Context) (*ethclient.Client, error) {
func (p *StaticL2EndpointProvider) EthClient(context.Context) (EthClientInterface, error) {
return p.ethClient, nil
}
......
......@@ -11,7 +11,7 @@ import (
// It manages the lifecycle of the RollupClient for callers
type RollupProvider interface {
// RollupClient(ctx) returns the underlying sources.RollupClient pointing to the L2 rollup consensus node
RollupClient(ctx context.Context) (*sources.RollupClient, error)
RollupClient(ctx context.Context) (RollupClientInterface, error)
// Close() closes the underlying client or clients
Close()
}
......@@ -39,7 +39,7 @@ func NewStaticL2RollupProviderFromExistingRollup(rollupCl *sources.RollupClient)
}, nil
}
func (p *StaticL2RollupProvider) RollupClient(context.Context) (*sources.RollupClient, error) {
func (p *StaticL2RollupProvider) RollupClient(context.Context) (RollupClientInterface, error) {
return p.rollupClient, nil
}
......
......@@ -2,6 +2,7 @@ package testutils
import (
"context"
"math/big"
"github.com/stretchr/testify/mock"
......@@ -131,3 +132,24 @@ func (m *MockEthClient) ReadStorageAt(ctx context.Context, address common.Addres
func (m *MockEthClient) ExpectReadStorageAt(ctx context.Context, address common.Address, storageSlot common.Hash, blockHash common.Hash, result common.Hash, err error) {
m.Mock.On("ReadStorageAt", address, storageSlot, blockHash).Once().Return(result, err)
}
func (m *MockEthClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
out := m.Mock.Called(number)
return out.Get(0).(*types.Block), out.Error(1)
}
func (m *MockEthClient) ExpectBlockByNumber(number *big.Int, block *types.Block, err error) {
m.Mock.On("BlockByNumber", number).Once().Return(block, err)
}
func (m *MockEthClient) ExpectClose() {
m.Mock.On("Close").Once()
}
func (m *MockEthClient) MaybeClose() {
m.Mock.On("Close").Maybe()
}
func (m *MockEthClient) Close() {
m.Mock.Called()
}
package testutils
import (
"context"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/mock"
)
type MockRollupClient struct {
mock.Mock
}
func (m *MockRollupClient) OutputAtBlock(ctx context.Context, blockNum uint64) (*eth.OutputResponse, error) {
out := m.Mock.Called(blockNum)
return out.Get(0).(*eth.OutputResponse), out.Error(1)
}
func (m *MockRollupClient) ExpectOutputAtBlock(blockNum uint64, response *eth.OutputResponse, err error) {
m.Mock.On("OutputAtBlock", blockNum).Once().Return(response, err)
}
func (m *MockRollupClient) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) {
out := m.Mock.Called()
return out.Get(0).(*eth.SyncStatus), out.Error(1)
}
func (m *MockRollupClient) ExpectSyncStatus(status *eth.SyncStatus, err error) {
m.Mock.On("SyncStatus").Once().Return(status, err)
}
func (m *MockRollupClient) RollupConfig(ctx context.Context) (*rollup.Config, error) {
out := m.Mock.Called()
return out.Get(0).(*rollup.Config), out.Error(1)
}
func (m *MockRollupClient) ExpectRollupConfig(config *rollup.Config, err error) {
m.Mock.On("RollupConfig").Once().Return(config, err)
}
func (m *MockRollupClient) StartSequencer(ctx context.Context, unsafeHead common.Hash) error {
out := m.Mock.Called(unsafeHead)
return out.Error(0)
}
func (m *MockRollupClient) ExpectStartSequencer(unsafeHead common.Hash, err error) {
m.Mock.On("StartSequencer", unsafeHead).Once().Return(err)
}
func (m *MockRollupClient) SequencerActive(ctx context.Context) (bool, error) {
out := m.Mock.Called()
return out.Bool(0), out.Error(1)
}
func (m *MockRollupClient) ExpectSequencerActive(active bool, err error) {
m.Mock.On("SequencerActive").Once().Return(active, err)
}
func (m *MockRollupClient) ExpectClose() {
m.Mock.On("Close").Once()
}
func (m *MockRollupClient) MaybeClose() {
m.Mock.On("Close").Maybe()
}
func (m *MockRollupClient) Close() {
m.Mock.Called()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment