Commit a25acbbd authored by Mark Tyneway's avatar Mark Tyneway Committed by GitHub

Refactor the SyncService to more closely implement the specification (#552)

* l2geth: add Backend enums and config parsing

* l2geth: move OVMContext to types file

* l2geth: implement syncservice spec

* l2geth: fix error handling for get tx batch

* l2geth: update tests to compile and pass

* l2geth: add sync range functions

* l2geth: add batch index indexing

* l2geth: update syncservice hot path logging

* l2geth: use indexed batch index

* chore: add changeset

* l2geth: sync spec refactor (#822)

* l2geth: more in depth usage string

* l2geth: add standard client getters for index

* l2geth: refactor sync service into shared codepaths

* l2geth: clean up tests

* l2geth: better logging and error handling

* test: improve test coverage around timestamps

* l2geth: improve docstring

* l2geth: rename variable

* sync-service: better logline

* l2geth: better logline

* l2geth: test apply indexed transaction

* l2geth: better logline

* linting: fix

* syncservice: fix logline

* l2geth: add error and fix logline

* l2geth: sync service tests

* fix: get last confirmed enqueue (#846)

* l2geth: fix get last confirmed enqueue

* chore: add changeset

* client: return error correctly

* batch-submitter: updated config (#847)

* batch-submitter: backwards compatible configuration

* chore: add changeset

* deps: update

* js: move bcfg interface to core-utils

* batch-submitter: parse USE_SENTRY and add to env example

* chore: add changeset

* batch-submitter: parse as float instead of int

* batch-submitter: better error logging

* l2geth: update rawdb logline
Co-authored-by: default avatarGeorgios Konstantopoulos <me@gakonst.com>

* l2geth: more robust testing
Co-authored-by: default avatarGeorgios Konstantopoulos <me@gakonst.com>

* l2geth: add sanity check for L1ToL2 timestamps

* l2geth: handle error in single place

* l2geth: fix test tx queue origin

* l2geth: add new arg to start.sh

* l2geth: return error in the SyncService.Start()

* chore: go fmt
Co-authored-by: default avatarGeorgios Konstantopoulos <me@gakonst.com>
parent c3d39df8
---
"@eth-optimism/l2geth": patch
---
Refactor the SyncService to more closely implement the specification. This includes using query params to select the backend from the DTL, trailing syncing of batches for the sequencer, syncing by batches as the verifier as well as unified code paths for transaction ingestion to prevent double ingestion or missed ingestion
......@@ -165,6 +165,7 @@ var (
utils.RollupMaxCalldataSizeFlag,
utils.RollupDataPriceFlag,
utils.RollupExecutionPriceFlag,
utils.RollupBackendFlag,
utils.RollupEnableL2GasPollingFlag,
utils.RollupGasPriceOracleAddressFlag,
utils.RollupEnforceFeesFlag,
......
......@@ -80,6 +80,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.RollupMaxCalldataSizeFlag,
utils.RollupDataPriceFlag,
utils.RollupExecutionPriceFlag,
utils.RollupBackendFlag,
utils.RollupEnableL2GasPollingFlag,
utils.RollupGasPriceOracleAddressFlag,
utils.RollupEnforceFeesFlag,
......
......@@ -851,6 +851,12 @@ var (
Value: time.Minute * 3,
EnvVar: "ROLLUP_TIMESTAMP_REFRESH",
}
RollupBackendFlag = cli.StringFlag{
Name: "rollup.backend",
Usage: "Sync backend for verifiers (\"l1\" or \"l2\"), defaults to l1",
Value: "l1",
EnvVar: "ROLLUP_BACKEND",
}
// Flag to enable verifier mode
RollupEnableVerifierFlag = cli.BoolFlag{
Name: "rollup.verifier",
......@@ -1188,6 +1194,15 @@ func setRollup(ctx *cli.Context, cfg *rollup.Config) {
if ctx.GlobalIsSet(RollupExecutionPriceFlag.Name) {
cfg.ExecutionPrice = GlobalBig(ctx, RollupExecutionPriceFlag.Name)
}
if ctx.GlobalIsSet(RollupBackendFlag.Name) {
val := ctx.GlobalString(RollupBackendFlag.Name)
backend, err := rollup.NewBackend(val)
if err != nil {
log.Error("Configured with unknown sync backend, defaulting to l1", "backend", val)
backend, _ = rollup.NewBackend("l1")
}
cfg.Backend = backend
}
if ctx.GlobalIsSet(RollupGasPriceOracleAddressFlag.Name) {
addr := ctx.GlobalString(RollupGasPriceOracleAddressFlag.Name)
cfg.GasPriceOracleAddress = common.HexToAddress(addr)
......
......@@ -69,3 +69,24 @@ func WriteHeadVerifiedIndex(db ethdb.KeyValueWriter, index uint64) {
log.Crit("Failed to store verifier index", "err", err)
}
}
// ReadHeadBatchIndex will read the known tip of the processed batches
func ReadHeadBatchIndex(db ethdb.KeyValueReader) *uint64 {
data, _ := db.Get(headBatchKey)
if len(data) == 0 {
return nil
}
ret := new(big.Int).SetBytes(data).Uint64()
return &ret
}
// WriteHeadBatchIndex will write the known tip of the processed batches
func WriteHeadBatchIndex(db ethdb.KeyValueWriter, index uint64) {
value := new(big.Int).SetUint64(index).Bytes()
if index == 0 {
value = []byte{0}
}
if err := db.Put(headBatchKey, value); err != nil {
log.Crit("Failed to store head batch index", "err", err)
}
}
......@@ -62,6 +62,8 @@ var (
headQueueIndexKey = []byte("LastQueueIndex")
// headVerifiedIndexKey tracks the latest verified index
headVerifiedIndexKey = []byte("LastVerifiedIndex")
// headBatchKey tracks the latest processed batch
headBatchKey = []byte("LastBatch")
preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage
configPrefix = []byte("ethereum-config-") // config prefix for the db
......
......@@ -330,7 +330,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
}
}
}
return b.eth.syncService.ApplyTransaction(signedTx)
return b.eth.syncService.ValidateAndApplySequencerTransaction(signedTx)
}
// OVM Disabled
return b.eth.txPool.AddLocal(signedTx)
......
......@@ -116,14 +116,17 @@ type decoded struct {
type RollupClient interface {
GetEnqueue(index uint64) (*types.Transaction, error)
GetLatestEnqueue() (*types.Transaction, error)
GetTransaction(uint64) (*types.Transaction, error)
GetLatestTransaction() (*types.Transaction, error)
GetLatestEnqueueIndex() (*uint64, error)
GetTransaction(uint64, Backend) (*types.Transaction, error)
GetLatestTransaction(Backend) (*types.Transaction, error)
GetLatestTransactionIndex(Backend) (*uint64, error)
GetEthContext(uint64) (*EthContext, error)
GetLatestEthContext() (*EthContext, error)
GetLastConfirmedEnqueue() (*types.Transaction, error)
GetLatestTransactionBatch() (*Batch, []*types.Transaction, error)
GetLatestTransactionBatchIndex() (*uint64, error)
GetTransactionBatch(uint64) (*Batch, []*types.Transaction, error)
SyncStatus() (*SyncStatus, error)
SyncStatus(Backend) (*SyncStatus, error)
GetL1GasPrice() (*big.Int, error)
}
......@@ -270,6 +273,43 @@ func (c *Client) GetLatestEnqueue() (*types.Transaction, error) {
return tx, nil
}
// GetLatestEnqueueIndex returns the latest `enqueue()` index
func (c *Client) GetLatestEnqueueIndex() (*uint64, error) {
tx, err := c.GetLatestEnqueue()
if err != nil {
return nil, err
}
index := tx.GetMeta().QueueIndex
if index == nil {
return nil, errors.New("Latest queue index is nil")
}
return index, nil
}
// GetLatestTransactionIndex returns the latest CTC index that has been batch
// submitted or not, depending on the backend
func (c *Client) GetLatestTransactionIndex(backend Backend) (*uint64, error) {
tx, err := c.GetLatestTransaction(backend)
if err != nil {
return nil, err
}
index := tx.GetMeta().Index
if index == nil {
return nil, errors.New("Latest index is nil")
}
return index, nil
}
// GetLatestTransactionBatchIndex returns the latest transaction batch index
func (c *Client) GetLatestTransactionBatchIndex() (*uint64, error) {
batch, _, err := c.GetLatestTransactionBatch()
if err != nil {
return nil, err
}
index := batch.Index
return &index, nil
}
// batchedTransactionToTransaction converts a transaction into a
// types.Transaction that can be consumed by the SyncService
func batchedTransactionToTransaction(res *transaction, signer *types.EIP155Signer) (*types.Transaction, error) {
......@@ -364,12 +404,15 @@ func batchedTransactionToTransaction(res *transaction, signer *types.EIP155Signe
}
// GetTransaction will get a transaction by Canonical Transaction Chain index
func (c *Client) GetTransaction(index uint64) (*types.Transaction, error) {
func (c *Client) GetTransaction(index uint64, backend Backend) (*types.Transaction, error) {
str := strconv.FormatUint(index, 10)
response, err := c.client.R().
SetPathParams(map[string]string{
"index": str,
}).
SetQueryParams(map[string]string{
"backend": backend.String(),
}).
SetResult(&TransactionResponse{}).
Get("/transaction/index/{index}")
......@@ -385,9 +428,12 @@ func (c *Client) GetTransaction(index uint64) (*types.Transaction, error) {
// GetLatestTransaction will get the latest transaction, meaning the transaction
// with the greatest Canonical Transaction Chain index
func (c *Client) GetLatestTransaction() (*types.Transaction, error) {
func (c *Client) GetLatestTransaction(backend Backend) (*types.Transaction, error) {
response, err := c.client.R().
SetResult(&TransactionResponse{}).
SetQueryParams(map[string]string{
"backend": backend.String(),
}).
Get("/transaction/latest")
if err != nil {
......@@ -477,9 +523,12 @@ func (c *Client) GetLastConfirmedEnqueue() (*types.Transaction, error) {
}
// SyncStatus will query the remote server to determine if it is still syncing
func (c *Client) SyncStatus() (*SyncStatus, error) {
func (c *Client) SyncStatus(backend Backend) (*SyncStatus, error) {
response, err := c.client.R().
SetResult(&SyncStatus{}).
SetQueryParams(map[string]string{
"backend": backend.String(),
}).
Get("/eth/syncing")
if err != nil {
......@@ -533,8 +582,8 @@ func (c *Client) GetTransactionBatch(index uint64) (*Batch, []*types.Transaction
// parseTransactionBatchResponse will turn a TransactionBatchResponse into a
// Batch and its corresponding types.Transactions
func parseTransactionBatchResponse(txBatch *TransactionBatchResponse, signer *types.EIP155Signer) (*Batch, []*types.Transaction, error) {
if txBatch == nil {
return nil, nil, nil
if txBatch == nil || txBatch.Batch == nil {
return nil, nil, errElementNotFound
}
batch := txBatch.Batch
txs := make([]*types.Transaction, len(txBatch.Transactions))
......
......@@ -40,6 +40,8 @@ type Config struct {
DataPrice *big.Int
// The gas price to use for L2 congestion costs
ExecutionPrice *big.Int
// Represents the source of the transactions that is being synced
Backend Backend
// Only accept transactions with fees
EnforceFees bool
}
......@@ -23,19 +23,17 @@ import (
"github.com/ethereum/go-ethereum/eth/gasprice"
)
// OVMContext represents the blocknumber and timestamp
// that exist during L2 execution
type OVMContext struct {
blockNumber uint64
timestamp uint64
}
// errShortRemoteTip is an error for when the remote tip is shorter than the
// local tip
var errShortRemoteTip = errors.New("Unexpected remote less than tip")
// L2GasPrice slot refers to the storage slot that the execution price is stored
// in the L2 predeploy contract, the GasPriceOracle
var l2GasPriceSlot = common.BigToHash(big.NewInt(1))
// SyncService implements the verifier functionality as well as the reorg
// protection for the sequencer.
// SyncService implements the main functionality around pulling in transactions
// and executing them. It can be configured to run in both sequencer mode and in
// verifier mode.
type SyncService struct {
ctx context.Context
cancel context.CancelFunc
......@@ -44,6 +42,7 @@ type SyncService struct {
scope event.SubscriptionScope
txFeed event.Feed
txLock sync.Mutex
loopLock sync.Mutex
enable bool
eth1ChainId uint64
bc *core.BlockChain
......@@ -51,10 +50,13 @@ type SyncService struct {
RollupGpo *gasprice.RollupOracle
client RollupClient
syncing atomic.Value
chainHeadSub event.Subscription
OVMContext OVMContext
confirmationDepth uint64
pollInterval time.Duration
timestampRefreshThreshold time.Duration
chainHeadCh chan core.ChainHeadEvent
backend Backend
gpoAddress common.Address
enableL2GasPolling bool
enforceFees bool
......@@ -70,9 +72,9 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co
_ = cancel // satisfy govet
if cfg.IsVerifier {
log.Info("Running in verifier mode")
log.Info("Running in verifier mode", "sync-backend", cfg.Backend.String())
} else {
log.Info("Running in sequencer mode")
log.Info("Running in sequencer mode", "sync-backend", cfg.Backend.String())
}
pollInterval := cfg.PollInterval
......@@ -104,16 +106,26 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co
syncing: atomic.Value{},
bc: bc,
txpool: txpool,
chainHeadCh: make(chan core.ChainHeadEvent, 1),
eth1ChainId: cfg.Eth1ChainId,
client: client,
db: db,
pollInterval: pollInterval,
timestampRefreshThreshold: timestampRefreshThreshold,
backend: cfg.Backend,
gpoAddress: cfg.GasPriceOracleAddress,
enableL2GasPolling: cfg.EnableL2GasPolling,
enforceFees: cfg.EnforceFees,
}
// The chainHeadSub is used to synchronize the SyncService with the chain.
// As the SyncService processes transactions, it waits until the transaction
// is added to the chain. This synchronization is required for handling
// reorgs and also favors safety over liveliness. If a transaction breaks
// things downstream, it is expected that this channel will halt ingestion
// of additional transactions by the SyncService.
service.chainHeadSub = service.bc.SubscribeChainHeadEvent(service.chainHeadCh)
// Initial sync service setup if it is enabled. This code depends on
// a remote server that indexes the layer one contracts. Place this
// code behind this if statement so that this can run without the
......@@ -126,9 +138,9 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co
return nil, fmt.Errorf("Rollup client unable to connect: %w", err)
}
// Ensure that the remote is still not syncing
// Wait until the remote service is done syncing
for {
status, err := service.client.SyncStatus()
status, err := service.client.SyncStatus(service.backend)
if err != nil {
log.Error("Cannot get sync status")
continue
......@@ -143,30 +155,26 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co
// Initialize the latest L1 data here to make sure that
// it happens before the RPC endpoints open up
// Only do it if the sync service is enabled so that this
// can be ran without needing to have a configured client.
// can be ran without needing to have a configured RollupClient.
err = service.initializeLatestL1(cfg.CanonicalTransactionChainDeployHeight)
if err != nil {
return nil, fmt.Errorf("Cannot initialize latest L1 data: %w", err)
}
// Log the OVMContext information on startup
bn := service.GetLatestL1BlockNumber()
ts := service.GetLatestL1Timestamp()
log.Info("Initialized Latest L1 Info", "blocknumber", bn, "timestamp", ts)
var i, q string
index := service.GetLatestIndex()
queueIndex := service.GetLatestEnqueueIndex()
if index == nil {
i = "<nil>"
} else {
i = strconv.FormatUint(*index, 10)
}
if queueIndex == nil {
q = "<nil>"
} else {
q = strconv.FormatUint(*queueIndex, 10)
verifiedIndex := service.GetLatestVerifiedIndex()
block := service.bc.CurrentBlock()
if block == nil {
block = types.NewBlock(&types.Header{}, nil, nil, nil)
}
log.Info("Initialized Eth Context", "index", i, "queue-index", q)
header := block.Header()
log.Info("Initial Rollup State", "state", header.Root.Hex(), "index", stringify(index), "queue-index", stringify(queueIndex), "verified-index", verifiedIndex)
// The sequencer needs to sync to the tip at start up
// By setting the sync status to true, it will prevent RPC calls.
......@@ -175,10 +183,11 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co
service.setSyncStatus(true)
}
}
return &service, nil
}
// ensureClient checks to make sure that the remote transaction source is
// available. It will return an error if it cannot connect via HTTP
func (s *SyncService) ensureClient() error {
_, err := s.client.GetLatestEthContext()
if err != nil {
......@@ -187,33 +196,29 @@ func (s *SyncService) ensureClient() error {
return nil
}
// Start initializes the service, connecting to Ethereum1 and starting the
// subservices required for the operation of the SyncService.
// txs through syncservice go to mempool.locals
// txs through rpc go to mempool.remote
// Start initializes the service
func (s *SyncService) Start() error {
if !s.enable {
log.Info("Running without syncing enabled")
return nil
}
log.Info("Initializing Sync Service", "eth1-chainid", s.eth1ChainId)
s.updateL2GasPrice(nil)
s.updateL1GasPrice()
// When a sequencer, be sure to sync to the tip of the ctc before allowing
// user transactions.
if !s.verifier {
err := s.syncTransactionsToTip()
if err != nil {
return fmt.Errorf("Cannot sync transactions to the tip: %w", err)
}
// TODO: This should also sync the enqueue'd transactions that have not
// been synced yet
s.setSyncStatus(false)
}
if s.verifier {
go s.VerifierLoop()
} else {
// The sequencer must sync the transactions to the tip and the
// pending queue transactions on start before setting sync status
// to false and opening up the RPC to accept transactions.
if err := s.syncTransactionsToTip(); err != nil {
return fmt.Errorf("Sequencer cannot sync transactions to tip: %w", err)
}
if err := s.syncQueueToTip(); err != nil {
return fmt.Errorf("Sequencer cannot sync queue to tip: %w", err)
}
s.setSyncStatus(false)
go s.SequencerLoop()
}
return nil
......@@ -230,6 +235,7 @@ func (s *SyncService) initializeLatestL1(ctcDeployHeight *big.Int) error {
if ctcDeployHeight == nil {
return errors.New("Must configure with canonical transaction chain deploy height")
}
log.Info("Initializing initial OVM Context", "ctc-deploy-height", ctcDeployHeight.Uint64())
context, err := s.client.GetEthContext(ctcDeployHeight.Uint64())
if err != nil {
return fmt.Errorf("Cannot fetch ctc deploy block at height %d: %w", ctcDeployHeight.Uint64(), err)
......@@ -243,7 +249,7 @@ func (s *SyncService) initializeLatestL1(ctcDeployHeight *big.Int) error {
block = s.bc.CurrentBlock()
idx := block.Number().Uint64()
if idx > *index {
// This is recoverable with a reorg
// This is recoverable with a reorg but should never happen
return fmt.Errorf("Current block height greater than index")
}
s.SetLatestIndex(&idx)
......@@ -257,24 +263,21 @@ func (s *SyncService) initializeLatestL1(ctcDeployHeight *big.Int) error {
s.SetLatestL1Timestamp(tx.L1Timestamp())
s.SetLatestL1BlockNumber(tx.L1BlockNumber().Uint64())
}
// Only the sequencer cares about latest queue index
if !s.verifier {
queueIndex := s.GetLatestEnqueueIndex()
if queueIndex == nil {
enqueue, err := s.client.GetLastConfirmedEnqueue()
// There are no enqueues yet
if errors.Is(err, errElementNotFound) {
return nil
}
// Other unexpected error
if err != nil {
return fmt.Errorf("Cannot fetch last confirmed queue tx: %w", err)
}
// No error, the queue element was found
queueIndex = enqueue.GetMeta().QueueIndex
queueIndex := s.GetLatestEnqueueIndex()
if queueIndex == nil {
enqueue, err := s.client.GetLastConfirmedEnqueue()
// There are no enqueues yet
if errors.Is(err, errElementNotFound) {
return nil
}
s.SetLatestEnqueueIndex(queueIndex)
// Other unexpected error
if err != nil {
return fmt.Errorf("Cannot fetch last confirmed queue tx: %w", err)
}
// No error, the queue element was found
queueIndex = enqueue.GetMeta().QueueIndex
}
s.SetLatestEnqueueIndex(queueIndex)
return nil
}
......@@ -301,6 +304,8 @@ func (s *SyncService) IsSyncing() bool {
// started by this service.
func (s *SyncService) Stop() error {
s.scope.Close()
s.chainHeadSub.Unsubscribe()
close(s.chainHeadCh)
if s.cancel != nil {
defer s.cancel()
......@@ -308,6 +313,7 @@ func (s *SyncService) Stop() error {
return nil
}
// VerifierLoop is the main loop for Verifier mode
func (s *SyncService) VerifierLoop() {
log.Info("Starting Verifier Loop", "poll-interval", s.pollInterval, "timestamp-refresh-threshold", s.timestampRefreshThreshold)
for {
......@@ -324,43 +330,24 @@ func (s *SyncService) VerifierLoop() {
}
}
// verify is the main logic for the Verifier. The verifier logic is different
// depending on the Backend
func (s *SyncService) verify() error {
// The verifier polls for ctc transactions.
// the ctc transactions are extending the chain.
latest, err := s.client.GetLatestTransaction()
if errors.Is(err, errElementNotFound) {
log.Debug("latest transaction not found")
return nil
}
if err != nil {
return err
}
var start uint64
if s.GetLatestIndex() == nil {
start = 0
} else {
start = *s.GetLatestIndex() + 1
}
end := *latest.GetMeta().Index
log.Info("Polling transactions", "start", start, "end", end)
for i := start; i <= end; i++ {
tx, err := s.client.GetTransaction(i)
if err != nil {
return fmt.Errorf("cannot get tx in loop: %w", err)
switch s.backend {
case BackendL1:
if err := s.syncBatchesToTip(); err != nil {
return fmt.Errorf("Verifier cannot sync transaction batches to tip: %w", err)
}
log.Debug("Applying transaction", "index", i)
err = s.maybeApplyTransaction(tx)
if err != nil {
return fmt.Errorf("could not apply transaction: %w", err)
case BackendL2:
if err := s.syncTransactionsToTip(); err != nil {
return fmt.Errorf("Verifier cannot sync transactions with BackendL2: %w", err)
}
s.SetLatestIndex(&i)
}
return nil
}
// SequencerLoop is the polling loop that runs in sequencer mode. It sequences
// transactions and then updates the EthContext.
func (s *SyncService) SequencerLoop() {
log.Info("Starting Sequencer Loop", "poll-interval", s.pollInterval, "timestamp-refresh-threshold", s.timestampRefreshThreshold)
for {
......@@ -368,8 +355,7 @@ func (s *SyncService) SequencerLoop() {
log.Error("Cannot update L1 gas price", "msg", err)
}
s.txLock.Lock()
err := s.sequence()
if err != nil {
if err := s.sequence(); err != nil {
log.Error("Could not sequence", "error", err)
}
s.txLock.Unlock()
......@@ -377,86 +363,53 @@ func (s *SyncService) SequencerLoop() {
if err := s.updateL2GasPrice(nil); err != nil {
log.Error("Cannot update L2 gas price", "msg", err)
}
if s.updateContext() != nil {
if err := s.updateContext(); err != nil {
log.Error("Could not update execution context", "error", err)
}
time.Sleep(s.pollInterval)
}
}
// sequence is the main logic for the Sequencer. It will sync any `enqueue`
// transactions it has yet to sync and then pull in transaction batches to
// compare against the transactions it has in its local state. The sequencer
// should reorg based on the transaction batches that are posted because
// L1 is the source of truth. The sequencer concurrently accepts user
// transactions via the RPC.
func (s *SyncService) sequence() error {
// Only the sequencer needs to poll for enqueue transactions
// and then can choose when to apply them. We choose to apply
// transactions such that it makes for efficient batch submitting.
// Place as many L1ToL2 transactions in the same context as possible
// by executing them one after another.
latest, err := s.client.GetLatestEnqueue()
if errors.Is(err, errElementNotFound) {
log.Debug("No enqueue transactions found")
return nil
if err := s.syncQueueToTip(); err != nil {
return fmt.Errorf("Sequencer cannot sequence queue: %w", err)
}
if err != nil {
return fmt.Errorf("cannot fetch latest enqueue: %w", err)
}
// Compare the remote latest queue index to the local latest
// queue index. If the remote latest queue index is greater
// than the local latest queue index, be sure to ingest more
// enqueued transactions
var start uint64
if s.GetLatestEnqueueIndex() == nil {
start = 0
} else {
start = *s.GetLatestEnqueueIndex() + 1
if err := s.syncBatchesToTip(); err != nil {
return fmt.Errorf("Sequencer cannot sync transaction batches: %w", err)
}
end := *latest.GetMeta().QueueIndex
log.Info("Polling enqueued transactions", "start", start, "end", end)
for i := start; i <= end; i++ {
enqueue, err := s.client.GetEnqueue(i)
if err != nil {
return fmt.Errorf("Cannot get enqueue in loop %d: %w", i, err)
}
if enqueue == nil {
log.Debug("No enqueue transaction found")
return nil
}
// This should never happen
if enqueue.L1BlockNumber() == nil {
return fmt.Errorf("No blocknumber for enqueue idx %d, timestamp %d, blocknumber %d", i, enqueue.L1Timestamp(), enqueue.L1BlockNumber())
}
// Update the timestamp and blocknumber based on the enqueued
// transactions
if enqueue.L1Timestamp() > s.GetLatestL1Timestamp() {
ts := enqueue.L1Timestamp()
bn := enqueue.L1BlockNumber().Uint64()
s.SetLatestL1Timestamp(ts)
s.SetLatestL1BlockNumber(bn)
log.Info("Updated Eth Context from enqueue", "index", i, "timestamp", ts, "blocknumber", bn)
}
return nil
}
log.Debug("Applying enqueue transaction", "index", i)
err = s.applyTransaction(enqueue)
if err != nil {
return fmt.Errorf("could not apply transaction: %w", err)
}
func (s *SyncService) syncQueueToTip() error {
if err := s.syncToTip(s.syncQueue, s.client.GetLatestEnqueueIndex); err != nil {
return fmt.Errorf("Cannot sync queue to tip: %w", err)
}
return nil
}
s.SetLatestEnqueueIndex(enqueue.GetMeta().QueueIndex)
if enqueue.GetMeta().Index == nil {
latest := s.GetLatestIndex()
index := uint64(0)
if latest != nil {
index = *latest + 1
}
s.SetLatestIndex(&index)
} else {
s.SetLatestIndex(enqueue.GetMeta().Index)
}
func (s *SyncService) syncBatchesToTip() error {
if err := s.syncToTip(s.syncBatches, s.client.GetLatestTransactionBatchIndex); err != nil {
return fmt.Errorf("Cannot sync transaction batches to tip: %w", err)
}
return nil
}
func (s *SyncService) syncTransactionsToTip() error {
sync := func() (*uint64, error) {
return s.syncTransactions(s.backend)
}
check := func() (*uint64, error) {
return s.client.GetLatestTransactionIndex(s.backend)
}
if err := s.syncToTip(sync, check); err != nil {
return fmt.Errorf("Verifier cannot sync transactions with backend %s: %w", s.backend.String(), err)
}
return nil
}
......@@ -466,7 +419,7 @@ func (s *SyncService) sequence() error {
func (s *SyncService) updateL1GasPrice() error {
l1GasPrice, err := s.client.GetL1GasPrice()
if err != nil {
return err
return fmt.Errorf("cannot fetch L1 gas price: %w", err)
}
s.RollupGpo.SetL1GasPrice(l1GasPrice)
return nil
......@@ -509,7 +462,6 @@ func (s *SyncService) updateContext() error {
if err != nil {
return err
}
current := time.Unix(int64(s.GetLatestL1Timestamp()), 0)
next := time.Unix(int64(context.Timestamp), 0)
if next.Sub(current) > s.timestampRefreshThreshold {
......@@ -517,81 +469,9 @@ func (s *SyncService) updateContext() error {
s.SetLatestL1BlockNumber(context.BlockNumber)
s.SetLatestL1Timestamp(context.Timestamp)
}
return nil
}
// This function must sync all the way to the tip
// TODO: it should then sync all of the enqueue transactions
func (s *SyncService) syncTransactionsToTip() error {
// Then set up a while loop that only breaks when the latest
// transaction does not change through two runs of the loop.
// The latest transaction can change during the timeframe of
// all of the transactions being sync'd.
for {
// This function must be sure to sync all the way to the tip.
// First query the latest transaction
latest, err := s.client.GetLatestTransaction()
if errors.Is(err, errElementNotFound) {
log.Info("No transactions to sync")
return nil
}
if err != nil {
log.Error("Cannot get latest transaction", "msg", err)
time.Sleep(time.Second * 2)
continue
}
tipHeight := latest.GetMeta().Index
index := rawdb.ReadHeadIndex(s.db)
start := uint64(0)
if index != nil {
start = *index + 1
}
log.Info("Syncing transactions to tip", "start", start, "end", *tipHeight)
for i := start; i <= *tipHeight; i++ {
tx, err := s.client.GetTransaction(i)
if err != nil {
log.Error("Cannot get transaction", "index", i, "msg", err)
time.Sleep(time.Second * 2)
continue
}
// The transaction does not yet exist in the ctc
if tx == nil {
index := latest.GetMeta().Index
if index == nil {
return fmt.Errorf("Unexpected nil index")
}
return fmt.Errorf("Transaction %d not found when %d is latest", i, *index)
}
err = s.maybeApplyTransaction(tx)
if err != nil {
return fmt.Errorf("Cannot apply transaction: %w", err)
}
if err != nil {
log.Error("Cannot ingest transaction", "index", i)
}
s.SetLatestIndex(tx.GetMeta().Index)
if types.QueueOrigin(tx.QueueOrigin().Uint64()) == types.QueueOriginL1ToL2 {
queueIndex := tx.GetMeta().QueueIndex
s.SetLatestEnqueueIndex(queueIndex)
}
}
// Be sure to check that no transactions came in while
// the above loop was running
post, err := s.client.GetLatestTransaction()
if err != nil {
return fmt.Errorf("Cannot get latest transaction: %w", err)
}
// These transactions should always have an index since they
// are already in the ctc.
if *latest.GetMeta().Index == *post.GetMeta().Index {
log.Info("Done syncing transactions to tip")
return nil
}
}
}
// Methods for safely accessing and storing the latest
// L1 blocknumber and timestamp. These are held in memory.
......@@ -680,83 +560,169 @@ func (s *SyncService) SetLatestVerifiedIndex(index *uint64) {
}
}
// reorganize will reorganize to directly to the index passed in.
// The caller must handle the offset relative to the ctc.
func (s *SyncService) reorganize(index uint64) error {
if index == 0 {
return nil
}
err := s.bc.SetHead(index)
if err != nil {
return fmt.Errorf("Cannot reorganize in syncservice: %w", err)
}
// GetLatestBatchIndex reads the last processed transaction batch
func (s *SyncService) GetLatestBatchIndex() *uint64 {
return rawdb.ReadHeadBatchIndex(s.db)
}
// TODO: make sure no off by one error here
s.SetLatestIndex(&index)
// GetNextBatchIndex reads the index of the next transaction batch to process
func (s *SyncService) GetNextBatchIndex() uint64 {
index := s.GetLatestBatchIndex()
if index == nil {
return 0
}
return *index + 1
}
// When in sequencer mode, be sure to roll back the latest queue
// index as well.
if !s.verifier {
enqueue, err := s.client.GetLastConfirmedEnqueue()
if err != nil {
return fmt.Errorf("cannot reorganize: %w", err)
}
s.SetLatestEnqueueIndex(enqueue.GetMeta().QueueIndex)
// SetLatestBatchIndex writes the last index of the transaction batch that was processed
func (s *SyncService) SetLatestBatchIndex(index *uint64) {
if index != nil {
rawdb.WriteHeadBatchIndex(s.db, *index)
}
log.Info("Reorganizing", "height", index)
return nil
}
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (s *SyncService) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return s.scope.Track(s.txFeed.Subscribe(ch))
// applyTransaction is a higher level API for applying a transaction
func (s *SyncService) applyTransaction(tx *types.Transaction) error {
if tx.GetMeta().Index != nil {
return s.applyIndexedTransaction(tx)
}
return s.applyTransactionToTip(tx)
}
// maybeApplyTransaction will potentially apply the transaction after first
// inspecting the local database. This is mean to prevent transactions from
// being replayed.
func (s *SyncService) maybeApplyTransaction(tx *types.Transaction) error {
// applyIndexedTransaction applys a transaction that has an index. This means
// that the source of the transaction was either a L1 batch or from the
// sequencer.
func (s *SyncService) applyIndexedTransaction(tx *types.Transaction) error {
if tx == nil {
return fmt.Errorf("nil transaction passed to maybeApplyTransaction")
return errors.New("Transaction is nil in applyIndexedTransaction")
}
index := tx.GetMeta().Index
if index == nil {
return errors.New("No index found in applyIndexedTransaction")
}
log.Trace("Applying indexed transaction", "index", *index)
next := s.GetNextIndex()
if *index == next {
return s.applyTransactionToTip(tx)
}
if *index < next {
return s.applyHistoricalTransaction(tx)
}
return fmt.Errorf("Received tx at index %d when looking for %d", *index, next)
}
log.Debug("Maybe applying transaction", "hash", tx.Hash().Hex())
// applyHistoricalTransaction will compare a historical transaction against what
// is locally indexed. This will trigger a reorg in the future
func (s *SyncService) applyHistoricalTransaction(tx *types.Transaction) error {
if tx == nil {
return errors.New("Transaction is nil in applyHistoricalTransaction")
}
index := tx.GetMeta().Index
if index == nil {
return fmt.Errorf("nil index in maybeApplyTransaction")
return errors.New("No index is found in applyHistoricalTransaction")
}
// Handle off by one
// Handle the off by one
block := s.bc.GetBlockByNumber(*index + 1)
// The transaction has yet to be played, so it is safe to apply
if block == nil {
err := s.applyTransaction(tx)
if err != nil {
return fmt.Errorf("Maybe apply transaction failed on index %d: %w", *index, err)
}
return nil
return fmt.Errorf("Block %d is not found", *index+1)
}
// There is already a transaction at that index, so check
// for its equality.
txs := block.Transactions()
if len(txs) != 1 {
log.Info("block", "txs", len(txs), "number", block.Number().Uint64())
return fmt.Errorf("More than 1 transaction in block")
return fmt.Errorf("More than one transaction found in block %d", *index+1)
}
if isCtcTxEqual(tx, txs[0]) {
log.Info("Matching transaction found", "index", *index)
if !isCtcTxEqual(tx, txs[0]) {
log.Error("Mismatched transaction", "index", *index)
} else {
log.Warn("Non matching transaction found", "index", *index)
log.Debug("Historical transaction matches", "index", *index, "hash", tx.Hash().Hex())
}
return nil
}
// Lower level API used to apply a transaction, must only be used with
// transactions that came from L1.
func (s *SyncService) applyTransaction(tx *types.Transaction) error {
// applyTransactionToTip will do sanity checks on the transaction before
// applying it to the tip. It blocks until the transaction has been included in
// the chain. It is assumed that validation around the index has already
// happened.
func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error {
if tx == nil {
return errors.New("nil transaction passed to applyTransactionToTip")
}
// Queue Origin L1 to L2 transactions must have a timestamp that is set by
// the L1 block that holds the transaction. This should never happen but is
// a sanity check to prevent fraudulent execution.
if tx.QueueOrigin().Uint64() == uint64(types.QueueOriginL1ToL2) {
if tx.L1Timestamp() == 0 {
return fmt.Errorf("Queue origin L1 to L2 transaction without a timestamp: %s", tx.Hash().Hex())
}
}
// If there is no OVM timestamp assigned to the transaction, then assign a
// timestamp and blocknumber to it. This should only be the case for queue
// origin sequencer transactions that come in via RPC. The L1 to L2
// transactions that come in via `enqueue` should have a timestamp set based
// on the L1 block that it was included in.
// Note that Ethereum Layer one consensus rules dictate that the timestamp
// must be strictly increasing between blocks, so no need to check both the
// timestamp and the blocknumber.
if tx.L1Timestamp() == 0 {
ts := s.GetLatestL1Timestamp()
bn := s.GetLatestL1BlockNumber()
tx.SetL1Timestamp(ts)
tx.SetL1BlockNumber(bn)
} else if tx.L1Timestamp() > s.GetLatestL1Timestamp() {
// If the timestamp of the transaction is greater than the sync
// service's locally maintained timestamp, update the timestamp and
// blocknumber to equal that of the transaction's. This should happen
// with `enqueue` transactions.
ts := tx.L1Timestamp()
bn := tx.L1BlockNumber()
s.SetLatestL1Timestamp(ts)
s.SetLatestL1BlockNumber(bn.Uint64())
log.Debug("Updating OVM context based on new transaction", "timestamp", ts, "blocknumber", bn.Uint64(), "queue-origin", tx.QueueOrigin().Uint64())
} else if tx.L1Timestamp() < s.GetLatestL1Timestamp() {
log.Error("Timestamp monotonicity violation", "hash", tx.Hash().Hex())
}
if tx.GetMeta().Index == nil {
index := s.GetLatestIndex()
if index == nil {
tx.SetIndex(0)
} else {
tx.SetIndex(*index + 1)
}
}
s.SetLatestIndex(tx.GetMeta().Index)
if tx.GetMeta().QueueIndex != nil {
s.SetLatestEnqueueIndex(tx.GetMeta().QueueIndex)
}
// The index was set above so it is safe to dereference
log.Debug("Applying transaction to tip", "index", *tx.GetMeta().Index, "hash", tx.Hash().Hex())
txs := types.Transactions{tx}
s.txFeed.Send(core.NewTxsEvent{Txs: txs})
// Block until the transaction has been added to the chain
log.Trace("Waiting for transaction to be added to chain", "hash", tx.Hash().Hex())
<-s.chainHeadCh
return nil
}
// applyBatchedTransaction applies transactions that were batched to layer one.
// The sequencer checks for batches over time to make sure that it does not
// deviate from the L1 state and this is the main method of transaction
// ingestion for the verifier.
func (s *SyncService) applyBatchedTransaction(tx *types.Transaction) error {
if tx == nil {
return errors.New("nil transaction passed into applyBatchedTransaction")
}
index := tx.GetMeta().Index
if index == nil {
return errors.New("No index found on transaction")
}
log.Trace("Applying batched transaction", "index", *index)
err := s.applyIndexedTransaction(tx)
if err != nil {
return fmt.Errorf("Cannot apply batched transaction: %w", err)
}
s.SetLatestVerifiedIndex(index)
return nil
}
......@@ -800,20 +766,21 @@ func (s *SyncService) verifyFee(tx *types.Transaction) error {
// Higher level API for applying transactions. Should only be called for
// queue origin sequencer transactions, as the contracts on L1 manage the same
// validity checks that are done here.
func (s *SyncService) ApplyTransaction(tx *types.Transaction) error {
func (s *SyncService) ValidateAndApplySequencerTransaction(tx *types.Transaction) error {
if s.verifier {
return errors.New("Verifier does not accept transactions out of band")
}
if tx == nil {
return fmt.Errorf("nil transaction passed to ApplyTransaction")
return errors.New("nil transaction passed to ValidateAndApplySequencerTransaction")
}
if err := s.verifyFee(tx); err != nil {
return err
}
log.Debug("Sending transaction to sync service", "hash", tx.Hash().Hex())
s.txLock.Lock()
defer s.txLock.Unlock()
if s.verifier {
return errors.New("Verifier does not accept transactions out of band")
}
log.Trace("Sequencer transaction validation", "hash", tx.Hash().Hex())
qo := tx.QueueOrigin()
if qo == nil {
return errors.New("invalid transaction with no queue origin")
......@@ -825,14 +792,223 @@ func (s *SyncService) ApplyTransaction(tx *types.Transaction) error {
if err != nil {
return fmt.Errorf("invalid transaction: %w", err)
}
return s.applyTransaction(tx)
}
if tx.L1Timestamp() == 0 {
ts := s.GetLatestL1Timestamp()
bn := s.GetLatestL1BlockNumber()
tx.SetL1Timestamp(ts)
tx.SetL1BlockNumber(bn)
// syncer represents a function that can sync remote items and then returns the
// index that it synced to as well as an error if it encountered one. It has
// side effects on the state and its functionality depends on the current state
type syncer func() (*uint64, error)
// rangeSyncer represents a function that syncs a range of items between its two
// arguments (inclusive)
type rangeSyncer func(uint64, uint64) error
// nextGetter is a type that represents a function that will return the next
// index
type nextGetter func() uint64
// indexGetter is a type that represents a function that returns an index and an
// error if there is a problem fetching the index. The different types of
// indices are canonical transaction chain indices, queue indices and batch
// indices. It does not induce side effects on state
type indexGetter func() (*uint64, error)
// isAtTip is a function that will determine if the local chain is at the tip
// of the remote datasource
func (s *SyncService) isAtTip(index *uint64, get indexGetter) (bool, error) {
latest, err := get()
if errors.Is(err, errElementNotFound) {
if index == nil {
return true, nil
}
return false, nil
}
return s.applyTransaction(tx)
if err != nil {
return false, err
}
// There are no known enqueue transactions locally or remotely
if latest == nil && index == nil {
return true, nil
}
// Only one of the transactions are nil due to the check above so they
// cannot be equal
if latest == nil || index == nil {
return false, nil
}
// The indices are equal
if *latest == *index {
return true, nil
}
// The local tip is greater than the remote tip. This should never happen
if *latest < *index {
return false, fmt.Errorf("is at tip mismatch: remote (%d) - local (%d): %w", *latest, *index, errShortRemoteTip)
}
// The indices are not equal
return false, nil
}
// syncToTip is a function that can be used to sync to the tip of an ordered
// list of things. It is used to sync transactions, enqueue elements and batches
func (s *SyncService) syncToTip(sync syncer, getTip indexGetter) error {
s.loopLock.Lock()
defer s.loopLock.Unlock()
for {
index, err := sync()
if errors.Is(err, errElementNotFound) {
return nil
}
if err != nil {
return err
}
isAtTip, err := s.isAtTip(index, getTip)
if err != nil {
return err
}
if isAtTip {
return nil
}
}
}
// sync will sync a range of items
func (s *SyncService) sync(getLatest indexGetter, getNext nextGetter, syncer rangeSyncer) (*uint64, error) {
latestIndex, err := getLatest()
if err != nil {
return nil, fmt.Errorf("Cannot sync: %w", err)
}
if latestIndex == nil {
return nil, errors.New("Latest index is not defined")
}
nextIndex := getNext()
if nextIndex == *latestIndex+1 {
return latestIndex, nil
}
if err := syncer(nextIndex, *latestIndex); err != nil {
return nil, err
}
return latestIndex, nil
}
// syncBatches will sync a range of batches from the current known tip to the
// remote tip.
func (s *SyncService) syncBatches() (*uint64, error) {
index, err := s.sync(s.client.GetLatestTransactionBatchIndex, s.GetNextBatchIndex, s.syncTransactionBatchRange)
if err != nil {
return nil, fmt.Errorf("Cannot sync batches: %w", err)
}
return index, nil
}
// syncTransactionBatchRange will sync a range of batched transactions from
// start to end (inclusive)
func (s *SyncService) syncTransactionBatchRange(start, end uint64) error {
log.Info("Syncing transaction batch range", "start", start, "end", end)
for i := start; i <= end; i++ {
log.Debug("Fetching transaction batch", "index", i)
_, txs, err := s.client.GetTransactionBatch(i)
if err != nil {
return fmt.Errorf("Cannot get transaction batch: %w", err)
}
for _, tx := range txs {
if err := s.applyBatchedTransaction(tx); err != nil {
return fmt.Errorf("cannot apply batched transaction: %w", err)
}
}
s.SetLatestBatchIndex(&i)
}
return nil
}
// syncQueue will sync from the local tip to the known tip of the remote
// enqueue transaction feed.
func (s *SyncService) syncQueue() (*uint64, error) {
index, err := s.sync(s.client.GetLatestEnqueueIndex, s.GetNextEnqueueIndex, s.syncQueueTransactionRange)
if err != nil {
return nil, fmt.Errorf("Cannot sync queue: %w", err)
}
return index, nil
}
// syncQueueTransactionRange will apply a range of queue transactions from
// start to end (inclusive)
func (s *SyncService) syncQueueTransactionRange(start, end uint64) error {
log.Info("Syncing enqueue transactions range", "start", start, "end", end)
for i := start; i <= end; i++ {
tx, err := s.client.GetEnqueue(i)
if err != nil {
return fmt.Errorf("Canot get enqueue transaction; %w", err)
}
if err := s.applyTransaction(tx); err != nil {
return fmt.Errorf("Cannot apply transaction: %w", err)
}
}
return nil
}
// syncTransactions will sync transactions to the remote tip based on the
// backend
func (s *SyncService) syncTransactions(backend Backend) (*uint64, error) {
getLatest := func() (*uint64, error) {
return s.client.GetLatestTransactionIndex(backend)
}
sync := func(start, end uint64) error {
return s.syncTransactionRange(start, end, backend)
}
index, err := s.sync(getLatest, s.GetNextIndex, sync)
if err != nil {
return nil, fmt.Errorf("Cannot sync transactions with backend %s: %w", backend.String(), err)
}
return index, nil
}
// syncTransactionRange will sync a range of transactions from
// start to end (inclusive) from a specific Backend
func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) error {
log.Info("Syncing transaction range", "start", start, "end", end, "backend", backend.String())
for i := start; i <= end; i++ {
tx, err := s.client.GetTransaction(i, backend)
if err != nil {
return fmt.Errorf("cannot fetch transaction %d: %w", i, err)
}
if err = s.applyTransaction(tx); err != nil {
return fmt.Errorf("Cannot apply transaction: %w", err)
}
}
return nil
}
// updateEthContext will update the OVM execution context's
// timestamp and blocknumber if enough time has passed since
// it was last updated. This is a sequencer only function.
func (s *SyncService) updateEthContext() error {
context, err := s.client.GetLatestEthContext()
if err != nil {
return fmt.Errorf("Cannot get eth context: %w", err)
}
current := time.Unix(int64(s.GetLatestL1Timestamp()), 0)
next := time.Unix(int64(context.Timestamp), 0)
if next.Sub(current) > s.timestampRefreshThreshold {
log.Info("Updating Eth Context", "timetamp", context.Timestamp, "blocknumber", context.BlockNumber)
s.SetLatestL1BlockNumber(context.BlockNumber)
s.SetLatestL1Timestamp(context.Timestamp)
}
return nil
}
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (s *SyncService) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return s.scope.Track(s.txFeed.Subscribe(ch))
}
func stringify(i *uint64) string {
if i == nil {
return "<nil>"
}
return strconv.FormatUint(*i, 10)
}
// IngestTransaction should only be called by trusted parties as it skips all
......
......@@ -2,11 +2,14 @@ package rollup
import (
"context"
"crypto/rand"
"errors"
"fmt"
"math/big"
"reflect"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
......@@ -111,7 +114,7 @@ func TestSyncServiceTransactionEnqueued(t *testing.T) {
// The queue index of the L1 to L2 transaction
queueIndex := uint64(0)
// The index in the ctc
index := uint64(5)
index := uint64(0)
tx := types.NewTransaction(0, target, big.NewInt(0), gasLimit, big.NewInt(0), data)
txMeta := types.NewTransactionMeta(
......@@ -133,14 +136,16 @@ func TestSyncServiceTransactionEnqueued(t *testing.T) {
})
// Run an iteration of the eloop
err = service.sequence()
if err != nil {
t.Fatal("sequencing failed", err)
}
err = nil
go func() {
err = service.syncQueueToTip()
}()
// Wait for the tx to be confirmed into the chain and then
// make sure it is the transactions that was set up with in the mockclient
event := <-txCh
if err != nil {
t.Fatal("sequencing failed", err)
}
if len(event.Txs) != 1 {
t.Fatal("Unexpected number of transactions")
}
......@@ -151,6 +156,332 @@ func TestSyncServiceTransactionEnqueued(t *testing.T) {
}
}
func TestTransactionToTipNoIndex(t *testing.T) {
service, txCh, _, err := newTestSyncService(false)
if err != nil {
t.Fatal(err)
}
// Get a reference to the current next index to compare with the index that
// is set to the transaction that is ingested
nextIndex := service.GetNextIndex()
timestamp := uint64(24)
target := common.HexToAddress("0x04668ec2f57cc15c381b461b9fedab5d451c8f7f")
l1TxOrigin := common.HexToAddress("0xEA674fdDe714fd979de3EdF0F56AA9716B898ec8")
gasLimit := uint64(66)
data := []byte{0x02, 0x92}
l1BlockNumber := big.NewInt(100)
tx := types.NewTransaction(0, target, big.NewInt(0), gasLimit, big.NewInt(0), data)
meta := types.NewTransactionMeta(
l1BlockNumber,
timestamp,
&l1TxOrigin,
types.SighashEIP155,
types.QueueOriginL1ToL2,
nil, // The index is `nil`, expect it to be set afterwards
nil,
nil,
)
tx.SetTransactionMeta(meta)
go func() {
err = service.applyTransactionToTip(tx)
}()
event := <-txCh
if err != nil {
t.Fatal("Cannot apply transaction to the tip")
}
confirmed := event.Txs[0]
// The transaction was applied without an index so the chain gave it the
// next index
index := confirmed.GetMeta().Index
if index == nil {
t.Fatal("Did not set index after applying tx to tip")
}
if *index != *service.GetLatestIndex() {
t.Fatal("Incorrect latest index")
}
if *index != nextIndex {
t.Fatal("Incorrect index")
}
}
func TestTransactionToTipTimestamps(t *testing.T) {
service, txCh, _, err := newTestSyncService(false)
if err != nil {
t.Fatal(err)
}
// Create two mock transactions with `nil` indices. This will allow
// assertions around the indices being updated correctly. Set the timestamp
// to 1 and 2 and assert that the timestamps in the sync service are updated
// correctly
tx1 := setMockTxL1Timestamp(mockTx(), 1)
tx2 := setMockTxL1Timestamp(mockTx(), 2)
txs := []*types.Transaction{
tx1,
tx2,
}
for _, tx := range txs {
nextIndex := service.GetNextIndex()
go func() {
err = service.applyTransactionToTip(tx)
}()
event := <-txCh
if err != nil {
t.Fatal(err)
}
conf := event.Txs[0]
// The index should be set to the next
if conf.GetMeta().Index == nil {
t.Fatal("Index is nil")
}
// The index that the sync service is tracking should be updated
if *conf.GetMeta().Index != *service.GetLatestIndex() {
t.Fatal("index on the service was not updated")
}
// The indexes should be incrementing by 1
if *conf.GetMeta().Index != nextIndex {
t.Fatalf("Mismatched index: got %d, expect %d", *conf.GetMeta().Index, nextIndex)
}
// The tx timestamp should be setting the services timestamp
if conf.L1Timestamp() != service.GetLatestL1Timestamp() {
t.Fatal("Mismatched timestamp")
}
}
// Send a transaction with no timestamp and then let it be updated
// by the sync service. This will prevent monotonicity errors as well
// as give timestamps to queue origin sequencer transactions
ts := service.GetLatestL1Timestamp()
tx3 := setMockTxL1Timestamp(mockTx(), 0)
go func() {
err = service.applyTransactionToTip(tx3)
}()
result := <-txCh
service.chainHeadCh <- core.ChainHeadEvent{}
if result.Txs[0].L1Timestamp() != ts {
t.Fatal("Timestamp not updated correctly")
}
}
func TestApplyIndexedTransaction(t *testing.T) {
service, txCh, _, err := newTestSyncService(true)
if err != nil {
t.Fatal(err)
}
// Create three transactions, two of which have a duplicate index.
// The first two transactions can be ingested without a problem and the
// third transaction has a duplicate index so it will not be ingested.
// Expect an error for the third transaction and expect the SyncService
// global index to be updated with the first two transactions
tx0 := setMockTxIndex(mockTx(), 0)
tx1 := setMockTxIndex(mockTx(), 1)
tx1a := setMockTxIndex(mockTx(), 1)
go func() {
err = service.applyIndexedTransaction(tx0)
}()
<-txCh
if err != nil {
t.Fatal(err)
}
if *tx0.GetMeta().Index != *service.GetLatestIndex() {
t.Fatal("Latest index mismatch")
}
go func() {
err = service.applyIndexedTransaction(tx1)
}()
<-txCh
if err != nil {
t.Fatal(err)
}
if *tx1.GetMeta().Index != *service.GetLatestIndex() {
t.Fatal("Latest index mismatch")
}
err = service.applyIndexedTransaction(tx1a)
if err == nil {
t.Fatal(err)
}
}
func TestApplyBatchedTransaction(t *testing.T) {
service, txCh, _, err := newTestSyncService(true)
if err != nil {
t.Fatal(err)
}
// Create a transactoin with the index of 0
tx0 := setMockTxIndex(mockTx(), 0)
// Ingest through applyBatchedTransaction which should set the latest
// verified index to the index of the transaction
go func() {
err = service.applyBatchedTransaction(tx0)
}()
service.chainHeadCh <- core.ChainHeadEvent{}
<-txCh
// Catch race conditions with the database write
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
for {
if service.GetLatestVerifiedIndex() != nil {
wg.Done()
return
}
time.Sleep(100 * time.Millisecond)
}
}()
wg.Wait()
// Assert that the verified index is the same as the transaction index
if *tx0.GetMeta().Index != *service.GetLatestVerifiedIndex() {
t.Fatal("Latest verified index mismatch")
}
}
func TestIsAtTip(t *testing.T) {
service, _, _, err := newTestSyncService(true)
if err != nil {
t.Fatal(err)
}
data := []struct {
tip *uint64
get indexGetter
expect bool
err error
}{
{
tip: newUint64(1),
get: func() (*uint64, error) {
return newUint64(1), nil
},
expect: true,
err: nil,
},
{
tip: newUint64(0),
get: func() (*uint64, error) {
return newUint64(1), nil
},
expect: false,
err: nil,
},
{
tip: newUint64(1),
get: func() (*uint64, error) {
return newUint64(0), nil
},
expect: false,
err: errShortRemoteTip,
},
{
tip: nil,
get: func() (*uint64, error) {
return nil, nil
},
expect: true,
err: nil,
},
{
tip: nil,
get: func() (*uint64, error) {
return nil, errElementNotFound
},
expect: true,
err: nil,
},
{
tip: newUint64(0),
get: func() (*uint64, error) {
return nil, errElementNotFound
},
expect: false,
err: nil,
},
}
for _, d := range data {
isAtTip, err := service.isAtTip(d.tip, d.get)
if isAtTip != d.expect {
t.Fatal("expected does not match")
}
if !errors.Is(err, d.err) {
t.Fatal("error no match")
}
}
}
func TestSyncQueue(t *testing.T) {
service, txCh, _, err := newTestSyncService(true)
if err != nil {
t.Fatal(err)
}
setupMockClient(service, map[string]interface{}{
"GetEnqueue": []*types.Transaction{
setMockQueueIndex(mockTx(), 0),
setMockQueueIndex(mockTx(), 1),
setMockQueueIndex(mockTx(), 2),
setMockQueueIndex(mockTx(), 3),
},
})
var tip *uint64
go func() {
tip, err = service.syncQueue()
}()
for i := 0; i < 4; i++ {
service.chainHeadCh <- core.ChainHeadEvent{}
event := <-txCh
tx := event.Txs[0]
if *tx.GetMeta().QueueIndex != uint64(i) {
t.Fatal("queue index mismatch")
}
}
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
for {
if tip != nil {
wg.Done()
return
}
time.Sleep(100 * time.Millisecond)
}
}()
wg.Wait()
if tip == nil {
t.Fatal("tip is nil")
}
// There were a total of 4 transactions synced and the indexing starts at 0
if *service.GetLatestIndex() != 3 {
t.Fatalf("Latest index mismatch")
}
// All of the transactions are `enqueue()`s
if *service.GetLatestEnqueueIndex() != 3 {
t.Fatal("Latest queue index mismatch")
}
if *tip != 3 {
t.Fatal("Tip mismatch")
}
}
func TestSyncServiceL1GasPrice(t *testing.T) {
service, _, _, err := newTestSyncService(true)
setupMockClient(service, map[string]interface{}{})
......@@ -253,12 +584,15 @@ func TestSyncServiceSync(t *testing.T) {
},
})
err = service.verify()
err = nil
go func() {
err = service.syncTransactionsToTip()
}()
event := <-txCh
if err != nil {
t.Fatal("verification failed", err)
}
event := <-txCh
if len(event.Txs) != 1 {
t.Fatal("Unexpected number of transactions")
}
......@@ -356,6 +690,7 @@ func newTestSyncService(isVerifier bool) (*SyncService, chan core.NewTxsEvent, e
// Set as an empty string as this is a dummy value anyways.
// The client needs to be mocked with a mockClient
RollupClientHttp: "",
Backend: BackendL2,
}
service, err := NewSyncService(context.Background(), cfg, txPool, chain, db)
......@@ -371,13 +706,15 @@ func newTestSyncService(isVerifier bool) (*SyncService, chan core.NewTxsEvent, e
}
type mockClient struct {
getEnqueueCallCount int
getEnqueue []*types.Transaction
getTransactionCallCount int
getTransaction []*types.Transaction
getEthContextCallCount int
getEthContext []*EthContext
getLatestEthContext *EthContext
getEnqueueCallCount int
getEnqueue []*types.Transaction
getTransactionCallCount int
getTransaction []*types.Transaction
getEthContextCallCount int
getEthContext []*EthContext
getLatestEthContext *EthContext
getLatestEnqueueIndex []func() (*uint64, error)
getLatestEnqueueIndexCallCount int
}
func setupMockClient(service *SyncService, responses map[string]interface{}) {
......@@ -391,6 +728,7 @@ func newMockClient(responses map[string]interface{}) *mockClient {
getTransactionResponses := []*types.Transaction{}
getEthContextResponses := []*EthContext{}
getLatestEthContextResponse := &EthContext{}
getLatestEnqueueIndexResponses := []func() (*uint64, error){}
enqueue, ok := responses["GetEnqueue"]
if ok {
......@@ -408,11 +746,17 @@ func newMockClient(responses map[string]interface{}) *mockClient {
if ok {
getLatestEthContextResponse = getLatestCtx.(*EthContext)
}
getLatestEnqueueIdx, ok := responses["GetLatestEnqueueIndex"]
if ok {
getLatestEnqueueIndexResponses = getLatestEnqueueIdx.([]func() (*uint64, error))
}
return &mockClient{
getEnqueue: getEnqueueResponses,
getTransaction: getTransactionResponses,
getEthContext: getEthContextResponses,
getLatestEthContext: getLatestEthContextResponse,
getEnqueue: getEnqueueResponses,
getTransaction: getTransactionResponses,
getEthContext: getEthContextResponses,
getLatestEthContext: getLatestEthContextResponse,
getLatestEnqueueIndex: getLatestEnqueueIndexResponses,
}
}
......@@ -427,23 +771,23 @@ func (m *mockClient) GetEnqueue(index uint64) (*types.Transaction, error) {
func (m *mockClient) GetLatestEnqueue() (*types.Transaction, error) {
if len(m.getEnqueue) == 0 {
return &types.Transaction{}, errors.New("")
return &types.Transaction{}, errors.New("enqueue not found")
}
return m.getEnqueue[len(m.getEnqueue)-1], nil
}
func (m *mockClient) GetTransaction(index uint64) (*types.Transaction, error) {
func (m *mockClient) GetTransaction(index uint64, backend Backend) (*types.Transaction, error) {
if m.getTransactionCallCount < len(m.getTransaction) {
tx := m.getTransaction[m.getTransactionCallCount]
m.getTransactionCallCount++
return tx, nil
}
return nil, errors.New("")
return nil, fmt.Errorf("Cannot get transaction: mocks (%d), call count (%d)", len(m.getTransaction), m.getTransactionCallCount)
}
func (m *mockClient) GetLatestTransaction() (*types.Transaction, error) {
func (m *mockClient) GetLatestTransaction(backend Backend) (*types.Transaction, error) {
if len(m.getTransaction) == 0 {
return nil, errors.New("")
return nil, errors.New("No transactions")
}
return m.getTransaction[len(m.getTransaction)-1], nil
}
......@@ -454,7 +798,7 @@ func (m *mockClient) GetEthContext(index uint64) (*EthContext, error) {
m.getEthContextCallCount++
return ctx, nil
}
return nil, errors.New("")
return nil, errors.New("Cannot get eth context")
}
func (m *mockClient) GetLatestEthContext() (*EthContext, error) {
......@@ -462,7 +806,7 @@ func (m *mockClient) GetLatestEthContext() (*EthContext, error) {
}
func (m *mockClient) GetLastConfirmedEnqueue() (*types.Transaction, error) {
return nil, nil
return nil, errElementNotFound
}
func (m *mockClient) GetLatestTransactionBatch() (*Batch, []*types.Transaction, error) {
......@@ -473,7 +817,7 @@ func (m *mockClient) GetTransactionBatch(index uint64) (*Batch, []*types.Transac
return nil, nil, nil
}
func (m *mockClient) SyncStatus() (*SyncStatus, error) {
func (m *mockClient) SyncStatus(backend Backend) (*SyncStatus, error) {
return &SyncStatus{
Syncing: false,
}, nil
......@@ -483,3 +827,80 @@ func (m *mockClient) GetL1GasPrice() (*big.Int, error) {
price := core.RoundL1GasPrice(big.NewInt(2))
return price, nil
}
func (m *mockClient) GetLatestEnqueueIndex() (*uint64, error) {
enqueue, err := m.GetLatestEnqueue()
if err != nil {
return nil, err
}
if enqueue == nil {
return nil, errElementNotFound
}
return enqueue.GetMeta().QueueIndex, nil
}
func (m *mockClient) GetLatestTransactionBatchIndex() (*uint64, error) {
return nil, nil
}
func (m *mockClient) GetLatestTransactionIndex(backend Backend) (*uint64, error) {
tx, err := m.GetLatestTransaction(backend)
if err != nil {
return nil, err
}
return tx.GetMeta().Index, nil
}
func mockTx() *types.Transaction {
address := make([]byte, 20)
rand.Read(address)
target := common.BytesToAddress(address)
timestamp := uint64(0)
rand.Read(address)
l1TxOrigin := common.BytesToAddress(address)
gasLimit := uint64(0)
data := []byte{0x00, 0x00}
l1BlockNumber := big.NewInt(0)
tx := types.NewTransaction(0, target, big.NewInt(0), gasLimit, big.NewInt(0), data)
meta := types.NewTransactionMeta(
l1BlockNumber,
timestamp,
&l1TxOrigin,
types.SighashEIP155,
types.QueueOriginSequencer,
nil,
nil,
nil,
)
tx.SetTransactionMeta(meta)
return tx
}
func setMockTxL1Timestamp(tx *types.Transaction, ts uint64) *types.Transaction {
meta := tx.GetMeta()
meta.L1Timestamp = ts
tx.SetTransactionMeta(meta)
return tx
}
func setMockTxIndex(tx *types.Transaction, index uint64) *types.Transaction {
meta := tx.GetMeta()
meta.Index = &index
tx.SetTransactionMeta(meta)
return tx
}
func setMockQueueIndex(tx *types.Transaction, index uint64) *types.Transaction {
meta := tx.GetMeta()
meta.QueueIndex = &index
tx.SetTransactionMeta(meta)
return tx
}
func newUint64(n uint64) *uint64 {
return &n
}
......@@ -2,11 +2,60 @@ package rollup
import (
"bytes"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
// OVMContext represents the blocknumber and timestamp
// that exist during L2 execution
type OVMContext struct {
blockNumber uint64
timestamp uint64
}
// Backend represents the type of transactions that are being synced.
// The different types have different security models.
type Backend uint
// String implements the Stringer interface
func (s Backend) String() string {
switch s {
case BackendL1:
return "l1"
case BackendL2:
return "l2"
default:
return ""
}
}
// NewBackend creates a Backend from a human readable string
func NewBackend(typ string) (Backend, error) {
switch typ {
case "l1":
return BackendL1, nil
case "l2":
return BackendL2, nil
default:
return 0, fmt.Errorf("Unknown Backend: %s", typ)
}
}
const (
// BackendL1 Backend involves syncing transactions that have been batched to
// Layer One. Once the transactions have been batched to L1, they cannot be
// removed assuming that they are not reorganized out of the chain.
BackendL1 Backend = iota
// BackendL2 Backend involves syncing transactions from the sequencer,
// meaning that the transactions may have not been batched to Layer One yet.
// This gives higher latency access to the sequencer data but no guarantees
// around the transactions as they have not been submitted via a batch to
// L1.
BackendL2
)
func isCtcTxEqual(a, b *types.Transaction) bool {
if a.To() == nil && b.To() != nil {
if !bytes.Equal(b.To().Bytes(), common.Address{}.Bytes()) {
......
......@@ -20,6 +20,7 @@ CACHE=1024
RPC_PORT=8545
WS_PORT=8546
VERBOSITY=3
ROLLUP_BACKEND=l1
USAGE="
Start the Sequencer or Verifier with most configuration pre-set.
......@@ -189,6 +190,15 @@ while (( "$#" )); do
exit 1
fi
;;
--rollup.backend)
if [ -n "$2" ] && [ ${2:0:1} != "-" ]; then
ROLLUP_BACKEND="$2"
shift 2
else
echo "Error: Argument for $1 is missing" >&2
exit 1
fi
;;
--cache)
if [ -n "$2" ] && [ ${2:0:1} != "-" ]; then
CACHE="$2"
......@@ -227,6 +237,7 @@ cmd="$cmd --eth1.l1ethgatewayaddress $ETH1_L1_GATEWAY_ADDRESS"
cmd="$cmd --rollup.clienthttp $ROLLUP_CLIENT_HTTP"
cmd="$cmd --rollup.pollinterval $ROLLUP_POLL_INTERVAL"
cmd="$cmd --rollup.timestamprefresh $ROLLUP_TIMESTAMP_REFRESH"
cmd="$cmd --rollup.backend $ROLLUP_BACKEND"
cmd="$cmd --cache $CACHE"
cmd="$cmd --rpc"
cmd="$cmd --dev"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment