1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package etl
import (
"context"
"errors"
"fmt"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// Config carries the tunable settings for an ETL instance.
//
// How each field is consumed is not visible in this file; the per-field notes
// below are inferred from the field names and should be confirmed against the
// code that reads them.
type Config struct {
	// Presumably the polling period in milliseconds, and the maximum number of
	// headers fetched per poll -- TODO confirm.
	LoopIntervalMsec, HeaderBufferSize uint

	// Presumably the block height to start extracting from, and the number of
	// confirmations required before a block is processed -- TODO confirm.
	StartHeight, ConfirmationDepth *big.Int
}
// ETL holds the state shared by the polling loop in Start and the per-batch
// extraction in processBatch.
type ETL struct {
	// log is the base logger; processBatch derives a batch-scoped child from it.
	log log.Logger
	// metrics receives the interval, header-count, latest-height and per-log
	// observations made while running.
	metrics Metricer

	// loopInterval is the period of the ticker that drives Start.
	loopInterval time.Duration
	// headerBufferSize is the value handed to NextFinalizedHeaders on each fetch.
	headerBufferSize uint64
	// headerTraversal supplies the next run of headers to process.
	headerTraversal *node.HeaderTraversal

	// contracts is used as the address filter of the log query.
	contracts []common.Address
	// etlBatches is the channel on which every successfully extracted batch is sent.
	etlBatches chan ETLBatch

	// EthClient is the client used to run the FilterLogs query.
	EthClient node.EthClient
}
// ETLBatch is the result of one successful extraction: the headers of a block
// range together with the logs found in that range.
type ETLBatch struct {
	// Logger is the batch-scoped logger, tagged with the batch's first and last
	// block numbers.
	Logger log.Logger

	// Headers is the header slice the batch was extracted from.
	Headers []types.Header
	// HeaderMap indexes a copy of every header in Headers by its block hash.
	HeaderMap map[common.Hash]*types.Header

	// Logs are the logs returned by the filter query over the batch's block range.
	Logs []types.Log
	// HeadersWithLog holds a true entry for each block hash referenced by at
	// least one entry of Logs.
	HeadersWithLog map[common.Hash]bool
}
// Start runs the extraction loop until ctx is cancelled, then returns nil.
//
// On every tick of the loop interval it either fetches the next run of
// finalized headers or, when the previous batch failed, retries that same
// batch. The pending headers are dropped only after processBatch succeeds.
func (etl *ETL) Start(ctx context.Context) error {
	ticker := time.NewTicker(etl.loopInterval)
	defer ticker.Stop()

	shutdown := ctx.Done()

	// Headers awaiting processing. They outlive a single tick so that a failed
	// batch is retried on the next one instead of being skipped.
	var pending []types.Header

	etl.log.Info("starting etl...")
	for {
		select {
		case <-shutdown:
			etl.log.Info("stopping etl")
			return nil

		case <-ticker.C:
			finishInterval := etl.metrics.RecordInterval()

			if len(pending) == 0 {
				fetched, fetchErr := etl.headerTraversal.NextFinalizedHeaders(etl.headerBufferSize)
				switch {
				case fetchErr != nil:
					etl.log.Error("error querying for headers", "err", fetchErr)
				case len(fetched) == 0:
					etl.log.Warn("no new headers. processor unexpectedly at head...")
				default:
					pending = fetched
					etl.metrics.RecordBatchHeaders(len(fetched))
				}
			} else {
				etl.log.Info("retrying previous batch")
			}

			// An empty pending slice makes processBatch a no-op that returns nil,
			// so a failed or empty fetch simply falls through to the next tick.
			batchErr := etl.processBatch(pending)
			if batchErr == nil {
				pending = nil
			}

			finishInterval(batchErr)
		}
	}
}
// processBatch extracts the logs for the block range spanned by headers and,
// when the result is consistent with that range, sends an ETLBatch on
// etl.etlBatches.
//
// An empty headers slice is a no-op and returns nil. A non-nil error is
// returned, and nothing is sent, when:
//   - the log query fails (the underlying error is wrapped),
//   - the provider reports a ToBlock header whose number or hash differs from
//     the last header of the batch, or
//   - a returned log references a block hash that is not part of the batch.
//
// The Headers field of the emitted batch shares its backing array with the
// headers argument; the entries of HeaderMap are independent copies.
func (etl *ETL) processBatch(headers []types.Header) error {
	if len(headers) == 0 {
		return nil
	}

	// Point at the boundary headers instead of copying them.
	firstHeader, lastHeader := &headers[0], &headers[len(headers)-1]
	batchLog := etl.log.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
	batchLog.Info("extracting batch", "size", len(headers))
	etl.metrics.RecordBatchLatestHeight(lastHeader.Number)

	// Every map entry points at its own copy of the header so that consumers of
	// the map never alias an element of the headers slice.
	headerMap := make(map[common.Hash]*types.Header, len(headers))
	for i := range headers {
		header := headers[i]
		headerMap[header.Hash()] = &header
	}

	filterQuery := ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts}
	logs, err := etl.EthClient.FilterLogs(filterQuery)
	if err != nil {
		batchLog.Error("failed to extract logs", "err", err)
		return fmt.Errorf("filtering logs: %w", err)
	}

	// NOTE(review): logs.ToBlockHeader is dereferenced without a nil check; its
	// type is declared outside this file -- confirm it can never be nil here.
	if logs.ToBlockHeader.Number.Cmp(lastHeader.Number) != 0 {
		// Warn and simply wait for the provider to synchronize state.
		batchLog.Warn("mismatch in FilterLog#ToBlock number", "queried_to_block_number", lastHeader.Number, "reported_to_block_number", logs.ToBlockHeader.Number)
		return fmt.Errorf("mismatch in FilterLog#ToBlock number: queried %v, reported %v", lastHeader.Number, logs.ToBlockHeader.Number)
	}

	// Same height but a different hash: the provider is reporting a different
	// block at that height than the one in this batch.
	queriedHash, reportedHash := lastHeader.Hash(), logs.ToBlockHeader.Hash()
	if reportedHash != queriedHash {
		batchLog.Error("mismatch in FilterLog#ToBlock block hash!!!", "queried_to_block_hash", queriedHash.String(), "reported_to_block_hash", reportedHash.String())
		return fmt.Errorf("mismatch in FilterLog#ToBlock block hash: queried %s, reported %s", queriedHash.String(), reportedHash.String())
	}

	if len(logs.Logs) > 0 {
		batchLog.Info("detected logs", "size", len(logs.Logs))
	}

	headersWithLog := make(map[common.Hash]bool, len(headers))
	for i := range logs.Logs {
		// Index into the slice: avoids copying each log and avoids shadowing
		// the imported log package with a local of the same name.
		entry := &logs.Logs[i]
		if _, ok := headerMap[entry.BlockHash]; !ok {
			// A log from a block outside the batch is an error state unless a
			// header was re-orged out between fetching the headers and fetching
			// the logs. That is unlikely as long as the confirmation depth is
			// set appropriately, or once reorgs are handled natively.
			batchLog.Error("log found with block hash not in the batch", "block_hash", entry.BlockHash, "log_index", entry.Index)
			return errors.New("parsed log with a block hash not in the batch")
		}

		etl.metrics.RecordBatchLog(entry.Address)
		headersWithLog[entry.BlockHash] = true
	}

	// The headers slice is handed over as-is (no copy); the caller drops its own
	// reference once this call returns nil.
	etl.etlBatches <- ETLBatch{Logger: batchLog, Headers: headers, HeaderMap: headerMap, Logs: logs.Logs, HeadersWithLog: headersWithLog}
	return nil
}