Commit 783252e6 authored by inphi's avatar inphi

Introduce new Backend Queue

The BackendQueue can be used by verifiers to sync transactions from
Google PubSub
parent 9f05c877
......@@ -832,7 +832,7 @@ var (
}
RollupBackendFlag = cli.StringFlag{
Name: "rollup.backend",
Usage: "Sync backend for verifiers (\"l1\" or \"l2\"), defaults to l1",
Usage: "Sync backend for verifiers (\"l1\", \"l2\" or \"queue\"), defaults to l1",
Value: "l1",
EnvVar: "ROLLUP_BACKEND",
}
......@@ -869,12 +869,12 @@ var (
}
TxPublisherEnableFlag = cli.BoolFlag{
Name: "txpublisher.enable",
Usage: "Enable Transaction logging to PubSub",
Usage: "Enable transaction logging to PubSub",
EnvVar: "TX_PUBLISHER_ENABLE",
}
TxPublisherProjectIDFlag = cli.StringFlag{
Name: "txpublisher.projectid",
Usage: "GCP Project ID for the tx PubSub",
Usage: "GCP project ID for the tx PubSub",
EnvVar: "TX_PUBLISHER_PROJECT_ID",
}
TxPublisherTopicIDFlag = cli.StringFlag{
......@@ -887,6 +887,31 @@ var (
Usage: "Transaction publishing timeout",
EnvVar: "TX_PUBLISHER_TIMEOUT",
}
TxQueueEnableFlag = cli.BoolFlag{
Name: "txqueue.enable",
Usage: "Enable transaction syncing from the Backend Queue",
EnvVar: "TX_QUEUE_ENABLE",
}
TxQueueProjectIDFlag = cli.StringFlag{
Name: "txqueue.projectid",
Usage: "Backend Queue project ID",
EnvVar: "TX_QUEUE_PROJECT_ID",
}
TxQueueSubscriptionIDFlag = cli.StringFlag{
Name: "txqueue.subscriptionid",
Usage: "Transaction Queue subscription ID",
EnvVar: "TX_QUEUE_SUBSCRIPTION_ID",
}
TxQueueMaxOutstandingMessagesFlag = cli.IntFlag{
Name: "txqueue.maxoutstandingmessages",
Usage: "Max number of messages buffered in the transaction queue subscriber",
EnvVar: "TX_QUEUE_MAX_OUTSTANDING_MESSAGES",
}
TxQueueMaxOutstandingBytesFlag = cli.IntFlag{
Name: "txqueue.maxoutstandingbytes",
Usage: "Max outstanding bytes bufferered in the transaction queue subscriber",
EnvVar: "TX_QUEUE_MAX_OUTSTANDING_BYTES",
}
)
// MakeDataDir retrieves the currently requested data directory, terminating
......@@ -1185,6 +1210,26 @@ func setTxPublisher(ctx *cli.Context, cfg *pub.Config) {
}
}
// UsingOVM
// setTxQueueSubscriber configures the Queue Backend
func setTxQueueSubscriber(ctx *cli.Context, cfg *rollup.QueueSubscriberConfig) {
if ctx.GlobalIsSet(TxPublisherEnableFlag.Name) {
cfg.Enable = ctx.GlobalBool(TxQueueEnableFlag.Name)
}
if ctx.GlobalIsSet(TxPublisherProjectIDFlag.Name) {
cfg.ProjectID = ctx.GlobalString(TxQueueProjectIDFlag.Name)
}
if ctx.GlobalIsSet(TxQueueSubscriptionIDFlag.Name) {
cfg.SubscriptionID = ctx.GlobalString(TxQueueSubscriptionIDFlag.Name)
}
if ctx.GlobalIsSet(TxQueueMaxOutstandingMessagesFlag.Name) {
cfg.MaxOutstandingMessages = ctx.GlobalInt(TxQueueMaxOutstandingMessagesFlag.Name)
}
if ctx.GlobalIsSet(TxQueueMaxOutstandingBytesFlag.Name) {
cfg.MaxOutstandingBytes = ctx.GlobalInt(TxQueueMaxOutstandingBytesFlag.Name)
}
}
// setLes configures the les server and ultra light client settings from the command line flags.
func setLes(ctx *cli.Context, cfg *eth.Config) {
if ctx.GlobalIsSet(LightLegacyServFlag.Name) {
......@@ -1645,6 +1690,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
setEth1(ctx, &cfg.Rollup)
setRollup(ctx, &cfg.Rollup)
setTxPublisher(ctx, &cfg.TxPublisher)
setTxQueueSubscriber(ctx, &cfg.TxQueueSubscriber)
if ctx.GlobalIsSet(SyncModeFlag.Name) {
cfg.SyncMode = *GlobalTextMarshaler(ctx, SyncModeFlag.Name).(*downloader.SyncMode)
......
......@@ -32,6 +32,17 @@ func (q QueueOrigin) String() string {
}
}
func (q QueueOrigin) MarshalJSON() ([]byte, error) {
switch q {
case QueueOriginSequencer:
return []byte("\"sequencer\""), nil
case QueueOriginL1ToL2:
return []byte("\"l1\""), nil
default:
return []byte("\"\""), nil
}
}
func (q *QueueOrigin) UnmarshalJSON(b []byte) error {
switch string(b) {
case "\"sequencer\"":
......
......@@ -213,7 +213,13 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
return nil, err
}
eth.syncService, err = rollup.NewSyncService(context.Background(), config.Rollup, eth.txPool, eth.blockchain, eth.chainDb, txLogger)
var txQueueSubscriber rollup.QueueSubscriber
txQueueSubscriber, err = rollup.NewQueueSubscriber(context.Background(), config.TxQueueSubscriber)
if err != nil {
return nil, err
}
eth.syncService, err = rollup.NewSyncService(context.Background(), config.Rollup, eth.txPool, eth.blockchain, eth.chainDb, txLogger, txQueueSubscriber)
if err != nil {
return nil, fmt.Errorf("Cannot initialize syncservice: %w", err)
}
......
......@@ -186,4 +186,6 @@ type Config struct {
Rollup rollup.Config
TxPublisher pub.Config
TxQueueSubscriber rollup.QueueSubscriberConfig
}
package rollup
import (
"context"
"cloud.google.com/go/pubsub"
)
type QueueSubscriberMessage interface {
Data() []byte
Ack()
Nack()
}
type QueueSubscriber interface {
ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage))
Close() error
}
type QueueSubscriberConfig struct {
Enable bool
ProjectID string
SubscriptionID string
MaxOutstandingMessages int
MaxOutstandingBytes int
}
type queueSubscriber struct {
client *pubsub.Client
sub *pubsub.Subscription
}
func NewQueueSubscriber(ctx context.Context, config QueueSubscriberConfig) (QueueSubscriber, error) {
if !config.Enable {
return &noopQueueSubscriber{}, nil
}
client, err := pubsub.NewClient(ctx, config.ProjectID)
if err != nil {
return nil, err
}
sub := client.Subscription(config.SubscriptionID)
maxOutstandingMsgs := config.MaxOutstandingMessages
if maxOutstandingMsgs == 0 {
maxOutstandingMsgs = 10000
}
maxOutstandingBytes := config.MaxOutstandingBytes
if maxOutstandingBytes == 0 {
maxOutstandingBytes = 1e9
}
sub.ReceiveSettings = pubsub.ReceiveSettings{
MaxOutstandingMessages: maxOutstandingMsgs,
MaxOutstandingBytes: maxOutstandingBytes,
}
return &queueSubscriber{client, sub}, nil
}
func (q *queueSubscriber) ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) {
q.sub.Receive(ctx, func(ctx context.Context, pmsg *pubsub.Message) {
cb(ctx, &queueSubscriberMessage{pmsg})
})
}
func (q *queueSubscriber) Close() error {
return q.client.Close()
}
type queueSubscriberMessage struct {
inner *pubsub.Message
}
func (q *queueSubscriberMessage) Data() []byte {
return q.inner.Data
}
func (q *queueSubscriberMessage) Ack() {
q.inner.Ack()
}
func (q *queueSubscriberMessage) Nack() {
q.inner.Nack()
}
type noopQueueSubscriber struct{}
func (q *noopQueueSubscriber) ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) {
}
func (q *noopQueueSubscriber) Close() error { return nil }
......@@ -3,6 +3,7 @@ package rollup
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
......@@ -17,6 +18,7 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/ethdb"
"github.com/ethereum-optimism/optimism/l2geth/event"
"github.com/ethereum-optimism/optimism/l2geth/log"
"github.com/ethereum-optimism/optimism/l2geth/rlp"
"github.com/ethereum-optimism/optimism/l2geth/core/rawdb"
"github.com/ethereum-optimism/optimism/l2geth/core/types"
......@@ -71,10 +73,11 @@ type SyncService struct {
feeThresholdUp *big.Float
feeThresholdDown *big.Float
txLogger pub.Publisher
queueSub QueueSubscriber
}
// NewSyncService returns an initialized sync service
func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *core.BlockChain, db ethdb.Database, txLogger pub.Publisher) (*SyncService, error) {
func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *core.BlockChain, db ethdb.Database, txLogger pub.Publisher, queueSub QueueSubscriber) (*SyncService, error) {
if bc == nil {
return nil, errors.New("Must pass BlockChain to SyncService")
}
......@@ -147,6 +150,7 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co
feeThresholdDown: cfg.FeeThresholdDown,
feeThresholdUp: cfg.FeeThresholdUp,
txLogger: txLogger,
queueSub: queueSub,
}
// The chainHeadSub is used to synchronize the SyncService with the chain.
......@@ -422,6 +426,10 @@ func (s *SyncService) verify() error {
if err := s.syncTransactionsToTip(); err != nil {
return fmt.Errorf("Verifier cannot sync transactions with BackendL2: %w", err)
}
case BackendQueue:
if err := s.syncTransactionFromQueue(); err != nil {
return fmt.Errorf("Verifier cannot sync transactions with BackendQueue: %w", err)
}
}
return nil
}
......@@ -877,12 +885,7 @@ func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error {
log.Debug("Applying transaction to tip", "index", *tx.GetMeta().Index, "hash", tx.Hash().Hex(), "origin", tx.QueueOrigin().String())
// Log transaction to the failover log
encodedTx := new(bytes.Buffer)
if err := tx.EncodeRLP(encodedTx); err != nil {
return err
}
if err := s.txLogger.Publish(s.ctx, encodedTx.Bytes()); err != nil {
pubTxDropCounter.Inc(1)
if err := s.publishTransaction(tx); err != nil {
log.Error("Failed to publish transaction to log", "msg", err)
return fmt.Errorf("internal error: transaction logging failed")
}
......@@ -1230,12 +1233,77 @@ func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) e
return nil
}
// syncTransactionFromQueue will sync the earliest transaction from an external message queue
func (s *SyncService) syncTransactionFromQueue() error {
cb := func(ctx context.Context, msg QueueSubscriberMessage) {
var (
txMeta types.TransactionMeta
tx types.Transaction
)
if err := json.Unmarshal(msg.Data(), &txMeta); err != nil {
msg.Nack()
return
}
if err := rlp.DecodeBytes(txMeta.RawTransaction, &tx); err != nil {
msg.Nack()
return
}
if txMeta.L1BlockNumber == nil || txMeta.L1Timestamp == 0 {
log.Warn("missing required queued transaction fields", "msg", string(msg.Data()))
msg.Nack()
return
}
if readTx, _, _, _ := rawdb.ReadTransaction(s.db, tx.Hash()); readTx != nil {
msg.Ack()
return
}
if err := s.applyTransactionToTip(&tx); err != nil {
msg.Nack()
return
}
msg.Ack()
}
s.queueSub.ReceiveMessage(s.ctx, cb)
return nil
}
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (s *SyncService) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return s.scope.Track(s.txFeed.Subscribe(ch))
}
func (s *SyncService) publishTransaction(tx *types.Transaction) error {
rawTx := new(bytes.Buffer)
if err := tx.EncodeRLP(rawTx); err != nil {
return err
}
if tx.L1BlockNumber() == nil || tx.L1Timestamp() == 0 {
return fmt.Errorf("transaction doesn't contain required fields")
}
// Manually populate RawTransaction as it's not always available
txMeta := tx.GetMeta()
txMeta.RawTransaction = rawTx.Bytes()
txLog := AsQueuedTransactionMeta(txMeta)
encodedTxLog, err := json.Marshal(&txLog)
if err != nil {
return err
}
if err := s.txLogger.Publish(s.ctx, encodedTxLog); err != nil {
pubTxDropCounter.Inc(1)
return err
}
return nil
}
func stringify(i *uint64) string {
if i == nil {
return "<nil>"
......@@ -1248,3 +1316,25 @@ func stringify(i *uint64) string {
func (s *SyncService) IngestTransaction(tx *types.Transaction) error {
return s.applyTransaction(tx)
}
type QueuedTransactionMeta struct {
L1BlockNumber *big.Int `json:"l1BlockNumber"`
L1Timestamp *uint64 `json:"l1Timestamp"`
L1MessageSender *common.Address `json:"l1MessageSender"`
QueueOrigin *types.QueueOrigin `json:"queueOrigin"`
Index *uint64 `json:"index"`
QueueIndex *uint64 `json:"queueIndex"`
RawTransaction []byte `json:"rawTransaction"`
}
func AsQueuedTransactionMeta(txMeta *types.TransactionMeta) *QueuedTransactionMeta {
return &QueuedTransactionMeta{
L1BlockNumber: txMeta.L1BlockNumber,
L1Timestamp: &txMeta.L1Timestamp,
L1MessageSender: txMeta.L1MessageSender,
QueueOrigin: &txMeta.QueueOrigin,
Index: txMeta.Index,
QueueIndex: txMeta.QueueIndex,
RawTransaction: txMeta.RawTransaction,
}
}
......@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"math/big"
......@@ -23,7 +24,6 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/ethdb"
"github.com/ethereum-optimism/optimism/l2geth/event"
"github.com/ethereum-optimism/optimism/l2geth/params"
"github.com/ethereum-optimism/optimism/l2geth/rlp"
"github.com/ethereum-optimism/optimism/l2geth/rollup/pub"
"github.com/ethereum-optimism/optimism/l2geth/rollup/rcfg"
)
......@@ -899,7 +899,7 @@ func TestBadFeeThresholds(t *testing.T) {
cfg.FeeThresholdDown = tt.thresholdDown
cfg.FeeThresholdUp = tt.thresholdUp
_, err := NewSyncService(context.Background(), cfg, txPool, chain, db, &pub.NoopPublisher{})
_, err := NewSyncService(context.Background(), cfg, txPool, chain, db, &pub.NoopPublisher{}, &noopQueueSubscriber{})
if !errors.Is(err, tt.err) {
t.Fatalf("%s: %s", name, err)
}
......@@ -931,15 +931,15 @@ func TestSyncServiceTransactionLog(t *testing.T) {
txLogger.unblockPublish()
<-txCh
var loggedTx *types.Transaction
loggedTxMeta := new(QueuedTransactionMeta)
buf := <-txLogger.msgs
if err := rlp.DecodeBytes(buf, &loggedTx); err != nil {
t.Fatalf("unable to decode logged transaction: %v", err)
if err := json.Unmarshal(buf, loggedTxMeta); err != nil {
t.Fatalf("unable to decode logged transaction meta: %v", err)
}
txJSON, _ := tx.MarshalJSON()
loggedTxJSON, _ := loggedTx.MarshalJSON()
if !bytes.Equal(txJSON, loggedTxJSON) {
txMetaJSON, _ := tx.GetMeta().MarshalJSON()
loggedTxJSON, _ := json.Marshal(loggedTxMeta)
if !bytes.Equal(txMetaJSON, loggedTxJSON) {
t.Fatal("mismatched logged transactions")
}
}
......@@ -1011,7 +1011,7 @@ func newTestSyncService(isVerifier bool, alloc *common.Address, txLogger pub.Pub
if txLogger == nil {
txLogger = &pub.NoopPublisher{}
}
service, err := NewSyncService(context.Background(), cfg, txPool, chain, db, txLogger)
service, err := NewSyncService(context.Background(), cfg, txPool, chain, db, txLogger, &noopQueueSubscriber{})
if err != nil {
return nil, nil, nil, fmt.Errorf("Cannot initialize syncservice: %w", err)
}
......
......@@ -26,6 +26,8 @@ func (s Backend) String() string {
return "l1"
case BackendL2:
return "l2"
case BackendQueue:
return "queue"
default:
return ""
}
......@@ -38,6 +40,8 @@ func NewBackend(typ string) (Backend, error) {
return BackendL1, nil
case "l2":
return BackendL2, nil
case "queue":
return BackendQueue, nil
default:
return 0, fmt.Errorf("Unknown Backend: %s", typ)
}
......@@ -54,6 +58,11 @@ const (
// around the transactions as they have not been submitted via a batch to
// L1.
BackendL2
// BackendQueue Backend involves syncing transactions from an external message queue.
// This has the same guarantees as BackendL2 as such transactions may not have been
// submitted via a batch to L1.
BackendQueue
)
func isCtcTxEqual(a, b *types.Transaction) bool {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment