Commit 69256aa1 authored by Hamdi Allam's avatar Hamdi Allam

indexer.db.batching

parent e52924b9
package bigint
import "math/big"
var (
Zero = big.NewInt(0)
One = big.NewInt(1)
)
// Clamp returns a new big.Int for `end` to which `end - start` <= size.
// @note (start, end) is an inclusive range
func Clamp(start, end *big.Int, size uint64) *big.Int {
temp := new(big.Int)
count := temp.Sub(end, start).Uint64() + 1
if count <= size {
return end
}
// we re-use the allocated temp as the new end
temp.Add(start, big.NewInt(int64(size-1)))
return temp
}
// Matcher returns an inner comparison function result for a big.Int
func Matcher(num int64) func(*big.Int) bool {
return func(bi *big.Int) bool { return bi.Int64() == num }
}
type Range struct {
Start *big.Int
End *big.Int
}
// Grouped will return a slice of inclusive ranges from (start, end),
// capped to the supplied size from `(start, end)`.
func Grouped(start, end *big.Int, size uint64) []Range {
if end.Cmp(start) < 0 || size == 0 {
return nil
}
bigMaxDiff := big.NewInt(int64(size - 1))
groups := []Range{}
for start.Cmp(end) <= 0 {
diff := new(big.Int).Sub(end, start)
switch {
case diff.Uint64()+1 <= size:
// re-use allocated diff as the next start
groups = append(groups, Range{start, end})
start = diff.Add(end, One)
default:
// re-use allocated diff as the next start
end := new(big.Int).Add(start, bigMaxDiff)
groups = append(groups, Range{start, end})
start = diff.Add(end, One)
}
}
return groups
}
package bigint
import (
"math/big"
"testing"
"github.com/stretchr/testify/require"
)
func TestClamp(t *testing.T) {
start := big.NewInt(1)
end := big.NewInt(10)
// When the (start, end) bounds are within range
// the same end pointer should be returned
// larger range
result := Clamp(start, end, 20)
require.True(t, end == result)
// exact range
result = Clamp(start, end, 10)
require.True(t, end == result)
// smaller range
result = Clamp(start, end, 5)
require.False(t, end == result)
require.Equal(t, uint64(5), result.Uint64())
}
func TestGrouped(t *testing.T) {
// base cases
require.Nil(t, Grouped(One, Zero, 1))
require.Nil(t, Grouped(Zero, One, 0))
// Same Start/End
group := Grouped(One, One, 1)
require.Len(t, group, 1)
require.Equal(t, One, group[0].Start)
require.Equal(t, One, group[0].End)
Three, Five := big.NewInt(3), big.NewInt(5)
// One at a time
group = Grouped(One, Three, 1)
require.Equal(t, One, group[0].End)
require.Equal(t, int64(1), group[0].End.Int64())
require.Equal(t, int64(2), group[1].Start.Int64())
require.Equal(t, int64(2), group[1].End.Int64())
require.Equal(t, int64(3), group[2].Start.Int64())
require.Equal(t, int64(3), group[2].End.Int64())
// Split groups
group = Grouped(One, Five, 3)
require.Len(t, group, 2)
require.Equal(t, One, group[0].Start)
require.Equal(t, int64(3), group[0].End.Int64())
require.Equal(t, int64(4), group[1].Start.Int64())
require.Equal(t, Five, group[1].End)
// Encompasses the range
group = Grouped(One, Five, 5)
require.Len(t, group, 1)
require.Equal(t, One, group[0].Start, Zero)
require.Equal(t, Five, group[0].End)
// Size larger than the entire range
group = Grouped(One, Five, 100)
require.Len(t, group, 1)
require.Equal(t, One, group[0].Start, Zero)
require.Equal(t, Five, group[0].End)
}
...@@ -104,17 +104,17 @@ func newBlocksDB(db *gorm.DB) BlocksDB { ...@@ -104,17 +104,17 @@ func newBlocksDB(db *gorm.DB) BlocksDB {
// L1 // L1
func (db *blocksDB) StoreL1BlockHeaders(headers []L1BlockHeader) error { func (db *blocksDB) StoreL1BlockHeaders(headers []L1BlockHeader) error {
result := db.gorm.Create(&headers) result := db.gorm.CreateInBatches(&headers, batchInsertSize)
return result.Error return result.Error
} }
func (db *blocksDB) StoreLegacyStateBatches(stateBatches []LegacyStateBatch) error { func (db *blocksDB) StoreLegacyStateBatches(stateBatches []LegacyStateBatch) error {
result := db.gorm.Create(stateBatches) result := db.gorm.CreateInBatches(stateBatches, batchInsertSize)
return result.Error return result.Error
} }
func (db *blocksDB) StoreOutputProposals(outputs []OutputProposal) error { func (db *blocksDB) StoreOutputProposals(outputs []OutputProposal) error {
result := db.gorm.Create(outputs) result := db.gorm.CreateInBatches(outputs, batchInsertSize)
return result.Error return result.Error
} }
...@@ -180,7 +180,7 @@ func (db *blocksDB) OutputProposal(index *big.Int) (*OutputProposal, error) { ...@@ -180,7 +180,7 @@ func (db *blocksDB) OutputProposal(index *big.Int) (*OutputProposal, error) {
// L2 // L2
func (db *blocksDB) StoreL2BlockHeaders(headers []L2BlockHeader) error { func (db *blocksDB) StoreL2BlockHeaders(headers []L2BlockHeader) error {
result := db.gorm.Create(&headers) result := db.gorm.CreateInBatches(&headers, batchInsertSize)
return result.Error return result.Error
} }
...@@ -234,6 +234,7 @@ func (db *blocksDB) LatestEpoch() (*Epoch, error) { ...@@ -234,6 +234,7 @@ func (db *blocksDB) LatestEpoch() (*Epoch, error) {
// will have a matching timestamp with the L1 origin. // will have a matching timestamp with the L1 origin.
query := db.gorm.Table("l1_block_headers").Order("l1_block_headers.timestamp DESC") query := db.gorm.Table("l1_block_headers").Order("l1_block_headers.timestamp DESC")
query = query.Joins("INNER JOIN l2_block_headers ON l2_block_headers.timestamp = l1_block_headers.timestamp") query = query.Joins("INNER JOIN l2_block_headers ON l2_block_headers.timestamp = l1_block_headers.timestamp")
query = query.Order("l2_block_headers.number DESC")
query = query.Select("*") query = query.Select("*")
var epoch Epoch var epoch Epoch
......
...@@ -72,7 +72,7 @@ func newBridgeMessagesDB(db *gorm.DB) BridgeMessagesDB { ...@@ -72,7 +72,7 @@ func newBridgeMessagesDB(db *gorm.DB) BridgeMessagesDB {
*/ */
func (db bridgeMessagesDB) StoreL1BridgeMessages(messages []L1BridgeMessage) error { func (db bridgeMessagesDB) StoreL1BridgeMessages(messages []L1BridgeMessage) error {
result := db.gorm.Create(&messages) result := db.gorm.CreateInBatches(&messages, batchInsertSize)
return result.Error return result.Error
} }
...@@ -111,7 +111,7 @@ func (db bridgeMessagesDB) MarkRelayedL1BridgeMessage(messageHash common.Hash, r ...@@ -111,7 +111,7 @@ func (db bridgeMessagesDB) MarkRelayedL1BridgeMessage(messageHash common.Hash, r
*/ */
func (db bridgeMessagesDB) StoreL2BridgeMessages(messages []L2BridgeMessage) error { func (db bridgeMessagesDB) StoreL2BridgeMessages(messages []L2BridgeMessage) error {
result := db.gorm.Create(&messages) result := db.gorm.CreateInBatches(&messages, batchInsertSize)
return result.Error return result.Error
} }
......
...@@ -80,7 +80,7 @@ func newBridgeTransactionsDB(db *gorm.DB) BridgeTransactionsDB { ...@@ -80,7 +80,7 @@ func newBridgeTransactionsDB(db *gorm.DB) BridgeTransactionsDB {
*/ */
func (db *bridgeTransactionsDB) StoreL1TransactionDeposits(deposits []L1TransactionDeposit) error { func (db *bridgeTransactionsDB) StoreL1TransactionDeposits(deposits []L1TransactionDeposit) error {
result := db.gorm.Create(&deposits) result := db.gorm.CreateInBatches(&deposits, batchInsertSize)
return result.Error return result.Error
} }
...@@ -133,7 +133,7 @@ func (db *bridgeTransactionsDB) L1LatestBlockHeader() (*L1BlockHeader, error) { ...@@ -133,7 +133,7 @@ func (db *bridgeTransactionsDB) L1LatestBlockHeader() (*L1BlockHeader, error) {
*/ */
func (db *bridgeTransactionsDB) StoreL2TransactionWithdrawals(withdrawals []L2TransactionWithdrawal) error { func (db *bridgeTransactionsDB) StoreL2TransactionWithdrawals(withdrawals []L2TransactionWithdrawal) error {
result := db.gorm.Create(&withdrawals) result := db.gorm.CreateInBatches(&withdrawals, batchInsertSize)
return result.Error return result.Error
} }
......
...@@ -89,7 +89,7 @@ func newBridgeTransfersDB(db *gorm.DB) BridgeTransfersDB { ...@@ -89,7 +89,7 @@ func newBridgeTransfersDB(db *gorm.DB) BridgeTransfersDB {
*/ */
func (db *bridgeTransfersDB) StoreL1BridgeDeposits(deposits []L1BridgeDeposit) error { func (db *bridgeTransfersDB) StoreL1BridgeDeposits(deposits []L1BridgeDeposit) error {
result := db.gorm.Create(&deposits) result := db.gorm.CreateInBatches(&deposits, batchInsertSize)
return result.Error return result.Error
} }
...@@ -202,7 +202,7 @@ l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, re ...@@ -202,7 +202,7 @@ l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, re
*/ */
func (db *bridgeTransfersDB) StoreL2BridgeWithdrawals(withdrawals []L2BridgeWithdrawal) error { func (db *bridgeTransfersDB) StoreL2BridgeWithdrawals(withdrawals []L2BridgeWithdrawal) error {
result := db.gorm.Create(&withdrawals) result := db.gorm.CreateInBatches(&withdrawals, batchInsertSize)
return result.Error return result.Error
} }
......
...@@ -109,7 +109,7 @@ func newContractEventsDB(db *gorm.DB) ContractEventsDB { ...@@ -109,7 +109,7 @@ func newContractEventsDB(db *gorm.DB) ContractEventsDB {
// L1 // L1
func (db *contractEventsDB) StoreL1ContractEvents(events []L1ContractEvent) error { func (db *contractEventsDB) StoreL1ContractEvents(events []L1ContractEvent) error {
result := db.gorm.Create(&events) result := db.gorm.CreateInBatches(&events, batchInsertSize)
return result.Error return result.Error
} }
...@@ -176,7 +176,7 @@ func (db *contractEventsDB) L1LatestContractEventWithFilter(filter ContractEvent ...@@ -176,7 +176,7 @@ func (db *contractEventsDB) L1LatestContractEventWithFilter(filter ContractEvent
// L2 // L2
func (db *contractEventsDB) StoreL2ContractEvents(events []L2ContractEvent) error { func (db *contractEventsDB) StoreL2ContractEvents(events []L2ContractEvent) error {
result := db.gorm.Create(&events) result := db.gorm.CreateInBatches(&events, batchInsertSize)
return result.Error return result.Error
} }
......
...@@ -12,6 +12,14 @@ import ( ...@@ -12,6 +12,14 @@ import (
"gorm.io/gorm/logger" "gorm.io/gorm/logger"
) )
var (
// The postgres parameter counter for a given query is stored via a uint16,
// resulting in a parameter limit of 65535. In order to avoid reaching this limit
// we'll utilize a batch size of 3k for inserts, well below as long as the the number
// of columns < 20.
batchInsertSize int = 3_000
)
type DB struct { type DB struct {
gorm *gorm.DB gorm *gorm.DB
...@@ -31,8 +39,7 @@ func NewDB(dbConfig config.DBConfig) (*DB, error) { ...@@ -31,8 +39,7 @@ func NewDB(dbConfig config.DBConfig) (*DB, error) {
dsn += fmt.Sprintf(" password=%s", dbConfig.Password) dsn += fmt.Sprintf(" password=%s", dbConfig.Password)
} }
gorm, err := gorm.Open(postgres.Open(dsn), &gorm.Config{ gorm, err := gorm.Open(postgres.Open(dsn), &gorm.Config{
// The indexer will explicitly manage the transaction // The indexer will explicitly manage the transactions
// flow processing blocks
SkipDefaultTransaction: true, SkipDefaultTransaction: true,
// We may choose to create an adapter such that the // We may choose to create an adapter such that the
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"math/big" "math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
) )
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node" "github.com/ethereum-optimism/optimism/indexer/node"
...@@ -17,7 +18,7 @@ import ( ...@@ -17,7 +18,7 @@ import (
"testing" "testing"
) )
func Test_L1ETL_Construction(t *testing.T) { func TestL1ETLConstruction(t *testing.T) {
etlMetrics := NewMetrics(metrics.NewRegistry(), "l1") etlMetrics := NewMetrics(metrics.NewRegistry(), "l1")
type testSuite struct { type testSuite struct {
...@@ -39,11 +40,10 @@ func Test_L1ETL_Construction(t *testing.T) { ...@@ -39,11 +40,10 @@ func Test_L1ETL_Construction(t *testing.T) {
db := database.NewMockDB() db := database.NewMockDB()
testStart := big.NewInt(100) testStart := big.NewInt(100)
db.MockBlocks.On("L1LatestBlockHeader").Return(nil, nil) db.MockBlocks.On("L1LatestBlockHeader").Return(nil, nil)
client.On("BlockHeaderByNumber", mock.MatchedBy( client.On("BlockHeaderByNumber", mock.MatchedBy(
node.BigIntMatcher(100))).Return( bigint.Matcher(100))).Return(
&types.Header{ &types.Header{
ParentHash: common.HexToHash("0x69"), ParentHash: common.HexToHash("0x69"),
}, nil) }, nil)
......
CREATE DOMAIN UINT256 AS NUMERIC DO $$
CHECK (VALUE >= 0 AND VALUE < 2^256 and SCALE(VALUE) = 0); BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'uint256') THEN
CREATE DOMAIN UINT256 AS NUMERIC
CHECK (VALUE >= 0 AND VALUE < 2^256 and SCALE(VALUE) = 0);
END IF;
END $$;
/** /**
* BLOCK DATA * BLOCK DATA
...@@ -16,6 +21,8 @@ CREATE TABLE IF NOT EXISTS l1_block_headers ( ...@@ -16,6 +21,8 @@ CREATE TABLE IF NOT EXISTS l1_block_headers (
-- Raw Data -- Raw Data
rlp_bytes VARCHAR NOT NULL rlp_bytes VARCHAR NOT NULL
); );
CREATE INDEX IF NOT EXISTS l1_block_headers_timestamp ON l1_block_headers(timestamp);
CREATE INDEX IF NOT EXISTS l1_block_headers_number ON l1_block_headers(number);
CREATE TABLE IF NOT EXISTS l2_block_headers ( CREATE TABLE IF NOT EXISTS l2_block_headers (
-- Searchable fields -- Searchable fields
...@@ -27,6 +34,8 @@ CREATE TABLE IF NOT EXISTS l2_block_headers ( ...@@ -27,6 +34,8 @@ CREATE TABLE IF NOT EXISTS l2_block_headers (
-- Raw Data -- Raw Data
rlp_bytes VARCHAR NOT NULL rlp_bytes VARCHAR NOT NULL
); );
CREATE INDEX IF NOT EXISTS l2_block_headers_timestamp ON l2_block_headers(timestamp);
CREATE INDEX IF NOT EXISTS l2_block_headers_number ON l2_block_headers(number);
/** /**
* EVENT DATA * EVENT DATA
...@@ -45,6 +54,9 @@ CREATE TABLE IF NOT EXISTS l1_contract_events ( ...@@ -45,6 +54,9 @@ CREATE TABLE IF NOT EXISTS l1_contract_events (
-- Raw Data -- Raw Data
rlp_bytes VARCHAR NOT NULL rlp_bytes VARCHAR NOT NULL
); );
CREATE INDEX IF NOT EXISTS l1_contract_events_timestamp ON l1_contract_events(timestamp);
CREATE INDEX IF NOT EXISTS l1_contract_events_block_hash ON l1_contract_events(block_hash);
CREATE INDEX IF NOT EXISTS l1_contract_events_event_signature ON l1_contract_events(event_signature);
CREATE TABLE IF NOT EXISTS l2_contract_events ( CREATE TABLE IF NOT EXISTS l2_contract_events (
-- Searchable fields -- Searchable fields
...@@ -59,6 +71,9 @@ CREATE TABLE IF NOT EXISTS l2_contract_events ( ...@@ -59,6 +71,9 @@ CREATE TABLE IF NOT EXISTS l2_contract_events (
-- Raw Data -- Raw Data
rlp_bytes VARCHAR NOT NULL rlp_bytes VARCHAR NOT NULL
); );
CREATE INDEX IF NOT EXISTS l2_contract_events_timestamp ON l2_contract_events(timestamp);
CREATE INDEX IF NOT EXISTS l2_contract_events_block_hash ON l2_contract_events(block_hash);
CREATE INDEX IF NOT EXISTS l2_contract_events_event_signature ON l2_contract_events(event_signature);
-- Tables that index finalization markers for L2 blocks. -- Tables that index finalization markers for L2 blocks.
...@@ -79,6 +94,7 @@ CREATE TABLE IF NOT EXISTS output_proposals ( ...@@ -79,6 +94,7 @@ CREATE TABLE IF NOT EXISTS output_proposals (
output_proposed_guid VARCHAR NOT NULL UNIQUE REFERENCES l1_contract_events(guid) ON DELETE CASCADE output_proposed_guid VARCHAR NOT NULL UNIQUE REFERENCES l1_contract_events(guid) ON DELETE CASCADE
); );
/** /**
* BRIDGING DATA * BRIDGING DATA
*/ */
...@@ -118,6 +134,10 @@ CREATE TABLE IF NOT EXISTS l1_transaction_deposits ( ...@@ -118,6 +134,10 @@ CREATE TABLE IF NOT EXISTS l1_transaction_deposits (
data VARCHAR NOT NULL, data VARCHAR NOT NULL,
timestamp INTEGER NOT NULL CHECK (timestamp > 0) timestamp INTEGER NOT NULL CHECK (timestamp > 0)
); );
CREATE INDEX IF NOT EXISTS l1_transaction_deposits_timestamp ON l1_transaction_deposits(timestamp);
CREATE INDEX IF NOT EXISTS l1_transaction_deposits_initiated_l1_event_guid ON l1_transaction_deposits(initiated_l1_event_guid);
CREATE INDEX IF NOT EXISTS l1_transaction_deposits_from_address ON l1_transaction_deposits(from_address);
CREATE TABLE IF NOT EXISTS l2_transaction_withdrawals ( CREATE TABLE IF NOT EXISTS l2_transaction_withdrawals (
withdrawal_hash VARCHAR PRIMARY KEY, withdrawal_hash VARCHAR PRIMARY KEY,
nonce UINT256 NOT NULL UNIQUE, nonce UINT256 NOT NULL UNIQUE,
...@@ -136,6 +156,9 @@ CREATE TABLE IF NOT EXISTS l2_transaction_withdrawals ( ...@@ -136,6 +156,9 @@ CREATE TABLE IF NOT EXISTS l2_transaction_withdrawals (
data VARCHAR NOT NULL, data VARCHAR NOT NULL,
timestamp INTEGER NOT NULL CHECK (timestamp > 0) timestamp INTEGER NOT NULL CHECK (timestamp > 0)
); );
CREATE INDEX IF NOT EXISTS l2_transaction_withdrawals_timestamp ON l2_transaction_withdrawals(timestamp);
CREATE INDEX IF NOT EXISTS l2_transaction_withdrawals_initiated_l2_event_guid ON l2_transaction_withdrawals(initiated_l2_event_guid);
CREATE INDEX IF NOT EXISTS l2_transaction_withdrawals_from_address ON l2_transaction_withdrawals(from_address);
-- CrossDomainMessenger -- CrossDomainMessenger
CREATE TABLE IF NOT EXISTS l1_bridge_messages( CREATE TABLE IF NOT EXISTS l1_bridge_messages(
...@@ -154,6 +177,10 @@ CREATE TABLE IF NOT EXISTS l1_bridge_messages( ...@@ -154,6 +177,10 @@ CREATE TABLE IF NOT EXISTS l1_bridge_messages(
data VARCHAR NOT NULL, data VARCHAR NOT NULL,
timestamp INTEGER NOT NULL CHECK (timestamp > 0) timestamp INTEGER NOT NULL CHECK (timestamp > 0)
); );
CREATE INDEX IF NOT EXISTS l1_bridge_messages_timestamp ON l1_bridge_messages(timestamp);
CREATE INDEX IF NOT EXISTS l1_bridge_messages_transaction_source_hash ON l1_bridge_messages(transaction_source_hash);
CREATE INDEX IF NOT EXISTS l1_bridge_messages_from_address ON l1_bridge_messages(from_address);
CREATE TABLE IF NOT EXISTS l2_bridge_messages( CREATE TABLE IF NOT EXISTS l2_bridge_messages(
message_hash VARCHAR PRIMARY KEY, message_hash VARCHAR PRIMARY KEY,
nonce UINT256 NOT NULL UNIQUE, nonce UINT256 NOT NULL UNIQUE,
...@@ -170,6 +197,9 @@ CREATE TABLE IF NOT EXISTS l2_bridge_messages( ...@@ -170,6 +197,9 @@ CREATE TABLE IF NOT EXISTS l2_bridge_messages(
data VARCHAR NOT NULL, data VARCHAR NOT NULL,
timestamp INTEGER NOT NULL CHECK (timestamp > 0) timestamp INTEGER NOT NULL CHECK (timestamp > 0)
); );
CREATE INDEX IF NOT EXISTS l2_bridge_messages_timestamp ON l2_bridge_messages(timestamp);
CREATE INDEX IF NOT EXISTS l2_bridge_messages_transaction_withdrawal_hash ON l2_bridge_messages(transaction_withdrawal_hash);
CREATE INDEX IF NOT EXISTS l2_bridge_messages_from_address ON l2_bridge_messages(from_address);
-- StandardBridge -- StandardBridge
CREATE TABLE IF NOT EXISTS l1_bridge_deposits ( CREATE TABLE IF NOT EXISTS l1_bridge_deposits (
...@@ -185,6 +215,10 @@ CREATE TABLE IF NOT EXISTS l1_bridge_deposits ( ...@@ -185,6 +215,10 @@ CREATE TABLE IF NOT EXISTS l1_bridge_deposits (
data VARCHAR NOT NULL, data VARCHAR NOT NULL,
timestamp INTEGER NOT NULL CHECK (timestamp > 0) timestamp INTEGER NOT NULL CHECK (timestamp > 0)
); );
CREATE INDEX IF NOT EXISTS l1_bridge_deposits_timestamp ON l1_bridge_deposits(timestamp);
CREATE INDEX IF NOT EXISTS l1_bridge_deposits_cross_domain_message_hash ON l1_bridge_deposits(cross_domain_message_hash);
CREATE INDEX IF NOT EXISTS l1_bridge_deposits_from_address ON l1_bridge_deposits(from_address);
CREATE TABLE IF NOT EXISTS l2_bridge_withdrawals ( CREATE TABLE IF NOT EXISTS l2_bridge_withdrawals (
transaction_withdrawal_hash VARCHAR PRIMARY KEY REFERENCES l2_transaction_withdrawals(withdrawal_hash) ON DELETE CASCADE, transaction_withdrawal_hash VARCHAR PRIMARY KEY REFERENCES l2_transaction_withdrawals(withdrawal_hash) ON DELETE CASCADE,
cross_domain_message_hash VARCHAR NOT NULL UNIQUE REFERENCES l2_bridge_messages(message_hash) ON DELETE CASCADE, cross_domain_message_hash VARCHAR NOT NULL UNIQUE REFERENCES l2_bridge_messages(message_hash) ON DELETE CASCADE,
...@@ -198,3 +232,6 @@ CREATE TABLE IF NOT EXISTS l2_bridge_withdrawals ( ...@@ -198,3 +232,6 @@ CREATE TABLE IF NOT EXISTS l2_bridge_withdrawals (
data VARCHAR NOT NULL, data VARCHAR NOT NULL,
timestamp INTEGER NOT NULL CHECK (timestamp > 0) timestamp INTEGER NOT NULL CHECK (timestamp > 0)
); );
CREATE INDEX IF NOT EXISTS l2_bridge_withdrawals_timestamp ON l2_bridge_withdrawals(timestamp);
CREATE INDEX IF NOT EXISTS l2_bridge_withdrawals_cross_domain_message_hash ON l2_bridge_withdrawals(cross_domain_message_hash);
CREATE INDEX IF NOT EXISTS l2_bridge_withdrawals_from_address ON l2_bridge_withdrawals(from_address);
package node
import "math/big"
var bigZero = big.NewInt(0)
var bigOne = big.NewInt(1)
// returns a new big.Int for `end` to which `end - start` <= size.
// @note (start, end) is an inclusive range
func clampBigInt(start, end *big.Int, size uint64) *big.Int {
temp := new(big.Int)
count := temp.Sub(end, start).Uint64() + 1
if count <= size {
return end
}
// we re-use the allocated temp as the new end
temp.Add(start, big.NewInt(int64(size-1)))
return temp
}
// returns an inner comparison function result for a big.Int
func BigIntMatcher(num int64) func(*big.Int) bool {
return func(bi *big.Int) bool { return bi.Int64() == num }
}
package node
import (
"math/big"
"testing"
"github.com/stretchr/testify/assert"
)
func TestClampBigInt(t *testing.T) {
assert.True(t, true)
start := big.NewInt(1)
end := big.NewInt(10)
// When the (start, end) bounds are within range
// the same end pointer should be returned
// larger range
result := clampBigInt(start, end, 20)
assert.True(t, end == result)
// exact range
result = clampBigInt(start, end, 10)
assert.True(t, end == result)
// smaller range
result = clampBigInt(start, end, 5)
assert.False(t, end == result)
assert.Equal(t, uint64(5), result.Uint64())
}
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"math/big" "math/big"
"github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
...@@ -55,12 +56,12 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, ...@@ -55,12 +56,12 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
} }
} }
nextHeight := bigZero nextHeight := bigint.Zero
if f.lastHeader != nil { if f.lastHeader != nil {
nextHeight = new(big.Int).Add(f.lastHeader.Number, bigOne) nextHeight = new(big.Int).Add(f.lastHeader.Number, bigint.One)
} }
endHeight = clampBigInt(nextHeight, endHeight, maxSize) endHeight = bigint.Clamp(nextHeight, endHeight, maxSize)
headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight) headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight)
if err != nil { if err != nil {
return nil, fmt.Errorf("error querying blocks by range: %w", err) return nil, fmt.Errorf("error querying blocks by range: %w", err)
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"math/big" "math/big"
"testing" "testing"
"github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -37,7 +38,7 @@ func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) { ...@@ -37,7 +38,7 @@ func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) {
// start from block 10 as the latest fetched block // start from block 10 as the latest fetched block
lastHeader := &types.Header{Number: big.NewInt(10)} lastHeader := &types.Header{Number: big.NewInt(10)}
headerTraversal := NewHeaderTraversal(client, lastHeader, bigZero) headerTraversal := NewHeaderTraversal(client, lastHeader, bigint.Zero)
// no new headers when matched with head // no new headers when matched with head
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(lastHeader, nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(lastHeader, nil)
...@@ -50,12 +51,12 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) { ...@@ -50,12 +51,12 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
headerTraversal := NewHeaderTraversal(client, nil, bigZero) headerTraversal := NewHeaderTraversal(client, nil, bigint.Zero)
// blocks [0..4] // blocks [0..4]
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(0)), mock.MatchedBy(BigIntMatcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5) headers, err := headerTraversal.NextFinalizedHeaders(5)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, headers, 5) require.Len(t, headers, 5)
...@@ -63,7 +64,7 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) { ...@@ -63,7 +64,7 @@ func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
// blocks [5..9] // blocks [5..9]
headers = makeHeaders(5, &headers[len(headers)-1]) headers = makeHeaders(5, &headers[len(headers)-1])
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil)
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(5)), mock.MatchedBy(BigIntMatcher(9))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5) headers, err = headerTraversal.NextFinalizedHeaders(5)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, headers, 5) require.Len(t, headers, 5)
...@@ -73,21 +74,21 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) { ...@@ -73,21 +74,21 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
headerTraversal := NewHeaderTraversal(client, nil, bigZero) headerTraversal := NewHeaderTraversal(client, nil, bigint.Zero)
// 100 "available" headers // 100 "available" headers
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(100)}, nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(100)}, nil)
// clamped by the supplied size // clamped by the supplied size
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(0)), mock.MatchedBy(BigIntMatcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5) headers, err := headerTraversal.NextFinalizedHeaders(5)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, headers, 5) require.Len(t, headers, 5)
// clamped by the supplied size. FinalizedHeight == 100 // clamped by the supplied size. FinalizedHeight == 100
headers = makeHeaders(10, &headers[len(headers)-1]) headers = makeHeaders(10, &headers[len(headers)-1])
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(5)), mock.MatchedBy(BigIntMatcher(14))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(14))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(10) headers, err = headerTraversal.NextFinalizedHeaders(10)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, headers, 10) require.Len(t, headers, 10)
...@@ -97,12 +98,12 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) { ...@@ -97,12 +98,12 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
client := new(MockEthClient) client := new(MockEthClient)
// start from genesis // start from genesis
headerTraversal := NewHeaderTraversal(client, nil, bigZero) headerTraversal := NewHeaderTraversal(client, nil, bigint.Zero)
// blocks [0..4] // blocks [0..4]
headers := makeHeaders(5, nil) headers := makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(0)), mock.MatchedBy(BigIntMatcher(4))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5) headers, err := headerTraversal.NextFinalizedHeaders(5)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, headers, 5) require.Len(t, headers, 5)
...@@ -110,7 +111,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) { ...@@ -110,7 +111,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
// blocks [5..9]. Next batch is not chained correctly (starts again from genesis) // blocks [5..9]. Next batch is not chained correctly (starts again from genesis)
headers = makeHeaders(5, nil) headers = makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(9)}, nil) client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(9)}, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(BigIntMatcher(5)), mock.MatchedBy(BigIntMatcher(9))).Return(headers, nil) client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5) headers, err = headerTraversal.NextFinalizedHeaders(5)
require.Nil(t, headers) require.Nil(t, headers)
require.Equal(t, ErrHeaderTraversalAndProviderMismatchedState, err) require.Equal(t, ErrHeaderTraversalAndProviderMismatchedState, err)
......
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
) )
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"errors" "errors"
"math/big" "math/big"
"github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/etl" "github.com/ethereum-optimism/optimism/indexer/etl"
...@@ -42,7 +43,7 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, l1Etl *etl.L1ETL, chain ...@@ -42,7 +43,7 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, l1Etl *etl.L1ETL, chain
if latestL1Header == nil && latestL2Header == nil { if latestL1Header == nil && latestL2Header == nil {
log.Info("no indexed state, starting from rollup genesis") log.Info("no indexed state, starting from rollup genesis")
} else { } else {
l1Height, l2Height := big.NewInt(0), big.NewInt(0) l1Height, l2Height := bigint.Zero, bigint.Zero
if latestL1Header != nil { if latestL1Header != nil {
l1Height = latestL1Header.Number l1Height = latestL1Header.Number
l1Header = latestL1Header.RLPHeader.Header() l1Header = latestL1Header.RLPHeader.Header()
...@@ -109,12 +110,12 @@ func (b *BridgeProcessor) Start(ctx context.Context) error { ...@@ -109,12 +110,12 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
// Process Bridge Events // Process Bridge Events
toL1Height, toL2Height := latestEpoch.L1BlockHeader.Number, latestEpoch.L2BlockHeader.Number toL1Height, toL2Height := latestEpoch.L1BlockHeader.Number, latestEpoch.L2BlockHeader.Number
fromL1Height, fromL2Height := big.NewInt(0), big.NewInt(0) fromL1Height, fromL2Height := bigint.Zero, bigint.Zero
if b.LatestL1Header != nil { if b.LatestL1Header != nil {
fromL1Height = new(big.Int).Add(b.LatestL1Header.Number, big.NewInt(1)) fromL1Height = new(big.Int).Add(b.LatestL1Header.Number, bigint.One)
} }
if b.LatestL2Header != nil { if b.LatestL2Header != nil {
fromL2Height = new(big.Int).Add(b.LatestL2Header.Number, big.NewInt(1)) fromL2Height = new(big.Int).Add(b.LatestL2Header.Number, bigint.One)
} }
batchLog := b.log.New("epoch_start_number", fromL1Height, "epoch_end_number", toL1Height) batchLog := b.log.New("epoch_start_number", fromL1Height, "epoch_end_number", toL1Height)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment