Commit 9f05c877 authored by inphi's avatar inphi

add txpublisher.enable flag

parent a11c4c4f
...@@ -164,6 +164,7 @@ var ( ...@@ -164,6 +164,7 @@ var (
utils.RollupFeeThresholdDownFlag, utils.RollupFeeThresholdDownFlag,
utils.RollupFeeThresholdUpFlag, utils.RollupFeeThresholdUpFlag,
utils.SequencerClientHttpFlag, utils.SequencerClientHttpFlag,
utils.TxPublisherEnableFlag,
utils.TxPublisherProjectIDFlag, utils.TxPublisherProjectIDFlag,
utils.TxPublisherTopicIDFlag, utils.TxPublisherTopicIDFlag,
utils.TxPublisherTimeoutFlag, utils.TxPublisherTimeoutFlag,
......
...@@ -78,6 +78,7 @@ var AppHelpFlagGroups = []flagGroup{ ...@@ -78,6 +78,7 @@ var AppHelpFlagGroups = []flagGroup{
utils.RollupFeeThresholdDownFlag, utils.RollupFeeThresholdDownFlag,
utils.RollupFeeThresholdUpFlag, utils.RollupFeeThresholdUpFlag,
utils.SequencerClientHttpFlag, utils.SequencerClientHttpFlag,
utils.TxPublisherEnableFlag,
utils.TxPublisherProjectIDFlag, utils.TxPublisherProjectIDFlag,
utils.TxPublisherTopicIDFlag, utils.TxPublisherTopicIDFlag,
utils.TxPublisherTimeoutFlag, utils.TxPublisherTimeoutFlag,
......
...@@ -867,6 +867,11 @@ var ( ...@@ -867,6 +867,11 @@ var (
Usage: "HTTP endpoint for the sequencer client", Usage: "HTTP endpoint for the sequencer client",
EnvVar: "SEQUENCER_CLIENT_HTTP", EnvVar: "SEQUENCER_CLIENT_HTTP",
} }
TxPublisherEnableFlag = cli.BoolFlag{
Name: "txpublisher.enable",
Usage: "Enable Transaction logging to PubSub",
EnvVar: "TX_PUBLISHER_ENABLE",
}
TxPublisherProjectIDFlag = cli.StringFlag{ TxPublisherProjectIDFlag = cli.StringFlag{
Name: "txpublisher.projectid", Name: "txpublisher.projectid",
Usage: "GCP Project ID for the tx PubSub", Usage: "GCP Project ID for the tx PubSub",
...@@ -1166,6 +1171,9 @@ func setRollup(ctx *cli.Context, cfg *rollup.Config) { ...@@ -1166,6 +1171,9 @@ func setRollup(ctx *cli.Context, cfg *rollup.Config) {
// UsingOVM // UsingOVM
// setTxPublisher configures the transaction logger // setTxPublisher configures the transaction logger
func setTxPublisher(ctx *cli.Context, cfg *pub.Config) { func setTxPublisher(ctx *cli.Context, cfg *pub.Config) {
if ctx.GlobalIsSet(TxPublisherEnableFlag.Name) {
cfg.Enable = ctx.GlobalBool(TxPublisherEnableFlag.Name)
}
if ctx.GlobalIsSet(TxPublisherProjectIDFlag.Name) { if ctx.GlobalIsSet(TxPublisherProjectIDFlag.Name) {
cfg.ProjectID = ctx.GlobalString(TxPublisherProjectIDFlag.Name) cfg.ProjectID = ctx.GlobalString(TxPublisherProjectIDFlag.Name)
} }
......
...@@ -207,13 +207,10 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { ...@@ -207,13 +207,10 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
} }
eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain) eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
var txLogger pub.Publisher = &pub.NoopPublisher{} var txLogger pub.Publisher
if config.TxPublisher.ProjectID != "" { txLogger, err = pub.NewGooglePublisher(context.Background(), config.TxPublisher)
var err error if err != nil {
txLogger, err = pub.NewGooglePublisher(context.Background(), config.TxPublisher) return nil, err
if err != nil {
return nil, err
}
} }
eth.syncService, err = rollup.NewSyncService(context.Background(), config.Rollup, eth.txPool, eth.blockchain, eth.chainDb, txLogger) eth.syncService, err = rollup.NewSyncService(context.Background(), config.Rollup, eth.txPool, eth.blockchain, eth.chainDb, txLogger)
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
const messageOrderingKey = "o" const messageOrderingKey = "o"
type Config struct { type Config struct {
Enable bool
ProjectID string ProjectID string
TopicID string TopicID string
Timeout time.Duration Timeout time.Duration
...@@ -25,7 +26,11 @@ type GooglePublisher struct { ...@@ -25,7 +26,11 @@ type GooglePublisher struct {
mutex sync.Mutex mutex sync.Mutex
} }
func NewGooglePublisher(ctx context.Context, config Config) (*GooglePublisher, error) { func NewGooglePublisher(ctx context.Context, config Config) (Publisher, error) {
if !config.Enable {
return &NoopPublisher{}, nil
}
client, err := pubsub.NewClient(ctx, config.ProjectID) client, err := pubsub.NewClient(ctx, config.ProjectID)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -43,6 +48,8 @@ func NewGooglePublisher(ctx context.Context, config Config) (*GooglePublisher, e ...@@ -43,6 +48,8 @@ func NewGooglePublisher(ctx context.Context, config Config) (*GooglePublisher, e
log.Info("Sanitizing publisher timeout to 2 seconds") log.Info("Sanitizing publisher timeout to 2 seconds")
timeout = time.Second * 2 timeout = time.Second * 2
} }
log.Info("Initialized transaction log to PubSub", "topic", config.TopicID)
return &GooglePublisher{ return &GooglePublisher{
client: client, client: client,
topic: topic, topic: topic,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment