Commit 68106135 authored by inphi's avatar inphi

fix flags

and use l1 block params during tx apply
parent 3e7b5ab8
......@@ -168,6 +168,11 @@ var (
utils.TxPublisherProjectIDFlag,
utils.TxPublisherTopicIDFlag,
utils.TxPublisherTimeoutFlag,
utils.TxQueueEnableFlag,
utils.TxQueueProjectIDFlag,
utils.TxQueueSubscriptionIDFlag,
utils.TxQueueMaxOutstandingBytesFlag,
utils.TxQueueMaxOutstandingMessagesFlag,
}
rpcFlags = []cli.Flag{
......
......@@ -82,6 +82,11 @@ var AppHelpFlagGroups = []flagGroup{
utils.TxPublisherProjectIDFlag,
utils.TxPublisherTopicIDFlag,
utils.TxPublisherTimeoutFlag,
utils.TxQueueEnableFlag,
utils.TxQueueProjectIDFlag,
utils.TxQueueSubscriptionIDFlag,
utils.TxQueueMaxOutstandingBytesFlag,
utils.TxQueueMaxOutstandingMessagesFlag,
},
},
{
......
......@@ -1213,10 +1213,10 @@ func setTxPublisher(ctx *cli.Context, cfg *pub.Config) {
// UsingOVM
// setTxQueueSubscriber configures the Queue Backend
func setTxQueueSubscriber(ctx *cli.Context, cfg *rollup.QueueSubscriberConfig) {
if ctx.GlobalIsSet(TxPublisherEnableFlag.Name) {
if ctx.GlobalIsSet(TxQueueEnableFlag.Name) {
cfg.Enable = ctx.GlobalBool(TxQueueEnableFlag.Name)
}
if ctx.GlobalIsSet(TxPublisherProjectIDFlag.Name) {
if ctx.GlobalIsSet(TxQueueProjectIDFlag.Name) {
cfg.ProjectID = ctx.GlobalString(TxQueueProjectIDFlag.Name)
}
if ctx.GlobalIsSet(TxQueueSubscriptionIDFlag.Name) {
......
......@@ -4,6 +4,7 @@ import (
"context"
"cloud.google.com/go/pubsub"
"github.com/ethereum-optimism/optimism/l2geth/log"
)
type QueueSubscriberMessage interface {
......@@ -13,7 +14,7 @@ type QueueSubscriberMessage interface {
}
type QueueSubscriber interface {
ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage))
ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) error
Close() error
}
......@@ -55,11 +56,12 @@ func NewQueueSubscriber(ctx context.Context, config QueueSubscriberConfig) (Queu
MaxOutstandingBytes: maxOutstandingBytes,
}
log.Info("Created Queue Subscriber", "projectID", config.ProjectID, "subscriptionID", config.SubscriptionID)
return &queueSubscriber{client, sub}, nil
}
func (q *queueSubscriber) ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) {
q.sub.Receive(ctx, func(ctx context.Context, pmsg *pubsub.Message) {
func (q *queueSubscriber) ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) error {
return q.sub.Receive(ctx, func(ctx context.Context, pmsg *pubsub.Message) {
cb(ctx, &queueSubscriberMessage{pmsg})
})
}
......@@ -86,6 +88,7 @@ func (q *queueSubscriberMessage) Nack() {
type noopQueueSubscriber struct{}
func (q *noopQueueSubscriber) ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) {
func (q *noopQueueSubscriber) ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) error {
return nil
}
func (q *noopQueueSubscriber) Close() error { return nil }
......@@ -165,7 +165,7 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co
// a remote server that indexes the layer one contracts. Place this
// code behind this if statement so that this can run without the
// requirement of the remote server being up.
if service.enable {
if service.enable && service.backend != BackendQueue {
// Ensure that the rollup client can connect to a remote server
// before starting. Retry until it can connect.
tEnsure := time.NewTicker(10 * time.Second)
......@@ -427,7 +427,7 @@ func (s *SyncService) verify() error {
return fmt.Errorf("Verifier cannot sync transactions with BackendL2: %w", err)
}
case BackendQueue:
if err := s.syncTransactionFromQueue(); err != nil {
if err := s.syncTransactionsFromQueue(); err != nil {
return fmt.Errorf("Verifier cannot sync transactions with BackendQueue: %w", err)
}
}
......@@ -1233,30 +1233,43 @@ func (s *SyncService) syncTransactionRange(start, end uint64, backend Backend) e
return nil
}
// syncTransactionFromQueue will sync the earliest transaction from an external message queue
func (s *SyncService) syncTransactionFromQueue() error {
// syncTransactionsFromQueue will sync the earliest transaction from an external message queue
func (s *SyncService) syncTransactionsFromQueue() error {
// we don't drop messages unless they're already applied
cb := func(ctx context.Context, msg QueueSubscriberMessage) {
var (
txMeta types.TransactionMeta
tx types.Transaction
queuedTxMeta QueuedTransactionMeta
tx types.Transaction
txMeta *types.TransactionMeta
)
if err := json.Unmarshal(msg.Data(), &txMeta); err != nil {
log.Debug("Reading transaction from queue", "json", string(msg.Data()))
if err := json.Unmarshal(msg.Data(), &queuedTxMeta); err != nil {
log.Error("Failed to unmarshal logged TransactionMeta", "msg", err)
msg.Nack()
return
}
if err := rlp.DecodeBytes(txMeta.RawTransaction, &tx); err != nil {
if err := rlp.DecodeBytes(queuedTxMeta.RawTransaction, &tx); err != nil {
log.Error("decoding raw transaction failed", "msg", err)
msg.Nack()
return
}
if txMeta.L1BlockNumber == nil || txMeta.L1Timestamp == 0 {
if queuedTxMeta.L1BlockNumber == nil || queuedTxMeta.L1Timestamp == nil {
log.Error("Missing required queued transaction fields", "msg", string(msg.Data()))
msg.Nack()
return
}
txMeta = types.NewTransactionMeta(
queuedTxMeta.L1BlockNumber,
*queuedTxMeta.L1Timestamp,
queuedTxMeta.L1MessageSender,
*queuedTxMeta.QueueOrigin,
queuedTxMeta.Index,
queuedTxMeta.QueueIndex,
queuedTxMeta.RawTransaction)
tx.SetTransactionMeta(txMeta)
if readTx, _, _, _ := rawdb.ReadTransaction(s.db, tx.Hash()); readTx != nil {
msg.Ack()
return
......@@ -1268,12 +1281,12 @@ func (s *SyncService) syncTransactionFromQueue() error {
return
}
log.Debug("Successfully applied queued transaction", "txhash", tx.Hash())
msg.Ack()
}
// This blocks until there's a new message in the queue or ctx deadline hits
s.queueSub.ReceiveMessage(s.ctx, cb)
return nil
return s.queueSub.ReceiveMessage(s.ctx, cb)
}
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
......
......@@ -990,7 +990,7 @@ func TestSyncServiceBackendQueue(t *testing.T) {
}()
go func() {
followerService.syncTransactionFromQueue()
followerService.syncTransactionsFromQueue()
}()
// forward the logged transaction from the "active sequencer"
......@@ -1036,7 +1036,7 @@ func TestSyncServiceBackendQueueNack(t *testing.T) {
}
go func() {
service.syncTransactionFromQueue()
service.syncTransactionsFromQueue()
}()
queueSub.ProduceMessage(msg)
......@@ -1164,9 +1164,10 @@ func (p *mockQueueSubscriber) ProduceMessage(msg []byte) {
p.msgs <- msg
}
func (p *mockQueueSubscriber) ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) {
func (p *mockQueueSubscriber) ReceiveMessage(ctx context.Context, cb func(ctx context.Context, msg QueueSubscriberMessage)) error {
msg := <-p.msgs
cb(ctx, &mockQueueSubscriberMessage{msg, p.events})
return nil
}
func (p *mockQueueSubscriber) Close() error { return nil }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment