Commit 2859af15 authored by Matthew Slipper, committed by GitHub

Merge branch 'develop' into zhwrd/endpoint-monitor-ci

parents a10c2b49 f39853ef
@@ -286,9 +286,14 @@ jobs:
             gotestsum --junitfile /test-results/op-batcher.xml -- -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out -covermode=atomic ./...
           working_directory: op-batcher
       - run:
-          name: test op-e2e
+          name: test op-e2e (WS)
           command: |
-            gotestsum --junitfile /test-results/op-e2e.xml -- -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out -covermode=atomic ./...
+            gotestsum --format standard-verbose --junitfile /test-results/op-e2e.xml -- -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out -covermode=atomic ./...
+          working_directory: op-e2e
+      - run:
+          name: test op-e2e (HTTP)
+          command: |
+            OP_E2E_USE_HTTP=true gotestsum --junitfile /test-results/op-e2e.xml -- -coverpkg=github.com/ethereum-optimism/optimism/... -coverprofile=coverage.out -covermode=atomic ./...
           working_directory: op-e2e
       - run:
           name: test op-service
...
@@ -150,3 +150,7 @@ func (s *L1Miner) ActL1EndBlock(t Testing) {
 		t.Fatalf("failed to insert block into l1 chain")
 	}
 }
+
+func (s *L1Miner) Close() error {
+	return s.L1Replica.Close()
+}
@@ -19,6 +19,9 @@ func TestL1Miner_BuildBlock(gt *testing.T) {
 	sd := e2eutils.Setup(t, dp, defaultAlloc)
 	log := testlog.Logger(t, log.LvlDebug)
 	miner := NewL1Miner(log, sd.L1Cfg)
+	t.Cleanup(func() {
+		_ = miner.Close()
+	})
 	cl := miner.EthClient()
 	signer := types.LatestSigner(sd.L1Cfg.Config)
@@ -53,6 +56,9 @@ func TestL1Miner_BuildBlock(gt *testing.T) {
 
 	// now make a replica that syncs these two blocks from the miner
 	replica := NewL1Replica(log, sd.L1Cfg)
+	t.Cleanup(func() {
+		_ = replica.Close()
+	})
 	replica.ActL1Sync(miner.CanonL1Chain())(t)
 	replica.ActL1Sync(miner.CanonL1Chain())(t)
 	require.Equal(t, replica.l1Chain.CurrentBlock().Hash(), miner.l1Chain.CurrentBlock().Hash())
...
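The same Close-on-Cleanup pairing recurs for every miner, replica, and engine these tests construct. A minimal sketch of the pattern, generalized over any closable resource (the cleanupAll helper is hypothetical, for illustration only):

package actions

import "testing"

// closer matches the Close method this change adds to L1Miner, L1Replica and L2Engine.
type closer interface{ Close() error }

// cleanupAll registers Close on the test for each resource, so RPC listeners
// are released even when a test fails midway through its actions.
func cleanupAll(t *testing.T, resources ...closer) {
	t.Helper()
	for _, r := range resources {
		r := r // capture the loop variable for the deferred closure
		t.Cleanup(func() { _ = r.Close() })
	}
}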
@@ -161,7 +161,7 @@ func (s *L1Replica) EthClient() *ethclient.Client {
 func (s *L1Replica) RPCClient() client.RPC {
 	cl, _ := s.node.Attach() // never errors
 	return testutils.RPCErrFaker{
-		RPC: cl,
+		RPC: client.NewBaseRPCClient(cl),
 		ErrFn: func() error {
 			err := s.failL1RPC
 			s.failL1RPC = nil // reset back, only error once.
@@ -195,3 +195,7 @@ func (s *L1Replica) ActL1SafeNext(t Testing) {
 	}
 	s.l1Chain.SetSafe(next)
 }
+
+func (s *L1Replica) Close() error {
+	return s.node.Close()
+}
@@ -33,6 +33,9 @@ func TestL1Replica_ActL1RPCFail(gt *testing.T) {
 	sd := e2eutils.Setup(t, dp, defaultAlloc)
 	log := testlog.Logger(t, log.LvlDebug)
 	replica := NewL1Replica(log, sd.L1Cfg)
+	t.Cleanup(func() {
+		_ = replica.Close()
+	})
 	// mock an RPC failure
 	replica.ActL1RPCFail(t)
 	// check RPC failure
@@ -76,6 +79,9 @@ func TestL1Replica_ActL1Sync(gt *testing.T) {
 
 	// Enough setup, create the test actor and run the actual actions
 	replica1 := NewL1Replica(log, sd.L1Cfg)
+	t.Cleanup(func() {
+		_ = replica1.Close()
+	})
 	syncFromA := replica1.ActL1Sync(canonL1(chainA))
 	// sync canonical chain A
 	for replica1.l1Chain.CurrentBlock().NumberU64()+1 < uint64(len(chainA)) {
@@ -94,6 +100,9 @@ func TestL1Replica_ActL1Sync(gt *testing.T) {
 
 	// Adding and syncing a new replica
 	replica2 := NewL1Replica(log, sd.L1Cfg)
+	t.Cleanup(func() {
+		_ = replica2.Close()
+	})
 	syncFromOther := replica2.ActL1Sync(replica1.CanonL1Chain())
 	for replica2.l1Chain.CurrentBlock().NumberU64()+1 < uint64(len(chainB)) {
 		syncFromOther(t)
...
@@ -121,7 +121,7 @@ func (s *L2Engine) EthClient() *ethclient.Client {
 func (e *L2Engine) RPCClient() client.RPC {
 	cl, _ := e.node.Attach() // never errors
 	return testutils.RPCErrFaker{
-		RPC: cl,
+		RPC: client.NewBaseRPCClient(cl),
 		ErrFn: func() error {
 			err := e.failL2RPC
 			e.failL2RPC = nil // reset back, only error once.
@@ -175,3 +175,7 @@ func (e *L2Engine) ActL2IncludeTx(from common.Address) Action {
 		e.l2Transactions = append(e.l2Transactions, tx)
 	}
 }
+
+func (e *L2Engine) Close() error {
+	return e.node.Close()
+}
@@ -95,6 +95,9 @@ func TestL2EngineAPIBlockBuilding(gt *testing.T) {
 	sd.L2Cfg.MustCommit(db)
 
 	engine := NewL2Engine(log, sd.L2Cfg, sd.RollupCfg.Genesis.L1, jwtPath)
+	t.Cleanup(func() {
+		_ = engine.Close()
+	})
 	cl := engine.EthClient()
 
 	signer := types.LatestSigner(sd.L2Cfg.Config)
...
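For context on the RPC: client.NewBaseRPCClient(cl) change above: RPCErrFaker now wraps a client.RPC rather than a bare *rpc.Client, and its ErrFn injects at most one failure. A hedged sketch of that one-shot pattern (newOneShotFaker and raw are stand-ins, not part of the change):

package actions

import (
	"github.com/ethereum-optimism/optimism/op-node/client"
	"github.com/ethereum-optimism/optimism/op-node/testutils"
	"github.com/ethereum/go-ethereum/rpc"
)

// newOneShotFaker wraps an in-process connection so that one pending error,
// set via the returned setter, fails exactly the next RPC call and no more.
func newOneShotFaker(raw *rpc.Client) (testutils.RPCErrFaker, func(error)) {
	var pending error
	faker := testutils.RPCErrFaker{
		RPC: client.NewBaseRPCClient(raw),
		ErrFn: func() error {
			err := pending
			pending = nil // reset back, only error once
			return err
		},
	}
	return faker, func(err error) { pending = err }
}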
@@ -98,6 +98,8 @@ func initL1Geth(cfg *SystemConfig, wallet *hdwallet.Wallet, genesis *core.Genesi
 	}
 	nodeConfig := &node.Config{
 		Name:      "l1-geth",
+		HTTPHost:  "127.0.0.1",
+		HTTPPort:  0,
 		WSHost:    "127.0.0.1",
 		WSPort:    0,
 		WSModules: []string{"debug", "admin", "eth", "txpool", "net", "rpc", "web3", "personal", "engine"},
...
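The L1 geth node now serves both transports, so tests can choose either endpoint. A standalone sketch of an in-process geth node exposing HTTP and WS on ephemeral ports (assuming go-ethereum's node package; module lists omitted):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/node"
)

func main() {
	n, err := node.New(&node.Config{
		Name:     "l1-geth",
		HTTPHost: "127.0.0.1",
		HTTPPort: 0, // 0 = ephemeral port, chosen by the OS at bind time
		WSHost:   "127.0.0.1",
		WSPort:   0,
	})
	if err != nil {
		panic(err)
	}
	if err := n.Start(); err != nil {
		panic(err)
	}
	defer n.Close()
	// Both endpoints report the actual bound ports.
	fmt.Println(n.HTTPEndpoint(), n.WSEndpoint())
}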
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"math/big"
+	"os"
 	"strings"
 	"time"
@@ -337,13 +338,25 @@ func (cfg SystemConfig) start() (*System, error) {
 	// Configure connections to L1 and L2 for rollup nodes.
 	// TODO: refactor testing to use in-process rpc connections instead of websockets.
+
+	l1EndpointConfig := l1Node.WSEndpoint()
+	useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true"
+	if useHTTP {
+		log.Info("using HTTP client")
+		l1EndpointConfig = l1Node.HTTPEndpoint()
+	}
+
 	for name, rollupCfg := range cfg.Nodes {
+		l2EndpointConfig := sys.nodes[name].WSAuthEndpoint()
+		if useHTTP {
+			l2EndpointConfig = sys.nodes[name].HTTPAuthEndpoint()
+		}
+
 		rollupCfg.L1 = &rollupNode.L1EndpointConfig{
-			L1NodeAddr: l1Node.WSEndpoint(),
+			L1NodeAddr: l1EndpointConfig,
 			L1TrustRPC: false,
 		}
 		rollupCfg.L2 = &rollupNode.L2EndpointConfig{
-			L2EngineAddr:      sys.nodes[name].WSAuthEndpoint(),
+			L2EngineAddr:      l2EndpointConfig,
 			L2EngineJWTSecret: cfg.JWTSecret,
 		}
 	}
@@ -541,17 +554,11 @@ func (cfg SystemConfig) start() (*System, error) {
 		}
 	}
 
-	rollupEndpoint := fmt.Sprintf(
-		"http://%s:%d",
-		sys.cfg.Nodes["sequencer"].RPC.ListenAddr,
-		sys.cfg.Nodes["sequencer"].RPC.ListenPort,
-	)
-
 	// L2Output Submitter
 	sys.l2OutputSubmitter, err = l2os.NewL2OutputSubmitter(l2os.Config{
 		L1EthRpc:         sys.nodes["l1"].WSEndpoint(),
 		L2EthRpc:         sys.nodes["sequencer"].WSEndpoint(),
-		RollupRpc:        rollupEndpoint,
+		RollupRpc:        sys.rollupNodes["sequencer"].HTTPEndpoint(),
 		L2OOAddress:      sys.L2OOContractAddr.String(),
 		PollInterval:     50 * time.Millisecond,
 		NumConfirmations: 1,
@@ -576,7 +583,7 @@ func (cfg SystemConfig) start() (*System, error) {
 	sys.batchSubmitter, err = bss.NewBatchSubmitter(bss.Config{
 		L1EthRpc:       sys.nodes["l1"].WSEndpoint(),
 		L2EthRpc:       sys.nodes["sequencer"].WSEndpoint(),
-		RollupRpc:      rollupEndpoint,
+		RollupRpc:      sys.rollupNodes["sequencer"].HTTPEndpoint(),
 		MinL1TxSize:    1,
 		MaxL1TxSize:    120000,
 		ChannelTimeout: sys.cfg.RollupConfig.ChannelTimeout,
...
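The deleted rollupEndpoint block above is a consequence of ephemeral ports: once ListenPort is 0, the configured value no longer matches the port the OS assigns at bind time, so only the running node can report its real address (hence the new HTTPEndpoint accessor on OpNode). This is also what makes the blanket t.Parallel() additions that follow safe, since parallel systems no longer fight over a fixed port. The mechanics in isolation:

package main

import (
	"fmt"
	"net"
)

func main() {
	// Asking for port 0 delegates the choice to the OS...
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer l.Close()
	// ...so the bound address must be read back from the listener,
	// e.g. 127.0.0.1:53124, not recomputed from the config value.
	fmt.Println(l.Addr().String())
}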
@@ -121,7 +121,7 @@ func defaultSystemConfig(t *testing.T) SystemConfig {
 			// Submitter PrivKey is set in system start for rollup nodes where sequencer = true
 			RPC: node.RPCConfig{
 				ListenAddr:  "127.0.0.1",
-				ListenPort:  9093,
+				ListenPort:  0,
 				EnableAdmin: true,
 			},
 			L1EpochPollInterval: time.Second * 4,
@@ -154,6 +154,7 @@ func defaultSystemConfig(t *testing.T) SystemConfig {
 }
 
 func TestL2OutputSubmitter(t *testing.T) {
+	t.Parallel()
 	if !verboseGethNodes {
 		log.Root().SetHandler(log.DiscardHandler())
 	}
@@ -166,7 +167,7 @@ func TestL2OutputSubmitter(t *testing.T) {
 	l1Client := sys.Clients["l1"]
 
-	rollupRPCClient, err := rpc.DialContext(context.Background(), cfg.Nodes["sequencer"].RPC.HttpEndpoint())
+	rollupRPCClient, err := rpc.DialContext(context.Background(), sys.rollupNodes["sequencer"].HTTPEndpoint())
 	require.Nil(t, err)
 	rollupClient := sources.NewRollupClient(rollupRPCClient)
@@ -229,6 +230,7 @@ func TestL2OutputSubmitter(t *testing.T) {
 // TestSystemE2E sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that L1 deposits are reflected on L2.
 // All nodes are run in process (but are the full nodes, not mocked or stubbed).
 func TestSystemE2E(t *testing.T) {
+	t.Parallel()
 	if !verboseGethNodes {
 		log.Root().SetHandler(log.DiscardHandler())
 	}
@@ -328,7 +330,7 @@ func TestSystemE2E(t *testing.T) {
 	require.Equal(t, verifBlock.ParentHash(), seqBlock.ParentHash(), "Verifier and sequencer blocks parent hashes not the same after including a batch tx")
 	require.Equal(t, verifBlock.Hash(), seqBlock.Hash(), "Verifier and sequencer blocks not the same after including a batch tx")
 
-	rollupRPCClient, err := rpc.DialContext(context.Background(), cfg.Nodes["sequencer"].RPC.HttpEndpoint())
+	rollupRPCClient, err := rpc.DialContext(context.Background(), sys.rollupNodes["sequencer"].HTTPEndpoint())
 	require.Nil(t, err)
 	rollupClient := sources.NewRollupClient(rollupRPCClient)
 	// basic check that sync status works
@@ -343,6 +345,7 @@ func TestSystemE2E(t *testing.T) {
 
 // TestConfirmationDepth runs the rollup with both sequencer and verifier not immediately processing the tip of the chain.
 func TestConfirmationDepth(t *testing.T) {
+	t.Parallel()
 	if !verboseGethNodes {
 		log.Root().SetHandler(log.DiscardHandler())
 	}
@@ -390,6 +393,7 @@ func TestConfirmationDepth(t *testing.T) {
 
 // TestFinalize tests if L2 finalizes after sufficient time after L1 finalizes
 func TestFinalize(t *testing.T) {
+	t.Parallel()
 	if !verboseGethNodes {
 		log.Root().SetHandler(log.DiscardHandler())
 	}
@@ -417,6 +421,7 @@ func TestFinalize(t *testing.T) {
 }
 
 func TestMintOnRevertedDeposit(t *testing.T) {
+	t.Parallel()
 	if !verboseGethNodes {
 		log.Root().SetHandler(log.DiscardHandler())
 	}
@@ -490,6 +495,7 @@ func TestMintOnRevertedDeposit(t *testing.T) {
 }
 
 func TestMissingBatchE2E(t *testing.T) {
+	t.Parallel()
 	if !verboseGethNodes {
 		log.Root().SetHandler(log.DiscardHandler())
 	}
@@ -599,6 +605,7 @@ func L1InfoFromState(ctx context.Context, contract *bindings.L1Block, l2Number *
 // TestSystemMockP2P sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that
 // the nodes can sync L2 blocks before they are confirmed on L1.
 func TestSystemMockP2P(t *testing.T) {
+	t.Parallel()
 	if !verboseGethNodes {
 		log.Root().SetHandler(log.DiscardHandler())
 	}
@@ -672,6 +679,7 @@ func TestSystemMockP2P(t *testing.T) {
 }
 
 func TestL1InfoContract(t *testing.T) {
+	t.Parallel()
 	if !verboseGethNodes {
 		log.Root().SetHandler(log.DiscardHandler())
 	}
@@ -795,6 +803,7 @@ func calcL1GasUsed(data []byte, overhead *big.Int) *big.Int {
 // balance changes on L1 and L2 and has to include gas fees in the balance checks.
 // It does not check that the withdrawal can be executed prior to the end of the finality period.
 func TestWithdrawals(t *testing.T) {
+	t.Parallel()
 	if !verboseGethNodes {
 		log.Root().SetHandler(log.DiscardHandler())
 	}
@@ -970,6 +979,7 @@ func TestWithdrawals(t *testing.T) {
 
 // TestFees checks that L1/L2 fees are handled.
 func TestFees(t *testing.T) {
+	t.Parallel()
 	if !verboseGethNodes {
 		log.Root().SetHandler(log.DiscardHandler())
 	}
...
package client
import (
"context"
"errors"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
var ErrSubscriberClosed = errors.New("subscriber closed")
// PollingClient is an RPC client that provides newHeads subscriptions
// via a polling loop. It's designed for HTTP endpoints, but WS will
// work too.
type PollingClient struct {
c RPC
lgr log.Logger
pollRate time.Duration
ctx context.Context
cancel context.CancelFunc
currHead *types.Header
subID int
// pollReqCh is used to request new polls of the upstream
// RPC client.
pollReqCh chan struct{}
mtx sync.RWMutex
subs map[int]chan *types.Header
closedCh chan struct{}
}
type WrappedHTTPClientOption func(w *PollingClient)
// WithPollRate specifies the rate at which the PollingClient will poll
// for new heads. Setting this to zero disables polling altogether,
// which is useful for testing.
func WithPollRate(duration time.Duration) WrappedHTTPClientOption {
return func(w *PollingClient) {
w.pollRate = duration
}
}
// NewPollingClient returns a new PollingClient. Canceling the passed-in context
// will close the client. Callers are responsible for closing the client in order
// to prevent resource leaks.
func NewPollingClient(ctx context.Context, lgr log.Logger, c RPC, opts ...WrappedHTTPClientOption) *PollingClient {
ctx, cancel := context.WithCancel(ctx)
res := &PollingClient{
c: c,
lgr: lgr,
pollRate: 250 * time.Millisecond,
ctx: ctx,
cancel: cancel,
pollReqCh: make(chan struct{}, 1),
subs: make(map[int]chan *types.Header),
closedCh: make(chan struct{}),
}
for _, opt := range opts {
opt(res)
}
go res.pollHeads()
return res
}
// Close closes the PollingClient and the underlying RPC client it talks to.
func (w *PollingClient) Close() {
w.cancel()
<-w.closedCh
w.c.Close()
}
func (w *PollingClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
return w.c.CallContext(ctx, result, method, args...)
}
func (w *PollingClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
return w.c.BatchCallContext(ctx, b)
}
// EthSubscribe creates a new newHeads subscription. It takes identical arguments
// to Geth's native EthSubscribe method. It will return an error, however, if the
// passed in channel is not a *types.Headers channel or the subscription type is not
// newHeads.
func (w *PollingClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
select {
case <-w.ctx.Done():
return nil, ErrSubscriberClosed
default:
}
headerCh, ok := channel.(chan<- *types.Header)
if !ok {
return nil, errors.New("invalid channel type")
}
if len(args) != 1 {
return nil, errors.New("invalid subscription args")
}
if args[0] != "newHeads" {
return nil, errors.New("unsupported subscription type")
}
sub := make(chan *types.Header, 1)
w.mtx.Lock()
subID := w.subID
w.subID++
w.subs[subID] = sub
w.mtx.Unlock()
return event.NewSubscription(func(quit <-chan struct{}) error {
for {
select {
case header := <-sub:
headerCh <- header
case <-quit:
w.mtx.Lock()
delete(w.subs, subID)
w.mtx.Unlock()
return nil
case <-w.ctx.Done():
return nil
}
}
}), nil
}
func (w *PollingClient) pollHeads() {
// To prevent polls from stacking up in case HTTP requests
// are slow, use a similar model to the driver in which
// polls are requested manually after each header is fetched.
reqPollAfter := func() {
if w.pollRate == 0 {
return
}
time.AfterFunc(w.pollRate, w.reqPoll)
}
reqPollAfter()
defer close(w.closedCh)
for {
select {
case <-w.pollReqCh:
// We don't need backoff here because we'll just try again
// after the pollRate elapses.
head, err := w.getLatestHeader()
if err != nil {
w.lgr.Error("error getting latest header", "err", err)
reqPollAfter()
continue
}
if w.currHead != nil && w.currHead.Hash() == head.Hash() {
w.lgr.Trace("no change in head, skipping notifications")
reqPollAfter()
continue
}
w.lgr.Trace("notifying subscribers of new head", "head", head.Hash())
w.currHead = head
w.mtx.RLock()
for _, sub := range w.subs {
sub <- head
}
w.mtx.RUnlock()
reqPollAfter()
case <-w.ctx.Done():
w.c.Close()
return
}
}
}
func (w *PollingClient) getLatestHeader() (*types.Header, error) {
ctx, cancel := context.WithTimeout(w.ctx, 5*time.Second)
defer cancel()
var head *types.Header
err := w.CallContext(ctx, &head, "eth_getBlockByNumber", "latest", false)
if err == nil && head == nil {
err = ethereum.NotFound
}
return head, err
}
func (w *PollingClient) reqPoll() {
w.pollReqCh <- struct{}{}
}
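
A usage sketch for the new client (the endpoint URL is a stand-in): dial a plain HTTP node, wrap it, and receive newHeads from the polling loop. Note that EthSubscribe type-asserts its channel argument to chan<- *types.Header, so the caller must pass a send-only channel value, as the tests below do through doSubscribe:

package main

import (
	"context"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-node/client"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	ctx := context.Background()
	raw, err := rpc.DialContext(ctx, "http://127.0.0.1:8545") // HTTP: no native subscriptions
	if err != nil {
		panic(err)
	}
	poller := client.NewPollingClient(ctx, log.New(), client.NewBaseRPCClient(raw))
	defer poller.Close()

	heads := make(chan *types.Header, 1)
	var sendOnly chan<- *types.Header = heads // EthSubscribe expects this exact type
	sub, err := poller.EthSubscribe(ctx, sendOnly, "newHeads")
	if err != nil {
		panic(err)
	}
	defer sub.Unsubscribe()

	fmt.Println("new head:", (<-heads).Number)
}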
package client
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"
)
type MockRPC struct {
t *testing.T
callResults []*callResult
mtx sync.RWMutex
callCount int
autopop bool
closed bool
}
type callResult struct {
root common.Hash
error error
}
func (m *MockRPC) Close() {
m.closed = true
}
func (m *MockRPC) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
m.mtx.Lock()
defer m.mtx.Unlock()
if method != "eth_getBlockByNumber" {
m.t.Fatalf("invalid method %s", method)
}
if args[0] != "latest" {
m.t.Fatalf("invalid arg %v", args[0])
}
m.callCount++
res := m.callResults[0]
headerP := result.(**types.Header)
*headerP = &types.Header{
Root: res.root,
}
if m.autopop {
m.callResults = m.callResults[1:]
}
return res.error
}
func (m *MockRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
m.t.Fatal("BatchCallContext should not be called")
return nil
}
func (m *MockRPC) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
m.t.Fatal("EthSubscribe should not be called")
return nil, nil
}
func (m *MockRPC) popResult() {
m.mtx.Lock()
defer m.mtx.Unlock()
m.callResults = m.callResults[1:]
}
func TestPollingClientSubscribeUnsubscribe(t *testing.T) {
lgr := log.New()
lgr.SetHandler(log.DiscardHandler())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
root1 := common.Hash{0x01}
root2 := common.Hash{0x02}
root3 := common.Hash{0x03}
mockRPC := &MockRPC{
t: t,
callResults: []*callResult{
{root1, nil},
{root2, nil},
{root3, nil},
},
}
client := NewPollingClient(ctx, lgr, mockRPC, WithPollRate(0))
subs := make([]ethereum.Subscription, 0)
chans := make([]chan *types.Header, 0)
for i := 0; i < 2; i++ {
ch := make(chan *types.Header, 2)
sub, err := doSubscribe(client, ch)
require.NoError(t, err)
subs = append(subs, sub)
chans = append(chans, ch)
}
client.reqPoll()
requireChansEqual(t, chans, root1)
mockRPC.popResult()
client.reqPoll()
requireChansEqual(t, chans, root2)
// Poll an additional time to show that responses with the same
// data don't notify again.
client.reqPoll()
// Verify that no further notifications have been sent.
for _, ch := range chans {
select {
case <-ch:
t.Fatal("unexpected notification")
case <-time.NewTimer(10 * time.Millisecond).C:
continue
}
}
mockRPC.popResult()
subs[0].Unsubscribe()
client.reqPoll()
select {
case <-chans[0]:
t.Fatal("unexpected notification")
case <-time.NewTimer(10 * time.Millisecond).C:
}
header := <-chans[1]
require.Equal(t, root3, header.Root)
}
func TestPollingClientErrorRecovery(t *testing.T) {
lgr := log.New()
lgr.SetHandler(log.DiscardHandler())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
root := common.Hash{0x01}
mockRPC := &MockRPC{
t: t,
callResults: []*callResult{
{common.Hash{}, errors.New("foobar")},
{common.Hash{}, errors.New("foobar")},
{root, nil},
},
autopop: true,
}
client := NewPollingClient(ctx, lgr, mockRPC, WithPollRate(0))
ch := make(chan *types.Header, 1)
sub, err := doSubscribe(client, ch)
require.NoError(t, err)
defer sub.Unsubscribe()
for i := 0; i < 3; i++ {
client.reqPoll()
}
header := <-ch
require.Equal(t, root, header.Root)
require.Equal(t, 3, mockRPC.callCount)
}
func TestPollingClientClose(t *testing.T) {
lgr := log.New()
lgr.SetHandler(log.DiscardHandler())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
root := common.Hash{0x01}
mockRPC := &MockRPC{
t: t,
callResults: []*callResult{
{root, nil},
},
autopop: true,
}
client := NewPollingClient(ctx, lgr, mockRPC, WithPollRate(0))
ch := make(chan *types.Header, 1)
sub, err := doSubscribe(client, ch)
require.NoError(t, err)
client.reqPoll()
header := <-ch
cancel()
require.Nil(t, <-sub.Err())
require.Equal(t, root, header.Root)
require.Equal(t, 1, mockRPC.callCount)
// unsubscribe should be safe
sub.Unsubscribe()
_, err = doSubscribe(client, ch)
require.Equal(t, ErrSubscriberClosed, err)
}
func requireChansEqual(t *testing.T, chans []chan *types.Header, root common.Hash) {
for _, ch := range chans {
header := <-ch
require.Equal(t, root, header.Root)
}
}
func doSubscribe(client RPC, ch chan<- *types.Header) (ethereum.Subscription, error) {
return client.EthSubscribe(context.Background(), ch, "newHeads")
}
@@ -2,31 +2,99 @@ package client
 
 import (
 	"context"
+	"fmt"
+	"regexp"
 
+	"github.com/ethereum-optimism/optimism/op-node/backoff"
+	"github.com/ethereum/go-ethereum"
+	"github.com/ethereum/go-ethereum/log"
 	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/ethereum-optimism/optimism/op-node/metrics"
 	"github.com/ethereum/go-ethereum/rpc"
 )
 
+var httpRegex = regexp.MustCompile("^http(s)?://")
+
 type RPC interface {
 	Close()
 	CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
 	BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
-	EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error)
+	EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error)
 }
+
+// NewRPC returns the correct client.RPC instance for a given RPC url.
+func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...rpc.ClientOption) (RPC, error) {
+	underlying, err := DialRPCClientWithBackoff(ctx, lgr, addr, opts...)
+	if err != nil {
+		return nil, err
+	}
+
+	wrapped := &BaseRPCClient{
+		c: underlying,
+	}
+
+	if httpRegex.MatchString(addr) {
+		return NewPollingClient(ctx, lgr, wrapped), nil
+	}
+
+	return wrapped, nil
+}
+
+// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional.
+func DialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, opts ...rpc.ClientOption) (*rpc.Client, error) {
+	bOff := backoff.Exponential()
+	var ret *rpc.Client
+	err := backoff.Do(10, bOff, func() error {
+		client, err := rpc.DialOptions(ctx, addr, opts...)
+		if err != nil {
+			if client == nil {
+				return fmt.Errorf("failed to dial address (%s): %w", addr, err)
+			}
+			log.Warn("failed to dial address, but may connect later", "addr", addr, "err", err)
+		}
+		ret = client
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	return ret, nil
+}
+
+// BaseRPCClient is a wrapper around a concrete *rpc.Client instance to make it compliant
+// with the client.RPC interface.
+type BaseRPCClient struct {
+	c *rpc.Client
+}
+
+func NewBaseRPCClient(c *rpc.Client) *BaseRPCClient {
+	return &BaseRPCClient{c: c}
+}
+
+func (b *BaseRPCClient) Close() {
+	b.c.Close()
+}
+
+func (b *BaseRPCClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
+	return b.c.CallContext(ctx, result, method, args...)
+}
+
+func (b *BaseRPCClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error {
+	return b.c.BatchCallContext(ctx, batch)
+}
+
+func (b *BaseRPCClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
+	return b.c.EthSubscribe(ctx, channel, args...)
+}
 
 // InstrumentedRPCClient is an RPC client that tracks
 // Prometheus metrics for each call.
 type InstrumentedRPCClient struct {
-	c *rpc.Client
+	c RPC
 	m *metrics.Metrics
 }
 
-// NewInstrumentedRPC creates a new instrumented RPC client. It takes
-// a concrete *rpc.Client to prevent people from passing in an already
-// instrumented client.
-func NewInstrumentedRPC(c *rpc.Client, m *metrics.Metrics) *InstrumentedRPCClient {
+// NewInstrumentedRPC creates a new instrumented RPC client.
+func NewInstrumentedRPC(c RPC, m *metrics.Metrics) *InstrumentedRPCClient {
 	return &InstrumentedRPCClient{
 		c: c,
 		m: m,
@@ -49,14 +117,10 @@ func (ic *InstrumentedRPCClient) BatchCallContext(ctx context.Context, b []rpc.B
 	}, b)
 }
 
-func (ic *InstrumentedRPCClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
+func (ic *InstrumentedRPCClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
 	return ic.c.EthSubscribe(ctx, channel, args...)
 }
 
-func (ic *InstrumentedRPCClient) Client() Client {
-	return NewInstrumentedClient(ic.c, ic.m)
-}
-
 // instrumentBatch handles metrics for batch calls. Request metrics are
 // increased for each batch element. Request durations are tracked for
 // the batch as a whole using a special <batch> method. Errors are tracked
...
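The net effect of the new NewRPC constructor, sketched below (addresses are stand-ins): http(s) URLs are wrapped in a PollingClient so subscriptions keep working, while everything else is returned as a plain BaseRPCClient:

package main

import (
	"context"

	"github.com/ethereum-optimism/optimism/op-node/client"
	"github.com/ethereum/go-ethereum/log"
)

func main() {
	ctx := context.Background()
	lgr := log.New()

	// http(s):// matches httpRegex, so the dialed client is wrapped in a
	// PollingClient and newHeads subscriptions work over plain HTTP.
	httpCl, err := client.NewRPC(ctx, lgr, "http://127.0.0.1:8545")
	if err != nil {
		panic(err)
	}
	defer httpCl.Close()

	// ws:// does not match, so the dialed client is returned as a plain
	// BaseRPCClient and subscriptions use the websocket natively.
	wsCl, err := client.NewRPC(ctx, lgr, "ws://127.0.0.1:8546")
	if err != nil {
		panic(err)
	}
	defer wsCl.Close()
}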
@@ -5,7 +5,7 @@ import (
 	"errors"
 	"fmt"
 
-	"github.com/ethereum-optimism/optimism/op-node/backoff"
+	"github.com/ethereum-optimism/optimism/op-node/client"
 	"github.com/ethereum/go-ethereum/log"
 	gn "github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/rpc"
@@ -13,13 +13,13 @@ import (
 
 type L2EndpointSetup interface {
 	// Setup a RPC client to a L2 execution engine to process rollup blocks with.
-	Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, err error)
+	Setup(ctx context.Context, log log.Logger) (cl client.RPC, err error)
 	Check() error
 }
 
 type L1EndpointSetup interface {
 	// Setup a RPC client to a L1 node to pull rollup input-data from.
-	Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error)
+	Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error)
 }
 
 type L2EndpointConfig struct {
@@ -40,12 +40,12 @@ func (cfg *L2EndpointConfig) Check() error {
 	return nil
 }
 
-func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (*rpc.Client, error) {
+func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
 	if err := cfg.Check(); err != nil {
 		return nil, err
 	}
 	auth := rpc.WithHTTPAuth(gn.NewJWTAuth(cfg.L2EngineJWTSecret))
-	l2Node, err := dialRPCClientWithBackoff(ctx, log, cfg.L2EngineAddr, auth)
+	l2Node, err := client.NewRPC(ctx, log, cfg.L2EngineAddr, auth)
 	if err != nil {
 		return nil, err
 	}
@@ -55,7 +55,7 @@ func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger) (*rpc.Cl
 
 // PreparedL2Endpoints enables testing with in-process pre-setup RPC connections to L2 engines
 type PreparedL2Endpoints struct {
-	Client *rpc.Client
+	Client client.RPC
 }
 
 func (p *PreparedL2Endpoints) Check() error {
@@ -67,7 +67,7 @@ func (p *PreparedL2Endpoints) Check() error {
 
 var _ L2EndpointSetup = (*PreparedL2Endpoints)(nil)
 
-func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (*rpc.Client, error) {
+func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client.RPC, error) {
 	return p.Client, nil
 }
@@ -82,8 +82,8 @@ type L1EndpointConfig struct {
 
 var _ L1EndpointSetup = (*L1EndpointConfig)(nil)
 
-func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error) {
-	l1Node, err := dialRPCClientWithBackoff(ctx, log, cfg.L1NodeAddr)
+func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) {
+	l1Node, err := client.NewRPC(ctx, log, cfg.L1NodeAddr)
 	if err != nil {
 		return nil, false, fmt.Errorf("failed to dial L1 address (%s): %w", cfg.L1NodeAddr, err)
 	}
@@ -92,33 +92,12 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger) (cl *rpc
 
 // PreparedL1Endpoint enables testing with an in-process pre-setup RPC connection to L1
 type PreparedL1Endpoint struct {
-	Client   *rpc.Client
+	Client   client.RPC
 	TrustRPC bool
 }
 
 var _ L1EndpointSetup = (*PreparedL1Endpoint)(nil)
 
-func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger) (cl *rpc.Client, trust bool, err error) {
+func (p *PreparedL1Endpoint) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) {
 	return p.Client, p.TrustRPC, nil
 }
-
-// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional.
-func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, opts ...rpc.ClientOption) (*rpc.Client, error) {
-	bOff := backoff.Exponential()
-	var ret *rpc.Client
-	err := backoff.Do(10, bOff, func() error {
-		client, err := rpc.DialOptions(ctx, addr, opts...)
-		if err != nil {
-			if client == nil {
-				return fmt.Errorf("failed to dial address (%s): %w", addr, err)
-			}
-			log.Warn("failed to dial address, but may connect later", "addr", addr, "err", err)
-		}
-		ret = client
-		return nil
-	})
-	if err != nil {
-		return nil, err
-	}
-	return ret, nil
-}
@@ -355,3 +355,11 @@ func (n *OpNode) Close() error {
 	}
 	return result.ErrorOrNil()
 }
+
+func (n *OpNode) ListenAddr() string {
+	return n.server.listenAddr.String()
+}
+
+func (n *OpNode) HTTPEndpoint() string {
+	return fmt.Sprintf("http://%s", n.ListenAddr())
+}
@@ -6,6 +6,7 @@ import (
 	"math/rand"
 	"testing"
 
+	rpcclient "github.com/ethereum-optimism/optimism/op-node/client"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
@@ -101,7 +102,7 @@ func TestOutputAtBlock(t *testing.T) {
 	assert.NoError(t, server.Start())
 	defer server.Stop()
 
-	client, err := dialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
+	client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
 	assert.NoError(t, err)
 
 	var out []eth.Bytes32
@@ -127,7 +128,7 @@ func TestVersion(t *testing.T) {
 	assert.NoError(t, server.Start())
 	defer server.Stop()
 
-	client, err := dialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
+	client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
 	assert.NoError(t, err)
 
 	var out string
@@ -162,7 +163,7 @@ func TestSyncStatus(t *testing.T) {
 	assert.NoError(t, server.Start())
 	defer server.Stop()
 
-	client, err := dialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
+	client, err := rpcclient.DialRPCClientWithBackoff(context.Background(), log, "http://"+server.Addr().String())
 	assert.NoError(t, err)
 
 	var out *eth.SyncStatus
...
@@ -224,7 +224,11 @@ func BuildBlocksValidator(log log.Logger, cfg *rollup.Config) pubsub.ValidatorEx
 		signatureBytes, payloadBytes := data[:65], data[65:]
 
 		// [REJECT] if the signature by the sequencer is not valid
-		signingHash := BlockSigningHash(cfg, payloadBytes)
+		signingHash, err := BlockSigningHash(cfg, payloadBytes)
+		if err != nil {
+			log.Warn("failed to compute block signing hash", "err", err, "peer", id)
+			return pubsub.ValidationReject
+		}
 		pub, err := crypto.SigToPub(signingHash[:], signatureBytes)
 		if err != nil {
...
@@ -23,19 +23,22 @@ type Signer interface {
 	io.Closer
 }
 
-func SigningHash(domain [32]byte, chainID *big.Int, payloadBytes []byte) common.Hash {
+func SigningHash(domain [32]byte, chainID *big.Int, payloadBytes []byte) (common.Hash, error) {
 	var msgInput [32 + 32 + 32]byte
 	// domain: first 32 bytes
 	copy(msgInput[:32], domain[:])
 	// chain_id: second 32 bytes
+	if chainID.BitLen() > 256 {
+		return common.Hash{}, errors.New("chain_id is too large")
+	}
 	chainID.FillBytes(msgInput[32:64])
 	// payload_hash: third 32 bytes, hash of encoded payload
 	copy(msgInput[32:], crypto.Keccak256(payloadBytes))
-	return crypto.Keccak256Hash(msgInput[:])
+	return crypto.Keccak256Hash(msgInput[:]), nil
 }
 
-func BlockSigningHash(cfg *rollup.Config, payloadBytes []byte) common.Hash {
+func BlockSigningHash(cfg *rollup.Config, payloadBytes []byte) (common.Hash, error) {
 	return SigningHash(SigningDomainBlocksV1, cfg.L2ChainID, payloadBytes)
 }
@@ -52,7 +55,10 @@ func (s *LocalSigner) Sign(ctx context.Context, domain [32]byte, chainID *big.In
 	if s.priv == nil {
 		return nil, errors.New("signer is closed")
 	}
-	signingHash := SigningHash(domain, chainID, encodedMsg)
+	signingHash, err := SigningHash(domain, chainID, encodedMsg)
+	if err != nil {
+		return nil, err
+	}
 	signature, err := crypto.Sign(signingHash[:], s.priv)
 	if err != nil {
 		return nil, err
...
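The new BitLen guard exists because big.Int.FillBytes panics when the value needs more bytes than the destination holds; returning an error instead keeps a misconfigured chain ID from crashing the signer. Demonstrated in isolation:

package main

import (
	"fmt"
	"math/big"
)

func main() {
	var buf [32]byte
	id := new(big.Int).Lsh(big.NewInt(1), 300) // a 301-bit chain ID

	// Mirrors the guard added to SigningHash: FillBytes panics if the
	// value needs more than len(buf) bytes, so reject it first.
	if id.BitLen() > 256 {
		fmt.Println("chain_id is too large")
		return
	}
	id.FillBytes(buf[:])
}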
@@ -5,9 +5,10 @@ import (
 	"errors"
 	"fmt"
 
-	"github.com/ethereum-optimism/optimism/op-node/eth"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/log"
+
+	"github.com/ethereum-optimism/optimism/op-node/eth"
 )
 
 // isDepositTx checks an opaqueTx to determine if it is a Deposit Transaction
@@ -68,9 +69,13 @@ func sanityCheckPayload(payload *eth.ExecutionPayload) error {
 type BlockInsertionErrType uint
 
 const (
+	// BlockInsertOK indicates that the payload was successfully executed and appended to the canonical chain.
 	BlockInsertOK BlockInsertionErrType = iota
+	// BlockInsertTemporaryErr indicates that the insertion failed but may succeed at a later time without changes to the payload.
 	BlockInsertTemporaryErr
+	// BlockInsertPrestateErr indicates that the pre-state to insert the payload could not be prepared, e.g. due to missing chain data.
 	BlockInsertPrestateErr
+	// BlockInsertPayloadErr indicates that the payload was invalid and cannot become canonical.
 	BlockInsertPayloadErr
 )
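How a caller is expected to branch on this taxonomy, as a hedged sketch (assuming this file's package; the helper and its hook functions are hypothetical, the constants are the ones defined above):

package derive

// handleInsertErr sketches the intended reaction to each error class
// (hypothetical helper; retry/reset/drop are caller-supplied stand-ins).
func handleInsertErr(errTyp BlockInsertionErrType, retry, reset, drop func()) {
	switch errTyp {
	case BlockInsertOK:
		// payload is canonical; nothing to do
	case BlockInsertTemporaryErr:
		retry() // the same payload attributes may succeed later
	case BlockInsertPrestateErr:
		reset() // pre-state was missing or inconsistent; re-derive first
	case BlockInsertPayloadErr:
		drop() // the payload is invalid and can never become canonical
	}
}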
@@ -80,37 +85,53 @@ const (
 // If updateSafe is true, the head block is considered to be the safe head as well as the head.
 // It returns the payload, an RPC error (if the payload might still be valid), and a payload error (if the payload was not valid)
 func InsertHeadBlock(ctx context.Context, log log.Logger, eng Engine, fc eth.ForkchoiceState, attrs *eth.PayloadAttributes, updateSafe bool) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) {
+	id, errTyp, err := StartPayload(ctx, eng, fc, attrs)
+	if err != nil {
+		return nil, errTyp, err
+	}
+	return ConfirmPayload(ctx, log, eng, fc, id, updateSafe)
+}
+
+// StartPayload starts an execution payload building process in the provided Engine, with the given attributes.
+// The severity of the error is distinguished to determine whether the same payload attributes may be re-attempted later.
+func StartPayload(ctx context.Context, eng Engine, fc eth.ForkchoiceState, attrs *eth.PayloadAttributes) (id eth.PayloadID, errType BlockInsertionErrType, err error) {
 	fcRes, err := eng.ForkchoiceUpdate(ctx, &fc, attrs)
 	if err != nil {
 		var inputErr eth.InputError
 		if errors.As(err, &inputErr) {
 			switch inputErr.Code {
 			case eth.InvalidForkchoiceState:
-				return nil, BlockInsertPrestateErr, fmt.Errorf("pre-block-creation forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap())
+				return eth.PayloadID{}, BlockInsertPrestateErr, fmt.Errorf("pre-block-creation forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap())
 			case eth.InvalidPayloadAttributes:
-				return nil, BlockInsertPayloadErr, fmt.Errorf("payload attributes are not valid, cannot build block: %w", inputErr.Unwrap())
+				return eth.PayloadID{}, BlockInsertPayloadErr, fmt.Errorf("payload attributes are not valid, cannot build block: %w", inputErr.Unwrap())
 			default:
-				return nil, BlockInsertPrestateErr, fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err)
+				return eth.PayloadID{}, BlockInsertPrestateErr, fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err)
 			}
 		} else {
-			return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to create new block via forkchoice: %w", err)
+			return eth.PayloadID{}, BlockInsertTemporaryErr, fmt.Errorf("failed to create new block via forkchoice: %w", err)
 		}
 	}
 	switch fcRes.PayloadStatus.Status {
 	// TODO(proto): snap sync - specify explicit different error type if node is syncing
 	case eth.ExecutionInvalid, eth.ExecutionInvalidBlockHash:
-		return nil, BlockInsertPayloadErr, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)
+		return eth.PayloadID{}, BlockInsertPayloadErr, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)
 	case eth.ExecutionValid:
-		break
-	default:
-		return nil, BlockInsertTemporaryErr, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)
-	}
-	id := fcRes.PayloadID
-	if id == nil {
-		return nil, BlockInsertTemporaryErr, errors.New("nil id in forkchoice result when expecting a valid ID")
-	}
-	payload, err := eng.GetPayload(ctx, *id)
+		id := fcRes.PayloadID
+		if id == nil {
+			return eth.PayloadID{}, BlockInsertTemporaryErr, errors.New("nil id in forkchoice result when expecting a valid ID")
+		}
+		return *id, BlockInsertOK, nil
+	default:
+		return eth.PayloadID{}, BlockInsertTemporaryErr, eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)
+	}
+}
+
+// ConfirmPayload ends an execution payload building process in the provided Engine, and persists the payload as the canonical head.
+// If updateSafe is true, then the payload will also be recognized as safe-head at the same time.
+// The severity of the error is distinguished to determine whether the payload was valid and can become canonical.
+func ConfirmPayload(ctx context.Context, log log.Logger, eng Engine, fc eth.ForkchoiceState, id eth.PayloadID, updateSafe bool) (out *eth.ExecutionPayload, errTyp BlockInsertionErrType, err error) {
+	payload, err := eng.GetPayload(ctx, id)
 	if err != nil {
 		// even if it is an input-error (unknown payload ID), it is temporary, since we will re-attempt the full payload building, not just the retrieval of the payload.
 		return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to get execution payload: %w", err)
@@ -134,7 +155,7 @@ func InsertHeadBlock(ctx context.Context, log log.Logger, eng Engine, fc eth.For
 	if updateSafe {
 		fc.SafeBlockHash = payload.BlockHash
 	}
-	fcRes, err = eng.ForkchoiceUpdate(ctx, &fc, nil)
+	fcRes, err := eng.ForkchoiceUpdate(ctx, &fc, nil)
 	if err != nil {
 		var inputErr eth.InputError
 		if errors.As(err, &inputErr) {
...
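The split turns block building into two phases with a gap a driver can use for other work; InsertHeadBlock is now just the two calls back to back. A sketch of the composed flow (a hypothetical wrapper, assuming this file's package and its Engine interface):

package derive

import (
	"context"

	"github.com/ethereum-optimism/optimism/op-node/eth"
	"github.com/ethereum/go-ethereum/log"
)

// buildAndSeal sketches the two-phase flow; real callers may interleave
// other work between starting the payload and sealing it.
func buildAndSeal(ctx context.Context, lgr log.Logger, eng Engine, fc eth.ForkchoiceState, attrs *eth.PayloadAttributes, updateSafe bool) (*eth.ExecutionPayload, BlockInsertionErrType, error) {
	id, errTyp, err := StartPayload(ctx, eng, fc, attrs)
	if err != nil {
		return nil, errTyp, err // see the error taxonomy above for how to react
	}
	// ... other work can happen here while the engine builds the block ...
	return ConfirmPayload(ctx, lgr, eng, fc, id, updateSafe)
}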
@@ -2,7 +2,6 @@ package sources
 
 import (
 	"context"
-	"errors"
 	"fmt"
 
 	"github.com/ethereum-optimism/optimism/op-node/client"
@@ -248,53 +247,6 @@ func (s *EthClient) Fetch(ctx context.Context, blockHash common.Hash) (eth.Block
 	return info, txs, r, nil
 }
 
-// BlockIDRange returns a range of block IDs from the provided begin up to max blocks after the begin.
-// This batch-requests all blocks by number in the range at once, and then verifies the consistency
-func (s *EthClient) BlockIDRange(ctx context.Context, begin eth.BlockID, max uint64) ([]eth.BlockID, error) {
-	headerRequests := make([]rpc.BatchElem, max)
-	for i := uint64(0); i < max; i++ {
-		headerRequests[i] = rpc.BatchElem{
-			Method: "eth_getBlockByNumber",
-			Args:   []interface{}{hexutil.EncodeUint64(begin.Number + 1 + i), false},
-			Result: new(*rpcHeader),
-			Error:  nil,
-		}
-	}
-	if err := s.client.BatchCallContext(ctx, headerRequests); err != nil {
-		return nil, err
-	}
-
-	out := make([]eth.BlockID, 0, max)
-	// try to cache everything we have before halting on the results with errors
-	for i := 0; i < len(headerRequests); i++ {
-		result := *headerRequests[i].Result.(**rpcHeader)
-		if headerRequests[i].Error == nil {
-			if result == nil {
-				break // no more headers from here
-			}
-			info, err := result.Info(s.trustRPC, s.mustBePostMerge)
-			if err != nil {
-				return nil, fmt.Errorf("bad header data for block %s: %w", headerRequests[i].Args[0], err)
-			}
-			s.headersCache.Add(info.Hash(), info)
-			out = append(out, info.ID())
-			prev := begin
-			if i > 0 {
-				prev = out[i-1]
-			}
-			if prev.Hash != info.ParentHash() {
-				return nil, fmt.Errorf("inconsistent results from L1 chain range request, block %s not expected parent %s of %s", prev, info.ParentHash(), info.ID())
-			}
-		} else if errors.Is(headerRequests[i].Error, ethereum.NotFound) {
-			break // no more headers from here
-		} else {
-			return nil, fmt.Errorf("failed to retrieve block: %s: %w", headerRequests[i].Args[0], headerRequests[i].Error)
-		}
-	}
-	return out, nil
-}
-
 func (s *EthClient) GetProof(ctx context.Context, address common.Address, blockTag string) (*eth.AccountResult, error) {
 	var getProofResponse *eth.AccountResult
 	err := s.client.CallContext(ctx, &getProofResponse, "eth_getProof", address, []common.Hash{}, blockTag)
...
@@ -6,6 +6,7 @@ import (
 	"math/rand"
 	"testing"
 
+	"github.com/ethereum/go-ethereum"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
@@ -30,7 +31,7 @@ func (m *mockRPC) CallContext(ctx context.Context, result interface{}, method st
 	return m.MethodCalled("CallContext", ctx, result, method, args).Get(0).([]error)[0]
 }
 
-func (m *mockRPC) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
+func (m *mockRPC) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
 	called := m.MethodCalled("EthSubscribe", channel, args)
 	return called.Get(0).(*rpc.ClientSubscription), called.Get(1).([]error)[0]
 }
...
@@ -5,6 +5,7 @@ import (
 	"sync"
 
 	"github.com/ethereum-optimism/optimism/op-node/client"
+	"github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/rpc"
 )
@@ -39,7 +40,7 @@ func (lc *limitClient) CallContext(ctx context.Context, result interface{}, meth
 	return lc.c.CallContext(ctx, result, method, args...)
 }
 
-func (lc *limitClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
+func (lc *limitClient) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
 	// subscription doesn't count towards request limit
 	return lc.c.EthSubscribe(ctx, channel, args...)
 }
...
@@ -3,6 +3,7 @@ package testutils
 
 import (
 	"context"
 
+	"github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/rpc"
 
 	"github.com/ethereum-optimism/optimism/op-node/client"
@@ -39,7 +40,7 @@ func (r RPCErrFaker) BatchCallContext(ctx context.Context, b []rpc.BatchElem) er
 	return r.RPC.BatchCallContext(ctx, b)
 }
 
-func (r RPCErrFaker) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
+func (r RPCErrFaker) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (ethereum.Subscription, error) {
 	if r.ErrFn != nil {
 		if err := r.ErrFn(); err != nil {
 			return nil, err
...
@@ -25,6 +25,10 @@ const config: HardhatUserConfig = {
       url: process.env.L1_RPC || '',
       accounts: [process.env.PRIVATE_KEY_DEPLOYER || ethers.constants.HashZero],
     },
+    goerli: {
+      url: process.env.L1_RPC || '',
+      accounts: [process.env.PRIVATE_KEY_DEPLOYER || ethers.constants.HashZero],
+    },
   },
   external: {
     contracts: [
@@ -35,7 +39,10 @@ const config: HardhatUserConfig = {
     deployments: {
       hivenet: ['../contracts-bedrock/deployments/hivenet'],
       devnetL1: ['../contracts-bedrock/deployments/devnetL1'],
-      goerli: ['../contracts-bedrock/deployments/goerli'],
+      goerli: [
+        '../contracts-bedrock/deployments/goerli',
+        '../contracts/deployments/goerli',
+      ],
     },
   },
 }
...
import { task, types } from 'hardhat/config'
import { HardhatRuntimeEnvironment } from 'hardhat/types'
import { Wallet, providers } from 'ethers'
import { predeploys } from '@eth-optimism/contracts-bedrock'
import 'hardhat-deploy'
import '@nomiclabs/hardhat-ethers'
import {
CrossChainMessenger,
StandardBridgeAdapter,
MessageStatus,
} from '../src'
task('finalize-withdrawal', 'Finalize a withdrawal')
.addParam(
'transactionHash',
'L2 Transaction hash to finalize',
'',
types.string
)
.addParam('l2Url', 'L2 HTTP URL', 'http://localhost:9545', types.string)
.setAction(async (args, hre: HardhatRuntimeEnvironment) => {
const txHash = args.transactionHash
if (txHash === '') {
console.log('No tx hash')
}
const signers = await hre.ethers.getSigners()
if (signers.length === 0) {
throw new Error('No configured signers')
}
const signer = signers[0]
const address = await signer.getAddress()
console.log(`Using signer: ${address}`)
const l2Provider = new providers.StaticJsonRpcProvider(args.l2Url)
const l2Signer = new Wallet(hre.network.config.accounts[0], l2Provider)
let Deployment__L1StandardBridgeProxy = await hre.deployments.getOrNull(
'L1StandardBridgeProxy'
)
if (Deployment__L1StandardBridgeProxy === undefined) {
Deployment__L1StandardBridgeProxy = await hre.deployments.getOrNull(
'Proxy__OVM_L1StandardBridge'
)
}
let Deployment__L1CrossDomainMessengerProxy =
await hre.deployments.getOrNull('L1CrossDomainMessengerProxy')
if (Deployment__L1CrossDomainMessengerProxy === undefined) {
Deployment__L1CrossDomainMessengerProxy = await hre.deployments.getOrNull(
'Proxy__OVM_L1CrossDomainMessenger'
)
}
const Deployment__L2OutputOracleProxy = await hre.deployments.getOrNull(
'L2OutputOracleProxy'
)
const Deployment__OptimismPortalProxy = await hre.deployments.getOrNull(
'OptimismPortalProxy'
)
const messenger = new CrossChainMessenger({
l1SignerOrProvider: signer,
l2SignerOrProvider: l2Signer,
l1ChainId: await signer.getChainId(),
l2ChainId: await l2Signer.getChainId(),
bridges: {
Standard: {
Adapter: StandardBridgeAdapter,
l1Bridge: Deployment__L1StandardBridgeProxy?.address,
l2Bridge: predeploys.L2StandardBridge,
},
},
contracts: {
l1: {
L1StandardBridge: Deployment__L1StandardBridgeProxy?.address,
L1CrossDomainMessenger:
Deployment__L1CrossDomainMessengerProxy?.address,
L2OutputOracle: Deployment__L2OutputOracleProxy?.address,
OptimismPortal: Deployment__OptimismPortalProxy?.address,
},
},
})
console.log(`Fetching message status for ${txHash}`)
const status = await messenger.getMessageStatus(txHash)
console.log(`Status: ${MessageStatus[status]}`)
if (status === MessageStatus.READY_FOR_RELAY) {
const tx = await messenger.finalizeMessage(txHash)
const receipt = await tx.wait()
console.log(receipt)
console.log('Finalized withdrawal')
}
})
 import './deposit-eth'
 import './deposit-erc20'
+import './finalize-withdrawal'