Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
1c445ccb
Commit
1c445ccb
authored
Nov 04, 2023
by
Hamdi Allam
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
indexer.bridge.processing.split
parent
314b0414
Changes
12
Show whitespace changes
Inline
Side-by-side
Showing
12 changed files
with
493 additions
and
359 deletions
+493
-359
blocks.go
indexer/database/blocks.go
+16
-105
bridge_transactions.go
indexer/database/bridge_transactions.go
+53
-14
mocks.go
indexer/database/mocks.go
+10
-6
bridge_messages_e2e_test.go
indexer/e2e_tests/bridge_messages_e2e_test.go
+4
-4
bridge_transactions_e2e_test.go
indexer/e2e_tests/bridge_transactions_e2e_test.go
+5
-5
bridge_transfers_e2e_test.go
indexer/e2e_tests/bridge_transfers_e2e_test.go
+17
-14
etl.go
indexer/etl/etl.go
+1
-2
l1_etl.go
indexer/etl/l1_etl.go
+18
-11
l2_etl.go
indexer/etl/l2_etl.go
+42
-8
indexer.go
indexer/indexer.go
+1
-1
bridge.go
indexer/processors/bridge.go
+275
-161
metrics.go
indexer/processors/bridge/metrics.go
+51
-28
No files found.
indexer/database/blocks.go
View file @
1c445ccb
...
@@ -37,6 +37,10 @@ func BlockHeaderFromHeader(header *types.Header) BlockHeader {
...
@@ -37,6 +37,10 @@ func BlockHeaderFromHeader(header *types.Header) BlockHeader {
}
}
}
}
func
(
b
BlockHeader
)
String
()
string
{
return
fmt
.
Sprintf
(
"{Hash: %s, Number: %s}"
,
b
.
Hash
,
b
.
Number
)
}
type
L1BlockHeader
struct
{
type
L1BlockHeader
struct
{
BlockHeader
`gorm:"embedded"`
BlockHeader
`gorm:"embedded"`
}
}
...
@@ -48,13 +52,13 @@ type L2BlockHeader struct {
...
@@ -48,13 +52,13 @@ type L2BlockHeader struct {
type
BlocksView
interface
{
type
BlocksView
interface
{
L1BlockHeader
(
common
.
Hash
)
(
*
L1BlockHeader
,
error
)
L1BlockHeader
(
common
.
Hash
)
(
*
L1BlockHeader
,
error
)
L1BlockHeaderWithFilter
(
BlockHeader
)
(
*
L1BlockHeader
,
error
)
L1BlockHeaderWithFilter
(
BlockHeader
)
(
*
L1BlockHeader
,
error
)
L1BlockHeaderWithScope
(
func
(
db
*
gorm
.
DB
)
*
gorm
.
DB
)
(
*
L1BlockHeader
,
error
)
L1LatestBlockHeader
()
(
*
L1BlockHeader
,
error
)
L1LatestBlockHeader
()
(
*
L1BlockHeader
,
error
)
L2BlockHeader
(
common
.
Hash
)
(
*
L2BlockHeader
,
error
)
L2BlockHeader
(
common
.
Hash
)
(
*
L2BlockHeader
,
error
)
L2BlockHeaderWithFilter
(
BlockHeader
)
(
*
L2BlockHeader
,
error
)
L2BlockHeaderWithFilter
(
BlockHeader
)
(
*
L2BlockHeader
,
error
)
L2BlockHeaderWithScope
(
func
(
db
*
gorm
.
DB
)
*
gorm
.
DB
)
(
*
L2BlockHeader
,
error
)
L2LatestBlockHeader
()
(
*
L2BlockHeader
,
error
)
L2LatestBlockHeader
()
(
*
L2BlockHeader
,
error
)
LatestObservedEpoch
(
*
big
.
Int
,
uint64
)
(
*
Epoch
,
error
)
}
}
type
BlocksDB
interface
{
type
BlocksDB
interface
{
...
@@ -94,8 +98,12 @@ func (db *blocksDB) L1BlockHeader(hash common.Hash) (*L1BlockHeader, error) {
...
@@ -94,8 +98,12 @@ func (db *blocksDB) L1BlockHeader(hash common.Hash) (*L1BlockHeader, error) {
}
}
func
(
db
*
blocksDB
)
L1BlockHeaderWithFilter
(
filter
BlockHeader
)
(
*
L1BlockHeader
,
error
)
{
func
(
db
*
blocksDB
)
L1BlockHeaderWithFilter
(
filter
BlockHeader
)
(
*
L1BlockHeader
,
error
)
{
return
db
.
L1BlockHeaderWithScope
(
func
(
gorm
*
gorm
.
DB
)
*
gorm
.
DB
{
return
gorm
.
Where
(
&
filter
)
})
}
func
(
db
*
blocksDB
)
L1BlockHeaderWithScope
(
scope
func
(
*
gorm
.
DB
)
*
gorm
.
DB
)
(
*
L1BlockHeader
,
error
)
{
var
l1Header
L1BlockHeader
var
l1Header
L1BlockHeader
result
:=
db
.
gorm
.
Where
(
&
filter
)
.
Take
(
&
l1Header
)
result
:=
db
.
gorm
.
Scopes
(
scope
)
.
Take
(
&
l1Header
)
if
result
.
Error
!=
nil
{
if
result
.
Error
!=
nil
{
if
errors
.
Is
(
result
.
Error
,
gorm
.
ErrRecordNotFound
)
{
if
errors
.
Is
(
result
.
Error
,
gorm
.
ErrRecordNotFound
)
{
return
nil
,
nil
return
nil
,
nil
...
@@ -137,8 +145,12 @@ func (db *blocksDB) L2BlockHeader(hash common.Hash) (*L2BlockHeader, error) {
...
@@ -137,8 +145,12 @@ func (db *blocksDB) L2BlockHeader(hash common.Hash) (*L2BlockHeader, error) {
}
}
func
(
db
*
blocksDB
)
L2BlockHeaderWithFilter
(
filter
BlockHeader
)
(
*
L2BlockHeader
,
error
)
{
func
(
db
*
blocksDB
)
L2BlockHeaderWithFilter
(
filter
BlockHeader
)
(
*
L2BlockHeader
,
error
)
{
return
db
.
L2BlockHeaderWithScope
(
func
(
gorm
*
gorm
.
DB
)
*
gorm
.
DB
{
return
gorm
.
Where
(
&
filter
)
})
}
func
(
db
*
blocksDB
)
L2BlockHeaderWithScope
(
scope
func
(
*
gorm
.
DB
)
*
gorm
.
DB
)
(
*
L2BlockHeader
,
error
)
{
var
l2Header
L2BlockHeader
var
l2Header
L2BlockHeader
result
:=
db
.
gorm
.
Where
(
&
filter
)
.
Take
(
&
l2Header
)
result
:=
db
.
gorm
.
Scopes
(
scope
)
.
Take
(
&
l2Header
)
if
result
.
Error
!=
nil
{
if
result
.
Error
!=
nil
{
if
errors
.
Is
(
result
.
Error
,
gorm
.
ErrRecordNotFound
)
{
if
errors
.
Is
(
result
.
Error
,
gorm
.
ErrRecordNotFound
)
{
return
nil
,
nil
return
nil
,
nil
...
@@ -161,104 +173,3 @@ func (db *blocksDB) L2LatestBlockHeader() (*L2BlockHeader, error) {
...
@@ -161,104 +173,3 @@ func (db *blocksDB) L2LatestBlockHeader() (*L2BlockHeader, error) {
return
&
l2Header
,
nil
return
&
l2Header
,
nil
}
}
// Auxiliary Methods on both L1 & L2
type
Epoch
struct
{
L1BlockHeader
L1BlockHeader
`gorm:"embedded"`
L2BlockHeader
L2BlockHeader
`gorm:"embedded"`
}
// LatestObservedEpoch return the marker for latest epoch, observed on L1 & L2, within
// the specified bounds. In other words this returns the latest indexed L1 block that has
// a corresponding indexed L2 block with a matching L1Origin (equal timestamps).
//
// If `fromL1Height` (inclusive) is not specified, the search will start from genesis and
// continue all the way to latest indexed heights if `maxL1Range == 0`.
//
// For more, see the protocol spec:
// - https://github.com/ethereum-optimism/optimism/blob/develop/specs/derivation.md
func
(
db
*
blocksDB
)
LatestObservedEpoch
(
fromL1Height
*
big
.
Int
,
maxL1Range
uint64
)
(
*
Epoch
,
error
)
{
// We use timestamps since that translates to both L1 & L2
var
fromTimestamp
,
toTimestamp
uint64
// Lower Bound (the default `fromTimestamp = l1_starting_height` (default=0) suffices genesis representation)
var
header
L1BlockHeader
if
fromL1Height
!=
nil
{
result
:=
db
.
gorm
.
Where
(
"number = ?"
,
fromL1Height
)
.
Take
(
&
header
)
if
result
.
Error
!=
nil
{
if
errors
.
Is
(
result
.
Error
,
gorm
.
ErrRecordNotFound
)
{
return
nil
,
nil
}
return
nil
,
result
.
Error
}
fromTimestamp
=
header
.
Timestamp
}
else
{
// Take the lowest indexed L1 block to compute the lower bound
result
:=
db
.
gorm
.
Order
(
"number ASC"
)
.
Take
(
&
header
)
if
result
.
Error
!=
nil
{
if
errors
.
Is
(
result
.
Error
,
gorm
.
ErrRecordNotFound
)
{
return
nil
,
nil
}
return
nil
,
result
.
Error
}
fromL1Height
=
header
.
Number
fromTimestamp
=
header
.
Timestamp
}
// Upper Bound (lowest timestamp indexed between L1/L2 bounded by `maxL1Range`)
{
l1QueryFilter
:=
fmt
.
Sprintf
(
"timestamp >= %d"
,
fromTimestamp
)
if
maxL1Range
>
0
{
maxHeight
:=
new
(
big
.
Int
)
.
Add
(
fromL1Height
,
big
.
NewInt
(
int64
(
maxL1Range
)))
l1QueryFilter
=
fmt
.
Sprintf
(
"%s AND number <= %d"
,
l1QueryFilter
,
maxHeight
)
}
// Fetch most recent header from l1_block_headers table
var
l1Header
L1BlockHeader
result
:=
db
.
gorm
.
Where
(
l1QueryFilter
)
.
Order
(
"timestamp DESC"
)
.
Take
(
&
l1Header
)
if
result
.
Error
!=
nil
{
if
errors
.
Is
(
result
.
Error
,
gorm
.
ErrRecordNotFound
)
{
return
nil
,
nil
}
return
nil
,
result
.
Error
}
toTimestamp
=
l1Header
.
Timestamp
// Fetch most recent header from l2_block_headers table
var
l2Header
L2BlockHeader
result
=
db
.
gorm
.
Where
(
"timestamp <= ?"
,
toTimestamp
)
.
Order
(
"timestamp DESC"
)
.
Take
(
&
l2Header
)
if
result
.
Error
!=
nil
{
if
errors
.
Is
(
result
.
Error
,
gorm
.
ErrRecordNotFound
)
{
return
nil
,
nil
}
return
nil
,
result
.
Error
}
if
l2Header
.
Timestamp
<
toTimestamp
{
toTimestamp
=
l2Header
.
Timestamp
}
}
// Search for the latest indexed epoch within range. This is a faster query than doing an INNER JOIN between
// l1_block_headers and l2_block_headers which requires a full table scan to compute the resulting table.
l1Query
:=
db
.
gorm
.
Table
(
"l1_block_headers"
)
.
Where
(
"timestamp >= ? AND timestamp <= ?"
,
fromTimestamp
,
toTimestamp
)
l2Query
:=
db
.
gorm
.
Table
(
"l2_block_headers"
)
.
Where
(
"timestamp >= ? AND timestamp <= ?"
,
fromTimestamp
,
toTimestamp
)
query
:=
db
.
gorm
.
Raw
(
`SELECT * FROM (?) AS l1_block_headers, (?) AS l2_block_headers
WHERE l1_block_headers.timestamp = l2_block_headers.timestamp
ORDER BY l2_block_headers.number DESC LIMIT 1`
,
l1Query
,
l2Query
)
var
epoch
Epoch
result
:=
query
.
Take
(
&
epoch
)
if
result
.
Error
!=
nil
{
if
errors
.
Is
(
result
.
Error
,
gorm
.
ErrRecordNotFound
)
{
return
nil
,
nil
}
return
nil
,
result
.
Error
}
return
&
epoch
,
nil
}
indexer/database/bridge_transactions.go
View file @
1c445ccb
...
@@ -50,9 +50,11 @@ type L2TransactionWithdrawal struct {
...
@@ -50,9 +50,11 @@ type L2TransactionWithdrawal struct {
type
BridgeTransactionsView
interface
{
type
BridgeTransactionsView
interface
{
L1TransactionDeposit
(
common
.
Hash
)
(
*
L1TransactionDeposit
,
error
)
L1TransactionDeposit
(
common
.
Hash
)
(
*
L1TransactionDeposit
,
error
)
L1LatestBlockHeader
()
(
*
L1BlockHeader
,
error
)
L1LatestBlockHeader
()
(
*
L1BlockHeader
,
error
)
L1LatestFinalizedBlockHeader
()
(
*
L1BlockHeader
,
error
)
L2TransactionWithdrawal
(
common
.
Hash
)
(
*
L2TransactionWithdrawal
,
error
)
L2TransactionWithdrawal
(
common
.
Hash
)
(
*
L2TransactionWithdrawal
,
error
)
L2LatestBlockHeader
()
(
*
L2BlockHeader
,
error
)
L2LatestBlockHeader
()
(
*
L2BlockHeader
,
error
)
L2LatestFinalizedBlockHeader
()
(
*
L2BlockHeader
,
error
)
}
}
type
BridgeTransactionsDB
interface
{
type
BridgeTransactionsDB
interface
{
...
@@ -106,23 +108,41 @@ func (db *bridgeTransactionsDB) L1TransactionDeposit(sourceHash common.Hash) (*L
...
@@ -106,23 +108,41 @@ func (db *bridgeTransactionsDB) L1TransactionDeposit(sourceHash common.Hash) (*L
}
}
func
(
db
*
bridgeTransactionsDB
)
L1LatestBlockHeader
()
(
*
L1BlockHeader
,
error
)
{
func
(
db
*
bridgeTransactionsDB
)
L1LatestBlockHeader
()
(
*
L1BlockHeader
,
error
)
{
//
Markers for an indexed bridge even
t
//
Latest Transaction Deposi
t
// L1: Latest Transaction Deposit, Latest Proven/Finalized Withdrawal
l1Query
:=
db
.
gorm
.
Table
(
"l1_transaction_deposits"
)
.
Order
(
"l1_transaction_deposits.timestamp DESC"
)
l1
DepositQuery
:=
db
.
gorm
.
Table
(
"l1_transaction_deposits"
)
.
Order
(
"l1_transaction_deposits.timestamp DESC"
)
.
Limit
(
1
)
l1
Query
=
l1Query
.
Joins
(
"INNER JOIN l1_contract_events ON l1_contract_events.guid = l1_transaction_deposits.initiated_l1_event_guid"
)
l1
DepositQuery
=
l1DepositQuery
.
Joins
(
"INNER JOIN l1_contract_events ON l1_contract_events.guid = l1_transaction_deposits.initiated_l1_event_guid
"
)
l1
Query
=
l1Query
.
Joins
(
"INNER JOIN l1_block_headers ON l1_block_headers.hash = l1_contract_events.block_hash
"
)
l1
DepositQuery
=
l1DepositQuery
.
Select
(
"l1_contract_event
s.*"
)
l1
Query
=
l1Query
.
Select
(
"l1_block_header
s.*"
)
l1ProvenQuery
:=
db
.
gorm
.
Table
(
"l2_transaction_withdrawals"
)
var
l1Header
L1BlockHeader
l1ProvenQuery
=
l1ProvenQuery
.
Joins
(
"INNER JOIN l1_contract_events ON l1_contract_events.guid = l2_transaction_withdrawals.proven_l1_event_guid"
)
result
:=
l1Query
.
Take
(
&
l1Header
)
l1ProvenQuery
=
l1ProvenQuery
.
Order
(
"l1_contract_events.timestamp DESC"
)
.
Select
(
"l1_contract_events.*"
)
.
Limit
(
1
)
if
result
.
Error
!=
nil
{
if
errors
.
Is
(
result
.
Error
,
gorm
.
ErrRecordNotFound
)
{
return
nil
,
nil
}
return
nil
,
result
.
Error
}
return
&
l1Header
,
nil
}
func
(
db
*
bridgeTransactionsDB
)
L1LatestFinalizedBlockHeader
()
(
*
L1BlockHeader
,
error
)
{
// A Proven, Finalized Event or Relayed Message
provenQuery
:=
db
.
gorm
.
Table
(
"l2_transaction_withdrawals"
)
.
Order
(
"timestamp DESC"
)
.
Limit
(
1
)
provenQuery
=
provenQuery
.
Joins
(
"INNER JOIN l1_contract_events ON l1_contract_events.guid = l2_transaction_withdrawals.proven_l1_event_guid"
)
provenQuery
=
provenQuery
.
Order
(
"l1_contract_events.timestamp DESC"
)
.
Select
(
"l1_contract_events.*"
)
l1FinalizedQuery
:=
db
.
gorm
.
Table
(
"l2_transaction_withdrawals"
)
finalizedQuery
:=
db
.
gorm
.
Table
(
"l2_transaction_withdrawals"
)
.
Order
(
"timestamp DESC"
)
.
Limit
(
1
)
l1FinalizedQuery
=
l1F
inalizedQuery
.
Joins
(
"INNER JOIN l1_contract_events ON l1_contract_events.guid = l2_transaction_withdrawals.proven_l1_event_guid"
)
finalizedQuery
=
f
inalizedQuery
.
Joins
(
"INNER JOIN l1_contract_events ON l1_contract_events.guid = l2_transaction_withdrawals.proven_l1_event_guid"
)
l1FinalizedQuery
=
l1FinalizedQuery
.
Order
(
"l1_contract_events.timestamp DESC"
)
.
Select
(
"l1_contract_events.*"
)
.
Limit
(
1
)
finalizedQuery
=
finalizedQuery
.
Select
(
"l1_contract_events.*"
)
l1Query
:=
db
.
gorm
.
Table
(
"((?) UNION (?) UNION (?)) AS latest_bridge_events"
,
l1DepositQuery
.
Limit
(
1
),
l1ProvenQuery
,
l1FinalizedQuery
)
relayedQuery
:=
db
.
gorm
.
Table
(
"l2_bridge_messages"
)
.
Order
(
"timestamp DESC"
)
l1Query
=
l1Query
.
Joins
(
"INNER JOIN l1_block_headers ON l1_block_headers.hash = latest_bridge_events.block_hash"
)
relayedQuery
=
relayedQuery
.
Joins
(
"INNER JOIN l1_contract_events ON l1_contract_events.guid = l2_bridge_messages.relayed_message_event_guid"
)
l1Query
=
l1Query
.
Order
(
"latest_bridge_events.timestamp DESC"
)
.
Select
(
"l1_block_headers.*"
)
relayedQuery
=
relayedQuery
.
Select
(
"l1_contract_events.*"
)
l1Query
:=
db
.
gorm
.
Table
(
"((?) UNION (?) UNION (?)) AS finalized_bridge_events"
,
provenQuery
,
finalizedQuery
,
relayedQuery
)
l1Query
=
l1Query
.
Joins
(
"INNER JOIN l1_block_headers ON l1_block_headers.hash = finalized_bridge_events.block_hash"
)
l1Query
=
l1Query
.
Order
(
"finalized_bridge_events.timestamp DESC"
)
.
Select
(
"l1_block_headers.*"
)
var
l1Header
L1BlockHeader
var
l1Header
L1BlockHeader
result
:=
l1Query
.
Take
(
&
l1Header
)
result
:=
l1Query
.
Take
(
&
l1Header
)
...
@@ -251,3 +271,22 @@ func (db *bridgeTransactionsDB) L2LatestBlockHeader() (*L2BlockHeader, error) {
...
@@ -251,3 +271,22 @@ func (db *bridgeTransactionsDB) L2LatestBlockHeader() (*L2BlockHeader, error) {
return
latestL2DepositHeader
,
nil
return
latestL2DepositHeader
,
nil
}
}
}
}
func
(
db
*
bridgeTransactionsDB
)
L2LatestFinalizedBlockHeader
()
(
*
L2BlockHeader
,
error
)
{
// Only a Relayed message since we dont track L1 deposit inclusion status.
relayedQuery
:=
db
.
gorm
.
Table
(
"l1_bridge_messages"
)
.
Order
(
"timestamp DESC"
)
.
Limit
(
1
)
relayedQuery
=
relayedQuery
.
Joins
(
"INNER JOIN l2_contract_events ON l2_contract_events.guid = l1_bridge_messages.relayed_message_event_guid"
)
relayedQuery
=
relayedQuery
.
Joins
(
"INNER JOIN l2_block_headers ON l2_block_headers.hash = l2_contract_events.block_hash"
)
relayedQuery
=
relayedQuery
.
Select
(
"l2_block_headers.*"
)
var
l2Header
L2BlockHeader
result
:=
relayedQuery
.
Take
(
&
l2Header
)
if
result
.
Error
!=
nil
{
if
errors
.
Is
(
result
.
Error
,
gorm
.
ErrRecordNotFound
)
{
return
nil
,
nil
}
return
nil
,
result
.
Error
}
return
&
l2Header
,
nil
}
indexer/database/mocks.go
View file @
1c445ccb
package
database
package
database
import
(
import
(
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common"
"gorm.io/gorm"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/mock"
)
)
...
@@ -27,6 +26,11 @@ func (m *MockBlocksView) L1BlockHeaderWithFilter(BlockHeader) (*L1BlockHeader, e
...
@@ -27,6 +26,11 @@ func (m *MockBlocksView) L1BlockHeaderWithFilter(BlockHeader) (*L1BlockHeader, e
return
args
.
Get
(
0
)
.
(
*
L1BlockHeader
),
args
.
Error
(
1
)
return
args
.
Get
(
0
)
.
(
*
L1BlockHeader
),
args
.
Error
(
1
)
}
}
func
(
m
*
MockBlocksView
)
L1BlockHeaderWithScope
(
func
(
*
gorm
.
DB
)
*
gorm
.
DB
)
(
*
L1BlockHeader
,
error
)
{
args
:=
m
.
Called
()
return
args
.
Get
(
0
)
.
(
*
L1BlockHeader
),
args
.
Error
(
1
)
}
func
(
m
*
MockBlocksView
)
L1LatestBlockHeader
()
(
*
L1BlockHeader
,
error
)
{
func
(
m
*
MockBlocksView
)
L1LatestBlockHeader
()
(
*
L1BlockHeader
,
error
)
{
args
:=
m
.
Called
()
args
:=
m
.
Called
()
...
@@ -48,14 +52,14 @@ func (m *MockBlocksView) L2BlockHeaderWithFilter(BlockHeader) (*L2BlockHeader, e
...
@@ -48,14 +52,14 @@ func (m *MockBlocksView) L2BlockHeaderWithFilter(BlockHeader) (*L2BlockHeader, e
return
args
.
Get
(
0
)
.
(
*
L2BlockHeader
),
args
.
Error
(
1
)
return
args
.
Get
(
0
)
.
(
*
L2BlockHeader
),
args
.
Error
(
1
)
}
}
func
(
m
*
MockBlocksView
)
L2
LatestBlockHeader
(
)
(
*
L2BlockHeader
,
error
)
{
func
(
m
*
MockBlocksView
)
L2
BlockHeaderWithScope
(
func
(
*
gorm
.
DB
)
*
gorm
.
DB
)
(
*
L2BlockHeader
,
error
)
{
args
:=
m
.
Called
()
args
:=
m
.
Called
()
return
args
.
Get
(
0
)
.
(
*
L2BlockHeader
),
args
.
Error
(
1
)
return
args
.
Get
(
0
)
.
(
*
L2BlockHeader
),
args
.
Error
(
2
)
}
}
func
(
m
*
MockBlocksView
)
L
atestObservedEpoch
(
*
big
.
Int
,
uint64
)
(
*
Epoch
,
error
)
{
func
(
m
*
MockBlocksView
)
L
2LatestBlockHeader
()
(
*
L2BlockHeader
,
error
)
{
args
:=
m
.
Called
()
args
:=
m
.
Called
()
return
args
.
Get
(
0
)
.
(
*
Epoch
),
args
.
Error
(
1
)
return
args
.
Get
(
0
)
.
(
*
L2BlockHeader
),
args
.
Error
(
1
)
}
}
type
MockBlocksDB
struct
{
type
MockBlocksDB
struct
{
...
...
indexer/e2e_tests/bridge_messages_e2e_test.go
View file @
1c445ccb
...
@@ -43,7 +43,7 @@ func TestE2EBridgeL1CrossDomainMessenger(t *testing.T) {
...
@@ -43,7 +43,7 @@ func TestE2EBridgeL1CrossDomainMessenger(t *testing.T) {
// wait for processor catchup
// wait for processor catchup
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
te
stL1Header
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
LastL1Header
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
sentMsgReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
sentMsgReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -77,7 +77,7 @@ func TestE2EBridgeL1CrossDomainMessenger(t *testing.T) {
...
@@ -77,7 +77,7 @@ func TestE2EBridgeL1CrossDomainMessenger(t *testing.T) {
l2DepositReceipt
,
err
:=
wait
.
ForReceiptOK
(
context
.
Background
(),
testSuite
.
L2Client
,
transaction
.
L2TransactionHash
)
l2DepositReceipt
,
err
:=
wait
.
ForReceiptOK
(
context
.
Background
(),
testSuite
.
L2Client
,
transaction
.
L2TransactionHash
)
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
test
L2Header
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
stFinalized
L2Header
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
l2DepositReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
l2DepositReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -130,7 +130,7 @@ func TestE2EBridgeL2CrossDomainMessenger(t *testing.T) {
...
@@ -130,7 +130,7 @@ func TestE2EBridgeL2CrossDomainMessenger(t *testing.T) {
// wait for processor catchup
// wait for processor catchup
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
te
stL2Header
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
LastL2Header
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
sentMsgReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
sentMsgReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -159,7 +159,7 @@ func TestE2EBridgeL2CrossDomainMessenger(t *testing.T) {
...
@@ -159,7 +159,7 @@ func TestE2EBridgeL2CrossDomainMessenger(t *testing.T) {
// wait for processor catchup
// wait for processor catchup
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
test
L1Header
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
stFinalized
L1Header
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
finalizedReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
finalizedReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
...
indexer/e2e_tests/bridge_transactions_e2e_test.go
View file @
1c445ccb
...
@@ -49,7 +49,7 @@ func TestE2EBridgeTransactionsOptimismPortalDeposits(t *testing.T) {
...
@@ -49,7 +49,7 @@ func TestE2EBridgeTransactionsOptimismPortalDeposits(t *testing.T) {
// wait for processor catchup
// wait for processor catchup
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
te
stL1Header
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
LastL1Header
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
depositReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
depositReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -104,7 +104,7 @@ func TestE2EBridgeTransactionsL2ToL1MessagePasserWithdrawal(t *testing.T) {
...
@@ -104,7 +104,7 @@ func TestE2EBridgeTransactionsL2ToL1MessagePasserWithdrawal(t *testing.T) {
// wait for processor catchup
// wait for processor catchup
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
te
stL2Header
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
LastL2Header
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
withdrawReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
withdrawReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -134,7 +134,7 @@ func TestE2EBridgeTransactionsL2ToL1MessagePasserWithdrawal(t *testing.T) {
...
@@ -134,7 +134,7 @@ func TestE2EBridgeTransactionsL2ToL1MessagePasserWithdrawal(t *testing.T) {
withdrawParams
,
proveReceipt
:=
op_e2e
.
ProveWithdrawal
(
t
,
*
testSuite
.
OpCfg
,
testSuite
.
L1Client
,
testSuite
.
OpSys
.
EthInstances
[
"sequencer"
],
testSuite
.
OpCfg
.
Secrets
.
Alice
,
withdrawReceipt
)
withdrawParams
,
proveReceipt
:=
op_e2e
.
ProveWithdrawal
(
t
,
*
testSuite
.
OpCfg
,
testSuite
.
L1Client
,
testSuite
.
OpSys
.
EthInstances
[
"sequencer"
],
testSuite
.
OpCfg
.
Secrets
.
Alice
,
withdrawReceipt
)
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
test
L1Header
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
stFinalized
L1Header
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
proveReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
proveReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -152,7 +152,7 @@ func TestE2EBridgeTransactionsL2ToL1MessagePasserWithdrawal(t *testing.T) {
...
@@ -152,7 +152,7 @@ func TestE2EBridgeTransactionsL2ToL1MessagePasserWithdrawal(t *testing.T) {
finalizeReceipt
:=
op_e2e
.
FinalizeWithdrawal
(
t
,
*
testSuite
.
OpCfg
,
testSuite
.
L1Client
,
testSuite
.
OpCfg
.
Secrets
.
Alice
,
proveReceipt
,
withdrawParams
)
finalizeReceipt
:=
op_e2e
.
FinalizeWithdrawal
(
t
,
*
testSuite
.
OpCfg
,
testSuite
.
L1Client
,
testSuite
.
OpCfg
.
Secrets
.
Alice
,
proveReceipt
,
withdrawParams
)
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
test
L1Header
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
stFinalized
L1Header
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
finalizeReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
finalizeReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -194,7 +194,7 @@ func TestE2EBridgeTransactionsL2ToL1MessagePasserFailedWithdrawal(t *testing.T)
...
@@ -194,7 +194,7 @@ func TestE2EBridgeTransactionsL2ToL1MessagePasserFailedWithdrawal(t *testing.T)
// Prove&Finalize withdrawal
// Prove&Finalize withdrawal
_
,
finalizeReceipt
:=
op_e2e
.
ProveAndFinalizeWithdrawal
(
t
,
*
testSuite
.
OpCfg
,
testSuite
.
L1Client
,
testSuite
.
OpSys
.
EthInstances
[
"sequencer"
],
testSuite
.
OpCfg
.
Secrets
.
Alice
,
withdrawReceipt
)
_
,
finalizeReceipt
:=
op_e2e
.
ProveAndFinalizeWithdrawal
(
t
,
*
testSuite
.
OpCfg
,
testSuite
.
L1Client
,
testSuite
.
OpSys
.
EthInstances
[
"sequencer"
],
testSuite
.
OpCfg
.
Secrets
.
Alice
,
withdrawReceipt
)
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
test
L1Header
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
stFinalized
L1Header
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
finalizeReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
finalizeReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
...
indexer/e2e_tests/bridge_transfers_e2e_test.go
View file @
1c445ccb
...
@@ -48,7 +48,7 @@ func TestE2EBridgeTransfersStandardBridgeETHDeposit(t *testing.T) {
...
@@ -48,7 +48,7 @@ func TestE2EBridgeTransfersStandardBridgeETHDeposit(t *testing.T) {
// wait for processor catchup
// wait for processor catchup
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
te
stL1Header
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
LastL1Header
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
depositReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
depositReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -82,7 +82,7 @@ func TestE2EBridgeTransfersStandardBridgeETHDeposit(t *testing.T) {
...
@@ -82,7 +82,7 @@ func TestE2EBridgeTransfersStandardBridgeETHDeposit(t *testing.T) {
l2DepositReceipt
,
err
:=
wait
.
ForReceiptOK
(
context
.
Background
(),
testSuite
.
L2Client
,
types
.
NewTx
(
depositInfo
.
DepositTx
)
.
Hash
())
l2DepositReceipt
,
err
:=
wait
.
ForReceiptOK
(
context
.
Background
(),
testSuite
.
L2Client
,
types
.
NewTx
(
depositInfo
.
DepositTx
)
.
Hash
())
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
test
L2Header
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
stFinalized
L2Header
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
l2DepositReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
l2DepositReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -115,7 +115,7 @@ func TestE2EBridgeTransfersOptimismPortalETHReceive(t *testing.T) {
...
@@ -115,7 +115,7 @@ func TestE2EBridgeTransfersOptimismPortalETHReceive(t *testing.T) {
// wait for processor catchup
// wait for processor catchup
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
te
stL1Header
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
LastL1Header
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
portalDepositReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
portalDepositReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -141,7 +141,7 @@ func TestE2EBridgeTransfersOptimismPortalETHReceive(t *testing.T) {
...
@@ -141,7 +141,7 @@ func TestE2EBridgeTransfersOptimismPortalETHReceive(t *testing.T) {
l2DepositReceipt
,
err
:=
wait
.
ForReceiptOK
(
context
.
Background
(),
testSuite
.
L2Client
,
types
.
NewTx
(
depositInfo
.
DepositTx
)
.
Hash
())
l2DepositReceipt
,
err
:=
wait
.
ForReceiptOK
(
context
.
Background
(),
testSuite
.
L2Client
,
types
.
NewTx
(
depositInfo
.
DepositTx
)
.
Hash
())
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
test
L2Header
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
stFinalized
L2Header
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
l2DepositReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
l2DepositReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -183,7 +183,7 @@ func TestE2EBridgeTransfersCursoredDeposits(t *testing.T) {
...
@@ -183,7 +183,7 @@ func TestE2EBridgeTransfersCursoredDeposits(t *testing.T) {
// wait for processor catchup of the latest tx
// wait for processor catchup of the latest tx
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
te
stL1Header
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
LastL1Header
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
depositReceipts
[
2
]
.
BlockNumber
.
Uint64
(),
nil
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
depositReceipts
[
2
]
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -251,7 +251,7 @@ func TestE2EBridgeTransfersStandardBridgeETHWithdrawal(t *testing.T) {
...
@@ -251,7 +251,7 @@ func TestE2EBridgeTransfersStandardBridgeETHWithdrawal(t *testing.T) {
// wait for processor catchup
// wait for processor catchup
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
te
stL2Header
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
LastL2Header
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
withdrawReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
withdrawReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -289,7 +289,7 @@ func TestE2EBridgeTransfersStandardBridgeETHWithdrawal(t *testing.T) {
...
@@ -289,7 +289,7 @@ func TestE2EBridgeTransfersStandardBridgeETHWithdrawal(t *testing.T) {
// wait for processor catchup
// wait for processor catchup
proveReceipt
,
finalizeReceipt
:=
op_e2e
.
ProveAndFinalizeWithdrawal
(
t
,
*
testSuite
.
OpCfg
,
testSuite
.
L1Client
,
testSuite
.
OpSys
.
EthInstances
[
"sequencer"
],
testSuite
.
OpCfg
.
Secrets
.
Alice
,
withdrawReceipt
)
proveReceipt
,
finalizeReceipt
:=
op_e2e
.
ProveAndFinalizeWithdrawal
(
t
,
*
testSuite
.
OpCfg
,
testSuite
.
L1Client
,
testSuite
.
OpSys
.
EthInstances
[
"sequencer"
],
testSuite
.
OpCfg
.
Secrets
.
Alice
,
withdrawReceipt
)
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
test
L1Header
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
stFinalized
L1Header
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
finalizeReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
finalizeReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -335,7 +335,7 @@ func TestE2EBridgeTransfersL2ToL1MessagePasserETHReceive(t *testing.T) {
...
@@ -335,7 +335,7 @@ func TestE2EBridgeTransfersL2ToL1MessagePasserETHReceive(t *testing.T) {
// wait for processor catchup
// wait for processor catchup
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
te
stL2Header
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
LastL2Header
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
l2ToL1WithdrawReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
l2ToL1WithdrawReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -368,7 +368,7 @@ func TestE2EBridgeTransfersL2ToL1MessagePasserETHReceive(t *testing.T) {
...
@@ -368,7 +368,7 @@ func TestE2EBridgeTransfersL2ToL1MessagePasserETHReceive(t *testing.T) {
// wait for processor catchup
// wait for processor catchup
proveReceipt
,
finalizeReceipt
:=
op_e2e
.
ProveAndFinalizeWithdrawal
(
t
,
*
testSuite
.
OpCfg
,
testSuite
.
L1Client
,
testSuite
.
OpSys
.
EthInstances
[
"sequencer"
],
testSuite
.
OpCfg
.
Secrets
.
Alice
,
l2ToL1WithdrawReceipt
)
proveReceipt
,
finalizeReceipt
:=
op_e2e
.
ProveAndFinalizeWithdrawal
(
t
,
*
testSuite
.
OpCfg
,
testSuite
.
L1Client
,
testSuite
.
OpSys
.
EthInstances
[
"sequencer"
],
testSuite
.
OpCfg
.
Secrets
.
Alice
,
l2ToL1WithdrawReceipt
)
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
test
L1Header
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
stFinalized
L1Header
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
finalizeReceipt
.
BlockNumber
.
Uint64
(),
nil
return
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
finalizeReceipt
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -411,7 +411,7 @@ func TestE2EBridgeTransfersCursoredWithdrawals(t *testing.T) {
...
@@ -411,7 +411,7 @@ func TestE2EBridgeTransfersCursoredWithdrawals(t *testing.T) {
// wait for processor catchup of the latest tx
// wait for processor catchup of the latest tx
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
La
te
stL2Header
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
LastL2Header
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
withdrawReceipts
[
2
]
.
BlockNumber
.
Uint64
(),
nil
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
withdrawReceipts
[
2
]
.
BlockNumber
.
Uint64
(),
nil
}))
}))
...
@@ -497,7 +497,7 @@ func TestClientBridgeFunctions(t *testing.T) {
...
@@ -497,7 +497,7 @@ func TestClientBridgeFunctions(t *testing.T) {
l1Opts
.
Value
=
l2Opts
.
Value
l1Opts
.
Value
=
l2Opts
.
Value
depositTx
,
err
:=
optimismPortal
.
Receive
(
l1Opts
)
depositTx
,
err
:=
optimismPortal
.
Receive
(
l1Opts
)
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
_
,
err
=
wait
.
ForReceiptOK
(
context
.
Background
(),
testSuite
.
L1Client
,
depositTx
.
Hash
())
depositReceipt
,
err
:
=
wait
.
ForReceiptOK
(
context
.
Background
(),
testSuite
.
L1Client
,
depositTx
.
Hash
())
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
mintSum
=
new
(
big
.
Int
)
.
Add
(
mintSum
,
depositTx
.
Value
())
mintSum
=
new
(
big
.
Int
)
.
Add
(
mintSum
,
depositTx
.
Value
())
...
@@ -508,10 +508,13 @@ func TestClientBridgeFunctions(t *testing.T) {
...
@@ -508,10 +508,13 @@ func TestClientBridgeFunctions(t *testing.T) {
l2ToL1WithdrawReceipt
,
err
:=
wait
.
ForReceiptOK
(
context
.
Background
(),
testSuite
.
L2Client
,
l2ToL1MessagePasserWithdrawTx
.
Hash
())
l2ToL1WithdrawReceipt
,
err
:=
wait
.
ForReceiptOK
(
context
.
Background
(),
testSuite
.
L2Client
,
l2ToL1MessagePasserWithdrawTx
.
Hash
())
require
.
NoError
(
t
,
err
)
require
.
NoError
(
t
,
err
)
// (3.c) wait for indexer processor to catchup with the L
2 block containing the
withdrawal tx
// (3.c) wait for indexer processor to catchup with the L
1 & L2 block containing the deposit &
withdrawal tx
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
require
.
NoError
(
t
,
wait
.
For
(
context
.
Background
(),
500
*
time
.
Millisecond
,
func
()
(
bool
,
error
)
{
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
LatestL2Header
l1Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
LastL1Header
return
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
l2ToL1WithdrawReceipt
.
BlockNumber
.
Uint64
(),
nil
l2Header
:=
testSuite
.
Indexer
.
BridgeProcessor
.
LastL2Header
seenL2
:=
l2Header
!=
nil
&&
l2Header
.
Number
.
Uint64
()
>=
l2ToL1WithdrawReceipt
.
BlockNumber
.
Uint64
()
seenL1
:=
l1Header
!=
nil
&&
l1Header
.
Number
.
Uint64
()
>=
depositReceipt
.
BlockNumber
.
Uint64
()
return
seenL1
&&
seenL2
,
nil
}))
}))
withdrawSum
=
new
(
big
.
Int
)
.
Add
(
withdrawSum
,
l2ToL1MessagePasserWithdrawTx
.
Value
())
withdrawSum
=
new
(
big
.
Int
)
.
Add
(
withdrawSum
,
l2ToL1MessagePasserWithdrawTx
.
Value
())
...
...
indexer/etl/etl.go
View file @
1c445ccb
...
@@ -59,10 +59,9 @@ func (etl *ETL) Start() error {
...
@@ -59,10 +59,9 @@ func (etl *ETL) Start() error {
if
etl
.
worker
!=
nil
{
if
etl
.
worker
!=
nil
{
return
errors
.
New
(
"already started"
)
return
errors
.
New
(
"already started"
)
}
}
etl
.
log
.
Info
(
"starting etl..."
)
etl
.
worker
=
clock
.
NewLoopFn
(
clock
.
SystemClock
,
etl
.
tick
,
func
()
error
{
etl
.
worker
=
clock
.
NewLoopFn
(
clock
.
SystemClock
,
etl
.
tick
,
func
()
error
{
etl
.
log
.
Info
(
"shutting down batch producer"
)
close
(
etl
.
etlBatches
)
// can close the channel now, to signal to the consumer that we're done
close
(
etl
.
etlBatches
)
// can close the channel now, to signal to the consumer that we're done
etl
.
log
.
Info
(
"stopped etl worker loop"
)
return
nil
return
nil
},
etl
.
loopInterval
)
},
etl
.
loopInterval
)
return
nil
return
nil
...
...
indexer/etl/l1_etl.go
View file @
1c445ccb
...
@@ -21,6 +21,7 @@ import (
...
@@ -21,6 +21,7 @@ import (
type
L1ETL
struct
{
type
L1ETL
struct
{
ETL
ETL
LatestHeader
*
types
.
Header
// the batch handler may do work that we can interrupt on shutdown
// the batch handler may do work that we can interrupt on shutdown
resourceCtx
context
.
Context
resourceCtx
context
.
Context
...
@@ -31,7 +32,6 @@ type L1ETL struct {
...
@@ -31,7 +32,6 @@ type L1ETL struct {
db
*
database
.
DB
db
*
database
.
DB
mu
sync
.
Mutex
mu
sync
.
Mutex
listeners
[]
chan
interface
{}
listeners
[]
chan
interface
{}
}
}
...
@@ -102,6 +102,8 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
...
@@ -102,6 +102,8 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
resCtx
,
resCancel
:=
context
.
WithCancel
(
context
.
Background
())
resCtx
,
resCancel
:=
context
.
WithCancel
(
context
.
Background
())
return
&
L1ETL
{
return
&
L1ETL
{
ETL
:
etl
,
ETL
:
etl
,
LatestHeader
:
fromHeader
,
db
:
db
,
db
:
db
,
resourceCtx
:
resCtx
,
resourceCtx
:
resCtx
,
resourceCancel
:
resCancel
,
resourceCancel
:
resCancel
,
...
@@ -123,32 +125,35 @@ func (l1Etl *L1ETL) Close() error {
...
@@ -123,32 +125,35 @@ func (l1Etl *L1ETL) Close() error {
if
err
:=
l1Etl
.
tasks
.
Wait
();
err
!=
nil
{
if
err
:=
l1Etl
.
tasks
.
Wait
();
err
!=
nil
{
result
=
errors
.
Join
(
result
,
fmt
.
Errorf
(
"failed to await batch handler completion: %w"
,
err
))
result
=
errors
.
Join
(
result
,
fmt
.
Errorf
(
"failed to await batch handler completion: %w"
,
err
))
}
}
// close listeners
for
i
:=
range
l1Etl
.
listeners
{
close
(
l1Etl
.
listeners
[
i
])
}
return
result
return
result
}
}
func
(
l1Etl
*
L1ETL
)
Start
()
error
{
func
(
l1Etl
*
L1ETL
)
Start
()
error
{
l1Etl
.
log
.
Info
(
"starting etl..."
)
// start ETL batch producer
// start ETL batch producer
if
err
:=
l1Etl
.
ETL
.
Start
();
err
!=
nil
{
if
err
:=
l1Etl
.
ETL
.
Start
();
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to start internal ETL: %w"
,
err
)
return
fmt
.
Errorf
(
"failed to start internal ETL: %w"
,
err
)
}
}
// start ETL batch consumer
// start ETL batch consumer
l1Etl
.
tasks
.
Go
(
func
()
error
{
l1Etl
.
tasks
.
Go
(
func
()
error
{
for
{
for
batch
:=
range
l1Etl
.
etlBatches
{
// Index incoming batches (only L1 blocks that have an emitted log)
batch
,
ok
:=
<-
l1Etl
.
etlBatches
if
!
ok
{
l1Etl
.
log
.
Info
(
"No more batches, shutting down L1 batch handler"
)
return
nil
}
if
err
:=
l1Etl
.
handleBatch
(
batch
);
err
!=
nil
{
if
err
:=
l1Etl
.
handleBatch
(
batch
);
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to handle batch, stopping L2 ETL: %w"
,
err
)
return
fmt
.
Errorf
(
"failed to handle batch, stopping L2 ETL: %w"
,
err
)
}
}
}
}
l1Etl
.
log
.
Info
(
"no more batches, shutting down batch handler"
)
return
nil
})
})
return
nil
return
nil
}
}
func
(
l1Etl
*
L1ETL
)
handleBatch
(
batch
*
ETLBatch
)
error
{
func
(
l1Etl
*
L1ETL
)
handleBatch
(
batch
*
ETLBatch
)
error
{
// Index incoming batches (only L1 blocks that have an emitted log)
l1BlockHeaders
:=
make
([]
database
.
L1BlockHeader
,
0
,
len
(
batch
.
Headers
))
l1BlockHeaders
:=
make
([]
database
.
L1BlockHeader
,
0
,
len
(
batch
.
Headers
))
for
i
:=
range
batch
.
Headers
{
for
i
:=
range
batch
.
Headers
{
if
_
,
ok
:=
batch
.
HeadersWithLog
[
batch
.
Headers
[
i
]
.
Hash
()];
ok
{
if
_
,
ok
:=
batch
.
HeadersWithLog
[
batch
.
Headers
[
i
]
.
Hash
()];
ok
{
...
@@ -195,9 +200,11 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
...
@@ -195,9 +200,11 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
}
}
batch
.
Logger
.
Info
(
"indexed batch"
)
batch
.
Logger
.
Info
(
"indexed batch"
)
l1Etl
.
LatestHeader
=
&
batch
.
Headers
[
len
(
batch
.
Headers
)
-
1
]
// Notify Listeners
// Notify Listeners
l1Etl
.
mu
.
Lock
()
l1Etl
.
mu
.
Lock
()
defer
l1Etl
.
mu
.
Unlock
()
for
i
:=
range
l1Etl
.
listeners
{
for
i
:=
range
l1Etl
.
listeners
{
select
{
select
{
case
l1Etl
.
listeners
[
i
]
<-
struct
{}{}
:
case
l1Etl
.
listeners
[
i
]
<-
struct
{}{}
:
...
@@ -206,7 +213,7 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
...
@@ -206,7 +213,7 @@ func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
// up the previous notif
// up the previous notif
}
}
}
}
l1Etl
.
mu
.
Unlock
()
return
nil
return
nil
}
}
...
...
indexer/etl/l2_etl.go
View file @
1c445ccb
...
@@ -4,6 +4,7 @@ import (
...
@@ -4,6 +4,7 @@ import (
"context"
"context"
"errors"
"errors"
"fmt"
"fmt"
"sync"
"time"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common"
...
@@ -19,6 +20,7 @@ import (
...
@@ -19,6 +20,7 @@ import (
type
L2ETL
struct
{
type
L2ETL
struct
{
ETL
ETL
LatestHeader
*
types
.
Header
// the batch handler may do work that we can interrupt on shutdown
// the batch handler may do work that we can interrupt on shutdown
resourceCtx
context
.
Context
resourceCtx
context
.
Context
...
@@ -27,6 +29,9 @@ type L2ETL struct {
...
@@ -27,6 +29,9 @@ type L2ETL struct {
tasks
tasks
.
Group
tasks
tasks
.
Group
db
*
database
.
DB
db
*
database
.
DB
mu
sync
.
Mutex
listeners
[]
chan
interface
{}
}
}
func
NewL2ETL
(
cfg
Config
,
log
log
.
Logger
,
db
*
database
.
DB
,
metrics
Metricer
,
client
node
.
EthClient
,
func
NewL2ETL
(
cfg
Config
,
log
log
.
Logger
,
db
*
database
.
DB
,
metrics
Metricer
,
client
node
.
EthClient
,
...
@@ -81,6 +86,8 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
...
@@ -81,6 +86,8 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
resCtx
,
resCancel
:=
context
.
WithCancel
(
context
.
Background
())
resCtx
,
resCancel
:=
context
.
WithCancel
(
context
.
Background
())
return
&
L2ETL
{
return
&
L2ETL
{
ETL
:
etl
,
ETL
:
etl
,
LatestHeader
:
fromHeader
,
resourceCtx
:
resCtx
,
resourceCtx
:
resCtx
,
resourceCancel
:
resCancel
,
resourceCancel
:
resCancel
,
db
:
db
,
db
:
db
,
...
@@ -102,10 +109,16 @@ func (l2Etl *L2ETL) Close() error {
...
@@ -102,10 +109,16 @@ func (l2Etl *L2ETL) Close() error {
if
err
:=
l2Etl
.
tasks
.
Wait
();
err
!=
nil
{
if
err
:=
l2Etl
.
tasks
.
Wait
();
err
!=
nil
{
result
=
errors
.
Join
(
result
,
fmt
.
Errorf
(
"failed to await batch handler completion: %w"
,
err
))
result
=
errors
.
Join
(
result
,
fmt
.
Errorf
(
"failed to await batch handler completion: %w"
,
err
))
}
}
// close listeners
for
i
:=
range
l2Etl
.
listeners
{
close
(
l2Etl
.
listeners
[
i
])
}
return
result
return
result
}
}
func
(
l2Etl
*
L2ETL
)
Start
()
error
{
func
(
l2Etl
*
L2ETL
)
Start
()
error
{
l2Etl
.
log
.
Info
(
"starting etl..."
)
// start ETL batch producer
// start ETL batch producer
if
err
:=
l2Etl
.
ETL
.
Start
();
err
!=
nil
{
if
err
:=
l2Etl
.
ETL
.
Start
();
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to start internal ETL: %w"
,
err
)
return
fmt
.
Errorf
(
"failed to start internal ETL: %w"
,
err
)
...
@@ -113,17 +126,13 @@ func (l2Etl *L2ETL) Start() error {
...
@@ -113,17 +126,13 @@ func (l2Etl *L2ETL) Start() error {
// start ETL batch consumer
// start ETL batch consumer
l2Etl
.
tasks
.
Go
(
func
()
error
{
l2Etl
.
tasks
.
Go
(
func
()
error
{
for
{
for
batch
:=
range
l2Etl
.
etlBatches
{
// Index incoming batches (all L2 blocks)
batch
,
ok
:=
<-
l2Etl
.
etlBatches
if
!
ok
{
l2Etl
.
log
.
Info
(
"No more batches, shutting down L2 batch handler"
)
return
nil
}
if
err
:=
l2Etl
.
handleBatch
(
batch
);
err
!=
nil
{
if
err
:=
l2Etl
.
handleBatch
(
batch
);
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to handle batch, stopping L2 ETL: %w"
,
err
)
return
fmt
.
Errorf
(
"failed to handle batch, stopping L2 ETL: %w"
,
err
)
}
}
}
}
l2Etl
.
log
.
Info
(
"no more batches, shutting down batch handler"
)
return
nil
})
})
return
nil
return
nil
}
}
...
@@ -169,5 +178,30 @@ func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error {
...
@@ -169,5 +178,30 @@ func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error {
}
}
batch
.
Logger
.
Info
(
"indexed batch"
)
batch
.
Logger
.
Info
(
"indexed batch"
)
l2Etl
.
LatestHeader
=
&
batch
.
Headers
[
len
(
batch
.
Headers
)
-
1
]
// Notify Listeners
l2Etl
.
mu
.
Lock
()
defer
l2Etl
.
mu
.
Unlock
()
for
i
:=
range
l2Etl
.
listeners
{
select
{
case
l2Etl
.
listeners
[
i
]
<-
struct
{}{}
:
default
:
// do nothing if the listener hasn't picked
// up the previous notif
}
}
return
nil
return
nil
}
}
// Notify returns a channel that'll receive a value every time new data has
// been persisted by the L2ETL
func
(
l2Etl
*
L2ETL
)
Notify
()
<-
chan
interface
{}
{
receiver
:=
make
(
chan
interface
{})
l2Etl
.
mu
.
Lock
()
defer
l2Etl
.
mu
.
Unlock
()
l2Etl
.
listeners
=
append
(
l2Etl
.
listeners
,
receiver
)
return
receiver
}
indexer/indexer.go
View file @
1c445ccb
...
@@ -223,7 +223,7 @@ func (ix *Indexer) initL2ETL(chainConfig config.ChainConfig) error {
...
@@ -223,7 +223,7 @@ func (ix *Indexer) initL2ETL(chainConfig config.ChainConfig) error {
func
(
ix
*
Indexer
)
initBridgeProcessor
(
chainConfig
config
.
ChainConfig
)
error
{
func
(
ix
*
Indexer
)
initBridgeProcessor
(
chainConfig
config
.
ChainConfig
)
error
{
bridgeProcessor
,
err
:=
processors
.
NewBridgeProcessor
(
bridgeProcessor
,
err
:=
processors
.
NewBridgeProcessor
(
ix
.
log
,
ix
.
DB
,
bridge
.
NewMetrics
(
ix
.
metricsRegistry
),
ix
.
L1ETL
,
chainConfig
,
ix
.
shutdown
)
ix
.
log
,
ix
.
DB
,
bridge
.
NewMetrics
(
ix
.
metricsRegistry
),
ix
.
L1ETL
,
ix
.
L2ETL
,
chainConfig
,
ix
.
shutdown
)
if
err
!=
nil
{
if
err
!=
nil
{
return
err
return
err
}
}
...
...
indexer/processors/bridge.go
View file @
1c445ccb
...
@@ -6,7 +6,8 @@ import (
...
@@ -6,7 +6,8 @@ import (
"fmt"
"fmt"
"math/big"
"math/big"
"github.com/ethereum/go-ethereum/core/types"
"gorm.io/gorm"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/ethereum-optimism/optimism/indexer/bigint"
...
@@ -17,6 +18,8 @@ import (
...
@@ -17,6 +18,8 @@ import (
"github.com/ethereum-optimism/optimism/op-service/tasks"
"github.com/ethereum-optimism/optimism/op-service/tasks"
)
)
var
blocksLimit
=
10
_000
type
BridgeProcessor
struct
{
type
BridgeProcessor
struct
{
log
log
.
Logger
log
log
.
Logger
db
*
database
.
DB
db
*
database
.
DB
...
@@ -27,13 +30,17 @@ type BridgeProcessor struct {
...
@@ -27,13 +30,17 @@ type BridgeProcessor struct {
tasks
tasks
.
Group
tasks
tasks
.
Group
l1Etl
*
etl
.
L1ETL
l1Etl
*
etl
.
L1ETL
l2Etl
*
etl
.
L2ETL
chainConfig
config
.
ChainConfig
chainConfig
config
.
ChainConfig
LatestL1Header
*
types
.
Header
LastL1Header
*
database
.
L1BlockHeader
LatestL2Header
*
types
.
Header
LastL2Header
*
database
.
L2BlockHeader
LastFinalizedL1Header
*
database
.
L1BlockHeader
LastFinalizedL2Header
*
database
.
L2BlockHeader
}
}
func
NewBridgeProcessor
(
log
log
.
Logger
,
db
*
database
.
DB
,
metrics
bridge
.
Metricer
,
l1Etl
*
etl
.
L1ETL
,
func
NewBridgeProcessor
(
log
log
.
Logger
,
db
*
database
.
DB
,
metrics
bridge
.
Metricer
,
l1Etl
*
etl
.
L1ETL
,
l2Etl
*
etl
.
L2ETL
,
chainConfig
config
.
ChainConfig
,
shutdown
context
.
CancelCauseFunc
)
(
*
BridgeProcessor
,
error
)
{
chainConfig
config
.
ChainConfig
,
shutdown
context
.
CancelCauseFunc
)
(
*
BridgeProcessor
,
error
)
{
log
=
log
.
New
(
"processor"
,
"bridge"
)
log
=
log
.
New
(
"processor"
,
"bridge"
)
...
@@ -46,35 +53,33 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer
...
@@ -46,35 +53,33 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer
return
nil
,
err
return
nil
,
err
}
}
var
l1Header
,
l2Header
*
types
.
Header
latestFinalizedL1Header
,
err
:=
db
.
BridgeTransactions
.
L1LatestFinalizedBlockHeader
()
if
latestL1Header
==
nil
&&
latestL2Header
==
nil
{
if
err
!=
nil
{
log
.
Info
(
"no indexed state, starting from rollup genesis"
)
return
nil
,
err
}
else
{
l1Height
,
l2Height
:=
bigint
.
Zero
,
bigint
.
Zero
if
latestL1Header
!=
nil
{
l1Height
=
latestL1Header
.
Number
l1Header
=
latestL1Header
.
RLPHeader
.
Header
()
metrics
.
RecordLatestIndexedL1Height
(
l1Height
)
}
if
latestL2Header
!=
nil
{
l2Height
=
latestL2Header
.
Number
l2Header
=
latestL2Header
.
RLPHeader
.
Header
()
metrics
.
RecordLatestIndexedL2Height
(
l2Height
)
}
}
log
.
Info
(
"detected latest indexed bridge state"
,
"l1_block_number"
,
l1Height
,
"l2_block_number"
,
l2Height
)
latestFinalizedL2Header
,
err
:=
db
.
BridgeTransactions
.
L2LatestFinalizedBlockHeader
()
if
err
!=
nil
{
return
nil
,
err
}
}
log
.
Info
(
"detected indexed bridge state"
,
"l1_block"
,
latestL1Header
,
"l2_block"
,
latestL2Header
,
"finalized_l1_block"
,
latestFinalizedL1Header
,
"finalized_l2_block"
,
latestFinalizedL2Header
)
resCtx
,
resCancel
:=
context
.
WithCancel
(
context
.
Background
())
resCtx
,
resCancel
:=
context
.
WithCancel
(
context
.
Background
())
return
&
BridgeProcessor
{
return
&
BridgeProcessor
{
log
:
log
,
log
:
log
,
db
:
db
,
db
:
db
,
metrics
:
metrics
,
metrics
:
metrics
,
l1Etl
:
l1Etl
,
l1Etl
:
l1Etl
,
l2Etl
:
l2Etl
,
resourceCtx
:
resCtx
,
resourceCtx
:
resCtx
,
resourceCancel
:
resCancel
,
resourceCancel
:
resCancel
,
chainConfig
:
chainConfig
,
chainConfig
:
chainConfig
,
LatestL1Header
:
l1Header
,
LastL1Header
:
latestL1Header
,
LatestL2Header
:
l2Header
,
LastL2Header
:
latestL2Header
,
LastFinalizedL1Header
:
latestFinalizedL1Header
,
LastFinalizedL2Header
:
latestFinalizedL2Header
,
tasks
:
tasks
.
Group
{
HandleCrit
:
func
(
err
error
)
{
tasks
:
tasks
.
Group
{
HandleCrit
:
func
(
err
error
)
{
shutdown
(
fmt
.
Errorf
(
"critical error in bridge processor: %w"
,
err
))
shutdown
(
fmt
.
Errorf
(
"critical error in bridge processor: %w"
,
err
))
}},
}},
...
@@ -84,32 +89,25 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer
...
@@ -84,32 +89,25 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer
func
(
b
*
BridgeProcessor
)
Start
()
error
{
func
(
b
*
BridgeProcessor
)
Start
()
error
{
b
.
log
.
Info
(
"starting bridge processor..."
)
b
.
log
.
Info
(
"starting bridge processor..."
)
// Fire off independently on startup to check for
// start L1 worker
// new data or if we've indexed new L1 data.
l1EtlUpdates
:=
b
.
l1Etl
.
Notify
()
startup
:=
make
(
chan
interface
{},
1
)
startup
<-
nil
b
.
tasks
.
Go
(
func
()
error
{
b
.
tasks
.
Go
(
func
()
error
{
for
{
l1EtlUpdates
:=
b
.
l1Etl
.
Notify
()
select
{
for
range
l1EtlUpdates
{
case
<-
b
.
resourceCtx
.
Done
()
:
done
:=
b
.
metrics
.
RecordL1Interval
()
b
.
log
.
Info
(
"stopping bridge processor"
)
done
(
b
.
onL1Data
())
return
nil
// Tickers
case
<-
startup
:
case
<-
l1EtlUpdates
:
}
done
:=
b
.
metrics
.
RecordInterval
()
// TODO(8013): why log all the errors and return the same thing, if we just return the error, and log here?
err
:=
b
.
run
()
if
err
!=
nil
{
b
.
log
.
Error
(
"bridge processor error"
,
"err"
,
err
)
}
}
done
(
err
)
b
.
log
.
Info
(
"no more l1 etl updates. shutting down l1 task"
)
return
nil
})
// start L2 worker
b
.
tasks
.
Go
(
func
()
error
{
l2EtlUpdates
:=
b
.
l2Etl
.
Notify
()
for
range
l2EtlUpdates
{
done
:=
b
.
metrics
.
RecordL2Interval
()
done
(
b
.
onL2Data
())
}
}
b
.
log
.
Info
(
"no more l2 etl updates. shutting down l2 task"
)
return
nil
})
})
return
nil
return
nil
}
}
...
@@ -121,154 +119,270 @@ func (b *BridgeProcessor) Close() error {
...
@@ -121,154 +119,270 @@ func (b *BridgeProcessor) Close() error {
return
b
.
tasks
.
Wait
()
return
b
.
tasks
.
Wait
()
}
}
// Runs the processing loop. In order to ensure all seen bridge finalization events
// onL1Data will index new bridge events for the unvisited L1 state. As new L1 bridge events
// can be correlated with bridge initiated events, we establish a shared marker between
// are processed, bridge finalization events can be processed on L2 in this same window.
// L1 and L2 when processing events. The latest shared indexed time (epochs) between
func
(
b
*
BridgeProcessor
)
onL1Data
()
error
{
// L1 and L2 serves as this shared marker.
latestL1Header
:=
b
.
l1Etl
.
LatestHeader
func
(
b
*
BridgeProcessor
)
run
()
error
{
b
.
log
.
Info
(
"notified of new L1 state"
,
"l1_etl_block_number"
,
latestL1Header
.
Number
)
// In the event where we have a large number of un-observed epochs, we cap the search
// of epochs by 10k. If this turns out to be a bottleneck, we can parallelize the processing
var
errs
error
// of epochs to significantly speed up sync times.
if
err
:=
b
.
processInitiatedL1Events
();
err
!=
nil
{
maxEpochRange
:=
uint64
(
10
_000
)
b
.
log
.
Error
(
"failed to process initiated L1 events"
,
"err"
,
err
)
var
lastEpoch
*
big
.
Int
errs
=
errors
.
Join
(
errs
,
err
)
if
b
.
LatestL1Header
!=
nil
{
}
lastEpoch
=
b
.
LatestL1Header
.
Number
}
// `LastFinalizedL2Header` and `LastL1Header` are mutated by the same routine and can
// safely be read without needing any sync primitives
latestEpoch
,
err
:=
b
.
db
.
Blocks
.
LatestObservedEpoch
(
lastEpoch
,
maxEpochRange
)
if
b
.
LastFinalizedL2Header
==
nil
||
b
.
LastFinalizedL2Header
.
Timestamp
<
b
.
LastL1Header
.
Timestamp
{
if
err
!=
nil
{
if
err
:=
b
.
processFinalizedL2Events
();
err
!=
nil
{
return
err
b
.
log
.
Error
(
"failed to process finalized L2 events"
,
"err"
,
err
)
}
else
if
latestEpoch
==
nil
{
errs
=
errors
.
Join
(
errs
,
err
)
if
b
.
LatestL1Header
!=
nil
||
b
.
LatestL2Header
!=
nil
{
// Once we have some indexed state `latestEpoch != nil` as `LatestObservedEpoch` is inclusive in its search with the last provided epoch.
b
.
log
.
Error
(
"bridge events indexed, but no observed epoch returned"
,
"latest_bridge_l1_block_number"
,
b
.
LatestL1Header
.
Number
)
return
errors
.
New
(
"bridge events indexed, but no observed epoch returned"
)
}
}
b
.
log
.
Warn
(
"no observed epochs available. waiting..."
)
return
nil
}
}
if
b
.
LatestL1Header
!=
nil
&&
latestEpoch
.
L1BlockHeader
.
Hash
==
b
.
LatestL1Header
.
Hash
()
{
return
errs
b
.
log
.
Warn
(
"all available epochs indexed"
,
"latest_bridge_l1_block_number"
,
b
.
LatestL1Header
.
Number
)
}
return
nil
// onL2Data will index new bridge events for the unvisited L2 state. As new L2 bridge events
// are processed, bridge finalization events can be processed on L1 in this same window.
func
(
b
*
BridgeProcessor
)
onL2Data
()
error
{
if
b
.
l2Etl
.
LatestHeader
.
Number
.
Cmp
(
bigint
.
Zero
)
==
0
{
return
nil
// skip genesis
}
}
b
.
log
.
Info
(
"notified of new L2 state"
,
"l2_etl_block_number"
,
b
.
l2Etl
.
LatestHeader
.
Number
)
// Integrity Checks
var
errs
error
if
err
:=
b
.
processInitiatedL2Events
();
err
!=
nil
{
b
.
log
.
Error
(
"failed to process initiated L2 events"
,
"err"
,
err
)
errs
=
errors
.
Join
(
errs
,
err
)
}
genesisL1Height
:=
big
.
NewInt
(
int64
(
b
.
chainConfig
.
L1StartingHeight
))
// `LastFinalizedL1Header` and `LastL2Header` are mutated by the same routine and can
if
latestEpoch
.
L1BlockHeader
.
Number
.
Cmp
(
genesisL1Height
)
<
0
{
// safely be read without needing any sync primitives
b
.
log
.
Error
(
"L1 epoch less than starting L1 height observed"
,
"l1_starting_number"
,
genesisL1Height
,
"latest_epoch_number"
,
latestEpoch
.
L1BlockHeader
.
Number
)
if
b
.
LastFinalizedL1Header
==
nil
||
b
.
LastFinalizedL1Header
.
Timestamp
<
b
.
LastL2Header
.
Timestamp
{
return
errors
.
New
(
"L1 epoch less than starting L1 height observed"
)
if
err
:=
b
.
processFinalizedL1Events
();
err
!=
nil
{
b
.
log
.
Error
(
"failed to process finalized L1 events"
,
"err"
,
err
)
errs
=
errors
.
Join
(
errs
,
err
)
}
}
if
b
.
LatestL1Header
!=
nil
&&
latestEpoch
.
L1BlockHeader
.
Number
.
Cmp
(
b
.
LatestL1Header
.
Number
)
<=
0
{
b
.
log
.
Error
(
"non-increasing l1 block height observed"
,
"latest_bridge_l1_block_number"
,
b
.
LatestL1Header
.
Number
,
"latest_epoch_l1_block_number"
,
latestEpoch
.
L1BlockHeader
.
Number
)
return
errors
.
New
(
"non-increasing l1 block height observed"
)
}
}
if
b
.
LatestL2Header
!=
nil
&&
latestEpoch
.
L2BlockHeader
.
Number
.
Cmp
(
b
.
LatestL2Header
.
Number
)
<=
0
{
b
.
log
.
Error
(
"non-increasing l2 block height observed"
,
"latest_bridge_l2_block_number"
,
b
.
LatestL2Header
.
Number
,
"latest_epoch_l2_block_number"
,
latestEpoch
.
L2BlockHeader
.
Number
)
return
errs
return
errors
.
New
(
"non-increasing l2 block height observed"
)
}
// Process Initiated Bridge Events
func
(
b
*
BridgeProcessor
)
processInitiatedL1Events
()
error
{
l1BridgeLog
:=
b
.
log
.
New
(
"bridge"
,
"l1"
,
"kind"
,
"initiated"
)
lastL1BlockNumber
:=
big
.
NewInt
(
int64
(
b
.
chainConfig
.
L1StartingHeight
)
-
1
)
if
b
.
LastL1Header
!=
nil
{
lastL1BlockNumber
=
b
.
LastL1Header
.
Number
}
}
toL1Height
,
toL2Height
:=
latestEpoch
.
L1BlockHeader
.
Number
,
latestEpoch
.
L2BlockHeader
.
Number
// Latest unobserved L1 state bounded by `blockLimits` blocks. Since this process is driven on new L1 data,
fromL1Height
,
fromL2Height
:=
genesisL1Height
,
bigint
.
Zero
// we always expect this query to return a new result
if
b
.
LatestL1Header
!=
nil
{
latestL1HeaderScope
:=
func
(
db
*
gorm
.
DB
)
*
gorm
.
DB
{
fromL1Height
=
new
(
big
.
Int
)
.
Add
(
b
.
LatestL1Header
.
Number
,
bigint
.
One
)
newQuery
:=
db
.
Session
(
&
gorm
.
Session
{
NewDB
:
true
})
// fresh subquery
headers
:=
newQuery
.
Model
(
database
.
L1BlockHeader
{})
.
Where
(
"number > ?"
,
lastL1BlockNumber
)
return
db
.
Where
(
"number = (?)"
,
newQuery
.
Table
(
"(?) as block_numbers"
,
headers
.
Order
(
"number ASC"
)
.
Limit
(
blocksLimit
))
.
Select
(
"MAX(number)"
))
}
}
if
b
.
LatestL2Header
!=
nil
{
latestL1Header
,
err
:=
b
.
db
.
Blocks
.
L1BlockHeaderWithScope
(
latestL1HeaderScope
)
fromL2Height
=
new
(
big
.
Int
)
.
Add
(
b
.
LatestL2Header
.
Number
,
bigint
.
One
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to query new L1 state: %w"
,
err
)
}
else
if
latestL1Header
==
nil
{
return
fmt
.
Errorf
(
"no new L1 state found"
)
}
}
l1BedrockStartingHeight
:=
big
.
NewInt
(
int64
(
b
.
chainConfig
.
L1BedrockStartingHeight
))
fromL1Height
,
toL1Height
:=
new
(
big
.
Int
)
.
Add
(
lastL1BlockNumber
,
bigint
.
One
),
latestL1Header
.
Number
l2BedrockStartingHeight
:=
big
.
NewInt
(
int64
(
b
.
chainConfig
.
L2BedrockStartingHeight
))
batchLog
:=
b
.
log
.
New
(
"epoch_start_number"
,
fromL1Height
,
"epoch_end_number"
,
toL1Height
)
batchLog
.
Info
(
"unobserved epochs"
,
"latest_l1_block_number"
,
fromL1Height
,
"latest_l2_block_number"
,
fromL2Height
)
if
err
:=
b
.
db
.
Transaction
(
func
(
tx
*
database
.
DB
)
error
{
if
err
:=
b
.
db
.
Transaction
(
func
(
tx
*
database
.
DB
)
error
{
l1BridgeLog
:=
b
.
log
.
New
(
"bridge"
,
"l1"
)
l1BedrockStartingHeight
:=
big
.
NewInt
(
int64
(
b
.
chainConfig
.
L1BedrockStartingHeight
))
l2BridgeLog
:=
b
.
log
.
New
(
"bridge"
,
"l2"
)
if
l1BedrockStartingHeight
.
Cmp
(
fromL1Height
)
>
0
{
// OP Mainnet & OP Goerli Only.
// FOR OP-MAINNET, OP-GOERLI ONLY! Specially handle the existence of pre-bedrock blocks
if
l1BedrockStartingHeight
.
Cmp
(
fromL1Height
)
>
0
{
legacyFromL1Height
,
legacyToL1Height
:=
fromL1Height
,
toL1Height
legacyFromL1Height
,
legacyToL1Height
:=
fromL1Height
,
toL1Height
legacyFromL2Height
,
legacyToL2Height
:=
fromL2Height
,
toL2Height
if
l1BedrockStartingHeight
.
Cmp
(
toL1Height
)
<=
0
{
if
l1BedrockStartingHeight
.
Cmp
(
toL1Height
)
<=
0
{
legacyToL1Height
=
new
(
big
.
Int
)
.
Sub
(
l1BedrockStartingHeight
,
bigint
.
One
)
legacyToL1Height
=
new
(
big
.
Int
)
.
Sub
(
l1BedrockStartingHeight
,
bigint
.
One
)
legacyToL2Height
=
new
(
big
.
Int
)
.
Sub
(
l2BedrockStartingHeight
,
bigint
.
One
)
}
}
l1BridgeLog
=
l1BridgeLog
.
New
(
"mode"
,
"legacy"
,
"from_l1_block_number"
,
legacyFromL1Height
,
"to_l1_block_number"
,
legacyToL1Height
)
legacyBridgeLog
:=
l1BridgeLog
.
New
(
"mode"
,
"legacy"
,
"from_block_number"
,
legacyFromL1Height
,
"to_block_number"
,
legacyToL1Height
)
l1BridgeLog
.
Info
(
"scanning for bridge events"
)
legacyBridgeLog
.
Info
(
"scanning for legacy initiated bridge events"
)
if
err
:=
bridge
.
LegacyL1ProcessInitiatedBridgeEvents
(
legacyBridgeLog
,
tx
,
b
.
metrics
,
b
.
chainConfig
.
L1Contracts
,
legacyFromL1Height
,
legacyToL1Height
);
err
!=
nil
{
l2BridgeLog
=
l2BridgeLog
.
New
(
"mode"
,
"legacy"
,
"from_l2_block_number"
,
legacyFromL2Height
,
"to_l2_block_number"
,
legacyToL2Height
)
l2BridgeLog
.
Info
(
"scanning for bridge events"
)
// First, find all possible initiated bridge events
if
err
:=
bridge
.
LegacyL1ProcessInitiatedBridgeEvents
(
l1BridgeLog
,
tx
,
b
.
metrics
,
b
.
chainConfig
.
L1Contracts
,
legacyFromL1Height
,
legacyToL1Height
);
err
!=
nil
{
batchLog
.
Error
(
"failed to index legacy l1 initiated bridge events"
,
"err"
,
err
)
return
err
return
err
}
else
if
legacyToL1Height
.
Cmp
(
toL1Height
)
==
0
{
return
nil
// a-ok! Entire range was legacy blocks
}
}
if
err
:=
bridge
.
LegacyL2ProcessInitiatedBridgeEvents
(
l2BridgeLog
,
tx
,
b
.
metrics
,
b
.
chainConfig
.
L2Contracts
,
legacyFromL2Height
,
legacyToL2Height
);
err
!=
nil
{
legacyBridgeLog
.
Info
(
"detected switch to bedrock"
,
"bedrock_block_number"
,
l1BedrockStartingHeight
)
batchLog
.
Error
(
"failed to index legacy l2 initiated bridge events"
,
"err"
,
err
)
fromL1Height
=
l1BedrockStartingHeight
}
l1BridgeLog
=
l1BridgeLog
.
New
(
"from_block_number"
,
fromL1Height
,
"to_block_number"
,
toL1Height
)
l1BridgeLog
.
Info
(
"scanning for initiated bridge events"
)
return
bridge
.
L1ProcessInitiatedBridgeEvents
(
l1BridgeLog
,
tx
,
b
.
metrics
,
b
.
chainConfig
.
L1Contracts
,
fromL1Height
,
toL1Height
)
});
err
!=
nil
{
return
err
return
err
}
}
// Now that all initiated events have been indexed, it is ensured that all finalization can find their counterpart.
b
.
LastL1Header
=
latestL1Header
if
err
:=
bridge
.
LegacyL1ProcessFinalizedBridgeEvents
(
l1BridgeLog
,
tx
,
b
.
metrics
,
b
.
l1Etl
.
EthClient
,
b
.
chainConfig
.
L1Contracts
,
legacyFromL1Height
,
legacyToL1Height
);
err
!=
nil
{
b
.
metrics
.
RecordL1LatestHeight
(
latestL1Header
.
Number
)
batchLog
.
Error
(
"failed to index legacy l1 finalized bridge events"
,
"err"
,
err
)
return
nil
}
func
(
b
*
BridgeProcessor
)
processInitiatedL2Events
()
error
{
l2BridgeLog
:=
b
.
log
.
New
(
"bridge"
,
"l2"
,
"kind"
,
"initiated"
)
lastL2BlockNumber
:=
bigint
.
Zero
if
b
.
LastL2Header
!=
nil
{
lastL2BlockNumber
=
b
.
LastL2Header
.
Number
}
// Latest unobserved L2 state bounded by `blockLimits` blocks. Since this process is driven by new L2 data,
// we always expect this query to return a new result
latestL2HeaderScope
:=
func
(
db
*
gorm
.
DB
)
*
gorm
.
DB
{
newQuery
:=
db
.
Session
(
&
gorm
.
Session
{
NewDB
:
true
})
// fresh subquery
headers
:=
newQuery
.
Model
(
database
.
L2BlockHeader
{})
.
Where
(
"number > ?"
,
lastL2BlockNumber
)
return
db
.
Where
(
"number = (?)"
,
newQuery
.
Table
(
"(?) as block_numbers"
,
headers
.
Order
(
"number ASC"
)
.
Limit
(
blocksLimit
))
.
Select
(
"MAX(number)"
))
}
latestL2Header
,
err
:=
b
.
db
.
Blocks
.
L2BlockHeaderWithScope
(
latestL2HeaderScope
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to query new L2 state: %w"
,
err
)
}
else
if
latestL2Header
==
nil
{
return
fmt
.
Errorf
(
"no new L2 state found"
)
}
fromL2Height
,
toL2Height
:=
new
(
big
.
Int
)
.
Add
(
lastL2BlockNumber
,
bigint
.
One
),
latestL2Header
.
Number
if
err
:=
b
.
db
.
Transaction
(
func
(
tx
*
database
.
DB
)
error
{
l2BedrockStartingHeight
:=
big
.
NewInt
(
int64
(
b
.
chainConfig
.
L2BedrockStartingHeight
))
if
l2BedrockStartingHeight
.
Cmp
(
fromL2Height
)
>
0
{
// OP Mainnet & OP Goerli Only
legacyFromL2Height
,
legacyToL2Height
:=
fromL2Height
,
toL2Height
if
l2BedrockStartingHeight
.
Cmp
(
toL2Height
)
<=
0
{
legacyToL2Height
=
new
(
big
.
Int
)
.
Sub
(
l2BedrockStartingHeight
,
bigint
.
One
)
}
legacyBridgeLog
:=
l2BridgeLog
.
New
(
"mode"
,
"legacy"
,
"from_block_number"
,
legacyFromL2Height
,
"to_block_number"
,
legacyToL2Height
)
legacyBridgeLog
.
Info
(
"scanning for legacy initiated bridge events"
)
if
err
:=
bridge
.
LegacyL2ProcessInitiatedBridgeEvents
(
legacyBridgeLog
,
tx
,
b
.
metrics
,
b
.
chainConfig
.
L2Contracts
,
legacyFromL2Height
,
legacyToL2Height
);
err
!=
nil
{
return
err
return
err
}
else
if
legacyToL2Height
.
Cmp
(
toL2Height
)
==
0
{
return
nil
// a-ok! Entire range was legacy blocks
}
}
if
err
:=
bridge
.
LegacyL2ProcessFinalizedBridgeEvents
(
l2BridgeLog
,
tx
,
b
.
metrics
,
b
.
chainConfig
.
L2Contracts
,
legacyFromL2Height
,
legacyToL2Height
);
err
!=
nil
{
legacyBridgeLog
.
Info
(
"detected switch to bedrock"
)
batchLog
.
Error
(
"failed to index legacy l2l finalized bridge events"
,
"err"
,
err
)
fromL2Height
=
l2BedrockStartingHeight
}
l2BridgeLog
=
l2BridgeLog
.
New
(
"from_block_number"
,
fromL2Height
,
"to_block_number"
,
toL2Height
)
l2BridgeLog
.
Info
(
"scanning for initiated bridge events"
)
return
bridge
.
L2ProcessInitiatedBridgeEvents
(
l2BridgeLog
,
tx
,
b
.
metrics
,
b
.
chainConfig
.
L2Contracts
,
fromL2Height
,
toL2Height
)
});
err
!=
nil
{
return
err
return
err
}
}
if
legacyToL1Height
.
Cmp
(
toL1Height
)
==
0
{
b
.
LastL2Header
=
latestL2Header
// a-ok! entire batch was legacy blocks
b
.
metrics
.
RecordL2LatestHeight
(
latestL2Header
.
Number
)
return
nil
return
nil
}
}
batchLog
.
Info
(
"detected switch to bedrock"
,
"l1_bedrock_starting_height"
,
l1BedrockStartingHeight
,
"l2_bedrock_starting_height"
,
l2BedrockStartingHeight
)
// Process Finalized Bridge Events
fromL1Height
=
l1BedrockStartingHeight
fromL2Height
=
l2BedrockStartingHeight
func
(
b
*
BridgeProcessor
)
processFinalizedL1Events
()
error
{
l1BridgeLog
:=
b
.
log
.
New
(
"bridge"
,
"l1"
,
"kind"
,
"finalization"
)
lastFinalizedL1BlockNumber
:=
big
.
NewInt
(
int64
(
b
.
chainConfig
.
L1StartingHeight
)
-
1
)
if
b
.
LastFinalizedL1Header
!=
nil
{
lastFinalizedL1BlockNumber
=
b
.
LastFinalizedL1Header
.
Number
}
}
l1BridgeLog
=
l1BridgeLog
.
New
(
"from_l1_block_number"
,
fromL1Height
,
"to_l1_block_number"
,
toL1Height
)
// Latest unfinalized L1 state bounded by `blockLimit` blocks that have had L2 bridge events indexed. Since L1 data
l1BridgeLog
.
Info
(
"scanning for bridge events"
)
// is indexed independently of L2, there may not be new L1 state to finalized
latestL1HeaderScope
:=
func
(
db
*
gorm
.
DB
)
*
gorm
.
DB
{
newQuery
:=
db
.
Session
(
&
gorm
.
Session
{
NewDB
:
true
})
// fresh subquery
headers
:=
newQuery
.
Model
(
database
.
L1BlockHeader
{})
.
Where
(
"number > ? AND timestamp <= ?"
,
lastFinalizedL1BlockNumber
,
b
.
LastL2Header
.
Timestamp
)
return
db
.
Where
(
"number = (?)"
,
newQuery
.
Table
(
"(?) as block_numbers"
,
headers
.
Order
(
"number ASC"
)
.
Limit
(
blocksLimit
))
.
Select
(
"MAX(number)"
))
}
latestL1Header
,
err
:=
b
.
db
.
Blocks
.
L1BlockHeaderWithScope
(
latestL1HeaderScope
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to query for latest unfinalized L1 state: %w"
,
err
)
}
else
if
latestL1Header
==
nil
{
l1BridgeLog
.
Debug
(
"no new l1 state to finalize"
,
"last_finalized_block_number"
,
lastFinalizedL1BlockNumber
)
return
nil
}
l2BridgeLog
=
l2BridgeLog
.
New
(
"from_l2_block_number"
,
fromL2Height
,
"to_l2_block_number"
,
toL2Height
)
fromL1Height
,
toL1Height
:=
new
(
big
.
Int
)
.
Add
(
lastFinalizedL1BlockNumber
,
bigint
.
One
),
latestL1Header
.
Number
l2BridgeLog
.
Info
(
"scanning for bridge events"
)
if
err
:=
b
.
db
.
Transaction
(
func
(
tx
*
database
.
DB
)
error
{
l1BedrockStartingHeight
:=
big
.
NewInt
(
int64
(
b
.
chainConfig
.
L1BedrockStartingHeight
))
if
l1BedrockStartingHeight
.
Cmp
(
fromL1Height
)
>
0
{
legacyFromL1Height
,
legacyToL1Height
:=
fromL1Height
,
toL1Height
if
l1BedrockStartingHeight
.
Cmp
(
toL1Height
)
<=
0
{
legacyToL1Height
=
new
(
big
.
Int
)
.
Sub
(
l1BedrockStartingHeight
,
bigint
.
One
)
}
// First, find all possible initiated bridge events
legacyBridgeLog
:=
l1BridgeLog
.
New
(
"mode"
,
"legacy"
,
"from_block_number"
,
legacyFromL1Height
,
"to_block_number"
,
legacyToL1Height
)
if
err
:=
bridge
.
L1ProcessInitiatedBridgeEvents
(
l1BridgeLog
,
tx
,
b
.
metrics
,
b
.
chainConfig
.
L1Contracts
,
fromL1Height
,
toL1Height
);
err
!=
nil
{
legacyBridgeLog
.
Info
(
"scanning for legacy finalized bridge events"
)
batchLog
.
Error
(
"failed to index l1 initiated bridge events"
,
"err"
,
err
)
if
err
:=
bridge
.
LegacyL1ProcessFinalizedBridgeEvents
(
legacyBridgeLog
,
tx
,
b
.
metrics
,
b
.
l1Etl
.
EthClient
,
b
.
chainConfig
.
L1Contracts
,
legacyFromL1Height
,
legacyToL1Height
);
err
!=
nil
{
return
err
return
err
}
else
if
legacyToL1Height
.
Cmp
(
toL1Height
)
==
0
{
return
nil
// a-ok! Entire range was legacy blocks
}
}
if
err
:=
bridge
.
L2ProcessInitiatedBridgeEvents
(
l2BridgeLog
,
tx
,
b
.
metrics
,
b
.
chainConfig
.
L2Contracts
,
fromL2Height
,
toL2Height
);
err
!=
nil
{
legacyBridgeLog
.
Info
(
"detected switch to bedrock"
)
batchLog
.
Error
(
"failed to index l2 initiated bridge events"
,
"err"
,
err
)
fromL1Height
=
l1BedrockStartingHeight
return
err
}
}
// Now all finalization events can find their counterpart.
l1BridgeLog
=
l1BridgeLog
.
New
(
"from_block_number"
,
fromL1Height
,
"to_block_number"
,
toL1Height
)
if
err
:=
bridge
.
L1ProcessFinalizedBridgeEvents
(
l1BridgeLog
,
tx
,
b
.
metrics
,
b
.
chainConfig
.
L1Contracts
,
fromL1Height
,
toL1Height
);
err
!=
nil
{
l1BridgeLog
.
Info
(
"scanning for finalized bridge events"
)
batchLog
.
Error
(
"failed to index l1 finalized bridge events"
,
"err"
,
err
)
return
bridge
.
L1ProcessFinalizedBridgeEvents
(
l1BridgeLog
,
tx
,
b
.
metrics
,
b
.
chainConfig
.
L1Contracts
,
fromL1Height
,
toL1Height
)
});
err
!=
nil
{
return
err
return
err
}
}
if
err
:=
bridge
.
L2ProcessFinalizedBridgeEvents
(
l2BridgeLog
,
tx
,
b
.
metrics
,
b
.
chainConfig
.
L2Contracts
,
fromL2Height
,
toL2Height
);
err
!=
nil
{
batchLog
.
Error
(
"failed to index l2 finalized bridge events"
,
"err"
,
err
)
b
.
LastFinalizedL1Header
=
latestL1Header
return
err
b
.
metrics
.
RecordL1LatestFinalizedHeight
(
latestL1Header
.
Number
)
return
nil
}
func
(
b
*
BridgeProcessor
)
processFinalizedL2Events
()
error
{
l2BridgeLog
:=
b
.
log
.
New
(
"bridge"
,
"l2"
,
"kind"
,
"finalization"
)
lastFinalizedL2BlockNumber
:=
bigint
.
Zero
if
b
.
LastFinalizedL2Header
!=
nil
{
lastFinalizedL2BlockNumber
=
b
.
LastFinalizedL2Header
.
Number
}
}
// a-ok
// Latest unfinalized L2 state bounded by `blockLimit` blocks that have had L1 bridge events indexed. Since L2 data
// is indexed independently of L1, there may not be new L2 state to finalized
latestL2HeaderScope
:=
func
(
db
*
gorm
.
DB
)
*
gorm
.
DB
{
newQuery
:=
db
.
Session
(
&
gorm
.
Session
{
NewDB
:
true
})
// fresh subquery
headers
:=
newQuery
.
Model
(
database
.
L2BlockHeader
{})
.
Where
(
"number > ? AND timestamp <= ?"
,
lastFinalizedL2BlockNumber
,
b
.
LastL1Header
.
Timestamp
)
return
db
.
Where
(
"number = (?)"
,
newQuery
.
Table
(
"(?) as block_numbers"
,
headers
.
Order
(
"number ASC"
)
.
Limit
(
blocksLimit
))
.
Select
(
"MAX(number)"
))
}
latestL2Header
,
err
:=
b
.
db
.
Blocks
.
L2BlockHeaderWithScope
(
latestL2HeaderScope
)
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to query for latest unfinalized L2 state: %w"
,
err
)
}
else
if
latestL2Header
==
nil
{
l2BridgeLog
.
Debug
(
"no new l2 state to finalize"
,
"last_finalized_block_number"
,
lastFinalizedL2BlockNumber
)
return
nil
return
nil
});
err
!=
nil
{
}
fromL2Height
,
toL2Height
:=
new
(
big
.
Int
)
.
Add
(
lastFinalizedL2BlockNumber
,
bigint
.
One
),
latestL2Header
.
Number
if
err
:=
b
.
db
.
Transaction
(
func
(
tx
*
database
.
DB
)
error
{
l2BedrockStartingHeight
:=
big
.
NewInt
(
int64
(
b
.
chainConfig
.
L2BedrockStartingHeight
))
if
l2BedrockStartingHeight
.
Cmp
(
fromL2Height
)
>
0
{
legacyFromL2Height
,
legacyToL2Height
:=
fromL2Height
,
toL2Height
if
l2BedrockStartingHeight
.
Cmp
(
toL2Height
)
<=
0
{
legacyToL2Height
=
new
(
big
.
Int
)
.
Sub
(
l2BedrockStartingHeight
,
bigint
.
One
)
}
legacyBridgeLog
:=
l2BridgeLog
.
New
(
"mode"
,
"legacy"
,
"from_block_number"
,
legacyFromL2Height
,
"to_block_number"
,
legacyToL2Height
)
legacyBridgeLog
.
Info
(
"scanning for legacy finalized bridge events"
)
if
err
:=
bridge
.
LegacyL2ProcessFinalizedBridgeEvents
(
legacyBridgeLog
,
tx
,
b
.
metrics
,
b
.
chainConfig
.
L2Contracts
,
legacyFromL2Height
,
legacyToL2Height
);
err
!=
nil
{
return
err
return
err
}
else
if
legacyToL2Height
.
Cmp
(
toL2Height
)
==
0
{
return
nil
// a-ok! Entire range was legacy blocks
}
legacyBridgeLog
.
Info
(
"detected switch to bedrock"
,
"bedrock_block_number"
,
l2BedrockStartingHeight
)
fromL2Height
=
l2BedrockStartingHeight
}
}
batchLog
.
Info
(
"indexed bridge events"
,
"latest_l1_block_number"
,
toL1Height
,
"latest_l2_block_number"
,
toL2Height
)
l2BridgeLog
=
l2BridgeLog
.
New
(
"from_block_number"
,
fromL2Height
,
"to_block_number"
,
toL2Height
)
b
.
LatestL1Header
=
latestEpoch
.
L1BlockHeader
.
RLPHeader
.
Header
()
l2BridgeLog
.
Info
(
"scanning for finalized bridge events"
)
b
.
metrics
.
RecordLatestIndexedL1Height
(
b
.
LatestL1Header
.
Number
)
return
bridge
.
L2ProcessFinalizedBridgeEvents
(
l2BridgeLog
,
tx
,
b
.
metrics
,
b
.
chainConfig
.
L2Contracts
,
fromL2Height
,
toL2Height
)
});
err
!=
nil
{
return
err
}
b
.
La
testL2Header
=
latestEpoch
.
L2BlockHeader
.
RLPHeader
.
Header
()
b
.
La
stFinalizedL2Header
=
latestL2Header
b
.
metrics
.
RecordL
atestIndexedL2Height
(
b
.
L
atestL2Header
.
Number
)
b
.
metrics
.
RecordL
2LatestFinalizedHeight
(
l
atestL2Header
.
Number
)
return
nil
return
nil
}
}
indexer/processors/bridge/metrics.go
View file @
1c445ccb
...
@@ -14,7 +14,9 @@ var (
...
@@ -14,7 +14,9 @@ var (
)
)
type
L1Metricer
interface
{
type
L1Metricer
interface
{
RecordLatestIndexedL1Height
(
height
*
big
.
Int
)
RecordL1Interval
()
(
done
func
(
err
error
))
RecordL1LatestHeight
(
height
*
big
.
Int
)
RecordL1LatestFinalizedHeight
(
height
*
big
.
Int
)
RecordL1TransactionDeposits
(
size
int
,
mintedETH
float64
)
RecordL1TransactionDeposits
(
size
int
,
mintedETH
float64
)
RecordL1ProvenWithdrawals
(
size
int
)
RecordL1ProvenWithdrawals
(
size
int
)
...
@@ -28,7 +30,9 @@ type L1Metricer interface {
...
@@ -28,7 +30,9 @@ type L1Metricer interface {
}
}
type
L2Metricer
interface
{
type
L2Metricer
interface
{
RecordLatestIndexedL2Height
(
height
*
big
.
Int
)
RecordL2Interval
()
(
done
func
(
err
error
))
RecordL2LatestHeight
(
height
*
big
.
Int
)
RecordL2LatestFinalizedHeight
(
height
*
big
.
Int
)
RecordL2TransactionWithdrawals
(
size
int
,
withdrawnETH
float64
)
RecordL2TransactionWithdrawals
(
size
int
,
withdrawnETH
float64
)
...
@@ -42,17 +46,14 @@ type L2Metricer interface {
...
@@ -42,17 +46,14 @@ type L2Metricer interface {
type
Metricer
interface
{
type
Metricer
interface
{
L1Metricer
L1Metricer
L2Metricer
L2Metricer
RecordInterval
()
(
done
func
(
err
error
))
}
}
type
bridgeMetrics
struct
{
type
bridgeMetrics
struct
{
intervalTick
prometheus
.
Counter
latestHeight
*
prometheus
.
GaugeVec
intervalDuration
prometheus
.
Histogram
intervalFailures
prometheus
.
Counter
latestL1Height
prometheus
.
Gauge
intervalTick
*
prometheus
.
CounterVec
latestL2Height
prometheus
.
Gauge
intervalDuration
*
prometheus
.
HistogramVec
intervalFailures
*
prometheus
.
CounterVec
txDeposits
prometheus
.
Counter
txDeposits
prometheus
.
Counter
txMintedETH
prometheus
.
Counter
txMintedETH
prometheus
.
Counter
...
@@ -71,32 +72,35 @@ type bridgeMetrics struct {
...
@@ -71,32 +72,35 @@ type bridgeMetrics struct {
func
NewMetrics
(
registry
*
prometheus
.
Registry
)
Metricer
{
func
NewMetrics
(
registry
*
prometheus
.
Registry
)
Metricer
{
factory
:=
metrics
.
With
(
registry
)
factory
:=
metrics
.
With
(
registry
)
return
&
bridgeMetrics
{
return
&
bridgeMetrics
{
intervalTick
:
factory
.
NewCounter
(
prometheus
.
CounterOpts
{
intervalTick
:
factory
.
NewCounter
Vec
(
prometheus
.
CounterOpts
{
Namespace
:
MetricsNamespace
,
Namespace
:
MetricsNamespace
,
Name
:
"intervals_total"
,
Name
:
"intervals_total"
,
Help
:
"number of times processing loop has run"
,
Help
:
"number of times processing loop has run"
,
},
[]
string
{
"chain"
,
}),
}),
intervalDuration
:
factory
.
NewHistogram
(
prometheus
.
HistogramOpts
{
intervalDuration
:
factory
.
NewHistogram
Vec
(
prometheus
.
HistogramOpts
{
Namespace
:
MetricsNamespace
,
Namespace
:
MetricsNamespace
,
Name
:
"interval_seconds"
,
Name
:
"interval_seconds"
,
Help
:
"duration elapsed in the processing loop"
,
Help
:
"duration elapsed in the processing loop"
,
},
[]
string
{
"chain"
,
}),
}),
intervalFailures
:
factory
.
NewCounter
(
prometheus
.
CounterOpts
{
intervalFailures
:
factory
.
NewCounter
Vec
(
prometheus
.
CounterOpts
{
Namespace
:
MetricsNamespace
,
Namespace
:
MetricsNamespace
,
Name
:
"interval_failures_total"
,
Name
:
"interval_failures_total"
,
Help
:
"number of failures encountered"
,
Help
:
"number of failures encountered"
,
},
[]
string
{
"chain"
,
}),
}),
latest
L1Height
:
factory
.
NewGauge
(
prometheus
.
GaugeOpts
{
latest
Height
:
factory
.
NewGaugeVec
(
prometheus
.
GaugeOpts
{
Namespace
:
MetricsNamespace
,
Namespace
:
MetricsNamespace
,
Subsystem
:
"l1"
,
Subsystem
:
"l1"
,
Name
:
"height"
,
Name
:
"height"
,
Help
:
"the latest processed l1 block height"
,
Help
:
"the latest processed l1 block height"
,
}),
},
[]
string
{
latestL2Height
:
factory
.
NewGauge
(
prometheus
.
GaugeOpts
{
"chain"
,
Namespace
:
MetricsNamespace
,
"kind"
,
Subsystem
:
"l2"
,
Name
:
"height"
,
Help
:
"the latest processed l2 block height"
,
}),
}),
txDeposits
:
factory
.
NewCounter
(
prometheus
.
CounterOpts
{
txDeposits
:
factory
.
NewCounter
(
prometheus
.
CounterOpts
{
Namespace
:
MetricsNamespace
,
Namespace
:
MetricsNamespace
,
...
@@ -161,21 +165,25 @@ func NewMetrics(registry *prometheus.Registry) Metricer {
...
@@ -161,21 +165,25 @@ func NewMetrics(registry *prometheus.Registry) Metricer {
}
}
}
}
func
(
m
*
bridgeMetrics
)
RecordInterval
()
func
(
error
)
{
// L1Metricer
m
.
intervalTick
.
Inc
()
timer
:=
prometheus
.
NewTimer
(
m
.
intervalDuration
)
func
(
m
*
bridgeMetrics
)
RecordL1Interval
()
func
(
error
)
{
m
.
intervalTick
.
WithLabelValues
(
"l1"
)
.
Inc
()
timer
:=
prometheus
.
NewTimer
(
m
.
intervalDuration
.
WithLabelValues
(
"l1"
))
return
func
(
err
error
)
{
return
func
(
err
error
)
{
timer
.
ObserveDuration
()
timer
.
ObserveDuration
()
if
err
!=
nil
{
if
err
!=
nil
{
m
.
intervalFailures
.
Inc
()
m
.
intervalFailures
.
WithLabelValues
(
"l1"
)
.
Inc
()
}
}
}
}
}
}
// L1Metricer
func
(
m
*
bridgeMetrics
)
RecordL1LatestHeight
(
height
*
big
.
Int
)
{
m
.
latestHeight
.
WithLabelValues
(
"l1"
,
"initiated"
)
.
Set
(
float64
(
height
.
Uint64
()))
}
func
(
m
*
bridgeMetrics
)
RecordL
atestIndexedL1
Height
(
height
*
big
.
Int
)
{
func
(
m
*
bridgeMetrics
)
RecordL
1LatestFinalized
Height
(
height
*
big
.
Int
)
{
m
.
latest
L1Height
.
Set
(
float64
(
height
.
Uint64
()))
m
.
latest
Height
.
WithLabelValues
(
"l1"
,
"finalized"
)
.
Set
(
float64
(
height
.
Uint64
()))
}
}
func
(
m
*
bridgeMetrics
)
RecordL1TransactionDeposits
(
size
int
,
mintedETH
float64
)
{
func
(
m
*
bridgeMetrics
)
RecordL1TransactionDeposits
(
size
int
,
mintedETH
float64
)
{
...
@@ -209,8 +217,23 @@ func (m *bridgeMetrics) RecordL1FinalizedBridgeTransfers(tokenAddr common.Addres
...
@@ -209,8 +217,23 @@ func (m *bridgeMetrics) RecordL1FinalizedBridgeTransfers(tokenAddr common.Addres
// L2Metricer
// L2Metricer
func
(
m
*
bridgeMetrics
)
RecordLatestIndexedL2Height
(
height
*
big
.
Int
)
{
func
(
m
*
bridgeMetrics
)
RecordL2Interval
()
func
(
error
)
{
m
.
latestL2Height
.
Set
(
float64
(
height
.
Uint64
()))
m
.
intervalTick
.
WithLabelValues
(
"l2"
)
.
Inc
()
timer
:=
prometheus
.
NewTimer
(
m
.
intervalDuration
.
WithLabelValues
(
"l2"
))
return
func
(
err
error
)
{
timer
.
ObserveDuration
()
if
err
!=
nil
{
m
.
intervalFailures
.
WithLabelValues
(
"l2"
)
.
Inc
()
}
}
}
func
(
m
*
bridgeMetrics
)
RecordL2LatestHeight
(
height
*
big
.
Int
)
{
m
.
latestHeight
.
WithLabelValues
(
"l2"
,
"initiated"
)
.
Set
(
float64
(
height
.
Uint64
()))
}
func
(
m
*
bridgeMetrics
)
RecordL2LatestFinalizedHeight
(
height
*
big
.
Int
)
{
m
.
latestHeight
.
WithLabelValues
(
"l2"
,
"finalized"
)
.
Set
(
float64
(
height
.
Uint64
()))
}
}
func
(
m
*
bridgeMetrics
)
RecordL2TransactionWithdrawals
(
size
int
,
withdrawnETH
float64
)
{
func
(
m
*
bridgeMetrics
)
RecordL2TransactionWithdrawals
(
size
int
,
withdrawnETH
float64
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment