package syncnode

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/rpc"
	gethrpc "github.com/ethereum/go-ethereum/rpc"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/op-node/rollup/event"
	"github.com/ethereum-optimism/optimism/op-service/eth"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/superevents"
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
	gethevent "github.com/ethereum/go-ethereum/event"
)

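// backend is the subset of the supervisor backend that a ManagedNode uses
// to query chain heads and L1 blocks.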
type backend interface {
	LocalSafe(ctx context.Context, chainID eth.ChainID) (pair types.DerivedIDPair, err error)
	LocalUnsafe(ctx context.Context, chainID eth.ChainID) (eth.BlockID, error)
	SafeDerivedAt(ctx context.Context, chainID eth.ChainID, derivedFrom eth.BlockID) (derived eth.BlockID, err error)
	Finalized(ctx context.Context, chainID eth.ChainID) (eth.BlockID, error)
	L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error)
}

const (
	internalTimeout            = time.Second * 30
	nodeTimeout                = time.Second * 10
	maxWalkBackAttempts        = 300
	blockNotFoundRPCErrCode    = -39001
	conflictingBlockRPCErrCode = -39002
)

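// ManagedNode tracks a single op-node in managed mode: it pushes supervisor
// updates (cross-unsafe, cross-safe, finalized) to the node, and turns node
// events (unsafe blocks, derivation updates, resets, exhausted L1) into
// supervisor events.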
type ManagedNode struct {
	log     log.Logger
	Node    SyncControl
	chainID eth.ChainID

	backend backend

	// nodeEvents receives updates when the node has something new for us.
	// Nil when node events are pulled synchronously.
	nodeEvents chan *types.ManagedEvent

	subscriptions []gethevent.Subscription

	emitter event.Emitter

	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

var _ event.AttachEmitter = (*ManagedNode)(nil)
var _ event.Deriver = (*ManagedNode)(nil)

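// NewManagedNode sets up a ManagedNode for the given chain.
// Unless noSubscribe is set, it subscribes to the node's event stream right away,
// and it always starts watching for subscription errors.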
func NewManagedNode(log log.Logger, id eth.ChainID, node SyncControl, backend backend, noSubscribe bool) *ManagedNode {
	ctx, cancel := context.WithCancel(context.Background())
	m := &ManagedNode{
		log:     log.New("chain", id),
		backend: backend,
		Node:    node,
		chainID: id,
		ctx:     ctx,
		cancel:  cancel,
	}
	if !noSubscribe {
		m.SubscribeToNodeEvents()
	}
	m.WatchSubscriptionErrors()
	return m
}

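// AttachEmitter implements event.AttachEmitter.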
func (m *ManagedNode) AttachEmitter(em event.Emitter) {
	m.emitter = em
}

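// OnEvent implements event.Deriver: it handles supervisor events addressed to
// this chain and forwards the resulting updates or reset signals to the node.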
func (m *ManagedNode) OnEvent(ev event.Event) bool {
	switch x := ev.(type) {
	case superevents.CrossUnsafeUpdateEvent:
		if x.ChainID != m.chainID {
			return false
		}
		m.onCrossUnsafeUpdate(x.NewCrossUnsafe)
	case superevents.CrossSafeUpdateEvent:
		if x.ChainID != m.chainID {
			return false
		}
		m.onCrossSafeUpdate(x.NewCrossSafe)
	case superevents.FinalizedL2UpdateEvent:
		if x.ChainID != m.chainID {
			return false
		}
		m.onFinalizedL2(x.FinalizedL2)
	case superevents.LocalSafeOutOfSyncEvent:
		if x.ChainID != m.chainID {
			return false
		}
		m.resetSignal(x.Err, x.L1Ref)
	// TODO: watch for reorg events from DB. Send a reset signal to op-node if needed
	default:
		return false
	}
	return true
}

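// SubscribeToNodeEvents subscribes to the node's managed-event stream,
// resubscribing when the RPC subscription fails, and falling back to polling
// if RPC subscriptions are not supported.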
func (m *ManagedNode) SubscribeToNodeEvents() {
	m.nodeEvents = make(chan *types.ManagedEvent, 10)

	// Resubscribe, since the RPC subscription might fail intermittently.
	// And fall back to polling, if RPC subscriptions are not supported.
	m.subscriptions = append(m.subscriptions, gethevent.ResubscribeErr(time.Second*10,
		func(ctx context.Context, prevErr error) (gethevent.Subscription, error) {
			if prevErr != nil {
				// This is the RPC runtime error, not the setup error we have logging for below.
				m.log.Error("RPC subscription failed, restarting now", "err", prevErr)
			}
			sub, err := m.Node.SubscribeEvents(ctx, m.nodeEvents)
			if err != nil {
				if errors.Is(err, gethrpc.ErrNotificationsUnsupported) {
					m.log.Warn("No RPC notification support detected, falling back to polling")
					// Fall back to polling if subscriptions are not supported.
					sub, err := rpc.StreamFallback[types.ManagedEvent](
						m.Node.PullEvent, time.Millisecond*100, m.nodeEvents)
					if err != nil {
						m.log.Error("Failed to start RPC stream fallback", "err", err)
						return nil, err
					}
					return sub, err
				}
				return nil, err
			}
			return sub, nil
		}))
}

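// WatchSubscriptionErrors watches each subscription and logs its first error,
// until the node shuts down.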
func (m *ManagedNode) WatchSubscriptionErrors() {
	watchSub := func(sub ethereum.Subscription) {
		defer m.wg.Done()
		select {
		case err := <-sub.Err():
			m.log.Error("Subscription error", "err", err)
		case <-m.ctx.Done():
			// we're closing, stop watching the subscription
		}
	}
	for _, sub := range m.subscriptions {
		m.wg.Add(1)
		go watchSub(sub)
	}
}

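// Start runs the loop that handles events pushed by the node, until Close is called.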
func (m *ManagedNode) Start() {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()

		for {
			select {
			case <-m.ctx.Done():
				m.log.Info("Exiting node syncing")
				return
			case ev := <-m.nodeEvents: // nil, indefinitely blocking, if no node-events subscriber is set up.
				m.onNodeEvent(ev)
			}
		}
	}()
}

// PullEvents pulls events until none are left,
// the ctx is canceled, or an error occurs while pulling an event.
func (m *ManagedNode) PullEvents(ctx context.Context) (pulledAny bool, err error) {
	for {
		ev, err := m.Node.PullEvent(ctx)
		if err != nil {
			if errors.Is(err, io.EOF) {
				// no events left
				return pulledAny, nil
			}
			return pulledAny, err
		}
		pulledAny = true
		m.onNodeEvent(ev)
	}
}

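// onNodeEvent dispatches a node event to the relevant handlers;
// a single event may carry multiple updates.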
func (m *ManagedNode) onNodeEvent(ev *types.ManagedEvent) {
	if ev == nil {
		m.log.Warn("Received nil event")
		return
	}
	if ev.Reset != nil {
		m.onResetEvent(*ev.Reset)
	}
	if ev.UnsafeBlock != nil {
		m.onUnsafeBlock(*ev.UnsafeBlock)
	}
	if ev.DerivationUpdate != nil {
		m.onDerivationUpdate(*ev.DerivationUpdate)
	}
	if ev.ExhaustL1 != nil {
		m.onExhaustL1Event(*ev.ExhaustL1)
	}
}

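// onResetEvent handles a reset error reported by the node,
// attempting to restore it to a known block.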
func (m *ManagedNode) onResetEvent(errStr string) {
	m.log.Warn("Node sent us a reset error", "err", errStr)
	if strings.Contains(errStr, "cannot continue derivation until Engine has been reset") {
		// TODO
		return
	}
	// Try to restore the safe head of the op-supervisor.
	// The node will abort the reset until we find a block that is known.
	m.resetSignal(types.ErrFuture, eth.L1BlockRef{})
}

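// onCrossUnsafeUpdate pushes a new cross-unsafe head to the node.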
func (m *ManagedNode) onCrossUnsafeUpdate(seal types.BlockSeal) {
	m.log.Debug("updating cross unsafe", "crossUnsafe", seal)
	ctx, cancel := context.WithTimeout(m.ctx, nodeTimeout)
	defer cancel()
	id := seal.ID()
	err := m.Node.UpdateCrossUnsafe(ctx, id)
	if err != nil {
		m.log.Warn("Node failed cross-unsafe updating", "err", err)
		return
	}
}

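// onCrossSafeUpdate pushes a new cross-safe head to the node.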
func (m *ManagedNode) onCrossSafeUpdate(pair types.DerivedBlockSealPair) {
	m.log.Debug("updating cross safe", "derived", pair.Derived, "derivedFrom", pair.DerivedFrom)
	ctx, cancel := context.WithTimeout(m.ctx, nodeTimeout)
	defer cancel()
	pairIDs := pair.IDs()
	err := m.Node.UpdateCrossSafe(ctx, pairIDs.Derived, pairIDs.DerivedFrom)
	if err != nil {
		m.log.Warn("Node failed cross-safe updating", "err", err)
		return
	}
}

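// onFinalizedL2 pushes a new finalized L2 head to the node.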
func (m *ManagedNode) onFinalizedL2(seal types.BlockSeal) {
	m.log.Info("updating finalized L2", "finalized", seal)
	ctx, cancel := context.WithTimeout(m.ctx, nodeTimeout)
	defer cancel()
	id := seal.ID()
	err := m.Node.UpdateFinalized(ctx, id)
	if err != nil {
		m.log.Warn("Node failed finality updating", "err", err)
		return
	}
}

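// onUnsafeBlock emits the node's new local-unsafe block to the supervisor.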
func (m *ManagedNode) onUnsafeBlock(unsafeRef eth.BlockRef) {
	m.log.Info("Node has new unsafe block", "unsafeBlock", unsafeRef)
	m.emitter.Emit(superevents.LocalUnsafeReceivedEvent{
		ChainID:        m.chainID,
		NewLocalUnsafe: unsafeRef,
	})
}

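// onDerivationUpdate emits the node's newly derived local-safe block to the supervisor.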
func (m *ManagedNode) onDerivationUpdate(pair types.DerivedBlockRefPair) {
	m.log.Info("Node derived new block", "derived", pair.Derived,
		"derivedParent", pair.Derived.ParentID(), "derivedFrom", pair.DerivedFrom)
	m.emitter.Emit(superevents.LocalDerivedEvent{
		ChainID: m.chainID,
		Derived: pair,
	})
	// TODO: keep synchronous local-safe DB update feedback?
	// We'll still need more async ways of doing this for reorg handling.

	// ctx, cancel := context.WithTimeout(m.ctx, internalTimeout)
	// defer cancel()
	// if err := m.backend.UpdateLocalSafe(ctx, m.chainID, pair.DerivedFrom, pair.Derived); err != nil {
	//	m.log.Warn("Backend failed to process local-safe update",
	//		"derived", pair.Derived, "derivedFrom", pair.DerivedFrom, "err", err)
	//	m.resetSignal(err, pair.DerivedFrom)
	// }
}

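// resetSignal determines how to reset the node for the given error, using the
// supervisor's local-unsafe, local-safe and finalized heads as reference points.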
func (m *ManagedNode) resetSignal(errSignal error, l1Ref eth.BlockRef) {
	// if conflict error -> send reset to drop
	// if future error -> send reset to rewind
	// if out of order -> warn, just old data
	ctx, cancel := context.WithTimeout(m.ctx, internalTimeout)
	defer cancel()
	u, err := m.backend.LocalUnsafe(ctx, m.chainID)
	if err != nil {
		m.log.Warn("Failed to retrieve local-unsafe", "err", err)
		return
	}
	f, err := m.backend.Finalized(ctx, m.chainID)
	if err != nil {
		m.log.Warn("Failed to retrieve finalized", "err", err)
		return
	}

	// Fix finalized to point to an L2 block that the L2 node knows about.
	// Conceptually: track the last block known to the node (based on unsafe block updates) as the upper bound for resets.
	// Then, when a reset fails, lower the last known block
	// (and prevent it from being changed by the subscription until a reset succeeds), and repeat.

	// TODO: this is very very broken

	// TODO: errors.As switch
	switch errSignal {
	case types.ErrConflict:
		if err := m.resolveConflict(ctx, l1Ref, u, f); err != nil {
			m.log.Warn("Failed to resolve conflict", "unsafe", u, "finalized", f, "err", err)
			return
		}

	case types.ErrFuture:
		s, err := m.backend.LocalSafe(ctx, m.chainID)
		if err != nil {
			m.log.Warn("Failed to retrieve local-safe", "err", err)
		}
		m.log.Debug("Node detected future block, resetting", "unsafe", u, "safe", s, "finalized", f)
		err = m.Node.Reset(ctx, u, s.Derived, f)
		if err != nil {
			m.log.Warn("Node failed to reset", "err", err)
		}
	case types.ErrOutOfOrder:
		m.log.Warn("Node detected out of order block", "unsafe", u, "finalized", f)
	}
}

// resolveConflict attempts to reset the node to a valid state when a conflict is detected.
// It first tries using the latest safe block, and if that fails, walks back block by block
// until it finds a common ancestor or reaches the finalized block.
func (m *ManagedNode) resolveConflict(ctx context.Context, l1Ref eth.BlockRef, u eth.BlockID, f eth.BlockID) error {
	// First try to reset to the last known safe block
	s, err := m.backend.SafeDerivedAt(ctx, m.chainID, l1Ref.ID())
	if err != nil {
		return fmt.Errorf("failed to retrieve safe block for %v: %w", l1Ref.ID(), err)
	}

	// Helper to attempt a reset and classify the error
	tryReset := func(safe eth.BlockID) (resolved bool, needsWalkback bool, err error) {
		m.log.Debug("Attempting reset", "unsafe", u, "safe", safe, "finalized", f)
		if err := m.Node.Reset(ctx, u, safe, f); err == nil {
			return true, false, nil
		} else {
			var rpcErr *gethrpc.JsonError
			if errors.As(err, &rpcErr) && (rpcErr.Code == blockNotFoundRPCErrCode || rpcErr.Code == conflictingBlockRPCErrCode) {
				return false, true, err
			}
			return false, false, err
		}
	}

	// Try initial reset
	resolved, needsWalkback, err := tryReset(s)
	if resolved {
		return nil
	}
	if !needsWalkback {
		return fmt.Errorf("error during reset: %w", err)
	}

	// Walk back one block at a time looking for a common ancestor
	currentBlock := s.Number
	for i := 0; i < maxWalkBackAttempts; i++ {
		currentBlock--
		if currentBlock <= f.Number {
			return fmt.Errorf("reached finalized block %d without finding common ancestor", f.Number)
		}

		safe, err := m.backend.SafeDerivedAt(ctx, m.chainID, eth.BlockID{Number: currentBlock})
		if err != nil {
			return fmt.Errorf("failed to retrieve safe block %d: %w", currentBlock, err)
		}

		resolved, _, err := tryReset(safe)
		if resolved {
			return nil
		}
		// Keep walking back on block-not-found or conflicting-block errors; return any other error.
		var rpcErr *gethrpc.JsonError
		if !errors.As(err, &rpcErr) || (rpcErr.Code != blockNotFoundRPCErrCode && rpcErr.Code != conflictingBlockRPCErrCode) {
			return fmt.Errorf("error during reset at block %d: %w", currentBlock, err)
		}
	}
	return fmt.Errorf("exceeded maximum walk-back attempts (%d)", maxWalkBackAttempts)
}

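// onExhaustL1Event feeds the node the next L1 block after it has exhausted
// its current L1 data, if that block is already available.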
func (m *ManagedNode) onExhaustL1Event(completed types.DerivedBlockRefPair) {
	m.log.Info("Node completed syncing", "l2", completed.Derived, "l1", completed.DerivedFrom)

	internalCtx, cancel := context.WithTimeout(m.ctx, internalTimeout)
	defer cancel()
	nextL1, err := m.backend.L1BlockRefByNumber(internalCtx, completed.DerivedFrom.Number+1)
	if err != nil {
		if errors.Is(err, ethereum.NotFound) {
			m.log.Debug("Next L1 block is not yet available", "l1Block", completed.DerivedFrom, "err", err)
			return
		}
		m.log.Error("Failed to retrieve next L1 block for node", "l1Block", completed.DerivedFrom, "err", err)
		return
	}

	nodeCtx, cancel := context.WithTimeout(m.ctx, nodeTimeout)
	defer cancel()
	if err := m.Node.ProvideL1(nodeCtx, nextL1); err != nil {
		m.log.Warn("Failed to provide next L1 block to node", "err", err)
		// We will reset the node if we receive a reset-event from it,
		// which is fired if the provided L1 block was received successfully,
		// but does not fit on the derivation state.
		return
	}
}

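// Close stops the event loop and unsubscribes from all node subscriptions.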
func (m *ManagedNode) Close() error {
	m.cancel()
	m.wg.Wait() // wait for work to complete

	// Now close all subscriptions, since we don't use them anymore.
	for _, sub := range m.subscriptions {
		sub.Unsubscribe()
	}
	return nil
}