Commit 88b1a95a authored by Samuel Laferriere, committed by GitHub

feat: concurrent alt-da requests (#11698)

* feat: initial goroutine blob submission implementation

test(batcher): add e2e test for concurrent altda requests

doc: add explanation comment for FakeDAServer

chore: fix if condition in altda sendTransaction path

feat: add maxConcurrentDaRequests config flag + semaphore

refactor: batcher to use errgroup for da instead of separate semaphore/waitgroup

fix: nil pointer bug after using wrong function after rebase

fix: defn of maxConcurrentDaRequests=0

fix: TestBatcherConcurrentAltDARequests

chore: remove unneeded if statement around time.Sleep

refactor: use TryGo instead of Go to make logic local and easier to read

chore: clean up some comments in batcher

chore: make batcher shutdown cancel pending altda requests by using shutdownCtx instead of killCtx

* chore(batcher): make altda wg wait + log only when useAltDa is true

* refactor: batcher altda submission code into its own function

* test: refactor batcher e2e test to only count batcher txs

* chore: log errors from wait functions

* chore: refactor and minimize time that e2e batcher system tests can run

* chore: lower timeout duration in test

* fix(batcher): maxConcurrentDARequests was not being initialized
parent 144a7750
......@@ -3,6 +3,7 @@ package altda
import (
"fmt"
"net/url"
"time"
"github.com/urfave/cli/v2"
)
......@@ -11,7 +12,10 @@ var (
EnabledFlagName = altDAFlags("enabled")
DaServerAddressFlagName = altDAFlags("da-server")
VerifyOnReadFlagName = altDAFlags("verify-on-read")
DaServiceFlag = altDAFlags("da-service")
DaServiceFlagName = altDAFlags("da-service")
PutTimeoutFlagName = altDAFlags("put-timeout")
GetTimeoutFlagName = altDAFlags("get-timeout")
MaxConcurrentRequestsFlagName = altDAFlags("max-concurrent-da-requests")
)
// altDAFlags returns the flag names for altDA
......@@ -46,12 +50,30 @@ func CLIFlags(envPrefix string, category string) []cli.Flag {
Category: category,
},
&cli.BoolFlag{
Name: DaServiceFlag,
Name: DaServiceFlagName,
Usage: "Use DA service type where commitments are generated by Alt-DA server",
Value: false,
EnvVars: altDAEnvs(envPrefix, "DA_SERVICE"),
Category: category,
},
&cli.DurationFlag{
Name: PutTimeoutFlagName,
Usage: "Timeout for put requests. 0 means no timeout.",
Value: time.Duration(0),
EnvVars: altDAEnvs(envPrefix, "PUT_TIMEOUT"),
},
&cli.DurationFlag{
Name: GetTimeoutFlagName,
Usage: "Timeout for get requests. 0 means no timeout.",
Value: time.Duration(0),
EnvVars: altDAEnvs(envPrefix, "GET_TIMEOUT"),
},
&cli.Uint64Flag{
Name: MaxConcurrentRequestsFlagName,
Usage: "Maximum number of concurrent requests to the DA server",
Value: 1,
EnvVars: altDAEnvs(envPrefix, "MAX_CONCURRENT_DA_REQUESTS"),
},
}
}
......@@ -60,6 +82,9 @@ type CLIConfig struct {
DAServerURL string
VerifyOnRead bool
GenericDA bool
PutTimeout time.Duration
GetTimeout time.Duration
MaxConcurrentRequests uint64
}
func (c CLIConfig) Check() error {
......@@ -75,7 +100,7 @@ func (c CLIConfig) Check() error {
}
func (c CLIConfig) NewDAClient() *DAClient {
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA}
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout}
}
func ReadCLIConfig(c *cli.Context) CLIConfig {
......@@ -83,6 +108,9 @@ func ReadCLIConfig(c *cli.Context) CLIConfig {
Enabled: c.Bool(EnabledFlagName),
DAServerURL: c.String(DaServerAddressFlagName),
VerifyOnRead: c.Bool(VerifyOnReadFlagName),
GenericDA: c.Bool(DaServiceFlag),
GenericDA: c.Bool(DaServiceFlagName),
PutTimeout: c.Duration(PutTimeoutFlagName),
GetTimeout: c.Duration(GetTimeoutFlagName),
MaxConcurrentRequests: c.Uint64(MaxConcurrentRequestsFlagName),
}
}
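For illustration only, a minimal sketch (hypothetical endpoint, arbitrary values) of how the new fields flow from CLIConfig into a client; note that MaxConcurrentRequests is consumed by the batcher service rather than by the HTTP client itself:

package main

import (
	"time"

	altda "github.com/ethereum-optimism/optimism/op-alt-da"
)

func main() {
	cfg := altda.CLIConfig{
		Enabled:               true,
		DAServerURL:           "http://localhost:3100", // hypothetical DA server endpoint
		GenericDA:             true,
		PutTimeout:            30 * time.Second, // 0 means no client-side timeout
		GetTimeout:            10 * time.Second,
		MaxConcurrentRequests: 4, // read by the batcher, not by the DA client
	}
	_ = cfg.NewDAClient() // carries the URL, verify/precompute flags, and both timeouts
}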
......@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"time"
)
// ErrNotFound is returned when the server could not find the input.
......@@ -23,10 +24,16 @@ type DAClient struct {
verify bool
// whether commitment is precomputable (only applicable to keccak256)
precompute bool
getTimeout time.Duration
putTimeout time.Duration
}
func NewDAClient(url string, verify bool, pc bool) *DAClient {
return &DAClient{url, verify, pc}
return &DAClient{
url: url,
verify: verify,
precompute: pc,
}
}
// GetInput returns the input data for the given encoded commitment bytes.
......@@ -35,7 +42,8 @@ func (c *DAClient) GetInput(ctx context.Context, comm CommitmentData) ([]byte, e
if err != nil {
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}
resp, err := http.DefaultClient.Do(req)
client := &http.Client{Timeout: c.getTimeout}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
......@@ -91,7 +99,8 @@ func (c *DAClient) setInputWithCommit(ctx context.Context, comm CommitmentData,
return fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := http.DefaultClient.Do(req)
client := &http.Client{Timeout: c.putTimeout}
resp, err := client.Do(req)
if err != nil {
return err
}
......@@ -116,7 +125,8 @@ func (c *DAClient) setInput(ctx context.Context, img []byte) (CommitmentData, er
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := http.DefaultClient.Do(req)
client := &http.Client{Timeout: c.putTimeout}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
......
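The per-request client replaces http.DefaultClient, so each timeout flag maps directly onto http.Client.Timeout. A rough sketch of the resulting behavior (URL and durations are placeholders); a zero timeout leaves cancellation entirely to the request context:

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

func main() {
	client := &http.Client{Timeout: 10 * time.Second} // Timeout: 0 would mean no client-side timeout

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:3100/get/0xabc", nil)
	if err != nil {
		panic(err)
	}
	// The request fails after ~5s (context) or ~10s (client), whichever fires first.
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}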
......@@ -2,48 +2,14 @@ package altda
import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
type MemStore struct {
db map[string][]byte
lock sync.RWMutex
}
func NewMemStore() *MemStore {
return &MemStore{
db: make(map[string][]byte),
}
}
// Get retrieves the given key if it's present in the key-value store.
func (s *MemStore) Get(ctx context.Context, key []byte) ([]byte, error) {
s.lock.RLock()
defer s.lock.RUnlock()
if entry, ok := s.db[string(key)]; ok {
return common.CopyBytes(entry), nil
}
return nil, ErrNotFound
}
// Put inserts the given value into the key-value store.
func (s *MemStore) Put(ctx context.Context, key []byte, value []byte) error {
s.lock.Lock()
defer s.lock.Unlock()
s.db[string(key)] = common.CopyBytes(value)
return nil
}
func TestDAClientPrecomputed(t *testing.T) {
store := NewMemStore()
logger := testlog.Logger(t, log.LevelDebug)
......@@ -56,7 +22,7 @@ func TestDAClientPrecomputed(t *testing.T) {
cfg := CLIConfig{
Enabled: true,
DAServerURL: fmt.Sprintf("http://%s", server.Endpoint()),
DAServerURL: server.HttpEndpoint(),
VerifyOnRead: true,
}
require.NoError(t, cfg.Check())
......@@ -113,7 +79,7 @@ func TestDAClientService(t *testing.T) {
cfg := CLIConfig{
Enabled: true,
DAServerURL: fmt.Sprintf("http://%s", server.Endpoint()),
DAServerURL: server.HttpEndpoint(),
VerifyOnRead: false,
GenericDA: false,
}
......
......@@ -4,8 +4,12 @@ import (
"context"
"errors"
"io"
"net/http"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/log"
......@@ -99,3 +103,84 @@ func (d *AltDADisabled) OnFinalizedHeadSignal(f HeadSignalFn) {
func (d *AltDADisabled) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, blockId eth.BlockID) error {
return ErrNotEnabled
}
// FakeDAServer is a fake DA server for e2e tests.
// It is a small wrapper around DAServer that allows for setting request latencies,
// to mimic a DA service with slow responses (e.g. EigenDA with a 10 min batching interval).
type FakeDAServer struct {
*DAServer
putRequestLatency time.Duration
getRequestLatency time.Duration
}
func NewFakeDAServer(host string, port int, log log.Logger) *FakeDAServer {
store := NewMemStore()
fakeDAServer := &FakeDAServer{
DAServer: NewDAServer(host, port, store, log, true),
putRequestLatency: 0,
getRequestLatency: 0,
}
return fakeDAServer
}
func (s *FakeDAServer) HandleGet(w http.ResponseWriter, r *http.Request) {
time.Sleep(s.getRequestLatency)
s.DAServer.HandleGet(w, r)
}
func (s *FakeDAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
time.Sleep(s.putRequestLatency)
s.DAServer.HandlePut(w, r)
}
func (s *FakeDAServer) Start() error {
err := s.DAServer.Start()
if err != nil {
return err
}
// Override the HandleGet/Put method registrations
mux := http.NewServeMux()
mux.HandleFunc("/get/", s.HandleGet)
mux.HandleFunc("/put/", s.HandlePut)
s.httpServer.Handler = mux
return nil
}
func (s *FakeDAServer) SetPutRequestLatency(latency time.Duration) {
s.putRequestLatency = latency
}
func (s *FakeDAServer) SetGetRequestLatency(latency time.Duration) {
s.getRequestLatency = latency
}
type MemStore struct {
db map[string][]byte
lock sync.RWMutex
}
func NewMemStore() *MemStore {
return &MemStore{
db: make(map[string][]byte),
}
}
// Get retrieves the given key if it's present in the key-value store.
func (s *MemStore) Get(ctx context.Context, key []byte) ([]byte, error) {
s.lock.RLock()
defer s.lock.RUnlock()
if entry, ok := s.db[string(key)]; ok {
return common.CopyBytes(entry), nil
}
return nil, ErrNotFound
}
// Put inserts the given value into the key-value store.
func (s *MemStore) Put(ctx context.Context, key []byte, value []byte) error {
s.lock.Lock()
defer s.lock.Unlock()
s.db[string(key)] = common.CopyBytes(value)
return nil
}
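A minimal sketch of standing the fake server up in a test and injecting latency (the e2e system setup further below does the equivalent):

logger := testlog.Logger(t, log.LevelInfo)
fakeDA := altda.NewFakeDAServer("127.0.0.1", 0, logger) // port 0 picks a free port
require.NoError(t, fakeDA.Start())
defer func() { _ = fakeDA.Stop() }()
fakeDA.SetPutRequestLatency(5 * time.Second) // every put now takes at least 5s
daURL := fakeDA.HttpEndpoint()               // already includes the http:// scheme
_ = daURL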
......@@ -187,8 +187,8 @@ func (d *DAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
}
}
func (b *DAServer) Endpoint() string {
return b.listener.Addr().String()
func (b *DAServer) HttpEndpoint() string {
return fmt.Sprintf("http://%s", b.listener.Addr().String())
}
func (b *DAServer) Stop() error {
......
......@@ -22,6 +22,7 @@ import (
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"golang.org/x/sync/errgroup"
)
var (
......@@ -302,6 +303,12 @@ func (l *BatchSubmitter) loop() {
receiptsCh := make(chan txmgr.TxReceipt[txRef])
queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
daGroup := &errgroup.Group{}
// errgroup with limit of 0 means no goroutine is able to run concurrently,
// so we only set the limit if it is greater than 0.
if l.Config.MaxConcurrentDARequests > 0 {
daGroup.SetLimit(int(l.Config.MaxConcurrentDARequests))
}
// start the receipt/result processing loop
receiptLoopDone := make(chan struct{})
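A compact sketch of the errgroup semantics the limit comment relies on (golang.org/x/sync/errgroup): leaving the limit unset allows unlimited goroutines, SetLimit(0) would prevent TryGo from ever starting one, and once the limit is reached TryGo returns false instead of blocking:

package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	g := &errgroup.Group{}
	g.SetLimit(2) // at most two concurrent goroutines

	for i := 0; i < 4; i++ {
		started := g.TryGo(func() error {
			time.Sleep(100 * time.Millisecond) // stand-in for a slow DA put
			return nil
		})
		if !started {
			// limit reached: the caller must retry later (the batcher requeues
			// the frame via recordFailedDARequest in this situation)
			fmt.Println("limit reached, skipping")
		}
	}
	_ = g.Wait() // returns the first non-nil error from the goroutines, if any
}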
......@@ -339,9 +346,20 @@ func (l *BatchSubmitter) loop() {
defer ticker.Stop()
publishAndWait := func() {
l.publishStateToL1(queue, receiptsCh)
l.publishStateToL1(queue, receiptsCh, daGroup)
if !l.Txmgr.IsClosed() {
queue.Wait()
if l.Config.UseAltDA {
l.Log.Info("Waiting for altDA writes to complete...")
err := daGroup.Wait()
if err != nil {
l.Log.Error("Error returned by one of the altda goroutines waited on", "err", err)
}
}
l.Log.Info("Waiting for L1 txs to be confirmed...")
err := queue.Wait()
if err != nil {
l.Log.Error("Error returned by one of the txmgr goroutines waited on", "err", err)
}
} else {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
}
......@@ -368,7 +386,7 @@ func (l *BatchSubmitter) loop() {
l.clearState(l.shutdownCtx)
continue
}
l.publishStateToL1(queue, receiptsCh)
l.publishStateToL1(queue, receiptsCh, daGroup)
case <-l.shutdownCtx.Done():
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
......@@ -425,7 +443,7 @@ func (l *BatchSubmitter) waitNodeSync() error {
// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queuing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) {
for {
// if the txmgr is closed, we stop the transaction sending
if l.Txmgr.IsClosed() {
......@@ -436,7 +454,7 @@ func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh
l.Log.Info("txpool state is not good, aborting state publishing")
return
}
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
err := l.publishTxToL1(l.killCtx, queue, receiptsCh, daGroup)
if err != nil {
if err != io.EOF {
......@@ -487,7 +505,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
}
// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error {
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
......@@ -496,7 +514,8 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
}
l.recordL1Tip(l1tip)
// Collect next transaction data
// Collect next transaction data. This pulls data out of the channel, so we need to make sure
// to put it back if the DA or txmgr request fails, by calling l.recordFailedDARequest/recordFailedTx.
txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
......@@ -507,7 +526,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
return err
}
if err = l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil {
if err = l.sendTransaction(txdata, queue, receiptsCh, daGroup); err != nil {
return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err)
}
return nil
......@@ -552,12 +571,56 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh
l.sendTx(txData{}, true, candidate, queue, receiptsCh)
}
// publishToAltDAAndL1 posts the txdata to the DA Provider and then sends the commitment to L1.
func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) {
// sanity checks
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
if txdata.asBlob {
l.Log.Crit("Unexpected blob txdata with AltDA enabled")
}
// when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop
// since it may take a while for the request to return.
goroutineSpawned := daGroup.TryGo(func() error {
// TODO: probably shouldn't be using the global shutdownCtx here, see https://go.dev/blog/context-and-structs
// but sendTransaction receives l.killCtx as an argument, which currently is only canceled after waiting for the main loop
// to exit, which would wait on this DA call to finish, which would take a long time.
// So we prefer to mimic the behavior of txmgr and cancel all pending DA/txmgr requests when the batcher is stopped.
comm, err := l.AltDA.SetInput(l.shutdownCtx, txdata.CallData())
if err != nil {
l.Log.Error("Failed to post input to Alt DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
// note: this assumes that the da server caches requests, otherwise it might lead to resubmissions of the blobs
l.recordFailedDARequest(txdata.ID(), err)
return nil
}
l.Log.Info("Set altda input", "commitment", comm, "tx", txdata.ID())
candidate := l.calldataTxCandidate(comm.TxData())
l.sendTx(txdata, false, candidate, queue, receiptsCh)
return nil
})
if !goroutineSpawned {
// We couldn't start the goroutine because the errgroup.Group limit
// is already reached. Since we can't send the txdata, we have to
// return it for later processing. We use nil error to skip error logging.
l.recordFailedDARequest(txdata.ID(), nil)
}
}
// sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`.
// This call will block if the txmgr queue is at the max-pending limit.
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error {
var err error
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UseAltDA {
l.publishToAltDAAndL1(txdata, queue, receiptsCh, daGroup)
// we return nil to allow publishStateToL1 to keep processing the next txdata
return nil
}
var candidate *txmgr.TxCandidate
if txdata.asBlob {
......@@ -573,21 +636,7 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
data := txdata.CallData()
// if AltDA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UseAltDA {
comm, err := l.AltDA.SetInput(ctx, data)
if err != nil {
l.Log.Error("Failed to post input to Alt DA", "error", err)
// requeue frame if we fail to post to the DA Provider so it can be retried
l.recordFailedTx(txdata.ID(), err)
return nil
}
l.Log.Info("Set AltDA input", "commitment", comm, "tx", txdata.ID())
// signal AltDA commitment tx with TxDataVersion1
data = comm.TxData()
}
candidate = l.calldataTxCandidate(data)
candidate = l.calldataTxCandidate(txdata.CallData())
}
l.sendTx(txdata, false, candidate, queue, receiptsCh)
......@@ -649,6 +698,13 @@ func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
l.Metr.RecordLatestL1Block(l1tip)
}
func (l *BatchSubmitter) recordFailedDARequest(id txID, err error) {
if err != nil {
l.Log.Warn("DA request failed", logFields(id, err)...)
}
l.state.TxFailed(id)
}
func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
l.Log.Warn("Transaction failed to send", logFields(id, err)...)
l.state.TxFailed(id)
......
......@@ -39,6 +39,8 @@ type BatcherConfig struct {
// UseAltDA is true if the rollup config has a DA challenge address so the batcher
// will post inputs to the DA server and post commitments to blobs or calldata.
UseAltDA bool
// maximum number of concurrent blob put requests to the DA server
MaxConcurrentDARequests uint64
WaitNodeSync bool
CheckRecentTxsDepth int
......@@ -93,6 +95,7 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
bs.PollInterval = cfg.PollInterval
bs.MaxPendingTransactions = cfg.MaxPendingTransactions
bs.MaxConcurrentDARequests = cfg.AltDA.MaxConcurrentRequests
bs.NetworkTimeout = cfg.TxMgrConfig.NetworkTimeout
bs.CheckRecentTxsDepth = cfg.CheckRecentTxsDepth
bs.WaitNodeSync = cfg.WaitNodeSync
......
package transactions
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
func TransactionsBySender(block *types.Block, sender common.Address) (int64, error) {
txCount := int64(0)
for _, tx := range block.Transactions() {
signer := types.NewCancunSigner(tx.ChainId())
txSender, err := types.Sender(signer, tx)
if err != nil {
return 0, err
}
if txSender == sender {
txCount++
}
}
return txCount, nil
}
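A hypothetical usage sketch for the helper above, assuming the surrounding e2e test context (l1Client, cfg, ctx and t come from the system setup, as in the tests further below):

block, err := l1Client.BlockByNumber(ctx, big.NewInt(5))
require.NoError(t, err)
batcherTxs, err := transactions.TransactionsBySender(block, cfg.DeployConfig.BatchSenderAddress)
require.NoError(t, err)
require.GreaterOrEqual(t, batcherTxs, int64(1), "expected at least one batcher tx in block 5")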
......@@ -39,6 +39,7 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
batcherFlags "github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
......@@ -168,13 +169,14 @@ func DefaultSystemConfig(t testing.TB) SystemConfig {
RoleSeq: testlog.Logger(t, log.LevelInfo).New("role", RoleSeq),
"batcher": testlog.Logger(t, log.LevelInfo).New("role", "batcher"),
"proposer": testlog.Logger(t, log.LevelInfo).New("role", "proposer"),
"da-server": testlog.Logger(t, log.LevelInfo).New("role", "da-server"),
},
GethOptions: map[string][]geth.GethOption{},
P2PTopology: nil, // no P2P connectivity by default
NonFinalizedProposals: false,
ExternalL2Shim: config.ExternalL2Shim,
DataAvailabilityType: batcherFlags.CalldataType,
MaxPendingTransactions: 1,
BatcherMaxPendingTransactions: 1,
BatcherTargetNumFrames: 1,
}
}
......@@ -298,12 +300,16 @@ type SystemConfig struct {
// If >0, limits the number of blocks per span batch
BatcherMaxBlocksPerSpanBatch int
// BatcherMaxPendingTransactions determines how many transactions the batcher will try to send
// concurrently. 0 means unlimited.
BatcherMaxPendingTransactions uint64
// BatcherMaxConcurrentDARequest determines how many DA server requests the batcher is allowed to
// make concurrently. 0 means unlimited.
BatcherMaxConcurrentDARequest uint64
// SupportL1TimeTravel determines if the L1 node supports quickly skipping forward in time
SupportL1TimeTravel bool
// MaxPendingTransactions determines how many transactions the batcher will try to send
// concurrently. 0 means unlimited.
MaxPendingTransactions uint64
}
type System struct {
......@@ -319,6 +325,7 @@ type System struct {
L2OutputSubmitter *l2os.ProposerService
BatchSubmitter *bss.BatcherService
Mocknet mocknet.Mocknet
FakeAltDAServer *altda.FakeDAServer
L1BeaconAPIAddr endpoint.RestHTTP
......@@ -543,6 +550,16 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
}
}
var rollupAltDAConfig *rollup.AltDAConfig
if cfg.DeployConfig.UseAltDA {
rollupAltDAConfig = &rollup.AltDAConfig{
DAChallengeAddress: cfg.L1Deployments.DataAvailabilityChallengeProxy,
DAChallengeWindow: cfg.DeployConfig.DAChallengeWindow,
DAResolveWindow: cfg.DeployConfig.DAResolveWindow,
CommitmentType: altda.GenericCommitmentString,
}
}
makeRollupConfig := func() rollup.Config {
return rollup.Config{
Genesis: rollup.Genesis{
......@@ -574,6 +591,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
GraniteTime: cfg.DeployConfig.GraniteTime(uint64(cfg.DeployConfig.L1GenesisBlockTimestamp)),
InteropTime: cfg.DeployConfig.InteropTime(uint64(cfg.DeployConfig.L1GenesisBlockTimestamp)),
ProtocolVersionsAddress: cfg.L1Deployments.ProtocolVersionsProxy,
AltDAConfig: rollupAltDAConfig,
}
}
defaultConfig := makeRollupConfig()
......@@ -819,11 +837,27 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
compressionAlgo = derive.Brotli10
}
var batcherAltDACLIConfig altda.CLIConfig
if cfg.DeployConfig.UseAltDA {
fakeAltDAServer := altda.NewFakeDAServer("127.0.0.1", 0, sys.Cfg.Loggers["da-server"])
if err := fakeAltDAServer.Start(); err != nil {
return nil, fmt.Errorf("failed to start fake altDA server: %w", err)
}
sys.FakeAltDAServer = fakeAltDAServer
batcherAltDACLIConfig = altda.CLIConfig{
Enabled: cfg.DeployConfig.UseAltDA,
DAServerURL: fakeAltDAServer.HttpEndpoint(),
VerifyOnRead: true,
GenericDA: true,
MaxConcurrentRequests: cfg.BatcherMaxConcurrentDARequest,
}
}
batcherCLIConfig := &bss.CLIConfig{
L1EthRpc: sys.EthInstances[RoleL1].UserRPC().RPC(),
L2EthRpc: sys.EthInstances[RoleSeq].UserRPC().RPC(),
RollupRpc: sys.RollupNodes[RoleSeq].UserRPC().RPC(),
MaxPendingTransactions: cfg.MaxPendingTransactions,
MaxPendingTransactions: cfg.BatcherMaxPendingTransactions,
MaxChannelDuration: 1,
MaxL1TxSize: batcherMaxL1TxSizeBytes,
TestUseMaxTxSizeForBlobs: cfg.BatcherUseMaxTxSizeForBlobs,
......@@ -841,6 +875,7 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
MaxBlocksPerSpanBatch: cfg.BatcherMaxBlocksPerSpanBatch,
DataAvailabilityType: sys.Cfg.DataAvailabilityType,
CompressionAlgo: compressionAlgo,
AltDA: batcherAltDACLIConfig,
}
// Batch Submitter
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.Cfg.Loggers["batcher"])
......
......@@ -14,6 +14,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
......@@ -1362,7 +1363,7 @@ func TestBatcherMultiTx(t *testing.T) {
InitParallel(t)
cfg := DefaultSystemConfig(t)
cfg.MaxPendingTransactions = 0 // no limit on parallel txs
cfg.BatcherMaxPendingTransactions = 0 // no limit on parallel txs
// ensures that batcher txs are as small as possible
cfg.BatcherMaxL1TxSizeBytes = derive.FrameV0OverHeadSize + 1 /*version bytes*/ + 1
cfg.DisableBatcher = true
......@@ -1375,7 +1376,7 @@ func TestBatcherMultiTx(t *testing.T) {
_, err = geth.WaitForBlock(big.NewInt(10), l2Seq, time.Duration(cfg.DeployConfig.L2BlockTime*15)*time.Second)
require.NoError(t, err, "Waiting for L2 blocks")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
l1Number, err := l1Client.BlockNumber(ctx)
require.NoError(t, err)
......@@ -1385,16 +1386,80 @@ func TestBatcherMultiTx(t *testing.T) {
err = driver.StartBatchSubmitting()
require.NoError(t, err)
totalTxCount := 0
// wait for up to 10 L1 blocks, usually only 3 is required, but it's
totalBatcherTxsCount := int64(0)
// wait for up to 5 L1 blocks, usually only 3 is required, but it's
// possible additional L1 blocks will be created before the batcher starts,
// so we wait additional blocks.
for i := int64(0); i < 10; i++ {
block, err := geth.WaitForBlock(big.NewInt(int64(l1Number)+i), l1Client, time.Duration(cfg.DeployConfig.L1BlockTime*5)*time.Second)
for i := int64(0); i < 5; i++ {
block, err := geth.WaitForBlock(big.NewInt(int64(l1Number)+i), l1Client, time.Duration(cfg.DeployConfig.L1BlockTime*2)*time.Second)
require.NoError(t, err, "Waiting for l1 blocks")
// there are possibly other services (proposer/challenger) in the background sending txs
// so we only count the batcher txs
batcherTxCount, err := transactions.TransactionsBySender(block, cfg.DeployConfig.BatchSenderAddress)
require.NoError(t, err)
totalBatcherTxsCount += int64(batcherTxCount)
if totalBatcherTxsCount >= 10 {
return
}
}
t.Fatal("Expected at least 10 transactions from the batcher")
}
func TestBatcherConcurrentAltDARequests(t *testing.T) {
InitParallel(t)
numL1TxsExpected := int64(10)
cfg := DefaultSystemConfig(t)
cfg.DeployConfig.UseAltDA = true
cfg.BatcherMaxPendingTransactions = 0 // no limit on parallel txs
// ensures that batcher txs are as small as possible
cfg.BatcherMaxL1TxSizeBytes = derive.FrameV0OverHeadSize + 1 /*version bytes*/ + 1
cfg.BatcherBatchType = 0
cfg.DataAvailabilityType = flags.CalldataType
cfg.BatcherMaxConcurrentDARequest = uint64(numL1TxsExpected)
// disable batcher because we start it manually below
cfg.DisableBatcher = true
sys, err := cfg.Start(t)
require.NoError(t, err, "Error starting up system")
defer sys.Close()
// make every request take 5 seconds, such that only concurrent requests will be able to make progress fast enough
sys.FakeAltDAServer.SetPutRequestLatency(5 * time.Second)
l1Client := sys.NodeClient("l1")
l2Seq := sys.NodeClient("sequencer")
// we wait for numL1TxsExpected L2 blocks to have been produced, just to make sure the sequencer is working properly
_, err = geth.WaitForBlock(big.NewInt(numL1TxsExpected), l2Seq, time.Duration(cfg.DeployConfig.L2BlockTime*uint64(numL1TxsExpected))*time.Second)
require.NoError(t, err, "Waiting for L2 blocks")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
startingL1BlockNum, err := l1Client.BlockNumber(ctx)
require.NoError(t, err)
// start batch submission
driver := sys.BatchSubmitter.TestDriver()
err = driver.StartBatchSubmitting()
require.NoError(t, err)
totalBatcherTxsCount := int64(0)
// wait for up to 5 L1 blocks, expecting 10 L2 batcher txs in them.
// usually only 3 is required, but it's possible additional L1 blocks will be created
// before the batcher starts, so we wait additional blocks.
for i := int64(0); i < 5; i++ {
block, err := geth.WaitForBlock(big.NewInt(int64(startingL1BlockNum)+i), l1Client, time.Duration(cfg.DeployConfig.L1BlockTime*2)*time.Second)
require.NoError(t, err, "Waiting for l1 blocks")
totalTxCount += len(block.Transactions())
// there are possibly other services (proposer/challenger) in the background sending txs
// so we only count the batcher txs
batcherTxCount, err := transactions.TransactionsBySender(block, cfg.DeployConfig.BatchSenderAddress)
require.NoError(t, err)
totalBatcherTxsCount += int64(batcherTxCount)
if totalTxCount >= 10 {
if totalBatcherTxsCount >= numL1TxsExpected {
return
}
}
......
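Rough arithmetic behind the 5 s latency in the test above: with 10 expected batcher txs, strictly sequential DA puts would add roughly 10 × 5 s = 50 s before the last frame could even be handed to the txmgr, whereas with MaxConcurrentDARequest = 10 all puts overlap and finish after about 5 s, so the expected tx count is only reachable within the short block-watching window when the requests run concurrently.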
......@@ -44,11 +44,11 @@ func NewQueue[T any](ctx context.Context, txMgr TxManager, maxPending uint64) *Q
}
// Wait waits for all pending txs to complete (or fail).
func (q *Queue[T]) Wait() {
func (q *Queue[T]) Wait() error {
if q.group == nil {
return
return nil
}
_ = q.group.Wait()
return q.group.Wait()
}
// Send will wait until the number of pending txs is below the max pending,
......
......@@ -222,7 +222,7 @@ func TestQueue_Send(t *testing.T) {
require.Equal(t, c.queued, queued, msg)
}
// wait for the queue to drain (all txs complete or failed)
queue.Wait()
_ = queue.Wait()
duration := time.Since(start)
// expect the execution time within a certain window
now := time.Now()
......