Commit b353c11f authored by tre

Merge branch 'develop' of https://github.com/ethereum-optimism/optimism into develop

parents 47629fed 382d38b7
---
'@eth-optimism/contracts-periphery': patch
---
Add faucet contract
@@ -62,7 +62,7 @@ require (
 	github.com/deckarep/golang-set/v2 v2.1.0 // indirect
 	github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect
 	github.com/deepmap/oapi-codegen v1.8.2 // indirect
-	github.com/docker/distribution v2.8.1+incompatible // indirect
+	github.com/docker/distribution v2.8.2+incompatible // indirect
 	github.com/docker/go-units v0.5.0 // indirect
 	github.com/edsrzf/mmap-go v1.1.0 // indirect
 	github.com/elastic/gosigar v0.14.2 // indirect
@@ -124,8 +124,8 @@ github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQ
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
 github.com/dlclark/regexp2 v1.7.0 h1:7lJfhqlPssTb1WQx4yvTHN0uElPEv52sbaECrAQxjAo=
-github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68=
-github.com/docker/distribution v2.8.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
+github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8=
+github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
 github.com/docker/docker v20.10.24+incompatible h1:Ugvxm7a8+Gz6vqQYQQ2W7GYq5EUPaAiuPgIfVyI3dYE=
 github.com/docker/docker v20.10.24+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
 github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
@@ -7,13 +7,13 @@ import (
 	"io"
 	"math"
 
+	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
 	"github.com/ethereum/go-ethereum/core/types"
 )
 
 var (
 	ErrInvalidChannelTimeout = errors.New("channel timeout is less than the safety margin")
-	ErrInputTargetReached    = errors.New("target amount of input data reached")
 	ErrMaxFrameIndex         = errors.New("max frame index reached (uint16)")
 	ErrMaxDurationReached    = errors.New("max channel duration reached")
 	ErrChannelTimeoutClose   = errors.New("close to channel timeout")
@@ -55,19 +55,9 @@ type ChannelConfig struct {
 	SubSafetyMargin uint64
 	// The maximum byte-size a frame can have.
 	MaxFrameSize uint64
-	// The target frame size to create per channel. Note that if the
-	// realized compression ratio is worse than the approximate, more frames may
-	// actually be created. This also depends on how close TargetFrameSize is to
-	// MaxFrameSize.
-	TargetFrameSize uint64
-	// The target number of frames to create in this channel. If the realized
-	// compression ratio is worse than approxComprRatio, additional leftover
-	// frame(s) might get created.
-	TargetNumFrames int
-	// Approximated compression ratio to assume. Should be slightly smaller than
-	// average from experiments to avoid the chances of creating a small
-	// additional leftover frame.
-	ApproxComprRatio float64
+
+	// CompressorConfig contains the configuration for creating new compressors.
+	CompressorConfig compressor.Config
 }
 
 // Check validates the [ChannelConfig] parameters.
@@ -96,12 +86,6 @@ func (cc *ChannelConfig) Check() error {
 	return nil
 }
 
-// InputThreshold calculates the input data threshold in bytes from the given
-// parameters.
-func (c ChannelConfig) InputThreshold() uint64 {
-	return uint64(float64(c.TargetNumFrames) * float64(c.TargetFrameSize) / c.ApproxComprRatio)
-}
-
 type frameID struct {
 	chID        derive.ChannelID
 	frameNumber uint16

@@ -142,7 +126,11 @@ type channelBuilder struct {
 // newChannelBuilder creates a new channel builder or returns an error if the
 // channel out could not be created.
 func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
-	co, err := derive.NewChannelOut()
+	c, err := cfg.CompressorConfig.NewCompressor()
+	if err != nil {
+		return nil, err
+	}
+	co, err := derive.NewChannelOut(c)
 	if err != nil {
 		return nil, err
 	}

@@ -209,7 +197,7 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error
 		return l1info, fmt.Errorf("converting block to batch: %w", err)
 	}
 
-	if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) {
+	if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
 		c.setFullErr(err)
 		return l1info, c.FullErr()
 	} else if err != nil {

@@ -218,8 +206,8 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error
 	c.blocks = append(c.blocks, block)
 	c.updateSwTimeout(batch)
 
-	if c.inputTargetReached() {
-		c.setFullErr(ErrInputTargetReached)
+	if err = c.co.FullErr(); err != nil {
+		c.setFullErr(err)
 		// Adding this block still worked, so don't return error, just mark as full
 	}

@@ -293,12 +281,6 @@ func (c *channelBuilder) TimedOut(blockNum uint64) bool {
 	return c.timeout != 0 && blockNum >= c.timeout
 }
 
-// inputTargetReached says whether the target amount of input data has been
-// reached in this channel builder. No more blocks can be added afterwards.
-func (c *channelBuilder) inputTargetReached() bool {
-	return uint64(c.co.InputBytes()) >= c.cfg.InputThreshold()
-}
-
 // IsFull returns whether the channel is full.
 // FullErr returns the reason for the channel being full.
 func (c *channelBuilder) IsFull() bool {

@@ -310,7 +292,7 @@ func (c *channelBuilder) IsFull() bool {
 //
 // It returns a ChannelFullError wrapping one of the following possible reasons
 // for the channel being full:
-//   - ErrInputTargetReached if the target amount of input data has been reached,
+//   - derive.CompressorFullErr if the compressor target has been reached,
 //   - derive.MaxRLPBytesPerChannel if the general maximum amount of input data
 //     would have been exceeded by the latest AddBlock call,
 //   - ErrMaxFrameIndex if the maximum number of frames has been generated
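With these changes the builder no longer tracks an input threshold itself; it simply surfaces whatever the compressor reports. Below is a minimal sketch of a caller reacting to the new semantics — a hypothetical helper inside the op-batcher/batcher package (AddBlock and derive.CompressorFullErr are from the hunks above; the helper itself is illustrative):

// addBlocks is a hypothetical helper: it drains blocks into the builder until
// the compressor reports fullness, then hands control back for frame output.
func addBlocks(cb *channelBuilder, blocks []*types.Block) error {
	for _, block := range blocks {
		if _, err := cb.AddBlock(block); errors.Is(err, derive.CompressorFullErr) {
			return nil // channel is full; the caller should output frames next
		} else if err != nil {
			return err // some other failure, e.g. derive.ErrTooManyRLPBytes
		}
	}
	return nil
}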
@@ -10,6 +10,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
 	"github.com/ethereum-optimism/optimism/op-node/eth"
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"

@@ -28,9 +29,11 @@ var defaultTestChannelConfig = ChannelConfig{
 	MaxChannelDuration: 1,
 	SubSafetyMargin:    4,
 	MaxFrameSize:       120000,
-	TargetFrameSize:    100000,
-	TargetNumFrames:    1,
-	ApproxComprRatio:   0.4,
+	CompressorConfig: compressor.Config{
+		TargetFrameSize:  100000,
+		TargetNumFrames:  1,
+		ApproxComprRatio: 0.4,
+	},
 }
 
 // TestChannelConfig_Check tests the [ChannelConfig] [Check] function.

@@ -416,7 +419,9 @@ func TestChannelBuilder_OutputWrongFramePanic(t *testing.T) {
 	// Mock the internals of `channelBuilder.outputFrame`
 	// to construct a single frame
-	co, err := derive.NewChannelOut()
+	c, err := channelConfig.CompressorConfig.NewCompressor()
+	require.NoError(t, err)
+	co, err := derive.NewChannelOut(c)
 	require.NoError(t, err)
 	var buf bytes.Buffer
 	fn, err := co.OutputFrame(&buf, channelConfig.MaxFrameSize)

@@ -483,8 +488,8 @@ func TestChannelBuilder_MaxRLPBytesPerChannel(t *testing.T) {
 	t.Parallel()
 	channelConfig := defaultTestChannelConfig
 	channelConfig.MaxFrameSize = derive.MaxRLPBytesPerChannel * 2
-	channelConfig.TargetFrameSize = derive.MaxRLPBytesPerChannel * 2
-	channelConfig.ApproxComprRatio = 1
+	channelConfig.CompressorConfig.TargetFrameSize = derive.MaxRLPBytesPerChannel * 2
+	channelConfig.CompressorConfig.ApproxComprRatio = 1
 
 	// Construct the channel builder
 	cb, err := newChannelBuilder(channelConfig)

@@ -500,9 +505,9 @@ func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) {
 func TestChannelBuilder_OutputFramesMaxFrameIndex(t *testing.T) {
 	channelConfig := defaultTestChannelConfig
 	channelConfig.MaxFrameSize = 24
-	channelConfig.TargetNumFrames = math.MaxInt
-	channelConfig.TargetFrameSize = 24
-	channelConfig.ApproxComprRatio = 0
+	channelConfig.CompressorConfig.TargetNumFrames = math.MaxInt
+	channelConfig.CompressorConfig.TargetFrameSize = 24
+	channelConfig.CompressorConfig.ApproxComprRatio = 0
 
 	// Continuously add blocks until the max frame index is reached
 	// This should cause the [channelBuilder.OutputFrames] function

@@ -544,9 +549,9 @@ func TestChannelBuilder_AddBlock(t *testing.T) {
 	channelConfig.MaxFrameSize = 30
 
 	// Configure the Input Threshold params so we observe a full channel
-	channelConfig.TargetFrameSize = 30
-	channelConfig.TargetNumFrames = 2
-	channelConfig.ApproxComprRatio = 1
+	channelConfig.CompressorConfig.TargetFrameSize = 30
+	channelConfig.CompressorConfig.TargetNumFrames = 2
+	channelConfig.CompressorConfig.ApproxComprRatio = 1
 
 	// Construct the channel builder
 	cb, err := newChannelBuilder(channelConfig)

@@ -564,7 +569,7 @@ func TestChannelBuilder_AddBlock(t *testing.T) {
 	// Since the channel output is full, the next call to AddBlock
 	// should return the channel out full error
-	require.ErrorIs(t, addMiniBlock(cb), ErrInputTargetReached)
+	require.ErrorIs(t, addMiniBlock(cb), derive.CompressorFullErr)
 }
 
 // TestChannelBuilder_Reset tests the [Reset] function

@@ -701,10 +706,10 @@ func TestChannelBuilder_OutputBytes(t *testing.T) {
 	require := require.New(t)
 	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
 	cfg := defaultTestChannelConfig
-	cfg.TargetFrameSize = 1000
+	cfg.CompressorConfig.TargetFrameSize = 1000
 	cfg.MaxFrameSize = 1000
-	cfg.TargetNumFrames = 16
-	cfg.ApproxComprRatio = 1.0
+	cfg.CompressorConfig.TargetNumFrames = 16
+	cfg.CompressorConfig.ApproxComprRatio = 1.0
 	cb, err := newChannelBuilder(cfg)
 	require.NoError(err, "newChannelBuilder")

@@ -713,7 +718,7 @@ func TestChannelBuilder_OutputBytes(t *testing.T) {
 	for {
 		block, _ := dtest.RandomL2Block(rng, rng.Intn(32))
 		_, err := cb.AddBlock(block)
-		if errors.Is(err, ErrInputTargetReached) {
+		if errors.Is(err, derive.CompressorFullErr) {
 			break
 		}
 		require.NoError(err)
@@ -7,6 +7,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
 	"github.com/ethereum-optimism/optimism/op-batcher/metrics"
 	"github.com/ethereum-optimism/optimism/op-node/eth"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"

@@ -98,9 +99,11 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
 	log := testlog.Logger(t, log.LvlCrit)
 	m := NewChannelManager(log, metrics.NoopMetrics,
 		ChannelConfig{
-			TargetFrameSize:  0,
-			MaxFrameSize:     120_000,
-			ApproxComprRatio: 1.0,
+			MaxFrameSize: 120_000,
+			CompressorConfig: compressor.Config{
+				TargetFrameSize:  0,
+				ApproxComprRatio: 1.0,
+			},
 		})
 
 	a := newMiniL2Block(0)

@@ -170,9 +173,12 @@ func TestChannelManager_Clear(t *testing.T) {
 		ChannelTimeout: 10,
 		// Have to set the max frame size here otherwise the channel builder would not
 		// be able to output any frames
-		MaxFrameSize:     24,
-		TargetFrameSize:  24,
-		ApproxComprRatio: 1.0,
+		MaxFrameSize: 24,
+		CompressorConfig: compressor.Config{
+			TargetFrameSize:  24,
+			TargetNumFrames:  1,
+			ApproxComprRatio: 1.0,
+		},
 	})
 
 	// Channel Manager state should be empty by default

@@ -331,9 +337,11 @@ func TestChannelManager_TxResend(t *testing.T) {
 	log := testlog.Logger(t, log.LvlError)
 	m := NewChannelManager(log, metrics.NoopMetrics,
 		ChannelConfig{
-			TargetFrameSize:  0,
-			MaxFrameSize:     120_000,
-			ApproxComprRatio: 1.0,
+			MaxFrameSize: 120_000,
+			CompressorConfig: compressor.Config{
+				TargetFrameSize:  0,
+				ApproxComprRatio: 1.0,
+			},
 		})
 
 	a, _ := derivetest.RandomL2Block(rng, 4)

@@ -372,10 +380,12 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
 	log := testlog.Logger(t, log.LvlCrit)
 	m := NewChannelManager(log, metrics.NoopMetrics,
 		ChannelConfig{
-			TargetFrameSize:  0,
-			MaxFrameSize:     100,
-			ApproxComprRatio: 1.0,
-			ChannelTimeout:   1000,
+			MaxFrameSize:   100,
+			ChannelTimeout: 1000,
+			CompressorConfig: compressor.Config{
+				TargetFrameSize:  0,
+				ApproxComprRatio: 1.0,
+			},
 		})
 
 	a, _ := derivetest.RandomL2Block(rng, 4)

@@ -397,10 +407,13 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
 	log := testlog.Logger(t, log.LvlCrit)
 	m := NewChannelManager(log, metrics.NoopMetrics,
 		ChannelConfig{
-			TargetFrameSize:  0,
-			MaxFrameSize:     100,
-			ApproxComprRatio: 1.0,
-			ChannelTimeout:   1000,
+			MaxFrameSize:   100,
+			ChannelTimeout: 1000,
+			CompressorConfig: compressor.Config{
+				TargetFrameSize:  1,
+				TargetNumFrames:  1,
+				ApproxComprRatio: 1.0,
+			},
 		})
 
 	a := newMiniL2Block(0)
 	b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())

@@ -433,11 +446,13 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {
 	log := testlog.Logger(t, log.LvlCrit)
 	m := NewChannelManager(log, metrics.NoopMetrics,
 		ChannelConfig{
-			TargetNumFrames:  100,
-			TargetFrameSize:  1000,
-			MaxFrameSize:     1000,
-			ApproxComprRatio: 1.0,
-			ChannelTimeout:   1000,
+			MaxFrameSize:   1000,
+			ChannelTimeout: 1000,
+			CompressorConfig: compressor.Config{
+				TargetNumFrames:  100,
+				TargetFrameSize:  1000,
+				ApproxComprRatio: 1.0,
+			},
 		})
 
 	a := newMiniL2Block(50_000)

@@ -476,11 +491,13 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
 	log := testlog.Logger(t, log.LvlCrit)
 	m := NewChannelManager(log, metrics.NoopMetrics,
 		ChannelConfig{
-			TargetNumFrames:  100,
-			TargetFrameSize:  1000,
-			MaxFrameSize:     1000,
-			ApproxComprRatio: 1.0,
-			ChannelTimeout:   1000,
+			MaxFrameSize:   1000,
+			ChannelTimeout: 1000,
+			CompressorConfig: compressor.Config{
+				TargetNumFrames:  100,
+				TargetFrameSize:  1000,
+				ApproxComprRatio: 1.0,
+			},
 		})
 
 	a := newMiniL2Block(50_000)
@@ -7,6 +7,7 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/urfave/cli"
 
+	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
 	"github.com/ethereum-optimism/optimism/op-batcher/flags"
 	"github.com/ethereum-optimism/optimism/op-batcher/metrics"
 	"github.com/ethereum-optimism/optimism/op-batcher/rpc"

@@ -84,23 +85,14 @@ type CLIConfig struct {
 	// MaxL1TxSize is the maximum size of a batch tx submitted to L1.
 	MaxL1TxSize uint64
 
-	// TargetL1TxSize is the target size of a batch tx submitted to L1.
-	TargetL1TxSize uint64
-
-	// TargetNumFrames is the target number of frames per channel.
-	TargetNumFrames int
-
-	// ApproxComprRatio is the approximate compression ratio (<= 1.0) of the used
-	// compression algorithm.
-	ApproxComprRatio float64
-
 	Stopped bool
 
 	TxMgrConfig      txmgr.CLIConfig
 	RPCConfig        rpc.CLIConfig
 	LogConfig        oplog.CLIConfig
 	MetricsConfig    opmetrics.CLIConfig
 	PprofConfig      oppprof.CLIConfig
+	CompressorConfig compressor.CLIConfig
 }
 
 func (c CLIConfig) Check() error {

@@ -136,14 +128,12 @@ func NewConfig(ctx *cli.Context) CLIConfig {
 		MaxPendingTransactions: ctx.GlobalUint64(flags.MaxPendingTransactionsFlag.Name),
 		MaxChannelDuration:     ctx.GlobalUint64(flags.MaxChannelDurationFlag.Name),
 		MaxL1TxSize:            ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
-		TargetL1TxSize:         ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
-		TargetNumFrames:        ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
-		ApproxComprRatio:       ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
 		Stopped:                ctx.GlobalBool(flags.StoppedFlag.Name),
 		TxMgrConfig:            txmgr.ReadCLIConfig(ctx),
 		RPCConfig:              rpc.ReadCLIConfig(ctx),
 		LogConfig:              oplog.ReadCLIConfig(ctx),
 		MetricsConfig:          opmetrics.ReadCLIConfig(ctx),
 		PprofConfig:            oppprof.ReadCLIConfig(ctx),
+		CompressorConfig:       compressor.ReadCLIConfig(ctx),
 	}
 }

@@ -89,10 +89,8 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
 			ChannelTimeout:     rcfg.ChannelTimeout,
 			MaxChannelDuration: cfg.MaxChannelDuration,
 			SubSafetyMargin:    cfg.SubSafetyMargin,
-			MaxFrameSize:       cfg.MaxL1TxSize - 1,    // subtract 1 byte for version
-			TargetFrameSize:    cfg.TargetL1TxSize - 1, // subtract 1 byte for version
-			TargetNumFrames:    cfg.TargetNumFrames,
-			ApproxComprRatio:   cfg.ApproxComprRatio,
+			MaxFrameSize:       cfg.MaxL1TxSize - 1, // subtract 1 byte for version
+			CompressorConfig:   cfg.CompressorConfig.Config(),
 		},
 	}
package compressor
import (
"strings"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/urfave/cli"
)
const (
TargetL1TxSizeBytesFlagName = "target-l1-tx-size-bytes"
TargetNumFramesFlagName = "target-num-frames"
ApproxComprRatioFlagName = "approx-compr-ratio"
KindFlagName = "compressor"
)
func CLIFlags(envPrefix string) []cli.Flag {
return []cli.Flag{
cli.Uint64Flag{
Name: TargetL1TxSizeBytesFlagName,
Usage: "The target size of a batch tx submitted to L1.",
Value: 100_000,
EnvVar: opservice.PrefixEnvVar(envPrefix, "TARGET_L1_TX_SIZE_BYTES"),
},
cli.IntFlag{
Name: TargetNumFramesFlagName,
Usage: "The target number of frames to create per channel",
Value: 1,
EnvVar: opservice.PrefixEnvVar(envPrefix, "TARGET_NUM_FRAMES"),
},
cli.Float64Flag{
Name: ApproxComprRatioFlagName,
Usage: "The approximate compression ratio (<= 1.0)",
Value: 0.4,
EnvVar: opservice.PrefixEnvVar(envPrefix, "APPROX_COMPR_RATIO"),
},
cli.StringFlag{
Name: KindFlagName,
Usage: "The type of compressor. Valid options: " + strings.Join(KindKeys, ", "),
EnvVar: opservice.PrefixEnvVar(envPrefix, "COMPRESSOR"),
Value: RatioKind,
},
}
}
type CLIConfig struct {
// TargetL1TxSizeBytes to target when creating channel frames. Note that if the
// realized compression ratio is worse than the approximate, more frames may
// actually be created. This also depends on how close the target is to the
// max frame size.
TargetL1TxSizeBytes uint64
// TargetNumFrames to create in this channel. If the realized compression ratio
// is worse than approxComprRatio, additional leftover frame(s) might get created.
TargetNumFrames int
// ApproxComprRatio to assume. Should be slightly smaller than average from
// experiments to avoid the chances of creating a small additional leftover frame.
ApproxComprRatio float64
// Type of compressor to use. Must be one of KindKeys.
Kind string
}
func (c *CLIConfig) Config() Config {
return Config{
TargetFrameSize: c.TargetL1TxSizeBytes - 1, // subtract 1 byte for version
TargetNumFrames: c.TargetNumFrames,
ApproxComprRatio: c.ApproxComprRatio,
Kind: c.Kind,
}
}
func ReadCLIConfig(ctx *cli.Context) CLIConfig {
return CLIConfig{
Kind: ctx.GlobalString(KindFlagName),
TargetL1TxSizeBytes: ctx.GlobalUint64(TargetL1TxSizeBytesFlagName),
TargetNumFrames: ctx.GlobalInt(TargetNumFramesFlagName),
ApproxComprRatio: ctx.GlobalFloat64(ApproxComprRatioFlagName),
}
}
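The flag names and env vars here are unchanged from the old batcher flags, so existing deployments keep working; only their home moved into the compressor package. A small sketch of the CLIConfig-to-Config translation with the defaults above (the main wrapper is illustrative only):

package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
)

func main() {
	cliCfg := compressor.CLIConfig{
		TargetL1TxSizeBytes: 100_000, // default of --target-l1-tx-size-bytes
		TargetNumFrames:     1,       // default of --target-num-frames
		ApproxComprRatio:    0.4,     // default of --approx-compr-ratio
		Kind:                compressor.RatioKind,
	}
	cfg := cliCfg.Config()
	// 99999: one byte of the target tx size is reserved for the version prefix.
	fmt.Println(cfg.TargetFrameSize)
}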
package compressor
import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type FactoryFunc func(Config) (derive.Compressor, error)
const RatioKind = "ratio"
const ShadowKind = "shadow"
var Kinds = map[string]FactoryFunc{
RatioKind: NewRatioCompressor,
ShadowKind: NewShadowCompressor,
}
var KindKeys []string
func init() {
for k := range Kinds {
KindKeys = append(KindKeys, k)
}
}
package compressor
import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type Config struct {
// TargetFrameSize to target when creating channel frames. Note that if the
// realized compression ratio is worse than the approximate, more frames may
// actually be created. This also depends on how close the target is to the
// max frame size.
TargetFrameSize uint64
// TargetNumFrames to create in this channel. If the realized compression ratio
// is worse than approxComprRatio, additional leftover frame(s) might get created.
TargetNumFrames int
// ApproxComprRatio to assume. Should be slightly smaller than average from
// experiments to avoid the chances of creating a small additional leftover frame.
ApproxComprRatio float64
// Kind of compressor to use. Must be one of KindKeys.
Kind string
}
func (c Config) NewCompressor() (derive.Compressor, error) {
if k, ok := Kinds[c.Kind]; ok {
return k(c)
}
// default to RatioCompressor
return Kinds[RatioKind](c)
}
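Note the fallback: an empty or unrecognized Kind silently selects the ratio compressor rather than erroring. A short illustrative sketch (hypothetical caller code):

package main

import (
	"fmt"

	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
)

func main() {
	cfg := compressor.Config{
		TargetFrameSize:  100_000,
		TargetNumFrames:  1,
		ApproxComprRatio: 0.4,
		Kind:             compressor.ShadowKind, // explicit "shadow"
	}
	c, err := cfg.NewCompressor()
	fmt.Printf("%T %v\n", c, err) // *compressor.ShadowCompressor <nil>

	cfg.Kind = "" // unknown/empty kind: falls back to the ratio compressor
	c, err = cfg.NewCompressor()
	fmt.Printf("%T %v\n", c, err) // *compressor.RatioCompressor <nil>
}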
package compressor
import (
"bytes"
"compress/zlib"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type RatioCompressor struct {
config Config
inputBytes int
buf bytes.Buffer
compress *zlib.Writer
}
// NewRatioCompressor creates a new derive.Compressor implementation that uses the target
// size and a compression ratio parameter to determine how much data can be written to
// the compressor before it's considered full. The full calculation is as follows:
//
// full = uncompressedLength * approxCompRatio >= targetFrameSize * targetNumFrames
func NewRatioCompressor(config Config) (derive.Compressor, error) {
c := &RatioCompressor{
config: config,
}
compress, err := zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
if err != nil {
return nil, err
}
c.compress = compress
return c, nil
}
func (t *RatioCompressor) Write(p []byte) (int, error) {
if err := t.FullErr(); err != nil {
return 0, err
}
t.inputBytes += len(p)
return t.compress.Write(p)
}
func (t *RatioCompressor) Close() error {
return t.compress.Close()
}
func (t *RatioCompressor) Read(p []byte) (int, error) {
return t.buf.Read(p)
}
func (t *RatioCompressor) Reset() {
t.buf.Reset()
t.compress.Reset(&t.buf)
t.inputBytes = 0
}
func (t *RatioCompressor) Len() int {
return t.buf.Len()
}
func (t *RatioCompressor) Flush() error {
return t.compress.Flush()
}
func (t *RatioCompressor) FullErr() error {
if t.inputTargetReached() {
return derive.CompressorFullErr
}
return nil
}
// InputThreshold calculates the input data threshold in bytes from the given
// parameters.
func (t *RatioCompressor) InputThreshold() uint64 {
return uint64(float64(t.config.TargetNumFrames) * float64(t.config.TargetFrameSize) / t.config.ApproxComprRatio)
}
// inputTargetReached says whether the target amount of input data has been
// reached in this channel builder. No more blocks can be added afterwards.
func (t *RatioCompressor) inputTargetReached() bool {
return uint64(t.inputBytes) >= t.InputThreshold()
}
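As a worked example of the threshold formula, with the CLI defaults (target tx size 100_000 minus the version byte, one frame, ratio 0.4) the ratio compressor accepts roughly 250 KB of uncompressed batch data before reporting full:

package main

import "fmt"

func main() {
	// Same arithmetic as RatioCompressor.InputThreshold, using the defaults.
	targetFrameSize := 99_999.0 // 100_000 target L1 tx size minus 1 version byte
	targetNumFrames := 1.0
	approxComprRatio := 0.4

	threshold := uint64(targetNumFrames * targetFrameSize / approxComprRatio)
	fmt.Println(threshold) // 249997 input bytes, then FullErr returns CompressorFullErr
}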
-package batcher_test
+package compressor_test
 
 import (
 	"math"
 	"testing"
 
-	"github.com/ethereum-optimism/optimism/op-batcher/batcher"
+	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
 	"github.com/stretchr/testify/require"
 )

@@ -118,12 +118,13 @@ func TestInputThreshold(t *testing.T) {
 	// Validate each test case
 	for _, tt := range tests {
-		config := batcher.ChannelConfig{
+		comp, err := compressor.NewRatioCompressor(compressor.Config{
 			TargetFrameSize:  tt.input.TargetFrameSize,
 			TargetNumFrames:  tt.input.TargetNumFrames,
 			ApproxComprRatio: tt.input.ApproxComprRatio,
-		}
-		got := config.InputThreshold()
+		})
+		require.NoError(t, err)
+		got := comp.(*compressor.RatioCompressor).InputThreshold()
 		tt.assertion(got)
 	}
 }
package compressor
import (
"bytes"
"compress/zlib"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type ShadowCompressor struct {
config Config
buf bytes.Buffer
compress *zlib.Writer
shadowBuf bytes.Buffer
shadowCompress *zlib.Writer
fullErr error
}
// NewShadowCompressor creates a new derive.Compressor implementation that contains two
// compression buffers: one used for size estimation, and one used for the final
// compressed output. The first is flushed on every write, the second isn't, which means
// the final compressed data is always slightly smaller than the target. There is one
// exception to this rule: the first write to the buffer is not checked against the
// target, which allows individual blocks larger than the target to be included (and will
// be split across multiple channel frames).
func NewShadowCompressor(config Config) (derive.Compressor, error) {
c := &ShadowCompressor{
config: config,
}
var err error
c.compress, err = zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
if err != nil {
return nil, err
}
c.shadowCompress, err = zlib.NewWriterLevel(&c.shadowBuf, zlib.BestCompression)
if err != nil {
return nil, err
}
return c, nil
}
func (t *ShadowCompressor) Write(p []byte) (int, error) {
_, err := t.shadowCompress.Write(p)
if err != nil {
return 0, err
}
err = t.shadowCompress.Flush()
if err != nil {
return 0, err
}
if uint64(t.shadowBuf.Len()) > t.config.TargetFrameSize*uint64(t.config.TargetNumFrames) {
t.fullErr = derive.CompressorFullErr
if t.Len() > 0 {
// only return an error if we've already written data to this compressor before
// (otherwise individual blocks over the target would never be written)
return 0, t.fullErr
}
}
return t.compress.Write(p)
}
func (t *ShadowCompressor) Close() error {
return t.compress.Close()
}
func (t *ShadowCompressor) Read(p []byte) (int, error) {
return t.buf.Read(p)
}
func (t *ShadowCompressor) Reset() {
t.buf.Reset()
t.compress.Reset(&t.buf)
t.shadowBuf.Reset()
t.shadowCompress.Reset(&t.shadowBuf)
t.fullErr = nil
}
func (t *ShadowCompressor) Len() int {
return t.buf.Len()
}
func (t *ShadowCompressor) Flush() error {
return t.compress.Flush()
}
func (t *ShadowCompressor) FullErr() error {
return t.fullErr
}
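A small sketch of the first-write exception described in the doc comment (tiny target values chosen purely for illustration): the first oversized write is accepted so that a single large block can still be split across frames, while the next write is rejected.

package main

import (
	"bytes"
	"errors"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

func main() {
	sc, err := compressor.NewShadowCompressor(compressor.Config{
		TargetFrameSize: 1, // deliberately tiny so the target trips immediately
		TargetNumFrames: 1,
	})
	if err != nil {
		panic(err)
	}

	// First write: well over target, but accepted (nothing written before it).
	_, err = sc.Write(bytes.Repeat([]byte{0}, 1024))
	fmt.Println(err, sc.FullErr()) // <nil> compressor is full

	// Second write: rejected, the compressor already holds data.
	_, err = sc.Write([]byte{1})
	fmt.Println(errors.Is(err, derive.CompressorFullErr)) // true
}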
package compressor_test
import (
"bytes"
"compress/zlib"
"io"
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/stretchr/testify/require"
)
func randomBytes(t *testing.T, length int) []byte {
b := make([]byte, length)
_, err := rand.Read(b)
require.NoError(t, err)
return b
}
func TestShadowCompressor(t *testing.T) {
type test struct {
name string
targetFrameSize uint64
targetNumFrames int
data [][]byte
errs []error
fullErr error
}
tests := []test{{
name: "no data",
targetFrameSize: 1,
targetNumFrames: 1,
data: [][]byte{},
errs: []error{},
fullErr: nil,
}, {
name: "large first block",
targetFrameSize: 1,
targetNumFrames: 1,
data: [][]byte{bytes.Repeat([]byte{0}, 1024)},
errs: []error{nil},
fullErr: derive.CompressorFullErr,
}, {
name: "large second block",
targetFrameSize: 1,
targetNumFrames: 1,
data: [][]byte{bytes.Repeat([]byte{0}, 512), bytes.Repeat([]byte{0}, 1024)},
errs: []error{nil, derive.CompressorFullErr},
fullErr: derive.CompressorFullErr,
}, {
name: "random data",
targetFrameSize: 1200,
targetNumFrames: 1,
data: [][]byte{randomBytes(t, 512), randomBytes(t, 512), randomBytes(t, 512)},
errs: []error{nil, nil, derive.CompressorFullErr},
fullErr: derive.CompressorFullErr,
}}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
require.Equal(t, len(test.errs), len(test.data), "invalid test case: len(data) != len(errs)")
sc, err := compressor.NewShadowCompressor(compressor.Config{
TargetFrameSize: test.targetFrameSize,
TargetNumFrames: test.targetNumFrames,
})
require.NoError(t, err)
for i, d := range test.data {
_, err = sc.Write(d)
if test.errs[i] != nil {
require.ErrorIs(t, err, test.errs[i])
require.Equal(t, i, len(test.data)-1)
} else {
require.NoError(t, err)
}
}
if test.fullErr != nil {
require.ErrorIs(t, sc.FullErr(), test.fullErr)
} else {
require.NoError(t, sc.FullErr())
}
err = sc.Close()
require.NoError(t, err)
buf, err := io.ReadAll(sc)
require.NoError(t, err)
r, err := zlib.NewReader(bytes.NewBuffer(buf))
require.NoError(t, err)
uncompressed, err := io.ReadAll(r)
require.NoError(t, err)
concat := make([]byte, 0)
for i, d := range test.data {
if test.errs[i] != nil {
break
}
concat = append(concat, d...)
}
require.Equal(t, concat, uncompressed)
})
}
}
@@ -6,6 +6,7 @@ import (
 	"github.com/urfave/cli"
 
+	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
 	"github.com/ethereum-optimism/optimism/op-batcher/rpc"
 	opservice "github.com/ethereum-optimism/optimism/op-service"
 	oplog "github.com/ethereum-optimism/optimism/op-service/log"

@@ -67,24 +68,6 @@ var (
 		Value:  120_000,
 		EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "MAX_L1_TX_SIZE_BYTES"),
 	}
-	TargetL1TxSizeBytesFlag = cli.Uint64Flag{
-		Name:   "target-l1-tx-size-bytes",
-		Usage:  "The target size of a batch tx submitted to L1.",
-		Value:  100_000,
-		EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "TARGET_L1_TX_SIZE_BYTES"),
-	}
-	TargetNumFramesFlag = cli.IntFlag{
-		Name:   "target-num-frames",
-		Usage:  "The target number of frames to create per channel",
-		Value:  1,
-		EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "TARGET_NUM_FRAMES"),
-	}
-	ApproxComprRatioFlag = cli.Float64Flag{
-		Name:   "approx-compr-ratio",
-		Usage:  "The approximate compression ratio (<= 1.0)",
-		Value:  0.4,
-		EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "APPROX_COMPR_RATIO"),
-	}
 	StoppedFlag = cli.BoolFlag{
 		Name:  "stopped",
 		Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC",

@@ -106,9 +89,6 @@ var optionalFlags = []cli.Flag{
 	MaxPendingTransactionsFlag,
 	MaxChannelDurationFlag,
 	MaxL1TxSizeBytesFlag,
-	TargetL1TxSizeBytesFlag,
-	TargetNumFramesFlag,
-	ApproxComprRatioFlag,
 	StoppedFlag,
 	SequencerHDPathFlag,
 }

@@ -120,6 +100,7 @@ func init() {
 	optionalFlags = append(optionalFlags, oppprof.CLIFlags(EnvVarPrefix)...)
 	optionalFlags = append(optionalFlags, rpc.CLIFlags(EnvVarPrefix)...)
 	optionalFlags = append(optionalFlags, txmgr.CLIFlags(EnvVarPrefix)...)
+	optionalFlags = append(optionalFlags, compressor.CLIFlags(EnvVarPrefix)...)
 
 	Flags = append(requiredFlags, optionalFlags...)
 }
@@ -202,6 +202,10 @@ func main() {
 			dryRun := ctx.Bool("dry-run")
 			noCheck := ctx.Bool("no-check")
+			if noCheck {
+				panic("must run with check on")
+			}
 
 			// Perform the migration
 			res, err := genesis.MigrateDB(ldb, config, block, &migrationData, !dryRun, noCheck)
 			if err != nil {
@@ -57,7 +57,9 @@ func (w *LegacyWithdrawal) Encode() ([]byte, error) {
 	return out, nil
 }
 
-// Decode will decode a serialized LegacyWithdrawal
+// Decode will decode a serialized LegacyWithdrawal. There is a known inconsistency
+// where the decoded `msg.sender` is not authenticated. A round trip of encoding and
+// decoding with a spoofed withdrawal will result in a different message being recovered.
 func (w *LegacyWithdrawal) Decode(data []byte) error {
 	if len(data) < len(predeploys.L2CrossDomainMessengerAddr)+4 {
 		return fmt.Errorf("withdrawal data too short: %d", len(data))

@@ -68,6 +70,8 @@ func (w *LegacyWithdrawal) Decode(data []byte) error {
 		return fmt.Errorf("invalid selector: 0x%x", data[0:4])
 	}
 
+	// This should be the L2CrossDomainMessenger address but is not guaranteed
+	// to be.
 	msgSender := data[len(data)-len(predeploys.L2CrossDomainMessengerAddr):]
 	raw := data[4 : len(data)-len(predeploys.L2CrossDomainMessengerAddr)]
@@ -16,6 +16,7 @@ import (
 	"github.com/ethereum/go-ethereum/params"
 	"github.com/stretchr/testify/require"
 
+	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
 	"github.com/ethereum-optimism/optimism/op-node/eth"
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"

@@ -133,7 +134,13 @@ func (s *L2Batcher) Buffer(t Testing) error {
 	if s.l2BatcherCfg.GarbageCfg != nil {
 		ch, err = NewGarbageChannelOut(s.l2BatcherCfg.GarbageCfg)
 	} else {
-		ch, err = derive.NewChannelOut()
+		c, e := compressor.NewRatioCompressor(compressor.Config{
+			TargetFrameSize:  s.l2BatcherCfg.MaxL1TxSize,
+			TargetNumFrames:  1,
+			ApproxComprRatio: 1,
+		})
+		require.NoError(t, e, "failed to create compressor")
+		ch, err = derive.NewChannelOut(c)
 	}
 	require.NoError(t, err, "failed to create channel")
 	s.l2ChannelOut = ch
@@ -12,6 +12,7 @@ import (
 	"time"
 
 	bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
+	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
 	batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics"
 	"github.com/ethereum-optimism/optimism/op-node/chaincfg"
 	"github.com/ethereum-optimism/optimism/op-node/sources"

@@ -335,12 +336,14 @@ func TestMigration(t *testing.T) {
 		RollupRpc:          rollupNode.HTTPEndpoint(),
 		MaxChannelDuration: 1,
 		MaxL1TxSize:        120_000,
-		TargetL1TxSize:     100_000,
-		TargetNumFrames:    1,
-		ApproxComprRatio:   0.4,
-		SubSafetyMargin:    4,
-		PollInterval:       50 * time.Millisecond,
-		TxMgrConfig:        newTxMgrConfig(forkedL1URL, secrets.Batcher),
+		CompressorConfig: compressor.CLIConfig{
+			TargetL1TxSizeBytes: 100_000,
+			TargetNumFrames:     1,
+			ApproxComprRatio:    0.4,
+		},
+		SubSafetyMargin: 4,
+		PollInterval:    50 * time.Millisecond,
+		TxMgrConfig:     newTxMgrConfig(forkedL1URL, secrets.Batcher),
 		LogConfig: oplog.CLIConfig{
 			Level:  "info",
 			Format: "text",
@@ -24,6 +24,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	bss "github.com/ethereum-optimism/optimism/op-batcher/batcher"
+	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
 	batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics"
 	"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
 	"github.com/ethereum-optimism/optimism/op-chain-ops/genesis"

@@ -599,12 +600,14 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
 		MaxPendingTransactions: 1,
 		MaxChannelDuration:     1,
 		MaxL1TxSize:            120_000,
-		TargetL1TxSize:         100_000,
-		TargetNumFrames:        1,
-		ApproxComprRatio:       0.4,
-		SubSafetyMargin:        4,
-		PollInterval:           50 * time.Millisecond,
-		TxMgrConfig:            newTxMgrConfig(sys.Nodes["l1"].WSEndpoint(), cfg.Secrets.Batcher),
+		CompressorConfig: compressor.CLIConfig{
+			TargetL1TxSizeBytes: 100_000,
+			TargetNumFrames:     1,
+			ApproxComprRatio:    0.4,
+		},
+		SubSafetyMargin: 4,
+		PollInterval:    50 * time.Millisecond,
+		TxMgrConfig:     newTxMgrConfig(sys.Nodes["l1"].WSEndpoint(), cfg.Secrets.Batcher),
 		LogConfig: oplog.CLIConfig{
 			Level:  "info",
 			Format: "text",
@@ -37,7 +37,6 @@ func newRPCServer(ctx context.Context, rpcCfg *RPCConfig, rollupCfg *rollup.Conf
 		apis: []rpc.API{{
 			Namespace:     "optimism",
 			Service:       api,
-			Public:        true,
 			Authenticated: false,
 		}},
 		appVersion: appVersion,

@@ -51,7 +50,6 @@ func (s *rpcServer) EnableAdminAPI(api *adminAPI) {
 		Namespace:     "admin",
 		Version:       "",
 		Service:       api,
-		Public:        true, // TODO: this field is deprecated. Do we even need this anymore?
 		Authenticated: false,
 	})
 }

@@ -61,7 +59,6 @@ func (s *rpcServer) EnableP2P(backend *p2p.APIBackend) {
 		Namespace:     p2p.NamespaceRPC,
 		Version:       "",
 		Service:       backend,
-		Public:        true,
 		Authenticated: false,
 	})
 }
@@ -2,7 +2,6 @@ package derive
 
 import (
 	"bytes"
-	"compress/zlib"
 	"crypto/rand"
 	"errors"
 	"fmt"

@@ -25,6 +24,30 @@ var ErrTooManyRLPBytes = errors.New("batch would cause RLP bytes to go over limi
 // [Frame Format]: https://github.com/ethereum-optimism/optimism/blob/develop/specs/derivation.md#frame-format
 const FrameV0OverHeadSize = 23
 
+var CompressorFullErr = errors.New("compressor is full")
+
+type Compressor interface {
+	// Writer is used to write uncompressed data which will be compressed. Should return
+	// CompressorFullErr if the compressor is full and no more data should be written.
+	io.Writer
+	// Closer Close function should be called before reading any data.
+	io.Closer
+	// Reader is used to Read compressed data; should only be called after Close.
+	io.Reader
+	// Reset will reset all written data
+	Reset()
+	// Len returns an estimate of the current length of the compressed data; calling Flush will
+	// increase the accuracy at the expense of a poorer compression ratio.
+	Len() int
+	// Flush flushes any uncompressed data to the compression buffer. This will result in a
+	// non-optimal compression ratio.
+	Flush() error
+	// FullErr returns CompressorFullErr if the compressor is known to be full. Note that
+	// calls to Write will fail if an error is returned from this method, but calls to Write
+	// can still return CompressorFullErr even if this does not.
+	FullErr() error
+}
+
 type ChannelOut struct {
 	id ChannelID
 	// Frame ID of the next frame to emit. Increment after emitting

@@ -33,9 +56,7 @@ type ChannelOut struct {
 	rlpLength int
 
 	// Compressor stage. Write input data to it
-	compress *zlib.Writer
-	// post compression buffer
-	buf bytes.Buffer
+	compress Compressor
 
 	closed bool
 }

@@ -44,23 +65,18 @@ func (co *ChannelOut) ID() ChannelID {
 	return co.id
 }
 
-func NewChannelOut() (*ChannelOut, error) {
+func NewChannelOut(compress Compressor) (*ChannelOut, error) {
 	c := &ChannelOut{
 		id:        ChannelID{}, // TODO: use GUID here instead of fully random data
 		frame:     0,
 		rlpLength: 0,
+		compress:  compress,
 	}
 	_, err := rand.Read(c.id[:])
 	if err != nil {
 		return nil, err
 	}
-	compress, err := zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
-	if err != nil {
-		return nil, err
-	}
-	c.compress = compress
-
 	return c, nil
 }

@@ -68,8 +84,7 @@ func NewChannelOut() (*ChannelOut, error) {
 func (co *ChannelOut) Reset() error {
 	co.frame = 0
 	co.rlpLength = 0
-	co.buf.Reset()
-	co.compress.Reset(&co.buf)
+	co.compress.Reset()
 	co.closed = false
 	_, err := rand.Read(co.id[:])
 	return err

@@ -116,7 +131,8 @@ func (co *ChannelOut) AddBatch(batch *BatchData) (uint64, error) {
 	}
 	co.rlpLength += buf.Len()
 
-	written, err := io.Copy(co.compress, &buf)
+	// avoid using io.Copy here, because we need all or nothing
+	written, err := co.compress.Write(buf.Bytes())
 	return uint64(written), err
 }

@@ -129,7 +145,7 @@ func (co *ChannelOut) InputBytes() int {
 // Use `Flush` or `Close` to move data from the compression buffer into the ready buffer if more bytes
 // are needed. Add blocks may add to the ready buffer, but it is not guaranteed due to the compression stage.
 func (co *ChannelOut) ReadyBytes() int {
-	return co.buf.Len()
+	return co.compress.Len()
 }
 
 // Flush flushes the internal compression stage to the ready buffer. It enables pulling a larger & more

@@ -138,6 +154,10 @@ func (co *ChannelOut) Flush() error {
 	return co.compress.Flush()
 }
 
+func (co *ChannelOut) FullErr() error {
+	return co.compress.FullErr()
+}
+
 func (co *ChannelOut) Close() error {
 	if co.closed {
 		return errors.New("already closed")

@@ -166,8 +186,8 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro
 	// Copy data from the local buffer into the frame data buffer
 	maxDataSize := maxSize - FrameV0OverHeadSize
-	if maxDataSize > uint64(co.buf.Len()) {
-		maxDataSize = uint64(co.buf.Len())
+	if maxDataSize > uint64(co.compress.Len()) {
+		maxDataSize = uint64(co.compress.Len())
 		// If we are closed & will not spill past the current frame
 		// mark it is the final frame of the channel.
 		if co.closed {

@@ -176,7 +196,7 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro
 	}
 
 	f.Data = make([]byte, maxDataSize)
-	if _, err := io.ReadFull(&co.buf, f.Data); err != nil {
+	if _, err := io.ReadFull(co.compress, f.Data); err != nil {
 		return 0, err
 	}
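End to end, the new wiring is: build a Compressor, hand it to NewChannelOut, add batches until FullErr fires, then Close and read frames. A minimal lifecycle sketch under those assumptions (batch construction elided; frame sizes are illustrative):

package main

import (
	"bytes"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

func main() {
	c, err := compressor.NewRatioCompressor(compressor.Config{
		TargetFrameSize:  100_000,
		TargetNumFrames:  1,
		ApproxComprRatio: 0.4,
	})
	if err != nil {
		panic(err)
	}
	co, err := derive.NewChannelOut(c)
	if err != nil {
		panic(err)
	}

	// ... co.AddBatch(batch) in a loop, stopping once co.FullErr() != nil ...

	// Close flushes the compressor; frames can then be read out.
	if err := co.Close(); err != nil {
		panic(err)
	}
	var frame bytes.Buffer
	fn, err := co.OutputFrame(&frame, 120_000) // err == io.EOF on the final frame
	fmt.Println(fn, err, frame.Len())
}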
@@ -11,8 +11,25 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+// basic implementation of the Compressor interface that does no compression
+type nonCompressor struct {
+	bytes.Buffer
+}
+
+func (s *nonCompressor) Flush() error {
+	return nil
+}
+
+func (s *nonCompressor) Close() error {
+	return nil
+}
+
+func (s *nonCompressor) FullErr() error {
+	return nil
+}
+
 func TestChannelOutAddBlock(t *testing.T) {
-	cout, err := NewChannelOut()
+	cout, err := NewChannelOut(&nonCompressor{})
 	require.NoError(t, err)
 
 	t.Run("returns err if first tx is not an l1info tx", func(t *testing.T) {

@@ -33,7 +50,7 @@ func TestChannelOutAddBlock(t *testing.T) {
 // max size that is below the fixed frame size overhead of 23, will return
 // an error.
 func TestOutputFrameSmallMaxSize(t *testing.T) {
-	cout, err := NewChannelOut()
+	cout, err := NewChannelOut(&nonCompressor{})
 	require.NoError(t, err)
 
 	// Call OutputFrame with the range of small max size values that err
 package l2
 
 import (
+	"fmt"
 	"math/big"
 	"testing"

@@ -54,7 +55,7 @@ func TestGet(t *testing.T) {
 	db := NewOracleBackedDB(oracle)
 	key := make([]byte, common.HashLength)
 	copy(rawdb.CodePrefix, key)
-	println(key[0])
+	fmt.Println(key[0])
 	expected := []byte{1, 2, 3}
 	oracle.Data[common.BytesToHash(key)] = expected
 	val, err := db.Get(key)
...@@ -56,6 +56,7 @@ func Run(l1RpcUrl string, l2RpcUrl string, l2OracleAddr common.Address) error { ...@@ -56,6 +56,7 @@ func Run(l1RpcUrl string, l2RpcUrl string, l2OracleAddr common.Address) error {
if err != nil { if err != nil {
return fmt.Errorf("get l2 safe head: %w", err) return fmt.Errorf("get l2 safe head: %w", err)
} }
fmt.Printf("Found L2 finalized head number: %v hash: %v\n", l2FinalizedHead.NumberU64(), l2FinalizedHead.Hash())
// Find L1 finalized block. Can't be re-orged and must contain all batches for the L2 finalized block // Find L1 finalized block. Can't be re-orged and must contain all batches for the L2 finalized block
l1BlockNum := big.NewInt(int64(rpc.FinalizedBlockNumber)) l1BlockNum := big.NewInt(int64(rpc.FinalizedBlockNumber))
@@ -63,18 +64,28 @@ func Run(l1RpcUrl string, l2RpcUrl string, l2OracleAddr common.Address) error {
 	if err != nil {
 		return fmt.Errorf("find L1 head: %w", err)
 	}
+	fmt.Printf("Found l1 head block number: %v hash: %v\n", l1HeadBlock.NumberU64(), l1HeadBlock.Hash())
 	// Get the most recently published L2 output from before the finalized block
 	callOpts := &bind.CallOpts{Context: ctx}
 	outputIndex, err := outputOracle.GetL2OutputIndexAfter(callOpts, l2FinalizedHead.Number())
 	if err != nil {
-		return fmt.Errorf("get output index after finalized block: %w", err)
+		fmt.Println("Failed to get output index after finalized block. Checking latest output", "finalized", l2FinalizedHead.Number(), "err", err)
+		outputIndex, err = outputOracle.LatestOutputIndex(callOpts)
+		if err != nil {
+			return fmt.Errorf("get latest output index: %w", err)
+		}
+	} else {
+		outputIndex = outputIndex.Sub(outputIndex, big.NewInt(1))
 	}
-	outputIndex = outputIndex.Sub(outputIndex, big.NewInt(1))
 	output, err := outputOracle.GetL2Output(callOpts, outputIndex)
 	if err != nil {
 		return fmt.Errorf("retrieve latest output: %w", err)
 	}
+	// Check we wound up with an output prior to the finalized block
+	if output.L2BlockNumber.Uint64() > l2FinalizedHead.NumberU64() {
+		return fmt.Errorf("selected output is after finalized head output block: %v, finalized block: %v", output.L2BlockNumber.Uint64(), l2FinalizedHead.NumberU64())
+	}
 	l1Head := l1HeadBlock.Hash()
 	l2Claim := common.Hash(output.OutputRoot)
@@ -98,7 +109,7 @@ func Run(l1RpcUrl string, l2RpcUrl string, l2OracleAddr common.Address) error {
 	defer func() {
 		err := os.RemoveAll(temp)
 		if err != nil {
-			println("Failed to remove temp dir:" + err.Error())
+			fmt.Println("Failed to remove temp dir:" + err.Error())
 		}
 	}()
 	fmt.Printf("Using temp dir: %s\n", temp)
......
{
"finalSystemOwner": "0x62790eFcB3a5f3A5D398F95B47930A9Addd83807",
"portalGuardian": "0x62790eFcB3a5f3A5D398F95B47930A9Addd83807",
"controller": "0x2d30335B0b807bBa1682C487BaAFD2Ad6da5D675",
"l1StartingBlockTag": "0x4104895a540d87127ff11eef0d51d8f63ce00a6fc211db751a45a4b3a61a9c83",
"l1ChainID": 5,
"l2ChainID": 420,
"l2BlockTime": 2,
"maxSequencerDrift": 1200,
"sequencerWindowSize": 3600,
"channelTimeout": 120,
"p2pSequencerAddress": "0xCBABF46d40982B4530c0EAc9889f6e44e17f0383",
"batchInboxAddress": "0xff00000000000000000000000000000000000420",
"batchSenderAddress": "0x3a2baA0160275024A50C1be1FC677375E7DB4Bd7",
"l2OutputOracleSubmissionInterval": 20,
"l2OutputOracleStartingTimestamp": 1670625264,
"l2OutputOracleStartingBlockNumber": 3324764,
"l2OutputOracleProposer": "0x88BCa4Af3d950625752867f826E073E337076581",
"l2OutputOracleChallenger": "0x88BCa4Af3d950625752867f826E073E337076581",
"finalizationPeriodSeconds": 2,
"proxyAdminOwner": "0x62790eFcB3a5f3A5D398F95B47930A9Addd83807",
"governanceTokenName": "Optimism",
"governanceTokenSymbol": "OP",
"governanceTokenOwner": "0x038a8825A3C3B0c08d52Cc76E5E361953Cf6Dc76",
"l2GenesisBlockGasLimit": "0x1c9c380",
"l2GenesisBlockCoinbase": "0x4200000000000000000000000000000000000011",
"l2GenesisBlockBaseFeePerGas": "0x3b9aca00",
"gasPriceOracleOverhead": 2100,
"gasPriceOracleScalar": 1000000,
"eip1559Denominator": 50,
"eip1559Elasticity": 10
}
import { DeployConfig } from '../src/deploy-config'
import config from './final-migration-rehearsal.json'
export default config satisfies DeployConfig
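The wrapper above uses TypeScript's `satisfies` operator (TypeScript 4.9+): it type-checks the imported JSON against `DeployConfig` at build time without widening the value's inferred type, so a config field that doesn't match the expected shape fails compilation rather than surfacing at deploy time. A minimal self-contained sketch of the pattern; the `Config` type here is hypothetical, standing in for the much larger `DeployConfig`:

```ts
// `satisfies` checks a value against a type while keeping the value's own inferred type.
type Config = Record<string, string | number>

const config = {
  l2ChainID: 420,
  governanceTokenSymbol: 'OP',
} satisfies Config

// The compiler still knows the exact keys and per-key types:
// `chainId` is inferred as `number`, not `string | number`.
const chainId = config.l2ChainID

// A mismatched value, e.g. `l2BlockTime: false`, would fail to compile.
export default config
```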
@@ -93,12 +93,6 @@ const config: HardhatUserConfig = {
       accounts: [process.env.PRIVATE_KEY_DEPLOYER || ethers.constants.HashZero],
       live: true,
     },
-    'final-migration-rehearsal': {
-      chainId: 5,
-      url: process.env.L1_RPC || '',
-      accounts: [process.env.PRIVATE_KEY_DEPLOYER || ethers.constants.HashZero],
-      live: true,
-    },
     'internal-devnet': {
       chainId: 5,
       url: process.env.L1_RPC || '',
@@ -149,10 +143,6 @@ const config: HardhatUserConfig = {
       '../contracts/deployments/goerli',
       '../contracts-periphery/deployments/goerli',
     ],
-    'final-migration-rehearsal': [
-      '../contracts/deployments/goerli',
-      '../contracts-periphery/deployments/goerli',
-    ],
     },
   },
   solidity: {
......
@@ -16,8 +16,13 @@ yarn build
 ## Running the service
-Copy `.env.example` into a new file named `.env`, then set the environment variables listed there.
-Once your environment variables have been set, run the service via:
+Copy `.env.example` into a new file named `.env`, then set the environment variables listed there. Additional env settings are listed under `--help`. If running the fault detector against
+a custom op chain, the contract addresses associated with that op chain must also be set:
+- Bedrock: `OptimismPortal`
+- Legacy: `StateCommitmentChain`
+Once your environment variables or flags have been set, run the service via:
 ```
 yarn start
@@ -34,8 +39,14 @@ yarn start
 The `fault-detector` detects differences between the transaction results generated by your local Optimism node and the transaction results actually published to Ethereum.
 Currently, transaction results take the form of [the root of the Optimism state trie](https://medium.com/@eiki1212/ethereum-state-trie-architecture-explained-a30237009d4e).
-For every Optimism block, the state root of the block is published to the [`StateCommitmentChain`](https://github.com/ethereum-optimism/optimism/blob/39b7262cc3ffd78cd314341b8512b2683c1d9af7/packages/contracts/contracts/L1/rollup/StateCommitmentChain.sol) contract on Ethereum.
+- For bedrock chains, the state root of the block is published to the [`L2OutputOracle`](https://github.com/ethereum-optimism/optimism/blob/39b7262cc3ffd78cd314341b8512b2683c1d9af7/packages/contracts-bedrock/contracts/L1/L2OutputOracle.sol) contract on Ethereum.
+  - ***Note***: The service accepts the `OptimismPortal` address as a flag instead of the `L2OutputOracle` for backwards compatibility with early versions of these contracts. The `L2OutputOracle` is inferred from the portal contract.
+- For legacy chains, the state root of the block is published to the [`StateCommitmentChain`](https://github.com/ethereum-optimism/optimism/blob/39b7262cc3ffd78cd314341b8512b2683c1d9af7/packages/contracts/contracts/L1/rollup/StateCommitmentChain.sol) contract on Ethereum.
 We can therefore detect differences by, for each block, comparing the state root reported by an Optimism node with the state root published to Ethereum.
+In order for the fault detector to differentiate between bedrock and legacy chains, please make sure to specify `--bedrock`.
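To make the comparison concrete, here is a minimal sketch, not the service's actual code, of what a Bedrock-mode check boils down to: read an output proposal from the `L2OutputOracle` and ask a trusted L2 node for the output root it computed at the same height. The RPC URLs are placeholders, and the `optimism_outputAtBlock` response shape is an assumption based on the op-node rollup RPC:

```ts
import { ethers } from 'ethers'

// Placeholder endpoints; point these at your own L1 node and op-node rollup RPC.
const l1 = new ethers.providers.JsonRpcProvider(process.env.L1_RPC_URL)
const opNode = new ethers.providers.JsonRpcProvider(process.env.OP_NODE_RPC_URL)

// Just the two L2OutputOracle methods the check needs.
const ORACLE_ABI = [
  'function getL2OutputIndexAfter(uint256 _l2BlockNumber) view returns (uint256)',
  'function getL2Output(uint256 _l2OutputIndex) view returns (tuple(bytes32 outputRoot, uint128 timestamp, uint128 l2BlockNumber))',
]

async function outputMatches(
  oracleAddress: string,
  l2BlockNumber: number
): Promise<boolean> {
  const oracle = new ethers.Contract(oracleAddress, ORACLE_ABI, l1)
  // Find the first output proposed at or after the target block...
  const index = await oracle.getL2OutputIndexAfter(l2BlockNumber)
  const proposed = await oracle.getL2Output(index)
  // ...then ask the local node what the output root at that height should be.
  const local = await opNode.send('optimism_outputAtBlock', [
    ethers.utils.hexValue(proposed.l2BlockNumber.toNumber()),
  ])
  return local.outputRoot === proposed.outputRoot
}
```

A mismatch between the two roots is exactly the condition the fault detector flags.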
 We export a series of Prometheus metrics that you can use to trigger alerting when issues are detected.
 Check the list of available metrics via `yarn start --help`:
@@ -52,6 +63,9 @@ Options:
   --startbatchindex              Batch index to start checking from. Setting it to -1 will cause the fault detector to find the first state batch index that has not yet passed the fault proof window (env: FAULT_DETECTOR__START_BATCH_INDEX, default value: -1)
   --loopintervalms               Loop interval in milliseconds (env: FAULT_DETECTOR__LOOP_INTERVAL_MS)
   --bedrock                      Whether or not the service is running against a Bedrock chain (env: FAULT_DETECTOR__BEDROCK, default value: false)
+  --optimismportaladdress        [Custom Bedrock Chains] Deployed OptimismPortal contract address. Used to retrieve necessary info for output verification (env: FAULT_DETECTOR__OPTIMISM_PORTAL_ADDRESS, default 0x0)
+  --statecommitmentchainaddress  [Custom Legacy Chains] Deployed StateCommitmentChain contract address. Used to fetch necessary info for output verification. (env: FAULT_DETECTOR__STATE_COMMITMENT_CHAIN_ADDRESS, default 0x0)
   --port                         Port for the app server (env: FAULT_DETECTOR__PORT)
   --hostname                     Hostname for the app server (env: FAULT_DETECTOR__HOSTNAME)
   -h, --help                     display help for command
......
@@ -7,7 +7,13 @@ import {
   waitForProvider,
 } from '@eth-optimism/common-ts'
 import { getChainId, sleep, toRpcHexString } from '@eth-optimism/core-utils'
-import { CrossChainMessenger } from '@eth-optimism/sdk'
+import {
+  CONTRACT_ADDRESSES,
+  CrossChainMessenger,
+  getOEContract,
+  L2ChainID,
+  OEL1ContractsLike,
+} from '@eth-optimism/sdk'
 import { Provider } from '@ethersproject/abstract-provider'
 import { ethers, Transaction } from 'ethers'
 import dateformat from 'dateformat'
@@ -26,6 +32,8 @@ type Options = {
   l2RpcProvider: Provider
   startBatchIndex: number
   bedrock: boolean
+  optimismPortalAddress?: string
+  stateCommitmentChainAddress?: string
 }

 type Metrics = {
@@ -73,6 +81,18 @@ export class FaultDetector extends BaseServiceV2<Options, Metrics, State> {
       desc: 'Whether or not the service is running against a Bedrock chain',
       public: true,
     },
+    optimismPortalAddress: {
+      validator: validators.str,
+      default: ethers.constants.AddressZero,
+      desc: '[Custom Bedrock Chains] Deployed OptimismPortal contract address. Used to retrieve necessary info for output verification',
+      public: true,
+    },
+    stateCommitmentChainAddress: {
+      validator: validators.str,
+      default: ethers.constants.AddressZero,
+      desc: '[Custom Legacy Chains] Deployed StateCommitmentChain contract address. Used to fetch necessary info for output verification.',
+      public: true,
+    },
   },
   metricsSpec: {
     highestBatchIndex: {
@@ -93,6 +113,83 @@ export class FaultDetector extends BaseServiceV2<Options, Metrics, State> {
     })
   }
+
+  /**
+   * Provides the required set of addresses used by the fault detector. For recognized op-chains, this
+   * falls back to the pre-defined set of addresses; for custom chains the addresses must be
+   * provided via options, otherwise the service aborts.
+   *
+   * Required Contracts
+   * - Bedrock: OptimismPortal (used to also fetch the L2OutputOracle address variable). This is the preferred address
+   *   since in early versions of bedrock, OptimismPortal holds the FINALIZATION_WINDOW variable instead of L2OutputOracle.
+   *   The L2OutputOracle address retrieved from OptimismPortal is used to query for output roots.
+   * - Legacy: StateCommitmentChain to query for output roots.
+   *
+   * @param l2ChainId op chain id
+   * @returns OEL1ContractsLike set of L1 contracts with only the required addresses set
+   */
+  async getOEL1Contracts(l2ChainId: number): Promise<OEL1ContractsLike> {
+    // CrossChainMessenger requires all addresses to be defined. Default to `AddressZero` for unused contracts.
+    let contracts: OEL1ContractsLike = {
+      AddressManager: ethers.constants.AddressZero,
+      L1CrossDomainMessenger: ethers.constants.AddressZero,
+      L1StandardBridge: ethers.constants.AddressZero,
+      StateCommitmentChain: ethers.constants.AddressZero,
+      CanonicalTransactionChain: ethers.constants.AddressZero,
+      BondManager: ethers.constants.AddressZero,
+      OptimismPortal: ethers.constants.AddressZero,
+      L2OutputOracle: ethers.constants.AddressZero,
+    }
+    const chainType = this.options.bedrock ? 'bedrock' : 'legacy'
+    this.logger.info(`Setting contracts for OP chain type: ${chainType}`)
+    const knownChainId = L2ChainID[l2ChainId] !== undefined
+    if (knownChainId) {
+      this.logger.info(`Recognized L2 chain id ${L2ChainID[l2ChainId]}`)
+      // fall back to the predefined defaults for this chain id
+      contracts = CONTRACT_ADDRESSES[l2ChainId].l1
+    }
+    this.logger.info('checking contract address options...')
+    if (this.options.bedrock) {
+      const address = this.options.optimismPortalAddress
+      if (!knownChainId && address === ethers.constants.AddressZero) {
+        this.logger.error('OptimismPortal contract unspecified')
+        throw new Error(
+          '--optimismportaladdress needs to be set for custom bedrock op chains'
+        )
+      }
+      if (address !== ethers.constants.AddressZero) {
+        this.logger.info('set OptimismPortal contract override')
+        contracts.OptimismPortal = address
+        this.logger.info('fetching L2OutputOracle contract from OptimismPortal')
+        const opts = { address, signerOrProvider: this.options.l1RpcProvider }
+        const portalContract = getOEContract('OptimismPortal', l2ChainId, opts)
+        contracts.L2OutputOracle = await portalContract.L2_ORACLE()
+      }
+      // for a known chain id without an override, the L2OutputOracle will already
+      // be set via the hardcoded default
+    } else {
+      const address = this.options.stateCommitmentChainAddress
+      if (!knownChainId && address === ethers.constants.AddressZero) {
+        this.logger.error('StateCommitmentChain contract unspecified')
+        throw new Error(
+          '--statecommitmentchainaddress needs to be set for custom legacy op chains'
+        )
+      }
+      if (address !== ethers.constants.AddressZero) {
+        this.logger.info('set StateCommitmentChain contract override')
+        contracts.StateCommitmentChain = address
+      }
+    }
+    return contracts
+  }
   async init(): Promise<void> {
     // Connect to L1.
     await waitForProvider(this.options.l1RpcProvider, {
@@ -106,12 +203,15 @@ export class FaultDetector extends BaseServiceV2<Options, Metrics, State> {
       name: 'L2',
     })
+    const l1ChainId = await getChainId(this.options.l1RpcProvider)
+    const l2ChainId = await getChainId(this.options.l2RpcProvider)
     this.state.messenger = new CrossChainMessenger({
       l1SignerOrProvider: this.options.l1RpcProvider,
       l2SignerOrProvider: this.options.l2RpcProvider,
-      l1ChainId: await getChainId(this.options.l1RpcProvider),
-      l2ChainId: await getChainId(this.options.l2RpcProvider),
+      l1ChainId,
+      l2ChainId,
       bedrock: this.options.bedrock,
+      contracts: { l1: await this.getOEL1Contracts(l2ChainId) },
     })
     // Not diverged by default.
......
@@ -44,6 +44,7 @@ type backendState struct {
 	latestBlockNumber hexutil.Uint64
 	latestBlockHash   string
 	peerCount         uint64
+	inSync            bool

 	lastUpdate time.Time
@@ -215,7 +216,13 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
 	RecordConsensusBackendBanned(be, banned)

 	if banned {
-		log.Debug("skipping backend banned", "backend", be.Name)
+		log.Debug("skipping backend - banned", "backend", be.Name)
 		return
 	}
+
+	// if backend exhausted rate limit we'll skip it for now
+	if be.IsRateLimited() {
+		log.Debug("skipping backend - rate limited", "backend", be.Name)
+		return
+	}
@@ -228,24 +235,16 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
 	// check whether the backend is in sync; out-of-sync backends are filtered out of consensus
 	inSync, err := cp.isInSync(ctx, be)
-	if err != nil || !inSync {
-		log.Warn("backend banned - not in sync", "backend", be.Name)
-		cp.Ban(be)
-		return
-	}
-	RecordConsensusBackendInSync(be, inSync)
-
-	// if backend exhausted rate limit we'll skip it for now
-	if be.IsRateLimited() {
-		return
-	}
+	RecordConsensusBackendInSync(be, err == nil && inSync)
+	if err != nil {
+		log.Warn("error updating backend sync state", "name", be.Name, "err", err)
+	}
 	var peerCount uint64
 	if !be.skipPeerCountCheck {
 		peerCount, err = cp.getPeerCount(ctx, be)
 		if err != nil {
-			log.Warn("error updating backend", "name", be.Name, "err", err)
-			return
+			log.Warn("error updating backend peer count", "name", be.Name, "err", err)
 		}
 		RecordConsensusBackendPeerCount(be, peerCount)
 	}
@@ -253,10 +252,9 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
 	latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
 	if err != nil {
 		log.Warn("error updating backend", "name", be.Name, "err", err)
-		return
 	}
-	changed, updateDelay := cp.setBackendState(be, peerCount, latestBlockNumber, latestBlockHash)
+	changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash)
 	if changed {
 		RecordBackendLatestBlock(be, latestBlockNumber)
@@ -264,6 +262,7 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
 		log.Debug("backend state updated",
 			"name", be.Name,
 			"peerCount", peerCount,
+			"inSync", inSync,
 			"latestBlockNumber", latestBlockNumber,
 			"latestBlockHash", latestBlockHash,
 			"updateDelay", updateDelay)
@@ -280,11 +279,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 	// find the highest block, in order to use it when defining the highest non-lagging ancestor block
 	for _, be := range cp.backendGroup.Backends {
-		peerCount, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be)
+		peerCount, inSync, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be)
 		if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
 			continue
 		}
+		if !inSync {
+			continue
+		}
 		if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
 			continue
 		}
@@ -296,11 +298,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 	// find the highest common ancestor block
 	for _, be := range cp.backendGroup.Backends {
-		peerCount, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be)
+		peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be)
 		if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
 			continue
 		}
+		if !inSync {
+			continue
+		}
 		if lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now()) {
 			continue
 		}
@@ -351,12 +356,12 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
 	   - not lagging
 	*/
-	peerCount, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be)
+	peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be)
 	notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now())
 	isBanned := time.Now().Before(bannedUntil)
 	notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount
 	lagging := latestBlockNumber < proposedBlock
-	if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging {
+	if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync {
 		filteredBackendsNames = append(filteredBackendsNames, be.Name)
 		continue
 	}
@@ -498,23 +503,25 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
 	return res, nil
 }

-func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) {
+func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) {
 	bs := cp.backendState[be]
+	defer bs.backendStateMux.Unlock()
 	bs.backendStateMux.Lock()
 	peerCount = bs.peerCount
+	inSync = bs.inSync
 	blockNumber = bs.latestBlockNumber
 	blockHash = bs.latestBlockHash
 	lastUpdate = bs.lastUpdate
 	bannedUntil = bs.bannedUntil
-	bs.backendStateMux.Unlock()
 	return
 }
-func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) {
+func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) {
 	bs := cp.backendState[be]
 	bs.backendStateMux.Lock()
 	changed = bs.latestBlockHash != blockHash
 	bs.peerCount = peerCount
+	bs.inSync = inSync
 	bs.latestBlockNumber = blockNumber
 	bs.latestBlockHash = blockHash
 	updateDelay = time.Since(bs.lastUpdate)
......
@@ -94,6 +94,7 @@ func TestConsensus(t *testing.T) {
 		consensusGroup := bg.Consensus.GetConsensusGroup()
 		require.NotContains(t, consensusGroup, be)
+		require.False(t, bg.Consensus.IsBanned(be))
 		require.Equal(t, 1, len(consensusGroup))
 	})
@@ -132,6 +133,7 @@ func TestConsensus(t *testing.T) {
 		be := backend(bg, "node1")
 		require.NotNil(t, be)
 		require.NotContains(t, consensusGroup, be)
+		require.False(t, bg.Consensus.IsBanned(be))
 		require.Equal(t, 1, len(consensusGroup))
 	})
@@ -232,6 +234,7 @@ func TestConsensus(t *testing.T) {
 		consensusGroup := bg.Consensus.GetConsensusGroup()
 		require.NotContains(t, consensusGroup, be)
+		require.False(t, bg.Consensus.IsBanned(be))
 		require.Equal(t, 1, len(consensusGroup))
 	})
......