Commit 8a07e2fc authored by Will Cory's avatar Will Cory

chore(indexer): Add entrypoint for indexer

fix: linter
parent 1b91eaf1
# indexer/cmd/indexer-refresh
Entrypoint for the new WIP indexer. After project is deployed and stable we will delete the [indexer/main.go](../indexer/main.go) file and move [indexer-refresh/main.go](./main.go) in it's place.
package main
import (
"fmt"
"os"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/indexer"
"github.com/ethereum-optimism/optimism/indexer/flags"
)
var (
GitVersion = ""
GitCommit = ""
GitDate = ""
)
func main() {
// Set up logger with a default INFO level in case we fail to parse flags.
// Otherwise the final crtiical log won't show what the parsing error was.
log.Root().SetHandler(
log.LvlFilterHandler(
log.LvlInfo,
log.StreamHandler(os.Stdout, log.TerminalFormat(true)),
),
)
app := cli.NewApp()
app.Flags = flags.Flags
app.Version = fmt.Sprintf("%s-%s", GitVersion, params.VersionWithCommit(GitCommit, GitDate))
app.Name = "indexer"
app.Usage = "Indexer Service"
app.Description = "Service for indexing deposits and withdrawals " +
"by account on L1 and L2"
app.Action = indexer.Main(GitVersion)
err := app.Run(os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
}
}
......@@ -8,8 +8,8 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/urfave/cli"
"github.com/ethereum-optimism/optimism/indexer"
"github.com/ethereum-optimism/optimism/indexer/flags"
"github.com/ethereum-optimism/optimism/indexer/legacy"
)
var (
......@@ -36,7 +36,7 @@ func main() {
app.Description = "Service for indexing deposits and withdrawals " +
"by account on L1 and L2"
app.Action = indexer.Main(GitVersion)
app.Action = legacy.Main(GitVersion)
err := app.Run(os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
......
......@@ -2,28 +2,12 @@ package indexer
import (
"context"
"fmt"
"math/big"
"net"
"net/http"
"os"
"strconv"
"time"
"github.com/ethereum-optimism/optimism/indexer/services"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/indexer/metrics"
"github.com/ethereum-optimism/optimism/indexer/server"
"github.com/rs/cors"
database "github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/indexer/services/l1"
"github.com/ethereum-optimism/optimism/indexer/services/l2"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/mux"
"github.com/urfave/cli"
)
......@@ -39,30 +23,7 @@ const (
// e.g. GitVersion, to be captured and used once the function is executed.
func Main(gitVersion string) func(ctx *cli.Context) error {
return func(ctx *cli.Context) error {
cfg, err := NewConfig(ctx)
if err != nil {
return err
}
log.Info("Initializing indexer")
indexer, err := NewIndexer(cfg)
if err != nil {
log.Error("Unable to create indexer", "error", err)
return err
}
log.Info("Starting indexer")
if err := indexer.Start(); err != nil {
return err
}
defer indexer.Stop()
log.Info("Indexer started")
<-(chan struct{})(nil)
return nil
}
}
......@@ -70,35 +31,23 @@ func Main(gitVersion string) func(ctx *cli.Context) error {
// Indexer is a service that configures the necessary resources for
// running the Sync and BlockHandler sub-services.
type Indexer struct {
ctx context.Context
cfg Config
l1Client *ethclient.Client
l2Client *ethclient.Client
l1IndexingService *l1.Service
l2IndexingService *l2.Service
airdropService *services.Airdrop
router *mux.Router
metrics *metrics.Metrics
db *database.Database
server *http.Server
}
// NewIndexer initializes the Indexer, gathering any resources
// that will be needed by the TxIndexer and StateIndexer
// sub-services.
func NewIndexer(cfg Config) (*Indexer, error) {
func NewIndexer() (*Indexer, error) {
ctx := context.Background()
var logHandler log.Handler
if cfg.LogTerminal {
logHandler = log.StreamHandler(os.Stdout, log.TerminalFormat(true))
} else {
logHandler = log.StreamHandler(os.Stdout, log.JSONFormat())
}
logLevel, err := log.LvlFromString(cfg.LogLevel)
var logHandler log.Handler = log.StreamHandler(os.Stdout, log.TerminalFormat(true))
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data
// do json format too
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data
// pass in loglevel from config
// logHandler = log.StreamHandler(os.Stdout, log.JSONFormat())
logLevel, err := log.LvlFromString("info")
if err != nil {
return nil, err
}
......@@ -107,182 +56,43 @@ func NewIndexer(cfg Config) (*Indexer, error) {
// Connect to L1 and L2 providers. Perform these last since they are the
// most expensive.
l1Client, rawl1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc)
if err != nil {
return nil, err
}
l2Client, l2RPC, err := dialEthClientWithTimeout(ctx, cfg.L2EthRpc)
if err != nil {
return nil, err
}
m := metrics.NewMetrics(nil)
if cfg.MetricsServerEnable {
go func() {
_, err := m.Serve(cfg.MetricsHostname, cfg.MetricsPort)
if err != nil {
log.Error("metrics server failed to start", "err", err)
}
}()
log.Info("metrics server enabled", "host", cfg.MetricsHostname, "port", cfg.MetricsPort)
}
dsn := fmt.Sprintf("host=%s port=%d dbname=%s sslmode=disable",
cfg.DBHost, cfg.DBPort, cfg.DBName)
if cfg.DBUser != "" {
dsn += fmt.Sprintf(" user=%s", cfg.DBUser)
}
if cfg.DBPassword != "" {
dsn += fmt.Sprintf(" password=%s", cfg.DBPassword)
}
db, err := database.NewDatabase(dsn)
if err != nil {
return nil, err
}
var addrManager services.AddressManager
if cfg.Bedrock {
addrManager, err = services.NewBedrockAddresses(
l1Client,
cfg.BedrockL1StandardBridgeAddress,
cfg.BedrockOptimismPortalAddress,
)
} else {
addrManager, err = services.NewLegacyAddresses(l1Client, common.HexToAddress(cfg.L1AddressManagerAddress))
}
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data
// pass in rpc url from config
l1Client, _, err := dialEthClientWithTimeout(ctx, "http://localhost:8545")
if err != nil {
return nil, err
}
l1IndexingService, err := l1.NewService(l1.ServiceConfig{
Context: ctx,
Metrics: m,
L1Client: l1Client,
RawL1Client: rawl1Client,
ChainID: new(big.Int).SetUint64(cfg.ChainID),
AddressManager: addrManager,
DB: db,
ConfDepth: cfg.L1ConfDepth,
MaxHeaderBatchSize: cfg.MaxHeaderBatchSize,
StartBlockNumber: cfg.L1StartBlockNumber,
Bedrock: cfg.Bedrock,
})
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data
// pass in rpc url from config
l2Client, _, err := dialEthClientWithTimeout(ctx, "http://localhost:9545")
if err != nil {
return nil, err
}
l2IndexingService, err := l2.NewService(l2.ServiceConfig{
Context: ctx,
Metrics: m,
L2RPC: l2RPC,
L2Client: l2Client,
DB: db,
ConfDepth: cfg.L2ConfDepth,
MaxHeaderBatchSize: cfg.MaxHeaderBatchSize,
StartBlockNumber: uint64(0),
Bedrock: cfg.Bedrock,
})
if err != nil {
return nil, err
}
return &Indexer{
ctx: ctx,
cfg: cfg,
l1Client: l1Client,
l2Client: l2Client,
l1IndexingService: l1IndexingService,
l2IndexingService: l2IndexingService,
airdropService: services.NewAirdrop(db, m),
router: mux.NewRouter(),
metrics: m,
db: db,
l1Client: l1Client,
l2Client: l2Client,
}, nil
}
// Serve spins up a REST API server at the given hostname and port.
func (b *Indexer) Serve() error {
c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},
})
b.router.HandleFunc("/v1/l1/status", b.l1IndexingService.GetIndexerStatus).Methods("GET")
b.router.HandleFunc("/v1/l2/status", b.l2IndexingService.GetIndexerStatus).Methods("GET")
b.router.HandleFunc("/v1/deposits/0x{address:[a-fA-F0-9]{40}}", b.l1IndexingService.GetDeposits).Methods("GET")
b.router.HandleFunc("/v1/withdrawal/0x{hash:[a-fA-F0-9]{64}}", b.l2IndexingService.GetWithdrawalBatch).Methods("GET")
b.router.HandleFunc("/v1/withdrawals/0x{address:[a-fA-F0-9]{40}}", b.l2IndexingService.GetWithdrawals).Methods("GET")
b.router.HandleFunc("/v1/airdrops/0x{address:[a-fA-F0-9]{40}}", b.airdropService.GetAirdrop)
b.router.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
_, err := w.Write([]byte("OK"))
if err != nil {
log.Error("Error handling /healthz", "error", err)
}
})
middleware := server.LoggingMiddleware(b.metrics, log.New("service", "server"))
port := strconv.FormatUint(b.cfg.RESTPort, 10)
addr := net.JoinHostPort(b.cfg.RESTHostname, port)
b.server = &http.Server{
Addr: addr,
Handler: middleware(c.Handler(b.router)),
}
errCh := make(chan error, 1)
go func() {
errCh <- b.server.ListenAndServe()
}()
// Capture server startup errors
<-time.After(10 * time.Millisecond)
select {
case err := <-errCh:
return err
default:
log.Info("indexer REST server listening on", "addr", addr)
return nil
}
return nil
}
// Start starts the starts the indexing service on L1 and L2 chains and also
// starts the REST server.
func (b *Indexer) Start() error {
if b.cfg.DisableIndexer {
log.Info("indexer disabled, only serving data")
} else {
err := b.l1IndexingService.Start()
if err != nil {
return err
}
err = b.l2IndexingService.Start()
if err != nil {
return err
}
}
return b.Serve()
return nil
}
// Stop stops the indexing service on L1 and L2 chains.
func (b *Indexer) Stop() {
b.db.Close()
if b.server != nil {
// background context here so it waits for
// conns to close
_ = b.server.Shutdown(context.Background())
}
if !b.cfg.DisableIndexer {
b.l1IndexingService.Stop()
b.l2IndexingService.Stop()
}
}
// dialL1EthClientWithTimeout attempts to dial the L1 provider using the
......
......@@ -20,8 +20,8 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"
"github.com/ethereum-optimism/optimism/indexer"
"github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/indexer/legacy"
"github.com/ethereum-optimism/optimism/indexer/services/l1"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-bindings/predeploys"
......@@ -60,7 +60,7 @@ func TestBedrockIndexer(t *testing.T) {
l2Opts, err := bind.NewKeyedTransactorWithChainID(cfg.Secrets.Alice, cfg.L2ChainIDBig())
require.NoError(t, err)
idxrCfg := indexer.Config{
idxrCfg := legacy.Config{
ChainID: cfg.DeployConfig.L1ChainID,
L1EthRpc: sys.Nodes["l1"].HTTPEndpoint(),
L2EthRpc: sys.Nodes["sequencer"].HTTPEndpoint(),
......@@ -83,7 +83,7 @@ func TestBedrockIndexer(t *testing.T) {
BedrockL1StandardBridgeAddress: cfg.DeployConfig.L1StandardBridgeProxy,
BedrockOptimismPortalAddress: cfg.DeployConfig.OptimismPortalProxy,
}
idxr, err := indexer.NewIndexer(idxrCfg)
idxr, err := legacy.NewIndexer(idxrCfg)
require.NoError(t, err)
errCh := make(chan error, 1)
......
package indexer
package legacy
import (
"errors"
......
package indexer_test
package legacy_test
import (
"fmt"
"testing"
indexer "github.com/ethereum-optimism/optimism/indexer"
legacy "github.com/ethereum-optimism/optimism/indexer/legacy"
"github.com/stretchr/testify/require"
)
var validateConfigTests = []struct {
name string
cfg indexer.Config
cfg legacy.Config
expErr error
}{
{
name: "bad log level",
cfg: indexer.Config{
cfg: legacy.Config{
LogLevel: "unknown",
},
expErr: fmt.Errorf("unknown level: unknown"),
......@@ -27,7 +27,7 @@ var validateConfigTests = []struct {
func TestValidateConfig(t *testing.T) {
for _, test := range validateConfigTests {
t.Run(test.name, func(t *testing.T) {
err := indexer.ValidateConfig(&test.cfg)
err := legacy.ValidateConfig(&test.cfg)
require.Equal(t, err, test.expErr)
})
}
......
package legacy
import (
"context"
"fmt"
"math/big"
"net"
"net/http"
"os"
"strconv"
"time"
"github.com/ethereum-optimism/optimism/indexer/services"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum-optimism/optimism/indexer/metrics"
"github.com/ethereum-optimism/optimism/indexer/server"
"github.com/rs/cors"
database "github.com/ethereum-optimism/optimism/indexer/db"
"github.com/ethereum-optimism/optimism/indexer/services/l1"
"github.com/ethereum-optimism/optimism/indexer/services/l2"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/mux"
"github.com/urfave/cli"
)
const (
// defaultDialTimeout is default duration the service will wait on
// startup to make a connection to either the L1 or L2 backends.
defaultDialTimeout = 5 * time.Second
)
// Main is the entrypoint into the indexer service. This method returns
// a closure that executes the service and blocks until the service exits. The
// use of a closure allows the parameters bound to the top-level main package,
// e.g. GitVersion, to be captured and used once the function is executed.
func Main(gitVersion string) func(ctx *cli.Context) error {
return func(ctx *cli.Context) error {
cfg, err := NewConfig(ctx)
if err != nil {
return err
}
log.Info("Initializing indexer")
indexer, err := NewIndexer(cfg)
if err != nil {
log.Error("Unable to create indexer", "error", err)
return err
}
log.Info("Starting indexer")
if err := indexer.Start(); err != nil {
return err
}
defer indexer.Stop()
log.Info("Indexer started")
<-(chan struct{})(nil)
return nil
}
}
// Indexer is a service that configures the necessary resources for
// running the Sync and BlockHandler sub-services.
type Indexer struct {
ctx context.Context
cfg Config
l1Client *ethclient.Client
l2Client *ethclient.Client
l1IndexingService *l1.Service
l2IndexingService *l2.Service
airdropService *services.Airdrop
router *mux.Router
metrics *metrics.Metrics
db *database.Database
server *http.Server
}
// NewIndexer initializes the Indexer, gathering any resources
// that will be needed by the TxIndexer and StateIndexer
// sub-services.
func NewIndexer(cfg Config) (*Indexer, error) {
ctx := context.Background()
var logHandler log.Handler
if cfg.LogTerminal {
logHandler = log.StreamHandler(os.Stdout, log.TerminalFormat(true))
} else {
logHandler = log.StreamHandler(os.Stdout, log.JSONFormat())
}
logLevel, err := log.LvlFromString(cfg.LogLevel)
if err != nil {
return nil, err
}
log.Root().SetHandler(log.LvlFilterHandler(logLevel, logHandler))
// Connect to L1 and L2 providers. Perform these last since they are the
// most expensive.
l1Client, rawl1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc)
if err != nil {
return nil, err
}
l2Client, l2RPC, err := dialEthClientWithTimeout(ctx, cfg.L2EthRpc)
if err != nil {
return nil, err
}
m := metrics.NewMetrics(nil)
if cfg.MetricsServerEnable {
go func() {
_, err := m.Serve(cfg.MetricsHostname, cfg.MetricsPort)
if err != nil {
log.Error("metrics server failed to start", "err", err)
}
}()
log.Info("metrics server enabled", "host", cfg.MetricsHostname, "port", cfg.MetricsPort)
}
dsn := fmt.Sprintf("host=%s port=%d dbname=%s sslmode=disable",
cfg.DBHost, cfg.DBPort, cfg.DBName)
if cfg.DBUser != "" {
dsn += fmt.Sprintf(" user=%s", cfg.DBUser)
}
if cfg.DBPassword != "" {
dsn += fmt.Sprintf(" password=%s", cfg.DBPassword)
}
db, err := database.NewDatabase(dsn)
if err != nil {
return nil, err
}
var addrManager services.AddressManager
if cfg.Bedrock {
addrManager, err = services.NewBedrockAddresses(
l1Client,
cfg.BedrockL1StandardBridgeAddress,
cfg.BedrockOptimismPortalAddress,
)
} else {
addrManager, err = services.NewLegacyAddresses(l1Client, common.HexToAddress(cfg.L1AddressManagerAddress))
}
if err != nil {
return nil, err
}
l1IndexingService, err := l1.NewService(l1.ServiceConfig{
Context: ctx,
Metrics: m,
L1Client: l1Client,
RawL1Client: rawl1Client,
ChainID: new(big.Int).SetUint64(cfg.ChainID),
AddressManager: addrManager,
DB: db,
ConfDepth: cfg.L1ConfDepth,
MaxHeaderBatchSize: cfg.MaxHeaderBatchSize,
StartBlockNumber: cfg.L1StartBlockNumber,
Bedrock: cfg.Bedrock,
})
if err != nil {
return nil, err
}
l2IndexingService, err := l2.NewService(l2.ServiceConfig{
Context: ctx,
Metrics: m,
L2RPC: l2RPC,
L2Client: l2Client,
DB: db,
ConfDepth: cfg.L2ConfDepth,
MaxHeaderBatchSize: cfg.MaxHeaderBatchSize,
StartBlockNumber: uint64(0),
Bedrock: cfg.Bedrock,
})
if err != nil {
return nil, err
}
return &Indexer{
ctx: ctx,
cfg: cfg,
l1Client: l1Client,
l2Client: l2Client,
l1IndexingService: l1IndexingService,
l2IndexingService: l2IndexingService,
airdropService: services.NewAirdrop(db, m),
router: mux.NewRouter(),
metrics: m,
db: db,
}, nil
}
// Serve spins up a REST API server at the given hostname and port.
func (b *Indexer) Serve() error {
c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},
})
b.router.HandleFunc("/v1/l1/status", b.l1IndexingService.GetIndexerStatus).Methods("GET")
b.router.HandleFunc("/v1/l2/status", b.l2IndexingService.GetIndexerStatus).Methods("GET")
b.router.HandleFunc("/v1/deposits/0x{address:[a-fA-F0-9]{40}}", b.l1IndexingService.GetDeposits).Methods("GET")
b.router.HandleFunc("/v1/withdrawal/0x{hash:[a-fA-F0-9]{64}}", b.l2IndexingService.GetWithdrawalBatch).Methods("GET")
b.router.HandleFunc("/v1/withdrawals/0x{address:[a-fA-F0-9]{40}}", b.l2IndexingService.GetWithdrawals).Methods("GET")
b.router.HandleFunc("/v1/airdrops/0x{address:[a-fA-F0-9]{40}}", b.airdropService.GetAirdrop)
b.router.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
_, err := w.Write([]byte("OK"))
if err != nil {
log.Error("Error handling /healthz", "error", err)
}
})
middleware := server.LoggingMiddleware(b.metrics, log.New("service", "server"))
port := strconv.FormatUint(b.cfg.RESTPort, 10)
addr := net.JoinHostPort(b.cfg.RESTHostname, port)
b.server = &http.Server{
Addr: addr,
Handler: middleware(c.Handler(b.router)),
}
errCh := make(chan error, 1)
go func() {
errCh <- b.server.ListenAndServe()
}()
// Capture server startup errors
<-time.After(10 * time.Millisecond)
select {
case err := <-errCh:
return err
default:
log.Info("indexer REST server listening on", "addr", addr)
return nil
}
}
// Start starts the starts the indexing service on L1 and L2 chains and also
// starts the REST server.
func (b *Indexer) Start() error {
if b.cfg.DisableIndexer {
log.Info("indexer disabled, only serving data")
} else {
err := b.l1IndexingService.Start()
if err != nil {
return err
}
err = b.l2IndexingService.Start()
if err != nil {
return err
}
}
return b.Serve()
}
// Stop stops the indexing service on L1 and L2 chains.
func (b *Indexer) Stop() {
b.db.Close()
if b.server != nil {
// background context here so it waits for
// conns to close
_ = b.server.Shutdown(context.Background())
}
if !b.cfg.DisableIndexer {
b.l1IndexingService.Stop()
b.l2IndexingService.Stop()
}
}
// dialL1EthClientWithTimeout attempts to dial the L1 provider using the
// provided URL. If the dial doesn't complete within defaultDialTimeout seconds,
// this method will return an error.
func dialEthClientWithTimeout(ctx context.Context, url string) (
*ethclient.Client, *rpc.Client, error) {
ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
c, err := rpc.DialContext(ctxt, url)
if err != nil {
return nil, nil, err
}
return ethclient.NewClient(c), c, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment