Commit 13fe565a authored by Diederik Loerakker's avatar Diederik Loerakker Committed by GitHub

Bedrock: fix new batcher, use sync status api, start from safe head (#2897)

* op-node: syncstatus RPC endpoint

* op-proposer: extend RPC client bindings with sync status and version endpoint

* op-batcher: batch submitting starting from safe head, up to unsafe head

* fix output frame size

* bedrock: fix import style, add SyncStatus doc comments
parent ec3fc335
......@@ -15,6 +15,7 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/sequencer"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-proposer/rollupclient"
"github.com/ethereum-optimism/optimism/op-proposer/txmgr"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
......@@ -23,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
hdwallet "github.com/miguelmota/go-ethereum-hdwallet"
"github.com/urfave/cli"
)
......@@ -100,7 +102,7 @@ type BatchSubmitter struct {
ctx context.Context
cancel context.CancelFunc
l2HeadNumber uint64
lastSubmittedBlock eth.BlockID
ch *derive.ChannelOut
}
......@@ -143,6 +145,11 @@ func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) {
return nil, err
}
rollupClient, err := dialRollupClientWithTimeout(ctx, cfg.RollupRpc)
if err != nil {
return nil, err
}
chainID, err := l1Client.ChainID(ctx)
if err != nil {
return nil, err
......@@ -162,6 +169,7 @@ func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) {
Name: "Batch Submitter",
L1Client: l1Client,
L2Client: l2Client,
RollupNode: rollupClient,
MinL1TxSize: cfg.MinL1TxSize,
MaxL1TxSize: cfg.MaxL1TxSize,
BatchInboxAddress: batchInboxAddress,
......@@ -209,25 +217,30 @@ mainLoop:
// Do the simplest thing of one channel per range of blocks since the iteration of this loop.
// The channel is closed at the end of this loop (to avoid lifecycle management of the channel).
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
head, err := l.cfg.L2Client.BlockByNumber(ctx, nil)
syncStatus, err := l.cfg.RollupNode.SyncStatus(ctx)
cancel()
if err != nil {
l.log.Error("issue fetching L2 head", "err", err)
continue
}
l.log.Info("Got new L2 Block", "block", head.Number())
if head.NumberU64() <= l.l2HeadNumber {
// Didn't advance
l.log.Trace("Old block")
l.log.Info("Got new L2 sync status", "safe_head", syncStatus.SafeL2, "unsafe_head", syncStatus.UnsafeL2, "last_submitted", l.lastSubmittedBlock)
if syncStatus.SafeL2.Number >= syncStatus.UnsafeL2.Number {
l.log.Trace("No unsubmitted blocks from sequencer")
continue
}
// the lastSubmittedBlock may be zeroed, or just lag behind. If it's lagging behind, catch it up.
if l.lastSubmittedBlock.Number < syncStatus.SafeL2.Number {
l.log.Warn("last submitted block lagged behind L2 safe head: batch submission will continue from the safe head now", "last", l.lastSubmittedBlock, "safe", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
}
if ch, err := derive.NewChannelOut(uint64(time.Now().Unix())); err != nil {
l.log.Error("Error creating channel", "err", err)
continue
} else {
l.ch = ch
}
for i := l.l2HeadNumber + 1; i <= head.NumberU64(); i++ {
prevID := l.lastSubmittedBlock
for i := l.lastSubmittedBlock.Number + 1; i <= syncStatus.UnsafeL2.Number; i++ {
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
block, err := l.cfg.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(i))
cancel()
......@@ -235,15 +248,18 @@ mainLoop:
l.log.Error("issue fetching L2 block", "err", err)
continue mainLoop
}
if block.ParentHash() != prevID.Hash {
l.log.Error("detected a reorg in L2 chain vs previous submitted information, resetting to safe head now", "safe_head", syncStatus.SafeL2)
l.lastSubmittedBlock = syncStatus.SafeL2.ID()
continue mainLoop
}
if err := l.ch.AddBlock(block); err != nil {
l.log.Error("issue adding L2 Block to the channel", "err", err, "channel_id", l.ch.ID())
continue mainLoop
}
l.log.Info("added L2 block to channel", "block", eth.BlockID{Hash: block.Hash(), Number: block.NumberU64()}, "channel_id", l.ch.ID(), "tx_count", len(block.Transactions()), "time", block.Time())
prevID = eth.BlockID{Hash: block.Hash(), Number: block.NumberU64()}
l.log.Info("added L2 block to channel", "block", prevID, "channel_id", l.ch.ID(), "tx_count", len(block.Transactions()), "time", block.Time())
}
// TODO: above there are ugly "continue mainLoop" because we shouldn't progress if we're missing blocks, since the submitter logic can't handle gaps yet.
l.l2HeadNumber = head.NumberU64()
if err := l.ch.Close(); err != nil {
l.log.Error("issue getting adding L2 Block", "err", err)
continue
......@@ -254,7 +270,8 @@ mainLoop:
data := new(bytes.Buffer)
data.WriteByte(derive.DerivationVersion0)
done := false
if err := l.ch.OutputFrame(data, l.cfg.MaxL1TxSize); err == io.EOF {
// subtract one, to account for the version byte
if err := l.ch.OutputFrame(data, l.cfg.MaxL1TxSize-1); err == io.EOF {
done = true
} else if err != nil {
l.log.Error("error outputting frame", "err", err)
......@@ -306,6 +323,13 @@ mainLoop:
break // local do-while loop
}
}
// TODO: if we exit to the mainLoop early on an error,
// it would be nice if we can determine which blocks are still readable from the partially submitted data.
// We can open a channel-in-reader, parse the data up to which we managed to submit it,
// and then take the block hash (if we remember which blocks we put in the channel)
//
// Now we just continue batch submission from the end of the channel.
l.lastSubmittedBlock = prevID
case <-l.done:
return
......@@ -394,6 +418,21 @@ func dialEthClientWithTimeout(ctx context.Context, url string) (
return ethclient.DialContext(ctxt, url)
}
// dialRollupClientWithTimeout attempts to dial the RPC provider using the provided
// URL. If the dial doesn't complete within defaultDialTimeout seconds, this
// method will return an error.
func dialRollupClientWithTimeout(ctx context.Context, url string) (*rollupclient.RollupClient, error) {
ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
client, err := rpc.DialContext(ctxt, url)
if err != nil {
return nil, err
}
return rollupclient.NewRollupClient(client), nil
}
// parseAddress parses an ETH address from a hex string. This method will fail if
// the address is not a valid hexadecimal address.
func parseAddress(address string) (common.Address, error) {
......
......@@ -14,9 +14,12 @@ type Config struct {
// L1EthRpc is the HTTP provider URL for L1.
L1EthRpc string
// L2EthRpc is the HTTP provider URL for the rollup node.
// L2EthRpc is the HTTP provider URL for the L2 execution engine.
L2EthRpc string
// RollupRpc is the HTTP provider URL for the L2 rollup node.
RollupRpc string
// MinL1TxSize is the minimum size of a batch tx submitted to L1.
MinL1TxSize uint64
......@@ -73,6 +76,7 @@ func NewConfig(ctx *cli.Context) Config {
/* Required Flags */
L1EthRpc: ctx.GlobalString(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.GlobalString(flags.L2EthRpcFlag.Name),
RollupRpc: ctx.GlobalString(flags.RollupRpcFlag.Name),
MinL1TxSize: ctx.GlobalUint64(flags.MinL1TxSizeBytesFlag.Name),
MaxL1TxSize: ctx.GlobalUint64(flags.MaxL1TxSizeBytesFlag.Name),
ChannelTimeout: ctx.GlobalUint64(flags.ChannelTimeoutFlag.Name),
......
......@@ -25,6 +25,12 @@ var (
Required: true,
EnvVar: "L2_ETH_RPC",
}
RollupRpcFlag = cli.StringFlag{
Name: "rollup-rpc",
Usage: "HTTP provider URL for Rollup node",
Required: true,
EnvVar: "ROLLUP_RPC",
}
MinL1TxSizeBytesFlag = cli.Uint64Flag{
Name: "min-l1-tx-size-bytes",
Usage: "The minimum size of a batch tx submitted to L1.",
......@@ -112,6 +118,7 @@ var (
var requiredFlags = []cli.Flag{
L1EthRpcFlag,
L2EthRpcFlag,
RollupRpcFlag,
MinL1TxSizeBytesFlag,
MaxL1TxSizeBytesFlag,
ChannelTimeoutFlag,
......
......@@ -5,6 +5,8 @@ import (
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-proposer/rollupclient"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
......@@ -20,6 +22,8 @@ type Config struct {
// API to hit for batch data
L2Client *ethclient.Client
RollupNode *rollupclient.RollupClient
// Limit the size of txs
MinL1TxSize uint64
MaxL1TxSize uint64
......
......@@ -573,6 +573,7 @@ func (cfg SystemConfig) start() (*System, error) {
sys.batchSubmitter, err = bss.NewBatchSubmitter(bss.Config{
L1EthRpc: sys.nodes["l1"].WSEndpoint(),
L2EthRpc: sys.nodes["sequencer"].WSEndpoint(),
RollupRpc: rollupEndpoint,
MinL1TxSize: 1,
MaxL1TxSize: 120000,
ChannelTimeout: sys.cfg.RollupConfig.ChannelTimeout,
......
......@@ -5,14 +5,13 @@ import (
"fmt"
"math/big"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/version"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/l2"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/version"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
......@@ -31,17 +30,23 @@ type l2EthClient interface {
L2BlockRefByHash(ctx context.Context, l2Hash common.Hash) (eth.L2BlockRef, error)
}
type driverClient interface {
SyncStatus(ctx context.Context) (*driver.SyncStatus, error)
}
type nodeAPI struct {
config *rollup.Config
client l2EthClient
dr driverClient
log log.Logger
m *metrics.Metrics
}
func newNodeAPI(config *rollup.Config, l2Client l2EthClient, log log.Logger, m *metrics.Metrics) *nodeAPI {
func newNodeAPI(config *rollup.Config, l2Client l2EthClient, dr driverClient, log log.Logger, m *metrics.Metrics) *nodeAPI {
return &nodeAPI{
config: config,
client: l2Client,
dr: dr,
log: log,
m: m,
}
......@@ -81,6 +86,10 @@ func (n *nodeAPI) OutputAtBlock(ctx context.Context, number rpc.BlockNumber) ([]
return []eth.Bytes32{l2OutputRootVersion, l2OutputRoot}, nil
}
func (n *nodeAPI) SyncStatus(ctx context.Context) (*driver.SyncStatus, error) {
return n.dr.SyncStatus(ctx)
}
func (n *nodeAPI) Version(ctx context.Context) (string, error) {
recordDur := n.m.RecordRPCServerRequest("optimism_version")
defer recordDur()
......
......@@ -155,7 +155,7 @@ func (n *OpNode) initRPCServer(ctx context.Context, cfg *Config) error {
if err != nil {
return err
}
n.server, err = newRPCServer(ctx, &cfg.RPC, &cfg.Rollup, client, n.log, n.appVersion, n.metrics)
n.server, err = newRPCServer(ctx, &cfg.RPC, &cfg.Rollup, client, n.l2Engine, n.log, n.appVersion, n.metrics)
if err != nil {
return err
}
......
......@@ -31,8 +31,8 @@ type rpcServer struct {
l2.Source
}
func newRPCServer(ctx context.Context, rpcCfg *RPCConfig, rollupCfg *rollup.Config, l2Client l2EthClient, log log.Logger, appVersion string, m *metrics.Metrics) (*rpcServer, error) {
api := newNodeAPI(rollupCfg, l2Client, log.New("rpc", "node"), m)
func newRPCServer(ctx context.Context, rpcCfg *RPCConfig, rollupCfg *rollup.Config, l2Client l2EthClient, dr driverClient, log log.Logger, appVersion string, m *metrics.Metrics) (*rpcServer, error) {
api := newNodeAPI(rollupCfg, l2Client, dr, log.New("rpc", "node"), m)
// TODO: extend RPC config with options for WS, IPC and HTTP RPC connections
endpoint := net.JoinHostPort(rpcCfg.ListenAddr, strconv.Itoa(rpcCfg.ListenPort))
r := &rpcServer{
......
......@@ -4,6 +4,10 @@ import (
"context"
"encoding/json"
"math/big"
"math/rand"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum-optimism/optimism/op-node/testutils"
"github.com/ethereum-optimism/optimism/op-node/metrics"
......@@ -88,7 +92,9 @@ func TestOutputAtBlock(t *testing.T) {
l2Client.mock.On("GetBlockHeader", "latest").Return(&header)
l2Client.mock.On("GetProof", predeploys.L2ToL1MessagePasserAddr, "latest").Return(&result)
server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, log, "0.0", metrics.NewMetrics(""))
drClient := &mockDriverClient{}
server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, drClient, log, "0.0", metrics.NewMetrics(""))
assert.NoError(t, err)
assert.NoError(t, server.Start())
defer server.Stop()
......@@ -106,6 +112,7 @@ func TestOutputAtBlock(t *testing.T) {
func TestVersion(t *testing.T) {
log := testlog.Logger(t, log.LvlError)
l2Client := &mockL2Client{}
drClient := &mockDriverClient{}
rpcCfg := &RPCConfig{
ListenAddr: "localhost",
ListenPort: 0,
......@@ -113,7 +120,7 @@ func TestVersion(t *testing.T) {
rollupCfg := &rollup.Config{
// ignore other rollup config info in this test
}
server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, log, "0.0", metrics.NewMetrics(""))
server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, drClient, log, "0.0", metrics.NewMetrics(""))
assert.NoError(t, err)
assert.NoError(t, server.Start())
defer server.Stop()
......@@ -127,6 +134,49 @@ func TestVersion(t *testing.T) {
assert.Equal(t, version.Version+"-"+version.Meta, out)
}
func TestSyncStatus(t *testing.T) {
log := testlog.Logger(t, log.LvlError)
l2Client := &mockL2Client{}
drClient := &mockDriverClient{}
rng := rand.New(rand.NewSource(1234))
status := driver.SyncStatus{
CurrentL1: testutils.RandomBlockRef(rng),
HeadL1: testutils.RandomBlockRef(rng),
UnsafeL2: testutils.RandomL2BlockRef(rng),
SafeL2: testutils.RandomL2BlockRef(rng),
FinalizedL2: testutils.RandomL2BlockRef(rng),
}
drClient.On("SyncStatus").Return(&status)
rpcCfg := &RPCConfig{
ListenAddr: "localhost",
ListenPort: 0,
}
rollupCfg := &rollup.Config{
// ignore other rollup config info in this test
}
server, err := newRPCServer(context.Background(), rpcCfg, rollupCfg, l2Client, drClient, log, "0.0", metrics.NewMetrics(""))
assert.NoError(t, err)
assert.NoError(t, server.Start())
defer server.Stop()
client, err := dialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String(), nil)
assert.NoError(t, err)
var out *driver.SyncStatus
err = client.CallContext(context.Background(), &out, "optimism_syncStatus")
assert.NoError(t, err)
assert.Equal(t, &status, out)
}
type mockDriverClient struct {
mock.Mock
}
func (c *mockDriverClient) SyncStatus(ctx context.Context) (*driver.SyncStatus, error) {
return c.Mock.MethodCalled("SyncStatus").Get(0).(*driver.SyncStatus), nil
}
type mockL2Client struct {
mock mock.Mock
}
......
......@@ -79,6 +79,10 @@ func (d *Driver) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPa
return d.s.OnUnsafeL2Payload(ctx, payload)
}
func (d *Driver) SyncStatus(ctx context.Context) (*SyncStatus, error) {
return d.s.SyncStatus(ctx)
}
func (d *Driver) Start(ctx context.Context) error {
return d.s.Start(ctx)
}
......
......@@ -13,6 +13,29 @@ import (
"github.com/ethereum/go-ethereum/log"
)
// SyncStatus is a snapshot of the driver
type SyncStatus struct {
// CurrentL1 is the block that the derivation process is currently at,
// this may not be fully derived into L2 data yet.
// If the node is synced, this matches the HeadL1, minus the verifier confirmation distance.
CurrentL1 eth.L1BlockRef `json:"current_l1"`
// HeadL1 is the perceived head of the L1 chain, no confirmation distance.
// The head is not guaranteed to build on the other L1 sync status fields,
// as the node may be in progress of resetting to adapt to a L1 reorg.
HeadL1 eth.L1BlockRef `json:"head_l1"`
// UnsafeL2 is the absolute tip of the L2 chain,
// pointing to block data that has not been submitted to L1 yet.
// The sequencer is building this, and verifiers may also be ahead of the
// SafeL2 block if they sync blocks via p2p or other offchain sources.
UnsafeL2 eth.L2BlockRef `json:"unsafe_l2"`
// SafeL2 points to the L2 block that was derived from the L1 chain.
// This point may still reorg if the L1 chain reorgs.
SafeL2 eth.L2BlockRef `json:"safe_l2"`
// FinalizedL2 points to the L2 block that was derived fully from
// finalized L1 information, thus irreversible.
FinalizedL2 eth.L2BlockRef `json:"finalized_l2"`
}
type state struct {
// Chain State
l1Head eth.L1BlockRef // Latest recorded head of the L1 Chain, independent of derivation work
......@@ -27,6 +50,9 @@ type state struct {
// When the derivation pipeline is waiting for new data to do anything
idleDerivation bool
// Requests for sync status. Synchronized with event loop to avoid reading an inconsistent sync status.
syncStatusReq chan chan SyncStatus
// Rollup config: rollup chain configuration
Config *rollup.Config
......@@ -55,6 +81,7 @@ func NewState(driverCfg *Config, log log.Logger, snapshotLog log.Logger, config
return &state{
derivation: derivationPipeline,
idleDerivation: false,
syncStatusReq: make(chan chan SyncStatus, 10),
Config: config,
DriverConfig: driverCfg,
done: make(chan struct{}),
......@@ -336,12 +363,35 @@ func (s *state) eventLoop() {
s.l2Head = unsafe
reqStep() // continue with the next step if we can
}
case respCh := <-s.syncStatusReq:
respCh <- SyncStatus{
CurrentL1: s.derivation.Progress().Origin,
HeadL1: s.l1Head,
UnsafeL2: s.l2Head,
SafeL2: s.l2SafeHead,
FinalizedL2: s.l2Finalized,
}
case <-s.done:
return
}
}
}
func (s *state) SyncStatus(ctx context.Context) (*SyncStatus, error) {
respCh := make(chan SyncStatus)
select {
case <-ctx.Done():
return nil, ctx.Err()
case s.syncStatusReq <- respCh:
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-respCh:
return &resp, nil
}
}
}
// deferJSONString helps avoid a JSON-encoding performance hit if the snapshot logger does not run
type deferJSONString struct {
x any
......
......@@ -66,6 +66,17 @@ func NextRandomRef(rng *rand.Rand, parent eth.L1BlockRef) eth.L1BlockRef {
}
}
func RandomL2BlockRef(rng *rand.Rand) eth.L2BlockRef {
return eth.L2BlockRef{
Hash: RandomHash(rng),
Number: rng.Uint64(),
ParentHash: RandomHash(rng),
Time: rng.Uint64(),
L1Origin: RandomBlockID(rng),
SequenceNumber: rng.Uint64(),
}
}
func NextRandomL2Ref(rng *rand.Rand, l2BlockTime uint64, parent eth.L2BlockRef, origin eth.BlockID) eth.L2BlockRef {
seq := parent.SequenceNumber + 1
if parent.L1Origin != origin {
......
......@@ -5,6 +5,7 @@ import (
"math/big"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
)
......@@ -22,3 +23,15 @@ func (r *RollupClient) OutputAtBlock(ctx context.Context, blockNum *big.Int) ([]
err := r.rpc.CallContext(ctx, &output, "optimism_outputAtBlock", hexutil.EncodeBig(blockNum))
return output, err
}
func (r *RollupClient) SyncStatus(ctx context.Context) (*driver.SyncStatus, error) {
var output *driver.SyncStatus
err := r.rpc.CallContext(ctx, &output, "optimism_syncStatus")
return output, err
}
func (r *RollupClient) Version(ctx context.Context) (string, error) {
var output string
err := r.rpc.CallContext(ctx, &output, "optimism_version")
return output, err
}
......@@ -106,6 +106,7 @@ services:
environment:
L1_ETH_RPC: http://l1:8545
L2_ETH_RPC: http://l2:8545
ROLLUP_RPC: http://op-node:8545
BATCH_SUBMITTER_MIN_L1_TX_SIZE_BYTES: 1
BATCH_SUBMITTER_MAX_L1_TX_SIZE_BYTES: 120000
BATCH_SUBMITTER_CHANNEL_TIMEOUT: 40
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment