Commit db82b315 authored by Ethen's avatar Ethen Committed by GitHub

op-batcher: Support generic DA posting (#10489)

* op-batcher: Support generic DA commitment type

* op-batcher: Support generic DA commitment type - update generic flag info

* op-batcher: Support generic DA commitment type - change flag name to respect op-plasma spec

* op-batcher: Support generic DA commitment type - update devnet script to specify da svc

* op-batcher: Support generic DA commitment type - fix failing multi challenge test

* op-batcher: Support generic DA commitment type - fix failing multi challenge test

* op-batcher: Support generic DA commitment type - rm dbug regression in devnet script

* op-batcher: Support generic DA commitment type - fix bugs in commitment propagation from test da server

* op-batcher: Support generic DA commitment type - rm commitment type assertion when reading in op-node

* op-batcher: Support generic DA commitment type - rm dead code

* op-batcher: Support generic DA commitment type - disable da service in devnet script

* op-batcher: Support generic DA commitment type - less verbose comments

* op-batcher: Support generic DA commitment type - add tests for generic DA client <-> interactions

* op-batcher: Support generic DA commitment type - add tests for generic DA client <-> interactions

* op-batcher: Support generic DA commitment type - title case cli var
parent c4b37bf5
...@@ -273,8 +273,10 @@ def devnet_deploy(paths): ...@@ -273,8 +273,10 @@ def devnet_deploy(paths):
if DEVNET_PLASMA: if DEVNET_PLASMA:
docker_env['PLASMA_ENABLED'] = 'true' docker_env['PLASMA_ENABLED'] = 'true'
docker_env['PLASMA_DA_SERVICE'] = 'false'
else: else:
docker_env['PLASMA_ENABLED'] = 'false' docker_env['PLASMA_ENABLED'] = 'false'
docker_env['PLASMA_DA_SERVICE'] = 'false'
# Bring up the rest of the services. # Bring up the rest of the services.
log.info('Bringing up `op-node`, `op-proposer` and `op-batcher`.') log.info('Bringing up `op-node`, `op-proposer` and `op-batcher`.')
......
...@@ -67,12 +67,6 @@ func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) { ...@@ -67,12 +67,6 @@ func (s *PlasmaDataSource) Next(ctx context.Context) (eth.Data, error) {
s.log.Warn("invalid commitment", "commitment", data, "err", err) s.log.Warn("invalid commitment", "commitment", data, "err", err)
return nil, NotEnoughData return nil, NotEnoughData
} }
// only support keccak256 commitments for now.
// TODO: support other commitment types via flag
if comm.CommitmentType() != plasma.Keccak256CommitmentType {
s.log.Warn("wrong commitment type", "commitmentType", comm.CommitmentType())
return nil, NotEnoughData
}
s.comm = comm s.comm = comm
} }
// use the commitment to fetch the input from the plasma DA provider. // use the commitment to fetch the input from the plasma DA provider.
......
...@@ -92,7 +92,7 @@ func TestPlasmaDataSource(t *testing.T) { ...@@ -92,7 +92,7 @@ func TestPlasmaDataSource(t *testing.T) {
} }
// keep track of random input data to validate against // keep track of random input data to validate against
var inputs [][]byte var inputs [][]byte
var comms []plasma.Keccak256Commitment var comms []plasma.CommitmentData
signer := cfg.L1Signer() signer := cfg.L1Signer()
......
...@@ -11,6 +11,7 @@ const ( ...@@ -11,6 +11,7 @@ const (
EnabledFlagName = "plasma.enabled" EnabledFlagName = "plasma.enabled"
DaServerAddressFlagName = "plasma.da-server" DaServerAddressFlagName = "plasma.da-server"
VerifyOnReadFlagName = "plasma.verify-on-read" VerifyOnReadFlagName = "plasma.verify-on-read"
DaServiceFlag = "plasma.da-service"
) )
func plasmaEnv(envprefix, v string) []string { func plasmaEnv(envprefix, v string) []string {
...@@ -39,6 +40,13 @@ func CLIFlags(envPrefix string, category string) []cli.Flag { ...@@ -39,6 +40,13 @@ func CLIFlags(envPrefix string, category string) []cli.Flag {
EnvVars: plasmaEnv(envPrefix, "VERIFY_ON_READ"), EnvVars: plasmaEnv(envPrefix, "VERIFY_ON_READ"),
Category: category, Category: category,
}, },
&cli.BoolFlag{
Name: DaServiceFlag,
Usage: "Use DA service type where commitments are generated by plasma server",
Value: false,
EnvVars: plasmaEnv(envPrefix, "DA_SERVICE"),
Category: category,
},
} }
} }
...@@ -46,6 +54,7 @@ type CLIConfig struct { ...@@ -46,6 +54,7 @@ type CLIConfig struct {
Enabled bool Enabled bool
DAServerURL string DAServerURL string
VerifyOnRead bool VerifyOnRead bool
GenericDA bool
} }
func (c CLIConfig) Check() error { func (c CLIConfig) Check() error {
...@@ -61,7 +70,7 @@ func (c CLIConfig) Check() error { ...@@ -61,7 +70,7 @@ func (c CLIConfig) Check() error {
} }
func (c CLIConfig) NewDAClient() *DAClient { func (c CLIConfig) NewDAClient() *DAClient {
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead} return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA}
} }
func ReadCLIConfig(c *cli.Context) CLIConfig { func ReadCLIConfig(c *cli.Context) CLIConfig {
...@@ -69,5 +78,6 @@ func ReadCLIConfig(c *cli.Context) CLIConfig { ...@@ -69,5 +78,6 @@ func ReadCLIConfig(c *cli.Context) CLIConfig {
Enabled: c.Bool(EnabledFlagName), Enabled: c.Bool(EnabledFlagName),
DAServerURL: c.String(DaServerAddressFlagName), DAServerURL: c.String(DaServerAddressFlagName),
VerifyOnRead: c.Bool(VerifyOnReadFlagName), VerifyOnRead: c.Bool(VerifyOnReadFlagName),
GenericDA: c.Bool(DaServiceFlag),
} }
} }
...@@ -17,16 +17,16 @@ var ErrInvalidInput = errors.New("invalid input") ...@@ -17,16 +17,16 @@ var ErrInvalidInput = errors.New("invalid input")
// DAClient is an HTTP client to communicate with a DA storage service. // DAClient is an HTTP client to communicate with a DA storage service.
// It creates commitments and retrieves input data + verifies if needed. // It creates commitments and retrieves input data + verifies if needed.
// Currently only supports Keccak256 commitments but may be extended eventually.
type DAClient struct { type DAClient struct {
url string url string
// VerifyOnRead sets the client to verify the commitment on read. // verify sets the client to verify a Keccak256 commitment on read.
// SHOULD enable if the storage service is not trusted.
verify bool verify bool
// whether commitment is precomputable (only applicable to keccak256)
precompute bool
} }
func NewDAClient(url string, verify bool) *DAClient { func NewDAClient(url string, verify bool, pc bool) *DAClient {
return &DAClient{url, verify} return &DAClient{url, verify, pc}
} }
// GetInput returns the input data for the given encoded commitment bytes. // GetInput returns the input data for the given encoded commitment bytes.
...@@ -50,6 +50,7 @@ func (c *DAClient) GetInput(ctx context.Context, comm CommitmentData) ([]byte, e ...@@ -50,6 +50,7 @@ func (c *DAClient) GetInput(ctx context.Context, comm CommitmentData) ([]byte, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
if c.verify { if c.verify {
if err := comm.Verify(input); err != nil { if err := comm.Verify(input); err != nil {
return nil, err return nil, err
...@@ -59,18 +60,58 @@ func (c *DAClient) GetInput(ctx context.Context, comm CommitmentData) ([]byte, e ...@@ -59,18 +60,58 @@ func (c *DAClient) GetInput(ctx context.Context, comm CommitmentData) ([]byte, e
return input, nil return input, nil
} }
// SetInput sets the input data and returns the keccak256 hash commitment. // SetInput sets the input data and returns the respective commitment.
func (c *DAClient) SetInput(ctx context.Context, img []byte) (CommitmentData, error) { func (c *DAClient) SetInput(ctx context.Context, img []byte) (CommitmentData, error) {
if len(img) == 0 { if len(img) == 0 {
return nil, ErrInvalidInput return nil, ErrInvalidInput
} }
// TODO(#10312): this is hard-coded to produce Keccak256 commitments
comm := NewCommitmentData(Keccak256CommitmentType, img) if c.precompute { // precompute commitment (only applicable to keccak256)
comm := NewKeccak256Commitment(img)
if err := c.setInputWithCommit(ctx, comm, img); err != nil {
return nil, err
}
return comm, nil
}
// let DA server generate commitment
return c.setInput(ctx, img)
}
// setInputWithCommit sets a precomputed commitment for some pre-image data.
func (c *DAClient) setInputWithCommit(ctx context.Context, comm CommitmentData, img []byte) error {
// encode with commitment type prefix // encode with commitment type prefix
key := comm.Encode() key := comm.Encode()
body := bytes.NewReader(img) body := bytes.NewReader(img)
url := fmt.Sprintf("%s/put/0x%x", c.url, key) url := fmt.Sprintf("%s/put/0x%x", c.url, key)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body)
if err != nil {
return fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to store preimage: %v", resp.StatusCode)
}
return nil
}
// setInputs sets the input data and reads the respective DA generated commitment.
func (c *DAClient) setInput(ctx context.Context, img []byte) (CommitmentData, error) {
if len(img) == 0 {
return nil, ErrInvalidInput
}
body := bytes.NewReader(img)
url := fmt.Sprintf("%s/put/", c.url)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create HTTP request: %w", err) return nil, fmt.Errorf("failed to create HTTP request: %w", err)
} }
...@@ -81,7 +122,18 @@ func (c *DAClient) SetInput(ctx context.Context, img []byte) (CommitmentData, er ...@@ -81,7 +122,18 @@ func (c *DAClient) SetInput(ctx context.Context, img []byte) (CommitmentData, er
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to store preimage: %v", resp.StatusCode) return nil, fmt.Errorf("failed to store data: %v", resp.StatusCode)
} }
b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
comm, err := DecodeGenericCommitment(b)
if err != nil {
return nil, err
}
return comm, nil return comm, nil
} }
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/testlog" "github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
...@@ -44,7 +45,7 @@ func (s *MemStore) Put(ctx context.Context, key []byte, value []byte) error { ...@@ -44,7 +45,7 @@ func (s *MemStore) Put(ctx context.Context, key []byte, value []byte) error {
return nil return nil
} }
func TestDAClient(t *testing.T) { func TestDAClientPrecomputed(t *testing.T) {
store := NewMemStore() store := NewMemStore()
logger := testlog.Logger(t, log.LevelDebug) logger := testlog.Logger(t, log.LevelDebug)
...@@ -100,3 +101,62 @@ func TestDAClient(t *testing.T) { ...@@ -100,3 +101,62 @@ func TestDAClient(t *testing.T) {
_, err = client.GetInput(ctx, NewKeccak256Commitment(input)) _, err = client.GetInput(ctx, NewKeccak256Commitment(input))
require.Error(t, err) require.Error(t, err)
} }
func TestDAClientService(t *testing.T) {
store := NewMemStore()
logger := testlog.Logger(t, log.LevelDebug)
ctx := context.Background()
server := NewDAServer("127.0.0.1", 0, store, logger)
require.NoError(t, server.Start())
cfg := CLIConfig{
Enabled: true,
DAServerURL: fmt.Sprintf("http://%s", server.Endpoint()),
VerifyOnRead: false,
GenericDA: true,
}
require.NoError(t, cfg.Check())
client := cfg.NewDAClient()
rng := rand.New(rand.NewSource(1234))
input := RandomData(rng, 2000)
comm, err := client.SetInput(ctx, input)
require.NoError(t, err)
require.Equal(t, comm, NewGenericCommitment(crypto.Keccak256(input)))
stored, err := client.GetInput(ctx, comm)
require.NoError(t, err)
require.Equal(t, input, stored)
// set a bad commitment in the store
require.NoError(t, store.Put(ctx, comm.Encode(), []byte("bad data")))
// assert no error as generic commitments cannot be verified client side
_, err = client.GetInput(ctx, comm)
require.NoError(t, err)
// test not found error
comm = NewGenericCommitment(RandomData(rng, 32))
_, err = client.GetInput(ctx, comm)
require.ErrorIs(t, err, ErrNotFound)
// test storing bad data
_, err = client.SetInput(ctx, []byte{})
require.ErrorIs(t, err, ErrInvalidInput)
// server not responsive
require.NoError(t, server.Stop())
_, err = client.SetInput(ctx, input)
require.Error(t, err)
_, err = client.GetInput(ctx, NewGenericCommitment(input))
require.Error(t, err)
}
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/rpc" "github.com/ethereum-optimism/optimism/op-service/rpc"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -96,27 +97,32 @@ func (d *DAServer) HandleGet(w http.ResponseWriter, r *http.Request) { ...@@ -96,27 +97,32 @@ func (d *DAServer) HandleGet(w http.ResponseWriter, r *http.Request) {
key := path.Base(r.URL.Path) key := path.Base(r.URL.Path)
comm, err := hexutil.Decode(key) comm, err := hexutil.Decode(key)
if err != nil { if err != nil {
d.log.Error("Failed to decode commitment", "err", err, "key", key)
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
input, err := d.store.Get(r.Context(), comm) input, err := d.store.Get(r.Context(), comm)
if err != nil && errors.Is(err, ErrNotFound) { if err != nil && errors.Is(err, ErrNotFound) {
d.log.Error("Commitment not found", "key", key, "error", err)
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return return
} }
if err != nil { if err != nil {
d.log.Error("Failed to read commitment", "err", err, "key", key)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
if _, err := w.Write(input); err != nil { if _, err := w.Write(input); err != nil {
d.log.Error("Failed to write pre-image", "err", err, "key", key)
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
} }
}
w.WriteHeader(http.StatusOK)
}
func (d *DAServer) HandlePut(w http.ResponseWriter, r *http.Request) { func (d *DAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
d.log.Debug("PUT", "url", r.URL) d.log.Info("PUT", "url", r.URL)
route := path.Dir(r.URL.Path) route := path.Dir(r.URL.Path)
if route != "/put" { if route != "/put" {
...@@ -126,26 +132,44 @@ func (d *DAServer) HandlePut(w http.ResponseWriter, r *http.Request) { ...@@ -126,26 +132,44 @@ func (d *DAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
input, err := io.ReadAll(r.Body) input, err := io.ReadAll(r.Body)
if err != nil { if err != nil {
d.log.Error("Failed to read request body", "err", err)
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
key := path.Base(r.URL.Path) if r.URL.Path == "/put" || r.URL.Path == "/put/" { // without commitment
comm, err := hexutil.Decode(key)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
if err := d.store.Put(r.Context(), comm, input); err != nil { comm := GenericCommitment(crypto.Keccak256Hash(input).Bytes())
d.log.Info("Failed to store commitment to the DA server", "err", err, "key", key) if err = d.store.Put(r.Context(), comm.Encode(), input); err != nil {
w.WriteHeader(http.StatusInternalServerError) d.log.Error("Failed to store commitment to the DA server", "err", err, "comm", comm)
return w.WriteHeader(http.StatusInternalServerError)
} return
if _, err := w.Write(comm); err != nil { }
w.WriteHeader(http.StatusInternalServerError)
return if _, err := w.Write(comm); err != nil {
d.log.Error("Failed to write commitment request body", "err", err, "comm", comm)
w.WriteHeader(http.StatusInternalServerError)
return
}
} else {
key := path.Base(r.URL.Path)
comm, err := hexutil.Decode(key)
if err != nil {
d.log.Error("Failed to decode commitment", "err", err, "key", key)
w.WriteHeader(http.StatusBadRequest)
return
}
if err := d.store.Put(r.Context(), comm, input); err != nil {
d.log.Error("Failed to store commitment to the DA server", "err", err, "key", key)
w.WriteHeader(http.StatusInternalServerError)
return
}
} }
w.WriteHeader(http.StatusOK)
} }
func (b *DAServer) Endpoint() string { func (b *DAServer) Endpoint() string {
......
...@@ -83,6 +83,7 @@ services: ...@@ -83,6 +83,7 @@ services:
--rpc.enable-admin --rpc.enable-admin
--safedb.path=/db --safedb.path=/db
--plasma.enabled=${PLASMA_ENABLED} --plasma.enabled=${PLASMA_ENABLED}
--plasma.da-service=${PLASMA_DA_SERVICE}
--plasma.da-server=http://da-server:3100 --plasma.da-server=http://da-server:3100
ports: ports:
- "7545:8545" - "7545:8545"
...@@ -156,6 +157,7 @@ services: ...@@ -156,6 +157,7 @@ services:
OP_BATCHER_RPC_ENABLE_ADMIN: "true" OP_BATCHER_RPC_ENABLE_ADMIN: "true"
OP_BATCHER_BATCH_TYPE: 0 OP_BATCHER_BATCH_TYPE: 0
OP_BATCHER_PLASMA_ENABLED: "${PLASMA_ENABLED}" OP_BATCHER_PLASMA_ENABLED: "${PLASMA_ENABLED}"
OP_BATCHER_PLASMA_DA_SERVICE: "${PLASMA_DA_SERVICE}"
OP_BATCHER_PLASMA_DA_SERVER: "http://da-server:3100" OP_BATCHER_PLASMA_DA_SERVER: "http://da-server:3100"
op-challenger: op-challenger:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment