Commit ebe2f2ff (unverified) · authored Oct 31, 2023 by protolambda · parent 96a24cc3

indexer: refactor service lifecycle to start/stop resources more cleanly
Showing 13 changed files, with 779 additions and 465 deletions (+779 −465):

- indexer/api/api.go (+110 −98)
- indexer/api/api_test.go (+28 −8)
- indexer/api/config.go (+61 −0, new file)
- indexer/cmd/indexer/cli.go (+19 −30)
- indexer/e2e_tests/setup.go (+30 −42)
- indexer/etl/etl.go (+55 −44)
- indexer/etl/l1_etl.go (+119 −68)
- indexer/etl/l2_etl.go (+101 −51)
- indexer/indexer.go (+180 −101)
- indexer/node/client.go (+12 −4)
- indexer/node/client_test.go (+4 −3)
- indexer/node/mocks.go (+3 −0)
- indexer/processors/bridge.go (+57 −16)
indexer/api/api.go (+110 −98)

```diff
@@ -6,33 +6,24 @@ import (
 	"fmt"
 	"net"
 	"net/http"
-	"runtime/debug"
 	"strconv"
-	"sync"
+	"sync/atomic"
+
+	"github.com/go-chi/chi/v5"
+	"github.com/go-chi/chi/v5/middleware"
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/ethereum/go-ethereum/log"
 
 	"github.com/ethereum-optimism/optimism/indexer/api/routes"
 	"github.com/ethereum-optimism/optimism/indexer/config"
 	"github.com/ethereum-optimism/optimism/indexer/database"
 	"github.com/ethereum-optimism/optimism/op-service/httputil"
 	"github.com/ethereum-optimism/optimism/op-service/metrics"
-	"github.com/ethereum/go-ethereum/log"
-	"github.com/go-chi/chi/v5"
-	"github.com/go-chi/chi/v5/middleware"
-	"github.com/prometheus/client_golang/prometheus"
 )
 
 const ethereumAddressRegex = `^0x[a-fA-F0-9]{40}$`
 
-// Api ... Indexer API struct
-// TODO : Structured error responses
-type API struct {
-	log             log.Logger
-	router          *chi.Mux
-	serverConfig    config.ServerConfig
-	metricsConfig   config.ServerConfig
-	metricsRegistry *prometheus.Registry
-}
-
 const (
 	MetricsNamespace = "op_indexer_api"
 	addressParam     = "{address:%s}"
@@ -45,6 +36,23 @@ const (
 	WithdrawalsPath = "/api/v0/withdrawals/"
 )
 
+// Api ... Indexer API struct
+// TODO : Structured error responses
+type APIService struct {
+	log    log.Logger
+	router *chi.Mux
+
+	bv      database.BridgeTransfersView
+	dbClose func() error
+
+	metricsRegistry *prometheus.Registry
+
+	apiServer     *httputil.HTTPServer
+	metricsServer *httputil.HTTPServer
+
+	stopped atomic.Bool
+}
+
 // chiMetricsMiddleware ... Injects a metrics recorder into request processing middleware
 func chiMetricsMiddleware(rec metrics.HTTPRecorder) func(http.Handler) http.Handler {
 	return func(next http.Handler) http.Handler {
@@ -53,112 +61,116 @@ func chiMetricsMiddleware(rec metrics.HTTPRecorder) func(http.Handler) http.Handler {
 }
 
 // NewApi ... Construct a new api instance
-func NewApi(logger log.Logger, bv database.BridgeTransfersView, serverConfig config.ServerConfig, metricsConfig config.ServerConfig) *API {
-	// (1) Initialize dependencies
-	apiRouter := chi.NewRouter()
-	h := routes.NewRoutes(logger, bv, apiRouter)
-
-	mr := metrics.NewRegistry()
-	promRecorder := metrics.NewPromHTTPRecorder(mr, MetricsNamespace)
-
-	// (2) Inject routing middleware
-	apiRouter.Use(chiMetricsMiddleware(promRecorder))
-	apiRouter.Use(middleware.Recoverer)
-	apiRouter.Use(middleware.Heartbeat(HealthPath))
-
-	// (3) Set GET routes
-	apiRouter.Get(fmt.Sprintf(DepositsPath+addressParam, ethereumAddressRegex), h.L1DepositsHandler)
-	apiRouter.Get(fmt.Sprintf(WithdrawalsPath+addressParam, ethereumAddressRegex), h.L2WithdrawalsHandler)
-
-	return &API{log: logger, router: apiRouter, metricsRegistry: mr, serverConfig: serverConfig, metricsConfig: metricsConfig}
-}
+func NewApi(ctx context.Context, log log.Logger, cfg *Config) (*APIService, error) {
+	out := &APIService{log: log, metricsRegistry: metrics.NewRegistry()}
+	if err := out.initFromConfig(ctx, cfg); err != nil {
+		return nil, errors.Join(err, out.Stop(ctx)) // close any resources we may have opened already
+	}
+	return out, nil
+}
 
-// Run ... Runs the API server routines
-func (a *API) Run(ctx context.Context) error {
-	var wg sync.WaitGroup
-	errCh := make(chan error, 2)
-
-	// (1) Construct an inner function that will start a goroutine
-	// and handle any panics that occur on a shared error channel
-	processCtx, processCancel := context.WithCancel(ctx)
-	runProcess := func(start func(ctx context.Context) error) {
-		wg.Add(1)
-		go func() {
-			defer func() {
-				if err := recover(); err != nil {
-					a.log.Error("halting api on panic", "err", err)
-					debug.PrintStack()
-					errCh <- fmt.Errorf("panic: %v", err)
-				}
-
-				processCancel()
-				wg.Done()
-			}()
-
-			errCh <- start(processCtx)
-		}()
-	}
-
-	// (2) Start the API and metrics servers
-	runProcess(a.startServer)
-	runProcess(a.startMetricsServer)
-
-	// (3) Wait for all processes to complete
-	wg.Wait()
-
-	err := <-errCh
-	if err != nil {
-		a.log.Error("api stopped", "err", err)
-	} else {
-		a.log.Info("api stopped")
-	}
-	return err
-}
+func (a *APIService) initFromConfig(ctx context.Context, cfg *Config) error {
+	if err := a.initDB(ctx, cfg.DB); err != nil {
+		return fmt.Errorf("failed to init DB: %w", err)
+	}
+	if err := a.startMetricsServer(cfg.MetricsServer); err != nil {
+		return fmt.Errorf("failed to start metrics server: %w", err)
+	}
+	a.initRouter()
+	if err := a.startServer(cfg.HTTPServer); err != nil {
+		return fmt.Errorf("failed to start API server: %w", err)
+	}
+	return nil
+}
+
+func (a *APIService) Start(ctx context.Context) error {
+	// Completed all setup-up jobs at init-time already,
+	// and the API service does not have any other special starting routines or background-jobs to start.
+	return nil
+}
+
+func (a *APIService) Stop(ctx context.Context) error {
+	var result error
+	if a.apiServer != nil {
+		if err := a.apiServer.Stop(ctx); err != nil {
+			result = errors.Join(result, fmt.Errorf("failed to stop API server: %w", err))
+		}
+	}
+	if a.metricsServer != nil {
+		if err := a.metricsServer.Stop(ctx); err != nil {
+			result = errors.Join(result, fmt.Errorf("failed to stop metrics server: %w", err))
+		}
+	}
+	if a.dbClose != nil {
+		if err := a.dbClose(); err != nil {
+			result = errors.Join(result, fmt.Errorf("failed to close DB: %w", err))
+		}
+	}
+	a.stopped.Store(true)
+	a.log.Info("API service shutdown complete")
+	return result
+}
+
+func (a *APIService) Stopped() bool {
+	return a.stopped.Load()
+}
+
+// Addr ... returns the address that the HTTP server is listening on (excl. http:// prefix, just the host and port)
+func (a *APIService) Addr() string {
+	if a.apiServer == nil {
+		return ""
+	}
+	return a.apiServer.Addr().String()
+}
 
-// Port ... Returns the the port that server is listening on
-func (a *API) Port() int {
-	return a.serverConfig.Port
-}
+func (a *APIService) initDB(ctx context.Context, connector DBConnector) error {
+	db, err := connector.OpenDB(ctx, a.log)
+	if err != nil {
+		return fmt.Errorf("failed to connect to databse: %w", err)
+	}
+	a.dbClose = db.Closer
+	a.bv = db.BridgeTransfers
+	return nil
+}
+
+func (a *APIService) initRouter() {
+	apiRouter := chi.NewRouter()
+	h := routes.NewRoutes(a.log, a.bv, apiRouter)
+
+	promRecorder := metrics.NewPromHTTPRecorder(a.metricsRegistry, MetricsNamespace)
+
+	// (2) Inject routing middleware
+	apiRouter.Use(chiMetricsMiddleware(promRecorder))
+	apiRouter.Use(middleware.Recoverer)
+	apiRouter.Use(middleware.Heartbeat(HealthPath))
+
+	// (3) Set GET routes
+	apiRouter.Get(fmt.Sprintf(DepositsPath+addressParam, ethereumAddressRegex), h.L1DepositsHandler)
+	apiRouter.Get(fmt.Sprintf(WithdrawalsPath+addressParam, ethereumAddressRegex), h.L2WithdrawalsHandler)
+
+	a.router = apiRouter
+}
 
 // startServer ... Starts the API server
-func (a *API) startServer(ctx context.Context) error {
-	a.log.Debug("api server listening...", "port", a.serverConfig.Port)
-	addr := net.JoinHostPort(a.serverConfig.Host, strconv.Itoa(a.serverConfig.Port))
+func (a *APIService) startServer(serverConfig config.ServerConfig) error {
+	a.log.Debug("API server listening...", "port", serverConfig.Port)
+	addr := net.JoinHostPort(serverConfig.Host, strconv.Itoa(serverConfig.Port))
 	srv, err := httputil.StartHTTPServer(addr, a.router)
 	if err != nil {
 		return fmt.Errorf("failed to start API server: %w", err)
 	}
-
-	host, portStr, err := net.SplitHostPort(srv.Addr().String())
-	if err != nil {
-		return errors.Join(err, srv.Close())
-	}
-	port, err := strconv.Atoi(portStr)
-	if err != nil {
-		return errors.Join(err, srv.Close())
-	}
-
-	// Update the port in the config in case the OS chose a different port
-	// than the one we requested (e.g. using port 0 to fetch a random open port)
-	a.serverConfig.Host = host
-	a.serverConfig.Port = port
-
-	<-ctx.Done()
-	if err := srv.Stop(context.Background()); err != nil {
-		return fmt.Errorf("failed to shutdown api server: %w", err)
-	}
-	return nil
-}
+	a.log.Info("API server started", "addr", srv.Addr().String())
+	a.apiServer = srv
+	return nil
+}
 
 // startMetricsServer ... Starts the metrics server
-func (a *API) startMetricsServer(ctx context.Context) error {
-	a.log.Debug("starting metrics server...", "port", a.metricsConfig.Port)
-	srv, err := metrics.StartServer(a.metricsRegistry, a.metricsConfig.Host, a.metricsConfig.Port)
+func (a *APIService) startMetricsServer(metricsConfig config.ServerConfig) error {
+	a.log.Debug("starting metrics server...", "port", metricsConfig.Port)
+	srv, err := metrics.StartServer(a.metricsRegistry, metricsConfig.Host, metricsConfig.Port)
 	if err != nil {
 		return fmt.Errorf("failed to start metrics server: %w", err)
 	}
-	<-ctx.Done()
-	defer a.log.Info("metrics server stopped")
-	return srv.Stop(context.Background())
+	a.log.Info("Metrics server started", "addr", srv.Addr().String())
+	a.metricsServer = srv
+	return nil
+}
```
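
The new `APIService` does all of its resource acquisition at construction time, so `Start` is a no-op and `Stop` releases everything that was opened. A minimal sketch of how a caller might drive this lifecycle, using the types from the diff above (the `DBConfig` values here are illustrative placeholders, not from the commit):

```go
package main

import (
	"context"

	gethlog "github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/indexer/api"
	"github.com/ethereum-optimism/optimism/indexer/config"
)

func main() {
	ctx := context.Background()
	logger := gethlog.Root()

	cfg := &api.Config{
		// hypothetical DB settings; any DBConnector implementation works here
		DB:            &api.DBConfigConnector{DBConfig: config.DBConfig{Host: "127.0.0.1", Port: 5432}},
		HTTPServer:    config.ServerConfig{Host: "127.0.0.1", Port: 0}, // port 0: let the OS pick
		MetricsServer: config.ServerConfig{Host: "127.0.0.1", Port: 0},
	}

	// NewApi opens the DB and binds both servers; on failure it already
	// stops whatever it managed to open before returning the error.
	svc, err := api.NewApi(ctx, logger, cfg)
	if err != nil {
		logger.Error("failed to set up API service", "err", err)
		return
	}
	logger.Info("API listening", "addr", svc.Addr())

	// ... serve until shutdown is requested, then release all resources.
	if err := svc.Stop(ctx); err != nil {
		logger.Error("failed to stop API service", "err", err)
	}
}
```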
indexer/api/api_test.go (+28 −8)

```diff
 package api
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"net/http"
 ...
@@ -24,11 +25,12 @@ var mockAddress = "0x4204204204204204204204204204204204204204"
 
 var apiConfig = config.ServerConfig{
 	Host: "localhost",
-	Port: 8080,
+	Port: 0, // random port, to allow parallel tests
 }
 
 var metricsConfig = config.ServerConfig{
 	Host: "localhost",
-	Port: 7300,
+	Port: 0, // random port, to allow parallel tests
 }
 
 var (
 ...
@@ -95,8 +97,14 @@ func (mbv *MockBridgeTransfersView) L2BridgeWithdrawalsByAddress(address common.
 }
 
 func TestHealthz(t *testing.T) {
 	logger := testlog.Logger(t, log.LvlInfo)
-	api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig)
-	request, err := http.NewRequest("GET", "/healthz", nil)
+	cfg := &Config{
+		DB:            &TestDBConnector{BridgeTransfers: &MockBridgeTransfersView{}},
+		HTTPServer:    apiConfig,
+		MetricsServer: metricsConfig,
+	}
+	api, err := NewApi(context.Background(), logger, cfg)
+	require.NoError(t, err)
+	request, err := http.NewRequest("GET", "http://"+api.Addr()+"/healthz", nil)
 	assert.Nil(t, err)
 
 	responseRecorder := httptest.NewRecorder()
 ...
@@ -107,8 +115,14 @@ func TestHealthz(t *testing.T) {
 
 func TestL1BridgeDepositsHandler(t *testing.T) {
 	logger := testlog.Logger(t, log.LvlInfo)
-	api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig)
-	request, err := http.NewRequest("GET", fmt.Sprintf("/api/v0/deposits/%s", mockAddress), nil)
+	cfg := &Config{
+		DB:            &TestDBConnector{BridgeTransfers: &MockBridgeTransfersView{}},
+		HTTPServer:    apiConfig,
+		MetricsServer: metricsConfig,
+	}
+	api, err := NewApi(context.Background(), logger, cfg)
+	require.NoError(t, err)
+	request, err := http.NewRequest("GET", fmt.Sprintf("http://"+api.Addr()+"/api/v0/deposits/%s", mockAddress), nil)
 	assert.Nil(t, err)
 
 	responseRecorder := httptest.NewRecorder()
 ...
@@ -130,8 +144,14 @@ func TestL1BridgeDepositsHandler(t *testing.T) {
 
 func TestL2BridgeWithdrawalsByAddressHandler(t *testing.T) {
 	logger := testlog.Logger(t, log.LvlInfo)
-	api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig)
-	request, err := http.NewRequest("GET", fmt.Sprintf("/api/v0/withdrawals/%s", mockAddress), nil)
+	cfg := &Config{
+		DB:            &TestDBConnector{BridgeTransfers: &MockBridgeTransfersView{}},
+		HTTPServer:    apiConfig,
+		MetricsServer: metricsConfig,
+	}
+	api, err := NewApi(context.Background(), logger, cfg)
+	require.NoError(t, err)
+	request, err := http.NewRequest("GET", fmt.Sprintf("http://"+api.Addr()+"/api/v0/withdrawals/%s", mockAddress), nil)
 	assert.Nil(t, err)
 
 	responseRecorder := httptest.NewRecorder()
 ...
```
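
Binding to port 0 lets the OS assign a free port, which is what allows these tests to run in parallel; each test then discovers the real address via `api.Addr()`. A tiny standalone demonstration of the same trick with only the standard library (not code from this commit):

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	// Ask the OS for any free port; the listener reports what it actually got.
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer l.Close()
	fmt.Println("bound to", l.Addr().String()) // e.g. 127.0.0.1:49321
}
```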
indexer/api/config.go (+61 −0, new file, mode 100644)

```go
package api

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/log"

	"github.com/ethereum-optimism/optimism/indexer/config"
	"github.com/ethereum-optimism/optimism/indexer/database"
)

// DB represents the abstract DB access the API has.
type DB struct {
	BridgeTransfers database.BridgeTransfersView
	Closer          func() error
}

// DBConfigConnector implements a fully config based DBConnector
type DBConfigConnector struct {
	config.DBConfig
}

func (cfg *DBConfigConnector) OpenDB(ctx context.Context, log log.Logger) (*DB, error) {
	// TODO: pass through the ctx argument, so we can interrupt while connecting
	db, err := database.NewDB(log, cfg.DBConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to databse: %w", err)
	}
	return &DB{
		BridgeTransfers: db.BridgeTransfers,
		Closer:          db.Close,
	}, nil
}

type TestDBConnector struct {
	BridgeTransfers database.BridgeTransfersView
}

func (tdb *TestDBConnector) OpenDB(ctx context.Context, log log.Logger) (*DB, error) {
	return &DB{
		BridgeTransfers: tdb.BridgeTransfers,
		Closer: func() error {
			log.Info("API service closed test DB view")
			return nil
		},
	}, nil
}

// DBConnector is an interface: the config may provide different ways to access the DB.
// This is implemented in tests to provide custom DB views, or share the DB with other services.
type DBConnector interface {
	OpenDB(ctx context.Context, log log.Logger) (*DB, error)
}

// Config for the API service
type Config struct {
	DB            DBConnector
	HTTPServer    config.ServerConfig
	MetricsServer config.ServerConfig
}
```
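
Because `DBConnector` is an interface, a DB handle can come from anywhere: config-driven in production (`DBConfigConnector`), an injected view in tests (`TestDBConnector`), or shared with a co-located service. A sketch of that last case, assuming the types above; `sharedDBConnector` is a hypothetical helper, not part of this commit:

```go
// sharedDBConnector hands the API a view of an already-open DB; the no-op
// Closer leaves the lifetime of the real connection to its owning service.
type sharedDBConnector struct {
	db *database.DB
}

func (s *sharedDBConnector) OpenDB(ctx context.Context, logger log.Logger) (*DB, error) {
	return &DB{
		BridgeTransfers: s.db.BridgeTransfers,
		Closer: func() error {
			logger.Info("released shared DB view") // the owner closes the real connection
			return nil
		},
	}, nil
}
```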
indexer/cmd/indexer/cli.go (+19 −30)

```diff
 package main
 
 import (
+	"context"
+
+	"github.com/urfave/cli/v2"
+
+	"github.com/ethereum/go-ethereum/params"
+
 	"github.com/ethereum-optimism/optimism/indexer"
+	"github.com/ethereum-optimism/optimism/indexer/api"
 	"github.com/ethereum-optimism/optimism/indexer/config"
 	"github.com/ethereum-optimism/optimism/indexer/database"
+	"github.com/ethereum-optimism/optimism/op-service/cliapp"
 	oplog "github.com/ethereum-optimism/optimism/op-service/log"
-	"github.com/ethereum/go-ethereum/params"
-	"github.com/urfave/cli/v2"
 )
 
 var (
 ...
@@ -27,7 +31,7 @@ var (
 	}
 )
 
-func runIndexer(ctx *cli.Context) error {
+func runIndexer(ctx *cli.Context, shutdown context.CancelCauseFunc) (cliapp.Lifecycle, error) {
 	log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "indexer")
 	oplog.SetGlobalLogHandler(log.GetHandler())
 	log.Info("running indexer...")
 ...
@@ -35,26 +39,13 @@ func runIndexer(ctx *cli.Context) error {
 	cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
 	if err != nil {
 		log.Error("failed to load config", "err", err)
-		return err
-	}
-
-	db, err := database.NewDB(log, cfg.DB)
-	if err != nil {
-		log.Error("failed to connect to database", "err", err)
-		return err
-	}
-	defer db.Close()
-
-	indexer, err := indexer.NewIndexer(log, db, cfg.Chain, cfg.RPCs, cfg.HTTPServer, cfg.MetricsServer)
-	if err != nil {
-		log.Error("failed to create indexer", "err", err)
-		return err
+		return nil, err
 	}
 
-	return indexer.Run(ctx.Context)
+	return indexer.NewIndexer(ctx.Context, log, &cfg, shutdown)
 }
 
-func runApi(ctx *cli.Context) error {
+func runApi(ctx *cli.Context, _ context.CancelCauseFunc) (cliapp.Lifecycle, error) {
 	log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "api")
 	oplog.SetGlobalLogHandler(log.GetHandler())
 	log.Info("running api...")
 ...
@@ -62,18 +53,16 @@ func runApi(ctx *cli.Context) error {
 	cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
 	if err != nil {
 		log.Error("failed to load config", "err", err)
-		return err
+		return nil, err
 	}
-
-	db, err := database.NewDB(log, cfg.DB)
-	if err != nil {
-		log.Error("failed to connect to database", "err", err)
-		return err
+	apiCfg := &api.Config{
+		DB:            &api.DBConfigConnector{DBConfig: cfg.DB},
+		HTTPServer:    cfg.HTTPServer,
+		MetricsServer: cfg.MetricsServer,
 	}
-	defer db.Close()
 
-	api := api.NewApi(log, db.BridgeTransfers, cfg.HTTPServer, cfg.MetricsServer)
-	return api.Run(ctx.Context)
+	return api.NewApi(ctx.Context, log, apiCfg)
 }
 
 func runMigrations(ctx *cli.Context) error {
 ...
@@ -112,13 +101,13 @@ func newCli(GitCommit string, GitDate string) *cli.App {
 			Name:        "api",
 			Flags:       flags,
 			Description: "Runs the api service",
-			Action:      runApi,
+			Action:      cliapp.LifecycleCmd(runApi),
 		},
 		{
 			Name:        "index",
 			Flags:       flags,
 			Description: "Runs the indexing service",
-			Action:      runIndexer,
+			Action:      cliapp.LifecycleCmd(runIndexer),
 		},
 		{
 			Name: "migrate",
 ...
```
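
`cliapp.LifecycleCmd` adapts these constructors into ordinary CLI actions: it builds the service, starts it, blocks until an interrupt or a `shutdown(cause)` call, then stops it. The contract the returned services satisfy is roughly the following (paraphrased from op-service/cliapp as used here; this interface is not shown in the diff itself):

```go
// Lifecycle is, approximately, the shape runApi and runIndexer now return:
// construction is separate from Start, and Stop must release every resource.
type Lifecycle interface {
	Start(ctx context.Context) error // kick off background work
	Stop(ctx context.Context) error  // graceful teardown
	Stopped() bool                   // reports whether Stop has completed
}
```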
indexer/e2e_tests/setup.go (+30 −42)

```diff
@@ -34,7 +34,7 @@ type E2ETestSuite struct {
 
 	// API
 	Client *client.Client
-	API    *api.API
+	API    *api.APIService
 
 	// Indexer
 	DB *database.DB
 ...
@@ -73,7 +73,7 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
 	t.Cleanup(func() { opSys.Close() })
 
 	// Indexer Configuration and Start
-	indexerCfg := config.Config{
+	indexerCfg := &config.Config{
 		DB: config.DBConfig{
 			Host: "127.0.0.1",
 			Port: 5432,
 ...
@@ -106,51 +106,40 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
 	// the system is running, mark this test for Parallel execution
 	t.Parallel()
 
-	// provide a DB for the unit test. disable logging
-	silentLog := testlog.Logger(t, log.LvlInfo)
-	silentLog.SetHandler(log.DiscardHandler())
-	db, err := database.NewDB(silentLog, indexerCfg.DB)
-	require.NoError(t, err)
-	t.Cleanup(func() { db.Close() })
-
 	indexerLog := testlog.Logger(t, log.LvlInfo).New("role", "indexer")
-	indexer, err := indexer.NewIndexer(indexerLog, db, indexerCfg.Chain, indexerCfg.RPCs, indexerCfg.HTTPServer, indexerCfg.MetricsServer)
+	ix, err := indexer.NewIndexer(context.Background(), indexerLog, indexerCfg, func(cause error) {
+		if cause != nil {
+			t.Fatalf("indexer shut down with critical error: %v", cause)
+		}
+	})
 	require.NoError(t, err)
 
-	indexerCtx, indexerStop := context.WithCancel(context.Background())
-	go func() {
-		err := indexer.Run(indexerCtx)
-		if err != nil {
-			// panicking here ensures that the test will exit
-			// during service failure. Using t.Fail() wouldn't be caught
-			// until all awaiting routines finish which would never happen.
-			panic(err)
-		}
-	}()
+	require.NoError(t, ix.Start(context.Background()), "cleanly start indexer")
+	t.Cleanup(func() {
+		require.NoError(t, ix.Stop(context.Background()), "cleanly shut down indexer")
+	})
 
 	apiLog := testlog.Logger(t, log.LvlInfo).New("role", "indexer_api")
-
-	apiCfg := config.ServerConfig{
-		Host: "127.0.0.1",
-		Port: 0,
-	}
-
-	mCfg := config.ServerConfig{
-		Host: "127.0.0.1",
-		Port: 0,
+	apiCfg := &api.Config{
+		DB: &api.TestDBConnector{BridgeTransfers: ix.DB.BridgeTransfers}, // reuse the same DB
+		HTTPServer: config.ServerConfig{
+			Host: "127.0.0.1",
+			Port: 0,
+		},
+		MetricsServer: config.ServerConfig{
+			Host: "127.0.0.1",
+			Port: 0,
+		},
 	}
 
-	api := api.NewApi(apiLog, db.BridgeTransfers, apiCfg, mCfg)
-	apiCtx, apiStop := context.WithCancel(context.Background())
-	go func() {
-		err := api.Run(apiCtx)
-		if err != nil {
-			panic(err)
-		}
-	}()
+	apiService, err := api.NewApi(context.Background(), apiLog, apiCfg)
+	require.NoError(t, err, "create indexer API service")
+	require.NoError(t, apiService.Start(context.Background()), "start indexer API service")
 	t.Cleanup(func() {
-		apiStop()
-		indexerStop()
+		require.NoError(t, apiService.Stop(context.Background()), "cleanly shut down indexer")
 	})
 
 	// Wait for the API to start listening
 ...
@@ -158,16 +147,15 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
 	client, err := client.NewClient(&client.Config{
 		PaginationLimit: 100,
-		BaseURL:         fmt.Sprintf("http://%s:%d", apiCfg.Host, api.Port()),
+		BaseURL:         "http://" + apiService.Addr(),
 	})
-	require.NoError(t, err)
+	require.NoError(t, err, "must open indexer API client")
 
 	return E2ETestSuite{
 		t:       t,
 		Client:  client,
-		DB:      db,
-		Indexer: indexer,
+		DB:      ix.DB,
+		Indexer: ix,
 		OpCfg:   &opCfg,
 		OpSys:   opSys,
 		L1Client: opSys.Clients["l1"],
 ...
```
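
The e2e suite now leans on `t.Cleanup` instead of hand-rolled cancel functions; cleanups run last-in-first-out, so the API service registered last is stopped before the indexer it depends on. A standalone illustration of that ordering (not code from this commit), modeled with a plain slice:

```go
package main

import "fmt"

// testing.T.Cleanup runs registered functions in LIFO order; we model that
// here to show why dependents registered later are torn down first.
func main() {
	var cleanups []func()
	cleanup := func(f func()) { cleanups = append(cleanups, f) }

	cleanup(func() { fmt.Println("stop indexer") })     // registered first
	cleanup(func() { fmt.Println("stop API service") }) // registered last

	for i := len(cleanups) - 1; i >= 0; i-- {
		cleanups[i]() // prints: stop API service, then stop indexer
	}
}
```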
indexer/etl/etl.go (+55 −44)

```diff
@@ -7,11 +7,13 @@ import (
 	"math/big"
 	"time"
 
-	"github.com/ethereum-optimism/optimism/indexer/node"
 	"github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/log"
+
+	"github.com/ethereum-optimism/optimism/indexer/node"
+	"github.com/ethereum-optimism/optimism/op-service/clock"
 )
 
 type Config struct {
 ...
@@ -31,9 +33,15 @@ type ETL struct {
 	headerTraversal *node.HeaderTraversal
 
 	contracts  []common.Address
-	etlBatches chan ETLBatch
+	etlBatches chan *ETLBatch
 
 	EthClient node.EthClient
+
+	// A reference that'll stay populated between intervals
+	// in the event of failures in order to retry.
+	headers []types.Header
+
+	worker *clock.LoopFn
 }
 
 type ETLBatch struct {
 ...
@@ -46,51 +54,54 @@ type ETLBatch struct {
 	HeadersWithLog map[common.Hash]bool
 }
 
-func (etl *ETL) Start(ctx context.Context) error {
-	done := ctx.Done()
-	pollTicker := time.NewTicker(etl.loopInterval)
-	defer pollTicker.Stop()
+// Start starts the ETL polling routine. The ETL work should be stopped with Close().
+func (etl *ETL) Start() error {
+	if etl.worker != nil {
+		return errors.New("already started")
+	}
+	etl.log.Info("starting etl...")
+	etl.worker = clock.NewLoopFn(clock.SystemClock, etl.tick, func() error {
+		close(etl.etlBatches) // can close the channel now, to signal to the consumer that we're done
+		etl.log.Info("stopped etl worker loop")
+		return nil
+	}, etl.loopInterval)
+	return nil
+}
 
-	// A reference that'll stay populated between intervals
-	// in the event of failures in order to retry.
-	var headers []types.Header
+func (etl *ETL) Close() error {
+	if etl.worker == nil {
+		return nil // worker was not running
+	}
+	return etl.worker.Close()
+}
 
-	etl.log.Info("starting etl...")
-	for {
-		select {
-		case <-done:
-			etl.log.Info("stopping etl")
-			return nil
-
-		case <-pollTicker.C:
-			done := etl.metrics.RecordInterval()
-			if len(headers) > 0 {
-				etl.log.Info("retrying previous batch")
-			} else {
-				newHeaders, err := etl.headerTraversal.NextHeaders(etl.headerBufferSize)
-				if err != nil {
-					etl.log.Error("error querying for headers", "err", err)
-				} else if len(newHeaders) == 0 {
-					etl.log.Warn("no new headers. etl at head?")
-				} else {
-					headers = newHeaders
-				}
-
-				latestHeader := etl.headerTraversal.LatestHeader()
-				if latestHeader != nil {
-					etl.metrics.RecordLatestHeight(latestHeader.Number)
-				}
-			}
-
-			// only clear the reference if we were able to process this batch
-			err := etl.processBatch(headers)
-			if err == nil {
-				headers = nil
-			}
-			done(err)
-		}
-	}
-}
+func (etl *ETL) tick(_ context.Context) {
+	done := etl.metrics.RecordInterval()
+	if len(etl.headers) > 0 {
+		etl.log.Info("retrying previous batch")
+	} else {
+		newHeaders, err := etl.headerTraversal.NextHeaders(etl.headerBufferSize)
+		if err != nil {
+			etl.log.Error("error querying for headers", "err", err)
+		} else if len(newHeaders) == 0 {
+			etl.log.Warn("no new headers. etl at head?")
+		} else {
+			etl.headers = newHeaders
+		}
+
+		latestHeader := etl.headerTraversal.LatestHeader()
+		if latestHeader != nil {
+			etl.metrics.RecordLatestHeight(latestHeader.Number)
+		}
+	}
+
+	// only clear the reference if we were able to process this batch
+	err := etl.processBatch(etl.headers)
+	if err == nil {
+		etl.headers = nil
+	}
+	done(err)
+}
 
 func (etl *ETL) processBatch(headers []types.Header) error {
 ...
@@ -143,6 +154,6 @@ func (etl *ETL) processBatch(headers []types.Header) error {
 	// ensure we use unique downstream references for the etl batch
 	headersRef := headers
-	etl.etlBatches <- ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs.Logs, HeadersWithLog: headersWithLog}
+	etl.etlBatches <- &ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs.Logs, HeadersWithLog: headersWithLog}
 
 	return nil
 }
```
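
The key change here is the shutdown handshake: the producer side (the `onClose` callback passed to `clock.NewLoopFn`) closes `etlBatches`, and consumers detect the closed channel rather than racing a context cancellation. A self-contained sketch of the pattern, independent of the ETL types (not code from this commit):

```go
package main

import "fmt"

func main() {
	batches := make(chan int)

	// Producer: emits work, then closes the channel to signal "no more work".
	go func() {
		defer close(batches)
		for i := 0; i < 3; i++ {
			batches <- i
		}
	}()

	// Consumer: ok == false means the producer is done; exit cleanly.
	for {
		batch, ok := <-batches
		if !ok {
			fmt.Println("no more batches, shutting down handler")
			return
		}
		fmt.Println("handled batch", batch)
	}
}
```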
indexer/etl/l1_etl.go (+119 −68)

```diff
@@ -4,10 +4,13 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"runtime/debug"
 	"strings"
 	"sync"
 	"time"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/ethereum-optimism/optimism/indexer/config"
 	"github.com/ethereum-optimism/optimism/indexer/database"
 	"github.com/ethereum-optimism/optimism/indexer/node"
 ...
@@ -20,8 +23,16 @@ import (
 type L1ETL struct {
 	ETL
 
-	db *database.DB
-	mu *sync.Mutex
+	// the batch handler may do work that we can interrupt on shutdown
+	resourceCtx    context.Context
+	resourceCancel context.CancelFunc
+
+	tasks errgroup.Group
+
+	db *database.DB
+
+	mu sync.Mutex
 
 	listeners []chan interface{}
 }
 ...
@@ -71,8 +82,10 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
 	}
 
 	// NOTE - The use of un-buffered channel here assumes that downstream consumers
-	// will be able to keep up with the rate of incoming batches
-	etlBatches := make(chan ETLBatch)
+	// will be able to keep up with the rate of incoming batches.
+	// When the producer closes the channel we stop consuming from it.
+	etlBatches := make(chan *ETLBatch)
 
 	etl := ETL{
 		loopInterval:     time.Duration(cfg.LoopIntervalMsec) * time.Millisecond,
 		headerBufferSize: uint64(cfg.HeaderBufferSize),
 ...
@@ -86,82 +99,120 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
 		EthClient: client,
 	}
 
-	return &L1ETL{ETL: etl, db: db, mu: new(sync.Mutex)}, nil
+	resCtx, resCancel := context.WithCancel(context.Background())
+	return &L1ETL{
+		ETL:            etl,
+		db:             db,
+		resourceCtx:    resCtx,
+		resourceCancel: resCancel,
+	}, nil
 }
 
-func (l1Etl *L1ETL) Start(ctx context.Context) error {
-	errCh := make(chan error, 1)
-	go func() {
-		errCh <- l1Etl.ETL.Start(ctx)
-	}()
-
-	for {
-		select {
-		case err := <-errCh:
-			return err
-
-		// Index incoming batches (only L1 blocks that have an emitted log)
-		case batch := <-l1Etl.etlBatches:
-			l1BlockHeaders := make([]database.L1BlockHeader, 0, len(batch.Headers))
-			for i := range batch.Headers {
-				if _, ok := batch.HeadersWithLog[batch.Headers[i].Hash()]; ok {
-					l1BlockHeaders = append(l1BlockHeaders, database.L1BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])})
-				}
-			}
-			if len(l1BlockHeaders) == 0 {
-				batch.Logger.Info("no l1 blocks with logs in batch")
-				continue
-			}
-
-			l1ContractEvents := make([]database.L1ContractEvent, len(batch.Logs))
-			for i := range batch.Logs {
-				timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
-				l1ContractEvents[i] = database.L1ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
-				l1Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
-			}
-
-			// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
-			retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
-			if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {
-				if err := l1Etl.db.Transaction(func(tx *database.DB) error {
-					if err := tx.Blocks.StoreL1BlockHeaders(l1BlockHeaders); err != nil {
-						return err
-					}
-					// we must have logs if we have l1 blocks
-					if err := tx.ContractEvents.StoreL1ContractEvents(l1ContractEvents); err != nil {
-						return err
-					}
-					return nil
-				}); err != nil {
-					batch.Logger.Error("unable to persist batch", "err", err)
-					return nil, err
-				}
-
-				l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
-				l1Etl.ETL.metrics.RecordIndexedLatestHeight(l1BlockHeaders[len(l1BlockHeaders)-1].Number)
-
-				// a-ok!
-				return nil, nil
-			}); err != nil {
-				return err
-			}
-
-			batch.Logger.Info("indexed batch")
-
-			// Notify Listeners
-			l1Etl.mu.Lock()
-			for i := range l1Etl.listeners {
-				select {
-				case l1Etl.listeners[i] <- struct{}{}:
-				default:
-					// do nothing if the listener hasn't picked
-					// up the previous notif
-				}
-			}
-			l1Etl.mu.Unlock()
-		}
-	}
-}
+func (l1Etl *L1ETL) Close() error {
+	var result error
+	// close the producer
+	if err := l1Etl.ETL.Close(); err != nil {
+		result = errors.Join(result, fmt.Errorf("failed to close internal ETL: %w", err))
+	}
+	// tell the consumer it can stop what it's doing
+	l1Etl.resourceCancel()
+	// wait for consumer to pick up on closure of producer
+	if err := l1Etl.tasks.Wait(); err != nil {
+		result = errors.Join(result, fmt.Errorf("failed to await batch handler completion: %w", err))
+	}
+	return result
+}
+
+func (l1Etl *L1ETL) Start(shutdown context.CancelCauseFunc) error {
+	// start ETL batch producer
+	if err := l1Etl.ETL.Start(); err != nil {
+		return fmt.Errorf("failed to start internal ETL: %w", err)
+	}
+	// start ETL batch consumer
+	l1Etl.tasks.Go(func() error {
+		defer func() {
+			if err := recover(); err != nil {
+				l1Etl.log.Error("halting indexer on L1 ETL panic", "err", err)
+				debug.PrintStack()
+				shutdown(fmt.Errorf("panic: %v", err))
+			}
+		}()
+		for {
+			// Index incoming batches (only L1 blocks that have an emitted log)
+			batch, ok := <-l1Etl.etlBatches
+			if !ok {
+				l1Etl.log.Info("No more batches, shutting down L1 batch handler")
+				return nil
+			}
+			if err := l1Etl.handleBatch(batch); err != nil {
+				return fmt.Errorf("failed to handle batch, stopping L2 ETL: %w", err)
+			}
+		}
+	})
+	return nil
+}
+
+func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
+	l1BlockHeaders := make([]database.L1BlockHeader, 0, len(batch.Headers))
+	for i := range batch.Headers {
+		if _, ok := batch.HeadersWithLog[batch.Headers[i].Hash()]; ok {
+			l1BlockHeaders = append(l1BlockHeaders, database.L1BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])})
+		}
+	}
+	if len(l1BlockHeaders) == 0 {
+		batch.Logger.Info("no l1 blocks with logs in batch")
+		return nil
+	}
+
+	l1ContractEvents := make([]database.L1ContractEvent, len(batch.Logs))
+	for i := range batch.Logs {
+		timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
+		l1ContractEvents[i] = database.L1ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
+		l1Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
+	}
+
+	// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
+	retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
+	if _, err := retry.Do[interface{}](l1Etl.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
+		if err := l1Etl.db.Transaction(func(tx *database.DB) error {
+			if err := tx.Blocks.StoreL1BlockHeaders(l1BlockHeaders); err != nil {
+				return err
+			}
+			// we must have logs if we have l1 blocks
+			if err := tx.ContractEvents.StoreL1ContractEvents(l1ContractEvents); err != nil {
+				return err
+			}
+			return nil
+		}); err != nil {
+			batch.Logger.Error("unable to persist batch", "err", err)
+			return nil, fmt.Errorf("unable to persist batch: %w", err)
+		}
+
+		l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
+		l1Etl.ETL.metrics.RecordIndexedLatestHeight(l1BlockHeaders[len(l1BlockHeaders)-1].Number)
+
+		// a-ok!
+		return nil, nil
+	}); err != nil {
+		return err
+	}
+
+	batch.Logger.Info("indexed batch")
+
+	// Notify Listeners
+	l1Etl.mu.Lock()
+	for i := range l1Etl.listeners {
+		select {
+		case l1Etl.listeners[i] <- struct{}{}:
+		default:
+			// do nothing if the listener hasn't picked
+			// up the previous notif
+		}
+	}
+	l1Etl.mu.Unlock()
+
+	return nil
+}
 
 // Notify returns a channel that'll receive a value every time new data has
 ...
```
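
The listener notification inside `handleBatch` deliberately never blocks: if a listener has not consumed the previous notification, the new one is simply dropped. A minimal standalone illustration of that non-blocking send (not code from this commit):

```go
package main

import "fmt"

// notify performs a best-effort send to each listener; a full buffer means a
// notification is already pending, so dropping this one loses no information.
func notify(listeners []chan struct{}) {
	for i := range listeners {
		select {
		case listeners[i] <- struct{}{}:
		default: // listener hasn't picked up the previous notif; skip
		}
	}
}

func main() {
	listeners := []chan struct{}{make(chan struct{}, 1)}
	notify(listeners) // delivered
	notify(listeners) // dropped: buffer still holds the first notification
	fmt.Println("pending notifications:", len(listeners[0]))
}
```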
indexer/etl/l2_etl.go (+101 −51)

```diff
@@ -3,20 +3,31 @@ package etl
 import (
 	"context"
+	"errors"
 	"fmt"
+	"runtime/debug"
 	"time"
 
+	"golang.org/x/sync/errgroup"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/log"
+
 	"github.com/ethereum-optimism/optimism/indexer/config"
 	"github.com/ethereum-optimism/optimism/indexer/database"
 	"github.com/ethereum-optimism/optimism/indexer/node"
 	"github.com/ethereum-optimism/optimism/op-service/retry"
-	"github.com/ethereum/go-ethereum/common"
-	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ethereum/go-ethereum/log"
 )
 
 type L2ETL struct {
 	ETL
 
+	// the batch handler may do work that we can interrupt on shutdown
+	resourceCtx    context.Context
+	resourceCancel context.CancelFunc
+
+	tasks errgroup.Group
+
 	db *database.DB
 }
 ...
@@ -54,7 +65,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
 		log.Info("no indexed state, starting from genesis")
 	}
 
-	etlBatches := make(chan ETLBatch)
+	etlBatches := make(chan *ETLBatch)
 	etl := ETL{
 		loopInterval:     time.Duration(cfg.LoopIntervalMsec) * time.Millisecond,
 		headerBufferSize: uint64(cfg.HeaderBufferSize),
 ...
@@ -68,62 +79,101 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
 		EthClient: client,
 	}
 
-	return &L2ETL{ETL: etl, db: db}, nil
+	resCtx, resCancel := context.WithCancel(context.Background())
+	return &L2ETL{
+		ETL:            etl,
+		resourceCtx:    resCtx,
+		resourceCancel: resCancel,
+		db:             db,
+	}, nil
 }
 
-func (l2Etl *L2ETL) Start(ctx context.Context) error {
-	errCh := make(chan error, 1)
-	go func() {
-		errCh <- l2Etl.ETL.Start(ctx)
-	}()
-
-	for {
-		select {
-		case err := <-errCh:
-			return err
-
-		// Index incoming batches (all L2 Blocks)
-		case batch := <-l2Etl.etlBatches:
-			l2BlockHeaders := make([]database.L2BlockHeader, len(batch.Headers))
-			for i := range batch.Headers {
-				l2BlockHeaders[i] = database.L2BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])}
-			}
-
-			l2ContractEvents := make([]database.L2ContractEvent, len(batch.Logs))
-			for i := range batch.Logs {
-				timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
-				l2ContractEvents[i] = database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
-				l2Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
-			}
-
-			// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
-			retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
-			if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {
-				if err := l2Etl.db.Transaction(func(tx *database.DB) error {
-					if err := tx.Blocks.StoreL2BlockHeaders(l2BlockHeaders); err != nil {
-						return err
-					}
-					if len(l2ContractEvents) > 0 {
-						if err := tx.ContractEvents.StoreL2ContractEvents(l2ContractEvents); err != nil {
-							return err
-						}
-					}
-					return nil
-				}); err != nil {
-					batch.Logger.Error("unable to persist batch", "err", err)
-					return nil, err
-				}
-
-				l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
-				l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number)
-
-				// a-ok!
-				return nil, nil
-			}); err != nil {
-				return err
-			}
-
-			batch.Logger.Info("indexed batch")
-		}
-	}
-}
+func (l2Etl *L2ETL) Close() error {
+	var result error
+	// close the producer
+	if err := l2Etl.ETL.Close(); err != nil {
+		result = errors.Join(result, fmt.Errorf("failed to close internal ETL: %w", err))
+	}
+	// tell the consumer it can stop what it's doing
+	l2Etl.resourceCancel()
+	// wait for consumer to pick up on closure of producer
+	if err := l2Etl.tasks.Wait(); err != nil {
+		result = errors.Join(result, fmt.Errorf("failed to await batch handler completion: %w", err))
+	}
+	return result
+}
+
+func (l2Etl *L2ETL) Start(shutdown context.CancelCauseFunc) error {
+	// start ETL batch producer
+	if err := l2Etl.ETL.Start(); err != nil {
+		return fmt.Errorf("failed to start internal ETL: %w", err)
+	}
+
+	// start ETL batch consumer
+	l2Etl.tasks.Go(func() error {
+		defer func() {
+			if err := recover(); err != nil {
+				l2Etl.log.Error("halting indexer on L2 ETL panic", "err", err)
+				debug.PrintStack()
+				shutdown(fmt.Errorf("panic: %v", err))
+			}
+		}()
+		for {
+			// Index incoming batches (all L2 blocks)
+			batch, ok := <-l2Etl.etlBatches
+			if !ok {
+				l2Etl.log.Info("No more batches, shutting down L2 batch handler")
+				return nil
+			}
+			if err := l2Etl.handleBatch(batch); err != nil {
+				return fmt.Errorf("failed to handle batch, stopping L2 ETL: %w", err)
+			}
+		}
+	})
+	return nil
+}
+
+func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error {
+	l2BlockHeaders := make([]database.L2BlockHeader, len(batch.Headers))
+	for i := range batch.Headers {
+		l2BlockHeaders[i] = database.L2BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])}
+	}
+
+	l2ContractEvents := make([]database.L2ContractEvent, len(batch.Logs))
+	for i := range batch.Logs {
+		timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
+		l2ContractEvents[i] = database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
+		l2Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
+	}
+
+	// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
+	retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
+	if _, err := retry.Do[interface{}](l2Etl.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
+		if err := l2Etl.db.Transaction(func(tx *database.DB) error {
+			if err := tx.Blocks.StoreL2BlockHeaders(l2BlockHeaders); err != nil {
+				return err
+			}
+			if len(l2ContractEvents) > 0 {
+				if err := tx.ContractEvents.StoreL2ContractEvents(l2ContractEvents); err != nil {
+					return err
+				}
+			}
+			return nil
+		}); err != nil {
+			batch.Logger.Error("unable to persist batch", "err", err)
+			return nil, err
+		}
+
+		l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
+		l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number)
+
+		// a-ok!
+		return nil, nil
+	}); err != nil {
+		return err
+	}
+
+	batch.Logger.Info("indexed batch")
+	return nil
+}
```
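
Persistence in both ETLs runs under `retry.Do` with an exponential strategy and is bounded by `resourceCtx`, so `Close()` can interrupt a retry loop mid-backoff. A rough standalone equivalent of that shape; `doWithRetry` is a hypothetical stand-in for op-service's `retry.Do`, with made-up backoff constants:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// doWithRetry retries op with a capped exponential backoff, and aborts early
// when ctx is cancelled (e.g. by a Close() during shutdown).
func doWithRetry(ctx context.Context, attempts int, op func() error) error {
	backoff := 100 * time.Millisecond
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return errors.Join(err, ctx.Err())
		case <-time.After(backoff):
		}
		if backoff *= 2; backoff > 2*time.Second {
			backoff = 2 * time.Second
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

func main() {
	n := 0
	err := doWithRetry(context.Background(), 10, func() error {
		if n++; n < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println("err:", err, "attempts:", n)
}
```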
indexer/indexer.go (+180 −101)

(diff collapsed in this view; not shown)
indexer/node/client.go (+12 −4)

```diff
@@ -40,23 +40,27 @@ type EthClient interface {
 	StorageHash(common.Address, *big.Int) (common.Hash, error)
 	FilterLogs(ethereum.FilterQuery) (Logs, error)
+
+	// Close closes the underlying RPC connection.
+	// RPC close does not return any errors, but does shut down e.g. a websocket connection.
+	Close()
 }
 
 type clnt struct {
 	rpc RPC
 }
 
-func DialEthClient(rpcUrl string, metrics Metricer) (EthClient, error) {
-	ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout)
+func DialEthClient(ctx context.Context, rpcUrl string, metrics Metricer) (EthClient, error) {
+	ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
 	defer cancel()
 
 	bOff := retry.Exponential()
-	rpcClient, err := retry.Do(ctxwt, defaultDialAttempts, bOff, func() (*rpc.Client, error) {
+	rpcClient, err := retry.Do(ctx, defaultDialAttempts, bOff, func() (*rpc.Client, error) {
 		if !client.IsURLAvailable(rpcUrl) {
 			return nil, fmt.Errorf("address unavailable (%s)", rpcUrl)
 		}
 
-		client, err := rpc.DialContext(ctxwt, rpcUrl)
+		client, err := rpc.DialContext(ctx, rpcUrl)
 		if err != nil {
 			return nil, fmt.Errorf("failed to dial address (%s): %w", rpcUrl, err)
 		}
 ...
@@ -192,6 +196,10 @@ func (c *clnt) StorageHash(address common.Address, blockNumber *big.Int) (common
 	return proof.StorageHash, nil
 }
 
+func (c *clnt) Close() {
+	c.rpc.Close()
+}
+
 type Logs struct {
 	Logs          []types.Log
 	ToBlockHeader *types.Header
 ...
```
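
Threading the caller's context through `DialEthClient` means startup can be bounded or cancelled externally; the dialer still layers its own `defaultDialTimeout` on top. A small sketch of a caller imposing its own deadline (the `dial` parameter stands in for `node.DialEthClient` minus the `Metricer` plumbing, and the ten-second figure is illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// dialWithDeadline shows the caller-owned timeout the new signature enables.
func dialWithDeadline(dial func(ctx context.Context, url string) error, url string) error {
	// Caller-owned deadline; the callee still applies defaultDialTimeout on top.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return dial(ctx, url)
}

func main() {
	err := dialWithDeadline(func(ctx context.Context, url string) error {
		_, ok := ctx.Deadline()
		fmt.Println("dial sees a deadline:", ok)
		return nil
	}, "http://127.0.0.1:8545")
	fmt.Println("err:", err)
}
```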
indexer/node/client_test.go (+4 −3)

```diff
 package node
 
 import (
+	"context"
 	"fmt"
 	"net"
 	"strings"
 ...
@@ -21,14 +22,14 @@ func TestDialEthClientUnavailable(t *testing.T) {
 	metrics := &clientMetrics{}
 	// available
-	_, err = DialEthClient(addr, metrics)
+	_, err = DialEthClient(context.Background(), addr, metrics)
 	require.NoError(t, err)
 
 	// :0 requests a new unbound port
-	_, err = DialEthClient("http://localhost:0", metrics)
+	_, err = DialEthClient(context.Background(), "http://localhost:0", metrics)
 	require.Error(t, err)
 
 	// Fail open if we don't recognize the scheme
-	_, err = DialEthClient("mailto://example.com", metrics)
+	_, err = DialEthClient(context.Background(), "mailto://example.com", metrics)
 	require.Error(t, err)
 }
```
indexer/node/mocks.go (+3 −0)

```diff
@@ -45,3 +45,6 @@ func (m *MockEthClient) FilterLogs(query ethereum.FilterQuery) (Logs, error) {
 	args := m.Called(query)
 	return args.Get(0).(Logs), args.Error(1)
 }
+
+func (m *MockEthClient) Close() {
+}
```
indexer/processors/bridge.go (+57 −16)

```diff
@@ -3,7 +3,11 @@ package processors
 import (
 	"context"
+	"errors"
+	"fmt"
 	"math/big"
+	"runtime/debug"
+
+	"golang.org/x/sync/errgroup"
 
 	"github.com/ethereum-optimism/optimism/indexer/bigint"
 	"github.com/ethereum-optimism/optimism/indexer/config"
 ...
@@ -20,6 +24,10 @@ type BridgeProcessor struct {
 	db      *database.DB
 	metrics bridge.Metricer
 
+	resourceCtx    context.Context
+	resourceCancel context.CancelFunc
+	tasks          errgroup.Group
+
 	l1Etl       *etl.L1ETL
 	chainConfig config.ChainConfig
 ...
@@ -57,11 +65,22 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer
 		log.Info("detected latest indexed bridge state", "l1_block_number", l1Height, "l2_block_number", l2Height)
 	}
 
-	return &BridgeProcessor{log, db, metrics, l1Etl, chainConfig, l1Header, l2Header}, nil
+	resCtx, resCancel := context.WithCancel(context.Background())
+	return &BridgeProcessor{
+		log:            log,
+		db:             db,
+		metrics:        metrics,
+		l1Etl:          l1Etl,
+		resourceCtx:    resCtx,
+		resourceCancel: resCancel,
+		chainConfig:    chainConfig,
+		LatestL1Header: l1Header,
+		LatestL2Header: l2Header,
+	}, nil
 }
 
-func (b *BridgeProcessor) Start(ctx context.Context) error {
-	done := ctx.Done()
+func (b *BridgeProcessor) Start(shutdown context.CancelCauseFunc) error {
+	b.log.Info("starting bridge processor...")
 
 	// Fire off independently on startup to check for
 	// new data or if we've indexed new L1 data.
 ...
@@ -69,21 +88,43 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
 	startup := make(chan interface{}, 1)
 	startup <- nil
 
-	b.log.Info("starting bridge processor...")
-	for {
-		select {
-		case <-done:
-			b.log.Info("stopping bridge processor")
-			return nil
-
-		// Tickers
-		case <-startup:
-		case <-l1EtlUpdates:
-		}
-
-		done := b.metrics.RecordInterval()
-		done(b.run())
-	}
-}
+	b.tasks.Go(func() error {
+		defer func() {
+			if err := recover(); err != nil {
+				b.log.Error("halting indexer on bridge-processor panic", "err", err)
+				debug.PrintStack()
+				shutdown(fmt.Errorf("panic: %v", err))
+			}
+		}()
+		for {
+			select {
+			case <-b.resourceCtx.Done():
+				b.log.Info("stopping bridge processor")
+				return nil
+
+			// Tickers
+			case <-startup:
+			case <-l1EtlUpdates:
+			}
+
+			done := b.metrics.RecordInterval()
+			// TODO: why log all the errors and return the same thing, if we just return the error, and log here?
+			err := b.run()
+			if err != nil {
+				b.log.Error("bridge processor error", "err", err)
+			}
+			done(err)
+		}
+	})
+	return nil
+}
+
+func (b *BridgeProcessor) Close() error {
+	// signal that we can stop any ongoing work
+	b.resourceCancel()
+	// await the work to stop
+	return b.tasks.Wait()
+}
 
 // Runs the processing loop. In order to ensure all seen bridge finalization events
 ...
```
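
All three long-running components (both ETLs and the bridge processor) now share the same panic containment idiom: a `recover()` in the background goroutine converts a panic into a cause handed to the `context.CancelCauseFunc`, letting the whole indexer shut down with a reason instead of crashing. A self-contained sketch of that idiom (not code from this commit):

```go
package main

import (
	"context"
	"fmt"
)

// runGuarded turns a panic in work() into an app-wide shutdown cause.
func runGuarded(shutdown context.CancelCauseFunc, work func()) {
	defer func() {
		if err := recover(); err != nil {
			shutdown(fmt.Errorf("panic: %v", err)) // report the cause app-wide
		}
	}()
	work()
}

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	go runGuarded(cancel, func() { panic("boom") })
	<-ctx.Done()
	fmt.Println("shutting down:", context.Cause(ctx)) // shutting down: panic: boom
}
```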