Commit dc29f9af authored by Sebastian Stammler's avatar Sebastian Stammler Committed by GitHub

op-node,sources: Add Beacon source option to fetch all sidecars (#9151)

* op-node,soruces: Add Beacon source option to fetch all sidecars

* Update op-service/sources/l1_beacon_client.go

Break out of inner loop after idx found
Co-authored-by: default avatarprotolambda <proto@protolambda.com>

* sources: Remove odering from blobsFromSidecars

* sources: Fix godoc of GetBlobSidecars

---------
Co-authored-by: default avatarprotolambda <proto@protolambda.com>
parent bf689a70
...@@ -21,7 +21,8 @@ func TestGetVersion(t *testing.T) { ...@@ -21,7 +21,8 @@ func TestGetVersion(t *testing.T) {
}) })
require.NoError(t, beaconApi.Start("127.0.0.1:0")) require.NoError(t, beaconApi.Start("127.0.0.1:0"))
cl := sources.NewL1BeaconClient(client.NewBasicHTTPClient(beaconApi.BeaconAddr(), l)) beaconCfg := sources.L1BeaconClientConfig{FetchAllSidecars: false}
cl := sources.NewL1BeaconClient(client.NewBasicHTTPClient(beaconApi.BeaconAddr(), l), beaconCfg)
version, err := cl.GetVersion(context.Background()) version, err := cl.GetVersion(context.Background())
require.NoError(t, err) require.NoError(t, err)
......
...@@ -56,6 +56,13 @@ var ( ...@@ -56,6 +56,13 @@ var (
Value: false, Value: false,
EnvVars: prefixEnvVars("L1_BEACON_IGNORE"), EnvVars: prefixEnvVars("L1_BEACON_IGNORE"),
} }
BeaconFetchAllSidecars = &cli.BoolFlag{
Name: "l1.beacon.fetch-all-sidecars",
Usage: "If true, all sidecars are fetched and filtered locally. Workaround for buggy Beacon nodes.",
Required: false,
Value: false,
EnvVars: prefixEnvVars("L1_BEACON_FETCH_ALL_SIDECARS"),
}
SyncModeFlag = &cli.GenericFlag{ SyncModeFlag = &cli.GenericFlag{
Name: "syncmode", Name: "syncmode",
Usage: fmt.Sprintf("IN DEVELOPMENT: Options are: %s", openum.EnumString(sync.ModeStrings)), Usage: fmt.Sprintf("IN DEVELOPMENT: Options are: %s", openum.EnumString(sync.ModeStrings)),
...@@ -285,6 +292,7 @@ var requiredFlags = []cli.Flag{ ...@@ -285,6 +292,7 @@ var requiredFlags = []cli.Flag{
var optionalFlags = []cli.Flag{ var optionalFlags = []cli.Flag{
BeaconAddr, BeaconAddr,
BeaconCheckIgnore, BeaconCheckIgnore,
BeaconFetchAllSidecars,
SyncModeFlag, SyncModeFlag,
RPCListenAddr, RPCListenAddr,
RPCListenPort, RPCListenPort,
......
...@@ -33,6 +33,7 @@ type L1BeaconEndpointSetup interface { ...@@ -33,6 +33,7 @@ type L1BeaconEndpointSetup interface {
Setup(ctx context.Context, log log.Logger) (cl client.HTTP, err error) Setup(ctx context.Context, log log.Logger) (cl client.HTTP, err error)
// ShouldIgnoreBeaconCheck returns true if the Beacon-node version check should not halt startup. // ShouldIgnoreBeaconCheck returns true if the Beacon-node version check should not halt startup.
ShouldIgnoreBeaconCheck() bool ShouldIgnoreBeaconCheck() bool
ShouldFetchAllSidecars() bool
Check() error Check() error
} }
...@@ -175,8 +176,9 @@ func (cfg *PreparedL1Endpoint) Check() error { ...@@ -175,8 +176,9 @@ func (cfg *PreparedL1Endpoint) Check() error {
} }
type L1BeaconEndpointConfig struct { type L1BeaconEndpointConfig struct {
BeaconAddr string // Address of L1 User Beacon-API endpoint to use (beacon namespace required) BeaconAddr string // Address of L1 User Beacon-API endpoint to use (beacon namespace required)
BeaconCheckIgnore bool // When false, halt startup if the beacon version endpoint fails BeaconCheckIgnore bool // When false, halt startup if the beacon version endpoint fails
BeaconFetchAllSidecars bool // Whether to fetch all blob sidecars and filter locally
} }
var _ L1BeaconEndpointSetup = (*L1BeaconEndpointConfig)(nil) var _ L1BeaconEndpointSetup = (*L1BeaconEndpointConfig)(nil)
...@@ -195,3 +197,7 @@ func (cfg *L1BeaconEndpointConfig) Check() error { ...@@ -195,3 +197,7 @@ func (cfg *L1BeaconEndpointConfig) Check() error {
func (cfg *L1BeaconEndpointConfig) ShouldIgnoreBeaconCheck() bool { func (cfg *L1BeaconEndpointConfig) ShouldIgnoreBeaconCheck() bool {
return cfg.BeaconCheckIgnore return cfg.BeaconCheckIgnore
} }
func (cfg *L1BeaconEndpointConfig) ShouldFetchAllSidecars() bool {
return cfg.BeaconFetchAllSidecars
}
...@@ -30,9 +30,7 @@ import ( ...@@ -30,9 +30,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
) )
var ( var ErrAlreadyClosed = errors.New("node is already closed")
ErrAlreadyClosed = errors.New("node is already closed")
)
type OpNode struct { type OpNode struct {
log log.Logger log log.Logger
...@@ -308,7 +306,10 @@ func (n *OpNode) initL1BeaconAPI(ctx context.Context, cfg *Config) error { ...@@ -308,7 +306,10 @@ func (n *OpNode) initL1BeaconAPI(ctx context.Context, cfg *Config) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to setup L1 Beacon API client: %w", err) return fmt.Errorf("failed to setup L1 Beacon API client: %w", err)
} }
n.beacon = sources.NewL1BeaconClient(httpClient) beaconCfg := sources.L1BeaconClientConfig{
FetchAllSidecars: cfg.Beacon.ShouldFetchAllSidecars(),
}
n.beacon = sources.NewL1BeaconClient(httpClient, beaconCfg)
// Retry retrieval of the Beacon API version, to be more robust on startup against Beacon API connection issues. // Retry retrieval of the Beacon API version, to be more robust on startup against Beacon API connection issues.
beaconVersion, missingEndpoint, err := retry.Do2[string, bool](ctx, 5, retry.Exponential(), func() (string, bool, error) { beaconVersion, missingEndpoint, err := retry.Do2[string, bool](ctx, 5, retry.Exponential(), func() (string, bool, error) {
......
...@@ -126,8 +126,9 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { ...@@ -126,8 +126,9 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
func NewBeaconEndpointConfig(ctx *cli.Context) node.L1BeaconEndpointSetup { func NewBeaconEndpointConfig(ctx *cli.Context) node.L1BeaconEndpointSetup {
return &node.L1BeaconEndpointConfig{ return &node.L1BeaconEndpointConfig{
BeaconAddr: ctx.String(flags.BeaconAddr.Name), BeaconAddr: ctx.String(flags.BeaconAddr.Name),
BeaconCheckIgnore: ctx.Bool(flags.BeaconCheckIgnore.Name), BeaconCheckIgnore: ctx.Bool(flags.BeaconCheckIgnore.Name),
BeaconFetchAllSidecars: ctx.Bool(flags.BeaconFetchAllSidecars.Name),
} }
} }
......
...@@ -8,7 +8,6 @@ import ( ...@@ -8,7 +8,6 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"path" "path"
"slices"
"strconv" "strconv"
"sync" "sync"
...@@ -25,16 +24,21 @@ const ( ...@@ -25,16 +24,21 @@ const (
sidecarsMethodPrefix = "eth/v1/beacon/blob_sidecars/" sidecarsMethodPrefix = "eth/v1/beacon/blob_sidecars/"
) )
type L1BeaconClientConfig struct {
FetchAllSidecars bool
}
type L1BeaconClient struct { type L1BeaconClient struct {
cl client.HTTP cl client.HTTP
cfg L1BeaconClientConfig
initLock sync.Mutex initLock sync.Mutex
timeToSlotFn TimeToSlotFn timeToSlotFn TimeToSlotFn
} }
// NewL1BeaconClient returns a client for making requests to an L1 consensus layer node. // NewL1BeaconClient returns a client for making requests to an L1 consensus layer node.
func NewL1BeaconClient(cl client.HTTP) *L1BeaconClient { func NewL1BeaconClient(cl client.HTTP, cfg L1BeaconClientConfig) *L1BeaconClient {
return &L1BeaconClient{cl: cl} return &L1BeaconClient{cl: cl, cfg: cfg}
} }
func (cl *L1BeaconClient) apiReq(ctx context.Context, dest any, reqPath string, reqQuery url.Values) error { func (cl *L1BeaconClient) apiReq(ctx context.Context, dest any, reqPath string, reqQuery url.Values) error {
...@@ -42,7 +46,7 @@ func (cl *L1BeaconClient) apiReq(ctx context.Context, dest any, reqPath string, ...@@ -42,7 +46,7 @@ func (cl *L1BeaconClient) apiReq(ctx context.Context, dest any, reqPath string,
headers.Add("Accept", "application/json") headers.Add("Accept", "application/json")
resp, err := cl.cl.Get(ctx, reqPath, reqQuery, headers) resp, err := cl.cl.Get(ctx, reqPath, reqQuery, headers)
if err != nil { if err != nil {
return fmt.Errorf("%w: http Get failed", err) return fmt.Errorf("http Get failed: %w", err)
} }
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
errMsg, _ := io.ReadAll(resp.Body) errMsg, _ := io.ReadAll(resp.Body)
...@@ -54,7 +58,7 @@ func (cl *L1BeaconClient) apiReq(ctx context.Context, dest any, reqPath string, ...@@ -54,7 +58,7 @@ func (cl *L1BeaconClient) apiReq(ctx context.Context, dest any, reqPath string,
return err return err
} }
if err := resp.Body.Close(); err != nil { if err := resp.Body.Close(); err != nil {
return fmt.Errorf("%w: failed to close response body", err) return fmt.Errorf("failed to close response body: %w", err)
} }
return nil return nil
} }
...@@ -93,37 +97,54 @@ func (cl *L1BeaconClient) GetTimeToSlotFn(ctx context.Context) (TimeToSlotFn, er ...@@ -93,37 +97,54 @@ func (cl *L1BeaconClient) GetTimeToSlotFn(ctx context.Context) (TimeToSlotFn, er
return cl.timeToSlotFn, nil return cl.timeToSlotFn, nil
} }
// GetBlobSidecars fetches blob sidecars that were confirmed in the specified L1 block with the // GetBlobSidecars fetches blob sidecars that were confirmed in the specified
// given indexed hashes. Order of the returned sidecars is not guaranteed, and blob data is not // L1 block with the given indexed hashes.
// checked for validity. // Order of the returned sidecars is guaranteed to be that of the hashes.
// Blob data is not checked for validity.
func (cl *L1BeaconClient) GetBlobSidecars(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.BlobSidecar, error) { func (cl *L1BeaconClient) GetBlobSidecars(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.BlobSidecar, error) {
if len(hashes) == 0 { if len(hashes) == 0 {
return []*eth.BlobSidecar{}, nil return []*eth.BlobSidecar{}, nil
} }
slotFn, err := cl.GetTimeToSlotFn(ctx) slotFn, err := cl.GetTimeToSlotFn(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("%w: failed to get time to slot function", err) return nil, fmt.Errorf("failed to get time to slot function: %w", err)
} }
slot, err := slotFn(ref.Time) slot, err := slotFn(ref.Time)
if err != nil { if err != nil {
return nil, fmt.Errorf("%w: error in converting ref.Time to slot", err) return nil, fmt.Errorf("error in converting ref.Time to slot: %w", err)
} }
reqPath := path.Join(sidecarsMethodPrefix, strconv.FormatUint(slot, 10)) reqPath := path.Join(sidecarsMethodPrefix, strconv.FormatUint(slot, 10))
reqQuery := url.Values{} var reqQuery url.Values
for i := range hashes { if !cl.cfg.FetchAllSidecars {
reqQuery.Add("indices", strconv.FormatUint(hashes[i].Index, 10)) reqQuery = url.Values{}
for i := range hashes {
reqQuery.Add("indices", strconv.FormatUint(hashes[i].Index, 10))
}
} }
var resp eth.APIGetBlobSidecarsResponse var resp eth.APIGetBlobSidecarsResponse
if err := cl.apiReq(ctx, &resp, reqPath, reqQuery); err != nil { if err := cl.apiReq(ctx, &resp, reqPath, reqQuery); err != nil {
return nil, fmt.Errorf("%w: failed to fetch blob sidecars for slot %v block %v", err, slot, ref) return nil, fmt.Errorf("failed to fetch blob sidecars for slot %v block %v: %w", slot, ref, err)
}
apiscs := make([]*eth.APIBlobSidecar, 0, len(hashes))
// filter and order by hashes
for _, h := range hashes {
for _, apisc := range resp.Data {
if h.Index == uint64(apisc.Index) {
apiscs = append(apiscs, apisc)
break
}
}
} }
if len(hashes) != len(resp.Data) {
if len(hashes) != len(apiscs) {
return nil, fmt.Errorf("expected %v sidecars but got %v", len(hashes), len(resp.Data)) return nil, fmt.Errorf("expected %v sidecars but got %v", len(hashes), len(resp.Data))
} }
bscs := make([]*eth.BlobSidecar, 0, len(hashes)) bscs := make([]*eth.BlobSidecar, 0, len(hashes))
for _, apisc := range resp.Data { for _, apisc := range apiscs {
bscs = append(bscs, apisc.BlobSidecar()) bscs = append(bscs, apisc.BlobSidecar())
} }
...@@ -137,24 +158,22 @@ func (cl *L1BeaconClient) GetBlobSidecars(ctx context.Context, ref eth.L1BlockRe ...@@ -137,24 +158,22 @@ func (cl *L1BeaconClient) GetBlobSidecars(ctx context.Context, ref eth.L1BlockRe
func (cl *L1BeaconClient) GetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error) { func (cl *L1BeaconClient) GetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error) {
blobSidecars, err := cl.GetBlobSidecars(ctx, ref, hashes) blobSidecars, err := cl.GetBlobSidecars(ctx, ref, hashes)
if err != nil { if err != nil {
return nil, fmt.Errorf("%w: failed to get blob sidecars for L1BlockRef %s", err, ref) return nil, fmt.Errorf("failed to get blob sidecars for L1BlockRef %s: %w", ref, err)
} }
return blobsFromSidecars(blobSidecars, hashes) return blobsFromSidecars(blobSidecars, hashes)
} }
func blobsFromSidecars(blobSidecars []*eth.BlobSidecar, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error) { func blobsFromSidecars(blobSidecars []*eth.BlobSidecar, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error) {
if len(blobSidecars) != len(hashes) {
return nil, fmt.Errorf("number of hashes and blobSidecars mismatch, %d != %d", len(hashes), len(blobSidecars))
}
out := make([]*eth.Blob, len(hashes)) out := make([]*eth.Blob, len(hashes))
for i, ih := range hashes { for i, ih := range hashes {
// The beacon node api makes no guarantees on order of the returned blob sidecars, so sidecar := blobSidecars[i]
// search for the sidecar that matches the current indexed hash to ensure blobs are if sidx := uint64(sidecar.Index); sidx != ih.Index {
// returned in the same order. return nil, fmt.Errorf("expected sidecars to be ordered by hashes, but got %d != %d", sidx, ih.Index)
scIndex := slices.IndexFunc(
blobSidecars,
func(sc *eth.BlobSidecar) bool { return uint64(sc.Index) == ih.Index })
if scIndex == -1 {
return nil, fmt.Errorf("no blob in response matches desired index: %v", ih.Index)
} }
sidecar := blobSidecars[scIndex]
// make sure the blob's kzg commitment hashes to the expected value // make sure the blob's kzg commitment hashes to the expected value
hash := eth.KZGToVersionedHash(kzg4844.Commitment(sidecar.KZGCommitment)) hash := eth.KZGToVersionedHash(kzg4844.Commitment(sidecar.KZGCommitment))
...@@ -164,7 +183,7 @@ func blobsFromSidecars(blobSidecars []*eth.BlobSidecar, hashes []eth.IndexedBlob ...@@ -164,7 +183,7 @@ func blobsFromSidecars(blobSidecars []*eth.BlobSidecar, hashes []eth.IndexedBlob
// confirm blob data is valid by verifying its proof against the commitment // confirm blob data is valid by verifying its proof against the commitment
if err := eth.VerifyBlobProof(&sidecar.Blob, kzg4844.Commitment(sidecar.KZGCommitment), kzg4844.Proof(sidecar.KZGProof)); err != nil { if err := eth.VerifyBlobProof(&sidecar.Blob, kzg4844.Commitment(sidecar.KZGCommitment), kzg4844.Proof(sidecar.KZGProof)); err != nil {
return nil, fmt.Errorf("%w: blob at index %d failed verification", err, i) return nil, fmt.Errorf("blob at index %d failed verification: %w", i, err)
} }
out[i] = &sidecar.Blob out[i] = &sidecar.Blob
} }
......
...@@ -41,9 +41,18 @@ func TestBlobsFromSidecars(t *testing.T) { ...@@ -41,9 +41,18 @@ func TestBlobsFromSidecars(t *testing.T) {
hashes := []eth.IndexedBlobHash{index0, index1, index2} hashes := []eth.IndexedBlobHash{index0, index1, index2}
// put the sidecars in scrambled order of expectation to confirm function appropriately // put the sidecars in scrambled order to confirm error
// reorders the output to match that of the blob hashes
sidecars := []*eth.BlobSidecar{sidecar2, sidecar0, sidecar1} sidecars := []*eth.BlobSidecar{sidecar2, sidecar0, sidecar1}
_, err := blobsFromSidecars(sidecars, hashes)
require.Error(t, err)
// too few sidecars should error
sidecars = []*eth.BlobSidecar{sidecar0, sidecar1}
_, err = blobsFromSidecars(sidecars, hashes)
require.Error(t, err)
// correct order should work
sidecars = []*eth.BlobSidecar{sidecar0, sidecar1, sidecar2}
blobs, err := blobsFromSidecars(sidecars, hashes) blobs, err := blobsFromSidecars(sidecars, hashes)
require.NoError(t, err) require.NoError(t, err)
// confirm order by checking first blob byte against expected index // confirm order by checking first blob byte against expected index
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment