Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
66007fb4
Commit
66007fb4
authored
Oct 24, 2023
by
Hamdi Allam
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
indexer FilterLog client-side consistency
parent
a667fdda
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
65 additions
and
37 deletions
+65
-37
etl.go
indexer/etl/etl.go
+19
-7
client.go
indexer/node/client.go
+44
-28
mocks.go
indexer/node/mocks.go
+2
-2
No files found.
indexer/etl/etl.go
View file @
66007fb4
...
@@ -3,6 +3,7 @@ package etl
...
@@ -3,6 +3,7 @@ package etl
import
(
import
(
"context"
"context"
"errors"
"errors"
"fmt"
"math/big"
"math/big"
"time"
"time"
...
@@ -105,22 +106,33 @@ func (etl *ETL) processBatch(headers []types.Header) error {
...
@@ -105,22 +106,33 @@ func (etl *ETL) processBatch(headers []types.Header) error {
}
}
headersWithLog
:=
make
(
map
[
common
.
Hash
]
bool
,
len
(
headers
))
headersWithLog
:=
make
(
map
[
common
.
Hash
]
bool
,
len
(
headers
))
logs
,
err
:=
etl
.
EthClient
.
FilterLogs
(
ethereum
.
FilterQuery
{
FromBlock
:
firstHeader
.
Number
,
ToBlock
:
lastHeader
.
Number
,
Addresses
:
etl
.
contracts
})
filterQuery
:=
ethereum
.
FilterQuery
{
FromBlock
:
firstHeader
.
Number
,
ToBlock
:
lastHeader
.
Number
,
Addresses
:
etl
.
contracts
}
logs
,
err
:=
etl
.
EthClient
.
FilterLogs
(
filterQuery
)
if
err
!=
nil
{
if
err
!=
nil
{
batchLog
.
Info
(
"failed to extract logs"
,
"err"
,
err
)
batchLog
.
Info
(
"failed to extract logs"
,
"err"
,
err
)
return
err
return
err
}
}
if
len
(
logs
)
>
0
{
batchLog
.
Info
(
"detected logs"
,
"size"
,
len
(
logs
))
if
logs
.
ToBlockHeader
.
Number
.
Cmp
(
lastHeader
.
Number
)
!=
0
{
// Warn and simply wait for the provider to synchronize state
batchLog
.
Warn
(
"mismatch in FilterLog#ToBlock number"
,
"queried_to_block_number"
,
lastHeader
.
Number
,
"reported_to_block_number"
,
logs
.
ToBlockHeader
.
Number
)
return
fmt
.
Errorf
(
"mismatch in FilterLog#ToBlock number"
)
}
else
if
logs
.
ToBlockHeader
.
Hash
()
!=
lastHeader
.
Hash
()
{
batchLog
.
Error
(
"mismatch in FitlerLog#ToBlock block hash!!!"
,
"queried_to_block_hash"
,
lastHeader
.
Hash
()
.
String
(),
"reported_to_block_hash"
,
logs
.
ToBlockHeader
.
Hash
()
.
String
())
return
fmt
.
Errorf
(
"mismatch in FitlerLog#ToBlock block hash!!!"
)
}
if
len
(
logs
.
Logs
)
>
0
{
batchLog
.
Info
(
"detected logs"
,
"size"
,
len
(
logs
.
Logs
))
}
}
for
i
:=
range
logs
{
for
i
:=
range
logs
.
Logs
{
log
:=
logs
[
i
]
log
:=
logs
.
Logs
[
i
]
if
_
,
ok
:=
headerMap
[
log
.
BlockHash
];
!
ok
{
if
_
,
ok
:=
headerMap
[
log
.
BlockHash
];
!
ok
{
// NOTE. Definitely an error state if the none of the headers were re-orged out in between
// NOTE. Definitely an error state if the none of the headers were re-orged out in between
// the blocks and logs retrieval operations. Unlikely as long as the confirmation depth has
// the blocks and logs retrieval operations. Unlikely as long as the confirmation depth has
// been appropriately set or when we get to natively handling reorgs.
// been appropriately set or when we get to natively handling reorgs.
batchLog
.
Error
(
"log found with block hash not in the batch"
,
"block_hash"
,
logs
[
i
]
.
BlockHash
,
"log_index"
,
l
ogs
[
i
]
.
Index
)
batchLog
.
Error
(
"log found with block hash not in the batch"
,
"block_hash"
,
logs
.
Logs
[
i
]
.
BlockHash
,
"log_index"
,
logs
.
L
ogs
[
i
]
.
Index
)
return
errors
.
New
(
"parsed log with a block hash not in the batch"
)
return
errors
.
New
(
"parsed log with a block hash not in the batch"
)
}
}
...
@@ -130,6 +142,6 @@ func (etl *ETL) processBatch(headers []types.Header) error {
...
@@ -130,6 +142,6 @@ func (etl *ETL) processBatch(headers []types.Header) error {
// ensure we use unique downstream references for the etl batch
// ensure we use unique downstream references for the etl batch
headersRef
:=
headers
headersRef
:=
headers
etl
.
etlBatches
<-
ETLBatch
{
Logger
:
batchLog
,
Headers
:
headersRef
,
HeaderMap
:
headerMap
,
Logs
:
logs
,
HeadersWithLog
:
headersWithLog
}
etl
.
etlBatches
<-
ETLBatch
{
Logger
:
batchLog
,
Headers
:
headersRef
,
HeaderMap
:
headerMap
,
Logs
:
logs
.
Logs
,
HeadersWithLog
:
headersWithLog
}
return
nil
return
nil
}
}
indexer/node/client.go
View file @
66007fb4
...
@@ -39,7 +39,7 @@ type EthClient interface {
...
@@ -39,7 +39,7 @@ type EthClient interface {
TxByHash
(
common
.
Hash
)
(
*
types
.
Transaction
,
error
)
TxByHash
(
common
.
Hash
)
(
*
types
.
Transaction
,
error
)
StorageHash
(
common
.
Address
,
*
big
.
Int
)
(
common
.
Hash
,
error
)
StorageHash
(
common
.
Address
,
*
big
.
Int
)
(
common
.
Hash
,
error
)
FilterLogs
(
ethereum
.
FilterQuery
)
(
[]
types
.
Log
,
error
)
FilterLogs
(
ethereum
.
FilterQuery
)
(
Logs
,
error
)
}
}
type
clnt
struct
{
type
clnt
struct
{
...
@@ -122,15 +122,12 @@ func (c *clnt) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Hea
...
@@ -122,15 +122,12 @@ func (c *clnt) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Hea
}
}
count
:=
new
(
big
.
Int
)
.
Sub
(
endHeight
,
startHeight
)
.
Uint64
()
+
1
count
:=
new
(
big
.
Int
)
.
Sub
(
endHeight
,
startHeight
)
.
Uint64
()
+
1
headers
:=
make
([]
types
.
Header
,
count
)
batchElems
:=
make
([]
rpc
.
BatchElem
,
count
)
batchElems
:=
make
([]
rpc
.
BatchElem
,
count
)
for
i
:=
uint64
(
0
);
i
<
count
;
i
++
{
for
i
:=
uint64
(
0
);
i
<
count
;
i
++
{
height
:=
new
(
big
.
Int
)
.
Add
(
startHeight
,
new
(
big
.
Int
)
.
SetUint64
(
i
))
height
:=
new
(
big
.
Int
)
.
Add
(
startHeight
,
new
(
big
.
Int
)
.
SetUint64
(
i
))
batchElems
[
i
]
=
rpc
.
BatchElem
{
batchElems
[
i
]
=
rpc
.
BatchElem
{
Method
:
"eth_getBlockByNumber"
,
Args
:
[]
interface
{}{
toBlockNumArg
(
height
),
false
},
Result
:
&
headers
[
i
]}
Method
:
"eth_getBlockByNumber"
,
Args
:
[]
interface
{}{
toBlockNumArg
(
height
),
false
},
Result
:
new
(
types
.
Header
),
Error
:
nil
,
}
}
}
ctxwt
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
defaultRequestTimeout
)
ctxwt
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
defaultRequestTimeout
)
...
@@ -144,23 +141,21 @@ func (c *clnt) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Hea
...
@@ -144,23 +141,21 @@ func (c *clnt) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Hea
// - Ensure integrity that they build on top of each other
// - Ensure integrity that they build on top of each other
// - Truncate out headers that do not exist (endHeight > "latest")
// - Truncate out headers that do not exist (endHeight > "latest")
size
:=
0
size
:=
0
headers
:=
make
([]
types
.
Header
,
count
)
for
i
,
batchElem
:=
range
batchElems
{
for
i
,
batchElem
:=
range
batchElems
{
if
batchElem
.
Error
!=
nil
{
if
batchElem
.
Error
!=
nil
{
return
nil
,
batchElem
.
Error
if
size
==
0
{
return
nil
,
batchElem
.
Error
}
else
{
break
// try return whatever headers are available
}
}
else
if
batchElem
.
Result
==
nil
{
}
else
if
batchElem
.
Result
==
nil
{
break
break
}
}
header
,
ok
:=
batchElem
.
Result
.
(
*
types
.
Header
)
if
i
>
0
&&
headers
[
i
]
.
ParentHash
!=
headers
[
i
-
1
]
.
Hash
()
{
if
!
ok
{
return
nil
,
fmt
.
Errorf
(
"queried header %s does not follow parent %s"
,
headers
[
i
]
.
Hash
(),
headers
[
i
-
1
]
.
Hash
())
return
nil
,
fmt
.
Errorf
(
"unable to transform rpc response %v into types.Header"
,
batchElem
.
Result
)
}
if
i
>
0
&&
header
.
ParentHash
!=
headers
[
i
-
1
]
.
Hash
()
{
return
nil
,
fmt
.
Errorf
(
"queried header %s does not follow parent %s"
,
header
.
Hash
(),
headers
[
i
-
1
]
.
Hash
())
}
}
headers
[
i
]
=
*
header
size
=
size
+
1
size
=
size
+
1
}
}
...
@@ -197,19 +192,43 @@ func (c *clnt) StorageHash(address common.Address, blockNumber *big.Int) (common
...
@@ -197,19 +192,43 @@ func (c *clnt) StorageHash(address common.Address, blockNumber *big.Int) (common
return
proof
.
StorageHash
,
nil
return
proof
.
StorageHash
,
nil
}
}
// FilterLogs returns logs that fit the query parameters
type
Logs
struct
{
func
(
c
*
clnt
)
FilterLogs
(
query
ethereum
.
FilterQuery
)
([]
types
.
Log
,
error
)
{
Logs
[]
types
.
Log
ctxwt
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
defaultRequestTimeout
)
ToBlockHeader
*
types
.
Header
defer
cancel
()
}
var
result
[]
types
.
Log
// FilterLogs returns logs that fit the query parameters. The underlying request is a batch
// request including `eth_getBlockHeaderByNumber` to allow the caller to check that connected
// node has the state necessary to fulfill this request
func
(
c
*
clnt
)
FilterLogs
(
query
ethereum
.
FilterQuery
)
(
Logs
,
error
)
{
arg
,
err
:=
toFilterArg
(
query
)
arg
,
err
:=
toFilterArg
(
query
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
Logs
{}
,
err
}
}
err
=
c
.
rpc
.
CallContext
(
ctxwt
,
&
result
,
"eth_getLogs"
,
arg
)
var
logs
[]
types
.
Log
return
result
,
err
var
header
types
.
Header
batchElems
:=
make
([]
rpc
.
BatchElem
,
2
)
batchElems
[
0
]
=
rpc
.
BatchElem
{
Method
:
"eth_getBlockByNumber"
,
Args
:
[]
interface
{}{
toBlockNumArg
(
query
.
ToBlock
),
false
},
Result
:
&
header
}
batchElems
[
1
]
=
rpc
.
BatchElem
{
Method
:
"eth_getLogs"
,
Args
:
[]
interface
{}{
arg
},
Result
:
&
logs
}
ctxwt
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
defaultRequestTimeout
)
defer
cancel
()
err
=
c
.
rpc
.
BatchCallContext
(
ctxwt
,
batchElems
)
if
err
!=
nil
{
return
Logs
{},
err
}
if
batchElems
[
0
]
.
Error
!=
nil
{
return
Logs
{},
fmt
.
Errorf
(
"unable to query for the `FilterQuery#ToBlock` header: %w"
,
batchElems
[
0
]
.
Error
)
}
if
batchElems
[
1
]
.
Error
!=
nil
{
return
Logs
{},
fmt
.
Errorf
(
"unable to query logs: %w"
,
batchElems
[
1
]
.
Error
)
}
return
Logs
{
Logs
:
logs
,
ToBlockHeader
:
&
header
},
nil
}
}
// Modeled off op-service/client.go. We can refactor this once the client/metrics portion
// Modeled off op-service/client.go. We can refactor this once the client/metrics portion
...
@@ -262,10 +281,7 @@ func toBlockNumArg(number *big.Int) string {
...
@@ -262,10 +281,7 @@ func toBlockNumArg(number *big.Int) string {
}
}
func
toFilterArg
(
q
ethereum
.
FilterQuery
)
(
interface
{},
error
)
{
func
toFilterArg
(
q
ethereum
.
FilterQuery
)
(
interface
{},
error
)
{
arg
:=
map
[
string
]
interface
{}{
arg
:=
map
[
string
]
interface
{}{
"address"
:
q
.
Addresses
,
"topics"
:
q
.
Topics
}
"address"
:
q
.
Addresses
,
"topics"
:
q
.
Topics
,
}
if
q
.
BlockHash
!=
nil
{
if
q
.
BlockHash
!=
nil
{
arg
[
"blockHash"
]
=
*
q
.
BlockHash
arg
[
"blockHash"
]
=
*
q
.
BlockHash
if
q
.
FromBlock
!=
nil
||
q
.
ToBlock
!=
nil
{
if
q
.
FromBlock
!=
nil
||
q
.
ToBlock
!=
nil
{
...
...
indexer/node/mocks.go
View file @
66007fb4
...
@@ -41,7 +41,7 @@ func (m *MockEthClient) StorageHash(address common.Address, blockNumber *big.Int
...
@@ -41,7 +41,7 @@ func (m *MockEthClient) StorageHash(address common.Address, blockNumber *big.Int
return
args
.
Get
(
0
)
.
(
common
.
Hash
),
args
.
Error
(
1
)
return
args
.
Get
(
0
)
.
(
common
.
Hash
),
args
.
Error
(
1
)
}
}
func
(
m
*
MockEthClient
)
FilterLogs
(
query
ethereum
.
FilterQuery
)
(
[]
types
.
Log
,
error
)
{
func
(
m
*
MockEthClient
)
FilterLogs
(
query
ethereum
.
FilterQuery
)
(
Logs
,
error
)
{
args
:=
m
.
Called
(
query
)
args
:=
m
.
Called
(
query
)
return
args
.
Get
(
0
)
.
(
[]
types
.
Log
),
args
.
Error
(
1
)
return
args
.
Get
(
0
)
.
(
Logs
),
args
.
Error
(
1
)
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment