Commit 0c4d4e08 authored by Murphy Law's avatar Murphy Law Committed by GitHub

l2geth: Revert pub/sub transactions (#2406)

* Revert "l2geth: Publish transactions to external msg queue"

This reverts commits:
* 9f05c877^..f8348862
* 3e016ae8^..36991e68

* add changeet
Co-authored-by: default avatarmergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
parent 24bf252e
---
'@eth-optimism/batch-submitter-service': patch
'@eth-optimism/l2geth': patch
---
l2geth: Revert transaction pubsub feature
This diff is collapsed.
......@@ -165,15 +165,6 @@ var (
utils.RollupFeeThresholdUpFlag,
utils.RollupGenesisTimeoutSecondsFlag,
utils.SequencerClientHttpFlag,
utils.TxPublisherEnableFlag,
utils.TxPublisherProjectIDFlag,
utils.TxPublisherTopicIDFlag,
utils.TxPublisherTimeoutFlag,
utils.TxQueueEnableFlag,
utils.TxQueueProjectIDFlag,
utils.TxQueueSubscriptionIDFlag,
utils.TxQueueMaxOutstandingBytesFlag,
utils.TxQueueMaxOutstandingMessagesFlag,
}
rpcFlags = []cli.Flag{
......
......@@ -79,15 +79,6 @@ var AppHelpFlagGroups = []flagGroup{
utils.RollupFeeThresholdUpFlag,
utils.RollupGenesisTimeoutSecondsFlag,
utils.SequencerClientHttpFlag,
utils.TxPublisherEnableFlag,
utils.TxPublisherProjectIDFlag,
utils.TxPublisherTopicIDFlag,
utils.TxPublisherTimeoutFlag,
utils.TxQueueEnableFlag,
utils.TxQueueProjectIDFlag,
utils.TxQueueSubscriptionIDFlag,
utils.TxQueueMaxOutstandingBytesFlag,
utils.TxQueueMaxOutstandingMessagesFlag,
},
},
{
......
......@@ -61,7 +61,6 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/p2p/netutil"
"github.com/ethereum-optimism/optimism/l2geth/params"
"github.com/ethereum-optimism/optimism/l2geth/rollup"
"github.com/ethereum-optimism/optimism/l2geth/rollup/pub"
"github.com/ethereum-optimism/optimism/l2geth/rpc"
whisper "github.com/ethereum-optimism/optimism/l2geth/whisper/whisperv6"
pcsclite "github.com/gballet/go-libpcsclite"
......@@ -832,7 +831,7 @@ var (
}
RollupBackendFlag = cli.StringFlag{
Name: "rollup.backend",
Usage: "Sync backend for verifiers (\"l1\", \"l2\" or \"queue\"), defaults to l1",
Usage: "Sync backend for verifiers (\"l1\" or \"l2\"), defaults to l1",
Value: "l1",
EnvVar: "ROLLUP_BACKEND",
}
......@@ -873,51 +872,6 @@ var (
Usage: "HTTP endpoint for the sequencer client",
EnvVar: "SEQUENCER_CLIENT_HTTP",
}
TxPublisherEnableFlag = cli.BoolFlag{
Name: "txpublisher.enable",
Usage: "Enable transaction logging to PubSub",
EnvVar: "TX_PUBLISHER_ENABLE",
}
TxPublisherProjectIDFlag = cli.StringFlag{
Name: "txpublisher.projectid",
Usage: "GCP project ID for the tx PubSub",
EnvVar: "TX_PUBLISHER_PROJECT_ID",
}
TxPublisherTopicIDFlag = cli.StringFlag{
Name: "txpublisher.topicid",
Usage: "Topic ID used for PubSub",
EnvVar: "TX_PUBLISHER_TOPIC_ID",
}
TxPublisherTimeoutFlag = cli.DurationFlag{
Name: "txpublisher.timeout",
Usage: "Transaction publishing timeout",
EnvVar: "TX_PUBLISHER_TIMEOUT",
}
TxQueueEnableFlag = cli.BoolFlag{
Name: "txqueue.enable",
Usage: "Enable transaction syncing from the Backend Queue",
EnvVar: "TX_QUEUE_ENABLE",
}
TxQueueProjectIDFlag = cli.StringFlag{
Name: "txqueue.projectid",
Usage: "Backend Queue project ID",
EnvVar: "TX_QUEUE_PROJECT_ID",
}
TxQueueSubscriptionIDFlag = cli.StringFlag{
Name: "txqueue.subscriptionid",
Usage: "Transaction Queue subscription ID",
EnvVar: "TX_QUEUE_SUBSCRIPTION_ID",
}
TxQueueMaxOutstandingMessagesFlag = cli.IntFlag{
Name: "txqueue.maxoutstandingmessages",
Usage: "Max number of messages buffered in the transaction queue subscriber",
EnvVar: "TX_QUEUE_MAX_OUTSTANDING_MESSAGES",
}
TxQueueMaxOutstandingBytesFlag = cli.IntFlag{
Name: "txqueue.maxoutstandingbytes",
Usage: "Max outstanding bytes bufferered in the transaction queue subscriber",
EnvVar: "TX_QUEUE_MAX_OUTSTANDING_BYTES",
}
)
// MakeDataDir retrieves the currently requested data directory, terminating
......@@ -1199,43 +1153,6 @@ func setRollup(ctx *cli.Context, cfg *rollup.Config) {
}
}
// UsingOVM
// setTxPublisher configures the transaction logger
func setTxPublisher(ctx *cli.Context, cfg *pub.Config) {
if ctx.GlobalIsSet(TxPublisherEnableFlag.Name) {
cfg.Enable = ctx.GlobalBool(TxPublisherEnableFlag.Name)
}
if ctx.GlobalIsSet(TxPublisherProjectIDFlag.Name) {
cfg.ProjectID = ctx.GlobalString(TxPublisherProjectIDFlag.Name)
}
if ctx.GlobalIsSet(TxPublisherTopicIDFlag.Name) {
cfg.TopicID = ctx.GlobalString(TxPublisherTopicIDFlag.Name)
}
if ctx.GlobalIsSet(TxPublisherTimeoutFlag.Name) {
cfg.Timeout = ctx.GlobalDuration(TxPublisherTimeoutFlag.Name)
}
}
// UsingOVM
// setTxQueueSubscriber configures the Queue Backend
func setTxQueueSubscriber(ctx *cli.Context, cfg *rollup.QueueSubscriberConfig) {
if ctx.GlobalIsSet(TxQueueEnableFlag.Name) {
cfg.Enable = ctx.GlobalBool(TxQueueEnableFlag.Name)
}
if ctx.GlobalIsSet(TxQueueProjectIDFlag.Name) {
cfg.ProjectID = ctx.GlobalString(TxQueueProjectIDFlag.Name)
}
if ctx.GlobalIsSet(TxQueueSubscriptionIDFlag.Name) {
cfg.SubscriptionID = ctx.GlobalString(TxQueueSubscriptionIDFlag.Name)
}
if ctx.GlobalIsSet(TxQueueMaxOutstandingMessagesFlag.Name) {
cfg.MaxOutstandingMessages = ctx.GlobalInt(TxQueueMaxOutstandingMessagesFlag.Name)
}
if ctx.GlobalIsSet(TxQueueMaxOutstandingBytesFlag.Name) {
cfg.MaxOutstandingBytes = ctx.GlobalInt(TxQueueMaxOutstandingBytesFlag.Name)
}
}
// setLes configures the les server and ultra light client settings from the command line flags.
func setLes(ctx *cli.Context, cfg *eth.Config) {
if ctx.GlobalIsSet(LightLegacyServFlag.Name) {
......@@ -1695,8 +1612,6 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
setLes(ctx, cfg)
setEth1(ctx, &cfg.Rollup)
setRollup(ctx, &cfg.Rollup)
setTxPublisher(ctx, &cfg.TxPublisher)
setTxQueueSubscriber(ctx, &cfg.TxQueueSubscriber)
if ctx.GlobalIsSet(SyncModeFlag.Name) {
cfg.SyncMode = *GlobalTextMarshaler(ctx, SyncModeFlag.Name).(*downloader.SyncMode)
......
......@@ -32,17 +32,6 @@ func (q QueueOrigin) String() string {
}
}
func (q QueueOrigin) MarshalJSON() ([]byte, error) {
switch q {
case QueueOriginSequencer:
return []byte(`"sequencer"`), nil
case QueueOriginL1ToL2:
return []byte(`"l1"`), nil
default:
return []byte(`""`), nil
}
}
func (q *QueueOrigin) UnmarshalJSON(b []byte) error {
switch string(b) {
case "\"sequencer\"":
......
......@@ -52,7 +52,6 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/params"
"github.com/ethereum-optimism/optimism/l2geth/rlp"
"github.com/ethereum-optimism/optimism/l2geth/rollup"
"github.com/ethereum-optimism/optimism/l2geth/rollup/pub"
"github.com/ethereum-optimism/optimism/l2geth/rollup/rcfg"
"github.com/ethereum-optimism/optimism/l2geth/rpc"
)
......@@ -207,19 +206,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
}
eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
var txLogger pub.Publisher
txLogger, err = pub.NewGooglePublisher(context.Background(), config.TxPublisher)
if err != nil {
return nil, err
}
var txQueueSubscriber rollup.QueueSubscriber
txQueueSubscriber, err = rollup.NewQueueSubscriber(context.Background(), config.TxQueueSubscriber)
if err != nil {
return nil, err
}
eth.syncService, err = rollup.NewSyncService(context.Background(), config.Rollup, eth.txPool, eth.blockchain, eth.chainDb, txLogger, txQueueSubscriber)
eth.syncService, err = rollup.NewSyncService(context.Background(), config.Rollup, eth.txPool, eth.blockchain, eth.chainDb)
if err != nil {
return nil, fmt.Errorf("Cannot initialize syncservice: %w", err)
}
......
......@@ -32,7 +32,6 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/miner"
"github.com/ethereum-optimism/optimism/l2geth/params"
"github.com/ethereum-optimism/optimism/l2geth/rollup"
"github.com/ethereum-optimism/optimism/l2geth/rollup/pub"
)
// DefaultConfig contains default settings for use on the Ethereum main net.
......@@ -184,8 +183,4 @@ type Config struct {
// Optimism Rollup Config
Rollup rollup.Config
TxPublisher pub.Config
TxQueueSubscriber rollup.QueueSubscriberConfig
}
......@@ -3,7 +3,6 @@ module github.com/ethereum-optimism/optimism/l2geth
go 1.15
require (
cloud.google.com/go/pubsub v1.18.0
github.com/Azure/azure-storage-blob-go v0.7.0
github.com/VictoriaMetrics/fastcache v1.6.0
github.com/aristanetworks/goarista v0.0.0-20170210015632-ea17b1a17847
......@@ -22,7 +21,7 @@ require (
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff
github.com/go-resty/resty/v2 v2.4.0
github.com/go-stack/stack v1.8.0
github.com/golang/protobuf v1.5.2
github.com/golang/protobuf v1.4.3
github.com/golang/snappy v0.0.4
github.com/gorilla/websocket v1.4.2
github.com/graph-gophers/graphql-go v0.0.0-20201113091052-beb923fada29
......@@ -52,9 +51,9 @@ require (
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912
golang.org/x/text v0.3.6
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200619000410-60c24ae608a6
gopkg.in/sourcemap.v1 v1.0.5 // indirect
......
This diff is collapsed.
package rollup
import "github.com/ethereum-optimism/optimism/l2geth/metrics"
var (
pubTxDropCounter = metrics.NewRegisteredCounter("rollup/pub/txdrops", nil)
)
package pub
import (
"context"
"sync"
"time"
"cloud.google.com/go/pubsub"
"github.com/ethereum-optimism/optimism/l2geth/log"
)
const messageOrderingKey = "o"
type Config struct {
Enable bool
ProjectID string
TopicID string
Timeout time.Duration
}
type GooglePublisher struct {
client *pubsub.Client
topic *pubsub.Topic
publishSettings pubsub.PublishSettings
timeout time.Duration
mutex sync.Mutex
}
func NewGooglePublisher(ctx context.Context, config Config) (Publisher, error) {
if !config.Enable {
return &NoopPublisher{}, nil
}
client, err := pubsub.NewClient(ctx, config.ProjectID)
if err != nil {
return nil, err
}
topic := client.Topic(config.TopicID)
topic.EnableMessageOrdering = true
// Publish messages immediately
publishSettings := pubsub.PublishSettings{
DelayThreshold: 0,
CountThreshold: 0,
}
timeout := config.Timeout
if timeout == 0 {
log.Info("Sanitizing publisher timeout to 2 seconds")
timeout = time.Second * 2
}
log.Info("Initialized transaction log to PubSub", "topic", config.TopicID)
return &GooglePublisher{
client: client,
topic: topic,
publishSettings: publishSettings,
timeout: timeout,
}, nil
}
func (p *GooglePublisher) Publish(ctx context.Context, msg []byte) error {
ctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel()
pmsg := pubsub.Message{
Data: msg,
OrderingKey: messageOrderingKey,
}
p.mutex.Lock()
// If there was an error previously, clear it out to allow publishing to proceed again
p.topic.ResumePublish(messageOrderingKey)
result := p.topic.Publish(ctx, &pmsg)
_, err := result.Get(ctx)
p.mutex.Unlock()
return err
}
package pub
import "context"
type Publisher interface {
// Publish schedules an ordereed message to be sent
Publish(ctx context.Context, msg []byte) error
}
type NoopPublisher struct{}
func (p *NoopPublisher) Publish(ctx context.Context, msg []byte) error {
return nil
}
package rollup
import (
"context"
"cloud.google.com/go/pubsub"
"github.com/ethereum-optimism/optimism/l2geth/log"
)
type QueueSubscriberMessage interface {
Data() []byte
Ack()
Nack()
}
type QueueSubscriber interface {
ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) error
Close() error
}
type QueueSubscriberConfig struct {
Enable bool
ProjectID string
SubscriptionID string
MaxOutstandingMessages int
MaxOutstandingBytes int
}
type queueSubscriber struct {
client *pubsub.Client
sub *pubsub.Subscription
}
func NewQueueSubscriber(ctx context.Context, config QueueSubscriberConfig) (QueueSubscriber, error) {
if !config.Enable {
return &noopQueueSubscriber{}, nil
}
client, err := pubsub.NewClient(ctx, config.ProjectID)
if err != nil {
return nil, err
}
sub := client.Subscription(config.SubscriptionID)
maxOutstandingMsgs := config.MaxOutstandingMessages
if maxOutstandingMsgs == 0 {
maxOutstandingMsgs = 10000
}
maxOutstandingBytes := config.MaxOutstandingBytes
if maxOutstandingBytes == 0 {
maxOutstandingBytes = 1e9
}
sub.ReceiveSettings = pubsub.ReceiveSettings{
MaxOutstandingMessages: maxOutstandingMsgs,
MaxOutstandingBytes: maxOutstandingBytes,
}
log.Info("Created Queue Subscriber", "projectID", config.ProjectID, "subscriptionID", config.SubscriptionID)
return &queueSubscriber{client, sub}, nil
}
func (q *queueSubscriber) ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) error {
return q.sub.Receive(ctx, func(ctx context.Context, pmsg *pubsub.Message) {
cb(ctx, &queueSubscriberMessage{pmsg})
})
}
func (q *queueSubscriber) Close() error {
return q.client.Close()
}
type queueSubscriberMessage struct {
inner *pubsub.Message
}
func (q *queueSubscriberMessage) Data() []byte {
return q.inner.Data
}
func (q *queueSubscriberMessage) Ack() {
q.inner.Ack()
}
func (q *queueSubscriberMessage) Nack() {
q.inner.Nack()
}
type noopQueueSubscriber struct{}
func (q *noopQueueSubscriber) ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) error {
return nil
}
func (q *noopQueueSubscriber) Close() error { return nil }
package rollup
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
......@@ -18,14 +16,12 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/ethdb"
"github.com/ethereum-optimism/optimism/l2geth/event"
"github.com/ethereum-optimism/optimism/l2geth/log"
"github.com/ethereum-optimism/optimism/l2geth/rlp"
"github.com/ethereum-optimism/optimism/l2geth/core/rawdb"
"github.com/ethereum-optimism/optimism/l2geth/core/types"
"github.com/ethereum-optimism/optimism/l2geth/eth/gasprice"
"github.com/ethereum-optimism/optimism/l2geth/rollup/fees"
"github.com/ethereum-optimism/optimism/l2geth/rollup/pub"
"github.com/ethereum-optimism/optimism/l2geth/rollup/rcfg"
)
......@@ -72,12 +68,10 @@ type SyncService struct {
signer types.Signer
feeThresholdUp *big.Float
feeThresholdDown *big.Float
txLogger pub.Publisher
queueSub QueueSubscriber
}
// NewSyncService returns an initialized sync service
func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *core.BlockChain, db ethdb.Database, txLogger pub.Publisher, queueSub QueueSubscriber) (*SyncService, error) {
func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *core.BlockChain, db ethdb.Database) (*SyncService, error) {
if bc == nil {
return nil, errors.New("Must pass BlockChain to SyncService")
}
......@@ -149,8 +143,6 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co
signer: types.NewEIP155Signer(chainID),
feeThresholdDown: cfg.FeeThresholdDown,
feeThresholdUp: cfg.FeeThresholdUp,
txLogger: txLogger,
queueSub: queueSub,
}
// The chainHeadSub is used to synchronize the SyncService with the chain.
......@@ -165,8 +157,7 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co
// a remote server that indexes the layer one contracts. Place this
// code behind this if statement so that this can run without the
// requirement of the remote server being up.
// If we're syncing from the Queue, then we can skip all this and rely on L2 published transactions
if service.enable && service.backend != BackendQueue {
if service.enable {
// Ensure that the rollup client can connect to a remote server
// before starting. Retry until it can connect.
tEnsure := time.NewTicker(10 * time.Second)
......@@ -427,10 +418,6 @@ func (s *SyncService) verify() error {
if err := s.syncTransactionsToTip(); err != nil {
return fmt.Errorf("Verifier cannot sync transactions with BackendL2: %w", err)
}
case BackendQueue:
if err := s.syncTransactionsFromQueue(); err != nil {
return fmt.Errorf("Verifier cannot sync transactions with BackendQueue: %w", err)
}
}
return nil
}
......@@ -885,12 +872,6 @@ func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error {
// The index was set above so it is safe to dereference
log.Debug("Applying transaction to tip", "index", *tx.GetMeta().Index, "hash", tx.Hash().Hex(), "origin", tx.QueueOrigin().String())
// Log transaction to the failover log
if err := s.publishTransaction(tx); err != nil {
log.Error("Failed to publish transaction to log", "msg", err)
return fmt.Errorf("internal error: transaction logging failed")
}
txs := types.Transactions{tx}
errCh := make(chan error, 1)
s.txFeed.Send(core.NewTxsEvent{
......@@ -1234,95 +1215,12 @@ func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) e
return nil
}
// syncTransactionsFromQueue will sync the earliest transaction from an external message queue
func (s *SyncService) syncTransactionsFromQueue() error {
// we don't drop messages unless they're already applied
cb := func(ctx context.Context, msg QueueSubscriberMessage) {
var (
queuedTxMeta QueuedTransactionMeta
tx types.Transaction
txMeta *types.TransactionMeta
)
log.Debug("Reading transaction from queue", "json", string(msg.Data()))
if err := json.Unmarshal(msg.Data(), &queuedTxMeta); err != nil {
log.Error("Failed to unmarshal logged TransactionMeta", "msg", err)
msg.Nack()
return
}
if err := rlp.DecodeBytes(queuedTxMeta.RawTransaction, &tx); err != nil {
log.Error("decoding raw transaction failed", "msg", err)
msg.Nack()
return
}
if queuedTxMeta.L1BlockNumber == nil || queuedTxMeta.L1Timestamp == nil {
log.Error("Missing required queued transaction fields", "msg", string(msg.Data()))
msg.Nack()
return
}
txMeta = types.NewTransactionMeta(
queuedTxMeta.L1BlockNumber,
*queuedTxMeta.L1Timestamp,
queuedTxMeta.L1MessageSender,
*queuedTxMeta.QueueOrigin,
queuedTxMeta.Index,
queuedTxMeta.QueueIndex,
queuedTxMeta.RawTransaction)
tx.SetTransactionMeta(txMeta)
if readTx, _, _, _ := rawdb.ReadTransaction(s.db, tx.Hash()); readTx != nil {
msg.Ack()
return
}
if err := s.applyTransactionToTip(&tx); err != nil {
log.Error("Unable to apply transactions to tip from Queue", "msg", err)
msg.Nack()
return
}
log.Debug("Successfully applied queued transaction", "txhash", tx.Hash())
msg.Ack()
}
// This blocks until there's a new message in the queue or ctx deadline hits
return s.queueSub.ReceiveMessage(s.ctx, cb)
}
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (s *SyncService) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return s.scope.Track(s.txFeed.Subscribe(ch))
}
func (s *SyncService) publishTransaction(tx *types.Transaction) error {
rawTx := new(bytes.Buffer)
if err := tx.EncodeRLP(rawTx); err != nil {
return err
}
if tx.L1BlockNumber() == nil || tx.L1Timestamp() == 0 {
return fmt.Errorf("transaction doesn't contain required fields")
}
// Manually populate RawTransaction as it's not always available
txMeta := tx.GetMeta()
txMeta.RawTransaction = rawTx.Bytes()
txLog := AsQueuedTransactionMeta(txMeta)
encodedTxLog, err := json.Marshal(&txLog)
if err != nil {
return err
}
if err := s.txLogger.Publish(s.ctx, encodedTxLog); err != nil {
pubTxDropCounter.Inc(1)
return err
}
return nil
}
func stringify(i *uint64) string {
if i == nil {
return "<nil>"
......@@ -1335,25 +1233,3 @@ func stringify(i *uint64) string {
func (s *SyncService) IngestTransaction(tx *types.Transaction) error {
return s.applyTransaction(tx)
}
type QueuedTransactionMeta struct {
L1BlockNumber *big.Int `json:"l1BlockNumber"`
L1Timestamp *uint64 `json:"l1Timestamp"`
L1MessageSender *common.Address `json:"l1MessageSender"`
QueueOrigin *types.QueueOrigin `json:"queueOrigin"`
Index *uint64 `json:"index"`
QueueIndex *uint64 `json:"queueIndex"`
RawTransaction []byte `json:"rawTransaction"`
}
func AsQueuedTransactionMeta(txMeta *types.TransactionMeta) *QueuedTransactionMeta {
return &QueuedTransactionMeta{
L1BlockNumber: txMeta.L1BlockNumber,
L1Timestamp: &txMeta.L1Timestamp,
L1MessageSender: txMeta.L1MessageSender,
QueueOrigin: &txMeta.QueueOrigin,
Index: txMeta.Index,
QueueIndex: txMeta.QueueIndex,
RawTransaction: txMeta.RawTransaction,
}
}
This diff is collapsed.
......@@ -26,8 +26,6 @@ func (s Backend) String() string {
return "l1"
case BackendL2:
return "l2"
case BackendQueue:
return "queue"
default:
return ""
}
......@@ -40,8 +38,6 @@ func NewBackend(typ string) (Backend, error) {
return BackendL1, nil
case "l2":
return BackendL2, nil
case "queue":
return BackendQueue, nil
default:
return 0, fmt.Errorf("Unknown Backend: %s", typ)
}
......@@ -58,11 +54,6 @@ const (
// around the transactions as they have not been submitted via a batch to
// L1.
BackendL2
// BackendQueue Backend involves syncing transactions from an external message queue.
// This has the same guarantees as BackendL2 as such transactions may not have been
// submitted via a batch to L1.
BackendQueue
)
func isCtcTxEqual(a, b *types.Transaction) bool {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment