Commit df1123f0 authored by protolambda's avatar protolambda

op-node: get block with sync status, make finalization more safe, fix finality delay with engine

parent 1c95fc27
......@@ -177,8 +177,16 @@ func (s *L1Replica) L1Client(t Testing, cfg *rollup.Config) *sources.L1Client {
// ActL1FinalizeNext finalizes the next block, which must be marked as safe before doing so (see ActL1SafeNext).
func (s *L1Replica) ActL1FinalizeNext(t Testing) {
safe := s.l1Chain.CurrentSafeBlock()
finalizedNum := s.l1Chain.CurrentFinalizedBlock().NumberU64()
if safe.NumberU64() <= finalizedNum {
safeNum := uint64(0)
if safe != nil {
safeNum = safe.NumberU64()
}
finalized := s.l1Chain.CurrentFinalizedBlock()
finalizedNum := uint64(0)
if finalized != nil {
finalizedNum = finalized.NumberU64()
}
if safeNum <= finalizedNum {
t.InvalidAction("need to move forward safe block before moving finalized block")
return
}
......@@ -192,7 +200,11 @@ func (s *L1Replica) ActL1FinalizeNext(t Testing) {
// ActL1SafeNext marks the next unsafe block as safe.
func (s *L1Replica) ActL1SafeNext(t Testing) {
safe := s.l1Chain.CurrentSafeBlock()
next := s.l1Chain.GetBlockByNumber(safe.NumberU64() + 1)
safeNum := uint64(0)
if safe != nil {
safeNum = safe.NumberU64()
}
next := s.l1Chain.GetBlockByNumber(safeNum + 1)
if next == nil {
t.InvalidAction("if head of chain is marked as safe then there's no next block")
return
......
......@@ -7,15 +7,16 @@ import (
"io"
"math/big"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
)
type SyncStatusAPI interface {
......@@ -149,8 +150,8 @@ func (s *L2Batcher) ActL2BatchSubmit(t Testing) {
data.WriteByte(derive.DerivationVersion0)
// subtract one, to account for the version byte
if err := s.l2ChannelOut.OutputFrame(data, s.l2BatcherCfg.MaxL1TxSize-1); err == io.EOF {
s.l2ChannelOut = nil
s.l2Submitting = false
// there may still be some data to submit
} else if err != nil {
s.l2Submitting = false
t.Fatalf("failed to output channel data to frame: %v", err)
......
......@@ -4,12 +4,14 @@ import (
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/testlog"
)
......@@ -87,3 +89,108 @@ func TestBatcher(gt *testing.T) {
require.False(t, isPending)
require.NotNil(t, vTx)
}
func TestL2Finalization(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
sd := e2eutils.Setup(t, dp, defaultAlloc)
log := testlog.Logger(t, log.LvlDebug)
miner, engine, sequencer := setupSequencerTest(t, sd, log)
sequencer.ActL2PipelineFull(t)
// build an empty L1 block (#1), mark it as justified
miner.ActEmptyBlock(t)
miner.ActL1SafeNext(t) // #0 -> #1
// sequencer builds L2 chain, up to and including a block that has the new L1 block as origin
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1Head(t)
sequencer.ActL2PipelineFull(t)
sequencer.ActL1SafeSignal(t)
require.Equal(t, uint64(1), sequencer.SyncStatus().SafeL1.Number)
// build another L1 block (#2), mark it as justified. And mark previous justified as finalized.
miner.ActEmptyBlock(t)
miner.ActL1SafeNext(t) // #1 -> #2
miner.ActL1FinalizeNext(t) // #0 -> #1
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1Head(t)
// continue to build L2 chain referencing the new L1 blocks
sequencer.ActL2PipelineFull(t)
sequencer.ActL1FinalizedSignal(t)
sequencer.ActL1SafeSignal(t)
require.Equal(t, uint64(2), sequencer.SyncStatus().SafeL1.Number)
require.Equal(t, uint64(1), sequencer.SyncStatus().FinalizedL1.Number)
require.Equal(t, uint64(0), sequencer.SyncStatus().FinalizedL2.Number, "L2 block has to be included on L1 before it can be finalized")
batcher := NewL2Batcher(log, sd.RollupCfg, &BatcherCfg{
MinL1TxSize: 0,
MaxL1TxSize: 128_000,
BatcherKey: dp.Secrets.Batcher,
}, sequencer.RollupClient(), miner.EthClient(), engine.EthClient())
heightToSubmit := sequencer.SyncStatus().UnsafeL2.Number
batcher.ActSubmitAll(t)
// confirm batch on L1, block #3
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
// read the batch
sequencer.ActL2PipelineFull(t)
require.Equal(t, uint64(0), sequencer.SyncStatus().FinalizedL2.Number, "Batch must be included in finalized part of L1 chain for L2 block to finalize")
// build some more L2 blocks, so there is an unsafe part again that hasn't been submitted yet
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1Head(t)
// submit those blocks too, block #4
batcher.ActSubmitAll(t)
miner.ActL1StartBlock(12)(t)
miner.ActL1IncludeTx(dp.Addresses.Batcher)(t)
miner.ActL1EndBlock(t)
// add some more L1 blocks #5, #6
miner.ActEmptyBlock(t)
miner.ActEmptyBlock(t)
// and more unsafe L2 blocks
sequencer.ActL1HeadSignal(t)
sequencer.ActBuildToL1Head(t)
// move safe/finalize markers: finalize the L1 chain block with the first batch, but not the second
miner.ActL1SafeNext(t) // #2 -> #3
miner.ActL1SafeNext(t) // #3 -> #4
miner.ActL1FinalizeNext(t) // #1 -> #2
miner.ActL1FinalizeNext(t) // #2 -> #3
sequencer.ActL2PipelineFull(t)
sequencer.ActL1FinalizedSignal(t)
sequencer.ActL1SafeSignal(t)
sequencer.ActL1HeadSignal(t)
require.Equal(t, uint64(6), sequencer.SyncStatus().HeadL1.Number)
require.Equal(t, uint64(4), sequencer.SyncStatus().SafeL1.Number)
require.Equal(t, uint64(3), sequencer.SyncStatus().FinalizedL1.Number)
require.Equal(t, heightToSubmit, sequencer.SyncStatus().FinalizedL2.Number, "finalized L2 blocks in first batch")
// need to act with the engine on the signals still
sequencer.ActL2PipelineFull(t)
engCl := engine.EngineClient(t, sd.RollupCfg)
engBlock, err := engCl.L2BlockRefByLabel(t.Ctx(), eth.Finalized)
require.NoError(t, err)
require.Equal(t, heightToSubmit, engBlock.Number, "engine finalizes what rollup node finalizes")
// Now try to finalize block 4, but with a bad/malicious alternative hash.
// If we get this false signal, we shouldn't finalize the L2 chain.
altBlock4 := sequencer.SyncStatus().SafeL1
altBlock4.Hash = common.HexToHash("0xdead")
sequencer.derivation.Finalize(altBlock4)
sequencer.ActL2PipelineFull(t)
require.Equal(t, uint64(3), sequencer.SyncStatus().FinalizedL1.Number)
require.Equal(t, heightToSubmit, sequencer.SyncStatus().FinalizedL2.Number, "unknown/bad finalized L1 blocks are ignored")
}
......@@ -118,13 +118,14 @@ func (s *L2Verifier) L2Unsafe() eth.L2BlockRef {
func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
return &eth.SyncStatus{
CurrentL1: s.derivation.Origin(),
HeadL1: s.l1State.L1Head(),
SafeL1: s.l1State.L1Safe(),
FinalizedL1: s.l1State.L1Finalized(),
UnsafeL2: s.L2Unsafe(),
SafeL2: s.L2Safe(),
FinalizedL2: s.L2Finalized(),
CurrentL1: s.derivation.Origin(),
CurrentL1Finalized: s.derivation.FinalizedL1(),
HeadL1: s.l1State.L1Head(),
SafeL1: s.l1State.L1Safe(),
FinalizedL1: s.l1State.L1Finalized(),
UnsafeL2: s.L2Unsafe(),
SafeL2: s.L2Safe(),
FinalizedL2: s.L2Finalized(),
}
}
......@@ -160,15 +161,16 @@ func (s *L2Verifier) ActL1HeadSignal(t Testing) {
}
func (s *L2Verifier) ActL1SafeSignal(t Testing) {
head, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Safe)
safe, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Safe)
require.NoError(t, err)
s.l1State.HandleNewL1SafeBlock(head)
s.l1State.HandleNewL1SafeBlock(safe)
}
func (s *L2Verifier) ActL1FinalizedSignal(t Testing) {
head, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Finalized)
finalized, err := s.l1.L1BlockRefByLabel(t.Ctx(), eth.Finalized)
require.NoError(t, err)
s.l1State.HandleNewL1FinalizedBlock(head)
s.l1State.HandleNewL1FinalizedBlock(finalized)
s.derivation.Finalize(finalized)
}
// ActL2PipelineStep runs one iteration of the L2 derivation pipeline
......
......@@ -3,10 +3,18 @@ package eth
// SyncStatus is a snapshot of the driver.
// Values may be zeroed if not yet initialized.
type SyncStatus struct {
// CurrentL1 is the block that the derivation process is currently at,
// this may not be fully derived into L2 data yet.
// CurrentL1 is the L1 block that the derivation process is currently at in the inner-most stage.
// This may not be fully derived into L2 data yet.
// The safe L2 blocks were produced/included fully from the L1 chain up to and including this L1 block.
// If the node is synced, this matches the HeadL1, minus the verifier confirmation distance.
CurrentL1 L1BlockRef `json:"current_l1"`
// CurrentL1Finalized is the L1 block that the derivation process is currently accepting as finalized
// in the inner-most stage,
// This may not be fully derived into L2 data yet.
// The finalized L2 blocks were produced/included fully from the L1 chain up to and including this L1 block.
// This may lag behind the FinalizedL1 when the FinalizedL1 could not yet be verified
// to be canonical w.r.t. the currently derived L2 chain. It may be zeroed if no block could be verified yet.
CurrentL1Finalized L1BlockRef `json:"current_l1_finalized"`
// HeadL1 is the perceived head of the L1 chain, no confirmation distance.
// The head is not guaranteed to build on the other L1 sync status fields,
// as the node may be in progress of resetting to adapt to a L1 reorg.
......
......@@ -72,7 +72,7 @@ type EngineQueue struct {
// This update may repeat if the engine returns a temporary error.
needForkchoiceUpdate bool
finalizedL1 eth.BlockID
finalizedL1 eth.L1BlockRef
safeAttributes []*eth.PayloadAttributes
unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps
......@@ -83,7 +83,7 @@ type EngineQueue struct {
engine Engine
prev NextAttributesProvider
origin eth.L1BlockRef // only used for pipeline resets
origin eth.L1BlockRef // updated on resets, and whenever we read from the previous stage.
metrics Metrics
l1Fetcher L1Fetcher
......@@ -106,6 +106,7 @@ func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics M
}
}
// Origin identifies the L1 chain (incl.) that included and/or produced all the safe L2 blocks.
func (eq *EngineQueue) Origin() eth.L1BlockRef {
return eq.origin
}
......@@ -134,9 +135,28 @@ func (eq *EngineQueue) AddSafeAttributes(attributes *eth.PayloadAttributes) {
eq.safeAttributes = append(eq.safeAttributes, attributes)
}
func (eq *EngineQueue) Finalize(l1Origin eth.BlockID) {
eq.finalizedL1 = l1Origin
eq.tryFinalizeL2()
func (eq *EngineQueue) Finalize(l1Origin eth.L1BlockRef) {
if l1Origin.Number < eq.finalizedL1.Number {
eq.log.Error("ignoring old L1 finalized block signal! Is the L1 provider corrupted?", "prev_finalized_l1", eq.finalizedL1, "signaled_finalized_l1", l1Origin)
return
}
// Perform a safety check: the L1 finalization signal is only accepted if we previously processed the L1 block.
// This prevents a corrupt L1 provider from tricking us in recognizing a L1 block inconsistent with the L1 chain we are on.
// Missing a finality signal due to empty buffer is fine, it will finalize when the buffer is filled again.
for _, fd := range eq.finalityData {
if fd.L1Block == l1Origin.ID() {
eq.finalizedL1 = l1Origin
eq.tryFinalizeL2()
return
}
}
eq.log.Warn("ignoring finalization signal for unknown L1 block, waiting for new L1 blocks in buffer", "prev_finalized_l1", eq.finalizedL1, "signaled_finalized_l1", l1Origin)
}
// FinalizedL1 identifies the L1 chain (incl.) that included and/or produced all the finalized L2 blocks.
// This may return a zeroed ID if no finalization signals have been seen yet.
func (eq *EngineQueue) FinalizedL1() eth.L1BlockRef {
return eq.finalizedL1
}
func (eq *EngineQueue) Finalized() eth.L2BlockRef {
......@@ -167,6 +187,7 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
}
outOfData := false
if len(eq.safeAttributes) == 0 {
eq.origin = eq.prev.Origin()
if next, err := eq.prev.NextAttributes(ctx, eq.safeHead); err == io.EOF {
outOfData = true
} else if err != nil {
......@@ -191,7 +212,7 @@ func (eq *EngineQueue) Step(ctx context.Context) error {
// and then marks the latest fully derived L2 block from this as finalized,
// or defaults to the current finalized L2 block.
func (eq *EngineQueue) tryFinalizeL2() {
if eq.finalizedL1 == (eth.BlockID{}) {
if eq.finalizedL1 == (eth.L1BlockRef{}) {
return // if no L1 information is finalized yet, then skip this
}
// default to keep the same finalized block
......@@ -200,6 +221,7 @@ func (eq *EngineQueue) tryFinalizeL2() {
for _, fd := range eq.finalityData {
if fd.L2Block.Number > finalizedL2.Number && fd.L1Block.Number <= eq.finalizedL1.Number {
finalizedL2 = fd.L2Block
eq.needForkchoiceUpdate = true
}
}
eq.finalized = finalizedL2
......@@ -214,11 +236,11 @@ func (eq *EngineQueue) postProcessSafeL2() {
eq.finalityData = append(eq.finalityData[:0], eq.finalityData[1:finalityLookback]...)
}
// remember the last L2 block that we fully derived from the given finality data
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.prev.Origin().Number {
if len(eq.finalityData) == 0 || eq.finalityData[len(eq.finalityData)-1].L1Block.Number < eq.origin.Number {
// append entry for new L1 block
eq.finalityData = append(eq.finalityData, FinalityData{
L2Block: eq.safeHead,
L1Block: eq.prev.Origin().ID(),
L1Block: eq.origin.ID(),
})
} else {
// if it's a now L2 block that was derived from the same latest L1 block, then just update the entry
......@@ -233,7 +255,7 @@ func (eq *EngineQueue) logSyncProgress(reason string) {
"l2_safe", eq.safeHead,
"l2_unsafe", eq.unsafeHead,
"l2_time", eq.unsafeHead.Time,
"l1_derived", eq.prev.Origin(),
"l1_derived", eq.origin,
)
}
......
......@@ -250,7 +250,7 @@ func TestEngineQueue_Finalize(t *testing.T) {
eq.postProcessSafeL2()
// let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E)
eq.Finalize(refD.ID())
eq.Finalize(refD)
require.Equal(t, refC1, eq.Finalized(), "C1 was included in finalized D, and should now be finalized")
......
......@@ -5,9 +5,10 @@ import (
"fmt"
"io"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum/go-ethereum/log"
)
type Metrics interface {
......@@ -30,13 +31,14 @@ type ResetableStage interface {
}
type EngineQueueStage interface {
FinalizedL1() eth.L1BlockRef
Finalized() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
Origin() eth.L1BlockRef
SetUnsafeHead(head eth.L2BlockRef)
Finalize(l1Origin eth.BlockID)
Finalize(l1Origin eth.L1BlockRef)
AddSafeAttributes(attributes *eth.PayloadAttributes)
AddUnsafePayload(payload *eth.ExecutionPayload)
Step(context.Context) error
......@@ -97,14 +99,22 @@ func (dp *DerivationPipeline) Reset() {
dp.resetting = 0
}
// Origin is the L1 block of the inner-most stage of the derivation pipeline,
// i.e. the L1 chain up to and including this point included and/or produced all the safe L2 blocks.
func (dp *DerivationPipeline) Origin() eth.L1BlockRef {
return dp.eng.Origin()
}
func (dp *DerivationPipeline) Finalize(l1Origin eth.BlockID) {
func (dp *DerivationPipeline) Finalize(l1Origin eth.L1BlockRef) {
dp.eng.Finalize(l1Origin)
}
// FinalizedL1 is the L1 finalization of the inner-most stage of the derivation pipeline,
// i.e. the L1 chain up to and including this point included and/or produced all the finalized L2 blocks.
func (dp *DerivationPipeline) FinalizedL1() eth.L1BlockRef {
return dp.eng.FinalizedL1()
}
func (dp *DerivationPipeline) Finalized() eth.L2BlockRef {
return dp.eng.Finalized()
}
......
......@@ -39,6 +39,7 @@ type L2Chain interface {
derive.Engine
L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error)
L2BlockRefByHash(ctx context.Context, l2Hash common.Hash) (eth.L2BlockRef, error)
L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error)
}
type DerivationPipeline interface {
......@@ -46,7 +47,8 @@ type DerivationPipeline interface {
Step(ctx context.Context) error
SetUnsafeHead(head eth.L2BlockRef)
AddUnsafePayload(payload *eth.ExecutionPayload)
Finalize(ref eth.BlockID)
Finalize(ref eth.L1BlockRef)
FinalizedL1() eth.L1BlockRef
Finalized() eth.L2BlockRef
SafeL2Head() eth.L2BlockRef
UnsafeL2Head() eth.L2BlockRef
......@@ -92,7 +94,7 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, ne
l1State: l1State,
derivation: derivationPipeline,
idleDerivation: false,
syncStatusReq: make(chan chan eth.SyncStatus, 10),
stateReq: make(chan chan struct{}),
forceReset: make(chan chan struct{}, 10),
config: cfg,
driverConfig: driverCfg,
......
......@@ -30,8 +30,8 @@ type Driver struct {
// When the derivation pipeline is waiting for new data to do anything
idleDerivation bool
// Requests for sync status. Synchronized with event loop to avoid reading an inconsistent sync status.
syncStatusReq chan chan eth.SyncStatus
// Requests for synchronized event loop execution to avoid reading an inconsistent state
stateReq chan chan struct{}
// Upon receiving a channel in this channel, the derivation pipeline is forced to be reset.
// It tells the caller that the reset occurred by closing the passed in channel.
......@@ -300,7 +300,7 @@ func (s *Driver) eventLoop() {
// no step, justified L1 information does not do anything for L2 derivation or status
case newL1Finalized := <-s.l1FinalizedSig:
s.l1State.HandleNewL1FinalizedBlock(newL1Finalized)
s.derivation.Finalize(newL1Finalized.ID())
s.derivation.Finalize(newL1Finalized)
reqStep() // we may be able to mark more L2 data as finalized now
case <-delayedStepReq:
delayedStepReq = nil
......@@ -342,16 +342,8 @@ func (s *Driver) eventLoop() {
stepAttempts = 0
reqStep() // continue with the next step if we can
}
case respCh := <-s.syncStatusReq:
respCh <- eth.SyncStatus{
CurrentL1: s.derivation.Origin(),
HeadL1: s.l1State.L1Head(),
SafeL1: s.l1State.L1Safe(),
FinalizedL1: s.l1State.L1Finalized(),
UnsafeL2: s.derivation.UnsafeL2Head(),
SafeL2: s.derivation.SafeL2Head(),
FinalizedL2: s.derivation.Finalized(),
}
case respCh := <-s.stateReq:
respCh <- struct{}{}
case respCh := <-s.forceReset:
s.log.Warn("Derivation pipeline is manually reset")
s.derivation.Reset()
......@@ -381,18 +373,41 @@ func (s *Driver) ResetDerivationPipeline(ctx context.Context) error {
}
}
func (s *Driver) syncStatus() *eth.SyncStatus {
return &eth.SyncStatus{
CurrentL1: s.derivation.Origin(),
CurrentL1Finalized: s.derivation.FinalizedL1(),
HeadL1: s.l1State.L1Head(),
SafeL1: s.l1State.L1Safe(),
FinalizedL1: s.l1State.L1Finalized(),
UnsafeL2: s.derivation.UnsafeL2Head(),
SafeL2: s.derivation.SafeL2Head(),
FinalizedL2: s.derivation.Finalized(),
}
}
func (s *Driver) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) {
respCh := make(chan eth.SyncStatus, 1)
wait := make(chan struct{})
select {
case s.stateReq <- wait:
resp := s.syncStatus()
<-wait
return resp, nil
case <-ctx.Done():
return nil, ctx.Err()
case s.syncStatusReq <- respCh:
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-respCh:
return &resp, nil
}
}
}
func (s *Driver) BlockRefWithStatus(ctx context.Context, num uint64) (eth.L2BlockRef, *eth.SyncStatus, error) {
wait := make(chan struct{})
select {
case s.stateReq <- wait:
resp := s.syncStatus()
ref, err := s.l2.L2BlockRefByNumber(ctx, num)
<-wait
return ref, resp, err
case <-ctx.Done():
return eth.L2BlockRef{}, nil, ctx.Err()
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment