Commit 3e016ae8 authored by inphi's avatar inphi

l2geth: Publish transactions to external msg queue

To facilitate failover recovery without causing a chainsplit.
parent fde01d46
...@@ -164,6 +164,9 @@ var ( ...@@ -164,6 +164,9 @@ var (
utils.RollupFeeThresholdDownFlag, utils.RollupFeeThresholdDownFlag,
utils.RollupFeeThresholdUpFlag, utils.RollupFeeThresholdUpFlag,
utils.SequencerClientHttpFlag, utils.SequencerClientHttpFlag,
utils.TxPublisherProjectIDFlag,
utils.TxPublisherTopicIDFlag,
utils.TxPublisherTimeoutFlag,
} }
rpcFlags = []cli.Flag{ rpcFlags = []cli.Flag{
......
...@@ -78,6 +78,9 @@ var AppHelpFlagGroups = []flagGroup{ ...@@ -78,6 +78,9 @@ var AppHelpFlagGroups = []flagGroup{
utils.RollupFeeThresholdDownFlag, utils.RollupFeeThresholdDownFlag,
utils.RollupFeeThresholdUpFlag, utils.RollupFeeThresholdUpFlag,
utils.SequencerClientHttpFlag, utils.SequencerClientHttpFlag,
utils.TxPublisherProjectIDFlag,
utils.TxPublisherTopicIDFlag,
utils.TxPublisherTimeoutFlag,
}, },
}, },
{ {
......
...@@ -61,6 +61,7 @@ import ( ...@@ -61,6 +61,7 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/p2p/netutil" "github.com/ethereum-optimism/optimism/l2geth/p2p/netutil"
"github.com/ethereum-optimism/optimism/l2geth/params" "github.com/ethereum-optimism/optimism/l2geth/params"
"github.com/ethereum-optimism/optimism/l2geth/rollup" "github.com/ethereum-optimism/optimism/l2geth/rollup"
"github.com/ethereum-optimism/optimism/l2geth/rollup/pub"
"github.com/ethereum-optimism/optimism/l2geth/rpc" "github.com/ethereum-optimism/optimism/l2geth/rpc"
whisper "github.com/ethereum-optimism/optimism/l2geth/whisper/whisperv6" whisper "github.com/ethereum-optimism/optimism/l2geth/whisper/whisperv6"
pcsclite "github.com/gballet/go-libpcsclite" pcsclite "github.com/gballet/go-libpcsclite"
...@@ -866,6 +867,21 @@ var ( ...@@ -866,6 +867,21 @@ var (
Usage: "HTTP endpoint for the sequencer client", Usage: "HTTP endpoint for the sequencer client",
EnvVar: "SEQUENCER_CLIENT_HTTP", EnvVar: "SEQUENCER_CLIENT_HTTP",
} }
TxPublisherProjectIDFlag = cli.StringFlag{
Name: "txpublisher.projectid",
Usage: "GCP Project ID for the tx PubSub",
EnvVar: "TX_PUBLISHER_PROJECT_ID",
}
TxPublisherTopicIDFlag = cli.StringFlag{
Name: "txpublisher.topicid",
Usage: "Topic ID used for PubSub",
EnvVar: "TX_PUBLISHER_TOPIC_ID",
}
TxPublisherTimeoutFlag = cli.DurationFlag{
Name: "txpublisher.timeout",
Usage: "Transaction publishing timeout",
EnvVar: "TX_PUBLISHER_TIMEOUT",
}
) )
// MakeDataDir retrieves the currently requested data directory, terminating // MakeDataDir retrieves the currently requested data directory, terminating
...@@ -1147,6 +1163,20 @@ func setRollup(ctx *cli.Context, cfg *rollup.Config) { ...@@ -1147,6 +1163,20 @@ func setRollup(ctx *cli.Context, cfg *rollup.Config) {
} }
} }
// UsingOVM
// setTxPublisher configures the transaction logger
func setTxPublisher(ctx *cli.Context, cfg *pub.Config) {
if ctx.GlobalIsSet(TxPublisherProjectIDFlag.Name) {
cfg.ProjectID = ctx.GlobalString(TxPublisherProjectIDFlag.Name)
}
if ctx.GlobalIsSet(TxPublisherTopicIDFlag.Name) {
cfg.TopicID = ctx.GlobalString(TxPublisherTopicIDFlag.Name)
}
if ctx.GlobalIsSet(TxPublisherTimeoutFlag.Name) {
cfg.Timeout = ctx.GlobalDuration(TxPublisherTimeoutFlag.Name)
}
}
// setLes configures the les server and ultra light client settings from the command line flags. // setLes configures the les server and ultra light client settings from the command line flags.
func setLes(ctx *cli.Context, cfg *eth.Config) { func setLes(ctx *cli.Context, cfg *eth.Config) {
if ctx.GlobalIsSet(LightLegacyServFlag.Name) { if ctx.GlobalIsSet(LightLegacyServFlag.Name) {
...@@ -1606,6 +1636,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { ...@@ -1606,6 +1636,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
setLes(ctx, cfg) setLes(ctx, cfg)
setEth1(ctx, &cfg.Rollup) setEth1(ctx, &cfg.Rollup)
setRollup(ctx, &cfg.Rollup) setRollup(ctx, &cfg.Rollup)
setTxPublisher(ctx, &cfg.TxPublisher)
if ctx.GlobalIsSet(SyncModeFlag.Name) { if ctx.GlobalIsSet(SyncModeFlag.Name) {
cfg.SyncMode = *GlobalTextMarshaler(ctx, SyncModeFlag.Name).(*downloader.SyncMode) cfg.SyncMode = *GlobalTextMarshaler(ctx, SyncModeFlag.Name).(*downloader.SyncMode)
......
...@@ -52,6 +52,7 @@ import ( ...@@ -52,6 +52,7 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/params" "github.com/ethereum-optimism/optimism/l2geth/params"
"github.com/ethereum-optimism/optimism/l2geth/rlp" "github.com/ethereum-optimism/optimism/l2geth/rlp"
"github.com/ethereum-optimism/optimism/l2geth/rollup" "github.com/ethereum-optimism/optimism/l2geth/rollup"
"github.com/ethereum-optimism/optimism/l2geth/rollup/pub"
"github.com/ethereum-optimism/optimism/l2geth/rollup/rcfg" "github.com/ethereum-optimism/optimism/l2geth/rollup/rcfg"
"github.com/ethereum-optimism/optimism/l2geth/rpc" "github.com/ethereum-optimism/optimism/l2geth/rpc"
) )
...@@ -206,7 +207,16 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { ...@@ -206,7 +207,16 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
} }
eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain) eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
eth.syncService, err = rollup.NewSyncService(context.Background(), config.Rollup, eth.txPool, eth.blockchain, eth.chainDb) var txLogger pub.Publisher = &pub.NoopPublisher{}
if config.TxPublisher.ProjectID != "" {
var err error
txLogger, err = pub.NewGooglePublisher(context.Background(), config.TxPublisher)
if err != nil {
return nil, err
}
}
eth.syncService, err = rollup.NewSyncService(context.Background(), config.Rollup, eth.txPool, eth.blockchain, eth.chainDb, txLogger)
if err != nil { if err != nil {
return nil, fmt.Errorf("Cannot initialize syncservice: %w", err) return nil, fmt.Errorf("Cannot initialize syncservice: %w", err)
} }
......
...@@ -32,6 +32,7 @@ import ( ...@@ -32,6 +32,7 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/miner" "github.com/ethereum-optimism/optimism/l2geth/miner"
"github.com/ethereum-optimism/optimism/l2geth/params" "github.com/ethereum-optimism/optimism/l2geth/params"
"github.com/ethereum-optimism/optimism/l2geth/rollup" "github.com/ethereum-optimism/optimism/l2geth/rollup"
"github.com/ethereum-optimism/optimism/l2geth/rollup/pub"
) )
// DefaultConfig contains default settings for use on the Ethereum main net. // DefaultConfig contains default settings for use on the Ethereum main net.
...@@ -183,4 +184,6 @@ type Config struct { ...@@ -183,4 +184,6 @@ type Config struct {
// Optimism Rollup Config // Optimism Rollup Config
Rollup rollup.Config Rollup rollup.Config
TxPublisher pub.Config
} }
...@@ -3,6 +3,7 @@ module github.com/ethereum-optimism/optimism/l2geth ...@@ -3,6 +3,7 @@ module github.com/ethereum-optimism/optimism/l2geth
go 1.15 go 1.15
require ( require (
cloud.google.com/go/pubsub v1.18.0
github.com/Azure/azure-storage-blob-go v0.7.0 github.com/Azure/azure-storage-blob-go v0.7.0
github.com/VictoriaMetrics/fastcache v1.6.0 github.com/VictoriaMetrics/fastcache v1.6.0
github.com/aristanetworks/goarista v0.0.0-20170210015632-ea17b1a17847 github.com/aristanetworks/goarista v0.0.0-20170210015632-ea17b1a17847
...@@ -21,7 +22,7 @@ require ( ...@@ -21,7 +22,7 @@ require (
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff
github.com/go-resty/resty/v2 v2.4.0 github.com/go-resty/resty/v2 v2.4.0
github.com/go-stack/stack v1.8.0 github.com/go-stack/stack v1.8.0
github.com/golang/protobuf v1.4.3 github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4 github.com/golang/snappy v0.0.4
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/graph-gophers/graphql-go v0.0.0-20201113091052-beb923fada29 github.com/graph-gophers/graphql-go v0.0.0-20201113091052-beb923fada29
...@@ -51,9 +52,9 @@ require ( ...@@ -51,9 +52,9 @@ require (
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27
golang.org/x/text v0.3.6 golang.org/x/text v0.3.6
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200619000410-60c24ae608a6 gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200619000410-60c24ae608a6
gopkg.in/sourcemap.v1 v1.0.5 // indirect gopkg.in/sourcemap.v1 v1.0.5 // indirect
......
This diff is collapsed.
package rollup
import "github.com/ethereum-optimism/optimism/l2geth/metrics"
var (
pubTxDropCounter = metrics.NewRegisteredCounter("rollup/pub/txdrops", nil)
)
package pub
import (
"context"
"time"
"cloud.google.com/go/pubsub"
"github.com/ethereum-optimism/optimism/l2geth/log"
)
const messageOrderingKey = "o"
type Config struct {
ProjectID string
TopicID string
Timeout time.Duration
}
type GooglePublisher struct {
client *pubsub.Client
topic *pubsub.Topic
publishSettings pubsub.PublishSettings
Timeout time.Duration
}
func NewGooglePublisher(ctx context.Context, config Config) (*GooglePublisher, error) {
client, err := pubsub.NewClient(ctx, config.ProjectID)
if err != nil {
return nil, err
}
topic := client.Topic(config.TopicID)
topic.EnableMessageOrdering = true
// Publish messages immediately
publishSettings := pubsub.PublishSettings{
DelayThreshold: 0,
CountThreshold: 0,
}
timeout := config.Timeout
if timeout == 0 {
log.Info("Sanitizing publisher timeout to 2 seconds")
timeout = time.Second * 2
}
return &GooglePublisher{client, topic, publishSettings, timeout}, nil
}
func (p *GooglePublisher) Publish(ctx context.Context, msg []byte) error {
ctx, cancel := context.WithTimeout(ctx, p.Timeout)
defer cancel()
pmsg := pubsub.Message{
Data: msg,
OrderingKey: messageOrderingKey,
}
result := p.topic.Publish(ctx, &pmsg)
_, err := result.Get(ctx)
return err
}
package pub
import "context"
type Publisher interface {
// Publish schedules an ordereed message to be sent
Publish(ctx context.Context, msg []byte) error
}
type NoopPublisher struct{}
func (p *NoopPublisher) Publish(ctx context.Context, msg []byte) error {
return nil
}
package rollup package rollup
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
...@@ -22,6 +23,7 @@ import ( ...@@ -22,6 +23,7 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/eth/gasprice" "github.com/ethereum-optimism/optimism/l2geth/eth/gasprice"
"github.com/ethereum-optimism/optimism/l2geth/rollup/fees" "github.com/ethereum-optimism/optimism/l2geth/rollup/fees"
"github.com/ethereum-optimism/optimism/l2geth/rollup/pub"
"github.com/ethereum-optimism/optimism/l2geth/rollup/rcfg" "github.com/ethereum-optimism/optimism/l2geth/rollup/rcfg"
) )
...@@ -68,10 +70,11 @@ type SyncService struct { ...@@ -68,10 +70,11 @@ type SyncService struct {
signer types.Signer signer types.Signer
feeThresholdUp *big.Float feeThresholdUp *big.Float
feeThresholdDown *big.Float feeThresholdDown *big.Float
txLogger pub.Publisher
} }
// NewSyncService returns an initialized sync service // NewSyncService returns an initialized sync service
func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *core.BlockChain, db ethdb.Database) (*SyncService, error) { func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *core.BlockChain, db ethdb.Database, txLogger pub.Publisher) (*SyncService, error) {
if bc == nil { if bc == nil {
return nil, errors.New("Must pass BlockChain to SyncService") return nil, errors.New("Must pass BlockChain to SyncService")
} }
...@@ -143,6 +146,7 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co ...@@ -143,6 +146,7 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co
signer: types.NewEIP155Signer(chainID), signer: types.NewEIP155Signer(chainID),
feeThresholdDown: cfg.FeeThresholdDown, feeThresholdDown: cfg.FeeThresholdDown,
feeThresholdUp: cfg.FeeThresholdUp, feeThresholdUp: cfg.FeeThresholdUp,
txLogger: txLogger,
} }
// The chainHeadSub is used to synchronize the SyncService with the chain. // The chainHeadSub is used to synchronize the SyncService with the chain.
...@@ -872,6 +876,17 @@ func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error { ...@@ -872,6 +876,17 @@ func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error {
// The index was set above so it is safe to dereference // The index was set above so it is safe to dereference
log.Debug("Applying transaction to tip", "index", *tx.GetMeta().Index, "hash", tx.Hash().Hex(), "origin", tx.QueueOrigin().String()) log.Debug("Applying transaction to tip", "index", *tx.GetMeta().Index, "hash", tx.Hash().Hex(), "origin", tx.QueueOrigin().String())
// Log transaction to the failover log
encodedTx := new(bytes.Buffer)
if err := tx.EncodeRLP(encodedTx); err != nil {
return err
}
if err := s.txLogger.Publish(s.ctx, encodedTx.Bytes()); err != nil {
pubTxDropCounter.Inc(1)
log.Error("Failed to publish transaction to log", "msg", err)
return fmt.Errorf("internal error: transaction logging failed")
}
txs := types.Transactions{tx} txs := types.Transactions{tx}
errCh := make(chan error, 1) errCh := make(chan error, 1)
s.txFeed.Send(core.NewTxsEvent{ s.txFeed.Send(core.NewTxsEvent{
......
package rollup package rollup
import ( import (
"bytes"
"context" "context"
"crypto/rand" "crypto/rand"
"errors" "errors"
...@@ -22,13 +23,15 @@ import ( ...@@ -22,13 +23,15 @@ import (
"github.com/ethereum-optimism/optimism/l2geth/ethdb" "github.com/ethereum-optimism/optimism/l2geth/ethdb"
"github.com/ethereum-optimism/optimism/l2geth/event" "github.com/ethereum-optimism/optimism/l2geth/event"
"github.com/ethereum-optimism/optimism/l2geth/params" "github.com/ethereum-optimism/optimism/l2geth/params"
"github.com/ethereum-optimism/optimism/l2geth/rlp"
"github.com/ethereum-optimism/optimism/l2geth/rollup/pub"
"github.com/ethereum-optimism/optimism/l2geth/rollup/rcfg" "github.com/ethereum-optimism/optimism/l2geth/rollup/rcfg"
) )
// Test that the timestamps are updated correctly. // Test that the timestamps are updated correctly.
// This impacts execution, for `block.timestamp` // This impacts execution, for `block.timestamp`
func TestSyncServiceTimestampUpdate(t *testing.T) { func TestSyncServiceTimestampUpdate(t *testing.T) {
service, txCh, _, err := newTestSyncService(false, nil) service, txCh, _, err := newTestSyncService(false, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -111,7 +114,7 @@ func TestSyncServiceTimestampUpdate(t *testing.T) { ...@@ -111,7 +114,7 @@ func TestSyncServiceTimestampUpdate(t *testing.T) {
// Test that the L1 blocknumber is updated correctly // Test that the L1 blocknumber is updated correctly
func TestSyncServiceL1BlockNumberUpdate(t *testing.T) { func TestSyncServiceL1BlockNumberUpdate(t *testing.T) {
service, txCh, _, err := newTestSyncService(false, nil) service, txCh, _, err := newTestSyncService(false, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -179,7 +182,7 @@ func TestSyncServiceL1BlockNumberUpdate(t *testing.T) { ...@@ -179,7 +182,7 @@ func TestSyncServiceL1BlockNumberUpdate(t *testing.T) {
// after the transaction enqueued event is emitted. Set `false` as // after the transaction enqueued event is emitted. Set `false` as
// the argument to start as a sequencer // the argument to start as a sequencer
func TestSyncServiceTransactionEnqueued(t *testing.T) { func TestSyncServiceTransactionEnqueued(t *testing.T) {
service, txCh, _, err := newTestSyncService(false, nil) service, txCh, _, err := newTestSyncService(false, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -241,7 +244,7 @@ func TestSyncServiceTransactionEnqueued(t *testing.T) { ...@@ -241,7 +244,7 @@ func TestSyncServiceTransactionEnqueued(t *testing.T) {
} }
func TestTransactionToTipNoIndex(t *testing.T) { func TestTransactionToTipNoIndex(t *testing.T) {
service, txCh, _, err := newTestSyncService(false, nil) service, txCh, _, err := newTestSyncService(false, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -292,7 +295,7 @@ func TestTransactionToTipNoIndex(t *testing.T) { ...@@ -292,7 +295,7 @@ func TestTransactionToTipNoIndex(t *testing.T) {
} }
func TestTransactionToTipTimestamps(t *testing.T) { func TestTransactionToTipTimestamps(t *testing.T) {
service, txCh, _, err := newTestSyncService(false, nil) service, txCh, _, err := newTestSyncService(false, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -368,7 +371,7 @@ func TestTransactionToTipTimestamps(t *testing.T) { ...@@ -368,7 +371,7 @@ func TestTransactionToTipTimestamps(t *testing.T) {
} }
func TestApplyIndexedTransaction(t *testing.T) { func TestApplyIndexedTransaction(t *testing.T) {
service, txCh, _, err := newTestSyncService(true, nil) service, txCh, _, err := newTestSyncService(true, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -411,7 +414,7 @@ func TestApplyIndexedTransaction(t *testing.T) { ...@@ -411,7 +414,7 @@ func TestApplyIndexedTransaction(t *testing.T) {
} }
func TestApplyBatchedTransaction(t *testing.T) { func TestApplyBatchedTransaction(t *testing.T) {
service, txCh, _, err := newTestSyncService(true, nil) service, txCh, _, err := newTestSyncService(true, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -448,7 +451,7 @@ func TestApplyBatchedTransaction(t *testing.T) { ...@@ -448,7 +451,7 @@ func TestApplyBatchedTransaction(t *testing.T) {
} }
func TestIsAtTip(t *testing.T) { func TestIsAtTip(t *testing.T) {
service, _, _, err := newTestSyncService(true, nil) service, _, _, err := newTestSyncService(true, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -521,7 +524,7 @@ func TestIsAtTip(t *testing.T) { ...@@ -521,7 +524,7 @@ func TestIsAtTip(t *testing.T) {
} }
func TestSyncQueue(t *testing.T) { func TestSyncQueue(t *testing.T) {
service, txCh, _, err := newTestSyncService(true, nil) service, txCh, _, err := newTestSyncService(true, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -578,7 +581,7 @@ func TestSyncQueue(t *testing.T) { ...@@ -578,7 +581,7 @@ func TestSyncQueue(t *testing.T) {
} }
func TestSyncServiceL1GasPrice(t *testing.T) { func TestSyncServiceL1GasPrice(t *testing.T) {
service, _, _, err := newTestSyncService(true, nil) service, _, _, err := newTestSyncService(true, nil, nil)
setupMockClient(service, map[string]interface{}{}) setupMockClient(service, map[string]interface{}{})
if err != nil { if err != nil {
...@@ -616,7 +619,7 @@ func TestSyncServiceL1GasPrice(t *testing.T) { ...@@ -616,7 +619,7 @@ func TestSyncServiceL1GasPrice(t *testing.T) {
} }
func TestSyncServiceL2GasPrice(t *testing.T) { func TestSyncServiceL2GasPrice(t *testing.T) {
service, _, _, err := newTestSyncService(true, nil) service, _, _, err := newTestSyncService(true, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -651,7 +654,7 @@ func TestSyncServiceL2GasPrice(t *testing.T) { ...@@ -651,7 +654,7 @@ func TestSyncServiceL2GasPrice(t *testing.T) {
} }
func TestSyncServiceGasPriceOracleOwnerAddress(t *testing.T) { func TestSyncServiceGasPriceOracleOwnerAddress(t *testing.T) {
service, _, _, err := newTestSyncService(true, nil) service, _, _, err := newTestSyncService(true, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -685,7 +688,7 @@ func TestSyncServiceGasPriceOracleOwnerAddress(t *testing.T) { ...@@ -685,7 +688,7 @@ func TestSyncServiceGasPriceOracleOwnerAddress(t *testing.T) {
// Only the gas price oracle owner can send 0 gas price txs // Only the gas price oracle owner can send 0 gas price txs
// when fees are enforced // when fees are enforced
func TestFeeGasPriceOracleOwnerTransactions(t *testing.T) { func TestFeeGasPriceOracleOwnerTransactions(t *testing.T) {
service, _, _, err := newTestSyncService(true, nil) service, _, _, err := newTestSyncService(true, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -741,7 +744,7 @@ func TestFeeGasPriceOracleOwnerTransactions(t *testing.T) { ...@@ -741,7 +744,7 @@ func TestFeeGasPriceOracleOwnerTransactions(t *testing.T) {
// Pass true to set as a verifier // Pass true to set as a verifier
func TestSyncServiceSync(t *testing.T) { func TestSyncServiceSync(t *testing.T) {
service, txCh, sub, err := newTestSyncService(true, nil) service, txCh, sub, err := newTestSyncService(true, nil, nil)
defer sub.Unsubscribe() defer sub.Unsubscribe()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -793,7 +796,7 @@ func TestSyncServiceSync(t *testing.T) { ...@@ -793,7 +796,7 @@ func TestSyncServiceSync(t *testing.T) {
} }
func TestInitializeL1ContextPostGenesis(t *testing.T) { func TestInitializeL1ContextPostGenesis(t *testing.T) {
service, _, _, err := newTestSyncService(true, nil) service, _, _, err := newTestSyncService(true, nil, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -896,7 +899,7 @@ func TestBadFeeThresholds(t *testing.T) { ...@@ -896,7 +899,7 @@ func TestBadFeeThresholds(t *testing.T) {
cfg.FeeThresholdDown = tt.thresholdDown cfg.FeeThresholdDown = tt.thresholdDown
cfg.FeeThresholdUp = tt.thresholdUp cfg.FeeThresholdUp = tt.thresholdUp
_, err := NewSyncService(context.Background(), cfg, txPool, chain, db) _, err := NewSyncService(context.Background(), cfg, txPool, chain, db, &pub.NoopPublisher{})
if !errors.Is(err, tt.err) { if !errors.Is(err, tt.err) {
t.Fatalf("%s: %s", name, err) t.Fatalf("%s: %s", name, err)
} }
...@@ -904,6 +907,69 @@ func TestBadFeeThresholds(t *testing.T) { ...@@ -904,6 +907,69 @@ func TestBadFeeThresholds(t *testing.T) {
} }
} }
// Ensure that Transaction Logging preceeds transaction apply
func TestSyncServiceTransactionLog(t *testing.T) {
txLogger := newMockPublisher()
service, txCh, _, err := newTestSyncService(false, nil, txLogger)
if err != nil {
t.Fatal(err)
}
tx := mockTx()
go func() {
err = service.applyTransactionToTip(tx)
}()
txLogger.waitForPublish()
select {
case <-txCh:
t.Fatal("transaction applied before being logged")
default:
}
txLogger.unblockPublish()
<-txCh
var loggedTx *types.Transaction
buf := <-txLogger.msgs
if err := rlp.DecodeBytes(buf, &loggedTx); err != nil {
t.Fatalf("unable to decode logged transaction: %v", err)
}
txJSON, _ := tx.MarshalJSON()
loggedTxJSON, _ := loggedTx.MarshalJSON()
if !bytes.Equal(txJSON, loggedTxJSON) {
t.Fatal("mismatched logged transactions")
}
}
func TestSyncServiceTransactionLogFailed(t *testing.T) {
txLogger := &failingMockPublisher{}
service, txCh, _, err := newTestSyncService(false, nil, txLogger)
if err != nil {
t.Fatal(err)
}
tx := mockTx()
errCh := make(chan error)
go func() {
err = service.applyTransactionToTip(tx)
errCh <- err
close(errCh)
}()
if err := <-errCh; err == nil {
t.Fatal("transaction applied with log failure")
}
select {
case <-txCh:
t.Fatal("transaction applied after log failed")
default:
}
}
func newTestSyncServiceDeps(isVerifier bool, alloc *common.Address) (Config, *core.TxPool, *core.BlockChain, ethdb.Database, error) { func newTestSyncServiceDeps(isVerifier bool, alloc *common.Address) (Config, *core.TxPool, *core.BlockChain, ethdb.Database, error) {
chainCfg := params.AllEthashProtocolChanges chainCfg := params.AllEthashProtocolChanges
chainID := big.NewInt(420) chainID := big.NewInt(420)
...@@ -937,12 +1003,15 @@ func newTestSyncServiceDeps(isVerifier bool, alloc *common.Address) (Config, *co ...@@ -937,12 +1003,15 @@ func newTestSyncServiceDeps(isVerifier bool, alloc *common.Address) (Config, *co
return cfg, txPool, chain, db, nil return cfg, txPool, chain, db, nil
} }
func newTestSyncService(isVerifier bool, alloc *common.Address) (*SyncService, chan core.NewTxsEvent, event.Subscription, error) { func newTestSyncService(isVerifier bool, alloc *common.Address, txLogger pub.Publisher) (*SyncService, chan core.NewTxsEvent, event.Subscription, error) {
cfg, txPool, chain, db, err := newTestSyncServiceDeps(isVerifier, alloc) cfg, txPool, chain, db, err := newTestSyncServiceDeps(isVerifier, alloc)
if err != nil { if err != nil {
return nil, nil, nil, fmt.Errorf("Cannot initialize syncservice: %w", err) return nil, nil, nil, fmt.Errorf("Cannot initialize syncservice: %w", err)
} }
service, err := NewSyncService(context.Background(), cfg, txPool, chain, db) if txLogger == nil {
txLogger = &pub.NoopPublisher{}
}
service, err := NewSyncService(context.Background(), cfg, txPool, chain, db, txLogger)
if err != nil { if err != nil {
return nil, nil, nil, fmt.Errorf("Cannot initialize syncservice: %w", err) return nil, nil, nil, fmt.Errorf("Cannot initialize syncservice: %w", err)
} }
...@@ -954,6 +1023,44 @@ func newTestSyncService(isVerifier bool, alloc *common.Address) (*SyncService, c ...@@ -954,6 +1023,44 @@ func newTestSyncService(isVerifier bool, alloc *common.Address) (*SyncService, c
return service, txCh, sub, nil return service, txCh, sub, nil
} }
type mockPublisher struct {
wg1 sync.WaitGroup
wg2 sync.WaitGroup
msgs chan ([]byte)
}
func newMockPublisher() *mockPublisher {
p := mockPublisher{
msgs: make(chan []byte, 1024),
}
p.wg1.Add(1)
p.wg2.Add(1)
return &p
}
// unblockPublish allows a call to Publish to proceed
func (p *mockPublisher) unblockPublish() {
p.wg1.Done()
}
// waitForPublish blocks until a call is made to Publish
func (p *mockPublisher) waitForPublish() {
p.wg2.Wait()
}
func (p *mockPublisher) Publish(ctx context.Context, msg []byte) error {
p.wg2.Done()
p.wg1.Wait()
p.msgs <- msg
return nil
}
type failingMockPublisher struct{}
func (p *failingMockPublisher) Publish(ctx context.Context, msg []byte) error {
return fmt.Errorf("publish failed")
}
type mockClient struct { type mockClient struct {
getEnqueueCallCount int getEnqueueCallCount int
getEnqueue []*types.Transaction getEnqueue []*types.Transaction
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment