Commit b11ba1e7 authored by Hamdi Allam's avatar Hamdi Allam

refactor batches

parent a6086c18
...@@ -82,7 +82,6 @@ func runMigrations(ctx *cli.Context) error { ...@@ -82,7 +82,6 @@ func runMigrations(ctx *cli.Context) error {
log.Info("running migrations...") log.Info("running migrations...")
cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name)) cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
migrationsDir := ctx.String(MigrationsFlag.Name)
if err != nil { if err != nil {
log.Error("failed to load config", "err", err) log.Error("failed to load config", "err", err)
return err return err
...@@ -93,8 +92,9 @@ func runMigrations(ctx *cli.Context) error { ...@@ -93,8 +92,9 @@ func runMigrations(ctx *cli.Context) error {
log.Error("failed to connect to database", "err", err) log.Error("failed to connect to database", "err", err)
return err return err
} }
defer db.Close() defer db.Close()
migrationsDir := ctx.String(MigrationsFlag.Name)
return db.ExecuteSQLMigration(migrationsDir) return db.ExecuteSQLMigration(migrationsDir)
} }
......
...@@ -81,7 +81,7 @@ func newBlocksDB(log log.Logger, db *gorm.DB) BlocksDB { ...@@ -81,7 +81,7 @@ func newBlocksDB(log log.Logger, db *gorm.DB) BlocksDB {
func (db *blocksDB) StoreL1BlockHeaders(headers []L1BlockHeader) error { func (db *blocksDB) StoreL1BlockHeaders(headers []L1BlockHeader) error {
deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "hash"}}, DoNothing: true}) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&headers, batchInsertSize) result := deduped.Create(&headers)
if result.Error == nil && int(result.RowsAffected) < len(headers) { if result.Error == nil && int(result.RowsAffected) < len(headers) {
db.log.Warn("ignored L1 block duplicates", "duplicates", len(headers)-int(result.RowsAffected)) db.log.Warn("ignored L1 block duplicates", "duplicates", len(headers)-int(result.RowsAffected))
} }
...@@ -124,7 +124,7 @@ func (db *blocksDB) L1LatestBlockHeader() (*L1BlockHeader, error) { ...@@ -124,7 +124,7 @@ func (db *blocksDB) L1LatestBlockHeader() (*L1BlockHeader, error) {
func (db *blocksDB) StoreL2BlockHeaders(headers []L2BlockHeader) error { func (db *blocksDB) StoreL2BlockHeaders(headers []L2BlockHeader) error {
deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "hash"}}, DoNothing: true}) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&headers, batchInsertSize) result := deduped.Create(&headers)
if result.Error == nil && int(result.RowsAffected) < len(headers) { if result.Error == nil && int(result.RowsAffected) < len(headers) {
db.log.Warn("ignored L2 block duplicates", "duplicates", len(headers)-int(result.RowsAffected)) db.log.Warn("ignored L2 block duplicates", "duplicates", len(headers)-int(result.RowsAffected))
} }
......
...@@ -76,7 +76,7 @@ func newBridgeMessagesDB(log log.Logger, db *gorm.DB) BridgeMessagesDB { ...@@ -76,7 +76,7 @@ func newBridgeMessagesDB(log log.Logger, db *gorm.DB) BridgeMessagesDB {
func (db bridgeMessagesDB) StoreL1BridgeMessages(messages []L1BridgeMessage) error { func (db bridgeMessagesDB) StoreL1BridgeMessages(messages []L1BridgeMessage) error {
deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "message_hash"}}, DoNothing: true}) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "message_hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&messages, batchInsertSize) result := deduped.Create(&messages)
if result.Error == nil && int(result.RowsAffected) < len(messages) { if result.Error == nil && int(result.RowsAffected) < len(messages) {
db.log.Warn("ignored L1 bridge message duplicates", "duplicates", len(messages)-int(result.RowsAffected)) db.log.Warn("ignored L1 bridge message duplicates", "duplicates", len(messages)-int(result.RowsAffected))
} }
...@@ -126,7 +126,7 @@ func (db bridgeMessagesDB) MarkRelayedL1BridgeMessage(messageHash common.Hash, r ...@@ -126,7 +126,7 @@ func (db bridgeMessagesDB) MarkRelayedL1BridgeMessage(messageHash common.Hash, r
func (db bridgeMessagesDB) StoreL2BridgeMessages(messages []L2BridgeMessage) error { func (db bridgeMessagesDB) StoreL2BridgeMessages(messages []L2BridgeMessage) error {
deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "message_hash"}}, DoNothing: true}) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "message_hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&messages, batchInsertSize) result := deduped.Create(&messages)
if result.Error == nil && int(result.RowsAffected) < len(messages) { if result.Error == nil && int(result.RowsAffected) < len(messages) {
db.log.Warn("ignored L2 bridge message duplicates", "duplicates", len(messages)-int(result.RowsAffected)) db.log.Warn("ignored L2 bridge message duplicates", "duplicates", len(messages)-int(result.RowsAffected))
} }
......
...@@ -84,7 +84,7 @@ func newBridgeTransactionsDB(log log.Logger, db *gorm.DB) BridgeTransactionsDB { ...@@ -84,7 +84,7 @@ func newBridgeTransactionsDB(log log.Logger, db *gorm.DB) BridgeTransactionsDB {
func (db *bridgeTransactionsDB) StoreL1TransactionDeposits(deposits []L1TransactionDeposit) error { func (db *bridgeTransactionsDB) StoreL1TransactionDeposits(deposits []L1TransactionDeposit) error {
deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "source_hash"}}, DoNothing: true}) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "source_hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&deposits, batchInsertSize) result := deduped.Create(&deposits)
if result.Error == nil && int(result.RowsAffected) < len(deposits) { if result.Error == nil && int(result.RowsAffected) < len(deposits) {
db.log.Warn("ignored L1 tx deposit duplicates", "duplicates", len(deposits)-int(result.RowsAffected)) db.log.Warn("ignored L1 tx deposit duplicates", "duplicates", len(deposits)-int(result.RowsAffected))
} }
...@@ -142,7 +142,7 @@ func (db *bridgeTransactionsDB) L1LatestBlockHeader() (*L1BlockHeader, error) { ...@@ -142,7 +142,7 @@ func (db *bridgeTransactionsDB) L1LatestBlockHeader() (*L1BlockHeader, error) {
func (db *bridgeTransactionsDB) StoreL2TransactionWithdrawals(withdrawals []L2TransactionWithdrawal) error { func (db *bridgeTransactionsDB) StoreL2TransactionWithdrawals(withdrawals []L2TransactionWithdrawal) error {
deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "withdrawal_hash"}}, DoNothing: true}) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "withdrawal_hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&withdrawals, batchInsertSize) result := deduped.Create(&withdrawals)
if result.Error == nil && int(result.RowsAffected) < len(withdrawals) { if result.Error == nil && int(result.RowsAffected) < len(withdrawals) {
db.log.Warn("ignored L2 tx withdrawal duplicates", "duplicates", len(withdrawals)-int(result.RowsAffected)) db.log.Warn("ignored L2 tx withdrawal duplicates", "duplicates", len(withdrawals)-int(result.RowsAffected))
} }
......
...@@ -95,7 +95,7 @@ func newBridgeTransfersDB(log log.Logger, db *gorm.DB) BridgeTransfersDB { ...@@ -95,7 +95,7 @@ func newBridgeTransfersDB(log log.Logger, db *gorm.DB) BridgeTransfersDB {
func (db *bridgeTransfersDB) StoreL1BridgeDeposits(deposits []L1BridgeDeposit) error { func (db *bridgeTransfersDB) StoreL1BridgeDeposits(deposits []L1BridgeDeposit) error {
deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "transaction_source_hash"}}, DoNothing: true}) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "transaction_source_hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&deposits, batchInsertSize) result := deduped.Create(&deposits)
if result.Error == nil && int(result.RowsAffected) < len(deposits) { if result.Error == nil && int(result.RowsAffected) < len(deposits) {
db.log.Warn("ignored L1 bridge transfer duplicates", "duplicates", len(deposits)-int(result.RowsAffected)) db.log.Warn("ignored L1 bridge transfer duplicates", "duplicates", len(deposits)-int(result.RowsAffected))
} }
...@@ -213,7 +213,7 @@ l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, re ...@@ -213,7 +213,7 @@ l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, re
func (db *bridgeTransfersDB) StoreL2BridgeWithdrawals(withdrawals []L2BridgeWithdrawal) error { func (db *bridgeTransfersDB) StoreL2BridgeWithdrawals(withdrawals []L2BridgeWithdrawal) error {
deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "transaction_withdrawal_hash"}}, DoNothing: true}) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "transaction_withdrawal_hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&withdrawals, batchInsertSize) result := deduped.Create(&withdrawals)
if result.Error == nil && int(result.RowsAffected) < len(withdrawals) { if result.Error == nil && int(result.RowsAffected) < len(withdrawals) {
db.log.Warn("ignored L2 bridge transfer duplicates", "duplicates", len(withdrawals)-int(result.RowsAffected)) db.log.Warn("ignored L2 bridge transfer duplicates", "duplicates", len(withdrawals)-int(result.RowsAffected))
} }
......
...@@ -115,7 +115,7 @@ func (db *contractEventsDB) StoreL1ContractEvents(events []L1ContractEvent) erro ...@@ -115,7 +115,7 @@ func (db *contractEventsDB) StoreL1ContractEvents(events []L1ContractEvent) erro
// Since the block hash refers back to L1, we dont necessarily have to check // Since the block hash refers back to L1, we dont necessarily have to check
// that the RLP bytes match when doing conflict resolution. // that the RLP bytes match when doing conflict resolution.
deduped := db.gorm.Clauses(clause.OnConflict{OnConstraint: "l1_contract_events_block_hash_log_index_key", DoNothing: true}) deduped := db.gorm.Clauses(clause.OnConflict{OnConstraint: "l1_contract_events_block_hash_log_index_key", DoNothing: true})
result := deduped.CreateInBatches(&events, batchInsertSize) result := deduped.Create(&events)
if result.Error == nil && int(result.RowsAffected) < len(events) { if result.Error == nil && int(result.RowsAffected) < len(events) {
db.log.Warn("ignored L1 contract event duplicates", "duplicates", len(events)-int(result.RowsAffected)) db.log.Warn("ignored L1 contract event duplicates", "duplicates", len(events)-int(result.RowsAffected))
} }
...@@ -189,7 +189,7 @@ func (db *contractEventsDB) StoreL2ContractEvents(events []L2ContractEvent) erro ...@@ -189,7 +189,7 @@ func (db *contractEventsDB) StoreL2ContractEvents(events []L2ContractEvent) erro
// Since the block hash refers back to L2, we dont necessarily have to check // Since the block hash refers back to L2, we dont necessarily have to check
// that the RLP bytes match when doing conflict resolution. // that the RLP bytes match when doing conflict resolution.
deduped := db.gorm.Clauses(clause.OnConflict{OnConstraint: "l2_contract_events_block_hash_log_index_key", DoNothing: true}) deduped := db.gorm.Clauses(clause.OnConflict{OnConstraint: "l2_contract_events_block_hash_log_index_key", DoNothing: true})
result := deduped.CreateInBatches(&events, batchInsertSize) result := deduped.Create(&events)
if result.Error == nil && int(result.RowsAffected) < len(events) { if result.Error == nil && int(result.RowsAffected) < len(events) {
db.log.Warn("ignored L2 contract event duplicates", "duplicates", len(events)-int(result.RowsAffected)) db.log.Warn("ignored L2 contract event duplicates", "duplicates", len(events)-int(result.RowsAffected))
} }
......
...@@ -19,14 +19,6 @@ import ( ...@@ -19,14 +19,6 @@ import (
"gorm.io/gorm" "gorm.io/gorm"
) )
var (
// The postgres parameter counter for a given query is stored via a uint16,
// resulting in a parameter limit of 65535. In order to avoid reaching this limit
// we'll utilize a batch size of 3k for inserts, well below as long as the the number
// of columns < 20.
batchInsertSize int = 3_000
)
type DB struct { type DB struct {
gorm *gorm.DB gorm *gorm.DB
log log.Logger log log.Logger
...@@ -53,9 +45,16 @@ func NewDB(log log.Logger, dbConfig config.DBConfig) (*DB, error) { ...@@ -53,9 +45,16 @@ func NewDB(log log.Logger, dbConfig config.DBConfig) (*DB, error) {
} }
gormConfig := gorm.Config{ gormConfig := gorm.Config{
Logger: newLogger(log),
// The indexer will explicitly manage the transactions // The indexer will explicitly manage the transactions
SkipDefaultTransaction: true, SkipDefaultTransaction: true,
Logger: newLogger(log),
// The postgres parameter counter for a given query is represented with uint16,
// resulting in a parameter limit of 65535. In order to avoid reaching this limit
// we'll utilize a batch size of 3k for inserts, well below the limit as long as
// the number of columns < 20.
CreateBatchSize: 3_000,
} }
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250} retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
...@@ -125,12 +124,14 @@ func (db *DB) ExecuteSQLMigration(migrationsFolder string) error { ...@@ -125,12 +124,14 @@ func (db *DB) ExecuteSQLMigration(migrationsFolder string) error {
} }
// Read the migration file content // Read the migration file content
db.log.Info("reading sql file", "path", path)
fileContent, readErr := os.ReadFile(path) fileContent, readErr := os.ReadFile(path)
if readErr != nil { if readErr != nil {
return errors.Wrap(readErr, fmt.Sprintf("Error reading SQL file: %s", path)) return errors.Wrap(readErr, fmt.Sprintf("Error reading SQL file: %s", path))
} }
// Execute the migration // Execute the migration
db.log.Info("executing sql file", "path", path)
execErr := db.gorm.Exec(string(fileContent)).Error execErr := db.gorm.Exec(string(fileContent)).Error
if execErr != nil { if execErr != nil {
return errors.Wrap(execErr, fmt.Sprintf("Error executing SQL script: %s", path)) return errors.Wrap(execErr, fmt.Sprintf("Error executing SQL script: %s", path))
...@@ -139,5 +140,6 @@ func (db *DB) ExecuteSQLMigration(migrationsFolder string) error { ...@@ -139,5 +140,6 @@ func (db *DB) ExecuteSQLMigration(migrationsFolder string) error {
return nil return nil
}) })
db.log.Info("finished migrations")
return err return err
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment