Commit 6d5ffa21 authored by mergify[bot], committed by GitHub

Merge branch 'develop' into fix/min-gaslimit-deposit

parents ad0b4d12 f9928f1e
......@@ -364,6 +364,13 @@ jobs:
environment:
FOUNDRY_PROFILE: ci
working_directory: packages/contracts-bedrock
- run:
name: validate deploy configs
command: |
yarn validate-deploy-configs || echo "export DEPLOY_CONFIG_STATUS=1" >> "$BASH_ENV"
environment:
FOUNDRY_PROFILE: ci
working_directory: packages/contracts-bedrock
- run:
name: storage snapshot
command: |
......@@ -387,6 +394,10 @@ jobs:
FAILED=1
echo "Gas snapshot failed, see job output for details."
fi
if [[ "$DEPLOY_CONFIG_STATUS" -ne 0 ]]; then
FAILED=1
echo "Deploy configs invalid, see job output for details."
fi
if [[ "$STORAGE_SNAPSHOT_STATUS" -ne 0 ]]; then
echo "Storage snapshot failed, see job output for details."
FAILED=1
......
......@@ -34,6 +34,7 @@ require (
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
golang.org/x/crypto v0.6.0
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
golang.org/x/sync v0.1.0
golang.org/x/term v0.5.0
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
)
......@@ -176,7 +177,6 @@ require (
go.uber.org/zap v1.24.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.6.0 // indirect
......
......@@ -199,6 +199,11 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
return txData{}, io.EOF
}
// we have blocks, but we cannot add them to the channel right now
if s.pendingChannel != nil && s.pendingChannel.IsFull() {
return txData{}, io.EOF
}
if err := s.ensurePendingChannel(l1Head); err != nil {
return txData{}, err
}
......
......@@ -26,8 +26,9 @@ type Config struct {
RollupNode *sources.RollupClient
TxManager txmgr.TxManager
NetworkTimeout time.Duration
PollInterval time.Duration
NetworkTimeout time.Duration
PollInterval time.Duration
MaxPendingTransactions uint64
// RollupConfig is queried at startup
Rollup *rollup.Config
......@@ -76,6 +77,10 @@ type CLIConfig struct {
// and creating a new batch.
PollInterval time.Duration
// MaxPendingTransactions is the maximum number of concurrent pending
// transactions sent to the transaction manager.
MaxPendingTransactions uint64
// MaxL1TxSize is the maximum size of a batch tx submitted to L1.
MaxL1TxSize uint64
......@@ -128,16 +133,17 @@ func NewConfig(ctx *cli.Context) CLIConfig {
PollInterval: ctx.GlobalDuration(flags.PollIntervalFlag.Name),
/* Optional Flags */
MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
Stopped: ctx.GlobalBool(flags.StoppedFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
RPCConfig: rpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
MaxPendingTransactions: ctx.GlobalUint64(flags.MaxPendingTransactionsFlag.Name),
MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
Stopped: ctx.GlobalBool(flags.StoppedFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
RPCConfig: rpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
}
}
......@@ -75,13 +75,14 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
}
batcherCfg := Config{
L1Client: l1Client,
L2Client: l2Client,
RollupNode: rollupClient,
PollInterval: cfg.PollInterval,
NetworkTimeout: cfg.TxMgrConfig.NetworkTimeout,
TxManager: txManager,
Rollup: rcfg,
L1Client: l1Client,
L2Client: l2Client,
RollupNode: rollupClient,
PollInterval: cfg.PollInterval,
MaxPendingTransactions: cfg.MaxPendingTransactions,
NetworkTimeout: cfg.TxMgrConfig.NetworkTimeout,
TxManager: txManager,
Rollup: rcfg,
Channel: ChannelConfig{
SeqWindowSize: rcfg.SeqWindowSize,
ChannelTimeout: rcfg.ChannelTimeout,
......@@ -286,13 +287,23 @@ func (l *BatchSubmitter) loop() {
ticker := time.NewTicker(l.PollInterval)
defer ticker.Stop()
receiptsCh := make(chan txmgr.TxReceipt[txData])
queue := txmgr.NewQueue[txData](l.killCtx, l.txMgr, l.MaxPendingTransactions)
for {
select {
case <-ticker.C:
l.loadBlocksIntoState(l.shutdownCtx)
l.publishStateToL1(l.killCtx)
l.publishStateToL1(queue, receiptsCh, false)
case r := <-receiptsCh:
l.handleReceipt(r)
case <-l.shutdownCtx.Done():
l.publishStateToL1(l.killCtx)
err := l.state.Close()
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
}
l.publishStateToL1(queue, receiptsCh, true)
return
}
}
......@@ -300,70 +311,90 @@ func (l *BatchSubmitter) loop() {
// publishStateToL1 loops through the block data loaded into `state` and
// submits the associated data to the L1 in the form of channel frames.
func (l *BatchSubmitter) publishStateToL1(ctx context.Context) {
for {
// Attempt to gracefully terminate the current channel, ensuring that no new frames will be
// produced. Any remaining frames must still be published to the L1 to prevent stalling.
select {
case <-ctx.Done():
err := l.state.Close()
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData], drain bool) {
txDone := make(chan struct{})
// sending/waiting and receipt reading must run on separate goroutines to avoid deadlocks
go func() {
defer func() {
if drain {
// if draining, we wait for all transactions to complete
queue.Wait()
}
case <-l.shutdownCtx.Done():
err := l.state.Close()
close(txDone)
}()
for {
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
if err != nil {
l.log.Error("error closing the channel manager", "err", err)
if drain && err != io.EOF {
l.log.Error("error sending tx while draining state", "err", err)
}
return
}
default:
}
}()
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.log.Error("Failed to query L1 tip", "error", err)
for {
select {
case r := <-receiptsCh:
l.handleReceipt(r)
case <-txDone:
return
}
l.recordL1Tip(l1tip)
}
}
// Collect next transaction data
txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
l.log.Trace("no transaction data available")
break
} else if err != nil {
l.log.Error("unable to get tx data", "err", err)
break
}
// Record TX Status
if receipt, err := l.sendTransaction(ctx, txdata.Bytes()); err != nil {
l.recordFailedTx(txdata.ID(), err)
} else {
l.recordConfirmedTx(txdata.ID(), receipt)
}
// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) error {
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
l.log.Error("Failed to query L1 tip", "error", err)
return err
}
l.recordL1Tip(l1tip)
// Collect next transaction data
txdata, err := l.state.TxData(l1tip.ID())
if err == io.EOF {
l.log.Trace("no transaction data available")
return err
} else if err != nil {
l.log.Error("unable to get tx data", "err", err)
return err
}
l.sendTransaction(txdata, queue, receiptsCh)
return nil
}
// sendTransaction creates & submits a transaction to the batch inbox address with the given `data`.
// It currently uses the underlying `txmgr` to handle transaction sending & price management.
// This is a blocking method. It should not be called concurrently.
func (l *BatchSubmitter) sendTransaction(ctx context.Context, data []byte) (*types.Receipt, error) {
func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txData], receiptsCh chan txmgr.TxReceipt[txData]) {
// Do the gas estimation offline. A value of 0 will cause the [txmgr] to estimate the gas limit.
data := txdata.Bytes()
intrinsicGas, err := core.IntrinsicGas(data, nil, false, true, true, false)
if err != nil {
return nil, fmt.Errorf("failed to calculate intrinsic gas: %w", err)
l.log.Error("Failed to calculate intrinsic gas", "error", err)
return
}
// Send the transaction through the txmgr
if receipt, err := l.txMgr.Send(ctx, txmgr.TxCandidate{
candidate := txmgr.TxCandidate{
To: &l.Rollup.BatchInboxAddress,
TxData: data,
GasLimit: intrinsicGas,
}); err != nil {
l.log.Warn("unable to publish tx", "err", err, "data_size", len(data))
return nil, err
}
queue.Send(txdata, candidate, receiptsCh)
}
func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txData]) {
// Record TX Status
if r.Err != nil {
l.log.Warn("unable to publish tx", "err", r.Err, "data_size", r.ID.Len())
l.recordFailedTx(r.ID.ID(), r.Err)
} else {
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "data_size", len(data))
return receipt, nil
l.log.Info("tx successfully published", "tx_hash", r.Receipt.TxHash, "data_size", r.ID.Len())
l.recordConfirmedTx(r.ID.ID(), r.Receipt)
}
}
......
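Note on the pattern above: publishStateToL1 splits sending and receipt handling across goroutines because the receipts channel is unbuffered; a sender blocked on an unread receipt and a reader stuck behind the sender would otherwise deadlock. A minimal standalone sketch of the same shape (the receipt type and counts are illustrative, not the batcher's):

package main

import (
	"fmt"
	"sync"
)

type receipt struct {
	id  int
	err error
}

func main() {
	receiptsCh := make(chan receipt) // unbuffered, like the batcher's receipts channel
	txDone := make(chan struct{})

	// Producer: fan out the "sends", wait for all of them, then signal done.
	// Running this on the main goroutine would deadlock: every write to the
	// unbuffered channel blocks until the consumer below reads it.
	go func() {
		var wg sync.WaitGroup // stands in for queue.Wait()
		for i := 0; i < 3; i++ {
			wg.Add(1)
			id := i
			go func() {
				defer wg.Done()
				receiptsCh <- receipt{id: id} // stands in for the txmgr delivering a TxReceipt
			}()
		}
		wg.Wait()
		close(txDone)
	}()

	// Consumer: drain receipts until the producer signals completion.
	for {
		select {
		case r := <-receiptsCh:
			fmt.Println("receipt for tx", r.id, "err:", r.err)
		case <-txDone:
			return
		}
	}
}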
......@@ -26,6 +26,10 @@ func (td *txData) Bytes() []byte {
return append([]byte{derive.DerivationVersion0}, td.frame.data...)
}
func (td *txData) Len() int {
return 1 + len(td.frame.data)
}
// Frame returns the single frame of this tx data.
//
// Note: when the batcher is changed to possibly send multiple frames per tx,
......
......@@ -2,6 +2,7 @@ package flags
import (
"fmt"
"time"
"github.com/urfave/cli"
......@@ -33,21 +34,27 @@ var (
Usage: "HTTP provider URL for Rollup node",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "ROLLUP_RPC"),
}
// Optional flags
SubSafetyMarginFlag = cli.Uint64Flag{
Name: "sub-safety-margin",
Usage: "The batcher tx submission safety margin (in #L1-blocks) to subtract " +
"from a channel's timeout and sequencing window, to guarantee safe inclusion " +
"of a channel on L1.",
Value: 10,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "SUB_SAFETY_MARGIN"),
}
PollIntervalFlag = cli.DurationFlag{
Name: "poll-interval",
Usage: "Delay between querying L2 for more transactions and " +
"creating a new batch",
Name: "poll-interval",
Usage: "How frequently to poll L2 for new blocks",
Value: 6 * time.Second,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"),
}
// Optional flags
MaxPendingTransactionsFlag = cli.Uint64Flag{
Name: "max-pending-tx",
Usage: "The maximum number of pending transactions. 0 for no limit.",
Value: 1,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "MAX_PENDING_TX"),
}
MaxChannelDurationFlag = cli.Uint64Flag{
Name: "max-channel-duration",
Usage: "The maximum duration of L1-blocks to keep a channel open. 0 to disable.",
......@@ -91,11 +98,12 @@ var requiredFlags = []cli.Flag{
L1EthRpcFlag,
L2EthRpcFlag,
RollupRpcFlag,
SubSafetyMarginFlag,
PollIntervalFlag,
}
var optionalFlags = []cli.Flag{
SubSafetyMarginFlag,
PollIntervalFlag,
MaxPendingTransactionsFlag,
MaxChannelDurationFlag,
MaxL1TxSizeBytesFlag,
TargetL1TxSizeBytesFlag,
......
......@@ -19,6 +19,7 @@ import (
"github.com/ethereum-optimism/optimism/op-chain-ops/crossdomain"
"github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
"github.com/ethereum-optimism/optimism/op-chain-ops/util"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
......@@ -933,14 +934,12 @@ func createOutput(
LatestBlockhash: header.Hash(),
}
// TODO(mark): import the function from `op-node` to compute the hash
// instead of doing this. Will update when testing against mainnet.
localOutputRootHash := crypto.Keccak256Hash(
outputRootProof.Version[:],
outputRootProof.StateRoot[:],
outputRootProof.MessagePasserStorageRoot[:],
outputRootProof.LatestBlockhash[:],
)
// Compute the output root locally
l2OutputRoot, err := rollup.ComputeL2OutputRoot(&outputRootProof)
if err != nil {
	return nil, bindings.TypesOutputRootProof{}, nil, err
}
localOutputRootHash := common.Hash(l2OutputRoot)
// ensure that the locally computed hash matches
if l2Output.OutputRoot != localOutputRootHash {
......
......@@ -593,17 +593,18 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
// Batch Submitter
sys.BatchSubmitter, err = bss.NewBatchSubmitterFromCLIConfig(bss.CLIConfig{
L1EthRpc: sys.Nodes["l1"].WSEndpoint(),
L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
MaxChannelDuration: 1,
MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000,
TargetNumFrames: 1,
ApproxComprRatio: 0.4,
SubSafetyMargin: 4,
PollInterval: 50 * time.Millisecond,
TxMgrConfig: newTxMgrConfig(sys.Nodes["l1"].WSEndpoint(), cfg.Secrets.Batcher),
L1EthRpc: sys.Nodes["l1"].WSEndpoint(),
L2EthRpc: sys.Nodes["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
MaxPendingTransactions: 1,
MaxChannelDuration: 1,
MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000,
TargetNumFrames: 1,
ApproxComprRatio: 0.4,
SubSafetyMargin: 4,
PollInterval: 50 * time.Millisecond,
TxMgrConfig: newTxMgrConfig(sys.Nodes["l1"].WSEndpoint(), cfg.Secrets.Batcher),
LogConfig: oplog.CLIConfig{
Level: "info",
Format: "text",
......
......@@ -9,9 +9,11 @@ import (
"github.com/ethereum-optimism/optimism/op-node/client"
"github.com/ethereum-optimism/optimism/op-node/sources"
"github.com/ethereum-optimism/optimism/op-node/testlog"
oppcl "github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/client/driver"
opp "github.com/ethereum-optimism/optimism/op-program/host"
oppconf "github.com/ethereum-optimism/optimism/op-program/host/config"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
......@@ -19,7 +21,27 @@ import (
"github.com/stretchr/testify/require"
)
// bypass the test runner when running as the client, so the fpp executes directly
func init() {
if !opp.RunningProgramInClient() {
return
}
logger := oplog.NewLogger(oplog.CLIConfig{
Level: "debug",
Format: "text",
})
oppcl.Main(logger)
}
func TestVerifyL2OutputRoot(t *testing.T) {
testVerifyL2OutputRoot(t, false)
}
func TestVerifyL2OutputRootDetached(t *testing.T) {
testVerifyL2OutputRoot(t, true)
}
func testVerifyL2OutputRoot(t *testing.T, detached bool) {
InitParallel(t)
ctx := context.Background()
......@@ -93,6 +115,7 @@ func TestVerifyL2OutputRoot(t *testing.T) {
fppConfig.L1URL = sys.NodeEndpoint("l1")
fppConfig.L2URL = sys.NodeEndpoint("sequencer")
fppConfig.DataDir = preimageDir
fppConfig.Detached = detached
// Check the FPP confirms the expected output
t.Log("Running fault proof in fetching mode")
......@@ -119,7 +142,11 @@ func TestVerifyL2OutputRoot(t *testing.T) {
t.Log("Running fault proof with invalid claim")
fppConfig.L2Claim = common.Hash{0xaa}
err = opp.FaultProofProgram(log, fppConfig)
require.ErrorIs(t, err, driver.ErrClaimNotValid)
if detached {
require.Error(t, err, "exit status 1")
} else {
require.ErrorIs(t, err, driver.ErrClaimNotValid)
}
}
func waitForSafeHead(ctx context.Context, safeBlockNum uint64, rollupClient *sources.RollupClient) error {
......
......@@ -10,35 +10,6 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup"
)
var Beta1 = rollup.Config{
Genesis: rollup.Genesis{
L1: eth.BlockID{
Hash: common.HexToHash("0x59c72db5fec5bf231e61ba59854cff33945ff6652699c55f2431ac2c010610d5"),
Number: 8046397,
},
L2: eth.BlockID{
Hash: common.HexToHash("0xa89b19033c8b43365e244f425a7e4acb5bae21d1893e1be0eb8cddeb29950d72"),
Number: 0,
},
L2Time: 1669088016,
SystemConfig: eth.SystemConfig{
BatcherAddr: common.HexToAddress("0x793b6822fd651af8c58039847be64cb9ee854bc9"),
Overhead: eth.Bytes32(common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000834")),
Scalar: eth.Bytes32(common.HexToHash("0x00000000000000000000000000000000000000000000000000000000000f4240")),
GasLimit: 30000000,
},
},
BlockTime: 2,
MaxSequencerDrift: 3600,
SeqWindowSize: 120,
ChannelTimeout: 30,
L1ChainID: big.NewInt(5),
L2ChainID: big.NewInt(902),
BatchInboxAddress: common.HexToAddress("0xFb3aECf08940785D4fB3Ad87cDC6e1Ceb20e9aac"),
DepositContractAddress: common.HexToAddress("0xf91795564662DcC9a17de67463ec5BA9C6DC207b"),
L1SystemConfigAddress: common.HexToAddress("0x686df068eaa71af78dadc1c427e35600e0fadac5"),
}
var Goerli = rollup.Config{
Genesis: rollup.Genesis{
L1: eth.BlockID{
......@@ -70,7 +41,6 @@ var Goerli = rollup.Config{
}
var NetworksByName = map[string]rollup.Config{
"beta-1": Beta1,
"goerli": Goerli,
}
......
package client
import (
"encoding/binary"
"encoding/json"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
)
const (
L1HeadLocalIndex preimage.LocalIndexKey = iota + 1
L2HeadLocalIndex
L2ClaimLocalIndex
L2ClaimBlockNumberLocalIndex
L2ChainConfigLocalIndex
RollupConfigLocalIndex
)
type BootInfo struct {
L1Head common.Hash
L2Head common.Hash
L2Claim common.Hash
L2ClaimBlockNumber uint64
L2ChainConfig *params.ChainConfig
RollupConfig *rollup.Config
}
type oracleClient interface {
Get(key preimage.Key) []byte
}
type BootstrapClient struct {
r oracleClient
}
func NewBootstrapClient(r oracleClient) *BootstrapClient {
return &BootstrapClient{r: r}
}
func (br *BootstrapClient) BootInfo() *BootInfo {
l1Head := common.BytesToHash(br.r.Get(L1HeadLocalIndex))
l2Head := common.BytesToHash(br.r.Get(L2HeadLocalIndex))
l2Claim := common.BytesToHash(br.r.Get(L2ClaimLocalIndex))
l2ClaimBlockNumber := binary.BigEndian.Uint64(br.r.Get(L2ClaimBlockNumberLocalIndex))
l2ChainConfig := new(params.ChainConfig)
err := json.Unmarshal(br.r.Get(L2ChainConfigLocalIndex), l2ChainConfig)
if err != nil {
panic("failed to bootstrap l2ChainConfig")
}
rollupConfig := new(rollup.Config)
err = json.Unmarshal(br.r.Get(RollupConfigLocalIndex), rollupConfig)
if err != nil {
panic("failed to bootstrap rollup config")
}
return &BootInfo{
L1Head: l1Head,
L2Head: l2Head,
L2Claim: l2Claim,
L2ClaimBlockNumber: l2ClaimBlockNumber,
L2ChainConfig: l2ChainConfig,
RollupConfig: rollupConfig,
}
}
package client
import (
"encoding/binary"
"encoding/json"
"testing"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
)
func TestBootstrapClient(t *testing.T) {
bootInfo := &BootInfo{
L1Head: common.HexToHash("0x1111"),
L2Head: common.HexToHash("0x2222"),
L2Claim: common.HexToHash("0x3333"),
L2ClaimBlockNumber: 1,
L2ChainConfig: params.GoerliChainConfig,
RollupConfig: &chaincfg.Goerli,
}
mockOracle := &mockBootstrapOracle{bootInfo}
readBootInfo := NewBootstrapClient(mockOracle).BootInfo()
require.EqualValues(t, bootInfo, readBootInfo)
}
type mockBootstrapOracle struct {
b *BootInfo
}
func (o *mockBootstrapOracle) Get(key preimage.Key) []byte {
switch key.PreimageKey() {
case L1HeadLocalIndex.PreimageKey():
return o.b.L1Head[:]
case L2HeadLocalIndex.PreimageKey():
return o.b.L2Head[:]
case L2ClaimLocalIndex.PreimageKey():
return o.b.L2Claim[:]
case L2ClaimBlockNumberLocalIndex.PreimageKey():
return binary.BigEndian.AppendUint64(nil, o.b.L2ClaimBlockNumber)
case L2ChainConfigLocalIndex.PreimageKey():
b, _ := json.Marshal(o.b.L2ChainConfig)
return b
case RollupConfigLocalIndex.PreimageKey():
b, _ := json.Marshal(o.b.RollupConfig)
return b
default:
panic("unknown key")
}
}
package client
const (
// 0, 1, 2 are used for stdin, stdout, stderr
HClientRFd = iota + 3
HClientWFd
PClientRFd
PClientWFd
MaxFd
)
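These constants lean on a Go os/exec guarantee: entry i of cmd.ExtraFiles becomes descriptor 3+i in the child process (0-2 are stdin, stdout, stderr), so host and client can agree on fixed fd numbers. A minimal sketch of that mechanism, separate from the op-program wiring (Unix-only, since ExtraFiles is unsupported on Windows; the env var name and payload are illustrative):

package main

import (
	"fmt"
	"io"
	"os"
	"os/exec"
)

const inheritedFd = 3 // ExtraFiles[0] shows up as fd 3 in the child

func main() {
	if os.Getenv("IS_CHILD") == "true" {
		// Child: reopen the inherited descriptor, as CreatePreimageChannel does.
		f := os.NewFile(inheritedFd, "inherited-pipe")
		data, _ := io.ReadAll(f)
		fmt.Println("child read:", string(data))
		return
	}
	r, w, err := os.Pipe()
	if err != nil {
		panic(err)
	}
	cmd := exec.Command(os.Args[0])
	cmd.ExtraFiles = []*os.File{r} // becomes fd 3 in the child
	cmd.Env = append(os.Environ(), "IS_CHILD=true")
	cmd.Stdout = os.Stdout
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	_, _ = w.Write([]byte("hello"))
	w.Close() // closing the last write end delivers EOF to the child
	_ = cmd.Wait()
}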
......@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"os"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
......@@ -15,30 +16,53 @@ import (
cldr "github.com/ethereum-optimism/optimism/op-program/client/driver"
"github.com/ethereum-optimism/optimism/op-program/client/l1"
"github.com/ethereum-optimism/optimism/op-program/client/l2"
oppio "github.com/ethereum-optimism/optimism/op-program/io"
"github.com/ethereum-optimism/optimism/op-program/preimage"
)
// ClientProgram executes the Program, while attached to an IO based pre-image oracle, to be served by a host.
func ClientProgram(
logger log.Logger,
cfg *rollup.Config,
l2Cfg *params.ChainConfig,
l1Head common.Hash,
l2Head common.Hash,
l2Claim common.Hash,
l2ClaimBlockNumber uint64,
preimageOracle io.ReadWriter,
preimageHinter io.ReadWriter,
) error {
// Main executes the client program in a detached context and exits the current process.
// The client runtime environment must be configured before calling this function.
func Main(logger log.Logger) {
	logger.Info("Starting fault proof program client")
	preimageOracle := CreatePreimageChannel()
	preimageHinter := CreateHinterChannel()
	if err := RunProgram(logger, preimageOracle, preimageHinter); errors.Is(err, cldr.ErrClaimNotValid) {
		logger.Error("Claim is invalid", "err", err)
		os.Exit(1)
	} else if err != nil {
		logger.Error("Program failed", "err", err)
		os.Exit(2)
	} else {
		logger.Info("Claim successfully verified")
		os.Exit(0)
	}
}
// RunProgram executes the Program, while attached to an IO based pre-image oracle, to be served by a host.
func RunProgram(logger log.Logger, preimageOracle io.ReadWriter, preimageHinter io.ReadWriter) error {
pClient := preimage.NewOracleClient(preimageOracle)
hClient := preimage.NewHintWriter(preimageHinter)
l1PreimageOracle := l1.NewPreimageOracle(pClient, hClient)
l2PreimageOracle := l2.NewPreimageOracle(pClient, hClient)
return Program(logger, cfg, l2Cfg, l1Head, l2Head, l2Claim, l2ClaimBlockNumber, l1PreimageOracle, l2PreimageOracle)
bootInfo := NewBootstrapClient(pClient).BootInfo()
logger.Info("Program Bootstrapped", "bootInfo", bootInfo)
return runDerivation(
logger,
bootInfo.RollupConfig,
bootInfo.L2ChainConfig,
bootInfo.L1Head,
bootInfo.L2Head,
bootInfo.L2Claim,
bootInfo.L2ClaimBlockNumber,
l1PreimageOracle,
l2PreimageOracle,
)
}
// Program executes the L2 state transition, given a minimal interface to retrieve data.
func Program(logger log.Logger, cfg *rollup.Config, l2Cfg *params.ChainConfig, l1Head common.Hash, l2Head common.Hash, l2Claim common.Hash, l2ClaimBlockNum uint64, l1Oracle l1.Oracle, l2Oracle l2.Oracle) error {
// runDerivation executes the L2 state transition, given a minimal interface to retrieve data.
func runDerivation(logger log.Logger, cfg *rollup.Config, l2Cfg *params.ChainConfig, l1Head common.Hash, l2Head common.Hash, l2Claim common.Hash, l2ClaimBlockNum uint64, l1Oracle l1.Oracle, l2Oracle l2.Oracle) error {
l1Source := l1.NewOracleL1Client(logger, l1Oracle, l1Head)
engineBackend, err := l2.NewOracleBackedL2Chain(logger, l2Oracle, l2Cfg, l2Head)
if err != nil {
......@@ -57,3 +81,16 @@ func Program(logger log.Logger, cfg *rollup.Config, l2Cfg *params.ChainConfig, l
}
return d.ValidateClaim(eth.Bytes32(l2Claim))
}
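// CreateHinterChannel returns a FileChannel for hint communication in a detached context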
func CreateHinterChannel() oppio.FileChannel {
r := os.NewFile(HClientRFd, "preimage-hint-read")
w := os.NewFile(HClientWFd, "preimage-hint-write")
return oppio.NewReadWritePair(r, w)
}
// CreatePreimageChannel returns a FileChannel for the preimage oracle in a detached context
func CreatePreimageChannel() oppio.FileChannel {
r := os.NewFile(PClientRFd, "preimage-oracle-read")
w := os.NewFile(PClientWFd, "preimage-oracle-write")
return oppio.NewReadWritePair(r, w)
}
......@@ -5,6 +5,7 @@ import (
"fmt"
"os"
cl "github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/client/driver"
"github.com/ethereum-optimism/optimism/op-program/host"
"github.com/ethereum-optimism/optimism/op-program/host/config"
......@@ -36,6 +37,11 @@ var VersionWithMeta = func() string {
}()
func main() {
if host.RunningProgramInClient() {
logger := oplog.NewLogger(oplog.DefaultCLIConfig())
cl.Main(logger)
panic("Client main should have exited process")
}
args := os.Args
if err := run(args, host.FaultProofProgram); errors.Is(err, driver.ErrClaimNotValid) {
log.Crit("Claim is invalid", "err", err)
......
package config
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params"
)
var OPGoerliChainConfig = &params.ChainConfig{
ChainID: big.NewInt(420),
HomesteadBlock: big.NewInt(0),
DAOForkBlock: nil,
DAOForkSupport: false,
EIP150Block: big.NewInt(0),
EIP150Hash: common.Hash{},
EIP155Block: big.NewInt(0),
EIP158Block: big.NewInt(0),
ByzantiumBlock: big.NewInt(0),
ConstantinopleBlock: big.NewInt(0),
PetersburgBlock: big.NewInt(0),
IstanbulBlock: big.NewInt(0),
MuirGlacierBlock: big.NewInt(0),
BerlinBlock: big.NewInt(0),
LondonBlock: big.NewInt(4061224),
ArrowGlacierBlock: big.NewInt(4061224),
GrayGlacierBlock: big.NewInt(4061224),
MergeNetsplitBlock: big.NewInt(4061224),
BedrockBlock: big.NewInt(4061224),
RegolithTime: &params.OptimismGoerliRegolithTime,
TerminalTotalDifficulty: big.NewInt(0),
TerminalTotalDifficultyPassed: true,
Optimism: &params.OptimismConfig{
EIP1559Elasticity: 10,
EIP1559Denominator: 50,
},
}
var L2ChainConfigsByName = map[string]*params.ChainConfig{
"goerli": OPGoerliChainConfig,
}
......@@ -49,6 +49,8 @@ type Config struct {
L2ClaimBlockNumber uint64
// L2ChainConfig is the op-geth chain config for the L2 execution engine
L2ChainConfig *params.ChainConfig
// Detached indicates that the program runs as a separate process
Detached bool
}
func (c *Config) Check() error {
......@@ -121,7 +123,16 @@ func NewConfigFromCLI(ctx *cli.Context) (*Config, error) {
return nil, ErrInvalidL1Head
}
l2GenesisPath := ctx.GlobalString(flags.L2GenesisPath.Name)
l2ChainConfig, err := loadChainConfigFromGenesis(l2GenesisPath)
var l2ChainConfig *params.ChainConfig
if l2GenesisPath == "" {
networkName := ctx.GlobalString(flags.Network.Name)
l2ChainConfig = L2ChainConfigsByName[networkName]
if l2ChainConfig == nil {
return nil, fmt.Errorf("flag %s is required for network %s", flags.L2GenesisPath.Name, networkName)
}
} else {
l2ChainConfig, err = loadChainConfigFromGenesis(l2GenesisPath)
}
if err != nil {
return nil, fmt.Errorf("invalid genesis: %w", err)
}
......@@ -137,6 +148,7 @@ func NewConfigFromCLI(ctx *cli.Context) (*Config, error) {
L1URL: ctx.GlobalString(flags.L1NodeAddr.Name),
L1TrustRPC: ctx.GlobalBool(flags.L1TrustRPC.Name),
L1RPCKind: sources.RPCProviderKind(ctx.GlobalString(flags.L1RPCProviderKind.Name)),
Detached: ctx.GlobalBool(flags.Detached.Name),
}, nil
}
......
......@@ -81,6 +81,11 @@ var (
return &out
}(),
}
Detached = cli.BoolFlag{
Name: "detached",
Usage: "Run the program as a separate process detached from the host",
EnvVar: service.PrefixEnvVar(envVarPrefix, "DETACHED"),
}
)
// Flags contains the list of configuration options available to the binary.
......@@ -91,16 +96,17 @@ var requiredFlags = []cli.Flag{
L2Head,
L2Claim,
L2BlockNumber,
L2GenesisPath,
}
var programFlags = []cli.Flag{
RollupConfig,
Network,
DataDir,
L2NodeAddr,
L2GenesisPath,
L1NodeAddr,
L1TrustRPC,
L1RPCProviderKind,
Detached,
}
func init() {
......@@ -118,6 +124,9 @@ func CheckRequired(ctx *cli.Context) error {
if rollupConfig != "" && network != "" {
return fmt.Errorf("cannot specify both %s and %s", RollupConfig.Name, Network.Name)
}
if network == "" && ctx.GlobalString(L2GenesisPath.Name) == "" {
return fmt.Errorf("flag %s is required for custom networks", L2GenesisPath.Name)
}
for _, flag := range requiredFlags {
if !ctx.IsSet(flag.GetName()) {
return fmt.Errorf("flag %s is required", flag.GetName())
......
......@@ -5,7 +5,9 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"os/exec"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/client"
......@@ -14,6 +16,7 @@ import (
"github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/host/kvstore"
"github.com/ethereum-optimism/optimism/op-program/host/prefetcher"
oppio "github.com/ethereum-optimism/optimism/op-program/io"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
......@@ -24,6 +27,13 @@ type L2Source struct {
*sources.DebugClient
}
const opProgramChildEnvName = "OP_PROGRAM_CHILD"
func RunningProgramInClient() bool {
value, _ := os.LookupEnv(opProgramChildEnvName)
return value == "true"
}
// FaultProofProgram is the programmatic entry-point for the fault proof program
func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
if err := cfg.Check(); err != nil {
......@@ -44,35 +54,15 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
kv = kvstore.NewDiskKV(cfg.DataDir)
}
var getPreimage func(key common.Hash) ([]byte, error)
var hinter func(hint string) error
var (
getPreimage func(key common.Hash) ([]byte, error)
hinter func(hint string) error
)
if cfg.FetchingEnabled() {
logger.Info("Connecting to L1 node", "l1", cfg.L1URL)
l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL)
if err != nil {
return fmt.Errorf("failed to setup L1 RPC: %w", err)
}
logger.Info("Connecting to L2 node", "l2", cfg.L2URL)
l2RPC, err := client.NewRPC(ctx, logger, cfg.L2URL)
if err != nil {
return fmt.Errorf("failed to setup L2 RPC: %w", err)
}
l1ClCfg := sources.L1ClientDefaultConfig(cfg.Rollup, cfg.L1TrustRPC, cfg.L1RPCKind)
l2ClCfg := sources.L2ClientDefaultConfig(cfg.Rollup, true)
l1Cl, err := sources.NewL1Client(l1RPC, logger, nil, l1ClCfg)
if err != nil {
return fmt.Errorf("failed to create L1 client: %w", err)
}
l2Cl, err := sources.NewL2Client(l2RPC, logger, nil, l2ClCfg)
prefetch, err := makePrefetcher(ctx, logger, kv, cfg)
if err != nil {
return fmt.Errorf("failed to create L2 client: %w", err)
return fmt.Errorf("failed to create prefetcher: %w", err)
}
l2DebugCl := &L2Source{L2Client: l2Cl, DebugClient: sources.NewDebugClient(l2RPC.CallContext)}
logger.Info("Setting up pre-fetcher")
prefetch := prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv)
getPreimage = func(key common.Hash) ([]byte, error) { return prefetch.GetPreimage(ctx, key) }
hinter = prefetch.Hint
} else {
......@@ -87,53 +77,81 @@ func FaultProofProgram(logger log.Logger, cfg *config.Config) error {
localPreimageSource := kvstore.NewLocalPreimageSource(cfg)
splitter := kvstore.NewPreimageSourceSplitter(localPreimageSource.Get, getPreimage)
// Setup pipe for preimage oracle interaction
pClientRW, pHostRW := bidirectionalPipe()
// Setup client I/O for preimage oracle interaction
pClientRW, pHostRW, err := oppio.CreateBidirectionalChannel()
if err != nil {
return fmt.Errorf("failed to create preimage pipe: %w", err)
}
oracleServer := preimage.NewOracleServer(pHostRW)
// Setup pipe for hint comms
hClientRW, hHostRW := bidirectionalPipe()
hHost := preimage.NewHintReader(hHostRW)
launchOracleServer(logger, oracleServer, splitter.Get)
defer pHostRW.Close()
// Setup client I/O for hint comms
hClientRW, hHostRW, err := oppio.CreateBidirectionalChannel()
if err != nil {
return fmt.Errorf("failed to create hints pipe: %w", err)
}
defer hHostRW.Close()
hHost := preimage.NewHintReader(hHostRW)
routeHints(logger, hHost, hinter)
launchOracleServer(logger, oracleServer, splitter.Get)
return cl.ClientProgram(
logger,
cfg.Rollup,
cfg.L2ChainConfig,
cfg.L1Head,
cfg.L2Head,
cfg.L2Claim,
cfg.L2ClaimBlockNumber,
pClientRW,
hClientRW,
)
var cmd *exec.Cmd
if cfg.Detached {
cmd = exec.CommandContext(ctx, os.Args[0])
cmd.ExtraFiles = make([]*os.File, cl.MaxFd-3) // not including stdin, stdout and stderr
cmd.ExtraFiles[cl.HClientRFd-3] = hClientRW.Reader()
cmd.ExtraFiles[cl.HClientWFd-3] = hClientRW.Writer()
cmd.ExtraFiles[cl.PClientRFd-3] = pClientRW.Reader()
cmd.ExtraFiles[cl.PClientWFd-3] = pClientRW.Writer()
cmd.Stdout = os.Stdout // for debugging
cmd.Stderr = os.Stderr // for debugging
cmd.Env = append(os.Environ(), fmt.Sprintf("%s=true", opProgramChildEnvName))
err := cmd.Start()
if err != nil {
return fmt.Errorf("program cmd failed to start: %w", err)
}
if err := cmd.Wait(); err != nil {
return fmt.Errorf("failed to wait for child program: %w", err)
}
return nil
} else {
return cl.RunProgram(logger, pClientRW, hClientRW)
}
}
type readWritePair struct {
	io.ReadCloser
	io.WriteCloser
}

func (rw *readWritePair) Close() error {
	if err := rw.ReadCloser.Close(); err != nil {
		return err
	}
	return rw.WriteCloser.Close()
}

func bidirectionalPipe() (a, b io.ReadWriteCloser) {
	ar, bw := io.Pipe()
	br, aw := io.Pipe()
	return &readWritePair{ReadCloser: ar, WriteCloser: aw}, &readWritePair{ReadCloser: br, WriteCloser: bw}
}

func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *config.Config) (*prefetcher.Prefetcher, error) {
	logger.Info("Connecting to L1 node", "l1", cfg.L1URL)
	l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL)
	if err != nil {
		return nil, fmt.Errorf("failed to setup L1 RPC: %w", err)
	}
	logger.Info("Connecting to L2 node", "l2", cfg.L2URL)
	l2RPC, err := client.NewRPC(ctx, logger, cfg.L2URL)
	if err != nil {
		return nil, fmt.Errorf("failed to setup L2 RPC: %w", err)
	}
	l1ClCfg := sources.L1ClientDefaultConfig(cfg.Rollup, cfg.L1TrustRPC, cfg.L1RPCKind)
	l2ClCfg := sources.L2ClientDefaultConfig(cfg.Rollup, true)
	l1Cl, err := sources.NewL1Client(l1RPC, logger, nil, l1ClCfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create L1 client: %w", err)
	}
	l2Cl, err := sources.NewL2Client(l2RPC, logger, nil, l2ClCfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create L2 client: %w", err)
	}
	l2DebugCl := &L2Source{L2Client: l2Cl, DebugClient: sources.NewDebugClient(l2RPC.CallContext)}
	return prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv), nil
}
func routeHints(logger log.Logger, hintReader *preimage.HintReader, hinter func(hint string) error) {
go func() {
for {
if err := hintReader.NextHint(hinter); err != nil {
if err == io.EOF || errors.Is(err, io.ErrClosedPipe) {
if err == io.EOF || errors.Is(err, fs.ErrClosed) {
logger.Debug("closing pre-image hint handler")
return
}
......@@ -148,7 +166,7 @@ func launchOracleServer(logger log.Logger, server *preimage.OracleServer, getter
go func() {
for {
if err := server.NextPreimageRequest(getter); err != nil {
if err == io.EOF || errors.Is(err, io.ErrClosedPipe) {
if err == io.EOF || errors.Is(err, fs.ErrClosed) {
logger.Debug("closing pre-image server")
return
}
......
......@@ -4,8 +4,8 @@ import (
"encoding/binary"
"encoding/json"
"github.com/ethereum-optimism/optimism/op-program/client"
"github.com/ethereum-optimism/optimism/op-program/host/config"
"github.com/ethereum-optimism/optimism/op-program/preimage"
"github.com/ethereum/go-ethereum/common"
)
......@@ -17,32 +17,28 @@ func NewLocalPreimageSource(config *config.Config) *LocalPreimageSource {
return &LocalPreimageSource{config}
}
func localKey(num int64) common.Hash {
return preimage.LocalIndexKey(num).PreimageKey()
}
var (
L1HeadKey = localKey(1)
L2HeadKey = localKey(2)
L2ClaimKey = localKey(3)
L2ClaimBlockNumberKey = localKey(4)
L2ChainConfigKey = localKey(5)
RollupKey = localKey(6)
l1HeadKey = client.L1HeadLocalIndex.PreimageKey()
l2HeadKey = client.L2HeadLocalIndex.PreimageKey()
l2ClaimKey = client.L2ClaimLocalIndex.PreimageKey()
l2ClaimBlockNumberKey = client.L2ClaimBlockNumberLocalIndex.PreimageKey()
l2ChainConfigKey = client.L2ChainConfigLocalIndex.PreimageKey()
rollupKey = client.RollupConfigLocalIndex.PreimageKey()
)
func (s *LocalPreimageSource) Get(key common.Hash) ([]byte, error) {
switch key {
case L1HeadKey:
case l1HeadKey:
return s.config.L1Head.Bytes(), nil
case L2HeadKey:
case l2HeadKey:
return s.config.L2Head.Bytes(), nil
case L2ClaimKey:
case l2ClaimKey:
return s.config.L2Claim.Bytes(), nil
case L2ClaimBlockNumberKey:
case l2ClaimBlockNumberKey:
return binary.BigEndian.AppendUint64(nil, s.config.L2ClaimBlockNumber), nil
case L2ChainConfigKey:
case l2ChainConfigKey:
return json.Marshal(s.config.L2ChainConfig)
case RollupKey:
case rollupKey:
return json.Marshal(s.config.Rollup)
default:
return nil, ErrNotFound
......
......@@ -28,12 +28,12 @@ func TestLocalPreimageSource(t *testing.T) {
key common.Hash
expected []byte
}{
{"L1Head", L1HeadKey, cfg.L1Head.Bytes()},
{"L2Head", L2HeadKey, cfg.L2Head.Bytes()},
{"L2Claim", L2ClaimKey, cfg.L2Claim.Bytes()},
{"L2ClaimBlockNumber", L2ClaimBlockNumberKey, binary.BigEndian.AppendUint64(nil, cfg.L2ClaimBlockNumber)},
{"Rollup", RollupKey, asJson(t, cfg.Rollup)},
{"ChainConfig", L2ChainConfigKey, asJson(t, cfg.L2ChainConfig)},
{"L1Head", l1HeadKey, cfg.L1Head.Bytes()},
{"L2Head", l2HeadKey, cfg.L2Head.Bytes()},
{"L2Claim", l2ClaimKey, cfg.L2Claim.Bytes()},
{"L2ClaimBlockNumber", l2ClaimBlockNumberKey, binary.BigEndian.AppendUint64(nil, cfg.L2ClaimBlockNumber)},
{"Rollup", rollupKey, asJson(t, cfg.Rollup)},
{"ChainConfig", l2ChainConfigKey, asJson(t, cfg.L2ChainConfig)},
{"Unknown", preimage.LocalIndexKey(1000).PreimageKey(), nil},
}
for _, test := range tests {
......
package io
import (
"io"
"os"
)
// FileChannel is a unidirectional channel for file I/O
type FileChannel interface {
io.ReadWriteCloser
// Reader returns the file that is used for reading.
Reader() *os.File
// Writer returns the file that is used for writing.
Writer() *os.File
}
type ReadWritePair struct {
r *os.File
w *os.File
}
// NewReadWritePair creates a new FileChannel that uses the given files
func NewReadWritePair(r *os.File, w *os.File) *ReadWritePair {
return &ReadWritePair{r: r, w: w}
}
func (rw *ReadWritePair) Read(p []byte) (int, error) {
return rw.r.Read(p)
}
func (rw *ReadWritePair) Write(p []byte) (int, error) {
return rw.w.Write(p)
}
func (rw *ReadWritePair) Reader() *os.File {
return rw.r
}
func (rw *ReadWritePair) Writer() *os.File {
return rw.w
}
func (rw *ReadWritePair) Close() error {
if err := rw.r.Close(); err != nil {
return err
}
return rw.w.Close()
}
// CreateBidirectionalChannel creates a pair of FileChannels that are connected to each other.
func CreateBidirectionalChannel() (FileChannel, FileChannel, error) {
ar, bw, err := os.Pipe()
if err != nil {
return nil, nil, err
}
br, aw, err := os.Pipe()
if err != nil {
return nil, nil, err
}
return NewReadWritePair(ar, aw), NewReadWritePair(br, bw), nil
}
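A sketch of how the two ends pair up in-process: whatever one FileChannel writes, the other reads, because the read end of each os.Pipe is handed to the opposite side; the detached mode passes the same *os.File handles to the child via ExtraFiles instead. (Import alias follows the host code; assumes the op-program module is on the module path, and the payload is illustrative.)

package main

import (
	"fmt"

	oppio "github.com/ethereum-optimism/optimism/op-program/io"
)

func main() {
	clientRW, hostRW, err := oppio.CreateBidirectionalChannel()
	if err != nil {
		panic(err)
	}
	defer clientRW.Close()
	defer hostRW.Close()

	// Host writes; the client reads the same bytes over the crossed pipes.
	if _, err := hostRW.Write([]byte("preimage-response")); err != nil {
		panic(err)
	}
	buf := make([]byte, 32)
	n, _ := clientRW.Read(buf)
	fmt.Println("client received:", string(buf[:n]))
}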
......@@ -2,6 +2,7 @@ package flags
import (
"fmt"
"time"
"github.com/urfave/cli"
......@@ -32,13 +33,14 @@ var (
Usage: "Address of the L2OutputOracle contract",
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "L2OO_ADDRESS"),
}
// Optional flags
PollIntervalFlag = cli.DurationFlag{
Name: "poll-interval",
Usage: "Delay between querying L2 for more transactions and " +
"creating a new batch",
Name: "poll-interval",
Usage: "How frequently to poll L2 for new blocks",
Value: 6 * time.Second,
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "POLL_INTERVAL"),
}
// Optional flags
AllowNonFinalizedFlag = cli.BoolFlag{
Name: "allow-non-finalized",
Usage: "Allow the proposer to submit proposals for L2 blocks derived from non-finalized L1 blocks.",
......@@ -52,10 +54,10 @@ var requiredFlags = []cli.Flag{
L1EthRpcFlag,
RollupRpcFlag,
L2OOAddressFlag,
PollIntervalFlag,
}
var optionalFlags = []cli.Flag{
PollIntervalFlag,
AllowNonFinalizedFlag,
}
......
......@@ -5,6 +5,7 @@ import "github.com/ethereum/go-ethereum/core/types"
type NoopTxMetrics struct{}
func (*NoopTxMetrics) RecordNonce(uint64) {}
func (*NoopTxMetrics) RecordPendingTx(int64) {}
func (*NoopTxMetrics) RecordGasBumpCount(int) {}
func (*NoopTxMetrics) RecordTxConfirmationLatency(int64) {}
func (*NoopTxMetrics) TxConfirmed(*types.Receipt) {}
......
......@@ -12,6 +12,7 @@ type TxMetricer interface {
RecordGasBumpCount(int)
RecordTxConfirmationLatency(int64)
RecordNonce(uint64)
RecordPendingTx(pending int64)
TxConfirmed(*types.Receipt)
TxPublished(string)
RPCError()
......@@ -24,6 +25,7 @@ type TxMetrics struct {
txFeeHistogram prometheus.Histogram
LatencyConfirmedTx prometheus.Gauge
currentNonce prometheus.Gauge
pendingTxs prometheus.Gauge
txPublishError *prometheus.CounterVec
publishEvent metrics.Event
confirmEvent metrics.EventVec
......@@ -82,6 +84,12 @@ func MakeTxMetrics(ns string, factory metrics.Factory) TxMetrics {
Help: "Current nonce of the from address",
Subsystem: "txmgr",
}),
pendingTxs: factory.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "pending_txs",
Help: "Number of transactions pending receipts",
Subsystem: "txmgr",
}),
txPublishError: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "tx_publish_error_count",
......@@ -103,6 +111,10 @@ func (t *TxMetrics) RecordNonce(nonce uint64) {
t.currentNonce.Set(float64(nonce))
}
func (t *TxMetrics) RecordPendingTx(pending int64) {
t.pendingTxs.Set(float64(pending))
}
// TxConfirmed records lots of information about the confirmed transaction
func (t *TxMetrics) TxConfirmed(receipt *types.Receipt) {
fee := float64(receipt.EffectiveGasPrice.Uint64() * receipt.GasUsed / params.GWei)
......
package txmgr
import (
"context"
"math"
"sync"
"github.com/ethereum/go-ethereum/core/types"
"golang.org/x/sync/errgroup"
)
type TxReceipt[T any] struct {
// ID can be used to identify unique tx receipts within the receipt channel
ID T
// Receipt result from the transaction send
Receipt *types.Receipt
// Err contains any error that occurred during the tx send
Err error
}
type Queue[T any] struct {
ctx context.Context
txMgr TxManager
maxPending uint64
groupLock sync.Mutex
groupCtx context.Context
group *errgroup.Group
}
// NewQueue creates a new transaction-sending Queue.
//   - maxPending: max number of pending txs at once (0 == no limit)
func NewQueue[T any](ctx context.Context, txMgr TxManager, maxPending uint64) *Queue[T] {
if maxPending > math.MaxInt {
// ensure we don't overflow as errgroup only accepts int; in reality this will never be an issue
maxPending = math.MaxInt
}
return &Queue[T]{
ctx: ctx,
txMgr: txMgr,
maxPending: maxPending,
}
}
// Wait waits for all pending txs to complete (or fail).
func (q *Queue[T]) Wait() {
if q.group == nil {
return
}
_ = q.group.Wait()
}
// Send will wait until the number of pending txs is below the max pending,
// and then send the next tx.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func (q *Queue[T]) Send(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) {
group, ctx := q.groupContext()
group.Go(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
}
// TrySend sends the next tx, but only if the number of pending txs is below the
// max pending.
//
// Returns false if there is no room in the queue to send. Otherwise, the
// transaction is queued and this method returns true.
//
// The actual tx sending is non-blocking, with the receipt returned on the
// provided receipt channel. If the channel is unbuffered, the goroutine is
// blocked from completing until the channel is read from.
func (q *Queue[T]) TrySend(id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) bool {
group, ctx := q.groupContext()
return group.TryGo(func() error {
return q.sendTx(ctx, id, candidate, receiptCh)
})
}
func (q *Queue[T]) sendTx(ctx context.Context, id T, candidate TxCandidate, receiptCh chan TxReceipt[T]) error {
receipt, err := q.txMgr.Send(ctx, candidate)
receiptCh <- TxReceipt[T]{
ID: id,
Receipt: receipt,
Err: err,
}
return err
}
// groupContext returns a Group and a Context to use when sending a tx.
//
// If any pending transaction returned an error, the shared error group's context is
// canceled. In that case, this method waits for all pending transactions in the old
// group to return, then creates a new group parented to the queue's global context.
func (q *Queue[T]) groupContext() (*errgroup.Group, context.Context) {
q.groupLock.Lock()
defer q.groupLock.Unlock()
if q.groupCtx == nil || q.groupCtx.Err() != nil {
// no group exists, or the existing context has an error, so we need to wait
// for existing group threads to complete (if any) and create a new group
if q.group != nil {
_ = q.group.Wait()
}
q.group, q.groupCtx = errgroup.WithContext(q.ctx)
if q.maxPending > 0 {
q.group.SetLimit(int(q.maxPending))
}
}
return q.group, q.groupCtx
}
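Putting the Queue API together: construct one queue per send context, push candidates with Send (blocks while the queue is full) or TrySend (never blocks), read results off the receipt channel from a separate goroutine, and call Wait to drain. A usage sketch, assuming a configured TxManager is already in hand (sendAll and its inputs are hypothetical):

package main

import (
	"context"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-service/txmgr"
)

func sendAll(ctx context.Context, txMgr txmgr.TxManager, candidates []txmgr.TxCandidate) {
	queue := txmgr.NewQueue[int](ctx, txMgr, 5) // at most 5 txs in flight
	receiptsCh := make(chan txmgr.TxReceipt[int])

	// The reader must run concurrently with Send: receipts arrive on an
	// unbuffered channel, so unread receipts block the sending goroutines.
	go func() {
		for r := range receiptsCh {
			if r.Err != nil {
				fmt.Println("tx", r.ID, "failed:", r.Err)
			} else {
				fmt.Println("tx", r.ID, "confirmed in block", r.Receipt.BlockNumber)
			}
		}
	}()

	for i, candidate := range candidates {
		queue.Send(i, candidate, receiptsCh) // blocks only while the queue is full
	}
	queue.Wait() // block until every pending tx has completed or failed
	close(receiptsCh)
}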
package txmgr
import (
"context"
"fmt"
"math/big"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)
type queueFunc func(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool
func sendQueueFunc(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool {
q.Send(id, candidate, receiptCh)
return true
}
func trySendQueueFunc(id int, candidate TxCandidate, receiptCh chan TxReceipt[int], q *Queue[int]) bool {
return q.TrySend(id, candidate, receiptCh)
}
type queueCall struct {
call queueFunc // queue call (either Send or TrySend, use function helpers above)
queued bool // true if the send was queued
txErr bool // true if the tx send should return an error
}
type testTx struct {
sendErr bool // error to return from send for this tx
}
type testCase struct {
name string // name of the test
max uint64 // max concurrency of the queue
calls []queueCall // calls to the queue
txs []testTx // txs to generate from the factory (and potentially error in send)
nonces []uint64 // expected sent tx nonces after all calls are made
total time.Duration // approx. total time it should take to complete all queue calls
}
type mockBackendWithNonce struct {
mockBackend
}
func newMockBackendWithNonce(g *gasPricer) *mockBackendWithNonce {
return &mockBackendWithNonce{
mockBackend: mockBackend{
g: g,
minedTxs: make(map[common.Hash]minedTxInfo),
},
}
}
func (b *mockBackendWithNonce) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
return uint64(len(b.minedTxs)), nil
}
func TestSend(t *testing.T) {
testCases := []testCase{
{
name: "success",
max: 5,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
},
nonces: []uint64{0, 1},
total: 1 * time.Second,
},
{
name: "no limit",
max: 0,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
},
nonces: []uint64{0, 1},
total: 1 * time.Second,
},
{
name: "single threaded",
max: 1,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: trySendQueueFunc, queued: false},
},
txs: []testTx{
{},
},
nonces: []uint64{0},
total: 1 * time.Second,
},
{
name: "single threaded blocking",
max: 1,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
{},
},
nonces: []uint64{0, 1, 2},
total: 3 * time.Second,
},
{
name: "dual threaded blocking",
max: 2,
calls: []queueCall{
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: true},
{call: trySendQueueFunc, queued: false},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true},
},
txs: []testTx{
{},
{},
{},
{},
{},
},
nonces: []uint64{0, 1, 2, 3, 4},
total: 3 * time.Second,
},
{
name: "subsequent txs fail after tx failure",
max: 1,
calls: []queueCall{
{call: sendQueueFunc, queued: true},
{call: sendQueueFunc, queued: true, txErr: true},
{call: sendQueueFunc, queued: true, txErr: true},
},
txs: []testTx{
{},
{sendErr: true},
{},
},
nonces: []uint64{0, 1, 1},
total: 3 * time.Second,
},
}
for _, test := range testCases {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
conf := configWithNumConfs(1)
conf.ReceiptQueryInterval = 1 * time.Second // simulate a network send
conf.ResubmissionTimeout = 2 * time.Second // resubmit to detect errors
conf.SafeAbortNonceTooLowCount = 1
backend := newMockBackendWithNonce(newGasPricer(3))
mgr := &SimpleTxManager{
chainID: conf.ChainID,
name: "TEST",
cfg: conf,
backend: backend,
l: testlog.Logger(t, log.LvlCrit),
metr: &metrics.NoopTxMetrics{},
}
// track the nonces, and return any expected errors from tx sending
var nonces []uint64
sendTx := func(ctx context.Context, tx *types.Transaction) error {
index := int(tx.Data()[0])
nonces = append(nonces, tx.Nonce())
var testTx *testTx
if index < len(test.txs) {
testTx = &test.txs[index]
}
if testTx != nil && testTx.sendErr {
return core.ErrNonceTooLow
}
txHash := tx.Hash()
backend.mine(&txHash, tx.GasFeeCap())
return nil
}
backend.setTxSender(sendTx)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
queue := NewQueue[int](ctx, mgr, test.max)
// make all the queue calls given in the test case
start := time.Now()
for i, c := range test.calls {
msg := fmt.Sprintf("Call %d", i)
c := c
receiptCh := make(chan TxReceipt[int], 1)
candidate := TxCandidate{
TxData: []byte{byte(i)},
To: &common.Address{},
}
queued := c.call(i, candidate, receiptCh, queue)
require.Equal(t, c.queued, queued, msg)
go func() {
r := <-receiptCh
if c.txErr {
require.Error(t, r.Err, msg)
} else {
require.NoError(t, r.Err, msg)
}
}()
}
// wait for the queue to drain (all txs complete or failed)
queue.Wait()
duration := time.Since(start)
// expect the execution time within a certain window
now := time.Now()
require.WithinDuration(t, now.Add(test.total), now.Add(duration), 500*time.Millisecond, "unexpected queue transaction timing")
// check that the nonces match
slices.Sort(nonces)
require.Equal(t, test.nonces, nonces, "expected nonces do not match")
})
}
}
......@@ -4,10 +4,10 @@ import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum"
......@@ -38,7 +38,7 @@ type TxManager interface {
// It can be stopped by cancelling the provided context; however, the transaction
// may be included on L1 even if the context is cancelled.
//
// NOTE: Send should be called by AT MOST one caller at a time.
// NOTE: Send can be called concurrently, the nonce will be managed internally.
Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error)
// From returns the sending address associated with the instance of the transaction manager.
......@@ -84,6 +84,11 @@ type SimpleTxManager struct {
backend ETHBackend
l log.Logger
metr metrics.TxMetricer
nonce *uint64
nonceLock sync.RWMutex
pending atomic.Int64
}
// NewSimpleTxManager initializes a new SimpleTxManager with the passed Config.
......@@ -126,8 +131,21 @@ type TxCandidate struct {
// The transaction manager handles all signing. If and only if the gas limit is 0, the
// transaction manager will do a gas estimation.
//
// NOTE: Send should be called by AT MOST one caller at a time.
// NOTE: Send can be called concurrently, the nonce will be managed internally.
func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
m.metr.RecordPendingTx(m.pending.Add(1))
defer func() {
m.metr.RecordPendingTx(m.pending.Add(-1))
}()
receipt, err := m.send(ctx, candidate)
if err != nil {
m.resetNonce()
}
return receipt, err
}
// send performs the actual transaction creation and sending.
func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
if m.cfg.TxSendTimeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
......@@ -137,7 +155,7 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
if err != nil {
return nil, fmt.Errorf("failed to create the tx: %w", err)
}
return m.send(ctx, tx)
return m.sendTx(ctx, tx)
}
// craftTx creates the signed transaction
......@@ -153,15 +171,10 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
}
gasFeeCap := calcGasFeeCap(basefee, gasTipCap)
// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
childCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
nonce, err := m.backend.NonceAt(childCtx, m.cfg.From, nil)
nonce, err := m.nextNonce(ctx)
if err != nil {
m.metr.RPCError()
return nil, fmt.Errorf("failed to get nonce: %w", err)
return nil, err
}
m.metr.RecordNonce(nonce)
rawTx := &types.DynamicFeeTx{
ChainID: m.chainID,
......@@ -192,14 +205,48 @@ func (m *SimpleTxManager) craftTx(ctx context.Context, candidate TxCandidate) (*
rawTx.Gas = gas
}
ctx, cancel = context.WithTimeout(ctx, m.cfg.NetworkTimeout)
ctx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
return m.cfg.Signer(ctx, m.cfg.From, types.NewTx(rawTx))
}
// nextNonce returns a nonce to use for the next transaction. It uses
// eth_getTransactionCount with "latest" once; subsequent calls simply
// increment the cached value. If the nonce tracking is reset, the next call
// queries eth_getTransactionCount again.
func (m *SimpleTxManager) nextNonce(ctx context.Context) (uint64, error) {
m.nonceLock.Lock()
defer m.nonceLock.Unlock()
if m.nonce == nil {
// Fetch the sender's nonce from the latest known block (nil `blockNumber`)
childCtx, cancel := context.WithTimeout(ctx, m.cfg.NetworkTimeout)
defer cancel()
nonce, err := m.backend.NonceAt(childCtx, m.cfg.From, nil)
if err != nil {
m.metr.RPCError()
return 0, fmt.Errorf("failed to get nonce: %w", err)
}
m.nonce = &nonce
} else {
*m.nonce++
}
m.metr.RecordNonce(*m.nonce)
return *m.nonce, nil
}
// resetNonce resets the internal nonce tracking. This is called if any pending send
// returns an error.
func (m *SimpleTxManager) resetNonce() {
m.nonceLock.Lock()
defer m.nonceLock.Unlock()
m.nonce = nil
}
// send submits the same transaction several times with increasing gas prices as necessary.
// It waits for the transaction to be confirmed on chain.
func (m *SimpleTxManager) send(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*types.Receipt, error) {
var wg sync.WaitGroup
defer wg.Wait()
ctx, cancel := context.WithCancel(ctx)
......
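With nonce tracking now internal, the updated NOTE above holds: Send may be called from multiple goroutines, each tx receiving a sequential nonce under nonceLock, and any send error triggers resetNonce so the next call re-queries eth_getTransactionCount. A sketch of that concurrent usage (broadcast, the recipient, and the payloads are placeholders):

package main

import (
	"context"
	"sync"

	"github.com/ethereum-optimism/optimism/op-service/txmgr"
	"github.com/ethereum/go-ethereum/common"
)

func broadcast(ctx context.Context, mgr txmgr.TxManager, payloads [][]byte) {
	var wg sync.WaitGroup
	to := common.Address{} // hypothetical recipient
	for _, data := range payloads {
		wg.Add(1)
		data := data
		go func() {
			defer wg.Done()
			// Safe to call concurrently: nextNonce hands out sequential
			// nonces under nonceLock, and any failure resets the cache.
			_, _ = mgr.Send(ctx, txmgr.TxCandidate{To: &to, TxData: data})
		}()
	}
	wg.Wait()
}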
......@@ -277,7 +277,7 @@ func TestTxMgrConfirmAtMinGasPrice(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -305,7 +305,7 @@ func TestTxMgrNeverConfirmCancel(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt)
}
......@@ -334,7 +334,7 @@ func TestTxMgrConfirmsAtHigherGasPrice(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -365,7 +365,7 @@ func TestTxMgrBlocksOnFailingRpcCalls(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Equal(t, err, context.DeadlineExceeded)
require.Nil(t, receipt)
}
......@@ -443,7 +443,7 @@ func TestTxMgrOnlyOnePublicationSucceeds(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
......@@ -478,7 +478,7 @@ func TestTxMgrConfirmsMinGasPriceAfterBumping(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -523,7 +523,7 @@ func TestTxMgrDoesntAbortNonceTooLowAfterMiningTx(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
receipt, err := h.mgr.send(ctx, tx)
receipt, err := h.mgr.sendTx(ctx, tx)
require.Nil(t, err)
require.NotNil(t, receipt)
require.Equal(t, h.gasPricer.expGasFeeCap().Uint64(), receipt.GasUsed)
......@@ -870,3 +870,40 @@ func TestErrStringMatch(t *testing.T) {
})
}
}
func TestNonceReset(t *testing.T) {
conf := configWithNumConfs(1)
conf.SafeAbortNonceTooLowCount = 1
h := newTestHarnessWithConfig(t, conf)
index := -1
var nonces []uint64
sendTx := func(ctx context.Context, tx *types.Transaction) error {
index++
nonces = append(nonces, tx.Nonce())
// fail every 3rd tx
if index%3 == 0 {
return core.ErrNonceTooLow
}
txHash := tx.Hash()
h.backend.mine(&txHash, tx.GasFeeCap())
return nil
}
h.backend.setTxSender(sendTx)
ctx := context.Background()
for i := 0; i < 8; i++ {
_, err := h.mgr.Send(ctx, TxCandidate{
To: &common.Address{},
})
// expect every 3rd tx to fail
if i%3 == 0 {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}
// internal nonce tracking should be reset every 3rd tx
require.Equal(t, []uint64{0, 0, 1, 2, 0, 1, 2, 0}, nonces)
}
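The expected sequence is the reset behavior in miniature: each forced core.ErrNonceTooLow drops the cached nonce, so the following Send re-queries the test backend (which here keeps reporting nonce 0) instead of incrementing locally.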
......@@ -3,40 +3,29 @@
"portalGuardian": "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc",
"controller": "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266",
"proxyAdminOwner": "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc",
"l1StartingBlockTag": "earliest",
"l1ChainID": 900,
"l2ChainID": 901,
"l2BlockTime": 2,
"maxSequencerDrift": 300,
"sequencerWindowSize": 15,
"channelTimeout": 40,
"p2pSequencerAddress": "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc",
"batchInboxAddress": "0xff00000000000000000000000000000000000000",
"batchSenderAddress": "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC",
"l2OutputOracleSubmissionInterval": 6,
"l2OutputOracleStartingTimestamp": 0,
"l2OutputOracleStartingBlockNumber": 0,
"l2OutputOracleProposer": "0x70997970C51812dc3A010C7d01b50e0d17dc79C8",
"l2OutputOracleChallenger": "0x6925B8704Ff96DEe942623d6FB5e946EF5884b63",
"l2GenesisBlockBaseFeePerGas": "0x3B9ACA00",
"l2GenesisBlockGasLimit": "0x17D7840",
"baseFeeVaultRecipient": "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc",
"l1FeeVaultRecipient": "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc",
"sequencerFeeVaultRecipient": "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc",
"l1FeeVaultRecipient": "0x71bE63f3384f5fb98995898A86B02Fb2426c5788",
"sequencerFeeVaultRecipient": "0xfabb0ac9d68b0b445fb7357272ff202c5651694a",
"governanceTokenName": "Optimism",
"governanceTokenSymbol": "OP",
"governanceTokenOwner": "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc",
"l1FeeVaultRecipient": "0x71bE63f3384f5fb98995898A86B02Fb2426c5788",
"sequencerFeeVaultRecipient": "0xfabb0ac9d68b0b445fb7357272ff202c5651694a",
"finalizationPeriodSeconds": 2,
"numDeployConfirmations": 1
}
}
\ No newline at end of file
......@@ -30,6 +30,7 @@
"coverage:lcov": "yarn build:differential && yarn build:fuzz && forge coverage --report lcov",
"gas-snapshot": "yarn build:differential && yarn build:fuzz && forge snapshot --no-match-test 'testDiff|testFuzz|invariant|generateArtifact'",
"storage-snapshot": "./scripts/storage-snapshot.sh",
"validate-deploy-configs": "hardhat compile && hardhat generate-deploy-config && ./scripts/validate-deploy-configs.sh",
"validate-spacers": "hardhat compile && hardhat validate-spacers",
"slither": "./scripts/slither.sh",
"slither:triage": "TRIAGE_MODE=1 ./scripts/slither.sh",
......
#!/usr/bin/env bash
set -e

dir=$(dirname "$0")

echo "Validating deployment configurations..."
for config in "$dir"/../deploy-config/*.json
do
  echo "Found file: $config"
  git diff --exit-code "$config"
done
echo "Deployment configs in $dir/../deploy-config validated!"
......@@ -31,6 +31,7 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/fjl/memsize v0.0.1 // indirect
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
......
......@@ -140,6 +140,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ=
github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
......
package avg_sliding_window
import (
"time"
lm "github.com/emirpasic/gods/maps/linkedhashmap"
)
type Clock interface {
Now() time.Time
}
// DefaultClock provides a clock that reads the current time from the system clock
type DefaultClock struct{}
func NewDefaultClock() *DefaultClock {
return &DefaultClock{}
}
func (c DefaultClock) Now() time.Time {
return time.Now()
}
// AdjustableClock provides a manually set clock, making it easy to override the system time in tests
type AdjustableClock struct {
now time.Time
}
func NewAdjustableClock(now time.Time) *AdjustableClock {
return &AdjustableClock{now: now}
}
func (c *AdjustableClock) Now() time.Time {
return c.now
}
func (c *AdjustableClock) Set(now time.Time) {
c.now = now
}
type bucket struct {
sum float64
qty uint
}
// AvgSlidingWindow calculates moving averages efficiently.
// Data points are rounded to the nearest bucket of size `bucketSize`
// and evicted once they fall outside of `windowLength`.
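// For example, with a 1s bucket size a point at 12:00:00.400 is counted
// in the 12:00:00 bucket (timestamps are rounded, not truncated).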
type AvgSlidingWindow struct {
bucketSize time.Duration
windowLength time.Duration
clock Clock
buckets *lm.Map
qty uint
sum float64
}
type SlidingWindowOpts func(sw *AvgSlidingWindow)
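// NewSlidingWindow builds a window from the given options, defaulting to
// 1s buckets, a 5-minute window, and the system clock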
func NewSlidingWindow(opts ...SlidingWindowOpts) *AvgSlidingWindow {
sw := &AvgSlidingWindow{
buckets: lm.New(),
}
for _, opt := range opts {
opt(sw)
}
if sw.bucketSize == 0 {
sw.bucketSize = time.Second
}
if sw.windowLength == 0 {
sw.windowLength = 5 * time.Minute
}
if sw.clock == nil {
sw.clock = NewDefaultClock()
}
return sw
}
func WithWindowLength(windowLength time.Duration) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.windowLength = windowLength
}
}
func WithBucketSize(bucketSize time.Duration) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.bucketSize = bucketSize
}
}
func WithClock(clock Clock) SlidingWindowOpts {
return func(sw *AvgSlidingWindow) {
sw.clock = clock
}
}
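// inWindow reports whether t falls within the half-open interval
// (now-windowLength, now], with now rounded to the bucket size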
func (sw *AvgSlidingWindow) inWindow(t time.Time) bool {
now := sw.clock.Now().Round(sw.bucketSize)
windowStart := now.Add(-sw.windowLength)
return windowStart.Before(t) && !t.After(now)
}
// Add inserts a new data point into the window, with value `val` at the current time
func (sw *AvgSlidingWindow) Add(val float64) {
t := sw.clock.Now()
sw.AddWithTime(t, val)
}
// AddWithTime inserts a new data point into the window, with value `val` and time `t`
func (sw *AvgSlidingWindow) AddWithTime(t time.Time, val float64) {
sw.advance()
key := t.Round(sw.bucketSize)
if !sw.inWindow(key) {
return
}
var b *bucket
current, found := sw.buckets.Get(key)
if !found {
b = &bucket{}
} else {
b = current.(*bucket)
}
// update bucket
bsum := b.sum
b.qty += 1
b.sum = bsum + val
// update window
wsum := sw.sum
sw.qty += 1
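	// wsum - bsum + b.sum reduces to wsum + val: swap out the bucket's old sum for its new one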
sw.sum = wsum - bsum + b.sum
sw.buckets.Put(key, b)
}
// advance evicts old data points
func (sw *AvgSlidingWindow) advance() {
now := sw.clock.Now().Round(sw.bucketSize)
windowStart := now.Add(-sw.windowLength)
keys := sw.buckets.Keys()
for _, k := range keys {
if k.(time.Time).After(windowStart) {
break
}
val, _ := sw.buckets.Get(k)
b := val.(*bucket)
sw.buckets.Remove(k)
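		// while buckets remain, subtract the evicted bucket's contribution;
		// once the window empties, reset exactly to avoid floating-point drift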
if sw.buckets.Size() > 0 {
sw.qty -= b.qty
sw.sum = sw.sum - b.sum
} else {
sw.qty = 0
sw.sum = 0.0
}
}
}
// Avg retrieves the current average for the sliding window
func (sw *AvgSlidingWindow) Avg() float64 {
sw.advance()
if sw.qty == 0 {
return 0
}
return sw.sum / float64(sw.qty)
}
// Sum retrieves the current sum for the sliding window
func (sw *AvgSlidingWindow) Sum() float64 {
sw.advance()
return sw.sum
}
// Count retrieves the data point count for the sliding window
func (sw *AvgSlidingWindow) Count() uint {
sw.advance()
return sw.qty
}
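A quick usage sketch (a hypothetical example, same package) showing the options and the adjustable clock working together; the printed values follow directly from the bucket arithmetic above:

package avg_sliding_window

import (
	"fmt"
	"time"
)

func ExampleAvgSlidingWindow() {
	start := time.Date(2023, 3, 1, 12, 0, 0, 0, time.UTC)
	clock := NewAdjustableClock(start)
	sw := NewSlidingWindow(
		WithWindowLength(10*time.Second),
		WithBucketSize(time.Second),
		WithClock(clock),
	)
	sw.Add(4)
	sw.Add(8)
	fmt.Println(sw.Avg(), sw.Count()) // 6 2
	// jump past the window; both points are evicted on the next read
	clock.Set(start.Add(30 * time.Second))
	fmt.Println(sw.Avg(), sw.Count()) // 0 0
}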