batch_submitter.go 6.36 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
package batchsubmitter

import (
	"context"
	"crypto/ecdsa"
	"fmt"
	"net/http"
	"os"
	"strconv"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethclient"
	"github.com/ethereum/go-ethereum/log"
	"github.com/getsentry/sentry-go"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/urfave/cli"
)

const (
	// defaultDialTimeout is default duration the service will wait on
	// startup to make a connection to either the L1 or L2 backends.
	defaultDialTimeout = 5 * time.Second
)

// Main is the entrypoint into the batch submitter service. This method returns
// a closure that executes the service and blocks until the service exits. The
// use of a closure allows the parameters bound to the top-level main package,
// e.g. GitVersion, to be captured and used once the function is executed.
func Main(gitVersion string) func(ctx *cli.Context) error {
	return func(ctx *cli.Context) error {
		cfg, err := NewConfig(ctx)
		if err != nil {
			return err
		}

		// The call to defer is done here so that any errors logged from
		// this point on are posted to Sentry before exiting.
		if cfg.SentryEnable {
			defer sentry.Flush(2 * time.Second)
		}

		_, err = NewBatchSubmitter(cfg, gitVersion)
		if err != nil {
			log.Error("Unable to create batch submitter", "error", err)
			return err
		}

		return nil
	}
}

// BatchSubmitter is a service that configures the necessary resources for
// running the TxBatchSubmitter and StateBatchSubmitter sub-services.
type BatchSubmitter struct {
	ctx              context.Context
	cfg              Config
	l1Client         *ethclient.Client
	l2Client         *ethclient.Client
	sequencerPrivKey *ecdsa.PrivateKey
	proposerPrivKey  *ecdsa.PrivateKey
	ctcAddress       common.Address
	sccAddress       common.Address
}

// NewBatchSubmitter initializes the BatchSubmitter, gathering any resources
// that will be needed by the TxBatchSubmitter and StateBatchSubmitter
// sub-services.
func NewBatchSubmitter(cfg Config, gitVersion string) (*BatchSubmitter, error) {
	ctx := context.Background()

	// Set up our logging. If Sentry is enabled, we will use our custom
	// log handler that logs to stdout and forwards any error messages to
	// Sentry for collection. Otherwise, logs will only be posted to stdout.
	var logHandler log.Handler
	if cfg.SentryEnable {
		err := sentry.Init(sentry.ClientOptions{
			Dsn:              cfg.SentryDsn,
			Environment:      cfg.EthNetworkName,
			Release:          "batch-submitter@" + gitVersion,
			TracesSampleRate: traceRateToFloat64(cfg.SentryTraceRate),
			Debug:            false,
		})
		if err != nil {
			return nil, err
		}

		logHandler = SentryStreamHandler(os.Stdout, log.TerminalFormat(true))
	} else {
		logHandler = log.StreamHandler(os.Stdout, log.TerminalFormat(true))
	}

	logLevel, err := log.LvlFromString(cfg.LogLevel)
	if err != nil {
		return nil, err
	}

	log.Root().SetHandler(log.LvlFilterHandler(logLevel, logHandler))

	log.Info("Config", "config", fmt.Sprintf("%#v", cfg))

	// Parse sequencer private key and CTC contract address.
	sequencerPrivKey, ctcAddress, err := parseWalletPrivKeyAndContractAddr(
		"Sequencer", cfg.Mnemonic, cfg.SequencerHDPath,
		cfg.SequencerPrivateKey, cfg.CTCAddress,
	)
	if err != nil {
		return nil, err
	}

	// Parse proposer private key and SCC contract address.
	proposerPrivKey, sccAddress, err := parseWalletPrivKeyAndContractAddr(
		"Proposer", cfg.Mnemonic, cfg.ProposerHDPath,
		cfg.ProposerPrivateKey, cfg.SCCAddress,
	)
	if err != nil {
		return nil, err
	}

	// Connect to L1 and L2 providers. Perform these lastsince they are the
	// most expensive.
	l1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc)
	if err != nil {
		return nil, err
	}

	l2Client, err := dialEthClientWithTimeout(ctx, cfg.L2EthRpc)
	if err != nil {
		return nil, err
	}

	if cfg.MetricsServerEnable {
		go runMetricsServer(cfg.MetricsHostname, cfg.MetricsPort)
	}

	return &BatchSubmitter{
		ctx:              ctx,
		cfg:              cfg,
		l1Client:         l1Client,
		l2Client:         l2Client,
		sequencerPrivKey: sequencerPrivKey,
		proposerPrivKey:  proposerPrivKey,
		ctcAddress:       ctcAddress,
		sccAddress:       sccAddress,
	}, nil
}

// parseWalletPrivKeyAndContractAddr returns the wallet private key to use for
// sending transactions as well as the contract address to send to for a
// particular sub-service.
func parseWalletPrivKeyAndContractAddr(
	name string,
	mnemonic string,
	hdPath string,
	privKeyStr string,
	contractAddrStr string,
) (*ecdsa.PrivateKey, common.Address, error) {

	// Parse wallet private key from either privkey string or BIP39 mnemonic
	// and BIP32 HD derivation path.
	privKey, err := GetConfiguredPrivateKey(mnemonic, hdPath, privKeyStr)
	if err != nil {
		return nil, common.Address{}, err
	}

	// Parse the target contract address the wallet will send to.
	contractAddress, err := ParseAddress(contractAddrStr)
	if err != nil {
		return nil, common.Address{}, err
	}

	// Log wallet address rather than private key...
	walletAddress := crypto.PubkeyToAddress(privKey.PublicKey)

	log.Info(name+" wallet params parsed successfully", "wallet_address",
		walletAddress, "contract_address", contractAddress)

	return privKey, contractAddress, nil
}

// runMetricsServer spins up a prometheus metrics server at the provided
// hostname and port.
//
// NOTE: This method MUST be run as a goroutine.
func runMetricsServer(hostname string, port uint64) {
	metricsPortStr := strconv.FormatUint(port, 10)
	metricsAddr := fmt.Sprintf("%s: %s", hostname, metricsPortStr)

	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(metricsAddr, nil)
}

// dialEthClientWithTimeout attempts to dial the L1 or L2 provider using the
// provided URL. If the dial doesn't complete within defaultDialTimeout seconds,
// this method will return an error.
func dialEthClientWithTimeout(ctx context.Context, url string) (
	*ethclient.Client, error) {

	ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
	defer cancel()

	return ethclient.DialContext(ctxt, url)
}

// traceRateToFloat64 converts a time.Duration into a valid float64 for the
// Sentry client. The client only accepts values between 0.0 and 1.0, so this
// method clamps anything greater than 1 second to 1.0.
func traceRateToFloat64(rate time.Duration) float64 {
	rate64 := float64(rate) / float64(time.Second)
	if rate64 > 1.0 {
		rate64 = 1.0
	}
	return rate64
}