Commit 78db51b4 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into hamdi/fd.checked.batch.index

parents 91ef5319 df65becb
...@@ -7,13 +7,13 @@ import ( ...@@ -7,13 +7,13 @@ import (
"io" "io"
"math" "math"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
var ( var (
ErrInvalidChannelTimeout = errors.New("channel timeout is less than the safety margin") ErrInvalidChannelTimeout = errors.New("channel timeout is less than the safety margin")
ErrInputTargetReached = errors.New("target amount of input data reached")
ErrMaxFrameIndex = errors.New("max frame index reached (uint16)") ErrMaxFrameIndex = errors.New("max frame index reached (uint16)")
ErrMaxDurationReached = errors.New("max channel duration reached") ErrMaxDurationReached = errors.New("max channel duration reached")
ErrChannelTimeoutClose = errors.New("close to channel timeout") ErrChannelTimeoutClose = errors.New("close to channel timeout")
...@@ -55,19 +55,9 @@ type ChannelConfig struct { ...@@ -55,19 +55,9 @@ type ChannelConfig struct {
SubSafetyMargin uint64 SubSafetyMargin uint64
// The maximum byte-size a frame can have. // The maximum byte-size a frame can have.
MaxFrameSize uint64 MaxFrameSize uint64
// The target number of frames to create per channel. Note that if the
// realized compression ratio is worse than the approximate, more frames may // CompressorConfig contains the configuration for creating new compressors.
// actually be created. This also depends on how close TargetFrameSize is to CompressorConfig compressor.Config
// MaxFrameSize.
TargetFrameSize uint64
// The target number of frames to create in this channel. If the realized
// compression ratio is worse than approxComprRatio, additional leftover
// frame(s) might get created.
TargetNumFrames int
// Approximated compression ratio to assume. Should be slightly smaller than
// average from experiments to avoid the chances of creating a small
// additional leftover frame.
ApproxComprRatio float64
} }
// Check validates the [ChannelConfig] parameters. // Check validates the [ChannelConfig] parameters.
...@@ -96,12 +86,6 @@ func (cc *ChannelConfig) Check() error { ...@@ -96,12 +86,6 @@ func (cc *ChannelConfig) Check() error {
return nil return nil
} }
// InputThreshold calculates the input data threshold in bytes from the given
// parameters.
func (c ChannelConfig) InputThreshold() uint64 {
return uint64(float64(c.TargetNumFrames) * float64(c.TargetFrameSize) / c.ApproxComprRatio)
}
type frameID struct { type frameID struct {
chID derive.ChannelID chID derive.ChannelID
frameNumber uint16 frameNumber uint16
...@@ -142,7 +126,11 @@ type channelBuilder struct { ...@@ -142,7 +126,11 @@ type channelBuilder struct {
// newChannelBuilder creates a new channel builder or returns an error if the // newChannelBuilder creates a new channel builder or returns an error if the
// channel out could not be created. // channel out could not be created.
func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) { func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
co, err := derive.NewChannelOut() c, err := cfg.CompressorConfig.NewCompressor()
if err != nil {
return nil, err
}
co, err := derive.NewChannelOut(c)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -209,7 +197,7 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error ...@@ -209,7 +197,7 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error
return l1info, fmt.Errorf("converting block to batch: %w", err) return l1info, fmt.Errorf("converting block to batch: %w", err)
} }
if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) { if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
c.setFullErr(err) c.setFullErr(err)
return l1info, c.FullErr() return l1info, c.FullErr()
} else if err != nil { } else if err != nil {
...@@ -218,8 +206,8 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error ...@@ -218,8 +206,8 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error
c.blocks = append(c.blocks, block) c.blocks = append(c.blocks, block)
c.updateSwTimeout(batch) c.updateSwTimeout(batch)
if c.inputTargetReached() { if err = c.co.FullErr(); err != nil {
c.setFullErr(ErrInputTargetReached) c.setFullErr(err)
// Adding this block still worked, so don't return error, just mark as full // Adding this block still worked, so don't return error, just mark as full
} }
...@@ -293,12 +281,6 @@ func (c *channelBuilder) TimedOut(blockNum uint64) bool { ...@@ -293,12 +281,6 @@ func (c *channelBuilder) TimedOut(blockNum uint64) bool {
return c.timeout != 0 && blockNum >= c.timeout return c.timeout != 0 && blockNum >= c.timeout
} }
// inputTargetReached says whether the target amount of input data has been
// reached in this channel builder. No more blocks can be added afterwards.
func (c *channelBuilder) inputTargetReached() bool {
return uint64(c.co.InputBytes()) >= c.cfg.InputThreshold()
}
// IsFull returns whether the channel is full. // IsFull returns whether the channel is full.
// FullErr returns the reason for the channel being full. // FullErr returns the reason for the channel being full.
func (c *channelBuilder) IsFull() bool { func (c *channelBuilder) IsFull() bool {
...@@ -310,7 +292,7 @@ func (c *channelBuilder) IsFull() bool { ...@@ -310,7 +292,7 @@ func (c *channelBuilder) IsFull() bool {
// //
// It returns a ChannelFullError wrapping one of the following possible reasons // It returns a ChannelFullError wrapping one of the following possible reasons
// for the channel being full: // for the channel being full:
// - ErrInputTargetReached if the target amount of input data has been reached, // - derive.CompressorFullErr if the compressor target has been reached,
// - derive.MaxRLPBytesPerChannel if the general maximum amount of input data // - derive.MaxRLPBytesPerChannel if the general maximum amount of input data
// would have been exceeded by the latest AddBlock call, // would have been exceeded by the latest AddBlock call,
// - ErrMaxFrameIndex if the maximum number of frames has been generated // - ErrMaxFrameIndex if the maximum number of frames has been generated
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
...@@ -28,9 +29,11 @@ var defaultTestChannelConfig = ChannelConfig{ ...@@ -28,9 +29,11 @@ var defaultTestChannelConfig = ChannelConfig{
MaxChannelDuration: 1, MaxChannelDuration: 1,
SubSafetyMargin: 4, SubSafetyMargin: 4,
MaxFrameSize: 120000, MaxFrameSize: 120000,
TargetFrameSize: 100000, CompressorConfig: compressor.Config{
TargetNumFrames: 1, TargetFrameSize: 100000,
ApproxComprRatio: 0.4, TargetNumFrames: 1,
ApproxComprRatio: 0.4,
},
} }
// TestChannelConfig_Check tests the [ChannelConfig] [Check] function. // TestChannelConfig_Check tests the [ChannelConfig] [Check] function.
...@@ -416,7 +419,9 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) { ...@@ -416,7 +419,9 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) {
// Mock the internals of `channelBuilder.outputFrame` // Mock the internals of `channelBuilder.outputFrame`
// to construct a single frame // to construct a single frame
co, err := derive.NewChannelOut() c, err := channelConfig.CompressorConfig.NewCompressor()
require.NoError(t, err)
co, err := derive.NewChannelOut(c)
require.NoError(t, err) require.NoError(t, err)
var buf bytes.Buffer var buf bytes.Buffer
fn, err := co.OutputFrame(&buf, channelConfig.MaxFrameSize) fn, err := co.OutputFrame(&buf, channelConfig.MaxFrameSize)
...@@ -483,8 +488,8 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) { ...@@ -483,8 +488,8 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) {
t.Parallel() t.Parallel()
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.MaxFrameSize = derive.MaxRLPBytesPerChannel * 2 channelConfig.MaxFrameSize = derive.MaxRLPBytesPerChannel * 2
channelConfig.TargetFrameSize = derive.MaxRLPBytesPerChannel * 2 channelConfig.CompressorConfig.TargetFrameSize = derive.MaxRLPBytesPerChannel * 2
channelConfig.ApproxComprRatio = 1 channelConfig.CompressorConfig.ApproxComprRatio = 1
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -500,9 +505,9 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) { ...@@ -500,9 +505,9 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) {
func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) { func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) {
channelConfig := defaultTestChannelConfig channelConfig := defaultTestChannelConfig
channelConfig.MaxFrameSize = 24 channelConfig.MaxFrameSize = 24
channelConfig.TargetNumFrames = math.MaxInt channelConfig.CompressorConfig.TargetNumFrames = math.MaxInt
channelConfig.TargetFrameSize = 24 channelConfig.CompressorConfig.TargetFrameSize = 24
channelConfig.ApproxComprRatio = 0 channelConfig.CompressorConfig.ApproxComprRatio = 0
// Continuously add blocks until the max frame index is reached // Continuously add blocks until the max frame index is reached
// This should cause the [channelBuilder.OutputFrames] function // This should cause the [channelBuilder.OutputFrames] function
...@@ -544,9 +549,9 @@ func TestChannelBuilder_AddBlock(t *testing.T) { ...@@ -544,9 +549,9 @@ func TestChannelBuilder_AddBlock(t *testing.T) {
channelConfig.MaxFrameSize = 30 channelConfig.MaxFrameSize = 30
// Configure the Input Threshold params so we observe a full channel // Configure the Input Threshold params so we observe a full channel
channelConfig.TargetFrameSize = 30 channelConfig.CompressorConfig.TargetFrameSize = 30
channelConfig.TargetNumFrames = 2 channelConfig.CompressorConfig.TargetNumFrames = 2
channelConfig.ApproxComprRatio = 1 channelConfig.CompressorConfig.ApproxComprRatio = 1
// Construct the channel builder // Construct the channel builder
cb, err := newChannelBuilder(channelConfig) cb, err := newChannelBuilder(channelConfig)
...@@ -564,7 +569,7 @@ func TestChannelBuilder_AddBlock(t *testing.T) { ...@@ -564,7 +569,7 @@ func TestChannelBuilder_AddBlock(t *testing.T) {
// Since the channel output is full, the next call to AddBlock // Since the channel output is full, the next call to AddBlock
// should return the channel out full error // should return the channel out full error
require.ErrorIs(t, addMiniBlock(cb), ErrInputTargetReached) require.ErrorIs(t, addMiniBlock(cb), derive.CompressorFullErr)
} }
// TestChannelBuilder_Reset tests the [Reset] function // TestChannelBuilder_Reset tests the [Reset] function
...@@ -701,10 +706,10 @@ func TestChannelBuilder_OutputBytes(t *testing.T) { ...@@ -701,10 +706,10 @@ func TestChannelBuilder_OutputBytes(t *testing.T) {
require := require.New(t) require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano())) rng := rand.New(rand.NewSource(time.Now().UnixNano()))
cfg := defaultTestChannelConfig cfg := defaultTestChannelConfig
cfg.TargetFrameSize = 1000 cfg.CompressorConfig.TargetFrameSize = 1000
cfg.MaxFrameSize = 1000 cfg.MaxFrameSize = 1000
cfg.TargetNumFrames = 16 cfg.CompressorConfig.TargetNumFrames = 16
cfg.ApproxComprRatio = 1.0 cfg.CompressorConfig.ApproxComprRatio = 1.0
cb, err := newChannelBuilder(cfg) cb, err := newChannelBuilder(cfg)
require.NoError(err, "newChannelBuilder") require.NoError(err, "newChannelBuilder")
...@@ -713,7 +718,7 @@ func TestChannelBuilder_OutputBytes(t *testing.T) { ...@@ -713,7 +718,7 @@ func TestChannelBuilder_OutputBytes(t *testing.T) {
for { for {
block, _ := dtest.RandomL2Block(rng, rng.Intn(32)) block, _ := dtest.RandomL2Block(rng, rng.Intn(32))
_, err := cb.AddBlock(block) _, err := cb.AddBlock(block)
if errors.Is(err, ErrInputTargetReached) { if errors.Is(err, derive.CompressorFullErr) {
break break
} }
require.NoError(err) require.NoError(err)
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
...@@ -98,9 +99,11 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { ...@@ -98,9 +99,11 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetFrameSize: 0, MaxFrameSize: 120_000,
MaxFrameSize: 120_000, CompressorConfig: compressor.Config{
ApproxComprRatio: 1.0, TargetFrameSize: 0,
ApproxComprRatio: 1.0,
},
}) })
a := newMiniL2Block(0) a := newMiniL2Block(0)
...@@ -170,9 +173,12 @@ func TestChannelManager_Clear(t *testing.T) { ...@@ -170,9 +173,12 @@ func TestChannelManager_Clear(t *testing.T) {
ChannelTimeout: 10, ChannelTimeout: 10,
// Have to set the max frame size here otherwise the channel builder would not // Have to set the max frame size here otherwise the channel builder would not
// be able to output any frames // be able to output any frames
MaxFrameSize: 24, MaxFrameSize: 24,
TargetFrameSize: 24, CompressorConfig: compressor.Config{
ApproxComprRatio: 1.0, TargetFrameSize: 24,
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
}) })
// Channel Manager state should be empty by default // Channel Manager state should be empty by default
...@@ -331,9 +337,11 @@ func TestChannelManager_TxResend(t *testing.T) { ...@@ -331,9 +337,11 @@ func TestChannelManager_TxResend(t *testing.T) {
log := testlog.Logger(t, log.LvlError) log := testlog.Logger(t, log.LvlError)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetFrameSize: 0, MaxFrameSize: 120_000,
MaxFrameSize: 120_000, CompressorConfig: compressor.Config{
ApproxComprRatio: 1.0, TargetFrameSize: 0,
ApproxComprRatio: 1.0,
},
}) })
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
...@@ -372,10 +380,12 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) { ...@@ -372,10 +380,12 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetFrameSize: 0, MaxFrameSize: 100,
MaxFrameSize: 100, ChannelTimeout: 1000,
ApproxComprRatio: 1.0, CompressorConfig: compressor.Config{
ChannelTimeout: 1000, TargetFrameSize: 0,
ApproxComprRatio: 1.0,
},
}) })
a, _ := derivetest.RandomL2Block(rng, 4) a, _ := derivetest.RandomL2Block(rng, 4)
...@@ -397,10 +407,13 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) { ...@@ -397,10 +407,13 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetFrameSize: 0, MaxFrameSize: 100,
MaxFrameSize: 100, ChannelTimeout: 1000,
ApproxComprRatio: 1.0, CompressorConfig: compressor.Config{
ChannelTimeout: 1000, TargetFrameSize: 1,
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
}) })
a := newMiniL2Block(0) a := newMiniL2Block(0)
b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash()) b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
...@@ -433,11 +446,13 @@ func TestChannelManagerClosePendingChannel(t *testing.T) { ...@@ -433,11 +446,13 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetNumFrames: 100, MaxFrameSize: 1000,
TargetFrameSize: 1000, ChannelTimeout: 1000,
MaxFrameSize: 1000, CompressorConfig: compressor.Config{
ApproxComprRatio: 1.0, TargetNumFrames: 100,
ChannelTimeout: 1000, TargetFrameSize: 1000,
ApproxComprRatio: 1.0,
},
}) })
a := newMiniL2Block(50_000) a := newMiniL2Block(50_000)
...@@ -476,11 +491,13 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) { ...@@ -476,11 +491,13 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit) log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{ ChannelConfig{
TargetNumFrames: 100, MaxFrameSize: 1000,
TargetFrameSize: 1000, ChannelTimeout: 1000,
MaxFrameSize: 1000, CompressorConfig: compressor.Config{
ApproxComprRatio: 1.0, TargetNumFrames: 100,
ChannelTimeout: 1000, TargetFrameSize: 1000,
ApproxComprRatio: 1.0,
},
}) })
a := newMiniL2Block(50_000) a := newMiniL2Block(50_000)
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli" "github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/flags" "github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/rpc" "github.com/ethereum-optimism/optimism/op-batcher/rpc"
...@@ -84,23 +85,14 @@ type CLIConfig struct { ...@@ -84,23 +85,14 @@ type CLIConfig struct {
// MaxL1TxSize is the maximum size of a batch tx submitted to L1. // MaxL1TxSize is the maximum size of a batch tx submitted to L1.
MaxL1TxSize uint64 MaxL1TxSize uint64
// TargetL1TxSize is the target size of a batch tx submitted to L1.
TargetL1TxSize uint64
// TargetNumFrames is the target number of frames per channel.
TargetNumFrames int
// ApproxComprRatio is the approximate compression ratio (<= 1.0) of the used
// compression algorithm.
ApproxComprRatio float64
Stopped bool Stopped bool
TxMgrConfig txmgr.CLIConfig TxMgrConfig txmgr.CLIConfig
RPCConfig rpc.CLIConfig RPCConfig rpc.CLIConfig
LogConfig oplog.CLIConfig LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig MetricsConfig opmetrics.CLIConfig
PprofConfig oppprof.CLIConfig PprofConfig oppprof.CLIConfig
CompressorConfig compressor.CLIConfig
} }
func (c CLIConfig) Check() error { func (c CLIConfig) Check() error {
...@@ -136,14 +128,12 @@ func NewConfig(ctx *cli.Context) CLIConfig { ...@@ -136,14 +128,12 @@ func NewConfig(ctx *cli.Context) CLIConfig {
MaxPendingTransactions: ctx.GlobalUint64(flags.MaxPendingTransactionsFlag.Name), MaxPendingTransactions: ctx.GlobalUint64(flags.MaxPendingTransactionsFlag.Name),
MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name), MaxChannelDuration: ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name), MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
Stopped: ctx.GlobalBool(flags.StoppedFlag.Name), Stopped: ctx.GlobalBool(flags.StoppedFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx), TxMgrConfig: txmgr.ReadCLIConfig(ctx),
RPCConfig: rpc.ReadCLIConfig(ctx), RPCConfig: rpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx), LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx), MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx), PprofConfig: oppprof.ReadCLIConfig(ctx),
CompressorConfig: compressor.ReadCLIConfig(ctx),
} }
} }
...@@ -89,10 +89,8 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri ...@@ -89,10 +89,8 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
ChannelTimeout: rcfg.ChannelTimeout, ChannelTimeout: rcfg.ChannelTimeout,
MaxChannelDuration: cfg.MaxChannelDuration, MaxChannelDuration: cfg.MaxChannelDuration,
SubSafetyMargin: cfg.SubSafetyMargin, SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
TargetFrameSize: cfg.TargetL1TxSize - 1, // subtract 1 byte for version CompressorConfig: cfg.CompressorConfig.Config(),
TargetNumFrames: cfg.TargetNumFrames,
ApproxComprRatio: cfg.ApproxComprRatio,
}, },
} }
......
package compressor
import (
"strings"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/urfave/cli"
)
const (
TargetL1TxSizeBytesFlagName = "target-l1-tx-size-bytes"
TargetNumFramesFlagName = "target-num-frames"
ApproxComprRatioFlagName = "approx-compr-ratio"
KindFlagName = "compressor"
)
func CLIFlags(envPrefix string) []cli.Flag {
return []cli.Flag{
cli.Uint64Flag{
Name: TargetL1TxSizeBytesFlagName,
Usage: "The target size of a batch tx submitted to L1.",
Value: 100_000,
EnvVar: opservice.PrefixEnvVar(envPrefix, "TARGET_L1_TX_SIZE_BYTES"),
},
cli.IntFlag{
Name: TargetNumFramesFlagName,
Usage: "The target number of frames to create per channel",
Value: 1,
EnvVar: opservice.PrefixEnvVar(envPrefix, "TARGET_NUM_FRAMES"),
},
cli.Float64Flag{
Name: ApproxComprRatioFlagName,
Usage: "The approximate compression ratio (<= 1.0)",
Value: 0.4,
EnvVar: opservice.PrefixEnvVar(envPrefix, "APPROX_COMPR_RATIO"),
},
cli.StringFlag{
Name: KindFlagName,
Usage: "The type of compressor. Valid options: " + strings.Join(KindKeys, ", "),
EnvVar: opservice.PrefixEnvVar(envPrefix, "COMPRESSOR"),
Value: RatioKind,
},
}
}
type CLIConfig struct {
// TargetL1TxSizeBytes to target when creating channel frames. Note that if the
// realized compression ratio is worse than the approximate, more frames may
// actually be created. This also depends on how close the target is to the
// max frame size.
TargetL1TxSizeBytes uint64
// TargetNumFrames to create in this channel. If the realized compression ratio
// is worse than approxComprRatio, additional leftover frame(s) might get created.
TargetNumFrames int
// ApproxComprRatio to assume. Should be slightly smaller than average from
// experiments to avoid the chances of creating a small additional leftover frame.
ApproxComprRatio float64
// Type of compressor to use. Must be one of KindKeys.
Kind string
}
func (c *CLIConfig) Config() Config {
return Config{
TargetFrameSize: c.TargetL1TxSizeBytes - 1, // subtract 1 byte for version
TargetNumFrames: c.TargetNumFrames,
ApproxComprRatio: c.ApproxComprRatio,
Kind: c.Kind,
}
}
func ReadCLIConfig(ctx *cli.Context) CLIConfig {
return CLIConfig{
Kind: ctx.GlobalString(KindFlagName),
TargetL1TxSizeBytes: ctx.GlobalUint64(TargetL1TxSizeBytesFlagName),
TargetNumFrames: ctx.GlobalInt(TargetNumFramesFlagName),
ApproxComprRatio: ctx.GlobalFloat64(ApproxComprRatioFlagName),
}
}
package compressor
import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type FactoryFunc func(Config) (derive.Compressor, error)
const RatioKind = "ratio"
const ShadowKind = "shadow"
var Kinds = map[string]FactoryFunc{
RatioKind: NewRatioCompressor,
ShadowKind: NewShadowCompressor,
}
var KindKeys []string
func init() {
for k := range Kinds {
KindKeys = append(KindKeys, k)
}
}
package compressor
import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type Config struct {
// TargetFrameSize to target when creating channel frames. Note that if the
// realized compression ratio is worse than the approximate, more frames may
// actually be created. This also depends on how close the target is to the
// max frame size.
TargetFrameSize uint64
// TargetNumFrames to create in this channel. If the realized compression ratio
// is worse than approxComprRatio, additional leftover frame(s) might get created.
TargetNumFrames int
// ApproxComprRatio to assume. Should be slightly smaller than average from
// experiments to avoid the chances of creating a small additional leftover frame.
ApproxComprRatio float64
// Kind of compressor to use. Must
Kind string
}
func (c Config) NewCompressor() (derive.Compressor, error) {
if k, ok := Kinds[c.Kind]; ok {
return k(c)
}
// default to RatioCompressor
return Kinds[RatioKind](c)
}
package compressor
import (
"bytes"
"compress/zlib"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type RatioCompressor struct {
config Config
inputBytes int
buf bytes.Buffer
compress *zlib.Writer
}
// NewRatioCompressor creates a new derive.Compressor implementation that uses the target
// size and a compression ratio parameter to determine how much data can be written to
// the compressor before it's considered full. The full calculation is as follows:
//
// full = uncompressedLength * approxCompRatio >= targetFrameSize * targetNumFrames
func NewRatioCompressor(config Config) (derive.Compressor, error) {
c := &RatioCompressor{
config: config,
}
compress, err := zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
if err != nil {
return nil, err
}
c.compress = compress
return c, nil
}
func (t *RatioCompressor) Write(p []byte) (int, error) {
if err := t.FullErr(); err != nil {
return 0, err
}
t.inputBytes += len(p)
return t.compress.Write(p)
}
func (t *RatioCompressor) Close() error {
return t.compress.Close()
}
func (t *RatioCompressor) Read(p []byte) (int, error) {
return t.buf.Read(p)
}
func (t *RatioCompressor) Reset() {
t.buf.Reset()
t.compress.Reset(&t.buf)
t.inputBytes = 0
}
func (t *RatioCompressor) Len() int {
return t.buf.Len()
}
func (t *RatioCompressor) Flush() error {
return t.compress.Flush()
}
func (t *RatioCompressor) FullErr() error {
if t.inputTargetReached() {
return derive.CompressorFullErr
}
return nil
}
// InputThreshold calculates the input data threshold in bytes from the given
// parameters.
func (t *RatioCompressor) InputThreshold() uint64 {
return uint64(float64(t.config.TargetNumFrames) * float64(t.config.TargetFrameSize) / t.config.ApproxComprRatio)
}
// inputTargetReached says whether the target amount of input data has been
// reached in this channel builder. No more blocks can be added afterwards.
func (t *RatioCompressor) inputTargetReached() bool {
return uint64(t.inputBytes) >= t.InputThreshold()
}
package batcher_test package compressor_test
import ( import (
"math" "math"
"testing" "testing"
"github.com/ethereum-optimism/optimism/op-batcher/batcher" "github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -118,12 +118,13 @@ func TestInputThreshold(t *testing.T) { ...@@ -118,12 +118,13 @@ func TestInputThreshold(t *testing.T) {
// Validate each test case // Validate each test case
for _, tt := range tests { for _, tt := range tests {
config := batcher.ChannelConfig{ comp, err := compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: tt.input.TargetFrameSize, TargetFrameSize: tt.input.TargetFrameSize,
TargetNumFrames: tt.input.TargetNumFrames, TargetNumFrames: tt.input.TargetNumFrames,
ApproxComprRatio: tt.input.ApproxComprRatio, ApproxComprRatio: tt.input.ApproxComprRatio,
} })
got := config.InputThreshold() require.NoError(t, err)
got := comp.(*compressor.RatioCompressor).InputThreshold()
tt.assertion(got) tt.assertion(got)
} }
} }
package compressor
import (
"bytes"
"compress/zlib"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type ShadowCompressor struct {
config Config
buf bytes.Buffer
compress *zlib.Writer
shadowBuf bytes.Buffer
shadowCompress *zlib.Writer
fullErr error
}
// NewShadowCompressor creates a new derive.Compressor implementation that contains two
// compression buffers: one used for size estimation, and one used for the final
// compressed output. The first is flushed on every write, the second isn't, which means
// the final compressed data is always slightly smaller than the target. There is one
// exception to this rule: the first write to the buffer is not checked against the
// target, which allows individual blocks larger than the target to be included (and will
// be split across multiple channel frames).
func NewShadowCompressor(config Config) (derive.Compressor, error) {
c := &ShadowCompressor{
config: config,
}
var err error
c.compress, err = zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
if err != nil {
return nil, err
}
c.shadowCompress, err = zlib.NewWriterLevel(&c.shadowBuf, zlib.BestCompression)
if err != nil {
return nil, err
}
return c, nil
}
func (t *ShadowCompressor) Write(p []byte) (int, error) {
_, err := t.shadowCompress.Write(p)
if err != nil {
return 0, err
}
err = t.shadowCompress.Flush()
if err != nil {
return 0, err
}
if uint64(t.shadowBuf.Len()) > t.config.TargetFrameSize*uint64(t.config.TargetNumFrames) {
t.fullErr = derive.CompressorFullErr
if t.Len() > 0 {
// only return an error if we've already written data to this compressor before
// (otherwise individual blocks over the target would never be written)
return 0, t.fullErr
}
}
return t.compress.Write(p)
}
func (t *ShadowCompressor) Close() error {
return t.compress.Close()
}
func (t *ShadowCompressor) Read(p []byte) (int, error) {
return t.buf.Read(p)
}
func (t *ShadowCompressor) Reset() {
t.buf.Reset()
t.compress.Reset(&t.buf)
t.shadowBuf.Reset()
t.shadowCompress.Reset(&t.shadowBuf)
t.fullErr = nil
}
func (t *ShadowCompressor) Len() int {
return t.buf.Len()
}
func (t *ShadowCompressor) Flush() error {
return t.compress.Flush()
}
func (t *ShadowCompressor) FullErr() error {
return t.fullErr
}
package compressor_test
import (
"bytes"
"compress/zlib"
"io"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/stretchr/testify/require"
)
func randomBytes(t *testing.T, length int) []byte {
b := make([]byte, length)
_, err := rand.Read(b)
require.NoError(t, err)
return b
}
func TestShadowCompressor(t *testing.T) {
type test struct {
name string
targetFrameSize uint64
targetNumFrames int
data [][]byte
errs []error
fullErr error
}
tests := []test{{
name: "no data",
targetFrameSize: 1,
targetNumFrames: 1,
data: [][]byte{},
errs: []error{},
fullErr: nil,
}, {
name: "large first block",
targetFrameSize: 1,
targetNumFrames: 1,
data: [][]byte{bytes.Repeat([]byte{0}, 1024)},
errs: []error{nil},
fullErr: derive.CompressorFullErr,
}, {
name: "large second block",
targetFrameSize: 1,
targetNumFrames: 1,
data: [][]byte{bytes.Repeat([]byte{0}, 512), bytes.Repeat([]byte{0}, 1024)},
errs: []error{nil, derive.CompressorFullErr},
fullErr: derive.CompressorFullErr,
}, {
name: "random data",
targetFrameSize: 1200,
targetNumFrames: 1,
data: [][]byte{randomBytes(t, 512), randomBytes(t, 512), randomBytes(t, 512)},
errs: []error{nil, nil, derive.CompressorFullErr},
fullErr: derive.CompressorFullErr,
}}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
require.Equal(t, len(test.errs), len(test.data), "invalid test case: len(data) != len(errs)")
sc, err := compressor.NewShadowCompressor(compressor.Config{
TargetFrameSize: test.targetFrameSize,
TargetNumFrames: test.targetNumFrames,
})
require.NoError(t, err)
for i, d := range test.data {
_, err = sc.Write(d)
if test.errs[i] != nil {
require.ErrorIs(t, err, test.errs[i])
require.Equal(t, i, len(test.data)-1)
} else {
require.NoError(t, err)
}
}
if test.fullErr != nil {
require.ErrorIs(t, sc.FullErr(), test.fullErr)
} else {
require.NoError(t, sc.FullErr())
}
err = sc.Close()
require.NoError(t, err)
buf, err := io.ReadAll(sc)
require.NoError(t, err)
r, err := zlib.NewReader(bytes.NewBuffer(buf))
require.NoError(t, err)
uncompressed, err := io.ReadAll(r)
require.NoError(t, err)
concat := make([]byte, 0)
for i, d := range test.data {
if test.errs[i] != nil {
break
}
concat = append(concat, d...)
}
require.Equal(t, concat, uncompressed)
})
}
}
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"github.com/urfave/cli" "github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/rpc" "github.com/ethereum-optimism/optimism/op-batcher/rpc"
opservice "github.com/ethereum-optimism/optimism/op-service" opservice "github.com/ethereum-optimism/optimism/op-service"
oplog "github.com/ethereum-optimism/optimism/op-service/log" oplog "github.com/ethereum-optimism/optimism/op-service/log"
...@@ -67,24 +68,6 @@ var ( ...@@ -67,24 +68,6 @@ var (
Value: 120_000, Value: 120_000,
EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "MAX_L1_TX_SIZE_BYTES"), EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "MAX_L1_TX_SIZE_BYTES"),
} }
TargetL1TxSizeBytesFlag = cli.Uint64Flag{
Name: "target-l1-tx-size-bytes",
Usage: "The target size of a batch tx submitted to L1.",
Value: 100_000,
EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "TARGET_L1_TX_SIZE_BYTES"),
}
TargetNumFramesFlag = cli.IntFlag{
Name: "target-num-frames",
Usage: "The target number of frames to create per channel",
Value: 1,
EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "TARGET_NUM_FRAMES"),
}
ApproxComprRatioFlag = cli.Float64Flag{
Name: "approx-compr-ratio",
Usage: "The approximate compression ratio (<= 1.0)",
Value: 0.4,
EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "APPROX_COMPR_RATIO"),
}
StoppedFlag = cli.BoolFlag{ StoppedFlag = cli.BoolFlag{
Name: "stopped", Name: "stopped",
Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC", Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC",
...@@ -106,9 +89,6 @@ var optionalFlags = []cli.Flag{ ...@@ -106,9 +89,6 @@ var optionalFlags = []cli.Flag{
MaxPendingTransactionsFlag, MaxPendingTransactionsFlag,
MaxChannelDurationFlag, MaxChannelDurationFlag,
MaxL1TxSizeBytesFlag, MaxL1TxSizeBytesFlag,
TargetL1TxSizeBytesFlag,
TargetNumFramesFlag,
ApproxComprRatioFlag,
StoppedFlag, StoppedFlag,
SequencerHDPathFlag, SequencerHDPathFlag,
} }
...@@ -120,6 +100,7 @@ func init() { ...@@ -120,6 +100,7 @@ func init() {
optionalFlags = append(optionalFlags, oppprof.CLIFlags(EnvVarPrefix)...) optionalFlags = append(optionalFlags, oppprof.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, rpc.CLIFlags(EnvVarPrefix)...) optionalFlags = append(optionalFlags, rpc.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, txmgr.CLIFlags(EnvVarPrefix)...) optionalFlags = append(optionalFlags, txmgr.CLIFlags(EnvVarPrefix)...)
optionalFlags = append(optionalFlags, compressor.CLIFlags(EnvVarPrefix)...)
Flags = append(requiredFlags, optionalFlags...) Flags = append(requiredFlags, optionalFlags...)
} }
......
...@@ -202,6 +202,10 @@ func main() { ...@@ -202,6 +202,10 @@ func main() {
dryRun := ctx.Bool("dry-run") dryRun := ctx.Bool("dry-run")
noCheck := ctx.Bool("no-check") noCheck := ctx.Bool("no-check")
if noCheck {
panic("must run with check on")
}
// Perform the migration // Perform the migration
res, err := genesis.MigrateDB(ldb, config, block, &migrationData, !dryRun, noCheck) res, err := genesis.MigrateDB(ldb, config, block, &migrationData, !dryRun, noCheck)
if err != nil { if err != nil {
......
...@@ -57,7 +57,9 @@ func (w *LegacyWithdrawal) Encode() ([]byte, error) { ...@@ -57,7 +57,9 @@ func (w *LegacyWithdrawal) Encode() ([]byte, error) {
return out, nil return out, nil
} }
// Decode will decode a serialized LegacyWithdrawal // Decode will decode a serialized LegacyWithdrawal. There is a known inconsistency
// where the decoded `msg.sender` is not authenticated. A round trip of encoding and
// decoding with a spoofed withdrawal will result in a different message being recovered.
func (w *LegacyWithdrawal) Decode(data []byte) error { func (w *LegacyWithdrawal) Decode(data []byte) error {
if len(data) < len(predeploys.L2CrossDomainMessengerAddr)+4 { if len(data) < len(predeploys.L2CrossDomainMessengerAddr)+4 {
return fmt.Errorf("withdrawal data too short: %d", len(data)) return fmt.Errorf("withdrawal data too short: %d", len(data))
...@@ -68,6 +70,8 @@ func (w *LegacyWithdrawal) Decode(data []byte) error { ...@@ -68,6 +70,8 @@ func (w *LegacyWithdrawal) Decode(data []byte) error {
return fmt.Errorf("invalid selector: 0x%x", data[0:4]) return fmt.Errorf("invalid selector: 0x%x", data[0:4])
} }
// This should be the L2CrossDomainMessenger address but is not guaranteed
// to be.
msgSender := data[len(data)-len(predeploys.L2CrossDomainMessengerAddr):] msgSender := data[len(data)-len(predeploys.L2CrossDomainMessengerAddr):]
raw := data[4 : len(data)-len(predeploys.L2CrossDomainMessengerAddr)] raw := data[4 : len(data)-len(predeploys.L2CrossDomainMessengerAddr)]
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
...@@ -133,7 +134,13 @@ func (s *L2Batcher) Buffer(t Testing) error { ...@@ -133,7 +134,13 @@ func (s *L2Batcher) Buffer(t Testing) error {
if s.l2BatcherCfg.GarbageCfg != nil { if s.l2BatcherCfg.GarbageCfg != nil {
ch, err = NewGarbageChannelOut(s.l2BatcherCfg.GarbageCfg) ch, err = NewGarbageChannelOut(s.l2BatcherCfg.GarbageCfg)
} else { } else {
ch, err = derive.NewChannelOut() c, e := compressor.NewRatioCompressor(compressor.Config{
TargetFrameSize: s.l2BatcherCfg.MaxL1TxSize,
TargetNumFrames: 1,
ApproxComprRatio: 1,
})
require.NoError(t, e, "failed to create compressor")
ch, err = derive.NewChannelOut(c)
} }
require.NoError(t, err, "failed to create channel") require.NoError(t, err, "failed to create channel")
s.l2ChannelOut = ch s.l2ChannelOut = ch
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"time" "time"
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher" bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics" batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/chaincfg" "github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/sources"
...@@ -335,12 +336,14 @@ func TestMigration(t *testing.T) { ...@@ -335,12 +336,14 @@ func TestMigration(t *testing.T) {
RollupRpc: rollupNode.HTTPEndpoint(), RollupRpc: rollupNode.HTTPEndpoint(),
MaxChannelDuration: 1, MaxChannelDuration: 1,
MaxL1TxSize: 120_000, MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000, CompressorConfig: compressor.CLIConfig{
TargetNumFrames: 1, TargetL1TxSizeBytes: 100_000,
ApproxComprRatio: 0.4, TargetNumFrames: 1,
SubSafetyMargin: 4, ApproxComprRatio: 0.4,
PollInterval: 50 * time.Millisecond, },
TxMgrConfig: newTxMgrConfig(forkedL1URL, secrets.Batcher), SubSafetyMargin: 4,
PollInterval: 50 * time.Millisecond,
TxMgrConfig: newTxMgrConfig(forkedL1URL, secrets.Batcher),
LogConfig: oplog.CLIConfig{ LogConfig: oplog.CLIConfig{
Level: "info", Level: "info",
Format: "text", Format: "text",
......
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
bss "github.com/ethereum-optimism/optimism/op-batcher/batcher" bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics" batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys" "github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum-optimism/optimism/op-chain-ops/genesis" "github.com/ethereum-optimism/optimism/op-chain-ops/genesis"
...@@ -599,12 +600,14 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -599,12 +600,14 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
MaxPendingTransactions: 1, MaxPendingTransactions: 1,
MaxChannelDuration: 1, MaxChannelDuration: 1,
MaxL1TxSize: 120_000, MaxL1TxSize: 120_000,
TargetL1TxSize: 100_000, CompressorConfig: compressor.CLIConfig{
TargetNumFrames: 1, TargetL1TxSizeBytes: 100_000,
ApproxComprRatio: 0.4, TargetNumFrames: 1,
SubSafetyMargin: 4, ApproxComprRatio: 0.4,
PollInterval: 50 * time.Millisecond, },
TxMgrConfig: newTxMgrConfig(sys.Nodes["l1"].WSEndpoint(), cfg.Secrets.Batcher), SubSafetyMargin: 4,
PollInterval: 50 * time.Millisecond,
TxMgrConfig: newTxMgrConfig(sys.Nodes["l1"].WSEndpoint(), cfg.Secrets.Batcher),
LogConfig: oplog.CLIConfig{ LogConfig: oplog.CLIConfig{
Level: "info", Level: "info",
Format: "text", Format: "text",
......
...@@ -37,7 +37,6 @@ func newRPCServer(ctx context.Context, rpcCfg *RPCConfig, rollupCfg *rollup.Conf ...@@ -37,7 +37,6 @@ func newRPCServer(ctx context.Context, rpcCfg *RPCConfig, rollupCfg *rollup.Conf
apis: []rpc.API{{ apis: []rpc.API{{
Namespace: "optimism", Namespace: "optimism",
Service: api, Service: api,
Public: true,
Authenticated: false, Authenticated: false,
}}, }},
appVersion: appVersion, appVersion: appVersion,
...@@ -51,7 +50,6 @@ func (s *rpcServer) EnableAdminAPI(api *adminAPI) { ...@@ -51,7 +50,6 @@ func (s *rpcServer) EnableAdminAPI(api *adminAPI) {
Namespace: "admin", Namespace: "admin",
Version: "", Version: "",
Service: api, Service: api,
Public: true, // TODO: this field is deprecated. Do we even need this anymore?
Authenticated: false, Authenticated: false,
}) })
} }
...@@ -61,7 +59,6 @@ func (s *rpcServer) EnableP2P(backend *p2p.APIBackend) { ...@@ -61,7 +59,6 @@ func (s *rpcServer) EnableP2P(backend *p2p.APIBackend) {
Namespace: p2p.NamespaceRPC, Namespace: p2p.NamespaceRPC,
Version: "", Version: "",
Service: backend, Service: backend,
Public: true,
Authenticated: false, Authenticated: false,
}) })
} }
......
...@@ -2,7 +2,6 @@ package derive ...@@ -2,7 +2,6 @@ package derive
import ( import (
"bytes" "bytes"
"compress/zlib"
"crypto/rand" "crypto/rand"
"errors" "errors"
"fmt" "fmt"
...@@ -25,6 +24,30 @@ var ErrTooManyRLPBytes = errors.New("batch would cause RLP bytes to go over limi ...@@ -25,6 +24,30 @@ var ErrTooManyRLPBytes = errors.New("batch would cause RLP bytes to go over limi
// [Frame Format]: https://github.com/ethereum-optimism/optimism/blob/develop/specs/derivation.md#frame-format // [Frame Format]: https://github.com/ethereum-optimism/optimism/blob/develop/specs/derivation.md#frame-format
const FrameV0OverHeadSize = 23 const FrameV0OverHeadSize = 23
var CompressorFullErr = errors.New("compressor is full")
type Compressor interface {
// Writer is used to write uncompressed data which will be compressed. Should return
// CompressorFullErr if the compressor is full and no more data should be written.
io.Writer
// Closer Close function should be called before reading any data.
io.Closer
// Reader is used to Read compressed data; should only be called after Close.
io.Reader
// Reset will reset all written data
Reset()
// Len returns an estimate of the current length of the compressed data; calling Flush will
// increase the accuracy at the expense of a poorer compression ratio.
Len() int
// Flush flushes any uncompressed data to the compression buffer. This will result in a
// non-optimal compression ratio.
Flush() error
// FullErr returns CompressorFullErr if the compressor is known to be full. Note that
// calls to Write will fail if an error is returned from this method, but calls to Write
// can still return CompressorFullErr even if this does not.
FullErr() error
}
type ChannelOut struct { type ChannelOut struct {
id ChannelID id ChannelID
// Frame ID of the next frame to emit. Increment after emitting // Frame ID of the next frame to emit. Increment after emitting
...@@ -33,9 +56,7 @@ type ChannelOut struct { ...@@ -33,9 +56,7 @@ type ChannelOut struct {
rlpLength int rlpLength int
// Compressor stage. Write input data to it // Compressor stage. Write input data to it
compress *zlib.Writer compress Compressor
// post compression buffer
buf bytes.Buffer
closed bool closed bool
} }
...@@ -44,23 +65,18 @@ func (co *ChannelOut) ID() ChannelID { ...@@ -44,23 +65,18 @@ func (co *ChannelOut) ID() ChannelID {
return co.id return co.id
} }
func NewChannelOut() (*ChannelOut, error) { func NewChannelOut(compress Compressor) (*ChannelOut, error) {
c := &ChannelOut{ c := &ChannelOut{
id: ChannelID{}, // TODO: use GUID here instead of fully random data id: ChannelID{}, // TODO: use GUID here instead of fully random data
frame: 0, frame: 0,
rlpLength: 0, rlpLength: 0,
compress: compress,
} }
_, err := rand.Read(c.id[:]) _, err := rand.Read(c.id[:])
if err != nil { if err != nil {
return nil, err return nil, err
} }
compress, err := zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
if err != nil {
return nil, err
}
c.compress = compress
return c, nil return c, nil
} }
...@@ -68,8 +84,7 @@ func NewChannelOut() (*ChannelOut, error) { ...@@ -68,8 +84,7 @@ func NewChannelOut() (*ChannelOut, error) {
func (co *ChannelOut) Reset() error { func (co *ChannelOut) Reset() error {
co.frame = 0 co.frame = 0
co.rlpLength = 0 co.rlpLength = 0
co.buf.Reset() co.compress.Reset()
co.compress.Reset(&co.buf)
co.closed = false co.closed = false
_, err := rand.Read(co.id[:]) _, err := rand.Read(co.id[:])
return err return err
...@@ -116,7 +131,8 @@ func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) { ...@@ -116,7 +131,8 @@ func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) {
} }
co.rlpLength += buf.Len() co.rlpLength += buf.Len()
written, err := io.Copy(co.compress, &buf) // avoid using io.Copy here, because we need all or nothing
written, err := co.compress.Write(buf.Bytes())
return uint64(written), err return uint64(written), err
} }
...@@ -129,7 +145,7 @@ func (co *ChannelOut) InputBytes() int { ...@@ -129,7 +145,7 @@ func (co *ChannelOut) InputBytes() int {
// Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes // Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes
// are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage. // are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage.
func (co *ChannelOut) ReadyBytes() int { func (co *ChannelOut) ReadyBytes() int {
return co.buf.Len() return co.compress.Len()
} }
// Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more // Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more
...@@ -138,6 +154,10 @@ func (co *ChannelOut) Flush() error { ...@@ -138,6 +154,10 @@ func (co *ChannelOut) Flush() error {
return co.compress.Flush() return co.compress.Flush()
} }
func (co *ChannelOut) FullErr() error {
return co.compress.FullErr()
}
func (co *ChannelOut) Close() error { func (co *ChannelOut) Close() error {
if co.closed { if co.closed {
return errors.New("already closed") return errors.New("already closed")
...@@ -166,8 +186,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro ...@@ -166,8 +186,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro
// Copy data from the local buffer into the frame data buffer // Copy data from the local buffer into the frame data buffer
maxDataSize := maxSize - FrameV0OverHeadSize maxDataSize := maxSize - FrameV0OverHeadSize
if maxDataSize > uint64(co.buf.Len()) { if maxDataSize > uint64(co.compress.Len()) {
maxDataSize = uint64(co.buf.Len()) maxDataSize = uint64(co.compress.Len())
// If we are closed & will not spill past the current frame // If we are closed & will not spill past the current frame
// mark it is the final frame of the channel. // mark it is the final frame of the channel.
if co.closed { if co.closed {
...@@ -176,7 +196,7 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro ...@@ -176,7 +196,7 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro
} }
f.Data = make([]byte, maxDataSize) f.Data = make([]byte, maxDataSize)
if _, err := io.ReadFull(&co.buf, f.Data); err != nil { if _, err := io.ReadFull(co.compress, f.Data); err != nil {
return 0, err return 0, err
} }
......
...@@ -11,8 +11,25 @@ import ( ...@@ -11,8 +11,25 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
// basic implementation of the Compressor interface that does no compression
type nonCompressor struct {
bytes.Buffer
}
func (s *nonCompressor) Flush() error {
return nil
}
func (s *nonCompressor) Close() error {
return nil
}
func (s *nonCompressor) FullErr() error {
return nil
}
func TestChannelOutAddBlock(t *testing.T) { func TestChannelOutAddBlock(t *testing.T) {
cout, err := NewChannelOut() cout, err := NewChannelOut(&nonCompressor{})
require.NoError(t, err) require.NoError(t, err)
t.Run("returns err if first tx is not an l1info tx", func(t *testing.T) { t.Run("returns err if first tx is not an l1info tx", func(t *testing.T) {
...@@ -33,7 +50,7 @@ func TestChannelOutAddBlock(t *testing.T) { ...@@ -33,7 +50,7 @@ func TestChannelOutAddBlock(t *testing.T) {
// max size that is below the fixed frame size overhead of 23, will return // max size that is below the fixed frame size overhead of 23, will return
// an error. // an error.
func TestOutputFrameSmallMaxSize(t *testing.T) { func TestOutputFrameSmallMaxSize(t *testing.T) {
cout, err := NewChannelOut() cout, err := NewChannelOut(&nonCompressor{})
require.NoError(t, err) require.NoError(t, err)
// Call OutputFrame with the range of small max size values that err // Call OutputFrame with the range of small max size values that err
......
{
"finalSystemOwner": "0x62790eFcB3a5f3A5D398F95B47930A9Addd83807",
"portalGuardian": "0x62790eFcB3a5f3A5D398F95B47930A9Addd83807",
"controller": "0x2d30335B0b807bBa1682C487BaAFD2Ad6da5D675",
"l1StartingBlockTag": "0x4104895a540d87127ff11eef0d51d8f63ce00a6fc211db751a45a4b3a61a9c83",
"l1ChainID": 5,
"l2ChainID": 420,
"l2BlockTime": 2,
"maxSequencerDrift": 1200,
"sequencerWindowSize": 3600,
"channelTimeout": 120,
"p2pSequencerAddress": "0xCBABF46d40982B4530c0EAc9889f6e44e17f0383",
"batchInboxAddress": "0xff00000000000000000000000000000000000420",
"batchSenderAddress": "0x3a2baA0160275024A50C1be1FC677375E7DB4Bd7",
"l2OutputOracleSubmissionInterval": 20,
"l2OutputOracleStartingTimestamp": 1670625264,
"l2OutputOracleStartingBlockNumber": 3324764,
"l2OutputOracleProposer": "0x88BCa4Af3d950625752867f826E073E337076581",
"l2OutputOracleChallenger": "0x88BCa4Af3d950625752867f826E073E337076581",
"finalizationPeriodSeconds": 2,
"proxyAdminOwner": "0x62790eFcB3a5f3A5D398F95B47930A9Addd83807",
"governanceTokenName": "Optimism",
"governanceTokenSymbol": "OP",
"governanceTokenOwner": "0x038a8825A3C3B0c08d52Cc76E5E361953Cf6Dc76",
"l2GenesisBlockGasLimit": "0x1c9c380",
"l2GenesisBlockCoinbase": "0x4200000000000000000000000000000000000011",
"l2GenesisBlockBaseFeePerGas": "0x3b9aca00",
"gasPriceOracleOverhead": 2100,
"gasPriceOracleScalar": 1000000,
"eip1559Denominator": 50,
"eip1559Elasticity": 10
}
import { DeployConfig } from '../src/deploy-config'
import config from './final-migration-rehearsal.json'
export default config satisfies DeployConfig
...@@ -19,9 +19,9 @@ ...@@ -19,9 +19,9 @@
"l2OutputOracleChallenger": "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC", "l2OutputOracleChallenger": "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC",
"finalizationPeriodSeconds": 2, "finalizationPeriodSeconds": 2,
"proxyAdminOwner": "0x90F79bf6EB2c4f870365E785982E1f101E93b906", "proxyAdminOwner": "0x90F79bf6EB2c4f870365E785982E1f101E93b906",
"baseFeeVaultRecipient": "0x90F79bf6EB2c4f870365E785982E1f101E93b906", "baseFeeVaultRecipient": "0xa3d596EAfaB6B13Ab18D40FaE1A962700C84ADEa",
"l1FeeVaultRecipient": "0x90F79bf6EB2c4f870365E785982E1f101E93b906", "l1FeeVaultRecipient": "0xa3d596EAfaB6B13Ab18D40FaE1A962700C84ADEa",
"sequencerFeeVaultRecipient": "0x90F79bf6EB2c4f870365E785982E1f101E93b906", "sequencerFeeVaultRecipient": "0xa3d596EAfaB6B13Ab18D40FaE1A962700C84ADEa",
"governanceTokenName": "Optimism", "governanceTokenName": "Optimism",
"governanceTokenSymbol": "OP", "governanceTokenSymbol": "OP",
"governanceTokenOwner": "0x90F79bf6EB2c4f870365E785982E1f101E93b906", "governanceTokenOwner": "0x90F79bf6EB2c4f870365E785982E1f101E93b906",
......
...@@ -93,12 +93,6 @@ const config: HardhatUserConfig = { ...@@ -93,12 +93,6 @@ const config: HardhatUserConfig = {
accounts: [process.env.PRIVATE_KEY_DEPLOYER || ethers.constants.HashZero], accounts: [process.env.PRIVATE_KEY_DEPLOYER || ethers.constants.HashZero],
live: true, live: true,
}, },
'final-migration-rehearsal': {
chainId: 5,
url: process.env.L1_RPC || '',
accounts: [process.env.PRIVATE_KEY_DEPLOYER || ethers.constants.HashZero],
live: true,
},
'internal-devnet': { 'internal-devnet': {
chainId: 5, chainId: 5,
url: process.env.L1_RPC || '', url: process.env.L1_RPC || '',
...@@ -149,10 +143,6 @@ const config: HardhatUserConfig = { ...@@ -149,10 +143,6 @@ const config: HardhatUserConfig = {
'../contracts/deployments/goerli', '../contracts/deployments/goerli',
'../contracts-periphery/deployments/goerli', '../contracts-periphery/deployments/goerli',
], ],
'final-migration-rehearsal': [
'../contracts/deployments/goerli',
'../contracts-periphery/deployments/goerli',
],
}, },
}, },
solidity: { solidity: {
......
...@@ -44,6 +44,7 @@ type backendState struct { ...@@ -44,6 +44,7 @@ type backendState struct {
latestBlockNumber hexutil.Uint64 latestBlockNumber hexutil.Uint64
latestBlockHash string latestBlockHash string
peerCount uint64 peerCount uint64
inSync bool
lastUpdate time.Time lastUpdate time.Time
...@@ -215,7 +216,13 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { ...@@ -215,7 +216,13 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
RecordConsensusBackendBanned(be, banned) RecordConsensusBackendBanned(be, banned)
if banned { if banned {
log.Debug("skipping backend banned", "backend", be.Name) log.Debug("skipping backend - banned", "backend", be.Name)
return
}
// if backend exhausted rate limit we'll skip it for now
if be.IsRateLimited() {
log.Debug("skipping backend - rate limited", "backend", be.Name)
return return
} }
...@@ -228,24 +235,16 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { ...@@ -228,24 +235,16 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
// if backend it not in sync we'll check again after ban // if backend it not in sync we'll check again after ban
inSync, err := cp.isInSync(ctx, be) inSync, err := cp.isInSync(ctx, be)
if err != nil || !inSync { RecordConsensusBackendInSync(be, err == nil && inSync)
log.Warn("backend banned - not in sync", "backend", be.Name) if err != nil {
cp.Ban(be) log.Warn("error updating backend sync state", "name", be.Name, "err", err)
return
}
RecordConsensusBackendInSync(be, inSync)
// if backend exhausted rate limit we'll skip it for now
if be.IsRateLimited() {
return
} }
var peerCount uint64 var peerCount uint64
if !be.skipPeerCountCheck { if !be.skipPeerCountCheck {
peerCount, err = cp.getPeerCount(ctx, be) peerCount, err = cp.getPeerCount(ctx, be)
if err != nil { if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err) log.Warn("error updating backend peer count", "name", be.Name, "err", err)
return
} }
RecordConsensusBackendPeerCount(be, peerCount) RecordConsensusBackendPeerCount(be, peerCount)
} }
...@@ -253,10 +252,9 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { ...@@ -253,10 +252,9 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest") latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
if err != nil { if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err) log.Warn("error updating backend", "name", be.Name, "err", err)
return
} }
changed, updateDelay := cp.setBackendState(be, peerCount, latestBlockNumber, latestBlockHash) changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash)
if changed { if changed {
RecordBackendLatestBlock(be, latestBlockNumber) RecordBackendLatestBlock(be, latestBlockNumber)
...@@ -264,6 +262,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { ...@@ -264,6 +262,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
log.Debug("backend state updated", log.Debug("backend state updated",
"name", be.Name, "name", be.Name,
"peerCount", peerCount, "peerCount", peerCount,
"inSync", inSync,
"latestBlockNumber", latestBlockNumber, "latestBlockNumber", latestBlockNumber,
"latestBlockHash", latestBlockHash, "latestBlockHash", latestBlockHash,
"updateDelay", updateDelay) "updateDelay", updateDelay)
...@@ -280,11 +279,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { ...@@ -280,11 +279,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
// find the highest block, in order to use it defining the highest non-lagging ancestor block // find the highest block, in order to use it defining the highest non-lagging ancestor block
for _, be := range cp.backendGroup.Backends { for _, be := range cp.backendGroup.Backends {
peerCount, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be) peerCount, inSync, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be)
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
continue continue
} }
if !inSync {
continue
}
if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue continue
} }
...@@ -296,11 +298,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { ...@@ -296,11 +298,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
// find the highest common ancestor block // find the highest common ancestor block
for _, be := range cp.backendGroup.Backends { for _, be := range cp.backendGroup.Backends {
peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be) peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be)
if !be.skipPeerCountCheck && peerCount < cp.minPeerCount { if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
continue continue
} }
if !inSync {
continue
}
if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) { if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
continue continue
} }
...@@ -351,12 +356,12 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { ...@@ -351,12 +356,12 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
- not lagging - not lagging
*/ */
peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be) peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be)
notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now())
isBanned := time.Now().Before(bannedUntil) isBanned := time.Now().Before(bannedUntil)
notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount
lagging := latestBlockNumber < proposedBlock lagging := latestBlockNumber < proposedBlock
if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging { if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync {
filteredBackendsNames = append(filteredBackendsNames, be.Name) filteredBackendsNames = append(filteredBackendsNames, be.Name)
continue continue
} }
...@@ -498,23 +503,25 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo ...@@ -498,23 +503,25 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
return res, nil return res, nil
} }
func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) { func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) {
bs := cp.backendState[be] bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
peerCount = bs.peerCount peerCount = bs.peerCount
inSync = bs.inSync
blockNumber = bs.latestBlockNumber blockNumber = bs.latestBlockNumber
blockHash = bs.latestBlockHash blockHash = bs.latestBlockHash
lastUpdate = bs.lastUpdate lastUpdate = bs.lastUpdate
bannedUntil = bs.bannedUntil bannedUntil = bs.bannedUntil
bs.backendStateMux.Unlock()
return return
} }
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) { func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) {
bs := cp.backendState[be] bs := cp.backendState[be]
bs.backendStateMux.Lock() bs.backendStateMux.Lock()
changed = bs.latestBlockHash != blockHash changed = bs.latestBlockHash != blockHash
bs.peerCount = peerCount bs.peerCount = peerCount
bs.inSync = inSync
bs.latestBlockNumber = blockNumber bs.latestBlockNumber = blockNumber
bs.latestBlockHash = blockHash bs.latestBlockHash = blockHash
updateDelay = time.Since(bs.lastUpdate) updateDelay = time.Since(bs.lastUpdate)
......
...@@ -94,6 +94,7 @@ func TestConsensus(t *testing.T) { ...@@ -94,6 +94,7 @@ func TestConsensus(t *testing.T) {
consensusGroup := bg.Consensus.GetConsensusGroup() consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, be) require.NotContains(t, consensusGroup, be)
require.False(t, bg.Consensus.IsBanned(be))
require.Equal(t, 1, len(consensusGroup)) require.Equal(t, 1, len(consensusGroup))
}) })
...@@ -132,6 +133,7 @@ func TestConsensus(t *testing.T) { ...@@ -132,6 +133,7 @@ func TestConsensus(t *testing.T) {
be := backend(bg, "node1") be := backend(bg, "node1")
require.NotNil(t, be) require.NotNil(t, be)
require.NotContains(t, consensusGroup, be) require.NotContains(t, consensusGroup, be)
require.False(t, bg.Consensus.IsBanned(be))
require.Equal(t, 1, len(consensusGroup)) require.Equal(t, 1, len(consensusGroup))
}) })
...@@ -232,6 +234,7 @@ func TestConsensus(t *testing.T) { ...@@ -232,6 +234,7 @@ func TestConsensus(t *testing.T) {
consensusGroup := bg.Consensus.GetConsensusGroup() consensusGroup := bg.Consensus.GetConsensusGroup()
require.NotContains(t, consensusGroup, be) require.NotContains(t, consensusGroup, be)
require.False(t, bg.Consensus.IsBanned(be))
require.Equal(t, 1, len(consensusGroup)) require.Equal(t, 1, len(consensusGroup))
}) })
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment