Commit 32d69199 authored by Murphy Law's avatar Murphy Law Committed by GitHub

Merge pull request #2296 from Inphi/feat/publish-tx

l2geth: Sync from Backend Queue

Introducing a new Backend verifiers can use to synchronize transactions from an external message queue. The *Queue* Backend can be used to follow a sequencer via Google PubSub. Sequencers, when configured, can now send transactions to an external message Queue.

To facilitate failover recovery without causing a chainsplit. The sequencer will only accept transactions once they've been successfully published to the configured external message queue.
parents 4bfddfbb f0618203
---
'@eth-optimism/batch-submitter-service': patch
'@eth-optimism/l2geth': patch
---
l2geth: Sync from Backend Queue
This diff is collapsed.
...@@ -164,6 +164,15 @@ var ( ...@@ -164,6 +164,15 @@ var (
utils.RollupFeeThresholdDownFlag, utils.RollupFeeThresholdDownFlag,
utils.RollupFeeThresholdUpFlag, utils.RollupFeeThresholdUpFlag,
utils.SequencerClientHttpFlag, utils.SequencerClientHttpFlag,
utils.TxPublisherEnableFlag,
utils.TxPublisherProjectIDFlag,
utils.TxPublisherTopicIDFlag,
utils.TxPublisherTimeoutFlag,
utils.TxQueueEnableFlag,
utils.TxQueueProjectIDFlag,
utils.TxQueueSubscriptionIDFlag,
utils.TxQueueMaxOutstandingBytesFlag,
utils.TxQueueMaxOutstandingMessagesFlag,
} }
rpcFlags = []cli.Flag{ rpcFlags = []cli.Flag{
......
...@@ -78,6 +78,15 @@ var AppHelpFlagGroups = []flagGroup{ ...@@ -78,6 +78,15 @@ var AppHelpFlagGroups = []flagGroup{
utils.RollupFeeThresholdDownFlag, utils.RollupFeeThresholdDownFlag,
utils.RollupFeeThresholdUpFlag, utils.RollupFeeThresholdUpFlag,
utils.SequencerClientHttpFlag, utils.SequencerClientHttpFlag,
utils.TxPublisherEnableFlag,
utils.TxPublisherProjectIDFlag,
utils.TxPublisherTopicIDFlag,
utils.TxPublisherTimeoutFlag,
utils.TxQueueEnableFlag,
utils.TxQueueProjectIDFlag,
utils.TxQueueSubscriptionIDFlag,
utils.TxQueueMaxOutstandingBytesFlag,
utils.TxQueueMaxOutstandingMessagesFlag,
}, },
}, },
{ {
......
...@@ -61,6 +61,7 @@ import ( ...@@ -61,6 +61,7 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/p2p/netutil" "github.com/ethereum-optimism/optimism/l2geth/p2p/netutil"
"github.com/ethereum-optimism/optimism/l2geth/params" "github.com/ethereum-optimism/optimism/l2geth/params"
"github.com/ethereum-optimism/optimism/l2geth/rollup" "github.com/ethereum-optimism/optimism/l2geth/rollup"
"github.com/ethereum-optimism/optimism/l2geth/rollup/pub"
"github.com/ethereum-optimism/optimism/l2geth/rpc" "github.com/ethereum-optimism/optimism/l2geth/rpc"
whisper "github.com/ethereum-optimism/optimism/l2geth/whisper/whisperv6" whisper "github.com/ethereum-optimism/optimism/l2geth/whisper/whisperv6"
pcsclite "github.com/gballet/go-libpcsclite" pcsclite "github.com/gballet/go-libpcsclite"
...@@ -831,7 +832,7 @@ var ( ...@@ -831,7 +832,7 @@ var (
} }
RollupBackendFlag = cli.StringFlag{ RollupBackendFlag = cli.StringFlag{
Name: "rollup.backend", Name: "rollup.backend",
Usage: "Sync backend for verifiers (\"l1\" or \"l2\"), defaults to l1", Usage: "Sync backend for verifiers (\"l1\", \"l2\" or \"queue\"), defaults to l1",
Value: "l1", Value: "l1",
EnvVar: "ROLLUP_BACKEND", EnvVar: "ROLLUP_BACKEND",
} }
...@@ -866,6 +867,51 @@ var ( ...@@ -866,6 +867,51 @@ var (
Usage: "HTTP endpoint for the sequencer client", Usage: "HTTP endpoint for the sequencer client",
EnvVar: "SEQUENCER_CLIENT_HTTP", EnvVar: "SEQUENCER_CLIENT_HTTP",
} }
TxPublisherEnableFlag = cli.BoolFlag{
Name: "txpublisher.enable",
Usage: "Enable transaction logging to PubSub",
EnvVar: "TX_PUBLISHER_ENABLE",
}
TxPublisherProjectIDFlag = cli.StringFlag{
Name: "txpublisher.projectid",
Usage: "GCP project ID for the tx PubSub",
EnvVar: "TX_PUBLISHER_PROJECT_ID",
}
TxPublisherTopicIDFlag = cli.StringFlag{
Name: "txpublisher.topicid",
Usage: "Topic ID used for PubSub",
EnvVar: "TX_PUBLISHER_TOPIC_ID",
}
TxPublisherTimeoutFlag = cli.DurationFlag{
Name: "txpublisher.timeout",
Usage: "Transaction publishing timeout",
EnvVar: "TX_PUBLISHER_TIMEOUT",
}
TxQueueEnableFlag = cli.BoolFlag{
Name: "txqueue.enable",
Usage: "Enable transaction syncing from the Backend Queue",
EnvVar: "TX_QUEUE_ENABLE",
}
TxQueueProjectIDFlag = cli.StringFlag{
Name: "txqueue.projectid",
Usage: "Backend Queue project ID",
EnvVar: "TX_QUEUE_PROJECT_ID",
}
TxQueueSubscriptionIDFlag = cli.StringFlag{
Name: "txqueue.subscriptionid",
Usage: "Transaction Queue subscription ID",
EnvVar: "TX_QUEUE_SUBSCRIPTION_ID",
}
TxQueueMaxOutstandingMessagesFlag = cli.IntFlag{
Name: "txqueue.maxoutstandingmessages",
Usage: "Max number of messages buffered in the transaction queue subscriber",
EnvVar: "TX_QUEUE_MAX_OUTSTANDING_MESSAGES",
}
TxQueueMaxOutstandingBytesFlag = cli.IntFlag{
Name: "txqueue.maxoutstandingbytes",
Usage: "Max outstanding bytes bufferered in the transaction queue subscriber",
EnvVar: "TX_QUEUE_MAX_OUTSTANDING_BYTES",
}
) )
// MakeDataDir retrieves the currently requested data directory, terminating // MakeDataDir retrieves the currently requested data directory, terminating
...@@ -1147,6 +1193,43 @@ func setRollup(ctx *cli.Context, cfg *rollup.Config) { ...@@ -1147,6 +1193,43 @@ func setRollup(ctx *cli.Context, cfg *rollup.Config) {
} }
} }
// UsingOVM
// setTxPublisher configures the transaction logger
func setTxPublisher(ctx *cli.Context, cfg *pub.Config) {
if ctx.GlobalIsSet(TxPublisherEnableFlag.Name) {
cfg.Enable = ctx.GlobalBool(TxPublisherEnableFlag.Name)
}
if ctx.GlobalIsSet(TxPublisherProjectIDFlag.Name) {
cfg.ProjectID = ctx.GlobalString(TxPublisherProjectIDFlag.Name)
}
if ctx.GlobalIsSet(TxPublisherTopicIDFlag.Name) {
cfg.TopicID = ctx.GlobalString(TxPublisherTopicIDFlag.Name)
}
if ctx.GlobalIsSet(TxPublisherTimeoutFlag.Name) {
cfg.Timeout = ctx.GlobalDuration(TxPublisherTimeoutFlag.Name)
}
}
// UsingOVM
// setTxQueueSubscriber configures the Queue Backend
func setTxQueueSubscriber(ctx *cli.Context, cfg *rollup.QueueSubscriberConfig) {
if ctx.GlobalIsSet(TxQueueEnableFlag.Name) {
cfg.Enable = ctx.GlobalBool(TxQueueEnableFlag.Name)
}
if ctx.GlobalIsSet(TxQueueProjectIDFlag.Name) {
cfg.ProjectID = ctx.GlobalString(TxQueueProjectIDFlag.Name)
}
if ctx.GlobalIsSet(TxQueueSubscriptionIDFlag.Name) {
cfg.SubscriptionID = ctx.GlobalString(TxQueueSubscriptionIDFlag.Name)
}
if ctx.GlobalIsSet(TxQueueMaxOutstandingMessagesFlag.Name) {
cfg.MaxOutstandingMessages = ctx.GlobalInt(TxQueueMaxOutstandingMessagesFlag.Name)
}
if ctx.GlobalIsSet(TxQueueMaxOutstandingBytesFlag.Name) {
cfg.MaxOutstandingBytes = ctx.GlobalInt(TxQueueMaxOutstandingBytesFlag.Name)
}
}
// setLes configures the les server and ultra light client settings from the command line flags. // setLes configures the les server and ultra light client settings from the command line flags.
func setLes(ctx *cli.Context, cfg *eth.Config) { func setLes(ctx *cli.Context, cfg *eth.Config) {
if ctx.GlobalIsSet(LightLegacyServFlag.Name) { if ctx.GlobalIsSet(LightLegacyServFlag.Name) {
...@@ -1606,6 +1689,8 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { ...@@ -1606,6 +1689,8 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
setLes(ctx, cfg) setLes(ctx, cfg)
setEth1(ctx, &cfg.Rollup) setEth1(ctx, &cfg.Rollup)
setRollup(ctx, &cfg.Rollup) setRollup(ctx, &cfg.Rollup)
setTxPublisher(ctx, &cfg.TxPublisher)
setTxQueueSubscriber(ctx, &cfg.TxQueueSubscriber)
if ctx.GlobalIsSet(SyncModeFlag.Name) { if ctx.GlobalIsSet(SyncModeFlag.Name) {
cfg.SyncMode = *GlobalTextMarshaler(ctx, SyncModeFlag.Name).(*downloader.SyncMode) cfg.SyncMode = *GlobalTextMarshaler(ctx, SyncModeFlag.Name).(*downloader.SyncMode)
......
...@@ -32,6 +32,17 @@ func (q QueueOrigin) String() string { ...@@ -32,6 +32,17 @@ func (q QueueOrigin) String() string {
} }
} }
func (q QueueOrigin) MarshalJSON() ([]byte, error) {
switch q {
case QueueOriginSequencer:
return []byte(`"sequencer"`), nil
case QueueOriginL1ToL2:
return []byte(`"l1"`), nil
default:
return []byte(`""`), nil
}
}
func (q *QueueOrigin) UnmarshalJSON(b []byte) error { func (q *QueueOrigin) UnmarshalJSON(b []byte) error {
switch string(b) { switch string(b) {
case "\"sequencer\"": case "\"sequencer\"":
......
...@@ -52,6 +52,7 @@ import ( ...@@ -52,6 +52,7 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/params" "github.com/ethereum-optimism/optimism/l2geth/params"
"github.com/ethereum-optimism/optimism/l2geth/rlp" "github.com/ethereum-optimism/optimism/l2geth/rlp"
"github.com/ethereum-optimism/optimism/l2geth/rollup" "github.com/ethereum-optimism/optimism/l2geth/rollup"
"github.com/ethereum-optimism/optimism/l2geth/rollup/pub"
"github.com/ethereum-optimism/optimism/l2geth/rollup/rcfg" "github.com/ethereum-optimism/optimism/l2geth/rollup/rcfg"
"github.com/ethereum-optimism/optimism/l2geth/rpc" "github.com/ethereum-optimism/optimism/l2geth/rpc"
) )
...@@ -206,7 +207,19 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { ...@@ -206,7 +207,19 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
} }
eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain) eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
eth.syncService, err = rollup.NewSyncService(context.Background(), config.Rollup, eth.txPool, eth.blockchain, eth.chainDb) var txLogger pub.Publisher
txLogger, err = pub.NewGooglePublisher(context.Background(), config.TxPublisher)
if err != nil {
return nil, err
}
var txQueueSubscriber rollup.QueueSubscriber
txQueueSubscriber, err = rollup.NewQueueSubscriber(context.Background(), config.TxQueueSubscriber)
if err != nil {
return nil, err
}
eth.syncService, err = rollup.NewSyncService(context.Background(), config.Rollup, eth.txPool, eth.blockchain, eth.chainDb, txLogger, txQueueSubscriber)
if err != nil { if err != nil {
return nil, fmt.Errorf("Cannot initialize syncservice: %w", err) return nil, fmt.Errorf("Cannot initialize syncservice: %w", err)
} }
......
...@@ -32,6 +32,7 @@ import ( ...@@ -32,6 +32,7 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/miner" "github.com/ethereum-optimism/optimism/l2geth/miner"
"github.com/ethereum-optimism/optimism/l2geth/params" "github.com/ethereum-optimism/optimism/l2geth/params"
"github.com/ethereum-optimism/optimism/l2geth/rollup" "github.com/ethereum-optimism/optimism/l2geth/rollup"
"github.com/ethereum-optimism/optimism/l2geth/rollup/pub"
) )
// DefaultConfig contains default settings for use on the Ethereum main net. // DefaultConfig contains default settings for use on the Ethereum main net.
...@@ -183,4 +184,8 @@ type Config struct { ...@@ -183,4 +184,8 @@ type Config struct {
// Optimism Rollup Config // Optimism Rollup Config
Rollup rollup.Config Rollup rollup.Config
TxPublisher pub.Config
TxQueueSubscriber rollup.QueueSubscriberConfig
} }
...@@ -3,6 +3,7 @@ module github.com/ethereum-optimism/optimism/l2geth ...@@ -3,6 +3,7 @@ module github.com/ethereum-optimism/optimism/l2geth
go 1.15 go 1.15
require ( require (
cloud.google.com/go/pubsub v1.18.0
github.com/Azure/azure-storage-blob-go v0.7.0 github.com/Azure/azure-storage-blob-go v0.7.0
github.com/VictoriaMetrics/fastcache v1.6.0 github.com/VictoriaMetrics/fastcache v1.6.0
github.com/aristanetworks/goarista v0.0.0-20170210015632-ea17b1a17847 github.com/aristanetworks/goarista v0.0.0-20170210015632-ea17b1a17847
...@@ -21,7 +22,7 @@ require ( ...@@ -21,7 +22,7 @@ require (
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff
github.com/go-resty/resty/v2 v2.4.0 github.com/go-resty/resty/v2 v2.4.0
github.com/go-stack/stack v1.8.0 github.com/go-stack/stack v1.8.0
github.com/golang/protobuf v1.4.3 github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4 github.com/golang/snappy v0.0.4
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/graph-gophers/graphql-go v0.0.0-20201113091052-beb923fada29 github.com/graph-gophers/graphql-go v0.0.0-20201113091052-beb923fada29
...@@ -51,9 +52,9 @@ require ( ...@@ -51,9 +52,9 @@ require (
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27
golang.org/x/text v0.3.6 golang.org/x/text v0.3.6
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200619000410-60c24ae608a6 gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200619000410-60c24ae608a6
gopkg.in/sourcemap.v1 v1.0.5 // indirect gopkg.in/sourcemap.v1 v1.0.5 // indirect
......
This diff is collapsed.
package rollup
import "github.com/ethereum-optimism/optimism/l2geth/metrics"
var (
pubTxDropCounter = metrics.NewRegisteredCounter("rollup/pub/txdrops", nil)
)
package pub
import (
"context"
"sync"
"time"
"cloud.google.com/go/pubsub"
"github.com/ethereum-optimism/optimism/l2geth/log"
)
const messageOrderingKey = "o"
type Config struct {
Enable bool
ProjectID string
TopicID string
Timeout time.Duration
}
type GooglePublisher struct {
client *pubsub.Client
topic *pubsub.Topic
publishSettings pubsub.PublishSettings
timeout time.Duration
mutex sync.Mutex
}
func NewGooglePublisher(ctx context.Context, config Config) (Publisher, error) {
if !config.Enable {
return &NoopPublisher{}, nil
}
client, err := pubsub.NewClient(ctx, config.ProjectID)
if err != nil {
return nil, err
}
topic := client.Topic(config.TopicID)
topic.EnableMessageOrdering = true
// Publish messages immediately
publishSettings := pubsub.PublishSettings{
DelayThreshold: 0,
CountThreshold: 0,
}
timeout := config.Timeout
if timeout == 0 {
log.Info("Sanitizing publisher timeout to 2 seconds")
timeout = time.Second * 2
}
log.Info("Initialized transaction log to PubSub", "topic", config.TopicID)
return &GooglePublisher{
client: client,
topic: topic,
publishSettings: publishSettings,
timeout: timeout,
}, nil
}
func (p *GooglePublisher) Publish(ctx context.Context, msg []byte) error {
ctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel()
pmsg := pubsub.Message{
Data: msg,
OrderingKey: messageOrderingKey,
}
p.mutex.Lock()
// If there was an error previously, clear it out to allow publishing to proceed again
p.topic.ResumePublish(messageOrderingKey)
result := p.topic.Publish(ctx, &pmsg)
_, err := result.Get(ctx)
p.mutex.Unlock()
return err
}
package pub
import "context"
type Publisher interface {
// Publish schedules an ordereed message to be sent
Publish(ctx context.Context, msg []byte) error
}
type NoopPublisher struct{}
func (p *NoopPublisher) Publish(ctx context.Context, msg []byte) error {
return nil
}
package rollup
import (
"context"
"cloud.google.com/go/pubsub"
"github.com/ethereum-optimism/optimism/l2geth/log"
)
type QueueSubscriberMessage interface {
Data() []byte
Ack()
Nack()
}
type QueueSubscriber interface {
ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) error
Close() error
}
type QueueSubscriberConfig struct {
Enable bool
ProjectID string
SubscriptionID string
MaxOutstandingMessages int
MaxOutstandingBytes int
}
type queueSubscriber struct {
client *pubsub.Client
sub *pubsub.Subscription
}
func NewQueueSubscriber(ctx context.Context, config QueueSubscriberConfig) (QueueSubscriber, error) {
if !config.Enable {
return &noopQueueSubscriber{}, nil
}
client, err := pubsub.NewClient(ctx, config.ProjectID)
if err != nil {
return nil, err
}
sub := client.Subscription(config.SubscriptionID)
maxOutstandingMsgs := config.MaxOutstandingMessages
if maxOutstandingMsgs == 0 {
maxOutstandingMsgs = 10000
}
maxOutstandingBytes := config.MaxOutstandingBytes
if maxOutstandingBytes == 0 {
maxOutstandingBytes = 1e9
}
sub.ReceiveSettings = pubsub.ReceiveSettings{
MaxOutstandingMessages: maxOutstandingMsgs,
MaxOutstandingBytes: maxOutstandingBytes,
}
log.Info("Created Queue Subscriber", "projectID", config.ProjectID, "subscriptionID", config.SubscriptionID)
return &queueSubscriber{client, sub}, nil
}
func (q *queueSubscriber) ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) error {
return q.sub.Receive(ctx, func(ctx context.Context, pmsg *pubsub.Message) {
cb(ctx, &queueSubscriberMessage{pmsg})
})
}
func (q *queueSubscriber) Close() error {
return q.client.Close()
}
type queueSubscriberMessage struct {
inner *pubsub.Message
}
func (q *queueSubscriberMessage) Data() []byte {
return q.inner.Data
}
func (q *queueSubscriberMessage) Ack() {
q.inner.Ack()
}
func (q *queueSubscriberMessage) Nack() {
q.inner.Nack()
}
type noopQueueSubscriber struct{}
func (q *noopQueueSubscriber) ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) error {
return nil
}
func (q *noopQueueSubscriber) Close() error { return nil }
package rollup package rollup
import ( import (
"bytes"
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
...@@ -16,12 +18,14 @@ import ( ...@@ -16,12 +18,14 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/ethdb" "github.com/ethereum-optimism/optimism/l2geth/ethdb"
"github.com/ethereum-optimism/optimism/l2geth/event" "github.com/ethereum-optimism/optimism/l2geth/event"
"github.com/ethereum-optimism/optimism/l2geth/log" "github.com/ethereum-optimism/optimism/l2geth/log"
"github.com/ethereum-optimism/optimism/l2geth/rlp"
"github.com/ethereum-optimism/optimism/l2geth/core/rawdb" "github.com/ethereum-optimism/optimism/l2geth/core/rawdb"
"github.com/ethereum-optimism/optimism/l2geth/core/types" "github.com/ethereum-optimism/optimism/l2geth/core/types"
"github.com/ethereum-optimism/optimism/l2geth/eth/gasprice" "github.com/ethereum-optimism/optimism/l2geth/eth/gasprice"
"github.com/ethereum-optimism/optimism/l2geth/rollup/fees" "github.com/ethereum-optimism/optimism/l2geth/rollup/fees"
"github.com/ethereum-optimism/optimism/l2geth/rollup/pub"
"github.com/ethereum-optimism/optimism/l2geth/rollup/rcfg" "github.com/ethereum-optimism/optimism/l2geth/rollup/rcfg"
) )
...@@ -68,10 +72,12 @@ type SyncService struct { ...@@ -68,10 +72,12 @@ type SyncService struct {
signer types.Signer signer types.Signer
feeThresholdUp *big.Float feeThresholdUp *big.Float
feeThresholdDown *big.Float feeThresholdDown *big.Float
txLogger pub.Publisher
queueSub QueueSubscriber
} }
// NewSyncService returns an initialized sync service // NewSyncService returns an initialized sync service
func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *core.BlockChain, db ethdb.Database) (*SyncService, error) { func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *core.BlockChain, db ethdb.Database, txLogger pub.Publisher, queueSub QueueSubscriber) (*SyncService, error) {
if bc == nil { if bc == nil {
return nil, errors.New("Must pass BlockChain to SyncService") return nil, errors.New("Must pass BlockChain to SyncService")
} }
...@@ -143,6 +149,8 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co ...@@ -143,6 +149,8 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co
signer: types.NewEIP155Signer(chainID), signer: types.NewEIP155Signer(chainID),
feeThresholdDown: cfg.FeeThresholdDown, feeThresholdDown: cfg.FeeThresholdDown,
feeThresholdUp: cfg.FeeThresholdUp, feeThresholdUp: cfg.FeeThresholdUp,
txLogger: txLogger,
queueSub: queueSub,
} }
// The chainHeadSub is used to synchronize the SyncService with the chain. // The chainHeadSub is used to synchronize the SyncService with the chain.
...@@ -157,7 +165,8 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co ...@@ -157,7 +165,8 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co
// a remote server that indexes the layer one contracts. Place this // a remote server that indexes the layer one contracts. Place this
// code behind this if statement so that this can run without the // code behind this if statement so that this can run without the
// requirement of the remote server being up. // requirement of the remote server being up.
if service.enable { // If we're syncing from the Queue, then we can skip all this and rely on L2 published transactions
if service.enable && service.backend != BackendQueue {
// Ensure that the rollup client can connect to a remote server // Ensure that the rollup client can connect to a remote server
// before starting. Retry until it can connect. // before starting. Retry until it can connect.
tEnsure := time.NewTicker(10 * time.Second) tEnsure := time.NewTicker(10 * time.Second)
...@@ -418,6 +427,10 @@ func (s *SyncService) verify() error { ...@@ -418,6 +427,10 @@ func (s *SyncService) verify() error {
if err := s.syncTransactionsToTip(); err != nil { if err := s.syncTransactionsToTip(); err != nil {
return fmt.Errorf("Verifier cannot sync transactions with BackendL2: %w", err) return fmt.Errorf("Verifier cannot sync transactions with BackendL2: %w", err)
} }
case BackendQueue:
if err := s.syncTransactionsFromQueue(); err != nil {
return fmt.Errorf("Verifier cannot sync transactions with BackendQueue: %w", err)
}
} }
return nil return nil
} }
...@@ -872,6 +885,12 @@ func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error { ...@@ -872,6 +885,12 @@ func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error {
// The index was set above so it is safe to dereference // The index was set above so it is safe to dereference
log.Debug("Applying transaction to tip", "index", *tx.GetMeta().Index, "hash", tx.Hash().Hex(), "origin", tx.QueueOrigin().String()) log.Debug("Applying transaction to tip", "index", *tx.GetMeta().Index, "hash", tx.Hash().Hex(), "origin", tx.QueueOrigin().String())
// Log transaction to the failover log
if err := s.publishTransaction(tx); err != nil {
log.Error("Failed to publish transaction to log", "msg", err)
return fmt.Errorf("internal error: transaction logging failed")
}
txs := types.Transactions{tx} txs := types.Transactions{tx}
errCh := make(chan error, 1) errCh := make(chan error, 1)
s.txFeed.Send(core.NewTxsEvent{ s.txFeed.Send(core.NewTxsEvent{
...@@ -1215,12 +1234,95 @@ func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) e ...@@ -1215,12 +1234,95 @@ func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) e
return nil return nil
} }
// syncTransactionsFromQueue will sync the earliest transaction from an external message queue
func (s *SyncService) syncTransactionsFromQueue() error {
// we don't drop messages unless they're already applied
cb := func(ctx context.Context, msg QueueSubscriberMessage) {
var (
queuedTxMeta QueuedTransactionMeta
tx types.Transaction
txMeta *types.TransactionMeta
)
log.Debug("Reading transaction from queue", "json", string(msg.Data()))
if err := json.Unmarshal(msg.Data(), &queuedTxMeta); err != nil {
log.Error("Failed to unmarshal logged TransactionMeta", "msg", err)
msg.Nack()
return
}
if err := rlp.DecodeBytes(queuedTxMeta.RawTransaction, &tx); err != nil {
log.Error("decoding raw transaction failed", "msg", err)
msg.Nack()
return
}
if queuedTxMeta.L1BlockNumber == nil || queuedTxMeta.L1Timestamp == nil {
log.Error("Missing required queued transaction fields", "msg", string(msg.Data()))
msg.Nack()
return
}
txMeta = types.NewTransactionMeta(
queuedTxMeta.L1BlockNumber,
*queuedTxMeta.L1Timestamp,
queuedTxMeta.L1MessageSender,
*queuedTxMeta.QueueOrigin,
queuedTxMeta.Index,
queuedTxMeta.QueueIndex,
queuedTxMeta.RawTransaction)
tx.SetTransactionMeta(txMeta)
if readTx, _, _, _ := rawdb.ReadTransaction(s.db, tx.Hash()); readTx != nil {
msg.Ack()
return
}
if err := s.applyTransactionToTip(&tx); err != nil {
log.Error("Unable to apply transactions to tip from Queue", "msg", err)
msg.Nack()
return
}
log.Debug("Successfully applied queued transaction", "txhash", tx.Hash())
msg.Ack()
}
// This blocks until there's a new message in the queue or ctx deadline hits
return s.queueSub.ReceiveMessage(s.ctx, cb)
}
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and // SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
// starts sending event to the given channel. // starts sending event to the given channel.
func (s *SyncService) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { func (s *SyncService) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return s.scope.Track(s.txFeed.Subscribe(ch)) return s.scope.Track(s.txFeed.Subscribe(ch))
} }
func (s *SyncService) publishTransaction(tx *types.Transaction) error {
rawTx := new(bytes.Buffer)
if err := tx.EncodeRLP(rawTx); err != nil {
return err
}
if tx.L1BlockNumber() == nil || tx.L1Timestamp() == 0 {
return fmt.Errorf("transaction doesn't contain required fields")
}
// Manually populate RawTransaction as it's not always available
txMeta := tx.GetMeta()
txMeta.RawTransaction = rawTx.Bytes()
txLog := AsQueuedTransactionMeta(txMeta)
encodedTxLog, err := json.Marshal(&txLog)
if err != nil {
return err
}
if err := s.txLogger.Publish(s.ctx, encodedTxLog); err != nil {
pubTxDropCounter.Inc(1)
return err
}
return nil
}
func stringify(i *uint64) string { func stringify(i *uint64) string {
if i == nil { if i == nil {
return "<nil>" return "<nil>"
...@@ -1233,3 +1335,25 @@ func stringify(i *uint64) string { ...@@ -1233,3 +1335,25 @@ func stringify(i *uint64) string {
func (s *SyncService) IngestTransaction(tx *types.Transaction) error { func (s *SyncService) IngestTransaction(tx *types.Transaction) error {
return s.applyTransaction(tx) return s.applyTransaction(tx)
} }
type QueuedTransactionMeta struct {
L1BlockNumber *big.Int `json:"l1BlockNumber"`
L1Timestamp *uint64 `json:"l1Timestamp"`
L1MessageSender *common.Address `json:"l1MessageSender"`
QueueOrigin *types.QueueOrigin `json:"queueOrigin"`
Index *uint64 `json:"index"`
QueueIndex *uint64 `json:"queueIndex"`
RawTransaction []byte `json:"rawTransaction"`
}
func AsQueuedTransactionMeta(txMeta *types.TransactionMeta) *QueuedTransactionMeta {
return &QueuedTransactionMeta{
L1BlockNumber: txMeta.L1BlockNumber,
L1Timestamp: &txMeta.L1Timestamp,
L1MessageSender: txMeta.L1MessageSender,
QueueOrigin: &txMeta.QueueOrigin,
Index: txMeta.Index,
QueueIndex: txMeta.QueueIndex,
RawTransaction: txMeta.RawTransaction,
}
}
This diff is collapsed.
...@@ -26,6 +26,8 @@ func (s Backend) String() string { ...@@ -26,6 +26,8 @@ func (s Backend) String() string {
return "l1" return "l1"
case BackendL2: case BackendL2:
return "l2" return "l2"
case BackendQueue:
return "queue"
default: default:
return "" return ""
} }
...@@ -38,6 +40,8 @@ func NewBackend(typ string) (Backend, error) { ...@@ -38,6 +40,8 @@ func NewBackend(typ string) (Backend, error) {
return BackendL1, nil return BackendL1, nil
case "l2": case "l2":
return BackendL2, nil return BackendL2, nil
case "queue":
return BackendQueue, nil
default: default:
return 0, fmt.Errorf("Unknown Backend: %s", typ) return 0, fmt.Errorf("Unknown Backend: %s", typ)
} }
...@@ -54,6 +58,11 @@ const ( ...@@ -54,6 +58,11 @@ const (
// around the transactions as they have not been submitted via a batch to // around the transactions as they have not been submitted via a batch to
// L1. // L1.
BackendL2 BackendL2
// BackendQueue Backend involves syncing transactions from an external message queue.
// This has the same guarantees as BackendL2 as such transactions may not have been
// submitted via a batch to L1.
BackendQueue
) )
func isCtcTxEqual(a, b *types.Transaction) bool { func isCtcTxEqual(a, b *types.Transaction) bool {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment