package driver

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	gosync "sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-node/rollup"
	"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-service/retry"
)

// Deprecated: use eth.SyncStatus instead.
type SyncStatus = eth.SyncStatus

// sealingDuration defines the expected time it takes to seal the block
const sealingDuration = time.Millisecond * 50

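// Driver coordinates the rollup node's derivation pipeline, the (optional) sequencer,
// and the L1/L2 signals it consumes, all from a single event loop.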
type Driver struct {
	l1State L1StateIface

	// The derivation pipeline is reset whenever we reorg.
	// The derivation pipeline determines the new l2Safe.
	derivation DerivationPipeline

	// Requests to block the event loop for synchronous execution to avoid reading an inconsistent state
	stateReq chan chan struct{}

	// Upon receiving a channel in this channel, the derivation pipeline is forced to be reset.
	// It tells the caller that the reset occurred by closing the passed in channel.
	forceReset chan chan struct{}

	// Upon receiving a hash in this channel, the sequencer is started at the given hash.
	// It tells the caller that the sequencer started by closing the passed in channel (or returning an error).
	startSequencer chan hashAndErrorChannel

	// Upon receiving a channel in this channel, the sequencer is stopped.
	// It tells the caller that the sequencer stopped by returning the latest sequenced L2 block hash.
	stopSequencer chan chan hashAndError

	// Upon receiving a channel in this channel, the current sequencer status is queried.
	// It tells the caller the status by outputting a boolean to the provided channel:
	// true when the sequencer is active, false when it is not.
	sequencerActive chan chan bool

	// sequencerNotifs is notified when the sequencer is started or stopped
	sequencerNotifs SequencerStateListener

	// Rollup config: rollup chain configuration
	config *rollup.Config

	// Driver config: verifier and sequencer settings
	driverConfig *Config

	// L1 Signals:
	//
	// Not all L1 blocks, or all changes, have to be signalled:
	// the derivation process traverses the chain and handles reorgs as necessary,
	// the driver just needs to be aware of the *latest* signals, enough to not
	// lag behind actionable data.
	l1HeadSig      chan eth.L1BlockRef
	l1SafeSig      chan eth.L1BlockRef
	l1FinalizedSig chan eth.L1BlockRef

	// Interface to signal the L2 block range to sync.
	altSync AltSync

	// L2 Signals:

	unsafeL2Payloads chan *eth.ExecutionPayload

	l1        L1Chain
	l2        L2Chain
	sequencer SequencerIface
	network   Network // may be nil, network is optional

	metrics     Metrics
	log         log.Logger
	snapshotLog log.Logger
	done        chan struct{}

	wg gosync.WaitGroup
}

// Start starts up the state loop.
// The loop will have been started iff err is nil.
func (s *Driver) Start() error {
	s.derivation.Reset()

	s.log.Info("Starting driver", "sequencerEnabled", s.driverConfig.SequencerEnabled, "sequencerStopped", s.driverConfig.SequencerStopped)
	if s.driverConfig.SequencerEnabled {
		// Notify the initial sequencer state
		// This ensures persistence can write the state correctly and that the state file exists
		var err error
		if s.driverConfig.SequencerStopped {
			err = s.sequencerNotifs.SequencerStopped()
		} else {
			err = s.sequencerNotifs.SequencerStarted()
		}
		if err != nil {
			return fmt.Errorf("persist initial sequencer state: %w", err)
		}
	}

	s.wg.Add(1)
	go s.eventLoop()

	return nil
}

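// Close signals the event loop to exit and blocks until it has shut down.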
func (s *Driver) Close() error {
	s.done <- struct{}{}
	s.wg.Wait()
	return nil
}

// OnL1Head signals the driver that the L1 chain changed the "unsafe" block,
// also known as head of the chain, or "latest".
func (s *Driver) OnL1Head(ctx context.Context, unsafe eth.L1BlockRef) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case s.l1HeadSig <- unsafe:
		return nil
	}
}

// OnL1Safe signals the driver that the L1 chain changed the "safe",
// also known as the justified checkpoint (as seen on L1 beacon-chain).
func (s *Driver) OnL1Safe(ctx context.Context, safe eth.L1BlockRef) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case s.l1SafeSig <- safe:
		return nil
	}
}

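// OnL1Finalized signals the driver that the L1 chain changed the "finalized" block.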
func (s *Driver) OnL1Finalized(ctx context.Context, finalized eth.L1BlockRef) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case s.l1FinalizedSig <- finalized:
		return nil
	}
}

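// OnUnsafeL2Payload queues an unsafe L2 execution payload (e.g. received via p2p gossip or alt-sync) for the event loop to process.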
func (s *Driver) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case s.unsafeL2Payloads <- payload:
		return nil
	}
}

// eventLoop responds to L1 changes and internal timers to produce L2 blocks.
func (s *Driver) eventLoop() {
	defer s.wg.Done()
	s.log.Info("State loop started")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// stepReqCh is used to request that the driver attempts to step forward by one L1 block.
	stepReqCh := make(chan struct{}, 1)

	// delayedStepReq is nil by default (not firing), but is used to schedule re-attempts with a delay
	var delayedStepReq <-chan time.Time

	// keep track of consecutive failed attempts, to adjust the backoff time accordingly
	bOffStrategy := retry.Exponential()
	stepAttempts := 0

	// step requests a derivation step to be taken. Won't deadlock if the channel is full.
	step := func() {
		select {
		case stepReqCh <- struct{}{}:
		// Don't deadlock if the channel is already full
		default:
		}
	}

	// reqStep requests a derivation step nicely, with a delay if this is a reattempt, or not at all if we already scheduled a reattempt.
	reqStep := func() {
		if stepAttempts > 0 {
			// if this is not the first attempt, we re-schedule with a backoff, *without blocking other events*
			if delayedStepReq == nil {
				delay := bOffStrategy.Duration(stepAttempts)
				s.log.Debug("scheduling re-attempt with delay", "attempts", stepAttempts, "delay", delay)
				delayedStepReq = time.After(delay)
			} else {
				s.log.Debug("ignoring step request, already scheduled re-attempt after previous failure", "attempts", stepAttempts)
			}
		} else {
			step()
		}
	}

	// We call reqStep right away to finish syncing to the tip of the chain if we're behind.
	// reqStep will also be triggered when the L1 head moves forward or if there was a reorg on the
	// L1 chain that we need to handle.
	reqStep()

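	// The sequencer is woken up through sequencerCh, which is fed by sequencerTimer;
	// planSequencerAction re-arms the timer with the delay suggested by the sequencer itself.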
	sequencerTimer := time.NewTimer(0)
	var sequencerCh <-chan time.Time
	planSequencerAction := func() {
		delay := s.sequencer.PlanNextSequencerAction()
		sequencerCh = sequencerTimer.C
		if len(sequencerCh) > 0 { // drain any stale tick before resetting the timer
			<-sequencerCh
		}
		sequencerTimer.Reset(delay)
	}

	// Create a ticker to check if there is a gap in the engine queue. Whenever
	// there is, we send requests to the sync source to retrieve the missing payloads.
	syncCheckInterval := time.Duration(s.config.BlockTime) * time.Second * 2
	altSyncTicker := time.NewTicker(syncCheckInterval)
	defer altSyncTicker.Stop()
	lastUnsafeL2 := s.derivation.UnsafeL2Head()

	for {
		// If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action.
		// This may adjust at any time based on fork-choice changes or previous errors.
		// And avoid sequencing if the derivation pipeline indicates the engine is not ready.
		if s.driverConfig.SequencerEnabled && !s.driverConfig.SequencerStopped &&
			s.l1State.L1Head() != (eth.L1BlockRef{}) && s.derivation.EngineReady() {
			if s.driverConfig.SequencerMaxSafeLag > 0 && s.derivation.SafeL2Head().Number+s.driverConfig.SequencerMaxSafeLag <= s.derivation.UnsafeL2Head().Number {
				// If the safe head has fallen behind by a significant number of blocks, delay creating new blocks
				// until the safe lag is below SequencerMaxSafeLag.
				if sequencerCh != nil {
					s.log.Warn(
						"Delay creating new block since safe lag exceeds limit",
						"safe_l2", s.derivation.SafeL2Head(),
						"unsafe_l2", s.derivation.UnsafeL2Head(),
					)
					sequencerCh = nil
				}
			} else if s.sequencer.BuildingOnto().ID() != s.derivation.UnsafeL2Head().ID() {
				// If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action.
				// This may adjust at any time based on fork-choice changes or previous errors.
				//
				// update sequencer time if the head changed
				planSequencerAction()
			}
		} else {
			sequencerCh = nil
		}

		// If the engine is not ready, or if the L2 head is actively changing, then reset the alt-sync:
		// there is no need to request L2 blocks when we are syncing already.
		if head := s.derivation.UnsafeL2Head(); head != lastUnsafeL2 || !s.derivation.EngineReady() {
			lastUnsafeL2 = head
			altSyncTicker.Reset(syncCheckInterval)
		}

		select {
		case <-sequencerCh:
			payload, err := s.sequencer.RunNextSequencerAction(ctx)
			if err != nil {
				s.log.Error("Sequencer critical error", "err", err)
				return
			}
			if s.network != nil && payload != nil {
				// Publishing of unsafe data via p2p is optional.
				// Errors are not severe enough to change/halt sequencing but should be logged and metered.
				if err := s.network.PublishL2Payload(ctx, payload); err != nil {
					s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err)
					s.metrics.RecordPublishingError()
				}
			}
			planSequencerAction() // schedule the next sequencer action to keep the sequencing looping
		case <-altSyncTicker.C:
			// Check if there is a gap in the current unsafe payload queue.
			ctx, cancel := context.WithTimeout(ctx, time.Second*2)
			err := s.checkForGapInUnsafeQueue(ctx)
			cancel()
			if err != nil {
				s.log.Warn("failed to check for unsafe L2 blocks to sync", "err", err)
			}
		case payload := <-s.unsafeL2Payloads:
			s.snapshot("New unsafe payload")
			s.log.Info("Optimistically queueing unsafe L2 execution payload", "id", payload.ID())
			s.derivation.AddUnsafePayload(payload)
			s.metrics.RecordReceivedUnsafePayload(payload)
			reqStep()

		case newL1Head := <-s.l1HeadSig:
			s.l1State.HandleNewL1HeadBlock(newL1Head)
			reqStep() // a new L1 head may mean we have the data to not get an EOF again.
		case newL1Safe := <-s.l1SafeSig:
			s.l1State.HandleNewL1SafeBlock(newL1Safe)
			// no step, justified L1 information does not do anything for L2 derivation or status
		case newL1Finalized := <-s.l1FinalizedSig:
			s.l1State.HandleNewL1FinalizedBlock(newL1Finalized)
			s.derivation.Finalize(newL1Finalized)
			reqStep() // we may be able to mark more L2 data as finalized now
		case <-delayedStepReq:
			delayedStepReq = nil
			step()
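		// A derivation step can end in one of several ways, each handled below:
		// io.EOF (idle, all available data consumed), EngineP2PSyncing (engine still syncing),
		// ErrReset (pipeline reset, e.g. after a reorg), ErrTemporary (retry with backoff),
		// ErrCritical (halt), NotEnoughData (retry immediately), or success (keep stepping).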
		case <-stepReqCh:
			s.metrics.SetDerivationIdle(false)
			s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts)
			err := s.derivation.Step(context.Background())
			stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress.
			if err == io.EOF {
				s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin(), "err", err)
				stepAttempts = 0
				s.metrics.SetDerivationIdle(true)
				continue
			} else if err != nil && errors.Is(err, derive.EngineP2PSyncing) {
				s.log.Debug("Derivation process went idle because the engine is syncing", "progress", s.derivation.Origin(), "sync_target", s.derivation.EngineSyncTarget(), "err", err)
				stepAttempts = 0
				s.metrics.SetDerivationIdle(true)
				continue
			} else if err != nil && errors.Is(err, derive.ErrReset) {
				// If the pipeline corrupts, e.g. due to a reorg, simply reset it
				s.log.Warn("Derivation pipeline is reset", "err", err)
				s.derivation.Reset()
				s.metrics.RecordPipelineReset()
				continue
			} else if err != nil && errors.Is(err, derive.ErrTemporary) {
				s.log.Warn("Derivation process temporary error", "attempts", stepAttempts, "err", err)
				reqStep()
				continue
			} else if err != nil && errors.Is(err, derive.ErrCritical) {
				s.log.Error("Derivation process critical error", "err", err)
				return
			} else if err != nil && errors.Is(err, derive.NotEnoughData) {
				stepAttempts = 0 // don't do a backoff for this error
				reqStep()
				continue
			} else if err != nil {
				s.log.Error("Derivation process error", "attempts", stepAttempts, "err", err)
				reqStep()
				continue
			} else {
				stepAttempts = 0
				reqStep() // continue with the next step if we can
			}
		case respCh := <-s.stateReq:
			respCh <- struct{}{}
		case respCh := <-s.forceReset:
			s.log.Warn("Derivation pipeline is manually reset")
			s.derivation.Reset()
			s.metrics.RecordPipelineReset()
			close(respCh)
		case resp := <-s.startSequencer:
			unsafeHead := s.derivation.UnsafeL2Head().Hash
			if !s.driverConfig.SequencerStopped {
				resp.err <- errors.New("sequencer already running")
			} else if !bytes.Equal(unsafeHead[:], resp.hash[:]) {
				resp.err <- fmt.Errorf("block hash does not match: head %s, received %s", unsafeHead.String(), resp.hash.String())
			} else {
				if err := s.sequencerNotifs.SequencerStarted(); err != nil {
					resp.err <- fmt.Errorf("sequencer start notification: %w", err)
					continue
				}
				s.log.Info("Sequencer has been started")
				s.driverConfig.SequencerStopped = false
				close(resp.err)
				planSequencerAction() // resume sequencing
			}
		case respCh := <-s.stopSequencer:
			if s.driverConfig.SequencerStopped {
				respCh <- hashAndError{err: errors.New("sequencer not running")}
			} else {
				if err := s.sequencerNotifs.SequencerStopped(); err != nil {
					respCh <- hashAndError{err: fmt.Errorf("sequencer stop notification: %w", err)}
					continue
				}
				s.log.Warn("Sequencer has been stopped")
				s.driverConfig.SequencerStopped = true
				respCh <- hashAndError{hash: s.derivation.UnsafeL2Head().Hash}
			}
		case respCh := <-s.sequencerActive:
			respCh <- !s.driverConfig.SequencerStopped
		case <-s.done:
			return
		}
	}
}

// ResetDerivationPipeline forces a reset of the derivation pipeline.
// It waits for the reset to occur. It simply unblocks the caller rather
// than fully cancelling the reset request upon a context cancellation.
func (s *Driver) ResetDerivationPipeline(ctx context.Context) error {
	respCh := make(chan struct{}, 1)
	select {
	case <-ctx.Done():
		return ctx.Err()
	case s.forceReset <- respCh:
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-respCh:
			return nil
		}
	}
}

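// StartSequencer starts sequencing on top of the given L2 block hash.
// It returns an error if the sequencer is not enabled, is already running,
// or if the hash does not match the current unsafe L2 head.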
func (s *Driver) StartSequencer(ctx context.Context, blockHash common.Hash) error {
	if !s.driverConfig.SequencerEnabled {
		return errors.New("sequencer is not enabled")
	}
	h := hashAndErrorChannel{
		hash: blockHash,
		err:  make(chan error, 1),
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case s.startSequencer <- h:
		select {
		case <-ctx.Done():
			return ctx.Err()
		case e := <-h.err:
			return e
		}
	}
}

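// StopSequencer stops sequencing and returns the hash of the last sequenced (unsafe) L2 block.
// It returns an error if the sequencer is not enabled or not currently running.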
func (s *Driver) StopSequencer(ctx context.Context) (common.Hash, error) {
	if !s.driverConfig.SequencerEnabled {
		return common.Hash{}, errors.New("sequencer is not enabled")
	}
	respCh := make(chan hashAndError, 1)
	select {
	case <-ctx.Done():
		return common.Hash{}, ctx.Err()
	case s.stopSequencer <- respCh:
		select {
		case <-ctx.Done():
			return common.Hash{}, ctx.Err()
		case he := <-respCh:
			return he.hash, he.err
		}
	}
}

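// SequencerActive reports whether the sequencer is actively producing blocks.
// It always reports false when the sequencer is not enabled.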
func (s *Driver) SequencerActive(ctx context.Context) (bool, error) {
	if !s.driverConfig.SequencerEnabled {
		return false, nil
	}
	respCh := make(chan bool, 1)
	select {
	case <-ctx.Done():
		return false, ctx.Err()
	case s.sequencerActive <- respCh:
		select {
		case <-ctx.Done():
			return false, ctx.Err()
		case active := <-respCh:
			return active, nil
		}
	}
}

// syncStatus returns the current sync status, and should only be called synchronously with
// the driver event loop to avoid retrieval of an inconsistent status.
func (s *Driver) syncStatus() *eth.SyncStatus {
	return &eth.SyncStatus{
		CurrentL1:          s.derivation.Origin(),
		CurrentL1Finalized: s.derivation.FinalizedL1(),
		HeadL1:             s.l1State.L1Head(),
		SafeL1:             s.l1State.L1Safe(),
		FinalizedL1:        s.l1State.L1Finalized(),
		UnsafeL2:           s.derivation.UnsafeL2Head(),
		SafeL2:             s.derivation.SafeL2Head(),
		FinalizedL2:        s.derivation.Finalized(),
		UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
		EngineSyncTarget:   s.derivation.EngineSyncTarget(),
	}
}

// SyncStatus blocks the driver event loop and captures the syncing status.
// If the event loop is too busy and the context expires, a context error is returned.
func (s *Driver) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) {
	wait := make(chan struct{})
	select {
	case s.stateReq <- wait:
		resp := s.syncStatus()
		<-wait
		return resp, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// BlockRefWithStatus blocks the driver event loop and captures the syncing status,
// along with an L2 block reference by number consistent with that same status.
// If the event loop is too busy and the context expires, a context error is returned.
func (s *Driver) BlockRefWithStatus(ctx context.Context, num uint64) (eth.L2BlockRef, *eth.SyncStatus, error) {
	wait := make(chan struct{})
	select {
	case s.stateReq <- wait:
		resp := s.syncStatus()
		ref, err := s.l2.L2BlockRefByNumber(ctx, num)
		<-wait
		return ref, resp, err
	case <-ctx.Done():
		return eth.L2BlockRef{}, nil, ctx.Err()
	}
}

// deferJSONString helps avoid a JSON-encoding performance hit if the snapshot logger does not run
type deferJSONString struct {
	x any
}

func (v deferJSONString) String() string {
	out, _ := json.Marshal(v.x)
	return string(out)
}

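// snapshot logs the current rollup state to the snapshot logger,
// deferring JSON encoding until the log entry is actually emitted.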
func (s *Driver) snapshot(event string) {
	s.snapshotLog.Info("Rollup State Snapshot",
		"event", event,
		"l1Head", deferJSONString{s.l1State.L1Head()},
		"l1Current", deferJSONString{s.derivation.Origin()},
		"l2Head", deferJSONString{s.derivation.UnsafeL2Head()},
		"l2Safe", deferJSONString{s.derivation.SafeL2Head()},
		"l2FinalizedHead", deferJSONString{s.derivation.Finalized()})
}

// hashAndError pairs an L2 block hash with an error, as used to answer stop-sequencer requests.
type hashAndError struct {
	hash common.Hash
	err  error
}

// hashAndErrorChannel pairs an L2 block hash with an error channel, as used to submit start-sequencer requests.
type hashAndErrorChannel struct {
	hash common.Hash
	err  chan error
}

// checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from an alt-sync method.
// WARNING: This is only an outgoing signal, the blocks are not guaranteed to be retrieved.
// Results are received through OnUnsafeL2Payload.
func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error {
	start := s.derivation.UnsafeL2Head()
	end := s.derivation.UnsafeL2SyncTarget()
	// Check if we have missing blocks between the start and end. Request them if we do.
	if end == (eth.L2BlockRef{}) {
		s.log.Debug("requesting sync with open-end range", "start", start)
		return s.altSync.RequestL2Range(ctx, start, eth.L2BlockRef{})
	} else if end.Number > start.Number+1 {
		s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end.Number-start.Number)
		return s.altSync.RequestL2Range(ctx, start, end)
	}
	return nil
}