Commit 49f7023f authored by vicotor's avatar vicotor

implement batcher.

parent 8d8b708b
package chaindb
import (
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
nebulav1 "github.com/exchain/go-exchain/exchain/protocol/gen/go/nebula/v1"
"github.com/exchain/go-exchain/metadb"
"github.com/golang/protobuf/proto"
lru "github.com/hashicorp/golang-lru"
"github.com/holiman/uint256"
)
type ChainReader interface {
ChainId() (*uint256.Int, error)
GetBlockByLabel(label ExChainBlockLabel) (*nebulav1.Block, error)
CurrentHeight() uint256.Int
GetOriginBlockData(*uint256.Int) ([]byte, error)
GetBlock(*uint256.Int) *nebulav1.Block
GetBlockHeader(*uint256.Int) *nebulav1.BlockHeader
BlockHeaderByHash(hash common.Hash) *nebulav1.BlockHeader
BlockByHash(hash common.Hash) *nebulav1.Block
GetTransaction(hash common.Hash) (*nebulav1.Transaction, error)
GetReceipt(hash common.Hash) *nebulav1.TransactionReceipt
}
func NewChainReader(log log.Logger, database metadb.Database) ChainReader {
chain := &chainReader{
log: log,
database: database,
}
var err error
if chain.txCache, err = lru.New(1000000); err != nil {
panic(err)
}
if chain.receiptCache, err = lru.New(1000000); err != nil {
panic(err)
}
if chain.blkCache, err = lru.New(1000); err != nil {
panic(err)
}
if chain.headerCache, err = lru.New(1000); err != nil {
panic(err)
}
return chain
}
type chainReader struct {
log log.Logger
txCache *lru.Cache
receiptCache *lru.Cache
blkCache *lru.Cache
headerCache *lru.Cache
database metadb.Database
chainId *uint256.Int
}
func (m *chainReader) GetBlockByLabel(label ExChainBlockLabel) (*nebulav1.Block, error) {
switch label {
case ExChainBlockLatest, ExChainBlockFinalized:
latest := m.CurrentHeight()
return m.GetBlock(&latest), nil
case ExChainBlockEarliest:
return m.GetBlock(big0), nil
default:
return nil, errors.New("invalid block label")
}
}
func (m *chainReader) ChainId() (*uint256.Int, error) {
if m.chainId != nil {
return m.chainId, nil
}
k := chainIdKey()
if v, err := m.database.Get([]byte(k)); err != nil {
return nil, err
} else {
m.chainId = new(uint256.Int).SetBytes(v)
return m.chainId, nil
}
}
func (m *chainReader) GetTransaction(hash common.Hash) (*nebulav1.Transaction, error) {
if tx, exist := m.txCache.Get(hash); exist {
ptx := tx.(*nebulav1.Transaction)
return ptx, nil
} else {
entry, err := m.getTxEntry(hash)
if err != nil {
return nil, err
}
return m.getTransaction(uint256.NewInt(entry.BlockNumber), int(entry.Index))
}
}
func (m *chainReader) GetReceipt(txhash common.Hash) *nebulav1.TransactionReceipt {
if r, exist := m.receiptCache.Get(txhash); exist {
return r.(*nebulav1.TransactionReceipt)
} else {
entry, err := m.getTxEntry(txhash)
if err != nil {
return nil
}
return m.getReceipt(uint256.NewInt(entry.BlockNumber), int(entry.Index))
}
}
func (m *chainReader) CurrentHeight() uint256.Int {
height := uint256.NewInt(0)
// load height string
h, err := m.database.Get([]byte(chainHeightKey()))
if err == nil {
height, _ = uint256.FromDecimal(string(h))
}
return *height
}
func (m *chainReader) GetBlock(num *uint256.Int) *nebulav1.Block {
return m.GetBlockBody(num)
}
func (m *chainReader) GetBlockBody(num *uint256.Int) *nebulav1.Block {
k := blockBodyKey(num)
if b, exist := m.blkCache.Get(k); exist {
return b.(*nebulav1.Block)
} else {
d, err := m.database.Get([]byte(k))
if err != nil {
return nil
}
block := new(nebulav1.Block)
if err = proto.Unmarshal(d, block); err != nil {
return nil
}
m.blkCache.Add(k, block)
return block
}
}
func (m *chainReader) GetBlockHeader(num *uint256.Int) *nebulav1.BlockHeader {
k := blockHeaderKey(num)
if h, exist := m.headerCache.Get(k); exist {
return h.(*nebulav1.BlockHeader)
} else {
d, err := m.database.Get([]byte(k))
if err != nil {
return nil
}
header := new(nebulav1.BlockHeader)
if err = proto.Unmarshal(d, header); err != nil {
return nil
}
m.headerCache.Add(k, header)
return header
}
}
func (m *chainReader) GetOriginBlockData(num *uint256.Int) (block []byte, err error) {
k := blockBodyKey(num)
if b, exist := m.blkCache.Get(k); exist {
blk := b.(*nebulav1.Block)
return proto.Marshal(blk)
}
return m.database.Get([]byte(k))
}
func (m *chainReader) BlockHeaderByHash(hash common.Hash) *nebulav1.BlockHeader {
number := m.blockNumberByHash(hash)
if number == nil {
return nil
}
return m.GetBlockHeader(number)
}
func (m *chainReader) BlockByHash(hash common.Hash) *nebulav1.Block {
number := m.blockNumberByHash(hash)
if number == nil {
return nil
}
return m.GetBlock(number)
}
func (m *chainReader) GetBlockTransactions(num *uint256.Int) *nebulav1.TransactionList {
txs := m.getBlockTxs(num)
return txs
}
func (m *chainReader) GetBlockReceipts(num *uint256.Int) *nebulav1.TransactionReceiptList {
return m.getBlockReceipts(num)
}
func (m *chainReader) blockNumberByHash(hash common.Hash) *uint256.Int {
k := blockNumKey(hash)
if number, exist := m.txCache.Get(k); exist {
return number.(*uint256.Int)
} else {
d, err := m.database.Get([]byte(k))
if err != nil {
return nil
}
n := new(uint256.Int)
n.SetBytes(d)
m.txCache.Add(k, n)
return n
}
}
func (m *chainReader) getBlockTxs(num *uint256.Int) *nebulav1.TransactionList {
blockbody := m.GetBlockBody(num)
if blockbody == nil {
return nil
}
return blockbody.Transactions
}
func (m *chainReader) getTransaction(num *uint256.Int, index int) (*nebulav1.Transaction, error) {
blockTxs := m.getBlockTxs(num)
if blockTxs == nil {
return nil, errors.New("transaction not found")
}
if index >= len(blockTxs.Txs) || index < 0 {
return nil, errors.New("transaction not found")
}
return blockTxs.Txs[index], nil
}
func (m *chainReader) getBlockReceipts(num *uint256.Int) *nebulav1.TransactionReceiptList {
k := blockReceiptsKey(num)
d, err := m.database.Get([]byte(k))
if err != nil {
m.log.Error("GetBlockReceipts failed, ", err)
return nil
}
receipts := new(nebulav1.TransactionReceiptList)
if err := proto.Unmarshal(d, receipts); err != nil {
m.log.Error("GetBlockReceipts failed, ", err)
return nil
}
return receipts
}
func (m *chainReader) getReceipt(num *uint256.Int, index int) *nebulav1.TransactionReceipt {
blockReceipts := m.getBlockReceipts(num)
if blockReceipts == nil {
return nil
}
if index >= len(blockReceipts.Receipts) || index < 0 {
return nil
}
return blockReceipts.Receipts[index]
}
func (m *chainReader) cacheBlockTxsInfo(txs *nebulav1.TransactionList, rs *nebulav1.TransactionReceiptList) error {
if txs == nil || rs == nil {
return nil
}
if len(txs.Txs) != len(rs.Receipts) {
return errors.New("txs and receipts not match")
}
for i, tx := range txs.Txs {
receipt := rs.Receipts[i]
txhash := common.BytesToHash(receipt.Hash)
m.txCache.Add(txhash, tx)
m.receiptCache.Add(txhash, receipt)
}
return nil
}
func (m *chainReader) cacheReceipts(rs []*nebulav1.TransactionReceipt) error {
for _, receipt := range rs {
m.receiptCache.Add(receipt.Hash, receipt)
}
return nil
}
func (m *chainReader) storeTxEntry(hash common.Hash, entry txEntry) error {
k := txEntryKey(hash)
return m.database.Put([]byte(k), entry.Bytes())
}
func (m *chainReader) getTxEntry(hash common.Hash) (txEntry, error) {
k := txEntryKey(hash)
v, err := m.database.Get([]byte(k))
if err != nil {
return txEntry{}, err
}
var entry = new(txEntry)
if err := entry.SetBytes(v); err != nil {
return txEntry{}, err
}
return *entry, nil
}
......@@ -604,3 +604,22 @@ func (m *chaindb) SaveBlockData(block *nebulav1.Block, rs *nebulav1.TransactionR
return nil
}
func (m *chaindb) storeTxEntry(hash common.Hash, entry txEntry) error {
k := txEntryKey(hash)
return m.database.Put([]byte(k), entry.Bytes())
}
func (m *chaindb) getTxEntry(hash common.Hash) (txEntry, error) {
k := txEntryKey(hash)
v, err := m.database.Get([]byte(k))
if err != nil {
return txEntry{}, err
}
var entry = new(txEntry)
if err := entry.SetBytes(v); err != nil {
return txEntry{}, err
}
return *entry, nil
}
......@@ -2,7 +2,6 @@ package chaindb
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"strconv"
"strings"
)
......@@ -26,22 +25,3 @@ func (e *txEntry) SetBytes(data []byte) error {
e.Index, _ = strconv.ParseInt(splits[1], 10, 0)
return nil
}
func (m *chaindb) storeTxEntry(hash common.Hash, entry txEntry) error {
k := txEntryKey(hash)
return m.database.Put([]byte(k), entry.Bytes())
}
func (m *chaindb) getTxEntry(hash common.Hash) (txEntry, error) {
k := txEntryKey(hash)
v, err := m.database.Get([]byte(k))
if err != nil {
return txEntry{}, err
}
var entry = new(txEntry)
if err := entry.SetBytes(v); err != nil {
return txEntry{}, err
}
return *entry, nil
}
package exchainapi
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/exchain/go-exchain/exchain/chaindb"
nodev1 "github.com/exchain/go-exchain/exchain/protocol/gen/go/node/v1"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/holiman/uint256"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"net"
"net/http"
)
type server struct {
chain chaindb.ChainReader
nodev1.UnimplementedNodeServer
}
func (s *server) GetAccount(ctx context.Context, request *nodev1.AccountRequest) (*nodev1.AccountResponse, error) {
//TODO implement me
panic("implement me")
}
func (s *server) GetBlockHeaderByNumber(ctx context.Context, request *nodev1.GetHeaderRequest) (*nodev1.GetHeaderResponse, error) {
header := s.chain.GetBlockHeader(uint256.NewInt(request.Number))
if header == nil {
return nil, fmt.Errorf("block header not found")
}
return &nodev1.GetHeaderResponse{
Header: header,
}, nil
}
func (s *server) GetBlockByNumber(ctx context.Context, request *nodev1.GetBlockRequest) (*nodev1.GetBlockResponse, error) {
blk := s.chain.GetBlock(uint256.NewInt(request.Number))
if blk == nil {
return nil, fmt.Errorf("block not found")
}
return &nodev1.GetBlockResponse{
Block: blk,
}, nil
}
func (s *server) GetBlockByHash(ctx context.Context, request *nodev1.GetBlockRequest) (*nodev1.GetBlockResponse, error) {
hash := common.HexToHash(request.Hash)
blk := s.chain.BlockByHash(hash)
if blk == nil {
return nil, fmt.Errorf("block not found")
}
return &nodev1.GetBlockResponse{
Block: blk,
}, nil
}
func (s *server) GetTransactionByHash(ctx context.Context, request *nodev1.GetTransactionRequest) (*nodev1.GetTransactionResponse, error) {
hash := common.HexToHash(request.Hash)
tx, err := s.chain.GetTransaction(hash)
if err != nil {
return nil, err
}
if tx == nil {
return nil, fmt.Errorf("transaction not found")
}
return &nodev1.GetTransactionResponse{
Transaction: tx,
}, nil
}
func (s *server) GetTransactionReceipt(ctx context.Context, request *nodev1.GetReceiptRequest) (*nodev1.GetReceiptResponse, error) {
hash := common.HexToHash(request.Hash)
receipt := s.chain.GetReceipt(hash)
if receipt == nil {
return nil, fmt.Errorf("receipt not found")
}
return &nodev1.GetReceiptResponse{
Receipt: receipt,
}, nil
}
func (s *server) GetPairList(ctx context.Context, request *nodev1.GetPairListRequest) (*nodev1.GetPairListResponse, error) {
//TODO implement me
panic("implement me")
}
func (s *server) GetPairInfo(ctx context.Context, request *nodev1.GetPairInfoRequest) (*nodev1.GetPairInfoResponse, error) {
//TODO implement me
panic("implement me")
}
func (s *server) GetCoinList(ctx context.Context, request *nodev1.GetCoinListRequest) (*nodev1.GetCoinListResponse, error) {
//TODO implement me
panic("implement me")
}
func (s *server) GetCoinInfo(ctx context.Context, request *nodev1.GetCoinInfoRequest) (*nodev1.GetCoinInfoResponse, error) {
//TODO implement me
panic("implement me")
}
func StartServer(logger log.Logger, grpcPort int, gwPort int, chain chaindb.ChainReader) error {
defer logger.Info("grpc and gateway server stopped")
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
gServer := &server{
chain: chain,
}
listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", grpcPort))
if err != nil {
logger.Error("failed to listen", "err", err)
return err
}
errCh := make(chan error)
grpcServer := grpc.NewServer()
nodev1.RegisterNodeServer(grpcServer, gServer)
go func(ch chan error) {
var e = grpcServer.Serve(listener)
ch <- e
}(errCh)
mux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
err = nodev1.RegisterNodeHandlerFromEndpoint(ctx, mux, fmt.Sprintf("127.0.0.1:%d", grpcPort), opts)
if err != nil {
return nil
}
go func(ch chan error) {
var e = http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", gwPort), mux)
ch <- e
}(errCh)
select {
case e := <-errCh:
return e
}
}
package exchainclient
import (
"context"
nodev1 "github.com/exchain/go-exchain/exchain/protocol/gen/go/node/v1"
"github.com/exchain/go-exchain/exchain/wrapper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"math/big"
)
type ExClient struct {
ins nodev1.NodeClient
conn *grpc.ClientConn
}
func NewExClient(server string) (*ExClient, error) {
conn, err := grpc.Dial(server, grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(1024*1024*1024),
grpc.MaxCallSendMsgSize(1024*1024*1024))) // Use WithTransportCredentials for TLS
if err != nil {
return nil, err
}
client := nodev1.NewNodeClient(conn)
return &ExClient{ins: client, conn: conn}, nil
}
func (e *ExClient) BlockByNumber(ctx context.Context, number *big.Int) (*wrapper.BlkWrapper, error) {
req := &nodev1.GetBlockRequest{
Number: number.Uint64(),
}
res, err := e.ins.GetBlockByNumber(ctx, req)
if err != nil {
return nil, err
}
return wrapper.NewBlkWrapper(res.Block), nil
}
func (e *ExClient) Close() {
e.conn.Close()
return
}
This diff is collapsed.
......@@ -81,6 +81,7 @@ message GetHeaderResponse {
message GetBlockRequest {
uint64 number = 1;
string hash = 2;
}
message GetBlockResponse {
......
......@@ -35,6 +35,25 @@ func (b *BlkWrapper) Height() uint64 {
return b.blk.Header.Height
}
func (b *BlkWrapper) Transactions() []*nebulav1.Transaction {
if b.blk.Transactions != nil {
return b.blk.Transactions.Txs
}
return []*nebulav1.Transaction{}
}
func (b *BlkWrapper) ParentHash() common.Hash {
return common.BytesToHash(b.blk.Header.ParentHash)
}
func (b *BlkWrapper) Time() uint64 {
return b.blk.Header.Timestamp
}
func (b *BlkWrapper) NumberU64() uint64 {
return b.Height()
}
func (b *BlkWrapper) Header() *nebulav1.BlockHeader {
return b.blk.Header
}
......
......@@ -35,3 +35,11 @@ func (t *TxWrapper) calcHash() common.Hash {
return crypto.Keccak256Hash(data)
}
func (t *TxWrapper) Bytes() ([]byte, error) {
return proto.Marshal(t.tx)
}
func (t *TxWrapper) IsProtocolTx() bool {
return t.tx.TxType == nebulav1.TxType_ProtocolTx
}
package batcher
import (
"github.com/exchain/go-exchain/exchain/wrapper"
"math"
"github.com/ethereum/go-ethereum/log"
"github.com/exchain/go-exchain/op-batcher/metrics"
"github.com/exchain/go-exchain/op-node/rollup"
"github.com/exchain/go-exchain/op-node/rollup/derive"
"github.com/exchain/go-exchain/op-service/eth"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// channel is a lightweight wrapper around a ChannelBuilder which keeps track of pending
......@@ -166,7 +166,7 @@ func (c *channel) CheckTimeout(l1BlockNum uint64) {
c.channelBuilder.CheckTimeout(l1BlockNum)
}
func (c *channel) AddBlock(block *types.Block) (*derive.L1BlockInfo, error) {
func (c *channel) AddBlock(block *wrapper.BlkWrapper) (*derive.L1BlockInfo, error) {
return c.channelBuilder.AddBlock(block)
}
......
......@@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"github.com/exchain/go-exchain/exchain/wrapper"
"io"
"math"
......@@ -11,7 +12,6 @@ import (
"github.com/exchain/go-exchain/op-node/rollup/derive"
"github.com/exchain/go-exchain/op-service/eth"
"github.com/exchain/go-exchain/op-service/queue"
"github.com/ethereum/go-ethereum/core/types"
)
var (
......@@ -66,7 +66,7 @@ type ChannelBuilder struct {
// current channel
co derive.ChannelOut
// list of blocks in the channel. Saved in case the channel must be rebuilt
blocks queue.Queue[*types.Block]
blocks queue.Queue[*wrapper.BlkWrapper]
// latestL1Origin is the latest L1 origin of all the L2 blocks that have been added to the channel
latestL1Origin eth.BlockID
// oldestL1Origin is the oldest L1 origin of all the L2 blocks that have been added to the channel
......@@ -136,7 +136,7 @@ func (c *ChannelBuilder) OutputBytes() int {
// Blocks returns a backup list of all blocks that were added to the channel. It
// can be used in case the channel needs to be rebuilt.
func (c *ChannelBuilder) Blocks() []*types.Block {
func (c *ChannelBuilder) Blocks() []*wrapper.BlkWrapper {
return c.blocks
}
......@@ -171,7 +171,7 @@ func (c *ChannelBuilder) OldestL2() eth.BlockID {
// first transaction for subsequent use by the caller.
//
// Call OutputFrames() afterwards to create frames.
func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, error) {
func (c *ChannelBuilder) AddBlock(block *wrapper.BlkWrapper) (*derive.L1BlockInfo, error) {
if c.IsFull() {
return nil, c.FullErr()
}
......
......@@ -3,17 +3,17 @@ package batcher
import (
"errors"
"fmt"
"github.com/exchain/go-exchain/exchain/wrapper"
"io"
"math"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/exchain/go-exchain/op-batcher/metrics"
"github.com/exchain/go-exchain/op-node/rollup"
"github.com/exchain/go-exchain/op-node/rollup/derive"
"github.com/exchain/go-exchain/op-service/eth"
"github.com/exchain/go-exchain/op-service/queue"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
var ErrReorg = errors.New("block does not extend existing chain")
......@@ -36,7 +36,7 @@ type channelManager struct {
outFactory ChannelOutFactory
// All blocks since the last request for new tx data.
blocks queue.Queue[*types.Block]
blocks queue.Queue[*wrapper.BlkWrapper]
// blockCursor is an index into blocks queue. It points at the next block
// to build a channel with. blockCursor = len(blocks) is reserved for when
// there are no blocks ready to build with.
......@@ -124,7 +124,7 @@ func (s *channelManager) TxConfirmed(_id txID, inclusionBlock eth.BlockID) {
// in the block queue and the blockCursor is ahead of it.
// Panics if the block is not in state.
func (s *channelManager) rewindToBlock(block eth.BlockID) {
idx := block.Number - s.blocks[0].Number().Uint64()
idx := block.Number - s.blocks[0].Height()
if s.blocks[idx].Hash() == block.Hash && idx < uint64(s.blockCursor) {
s.blockCursor = int(idx)
} else {
......@@ -427,7 +427,7 @@ func (s *channelManager) outputFrames() error {
// AddL2Block adds an L2 block to the internal blocks queue. It returns ErrReorg
// if the block does not extend the last block loaded into the state. If no
// blocks were added yet, the parent hash check is skipped.
func (s *channelManager) AddL2Block(block *types.Block) error {
func (s *channelManager) AddL2Block(block *wrapper.BlkWrapper) error {
if s.tip != (common.Hash{}) && s.tip != block.ParentHash() {
return ErrReorg
}
......@@ -439,10 +439,10 @@ func (s *channelManager) AddL2Block(block *types.Block) error {
return nil
}
func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info *derive.L1BlockInfo) eth.L2BlockRef {
func l2BlockRefFromBlockAndL1Info(block *wrapper.BlkWrapper, l1info *derive.L1BlockInfo) eth.L2BlockRef {
return eth.L2BlockRef{
Hash: block.Hash(),
Number: block.NumberU64(),
Number: block.Height(),
ParentHash: block.ParentHash(),
Time: block.Time(),
L1Origin: eth.BlockID{Hash: l1info.BlockHash, Number: l1info.Number},
......
......@@ -24,8 +24,8 @@ type CLIConfig struct {
// L1EthRpc is the HTTP provider URL for L1.
L1EthRpc string
// L2EthRpc is the HTTP provider URL for the L2 execution engine. A comma-separated list enables the active L2 provider. Such a list needs to match the number of RollupRpcs provided.
L2EthRpc string
// L2GRpc is the grpc provider URL for the L2 execution engine. A comma-separated list enables the active L2 provider. Such a list needs to match the number of RollupRpcs provided.
L2GRpc string
// RollupRpc is the HTTP provider URL for the L2 rollup node. A comma-separated list enables the active L2 provider. Such a list needs to match the number of L2EthRpcs provided.
RollupRpc string
......@@ -125,13 +125,13 @@ func (c *CLIConfig) Check() error {
if c.L1EthRpc == "" {
return errors.New("empty L1 RPC URL")
}
if c.L2EthRpc == "" {
if c.L2GRpc == "" {
return errors.New("empty L2 RPC URL")
}
if c.RollupRpc == "" {
return errors.New("empty rollup RPC URL")
}
if strings.Count(c.RollupRpc, ",") != strings.Count(c.L2EthRpc, ",") {
if strings.Count(c.RollupRpc, ",") != strings.Count(c.L2GRpc, ",") {
return errors.New("number of rollup and eth URLs must match")
}
if c.PollInterval == 0 {
......@@ -182,7 +182,7 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
return &CLIConfig{
/* Required Flags */
L1EthRpc: ctx.String(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.String(flags.L2EthRpcFlag.Name),
L2GRpc: ctx.String(flags.L2GRpcFlag.Name),
RollupRpc: ctx.String(flags.RollupRpcFlag.Name),
SubSafetyMargin: ctx.Uint64(flags.SubSafetyMarginFlag.Name),
PollInterval: ctx.Duration(flags.PollIntervalFlag.Name),
......
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/exchain/go-exchain/exchain/wrapper"
"io"
"math/big"
_ "net/http/pprof"
......@@ -13,13 +14,10 @@ import (
"golang.org/x/sync/errgroup"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
altda "github.com/exchain/go-exchain/op-alt-da"
"github.com/exchain/go-exchain/op-batcher/metrics"
"github.com/exchain/go-exchain/op-node/rollup"
......@@ -72,7 +70,7 @@ type L1Client interface {
}
type L2Client interface {
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
BlockByNumber(ctx context.Context, number *big.Int) (*wrapper.BlkWrapper, error)
}
type RollupClient interface {
......@@ -255,7 +253,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context, start, end uin
l.Log.Info("Loading range of multiple blocks into state", "start", start, "end", end)
}
var latestBlock *types.Block
var latestBlock *wrapper.BlkWrapper
// Add all blocks to "state"
for i := start; i <= end; i++ {
block, err := l.loadBlockIntoState(ctx, i)
......@@ -280,7 +278,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context, start, end uin
}
// loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded.
func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) {
func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*wrapper.BlkWrapper, error) {
l2Client, err := l.EndpointProvider.EthClient(ctx)
if err != nil {
return nil, fmt.Errorf("getting L2 client: %w", err)
......@@ -516,67 +514,8 @@ func (l *BatchSubmitter) processReceiptsLoop(ctx context.Context, receiptsCh cha
// even in the event of sequencer failover.
func (l *BatchSubmitter) throttlingLoop(ctx context.Context) {
defer l.wg.Done()
l.Log.Info("Starting DA throttling loop")
ticker := time.NewTicker(l.Config.ThrottleInterval)
defer ticker.Stop()
updateParams := func(pendingBytes int64) {
ctx, cancel := context.WithTimeout(l.shutdownCtx, l.Config.NetworkTimeout)
defer cancel()
cl, err := l.EndpointProvider.EthClient(ctx)
if err != nil {
l.Log.Error("Can't reach sequencer execution RPC", "err", err)
return
}
maxTxSize := uint64(0)
maxBlockSize := l.Config.ThrottleAlwaysBlockSize
if pendingBytes > int64(l.Config.ThrottleThreshold) {
l.Log.Warn("Pending bytes over limit, throttling DA", "bytes", pendingBytes, "limit", l.Config.ThrottleThreshold)
maxTxSize = l.Config.ThrottleTxSize
if maxBlockSize == 0 || (l.Config.ThrottleBlockSize != 0 && l.Config.ThrottleBlockSize < maxBlockSize) {
maxBlockSize = l.Config.ThrottleBlockSize
}
}
var (
success bool
rpcErr rpc.Error
)
if err := cl.Client().CallContext(
ctx, &success, SetMaxDASizeMethod, hexutil.Uint64(maxTxSize), hexutil.Uint64(maxBlockSize),
); errors.As(err, &rpcErr) && eth.ErrorCode(rpcErr.ErrorCode()).IsGenericRPCError() {
l.Log.Error("SetMaxDASize rpc unavailable or broken, shutting down. Either enable it or disable throttling.", "err", err)
// We'd probably hit this error right after startup, so a short shutdown duration should suffice.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Call StopBatchSubmitting in another goroutine to avoid deadlock.
go func() {
// Always returns nil. An error is only returned to expose this function as an RPC.
_ = l.StopBatchSubmitting(ctx)
}()
return
} else if err != nil {
l.Log.Error("SetMaxDASize rpc failed, retrying.", "err", err)
return
}
if !success {
l.Log.Error("Result of SetMaxDASize was false, retrying.")
}
}
cachedPendingBytes := int64(0)
for {
select {
case <-ticker.C:
updateParams(int64(cachedPendingBytes))
case pendingBytes := <-l.pendingBytesUpdated:
cachedPendingBytes = pendingBytes
updateParams(pendingBytes)
case <-ctx.Done():
l.Log.Info("DA throttling loop done")
l.Log.Info("Current don't need throtting in exchain")
return
}
}
}
func (l *BatchSubmitter) waitNodeSyncAndClearState() {
......
......@@ -153,12 +153,12 @@ func (bs *BatcherService) initRPCClients(ctx context.Context, cfg *CLIConfig) er
bs.L1Client = l1Client
var endpointProvider dial.L2EndpointProvider
if strings.Contains(cfg.RollupRpc, ",") && strings.Contains(cfg.L2EthRpc, ",") {
if strings.Contains(cfg.RollupRpc, ",") && strings.Contains(cfg.L2GRpc, ",") {
rollupUrls := strings.Split(cfg.RollupRpc, ",")
ethUrls := strings.Split(cfg.L2EthRpc, ",")
ethUrls := strings.Split(cfg.L2GRpc, ",")
endpointProvider, err = dial.NewActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, cfg.ActiveSequencerCheckDuration, dial.DefaultDialTimeout, bs.Log)
} else {
endpointProvider, err = dial.NewStaticL2EndpointProvider(ctx, bs.Log, cfg.L2EthRpc, cfg.RollupRpc)
endpointProvider, err = dial.NewStaticL2EndpointProvider(ctx, bs.Log, cfg.L2GRpc, cfg.RollupRpc)
}
if err != nil {
return fmt.Errorf("failed to build L2 endpoint provider: %w", err)
......
......@@ -2,11 +2,11 @@ package batcher
import (
"fmt"
"github.com/exchain/go-exchain/exchain/wrapper"
"github.com/ethereum/go-ethereum/log"
"github.com/exchain/go-exchain/op-service/eth"
"github.com/exchain/go-exchain/op-service/queue"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type channelStatuser interface {
......@@ -34,7 +34,7 @@ func (s syncActions) String() string {
// state of the batcher (blocks and channels), the new sync status, and the previous current L1 block. The actions are returned
// in a struct specifying the number of blocks to prune, the number of channels to prune, whether to wait for node sync, the block
// range to load into the local state, and whether to clear the state entirely. Returns an boolean indicating if the sequencer is out of sync.
func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCurrentL1 eth.L1BlockRef, blocks queue.Queue[*types.Block], channels []T, l log.Logger) (syncActions, bool) {
func computeSyncActions[T channelStatuser](newSyncStatus eth.SyncStatus, prevCurrentL1 eth.L1BlockRef, blocks queue.Queue[*wrapper.BlkWrapper], channels []T, l log.Logger) (syncActions, bool) {
// PART 1: Initial checks on the sync status
if newSyncStatus.HeadL1 == (eth.L1BlockRef{}) {
......
......@@ -33,10 +33,10 @@ var (
Usage: "HTTP provider URL for L1",
EnvVars: prefixEnvVars("L1_ETH_RPC"),
}
L2EthRpcFlag = &cli.StringFlag{
Name: "l2-eth-rpc",
Usage: "HTTP provider URL for L2 execution engine. A comma-separated list enables the active L2 endpoint provider. Such a list needs to match the number of rollup-rpcs provided.",
EnvVars: prefixEnvVars("L2_ETH_RPC"),
L2GRpcFlag = &cli.StringFlag{
Name: "l2-grpc",
Usage: "Grpc URL provider for L2 node. A comma-separated list enables the active L2 endpoint provider. Such a list needs to match the number of rollup-rpcs provided.",
EnvVars: prefixEnvVars("L2_GRPC"),
}
RollupRpcFlag = &cli.StringFlag{
Name: "rollup-rpc",
......@@ -192,7 +192,7 @@ var (
var requiredFlags = []cli.Flag{
L1EthRpcFlag,
L2EthRpcFlag,
L2GRpcFlag,
RollupRpcFlag,
}
......
package metrics
import (
"github.com/exchain/go-exchain/exchain/wrapper"
"github.com/golang/protobuf/proto"
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum/go-ethereum/core/types"
"github.com/exchain/go-exchain/op-node/rollup/derive"
"github.com/exchain/go-exchain/op-service/eth"
opmetrics "github.com/exchain/go-exchain/op-service/metrics"
......@@ -30,8 +31,8 @@ type Metricer interface {
RecordL2BlocksLoaded(l2ref eth.L2BlockRef)
RecordChannelOpened(id derive.ChannelID, numPendingBlocks int)
RecordL2BlocksAdded(l2ref eth.L2BlockRef, numBlocksAdded, numPendingBlocks, inputBytes, outputComprBytes int)
RecordL2BlockInPendingQueue(block *types.Block)
RecordL2BlockInChannel(block *types.Block)
RecordL2BlockInPendingQueue(block *wrapper.BlkWrapper)
RecordL2BlockInChannel(block *wrapper.BlkWrapper)
RecordChannelClosed(id derive.ChannelID, numPendingBlocks int, numFrames int, inputBytes int, outputComprBytes int, reason error)
RecordChannelFullySubmitted(id derive.ChannelID)
RecordChannelTimedOut(id derive.ChannelID)
......@@ -285,14 +286,14 @@ func (m *Metrics) RecordChannelClosed(id derive.ChannelID, numPendingBlocks int,
m.channelClosedReason.Set(float64(ClosedReasonToNum(reason)))
}
func (m *Metrics) RecordL2BlockInPendingQueue(block *types.Block) {
func (m *Metrics) RecordL2BlockInPendingQueue(block *wrapper.BlkWrapper) {
daSize, rawSize := estimateBatchSize(block)
m.pendingBlocksBytesTotal.Add(float64(rawSize))
m.pendingBlocksBytesCurrent.Add(float64(rawSize))
atomic.AddInt64(&m.pendingDABytes, int64(daSize))
}
func (m *Metrics) RecordL2BlockInChannel(block *types.Block) {
func (m *Metrics) RecordL2BlockInChannel(block *wrapper.BlkWrapper) {
daSize, rawSize := estimateBatchSize(block)
m.pendingBlocksBytesCurrent.Add(-1.0 * float64(rawSize))
atomic.AddInt64(&m.pendingDABytes, -1*int64(daSize))
......@@ -330,20 +331,11 @@ func (m *Metrics) RecordBlobUsedBytes(num int) {
// estimateBatchSize returns the estimated size of the block in a batch both with compression ('daSize') and without
// ('rawSize').
func estimateBatchSize(block *types.Block) (daSize, rawSize uint64) {
func estimateBatchSize(block *wrapper.BlkWrapper) (daSize, rawSize uint64) {
daSize = uint64(70) // estimated overhead of batch metadata
rawSize = uint64(70)
for _, tx := range block.Transactions() {
// Deposit transactions are not included in batches
if tx.IsDepositTx() {
continue
}
bigSize := tx.RollupCostData().EstimatedDASize()
if bigSize.IsUint64() { // this should always be true, but if not just ignore
daSize += bigSize.Uint64()
}
// Add 2 for the overhead of encoding the tx bytes in a RLP list
rawSize += tx.Size() + 2
}
data, _ := proto.Marshal(block.Block())
rawSize += uint64(len(data))
daSize += uint64(len(data))
return
}
package metrics
import (
"github.com/exchain/go-exchain/exchain/wrapper"
"io"
"math"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
......@@ -32,8 +32,8 @@ func (*noopMetrics) RecordLatestL1Block(l1ref eth.L1BlockRef) {}
func (*noopMetrics) RecordL2BlocksLoaded(eth.L2BlockRef) {}
func (*noopMetrics) RecordChannelOpened(derive.ChannelID, int) {}
func (*noopMetrics) RecordL2BlocksAdded(eth.L2BlockRef, int, int, int, int) {}
func (*noopMetrics) RecordL2BlockInPendingQueue(*types.Block) {}
func (*noopMetrics) RecordL2BlockInChannel(*types.Block) {}
func (*noopMetrics) RecordL2BlockInPendingQueue(*wrapper.BlkWrapper) {}
func (*noopMetrics) RecordL2BlockInChannel(*wrapper.BlkWrapper) {}
func (*noopMetrics) RecordChannelClosed(derive.ChannelID, int, int, int, int, error) {}
......
package metrics
import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/exchain/go-exchain/exchain/wrapper"
)
type TestMetrics struct {
......@@ -11,12 +11,12 @@ type TestMetrics struct {
var _ Metricer = new(TestMetrics)
func (m *TestMetrics) RecordL2BlockInPendingQueue(block *types.Block) {
func (m *TestMetrics) RecordL2BlockInPendingQueue(block *wrapper.BlkWrapper) {
_, rawSize := estimateBatchSize(block)
m.PendingBlocksBytesCurrent += float64(rawSize)
}
func (m *TestMetrics) RecordL2BlockInChannel(block *types.Block) {
func (m *TestMetrics) RecordL2BlockInChannel(block *wrapper.BlkWrapper) {
_, rawSize := estimateBatchSize(block)
m.PendingBlocksBytesCurrent -= float64(rawSize)
}
......@@ -252,7 +252,7 @@ var Subcommands = cli.Commands{
return err
}
chain := chaindb.NewChainDB(logger, database)
processengine.NewEngine(chain) // todo
processengine.NewEngine(dataDir, logger, chain) // todo
engine := mockengine.NewEngine(dataDir, logger, chain)
if err = genblk.Commit(engine, chain); err != nil {
......
......@@ -107,6 +107,34 @@ var (
}(),
Category: RollupCategory,
}
GRPCListenAddr = &cli.StringFlag{
Name: "grpc.addr",
Usage: "gRPC listening address",
EnvVars: prefixEnvVars("GRPC_ADDR"),
Value: "127.0.0.1",
Category: OperationsCategory,
}
GRPCListenPort = &cli.IntFlag{
Name: "grpc.port",
Usage: "gRPC listening port",
EnvVars: prefixEnvVars("GRPC_PORT"),
Value: 7788, // Note: op-service/rpc/cli.go uses 8545 as the default.
Category: OperationsCategory,
}
GWListenAddr = &cli.StringFlag{
Name: "gw.addr",
Usage: "gRPC gateway listening address",
EnvVars: prefixEnvVars("GW_ADDR"),
Value: "127.0.0.1",
Category: OperationsCategory,
}
GWListenPort = &cli.IntFlag{
Name: "gw.port",
Usage: "gRPC gateway listening port",
EnvVars: prefixEnvVars("GW_PORT"),
Value: 7790, // Note: op-service/rpc/cli.go uses 8545 as the default.
Category: OperationsCategory,
}
RPCListenAddr = &cli.StringFlag{
Name: "rpc.addr",
Usage: "RPC listening address",
......@@ -394,6 +422,10 @@ var optionalFlags = []cli.Flag{
BeaconCheckIgnore,
BeaconFetchAllSidecars,
SyncModeFlag,
GRPCListenAddr,
GRPCListenPort,
GWListenAddr,
GWListenPort,
RPCListenAddr,
RPCListenPort,
L1TrustRPC,
......
......@@ -36,6 +36,10 @@ type Config struct {
RPC RPCConfig
GW GWConfig
GRPC GRPCConfig
P2P p2p.SetupP2P
Metrics MetricsConfig
......@@ -82,6 +86,16 @@ type Config struct {
// ConductorRPCFunc retrieves the endpoint. The RPC may not immediately be available.
type ConductorRPCFunc func(ctx context.Context) (string, error)
type GRPCConfig struct {
ListenAddr string
ListenPort int
}
type GWConfig struct {
ListenAddr string
ListenPort int
}
type RPCConfig struct {
ListenAddr string
ListenPort int
......
......@@ -7,6 +7,7 @@ import (
"github.com/exchain/go-exchain/engine"
"github.com/exchain/go-exchain/exchain"
"github.com/exchain/go-exchain/exchain/chaindb"
"github.com/exchain/go-exchain/exchain/exchainapi"
"github.com/exchain/go-exchain/exchain/mockengine"
"github.com/exchain/go-exchain/metadb"
"github.com/exchain/go-exchain/metadb/groupdb"
......@@ -400,7 +401,7 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config) error {
var err error
n.db = groupdb.NewGroupDB(n.cfg.NodeDataPath, "chain")
chain := chaindb.NewChainDB(n.log, n.db)
n.engineIns = processengine.NewEngine(chain)
n.engineIns = processengine.NewEngine(n.cfg.NodeDataPath, n.log, chain)
n.engineIns = mockengine.NewEngine(n.cfg.NodeDataPath, n.log, chain)
n.l2Source = engine.NewEngineAPI(&n.cfg.Rollup, chain, n.engineIns)
if n.engineIns == nil {
......@@ -459,6 +460,17 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config) error {
return nil
}
func (n *OpNode) initGrpcServer(cfg *Config) error {
reader := chaindb.NewChainReader(n.log, n.db)
go func() {
err := exchainapi.StartServer(n.log, cfg.GRPC.ListenPort, cfg.GW.ListenPort, reader)
if err != nil {
n.log.Error("start server failed", "err", err)
}
}()
return nil
}
func (n *OpNode) initRPCServer(cfg *Config) error {
server, err := newRPCServer(&cfg.RPC, &cfg.Rollup, n.l2Source, n.l2Driver, n.safeDB, n.log, n.appVersion, n.metrics)
if err != nil {
......
......@@ -5,13 +5,13 @@ import (
"crypto/rand"
"errors"
"fmt"
"github.com/exchain/go-exchain/exchain/wrapper"
"io"
"github.com/exchain/go-exchain/op-node/rollup"
"github.com/exchain/go-exchain/op-node/rollup/derive/params"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/exchain/go-exchain/op-node/rollup"
"github.com/exchain/go-exchain/op-node/rollup/derive/params"
)
var (
......@@ -54,7 +54,7 @@ type Compressor interface {
type ChannelOut interface {
ID() ChannelID
Reset() error
AddBlock(*rollup.Config, *types.Block) (*L1BlockInfo, error)
AddBlock(*rollup.Config, *wrapper.BlkWrapper) (*L1BlockInfo, error)
InputBytes() int
ReadyBytes() int
Flush() error
......@@ -111,7 +111,7 @@ func (co *SingularChannelOut) Reset() error {
// and an error if there is a problem adding the block. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (*L1BlockInfo, error) {
func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *wrapper.BlkWrapper) (*L1BlockInfo, error) {
if co.closed {
return nil, ErrChannelOutAlreadyClosed
}
......@@ -218,28 +218,30 @@ func (co *SingularChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint
}
// BlockToSingularBatch transforms a block into a batch object that can easily be RLP encoded.
func BlockToSingularBatch(rollupCfg *rollup.Config, block *types.Block) (*SingularBatch, *L1BlockInfo, error) {
func BlockToSingularBatch(rollupCfg *rollup.Config, block *wrapper.BlkWrapper) (*SingularBatch, *L1BlockInfo, error) {
if len(block.Transactions()) == 0 {
return nil, nil, fmt.Errorf("block %v has no transactions", block.Hash())
}
opaqueTxs := make([]hexutil.Bytes, 0, len(block.Transactions()))
for i, tx := range block.Transactions() {
if tx.Type() == types.DepositTxType {
wtx := wrapper.NewTxWrapper(tx)
if wtx.IsProtocolTx() {
continue
}
otx, err := tx.MarshalBinary()
otx, err := wtx.Bytes()
if err != nil {
return nil, nil, fmt.Errorf("could not encode tx %v in block %v: %w", i, tx.Hash(), err)
return nil, nil, fmt.Errorf("could not encode tx %v in block %v: %w", i, wtx.Hash(), err)
}
opaqueTxs = append(opaqueTxs, otx)
}
l1InfoTx := block.Transactions()[0]
if l1InfoTx.Type() != types.DepositTxType {
wtx := wrapper.NewTxWrapper(l1InfoTx)
if !wtx.IsProtocolTx() {
return nil, nil, ErrNotDepositTx
}
l1Info, err := L1BlockInfoFromBytes(rollupCfg, block.Time(), l1InfoTx.Data())
l1Info, err := L1BlockInfoFromNebula(rollupCfg, block.Time(), l1InfoTx.GetProtocolTx())
if err != nil {
return nil, l1Info, fmt.Errorf("could not parse the L1 Info deposit: %w", err)
}
......
package derive
import (
"bytes"
"fmt"
nebulav1 "github.com/exchain/go-exchain/exchain/protocol/gen/go/nebula/v1"
"math/big"
......@@ -54,6 +55,7 @@ func DeriveDeposits(receipts []*types.Receipt, depositContractAddr common.Addres
func DeriveDepositsForExchain(receipts []*types.Receipt, depositContractAddr common.Address) ([]*nebulav1.Transaction_DepositTx, error) {
var result error
localCoinName := "USDT"
userDeposits, err := UserDeposits(receipts, depositContractAddr)
if err != nil {
result = multierror.Append(result, err)
......@@ -68,16 +70,16 @@ func DeriveDepositsForExchain(receipts []*types.Receipt, depositContractAddr com
mint = &nebulav1.DepositTransaction{
SourceHash: tx.SourceHash.Bytes(),
User: tx.From.Bytes(),
Coin: common.Address{}.Bytes(),
Coin: []byte(localCoinName),
Amount: tx.Mint.Bytes(),
}
}
if tx.To != nil {
if tx.To != nil && bytes.Compare(tx.From.Bytes(), tx.To.Bytes()) != 0 {
// then do transfer tx
transfer = &nebulav1.DepositTransaction{
SourceHash: tx.SourceHash.Bytes(),
User: tx.To.Bytes(),
Coin: common.Address{}.Bytes(),
Coin: []byte(localCoinName),
Amount: tx.Value.Bytes(),
}
}
......
......@@ -2,10 +2,9 @@ package derive
import (
"fmt"
nebulav1 "github.com/exchain/go-exchain/exchain/protocol/gen/go/nebula/v1"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/exchain/go-exchain/op-node/rollup"
"github.com/exchain/go-exchain/op-service/eth"
)
......@@ -20,7 +19,7 @@ type L2BlockRefSource interface {
ParentHash() common.Hash
NumberU64() uint64
Time() uint64
Transactions() types.Transactions
Transactions() []*nebulav1.Transaction
}
// L2BlockToBlockRef extracts the essential L2BlockRef information from an L2
......@@ -39,14 +38,14 @@ func L2BlockToBlockRef(rollupCfg *rollup.Config, block L2BlockRefSource) (eth.L2
sequenceNumber = 0
} else {
txs := block.Transactions()
if txs.Len() == 0 {
if len(txs) == 0 {
return eth.L2BlockRef{}, fmt.Errorf("l2 block is missing L1 info deposit tx, block hash: %s", hash)
}
tx := txs[0]
if tx.Type() != types.DepositTxType {
return eth.L2BlockRef{}, fmt.Errorf("first payload tx has unexpected tx type: %d", tx.Type())
if tx.TxType != nebulav1.TxType_ProtocolTx {
return eth.L2BlockRef{}, fmt.Errorf("first payload tx has unexpected tx type: %d", tx.TxType)
}
info, err := L1BlockInfoFromBytes(rollupCfg, block.Time(), tx.Data())
info, err := L1BlockInfoFromNebula(rollupCfg, block.Time(), tx.GetProtocolTx())
if err != nil {
return eth.L2BlockRef{}, fmt.Errorf("failed to parse L1 info deposit tx from L2 block: %w", err)
}
......
......@@ -4,9 +4,9 @@ import (
"bytes"
"crypto/rand"
"fmt"
"github.com/exchain/go-exchain/exchain/wrapper"
"io"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/exchain/go-exchain/op-node/rollup"
......@@ -126,7 +126,7 @@ func (co *SpanChannelOut) swapRLP() {
// and an error if there is a problem adding the block. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
func (co *SpanChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (*L1BlockInfo, error) {
func (co *SpanChannelOut) AddBlock(rollupCfg *rollup.Config, block *wrapper.BlkWrapper) (*L1BlockInfo, error) {
if co.closed {
return nil, ErrChannelOutAlreadyClosed
}
......
......@@ -4,27 +4,27 @@ import (
"context"
"errors"
"fmt"
"github.com/exchain/go-exchain/exchain/exchainclient"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/exchain/go-exchain/op-service/client"
"github.com/exchain/go-exchain/op-service/sources"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
const DefaultActiveSequencerFollowerCheckDuration = 2 * DefaultDialTimeout
type ethDialer func(ctx context.Context, log log.Logger, url string) (EthClientInterface, error)
type l2Dialer func(ctx context.Context, log log.Logger, url string) (ExchainClientInterface, error)
// ActiveL2EndpointProvider is an interface for providing a RollupClient and l2 eth client
// It manages the lifecycle of the RollupClient and eth client for callers
// It does this by failing over down the list of rollupUrls if the current one is inactive or broken
type ActiveL2EndpointProvider struct {
ActiveL2RollupProvider
currentEthClient EthClientInterface
ethClientIndex int
ethDialer ethDialer
ethUrls []string
currentEthClient ExchainClientInterface
l2ClientIndex int
l2Dialer l2Dialer
l2Urls []string
}
// NewActiveL2EndpointProvider creates a new ActiveL2EndpointProvider
......@@ -36,13 +36,8 @@ func NewActiveL2EndpointProvider(ctx context.Context,
networkTimeout time.Duration,
logger log.Logger,
) (*ActiveL2EndpointProvider, error) {
ethDialer := func(ctx context.Context, log log.Logger, url string) (EthClientInterface, error) {
rpcCl, err := dialRPCClient(ctx, log, url)
if err != nil {
return nil, err
}
return ethclient.NewClient(rpcCl), nil
ethDialer := func(ctx context.Context, log log.Logger, url string) (ExchainClientInterface, error) {
return exchainclient.NewExClient(url)
}
rollupDialer := func(ctx context.Context, log log.Logger, url string) (RollupClientInterface, error) {
rpcCl, err := dialRPCClient(ctx, log, url)
......@@ -57,18 +52,18 @@ func NewActiveL2EndpointProvider(ctx context.Context,
func newActiveL2EndpointProvider(
ctx context.Context,
ethUrls, rollupUrls []string,
l2Urls, rollupUrls []string,
checkDuration time.Duration,
networkTimeout time.Duration,
logger log.Logger,
ethDialer ethDialer,
l2Dialer l2Dialer,
rollupDialer rollupDialer,
) (*ActiveL2EndpointProvider, error) {
if len(rollupUrls) == 0 {
return nil, errors.New("empty rollup urls list, expected at least one URL")
}
if len(ethUrls) != len(rollupUrls) {
return nil, fmt.Errorf("number of eth urls (%d) and rollup urls (%d) mismatch", len(ethUrls), len(rollupUrls))
if len(l2Urls) != len(rollupUrls) {
return nil, fmt.Errorf("number of eth urls (%d) and rollup urls (%d) mismatch", len(l2Urls), len(rollupUrls))
}
rollupProvider, err := newActiveL2RollupProvider(ctx, rollupUrls, checkDuration, networkTimeout, logger, rollupDialer)
......@@ -77,8 +72,8 @@ func newActiveL2EndpointProvider(
}
p := &ActiveL2EndpointProvider{
ActiveL2RollupProvider: *rollupProvider,
ethDialer: ethDialer,
ethUrls: ethUrls,
l2Dialer: l2Dialer,
l2Urls: l2Urls,
}
cctx, cancel := context.WithTimeout(ctx, networkTimeout)
defer cancel()
......@@ -88,28 +83,28 @@ func newActiveL2EndpointProvider(
return p, nil
}
func (p *ActiveL2EndpointProvider) EthClient(ctx context.Context) (EthClientInterface, error) {
func (p *ActiveL2EndpointProvider) EthClient(ctx context.Context) (ExchainClientInterface, error) {
p.clientLock.Lock()
defer p.clientLock.Unlock()
err := p.ensureActiveEndpoint(ctx)
if err != nil {
return nil, err
}
if p.ethClientIndex != p.rollupIndex || p.currentEthClient == nil {
if p.l2ClientIndex != p.rollupIndex || p.currentEthClient == nil {
// we changed sequencers, dial a new EthClient
cctx, cancel := context.WithTimeout(ctx, p.networkTimeout)
defer cancel()
idx := p.rollupIndex
ep := p.ethUrls[idx]
ep := p.l2Urls[idx]
log.Info("sequencer changed (or ethClient was nil due to startup), dialing new eth client", "new_index", idx, "new_url", ep)
ethClient, err := p.ethDialer(cctx, p.log, ep)
ethClient, err := p.l2Dialer(cctx, p.log, ep)
if err != nil {
return nil, fmt.Errorf("dialing eth client: %w", err)
}
if p.currentEthClient != nil {
p.currentEthClient.Close()
}
p.ethClientIndex = idx
p.l2ClientIndex = idx
p.currentEthClient = ethClient
}
return p.currentEthClient, nil
......
......@@ -2,15 +2,15 @@ package dial
import (
"context"
"github.com/ethereum/go-ethereum/core/types"
"math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)
// EthClientInterface is an interface for providing an ethclient.Client
// It does not describe all of the functions an ethclient.Client has, only the ones used by callers of the L2 Providers
type EthClientInterface interface {
type MEthClientInterface interface {
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
Client() *rpc.Client
......
package dial
import (
"context"
"github.com/exchain/go-exchain/exchain/wrapper"
"math/big"
)
// ExchainClientInterface is an interface for providing an ethclient.Client
// It does not describe all of the functions an ethclient.Client has, only the ones used by callers of the L2 Providers
type ExchainClientInterface interface {
BlockByNumber(ctx context.Context, number *big.Int) (*wrapper.BlkWrapper, error)
Close()
}
......@@ -2,8 +2,8 @@ package dial
import (
"context"
"github.com/exchain/go-exchain/exchain/exchainclient"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
......@@ -15,38 +15,39 @@ type L2EndpointProvider interface {
// EthClient(ctx) returns the underlying ethclient pointing to the L2 execution node.
// Note: ctx should be a lifecycle context without an attached timeout as client selection may involve
// multiple network operations, specifically in the case of failover.
EthClient(ctx context.Context) (EthClientInterface, error)
EthClient(ctx context.Context) (ExchainClientInterface, error)
}
// StaticL2EndpointProvider is a L2EndpointProvider that always returns the same static RollupClient and eth client
// It is meant for scenarios where a single, unchanging (L2 rollup node, L2 execution node) pair is used
type StaticL2EndpointProvider struct {
StaticL2RollupProvider
ethClient *ethclient.Client
l2Client *exchainclient.ExClient
}
func NewStaticL2EndpointProvider(ctx context.Context, log log.Logger, ethClientUrl string, rollupClientUrl string) (*StaticL2EndpointProvider, error) {
ethClient, err := DialEthClientWithTimeout(ctx, DefaultDialTimeout, log, ethClientUrl)
func NewStaticL2EndpointProvider(ctx context.Context, log log.Logger, l2Url string, rollupClientUrl string) (*StaticL2EndpointProvider, error) {
cli, err := exchainclient.NewExClient(l2Url)
if err != nil {
return nil, err
}
rollupProvider, err := NewStaticL2RollupProvider(ctx, log, rollupClientUrl)
if err != nil {
return nil, err
}
return &StaticL2EndpointProvider{
StaticL2RollupProvider: *rollupProvider,
ethClient: ethClient,
l2Client: cli,
}, nil
}
func (p *StaticL2EndpointProvider) EthClient(context.Context) (EthClientInterface, error) {
return p.ethClient, nil
func (p *StaticL2EndpointProvider) EthClient(context.Context) (ExchainClientInterface, error) {
return p.l2Client, nil
}
func (p *StaticL2EndpointProvider) Close() {
if p.ethClient != nil {
p.ethClient.Close()
if p.l2Client != nil {
p.l2Client.Close()
}
p.StaticL2RollupProvider.Close()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment