Commit 5f47148c authored by clabby's avatar clabby Committed by GitHub

feat(op-program): 4844 blobs fetcher in host (#9105)

* Add blob preimage type

* rebase

* Start 4844 host changes

* implement hint route

* fix prefetcher test

* Hook up prefetcher / flags / mocks

* Test

* tidy

* @protolambda review

* fix fmt

* flags

* @refcell review

* rebase
parent 1e580e50
......@@ -6,6 +6,8 @@ require (
github.com/BurntSushi/toml v1.3.2
github.com/btcsuite/btcd v0.24.0
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
github.com/consensys/gnark-crypto v0.12.1
github.com/crate-crypto/go-kzg-4844 v0.7.0
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0
github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3
github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20240131175747-1300b1825140
......@@ -70,12 +72,10 @@ require (
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.1 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/crate-crypto/go-ipa v0.0.0-20230601170251-1830d0757c80 // indirect
github.com/crate-crypto/go-kzg-4844 v0.7.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
......
......@@ -35,14 +35,15 @@ var (
type Config struct {
Rollup *rollup.Config
// DataDir is the directory to read/write pre-image data from/to.
//If not set, an in-memory key-value store is used and fetching data must be enabled
// If not set, an in-memory key-value store is used and fetching data must be enabled
DataDir string
// L1Head is the block has of the L1 chain head block
L1Head common.Hash
L1URL string
L1TrustRPC bool
L1RPCKind sources.RPCProviderKind
L1Head common.Hash
L1URL string
L1BeaconURL string
L1TrustRPC bool
L1RPCKind sources.RPCProviderKind
// L2Head is the l2 block hash contained in the L2 Output referenced by the L2OutputRoot
// TODO(inphi): This can be made optional with hardcoded rollup configs and output oracle addresses by searching the oracle for the l2 output root
......@@ -104,6 +105,7 @@ func (c *Config) Check() error {
}
func (c *Config) FetchingEnabled() bool {
// TODO: Include Beacon URL once cancun is active on all chains we fault prove.
return c.L1URL != "" && c.L2URL != ""
}
......@@ -193,6 +195,7 @@ func NewConfigFromCLI(log log.Logger, ctx *cli.Context) (*Config, error) {
L2ClaimBlockNumber: l2ClaimBlockNum,
L1Head: l1Head,
L1URL: ctx.String(flags.L1NodeAddr.Name),
L1BeaconURL: ctx.String(flags.L1BeaconAddr.Name),
L1TrustRPC: ctx.Bool(flags.L1TrustRPC.Name),
L1RPCKind: sources.RPCProviderKind(ctx.String(flags.L1RPCProviderKind.Name)),
ExecCmd: ctx.String(flags.Exec.Name),
......
......@@ -75,6 +75,12 @@ var (
Usage: "Address of L1 JSON-RPC endpoint to use (eth namespace required)",
EnvVars: prefixEnvVars("L1_RPC"),
}
L1BeaconAddr = &cli.StringFlag{
Name: "l1.beacon",
Usage: "Address of L1 Beacon API endpoint to use",
EnvVars: prefixEnvVars("L1_BEACON_API"),
Hidden: true,
}
L1TrustRPC = &cli.BoolFlag{
Name: "l1.trustrpc",
Usage: "Trust the L1 RPC, sync faster at risk of malicious/buggy RPC providing bad or inconsistent L1 data",
......@@ -112,6 +118,7 @@ var requiredFlags = []cli.Flag{
L2Claim,
L2BlockNumber,
}
var programFlags = []cli.Flag{
RollupConfig,
Network,
......@@ -119,6 +126,7 @@ var programFlags = []cli.Flag{
L2NodeAddr,
L2GenesisPath,
L1NodeAddr,
L1BeaconAddr,
L1TrustRPC,
L1RPCProviderKind,
Exec,
......
......@@ -201,12 +201,14 @@ func makePrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV, cfg *
if err != nil {
return nil, fmt.Errorf("failed to create L1 client: %w", err)
}
l1Beacon := client.NewBasicHTTPClient(cfg.L1BeaconURL, logger)
l1BlobFetcher := sources.NewL1BeaconClient(l1Beacon, sources.L1BeaconClientConfig{FetchAllSidecars: false})
l2Cl, err := NewL2Client(l2RPC, logger, nil, &L2ClientConfig{L2ClientConfig: l2ClCfg, L2Head: cfg.L2Head})
if err != nil {
return nil, fmt.Errorf("failed to create L2 client: %w", err)
}
l2DebugCl := &L2Source{L2Client: l2Cl, DebugClient: sources.NewDebugClient(l2RPC.CallContext)}
return prefetcher.NewPrefetcher(logger, l1Cl, l2DebugCl, kv), nil
return prefetcher.NewPrefetcher(logger, l1Cl, l1BlobFetcher, l2DebugCl, kv), nil
}
func routeHints(logger log.Logger, hHostRW io.ReadWriter, hinter preimage.HintHandler) chan error {
......
......@@ -2,6 +2,7 @@ package prefetcher
import (
"context"
"encoding/binary"
"errors"
"fmt"
"strings"
......@@ -17,6 +18,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)
type L1Source interface {
......@@ -25,6 +27,11 @@ type L1Source interface {
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
}
type L1BlobSource interface {
GetBlobSidecars(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.BlobSidecar, error)
GetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error)
}
type L2Source interface {
InfoAndTxsByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Transactions, error)
NodeByHash(ctx context.Context, hash common.Hash) ([]byte, error)
......@@ -33,19 +40,21 @@ type L2Source interface {
}
type Prefetcher struct {
logger log.Logger
l1Fetcher L1Source
l2Fetcher L2Source
lastHint string
kvStore kvstore.KV
logger log.Logger
l1Fetcher L1Source
l1BlobFetcher L1BlobSource
l2Fetcher L2Source
lastHint string
kvStore kvstore.KV
}
func NewPrefetcher(logger log.Logger, l1Fetcher L1Source, l2Fetcher L2Source, kvStore kvstore.KV) *Prefetcher {
func NewPrefetcher(logger log.Logger, l1Fetcher L1Source, l1BlobFetcher L1BlobSource, l2Fetcher L2Source, kvStore kvstore.KV) *Prefetcher {
return &Prefetcher{
logger: logger,
l1Fetcher: NewRetryingL1Source(logger, l1Fetcher),
l2Fetcher: NewRetryingL2Source(logger, l2Fetcher),
kvStore: kvStore,
logger: logger,
l1Fetcher: NewRetryingL1Source(logger, l1Fetcher),
l1BlobFetcher: NewRetryingL1BlobSource(logger, l1BlobFetcher),
l2Fetcher: NewRetryingL2Source(logger, l2Fetcher),
kvStore: kvStore,
}
}
......@@ -75,13 +84,17 @@ func (p *Prefetcher) GetPreimage(ctx context.Context, key common.Hash) ([]byte,
}
func (p *Prefetcher) prefetch(ctx context.Context, hint string) error {
hintType, hash, err := parseHint(hint)
hintType, hintBytes, err := parseHint(hint)
if err != nil {
return err
}
p.logger.Debug("Prefetching", "type", hintType, "hash", hash)
p.logger.Debug("Prefetching", "type", hintType, "bytes", hexutil.Bytes(hintBytes))
switch hintType {
case l1.HintL1BlockHeader:
if len(hintBytes) != 32 {
return fmt.Errorf("invalid L1 block hint: %x", hint)
}
hash := common.Hash(hintBytes)
header, err := p.l1Fetcher.InfoByHash(ctx, hash)
if err != nil {
return fmt.Errorf("failed to fetch L1 block %s header: %w", hash, err)
......@@ -92,18 +105,68 @@ func (p *Prefetcher) prefetch(ctx context.Context, hint string) error {
}
return p.kvStore.Put(preimage.Keccak256Key(hash).PreimageKey(), data)
case l1.HintL1Transactions:
if len(hintBytes) != 32 {
return fmt.Errorf("invalid L1 transactions hint: %x", hint)
}
hash := common.Hash(hintBytes)
_, txs, err := p.l1Fetcher.InfoAndTxsByHash(ctx, hash)
if err != nil {
return fmt.Errorf("failed to fetch L1 block %s txs: %w", hash, err)
}
return p.storeTransactions(txs)
case l1.HintL1Receipts:
if len(hintBytes) != 32 {
return fmt.Errorf("invalid L1 receipts hint: %x", hint)
}
hash := common.Hash(hintBytes)
_, receipts, err := p.l1Fetcher.FetchReceipts(ctx, hash)
if err != nil {
return fmt.Errorf("failed to fetch L1 block %s receipts: %w", hash, err)
}
return p.storeReceipts(receipts)
case l1.HintL1Blob:
if len(hintBytes) != 48 {
return fmt.Errorf("invalid blob hint: %x", hint)
}
blobVersionHash := common.Hash(hintBytes[:32])
blobHashIndex := binary.BigEndian.Uint64(hintBytes[32:40])
refTimestamp := binary.BigEndian.Uint64(hintBytes[40:48])
// Fetch the blob sidecar for the indexed blob hash passed in the hint.
indexedBlobHash := eth.IndexedBlobHash{
Hash: blobVersionHash,
Index: blobHashIndex,
}
// We pass an `eth.L1BlockRef`, but `GetBlobSidecars` only uses the timestamp, which we received in the hint.
sidecars, err := p.l1BlobFetcher.GetBlobSidecars(ctx, eth.L1BlockRef{Time: refTimestamp}, []eth.IndexedBlobHash{indexedBlobHash})
if err != nil || len(sidecars) != 1 {
return fmt.Errorf("failed to fetch blob sidecars for %s %d: %w", blobVersionHash, blobHashIndex, err)
}
sidecar := sidecars[0]
// Put the preimage for the versioned hash into the kv store
if err = p.kvStore.Put(preimage.Sha256Key(blobVersionHash).PreimageKey(), sidecar.KZGCommitment[:]); err != nil {
return err
}
// Put all of the blob's field elements into the kv store. There should be 4096. The preimage oracle key for
// each field element is the keccak256 hash of `abi.encodePacked(sidecar.KZGCommitment, uint256(i))`
blobKey := make([]byte, 80)
copy(blobKey[:48], sidecar.KZGCommitment[:])
for i := 0; i < params.BlobTxFieldElementsPerBlob; i++ {
binary.BigEndian.PutUint64(blobKey[72:], uint64(i))
blobKeyHash := crypto.Keccak256Hash(blobKey)
if err = p.kvStore.Put(preimage.BlobKey(blobKeyHash).PreimageKey(), sidecar.Blob[i<<5:(i+1)<<5]); err != nil {
return err
}
}
return nil
case l2.HintL2BlockHeader, l2.HintL2Transactions:
if len(hintBytes) != 32 {
return fmt.Errorf("invalid L2 header/tx hint: %x", hint)
}
hash := common.Hash(hintBytes)
header, txs, err := p.l2Fetcher.InfoAndTxsByHash(ctx, hash)
if err != nil {
return fmt.Errorf("failed to fetch L2 block %s: %w", hash, err)
......@@ -118,18 +181,30 @@ func (p *Prefetcher) prefetch(ctx context.Context, hint string) error {
}
return p.storeTransactions(txs)
case l2.HintL2StateNode:
if len(hintBytes) != 32 {
return fmt.Errorf("invalid L2 state node hint: %x", hint)
}
hash := common.Hash(hintBytes)
node, err := p.l2Fetcher.NodeByHash(ctx, hash)
if err != nil {
return fmt.Errorf("failed to fetch L2 state node %s: %w", hash, err)
}
return p.kvStore.Put(preimage.Keccak256Key(hash).PreimageKey(), node)
case l2.HintL2Code:
if len(hintBytes) != 32 {
return fmt.Errorf("invalid L2 code hint: %x", hint)
}
hash := common.Hash(hintBytes)
code, err := p.l2Fetcher.CodeByHash(ctx, hash)
if err != nil {
return fmt.Errorf("failed to fetch L2 contract code %s: %w", hash, err)
}
return p.kvStore.Put(preimage.Keccak256Key(hash).PreimageKey(), code)
case l2.HintL2Output:
if len(hintBytes) != 32 {
return fmt.Errorf("invalid L2 output hint: %x", hint)
}
hash := common.Hash(hintBytes)
output, err := p.l2Fetcher.OutputByRoot(ctx, hash)
if err != nil {
return fmt.Errorf("failed to fetch L2 output root %s: %w", hash, err)
......@@ -167,14 +242,15 @@ func (p *Prefetcher) storeTrieNodes(values []hexutil.Bytes) error {
}
// parseHint parses a hint string in wire protocol. Returns the hint type, requested hash and error (if any).
func parseHint(hint string) (string, common.Hash, error) {
hintType, hashStr, found := strings.Cut(hint, " ")
func parseHint(hint string) (string, []byte, error) {
hintType, bytesStr, found := strings.Cut(hint, " ")
if !found {
return "", common.Hash{}, fmt.Errorf("unsupported hint: %s", hint)
return "", nil, fmt.Errorf("unsupported hint: %s", hint)
}
hash := common.HexToHash(hashStr)
if hash == (common.Hash{}) {
return "", common.Hash{}, fmt.Errorf("invalid hash: %s", hashStr)
hintBytes, err := hexutil.Decode(bytesStr)
if err != nil {
return "", make([]byte, 0), fmt.Errorf("invalid bytes: %s", bytesStr)
}
return hintType, hash, nil
return hintType, hintBytes, nil
}
......@@ -59,6 +59,42 @@ func (s *RetryingL1Source) FetchReceipts(ctx context.Context, blockHash common.H
var _ L1Source = (*RetryingL1Source)(nil)
type RetryingL1BlobSource struct {
logger log.Logger
source L1BlobSource
strategy retry.Strategy
}
func NewRetryingL1BlobSource(logger log.Logger, source L1BlobSource) *RetryingL1BlobSource {
return &RetryingL1BlobSource{
logger: logger,
source: source,
strategy: retry.Exponential(),
}
}
func (s *RetryingL1BlobSource) GetBlobSidecars(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.BlobSidecar, error) {
return retry.Do(ctx, maxAttempts, s.strategy, func() ([]*eth.BlobSidecar, error) {
sidecars, err := s.source.GetBlobSidecars(ctx, ref, hashes)
if err != nil {
s.logger.Warn("Failed to retrieve blob sidecars", "ref", ref, "err", err)
}
return sidecars, err
})
}
func (s *RetryingL1BlobSource) GetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error) {
return retry.Do(ctx, maxAttempts, s.strategy, func() ([]*eth.Blob, error) {
blobs, err := s.source.GetBlobs(ctx, ref, hashes)
if err != nil {
s.logger.Warn("Failed to retrieve blobs", "ref", ref, "err", err)
}
return blobs, err
})
}
var _ L1BlobSource = (*RetryingL1BlobSource)(nil)
type RetryingL2Source struct {
logger log.Logger
source L2Source
......
......@@ -2,6 +2,7 @@ package prefetcher
import (
"context"
"crypto/sha256"
"errors"
"testing"
......@@ -11,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/retry"
......@@ -111,6 +113,117 @@ func createL1Source(t *testing.T) (*RetryingL1Source, *testutils.MockL1Source) {
return source, mock
}
func TestRetryingL1BlobSource(t *testing.T) {
ctx := context.Background()
blob := GetRandBlob(0xFACADE)
commitment, err := kzgCtx.BlobToKZGCommitment(blob, 0)
require.NoError(t, err)
versionedHash := sha256.Sum256(commitment[:])
versionedHash[0] = params.BlobTxHashVersion
blobHash := eth.IndexedBlobHash{Hash: versionedHash, Index: 0xFACADE}
l1BlockRef := eth.L1BlockRef{Time: 0}
t.Run("GetBlobs Success", func(t *testing.T) {
source, mock := createL1BlobSource(t)
defer mock.AssertExpectations(t)
mock.ExpectOnGetBlobs(
ctx,
l1BlockRef,
[]eth.IndexedBlobHash{blobHash},
[]*eth.Blob{(*eth.Blob)(&blob)},
nil,
)
result, err := source.GetBlobs(ctx, l1BlockRef, []eth.IndexedBlobHash{blobHash})
require.NoError(t, err)
require.Equal(t, len(result), 1)
require.Equal(t, blob[:], result[0][:])
})
t.Run("GetBlobs Error", func(t *testing.T) {
source, mock := createL1BlobSource(t)
defer mock.AssertExpectations(t)
expectedErr := errors.New("boom")
mock.ExpectOnGetBlobs(
ctx,
l1BlockRef,
[]eth.IndexedBlobHash{blobHash},
nil,
expectedErr,
)
mock.ExpectOnGetBlobs(
ctx,
l1BlockRef,
[]eth.IndexedBlobHash{blobHash},
[]*eth.Blob{(*eth.Blob)(&blob)},
nil,
)
result, err := source.GetBlobs(ctx, l1BlockRef, []eth.IndexedBlobHash{blobHash})
require.NoError(t, err)
require.Equal(t, len(result), 1)
require.Equal(t, blob[:], result[0][:])
})
t.Run("GetBlobSidecars Success", func(t *testing.T) {
source, mock := createL1BlobSource(t)
defer mock.AssertExpectations(t)
mock.ExpectOnGetBlobSidecars(
ctx,
l1BlockRef,
[]eth.IndexedBlobHash{blobHash},
(eth.Bytes48)(commitment),
[]*eth.Blob{(*eth.Blob)(&blob)},
nil,
)
result, err := source.GetBlobSidecars(ctx, l1BlockRef, []eth.IndexedBlobHash{blobHash})
require.NoError(t, err)
require.Equal(t, len(result), 1)
require.Equal(t, blob[:], result[0].Blob[:])
require.Equal(t, blobHash.Index, uint64(result[0].Index))
require.Equal(t, (eth.Bytes48)(commitment), result[0].KZGCommitment)
})
t.Run("GetBlobSidecars Error", func(t *testing.T) {
source, mock := createL1BlobSource(t)
defer mock.AssertExpectations(t)
expectedErr := errors.New("boom")
mock.ExpectOnGetBlobSidecars(
ctx,
l1BlockRef,
[]eth.IndexedBlobHash{blobHash},
(eth.Bytes48)(commitment),
[]*eth.Blob{(*eth.Blob)(&blob)},
expectedErr,
)
mock.ExpectOnGetBlobSidecars(
ctx,
l1BlockRef,
[]eth.IndexedBlobHash{blobHash},
(eth.Bytes48)(commitment),
[]*eth.Blob{(*eth.Blob)(&blob)},
nil,
)
result, err := source.GetBlobSidecars(ctx, l1BlockRef, []eth.IndexedBlobHash{blobHash})
require.NoError(t, err)
require.Equal(t, len(result), 1)
require.Equal(t, blob[:], result[0].Blob[:])
require.Equal(t, blobHash.Index, uint64(result[0].Index))
require.Equal(t, (eth.Bytes48)(commitment), result[0].KZGCommitment)
})
}
func createL1BlobSource(t *testing.T) (*RetryingL1BlobSource, *testutils.MockBlobsFetcher) {
logger := testlog.Logger(t, log.LvlDebug)
mock := &testutils.MockBlobsFetcher{}
source := NewRetryingL1BlobSource(logger, mock)
// Avoid sleeping in tests by using a fixed retry strategy with no delay
source.strategy = retry.Fixed(0)
return source, mock
}
func TestRetryingL2Source(t *testing.T) {
ctx := context.Background()
hash := common.Hash{0xab}
......
package testutils
import (
"context"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/stretchr/testify/mock"
)
type MockBlobsFetcher struct {
mock.Mock
}
func (cl *MockBlobsFetcher) GetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error) {
out := cl.Mock.MethodCalled("GetBlobs", ref, hashes)
return out.Get(0).([]*eth.Blob), out.Error(1)
}
func (cl *MockBlobsFetcher) ExpectOnGetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash, blobs []*eth.Blob, err error) {
cl.Mock.On("GetBlobs", ref, hashes).Once().Return(blobs, err)
}
func (cl *MockBlobsFetcher) GetBlobSidecars(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.BlobSidecar, error) {
out := cl.Mock.MethodCalled("GetBlobSidecars", ref, hashes)
return out.Get(0).([]*eth.BlobSidecar), out.Error(1)
}
func (cl *MockBlobsFetcher) ExpectOnGetBlobSidecars(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash, commitment eth.Bytes48, blobs []*eth.Blob, err error) {
cl.Mock.On("GetBlobSidecars", ref, hashes).Once().Return([]*eth.BlobSidecar{{
Blob: *blobs[0],
Index: eth.Uint64String(hashes[0].Index),
KZGCommitment: commitment,
}}, err)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment