Commit 2ece2172 authored by Conner Fromknecht's avatar Conner Fromknecht

feat: add PendingTx tracking to teleportr db

parent 44d2dc90
...@@ -77,10 +77,19 @@ CREATE TABLE IF NOT EXISTS last_processed_block ( ...@@ -77,10 +77,19 @@ CREATE TABLE IF NOT EXISTS last_processed_block (
); );
` `
const pendingTxTable = `
CREATE TABLE IF NOT EXISTS pending_txs (
txn_hash VARCHAR NOT NULL PRIMARY KEY,
start_id INT8 NOT NULL,
end_id INT8 NOT NULL
);
`
var migrations = []string{ var migrations = []string{
createDepositsTable, createDepositsTable,
createDisbursementsTable, createDisbursementsTable,
lastProcessedBlockTable, lastProcessedBlockTable,
pendingTxTable,
} }
// Config houses the data required to connect to a Postgres backend. // Config houses the data required to connect to a Postgres backend.
...@@ -417,3 +426,88 @@ func (d *Database) CompletedTeleports() ([]CompletedTeleport, error) { ...@@ -417,3 +426,88 @@ func (d *Database) CompletedTeleports() ([]CompletedTeleport, error) {
return teleports, nil return teleports, nil
} }
// PendingTx encapsulates the metadata stored about published disbursement txs.
type PendingTx struct {
// Txhash is the tx hash of the disbursement tx.
TxHash common.Hash
// StartID is the deposit id of the first disbursement, inclusive.
StartID uint64
// EndID is the deposit id fo the last disbursement, exclusive.
EndID uint64
}
const upsertPendingTxStatement = `
INSERT INTO pending_txs (txn_hash, start_id, end_id)
VALUES ($1, $2, $3)
ON CONFLICT (txn_hash) DO UPDATE
SET (start_id, end_id) = ($2, $3)
`
// UpsertPendingTx inserts a disbursement, or updates the entry if the TxHash
// already exists.
func (d *Database) UpsertPendingTx(pendingTx PendingTx) error {
_, err := d.conn.Exec(
upsertPendingTxStatement,
pendingTx.TxHash.String(),
pendingTx.StartID,
pendingTx.EndID,
)
return err
}
const listPendingTxsQuery = `
SELECT txn_hash, start_id, end_id
FROM pending_txs
ORDER BY start_id DESC, end_id DESC, txn_hash ASC
`
// ListPendingTxs returns all pending txs stored in the database.
func (d *Database) ListPendingTxs() ([]PendingTx, error) {
rows, err := d.conn.Query(listPendingTxsQuery)
if err != nil {
return nil, err
}
defer rows.Close()
var pendingTxs []PendingTx
for rows.Next() {
var pendingTx PendingTx
var txHashStr string
err = rows.Scan(
&txHashStr,
&pendingTx.StartID,
&pendingTx.EndID,
)
if err != nil {
return nil, err
}
pendingTx.TxHash = common.HexToHash(txHashStr)
pendingTxs = append(pendingTxs, pendingTx)
}
if err := rows.Err(); err != nil {
return nil, err
}
return pendingTxs, nil
}
const deletePendingTxsStatement = `
DELETE FROM pending_txs
WHERE start_id = $1 AND end_id = $2
`
// DeletePendingTx removes any pending txs with matching start and end ids. This
// allows the caller to remove any logically-conflicting pending txs from the
// database after successfully processing the outcomes.
func (d *Database) DeletePendingTx(startID, endID uint64) error {
_, err := d.conn.Exec(
deletePendingTxsStatement,
startID,
endID,
)
return err
}
...@@ -311,3 +311,135 @@ func TestUpsertDisbursement(t *testing.T) { ...@@ -311,3 +311,135 @@ func TestUpsertDisbursement(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, expTeleports, teleports) require.Equal(t, expTeleports, teleports)
} }
// TestUpsertPendingTxs asserts that UpsertPendingTx properly records a pending
// tx, and that it appears in ListPendingTxs on subsequent calls.
func TestUpsertPendingTxs(t *testing.T) {
t.Parallel()
d := newDatabase(t)
defer d.Close()
// Should be empty at first.
pendingTxs, err := d.ListPendingTxs()
require.Nil(t, err)
require.Nil(t, pendingTxs)
// Add first pending tx.
pendingTx1 := db.PendingTx{
TxHash: common.HexToHash("0x11"),
StartID: 0,
EndID: 1,
}
err = d.UpsertPendingTx(pendingTx1)
require.Nil(t, err)
pendingTxs, err = d.ListPendingTxs()
require.Nil(t, err)
require.Equal(t, []db.PendingTx{pendingTx1}, pendingTxs)
// Add second pending tx.
pendingTx2 := db.PendingTx{
TxHash: common.HexToHash("0x22"),
StartID: 0,
EndID: 1,
}
err = d.UpsertPendingTx(pendingTx2)
require.Nil(t, err)
pendingTxs, err = d.ListPendingTxs()
require.Nil(t, err)
require.Equal(t, []db.PendingTx{pendingTx1, pendingTx2}, pendingTxs)
// Readd duplciate pending tx.
err = d.UpsertPendingTx(pendingTx2)
require.Nil(t, err)
pendingTxs, err = d.ListPendingTxs()
require.Nil(t, err)
require.Equal(t, []db.PendingTx{pendingTx1, pendingTx2}, pendingTxs)
// Add third pending tx.
pendingTx3 := db.PendingTx{
TxHash: common.HexToHash("0x33"),
StartID: 1,
EndID: 2,
}
err = d.UpsertPendingTx(pendingTx3)
require.Nil(t, err)
pendingTxs, err = d.ListPendingTxs()
require.Nil(t, err)
require.Equal(t, []db.PendingTx{pendingTx3, pendingTx1, pendingTx2}, pendingTxs)
}
// TestDeletePendingTx asserts that DeletePendingTx properly cleans up the
// pending_txs table when provided with various start/end ids.
func TestDeletePendingTx(t *testing.T) {
t.Parallel()
d := newDatabase(t)
defer d.Close()
pendingTx1 := db.PendingTx{
TxHash: common.HexToHash("0x11"),
StartID: 0,
EndID: 1,
}
pendingTx2 := db.PendingTx{
TxHash: common.HexToHash("0x22"),
StartID: 0,
EndID: 1,
}
pendingTx3 := db.PendingTx{
TxHash: common.HexToHash("0x33"),
StartID: 1,
EndID: 2,
}
err := d.UpsertPendingTx(pendingTx1)
require.Nil(t, err)
err = d.UpsertPendingTx(pendingTx2)
require.Nil(t, err)
err = d.UpsertPendingTx(pendingTx3)
require.Nil(t, err)
pendingTxs, err := d.ListPendingTxs()
require.Nil(t, err)
require.Equal(t, []db.PendingTx{pendingTx3, pendingTx1, pendingTx2}, pendingTxs)
// Delete with indexes that do not match any start/end, no effect.
err = d.DeletePendingTx(3, 4)
require.Nil(t, err)
pendingTxs, err = d.ListPendingTxs()
require.Nil(t, err)
require.Equal(t, []db.PendingTx{pendingTx3, pendingTx1, pendingTx2}, pendingTxs)
// Delete with indexes that matches start but no end, no effect.
err = d.DeletePendingTx(1, 3)
require.Nil(t, err)
pendingTxs, err = d.ListPendingTxs()
require.Nil(t, err)
require.Equal(t, []db.PendingTx{pendingTx3, pendingTx1, pendingTx2}, pendingTxs)
// Delete with indexes that matches end but no start, no effect.
err = d.DeletePendingTx(0, 2)
require.Nil(t, err)
pendingTxs, err = d.ListPendingTxs()
require.Nil(t, err)
require.Equal(t, []db.PendingTx{pendingTx3, pendingTx1, pendingTx2}, pendingTxs)
// Delete with indexes that matches start and end, should remove both.
err = d.DeletePendingTx(0, 1)
require.Nil(t, err)
pendingTxs, err = d.ListPendingTxs()
require.Nil(t, err)
require.Equal(t, []db.PendingTx{pendingTx3}, pendingTxs)
// Delete with indexes that matches start and end, no empty.
err = d.DeletePendingTx(1, 2)
require.Nil(t, err)
pendingTxs, err = d.ListPendingTxs()
require.Nil(t, err)
require.Nil(t, pendingTxs)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment