Commit 89542545 authored by Conner Fromknecht's avatar Conner Fromknecht

feat: replace LastestDeposit with LastProcessedBlock

Currently the teleportr database supports a LatestDeposit method,
which returns the highest block number observed, if any. The intent was
to use this as the starting point for syncing, however, this isn't super
useful as there may be long periods of inactivity that we have already
scanned.

Instead, we now store the last processed block in a separate table, and
pass the end of the ingestion block range as argument to UpsertDeposits.
The list of deposits and last processed block are written atomically to
avoid consistency issues. The value can be retrieved using the
LastProcessedBlock getter.
parent 59ff146c
...@@ -69,9 +69,18 @@ CREATE TABLE IF NOT EXISTS disbursements ( ...@@ -69,9 +69,18 @@ CREATE TABLE IF NOT EXISTS disbursements (
); );
` `
const lastProcessedBlockTable = `
CREATE TABLE IF NOT EXISTS last_processed_block (
id BOOL PRIMARY KEY DEFAULT TRUE,
value INT8 NOT NULL,
CONSTRAINT id CHECK (id)
);
`
var migrations = []string{ var migrations = []string{
createDepositsTable, createDepositsTable,
createDisbursementsTable, createDisbursementsTable,
lastProcessedBlockTable,
} }
// Config houses the data required to connect to a Postgres backend. // Config houses the data required to connect to a Postgres backend.
...@@ -155,6 +164,13 @@ func (d *Database) Close() error { ...@@ -155,6 +164,13 @@ func (d *Database) Close() error {
return d.conn.Close() return d.conn.Close()
} }
const upsertLastProcessedBlock = `
INSERT INTO last_processed_block (value)
VALUES ($1)
ON CONFLICT (id) DO UPDATE
SET value = $1
`
const upsertDepositStatement = ` const upsertDepositStatement = `
INSERT INTO deposits (id, txn_hash, block_number, block_timestamp, address, amount) INSERT INTO deposits (id, txn_hash, block_number, block_timestamp, address, amount)
VALUES ($1, $2, $3, $4, $5, $6) VALUES ($1, $2, $3, $4, $5, $6)
...@@ -164,10 +180,10 @@ SET (txn_hash, block_number, block_timestamp, address, amount) = ($2, $3, $4, $5 ...@@ -164,10 +180,10 @@ SET (txn_hash, block_number, block_timestamp, address, amount) = ($2, $3, $4, $5
// UpsertDeposits inserts a list of deposits into the database, or updats an // UpsertDeposits inserts a list of deposits into the database, or updats an
// existing deposit in place if the same ID is found. // existing deposit in place if the same ID is found.
func (d *Database) UpsertDeposits(deposits []Deposit) error { func (d *Database) UpsertDeposits(
if len(deposits) == 0 { deposits []Deposit,
return nil lastProcessedBlock uint64,
} ) error {
// Sanity check deposits. // Sanity check deposits.
for _, deposit := range deposits { for _, deposit := range deposits {
...@@ -185,7 +201,6 @@ func (d *Database) UpsertDeposits(deposits []Deposit) error { ...@@ -185,7 +201,6 @@ func (d *Database) UpsertDeposits(deposits []Deposit) error {
}() }()
for _, deposit := range deposits { for _, deposit := range deposits {
_, err = tx.Exec( _, err = tx.Exec(
upsertDepositStatement, upsertDepositStatement,
deposit.ID, deposit.ID,
...@@ -200,29 +215,30 @@ func (d *Database) UpsertDeposits(deposits []Deposit) error { ...@@ -200,29 +215,30 @@ func (d *Database) UpsertDeposits(deposits []Deposit) error {
} }
} }
_, err = tx.Exec(upsertLastProcessedBlock, lastProcessedBlock)
if err != nil {
return err
}
return tx.Commit() return tx.Commit()
} }
const latestDepositQuery = ` const lastProcessedBlockQuery = `
SELECT block_number FROM deposits SELECT value FROM last_processed_block
ORDER BY block_number DESC
LIMIT 1
` `
// LatestDeposit returns the block number of the latest deposit known to the func (d *Database) LastProcessedBlock() (*uint64, error) {
// database. row := d.conn.QueryRow(lastProcessedBlockQuery)
func (d *Database) LatestDeposit() (*uint64, error) {
row := d.conn.QueryRow(latestDepositQuery)
var latestTransfer uint64 var lastProcessedBlock uint64
err := row.Scan(&latestTransfer) err := row.Scan(&lastProcessedBlock)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
return nil, nil return nil, nil
} else if err != nil { } else if err != nil {
return nil, err return nil, err
} }
return &latestTransfer, nil return &lastProcessedBlock, nil
} }
const confirmedDepositsQuery = ` const confirmedDepositsQuery = `
...@@ -277,6 +293,28 @@ func (d *Database) ConfirmedDeposits(blockNumber, confirmations uint64) ([]Depos ...@@ -277,6 +293,28 @@ func (d *Database) ConfirmedDeposits(blockNumber, confirmations uint64) ([]Depos
return deposits, nil return deposits, nil
} }
const latestDisbursementIDQuery = `
SELECT id FROM disbursements
ORDER BY id DESC
LIMIT 1
`
// LatestDisbursementID returns the latest deposit id known to the database that
// has a recorded disbursement.
func (d *Database) LatestDisbursementID() (*uint64, error) {
row := d.conn.QueryRow(latestDisbursementIDQuery)
var latestDisbursementID uint64
err := row.Scan(&latestDisbursementID)
if err == sql.ErrNoRows {
return nil, nil
} else if err != nil {
return nil, err
}
return &latestDisbursementID, nil
}
const markDisbursedStatement = ` const markDisbursedStatement = `
INSERT INTO disbursements (id, txn_hash, block_number, block_timestamp) INSERT INTO disbursements (id, txn_hash, block_number, block_timestamp)
VALUES ($1, $2, $3, $4) VALUES ($1, $2, $3, $4)
......
...@@ -55,18 +55,18 @@ func TestOpenClose(t *testing.T) { ...@@ -55,18 +55,18 @@ func TestOpenClose(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
} }
// TestUpsert empty deposits asserts that it is safe to call UpsertDeposits with // TestUpsertEmptyDeposits empty deposits asserts that it is safe to call
// an empty list. // UpsertDeposits with an empty list.
func TestUpsertEmptyDeposits(t *testing.T) { func TestUpsertEmptyDeposits(t *testing.T) {
t.Parallel() t.Parallel()
d := newDatabase(t) d := newDatabase(t)
defer d.Close() defer d.Close()
err := d.UpsertDeposits(nil) err := d.UpsertDeposits(nil, 0)
require.Nil(t, err) require.Nil(t, err)
err = d.UpsertDeposits([]db.Deposit{}) err = d.UpsertDeposits([]db.Deposit{}, 0)
require.Nil(t, err) require.Nil(t, err)
} }
...@@ -78,58 +78,10 @@ func TestUpsertDepositWithZeroTimestampFails(t *testing.T) { ...@@ -78,58 +78,10 @@ func TestUpsertDepositWithZeroTimestampFails(t *testing.T) {
d := newDatabase(t) d := newDatabase(t)
defer d.Close() defer d.Close()
err := d.UpsertDeposits([]db.Deposit{{}}) err := d.UpsertDeposits([]db.Deposit{{}}, 0)
require.Equal(t, db.ErrZeroTimestamp, err) require.Equal(t, db.ErrZeroTimestamp, err)
} }
// TestLatestDeposit asserts that the LatestDeposit method properly returns the
// highest block number in the databse, or nil if no items are present.
func TestLatestDeposit(t *testing.T) {
t.Parallel()
d := newDatabase(t)
defer d.Close()
// Query should return nil on empty databse.
latestDeposit, err := d.LatestDeposit()
require.Nil(t, err)
require.Equal(t, (*uint64)(nil), latestDeposit)
// Update table to have a single element.
expLatestDeposit := uint64(1)
err = d.UpsertDeposits([]db.Deposit{{
ID: 1,
TxnHash: common.HexToHash("0xf1"),
BlockNumber: expLatestDeposit,
BlockTimestamp: testTimestamp,
Address: common.HexToAddress("0xa1"),
Amount: big.NewInt(1),
}})
require.Nil(t, err)
// Query should return block number of only deposit.
latestDeposit, err = d.LatestDeposit()
require.Nil(t, err)
require.Equal(t, &expLatestDeposit, latestDeposit)
// Update table to have two distinct block numbers.
expLatestDeposit = 2
err = d.UpsertDeposits([]db.Deposit{{
ID: 2,
TxnHash: common.HexToHash("0xf2"),
BlockNumber: expLatestDeposit,
BlockTimestamp: testTimestamp,
Address: common.HexToAddress("0xa2"),
Amount: big.NewInt(2),
}})
require.Nil(t, err)
// Query should return the highest of the two block numbers.
latestDeposit, err = d.LatestDeposit()
require.Nil(t, err)
require.Equal(t, &expLatestDeposit, latestDeposit)
}
// TestUpsertDeposits asserts that UpsertDeposits properly overwrites an // TestUpsertDeposits asserts that UpsertDeposits properly overwrites an
// existing entry with the same ID. // existing entry with the same ID.
func TestUpsertDeposits(t *testing.T) { func TestUpsertDeposits(t *testing.T) {
...@@ -147,7 +99,7 @@ func TestUpsertDeposits(t *testing.T) { ...@@ -147,7 +99,7 @@ func TestUpsertDeposits(t *testing.T) {
Amount: big.NewInt(1), Amount: big.NewInt(1),
} }
err := d.UpsertDeposits([]db.Deposit{deposit1}) err := d.UpsertDeposits([]db.Deposit{deposit1}, 0)
require.Nil(t, err) require.Nil(t, err)
deposits, err := d.ConfirmedDeposits(1, 1) deposits, err := d.ConfirmedDeposits(1, 1)
...@@ -163,7 +115,7 @@ func TestUpsertDeposits(t *testing.T) { ...@@ -163,7 +115,7 @@ func TestUpsertDeposits(t *testing.T) {
Amount: big.NewInt(2), Amount: big.NewInt(2),
} }
err = d.UpsertDeposits([]db.Deposit{deposit2}) err = d.UpsertDeposits([]db.Deposit{deposit2}, 0)
require.Nil(t, err) require.Nil(t, err)
deposits, err = d.ConfirmedDeposits(2, 1) deposits, err = d.ConfirmedDeposits(2, 1)
...@@ -171,6 +123,59 @@ func TestUpsertDeposits(t *testing.T) { ...@@ -171,6 +123,59 @@ func TestUpsertDeposits(t *testing.T) {
require.Equal(t, deposits, []db.Deposit{deposit2}) require.Equal(t, deposits, []db.Deposit{deposit2})
} }
// TestUpsertDepositsRecordsLastProcessedBlock asserts that calling
// UpsertDeposits properly records the last processed block.
func TestUpsertDepositsRecordsLastProcessedBlock(t *testing.T) {
t.Parallel()
d := newDatabase(t)
defer d.Close()
uint64Ptr := func(x uint64) *uint64 {
return &x
}
// Should be empty initially.
lastProcessedBlock, err := d.LastProcessedBlock()
require.Nil(t, err)
require.Nil(t, lastProcessedBlock)
// Insert nil deposits through block 1.
err = d.UpsertDeposits(nil, 1)
require.Nil(t, err)
// Check that LastProcessedBlock returns 1.
lastProcessedBlock, err = d.LastProcessedBlock()
require.Nil(t, err)
require.Equal(t, uint64Ptr(1), lastProcessedBlock)
// Insert empty deposits through block 2.
err = d.UpsertDeposits([]db.Deposit{}, 2)
require.Nil(t, err)
// Check that LastProcessedBlock returns 2.
lastProcessedBlock, err = d.LastProcessedBlock()
require.Nil(t, err)
require.Equal(t, uint64Ptr(2), lastProcessedBlock)
// Insert real deposit in block 3 with last processed at 4.
deposit := db.Deposit{
ID: 0,
TxnHash: common.HexToHash("0xff03"),
BlockNumber: 3,
BlockTimestamp: testTimestamp,
Address: common.HexToAddress("0xaa03"),
Amount: big.NewInt(3),
}
err = d.UpsertDeposits([]db.Deposit{deposit}, 4)
require.Nil(t, err)
// Check that LastProcessedBlock returns 2.
lastProcessedBlock, err = d.LastProcessedBlock()
require.Nil(t, err)
require.Equal(t, uint64Ptr(4), lastProcessedBlock)
}
// TestConfirmedDeposits asserts that ConfirmedDeposits properly returns the set // TestConfirmedDeposits asserts that ConfirmedDeposits properly returns the set
// of deposits that have sufficient confirmation, but do not have a recorded // of deposits that have sufficient confirmation, but do not have a recorded
// disbursement. // disbursement.
...@@ -211,7 +216,7 @@ func TestConfirmedDeposits(t *testing.T) { ...@@ -211,7 +216,7 @@ func TestConfirmedDeposits(t *testing.T) {
err = d.UpsertDeposits([]db.Deposit{ err = d.UpsertDeposits([]db.Deposit{
deposit1, deposit2, deposit3, deposit1, deposit2, deposit3,
}) }, 0)
require.Nil(t, err) require.Nil(t, err)
// First deposit only has 1 conf, should not be found using 2 confs at block // First deposit only has 1 conf, should not be found using 2 confs at block
...@@ -271,7 +276,7 @@ func TestUpsertDisbursement(t *testing.T) { ...@@ -271,7 +276,7 @@ func TestUpsertDisbursement(t *testing.T) {
Address: address, Address: address,
Amount: amount, Amount: amount,
}, },
}) }, 0)
require.Nil(t, err) require.Nil(t, err)
// Mark the deposit as disbursed with some temporary info. // Mark the deposit as disbursed with some temporary info.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment