Commit 6680e2c4 authored by Hamdi Allam's avatar Hamdi Allam

update command to kick off l1/l2 processors. only include needed flags for now...

update command to kick off l1/l2 processors. only include needed flags for now (assumes local postgres host/port)
parent 010f4fc8
...@@ -29,7 +29,7 @@ func main() { ...@@ -29,7 +29,7 @@ func main() {
) )
app := cli.NewApp() app := cli.NewApp()
app.Flags = flags.Flags app.Flags = []cli.Flag{flags.LogLevelFlag, flags.L1EthRPCFlag, flags.L2EthRPCFlag, flags.DBNameFlag}
app.Version = fmt.Sprintf("%s-%s", GitVersion, params.VersionWithCommit(GitCommit, GitDate)) app.Version = fmt.Sprintf("%s-%s", GitVersion, params.VersionWithCommit(GitCommit, GitDate))
app.Name = "indexer" app.Name = "indexer"
app.Usage = "Indexer Service" app.Usage = "Indexer Service"
......
package indexer package indexer
import ( import (
"context" "fmt"
"os" "os"
"time" "time"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/flags"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/indexer/processor"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/urfave/cli" "github.com/urfave/cli"
) )
...@@ -23,7 +27,23 @@ const ( ...@@ -23,7 +27,23 @@ const (
// e.g. GitVersion, to be captured and used once the function is executed. // e.g. GitVersion, to be captured and used once the function is executed.
func Main(gitVersion string) func(ctx *cli.Context) error { func Main(gitVersion string) func(ctx *cli.Context) error {
return func(ctx *cli.Context) error { return func(ctx *cli.Context) error {
log.Info("Initializing indexer") log.Info("initializing indexer")
indexer, err := NewIndexer(ctx)
if err != nil {
log.Error("unable to initialize indexer", "err", err)
return err
}
log.Info("starting indexer")
if err := indexer.Start(); err != nil {
log.Error("unable to start indexer", "err", err)
}
defer indexer.Stop()
log.Info("indexer started")
// Never terminate
<-(chan struct{})(nil)
return nil return nil
} }
} }
...@@ -31,53 +51,56 @@ func Main(gitVersion string) func(ctx *cli.Context) error { ...@@ -31,53 +51,56 @@ func Main(gitVersion string) func(ctx *cli.Context) error {
// Indexer is a service that configures the necessary resources for // Indexer is a service that configures the necessary resources for
// running the Sync and BlockHandler sub-services. // running the Sync and BlockHandler sub-services.
type Indexer struct { type Indexer struct {
l1Client *ethclient.Client db *database.DB
l2Client *ethclient.Client
l1Processor *processor.L1Processor
l2Processor *processor.L2Processor
} }
// NewIndexer initializes the Indexer, gathering any resources // NewIndexer initializes the Indexer, gathering any resources
// that will be needed by the TxIndexer and StateIndexer // that will be needed by the TxIndexer and StateIndexer
// sub-services. // sub-services.
func NewIndexer() (*Indexer, error) { func NewIndexer(ctx *cli.Context) (*Indexer, error) {
ctx := context.Background()
var logHandler log.Handler = log.StreamHandler(os.Stdout, log.TerminalFormat(true))
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data // TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data
// do json format too // do json format too
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data // TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data
// pass in loglevel from config
// logHandler = log.StreamHandler(os.Stdout, log.JSONFormat()) // defaults to debug unless explicitly set
logLevel, err := log.LvlFromString("info") logLevel, err := log.LvlFromString(ctx.GlobalString(flags.LogLevelFlag.Name))
if err != nil { if err != nil {
return nil, err return nil, err
} }
logHandler := log.StreamHandler(os.Stdout, log.TerminalFormat(true))
log.Root().SetHandler(log.LvlFilterHandler(logLevel, logHandler)) log.Root().SetHandler(log.LvlFilterHandler(logLevel, logHandler))
// Connect to L1 and L2 providers. Perform these last since they are the dsn := fmt.Sprintf("database=%s", ctx.GlobalString(flags.DBNameFlag.Name))
// most expensive. db, err := database.NewDB(dsn)
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data
// pass in rpc url from config
l1Client, _, err := dialEthClientWithTimeout(ctx, "http://localhost:8545")
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO https://linear.app/optimism/issue/DX-55/api-implement-rest-api-with-mocked-data // L1 Processor
// pass in rpc url from config l1EthClient, err := node.NewEthClient(ctx.GlobalString(flags.L1EthRPCFlag.Name))
l2Client, _, err := dialEthClientWithTimeout(ctx, "http://localhost:9545") if err != nil {
return nil, err
}
l1Processor, err := processor.NewL1Processor(l1EthClient, db)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// L2Processor
l2EthClient, err := node.NewEthClient(ctx.GlobalString(flags.L2EthRPCFlag.Name))
if err != nil {
return nil, err
}
l2Processor, err := processor.NewL2Processor(l2EthClient, db)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Indexer{ return &Indexer{db, l1Processor, l2Processor}, nil
l1Client: l1Client,
l2Client: l2Client,
}, nil
} }
// Serve spins up a REST API server at the given hostname and port. // Serve spins up a REST API server at the given hostname and port.
...@@ -88,25 +111,11 @@ func (b *Indexer) Serve() error { ...@@ -88,25 +111,11 @@ func (b *Indexer) Serve() error {
// Start starts the starts the indexing service on L1 and L2 chains and also // Start starts the starts the indexing service on L1 and L2 chains and also
// starts the REST server. // starts the REST server.
func (b *Indexer) Start() error { func (b *Indexer) Start() error {
go b.l1Processor.Start()
go b.l2Processor.Start()
return nil return nil
} }
// Stop stops the indexing service on L1 and L2 chains. // Stop stops the indexing service on L1 and L2 chains.
func (b *Indexer) Stop() { func (b *Indexer) Stop() {
} }
// dialL1EthClientWithTimeout attempts to dial the L1 provider using the
// provided URL. If the dial doesn't complete within defaultDialTimeout seconds,
// this method will return an error.
func dialEthClientWithTimeout(ctx context.Context, url string) (
*ethclient.Client, *rpc.Client, error) {
ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
c, err := rpc.DialContext(ctxt, url)
if err != nil {
return nil, nil, err
}
return ethclient.NewClient(c), c, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment