Commit 287cb116 authored by Adrian Sutton's avatar Adrian Sutton Committed by GitHub

op-challenger: Replace duplicate tx sending code with TxSender (#9186)

Co-authored-by: default avatarrefcell.eth <abigger87@gmail.com>
parent ccc76bc3
......@@ -188,6 +188,26 @@ func TestMaxConcurrency(t *testing.T) {
})
}
func TestMaxPendingTx(t *testing.T) {
t.Run("Valid", func(t *testing.T) {
expected := uint64(345)
cfg := configForArgs(t, addRequiredArgs(config.TraceTypeAlphabet, "--max-pending-tx", "345"))
require.Equal(t, expected, cfg.MaxPendingTx)
})
t.Run("Zero", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(config.TraceTypeAlphabet, "--max-pending-tx", "0"))
require.Equal(t, uint64(0), cfg.MaxPendingTx)
})
t.Run("Invalid", func(t *testing.T) {
verifyArgsInvalid(
t,
"invalid value \"abc\" for flag -max-pending-tx",
addRequiredArgs(config.TraceTypeAlphabet, "--max-pending-tx", "abc"))
})
}
func TestPollInterval(t *testing.T) {
t.Run("UsesDefault", func(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs(config.TraceTypeCannon))
......
......@@ -92,6 +92,7 @@ const (
// The default value is 11 days, which is a 4 day resolution buffer
// plus the 7 day game finalization window.
DefaultGameWindow = time.Duration(11 * 24 * time.Hour)
DefaultMaxPendingTx = 10
)
// Config is a well typed config that is parsed from the CLI params.
......@@ -122,6 +123,8 @@ type Config struct {
CannonSnapshotFreq uint // Frequency of snapshots to create when executing cannon (in VM instructions)
CannonInfoFreq uint // Frequency of cannon progress log messages (in VM instructions)
MaxPendingTx uint64 // Maximum number of pending transactions (0 == no limit)
TxMgrConfig txmgr.CLIConfig
MetricsConfig opmetrics.CLIConfig
PprofConfig oppprof.CLIConfig
......@@ -141,6 +144,8 @@ func NewConfig(
TraceTypes: supportedTraceTypes,
MaxPendingTx: DefaultMaxPendingTx,
TxMgrConfig: txmgr.NewCLIConfig(l1EthRpc, txmgr.DefaultChallengerFlagValues),
MetricsConfig: opmetrics.DefaultCLIConfig(),
PprofConfig: oppprof.DefaultCLIConfig(),
......
......@@ -63,6 +63,12 @@ var (
EnvVars: prefixEnvVars("MAX_CONCURRENCY"),
Value: uint(runtime.NumCPU()),
}
MaxPendingTransactionsFlag = &cli.Uint64Flag{
Name: "max-pending-tx",
Usage: "The maximum number of pending transactions. 0 for no limit.",
Value: config.DefaultMaxPendingTx,
EnvVars: prefixEnvVars("MAX_PENDING_TX"),
}
HTTPPollInterval = &cli.DurationFlag{
Name: "http-poll-interval",
Usage: "Polling interval for latest-block subscription when using an HTTP RPC provider.",
......@@ -143,6 +149,7 @@ var requiredFlags = []cli.Flag{
var optionalFlags = []cli.Flag{
TraceTypeFlag,
MaxConcurrencyFlag,
MaxPendingTransactionsFlag,
HTTPPollInterval,
RollupRpcFlag,
GameAllowlistFlag,
......@@ -276,6 +283,7 @@ func NewConfigFromCLI(ctx *cli.Context) (*config.Config, error) {
GameAllowlist: allowedGames,
GameWindow: ctx.Duration(GameWindowFlag.Name),
MaxConcurrency: maxConcurrency,
MaxPendingTx: ctx.Uint64(MaxPendingTransactionsFlag.Name),
PollInterval: ctx.Duration(HTTPPollInterval.Name),
RollupRpc: ctx.String(RollupRpcFlag.Name),
CannonNetwork: ctx.String(CannonNetworkFlag.Name),
......
......@@ -18,9 +18,9 @@ import (
// For full op-challenger this means executing the transaction on chain.
type Responder interface {
CallResolve(ctx context.Context) (gameTypes.GameStatus, error)
Resolve(ctx context.Context) error
Resolve() error
CallResolveClaim(ctx context.Context, claimIdx uint64) error
ResolveClaim(ctx context.Context, claimIdx uint64) error
ResolveClaim(claimIdx uint64) error
PerformAction(ctx context.Context, action types.Action) error
}
......@@ -102,7 +102,7 @@ func (a *Agent) tryResolve(ctx context.Context) bool {
return false
}
a.log.Info("Resolving game")
if err := a.responder.Resolve(ctx); err != nil {
if err := a.responder.Resolve(); err != nil {
a.log.Error("Failed to resolve the game", "err", err)
}
return true
......@@ -138,7 +138,7 @@ func (a *Agent) tryResolveClaims(ctx context.Context) error {
claimIdx := claimIdx
go func() {
defer wg.Done()
err := a.responder.ResolveClaim(ctx, uint64(claimIdx))
err := a.responder.ResolveClaim(uint64(claimIdx))
if err != nil {
a.log.Error("Failed to resolve claim", "err", err)
}
......
......@@ -109,7 +109,7 @@ func (s *stubResponder) CallResolve(ctx context.Context) (gameTypes.GameStatus,
return s.callResolveStatus, s.callResolveErr
}
func (s *stubResponder) Resolve(ctx context.Context) error {
func (s *stubResponder) Resolve() error {
s.resolveCount++
return s.resolveErr
}
......@@ -119,7 +119,7 @@ func (s *stubResponder) CallResolveClaim(ctx context.Context, clainIdx uint64) e
return s.callResolveClaimErr
}
func (s *stubResponder) ResolveClaim(ctx context.Context, clainIdx uint64) error {
func (s *stubResponder) ResolveClaim(clainIdx uint64) error {
s.resolveClaimCount++
return nil
}
......
......@@ -10,7 +10,6 @@ import (
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/types"
gameTypes "github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-challenger/metrics"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
......@@ -48,7 +47,7 @@ func NewGamePlayer(
m metrics.Metricer,
dir string,
addr common.Address,
txMgr txmgr.TxManager,
txSender gameTypes.TxSender,
loader GameContract,
validators []Validator,
creator resourceCreator,
......@@ -88,15 +87,15 @@ func NewGamePlayer(
if err != nil {
return nil, fmt.Errorf("failed to load oracle: %w", err)
}
minLargePreimageSize, err := oracle.MinLargePreimageSize(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load min large preimage size: %w", err)
}
direct := preimages.NewDirectPreimageUploader(logger, txMgr, loader)
large := preimages.NewLargePreimageUploader(logger, txMgr, oracle)
direct := preimages.NewDirectPreimageUploader(logger, txSender, loader)
large := preimages.NewLargePreimageUploader(logger, txSender, oracle)
uploader := preimages.NewSplitPreimageUploader(direct, large, minLargePreimageSize)
responder, err := responder.NewFaultResponder(logger, txMgr, loader, uploader, oracle)
responder, err := responder.NewFaultResponder(logger, txSender, loader, uploader, oracle)
if err != nil {
return nil, fmt.Errorf("failed to create the responder: %w", err)
}
......
......@@ -5,8 +5,8 @@ import (
"fmt"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/types"
gameTypes "github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
......@@ -21,12 +21,12 @@ type PreimageGameContract interface {
type DirectPreimageUploader struct {
log log.Logger
txMgr txmgr.TxManager
txSender gameTypes.TxSender
contract PreimageGameContract
}
func NewDirectPreimageUploader(logger log.Logger, txMgr txmgr.TxManager, contract PreimageGameContract) *DirectPreimageUploader {
return &DirectPreimageUploader{logger, txMgr, contract}
func NewDirectPreimageUploader(logger log.Logger, txSender gameTypes.TxSender, contract PreimageGameContract) *DirectPreimageUploader {
return &DirectPreimageUploader{logger, txSender, contract}
}
func (d *DirectPreimageUploader) UploadPreimage(ctx context.Context, claimIdx uint64, data *types.PreimageOracleData) error {
......@@ -38,23 +38,8 @@ func (d *DirectPreimageUploader) UploadPreimage(ctx context.Context, claimIdx ui
if err != nil {
return fmt.Errorf("failed to create pre-image oracle tx: %w", err)
}
if err := d.sendTxAndWait(ctx, candidate); err != nil {
if _, err := d.txSender.SendAndWait("populate pre-image oracle", candidate); err != nil {
return fmt.Errorf("failed to populate pre-image oracle: %w", err)
}
return nil
}
// sendTxAndWait sends a transaction through the [txmgr] and waits for a receipt.
// This sets the tx GasLimit to 0, performing gas estimation online through the [txmgr].
func (d *DirectPreimageUploader) sendTxAndWait(ctx context.Context, candidate txmgr.TxCandidate) error {
receipt, err := d.txMgr.Send(ctx, candidate)
if err != nil {
return err
}
if receipt.Status == ethtypes.ReceiptStatusFailed {
d.log.Error("DirectPreimageUploader tx successfully published but reverted", "tx_hash", receipt.TxHash)
} else {
d.log.Debug("DirectPreimageUploader tx successfully published", "tx_hash", receipt.TxHash)
}
return nil
}
......@@ -53,34 +53,9 @@ func TestDirectPreimageUploader_UploadPreimage(t *testing.T) {
})
}
func TestDirectPreimageUploader_SendTxAndWait(t *testing.T) {
t.Run("SendFails", func(t *testing.T) {
oracle, txMgr, _ := newTestDirectPreimageUploader(t)
txMgr.sendFails = true
err := oracle.sendTxAndWait(context.Background(), txmgr.TxCandidate{})
require.ErrorIs(t, err, mockTxMgrSendError)
require.Equal(t, 1, txMgr.sends)
})
t.Run("ReceiptStatusFailed", func(t *testing.T) {
oracle, txMgr, _ := newTestDirectPreimageUploader(t)
txMgr.statusFail = true
err := oracle.sendTxAndWait(context.Background(), txmgr.TxCandidate{})
require.NoError(t, err)
require.Equal(t, 1, txMgr.sends)
})
t.Run("Success", func(t *testing.T) {
oracle, txMgr, _ := newTestDirectPreimageUploader(t)
err := oracle.sendTxAndWait(context.Background(), txmgr.TxCandidate{})
require.NoError(t, err)
require.Equal(t, 1, txMgr.sends)
})
}
func newTestDirectPreimageUploader(t *testing.T) (*DirectPreimageUploader, *mockTxMgr, *mockPreimageGameContract) {
func newTestDirectPreimageUploader(t *testing.T) (*DirectPreimageUploader, *mockTxSender, *mockPreimageGameContract) {
logger := testlog.Logger(t, log.LvlError)
txMgr := &mockTxMgr{}
txMgr := &mockTxSender{}
contract := &mockPreimageGameContract{}
return NewDirectPreimageUploader(logger, txMgr, contract), txMgr, contract
}
......@@ -98,23 +73,23 @@ func (s *mockPreimageGameContract) UpdateOracleTx(_ context.Context, _ uint64, _
return txmgr.TxCandidate{}, nil
}
type mockTxMgr struct {
type mockTxSender struct {
sends int
sendFails bool
statusFail bool
}
func (s *mockTxMgr) Send(_ context.Context, _ txmgr.TxCandidate) (*ethtypes.Receipt, error) {
func (s *mockTxSender) From() common.Address {
return common.Address{}
}
func (s *mockTxSender) SendAndWait(_ string, _ ...txmgr.TxCandidate) ([]*ethtypes.Receipt, error) {
s.sends++
if s.sendFails {
return nil, mockTxMgrSendError
}
if s.statusFail {
return &ethtypes.Receipt{Status: ethtypes.ReceiptStatusFailed}, nil
return []*ethtypes.Receipt{{Status: ethtypes.ReceiptStatusFailed}}, nil
}
return &ethtypes.Receipt{Status: ethtypes.ReceiptStatusSuccessful}, nil
return []*ethtypes.Receipt{{Status: ethtypes.ReceiptStatusSuccessful}}, nil
}
func (s *mockTxMgr) BlockNumber(_ context.Context) (uint64, error) { return 0, nil }
func (s *mockTxMgr) From() common.Address { return common.Address{} }
func (s *mockTxMgr) Close() {}
......@@ -12,9 +12,9 @@ import (
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/types"
"github.com/ethereum-optimism/optimism/op-challenger/game/keccak/matrix"
keccakTypes "github.com/ethereum-optimism/optimism/op-challenger/game/keccak/types"
gameTypes "github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
)
......@@ -35,12 +35,12 @@ const MaxChunkSize = MaxBlocksPerChunk * keccakTypes.BlockSize
type LargePreimageUploader struct {
log log.Logger
txMgr txmgr.TxManager
txSender gameTypes.TxSender
contract PreimageOracleContract
}
func NewLargePreimageUploader(logger log.Logger, txMgr txmgr.TxManager, contract PreimageOracleContract) *LargePreimageUploader {
return &LargePreimageUploader{logger, txMgr, contract}
func NewLargePreimageUploader(logger log.Logger, txSender gameTypes.TxSender, contract PreimageOracleContract) *LargePreimageUploader {
return &LargePreimageUploader{logger, txSender, contract}
}
func (p *LargePreimageUploader) UploadPreimage(ctx context.Context, parent uint64, data *types.PreimageOracleData) error {
......@@ -52,7 +52,7 @@ func (p *LargePreimageUploader) UploadPreimage(ctx context.Context, parent uint6
uuid := p.newUUID(data)
// Fetch the current metadata for this preimage data, if it exists.
ident := keccakTypes.LargePreimageIdent{Claimant: p.txMgr.From(), UUID: uuid}
ident := keccakTypes.LargePreimageIdent{Claimant: p.txSender.From(), UUID: uuid}
metadata, err := p.contract.GetProposalMetadata(ctx, batching.BlockLatest, ident)
if err != nil {
return fmt.Errorf("failed to get pre-image oracle metadata: %w", err)
......@@ -60,7 +60,7 @@ func (p *LargePreimageUploader) UploadPreimage(ctx context.Context, parent uint6
// The proposal is not initialized if the queried metadata has a claimed size of 0.
if len(metadata) == 1 && metadata[0].ClaimedSize == 0 {
err = p.initLargePreimage(ctx, uuid, data.OracleOffset, uint32(len(data.OracleData)))
err = p.initLargePreimage(uuid, data.OracleOffset, uint32(len(data.OracleData)))
if err != nil {
return fmt.Errorf("failed to initialize large preimage with uuid: %s: %w", uuid, err)
}
......@@ -76,7 +76,7 @@ func (p *LargePreimageUploader) UploadPreimage(ctx context.Context, parent uint6
}
}
err = p.addLargePreimageData(ctx, uuid, calls)
err = p.addLargePreimageData(uuid, calls)
if err != nil {
return fmt.Errorf("failed to add leaves to large preimage with uuid: %s: %w", uuid, err)
}
......@@ -87,7 +87,7 @@ func (p *LargePreimageUploader) UploadPreimage(ctx context.Context, parent uint6
// newUUID generates a new unique identifier for the preimage by hashing the
// concatenated preimage data, preimage offset, and sender address.
func (p *LargePreimageUploader) newUUID(data *types.PreimageOracleData) *big.Int {
sender := p.txMgr.From()
sender := p.txSender.From()
offset := make([]byte, 4)
binary.LittleEndian.PutUint32(offset, data.OracleOffset)
concatenated := append(data.OracleData, offset...)
......@@ -129,14 +129,14 @@ func (p *LargePreimageUploader) Squeeze(ctx context.Context, uuid *big.Int, stat
// This allows the responder to retry the squeeze later.
// Other errors should force the responder to stop retrying.
// Nil errors should indicate the squeeze was successful.
if err := p.contract.CallSqueeze(ctx, p.txMgr.From(), uuid, stateMatrix, prestate, prestateProof, poststate, poststateProof); err != nil {
if err := p.contract.CallSqueeze(ctx, p.txSender.From(), uuid, stateMatrix, prestate, prestateProof, poststate, poststateProof); err != nil {
return fmt.Errorf("failed to call squeeze: %w", err)
}
tx, err := p.contract.Squeeze(p.txMgr.From(), uuid, stateMatrix, prestate, prestateProof, poststate, poststateProof)
tx, err := p.contract.Squeeze(p.txSender.From(), uuid, stateMatrix, prestate, prestateProof, poststate, poststateProof)
if err != nil {
return fmt.Errorf("failed to create pre-image oracle tx: %w", err)
}
if err := p.sendTxAndWait(ctx, tx); err != nil {
if _, err := p.txSender.SendAndWait("squeeze large preimage", tx); err != nil {
return fmt.Errorf("failed to populate pre-image oracle: %w", err)
}
return nil
......@@ -144,12 +144,12 @@ func (p *LargePreimageUploader) Squeeze(ctx context.Context, uuid *big.Int, stat
// initLargePreimage initializes the large preimage proposal.
// This method *must* be called before adding any leaves.
func (p *LargePreimageUploader) initLargePreimage(ctx context.Context, uuid *big.Int, partOffset uint32, claimedSize uint32) error {
func (p *LargePreimageUploader) initLargePreimage(uuid *big.Int, partOffset uint32, claimedSize uint32) error {
candidate, err := p.contract.InitLargePreimage(uuid, partOffset, claimedSize)
if err != nil {
return fmt.Errorf("failed to create pre-image oracle tx: %w", err)
}
if err := p.sendTxAndWait(ctx, candidate); err != nil {
if _, err := p.txSender.SendAndWait("init large preimage", candidate); err != nil {
return fmt.Errorf("failed to populate pre-image oracle: %w", err)
}
return nil
......@@ -158,9 +158,8 @@ func (p *LargePreimageUploader) initLargePreimage(ctx context.Context, uuid *big
// addLargePreimageData adds data to the large preimage proposal.
// This method **must** be called after calling [initLargePreimage].
// SAFETY: submits transactions in a [Queue] for latency while preserving submission order.
func (p *LargePreimageUploader) addLargePreimageData(ctx context.Context, uuid *big.Int, chunks []keccakTypes.InputData) error {
queue := txmgr.NewQueue[int](ctx, p.txMgr, 10)
receiptChs := make([]chan txmgr.TxReceipt[int], len(chunks))
func (p *LargePreimageUploader) addLargePreimageData(uuid *big.Int, chunks []keccakTypes.InputData) error {
txs := make([]txmgr.TxCandidate, len(chunks))
blocksProcessed := int64(0)
for i, chunk := range chunks {
tx, err := p.contract.AddLeaves(uuid, big.NewInt(blocksProcessed), chunk.Input, chunk.Commitments, chunk.Finalize)
......@@ -168,34 +167,8 @@ func (p *LargePreimageUploader) addLargePreimageData(ctx context.Context, uuid *
return fmt.Errorf("failed to create pre-image oracle tx: %w", err)
}
blocksProcessed += int64(len(chunk.Input) / keccakTypes.BlockSize)
receiptChs[i] = make(chan txmgr.TxReceipt[int], 1)
queue.Send(i, tx, receiptChs[i])
txs[i] = tx
}
for _, receiptCh := range receiptChs {
receipt := <-receiptCh
if receipt.Err != nil {
return receipt.Err
}
if receipt.Receipt.Status == ethtypes.ReceiptStatusFailed {
p.log.Error("LargePreimageUploader add leafs tx successfully published but reverted", "tx_hash", receipt.Receipt.TxHash)
} else {
p.log.Debug("LargePreimageUploader add leafs tx successfully published", "tx_hash", receipt.Receipt.TxHash)
}
}
return nil
}
// sendTxAndWait sends a transaction through the [txmgr] and waits for a receipt.
// This sets the tx GasLimit to 0, performing gas estimation online through the [txmgr].
func (p *LargePreimageUploader) sendTxAndWait(ctx context.Context, candidate txmgr.TxCandidate) error {
receipt, err := p.txMgr.Send(ctx, candidate)
if err != nil {
_, err := p.txSender.SendAndWait("add leaf to large preimage", txs...)
return err
}
if receipt.Status == ethtypes.ReceiptStatusFailed {
p.log.Error("LargePreimageUploader tx successfully published but reverted", "tx_hash", receipt.TxHash)
} else {
p.log.Debug("LargePreimageUploader tx successfully published", "tx_hash", receipt.TxHash)
}
return nil
}
......@@ -262,13 +262,13 @@ func TestLargePreimageUploader_UploadPreimage_Succeeds(t *testing.T) {
}
func newTestLargePreimageUploader(t *testing.T) (*LargePreimageUploader, *mockTxMgr, *mockPreimageOracleContract) {
func newTestLargePreimageUploader(t *testing.T) (*LargePreimageUploader, *mockTxSender, *mockPreimageOracleContract) {
logger := testlog.Logger(t, log.LvlError)
txMgr := &mockTxMgr{}
txSender := &mockTxSender{}
contract := &mockPreimageOracleContract{
addData: make([]byte, 0),
}
return NewLargePreimageUploader(logger, txMgr, contract), txMgr, contract
return NewLargePreimageUploader(logger, txSender, contract), txSender, contract
}
type mockPreimageOracleContract struct {
......
......@@ -14,7 +14,6 @@ import (
"github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-challenger/metrics"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
......@@ -37,7 +36,7 @@ func RegisterGameTypes(
m metrics.Metricer,
cfg *config.Config,
rollupClient outputs.OutputRollupClient,
txMgr txmgr.TxManager,
txSender types.TxSender,
gameFactory *contracts.DisputeGameFactoryContract,
caller *batching.MultiCaller,
) (CloseFunc, error) {
......@@ -52,12 +51,12 @@ func RegisterGameTypes(
closer = l2Client.Close
}
if cfg.TraceTypeEnabled(config.TraceTypeCannon) {
if err := registerCannon(registry, ctx, logger, m, cfg, rollupClient, txMgr, gameFactory, caller, l2Client); err != nil {
if err := registerCannon(registry, ctx, logger, m, cfg, rollupClient, txSender, gameFactory, caller, l2Client); err != nil {
return nil, fmt.Errorf("failed to register cannon game type: %w", err)
}
}
if cfg.TraceTypeEnabled(config.TraceTypeAlphabet) {
if err := registerAlphabet(registry, ctx, logger, m, rollupClient, txMgr, gameFactory, caller); err != nil {
if err := registerAlphabet(registry, ctx, logger, m, rollupClient, txSender, gameFactory, caller); err != nil {
return nil, fmt.Errorf("failed to register alphabet game type: %w", err)
}
}
......@@ -70,7 +69,7 @@ func registerAlphabet(
logger log.Logger,
m metrics.Metricer,
rollupClient outputs.OutputRollupClient,
txMgr txmgr.TxManager,
txSender types.TxSender,
gameFactory *contracts.DisputeGameFactoryContract,
caller *batching.MultiCaller,
) error {
......@@ -97,7 +96,7 @@ func registerAlphabet(
}
prestateValidator := NewPrestateValidator(contract.GetAbsolutePrestateHash, prestateProvider)
genesisValidator := NewPrestateValidator(contract.GetGenesisOutputRoot, prestateProvider)
return NewGamePlayer(ctx, logger, m, dir, game.Proxy, txMgr, contract, []Validator{prestateValidator, genesisValidator}, creator)
return NewGamePlayer(ctx, logger, m, dir, game.Proxy, txSender, contract, []Validator{prestateValidator, genesisValidator}, creator)
}
oracle, err := createOracle(ctx, gameFactory, caller, alphabetGameType)
if err != nil {
......@@ -130,7 +129,7 @@ func registerCannon(
m metrics.Metricer,
cfg *config.Config,
rollupClient outputs.OutputRollupClient,
txMgr txmgr.TxManager,
txSender types.TxSender,
gameFactory *contracts.DisputeGameFactoryContract,
caller *batching.MultiCaller,
l2Client cannon.L2HeaderSource,
......@@ -158,7 +157,7 @@ func registerCannon(
}
prestateValidator := NewPrestateValidator(contract.GetAbsolutePrestateHash, prestateProvider)
genesisValidator := NewPrestateValidator(contract.GetGenesisOutputRoot, prestateProvider)
return NewGamePlayer(ctx, logger, m, dir, game.Proxy, txMgr, contract, []Validator{prestateValidator, genesisValidator}, creator)
return NewGamePlayer(ctx, logger, m, dir, game.Proxy, txSender, contract, []Validator{prestateValidator, genesisValidator}, creator)
}
oracle, err := createOracle(ctx, gameFactory, caller, cannonGameType)
if err != nil {
......
......@@ -11,7 +11,6 @@ import (
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
......@@ -34,17 +33,17 @@ type Oracle interface {
type FaultResponder struct {
log log.Logger
txMgr txmgr.TxManager
sender gameTypes.TxSender
contract GameContract
uploader preimages.PreimageUploader
oracle Oracle
}
// NewFaultResponder returns a new [FaultResponder].
func NewFaultResponder(logger log.Logger, txMgr txmgr.TxManager, contract GameContract, uploader preimages.PreimageUploader, oracle Oracle) (*FaultResponder, error) {
func NewFaultResponder(logger log.Logger, sender gameTypes.TxSender, contract GameContract, uploader preimages.PreimageUploader, oracle Oracle) (*FaultResponder, error) {
return &FaultResponder{
log: logger,
txMgr: txMgr,
sender: sender,
contract: contract,
uploader: uploader,
oracle: oracle,
......@@ -58,13 +57,13 @@ func (r *FaultResponder) CallResolve(ctx context.Context) (gameTypes.GameStatus,
}
// Resolve executes a resolve transaction to resolve a fault dispute game.
func (r *FaultResponder) Resolve(ctx context.Context) error {
func (r *FaultResponder) Resolve() error {
candidate, err := r.contract.ResolveTx()
if err != nil {
return err
}
return r.sendTxAndWait(ctx, candidate)
return r.sendTxAndWait("resolve game", candidate)
}
// CallResolveClaim determines if the resolveClaim function on the fault dispute game contract
......@@ -74,12 +73,12 @@ func (r *FaultResponder) CallResolveClaim(ctx context.Context, claimIdx uint64)
}
// ResolveClaim executes a resolveClaim transaction to resolve a fault dispute game.
func (r *FaultResponder) ResolveClaim(ctx context.Context, claimIdx uint64) error {
func (r *FaultResponder) ResolveClaim(claimIdx uint64) error {
candidate, err := r.contract.ResolveClaimTx(claimIdx)
if err != nil {
return err
}
return r.sendTxAndWait(ctx, candidate)
return r.sendTxAndWait("resolve claim", candidate)
}
func (r *FaultResponder) PerformAction(ctx context.Context, action types.Action) error {
......@@ -124,20 +123,12 @@ func (r *FaultResponder) PerformAction(ctx context.Context, action types.Action)
if err != nil {
return err
}
return r.sendTxAndWait(ctx, candidate)
return r.sendTxAndWait("perform action", candidate)
}
// sendTxAndWait sends a transaction through the [txmgr] and waits for a receipt.
// This sets the tx GasLimit to 0, performing gas estimation online through the [txmgr].
func (r *FaultResponder) sendTxAndWait(ctx context.Context, candidate txmgr.TxCandidate) error {
receipt, err := r.txMgr.Send(ctx, candidate)
if err != nil {
func (r *FaultResponder) sendTxAndWait(purpose string, candidate txmgr.TxCandidate) error {
_, err := r.sender.SendAndWait(purpose, candidate)
return err
}
if receipt.Status == ethtypes.ReceiptStatusFailed {
r.log.Error("Responder tx successfully published but reverted", "tx_hash", receipt.TxHash)
} else {
r.log.Debug("Responder tx successfully published", "tx_hash", receipt.TxHash)
}
return nil
}
......@@ -50,14 +50,14 @@ func TestResolve(t *testing.T) {
t.Run("SendFails", func(t *testing.T) {
responder, mockTxMgr, _, _, _ := newTestFaultResponder(t)
mockTxMgr.sendFails = true
err := responder.Resolve(context.Background())
err := responder.Resolve()
require.ErrorIs(t, err, mockSendError)
require.Equal(t, 0, mockTxMgr.sends)
})
t.Run("Success", func(t *testing.T) {
responder, mockTxMgr, _, _, _ := newTestFaultResponder(t)
err := responder.Resolve(context.Background())
err := responder.Resolve()
require.NoError(t, err)
require.Equal(t, 1, mockTxMgr.sends)
})
......@@ -84,14 +84,14 @@ func TestResolveClaim(t *testing.T) {
t.Run("SendFails", func(t *testing.T) {
responder, mockTxMgr, _, _, _ := newTestFaultResponder(t)
mockTxMgr.sendFails = true
err := responder.ResolveClaim(context.Background(), 0)
err := responder.ResolveClaim(0)
require.ErrorIs(t, err, mockSendError)
require.Equal(t, 0, mockTxMgr.sends)
})
t.Run("Success", func(t *testing.T) {
responder, mockTxMgr, _, _, _ := newTestFaultResponder(t)
err := responder.ResolveClaim(context.Background(), 0)
err := responder.ResolveClaim(0)
require.NoError(t, err)
require.Equal(t, 1, mockTxMgr.sends)
})
......@@ -325,17 +325,21 @@ type mockTxManager struct {
sendFails bool
}
func (m *mockTxManager) Send(_ context.Context, candidate txmgr.TxCandidate) (*ethtypes.Receipt, error) {
func (m *mockTxManager) SendAndWait(_ string, txs ...txmgr.TxCandidate) ([]*ethtypes.Receipt, error) {
rcpts := make([]*ethtypes.Receipt, 0, len(txs))
for _, tx := range txs {
if m.sendFails {
return nil, mockSendError
}
m.sends++
m.sent = append(m.sent, candidate)
return ethtypes.NewReceipt(
m.sent = append(m.sent, tx)
rcpts = append(rcpts, ethtypes.NewReceipt(
[]byte{},
false,
0,
), nil
))
}
return rcpts, nil
}
func (m *mockTxManager) BlockNumber(_ context.Context) (uint64, error) {
......
......@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-challenger/game/keccak"
"github.com/ethereum-optimism/optimism/op-challenger/game/keccak/fetcher"
"github.com/ethereum-optimism/optimism/op-challenger/sender"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
......@@ -42,6 +43,7 @@ type Service struct {
preimages *keccak.LargePreimageScheduler
txMgr *txmgr.SimpleTxManager
txSender *sender.TxSender
loader *loader.GameLoader
......@@ -76,7 +78,7 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se
}
func (s *Service) initFromConfig(ctx context.Context, cfg *config.Config) error {
if err := s.initTxManager(cfg); err != nil {
if err := s.initTxManager(ctx, cfg); err != nil {
return fmt.Errorf("failed to init tx manager: %w", err)
}
if err := s.initL1Client(ctx, cfg); err != nil {
......@@ -117,12 +119,13 @@ func (s *Service) initFromConfig(ctx context.Context, cfg *config.Config) error
return nil
}
func (s *Service) initTxManager(cfg *config.Config) error {
func (s *Service) initTxManager(ctx context.Context, cfg *config.Config) error {
txMgr, err := txmgr.NewSimpleTxManager("challenger", s.logger, s.metrics, cfg.TxMgrConfig)
if err != nil {
return fmt.Errorf("failed to create the transaction manager: %w", err)
}
s.txMgr = txMgr
s.txSender = sender.NewTxSender(ctx, s.logger, txMgr, cfg.MaxPendingTx)
return nil
}
......@@ -176,7 +179,7 @@ func (s *Service) initMetricsServer(cfg *opmetrics.CLIConfig) error {
}
s.logger.Info("started metrics server", "addr", metricsSrv.Addr())
s.metricsSrv = metricsSrv
s.balanceMetricer = s.metrics.StartBalanceMetrics(s.logger, s.l1Client, s.txMgr.From())
s.balanceMetricer = s.metrics.StartBalanceMetrics(s.logger, s.l1Client, s.txSender.From())
return nil
}
......@@ -210,7 +213,7 @@ func (s *Service) initRollupClient(ctx context.Context, cfg *config.Config) erro
func (s *Service) registerGameTypes(ctx context.Context, cfg *config.Config) error {
gameTypeRegistry := registry.NewGameTypeRegistry()
caller := batching.NewMultiCaller(s.l1Client.Client(), batching.DefaultBatchSize)
closer, err := fault.RegisterGameTypes(gameTypeRegistry, ctx, s.logger, s.metrics, cfg, s.rollupClient, s.txMgr, s.factoryContract, caller)
closer, err := fault.RegisterGameTypes(gameTypeRegistry, ctx, s.logger, s.metrics, cfg, s.rollupClient, s.txSender, s.factoryContract, caller)
if err != nil {
return err
}
......
......@@ -3,7 +3,9 @@ package types
import (
"fmt"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
)
type GameStatus uint8
......@@ -41,3 +43,8 @@ type GameMetadata struct {
Timestamp uint64
Proxy common.Address
}
type TxSender interface {
From() common.Address
SendAndWait(txPurpose string, txs ...txmgr.TxCandidate) ([]*ethtypes.Receipt, error)
}
package sender
import (
"context"
"errors"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type TxSender struct {
log log.Logger
txMgr txmgr.TxManager
queue *txmgr.Queue[int]
}
func NewTxSender(ctx context.Context, logger log.Logger, txMgr txmgr.TxManager, maxPending uint64) *TxSender {
queue := txmgr.NewQueue[int](ctx, txMgr, maxPending)
return &TxSender{
log: logger,
txMgr: txMgr,
queue: queue,
}
}
func (s *TxSender) From() common.Address {
return s.txMgr.From()
}
func (s *TxSender) SendAndWait(txPurpose string, txs ...txmgr.TxCandidate) ([]*types.Receipt, error) {
receiptsCh := make(chan txmgr.TxReceipt[int], len(txs))
for i, tx := range txs {
s.queue.Send(i, tx, receiptsCh)
}
receipts := make([]*types.Receipt, len(txs))
completed := 0
var errs []error
for completed < len(txs) {
rcpt := <-receiptsCh
receipts[rcpt.ID] = rcpt.Receipt
completed++
if rcpt.Err != nil {
errs = append(errs, rcpt.Err)
} else if rcpt.Receipt != nil {
if rcpt.Receipt.Status != types.ReceiptStatusSuccessful {
s.log.Error("Transaction published but reverted", "tx_hash", rcpt.Receipt.TxHash, "purpose", txPurpose)
} else {
s.log.Debug("Transaction successfully published", "tx_hash", rcpt.Receipt.TxHash, "purpose", txPurpose)
}
}
}
return receipts, errors.Join(errs...)
}
package sender
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
)
func TestSendAndWait(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
txMgr := &stubTxMgr{sending: make(map[byte]chan *types.Receipt)}
sender := NewTxSender(ctx, testlog.Logger(t, log.LvlInfo), txMgr, 5)
tx := func(i byte) txmgr.TxCandidate {
return txmgr.TxCandidate{TxData: []byte{i}}
}
sendAsync := func(txs ...txmgr.TxCandidate) chan []*types.Receipt {
ch := make(chan []*types.Receipt, 1)
go func() {
rcpts, err := sender.SendAndWait("testing", txs...)
require.NoError(t, err)
ch <- rcpts
close(ch)
}()
return ch
}
wait := func(ch chan []*types.Receipt) []*types.Receipt {
select {
case rcpts := <-ch:
return rcpts
case <-ctx.Done():
require.FailNow(t, "Timeout waiting for receipt")
return nil
}
}
batch1 := sendAsync(tx(1), tx(2), tx(3))
batch2 := sendAsync(tx(4), tx(5))
require.Eventually(t, func() bool {
return txMgr.sentCount() == 5
}, 10*time.Second, 1*time.Millisecond, "Wait for first transactions to send")
require.Len(t, batch1, 0, "Should not have completed batch1")
require.Len(t, batch2, 0, "Should not have completed batch2")
// Send a third batch after the first set have started sending to avoid races
batch3 := sendAsync(tx(6))
require.Len(t, batch3, 0, "Should not have completed batch3")
// Sends the 6th tx after one of the previous ones completes
txMgr.txSuccess(tx(5))
require.Eventually(t, func() bool {
return txMgr.sentCount() == 6
}, 10*time.Second, 1*time.Millisecond, "Wait for final transaction to send")
require.Len(t, batch1, 0, "Should not have completed batch1")
require.Len(t, batch2, 0, "Should not have completed batch2")
require.Len(t, batch3, 0, "Should not have completed batch3")
// Batches complete as soon as they are sent
txMgr.txSuccess(tx(6))
require.Len(t, wait(batch3), 1, "Batch3 should complete")
require.Len(t, batch1, 0, "Should not have completed batch1")
require.Len(t, batch2, 0, "Should not have completed batch2")
txMgr.txSuccess(tx(2))
txMgr.txSuccess(tx(3))
require.Len(t, batch1, 0, "Should not have completed batch1")
require.Len(t, batch2, 0, "Should not have completed batch2")
txMgr.txSuccess(tx(1))
require.Len(t, wait(batch1), 3, "Batch1 should complete")
require.Len(t, batch2, 0, "Should not have completed batch2")
txMgr.txSuccess(tx(4))
require.Len(t, wait(batch2), 2, "Batch2 should complete")
}
type stubTxMgr struct {
m sync.Mutex
sending map[byte]chan *types.Receipt
}
func (s *stubTxMgr) Send(ctx context.Context, candidate txmgr.TxCandidate) (*types.Receipt, error) {
ch := s.recordTx(candidate)
return <-ch, nil
}
func (s *stubTxMgr) recordTx(candidate txmgr.TxCandidate) chan *types.Receipt {
s.m.Lock()
defer s.m.Unlock()
id := candidate.TxData[0]
if _, ok := s.sending[id]; ok {
// Shouldn't happen if tests are well written, but double check...
panic("Sending duplicate transaction")
}
ch := make(chan *types.Receipt, 1)
s.sending[id] = ch
return ch
}
func (s *stubTxMgr) txSuccess(candidate txmgr.TxCandidate) {
s.m.Lock()
defer s.m.Unlock()
ch, ok := s.sending[candidate.TxData[0]]
if !ok {
// Shouldn't happen if tests are well written, but double check...
panic(fmt.Sprintf("Completing unknown transaction: %v Known: %v", candidate.TxData[0], maps.Keys(s.sending)))
}
ch <- &types.Receipt{Status: types.ReceiptStatusSuccessful}
close(ch)
}
func (s *stubTxMgr) sentCount() int {
s.m.Lock()
defer s.m.Unlock()
return len(s.sending)
}
func (s *stubTxMgr) From() common.Address {
panic("unsupported")
}
func (s *stubTxMgr) BlockNumber(_ context.Context) (uint64, error) {
panic("unsupported")
}
func (s *stubTxMgr) Close() {
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment