Commit 1f3c5279 authored by Roberto Bayardo

improve compression size estimation in shadow compression

parent 1a34669c
--- a/op-batcher/compressor/shadow_compressor.go
+++ b/op-batcher/compressor/shadow_compressor.go
@@ -7,6 +7,17 @@ import (
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
 )
 
+const (
+	// safeCompressionOverhead is the largest potential blow-up in bytes we expect to see when
+	// compressing arbitrary (e.g. random) data. Here we account for a 2 byte header, 4 byte
+	// digest, 5 byte EOF indicator, and then 5 byte flate block header for each 16k of potential
+	// data. Assuming frames are max 128k size (the current max blob size) this is 2+4+5+(5*8) = 51
+	// bytes. If we start using larger frames (e.g. should max blob size increase) a larger blowup
+	// might be possible, but it would be highly unlikely, and the system still works if our
+	// estimate is wrong -- we just end up writing one more tx for the overflow.
+	safeCompressionOverhead = 51
+)
+
 type ShadowCompressor struct {
 	config Config
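Reviewer note: the 51 in safeCompressionOverhead is just the arithmetic spelled out in the comment above. A minimal sketch reproducing it for an arbitrary max frame size (the helper name and standalone packaging are ours, not part of this commit):

package main

import "fmt"

// worstCaseOverhead restates the arithmetic from safeCompressionOverhead's
// comment: a 2 byte zlib header, 4 byte Adler-32 digest, 5 byte EOF
// indicator, plus a 5 byte flate block header per 16k of input.
func worstCaseOverhead(maxFrameSize int) int {
	const flateBlock = 16 * 1024
	numBlocks := (maxFrameSize + flateBlock - 1) / flateBlock // ceil(maxFrameSize/16k)
	return 2 + 4 + 5 + 5*numBlocks
}

func main() {
	fmt.Println(worstCaseOverhead(128 * 1024)) // prints 51, i.e. 2+4+5+(5*8)
}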
@@ -17,6 +28,8 @@ type ShadowCompressor struct {
 	shadowCompress *zlib.Writer
 
 	fullErr error
+
+	bound uint64 // best known upperbound on the size of the compressed output
 }
 
 // NewShadowCompressor creates a new derive.Compressor implementation that contains two
@@ -41,26 +54,39 @@ func NewShadowCompressor(config Config) (derive.Compressor, error) {
 		return nil, err
 	}
 
+	c.bound = safeCompressionOverhead
 	return c, nil
 }
 
 func (t *ShadowCompressor) Write(p []byte) (int, error) {
-	_, err := t.shadowCompress.Write(p)
-	if err != nil {
-		return 0, err
+	if t.fullErr != nil {
+		return 0, t.fullErr
 	}
-	err = t.shadowCompress.Flush()
+	_, err := t.shadowCompress.Write(p)
 	if err != nil {
 		return 0, err
 	}
-	if uint64(t.shadowBuf.Len()) > t.config.TargetFrameSize*uint64(t.config.TargetNumFrames) {
-		t.fullErr = derive.CompressorFullErr
-		if t.Len() > 0 {
-			// only return an error if we've already written data to this compressor before
-			// (otherwise individual blocks over the target would never be written)
-			return 0, t.fullErr
+	newBound := t.bound + uint64(len(p))
+	cap := t.config.TargetFrameSize * uint64(t.config.TargetNumFrames)
+	if newBound > cap {
+		// Do not flush the buffer unless there's some chance we will be over the size limit.
+		// This reduces CPU but more importantly it makes the shadow compression ratio more
+		// closely reflect the ultimate compression ratio.
+		err = t.shadowCompress.Flush()
+		if err != nil {
+			return 0, err
+		}
+		newBound = uint64(t.shadowBuf.Len()) + 4 // + 4 is to account for the digest written on close()
+		if newBound > cap {
+			t.fullErr = derive.CompressorFullErr
+			if t.Len() > 0 {
+				// only return an error if we've already written data to this compressor before
+				// (otherwise individual blocks over the target would never be written)
+				return 0, t.fullErr
+			}
 		}
 	}
+	t.bound = newBound
 	return t.compress.Write(p)
 }
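The "+ 4" above leans on how Go's zlib writer behaves: Flush emits everything written so far (with a sync marker), while Close appends only the stream trailer, chiefly the 4 byte Adler-32 digest. A quick standalone check of that behavior (ours, not part of the commit) that just prints the buffer size at each step:

package main

import (
	"bytes"
	"compress/zlib"
	"fmt"
)

func main() {
	var buf bytes.Buffer
	w := zlib.NewWriter(&buf)
	if _, err := w.Write([]byte("some channel data")); err != nil {
		panic(err)
	}
	// Flush emits all pending compressed data plus a sync marker.
	if err := w.Flush(); err != nil {
		panic(err)
	}
	afterFlush := buf.Len()
	// Close appends only the trailer: the final empty flate block and the 4 byte digest.
	if err := w.Close(); err != nil {
		panic(err)
	}
	fmt.Printf("after Flush: %d bytes, after Close: %d bytes\n", afterFlush, buf.Len())
}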
@@ -78,6 +104,7 @@ func (t *ShadowCompressor) Reset() {
 	t.shadowBuf.Reset()
 	t.shadowCompress.Reset(&t.shadowBuf)
 	t.fullErr = nil
+	t.bound = safeCompressionOverhead
 }
 
 func (t *ShadowCompressor) Len() int {
--- a/op-batcher/compressor/shadow_compressor_test.go
+++ b/op-batcher/compressor/shadow_compressor_test.go
-package compressor_test
+package compressor
 
 import (
 	"bytes"
 	"compress/zlib"
-	"crypto/rand"
 	"io"
+	"math/rand"
 	"testing"
 
-	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
 	"github.com/stretchr/testify/require"
 )
 
+var r *rand.Rand
+
+func init() {
+	r = rand.New(rand.NewSource(99))
+}
+
 func randomBytes(t *testing.T, length int) []byte {
 	b := make([]byte, length)
-	_, err := rand.Read(b)
+	_, err := r.Read(b)
 	require.NoError(t, err)
 	return b
 }
@@ -52,9 +57,9 @@ func TestShadowCompressor(t *testing.T) {
 		fullErr: derive.CompressorFullErr,
 	}, {
 		name:            "random data",
-		targetFrameSize: 1200,
+		targetFrameSize: 1 << 17,
 		targetNumFrames: 1,
-		data:            [][]byte{randomBytes(t, 512), randomBytes(t, 512), randomBytes(t, 512)},
+		data:            [][]byte{randomBytes(t, (1<<17)-1000), randomBytes(t, 512), randomBytes(t, 512)},
 		errs:            []error{nil, nil, derive.CompressorFullErr},
 		fullErr:         derive.CompressorFullErr,
 	}}
@@ -64,7 +69,7 @@ func TestShadowCompressor(t *testing.T) {
 			t.Parallel()
 
 			require.Equal(t, len(test.errs), len(test.data), "invalid test case: len(data) != len(errs)")
-			sc, err := compressor.NewShadowCompressor(compressor.Config{
+			sc, err := NewShadowCompressor(Config{
 				TargetFrameSize: test.targetFrameSize,
 				TargetNumFrames: test.targetNumFrames,
 			})
@@ -88,6 +93,7 @@ func TestShadowCompressor(t *testing.T) {
 
 			err = sc.Close()
 			require.NoError(t, err)
+			require.LessOrEqual(t, uint64(sc.Len()), sc.(*ShadowCompressor).bound)
 
 			buf, err := io.ReadAll(sc)
 			require.NoError(t, err)
@@ -110,3 +116,28 @@ func TestShadowCompressor(t *testing.T) {
 		})
 	}
 }
+
+// TestBoundInaccurateForLargeRandomData documents where our bounding heuristic starts to fail
+// (writing at least 128k of random data)
+func TestBoundInaccurateForLargeRandomData(t *testing.T) {
+	var sizeLimit int = 1 << 17
+	sc, err := NewShadowCompressor(Config{
+		TargetFrameSize: uint64(sizeLimit + 100),
+		TargetNumFrames: 1,
+	})
+	require.NoError(t, err)
+
+	_, err = sc.Write(randomBytes(t, sizeLimit+1))
+	require.NoError(t, err)
+	err = sc.Close()
+	require.NoError(t, err)
+	require.Greater(t, uint64(sc.Len()), sc.(*ShadowCompressor).bound)
+
+	sc.Reset()
+	_, err = sc.Write(randomBytes(t, sizeLimit))
+	require.NoError(t, err)
+	err = sc.Close()
+	require.NoError(t, err)
+	require.LessOrEqual(t, uint64(sc.Len()), sc.(*ShadowCompressor).bound)
+}
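For context, a hypothetical caller sketch showing how a batcher-style loop might drive this compressor and treat the full error as the signal to seal a channel. Only NewShadowCompressor, Config, and derive.CompressorFullErr come from this code; the function name, block source, and config values are ours:

package main

import (
	"errors"
	"io"

	"github.com/ethereum-optimism/optimism/op-batcher/compressor"
	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)

// fillChannel feeds block payloads into a ShadowCompressor until it reports
// full, then closes it and returns the compressed channel bytes.
func fillChannel(blocks [][]byte) ([]byte, error) {
	sc, err := compressor.NewShadowCompressor(compressor.Config{
		TargetFrameSize: 100_000,
		TargetNumFrames: 1,
	})
	if err != nil {
		return nil, err
	}
	for _, b := range blocks {
		if _, err := sc.Write(b); err != nil {
			if errors.Is(err, derive.CompressorFullErr) {
				break // target size reached: seal the channel with what we have
			}
			return nil, err
		}
	}
	if err := sc.Close(); err != nil {
		return nil, err
	}
	return io.ReadAll(sc) // compressed output, bound permitting, within TargetFrameSize*TargetNumFrames
}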