Commit 8766f3ec authored by protolambda

Merge branch 'develop' into tip/allow-overlapping-span-batch

parents 81cbf88e a281abba
......@@ -17,6 +17,3 @@
path = packages/contracts-bedrock/lib/safe-contracts
url = https://github.com/safe-global/safe-contracts
branch = v1.4.0
......@@ -3,6 +3,9 @@ ITESTS_L2_HOST=http://localhost:9545
BEDROCK_TAGS_REMOTE?=origin
OP_STACK_GO_BUILDER?=us-docker.pkg.dev/oplabs-tools-artifacts/images/op-stack-go:latest
# Requires at least Python v3.9; specify a minor version below if needed
PYTHON?=python3
build: build-go build-ts
.PHONY: build
......@@ -28,7 +31,7 @@ golang-docker:
# We don't use a buildx builder here, and just load directly into regular docker, for convenience.
GIT_COMMIT=$$(git rev-parse HEAD) \
GIT_DATE=$$(git show -s --format='%ct') \
IMAGE_TAGS=$$GIT_COMMIT,latest \
IMAGE_TAGS=$$(git rev-parse HEAD),latest \
docker buildx bake \
--progress plain \
--load \
......@@ -114,14 +117,14 @@ pre-devnet:
devnet-up: pre-devnet
./ops/scripts/newer-file.sh .devnet/allocs-l1.json ./packages/contracts-bedrock \
|| make devnet-allocs
PYTHONPATH=./bedrock-devnet python3 ./bedrock-devnet/main.py --monorepo-dir=.
PYTHONPATH=./bedrock-devnet $(PYTHON) ./bedrock-devnet/main.py --monorepo-dir=.
.PHONY: devnet-up
# alias for devnet-up
devnet-up-deploy: devnet-up
devnet-test: pre-devnet
PYTHONPATH=./bedrock-devnet python3 ./bedrock-devnet/main.py --monorepo-dir=. --test
PYTHONPATH=./bedrock-devnet $(PYTHON) ./bedrock-devnet/main.py --monorepo-dir=. --test
.PHONY: devnet-test
devnet-down:
......@@ -137,7 +140,7 @@ devnet-clean:
.PHONY: devnet-clean
devnet-allocs: pre-devnet
PYTHONPATH=./bedrock-devnet python3 ./bedrock-devnet/main.py --monorepo-dir=. --allocs
PYTHONPATH=./bedrock-devnet $(PYTHON) ./bedrock-devnet/main.py --monorepo-dir=. --allocs
devnet-logs:
@(cd ./ops-bedrock && docker compose logs -f)
......@@ -186,4 +189,3 @@ install-geth:
go install -v github.com/ethereum/go-ethereum/cmd/geth@$(shell cat .gethrc); \
echo "Installed geth!"; true)
.PHONY: install-geth
......@@ -202,13 +202,12 @@ def devnet_deploy(paths):
# If someone reads this comment and understands why this is being done, please
# update this comment to explain.
init_devnet_l1_deploy_config(paths, update_timestamp=True)
outfile_l1 = pjoin(paths.devnet_dir, 'genesis-l1.json')
run_command([
'go', 'run', 'cmd/main.go', 'genesis', 'l1',
'--deploy-config', paths.devnet_config_path,
'--l1-allocs', paths.allocs_path,
'--l1-deployments', paths.addresses_json_path,
'--outfile.l1', outfile_l1,
'--outfile.l1', paths.genesis_l1_path,
], cwd=paths.op_node_dir)
log.info('Starting L1.')
......@@ -227,8 +226,8 @@ def devnet_deploy(paths):
'--l1-rpc', 'http://localhost:8545',
'--deploy-config', paths.devnet_config_path,
'--deployment-dir', paths.deployment_dir,
'--outfile.l2', pjoin(paths.devnet_dir, 'genesis-l2.json'),
'--outfile.rollup', pjoin(paths.devnet_dir, 'rollup.json')
'--outfile.l2', paths.genesis_l2_path,
'--outfile.rollup', paths.rollup_config_path
], cwd=paths.op_node_dir)
rollup_config = read_json(paths.rollup_config_path)
......@@ -288,21 +287,23 @@ def debug_dumpBlock(url):
def wait_for_rpc_server(url):
log.info(f'Waiting for RPC server at {url}')
conn = http.client.HTTPConnection(url)
headers = {'Content-type': 'application/json'}
body = '{"id":1, "jsonrpc":"2.0", "method": "eth_chainId", "params":[]}'
while True:
try:
conn = http.client.HTTPConnection(url)
conn.request('POST', '/', body, headers)
response = conn.getresponse()
conn.close()
if response.status < 300:
log.info(f'RPC server at {url} ready')
return
except Exception as e:
log.info(f'Waiting for RPC server at {url}')
time.sleep(1)
finally:
if conn:
conn.close()
CommandPreset = namedtuple('Command', ['name', 'args', 'cwd', 'timeout'])
......
comment: false
codecov:
require_ci_to_pass: false
comment:
layout: "diff, flags, files"
behavior: default
require_changes: true
flags:
- contracts-bedrock-tests
ignore:
- "op-e2e"
- "**/*.t.sol"
- "op-bindings/bindings/*.go"
- "**/*.t.sol"
- "packages/contracts-bedrock/test/**/*.sol"
- "packages/contracts-bedrock/scripts/**/*.sol"
- "packages/contracts-bedrock/contracts/vendor/WETH9.sol"
- 'packages/contracts-bedrock/contracts/EAS/**/*.sol'
coverage:
......@@ -13,6 +23,7 @@ coverage:
threshold: 0% # coverage is not allowed to reduce vs. the PR base
base: auto
informational: true
enabled: true
project:
default:
informational: true
......
......@@ -158,7 +158,7 @@ target "ufm-metamask" {
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/ufm-metamask:${tag}"]
}
type "chain-mon" {
target "chain-mon" {
dockerfile = "./ops/docker/Dockerfile.packages"
context = "."
args = {
......@@ -173,9 +173,9 @@ type "chain-mon" {
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/chain-mon:${tag}"]
}
type "ci-builder" {
dockerfile = "Dockerfile"
context = "ops/docker/ci-builder"
target "ci-builder" {
dockerfile = "./ops/docker/ci-builder/Dockerfile"
context = "."
platforms = split(",", PLATFORMS)
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/ci-builder:${tag}"]
}
......
......@@ -8,7 +8,7 @@ require (
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0
github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3
github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20231018202221-fdba3d104171
github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20231030223232-e16eae11e492
github.com/ethereum/go-ethereum v1.13.1
github.com/fsnotify/fsnotify v1.7.0
github.com/go-chi/chi/v5 v5.0.10
......@@ -16,35 +16,35 @@ require (
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb
github.com/google/go-cmp v0.6.0
github.com/google/gofuzz v1.2.1-0.20220503160820-4a35382e8fc8
github.com/google/uuid v1.3.1
github.com/google/uuid v1.4.0
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/holiman/uint256 v1.2.3
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/jackc/pgtype v1.14.0
github.com/jackc/pgx/v5 v5.4.3
github.com/libp2p/go-libp2p v0.31.0
github.com/jackc/pgx/v5 v5.5.0
github.com/libp2p/go-libp2p v0.32.0
github.com/libp2p/go-libp2p-mplex v0.9.0
github.com/libp2p/go-libp2p-pubsub v0.9.3
github.com/libp2p/go-libp2p-pubsub v0.10.0
github.com/libp2p/go-libp2p-testing v0.12.0
github.com/mattn/go-isatty v0.0.20
github.com/multiformats/go-base32 v0.1.0
github.com/multiformats/go-multiaddr v0.12.0
github.com/multiformats/go-multiaddr-dns v0.3.1
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/gomega v1.28.1
github.com/onsi/gomega v1.29.0
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.7.0
github.com/prometheus/client_golang v1.17.0
github.com/stretchr/testify v1.8.4
github.com/urfave/cli/v2 v2.25.7
golang.org/x/crypto v0.14.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/sync v0.4.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/sync v0.5.0
golang.org/x/term v0.13.0
golang.org/x/time v0.3.0
gorm.io/driver/postgres v1.5.3
golang.org/x/time v0.4.0
gorm.io/driver/postgres v1.5.4
gorm.io/gorm v1.25.5
)
......@@ -98,10 +98,9 @@ require (
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20230817174616-7a8ec2ada47b // indirect
github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/graph-gophers/graphql-go v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
......@@ -118,6 +117,7 @@ require (
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
......@@ -125,7 +125,7 @@ require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/karalabe/usb v0.0.3-0.20230711191512-61db3e06439c // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/kr/pretty v0.3.1 // indirect
......@@ -145,7 +145,7 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.55 // indirect
github.com/miekg/dns v1.1.56 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
......@@ -158,7 +158,7 @@ require (
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/multiformats/go-multistream v0.5.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/naoina/go-stringutil v0.1.0 // indirect
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 // indirect
......@@ -172,9 +172,9 @@ require (
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-20 v0.3.3 // indirect
github.com/quic-go/quic-go v0.38.1 // indirect
github.com/quic-go/webtransport-go v0.5.3 // indirect
github.com/quic-go/qtls-go1-20 v0.3.4 // indirect
github.com/quic-go/quic-go v0.39.3 // indirect
github.com/quic-go/webtransport-go v0.6.0 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/rivo/uniseg v0.4.3 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
......@@ -192,15 +192,16 @@ require (
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/automaxprocs v1.5.2 // indirect
go.uber.org/dig v1.17.0 // indirect
go.uber.org/fx v1.20.0 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/fx v1.20.1 // indirect
go.uber.org/mock v0.3.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.25.0 // indirect
golang.org/x/mod v0.12.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.13.0 // indirect
golang.org/x/tools v0.14.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
......@@ -209,7 +210,7 @@ require (
rsc.io/tmplfunc v0.0.3 // indirect
)
replace github.com/ethereum/go-ethereum v1.13.1 => github.com/ethereum-optimism/op-geth v1.101303.0-rc.2.0.20231024150425-5023660bf92d
replace github.com/ethereum/go-ethereum v1.13.1 => github.com/ethereum-optimism/op-geth v1.101304.0-rc.2.0.20231030225546-cd491fa3b588
//replace github.com/ethereum-optimism/superchain-registry/superchain => ../superchain-registry/superchain
//replace github.com/ethereum/go-ethereum v1.13.1 => ../go-ethereum
......@@ -34,12 +34,12 @@ export interface WithdrawalItem {
from: string;
to: string;
transactionHash: string;
messageHash: string;
crossDomainMessageHash: string;
timestamp: number /* uint64 */;
l2BlockHash: string;
amount: string;
proofTransactionHash: string;
claimTransactionHash: string;
l1ProvenTxHash: string;
l1FinalizedTxHash: string;
l1TokenAddress: string;
l2TokenAddress: string;
}
......
......@@ -6,33 +6,25 @@ import (
"fmt"
"net"
"net/http"
"runtime/debug"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/prometheus/client_golang/prometheus"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/api/routes"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/log"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/prometheus/client_golang/prometheus"
)
const ethereumAddressRegex = `^0x[a-fA-F0-9]{40}$`
// Api ... Indexer API struct
// TODO : Structured error responses
type API struct {
log log.Logger
router *chi.Mux
serverConfig config.ServerConfig
metricsConfig config.ServerConfig
metricsRegistry *prometheus.Registry
}
const (
MetricsNamespace = "op_indexer_api"
addressParam = "{address:%s}"
......@@ -45,6 +37,23 @@ const (
WithdrawalsPath = "/api/v0/withdrawals/"
)
// APIService ... Indexer API service
// TODO : Structured error responses
type APIService struct {
log log.Logger
router *chi.Mux
bv database.BridgeTransfersView
dbClose func() error
metricsRegistry *prometheus.Registry
apiServer *httputil.HTTPServer
metricsServer *httputil.HTTPServer
stopped atomic.Bool
}
// chiMetricsMiddleware ... Injects a metrics recorder into request processing middleware
func chiMetricsMiddleware(rec metrics.HTTPRecorder) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
......@@ -53,112 +62,117 @@ func chiMetricsMiddleware(rec metrics.HTTPRecorder) func(http.Handler) http.Hand
}
// NewApi ... Construct a new api instance
func NewApi(logger log.Logger, bv database.BridgeTransfersView, serverConfig config.ServerConfig, metricsConfig config.ServerConfig) *API {
// (1) Initialize dependencies
apiRouter := chi.NewRouter()
h := routes.NewRoutes(logger, bv, apiRouter)
mr := metrics.NewRegistry()
promRecorder := metrics.NewPromHTTPRecorder(mr, MetricsNamespace)
// (2) Inject routing middleware
apiRouter.Use(chiMetricsMiddleware(promRecorder))
apiRouter.Use(middleware.Recoverer)
apiRouter.Use(middleware.Heartbeat(HealthPath))
func NewApi(ctx context.Context, log log.Logger, cfg *Config) (*APIService, error) {
out := &APIService{log: log, metricsRegistry: metrics.NewRegistry()}
if err := out.initFromConfig(ctx, cfg); err != nil {
return nil, errors.Join(err, out.Stop(ctx)) // close any resources we may have opened already
}
return out, nil
}
// (3) Set GET routes
apiRouter.Get(fmt.Sprintf(DepositsPath+addressParam, ethereumAddressRegex), h.L1DepositsHandler)
apiRouter.Get(fmt.Sprintf(WithdrawalsPath+addressParam, ethereumAddressRegex), h.L2WithdrawalsHandler)
func (a *APIService) initFromConfig(ctx context.Context, cfg *Config) error {
if err := a.initDB(ctx, cfg.DB); err != nil {
return fmt.Errorf("failed to init DB: %w", err)
}
if err := a.startMetricsServer(cfg.MetricsServer); err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
a.initRouter(cfg.HTTPServer)
if err := a.startServer(cfg.HTTPServer); err != nil {
return fmt.Errorf("failed to start API server: %w", err)
}
return nil
}
return &API{log: logger, router: apiRouter, metricsRegistry: mr, serverConfig: serverConfig, metricsConfig: metricsConfig}
func (a *APIService) Start(ctx context.Context) error {
// Completed all setup jobs at init-time already,
// and the API service does not have any other special starting routines or background jobs to start.
return nil
}
// Run ... Runs the API server routines
func (a *API) Run(ctx context.Context) error {
var wg sync.WaitGroup
errCh := make(chan error, 2)
// (1) Construct an inner function that will start a goroutine
// and handle any panics that occur on a shared error channel
processCtx, processCancel := context.WithCancel(ctx)
runProcess := func(start func(ctx context.Context) error) {
wg.Add(1)
go func() {
defer func() {
if err := recover(); err != nil {
a.log.Error("halting api on panic", "err", err)
debug.PrintStack()
errCh <- fmt.Errorf("panic: %v", err)
}
processCancel()
wg.Done()
}()
errCh <- start(processCtx)
}()
func (a *APIService) Stop(ctx context.Context) error {
var result error
if a.apiServer != nil {
if err := a.apiServer.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop API server: %w", err))
}
}
if a.metricsServer != nil {
if err := a.metricsServer.Stop(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop metrics server: %w", err))
}
}
if a.dbClose != nil {
if err := a.dbClose(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to close DB: %w", err))
}
}
a.stopped.Store(true)
a.log.Info("API service shutdown complete")
return result
}
// (2) Start the API and metrics servers
runProcess(a.startServer)
runProcess(a.startMetricsServer)
func (a *APIService) Stopped() bool {
return a.stopped.Load()
}
// (3) Wait for all processes to complete
wg.Wait()
// Addr ... returns the address that the HTTP server is listening on (excl. http:// prefix, just the host and port)
func (a *APIService) Addr() string {
if a.apiServer == nil {
return ""
}
return a.apiServer.Addr().String()
}
err := <-errCh
func (a *APIService) initDB(ctx context.Context, connector DBConnector) error {
db, err := connector.OpenDB(ctx, a.log)
if err != nil {
a.log.Error("api stopped", "err", err)
} else {
a.log.Info("api stopped")
return fmt.Errorf("failed to connect to databse: %w", err)
}
return err
a.dbClose = db.Closer
a.bv = db.BridgeTransfers
return nil
}
// Port ... Returns the port the server is listening on
func (a *API) Port() int {
return a.serverConfig.Port
func (a *APIService) initRouter(apiConfig config.ServerConfig) {
apiRouter := chi.NewRouter()
h := routes.NewRoutes(a.log, a.bv, apiRouter)
promRecorder := metrics.NewPromHTTPRecorder(a.metricsRegistry, MetricsNamespace)
// (2) Inject routing middleware
apiRouter.Use(chiMetricsMiddleware(promRecorder))
apiRouter.Use(middleware.Timeout(time.Duration(apiConfig.WriteTimeout) * time.Second))
apiRouter.Use(middleware.Recoverer)
apiRouter.Use(middleware.Heartbeat(HealthPath))
// (3) Set GET routes
apiRouter.Get(fmt.Sprintf(DepositsPath+addressParam, ethereumAddressRegex), h.L1DepositsHandler)
apiRouter.Get(fmt.Sprintf(WithdrawalsPath+addressParam, ethereumAddressRegex), h.L2WithdrawalsHandler)
a.router = apiRouter
}
// startServer ... Starts the API server
func (a *API) startServer(ctx context.Context) error {
a.log.Debug("api server listening...", "port", a.serverConfig.Port)
addr := net.JoinHostPort(a.serverConfig.Host, strconv.Itoa(a.serverConfig.Port))
func (a *APIService) startServer(serverConfig config.ServerConfig) error {
a.log.Debug("API server listening...", "port", serverConfig.Port)
addr := net.JoinHostPort(serverConfig.Host, strconv.Itoa(serverConfig.Port))
srv, err := httputil.StartHTTPServer(addr, a.router)
if err != nil {
return fmt.Errorf("failed to start API server: %w", err)
}
host, portStr, err := net.SplitHostPort(srv.Addr().String())
if err != nil {
return errors.Join(err, srv.Close())
}
port, err := strconv.Atoi(portStr)
if err != nil {
return errors.Join(err, srv.Close())
}
// Update the port in the config in case the OS chose a different port
// than the one we requested (e.g. using port 0 to fetch a random open port)
a.serverConfig.Host = host
a.serverConfig.Port = port
<-ctx.Done()
if err := srv.Stop(context.Background()); err != nil {
return fmt.Errorf("failed to shutdown api server: %w", err)
}
a.log.Info("API server started", "addr", srv.Addr().String())
a.apiServer = srv
return nil
}
// startMetricsServer ... Starts the metrics server
func (a *API) startMetricsServer(ctx context.Context) error {
a.log.Debug("starting metrics server...", "port", a.metricsConfig.Port)
srv, err := metrics.StartServer(a.metricsRegistry, a.metricsConfig.Host, a.metricsConfig.Port)
func (a *APIService) startMetricsServer(metricsConfig config.ServerConfig) error {
a.log.Debug("starting metrics server...", "port", metricsConfig.Port)
srv, err := metrics.StartServer(a.metricsRegistry, metricsConfig.Host, metricsConfig.Port)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
<-ctx.Done()
defer a.log.Info("metrics server stopped")
return srv.Stop(context.Background())
a.log.Info("Metrics server started", "addr", srv.Addr().String())
a.metricsServer = srv
return nil
}
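For orientation, a minimal sketch (not part of this diff) of how a caller drives the new APIService lifecycle: construction already opens the DB and starts both servers, Start is a no-op kept for the lifecycle contract, and Stop tears everything down. The package name, run function, and error handling are illustrative assumptions; only the NewApi/Start/Addr/Stop surface comes from this diff.
package sketch
import (
	"context"
	"github.com/ethereum-optimism/optimism/indexer/api"
	"github.com/ethereum/go-ethereum/log"
)
// runAPIService is a hypothetical caller of the new lifecycle surface.
func runAPIService(ctx context.Context, logger log.Logger, cfg *api.Config) error {
	svc, err := api.NewApi(ctx, logger, cfg) // opens the DB, starts metrics + API servers
	if err != nil {
		return err // NewApi already released any partially-opened resources
	}
	if err := svc.Start(ctx); err != nil { // currently a no-op, kept for the Lifecycle contract
		return err
	}
	logger.Info("API listening", "addr", svc.Addr())
	<-ctx.Done()                          // block until the caller cancels
	return svc.Stop(context.Background()) // stops both servers and closes the DB
}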
package api
import (
"context"
"encoding/json"
"fmt"
"net/http"
......@@ -24,11 +25,12 @@ var mockAddress = "0x4204204204204204204204204204204204204204"
var apiConfig = config.ServerConfig{
Host: "localhost",
Port: 8080,
Port: 0, // random port, to allow parallel tests
}
var metricsConfig = config.ServerConfig{
Host: "localhost",
Port: 7300,
Port: 0, // random port, to allow parallel tests
}
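Port 0 is what makes these tests parallel-safe: the OS picks a free port, which the test later reads back via api.Addr(). A standalone Go illustration of that mechanism, independent of this diff:
package main
import (
	"fmt"
	"log"
	"net"
)
func main() {
	// Port 0 asks the OS for any free port; the chosen port is then
	// discoverable from the listener, so concurrent tests never collide.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	defer ln.Close()
	fmt.Println(ln.Addr().String()) // e.g. 127.0.0.1:49213, OS-chosen
}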
var (
......@@ -95,8 +97,14 @@ func (mbv *MockBridgeTransfersView) L2BridgeWithdrawalsByAddress(address common.
}
func TestHealthz(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig)
request, err := http.NewRequest("GET", "/healthz", nil)
cfg := &Config{
DB: &TestDBConnector{BridgeTransfers: &MockBridgeTransfersView{}},
HTTPServer: apiConfig,
MetricsServer: metricsConfig,
}
api, err := NewApi(context.Background(), logger, cfg)
require.NoError(t, err)
request, err := http.NewRequest("GET", "http://"+api.Addr()+"/healthz", nil)
assert.Nil(t, err)
responseRecorder := httptest.NewRecorder()
......@@ -107,8 +115,14 @@ func TestHealthz(t *testing.T) {
func TestL1BridgeDepositsHandler(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig)
request, err := http.NewRequest("GET", fmt.Sprintf("/api/v0/deposits/%s", mockAddress), nil)
cfg := &Config{
DB: &TestDBConnector{BridgeTransfers: &MockBridgeTransfersView{}},
HTTPServer: apiConfig,
MetricsServer: metricsConfig,
}
api, err := NewApi(context.Background(), logger, cfg)
require.NoError(t, err)
request, err := http.NewRequest("GET", fmt.Sprintf("http://"+api.Addr()+"/api/v0/deposits/%s", mockAddress), nil)
assert.Nil(t, err)
responseRecorder := httptest.NewRecorder()
......@@ -130,8 +144,14 @@ func TestL1BridgeDepositsHandler(t *testing.T) {
func TestL2BridgeWithdrawalsByAddressHandler(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
api := NewApi(logger, &MockBridgeTransfersView{}, apiConfig, metricsConfig)
request, err := http.NewRequest("GET", fmt.Sprintf("/api/v0/withdrawals/%s", mockAddress), nil)
cfg := &Config{
DB: &TestDBConnector{BridgeTransfers: &MockBridgeTransfersView{}},
HTTPServer: apiConfig,
MetricsServer: metricsConfig,
}
api, err := NewApi(context.Background(), logger, cfg)
require.NoError(t, err)
request, err := http.NewRequest("GET", fmt.Sprintf("http://"+api.Addr()+"/api/v0/withdrawals/%s", mockAddress), nil)
assert.Nil(t, err)
responseRecorder := httptest.NewRecorder()
......@@ -149,8 +169,8 @@ func TestL2BridgeWithdrawalsByAddressHandler(t *testing.T) {
assert.Equal(t, resp.Items[0].To, withdrawal.Tx.ToAddress.String())
assert.Equal(t, resp.Items[0].TransactionHash, common.HexToHash("0x789").String())
assert.Equal(t, resp.Items[0].Amount, withdrawal.Tx.Amount.String())
assert.Equal(t, resp.Items[0].ProofTransactionHash, common.HexToHash("0x123").String())
assert.Equal(t, resp.Items[0].ClaimTransactionHash, common.HexToHash("0x123").String())
assert.Equal(t, resp.Items[0].L1ProvenTxHash, common.HexToHash("0x123").String())
assert.Equal(t, resp.Items[0].L1FinalizedTxHash, common.HexToHash("0x123").String())
assert.Equal(t, resp.Items[0].L1TokenAddress, withdrawal.TokenPair.RemoteTokenAddress.String())
assert.Equal(t, resp.Items[0].L2TokenAddress, withdrawal.TokenPair.LocalTokenAddress.String())
assert.Equal(t, resp.Items[0].Timestamp, withdrawal.Tx.Timestamp)
......
package api
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
)
// DB represents the abstract DB access the API has.
type DB struct {
BridgeTransfers database.BridgeTransfersView
Closer func() error
}
// DBConfigConnector implements a fully config based DBConnector
type DBConfigConnector struct {
config.DBConfig
}
func (cfg *DBConfigConnector) OpenDB(ctx context.Context, log log.Logger) (*DB, error) {
db, err := database.NewDB(ctx, log, cfg.DBConfig)
if err != nil {
return nil, fmt.Errorf("failed to connect to databse: %w", err)
}
return &DB{
BridgeTransfers: db.BridgeTransfers,
Closer: db.Close,
}, nil
}
type TestDBConnector struct {
BridgeTransfers database.BridgeTransfersView
}
func (tdb *TestDBConnector) OpenDB(ctx context.Context, log log.Logger) (*DB, error) {
return &DB{
BridgeTransfers: tdb.BridgeTransfers,
Closer: func() error {
log.Info("API service closed test DB view")
return nil
},
}, nil
}
// DBConnector is an interface: the config may provide different ways to access the DB.
// This is implemented in tests to provide custom DB views, or share the DB with other services.
type DBConnector interface {
OpenDB(ctx context.Context, log log.Logger) (*DB, error)
}
// Config for the API service
type Config struct {
DB DBConnector
HTTPServer config.ServerConfig
MetricsServer config.ServerConfig
}
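The DBConnector indirection lets production code open a real database from config while tests inject an existing view. A sketch of both wirings using the types above; the helper function and its arguments are assumptions for illustration:
package sketch
import (
	"github.com/ethereum-optimism/optimism/indexer/api"
	"github.com/ethereum-optimism/optimism/indexer/config"
	"github.com/ethereum-optimism/optimism/indexer/database"
)
// buildConfigs shows the two intended wirings of api.Config.DB.
func buildConfigs(dbCfg config.DBConfig, httpCfg, metricsCfg config.ServerConfig,
	mockView database.BridgeTransfersView) (prod, test *api.Config) {
	prod = &api.Config{
		DB:            &api.DBConfigConnector{DBConfig: dbCfg}, // dials a real DB at NewApi time
		HTTPServer:    httpCfg,
		MetricsServer: metricsCfg,
	}
	test = &api.Config{
		DB:            &api.TestDBConnector{BridgeTransfers: mockView}, // injects an in-memory view
		HTTPServer:    httpCfg,
		MetricsServer: metricsCfg,
	}
	return prod, test
}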
package models
import (
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum/go-ethereum/common"
)
// DepositItem ... Deposit item model for API responses
type DepositItem struct {
Guid string `json:"guid"`
......@@ -23,18 +28,18 @@ type DepositResponse struct {
// WithdrawalItem ... Data model for API JSON response
type WithdrawalItem struct {
Guid string `json:"guid"`
From string `json:"from"`
To string `json:"to"`
TransactionHash string `json:"transactionHash"`
MessageHash string `json:"messageHash"`
Timestamp uint64 `json:"timestamp"`
L2BlockHash string `json:"l2BlockHash"`
Amount string `json:"amount"`
ProofTransactionHash string `json:"proofTransactionHash"`
ClaimTransactionHash string `json:"claimTransactionHash"`
L1TokenAddress string `json:"l1TokenAddress"`
L2TokenAddress string `json:"l2TokenAddress"`
Guid string `json:"guid"`
From string `json:"from"`
To string `json:"to"`
TransactionHash string `json:"transactionHash"`
CrossDomainMessageHash string `json:"crossDomainMessageHash"`
Timestamp uint64 `json:"timestamp"`
L2BlockHash string `json:"l2BlockHash"`
Amount string `json:"amount"`
L1ProvenTxHash string `json:"l1ProvenTxHash"`
L1FinalizedTxHash string `json:"l1FinalizedTxHash"`
L1TokenAddress string `json:"l1TokenAddress"`
L2TokenAddress string `json:"l2TokenAddress"`
}
// WithdrawalResponse ... Data model for API JSON response
......@@ -43,3 +48,38 @@ type WithdrawalResponse struct {
HasNextPage bool `json:"hasNextPage"`
Items []WithdrawalItem `json:"items"`
}
// FIXME make a pure function that returns a struct instead of newWithdrawalResponse
// CreateWithdrawalResponse ... Converts a database.L2BridgeWithdrawalsResponse to an api.WithdrawalResponse
func CreateWithdrawalResponse(withdrawals *database.L2BridgeWithdrawalsResponse) WithdrawalResponse {
items := make([]WithdrawalItem, len(withdrawals.Withdrawals))
for i, withdrawal := range withdrawals.Withdrawals {
cdh := withdrawal.L2BridgeWithdrawal.CrossDomainMessageHash
if cdh == nil { // Zero value indicates that the withdrawal didn't have a cross domain message
cdh = &common.Hash{0}
}
item := WithdrawalItem{
Guid: withdrawal.L2BridgeWithdrawal.TransactionWithdrawalHash.String(),
L2BlockHash: withdrawal.L2BlockHash.String(),
Timestamp: withdrawal.L2BridgeWithdrawal.Tx.Timestamp,
From: withdrawal.L2BridgeWithdrawal.Tx.FromAddress.String(),
To: withdrawal.L2BridgeWithdrawal.Tx.ToAddress.String(),
TransactionHash: withdrawal.L2TransactionHash.String(),
Amount: withdrawal.L2BridgeWithdrawal.Tx.Amount.String(),
CrossDomainMessageHash: cdh.String(),
L1ProvenTxHash: withdrawal.ProvenL1TransactionHash.String(),
L1FinalizedTxHash: withdrawal.FinalizedL1TransactionHash.String(),
L1TokenAddress: withdrawal.L2BridgeWithdrawal.TokenPair.RemoteTokenAddress.String(),
L2TokenAddress: withdrawal.L2BridgeWithdrawal.TokenPair.LocalTokenAddress.String(),
}
items[i] = item
}
return WithdrawalResponse{
Cursor: withdrawals.Cursor,
HasNextPage: withdrawals.HasNextPage,
Items: items,
}
}
package models_test
import (
"fmt"
"reflect"
"testing"
"github.com/ethereum-optimism/optimism/indexer/api/models"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
func TestCreateWithdrawal(t *testing.T) {
// (1) Create a dummy database response object
cdh := common.HexToHash("0x2")
dbWithdrawals := &database.L2BridgeWithdrawalsResponse{
Withdrawals: []database.L2BridgeWithdrawalWithTransactionHashes{
{
L2BridgeWithdrawal: database.L2BridgeWithdrawal{
TransactionWithdrawalHash: common.HexToHash("0x1"),
BridgeTransfer: database.BridgeTransfer{
CrossDomainMessageHash: &cdh,
Tx: database.Transaction{
FromAddress: common.HexToAddress("0x3"),
ToAddress: common.HexToAddress("0x4"),
Timestamp: 5,
},
TokenPair: database.TokenPair{
LocalTokenAddress: common.HexToAddress("0x6"),
RemoteTokenAddress: common.HexToAddress("0x7"),
},
},
},
},
},
}
// (2) Create and validate response object
response := models.CreateWithdrawalResponse(dbWithdrawals)
require.NotEmpty(t, response.Items)
require.Len(t, response.Items, 1)
// (3) Use reflection to check that all fields in WithdrawalItem are populated correctly
item := response.Items[0]
structType := reflect.TypeOf(item)
structVal := reflect.ValueOf(item)
fieldNum := structVal.NumField()
for i := 0; i < fieldNum; i++ {
field := structVal.Field(i)
fieldName := structType.Field(i).Name
isSet := field.IsValid() && !field.IsZero()
require.True(t, isSet, fmt.Sprintf("%s is not set", fieldName))
}
}
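The reflection loop above generalizes to any struct: a field counts as populated when it is valid and not the zero value of its type. A standalone illustration of the same predicate:
package main
import (
	"fmt"
	"reflect"
)
type item struct {
	Name   string
	Amount string
}
func main() {
	v := reflect.ValueOf(item{Name: "x"}) // Amount deliberately left unset
	for i := 0; i < v.NumField(); i++ {
		f := v.Field(i)
		set := f.IsValid() && !f.IsZero() // same check as the test above
		fmt.Printf("%s set=%v\n", v.Type().Field(i).Name, set)
	}
	// prints: Name set=true, then Amount set=false
}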
......@@ -4,37 +4,9 @@ import (
"net/http"
"github.com/ethereum-optimism/optimism/indexer/api/models"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/go-chi/chi/v5"
)
// FIXME make a pure function that returns a struct instead of newWithdrawalResponse
// newWithdrawalResponse ... Converts a database.L2BridgeWithdrawalsResponse to an api.WithdrawalResponse
func newWithdrawalResponse(withdrawals *database.L2BridgeWithdrawalsResponse) models.WithdrawalResponse {
items := make([]models.WithdrawalItem, len(withdrawals.Withdrawals))
for i, withdrawal := range withdrawals.Withdrawals {
item := models.WithdrawalItem{
Guid: withdrawal.L2BridgeWithdrawal.TransactionWithdrawalHash.String(),
L2BlockHash: withdrawal.L2BlockHash.String(),
From: withdrawal.L2BridgeWithdrawal.Tx.FromAddress.String(),
To: withdrawal.L2BridgeWithdrawal.Tx.ToAddress.String(),
TransactionHash: withdrawal.L2TransactionHash.String(),
Amount: withdrawal.L2BridgeWithdrawal.Tx.Amount.String(),
ProofTransactionHash: withdrawal.ProvenL1TransactionHash.String(),
ClaimTransactionHash: withdrawal.FinalizedL1TransactionHash.String(),
L1TokenAddress: withdrawal.L2BridgeWithdrawal.TokenPair.RemoteTokenAddress.String(),
L2TokenAddress: withdrawal.L2BridgeWithdrawal.TokenPair.LocalTokenAddress.String(),
}
items[i] = item
}
return models.WithdrawalResponse{
Cursor: withdrawals.Cursor,
HasNextPage: withdrawals.HasNextPage,
Items: items,
}
}
// L2WithdrawalsHandler ... Handles /api/v0/withdrawals/{address} GET requests
func (h Routes) L2WithdrawalsHandler(w http.ResponseWriter, r *http.Request) {
addressValue := chi.URLParam(r, "address")
......@@ -68,7 +40,7 @@ func (h Routes) L2WithdrawalsHandler(w http.ResponseWriter, r *http.Request) {
h.logger.Error("Unable to read withdrawals from DB", "err", err.Error())
return
}
response := newWithdrawalResponse(withdrawals)
response := models.CreateWithdrawalResponse(withdrawals)
err = jsonResponse(w, response, http.StatusOK)
if err != nil {
......
package main
import (
"context"
"github.com/urfave/cli/v2"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum-optimism/optimism/indexer"
"github.com/ethereum-optimism/optimism/indexer/api"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum/go-ethereum/params"
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-service/opio"
)
var (
......@@ -27,7 +32,7 @@ var (
}
)
func runIndexer(ctx *cli.Context) error {
func runIndexer(ctx *cli.Context, shutdown context.CancelCauseFunc) (cliapp.Lifecycle, error) {
log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "indexer")
oplog.SetGlobalLogHandler(log.GetHandler())
log.Info("running indexer...")
......@@ -35,26 +40,13 @@ func runIndexer(ctx *cli.Context) error {
cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
if err != nil {
log.Error("failed to load config", "err", err)
return err
return nil, err
}
db, err := database.NewDB(log, cfg.DB)
if err != nil {
log.Error("failed to connect to database", "err", err)
return err
}
defer db.Close()
indexer, err := indexer.NewIndexer(log, db, cfg.Chain, cfg.RPCs, cfg.HTTPServer, cfg.MetricsServer)
if err != nil {
log.Error("failed to create indexer", "err", err)
return err
}
return indexer.Run(ctx.Context)
return indexer.NewIndexer(ctx.Context, log, &cfg, shutdown)
}
func runApi(ctx *cli.Context) error {
func runApi(ctx *cli.Context, _ context.CancelCauseFunc) (cliapp.Lifecycle, error) {
log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "api")
oplog.SetGlobalLogHandler(log.GetHandler())
log.Info("running api...")
......@@ -62,21 +54,22 @@ func runApi(ctx *cli.Context) error {
cfg, err := config.LoadConfig(log, ctx.String(ConfigFlag.Name))
if err != nil {
log.Error("failed to load config", "err", err)
return err
return nil, err
}
db, err := database.NewDB(log, cfg.DB)
if err != nil {
log.Error("failed to connect to database", "err", err)
return err
apiCfg := &api.Config{
DB: &api.DBConfigConnector{DBConfig: cfg.DB},
HTTPServer: cfg.HTTPServer,
MetricsServer: cfg.MetricsServer,
}
defer db.Close()
api := api.NewApi(log, db.BridgeTransfers, cfg.HTTPServer, cfg.MetricsServer)
return api.Run(ctx.Context)
return api.NewApi(ctx.Context, log, apiCfg)
}
func runMigrations(ctx *cli.Context) error {
// We don't maintain a complicated lifecycle here, just interrupt to shut down.
ctx.Context = opio.CancelOnInterrupt(ctx.Context)
log := oplog.NewLogger(oplog.AppOut(ctx), oplog.ReadCLIConfig(ctx)).New("role", "migrations")
oplog.SetGlobalLogHandler(log.GetHandler())
log.Info("running migrations...")
......@@ -87,7 +80,7 @@ func runMigrations(ctx *cli.Context) error {
return err
}
db, err := database.NewDB(log, cfg.DB)
db, err := database.NewDB(ctx.Context, log, cfg.DB)
if err != nil {
log.Error("failed to connect to database", "err", err)
return err
......@@ -112,13 +105,13 @@ func newCli(GitCommit string, GitDate string) *cli.App {
Name: "api",
Flags: flags,
Description: "Runs the api service",
Action: runApi,
Action: cliapp.LifecycleCmd(runApi),
},
{
Name: "index",
Flags: flags,
Description: "Runs the indexing service",
Action: runIndexer,
Action: cliapp.LifecycleCmd(runIndexer),
},
{
Name: "migrate",
......
......@@ -4,9 +4,10 @@ import (
"context"
"os"
"github.com/ethereum/go-ethereum/log"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/log"
)
var (
......@@ -15,16 +16,10 @@ var (
)
func main() {
// This is the most root context, used to propagate
// cancellations to all spawned application-level goroutines
ctx, cancel := context.WithCancel(context.Background())
go func() {
opio.BlockOnInterrupts()
cancel()
}()
oplog.SetupDefaults()
app := newCli(GitCommit, GitDate)
// sub-commands set up their individual interrupt lifecycles, which can block on the given interrupt as needed.
ctx := opio.WithInterruptBlocker(context.Background())
if err := app.RunContext(ctx, os.Args); err != nil {
log.Error("application failed", "err", err)
os.Exit(1)
......
......@@ -134,10 +134,11 @@ type DBConfig struct {
Password string `toml:"password"`
}
// Configures the a server
// Configures the server
type ServerConfig struct {
Host string `toml:"host"`
Port int `toml:"port"`
Host string `toml:"host"`
Port int `toml:"port"`
WriteTimeout int `toml:"timeout"`
}
// LoadConfig loads the `indexer.toml` config file from a given path
......
......@@ -30,7 +30,9 @@ type DB struct {
BridgeTransactions BridgeTransactionsDB
}
func NewDB(log log.Logger, dbConfig config.DBConfig) (*DB, error) {
// NewDB connects to the configured DB, and provides client-bindings to it.
// The initial connection may fail, or the dial may be cancelled with the provided context.
func NewDB(ctx context.Context, log log.Logger, dbConfig config.DBConfig) (*DB, error) {
log = log.New("module", "db")
dsn := fmt.Sprintf("host=%s dbname=%s sslmode=disable", dbConfig.Host, dbConfig.Name)
......
......@@ -34,7 +34,7 @@ type E2ETestSuite struct {
// API
Client *client.Client
API *api.API
API *api.APIService
// Indexer
DB *database.DB
......@@ -73,7 +73,7 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
t.Cleanup(func() { opSys.Close() })
// Indexer Configuration and Start
indexerCfg := config.Config{
indexerCfg := &config.Config{
DB: config.DBConfig{
Host: "127.0.0.1",
Port: 5432,
......@@ -106,51 +106,40 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
// the system is running, mark this test for Parallel execution
t.Parallel()
// provide a DB for the unit test. disable logging
silentLog := testlog.Logger(t, log.LvlInfo)
silentLog.SetHandler(log.DiscardHandler())
db, err := database.NewDB(silentLog, indexerCfg.DB)
require.NoError(t, err)
t.Cleanup(func() { db.Close() })
indexerLog := testlog.Logger(t, log.LvlInfo).New("role", "indexer")
indexer, err := indexer.NewIndexer(indexerLog, db, indexerCfg.Chain, indexerCfg.RPCs, indexerCfg.HTTPServer, indexerCfg.MetricsServer)
ix, err := indexer.NewIndexer(context.Background(), indexerLog, indexerCfg, func(cause error) {
if cause != nil {
t.Fatalf("indexer shut down with critical error: %v", cause)
}
})
require.NoError(t, err)
indexerCtx, indexerStop := context.WithCancel(context.Background())
go func() {
err := indexer.Run(indexerCtx)
if err != nil { // panicking here ensures that the test will exit
// during service failure. Using t.Fail() wouldn't be caught
// until all awaiting routines finish which would never happen.
panic(err)
}
}()
require.NoError(t, ix.Start(context.Background()), "cleanly start indexer")
apiLog := testlog.Logger(t, log.LvlInfo).New("role", "indexer_api")
t.Cleanup(func() {
require.NoError(t, ix.Stop(context.Background()), "cleanly shut down indexer")
})
apiCfg := config.ServerConfig{
Host: "127.0.0.1",
Port: 0,
}
apiLog := testlog.Logger(t, log.LvlInfo).New("role", "indexer_api")
mCfg := config.ServerConfig{
Host: "127.0.0.1",
Port: 0,
apiCfg := &api.Config{
DB: &api.TestDBConnector{BridgeTransfers: ix.DB.BridgeTransfers}, // reuse the same DB
HTTPServer: config.ServerConfig{
Host: "127.0.0.1",
Port: 0,
},
MetricsServer: config.ServerConfig{
Host: "127.0.0.1",
Port: 0,
},
}
api := api.NewApi(apiLog, db.BridgeTransfers, apiCfg, mCfg)
apiCtx, apiStop := context.WithCancel(context.Background())
go func() {
err := api.Run(apiCtx)
if err != nil {
panic(err)
}
}()
apiService, err := api.NewApi(context.Background(), apiLog, apiCfg)
require.NoError(t, err, "create indexer API service")
require.NoError(t, apiService.Start(context.Background()), "start indexer API service")
t.Cleanup(func() {
apiStop()
indexerStop()
require.NoError(t, apiService.Stop(context.Background()), "cleanly shut down indexer API")
})
// Wait for the API to start listening
......@@ -158,16 +147,15 @@ func createE2ETestSuite(t *testing.T) E2ETestSuite {
client, err := client.NewClient(&client.Config{
PaginationLimit: 100,
BaseURL: fmt.Sprintf("http://%s:%d", apiCfg.Host, api.Port()),
BaseURL: "http://" + apiService.Addr(),
})
require.NoError(t, err)
require.NoError(t, err, "must open indexer API client")
return E2ETestSuite{
t: t,
Client: client,
DB: db,
Indexer: indexer,
DB: ix.DB,
Indexer: ix,
OpCfg: &opCfg,
OpSys: opSys,
L1Client: opSys.Clients["l1"],
......@@ -203,7 +191,7 @@ func setupTestDatabase(t *testing.T) string {
silentLog := log.New()
silentLog.SetHandler(log.DiscardHandler())
db, err := database.NewDB(silentLog, dbConfig)
db, err := database.NewDB(context.Background(), silentLog, dbConfig)
require.NoError(t, err)
defer db.Close()
......
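NewIndexer and the ETLs now accept a context.CancelCauseFunc so that any component can request a coordinated shutdown while recording why. A minimal standalone illustration of that plumbing (Go 1.20+); the error text is made up:
package main
import (
	"context"
	"errors"
	"fmt"
)
func main() {
	ctx, shutdown := context.WithCancelCause(context.Background())
	// Some component hits a critical error and requests shutdown with a cause.
	go shutdown(errors.New("critical error in L1 ETL"))
	<-ctx.Done()
	fmt.Println(context.Cause(ctx)) // prints the recorded cause
}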
......@@ -7,11 +7,13 @@ import (
"math/big"
"time"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/clock"
)
type Config struct {
......@@ -31,9 +33,15 @@ type ETL struct {
headerTraversal *node.HeaderTraversal
contracts []common.Address
etlBatches chan ETLBatch
etlBatches chan *ETLBatch
EthClient node.EthClient
// A reference that'll stay populated between intervals
// in the event of failures in order to retry.
headers []types.Header
worker *clock.LoopFn
}
type ETLBatch struct {
......@@ -46,47 +54,54 @@ type ETLBatch struct {
HeadersWithLog map[common.Hash]bool
}
func (etl *ETL) Start(ctx context.Context) error {
done := ctx.Done()
pollTicker := time.NewTicker(etl.loopInterval)
defer pollTicker.Stop()
// Start starts the ETL polling routine. The ETL work should be stopped with Close().
func (etl *ETL) Start() error {
if etl.worker != nil {
return errors.New("already started")
}
etl.log.Info("starting etl...")
etl.worker = clock.NewLoopFn(clock.SystemClock, etl.tick, func() error {
close(etl.etlBatches) // can close the channel now, to signal to the consumer that we're done
etl.log.Info("stopped etl worker loop")
return nil
}, etl.loopInterval)
return nil
}
// A reference that'll stay populated between intervals
// in the event of failures in order to retry.
var headers []types.Header
func (etl *ETL) Close() error {
if etl.worker == nil {
return nil // worker was not running
}
return etl.worker.Close()
}
etl.log.Info("starting etl...")
for {
select {
case <-done:
etl.log.Info("stopping etl")
return nil
case <-pollTicker.C:
done := etl.metrics.RecordInterval()
if len(headers) > 0 {
etl.log.Info("retrying previous batch")
} else {
newHeaders, err := etl.headerTraversal.NextFinalizedHeaders(etl.headerBufferSize)
if err != nil {
etl.log.Error("error querying for headers", "err", err)
} else if len(newHeaders) == 0 {
etl.log.Warn("no new headers. processor unexpectedly at head...")
} else {
headers = newHeaders
etl.metrics.RecordBatchHeaders(len(newHeaders))
}
}
// only clear the reference if we were able to process this batch
err := etl.processBatch(headers)
if err == nil {
headers = nil
}
done(err)
func (etl *ETL) tick(_ context.Context) {
done := etl.metrics.RecordInterval()
if len(etl.headers) > 0 {
etl.log.Info("retrying previous batch")
} else {
newHeaders, err := etl.headerTraversal.NextHeaders(etl.headerBufferSize)
if err != nil {
etl.log.Error("error querying for headers", "err", err)
} else if len(newHeaders) == 0 {
etl.log.Warn("no new headers. etl at head?")
} else {
etl.headers = newHeaders
}
latestHeader := etl.headerTraversal.LatestHeader()
if latestHeader != nil {
etl.metrics.RecordLatestHeight(latestHeader.Number)
}
}
// only clear the reference if we were able to process this batch
err := etl.processBatch(etl.headers)
if err == nil {
etl.headers = nil
}
done(err)
}
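The tick above deliberately keeps failed headers in etl.headers so the next interval retries the same batch rather than fetching new work. The idiom in isolation, with illustrative names:
package sketch
// worker retries the same pending batch until it succeeds, only then
// fetching new work, mirroring how tick treats etl.headers.
type worker struct {
	pending []int
}
func (w *worker) tick(fetch func() []int, process func([]int) error) {
	if len(w.pending) == 0 {
		w.pending = fetch() // only fetch new work when nothing is pending
	} // otherwise: retry the previous batch as-is
	if err := process(w.pending); err == nil {
		w.pending = nil // clear only on success, so a failure is retried next tick
	}
}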
func (etl *ETL) processBatch(headers []types.Header) error {
......@@ -98,7 +113,6 @@ func (etl *ETL) processBatch(headers []types.Header) error {
batchLog := etl.log.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
batchLog.Info("extracting batch", "size", len(headers))
etl.metrics.RecordBatchLatestHeight(lastHeader.Number)
headerMap := make(map[common.Hash]*types.Header, len(headers))
for i := range headers {
header := headers[i]
......@@ -128,6 +142,7 @@ func (etl *ETL) processBatch(headers []types.Header) error {
for i := range logs.Logs {
log := logs.Logs[i]
headersWithLog[log.BlockHash] = true
if _, ok := headerMap[log.BlockHash]; !ok {
// NOTE. Definitely an error state if none of the headers were re-orged out in between
// the blocks and logs retrieval operations. Unlikely as long as the confirmation depth has
......@@ -135,13 +150,10 @@ func (etl *ETL) processBatch(headers []types.Header) error {
batchLog.Error("log found with block hash not in the batch", "block_hash", logs.Logs[i].BlockHash, "log_index", logs.Logs[i].Index)
return errors.New("parsed log with a block hash not in the batch")
}
etl.metrics.RecordBatchLog(log.Address)
headersWithLog[log.BlockHash] = true
}
// ensure we use unique downstream references for the etl batch
headersRef := headers
etl.etlBatches <- ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs.Logs, HeadersWithLog: headersWithLog}
etl.etlBatches <- &ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs.Logs, HeadersWithLog: headersWithLog}
return nil
}
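The new Start/Close split follows a standard Go shutdown pattern: the producer owns the batches channel and closes it when its loop stops, and the consumer simply drains until the close. A runnable standalone sketch of the same pattern, with made-up payloads:
package main
import (
	"fmt"
	"time"
)
func main() {
	batches := make(chan int)
	stop := make(chan struct{})
	go func() { // producer, analogous to the clock.LoopFn tick loop
		defer close(batches) // signals the consumer that no more work is coming
		for i := 0; ; i++ {
			select {
			case <-stop:
				return
			case batches <- i:
				time.Sleep(10 * time.Millisecond)
			}
		}
	}()
	go func() { // analogous to Close()
		time.Sleep(50 * time.Millisecond)
		close(stop)
	}()
	for b := range batches { // consumer exits cleanly once the channel closes
		fmt.Println("handled batch", b)
	}
}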
......@@ -8,26 +8,37 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/tasks"
)
type L1ETL struct {
ETL
db *database.DB
mu *sync.Mutex
// the batch handler may do work that we can interrupt on shutdown
resourceCtx context.Context
resourceCancel context.CancelFunc
tasks tasks.Group
db *database.DB
mu sync.Mutex
listeners []chan interface{}
}
// NewL1ETL creates a new L1ETL instance that will start indexing from different starting points
// depending on the state of the database and the supplied start height.
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient, contracts config.L1Contracts) (*L1ETL, error) {
func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient,
contracts config.L1Contracts, shutdown context.CancelCauseFunc) (*L1ETL, error) {
log = log.New("etl", "l1")
zeroAddr := common.Address{}
......@@ -71,8 +82,10 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
}
// NOTE - The use of un-buffered channel here assumes that downstream consumers
// will be able to keep up with the rate of incoming batches
etlBatches := make(chan ETLBatch)
// will be able to keep up with the rate of incoming batches.
// When the producer closes the channel we stop consuming from it.
etlBatches := make(chan *ETLBatch)
etl := ETL{
loopInterval: time.Duration(cfg.LoopIntervalMsec) * time.Millisecond,
headerBufferSize: uint64(cfg.HeaderBufferSize),
......@@ -86,81 +99,115 @@ func NewL1ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
EthClient: client,
}
return &L1ETL{ETL: etl, db: db, mu: new(sync.Mutex)}, nil
resCtx, resCancel := context.WithCancel(context.Background())
return &L1ETL{
ETL: etl,
db: db,
resourceCtx: resCtx,
resourceCancel: resCancel,
tasks: tasks.Group{HandleCrit: func(err error) {
shutdown(fmt.Errorf("critical error in L1 ETL: %w", err))
}},
}, nil
}
func (l1Etl *L1ETL) Start(ctx context.Context) error {
errCh := make(chan error, 1)
go func() {
errCh <- l1Etl.ETL.Start(ctx)
}()
func (l1Etl *L1ETL) Close() error {
var result error
// close the producer
if err := l1Etl.ETL.Close(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to close internal ETL: %w", err))
}
// tell the consumer it can stop what it's doing
l1Etl.resourceCancel()
// wait for consumer to pick up on closure of producer
if err := l1Etl.tasks.Wait(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to await batch handler completion: %w", err))
}
return result
}
for {
select {
case err := <-errCh:
return err
// Index incoming batches (only L1 blocks that have an emitted log)
case batch := <-l1Etl.etlBatches:
l1BlockHeaders := make([]database.L1BlockHeader, 0, len(batch.Headers))
for i := range batch.Headers {
if _, ok := batch.HeadersWithLog[batch.Headers[i].Hash()]; ok {
l1BlockHeaders = append(l1BlockHeaders, database.L1BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])})
}
func (l1Etl *L1ETL) Start() error {
// start ETL batch producer
if err := l1Etl.ETL.Start(); err != nil {
return fmt.Errorf("failed to start internal ETL: %w", err)
}
// start ETL batch consumer
l1Etl.tasks.Go(func() error {
for {
// Index incoming batches (only L1 blocks that have an emitted log)
batch, ok := <-l1Etl.etlBatches
if !ok {
l1Etl.log.Info("No more batches, shutting down L1 batch handler")
return nil
}
if len(l1BlockHeaders) == 0 {
batch.Logger.Info("no l1 blocks with logs in batch")
continue
if err := l1Etl.handleBatch(batch); err != nil {
return fmt.Errorf("failed to handle batch, stopping L2 ETL: %w", err)
}
}
})
return nil
}
l1ContractEvents := make([]database.L1ContractEvent, len(batch.Logs))
for i := range batch.Logs {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
l1ContractEvents[i] = database.L1ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
}
func (l1Etl *L1ETL) handleBatch(batch *ETLBatch) error {
l1BlockHeaders := make([]database.L1BlockHeader, 0, len(batch.Headers))
for i := range batch.Headers {
if _, ok := batch.HeadersWithLog[batch.Headers[i].Hash()]; ok {
l1BlockHeaders = append(l1BlockHeaders, database.L1BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])})
}
}
if len(l1BlockHeaders) == 0 {
batch.Logger.Info("no l1 blocks with logs in batch")
return nil
}
l1ContractEvents := make([]database.L1ContractEvent, len(batch.Logs))
for i := range batch.Logs {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
l1ContractEvents[i] = database.L1ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
l1Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
}
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {
if err := l1Etl.db.Transaction(func(tx *database.DB) error {
if err := tx.Blocks.StoreL1BlockHeaders(l1BlockHeaders); err != nil {
return err
}
// we must have logs if we have l1 blocks
if err := tx.ContractEvents.StoreL1ContractEvents(l1ContractEvents); err != nil {
return err
}
return nil
}); err != nil {
batch.Logger.Error("unable to persist batch", "err", err)
return nil, err
}
l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
l1Etl.ETL.metrics.RecordIndexedLatestHeight(l1BlockHeaders[len(l1BlockHeaders)-1].Number)
l1Etl.ETL.metrics.RecordIndexedLogs(len(l1ContractEvents))
// a-ok!
return nil, nil
}); err != nil {
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
if _, err := retry.Do[interface{}](l1Etl.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
if err := l1Etl.db.Transaction(func(tx *database.DB) error {
if err := tx.Blocks.StoreL1BlockHeaders(l1BlockHeaders); err != nil {
return err
}
batch.Logger.Info("indexed batch")
// Notify Listeners
l1Etl.mu.Lock()
for i := range l1Etl.listeners {
select {
case l1Etl.listeners[i] <- struct{}{}:
default:
// do nothing if the listener hasn't picked
// up the previous notif
}
// we must have logs if we have l1 blocks
if err := tx.ContractEvents.StoreL1ContractEvents(l1ContractEvents); err != nil {
return err
}
l1Etl.mu.Unlock()
return nil
}); err != nil {
batch.Logger.Error("unable to persist batch", "err", err)
return nil, fmt.Errorf("unable to persist batch: %w", err)
}
l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
l1Etl.ETL.metrics.RecordIndexedLatestHeight(l1BlockHeaders[len(l1BlockHeaders)-1].Number)
// a-ok!
return nil, nil
}); err != nil {
return err
}
batch.Logger.Info("indexed batch")
// Notify Listeners
l1Etl.mu.Lock()
for i := range l1Etl.listeners {
select {
case l1Etl.listeners[i] <- struct{}{}:
default:
// do nothing if the listener hasn't picked
// up the previous notif
}
}
l1Etl.mu.Unlock()
return nil
}
// Notify returns a channel that'll receive a value every time new data has
......
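The persistence step leans on op-service's generic retry helper; the sketch below mirrors the call shape from this diff exactly, with a stand-in persist function:
package sketch
import (
	"context"
	"github.com/ethereum-optimism/optimism/op-service/retry"
)
// persistWithRetry retries persist with exponential backoff, up to 10
// attempts, aborting early if ctx is cancelled, the same idiom as above.
func persistWithRetry(ctx context.Context, persist func() error) error {
	strategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
	_, err := retry.Do[interface{}](ctx, 10, strategy, func() (interface{}, error) {
		return nil, persist()
	})
	return err
}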
......@@ -62,7 +62,7 @@ func TestL1ETLConstruction(t *testing.T) {
},
assertion: func(etl *L1ETL, err error) {
require.NoError(t, err)
require.Equal(t, etl.headerTraversal.LastHeader().ParentHash, common.HexToHash("0x69"))
require.Equal(t, etl.headerTraversal.LastTraversedHeader().ParentHash, common.HexToHash("0x69"))
},
},
{
......@@ -94,7 +94,7 @@ func TestL1ETLConstruction(t *testing.T) {
},
assertion: func(etl *L1ETL, err error) {
require.NoError(t, err)
header := etl.headerTraversal.LastHeader()
header := etl.headerTraversal.LastTraversedHeader()
require.True(t, header.Number.Cmp(big.NewInt(69)) == 0)
},
......@@ -108,7 +108,9 @@ func TestL1ETLConstruction(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
cfg := Config{StartHeight: ts.start}
etl, err := NewL1ETL(cfg, logger, ts.db.DB, etlMetrics, ts.client, ts.contracts)
etl, err := NewL1ETL(cfg, logger, ts.db.DB, etlMetrics, ts.client, ts.contracts, func(cause error) {
t.Fatalf("crit error: %v", cause)
})
test.assertion(etl, err)
})
}
......
......@@ -3,24 +3,34 @@ package etl
import (
"context"
"errors"
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/node"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/tasks"
)
type L2ETL struct {
ETL
// the batch handler may do work that we can interrupt on shutdown
resourceCtx context.Context
resourceCancel context.CancelFunc
tasks tasks.Group
db *database.DB
}
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient, contracts config.L2Contracts) (*L2ETL, error) {
func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, client node.EthClient,
contracts config.L2Contracts, shutdown context.CancelCauseFunc) (*L2ETL, error) {
log = log.New("etl", "l2")
zeroAddr := common.Address{}
......@@ -54,7 +64,7 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
log.Info("no indexed state, starting from genesis")
}
etlBatches := make(chan ETLBatch)
etlBatches := make(chan *ETLBatch)
etl := ETL{
loopInterval: time.Duration(cfg.LoopIntervalMsec) * time.Millisecond,
headerBufferSize: uint64(cfg.HeaderBufferSize),
......@@ -68,64 +78,96 @@ func NewL2ETL(cfg Config, log log.Logger, db *database.DB, metrics Metricer, cli
EthClient: client,
}
return &L2ETL{ETL: etl, db: db}, nil
resCtx, resCancel := context.WithCancel(context.Background())
return &L2ETL{
ETL: etl,
resourceCtx: resCtx,
resourceCancel: resCancel,
db: db,
tasks: tasks.Group{HandleCrit: func(err error) {
shutdown(fmt.Errorf("critical error in L2 ETL: %w", err))
}},
}, nil
}
func (l2Etl *L2ETL) Start(ctx context.Context) error {
errCh := make(chan error, 1)
go func() {
errCh <- l2Etl.ETL.Start(ctx)
}()
for {
select {
case err := <-errCh:
return err
// Index incoming batches (all L2 Blocks)
case batch := <-l2Etl.etlBatches:
l2BlockHeaders := make([]database.L2BlockHeader, len(batch.Headers))
for i := range batch.Headers {
l2BlockHeaders[i] = database.L2BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])}
}
func (l2Etl *L2ETL) Close() error {
var result error
// close the producer
if err := l2Etl.ETL.Close(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to close internal ETL: %w", err))
}
// tell the consumer it can stop what it's doing
l2Etl.resourceCancel()
// wait for consumer to pick up on closure of producer
if err := l2Etl.tasks.Wait(); err != nil {
result = errors.Join(result, fmt.Errorf("failed to await batch handler completion: %w", err))
}
return result
}
func (l2Etl *L2ETL) Start() error {
// start ETL batch producer
if err := l2Etl.ETL.Start(); err != nil {
return fmt.Errorf("failed to start internal ETL: %w", err)
}
l2ContractEvents := make([]database.L2ContractEvent, len(batch.Logs))
for i := range batch.Logs {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
l2ContractEvents[i] = database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
// start ETL batch consumer
l2Etl.tasks.Go(func() error {
for {
// Index incoming batches (all L2 blocks)
batch, ok := <-l2Etl.etlBatches
if !ok {
l2Etl.log.Info("No more batches, shutting down L2 batch handler")
return nil
}
if err := l2Etl.handleBatch(batch); err != nil {
return fmt.Errorf("failed to handle batch, stopping L2 ETL: %w", err)
}
}
})
return nil
}
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
if _, err := retry.Do[interface{}](ctx, 10, retryStrategy, func() (interface{}, error) {
if err := l2Etl.db.Transaction(func(tx *database.DB) error {
if err := tx.Blocks.StoreL2BlockHeaders(l2BlockHeaders); err != nil {
return err
}
if len(l2ContractEvents) > 0 {
if err := tx.ContractEvents.StoreL2ContractEvents(l2ContractEvents); err != nil {
return err
}
}
return nil
}); err != nil {
batch.Logger.Error("unable to persist batch", "err", err)
return nil, err
}
func (l2Etl *L2ETL) handleBatch(batch *ETLBatch) error {
l2BlockHeaders := make([]database.L2BlockHeader, len(batch.Headers))
for i := range batch.Headers {
l2BlockHeaders[i] = database.L2BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])}
}
l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number)
if len(l2ContractEvents) > 0 {
l2Etl.ETL.metrics.RecordIndexedLogs(len(l2ContractEvents))
}
l2ContractEvents := make([]database.L2ContractEvent, len(batch.Logs))
for i := range batch.Logs {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
l2ContractEvents[i] = database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
l2Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
}
// a-ok!
return nil, nil
}); err != nil {
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
if _, err := retry.Do[interface{}](l2Etl.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
if err := l2Etl.db.Transaction(func(tx *database.DB) error {
if err := tx.Blocks.StoreL2BlockHeaders(l2BlockHeaders); err != nil {
return err
}
batch.Logger.Info("indexed batch")
if len(l2ContractEvents) > 0 {
if err := tx.ContractEvents.StoreL2ContractEvents(l2ContractEvents); err != nil {
return err
}
}
return nil
}); err != nil {
batch.Logger.Error("unable to persist batch", "err", err)
return nil, err
}
l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number)
// a-ok!
return nil, nil
}); err != nil {
return err
}
batch.Logger.Info("indexed batch")
return nil
}
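The Start/Close restructure above splits the ETL into a producer (the embedded ETL pushing to `etlBatches`) and a consumer (the `tasks.Group` goroutine draining it). A minimal sketch of the shutdown handshake with a placeholder `handle` function; in the real code the consumer body is `handleBatch` above:

batches := make(chan *ETLBatch)

// Producer: closes the channel once it stops emitting work (ETL.Close above).
go func() {
	defer close(batches)
	// ... push batches until asked to stop ...
}()

// Consumer: drains until the producer closes the channel, so no batch is dropped.
go func() {
	for batch := range batches {
		if err := handle(batch); err != nil {
			return // in the real code this error trips tasks.Group's HandleCrit
		}
	}
}()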
......@@ -9,35 +9,28 @@ import (
)
var (
MetricsNamespace string = "etl"
MetricsNamespace string = "op_indexer_etl"
)
type Metricer interface {
RecordInterval() (done func(err error))
// Batch Extraction
RecordBatchLatestHeight(height *big.Int)
RecordBatchHeaders(size int)
RecordBatchLog(contractAddress common.Address)
RecordLatestHeight(height *big.Int)
// Indexed Batches
RecordIndexedLatestHeight(height *big.Int)
RecordIndexedHeaders(size int)
RecordIndexedLogs(size int)
RecordIndexedLog(contractAddress common.Address)
}
type etlMetrics struct {
intervalTick prometheus.Counter
intervalDuration prometheus.Histogram
batchFailures prometheus.Counter
batchLatestHeight prometheus.Gauge
batchHeaders prometheus.Counter
batchLogs *prometheus.CounterVec
intervalFailures prometheus.Counter
latestHeight prometheus.Gauge
indexedLatestHeight prometheus.Gauge
indexedHeaders prometheus.Counter
indexedLogs prometheus.Counter
indexedLogs *prometheus.CounterVec
}
func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
......@@ -55,31 +48,17 @@ func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
Name: "interval_seconds",
Help: "duration elapsed for during the processing loop",
}),
batchFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "failures_total",
Help: "number of times the etl encountered a failure to extract a batch",
}),
batchLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
intervalFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "height",
Help: "the latest block height observed by an etl interval",
Name: "interval_failures_total",
Help: "number of times the etl encountered a failure during the processing loop",
}),
batchHeaders: factory.NewCounter(prometheus.CounterOpts{
latestHeight: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "headers_total",
Help: "number of headers observed by the etl",
}),
batchLogs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "logs_total",
Help: "number of logs observed by the etl",
}, []string{
"contract",
Name: "latest_height",
Help: "the latest height reported by the connected client",
}),
indexedLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
......@@ -93,11 +72,13 @@ func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
Name: "indexed_headers_total",
Help: "number of headers indexed by the etl",
}),
indexedLogs: factory.NewCounter(prometheus.CounterOpts{
indexedLogs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "indexed_logs_total",
Help: "number of logs indexed by the etl",
}, []string{
"contract",
}),
}
}
......@@ -107,23 +88,14 @@ func (m *etlMetrics) RecordInterval() func(error) {
timer := prometheus.NewTimer(m.intervalDuration)
return func(err error) {
if err != nil {
m.batchFailures.Inc()
m.intervalFailures.Inc()
}
timer.ObserveDuration()
}
}
func (m *etlMetrics) RecordBatchLatestHeight(height *big.Int) {
m.batchLatestHeight.Set(float64(height.Uint64()))
}
func (m *etlMetrics) RecordBatchHeaders(size int) {
m.batchHeaders.Add(float64(size))
}
func (m *etlMetrics) RecordBatchLog(contractAddress common.Address) {
m.batchLogs.WithLabelValues(contractAddress.String()).Inc()
func (m *etlMetrics) RecordLatestHeight(height *big.Int) {
m.latestHeight.Set(float64(height.Uint64()))
}
func (m *etlMetrics) RecordIndexedLatestHeight(height *big.Int) {
......@@ -134,6 +106,6 @@ func (m *etlMetrics) RecordIndexedHeaders(size int) {
m.indexedHeaders.Add(float64(size))
}
func (m *etlMetrics) RecordIndexedLogs(size int) {
m.indexedLogs.Add(float64(size))
func (m *etlMetrics) RecordIndexedLog(addr common.Address) {
m.indexedLogs.WithLabelValues(addr.String()).Inc()
}
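Note the asymmetry introduced here: the per-batch observation metrics were dropped, while `indexed_logs_total` became a CounterVec labeled by contract so per-contract indexing rates can be queried. A self-contained sketch using the vanilla Prometheus client (namespace, subsystem, and address are illustrative):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()
	indexedLogs := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "op_indexer_etl",
		Subsystem: "l2",
		Name:      "indexed_logs_total",
		Help:      "number of logs indexed by the etl",
	}, []string{"contract"})
	reg.MustRegister(indexedLogs)

	// One time series is created per distinct contract label value.
	indexedLogs.WithLabelValues("0x4200000000000000000000000000000000000010").Inc()
	fmt.Println("recorded one indexed log")
}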
......@@ -29,6 +29,7 @@ name = "$INDEXER_DB_NAME"
[http]
host = "127.0.0.1"
port = 8080
timeout = 10
[metrics]
host = "127.0.0.1"
......
......@@ -40,23 +40,27 @@ type EthClient interface {
StorageHash(common.Address, *big.Int) (common.Hash, error)
FilterLogs(ethereum.FilterQuery) (Logs, error)
// Close closes the underlying RPC connection.
// RPC close does not return any errors, but does shut down e.g. a websocket connection.
Close()
}
type clnt struct {
rpc RPC
}
func DialEthClient(rpcUrl string, metrics Metricer) (EthClient, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultDialTimeout)
func DialEthClient(ctx context.Context, rpcUrl string, metrics Metricer) (EthClient, error) {
ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
bOff := retry.Exponential()
rpcClient, err := retry.Do(ctxwt, defaultDialAttempts, bOff, func() (*rpc.Client, error) {
rpcClient, err := retry.Do(ctx, defaultDialAttempts, bOff, func() (*rpc.Client, error) {
if !client.IsURLAvailable(rpcUrl) {
return nil, fmt.Errorf("address unavailable (%s)", rpcUrl)
}
client, err := rpc.DialContext(ctxwt, rpcUrl)
client, err := rpc.DialContext(ctx, rpcUrl)
if err != nil {
return nil, fmt.Errorf("failed to dial address (%s): %w", rpcUrl, err)
}
......@@ -192,6 +196,10 @@ func (c *clnt) StorageHash(address common.Address, blockNumber *big.Int) (common
return proof.StorageHash, nil
}
func (c *clnt) Close() {
c.rpc.Close()
}
type Logs struct {
Logs []types.Log
ToBlockHeader *types.Header
......
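With the signature change, dial deadlines are now controlled by the caller rather than hard-coded, and the new `Close` releases the underlying RPC connection. A hedged usage sketch; `metrics` is assumed to be an existing `Metricer` implementation:

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

client, err := node.DialEthClient(ctx, "http://localhost:8545", metrics)
if err != nil {
	log.Crit("failed to dial RPC", "err", err)
}
defer client.Close() // shuts down e.g. a websocket connection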
package node
import (
"context"
"fmt"
"net"
"strings"
......@@ -21,14 +22,14 @@ func TestDialEthClientUnavailable(t *testing.T) {
metrics := &clientMetrics{}
// available
_, err = DialEthClient(addr, metrics)
_, err = DialEthClient(context.Background(), addr, metrics)
require.NoError(t, err)
// :0 requests a new unbound port
_, err = DialEthClient("http://localhost:0", metrics)
_, err = DialEthClient(context.Background(), "http://localhost:0", metrics)
require.Error(t, err)
// Fail open if we don't recognize the scheme
_, err = DialEthClient("mailto://example.com", metrics)
_, err = DialEthClient(context.Background(), "mailto://example.com", metrics)
require.Error(t, err)
}
......@@ -17,38 +17,56 @@ var (
type HeaderTraversal struct {
ethClient EthClient
lastHeader *types.Header
latestHeader *types.Header
lastTraversedHeader *types.Header
blockConfirmationDepth *big.Int
}
// NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client.
// The HeaderTraversal will start fetching blocks starting from the supplied header unless nil, indicating genesis.
func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header, confDepth *big.Int) *HeaderTraversal {
return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader, blockConfirmationDepth: confDepth}
return &HeaderTraversal{
ethClient: ethClient,
lastTraversedHeader: fromHeader,
blockConfirmationDepth: confDepth,
}
}
// LatestHeader returns the latest header reported by underlying eth client
// as headers are traversed via `NextHeaders`.
func (f *HeaderTraversal) LatestHeader() *types.Header {
return f.latestHeader
}
// LastHeader returns the last header that was fetched by the HeaderTraversal
// This is useful for testing the state of the HeaderTraversal
func (f *HeaderTraversal) LastHeader() *types.Header {
return f.lastHeader
// LastTraversedHeader returns the last header traversed.
// - This is useful for testing the state of the HeaderTraversal
// - LastTraversedHeader may be << LatestHeader depending on the number of
// headers traversed via `NextHeaders`.
func (f *HeaderTraversal) LastTraversedHeader() *types.Header {
return f.lastTraversedHeader
}
// NextFinalizedHeaders retrieves the next set of headers that have been
// NextHeaders retrieves the next set of headers that have been
// marked as sufficiently confirmed by the connected client, bounded by the supplied size
func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) {
latestBlockHeader, err := f.ethClient.BlockHeaderByNumber(nil)
func (f *HeaderTraversal) NextHeaders(maxSize uint64) ([]types.Header, error) {
latestHeader, err := f.ethClient.BlockHeaderByNumber(nil)
if err != nil {
return nil, fmt.Errorf("unable to query latest block: %w", err)
} else if latestHeader == nil {
return nil, fmt.Errorf("latest header unreported")
} else {
f.latestHeader = latestHeader
}
endHeight := new(big.Int).Sub(latestBlockHeader.Number, f.blockConfirmationDepth)
endHeight := new(big.Int).Sub(latestHeader.Number, f.blockConfirmationDepth)
if endHeight.Sign() < 0 {
// No blocks with the provided confirmation depth available
return nil, nil
}
if f.lastHeader != nil {
cmp := f.lastHeader.Number.Cmp(endHeight)
if f.lastTraversedHeader != nil {
cmp := f.lastTraversedHeader.Number.Cmp(endHeight)
if cmp == 0 { // We're synced to head and there are no new headers
return nil, nil
} else if cmp > 0 {
......@@ -57,8 +75,8 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
}
nextHeight := bigint.Zero
if f.lastHeader != nil {
nextHeight = new(big.Int).Add(f.lastHeader.Number, bigint.One)
if f.lastTraversedHeader != nil {
nextHeight = new(big.Int).Add(f.lastTraversedHeader.Number, bigint.One)
}
// clamp endHeight so that at most maxSize headers are fetched in this pass
......@@ -71,12 +89,12 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
numHeaders := len(headers)
if numHeaders == 0 {
return nil, nil
} else if f.lastHeader != nil && headers[0].ParentHash != f.lastHeader.Hash() {
} else if f.lastTraversedHeader != nil && headers[0].ParentHash != f.lastTraversedHeader.Hash() {
// The indexer's state is in an irrecoverable state relative to the provider. This
// should never happen since the indexer is dealing with only finalized blocks.
return nil, ErrHeaderTraversalAndProviderMismatchedState
}
f.lastHeader = &headers[numHeaders-1]
f.lastTraversedHeader = &headers[numHeaders-1]
return headers, nil
}
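The traversal now exposes two distinct cursors: `LatestHeader` (the tip last reported by the client) and `LastTraversedHeader` (how far `NextHeaders` has actually advanced). A hedged polling sketch; `client` is assumed to satisfy `EthClient`:

ht := NewHeaderTraversal(client, nil /* start from genesis */, big.NewInt(0))

headers, err := ht.NextHeaders(100)
if err != nil {
	log.Crit("traversal failed", "err", err)
}
if len(headers) > 0 {
	// LastTraversedHeader trails LatestHeader until the cursor catches up to the tip.
	log.Info("progress", "latest", ht.LatestHeader().Number, "traversed", ht.LastTraversedHeader().Number)
}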
......@@ -33,44 +33,55 @@ func makeHeaders(numHeaders uint64, prevHeader *types.Header) []types.Header {
return headers
}
func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) {
func TestHeaderTraversalNextHeadersNoOp(t *testing.T) {
client := new(MockEthClient)
// start from block 10 as the latest fetched block
lastHeader := &types.Header{Number: big.NewInt(10)}
headerTraversal := NewHeaderTraversal(client, lastHeader, bigint.Zero)
lastTraversedHeader := &types.Header{Number: big.NewInt(10)}
headerTraversal := NewHeaderTraversal(client, lastTraversedHeader, bigint.Zero)
require.Nil(t, headerTraversal.LatestHeader())
require.NotNil(t, headerTraversal.LastTraversedHeader())
// no new headers when matched with head
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(lastHeader, nil)
headers, err := headerTraversal.NextFinalizedHeaders(100)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(LastTraversedHeader, nil)
headers, err := headerTraversal.NextHeaders(100)
require.NoError(t, err)
require.Empty(t, headers)
require.NotNil(t, headerTraversal.LatestHeader())
require.NotNil(t, headerTraversal.LastTraversedHeader())
require.Equal(t, lastTraversedHeader.Number.Uint64(), headerTraversal.LatestHeader().Number.Uint64())
}
func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
func TestHeaderTraversalNextHeadersCursored(t *testing.T) {
client := new(MockEthClient)
// start from genesis
headerTraversal := NewHeaderTraversal(client, nil, bigint.Zero)
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
headers := makeHeaders(10, nil)
// blocks [0..4]. Latest reported is 7
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[7], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers[:5], nil)
_, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
// blocks [5..9]
headers = makeHeaders(5, &headers[len(headers)-1])
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5)
require.Equal(t, uint64(7), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(4), headerTraversal.LastTraversedHeader().Number.Uint64())
// blocks [5..9]. Latest Reported is 9
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[9], nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers[5:], nil)
_, err = headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
require.Equal(t, uint64(9), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(9), headerTraversal.LastTraversedHeader().Number.Uint64())
}
func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
func TestHeaderTraversalNextHeadersMaxSize(t *testing.T) {
client := new(MockEthClient)
// start from genesis
......@@ -82,16 +93,22 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
// clamped by the supplied size
headers := makeHeaders(5, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
headers, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
require.Equal(t, uint64(100), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(4), headerTraversal.LastTraversedHeader().Number.Uint64())
// clamped by the supplied size. Latest height == 100
headers = makeHeaders(10, &headers[len(headers)-1])
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(14))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(10)
headers, err = headerTraversal.NextHeaders(10)
require.NoError(t, err)
require.Len(t, headers, 10)
require.Equal(t, uint64(100), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(14), headerTraversal.LastTraversedHeader().Number.Uint64())
}
func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
......@@ -104,7 +121,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
headers := makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
headers, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
......@@ -112,7 +129,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
headers = makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(9)}, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5)
headers, err = headerTraversal.NextHeaders(5)
require.Nil(t, headers)
require.Equal(t, ErrHeaderTraversalAndProviderMismatchedState, err)
}
......@@ -12,7 +12,7 @@ import (
)
var (
MetricsNamespace = "rpc"
MetricsNamespace = "op_indexer_rpc"
batchMethod = "<batch>"
)
......
......@@ -45,3 +45,6 @@ func (m *MockEthClient) FilterLogs(query ethereum.FilterQuery) (Logs, error) {
args := m.Called(query)
return args.Get(0).(Logs), args.Error(1)
}
func (m *MockEthClient) Close() {
}
......@@ -3,16 +3,18 @@ package processors
import (
"context"
"errors"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/indexer/etl"
"github.com/ethereum-optimism/optimism/indexer/processors/bridge"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/tasks"
)
type BridgeProcessor struct {
......@@ -20,6 +22,10 @@ type BridgeProcessor struct {
db *database.DB
metrics bridge.Metricer
resourceCtx context.Context
resourceCancel context.CancelFunc
tasks tasks.Group
l1Etl *etl.L1ETL
chainConfig config.ChainConfig
......@@ -27,7 +33,8 @@ type BridgeProcessor struct {
LatestL2Header *types.Header
}
func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer, l1Etl *etl.L1ETL, chainConfig config.ChainConfig) (*BridgeProcessor, error) {
func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer, l1Etl *etl.L1ETL,
chainConfig config.ChainConfig, shutdown context.CancelCauseFunc) (*BridgeProcessor, error) {
log = log.New("processor", "bridge")
latestL1Header, err := db.BridgeTransactions.L1LatestBlockHeader()
......@@ -57,11 +64,25 @@ func NewBridgeProcessor(log log.Logger, db *database.DB, metrics bridge.Metricer
log.Info("detected latest indexed bridge state", "l1_block_number", l1Height, "l2_block_number", l2Height)
}
return &BridgeProcessor{log, db, metrics, l1Etl, chainConfig, l1Header, l2Header}, nil
resCtx, resCancel := context.WithCancel(context.Background())
return &BridgeProcessor{
log: log,
db: db,
metrics: metrics,
l1Etl: l1Etl,
resourceCtx: resCtx,
resourceCancel: resCancel,
chainConfig: chainConfig,
LatestL1Header: l1Header,
LatestL2Header: l2Header,
tasks: tasks.Group{HandleCrit: func(err error) {
shutdown(fmt.Errorf("critical error in bridge processor: %w", err))
}},
}, nil
}
func (b *BridgeProcessor) Start(ctx context.Context) error {
done := ctx.Done()
func (b *BridgeProcessor) Start() error {
b.log.Info("starting bridge processor...")
// Fire off independently on startup to check for
// new data or if we've indexed new L1 data.
......@@ -69,21 +90,35 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {
startup := make(chan interface{}, 1)
startup <- nil
b.log.Info("starting bridge processor...")
for {
select {
case <-done:
b.log.Info("stopping bridge processor")
return nil
// Tickers
case <-startup:
case <-l1EtlUpdates:
b.tasks.Go(func() error {
for {
select {
case <-b.resourceCtx.Done():
b.log.Info("stopping bridge processor")
return nil
// Tickers
case <-startup:
case <-l1EtlUpdates:
}
done := b.metrics.RecordInterval()
// TODO(8013): why log all the errors and return the same thing? We could just return the error and log it here.
err := b.run()
if err != nil {
b.log.Error("bridge processor error", "err", err)
}
done(err)
}
})
return nil
}
done := b.metrics.RecordInterval()
done(b.run())
}
func (b *BridgeProcessor) Close() error {
// signal that we can stop any ongoing work
b.resourceCancel()
// await the work to stop
return b.tasks.Wait()
}
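The bridge processor adopts the same lifecycle as the ETLs: work runs inside a `tasks.Group` whose `HandleCrit` forwards fatal errors into the application-wide `CancelCauseFunc`, and `Close` cancels the resource context and waits for the loop to drain. A minimal sketch of the pattern; `shutdown` and `workSignal` are placeholders:

resCtx, resCancel := context.WithCancel(context.Background())
group := tasks.Group{HandleCrit: func(err error) {
	shutdown(fmt.Errorf("critical error in worker: %w", err)) // app-wide CancelCauseFunc
}}

group.Go(func() error {
	for {
		select {
		case <-resCtx.Done():
			return nil // graceful stop requested via Close
		case <-workSignal: // placeholder trigger, e.g. a ticker or ETL update channel
			// ... one interval of work; a returned error trips HandleCrit ...
		}
	}
})

// On shutdown:
resCancel()
_ = group.Wait()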
// Runs the processing loop. In order to ensure all seen bridge finalization events
......@@ -231,6 +266,9 @@ func (b *BridgeProcessor) run() error {
batchLog.Info("indexed bridge events", "latest_l1_block_number", toL1Height, "latest_l2_block_number", toL2Height)
b.LatestL1Header = latestEpoch.L1BlockHeader.RLPHeader.Header()
b.metrics.RecordLatestIndexedL1Height(b.LatestL1Header.Number)
b.LatestL2Header = latestEpoch.L2BlockHeader.RLPHeader.Header()
b.metrics.RecordLatestIndexedL2Height(b.LatestL2Header.Number)
return nil
}
......@@ -10,7 +10,7 @@ import (
)
var (
MetricsNamespace string = "bridge"
MetricsNamespace string = "op_indexer_bridge"
)
type L1Metricer interface {
......@@ -83,7 +83,7 @@ func NewMetrics(registry *prometheus.Registry) Metricer {
}),
intervalFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "failures_total",
Name: "interval_failures_total",
Help: "number of failures encountered",
}),
latestL1Height: factory.NewGauge(prometheus.GaugeOpts{
......
......@@ -5,6 +5,7 @@ import (
"math"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/core/types"
......@@ -26,8 +27,8 @@ type channel struct {
confirmedTransactions map[txID]eth.BlockID
}
func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) (*channel, error) {
cb, err := newChannelBuilder(cfg)
func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rcfg *rollup.Config) (*channel, error) {
cb, err := newChannelBuilder(cfg, rcfg)
if err != nil {
return nil, fmt.Errorf("creating new channel: %w", err)
}
......
......@@ -8,6 +8,7 @@ import (
"math"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types"
)
......@@ -58,6 +59,9 @@ type ChannelConfig struct {
// CompressorConfig contains the configuration for creating new compressors.
CompressorConfig compressor.Config
// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
BatchType uint
}
// Check validates the [ChannelConfig] parameters.
......@@ -83,6 +87,10 @@ func (cc *ChannelConfig) Check() error {
return fmt.Errorf("max frame size %d is less than the minimum 23", cc.MaxFrameSize)
}
if cc.BatchType > derive.SpanBatchType {
return fmt.Errorf("unrecognized batch type: %d", cc.BatchType)
}
return nil
}
......@@ -114,7 +122,7 @@ type channelBuilder struct {
// guaranteed to be a ChannelFullError wrapping the specific reason.
fullErr error
// current channel
co *derive.ChannelOut
co derive.ChannelOut
// list of blocks in the channel. Saved in case the channel must be rebuilt
blocks []*types.Block
// frames data queue, to be sent as txs
......@@ -127,12 +135,16 @@ type channelBuilder struct {
// newChannelBuilder creates a new channel builder or returns an error if the
// channel out could not be created.
func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
func newChannelBuilder(cfg ChannelConfig, rcfg *rollup.Config) (*channelBuilder, error) {
c, err := cfg.CompressorConfig.NewCompressor()
if err != nil {
return nil, err
}
co, err := derive.NewChannelOut(c)
var spanBatchBuilder *derive.SpanBatchBuilder
if cfg.BatchType == derive.SpanBatchType {
spanBatchBuilder = derive.NewSpanBatchBuilder(rcfg.Genesis.L2Time, rcfg.L2ChainID)
}
co, err := derive.NewChannelOut(cfg.BatchType, c, spanBatchBuilder)
if err != nil {
return nil, err
}
......@@ -194,12 +206,12 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error
return derive.L1BlockInfo{}, c.FullErr()
}
batch, l1info, err := derive.BlockToBatch(block)
batch, l1info, err := derive.BlockToSingularBatch(block)
if err != nil {
return l1info, fmt.Errorf("converting block to batch: %w", err)
}
if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
if _, err = c.co.AddSingularBatch(batch, l1info.SequenceNumber); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
c.setFullErr(err)
return l1info, c.FullErr()
} else if err != nil {
......@@ -252,7 +264,7 @@ func (c *channelBuilder) updateDurationTimeout(l1BlockNum uint64) {
// derived from the batch's origin L1 block. The timeout is only moved forward
// if the derived sequencer window timeout is earlier than the currently set
// timeout.
func (c *channelBuilder) updateSwTimeout(batch *derive.BatchData) {
func (c *channelBuilder) updateSwTimeout(batch *derive.SingularBatch) {
timeout := uint64(batch.EpochNum) + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin
c.updateTimeout(timeout, ErrSeqWindowClose)
}
......
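To make the wiring explicit: the CLI-selected `BatchType` reaches `newChannelBuilder`, which constructs a `SpanBatchBuilder` only when span batches are requested and otherwise passes nil through to `derive.NewChannelOut`. A condensed sketch of that selection, using the names from the diff:

var spanBatchBuilder *derive.SpanBatchBuilder
if cfg.BatchType == derive.SpanBatchType { // SingularBatchType is the zero value
	spanBatchBuilder = derive.NewSpanBatchBuilder(rcfg.Genesis.L2Time, rcfg.L2ChainID)
}
co, err := derive.NewChannelOut(cfg.BatchType, c, spanBatchBuilder) // spanBatchBuilder may be nil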
......@@ -7,6 +7,7 @@ import (
"sync"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
......@@ -28,6 +29,7 @@ type channelManager struct {
log log.Logger
metr metrics.Metricer
cfg ChannelConfig
rcfg *rollup.Config
// All blocks since the last request for new tx data.
blocks []*types.Block
......@@ -45,17 +47,18 @@ type channelManager struct {
closed bool
}
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager {
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rcfg *rollup.Config) *channelManager {
return &channelManager{
log: log,
metr: metr,
cfg: cfg,
rcfg: rcfg,
txChannels: make(map[txID]*channel),
}
}
// Clear clears the entire state of the channel manager.
// It is intended to be used after an L2 reorg.
// It is intended to be used before launching op-batcher and after an L2 reorg.
func (s *channelManager) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
......@@ -195,7 +198,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil
}
pc, err := newChannel(s.log, s.metr, s.cfg)
pc, err := newChannel(s.log, s.metr, s.cfg, s.rcfg)
if err != nil {
return fmt.Errorf("creating new channel: %w", err)
}
......
......@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
derivetest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test"
"github.com/ethereum-optimism/optimism/op-service/eth"
......@@ -19,11 +20,41 @@ import (
"github.com/stretchr/testify/require"
)
// TestChannelManagerReturnsErrReorg ensures that the channel manager
func TestChannelManagerBatchType(t *testing.T) {
tests := []struct {
name string
f func(t *testing.T, batchType uint)
}{
{"ChannelManagerReturnsErrReorg", ChannelManagerReturnsErrReorg},
{"ChannelManagerReturnsErrReorgWhenDrained", ChannelManagerReturnsErrReorgWhenDrained},
{"ChannelManager_Clear", ChannelManager_Clear},
{"ChannelManager_TxResend", ChannelManager_TxResend},
{"ChannelManagerCloseBeforeFirstUse", ChannelManagerCloseBeforeFirstUse},
{"ChannelManagerCloseNoPendingChannel", ChannelManagerCloseNoPendingChannel},
{"ChannelManagerClosePendingChannel", ChannelManagerClosePendingChannel},
{"ChannelManagerCloseAllTxsFailed", ChannelManagerCloseAllTxsFailed},
}
for _, test := range tests {
test := test
t.Run(test.name+"_SingularBatch", func(t *testing.T) {
test.f(t, derive.SingularBatchType)
})
}
for _, test := range tests {
test := test
t.Run(test.name+"_SpanBatch", func(t *testing.T) {
test.f(t, derive.SpanBatchType)
})
}
}
// ChannelManagerReturnsErrReorg ensures that the channel manager
// detects a reorg when it has cached L1 blocks.
func TestChannelManagerReturnsErrReorg(t *testing.T) {
func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{BatchType: batchType}, &rollup.Config{})
m.Clear()
a := types.NewBlock(&types.Header{
Number: big.NewInt(0),
......@@ -49,9 +80,9 @@ func TestChannelManagerReturnsErrReorg(t *testing.T) {
require.Equal(t, []*types.Block{a, b, c}, m.blocks)
}
// TestChannelManagerReturnsErrReorgWhenDrained ensures that the channel manager
// ChannelManagerReturnsErrReorgWhenDrained ensures that the channel manager
// detects a reorg even if it does not have any blocks inside it.
func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
......@@ -61,7 +92,11 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&rollup.Config{},
)
m.Clear()
a := newMiniL2Block(0)
x := newMiniL2BlockWithNumberParent(0, big.NewInt(1), common.Hash{0xff})
......@@ -76,8 +111,8 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
require.ErrorIs(t, m.AddL2Block(x), ErrReorg)
}
// TestChannelManager_Clear tests clearing the channel manager.
func TestChannelManager_Clear(t *testing.T) {
// ChannelManager_Clear tests clearing the channel manager.
func ChannelManager_Clear(t *testing.T, batchType uint) {
require := require.New(t)
// Create a channel manager
......@@ -96,7 +131,10 @@ func TestChannelManager_Clear(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
// Channel Manager state should be empty by default
require.Empty(m.blocks)
......@@ -104,9 +142,11 @@ func TestChannelManager_Clear(t *testing.T) {
require.Nil(m.currentChannel)
require.Empty(m.channelQueue)
require.Empty(m.txChannels)
// Set the last block
m.Clear()
// Add a block to the channel manager
a, _ := derivetest.RandomL2Block(rng, 4)
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
newL1Tip := a.Hash()
l1BlockID := eth.BlockID{
Hash: a.Hash(),
......@@ -153,7 +193,7 @@ func TestChannelManager_Clear(t *testing.T) {
require.Empty(m.txChannels)
}
func TestChannelManager_TxResend(t *testing.T) {
func ChannelManager_TxResend(t *testing.T, batchType uint) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlError)
......@@ -165,9 +205,13 @@ func TestChannelManager_TxResend(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a, _ := derivetest.RandomL2Block(rng, 4)
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
require.NoError(m.AddL2Block(a))
......@@ -195,9 +239,9 @@ func TestChannelManager_TxResend(t *testing.T) {
require.Len(fs, 1)
}
// TestChannelManagerCloseBeforeFirstUse ensures that the channel manager
// ChannelManagerCloseBeforeFirstUse ensures that the channel manager
// will not produce any frames if closed immediately.
func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlCrit)
......@@ -209,9 +253,13 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
TargetFrameSize: 0,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a, _ := derivetest.RandomL2Block(rng, 4)
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
m.Close()
......@@ -222,10 +270,10 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to contain no tx data")
}
// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
// ChannelManagerCloseNoPendingChannel ensures that the channel manager
// can gracefully close with no pending channels, and will not emit any new
// channel frames.
func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) {
require := require.New(t)
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
......@@ -237,7 +285,11 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a := newMiniL2Block(0)
b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
......@@ -261,25 +313,37 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to return no new tx data")
}
// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
// ChannelManagerClosePendingChannel ensures that the channel manager
// can gracefully close with a pending channel, and will not produce any
// new channel frames after this point.
func TestChannelManagerClosePendingChannel(t *testing.T) {
func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
require := require.New(t)
// The number of batch txs depends on compression of the random data, hence the static test RNG seed.
// Example of different RNG seed that creates less than 2 frames: 1698700588902821588
rng := rand.New(rand.NewSource(123))
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
MaxFrameSize: 1000,
MaxFrameSize: 10000,
ChannelTimeout: 1000,
CompressorConfig: compressor.Config{
TargetNumFrames: 100,
TargetFrameSize: 1000,
TargetNumFrames: 1,
TargetFrameSize: 10000,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a := newMiniL2Block(50_000)
b := newMiniL2BlockWithNumberParent(10, big.NewInt(1), a.Hash())
numTx := 20 // Adjust number of txs to make 2 frames
a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
b := derivetest.RandomL2BlockWithChainId(rng, 10, defaultTestRollupConfig.L2ChainID)
bHeader := b.Header()
bHeader.Number = new(big.Int).Add(a.Number(), big.NewInt(1))
bHeader.ParentHash = a.Hash()
b = b.WithSeal(bHeader)
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
......@@ -306,11 +370,12 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
}
// TestChannelManagerCloseAllTxsFailed ensures that the channel manager
// ChannelManagerCloseAllTxsFailed ensures that the channel manager
// can gracefully close after producing transaction frames if none of these
// have successfully landed on chain.
func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
......@@ -321,9 +386,12 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
TargetFrameSize: 1000,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
}, &defaultTestRollupConfig,
)
m.Clear()
a := newMiniL2Block(50_000)
a := derivetest.RandomL2BlockWithChainId(rng, 50000, defaultTestRollupConfig.L2ChainID)
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
......
......@@ -5,6 +5,7 @@ import (
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
......@@ -20,7 +21,8 @@ func TestChannelTimeout(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
ChannelTimeout: 100,
})
}, &rollup.Config{})
m.Clear()
// Pending channel is nil so it cannot be timed out
require.Nil(t, m.currentChannel)
......@@ -61,7 +63,8 @@ func TestChannelTimeout(t *testing.T) {
// TestChannelNextTxData checks the nextTxData function.
func TestChannelNextTxData(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear()
// Nil pending channel should return EOF
returnedTxData, err := m.nextTxData(nil)
......@@ -109,7 +112,8 @@ func TestChannelTxConfirmed(t *testing.T) {
// channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and resetting the pendingChannels map
ChannelTimeout: 10,
})
}, &rollup.Config{})
m.Clear()
// Let's add a valid pending transaction to the channel manager
// So we can demonstrate that TxConfirmed's correctness
......@@ -157,7 +161,8 @@ func TestChannelTxConfirmed(t *testing.T) {
func TestChannelTxFailed(t *testing.T) {
// Create a channel manager
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear()
// Let's add a valid pending transaction to the channel
// manager so we can demonstrate correctness
......
......@@ -52,6 +52,8 @@ type CLIConfig struct {
Stopped bool
BatchType uint
TxMgrConfig txmgr.CLIConfig
LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig
......@@ -93,6 +95,7 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name),
Stopped: ctx.Bool(flags.StoppedFlag.Name),
BatchType: ctx.Uint(flags.BatchTypeFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
......
......@@ -74,7 +74,7 @@ type BatchSubmitter struct {
func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
return &BatchSubmitter{
DriverSetup: setup,
state: NewChannelManager(setup.Log, setup.Metr, setup.Channel),
state: NewChannelManager(setup.Log, setup.Metr, setup.Channel, setup.RollupCfg),
}
}
......
......@@ -173,6 +173,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
CompressorConfig: cfg.CompressorConfig.Config(),
BatchType: cfg.BatchType,
}
if err := bs.Channel.Check(); err != nil {
return fmt.Errorf("invalid channel configuration: %w", err)
......@@ -289,8 +290,10 @@ func (bs *BatcherService) Stop(ctx context.Context) error {
bs.Log.Info("Stopping batcher")
var result error
if err := bs.driver.StopBatchSubmittingIfRunning(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop batch submitting: %w", err))
if bs.driver != nil {
if err := bs.driver.StopBatchSubmittingIfRunning(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop batch submitting: %w", err))
}
}
if bs.rpcServer != nil {
......@@ -327,7 +330,7 @@ func (bs *BatcherService) Stop(ctx context.Context) error {
if result == nil {
bs.stopped.Store(true)
bs.driver.Log.Info("Batch Submitter stopped")
bs.Log.Info("Batch Submitter stopped")
}
return result
}
......
package main
import (
"context"
"os"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/urfave/cli/v2"
"github.com/ethereum-optimism/optimism/op-batcher/batcher"
"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
opservice "github.com/ethereum-optimism/optimism/op-service"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
"github.com/ethereum-optimism/optimism/op-service/metrics/doc"
"github.com/ethereum-optimism/optimism/op-service/opio"
"github.com/ethereum/go-ethereum/log"
)
......@@ -38,7 +40,8 @@ func main() {
},
}
err := app.Run(os.Args)
ctx := opio.WithInterruptBlocker(context.Background())
err := app.RunContext(ctx, os.Args)
if err != nil {
log.Crit("Application failed", "message", err)
}
......
......@@ -76,6 +76,12 @@ var (
Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC",
EnvVars: prefixEnvVars("STOPPED"),
}
BatchTypeFlag = &cli.UintFlag{
Name: "batch-type",
Usage: "The batch type. 0 for SingularBatch and 1 for SpanBatch.",
Value: 0,
EnvVars: prefixEnvVars("BATCH_TYPE"),
}
// Legacy Flags
SequencerHDPathFlag = txmgr.SequencerHDPathFlag
)
......@@ -94,6 +100,7 @@ var optionalFlags = []cli.Flag{
MaxL1TxSizeBytesFlag,
StoppedFlag,
SequencerHDPathFlag,
BatchTypeFlag,
}
func init() {
......
......@@ -9,7 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-bindings/solc"
)
const AlphabetVMStorageLayoutJSON = "{\"storage\":[{\"astId\":1000,\"contract\":\"test/FaultDisputeGame.t.sol:AlphabetVM\",\"label\":\"oracle\",\"offset\":0,\"slot\":\"0\",\"type\":\"t_contract(IPreimageOracle)1001\"}],\"types\":{\"t_contract(IPreimageOracle)1001\":{\"encoding\":\"inplace\",\"label\":\"contract IPreimageOracle\",\"numberOfBytes\":\"20\"}}}"
const AlphabetVMStorageLayoutJSON = "{\"storage\":[{\"astId\":1000,\"contract\":\"test/mocks/AlphabetVM.sol:AlphabetVM\",\"label\":\"oracle\",\"offset\":0,\"slot\":\"0\",\"type\":\"t_contract(IPreimageOracle)1001\"}],\"types\":{\"t_contract(IPreimageOracle)1001\":{\"encoding\":\"inplace\",\"label\":\"contract IPreimageOracle\",\"numberOfBytes\":\"20\"}}}"
var AlphabetVMStorageLayout = new(solc.StorageLayout)
......
package main
import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"math/big"
"os"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie"
)
func CalcBaseFee(parent eth.BlockInfo, elasticity uint64, canyonActive bool) *big.Int {
denomUint := uint64(50)
if canyonActive {
denomUint = uint64(250)
}
parentGasTarget := parent.GasLimit() / elasticity
// If the parent gasUsed is the same as the target, the baseFee remains unchanged.
if parent.GasUsed() == parentGasTarget {
return new(big.Int).Set(parent.BaseFee())
}
var (
num = new(big.Int)
denom = new(big.Int)
)
if parent.GasUsed() > parentGasTarget {
// If the parent block used more gas than its target, the baseFee should increase.
// max(1, parentBaseFee * gasUsedDelta / parentGasTarget / baseFeeChangeDenominator)
num.SetUint64(parent.GasUsed() - parentGasTarget)
num.Mul(num, parent.BaseFee())
num.Div(num, denom.SetUint64(parentGasTarget))
num.Div(num, denom.SetUint64(denomUint))
baseFeeDelta := math.BigMax(num, common.Big1)
return num.Add(parent.BaseFee(), baseFeeDelta)
} else {
// Otherwise if the parent block used less gas than its target, the baseFee should decrease.
// max(0, parentBaseFee * gasUsedDelta / parentGasTarget / baseFeeChangeDenominator)
num.SetUint64(parentGasTarget - parent.GasUsed())
num.Mul(num, parent.BaseFee())
num.Div(num, denom.SetUint64(parentGasTarget))
num.Div(num, denom.SetUint64(denomUint))
baseFee := num.Sub(parent.BaseFee(), num)
return math.BigMax(baseFee, common.Big0)
}
}
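To make the Canyon change concrete, a self-contained worked example with assumed values: a 30M gas limit at elasticity 6 gives a 5M gas target; a block using 10M gas overshoots by exactly one target, so the base fee rises by baseFee/denominator, i.e. 20 wei pre-Canyon (denominator 50) versus 4 wei with Canyon active (denominator 250):

package main

import (
	"fmt"
	"math/big"
)

func main() {
	// Assumed example values, not taken from any real chain.
	parentBaseFee := big.NewInt(1000)                     // wei
	gasLimit, elasticity := uint64(30_000_000), uint64(6) // target = 5M
	gasUsed := uint64(10_000_000)                         // over target => base fee rises
	target := gasLimit / elasticity

	for _, denom := range []uint64{50, 250} { // pre-Canyon, Canyon
		delta := new(big.Int).SetUint64(gasUsed - target)
		delta.Mul(delta, parentBaseFee)
		delta.Div(delta, new(big.Int).SetUint64(target))
		delta.Div(delta, new(big.Int).SetUint64(denom))
		fmt.Printf("denominator %d: next base fee = %v\n", denom, new(big.Int).Add(parentBaseFee, delta))
	}
	// Output: denominator 50: next base fee = 1020
	//         denominator 250: next base fee = 1004
}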
func ManuallyEncodeReceipts(receipts types.Receipts, canyonActive bool) [][]byte {
v := uint64(1)
for _, receipt := range receipts {
if receipt.Type == types.DepositTxType {
if canyonActive {
receipt.DepositReceiptVersion = &v
} else {
receipt.DepositReceiptVersion = nil
}
}
}
var out [][]byte
for i := range receipts {
var buf bytes.Buffer
receipts.EncodeIndex(i, &buf)
out = append(out, buf.Bytes())
}
return out
}
type rawReceipts [][]byte
func (s rawReceipts) Len() int { return len(s) }
func (s rawReceipts) EncodeIndex(i int, w *bytes.Buffer) {
w.Write(s[i])
}
func HashList(list [][]byte) common.Hash {
hasher := trie.NewStackTrie(nil)
return types.DeriveSha(rawReceipts(list), hasher)
}
type L2Client interface {
BlockByNumber(context.Context, *big.Int) (*types.Block, error)
CodeAt(context.Context, common.Address, *big.Int) ([]byte, error)
InfoByNumber(context.Context, uint64) (eth.BlockInfo, error)
FetchReceipts(context.Context, common.Hash) (eth.BlockInfo, types.Receipts, error)
}
type Client struct {
*ethclient.Client
*sources.L1Client
}
type Args struct {
Number uint64
Elasticity uint64
Client L2Client
}
func ValidateReceipts(ctx Args, canyonActive bool) error {
block, err := ctx.Client.InfoByNumber(context.Background(), ctx.Number)
if err != nil {
return err
}
_, receipts, err := ctx.Client.FetchReceipts(context.Background(), block.Hash())
if err != nil {
return err
}
have := block.ReceiptHash()
want := HashList(ManuallyEncodeReceipts(receipts, canyonActive))
if have != want {
return fmt.Errorf("Receipts do not look correct. canyonActive: %v. have: %v, want: %v", canyonActive, have, want)
}
return nil
}
func Validate1559Params(ctx Args, canyonActive bool) error {
block, err := ctx.Client.InfoByNumber(context.Background(), ctx.Number)
if err != nil {
return err
}
if block.BaseFee().Cmp(big.NewInt(1000)) < 0 {
log.Info("Basefee to low to properly validate", "basefee", block.BaseFee())
return nil
}
parent, err := ctx.Client.InfoByNumber(context.Background(), ctx.Number-1)
if err != nil {
return err
}
want := CalcBaseFee(parent, ctx.Elasticity, canyonActive)
have := block.BaseFee()
if have.Cmp(want) != 0 {
return fmt.Errorf("BaseFee does not match. canyonActive: %v. have: %v, want: %v", canyonActive, have, want)
}
return nil
}
func ValidateWithdrawals(ctx Args, canyonActive bool) error {
block, err := ctx.Client.BlockByNumber(context.Background(), new(big.Int).SetUint64(ctx.Number))
if err != nil {
return err
}
if canyonActive && block.Withdrawals() == nil {
return errors.New("No nonwithdrawals in a canyon block")
} else if canyonActive && len(block.Withdrawals()) > 0 {
return errors.New("Withdrawals length is not zero in a canyon block")
} else if !canyonActive && block.Withdrawals() != nil {
return errors.New("Withdrawals in a pre-canyon block")
}
return nil
}
func ValidateCreate2Deployer(ctx Args, canyonActive bool) error {
addr := common.HexToAddress("0x13b0D85CcB8bf860b6b79AF3029fCA081AE9beF2")
code, err := ctx.Client.CodeAt(context.Background(), addr, new(big.Int).SetUint64(ctx.Number))
if err != nil {
return err
}
codeHash := crypto.Keccak256Hash(code)
expectedCodeHash := common.HexToHash("0xb0550b5b431e30d38000efb7107aaa0ade03d48a7198a140edda9d27134468b2")
if canyonActive && codeHash != expectedCodeHash {
return fmt.Errorf("Canyon active but code hash does not match. have: %v, want: %v", codeHash, expectedCodeHash)
} else if !canyonActive && codeHash == expectedCodeHash {
return fmt.Errorf("Canyon not active but code hashes do match. codeHash: %v", codeHash)
}
return nil
}
// CheckActivation takes a function f which determines whether a specific block follows the rules of a fork.
// forkActivated tells `f` if the fork is active or not. `f` is called twice: first to validate that
// there is no error when checking the block against its expected fork state, and second to validate that
// it returns an error when checking the block against the opposite of what it is.
// If any error is encountered, valid is set to false.
func CheckActivation(f func(Args, bool) error, ctx Args, forkActivated bool, valid *bool) {
if forkActivated {
if err := f(ctx, true); err != nil {
log.Error("Pre-state was invalid when it was expected to be valid", "err", err)
*valid = false
}
if err := f(ctx, false); err == nil {
log.Error("Post-state was valid when it was expected to be invalid")
*valid = false
}
} else {
if err := f(ctx, true); err == nil {
log.Error("Pre-state was valid when it was expected to be invalid")
*valid = false
}
if err := f(ctx, false); err != nil {
log.Error("Post-state was invalid when it was expected to be valid", "err", err)
*valid = false
}
}
}
func main() {
logger := log.New()
// Define the flag variables
var (
canyonActive bool
number uint64
elasticity uint64
rpcURL string
)
valid := true
// Define and parse the command-line flags
flag.BoolVar(&canyonActive, "canyon", false, "Set this flag to assert canyon behavior")
flag.Uint64Var(&number, "number", 31, "Block number to check")
flag.Uint64Var(&elasticity, "elasticity", 6, "Specify the EIP-1559 elasticity. 6 on mainnet/sepolia. 10 on goerli")
flag.StringVar(&rpcURL, "rpc-url", "http://localhost:8545", "Specify the L2 ETH RPC URL")
// Parse the command-line arguments
flag.Parse()
l2RPC, err := client.NewRPC(context.Background(), logger, rpcURL, client.WithDialBackoff(10))
if err != nil {
log.Crit("Error creating RPC", "err", err)
}
c := &rollup.Config{SeqWindowSize: 10}
l2Cfg := sources.L1ClientDefaultConfig(c, true, sources.RPCKindBasic)
sourceClient, err := sources.NewL1Client(l2RPC, logger, nil, l2Cfg)
if err != nil {
log.Crit("Error creating RPC", "err", err)
}
ethClient, err := ethclient.Dial(rpcURL)
if err != nil {
log.Crit("Error creating RPC", "err", err)
}
client := Client{ethClient, sourceClient}
ctx := Args{
Number: number,
Elasticity: elasticity,
Client: client,
}
CheckActivation(ValidateReceipts, ctx, canyonActive, &valid)
CheckActivation(Validate1559Params, ctx, canyonActive, &valid)
CheckActivation(ValidateWithdrawals, ctx, canyonActive, &valid)
CheckActivation(ValidateCreate2Deployer, ctx, canyonActive, &valid)
if !valid {
os.Exit(1)
} else if canyonActive {
log.Info(fmt.Sprintf("Successfully validated block %v as a Canyon block", number))
} else {
log.Info(fmt.Sprintf("Successfully validated block %v as a Pre-Canyon block", number))
}
}
......@@ -112,7 +112,7 @@ func entrypoint(ctx *cli.Context) error {
name, _ := toDeployConfigName(chainConfig)
config, err := genesis.NewDeployConfigWithNetwork(name, deployConfig)
if err != nil {
log.Warn("Cannot find deploy config for network", "name", chainConfig.Name, "deploy-config-name", name, "path", deployConfig)
log.Warn("Cannot find deploy config for network", "name", chainConfig.Name, "deploy-config-name", name, "path", deployConfig, "err", err)
}
if config != nil {
......
......@@ -185,6 +185,8 @@ type DeployConfig struct {
EIP1559Elasticity uint64 `json:"eip1559Elasticity"`
// EIP1559Denominator is the denominator of EIP1559 base fee market.
EIP1559Denominator uint64 `json:"eip1559Denominator"`
// EIP1559DenominatorCanyon is the denominator of EIP1559 base fee market when Canyon is active.
EIP1559DenominatorCanyon uint64 `json:"eip1559DenominatorCanyon"`
// SystemConfigStartBlock represents the block at which the op-node should start syncing
// from. It is an override to set this value on legacy networks where it is not set by
// default. It can be removed once all networks have this value set in their storage.
......@@ -318,6 +320,9 @@ func (d *DeployConfig) Check() error {
if d.EIP1559Denominator == 0 {
return fmt.Errorf("%w: EIP1559Denominator cannot be 0", ErrInvalidDeployConfig)
}
if d.L2GenesisCanyonTimeOffset != nil && d.EIP1559DenominatorCanyon == 0 {
return fmt.Errorf("%w: EIP1559DenominatorCanyon cannot be 0 if Canyon is activated", ErrInvalidDeployConfig)
}
if d.EIP1559Elasticity == 0 {
return fmt.Errorf("%w: EIP1559Elasticity cannot be 0", ErrInvalidDeployConfig)
}
......
......@@ -31,6 +31,10 @@ func NewL2Genesis(config *DeployConfig, block *types.Block) (*core.Genesis, erro
if eip1559Denom == 0 {
eip1559Denom = 50
}
eip1559DenomCanyon := config.EIP1559DenominatorCanyon
if eip1559DenomCanyon == 0 {
eip1559DenomCanyon = 250
}
eip1559Elasticity := config.EIP1559Elasticity
if eip1559Elasticity == 0 {
eip1559Elasticity = 10
......@@ -61,8 +65,9 @@ func NewL2Genesis(config *DeployConfig, block *types.Block) (*core.Genesis, erro
CanyonTime: config.CanyonTime(block.Time()),
ShanghaiTime: config.CanyonTime(block.Time()),
Optimism: &params.OptimismConfig{
EIP1559Denominator: eip1559Denom,
EIP1559Elasticity: eip1559Elasticity,
EIP1559Denominator: eip1559Denom,
EIP1559Elasticity: eip1559Elasticity,
EIP1559DenominatorCanyon: eip1559DenomCanyon,
},
}
......
......@@ -62,6 +62,7 @@
"governanceTokenOwner": "0x0000000000000000000000000000000000000333",
"deploymentWaitConfirmations": 1,
"eip1559Denominator": 8,
"eip1559DenominatorCanyon": 12,
"eip1559Elasticity": 2,
"fundDevAccounts": true,
"faultGameAbsolutePrestate": "0x0000000000000000000000000000000000000000000000000000000000000000",
......
......@@ -7,7 +7,6 @@ import (
"time"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
......@@ -17,8 +16,8 @@ import (
var (
l1EthRpc = "http://example.com:8545"
gameFactoryAddressValue = "0xbb00000000000000000000000000000000000000"
cannonNetwork = chaincfg.AvailableNetworks()[0]
otherCannonNetwork = chaincfg.AvailableNetworks()[1]
cannonNetwork = "op-mainnet"
otherCannonNetwork = "op-goerli"
cannonBin = "./bin/cannon"
cannonServer = "./bin/op-program"
cannonPreState = "./pre.json"
......
......@@ -25,7 +25,7 @@ type Responder interface {
}
type ClaimLoader interface {
FetchClaims(ctx context.Context) ([]types.Claim, error)
GetAllClaims(ctx context.Context) ([]types.Claim, error)
}
type Agent struct {
......@@ -136,7 +136,7 @@ func (a *Agent) tryResolve(ctx context.Context) bool {
var errNoResolvableClaims = errors.New("no resolvable claims")
func (a *Agent) tryResolveClaims(ctx context.Context) error {
claims, err := a.loader.FetchClaims(ctx)
claims, err := a.loader.GetAllClaims(ctx)
if err != nil {
return fmt.Errorf("failed to fetch claims: %w", err)
}
......@@ -189,7 +189,7 @@ func (a *Agent) resolveClaims(ctx context.Context) error {
// newGameFromContracts initializes a new game state from the state in the contract
func (a *Agent) newGameFromContracts(ctx context.Context) (types.Game, error) {
claims, err := a.loader.FetchClaims(ctx)
claims, err := a.loader.GetAllClaims(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch claims: %w", err)
}
......
......@@ -124,7 +124,7 @@ type stubClaimLoader struct {
claims []types.Claim
}
func (s *stubClaimLoader) FetchClaims(ctx context.Context) ([]types.Claim, error) {
func (s *stubClaimLoader) GetAllClaims(ctx context.Context) ([]types.Claim, error) {
s.callCount++
return s.claims, nil
}
......
package contracts
import (
"context"
"fmt"
"math/big"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/types"
gameTypes "github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
"github.com/ethereum/go-ethereum/common"
)
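// These names are resolved against the FaultDisputeGame ABI at call time, so
// they must match the contract's method names exactly.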
const (
methodGameDuration = "GAME_DURATION"
methodMaxGameDepth = "MAX_GAME_DEPTH"
methodAbsolutePrestate = "ABSOLUTE_PRESTATE"
methodStatus = "status"
methodClaimCount = "claimDataLen"
methodClaim = "claimData"
)
type FaultDisputeGameContract struct {
multiCaller *batching.MultiCaller
contract *batching.BoundContract
}
func NewFaultDisputeGameContract(addr common.Address, caller *batching.MultiCaller) (*FaultDisputeGameContract, error) {
fdgAbi, err := bindings.FaultDisputeGameMetaData.GetAbi()
if err != nil {
return nil, fmt.Errorf("failed to load fault dispute game ABI: %w", err)
}
return &FaultDisputeGameContract{
multiCaller: caller,
contract: batching.NewBoundContract(fdgAbi, addr),
}, nil
}
func (f *FaultDisputeGameContract) GetGameDuration(ctx context.Context) (uint64, error) {
result, err := f.multiCaller.SingleCall(ctx, batching.BlockLatest, f.contract.Call(methodGameDuration))
if err != nil {
return 0, fmt.Errorf("failed to fetch game duration: %w", err)
}
return result.GetUint64(0), nil
}
func (f *FaultDisputeGameContract) GetMaxGameDepth(ctx context.Context) (uint64, error) {
result, err := f.multiCaller.SingleCall(ctx, batching.BlockLatest, f.contract.Call(methodMaxGameDepth))
if err != nil {
return 0, fmt.Errorf("failed to fetch max game depth: %w", err)
}
return result.GetBigInt(0).Uint64(), nil
}
func (f *FaultDisputeGameContract) GetAbsolutePrestateHash(ctx context.Context) (common.Hash, error) {
result, err := f.multiCaller.SingleCall(ctx, batching.BlockLatest, f.contract.Call(methodAbsolutePrestate))
if err != nil {
return common.Hash{}, fmt.Errorf("failed to fetch absolute prestate hash: %w", err)
}
return result.GetHash(0), nil
}
func (f *FaultDisputeGameContract) GetStatus(ctx context.Context) (gameTypes.GameStatus, error) {
result, err := f.multiCaller.SingleCall(ctx, batching.BlockLatest, f.contract.Call(methodStatus))
if err != nil {
return 0, fmt.Errorf("failed to fetch status: %w", err)
}
return gameTypes.GameStatusFromUint8(result.GetUint8(0))
}
func (f *FaultDisputeGameContract) GetClaimCount(ctx context.Context) (uint64, error) {
result, err := f.multiCaller.SingleCall(ctx, batching.BlockLatest, f.contract.Call(methodClaimCount))
if err != nil {
return 0, fmt.Errorf("failed to fetch claim count: %w", err)
}
return result.GetBigInt(0).Uint64(), nil
}
func (f *FaultDisputeGameContract) GetClaim(ctx context.Context, idx uint64) (types.Claim, error) {
result, err := f.multiCaller.SingleCall(ctx, batching.BlockLatest, f.contract.Call(methodClaim, new(big.Int).SetUint64(idx)))
if err != nil {
return types.Claim{}, fmt.Errorf("failed to fetch claim %v: %w", idx, err)
}
return f.decodeClaim(result, int(idx)), nil
}
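// GetAllClaims loads the game's full claim set in one batched RPC round trip:
// it reads the claim count, then multicalls claimData for every index.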
func (f *FaultDisputeGameContract) GetAllClaims(ctx context.Context) ([]types.Claim, error) {
count, err := f.GetClaimCount(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load claim count: %w", err)
}
calls := make([]*batching.ContractCall, count)
for i := uint64(0); i < count; i++ {
calls[i] = f.contract.Call(methodClaim, new(big.Int).SetUint64(i))
}
results, err := f.multiCaller.Call(ctx, batching.BlockLatest, calls...)
if err != nil {
return nil, fmt.Errorf("failed to fetch claim data: %w", err)
}
var claims []types.Claim
for idx, result := range results {
claims = append(claims, f.decodeClaim(result, idx))
}
return claims, nil
}
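// decodeClaim unpacks a single claimData tuple. The positional getters below
// follow the field order of the contract's ClaimData struct:
// (parentIndex, countered, claim, position, clock).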
func (f *FaultDisputeGameContract) decodeClaim(result *batching.CallResult, contractIndex int) types.Claim {
parentIndex := result.GetUint32(0)
countered := result.GetBool(1)
claim := result.GetHash(2)
position := result.GetBigInt(3)
clock := result.GetBigInt(4)
return types.Claim{
ClaimData: types.ClaimData{
Value: claim,
Position: types.NewPositionFromGIndex(position),
},
Countered: countered,
Clock: clock.Uint64(),
ContractIndex: contractIndex,
ParentContractIndex: int(parentIndex),
}
}
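For context, a sketch of how a caller might use the new binding to dump a game's claims. It assumes a *batching.MultiCaller wired to an L1 RPC already exists (constructing one is outside this diff), and that the file lives at op-challenger/game/fault/contracts as the package name suggests; dumpClaims itself is illustrative:

package example

import (
	"context"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-challenger/game/fault/contracts"
	"github.com/ethereum-optimism/optimism/op-service/sources/batching"
	"github.com/ethereum/go-ethereum/common"
)

// dumpClaims prints every claim in one dispute game, fetched via the batched
// GetAllClaims path rather than one RPC request per claim.
func dumpClaims(ctx context.Context, caller *batching.MultiCaller, gameAddr common.Address) error {
	game, err := contracts.NewFaultDisputeGameContract(gameAddr, caller)
	if err != nil {
		return err
	}
	claims, err := game.GetAllClaims(ctx)
	if err != nil {
		return err
	}
	for _, claim := range claims {
		fmt.Printf("claim %d (parent %d) countered=%v value=%s\n",
			claim.ContractIndex, claim.ParentContractIndex, claim.Countered, claim.Value)
	}
	return nil
}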
package contracts
import (
"context"
"fmt"
"math/big"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
"github.com/ethereum/go-ethereum/common"
)
const (
methodGameCount = "gameCount"
methodGameAtIndex = "gameAtIndex"
)
type DisputeGameFactoryContract struct {
multiCaller *batching.MultiCaller
contract *batching.BoundContract
}
func NewDisputeGameFactoryContract(addr common.Address, caller *batching.MultiCaller) (*DisputeGameFactoryContract, error) {
factoryAbi, err := bindings.DisputeGameFactoryMetaData.GetAbi()
if err != nil {
return nil, fmt.Errorf("failed to load dispute game factory ABI: %w", err)
}
return &DisputeGameFactoryContract{
multiCaller: caller,
contract: batching.NewBoundContract(factoryAbi, addr),
}, nil
}
func (f *DisputeGameFactoryContract) GetGameCount(ctx context.Context, blockNum uint64) (uint64, error) {
result, err := f.multiCaller.SingleCall(ctx, batching.BlockByNumber(blockNum), f.contract.Call(methodGameCount))
if err != nil {
return 0, fmt.Errorf("failed to load game count: %w", err)
}
return result.GetBigInt(0).Uint64(), nil
}
func (f *DisputeGameFactoryContract) GetGame(ctx context.Context, idx uint64, blockNum uint64) (types.GameMetadata, error) {
result, err := f.multiCaller.SingleCall(ctx, batching.BlockByNumber(blockNum), f.contract.Call(methodGameAtIndex, new(big.Int).SetUint64(idx)))
if err != nil {
return types.GameMetadata{}, fmt.Errorf("failed to load game %v: %w", idx, err)
}
return f.decodeGame(result), nil
}
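// decodeGame unpacks the (gameType, timestamp, proxy) tuple returned by
// gameAtIndex.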
func (f *DisputeGameFactoryContract) decodeGame(result *batching.CallResult) types.GameMetadata {
gameType := result.GetUint8(0)
timestamp := result.GetUint64(1)
proxy := result.GetAddress(2)
return types.GameMetadata{
GameType: gameType,
Timestamp: timestamp,
Proxy: proxy,
}
}
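And similarly for the factory, a hedged sketch (same assumptions as above) that enumerates all games as of a fixed L1 block; pinning both the count and the per-game reads to the same block number keeps the listing consistent even if games are created concurrently:

package example

import (
	"context"
	"fmt"

	"github.com/ethereum-optimism/optimism/op-challenger/game/fault/contracts"
	"github.com/ethereum-optimism/optimism/op-service/sources/batching"
	"github.com/ethereum/go-ethereum/common"
)

// listGames walks the factory's games at a fixed block height.
func listGames(ctx context.Context, caller *batching.MultiCaller, factoryAddr common.Address, blockNum uint64) error {
	factory, err := contracts.NewDisputeGameFactoryContract(factoryAddr, caller)
	if err != nil {
		return err
	}
	count, err := factory.GetGameCount(ctx, blockNum)
	if err != nil {
		return err
	}
	for i := uint64(0); i < count; i++ {
		game, err := factory.GetGame(ctx, i, blockNum)
		if err != nil {
			return err
		}
		fmt.Printf("game %d: type=%d proxy=%s created=%d\n", i, game.GameType, game.Proxy, game.Timestamp)
	}
	return nil
}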
......@@ -181,7 +181,7 @@ func (s *stubGameState) Act(ctx context.Context) error {
return s.actErr
}
func (s *stubGameState) GetGameStatus(ctx context.Context) (gameTypes.GameStatus, error) {
func (s *stubGameState) GetStatus(ctx context.Context) (gameTypes.GameStatus, error) {
return s.status, nil
}
......@@ -234,7 +234,7 @@ func newMockPrestateLoader(prestateError bool, prestate common.Hash) *mockLoader
prestate: prestate,
}
}
func (m *mockLoader) FetchAbsolutePrestateHash(ctx context.Context) (common.Hash, error) {
func (m *mockLoader) GetAbsolutePrestateHash(ctx context.Context) (common.Hash, error) {
if m.prestateError {
return common.Hash{}, mockLoaderError
}
......
......@@ -12,8 +12,8 @@ import (
"github.com/ethereum-optimism/optimism/op-challenger/game/types"
"github.com/ethereum-optimism/optimism/op-challenger/metrics"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)
......@@ -33,7 +33,7 @@ func RegisterGameTypes(
m metrics.Metricer,
cfg *config.Config,
txMgr txmgr.TxManager,
client bind.ContractCaller,
client *ethclient.Client,
) {
if cfg.TraceTypeEnabled(config.TraceTypeCannon) {
resourceCreator := func(addr common.Address, gameDepth uint64, dir string) (faultTypes.TraceProvider, faultTypes.OracleUpdater, error) {
......