Commit a5ebbf14 authored by Michael de Hoog's avatar Michael de Hoog

Add shadow compressor implementation

parent 710d8ba4
package batcher
import (
"fmt"
"strings"
"time"
"github.com/ethereum/go-ethereum/ethclient"
......@@ -11,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/rpc"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/sources"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
......@@ -94,6 +97,9 @@ type CLIConfig struct {
// compression algorithm.
ApproxComprRatio float64
// CompressorKind is the compressor implementation to use.
CompressorKind CompressorKind
Stopped bool
TxMgrConfig txmgr.CLIConfig
......@@ -139,6 +145,7 @@ func NewConfig(ctx *cli.Context) CLIConfig {
TargetL1TxSize: ctx.GlobalUint64(flags.TargetL1TxSizeBytesFlag.Name),
TargetNumFrames: ctx.GlobalInt(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.GlobalFloat64(flags.ApproxComprRatioFlag.Name),
CompressorKind: CompressorKind(strings.ToLower(ctx.GlobalString(flags.CompressorFlag.Name))),
Stopped: ctx.GlobalBool(flags.StoppedFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
RPCConfig: rpc.ReadCLIConfig(ctx),
......@@ -147,3 +154,52 @@ func NewConfig(ctx *cli.Context) CLIConfig {
PprofConfig: oppprof.ReadCLIConfig(ctx),
}
}
func (c CLIConfig) NewCompressor() (derive.Compressor, error) {
switch c.CompressorKind {
case CompressorShadow:
return NewShadowCompressor(
c.MaxL1TxSize - 1, // subtract 1 byte for version
)
default:
return NewTargetSizeCompressor(
c.TargetL1TxSize-1, // subtract 1 byte for version
c.TargetNumFrames,
c.ApproxComprRatio,
)
}
}
// CompressorKind identifies a compressor implementation.
type CompressorKind string
const (
CompressorTarget CompressorKind = "target"
CompressorShadow CompressorKind = "shadow"
)
var CompressorKinds = []CompressorKind{
CompressorTarget,
CompressorShadow,
}
func (kind CompressorKind) String() string {
return string(kind)
}
func (kind *CompressorKind) Set(value string) error {
if !ValidCompressorKind(CompressorKind(value)) {
return fmt.Errorf("unknown compressor kind: %q", value)
}
*kind = CompressorKind(value)
return nil
}
func ValidCompressorKind(value CompressorKind) bool {
for _, k := range CompressorKinds {
if k == value {
return true
}
}
return false
}
......@@ -75,11 +75,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
return nil, err
}
compressor, err := NewTargetSizeCompressor(
cfg.TargetL1TxSize-1, // subtract 1 byte for version
cfg.TargetNumFrames,
cfg.ApproxComprRatio,
)
compressor, err := cfg.NewCompressor()
if err != nil {
return nil, err
}
......
package batcher
import (
"bytes"
"compress/zlib"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type ShadowCompressor struct {
// The maximum byte-size a frame can have.
MaxFrameSize uint64
buf bytes.Buffer
compress *zlib.Writer
shadowBuf bytes.Buffer
shadowCompress *zlib.Writer
fullErr error
}
func NewShadowCompressor(maxFrameSize uint64) (derive.Compressor, error) {
c := &ShadowCompressor{
MaxFrameSize: maxFrameSize,
}
var err error
c.compress, err = zlib.NewWriterLevel(&c.buf, zlib.BestCompression)
if err != nil {
return nil, err
}
c.shadowCompress, err = zlib.NewWriterLevel(&c.shadowBuf, zlib.BestCompression)
if err != nil {
return nil, err
}
return c, nil
}
func (t *ShadowCompressor) Write(p []byte) (int, error) {
_, err := t.shadowCompress.Write(p)
if err != nil {
return 0, err
}
if t.Len() > 0 {
err = t.shadowCompress.Flush()
if err != nil {
return 0, err
}
if uint64(t.shadowBuf.Len()) > t.MaxFrameSize {
t.fullErr = derive.CompressorFullErr
return 0, t.fullErr
}
}
return t.compress.Write(p)
}
func (t *ShadowCompressor) Close() error {
return t.compress.Close()
}
func (t *ShadowCompressor) Read(p []byte) (int, error) {
return t.buf.Read(p)
}
func (t *ShadowCompressor) Reset() {
t.buf.Reset()
t.compress.Reset(&t.buf)
t.shadowBuf.Reset()
t.shadowCompress.Reset(&t.shadowBuf)
t.fullErr = nil
}
func (t *ShadowCompressor) Len() int {
return t.buf.Len()
}
func (t *ShadowCompressor) Flush() error {
return t.compress.Flush()
}
func (t *ShadowCompressor) FullErr() error {
return t.fullErr
}
......@@ -6,7 +6,9 @@ import (
"github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/rpc"
"github.com/ethereum-optimism/optimism/op-node/flags"
opservice "github.com/ethereum-optimism/optimism/op-service"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
......@@ -85,6 +87,16 @@ var (
Value: 0.4,
EnvVar: opservice.PrefixEnvVar(EnvVarPrefix, "APPROX_COMPR_RATIO"),
}
CompressorFlag = cli.GenericFlag{
Name: "compressor",
Usage: "The type of compressor. Valid options: " +
flags.EnumString[batcher.CompressorKind](batcher.CompressorKinds),
EnvVar: opservice.PrefixEnvVar(envVarPrefix, "COMPRESSOR"),
Value: func() *batcher.CompressorKind {
out := batcher.CompressorTarget
return &out
}(),
}
StoppedFlag = cli.BoolFlag{
Name: "stopped",
Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC",
......@@ -109,6 +121,7 @@ var optionalFlags = []cli.Flag{
TargetL1TxSizeBytesFlag,
TargetNumFramesFlag,
ApproxComprRatioFlag,
CompressorFlag,
StoppedFlag,
SequencerHDPathFlag,
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment