Commit 16571952 authored by Conner Fromknecht's avatar Conner Fromknecht

feat: define Service and sequencer/proposer drivers

parent c861fa43
......@@ -4,11 +4,15 @@ import (
"context"
"crypto/ecdsa"
"fmt"
"math/big"
"net/http"
"os"
"strconv"
"time"
"github.com/ethereum-optimism/optimism/go/batch-submitter/drivers/proposer"
"github.com/ethereum-optimism/optimism/go/batch-submitter/drivers/sequencer"
"github.com/ethereum-optimism/optimism/go/batch-submitter/txmgr"
l2ethclient "github.com/ethereum-optimism/optimism/l2geth/ethclient"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
......@@ -42,12 +46,25 @@ func Main(gitVersion string) func(ctx *cli.Context) error {
defer sentry.Flush(2 * time.Second)
}
_, err = NewBatchSubmitter(cfg, gitVersion)
log.Info("Initializing batch submitter")
batchSubmitter, err := NewBatchSubmitter(cfg, gitVersion)
if err != nil {
log.Error("Unable to create batch submitter", "error", err)
return err
}
log.Info("Starting batch submitter")
if err := batchSubmitter.Start(); err != nil {
return err
}
defer batchSubmitter.Stop()
log.Info("Batch submitter started")
<-(chan struct{})(nil)
return nil
}
}
......@@ -63,6 +80,9 @@ type BatchSubmitter struct {
proposerPrivKey *ecdsa.PrivateKey
ctcAddress common.Address
sccAddress common.Address
batchTxService *Service
batchStateService *Service
}
// NewBatchSubmitter initializes the BatchSubmitter, gathering any resources
......@@ -135,18 +155,107 @@ func NewBatchSubmitter(cfg Config, gitVersion string) (*BatchSubmitter, error) {
go runMetricsServer(cfg.MetricsHostname, cfg.MetricsPort)
}
chainID, err := l1Client.ChainID(ctx)
if err != nil {
return nil, err
}
txManagerConfig := txmgr.Config{
MinGasPrice: gasPriceFromGwei(1),
MaxGasPrice: gasPriceFromGwei(cfg.MaxGasPriceInGwei),
GasRetryIncrement: gasPriceFromGwei(cfg.GasRetryIncrement),
ResubmissionTimeout: cfg.ResubmissionTimeout,
ReceiptQueryInterval: time.Second,
}
var batchTxService *Service
if cfg.RunTxBatchSubmitter {
batchTxDriver, err := sequencer.NewDriver(sequencer.Config{
Name: "SEQUENCER",
L1Client: l1Client,
L2Client: l2Client,
BlockOffset: cfg.BlockOffset,
MaxTxSize: cfg.MaxL1TxSize,
CTCAddr: ctcAddress,
ChainID: chainID,
PrivKey: sequencerPrivKey,
})
if err != nil {
return nil, err
}
batchTxService = NewService(ServiceConfig{
Context: ctx,
Driver: batchTxDriver,
PollInterval: cfg.PollInterval,
L1Client: l1Client,
TxManagerConfig: txManagerConfig,
})
}
var batchStateService *Service
if cfg.RunStateBatchSubmitter {
batchStateDriver, err := proposer.NewDriver(proposer.Config{
Name: "PROPOSER",
L1Client: l1Client,
L2Client: l2Client,
BlockOffset: cfg.BlockOffset,
MaxTxSize: cfg.MaxL1TxSize,
SCCAddr: sccAddress,
CTCAddr: ctcAddress,
ChainID: chainID,
PrivKey: proposerPrivKey,
})
if err != nil {
return nil, err
}
batchStateService = NewService(ServiceConfig{
Context: ctx,
Driver: batchStateDriver,
PollInterval: cfg.PollInterval,
L1Client: l1Client,
TxManagerConfig: txManagerConfig,
})
}
return &BatchSubmitter{
ctx: ctx,
cfg: cfg,
l1Client: l1Client,
l2Client: l2Client,
sequencerPrivKey: sequencerPrivKey,
proposerPrivKey: proposerPrivKey,
ctcAddress: ctcAddress,
sccAddress: sccAddress,
ctx: ctx,
cfg: cfg,
l1Client: l1Client,
l2Client: l2Client,
sequencerPrivKey: sequencerPrivKey,
proposerPrivKey: proposerPrivKey,
ctcAddress: ctcAddress,
sccAddress: sccAddress,
batchTxService: batchTxService,
batchStateService: batchStateService,
}, nil
}
func (b *BatchSubmitter) Start() error {
if b.cfg.RunTxBatchSubmitter {
if err := b.batchTxService.Start(); err != nil {
return err
}
}
if b.cfg.RunStateBatchSubmitter {
if err := b.batchStateService.Start(); err != nil {
return err
}
}
return nil
}
func (b *BatchSubmitter) Stop() {
if b.cfg.RunTxBatchSubmitter {
_ = b.batchTxService.Stop()
}
if b.cfg.RunStateBatchSubmitter {
_ = b.batchStateService.Stop()
}
}
// parseWalletPrivKeyAndContractAddr returns the wallet private key to use for
// sending transactions as well as the contract address to send to for a
// particular sub-service.
......@@ -226,3 +335,7 @@ func traceRateToFloat64(rate time.Duration) float64 {
}
return rate64
}
func gasPriceFromGwei(gasPriceInGwei uint64) *big.Int {
return new(big.Int).SetUint64(gasPriceInGwei * 1e9)
}
package proposer
import (
"context"
"crypto/ecdsa"
"fmt"
"math/big"
"github.com/ethereum-optimism/optimism/go/batch-submitter/bindings/ctc"
"github.com/ethereum-optimism/optimism/go/batch-submitter/bindings/scc"
l2types "github.com/ethereum-optimism/optimism/l2geth/core/types"
l2ethclient "github.com/ethereum-optimism/optimism/l2geth/ethclient"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
)
var bigOne = new(big.Int).SetUint64(1) //nolint:unused
type Config struct {
Name string
L1Client *ethclient.Client
L2Client *l2ethclient.Client
BlockOffset uint64
MaxTxSize uint64
SCCAddr common.Address
CTCAddr common.Address
ChainID *big.Int
PrivKey *ecdsa.PrivateKey
}
type Driver struct {
cfg Config
sccContract *scc.StateCommitmentChain
ctcContract *ctc.CanonicalTransactionChain
walletAddr common.Address
}
func NewDriver(cfg Config) (*Driver, error) {
sccContract, err := scc.NewStateCommitmentChain(
cfg.SCCAddr, cfg.L1Client,
)
if err != nil {
return nil, err
}
ctcContract, err := ctc.NewCanonicalTransactionChain(
cfg.CTCAddr, cfg.L1Client,
)
if err != nil {
return nil, err
}
walletAddr := crypto.PubkeyToAddress(cfg.PrivKey.PublicKey)
return &Driver{
cfg: cfg,
sccContract: sccContract,
ctcContract: ctcContract,
walletAddr: walletAddr,
}, nil
}
// Name is an identifier used to prefix logs for a particular service.
func (d *Driver) Name() string {
return d.cfg.Name
}
// WalletAddr is the wallet address used to pay for batch transaction fees.
func (d *Driver) WalletAddr() common.Address {
return d.walletAddr
}
// GetBatchBlockRange returns the start and end L2 block heights that need to be
// processed. Note that the end value is *exclusive*, therefore if the returned
// values are identical nothing needs to be processed.
func (d *Driver) GetBatchBlockRange(
ctx context.Context) (*big.Int, *big.Int, error) {
blockOffset := new(big.Int).SetUint64(d.cfg.BlockOffset)
maxBatchSize := new(big.Int).SetUint64(1)
start, err := d.sccContract.GetTotalElements(&bind.CallOpts{
Pending: false,
Context: ctx,
})
if err != nil {
return nil, nil, err
}
start.Add(start, blockOffset)
totalElements, err := d.ctcContract.GetTotalElements(&bind.CallOpts{
Pending: false,
Context: ctx,
})
if err != nil {
return nil, nil, err
}
totalElements.Add(totalElements, blockOffset)
// Take min(start + blockOffset + maxBatchSize, totalElements).
end := new(big.Int).Add(start, maxBatchSize)
if totalElements.Cmp(end) < 0 {
end.Set(totalElements)
}
if start.Cmp(end) > 0 {
return nil, nil, fmt.Errorf("invalid range, "+
"end(%v) < start(%v)", end, start)
}
return start, end, nil
}
// SubmitBatchTx transforms the L2 blocks between start and end into a batch
// transaction using the given nonce and gasPrice. The final transaction is
// published and returned to the call.
func (d *Driver) SubmitBatchTx(
ctx context.Context,
start, end, nonce, gasPrice *big.Int) (*types.Transaction, error) {
var blocks []*l2types.Block
for i := new(big.Int).Set(start); i.Cmp(end) < 0; i.Add(i, bigOne) {
block, err := d.cfg.L2Client.BlockByNumber(ctx, i)
if err != nil {
return nil, err
}
blocks = append(blocks, block)
// TODO(conner): remove when moving to multiple blocks
break //nolint
}
var stateRoots = make([][32]byte, 0, len(blocks))
for _, block := range blocks {
stateRoots = append(stateRoots, block.Root())
}
opts, err := bind.NewKeyedTransactorWithChainID(
d.cfg.PrivKey, d.cfg.ChainID,
)
if err != nil {
return nil, err
}
opts.Nonce = nonce
opts.Context = ctx
opts.GasPrice = gasPrice
blockOffset := new(big.Int).SetUint64(d.cfg.BlockOffset)
offsetStartsAtIndex := new(big.Int).Sub(start, blockOffset)
return d.sccContract.AppendStateBatch(opts, stateRoots, offsetStartsAtIndex)
}
package sequencer
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"fmt"
"math/big"
"strings"
"github.com/ethereum-optimism/optimism/go/batch-submitter/bindings/ctc"
l2types "github.com/ethereum-optimism/optimism/l2geth/core/types"
l2ethclient "github.com/ethereum-optimism/optimism/l2geth/ethclient"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
const (
appendSequencerBatchMethodName = "appendSequencerBatch"
)
var bigOne = new(big.Int).SetUint64(1)
type Config struct {
Name string
L1Client *ethclient.Client
L2Client *l2ethclient.Client
BlockOffset uint64
MaxTxSize uint64
CTCAddr common.Address
ChainID *big.Int
PrivKey *ecdsa.PrivateKey
}
type Driver struct {
cfg Config
ctcContract *ctc.CanonicalTransactionChain
rawCtcContract *bind.BoundContract
walletAddr common.Address
ctcABI *abi.ABI
}
func NewDriver(cfg Config) (*Driver, error) {
ctcContract, err := ctc.NewCanonicalTransactionChain(
cfg.CTCAddr, cfg.L1Client,
)
if err != nil {
return nil, err
}
parsed, err := abi.JSON(strings.NewReader(
ctc.CanonicalTransactionChainABI,
))
if err != nil {
return nil, err
}
ctcABI, err := ctc.CanonicalTransactionChainMetaData.GetAbi()
if err != nil {
return nil, err
}
rawCtcContract := bind.NewBoundContract(
cfg.CTCAddr, parsed, cfg.L1Client, cfg.L1Client,
cfg.L1Client,
)
walletAddr := crypto.PubkeyToAddress(cfg.PrivKey.PublicKey)
return &Driver{
cfg: cfg,
ctcContract: ctcContract,
rawCtcContract: rawCtcContract,
walletAddr: walletAddr,
ctcABI: ctcABI,
}, nil
}
// Name is an identifier used to prefix logs for a particular service.
func (d *Driver) Name() string {
return d.cfg.Name
}
// WalletAddr is the wallet address used to pay for batch transaction fees.
func (d *Driver) WalletAddr() common.Address {
return d.walletAddr
}
// GetBatchBlockRange returns the start and end L2 block heights that need to be
// processed. Note that the end value is *exclusive*, therefore if the returned
// values are identical nothing needs to be processed.
func (d *Driver) GetBatchBlockRange(
ctx context.Context) (*big.Int, *big.Int, error) {
blockOffset := new(big.Int).SetUint64(d.cfg.BlockOffset)
start, err := d.ctcContract.GetTotalElements(&bind.CallOpts{
Pending: false,
Context: ctx,
})
if err != nil {
return nil, nil, err
}
start.Add(start, blockOffset)
latestHeader, err := d.cfg.L2Client.HeaderByNumber(ctx, nil)
if err != nil {
return nil, nil, err
}
// Add one because end is *exclusive*.
end := new(big.Int).Add(latestHeader.Number, bigOne)
if start.Cmp(end) > 0 {
return nil, nil, fmt.Errorf("invalid range, "+
"end(%v) < start(%v)", end, start)
}
return start, end, nil
}
// SubmitBatchTx transforms the L2 blocks between start and end into a batch
// transaction using the given nonce and gasPrice. The final transaction is
// published and returned to the call.
func (d *Driver) SubmitBatchTx(
ctx context.Context,
start, end, nonce, gasPrice *big.Int) (*types.Transaction, error) {
name := d.cfg.Name
log.Info(name+" submitting batch tx", "start", start, "end", end,
"gasPrice", gasPrice)
var blocks []*l2types.Block
for i := new(big.Int).Set(start); i.Cmp(end) < 0; i.Add(i, bigOne) {
block, err := d.cfg.L2Client.BlockByNumber(ctx, i)
if err != nil {
return nil, err
}
blocks = append(blocks, block)
// TODO(conner): remove when moving to multiple blocks
break //nolint
}
var batchElements = make([]BatchElement, 0, len(blocks))
for _, block := range blocks {
batchElements = append(batchElements, BatchElementFromBlock(block))
}
shouldStartAt := start.Uint64()
batchParams, err := GenSequencerBatchParams(
shouldStartAt, d.cfg.BlockOffset, batchElements,
)
if err != nil {
return nil, err
}
log.Info(name+" batch params", "params", fmt.Sprintf("%#v", batchParams))
batchArguments, err := batchParams.Serialize()
if err != nil {
return nil, err
}
appendSequencerBatchID := d.ctcABI.Methods[appendSequencerBatchMethodName].ID
batchCallData := append(appendSequencerBatchID, batchArguments...)
if uint64(len(batchCallData)) > d.cfg.MaxTxSize {
panic("call data too large")
}
log.Info(name+" batch call data", "data", hex.EncodeToString(batchCallData))
opts, err := bind.NewKeyedTransactorWithChainID(
d.cfg.PrivKey, d.cfg.ChainID,
)
if err != nil {
return nil, err
}
opts.Nonce = nonce
opts.Context = ctx
opts.GasPrice = gasPrice
return d.rawCtcContract.RawTransact(opts, batchCallData)
}
package batchsubmitter
import (
"context"
"math/big"
"sync"
"time"
"github.com/ethereum-optimism/optimism/go/batch-submitter/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
// Driver is an interface for creating and submitting batch transactions for a
// specific contract.
type Driver interface {
// Name is an identifier used to prefix logs for a particular service.
Name() string
// WalletAddr is the wallet address used to pay for batch transaction
// fees.
WalletAddr() common.Address
// GetBatchBlockRange returns the start and end L2 block heights that
// need to be processed. Note that the end value is *exclusive*,
// therefore if the returned values are identical nothing needs to be
// processed.
GetBatchBlockRange(ctx context.Context) (*big.Int, *big.Int, error)
// SubmitBatchTx transforms the L2 blocks between start and end into a
// batch transaction using the given nonce and gasPrice. The final
// transaction is published and returned to the call.
SubmitBatchTx(
ctx context.Context,
start, end, nonce, gasPrice *big.Int,
) (*types.Transaction, error)
}
type ServiceConfig struct {
Context context.Context
Driver Driver
PollInterval time.Duration
L1Client *ethclient.Client
TxManagerConfig txmgr.Config
}
type Service struct {
cfg ServiceConfig
ctx context.Context
cancel func()
txMgr txmgr.TxManager
wg sync.WaitGroup
}
func NewService(cfg ServiceConfig) *Service {
ctx, cancel := context.WithCancel(cfg.Context)
txMgr := txmgr.NewSimpleTxManager(
cfg.Driver.Name(), cfg.TxManagerConfig, cfg.L1Client,
)
return &Service{
cfg: cfg,
ctx: ctx,
cancel: cancel,
txMgr: txMgr,
}
}
func (s *Service) Start() error {
s.wg.Add(1)
go s.eventLoop()
return nil
}
func (s *Service) Stop() error {
s.cancel()
s.wg.Wait()
return nil
}
func (s *Service) eventLoop() {
defer s.wg.Done()
name := s.cfg.Driver.Name()
for {
select {
case <-time.After(s.cfg.PollInterval):
log.Info(name + " fetching current block range")
start, end, err := s.cfg.Driver.GetBatchBlockRange(s.ctx)
if err != nil {
log.Error(name+" unable to get block range", "err", err)
continue
}
log.Info(name+" block range", "start", start, "end", end)
// No new updates.
if start.Cmp(end) == 0 {
continue
}
nonce64, err := s.cfg.L1Client.NonceAt(
s.ctx, s.cfg.Driver.WalletAddr(), nil,
)
if err != nil {
log.Error(name+" unable to get current nonce",
"err", err)
continue
}
nonce := new(big.Int).SetUint64(nonce64)
sendTx := func(
ctx context.Context,
gasPrice *big.Int,
) (*types.Transaction, error) {
log.Info(name+" attempting batch tx", "start", start,
"end", end, "nonce", nonce,
"gasPrice", gasPrice)
return s.cfg.Driver.SubmitBatchTx(
ctx, start, end, nonce, gasPrice,
)
}
receipt, err := s.txMgr.Send(s.ctx, sendTx)
if err != nil {
log.Error(name+" unable to publish batch tx",
"err", err)
continue
}
log.Info(name+" batch tx successfully published",
"tx_hash", receipt.TxHash)
case err := <-s.ctx.Done():
log.Error(name+" service shutting down", "err", err)
return
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment