Commit 79be3e80 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

indexer: Add airdrop API (#2498)

parent ee57b82b
---
'@eth-optimism/indexer': minor
---
Add airdrops API
package db
type Airdrop struct {
Address string `json:"address"`
VoterAmount string `json:"voterAmount"`
MultisigSignerAmount string `json:"multisigSignerAmount"`
GitcoinAmount string `json:"gitcoinAmount"`
ActiveBridgedAmount string `json:"activeBridgedAmount"`
OpUserAmount string `json:"opUserAmount"`
OpRepeatUserAmount string `json:"opRepeatUserAmount"`
BonusAmount string `json:"bonusAmount"`
TotalAmount string `json:"totalAmount"`
}
...@@ -3,6 +3,8 @@ package db ...@@ -3,6 +3,8 @@ package db
import ( import (
"database/sql" "database/sql"
"errors" "errors"
"fmt"
"strings"
l2common "github.com/ethereum-optimism/optimism/l2geth/common" l2common "github.com/ethereum-optimism/optimism/l2geth/common"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -17,6 +19,31 @@ type Database struct { ...@@ -17,6 +19,31 @@ type Database struct {
config string config string
} }
// NewDatabase returns the database for the given connection string.
func NewDatabase(config string) (*Database, error) {
db, err := sql.Open("postgres", config)
if err != nil {
return nil, err
}
err = db.Ping()
if err != nil {
return nil, err
}
for _, migration := range schema {
_, err = db.Exec(migration)
if err != nil {
return nil, err
}
}
return &Database{
db: db,
config: config,
}, nil
}
// Close closes the database. // Close closes the database.
// NOTE: "It is rarely necessary to close a DB." // NOTE: "It is rarely necessary to close a DB."
// See: https://pkg.go.dev/database/sql#Open // See: https://pkg.go.dev/database/sql#Open
...@@ -633,27 +660,38 @@ func (d *Database) GetIndexedL1BlockByHash(hash common.Hash) (*IndexedL1Block, e ...@@ -633,27 +660,38 @@ func (d *Database) GetIndexedL1BlockByHash(hash common.Hash) (*IndexedL1Block, e
return block, nil return block, nil
} }
// NewDatabase returns the database for the given connection string. const getAirdropQuery = `
func NewDatabase(config string) (*Database, error) { SELECT
db, err := sql.Open("postgres", config) address, voter_amount, multisig_signer_amount, gitcoin_amount,
if err != nil { active_bridged_amount, op_user_amount, op_repeat_user_amount,
return nil, err bonus_amount, total_amount
} FROM airdrops
WHERE address = $1
err = db.Ping() `
if err != nil {
return nil, err func (d *Database) GetAirdrop(address common.Address) (*Airdrop, error) {
row := d.db.QueryRow(getAirdropQuery, strings.ToLower(address.String()))
if row.Err() != nil {
return nil, fmt.Errorf("error getting airdrop: %v", row.Err())
}
airdrop := new(Airdrop)
err := row.Scan(
&airdrop.Address,
&airdrop.VoterAmount,
&airdrop.MultisigSignerAmount,
&airdrop.GitcoinAmount,
&airdrop.ActiveBridgedAmount,
&airdrop.OpUserAmount,
&airdrop.OpRepeatUserAmount,
&airdrop.BonusAmount,
&airdrop.TotalAmount,
)
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
} }
for _, migration := range schema {
_, err = db.Exec(migration)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("error scanning airdrop: %v", err)
} }
} return airdrop, nil
return &Database{
db: db,
config: config,
}, nil
} }
...@@ -107,6 +107,21 @@ CREATE UNIQUE INDEX IF NOT EXISTS l1_blocks_number ON l1_blocks(number); ...@@ -107,6 +107,21 @@ CREATE UNIQUE INDEX IF NOT EXISTS l1_blocks_number ON l1_blocks(number);
CREATE UNIQUE INDEX IF NOT EXISTS l2_blocks_number ON l2_blocks(number); CREATE UNIQUE INDEX IF NOT EXISTS l2_blocks_number ON l2_blocks(number);
` `
const createAirdropsTable = `
CREATE TABLE IF NOT EXISTS airdrops (
address VARCHAR(42) PRIMARY KEY,
voter_amount VARCHAR NOT NULL DEFAULT '0' CHECK(voter_amount ~ '^\d+$') ,
multisig_signer_amount VARCHAR NOT NULL DEFAULT '0' CHECK(multisig_signer_amount ~ '^\d+$'),
gitcoin_amount VARCHAR NOT NULL DEFAULT '0' CHECK(gitcoin_amount ~ '^\d+$'),
active_bridged_amount VARCHAR NOT NULL DEFAULT '0' CHECK(active_bridged_amount ~ '^\d+$'),
op_user_amount VARCHAR NOT NULL DEFAULT '0' CHECK(op_user_amount ~ '^\d+$'),
op_repeat_user_amount VARCHAR NOT NULL DEFAULT '0' CHECK(op_user_amount ~ '^\d+$'),
op_og_amount VARCHAR NOT NULL DEFAULT '0' CHECK(op_og_amount ~ '^\d+$'),
bonus_amount VARCHAR NOT NULL DEFAULT '0' CHECK(bonus_amount ~ '^\d+$'),
total_amount VARCHAR NOT NULL CHECK(voter_amount ~ '^\d+$')
)
`
var schema = []string{ var schema = []string{
createL1BlocksTable, createL1BlocksTable,
createL2BlocksTable, createL2BlocksTable,
...@@ -118,4 +133,5 @@ var schema = []string{ ...@@ -118,4 +133,5 @@ var schema = []string{
createDepositsTable, createDepositsTable,
createWithdrawalsTable, createWithdrawalsTable,
createL1L2NumberIndex, createL1L2NumberIndex,
createAirdropsTable,
} }
...@@ -9,6 +9,8 @@ import ( ...@@ -9,6 +9,8 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/ethereum-optimism/optimism/go/indexer/services"
l2rpc "github.com/ethereum-optimism/optimism/l2geth/rpc" l2rpc "github.com/ethereum-optimism/optimism/l2geth/rpc"
"github.com/ethereum-optimism/optimism/go/indexer/metrics" "github.com/ethereum-optimism/optimism/go/indexer/metrics"
...@@ -83,8 +85,10 @@ type Indexer struct { ...@@ -83,8 +85,10 @@ type Indexer struct {
l1IndexingService *l1.Service l1IndexingService *l1.Service
l2IndexingService *l2.Service l2IndexingService *l2.Service
airdropService *services.Airdrop
router *mux.Router router *mux.Router
metrics *metrics.Metrics
} }
// NewIndexer initializes the Indexer, gathering any resources // NewIndexer initializes the Indexer, gathering any resources
...@@ -201,7 +205,9 @@ func NewIndexer(cfg Config, gitVersion string) (*Indexer, error) { ...@@ -201,7 +205,9 @@ func NewIndexer(cfg Config, gitVersion string) (*Indexer, error) {
l2Client: l2Client, l2Client: l2Client,
l1IndexingService: l1IndexingService, l1IndexingService: l1IndexingService,
l2IndexingService: l2IndexingService, l2IndexingService: l2IndexingService,
airdropService: services.NewAirdrop(db, m),
router: mux.NewRouter(), router: mux.NewRouter(),
metrics: m,
}, nil }, nil
} }
...@@ -216,6 +222,7 @@ func (b *Indexer) Serve() error { ...@@ -216,6 +222,7 @@ func (b *Indexer) Serve() error {
b.router.HandleFunc("/v1/deposits/0x{address:[a-fA-F0-9]{40}}", b.l1IndexingService.GetDeposits).Methods("GET") b.router.HandleFunc("/v1/deposits/0x{address:[a-fA-F0-9]{40}}", b.l1IndexingService.GetDeposits).Methods("GET")
b.router.HandleFunc("/v1/withdrawal/0x{hash:[a-fA-F0-9]{64}}", b.l2IndexingService.GetWithdrawalBatch).Methods("GET") b.router.HandleFunc("/v1/withdrawal/0x{hash:[a-fA-F0-9]{64}}", b.l2IndexingService.GetWithdrawalBatch).Methods("GET")
b.router.HandleFunc("/v1/withdrawals/0x{address:[a-fA-F0-9]{40}}", b.l2IndexingService.GetWithdrawals).Methods("GET") b.router.HandleFunc("/v1/withdrawals/0x{address:[a-fA-F0-9]{40}}", b.l2IndexingService.GetWithdrawals).Methods("GET")
b.router.HandleFunc("/v1/airdrops/0x{address:[a-fA-F0-9]{40}}", b.airdropService.GetAirdrop)
b.router.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { b.router.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200) w.WriteHeader(200)
_, err := w.Write([]byte("OK")) _, err := w.Write([]byte("OK"))
...@@ -224,7 +231,7 @@ func (b *Indexer) Serve() error { ...@@ -224,7 +231,7 @@ func (b *Indexer) Serve() error {
} }
}) })
middleware := server.LoggingMiddleware(log.New("service", "server")) middleware := server.LoggingMiddleware(b.metrics, log.New("service", "server"))
port := strconv.FormatUint(b.cfg.RESTPort, 10) port := strconv.FormatUint(b.cfg.RESTPort, 10)
addr := fmt.Sprintf("%s:%s", b.cfg.RESTHostname, port) addr := fmt.Sprintf("%s:%s", b.cfg.RESTHostname, port)
......
...@@ -3,6 +3,8 @@ package metrics ...@@ -3,6 +3,8 @@ package metrics
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"strconv"
"time"
l2common "github.com/ethereum-optimism/optimism/l2geth/common" l2common "github.com/ethereum-optimism/optimism/l2geth/common"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -32,6 +34,12 @@ type Metrics struct { ...@@ -32,6 +34,12 @@ type Metrics struct {
CachedTokensCount *prometheus.CounterVec CachedTokensCount *prometheus.CounterVec
HTTPRequestsCount prometheus.Counter
HTTPResponsesCount *prometheus.CounterVec
HTTPRequestDurationSecs prometheus.Summary
tokenAddrs map[string]string tokenAddrs map[string]string
} }
...@@ -110,6 +118,27 @@ func NewMetrics(monitoredTokens map[string]string) *Metrics { ...@@ -110,6 +118,27 @@ func NewMetrics(monitoredTokens map[string]string) *Metrics {
"chain", "chain",
}), }),
HTTPRequestsCount: promauto.NewCounter(prometheus.CounterOpts{
Name: "http_requests_count",
Help: "How many HTTP requests this instance has seen",
Namespace: metricsNamespace,
}),
HTTPResponsesCount: promauto.NewCounterVec(prometheus.CounterOpts{
Name: "http_responses_count",
Help: "How many HTTP responses this instance has served",
Namespace: metricsNamespace,
}, []string{
"status_code",
}),
HTTPRequestDurationSecs: promauto.NewSummary(prometheus.SummaryOpts{
Name: "http_request_duration_secs",
Help: "How long each HTTP request took",
Namespace: metricsNamespace,
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
}),
tokenAddrs: mts, tokenAddrs: mts,
} }
} }
...@@ -176,6 +205,15 @@ func (m *Metrics) IncL2CachedTokensCount() { ...@@ -176,6 +205,15 @@ func (m *Metrics) IncL2CachedTokensCount() {
m.CachedTokensCount.WithLabelValues("l2").Inc() m.CachedTokensCount.WithLabelValues("l2").Inc()
} }
func (m *Metrics) RecordHTTPRequest() {
m.HTTPRequestsCount.Inc()
}
func (m *Metrics) RecordHTTPResponse(code int, dur time.Duration) {
m.HTTPResponsesCount.WithLabelValues(strconv.Itoa(code)).Inc()
m.HTTPRequestDurationSecs.Observe(float64(dur) / float64(time.Second))
}
func (m *Metrics) Serve(hostname string, port uint64) (*http.Server, error) { func (m *Metrics) Serve(hostname string, port uint64) (*http.Server, error) {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler()) mux.Handle("/metrics", promhttp.Handler())
......
...@@ -6,6 +6,8 @@ import ( ...@@ -6,6 +6,8 @@ import (
"runtime/debug" "runtime/debug"
"time" "time"
"github.com/ethereum-optimism/optimism/go/indexer/metrics"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -50,7 +52,7 @@ func (rw *responseWriter) WriteHeader(code int) { ...@@ -50,7 +52,7 @@ func (rw *responseWriter) WriteHeader(code int) {
} }
// LoggingMiddleware logs the incoming HTTP request & its duration. // LoggingMiddleware logs the incoming HTTP request & its duration.
func LoggingMiddleware(logger log.Logger) func(http.Handler) http.Handler { func LoggingMiddleware(metrics *metrics.Metrics, logger log.Logger) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler { return func(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) { fn := func(w http.ResponseWriter, r *http.Request) {
defer func() { defer func() {
...@@ -64,16 +66,19 @@ func LoggingMiddleware(logger log.Logger) func(http.Handler) http.Handler { ...@@ -64,16 +66,19 @@ func LoggingMiddleware(logger log.Logger) func(http.Handler) http.Handler {
} }
}() }()
metrics.RecordHTTPRequest()
start := time.Now() start := time.Now()
wrapped := wrapResponseWriter(w) wrapped := wrapResponseWriter(w)
next.ServeHTTP(wrapped, r) next.ServeHTTP(wrapped, r)
dur := time.Since(start)
logger.Info( logger.Info(
"served request", "served request",
"status", wrapped.status, "status", wrapped.status,
"method", r.Method, "method", r.Method,
"path", r.URL.EscapedPath(), "path", r.URL.EscapedPath(),
"duration", time.Since(start), "duration", dur,
) )
metrics.RecordHTTPResponse(wrapped.status, dur)
} }
return http.HandlerFunc(fn) return http.HandlerFunc(fn)
......
package services
import (
"net/http"
"github.com/ethereum-optimism/optimism/go/indexer/db"
"github.com/ethereum-optimism/optimism/go/indexer/metrics"
"github.com/ethereum-optimism/optimism/go/indexer/server"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/gorilla/mux"
)
var airdropLogger = log.New("service", "airdrop")
type Airdrop struct {
db *db.Database
metrics *metrics.Metrics
}
func NewAirdrop(db *db.Database, metrics *metrics.Metrics) *Airdrop {
return &Airdrop{
db: db,
metrics: metrics,
}
}
func (a *Airdrop) GetAirdrop(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
address := vars["address"]
airdrop, err := a.db.GetAirdrop(common.HexToAddress(address))
if err != nil {
airdropLogger.Error("db error getting airdrop", "err", err)
server.RespondWithError(w, http.StatusInternalServerError, "database error")
return
}
if airdrop == nil {
server.RespondWithError(w, http.StatusNotFound, "airdrop not found")
return
}
server.RespondWithJSON(w, http.StatusOK, airdrop)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment