Commit 0cf40464 authored by Hamdi Allam's avatar Hamdi Allam

database idempotency

parent 0600fe87
...@@ -27,5 +27,6 @@ func main() { ...@@ -27,5 +27,6 @@ func main() {
app := newCli(GitCommit, GitDate) app := newCli(GitCommit, GitDate)
if err := app.RunContext(ctx, os.Args); err != nil { if err := app.RunContext(ctx, os.Args); err != nil {
log.Error("application failed", "err", err) log.Error("application failed", "err", err)
os.Exit(1)
} }
} }
...@@ -7,8 +7,10 @@ import ( ...@@ -7,8 +7,10 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause"
) )
/** /**
...@@ -67,17 +69,23 @@ type BlocksDB interface { ...@@ -67,17 +69,23 @@ type BlocksDB interface {
*/ */
type blocksDB struct { type blocksDB struct {
log log.Logger
gorm *gorm.DB gorm *gorm.DB
} }
func newBlocksDB(db *gorm.DB) BlocksDB { func newBlocksDB(log log.Logger, db *gorm.DB) BlocksDB {
return &blocksDB{gorm: db} return &blocksDB{log: log.New("table", "blocks"), gorm: db}
} }
// L1 // L1
func (db *blocksDB) StoreL1BlockHeaders(headers []L1BlockHeader) error { func (db *blocksDB) StoreL1BlockHeaders(headers []L1BlockHeader) error {
result := db.gorm.CreateInBatches(&headers, batchInsertSize) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&headers, batchInsertSize)
if result.Error != nil && int(result.RowsAffected) < len(headers) {
db.log.Warn("ignored L1 block duplicates", "duplicates", len(headers)-int(result.RowsAffected))
}
return result.Error return result.Error
} }
...@@ -115,7 +123,12 @@ func (db *blocksDB) L1LatestBlockHeader() (*L1BlockHeader, error) { ...@@ -115,7 +123,12 @@ func (db *blocksDB) L1LatestBlockHeader() (*L1BlockHeader, error) {
// L2 // L2
func (db *blocksDB) StoreL2BlockHeaders(headers []L2BlockHeader) error { func (db *blocksDB) StoreL2BlockHeaders(headers []L2BlockHeader) error {
result := db.gorm.CreateInBatches(&headers, batchInsertSize) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&headers, batchInsertSize)
if result.Error != nil && int(result.RowsAffected) < len(headers) {
db.log.Warn("ignored L2 block duplicates", "duplicates", len(headers)-int(result.RowsAffected))
}
return result.Error return result.Error
} }
......
...@@ -6,8 +6,10 @@ import ( ...@@ -6,8 +6,10 @@ import (
"math/big" "math/big"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/google/uuid" "github.com/google/uuid"
) )
...@@ -60,11 +62,12 @@ type BridgeMessagesDB interface { ...@@ -60,11 +62,12 @@ type BridgeMessagesDB interface {
*/ */
type bridgeMessagesDB struct { type bridgeMessagesDB struct {
log log.Logger
gorm *gorm.DB gorm *gorm.DB
} }
func newBridgeMessagesDB(db *gorm.DB) BridgeMessagesDB { func newBridgeMessagesDB(log log.Logger, db *gorm.DB) BridgeMessagesDB {
return &bridgeMessagesDB{gorm: db} return &bridgeMessagesDB{log: log.New("table", "messages"), gorm: db}
} }
/** /**
...@@ -72,7 +75,12 @@ func newBridgeMessagesDB(db *gorm.DB) BridgeMessagesDB { ...@@ -72,7 +75,12 @@ func newBridgeMessagesDB(db *gorm.DB) BridgeMessagesDB {
*/ */
func (db bridgeMessagesDB) StoreL1BridgeMessages(messages []L1BridgeMessage) error { func (db bridgeMessagesDB) StoreL1BridgeMessages(messages []L1BridgeMessage) error {
result := db.gorm.CreateInBatches(&messages, batchInsertSize) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "message_hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&messages, batchInsertSize)
if result.Error != nil && int(result.RowsAffected) < len(messages) {
db.log.Warn("ignored L1 bridge message duplicates", "duplicates", len(messages)-int(result.RowsAffected))
}
return result.Error return result.Error
} }
...@@ -98,7 +106,13 @@ func (db bridgeMessagesDB) MarkRelayedL1BridgeMessage(messageHash common.Hash, r ...@@ -98,7 +106,13 @@ func (db bridgeMessagesDB) MarkRelayedL1BridgeMessage(messageHash common.Hash, r
if err != nil { if err != nil {
return err return err
} else if message == nil { } else if message == nil {
return fmt.Errorf("L1BridgeMessage with message hash %s not found", messageHash) return fmt.Errorf("L1BridgeMessage %s not found", messageHash)
}
if message.RelayedMessageEventGUID != nil && message.RelayedMessageEventGUID.ID() == relayEvent.ID() {
return nil
} else if message.RelayedMessageEventGUID != nil {
return fmt.Errorf("relayed message %s re-relayed with a different event %d", messageHash, relayEvent)
} }
message.RelayedMessageEventGUID = &relayEvent message.RelayedMessageEventGUID = &relayEvent
...@@ -111,7 +125,12 @@ func (db bridgeMessagesDB) MarkRelayedL1BridgeMessage(messageHash common.Hash, r ...@@ -111,7 +125,12 @@ func (db bridgeMessagesDB) MarkRelayedL1BridgeMessage(messageHash common.Hash, r
*/ */
func (db bridgeMessagesDB) StoreL2BridgeMessages(messages []L2BridgeMessage) error { func (db bridgeMessagesDB) StoreL2BridgeMessages(messages []L2BridgeMessage) error {
result := db.gorm.CreateInBatches(&messages, batchInsertSize) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "message_hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&messages, batchInsertSize)
if result.Error != nil && int(result.RowsAffected) < len(messages) {
db.log.Warn("ignored L1 bridge message duplicates", "duplicates", len(messages)-int(result.RowsAffected))
}
return result.Error return result.Error
} }
...@@ -137,7 +156,13 @@ func (db bridgeMessagesDB) MarkRelayedL2BridgeMessage(messageHash common.Hash, r ...@@ -137,7 +156,13 @@ func (db bridgeMessagesDB) MarkRelayedL2BridgeMessage(messageHash common.Hash, r
if err != nil { if err != nil {
return err return err
} else if message == nil { } else if message == nil {
return fmt.Errorf("L2BridgeMessage with message hash %s not found", messageHash) return fmt.Errorf("L2BridgeMessage %s not found", messageHash)
}
if message.RelayedMessageEventGUID != nil && message.RelayedMessageEventGUID.ID() == relayEvent.ID() {
return nil
} else if message.RelayedMessageEventGUID != nil {
return fmt.Errorf("relayed message %s re-relayed with a different event %d", messageHash, relayEvent)
} }
message.RelayedMessageEventGUID = &relayEvent message.RelayedMessageEventGUID = &relayEvent
......
...@@ -7,8 +7,10 @@ import ( ...@@ -7,8 +7,10 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
) )
/** /**
...@@ -68,11 +70,12 @@ type BridgeTransactionsDB interface { ...@@ -68,11 +70,12 @@ type BridgeTransactionsDB interface {
*/ */
type bridgeTransactionsDB struct { type bridgeTransactionsDB struct {
log log.Logger
gorm *gorm.DB gorm *gorm.DB
} }
func newBridgeTransactionsDB(db *gorm.DB) BridgeTransactionsDB { func newBridgeTransactionsDB(log log.Logger, db *gorm.DB) BridgeTransactionsDB {
return &bridgeTransactionsDB{gorm: db} return &bridgeTransactionsDB{log: log.New("table", "txs"), gorm: db}
} }
/** /**
...@@ -80,7 +83,12 @@ func newBridgeTransactionsDB(db *gorm.DB) BridgeTransactionsDB { ...@@ -80,7 +83,12 @@ func newBridgeTransactionsDB(db *gorm.DB) BridgeTransactionsDB {
*/ */
func (db *bridgeTransactionsDB) StoreL1TransactionDeposits(deposits []L1TransactionDeposit) error { func (db *bridgeTransactionsDB) StoreL1TransactionDeposits(deposits []L1TransactionDeposit) error {
result := db.gorm.CreateInBatches(&deposits, batchInsertSize) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "source_hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&deposits, batchInsertSize)
if result.Error != nil && int(result.RowsAffected) < len(deposits) {
db.log.Warn("ignored L1 tx deposit duplicates", "duplicates", len(deposits)-int(result.RowsAffected))
}
return result.Error return result.Error
} }
...@@ -133,7 +141,12 @@ func (db *bridgeTransactionsDB) L1LatestBlockHeader() (*L1BlockHeader, error) { ...@@ -133,7 +141,12 @@ func (db *bridgeTransactionsDB) L1LatestBlockHeader() (*L1BlockHeader, error) {
*/ */
func (db *bridgeTransactionsDB) StoreL2TransactionWithdrawals(withdrawals []L2TransactionWithdrawal) error { func (db *bridgeTransactionsDB) StoreL2TransactionWithdrawals(withdrawals []L2TransactionWithdrawal) error {
result := db.gorm.CreateInBatches(&withdrawals, batchInsertSize) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "withdrawal_hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&withdrawals, batchInsertSize)
if result.Error != nil && int(result.RowsAffected) < len(withdrawals) {
db.log.Warn("ignored L2 tx deposit withdrawals", "duplicates", len(withdrawals)-int(result.RowsAffected))
}
return result.Error return result.Error
} }
...@@ -155,11 +168,16 @@ func (db *bridgeTransactionsDB) MarkL2TransactionWithdrawalProvenEvent(withdrawa ...@@ -155,11 +168,16 @@ func (db *bridgeTransactionsDB) MarkL2TransactionWithdrawalProvenEvent(withdrawa
withdrawal, err := db.L2TransactionWithdrawal(withdrawalHash) withdrawal, err := db.L2TransactionWithdrawal(withdrawalHash)
if err != nil { if err != nil {
return err return err
} } else if withdrawal == nil {
if withdrawal == nil {
return fmt.Errorf("transaction withdrawal hash %s not found", withdrawalHash) return fmt.Errorf("transaction withdrawal hash %s not found", withdrawalHash)
} }
if withdrawal.ProvenL1EventGUID != nil && withdrawal.ProvenL1EventGUID.ID() == provenL1EventGuid.ID() {
return nil
} else if withdrawal.ProvenL1EventGUID != nil {
return fmt.Errorf("proven withdrawal %s re-proven with a different event %d", withdrawalHash, provenL1EventGuid)
}
withdrawal.ProvenL1EventGUID = &provenL1EventGuid withdrawal.ProvenL1EventGUID = &provenL1EventGuid
result := db.gorm.Save(&withdrawal) result := db.gorm.Save(&withdrawal)
return result.Error return result.Error
...@@ -170,14 +188,18 @@ func (db *bridgeTransactionsDB) MarkL2TransactionWithdrawalFinalizedEvent(withdr ...@@ -170,14 +188,18 @@ func (db *bridgeTransactionsDB) MarkL2TransactionWithdrawalFinalizedEvent(withdr
withdrawal, err := db.L2TransactionWithdrawal(withdrawalHash) withdrawal, err := db.L2TransactionWithdrawal(withdrawalHash)
if err != nil { if err != nil {
return err return err
} } else if withdrawal == nil {
if withdrawal == nil {
return fmt.Errorf("transaction withdrawal hash %s not found", withdrawalHash) return fmt.Errorf("transaction withdrawal hash %s not found", withdrawalHash)
} } else if withdrawal.ProvenL1EventGUID == nil {
if withdrawal.ProvenL1EventGUID == nil {
return fmt.Errorf("cannot mark unproven withdrawal hash %s as finalized", withdrawal.WithdrawalHash) return fmt.Errorf("cannot mark unproven withdrawal hash %s as finalized", withdrawal.WithdrawalHash)
} }
if withdrawal.FinalizedL1EventGUID != nil && withdrawal.FinalizedL1EventGUID.ID() == finalizedL1EventGuid.ID() {
return nil
} else if withdrawal.FinalizedL1EventGUID != nil {
return fmt.Errorf("finalized withdrawal %s re-finalized with a different event %d", withdrawalHash, finalizedL1EventGuid)
}
withdrawal.FinalizedL1EventGUID = &finalizedL1EventGuid withdrawal.FinalizedL1EventGUID = &finalizedL1EventGuid
withdrawal.Succeeded = &succeeded withdrawal.Succeeded = &succeeded
result := db.gorm.Save(&withdrawal) result := db.gorm.Save(&withdrawal)
......
...@@ -5,9 +5,11 @@ import ( ...@@ -5,9 +5,11 @@ import (
"fmt" "fmt"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys" "github.com/ethereum-optimism/optimism/op-bindings/predeploys"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
) )
var ( var (
...@@ -79,11 +81,12 @@ type BridgeTransfersDB interface { ...@@ -79,11 +81,12 @@ type BridgeTransfersDB interface {
*/ */
type bridgeTransfersDB struct { type bridgeTransfersDB struct {
log log.Logger
gorm *gorm.DB gorm *gorm.DB
} }
func newBridgeTransfersDB(db *gorm.DB) BridgeTransfersDB { func newBridgeTransfersDB(log log.Logger, db *gorm.DB) BridgeTransfersDB {
return &bridgeTransfersDB{gorm: db} return &bridgeTransfersDB{log: log.New("table", "transfers"), gorm: db}
} }
/** /**
...@@ -91,7 +94,12 @@ func newBridgeTransfersDB(db *gorm.DB) BridgeTransfersDB { ...@@ -91,7 +94,12 @@ func newBridgeTransfersDB(db *gorm.DB) BridgeTransfersDB {
*/ */
func (db *bridgeTransfersDB) StoreL1BridgeDeposits(deposits []L1BridgeDeposit) error { func (db *bridgeTransfersDB) StoreL1BridgeDeposits(deposits []L1BridgeDeposit) error {
result := db.gorm.CreateInBatches(&deposits, batchInsertSize) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "transaction_source_hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&deposits, batchInsertSize)
if result.Error != nil && int(result.RowsAffected) < len(deposits) {
db.log.Warn("ignored L1 bridge transfer duplicates", "duplicates", len(deposits)-int(result.RowsAffected))
}
return result.Error return result.Error
} }
...@@ -204,7 +212,12 @@ l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, re ...@@ -204,7 +212,12 @@ l1_bridge_deposits.timestamp, cross_domain_message_hash, local_token_address, re
*/ */
func (db *bridgeTransfersDB) StoreL2BridgeWithdrawals(withdrawals []L2BridgeWithdrawal) error { func (db *bridgeTransfersDB) StoreL2BridgeWithdrawals(withdrawals []L2BridgeWithdrawal) error {
result := db.gorm.CreateInBatches(&withdrawals, batchInsertSize) deduped := db.gorm.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "transaction_withdrawal_hash"}}, DoNothing: true})
result := deduped.CreateInBatches(&withdrawals, batchInsertSize)
if result.Error != nil && int(result.RowsAffected) < len(withdrawals) {
db.log.Warn("ignored L2 bridge transfer duplicates", "duplicates", len(withdrawals)-int(result.RowsAffected))
}
return result.Error return result.Error
} }
......
...@@ -6,9 +6,11 @@ import ( ...@@ -6,9 +6,11 @@ import (
"math/big" "math/big"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/google/uuid" "github.com/google/uuid"
) )
...@@ -99,17 +101,25 @@ type ContractEventsDB interface { ...@@ -99,17 +101,25 @@ type ContractEventsDB interface {
*/ */
type contractEventsDB struct { type contractEventsDB struct {
log log.Logger
gorm *gorm.DB gorm *gorm.DB
} }
func newContractEventsDB(db *gorm.DB) ContractEventsDB { func newContractEventsDB(log log.Logger, db *gorm.DB) ContractEventsDB {
return &contractEventsDB{gorm: db} return &contractEventsDB{log: log.New("table", "events"), gorm: db}
} }
// L1 // L1
func (db *contractEventsDB) StoreL1ContractEvents(events []L1ContractEvent) error { func (db *contractEventsDB) StoreL1ContractEvents(events []L1ContractEvent) error {
result := db.gorm.CreateInBatches(&events, batchInsertSize) // Since the block hash refers back to L1, we dont necessarily have to check
// that the RLP bytes match when doing conflict resolution.
deduped := db.gorm.Clauses(clause.OnConflict{OnConstraint: "l1_contract_events_block_hash_log_index_key", DoNothing: true})
result := deduped.CreateInBatches(&events, batchInsertSize)
if result.Error != nil && int(result.RowsAffected) < len(events) {
db.log.Warn("ignored L1 contract event duplicates", "duplicates", len(events)-int(result.RowsAffected))
}
return result.Error return result.Error
} }
...@@ -176,7 +186,14 @@ func (db *contractEventsDB) L1LatestContractEventWithFilter(filter ContractEvent ...@@ -176,7 +186,14 @@ func (db *contractEventsDB) L1LatestContractEventWithFilter(filter ContractEvent
// L2 // L2
func (db *contractEventsDB) StoreL2ContractEvents(events []L2ContractEvent) error { func (db *contractEventsDB) StoreL2ContractEvents(events []L2ContractEvent) error {
result := db.gorm.CreateInBatches(&events, batchInsertSize) // Since the block hash refers back to L2, we dont necessarily have to check
// that the RLP bytes match when doing conflict resolution.
deduped := db.gorm.Clauses(clause.OnConflict{OnConstraint: "l2_contract_events_block_hash_log_index_key", DoNothing: true})
result := deduped.CreateInBatches(&events, batchInsertSize)
if result.Error != nil && int(result.RowsAffected) < len(events) {
db.log.Warn("ignored L2 contract event duplicates", "duplicates", len(events)-int(result.RowsAffected))
}
return result.Error return result.Error
} }
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/ethereum-optimism/optimism/indexer/config" "github.com/ethereum-optimism/optimism/indexer/config"
_ "github.com/ethereum-optimism/optimism/indexer/database/serializers" _ "github.com/ethereum-optimism/optimism/indexer/database/serializers"
"github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
...@@ -28,6 +29,7 @@ var ( ...@@ -28,6 +29,7 @@ var (
type DB struct { type DB struct {
gorm *gorm.DB gorm *gorm.DB
log log.Logger
Blocks BlocksDB Blocks BlocksDB
ContractEvents ContractEventsDB ContractEvents ContractEventsDB
...@@ -37,7 +39,7 @@ type DB struct { ...@@ -37,7 +39,7 @@ type DB struct {
} }
func NewDB(log log.Logger, dbConfig config.DBConfig) (*DB, error) { func NewDB(log log.Logger, dbConfig config.DBConfig) (*DB, error) {
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250} log = log.New("module", "db")
dsn := fmt.Sprintf("host=%s dbname=%s sslmode=disable", dbConfig.Host, dbConfig.Name) dsn := fmt.Sprintf("host=%s dbname=%s sslmode=disable", dbConfig.Host, dbConfig.Name)
if dbConfig.Port != 0 { if dbConfig.Port != 0 {
...@@ -56,6 +58,7 @@ func NewDB(log log.Logger, dbConfig config.DBConfig) (*DB, error) { ...@@ -56,6 +58,7 @@ func NewDB(log log.Logger, dbConfig config.DBConfig) (*DB, error) {
Logger: newLogger(log), Logger: newLogger(log),
} }
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
gorm, err := retry.Do[*gorm.DB](context.Background(), 10, retryStrategy, func() (*gorm.DB, error) { gorm, err := retry.Do[*gorm.DB](context.Background(), 10, retryStrategy, func() (*gorm.DB, error) {
gorm, err := gorm.Open(postgres.Open(dsn), &gormConfig) gorm, err := gorm.Open(postgres.Open(dsn), &gormConfig)
if err != nil { if err != nil {
...@@ -66,16 +69,17 @@ func NewDB(log log.Logger, dbConfig config.DBConfig) (*DB, error) { ...@@ -66,16 +69,17 @@ func NewDB(log log.Logger, dbConfig config.DBConfig) (*DB, error) {
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to connect to database after multiple retries: %w", err) return nil, err
} }
db := &DB{ db := &DB{
gorm: gorm, gorm: gorm,
Blocks: newBlocksDB(gorm), log: log,
ContractEvents: newContractEventsDB(gorm), Blocks: newBlocksDB(log, gorm),
BridgeTransfers: newBridgeTransfersDB(gorm), ContractEvents: newContractEventsDB(log, gorm),
BridgeMessages: newBridgeMessagesDB(gorm), BridgeTransfers: newBridgeTransfersDB(log, gorm),
BridgeTransactions: newBridgeTransactionsDB(gorm), BridgeMessages: newBridgeMessagesDB(log, gorm),
BridgeTransactions: newBridgeTransactionsDB(log, gorm),
} }
return db, nil return db, nil
...@@ -85,7 +89,16 @@ func NewDB(log log.Logger, dbConfig config.DBConfig) (*DB, error) { ...@@ -85,7 +89,16 @@ func NewDB(log log.Logger, dbConfig config.DBConfig) (*DB, error) {
// transaction. If the supplied function errors, the transaction is rolled back. // transaction. If the supplied function errors, the transaction is rolled back.
func (db *DB) Transaction(fn func(db *DB) error) error { func (db *DB) Transaction(fn func(db *DB) error) error {
return db.gorm.Transaction(func(tx *gorm.DB) error { return db.gorm.Transaction(func(tx *gorm.DB) error {
return fn(dbFromGormTx(tx)) txDB := &DB{
gorm: tx,
Blocks: newBlocksDB(db.log, tx),
ContractEvents: newContractEventsDB(db.log, tx),
BridgeTransfers: newBridgeTransfersDB(db.log, tx),
BridgeMessages: newBridgeMessagesDB(db.log, tx),
BridgeTransactions: newBridgeTransactionsDB(db.log, tx),
}
return fn(txDB)
}) })
} }
...@@ -98,17 +111,6 @@ func (db *DB) Close() error { ...@@ -98,17 +111,6 @@ func (db *DB) Close() error {
return sql.Close() return sql.Close()
} }
func dbFromGormTx(tx *gorm.DB) *DB {
return &DB{
gorm: tx,
Blocks: newBlocksDB(tx),
ContractEvents: newContractEventsDB(tx),
BridgeTransfers: newBridgeTransfersDB(tx),
BridgeMessages: newBridgeMessagesDB(tx),
BridgeTransactions: newBridgeTransactionsDB(tx),
}
}
func (db *DB) ExecuteSQLMigration(migrationsFolder string) error { func (db *DB) ExecuteSQLMigration(migrationsFolder string) error {
err := filepath.Walk(migrationsFolder, func(path string, info os.FileInfo, err error) error { err := filepath.Walk(migrationsFolder, func(path string, info os.FileInfo, err error) error {
// Check for any walking error // Check for any walking error
......
...@@ -14,7 +14,7 @@ import ( ...@@ -14,7 +14,7 @@ import (
var ( var (
_ logger.Interface = Logger{} _ logger.Interface = Logger{}
SlowThresholdMilliseconds = 200 SlowThresholdMilliseconds int64 = 500
) )
type Logger struct { type Logger struct {
...@@ -22,7 +22,7 @@ type Logger struct { ...@@ -22,7 +22,7 @@ type Logger struct {
} }
func newLogger(log log.Logger) Logger { func newLogger(log log.Logger) Logger {
return Logger{log.New("module", "db")} return Logger{log}
} }
func (l Logger) LogMode(lvl logger.LogLevel) logger.Interface { func (l Logger) LogMode(lvl logger.LogLevel) logger.Interface {
...@@ -50,7 +50,7 @@ func (l Logger) Trace(ctx context.Context, begin time.Time, fc func() (sql strin ...@@ -50,7 +50,7 @@ func (l Logger) Trace(ctx context.Context, begin time.Time, fc func() (sql strin
sql = fmt.Sprintf("%sVALUES (...)", sql[:i]) sql = fmt.Sprintf("%sVALUES (...)", sql[:i])
} }
if elapsedMs < 200 { if elapsedMs < SlowThresholdMilliseconds {
l.log.Debug("database operation", "duration_ms", elapsedMs, "rows_affected", rows, "sql", sql) l.log.Debug("database operation", "duration_ms", elapsedMs, "rows_affected", rows, "sql", sql)
} else { } else {
l.log.Warn("database operation", "duration_ms", elapsedMs, "rows_affected", rows, "sql", sql) l.log.Warn("database operation", "duration_ms", elapsedMs, "rows_affected", rows, "sql", sql)
......
...@@ -63,6 +63,7 @@ CREATE INDEX IF NOT EXISTS l1_contract_events_timestamp ON l1_contract_events(ti ...@@ -63,6 +63,7 @@ CREATE INDEX IF NOT EXISTS l1_contract_events_timestamp ON l1_contract_events(ti
CREATE INDEX IF NOT EXISTS l1_contract_events_block_hash ON l1_contract_events(block_hash); CREATE INDEX IF NOT EXISTS l1_contract_events_block_hash ON l1_contract_events(block_hash);
CREATE INDEX IF NOT EXISTS l1_contract_events_event_signature ON l1_contract_events(event_signature); CREATE INDEX IF NOT EXISTS l1_contract_events_event_signature ON l1_contract_events(event_signature);
CREATE INDEX IF NOT EXISTS l1_contract_events_contract_address ON l1_contract_events(contract_address); CREATE INDEX IF NOT EXISTS l1_contract_events_contract_address ON l1_contract_events(contract_address);
ALTER TABLE l1_contract_events ADD UNIQUE (block_hash, log_index);
CREATE TABLE IF NOT EXISTS l2_contract_events ( CREATE TABLE IF NOT EXISTS l2_contract_events (
-- Searchable fields -- Searchable fields
...@@ -81,6 +82,7 @@ CREATE INDEX IF NOT EXISTS l2_contract_events_timestamp ON l2_contract_events(ti ...@@ -81,6 +82,7 @@ CREATE INDEX IF NOT EXISTS l2_contract_events_timestamp ON l2_contract_events(ti
CREATE INDEX IF NOT EXISTS l2_contract_events_block_hash ON l2_contract_events(block_hash); CREATE INDEX IF NOT EXISTS l2_contract_events_block_hash ON l2_contract_events(block_hash);
CREATE INDEX IF NOT EXISTS l2_contract_events_event_signature ON l2_contract_events(event_signature); CREATE INDEX IF NOT EXISTS l2_contract_events_event_signature ON l2_contract_events(event_signature);
CREATE INDEX IF NOT EXISTS l2_contract_events_contract_address ON l2_contract_events(contract_address); CREATE INDEX IF NOT EXISTS l2_contract_events_contract_address ON l2_contract_events(contract_address);
ALTER TABLE l2_contract_events ADD UNIQUE (block_hash, log_index);
/** /**
* BRIDGING DATA * BRIDGING DATA
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment