Commit 99c40279 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

Merge pull request #2190 from cfromknecht/teleportr-postgres

feat: add teleportr Postgres backend
parents 93cd04ff 0dbd504c
......@@ -11,7 +11,7 @@ on:
- 'regenesis/*'
pull_request:
paths:
- 'go/batch-submitter/*'
- 'go/batch-submitter/**'
workflow_dispatch:
defaults:
......
......@@ -11,7 +11,7 @@ on:
- 'regenesis/*'
pull_request:
paths:
- 'go/bss-core/*'
- 'go/bss-core/**'
workflow_dispatch:
defaults:
......
name: teleportr unit tests
on:
push:
paths:
- 'go/teleportr/**'
branches:
- 'master'
- 'develop'
- '*rc'
- 'regenesis/*'
pull_request:
paths:
- 'go/teleportr/**'
workflow_dispatch:
defaults:
run:
working-directory: './go/teleportr'
jobs:
tests:
runs-on: ubuntu-latest
services:
postgres:
image: postgres
env:
POSTGRES_USER=postgres
POSTGRES_PASSWORD=password
ports:
- 5432:5432
steps:
- name: Install Go
uses: actions/setup-go@v2
with:
go-version: 1.16.x
- name: Checkout code
uses: actions/checkout@v2
- name: Test
run: go test -v ./...
package db
import (
"database/sql"
"errors"
"fmt"
"math/big"
"strings"
"time"
"github.com/ethereum/go-ethereum/common"
_ "github.com/lib/pq"
)
var (
// ErrZeroTimestamp signals that the caller attempted to insert deposits
// with a timestamp of zero.
ErrZeroTimestamp = errors.New("timestamp is zero")
// ErrUnknownDeposit signals that the target deposit could not be found.
ErrUnknownDeposit = errors.New("unknown deposit")
)
// Deposit represents an event emitted from the TeleportrDeposit contract on L1,
// along with additional info about the tx that generated the event.
type Deposit struct {
ID int64
TxnHash common.Hash
BlockNumber int64
BlockTimestamp time.Time
Address common.Address
Amount *big.Int
}
// ConfirmationInfo holds metadata about a tx on either the L1 or L2 chain.
type ConfirmationInfo struct {
TxnHash common.Hash
BlockNumber int64
BlockTimestamp time.Time
}
// CompletedTeleport represents an L1 deposit that has been disbursed on L2. The
// struct also hold info about the L1 and L2 txns involved.
type CompletedTeleport struct {
ID int64
Address common.Address
Amount *big.Int
Deposit ConfirmationInfo
Disbursement ConfirmationInfo
}
const createDepositsTable = `
CREATE TABLE IF NOT EXISTS deposits (
id INT8 NOT NULL PRIMARY KEY,
txn_hash VARCHAR NOT NULL,
block_number INT8 NOT NULL,
block_timestamp TIMESTAMPTZ NOT NULL,
address VARCHAR NOT NULL,
amount VARCHAR NOT NULL
);
`
const createDisbursementsTable = `
CREATE TABLE IF NOT EXISTS disbursements (
id INT8 NOT NULL PRIMARY KEY REFERENCES deposits(id),
txn_hash VARCHAR NOT NULL,
block_number INT8 NOT NULL,
block_timestamp TIMESTAMPTZ NOT NULL
);
`
var migrations = []string{
createDepositsTable,
createDisbursementsTable,
}
// Config houses the data required to connect to a Postgres backend.
type Config struct {
// Host is the database hostname.
Host string
// Port is the database port.
Port uint16
// User is the database user to log in as.
User string
// Password is the user's password to authenticate.
Password string
// DBName is the name of the database to connect to.
DBName string
// EnableSSL enables SLL on the connection if set to true.
EnableSSL bool
}
// WithDB returns the connection string with a specific database to connect to.
func (c Config) WithDB() string {
return fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
c.Host, c.Port, c.User, c.Password, c.DBName, c.sslMode(),
)
}
// WithoutDB returns the connection string without connecting to a specific
// database.
func (c Config) WithoutDB() string {
return fmt.Sprintf(
"host=%s port=%d user=%s password=%s sslmode=%s",
c.Host, c.Port, c.User, c.Password, c.sslMode(),
)
}
// sslMode retuns "enabled" if EnableSSL is true, otherwise returns "disabled".
func (c Config) sslMode() string {
if c.EnableSSL {
return "enable"
}
return "disable"
}
// Database provides a Go API for accessing Teleportr read/write operations.
type Database struct {
conn *sql.DB
}
// Open creates a new database connection to the configured Postgres backend and
// applies any migrations.
func Open(cfg Config) (*Database, error) {
conn, err := sql.Open("postgres", cfg.WithDB())
if err != nil {
return nil, err
}
return &Database{
conn: conn,
}, nil
}
// Migrate applies all existing migrations to the open database.
func (d *Database) Migrate() error {
for _, migration := range migrations {
_, err := d.conn.Exec(migration)
if err != nil {
return err
}
}
return nil
}
// Close closes the connection to the database.
func (d *Database) Close() error {
return d.conn.Close()
}
const upsertDepositStatement = `
INSERT INTO deposits (id, txn_hash, block_number, block_timestamp, address, amount)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (id) DO UPDATE
SET (txn_hash, block_number, block_timestamp, address, amount) = ($2, $3, $4, $5, $6)
`
// UpsertDeposits inserts a list of deposits into the database, or updats an
// existing deposit in place if the same ID is found.
func (d *Database) UpsertDeposits(deposits []Deposit) error {
if len(deposits) == 0 {
return nil
}
// Sanity check deposits.
for _, deposit := range deposits {
if deposit.BlockTimestamp.IsZero() {
return ErrZeroTimestamp
}
}
tx, err := d.conn.Begin()
if err != nil {
return err
}
defer tx.Rollback()
for _, deposit := range deposits {
_, err = tx.Exec(
upsertDepositStatement,
deposit.ID,
deposit.TxnHash.String(),
deposit.BlockNumber,
deposit.BlockTimestamp,
deposit.Address.String(),
deposit.Amount.String(),
)
if err != nil {
return err
}
}
return tx.Commit()
}
const latestDepositQuery = `
SELECT block_number FROM deposits
ORDER BY block_number DESC
LIMIT 1
`
// LatestDeposit returns the block number of the latest deposit known to the
// database.
func (d *Database) LatestDeposit() (*int64, error) {
row := d.conn.QueryRow(latestDepositQuery)
var latestTransfer int64
err := row.Scan(&latestTransfer)
if err == sql.ErrNoRows {
return nil, nil
} else if err != nil {
return nil, err
}
return &latestTransfer, nil
}
const confirmedDepositsQuery = `
SELECT dep.*
FROM deposits AS dep
LEFT JOIN disbursements AS dis ON dep.id = dis.id
WHERE dis.id IS NULL AND dep.block_number + $1 <= $2 + 1
ORDER BY dep.id ASC
`
// ConfirmedDeposits returns the set of all deposits that have sufficient
// confirmation, but do not have a recorded disbursement.
func (d *Database) ConfirmedDeposits(blockNumber, confirmations int64) ([]Deposit, error) {
rows, err := d.conn.Query(confirmedDepositsQuery, confirmations, blockNumber)
if err != nil {
return nil, err
}
defer rows.Close()
var deposits []Deposit
for rows.Next() {
var deposit Deposit
var txnHashStr string
var addressStr string
var amountStr string
err = rows.Scan(
&deposit.ID,
&txnHashStr,
&deposit.BlockNumber,
&deposit.BlockTimestamp,
&addressStr,
&amountStr,
)
if err != nil {
return nil, err
}
amount, ok := new(big.Int).SetString(amountStr, 10)
if !ok {
return nil, fmt.Errorf("unable to parse amount %v", amount)
}
deposit.TxnHash = common.HexToHash(txnHashStr)
deposit.BlockTimestamp = deposit.BlockTimestamp.Local()
deposit.Amount = amount
deposit.Address = common.HexToAddress(addressStr)
deposits = append(deposits, deposit)
}
if err := rows.Err(); err != nil {
return nil, err
}
return deposits, nil
}
const markDisbursedStatement = `
INSERT INTO disbursements (id, txn_hash, block_number, block_timestamp)
VALUES ($1, $2, $3, $4)
ON CONFLICT (id) DO UPDATE
SET (txn_hash, block_number, block_timestamp) = ($2, $3, $4)
`
// UpsertDisbursement inserts a disbursement, or updates an existing record
// in-place if the ID already exists.
func (d *Database) UpsertDisbursement(
id int64,
txnHash common.Hash,
blockNumber int64,
blockTimestamp time.Time,
) error {
if blockTimestamp.IsZero() {
return ErrZeroTimestamp
}
result, err := d.conn.Exec(
markDisbursedStatement,
id,
txnHash.String(),
blockNumber,
blockTimestamp,
)
if err != nil {
if strings.Contains(err.Error(), "violates foreign key constraint") {
return ErrUnknownDeposit
}
return err
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return err
}
if rowsAffected != 1 {
return ErrUnknownDeposit
}
return nil
}
const completedTeleportsQuery = `
SELECT
dep.id, dep.address, dep.amount,
dep.txn_hash, dep.block_number, dep.block_timestamp,
dis.txn_hash, dis.block_number, dis.block_timestamp
FROM deposits AS dep, disbursements AS dis
WHERE dep.id = dis.id
ORDER BY id DESC
`
// CompletedTeleports returns the set of all deposits that have also been
// disbursed.
func (d *Database) CompletedTeleports() ([]CompletedTeleport, error) {
rows, err := d.conn.Query(completedTeleportsQuery)
if err != nil {
return nil, err
}
defer rows.Close()
var teleports []CompletedTeleport
for rows.Next() {
var teleport CompletedTeleport
var addressStr string
var amountStr string
var depTxnHashStr string
var disTxnHashStr string
err = rows.Scan(
&teleport.ID,
&addressStr,
&amountStr,
&depTxnHashStr,
&teleport.Deposit.BlockNumber,
&teleport.Deposit.BlockTimestamp,
&disTxnHashStr,
&teleport.Disbursement.BlockNumber,
&teleport.Disbursement.BlockTimestamp,
)
if err != nil {
return nil, err
}
amount, ok := new(big.Int).SetString(amountStr, 10)
if !ok {
return nil, fmt.Errorf("unable to parse amount %v", amount)
}
teleport.Address = common.HexToAddress(addressStr)
teleport.Amount = amount
teleport.Deposit.TxnHash = common.HexToHash(depTxnHashStr)
teleport.Deposit.BlockTimestamp = teleport.Deposit.BlockTimestamp.Local()
teleport.Disbursement.TxnHash = common.HexToHash(disTxnHashStr)
teleport.Disbursement.BlockTimestamp = teleport.Disbursement.BlockTimestamp.Local()
teleports = append(teleports, teleport)
}
if err := rows.Err(); err != nil {
return nil, err
}
return teleports, nil
}
package db_test
import (
"database/sql"
"fmt"
"math/big"
"testing"
"time"
"github.com/ethereum-optimism/optimism/go/teleportr/db"
"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)
var (
testTimestamp = time.Unix(time.Now().Unix(), 0)
)
func newDatabase(t *testing.T) *db.Database {
dbName := uuid.NewString()
cfg := db.Config{
Host: "0.0.0.0",
Port: 5432,
User: "postgres",
Password: "password",
DBName: dbName,
}
conn, err := sql.Open("postgres", cfg.WithoutDB())
require.Nil(t, err)
_, err = conn.Exec(fmt.Sprintf("CREATE DATABASE \"%s\";", dbName))
require.Nil(t, err)
err = conn.Close()
require.Nil(t, err)
db, err := db.Open(cfg)
require.Nil(t, err)
err = db.Migrate()
require.Nil(t, err)
return db
}
// TestOpenClose asserts that we are able to open and close the database
// connection.
func TestOpenClose(t *testing.T) {
t.Parallel()
d := newDatabase(t)
err := d.Close()
require.Nil(t, err)
}
// TestUpsert empty deposits asserts that it is safe to call UpsertDeposits with
// an empty list.
func TestUpsertEmptyDeposits(t *testing.T) {
t.Parallel()
d := newDatabase(t)
defer d.Close()
err := d.UpsertDeposits(nil)
require.Nil(t, err)
err = d.UpsertDeposits([]db.Deposit{})
require.Nil(t, err)
}
// TestUpsertDepositWithZeroTimestampFails asserts that trying to insert a
// deposit with a zero-timestamp fails.
func TestUpsertDepositWithZeroTimestampFails(t *testing.T) {
t.Parallel()
d := newDatabase(t)
defer d.Close()
err := d.UpsertDeposits([]db.Deposit{{}})
require.Equal(t, db.ErrZeroTimestamp, err)
}
// TestLatestDeposit asserts that the LatestDeposit method properly returns the
// highest block number in the databse, or nil if no items are present.
func TestLatestDeposit(t *testing.T) {
t.Parallel()
d := newDatabase(t)
defer d.Close()
// Query should return nil on empty databse.
latestDeposit, err := d.LatestDeposit()
require.Nil(t, err)
require.Equal(t, (*int64)(nil), latestDeposit)
// Update table to have a single element.
expLatestDeposit := int64(1)
err = d.UpsertDeposits([]db.Deposit{{
ID: 1,
TxnHash: common.HexToHash("0xf1"),
BlockNumber: expLatestDeposit,
BlockTimestamp: testTimestamp,
Address: common.HexToAddress("0xa1"),
Amount: big.NewInt(1),
}})
require.Nil(t, err)
// Query should return block number of only deposit.
latestDeposit, err = d.LatestDeposit()
require.Nil(t, err)
require.Equal(t, &expLatestDeposit, latestDeposit)
// Update table to have two distinct block numbers.
expLatestDeposit = 2
err = d.UpsertDeposits([]db.Deposit{{
ID: 2,
TxnHash: common.HexToHash("0xf2"),
BlockNumber: expLatestDeposit,
BlockTimestamp: testTimestamp,
Address: common.HexToAddress("0xa2"),
Amount: big.NewInt(2),
}})
require.Nil(t, err)
// Query should return the highest of the two block numbers.
latestDeposit, err = d.LatestDeposit()
require.Nil(t, err)
require.Equal(t, &expLatestDeposit, latestDeposit)
}
// TestUpsertDeposits asserts that UpsertDeposits properly overwrites an
// existing entry with the same ID.
func TestUpsertDeposits(t *testing.T) {
t.Parallel()
d := newDatabase(t)
defer d.Close()
deposit1 := db.Deposit{
ID: 1,
TxnHash: common.HexToHash("0xff01"),
BlockNumber: 1,
BlockTimestamp: testTimestamp,
Address: common.HexToAddress("0xaa01"),
Amount: big.NewInt(1),
}
err := d.UpsertDeposits([]db.Deposit{deposit1})
require.Nil(t, err)
deposits, err := d.ConfirmedDeposits(1, 1)
require.Nil(t, err)
require.Equal(t, deposits, []db.Deposit{deposit1})
deposit2 := db.Deposit{
ID: 1,
TxnHash: common.HexToHash("0xff02"),
BlockNumber: 2,
BlockTimestamp: testTimestamp,
Address: common.HexToAddress("0xaa02"),
Amount: big.NewInt(2),
}
err = d.UpsertDeposits([]db.Deposit{deposit2})
require.Nil(t, err)
deposits, err = d.ConfirmedDeposits(2, 1)
require.Nil(t, err)
require.Equal(t, deposits, []db.Deposit{deposit2})
}
// TestConfirmedDeposits asserts that ConfirmedDeposits properly returns the set
// of deposits that have sufficient confirmation, but do not have a recorded
// disbursement.
func TestConfirmedDeposits(t *testing.T) {
t.Parallel()
d := newDatabase(t)
defer d.Close()
deposits, err := d.ConfirmedDeposits(1e9, 1)
require.Nil(t, err)
require.Equal(t, int(0), len(deposits))
deposit1 := db.Deposit{
ID: 1,
TxnHash: common.HexToHash("0xff01"),
BlockNumber: 1,
BlockTimestamp: testTimestamp,
Address: common.HexToAddress("0xaa01"),
Amount: big.NewInt(1),
}
deposit2 := db.Deposit{
ID: 2,
TxnHash: common.HexToHash("0xff21"),
BlockNumber: 2,
BlockTimestamp: testTimestamp,
Address: common.HexToAddress("0xaa21"),
Amount: big.NewInt(2),
}
deposit3 := db.Deposit{
ID: 3,
TxnHash: common.HexToHash("0xff22"),
BlockNumber: 2,
BlockTimestamp: testTimestamp,
Address: common.HexToAddress("0xaa22"),
Amount: big.NewInt(2),
}
err = d.UpsertDeposits([]db.Deposit{
deposit1, deposit2, deposit3,
})
require.Nil(t, err)
// First deposit only has 1 conf, should not be found using 2 confs at block
// 1.
deposits, err = d.ConfirmedDeposits(1, 2)
require.Nil(t, err)
require.Equal(t, int(0), len(deposits))
// First deposit should be returned when querying for 1 conf at block 1.
deposits, err = d.ConfirmedDeposits(1, 1)
require.Nil(t, err)
require.Equal(t, []db.Deposit{deposit1}, deposits)
// All deposits should be returned when querying for 1 conf at block 2.
deposits, err = d.ConfirmedDeposits(2, 1)
require.Nil(t, err)
require.Equal(t, []db.Deposit{deposit1, deposit2, deposit3}, deposits)
err = d.UpsertDisbursement(deposit1.ID, common.HexToHash("0xdd01"), 1, testTimestamp)
require.Nil(t, err)
deposits, err = d.ConfirmedDeposits(2, 1)
require.Nil(t, err)
require.Equal(t, []db.Deposit{deposit2, deposit3}, deposits)
}
// TestUpsertDisbursement asserts that UpsertDisbursement properly inserts new
// disbursements or overwrites existing ones.
func TestUpsertDisbursement(t *testing.T) {
t.Parallel()
d := newDatabase(t)
defer d.Close()
address := common.HexToAddress("0xaa01")
amount := big.NewInt(1)
depTxnHash := common.HexToHash("0xdd01")
depBlockNumber := int64(1)
disTxnHash := common.HexToHash("0xee02")
disBlockNumber := int64(2)
// Calling UpsertDisbursement with the zero timestamp should fail.
err := d.UpsertDisbursement(0, common.HexToHash("0xdd00"), 0, time.Time{})
require.Equal(t, db.ErrZeroTimestamp, err)
// Calling UpsertDisbursement with an unknown id should fail.
err = d.UpsertDisbursement(0, common.HexToHash("0xdd00"), 0, testTimestamp)
require.Equal(t, db.ErrUnknownDeposit, err)
// Now, insert a real deposit that we will disburse.
err = d.UpsertDeposits([]db.Deposit{
{
ID: 1,
TxnHash: depTxnHash,
BlockNumber: depBlockNumber,
BlockTimestamp: testTimestamp,
Address: address,
Amount: amount,
},
})
require.Nil(t, err)
// Mark the deposit as disbursed with some temporary info.
err = d.UpsertDisbursement(1, common.HexToHash("0xee00"), 1, testTimestamp)
require.Nil(t, err)
// Overwrite the disbursement info with the final values.
err = d.UpsertDisbursement(1, disTxnHash, disBlockNumber, testTimestamp)
require.Nil(t, err)
expTeleports := []db.CompletedTeleport{
{
ID: 1,
Address: address,
Amount: amount,
Deposit: db.ConfirmationInfo{
TxnHash: depTxnHash,
BlockNumber: depBlockNumber,
BlockTimestamp: testTimestamp,
},
Disbursement: db.ConfirmationInfo{
TxnHash: disTxnHash,
BlockNumber: disBlockNumber,
BlockTimestamp: testTimestamp,
},
},
}
// Assert that the deposit now shows up in the CompletedTeleports method
// with both the L1 and L2 confirmation info.
teleports, err := d.CompletedTeleports()
require.Nil(t, err)
require.Equal(t, expTeleports, teleports)
}
module github.com/ethereum-optimism/optimism/go/teleportr
go 1.17
require (
github.com/ethereum/go-ethereum v1.10.15
github.com/google/uuid v1.3.0
github.com/lib/pq v1.10.4
github.com/stretchr/testify v1.7.0
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment