Commit 102a9c22 authored by Joshua Gutow's avatar Joshua Gutow Committed by GitHub

op-node: Switch attributes queue to be pull based (#3598)

The progress API is very nearly removed from the engine queue stage.
parent 48b7cc5b
......@@ -22,75 +22,60 @@ import (
// This stage can be reset by clearing it's batch buffer.
// This stage does not need to retain any references to L1 blocks.
type AttributesQueueOutput interface {
AddSafeAttributes(attributes *eth.PayloadAttributes)
SafeL2Head() eth.L2BlockRef
StageProgress
}
type AttributesQueue struct {
log log.Logger
config *rollup.Config
dl L1ReceiptsFetcher
next AttributesQueueOutput
prev *BatchQueue
progress Progress
batches []*BatchData
batch *BatchData
}
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, next AttributesQueueOutput, prev *BatchQueue) *AttributesQueue {
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, l1Fetcher L1ReceiptsFetcher, prev *BatchQueue) *AttributesQueue {
return &AttributesQueue{
log: log,
config: cfg,
dl: l1Fetcher,
next: next,
prev: prev,
}
}
func (aq *AttributesQueue) AddBatch(batch *BatchData) {
aq.log.Debug("Received next batch", "batch_epoch", batch.EpochNum, "batch_timestamp", batch.Timestamp, "tx_count", len(batch.Transactions))
aq.batches = append(aq.batches, batch)
}
func (aq *AttributesQueue) Progress() Progress {
return aq.progress
func (aq *AttributesQueue) Origin() eth.L1BlockRef {
return aq.prev.Origin()
}
func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error {
if aq.progress.Origin != aq.prev.Origin() {
aq.progress.Closed = false
aq.progress.Origin = aq.prev.Origin()
return nil
func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// Get a batch if we need it
if aq.batch == nil {
batch, err := aq.prev.NextBatch(ctx, l2SafeHead)
if err != nil {
return nil, err
}
aq.batch = batch
}
if len(aq.batches) == 0 {
batch, err := aq.prev.NextBatch(ctx, aq.next.SafeL2Head())
if err == io.EOF {
if !aq.progress.Closed {
aq.progress.Closed = true
return nil
// Actually generate the next attributes
if attrs, err := aq.createNextAttributes(ctx, aq.batch, l2SafeHead); err != nil {
return nil, err
} else {
return io.EOF
// Clear out the local state once we will succeed
aq.batch = nil
return attrs, nil
}
} else if err != nil {
return err
}
aq.batches = append(aq.batches, batch)
}
batch := aq.batches[0]
}
safeL2Head := aq.next.SafeL2Head()
// createNextAttributes transforms a batch into a payload attributes. This sets `NoTxPool` and appends the batched transactions
// to the attributes transaction list
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// sanity check parent hash
if batch.ParentHash != safeL2Head.Hash {
return NewCriticalError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, safeL2Head.Hash))
if batch.ParentHash != l2SafeHead.Hash {
return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash))
}
fetchCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, safeL2Head, batch.Timestamp, batch.Epoch())
attrs, err := PreparePayloadAttributes(fetchCtx, aq.config, aq.dl, l2SafeHead, batch.Timestamp, batch.Epoch())
if err != nil {
return err
return nil, err
}
// we are verifying, not sequencing, we've got all transactions and do not pull from the tx-pool
......@@ -100,19 +85,9 @@ func (aq *AttributesQueue) Step(ctx context.Context, outer Progress) error {
aq.log.Info("generated attributes in payload queue", "txs", len(attrs.Transactions), "timestamp", batch.Timestamp)
// Slice off the batch once we are guaranteed to succeed
aq.batches = aq.batches[1:]
aq.next.AddSafeAttributes(attrs)
return nil
return attrs, nil
}
func (aq *AttributesQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error {
aq.batches = aq.batches[:0]
aq.progress = aq.next.Progress()
func (aq *AttributesQueue) Reset(ctx context.Context, _ eth.L1BlockRef) error {
return io.EOF
}
func (aq *AttributesQueue) SafeL2Head() eth.L2BlockRef {
return aq.next.SafeL2Head()
}
......@@ -2,7 +2,6 @@ package derive
import (
"context"
"io"
"math/big"
"math/rand"
"testing"
......@@ -17,30 +16,10 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type MockAttributesQueueOutput struct {
MockOriginStage
}
func (m *MockAttributesQueueOutput) AddSafeAttributes(attributes *eth.PayloadAttributes) {
m.Mock.MethodCalled("AddSafeAttributes", attributes)
}
func (m *MockAttributesQueueOutput) ExpectAddSafeAttributes(attributes *eth.PayloadAttributes) {
m.Mock.On("AddSafeAttributes", attributes).Once().Return()
}
func (m *MockAttributesQueueOutput) SafeL2Head() eth.L2BlockRef {
return m.Mock.MethodCalled("SafeL2Head").Get(0).(eth.L2BlockRef)
}
func (m *MockAttributesQueueOutput) ExpectSafeL2Head(head eth.L2BlockRef) {
m.Mock.On("SafeL2Head").Once().Return(head)
}
var _ AttributesQueueOutput = (*MockAttributesQueueOutput)(nil)
func TestAttributesQueue_Step(t *testing.T) {
t.Skip("don't fake out batch queue")
// TestAttributesQueue checks that it properly uses the PreparePayloadAttributes function
// (which is well tested) and that it properly sets NoTxPool and adds in the candidate
// transactions.
func TestAttributesQueue(t *testing.T) {
// test config, only init the necessary fields
cfg := &rollup.Config{
BlockTime: 2,
......@@ -57,18 +36,9 @@ func TestAttributesQueue_Step(t *testing.T) {
l1Fetcher.ExpectInfoByHash(l1Info.InfoHash, l1Info, nil)
out := &MockAttributesQueueOutput{}
out.progress = Progress{
Origin: l1Info.BlockRef(),
Closed: false,
}
defer out.AssertExpectations(t)
safeHead := testutils.RandomL2BlockRef(rng)
safeHead.L1Origin = l1Info.ID()
out.ExpectSafeL2Head(safeHead)
batch := &BatchData{BatchV1{
ParentHash: safeHead.Hash,
EpochNum: rollup.Epoch(l1Info.InfoNum),
......@@ -86,13 +56,11 @@ func TestAttributesQueue_Step(t *testing.T) {
Transactions: []eth.Data{l1InfoTx, eth.Data("foobar"), eth.Data("example")},
NoTxPool: true,
}
out.ExpectAddSafeAttributes(&attrs)
aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, out, nil)
require.NoError(t, RepeatResetStep(t, aq.ResetStep, l1Fetcher, 1))
aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, l1Fetcher, nil)
aq.AddBatch(batch)
actual, err := aq.createNextAttributes(context.Background(), batch, safeHead)
require.NoError(t, aq.Step(context.Background(), out.progress), "adding batch to next stage, no EOF yet")
require.Equal(t, io.EOF, aq.Step(context.Background(), out.progress), "done with batches")
require.Nil(t, err)
require.Equal(t, attrs, *actual)
}
......@@ -17,6 +17,11 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type NextAttributesProvider interface {
Origin() eth.L1BlockRef
NextAttributes(context.Context, eth.L2BlockRef) (*eth.PayloadAttributes, error)
}
type Engine interface {
GetPayload(ctx context.Context, payloadId eth.PayloadID) (*eth.ExecutionPayload, error)
ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error)
......@@ -64,8 +69,6 @@ type EngineQueue struct {
finalizedL1 eth.BlockID
progress Progress
safeAttributes []*eth.PayloadAttributes
unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps
......@@ -73,14 +76,15 @@ type EngineQueue struct {
finalityData []FinalityData
engine Engine
prev NextAttributesProvider
progress Progress // only used for pipeline resets
metrics Metrics
}
var _ AttributesQueueOutput = (*EngineQueue)(nil)
// NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use.
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics) *EngineQueue {
func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider) *EngineQueue {
return &EngineQueue{
log: log,
cfg: cfg,
......@@ -91,6 +95,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
MaxSize: maxUnsafePayloadsMemory,
SizeFn: payloadMemSize,
},
prev: prev,
}
}
......@@ -146,17 +151,30 @@ func (eq *EngineQueue) LastL2Time() uint64 {
return uint64(eq.safeAttributes[len(eq.safeAttributes)-1].Timestamp)
}
func (eq *EngineQueue) Step(ctx context.Context, outer Progress) error {
if changed, err := eq.progress.Update(outer); err != nil || changed {
return err
}
func (eq *EngineQueue) Step(ctx context.Context, _ Progress) error {
if len(eq.safeAttributes) > 0 {
return eq.tryNextSafeAttributes(ctx)
}
outOfData := false
if len(eq.safeAttributes) == 0 {
if next, err := eq.prev.NextAttributes(ctx, eq.safeHead); err == io.EOF {
outOfData = true
} else if err != nil {
return err
} else {
eq.safeAttributes = append(eq.safeAttributes, next)
return NotEnoughData
}
}
if eq.unsafePayloads.Len() > 0 {
return eq.tryNextUnsafePayload(ctx)
}
if outOfData {
return io.EOF
} else {
return nil
}
}
// tryFinalizeL2 traverses the past L1 blocks, checks if any has been finalized,
......@@ -186,11 +204,11 @@ func (eq *EngineQueue) postProcessSafeL2() {
eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...)
}
// remember the last L2 block that we fully derived from the given finality data
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.progress.Origin.Number {
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.prev.Origin().Number {
// append entry for new L1 block
eq.finalityData = append(eq.finalityData, FinalityData{
L2Block: eq.safeHead,
L1Block: eq.progress.Origin.ID(),
L1Block: eq.prev.Origin().ID(),
})
} else {
// if it's a now L2 block that was derived from the same latest L1 block, then just update the entry
......@@ -205,7 +223,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
"l2_safe", eq.safeHead,
"l2_unsafe", eq.unsafeHead,
"l2_time", eq.unsafeHead.Time,
"l1_derived", eq.progress.Origin,
"l1_derived", eq.prev.Origin(),
)
}
......@@ -415,7 +433,6 @@ func (eq *EngineQueue) ResetStep(ctx context.Context, l1Fetcher L1Fetcher) error
// note: we do not clear the unsafe payloadds queue; if the payloads are not applicable anymore the parent hash checks will clear out the old payloads.
eq.progress = Progress{
Origin: pipelineOrigin,
Closed: false,
}
eq.metrics.RecordL2Ref("l2_finalized", finalized)
eq.metrics.RecordL2Ref("l2_safe", safe)
......
package derive
import (
"context"
"io"
"math/rand"
"testing"
......@@ -14,6 +16,20 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type fakeAttributesQueue struct {
origin eth.L1BlockRef
}
func (f *fakeAttributesQueue) Origin() eth.L1BlockRef {
return f.origin
}
func (f *fakeAttributesQueue) NextAttributes(_ context.Context, _ eth.L2BlockRef) (*eth.PayloadAttributes, error) {
return nil, io.EOF
}
var _ NextAttributesProvider = (*fakeAttributesQueue)(nil)
func TestEngineQueue_Finalize(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
......@@ -211,8 +227,10 @@ func TestEngineQueue_Finalize(t *testing.T) {
l1F.ExpectL1BlockRefByHash(refB.Hash, refB, nil)
l1F.ExpectL1BlockRefByNumber(refB.Number, refB, nil)
eq := NewEngineQueue(logger, cfg, eng, metrics)
require.NoError(t, RepeatResetStep(t, eq.ResetStep, l1F, 20))
prev := &fakeAttributesQueue{}
eq := NewEngineQueue(logger, cfg, eng, metrics, prev)
require.ErrorIs(t, eq.ResetStep(context.Background(), l1F), io.EOF)
require.Equal(t, refB1, eq.SafeL2Head(), "L2 reset should go back to sequence window ago: blocks with origin E and D are not safe until we reconcile, C is extra, and B1 is the end we look for")
require.Equal(t, refB, eq.Progress().Origin, "Expecting to be set back derivation L1 progress to B")
......@@ -220,20 +238,19 @@ func TestEngineQueue_Finalize(t *testing.T) {
// now say C1 was included in D and became the new safe head
eq.progress.Origin = refD
prev.origin = refD
eq.safeHead = refC1
eq.postProcessSafeL2()
// now say D0 was included in E and became the new safe head
eq.progress.Origin = refE
prev.origin = refE
eq.safeHead = refD0
eq.postProcessSafeL2()
// let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E)
eq.Finalize(refD.ID())
// Now a few steps later, without consuming any additional L1 inputs,
// we should be able to resolve that B1 is now finalized, since it was included in finalized L1 block C
require.NoError(t, RepeatStep(t, eq.Step, eq.progress, 10))
require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized")
l1F.AssertExpectations(t)
......
......@@ -101,13 +101,13 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
bank := NewChannelBank(log, cfg, l1Src, l1Fetcher)
chInReader := NewChannelInReader(log, bank)
batchQueue := NewBatchQueue(log, cfg, chInReader)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue)
// Push stages (that act like pull stages b/c we push from the innermost stages prior to the outermost stages)
eng := NewEngineQueue(log, cfg, engine, metrics)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, eng, batchQueue)
eng := NewEngineQueue(log, cfg, engine, metrics, attributesQueue)
stages := []Stage{eng, attributesQueue}
pullStages := []PullStage{batchQueue, chInReader, bank, l1Src, l1Traversal}
stages := []Stage{eng}
pullStages := []PullStage{attributesQueue, batchQueue, chInReader, bank, l1Src, l1Traversal}
return &DerivationPipeline{
log: log,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment