Commit 6e9b91db authored by tre

Merge branch 'develop' into harry/faucet_drippie

parents c6cd795f 2aad1783
---
'@eth-optimism/chain-mon': patch
---
Bump node version to LTS node 20.9.0
......@@ -25,9 +25,10 @@
/ops-bedrock @ethereum-optimism/go-reviewers
# Ops
/.circleci @ethereum-optimism/infra-reviewers
/.github @ethereum-optimism/infra-reviewers
/ops @ethereum-optimism/infra-reviewers
/.circleci @ethereum-optimism/infra-reviewers
/.github @ethereum-optimism/infra-reviewers
/ops @ethereum-optimism/infra-reviewers
/docker-bake.hcl @ethereum-optimism/infra-reviewers
# Misc
/proxyd @ethereum-optimism/infra-reviewers
......
......@@ -17,6 +17,3 @@
path = packages/contracts-bedrock/lib/safe-contracts
url = https://github.com/safe-global/safe-contracts
branch = v1.4.0
COMPOSEFLAGS=-d
ITESTS_L2_HOST=http://localhost:9545
BEDROCK_TAGS_REMOTE?=origin
OP_STACK_GO_BUILDER?=us-docker.pkg.dev/oplabs-tools-artifacts/images/op_stack_go:latest
OP_STACK_GO_BUILDER?=us-docker.pkg.dev/oplabs-tools-artifacts/images/op-stack-go:latest
# Requires at least Python v3.9; specify a minor version below if needed
PYTHON?=python3
build: build-go build-ts
.PHONY: build
......@@ -25,10 +28,15 @@ ci-builder:
docker build -t ci-builder -f ops/docker/ci-builder/Dockerfile .
golang-docker:
DOCKER_BUILDKIT=1 docker build -t op-stack-go \
--build-arg GIT_COMMIT=$$(git rev-parse HEAD) \
--build-arg GIT_DATE=$$(git show -s --format='%ct') \
-f ops/docker/op-stack-go/Dockerfile .
# We don't use a buildx builder here, and just load directly into regular docker, for convenience.
GIT_COMMIT=$$(git rev-parse HEAD) \
GIT_DATE=$$(git show -s --format='%ct') \
IMAGE_TAGS=$$(git rev-parse HEAD),latest \
docker buildx bake \
--progress plain \
--load \
-f docker-bake.hcl \
op-node op-batcher op-proposer op-challenger
.PHONY: golang-docker
submodules:
......@@ -109,14 +117,14 @@ pre-devnet:
devnet-up: pre-devnet
./ops/scripts/newer-file.sh .devnet/allocs-l1.json ./packages/contracts-bedrock \
|| make devnet-allocs
PYTHONPATH=./bedrock-devnet python3 ./bedrock-devnet/main.py --monorepo-dir=.
PYTHONPATH=./bedrock-devnet $(PYTHON) ./bedrock-devnet/main.py --monorepo-dir=.
.PHONY: devnet-up
# alias for devnet-up
devnet-up-deploy: devnet-up
devnet-test: pre-devnet
PYTHONPATH=./bedrock-devnet python3 ./bedrock-devnet/main.py --monorepo-dir=. --test
PYTHONPATH=./bedrock-devnet $(PYTHON) ./bedrock-devnet/main.py --monorepo-dir=. --test
.PHONY: devnet-test
devnet-down:
......@@ -132,7 +140,7 @@ devnet-clean:
.PHONY: devnet-clean
devnet-allocs: pre-devnet
PYTHONPATH=./bedrock-devnet python3 ./bedrock-devnet/main.py --monorepo-dir=. --allocs
PYTHONPATH=./bedrock-devnet $(PYTHON) ./bedrock-devnet/main.py --monorepo-dir=. --allocs
devnet-logs:
@(cd ./ops-bedrock && docker compose logs -f)
......@@ -181,4 +189,3 @@ install-geth:
go install -v github.com/ethereum/go-ethereum/cmd/geth@$(shell cat .gethrc); \
echo "Installed geth!"; true)
.PHONY: install-geth
......@@ -10,6 +10,9 @@ import time
import shutil
import http.client
from multiprocessing import Process, Queue
import concurrent.futures
from collections import namedtuple
import devnet.log_setup
......@@ -97,14 +100,18 @@ def main():
git_commit = subprocess.run(['git', 'rev-parse', 'HEAD'], capture_output=True, text=True).stdout.strip()
git_date = subprocess.run(['git', 'show', '-s', "--format=%ct"], capture_output=True, text=True).stdout.strip()
log.info(f'Building docker images for git commit {git_commit} ({git_date})')
run_command(['docker', 'compose', 'build', '--progress', 'plain',
'--build-arg', f'GIT_COMMIT={git_commit}', '--build-arg', f'GIT_DATE={git_date}'],
cwd=paths.ops_bedrock_dir, env={
'PWD': paths.ops_bedrock_dir,
'DOCKER_BUILDKIT': '1', # (should be available by default in later versions, but explicitly enable it anyway)
'COMPOSE_DOCKER_CLI_BUILD': '1' # use the docker cache
})
# CI loads the images from workspace, and does not otherwise know the images are good as-is
if os.getenv('DEVNET_NO_BUILD') == "true":
log.info('Skipping docker images build')
else:
log.info(f'Building docker images for git commit {git_commit} ({git_date})')
run_command(['docker', 'compose', 'build', '--progress', 'plain',
'--build-arg', f'GIT_COMMIT={git_commit}', '--build-arg', f'GIT_DATE={git_date}'],
cwd=paths.ops_bedrock_dir, env={
'PWD': paths.ops_bedrock_dir,
'DOCKER_BUILDKIT': '1', # (should be available by default in later versions, but explicitly enable it anyway)
'COMPOSE_DOCKER_CLI_BUILD': '1' # use the docker cache
})
log.info('Devnet starting')
devnet_deploy(paths)
......@@ -297,6 +304,10 @@ def wait_for_rpc_server(url):
log.info(f'Waiting for RPC server at {url}')
time.sleep(1)
CommandPreset = namedtuple('Command', ['name', 'args', 'cwd', 'timeout'])
def devnet_test(paths):
# Check the L2 config
run_command(
......@@ -304,17 +315,57 @@ def devnet_test(paths):
cwd=paths.ops_chain_ops,
)
run_command(
['npx', 'hardhat', 'deposit-erc20', '--network', 'devnetL1', '--l1-contracts-json-path', paths.addresses_json_path],
cwd=paths.sdk_dir,
timeout=8*60,
)
# Run the two commands with different signers, so the ethereum nonce management does not conflict
# And do not use devnet system addresses, to avoid breaking fee-estimation or nonce values.
run_commands([
CommandPreset('erc20-test',
['npx', 'hardhat', 'deposit-erc20', '--network', 'devnetL1',
'--l1-contracts-json-path', paths.addresses_json_path, '--signer-index', '14'],
cwd=paths.sdk_dir, timeout=8*60),
CommandPreset('eth-test',
['npx', 'hardhat', 'deposit-eth', '--network', 'devnetL1',
'--l1-contracts-json-path', paths.addresses_json_path, '--signer-index', '15'],
cwd=paths.sdk_dir, timeout=8*60)
], max_workers=2)
def run_commands(commands: list[CommandPreset], max_workers=2):
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(run_command_preset, cmd) for cmd in commands]
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
print(result.stdout)
def run_command_preset(command: CommandPreset):
with subprocess.Popen(command.args, cwd=command.cwd,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) as proc:
try:
# Live output processing
for line in proc.stdout:
# Annotate and print each line with a timestamp and the command name
timestamp = datetime.datetime.utcnow().strftime('%H:%M:%S.%f')
print(f"[{timestamp}][{command.name}] {line}", end='')
stdout, stderr = proc.communicate(timeout=command.timeout)
if proc.returncode != 0:
raise RuntimeError(f"Command '{' '.join(command.args)}' failed with return code {proc.returncode}: {stderr}")
except subprocess.TimeoutExpired:
raise RuntimeError(f"Command '{' '.join(command.args)}' timed out!")
except Exception as e:
raise RuntimeError(f"Error executing '{' '.join(command.args)}': {e}")
finally:
# Ensure process is terminated
proc.kill()
return proc.returncode
run_command(
['npx', 'hardhat', 'deposit-eth', '--network', 'devnetL1', '--l1-contracts-json-path', paths.addresses_json_path],
cwd=paths.sdk_dir,
timeout=8*60,
)
def run_command(args, check=True, shell=False, cwd=None, env=None, timeout=None):
env = env if env else {}
......
comment: false
codecov:
require_ci_to_pass: false
comment:
layout: "diff, flags, files"
behavior: default
require_changes: true
flags:
- contracts-bedrock-tests
ignore:
- "op-e2e"
- "**/*.t.sol"
- "op-bindings/bindings/*.go"
- "**/*.t.sol"
- "packages/contracts-bedrock/test/**/*.sol"
- "packages/contracts-bedrock/contracts/vendor/WETH9.sol"
- 'packages/contracts-bedrock/contracts/EAS/**/*.sol'
coverage:
......@@ -13,6 +22,7 @@ coverage:
threshold: 0% # coverage is not allowed to reduce vs. the PR base
base: auto
informational: true
enabled: true
project:
default:
informational: true
......@@ -22,7 +32,7 @@ flag_management:
individual_flags:
- name: contracts-bedrock-tests
paths:
- packages/contracts-bedrock/contracts
- packages/contracts-bedrock
statuses:
- type: patch
target: 100%
......
variable "REGISTRY" {
default = "us-docker.pkg.dev"
}
variable "REPOSITORY" {
default = "oplabs-tools-artifacts/images"
}
variable "GIT_COMMIT" {
default = "dev"
}
variable "GIT_DATE" {
default = "0"
}
variable "GIT_VERSION" {
default = "docker" // original default as set in proxyd file, not used by full go stack, yet
}
variable "IMAGE_TAGS" {
default = "${GIT_COMMIT}" // split by ","
}
variable "PLATFORMS" {
// You can override this as "linux/amd64,linux/arm64".
// Only specify a single platform when `--load`ing into docker.
// Multi-platform is supported when outputting to disk or pushing to a registry.
// Multi-platform builds can be tested locally with: --set="*.output=type=image,push=false"
default = "linux/amd64"
}
target "op-stack-go" {
dockerfile = "ops/docker/op-stack-go/Dockerfile"
context = "."
args = {
GIT_COMMIT = "${GIT_COMMIT}"
GIT_DATE = "${GIT_DATE}"
}
platforms = split(",", PLATFORMS)
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/op-stack-go:${tag}"]
}
target "op-node" {
dockerfile = "Dockerfile"
context = "./op-node"
args = {
OP_STACK_GO_BUILDER = "op-stack-go"
}
contexts = {
op-stack-go: "target:op-stack-go"
}
platforms = split(",", PLATFORMS)
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/op-node:${tag}"]
}
target "op-batcher" {
dockerfile = "Dockerfile"
context = "./op-batcher"
args = {
OP_STACK_GO_BUILDER = "op-stack-go"
}
contexts = {
op-stack-go: "target:op-stack-go"
}
platforms = split(",", PLATFORMS)
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/op-batcher:${tag}"]
}
target "op-proposer" {
dockerfile = "Dockerfile"
context = "./op-proposer"
args = {
OP_STACK_GO_BUILDER = "op-stack-go"
}
contexts = {
op-stack-go: "target:op-stack-go"
}
platforms = split(",", PLATFORMS)
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/op-proposer:${tag}"]
}
target "op-challenger" {
dockerfile = "Dockerfile"
context = "./op-challenger"
args = {
OP_STACK_GO_BUILDER = "op-stack-go"
}
contexts = {
op-stack-go: "target:op-stack-go"
}
platforms = split(",", PLATFORMS)
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/op-challenger:${tag}"]
}
target "op-heartbeat" {
dockerfile = "Dockerfile"
context = "./op-heartbeat"
args = {
OP_STACK_GO_BUILDER = "op-stack-go"
}
contexts = {
op-stack-go: "target:op-stack-go"
}
platforms = split(",", PLATFORMS)
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/op-heartbeat:${tag}"]
}
target "op-program" {
dockerfile = "Dockerfile"
context = "./op-program"
args = {
OP_STACK_GO_BUILDER = "op-stack-go"
}
contexts = {
op-stack-go: "target:op-stack-go"
}
platforms = split(",", PLATFORMS)
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/op-program:${tag}"]
}
target "proxyd" {
dockerfile = "Dockerfile"
context = "./proxyd"
args = {
// proxyd dockerfile has no _ in the args
GITCOMMIT = "${GIT_COMMIT}"
GITDATE = "${GIT_DATE}"
GITVERSION = "${GIT_VERSION}"
}
platforms = split(",", PLATFORMS)
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/proxyd:${tag}"]
}
target "indexer" {
dockerfile = "./indexer/Dockerfile"
context = "./"
args = {
// proxyd dockerfile has no _ in the args
GITCOMMIT = "${GIT_COMMIT}"
GITDATE = "${GIT_DATE}"
GITVERSION = "${GIT_VERSION}"
}
platforms = split(",", PLATFORMS)
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/indexer:${tag}"]
}
target "ufm-metamask" {
dockerfile = "Dockerfile"
context = "./ufm-test-services/metamask"
args = {
// proxyd dockerfile has no _ in the args
GITCOMMIT = "${GIT_COMMIT}"
GITDATE = "${GIT_DATE}"
GITVERSION = "${GIT_VERSION}"
}
platforms = split(",", PLATFORMS)
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/ufm-metamask:${tag}"]
}
target "chain-mon" {
dockerfile = "./ops/docker/Dockerfile.packages"
context = "."
args = {
// proxyd dockerfile has no _ in the args
GITCOMMIT = "${GIT_COMMIT}"
GITDATE = "${GIT_DATE}"
GITVERSION = "${GIT_VERSION}"
}
// this is a multi-stage build, where each stage is a possible output target, but wd-mon is all we publish
target = "wd-mon"
platforms = split(",", PLATFORMS)
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/chain-mon:${tag}"]
}
target "ci-builder" {
dockerfile = "./ops/docker/ci-builder/Dockerfile"
context = "."
platforms = split(",", PLATFORMS)
tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/ci-builder:${tag}"]
}
......@@ -8,7 +8,7 @@ require (
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0
github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3
github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20231018202221-fdba3d104171
github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20231030223232-e16eae11e492
github.com/ethereum/go-ethereum v1.13.1
github.com/fsnotify/fsnotify v1.7.0
github.com/go-chi/chi/v5 v5.0.10
......@@ -16,7 +16,7 @@ require (
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb
github.com/google/go-cmp v0.6.0
github.com/google/gofuzz v1.2.1-0.20220503160820-4a35382e8fc8
github.com/google/uuid v1.3.1
github.com/google/uuid v1.4.0
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/golang-lru/v2 v2.0.5
github.com/holiman/uint256 v1.2.3
......@@ -33,7 +33,7 @@ require (
github.com/multiformats/go-multiaddr v0.12.0
github.com/multiformats/go-multiaddr-dns v0.3.1
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/gomega v1.28.1
github.com/onsi/gomega v1.29.0
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.7.0
github.com/prometheus/client_golang v1.17.0
......@@ -44,14 +44,14 @@ require (
golang.org/x/sync v0.4.0
golang.org/x/term v0.13.0
golang.org/x/time v0.3.0
gorm.io/driver/postgres v1.5.3
gorm.io/driver/postgres v1.5.4
gorm.io/gorm v1.25.5
)
require (
github.com/DataDog/zstd v1.5.2 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/VictoriaMetrics/fastcache v1.10.0 // indirect
github.com/VictoriaMetrics/fastcache v1.12.1 // indirect
github.com/allegro/bigcache v1.2.1 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
......@@ -60,13 +60,13 @@ require (
github.com/btcsuite/btcd/btcutil v1.1.0 // indirect
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/errors v1.11.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593 // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/cockroachdb/pebble v0.0.0-20231018212520-f6cde3fc2fa4 // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.0 // indirect
github.com/consensys/gnark-crypto v0.12.1 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
......@@ -82,7 +82,7 @@ require (
github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/ethereum/c-kzg-4844 v0.3.1 // indirect
github.com/fatih/color v1.7.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/felixge/fgprof v0.9.3 // indirect
github.com/fjl/memsize v0.0.1 // indirect
github.com/flynn/noise v1.0.0 // indirect
......@@ -97,7 +97,7 @@ require (
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.4.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gopacket v1.1.19 // indirect
......@@ -209,7 +209,7 @@ require (
rsc.io/tmplfunc v0.0.3 // indirect
)
replace github.com/ethereum/go-ethereum v1.13.1 => github.com/ethereum-optimism/op-geth v1.101301.2-0.20231018201518-63125bd85c80
replace github.com/ethereum/go-ethereum v1.13.1 => github.com/ethereum-optimism/op-geth v1.101304.0-rc.2.0.20231030225546-cd491fa3b588
//replace github.com/ethereum-optimism/superchain-registry/superchain => ../superchain-registry/superchain
//replace github.com/ethereum/go-ethereum v1.13.1 => ../go-ethereum
......@@ -16,6 +16,7 @@ func newWithdrawalResponse(withdrawals *database.L2BridgeWithdrawalsResponse) mo
item := models.WithdrawalItem{
Guid: withdrawal.L2BridgeWithdrawal.TransactionWithdrawalHash.String(),
L2BlockHash: withdrawal.L2BlockHash.String(),
Timestamp: withdrawal.L2BridgeWithdrawal.Tx.Timestamp,
From: withdrawal.L2BridgeWithdrawal.Tx.FromAddress.String(),
To: withdrawal.L2BridgeWithdrawal.Tx.ToAddress.String(),
TransactionHash: withdrawal.L2TransactionHash.String(),
......
......@@ -153,11 +153,18 @@ func LoadConfig(log log.Logger, path string) (Config, error) {
data = []byte(os.ExpandEnv(string(data)))
log.Debug("parsed config file", "data", string(data))
if _, err := toml.Decode(string(data), &cfg); err != nil {
md, err := toml.Decode(string(data), &cfg)
if err != nil {
log.Error("failed to decode config file", "err", err)
return cfg, err
}
if len(md.Undecoded()) > 0 {
log.Error("unknown fields in config file", "fields", md.Undecoded())
err = fmt.Errorf("unknown fields in config file: %v", md.Undecoded())
return cfg, err
}
if cfg.Chain.Preset == DevnetPresetId {
preset, err := DevnetPreset()
if err != nil {
......
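For context on the strict-decoding change above: `toml.Decode` returns metadata whose `Undecoded()` lists any keys in the document that did not map to a struct field. A minimal sketch of the pattern, assuming the `github.com/BurntSushi/toml` package (whose API this matches); the struct and keys here are illustrative:

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

type Config struct {
	Host string `toml:"host"`
	Port int    `toml:"port"`
}

func main() {
	data := `
host = "127.0.0.1"
port = 8080
unknown_key = 420
`
	var cfg Config
	md, err := toml.Decode(data, &cfg)
	if err != nil {
		panic(err)
	}
	// Undecoded lists keys present in the document but not mapped to Config.
	if undecoded := md.Undecoded(); len(undecoded) > 0 {
		fmt.Printf("unknown fields in config file: %v\n", undecoded) // [unknown_key]
	}
}
```

Turning `Undecoded()` into a hard error is what converts silent config typos into startup failures, which the new test below exercises.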
......@@ -257,3 +257,49 @@ func TestLocalDevnet(t *testing.T) {
require.Equal(t, devnetPreset.ChainConfig.L1Contracts, conf.Chain.L1Contracts)
}
func TestThrowsOnUnknownKeys(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
tmpfile, err := os.CreateTemp("", "test.toml")
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
defer tmpfile.Close()
testData := `
[chain]
unknown_key = 420
preset = 420
[rpcs]
l1-rpc = "https://l1.example.com"
l2-rpc = "https://l2.example.com"
[db]
another_unknownKey = 420
host = "127.0.0.1"
port = 5432
user = "postgres"
password = "postgres"
name = "indexer"
[http]
host = "127.0.0.1"
port = 8080
[metrics]
host = "127.0.0.1"
port = 7300
`
data := []byte(testData)
err = os.WriteFile(tmpfile.Name(), data, 0644)
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
err = tmpfile.Close()
require.NoError(t, err)
_, err = LoadConfig(logger, tmpfile.Name())
require.Error(t, err)
require.Contains(t, err.Error(), "unknown fields in config file")
}
......@@ -154,12 +154,11 @@ func (db *bridgeTransfersDB) L1BridgeDepositsByAddress(address common.Address, c
cursorClause = fmt.Sprintf("l1_transaction_deposits.timestamp <= %d", txDeposit.Tx.Timestamp)
}
// TODO join with l1_bridged_tokens and l2_bridged_tokens
ethAddressString := predeploys.LegacyERC20ETHAddr.String()
// Coalesce l1 transaction deposits that are simply ETH sends
ethTransactionDeposits := db.gorm.Model(&L1TransactionDeposit{})
ethTransactionDeposits = ethTransactionDeposits.Where(&Transaction{FromAddress: address}).Where("data = '0x' AND amount > 0")
ethTransactionDeposits = ethTransactionDeposits.Where(&Transaction{FromAddress: address}).Where("amount > 0")
ethTransactionDeposits = ethTransactionDeposits.Joins("INNER JOIN l1_contract_events ON l1_contract_events.guid = initiated_l1_event_guid")
ethTransactionDeposits = ethTransactionDeposits.Select(`
from_address, to_address, amount, data, source_hash AS transaction_source_hash,
......@@ -283,7 +282,7 @@ func (db *bridgeTransfersDB) L2BridgeWithdrawalsByAddress(address common.Address
// Coalesce l2 transaction withdrawals that are simply ETH sends
ethTransactionWithdrawals := db.gorm.Model(&L2TransactionWithdrawal{})
ethTransactionWithdrawals = ethTransactionWithdrawals.Where(&Transaction{FromAddress: address}).Where(`data = '0x' AND amount > 0`)
ethTransactionWithdrawals = ethTransactionWithdrawals.Where(&Transaction{FromAddress: address}).Where("amount > 0")
ethTransactionWithdrawals = ethTransactionWithdrawals.Joins("INNER JOIN l2_contract_events ON l2_contract_events.guid = l2_transaction_withdrawals.initiated_l2_event_guid")
ethTransactionWithdrawals = ethTransactionWithdrawals.Joins("LEFT JOIN l1_contract_events AS proven_l1_events ON proven_l1_events.guid = l2_transaction_withdrawals.proven_l1_event_guid")
ethTransactionWithdrawals = ethTransactionWithdrawals.Joins("LEFT JOIN l1_contract_events AS finalized_l1_events ON finalized_l1_events.guid = l2_transaction_withdrawals.finalized_l1_event_guid")
......
......@@ -3,6 +3,7 @@ package etl
import (
"context"
"errors"
"fmt"
"math/big"
"time"
......@@ -66,14 +67,18 @@ func (etl *ETL) Start(ctx context.Context) error {
if len(headers) > 0 {
etl.log.Info("retrying previous batch")
} else {
newHeaders, err := etl.headerTraversal.NextFinalizedHeaders(etl.headerBufferSize)
newHeaders, err := etl.headerTraversal.NextHeaders(etl.headerBufferSize)
if err != nil {
etl.log.Error("error querying for headers", "err", err)
} else if len(newHeaders) == 0 {
etl.log.Warn("no new headers. processor unexpectedly at head...")
etl.log.Warn("no new headers. etl at head?")
} else {
headers = newHeaders
etl.metrics.RecordBatchHeaders(len(newHeaders))
}
latestHeader := etl.headerTraversal.LatestHeader()
if latestHeader != nil {
etl.metrics.RecordLatestHeight(latestHeader.Number)
}
}
......@@ -97,7 +102,6 @@ func (etl *ETL) processBatch(headers []types.Header) error {
batchLog := etl.log.New("batch_start_block_number", firstHeader.Number, "batch_end_block_number", lastHeader.Number)
batchLog.Info("extracting batch", "size", len(headers))
etl.metrics.RecordBatchLatestHeight(lastHeader.Number)
headerMap := make(map[common.Hash]*types.Header, len(headers))
for i := range headers {
header := headers[i]
......@@ -105,31 +109,40 @@ func (etl *ETL) processBatch(headers []types.Header) error {
}
headersWithLog := make(map[common.Hash]bool, len(headers))
logs, err := etl.EthClient.FilterLogs(ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts})
filterQuery := ethereum.FilterQuery{FromBlock: firstHeader.Number, ToBlock: lastHeader.Number, Addresses: etl.contracts}
logs, err := etl.EthClient.FilterLogs(filterQuery)
if err != nil {
batchLog.Info("failed to extract logs", "err", err)
return err
}
if len(logs) > 0 {
batchLog.Info("detected logs", "size", len(logs))
if logs.ToBlockHeader.Number.Cmp(lastHeader.Number) != 0 {
// Warn and simply wait for the provider to synchronize state
batchLog.Warn("mismatch in FilterLog#ToBlock number", "queried_to_block_number", lastHeader.Number, "reported_to_block_number", logs.ToBlockHeader.Number)
return fmt.Errorf("mismatch in FilterLog#ToBlock number")
} else if logs.ToBlockHeader.Hash() != lastHeader.Hash() {
batchLog.Error("mismatch in FitlerLog#ToBlock block hash!!!", "queried_to_block_hash", lastHeader.Hash().String(), "reported_to_block_hash", logs.ToBlockHeader.Hash().String())
return fmt.Errorf("mismatch in FitlerLog#ToBlock block hash!!!")
}
if len(logs.Logs) > 0 {
batchLog.Info("detected logs", "size", len(logs.Logs))
}
for i := range logs {
log := logs[i]
for i := range logs.Logs {
log := logs.Logs[i]
headersWithLog[log.BlockHash] = true
if _, ok := headerMap[log.BlockHash]; !ok {
// NOTE. Definitely an error state if none of the headers were re-orged out in between
// the blocks and logs retrieval operations. Unlikely as long as the confirmation depth has
// been appropriately set or when we get to natively handling reorgs.
batchLog.Error("log found with block hash not in the batch", "block_hash", logs[i].BlockHash, "log_index", logs[i].Index)
batchLog.Error("log found with block hash not in the batch", "block_hash", logs.Logs[i].BlockHash, "log_index", logs.Logs[i].Index)
return errors.New("parsed log with a block hash not in the batch")
}
etl.metrics.RecordBatchLog(log.Address)
headersWithLog[log.BlockHash] = true
}
// ensure we use unique downstream references for the etl batch
headersRef := headers
etl.etlBatches <- ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs, HeadersWithLog: headersWithLog}
etl.etlBatches <- ETLBatch{Logger: batchLog, Headers: headersRef, HeaderMap: headerMap, Logs: logs.Logs, HeadersWithLog: headersWithLog}
return nil
}
......@@ -108,6 +108,7 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
l1BlockHeaders = append(l1BlockHeaders, database.L1BlockHeader{BlockHeader: database.BlockHeaderFromHeader(&batch.Headers[i])})
}
}
if len(l1BlockHeaders) == 0 {
batch.Logger.Info("no l1 blocks with logs in batch")
continue
......@@ -117,6 +118,7 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
for i := range batch.Logs {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
l1ContractEvents[i] = database.L1ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
l1Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
}
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
......@@ -138,7 +140,6 @@ func (l1Etl *L1ETL) Start(ctx context.Context) error {
l1Etl.ETL.metrics.RecordIndexedHeaders(len(l1BlockHeaders))
l1Etl.ETL.metrics.RecordIndexedLatestHeight(l1BlockHeaders[len(l1BlockHeaders)-1].Number)
l1Etl.ETL.metrics.RecordIndexedLogs(len(l1ContractEvents))
// a-ok!
return nil, nil
......
......@@ -62,7 +62,7 @@ func TestL1ETLConstruction(t *testing.T) {
},
assertion: func(etl *L1ETL, err error) {
require.NoError(t, err)
require.Equal(t, etl.headerTraversal.LastHeader().ParentHash, common.HexToHash("0x69"))
require.Equal(t, etl.headerTraversal.LastTraversedHeader().ParentHash, common.HexToHash("0x69"))
},
},
{
......@@ -94,7 +94,7 @@ func TestL1ETLConstruction(t *testing.T) {
},
assertion: func(etl *L1ETL, err error) {
require.NoError(t, err)
header := etl.headerTraversal.LastHeader()
header := etl.headerTraversal.LastTraversedHeader()
require.True(t, header.Number.Cmp(big.NewInt(69)) == 0)
},
......
......@@ -93,6 +93,7 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
for i := range batch.Logs {
timestamp := batch.HeaderMap[batch.Logs[i].BlockHash].Time
l2ContractEvents[i] = database.L2ContractEvent{ContractEvent: database.ContractEventFromLog(&batch.Logs[i], timestamp)}
l2Etl.ETL.metrics.RecordIndexedLog(batch.Logs[i].Address)
}
// Continually try to persist this batch. If it fails after 10 attempts, we simply error out
......@@ -115,9 +116,6 @@ func (l2Etl *L2ETL) Start(ctx context.Context) error {
l2Etl.ETL.metrics.RecordIndexedHeaders(len(l2BlockHeaders))
l2Etl.ETL.metrics.RecordIndexedLatestHeight(l2BlockHeaders[len(l2BlockHeaders)-1].Number)
if len(l2ContractEvents) > 0 {
l2Etl.ETL.metrics.RecordIndexedLogs(len(l2ContractEvents))
}
// a-ok!
return nil, nil
......
......@@ -9,35 +9,28 @@ import (
)
var (
MetricsNamespace string = "etl"
MetricsNamespace string = "op_indexer_etl"
)
type Metricer interface {
RecordInterval() (done func(err error))
// Batch Extraction
RecordBatchLatestHeight(height *big.Int)
RecordBatchHeaders(size int)
RecordBatchLog(contractAddress common.Address)
RecordLatestHeight(height *big.Int)
// Indexed Batches
RecordIndexedLatestHeight(height *big.Int)
RecordIndexedHeaders(size int)
RecordIndexedLogs(size int)
RecordIndexedLog(contractAddress common.Address)
}
type etlMetrics struct {
intervalTick prometheus.Counter
intervalDuration prometheus.Histogram
batchFailures prometheus.Counter
batchLatestHeight prometheus.Gauge
batchHeaders prometheus.Counter
batchLogs *prometheus.CounterVec
intervalFailures prometheus.Counter
latestHeight prometheus.Gauge
indexedLatestHeight prometheus.Gauge
indexedHeaders prometheus.Counter
indexedLogs prometheus.Counter
indexedLogs *prometheus.CounterVec
}
func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
......@@ -55,31 +48,17 @@ func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
Name: "interval_seconds",
Help: "duration elapsed for during the processing loop",
}),
batchFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "failures_total",
Help: "number of times the etl encountered a failure to extract a batch",
}),
batchLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
intervalFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "height",
Help: "the latest block height observed by an etl interval",
Name: "interval_failures_total",
Help: "number of times the etl encountered a failure during the processing loop",
}),
batchHeaders: factory.NewCounter(prometheus.CounterOpts{
latestHeight: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "headers_total",
Help: "number of headers observed by the etl",
}),
batchLogs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "logs_total",
Help: "number of logs observed by the etl",
}, []string{
"contract",
Name: "latest_height",
Help: "the latest height reported by the connected client",
}),
indexedLatestHeight: factory.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
......@@ -93,11 +72,13 @@ func NewMetrics(registry *prometheus.Registry, subsystem string) Metricer {
Name: "indexed_headers_total",
Help: "number of headers indexed by the etl",
}),
indexedLogs: factory.NewCounter(prometheus.CounterOpts{
indexedLogs: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: subsystem,
Name: "indexed_logs_total",
Help: "number of logs indexed by the etl",
}, []string{
"contract",
}),
}
}
......@@ -107,23 +88,14 @@ func (m *etlMetrics) RecordInterval() func(error) {
timer := prometheus.NewTimer(m.intervalDuration)
return func(err error) {
if err != nil {
m.batchFailures.Inc()
m.intervalFailures.Inc()
}
timer.ObserveDuration()
}
}
func (m *etlMetrics) RecordBatchLatestHeight(height *big.Int) {
m.batchLatestHeight.Set(float64(height.Uint64()))
}
func (m *etlMetrics) RecordBatchHeaders(size int) {
m.batchHeaders.Add(float64(size))
}
func (m *etlMetrics) RecordBatchLog(contractAddress common.Address) {
m.batchLogs.WithLabelValues(contractAddress.String()).Inc()
func (m *etlMetrics) RecordLatestHeight(height *big.Int) {
m.latestHeight.Set(float64(height.Uint64()))
}
func (m *etlMetrics) RecordIndexedLatestHeight(height *big.Int) {
......@@ -134,6 +106,6 @@ func (m *etlMetrics) RecordIndexedHeaders(size int) {
m.indexedHeaders.Add(float64(size))
}
func (m *etlMetrics) RecordIndexedLogs(size int) {
m.indexedLogs.Add(float64(size))
func (m *etlMetrics) RecordIndexedLog(addr common.Address) {
m.indexedLogs.WithLabelValues(addr.String()).Inc()
}
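A note on the metrics refactor above: moving from a plain counter to a `CounterVec` keyed on a `contract` label yields one time series per contract address. A small self-contained sketch of that shape using `prometheus/client_golang` directly; the subsystem and addresses are illustrative:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	registry := prometheus.NewRegistry()
	indexedLogs := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "op_indexer_etl",
		Subsystem: "l1", // illustrative subsystem
		Name:      "indexed_logs_total",
		Help:      "number of logs indexed by the etl",
	}, []string{"contract"})
	registry.MustRegister(indexedLogs)

	// One time series per contract address label value.
	indexedLogs.WithLabelValues("0x4200000000000000000000000000000000000007").Inc()
	indexedLogs.WithLabelValues("0x4200000000000000000000000000000000000016").Add(3)

	families, _ := registry.Gather()
	for _, mf := range families {
		fmt.Println(mf.GetName(), "->", len(mf.GetMetric()), "series")
	}
}
```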
......@@ -39,7 +39,7 @@ type EthClient interface {
TxByHash(common.Hash) (*types.Transaction, error)
StorageHash(common.Address, *big.Int) (common.Hash, error)
FilterLogs(ethereum.FilterQuery) ([]types.Log, error)
FilterLogs(ethereum.FilterQuery) (Logs, error)
}
type clnt struct {
......@@ -122,15 +122,12 @@ func (c *clnt) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Hea
}
count := new(big.Int).Sub(endHeight, startHeight).Uint64() + 1
headers := make([]types.Header, count)
batchElems := make([]rpc.BatchElem, count)
for i := uint64(0); i < count; i++ {
height := new(big.Int).Add(startHeight, new(big.Int).SetUint64(i))
batchElems[i] = rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{toBlockNumArg(height), false},
Result: new(types.Header),
Error: nil,
}
batchElems[i] = rpc.BatchElem{Method: "eth_getBlockByNumber", Args: []interface{}{toBlockNumArg(height), false}, Result: &headers[i]}
}
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
......@@ -144,23 +141,21 @@ func (c *clnt) BlockHeadersByRange(startHeight, endHeight *big.Int) ([]types.Hea
// - Ensure integrity that they build on top of each other
// - Truncate out headers that do not exist (endHeight > "latest")
size := 0
headers := make([]types.Header, count)
for i, batchElem := range batchElems {
if batchElem.Error != nil {
return nil, batchElem.Error
if size == 0 {
return nil, batchElem.Error
} else {
break // try return whatever headers are available
}
} else if batchElem.Result == nil {
break
}
header, ok := batchElem.Result.(*types.Header)
if !ok {
return nil, fmt.Errorf("unable to transform rpc response %v into types.Header", batchElem.Result)
}
if i > 0 && header.ParentHash != headers[i-1].Hash() {
return nil, fmt.Errorf("queried header %s does not follow parent %s", header.Hash(), headers[i-1].Hash())
if i > 0 && headers[i].ParentHash != headers[i-1].Hash() {
return nil, fmt.Errorf("queried header %s does not follow parent %s", headers[i].Hash(), headers[i-1].Hash())
}
headers[i] = *header
size = size + 1
}
......@@ -197,19 +192,43 @@ func (c *clnt) StorageHash(address common.Address, blockNumber *big.Int) (common
return proof.StorageHash, nil
}
// FilterLogs returns logs that fit the query parameters
func (c *clnt) FilterLogs(query ethereum.FilterQuery) ([]types.Log, error) {
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
type Logs struct {
Logs []types.Log
ToBlockHeader *types.Header
}
var result []types.Log
// FilterLogs returns logs that fit the query parameters. The underlying request is a batch
// request including `eth_getBlockByNumber` to allow the caller to check that the connected
// node has the state necessary to fulfill this request.
func (c *clnt) FilterLogs(query ethereum.FilterQuery) (Logs, error) {
arg, err := toFilterArg(query)
if err != nil {
return nil, err
return Logs{}, err
}
err = c.rpc.CallContext(ctxwt, &result, "eth_getLogs", arg)
return result, err
var logs []types.Log
var header types.Header
batchElems := make([]rpc.BatchElem, 2)
batchElems[0] = rpc.BatchElem{Method: "eth_getBlockByNumber", Args: []interface{}{toBlockNumArg(query.ToBlock), false}, Result: &header}
batchElems[1] = rpc.BatchElem{Method: "eth_getLogs", Args: []interface{}{arg}, Result: &logs}
ctxwt, cancel := context.WithTimeout(context.Background(), defaultRequestTimeout)
defer cancel()
err = c.rpc.BatchCallContext(ctxwt, batchElems)
if err != nil {
return Logs{}, err
}
if batchElems[0].Error != nil {
return Logs{}, fmt.Errorf("unable to query for the `FilterQuery#ToBlock` header: %w", batchElems[0].Error)
}
if batchElems[1].Error != nil {
return Logs{}, fmt.Errorf("unable to query logs: %w", batchElems[1].Error)
}
return Logs{Logs: logs, ToBlockHeader: &header}, nil
}
// Modeled off op-service/client.go. We can refactor this once the client/metrics portion
......@@ -262,10 +281,7 @@ func toBlockNumArg(number *big.Int) string {
}
func toFilterArg(q ethereum.FilterQuery) (interface{}, error) {
arg := map[string]interface{}{
"address": q.Addresses,
"topics": q.Topics,
}
arg := map[string]interface{}{"address": q.Addresses, "topics": q.Topics}
if q.BlockHash != nil {
arg["blockHash"] = *q.BlockHash
if q.FromBlock != nil || q.ToBlock != nil {
......
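For reference, a condensed sketch of the batched-request pattern the new `FilterLogs` uses, built on go-ethereum's `rpc` package; the endpoint URL and query bounds are placeholders:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	// Placeholder endpoint; any JSON-RPC eth node works.
	client, err := rpc.Dial("http://localhost:8545")
	if err != nil {
		panic(err)
	}

	var header types.Header
	var logs []types.Log
	// Fetch the ToBlock header alongside the logs so the caller can verify
	// the node has synced past the queried range before trusting the logs.
	batch := []rpc.BatchElem{
		{Method: "eth_getBlockByNumber", Args: []interface{}{"latest", false}, Result: &header},
		{Method: "eth_getLogs", Args: []interface{}{map[string]interface{}{"fromBlock": "0x0", "toBlock": "latest"}}, Result: &logs},
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := client.BatchCallContext(ctx, batch); err != nil {
		panic(err) // transport-level failure
	}
	for _, elem := range batch {
		if elem.Error != nil {
			panic(elem.Error) // per-request failure
		}
	}
	fmt.Println("to-block header:", header.Number)
}
```

Bundling `eth_getBlockByNumber` with `eth_getLogs` lets the caller reject responses from a provider that has not yet synced the queried `ToBlock`, which is exactly the consistency check added in the ETL above.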
......@@ -17,38 +17,56 @@ var (
type HeaderTraversal struct {
ethClient EthClient
lastHeader *types.Header
latestHeader *types.Header
lastTraversedHeader *types.Header
blockConfirmationDepth *big.Int
}
// NewHeaderTraversal instantiates a new instance of HeaderTraversal against the supplied rpc client.
// The HeaderTraversal will start fetching blocks after the supplied header; a nil header means traversal starts at genesis.
func NewHeaderTraversal(ethClient EthClient, fromHeader *types.Header, confDepth *big.Int) *HeaderTraversal {
return &HeaderTraversal{ethClient: ethClient, lastHeader: fromHeader, blockConfirmationDepth: confDepth}
return &HeaderTraversal{
ethClient: ethClient,
lastTraversedHeader: fromHeader,
blockConfirmationDepth: confDepth,
}
}
// LatestHeader returns the latest header reported by underlying eth client
// as headers are traversed via `NextHeaders`.
func (f *HeaderTraversal) LatestHeader() *types.Header {
return f.latestHeader
}
// LastHeader returns the last header that was fetched by the HeaderTraversal
// This is useful for testing the state of the HeaderTraversal
func (f *HeaderTraversal) LastHeader() *types.Header {
return f.lastHeader
// LastTraversedHeader returns the last header traversed.
// - This is useful for testing the state of the HeaderTraversal
// - LastTraversedHeader may be << LatestHeader depending on the number
// of headers traversed via `NextHeaders`.
func (f *HeaderTraversal) LastTraversedHeader() *types.Header {
return f.lastTraversedHeader
}
// NextFinalizedHeaders retrieves the next set of headers that have been
// NextHeaders retrieves the next set of headers that have been
// observed beyond the configured confirmation depth of the connected client, bounded by the supplied size
func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) {
latestBlockHeader, err := f.ethClient.BlockHeaderByNumber(nil)
func (f *HeaderTraversal) NextHeaders(maxSize uint64) ([]types.Header, error) {
latestHeader, err := f.ethClient.BlockHeaderByNumber(nil)
if err != nil {
return nil, fmt.Errorf("unable to query latest block: %w", err)
} else if latestHeader == nil {
return nil, fmt.Errorf("latest header unreported")
} else {
f.latestHeader = latestHeader
}
endHeight := new(big.Int).Sub(latestBlockHeader.Number, f.blockConfirmationDepth)
endHeight := new(big.Int).Sub(latestHeader.Number, f.blockConfirmationDepth)
if endHeight.Sign() < 0 {
// No blocks with the provided confirmation depth available
return nil, nil
}
if f.lastHeader != nil {
cmp := f.lastHeader.Number.Cmp(endHeight)
if f.lastTraversedHeader != nil {
cmp := f.lastTraversedHeader.Number.Cmp(endHeight)
if cmp == 0 { // We're synced to head and there are no new headers
return nil, nil
} else if cmp > 0 {
......@@ -57,8 +75,8 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
}
nextHeight := bigint.Zero
if f.lastHeader != nil {
nextHeight = new(big.Int).Add(f.lastHeader.Number, bigint.One)
if f.lastTraversedHeader != nil {
nextHeight = new(big.Int).Add(f.lastTraversedHeader.Number, bigint.One)
}
// endHeight = (nextHeight - endHeight) <= maxSize
......@@ -71,12 +89,12 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
numHeaders := len(headers)
if numHeaders == 0 {
return nil, nil
} else if f.lastHeader != nil && headers[0].ParentHash != f.lastHeader.Hash() {
} else if f.lastTraversedHeader != nil && headers[0].ParentHash != f.lastTraversedHeader.Hash() {
// The indexer is in an irrecoverable state relative to the provider. This
// should never happen since the indexer is dealing with only finalized blocks.
return nil, ErrHeaderTraversalAndProviderMismatchedState
}
f.lastHeader = &headers[numHeaders-1]
f.lastTraversedHeader = &headers[numHeaders-1]
return headers, nil
}
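The clamping rules in `NextHeaders` (confirmation depth first, then the max batch size) are easy to get wrong, so here is the same arithmetic in isolation; a sketch with no client dependency, all names illustrative:

```go
package main

import (
	"fmt"
	"math/big"
)

// nextWindow mirrors the clamping above: traverse from just past the last
// traversed height up to (latest - confirmation depth), capped at maxSize
// headers. maxSize is assumed to be >= 1.
func nextWindow(lastTraversed, latest, confDepth *big.Int, maxSize uint64) (start, end *big.Int) {
	endHeight := new(big.Int).Sub(latest, confDepth)
	if endHeight.Sign() < 0 {
		return nil, nil // no blocks at the provided confirmation depth yet
	}
	nextHeight := big.NewInt(0)
	if lastTraversed != nil {
		if lastTraversed.Cmp(endHeight) >= 0 {
			return nil, nil // synced to (or past) the confirmed head
		}
		nextHeight = new(big.Int).Add(lastTraversed, big.NewInt(1))
	}
	// Clamp the window to at most maxSize headers.
	maxEnd := new(big.Int).Add(nextHeight, new(big.Int).SetUint64(maxSize-1))
	if maxEnd.Cmp(endHeight) < 0 {
		endHeight = maxEnd
	}
	return nextHeight, endHeight
}

func main() {
	// Last traversed block 4, latest head 100, no confirmation depth,
	// batch size 10: the next window is [5..14].
	start, end := nextWindow(big.NewInt(4), big.NewInt(100), big.NewInt(0), 10)
	fmt.Println(start, end) // 5 14
}
```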
......@@ -33,44 +33,55 @@ func makeHeaders(numHeaders uint64, prevHeader *types.Header) []types.Header {
return headers
}
func TestHeaderTraversalNextFinalizedHeadersNoOp(t *testing.T) {
func TestHeaderTraversalNextHeadersNoOp(t *testing.T) {
client := new(MockEthClient)
// start from block 10 as the latest fetched block
lastHeader := &types.Header{Number: big.NewInt(10)}
headerTraversal := NewHeaderTraversal(client, lastHeader, bigint.Zero)
lastTraversedHeader := &types.Header{Number: big.NewInt(10)}
headerTraversal := NewHeaderTraversal(client, lastTraversedHeader, bigint.Zero)
require.Nil(t, headerTraversal.LatestHeader())
require.NotNil(t, headerTraversal.LastTraversedHeader())
// no new headers when matched with head
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(lastHeader, nil)
headers, err := headerTraversal.NextFinalizedHeaders(100)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(lastTraversedHeader, nil)
headers, err := headerTraversal.NextHeaders(100)
require.NoError(t, err)
require.Empty(t, headers)
require.NotNil(t, headerTraversal.LatestHeader())
require.NotNil(t, headerTraversal.LastTraversedHeader())
require.Equal(t, lastTraversedHeader.Number.Uint64(), headerTraversal.LatestHeader().Number.Uint64())
}
func TestHeaderTraversalNextFinalizedHeadersCursored(t *testing.T) {
func TestHeaderTraversalNextHeadersCursored(t *testing.T) {
client := new(MockEthClient)
// start from genesis
headerTraversal := NewHeaderTraversal(client, nil, bigint.Zero)
// blocks [0..4]
headers := makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
headers := makeHeaders(10, nil)
// blocks [0..4]. Latest reported is 7
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[7], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers[:5], nil)
_, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
// blocks [5..9]
headers = makeHeaders(5, &headers[len(headers)-1])
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5)
require.Equal(t, uint64(7), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(4), headerTraversal.LastTraversedHeader().Number.Uint64())
// blocks [5..9]. Latest reported is 9
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[9], nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers[5:], nil)
_, err = headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
require.Equal(t, uint64(9), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(9), headerTraversal.LastTraversedHeader().Number.Uint64())
}
func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
func TestHeaderTraversalNextHeadersMaxSize(t *testing.T) {
client := new(MockEthClient)
// start from genesis
......@@ -82,16 +93,22 @@ func TestHeaderTraversalNextFinalizedHeadersMaxSize(t *testing.T) {
// clamped by the supplied size
headers := makeHeaders(5, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
headers, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
require.Equal(t, uint64(100), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(4), headerTraversal.LastTraversedHeader().Number.Uint64())
// clamped by the supplied size. FinalizedHeight == 100
headers = makeHeaders(10, &headers[len(headers)-1])
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(14))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(10)
headers, err = headerTraversal.NextHeaders(10)
require.NoError(t, err)
require.Len(t, headers, 10)
require.Equal(t, uint64(100), headerTraversal.LatestHeader().Number.Uint64())
require.Equal(t, uint64(14), headerTraversal.LastTraversedHeader().Number.Uint64())
}
func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
......@@ -104,7 +121,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
headers := makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&headers[4], nil).Times(1) // Times so that we can override next
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(0)), mock.MatchedBy(bigint.Matcher(4))).Return(headers, nil)
headers, err := headerTraversal.NextFinalizedHeaders(5)
headers, err := headerTraversal.NextHeaders(5)
require.NoError(t, err)
require.Len(t, headers, 5)
......@@ -112,7 +129,7 @@ func TestHeaderTraversalMismatchedProviderStateError(t *testing.T) {
headers = makeHeaders(5, nil)
client.On("BlockHeaderByNumber", (*big.Int)(nil)).Return(&types.Header{Number: big.NewInt(9)}, nil)
client.On("BlockHeadersByRange", mock.MatchedBy(bigint.Matcher(5)), mock.MatchedBy(bigint.Matcher(9))).Return(headers, nil)
headers, err = headerTraversal.NextFinalizedHeaders(5)
headers, err = headerTraversal.NextHeaders(5)
require.Nil(t, headers)
require.Equal(t, ErrHeaderTraversalAndProviderMismatchedState, err)
}
......@@ -12,7 +12,7 @@ import (
)
var (
MetricsNamespace = "rpc"
MetricsNamespace = "op_indexer_rpc"
batchMethod = "<batch>"
)
......
......@@ -41,7 +41,7 @@ func (m *MockEthClient) StorageHash(address common.Address, blockNumber *big.Int
return args.Get(0).(common.Hash), args.Error(1)
}
func (m *MockEthClient) FilterLogs(query ethereum.FilterQuery) ([]types.Log, error) {
func (m *MockEthClient) FilterLogs(query ethereum.FilterQuery) (Logs, error) {
args := m.Called(query)
return args.Get(0).([]types.Log), args.Error(1)
return args.Get(0).(Logs), args.Error(1)
}
......@@ -231,6 +231,9 @@ func (b *BridgeProcessor) run() error {
batchLog.Info("indexed bridge events", "latest_l1_block_number", toL1Height, "latest_l2_block_number", toL2Height)
b.LatestL1Header = latestEpoch.L1BlockHeader.RLPHeader.Header()
b.metrics.RecordLatestIndexedL1Height(b.LatestL1Header.Number)
b.LatestL2Header = latestEpoch.L2BlockHeader.RLPHeader.Header()
b.metrics.RecordLatestIndexedL2Height(b.LatestL2Header.Number)
return nil
}
......@@ -5,7 +5,6 @@ import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/indexer/bigint"
......@@ -40,8 +39,8 @@ func LegacyL1ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, metri
ctcTxDeposits[logKey{deposit.Event.BlockHash, deposit.Event.LogIndex}] = &deposit
mintedWEI = new(big.Int).Add(mintedWEI, deposit.Tx.Amount)
// We re-use the L2 Transaction hash as the source hash to remain consistent in the schema.
transactionDeposits[i] = database.L1TransactionDeposit{
// We re-use the L2 Transaction hash as the source hash to remain consistent in the schema.
SourceHash: deposit.TxHash,
L2TransactionHash: deposit.TxHash,
InitiatedL1EventGUID: deposit.Event.GUID,
......@@ -175,11 +174,9 @@ func LegacyL2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, metri
sentMessage := crossDomainSentMessages[i]
withdrawnWEI = new(big.Int).Add(withdrawnWEI, sentMessage.BridgeMessage.Tx.Amount)
// To ensure consistency in the schema, we duplicate this as the "root" transaction withdrawal. The storage key in the message
// passer contract is sha3(calldata + sender). The sender always being the L2CrossDomainMessenger pre-bedrock.
withdrawalHash := crypto.Keccak256Hash(append(sentMessage.MessageCalldata, l2Contracts.L2CrossDomainMessenger[:]...))
// We re-use the L2CrossDomainMessenger message hash as the withdrawal hash to remain consistent in the schema.
transactionWithdrawals[i] = database.L2TransactionWithdrawal{
WithdrawalHash: withdrawalHash,
WithdrawalHash: sentMessage.BridgeMessage.MessageHash,
InitiatedL2EventGUID: sentMessage.Event.GUID,
Nonce: sentMessage.BridgeMessage.Nonce,
GasLimit: sentMessage.BridgeMessage.GasLimit,
......@@ -192,9 +189,9 @@ func LegacyL2ProcessInitiatedBridgeEvents(log log.Logger, db *database.DB, metri
},
}
sentMessages[logKey{sentMessage.Event.BlockHash, sentMessage.Event.LogIndex}] = sentMessageEvent{&sentMessage, withdrawalHash}
sentMessages[logKey{sentMessage.Event.BlockHash, sentMessage.Event.LogIndex}] = sentMessageEvent{&sentMessage, sentMessage.BridgeMessage.MessageHash}
bridgeMessages[i] = database.L2BridgeMessage{
TransactionWithdrawalHash: withdrawalHash,
TransactionWithdrawalHash: sentMessage.BridgeMessage.MessageHash,
BridgeMessage: sentMessage.BridgeMessage,
}
}
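For reference, the removed lines derived the legacy withdrawal hash manually; a sketch of that derivation (placeholder calldata, real L2CrossDomainMessenger predeploy address):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	// Pre-bedrock, the storage key in the message passer was
	// keccak256(calldata ++ sender), the sender always being the
	// L2CrossDomainMessenger predeploy.
	messenger := common.HexToAddress("0x4200000000000000000000000000000000000007")
	calldata := []byte{0xde, 0xad, 0xbe, 0xef} // placeholder message calldata

	withdrawalHash := crypto.Keccak256Hash(append(calldata, messenger[:]...))
	fmt.Println(withdrawalHash)
}
```

The change above instead reuses `sentMessage.BridgeMessage.MessageHash` as the withdrawal hash, keeping the schema consistent without recomputing the storage key.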
......@@ -285,7 +282,8 @@ func LegacyL1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, metri
// for OP-Mainnet pre-regenesis withdrawals that no longer exist on L2.
tx, err := l1Client.TxByHash(relayedMessage.Event.TransactionHash)
if err != nil {
return err
log.Error("unable to query legacy relayed tx", "tx_hash", relayedMessage.Event.TransactionHash.String(), "err", err)
return fmt.Errorf("unable to query legacy relayed tx_hash = %s: %w", relayedMessage.Event.TransactionHash.String(), err)
} else if tx == nil {
log.Error("missing tx for relayed message", "tx_hash", relayedMessage.Event.TransactionHash.String())
return fmt.Errorf("missing tx for relayed message. tx_hash = %s", relayedMessage.Event.TransactionHash.String())
......@@ -311,7 +309,7 @@ func LegacyL1ProcessFinalizedBridgeEvents(log log.Logger, db *database.DB, metri
}
}
// Mark the associated tx withdrawal as proven/finalized with the same event
// Mark the associated tx withdrawal as proven/finalized with the same event. The message hash is also the transaction withdrawal hash
if err := db.BridgeTransactions.MarkL2TransactionWithdrawalProvenEvent(relayedMessage.MessageHash, relayedMessage.Event.GUID); err != nil {
log.Error("failed to mark withdrawal as proven", "err", err)
return err
......
......@@ -10,7 +10,7 @@ import (
)
var (
MetricsNamespace string = "bridge"
MetricsNamespace string = "op_indexer_bridge"
)
type L1Metricer interface {
......@@ -83,7 +83,7 @@ func NewMetrics(registry *prometheus.Registry) Metricer {
}),
intervalFailures: factory.NewCounter(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "failures_total",
Name: "interval_failures_total",
Help: "number of failures encountered",
}),
latestL1Height: factory.NewGauge(prometheus.GaugeOpts{
......
ARG OP_STACK_GO_BUILDER=us-docker.pkg.dev/oplabs-tools-artifacts/images/op_stack_go:latest
ARG OP_STACK_GO_BUILDER=us-docker.pkg.dev/oplabs-tools-artifacts/images/op-stack-go:latest
FROM $OP_STACK_GO_BUILDER as builder
# See "make golang-docker" and /ops/docker/op-stack-go
......
......@@ -5,6 +5,7 @@ import (
"math"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/core/types"
......@@ -26,8 +27,8 @@ type channel struct {
confirmedTransactions map[txID]eth.BlockID
}
func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) (*channel, error) {
cb, err := newChannelBuilder(cfg)
func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rcfg *rollup.Config) (*channel, error) {
cb, err := newChannelBuilder(cfg, rcfg)
if err != nil {
return nil, fmt.Errorf("creating new channel: %w", err)
}
......
......@@ -8,6 +8,7 @@ import (
"math"
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/core/types"
)
......@@ -58,6 +59,9 @@ type ChannelConfig struct {
// CompressorConfig contains the configuration for creating new compressors.
CompressorConfig compressor.Config
// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
BatchType uint
}
// Check validates the [ChannelConfig] parameters.
......@@ -83,6 +87,10 @@ func (cc *ChannelConfig) Check() error {
return fmt.Errorf("max frame size %d is less than the minimum 23", cc.MaxFrameSize)
}
if cc.BatchType > derive.SpanBatchType {
return fmt.Errorf("unrecognized batch type: %d", cc.BatchType)
}
return nil
}
......@@ -114,7 +122,7 @@ type channelBuilder struct {
// guaranteed to be a ChannelFullError wrapping the specific reason.
fullErr error
// current channel
co *derive.ChannelOut
co derive.ChannelOut
// list of blocks in the channel. Saved in case the channel must be rebuilt
blocks []*types.Block
// frames data queue, to be send as txs
......@@ -127,12 +135,16 @@ type channelBuilder struct {
// newChannelBuilder creates a new channel builder or returns an error if the
// channel out could not be created.
func newChannelBuilder(cfg ChannelConfig) (*channelBuilder, error) {
func newChannelBuilder(cfg ChannelConfig, rcfg *rollup.Config) (*channelBuilder, error) {
c, err := cfg.CompressorConfig.NewCompressor()
if err != nil {
return nil, err
}
co, err := derive.NewChannelOut(c)
var spanBatchBuilder *derive.SpanBatchBuilder
if cfg.BatchType == derive.SpanBatchType {
spanBatchBuilder = derive.NewSpanBatchBuilder(rcfg.Genesis.L2Time, rcfg.L2ChainID)
}
co, err := derive.NewChannelOut(cfg.BatchType, c, spanBatchBuilder)
if err != nil {
return nil, err
}
......@@ -194,12 +206,12 @@ func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error
return derive.L1BlockInfo{}, c.FullErr()
}
batch, l1info, err := derive.BlockToBatch(block)
batch, l1info, err := derive.BlockToSingularBatch(block)
if err != nil {
return l1info, fmt.Errorf("converting block to batch: %w", err)
}
if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
if _, err = c.co.AddSingularBatch(batch, l1info.SequenceNumber); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.CompressorFullErr) {
c.setFullErr(err)
return l1info, c.FullErr()
} else if err != nil {
......@@ -252,7 +264,7 @@ func (c *channelBuilder) updateDurationTimeout(l1BlockNum uint64) {
// derived from the batch's origin L1 block. The timeout is only moved forward
// if the derived sequencer window timeout is earlier than the currently set
// timeout.
func (c *channelBuilder) updateSwTimeout(batch *derive.BatchData) {
func (c *channelBuilder) updateSwTimeout(batch *derive.SingularBatch) {
timeout := uint64(batch.EpochNum) + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin
c.updateTimeout(timeout, ErrSeqWindowClose)
}
......
......@@ -7,6 +7,7 @@ import (
"sync"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
......@@ -28,6 +29,7 @@ type channelManager struct {
log log.Logger
metr metrics.Metricer
cfg ChannelConfig
rcfg *rollup.Config
// All blocks since the last request for new tx data.
blocks []*types.Block
......@@ -45,17 +47,18 @@ type channelManager struct {
closed bool
}
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager {
func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rcfg *rollup.Config) *channelManager {
return &channelManager{
log: log,
metr: metr,
cfg: cfg,
rcfg: rcfg,
txChannels: make(map[txID]*channel),
}
}
// Clear clears the entire state of the channel manager.
// It is intended to be used after an L2 reorg.
// It is intended to be used before launching op-batcher and after an L2 reorg.
func (s *channelManager) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
......@@ -195,7 +198,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil
}
pc, err := newChannel(s.log, s.metr, s.cfg)
pc, err := newChannel(s.log, s.metr, s.cfg, s.rcfg)
if err != nil {
return fmt.Errorf("creating new channel: %w", err)
}
......
......@@ -9,6 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
derivetest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test"
"github.com/ethereum-optimism/optimism/op-service/eth"
......@@ -19,11 +20,41 @@ import (
"github.com/stretchr/testify/require"
)
// TestChannelManagerReturnsErrReorg ensures that the channel manager
func TestChannelManagerBatchType(t *testing.T) {
tests := []struct {
name string
f func(t *testing.T, batchType uint)
}{
{"ChannelManagerReturnsErrReorg", ChannelManagerReturnsErrReorg},
{"ChannelManagerReturnsErrReorgWhenDrained", ChannelManagerReturnsErrReorgWhenDrained},
{"ChannelManager_Clear", ChannelManager_Clear},
{"ChannelManager_TxResend", ChannelManager_TxResend},
{"ChannelManagerCloseBeforeFirstUse", ChannelManagerCloseBeforeFirstUse},
{"ChannelManagerCloseNoPendingChannel", ChannelManagerCloseNoPendingChannel},
{"ChannelManagerClosePendingChannel", ChannelManagerClosePendingChannel},
{"ChannelManagerCloseAllTxsFailed", ChannelManagerCloseAllTxsFailed},
}
for _, test := range tests {
test := test
t.Run(test.name+"_SingularBatch", func(t *testing.T) {
test.f(t, derive.SingularBatchType)
})
}
for _, test := range tests {
test := test
t.Run(test.name+"_SpanBatch", func(t *testing.T) {
test.f(t, derive.SpanBatchType)
})
}
}
// ChannelManagerReturnsErrReorg ensures that the channel manager
// detects a reorg when it has cached L1 blocks.
func TestChannelManagerReturnsErrReorg(t *testing.T) {
func ChannelManagerReturnsErrReorg(t *testing.T, batchType uint) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{BatchType: batchType}, &rollup.Config{})
m.Clear()
a := types.NewBlock(&types.Header{
Number: big.NewInt(0),
......@@ -49,9 +80,9 @@ func TestChannelManagerReturnsErrReorg(t *testing.T) {
require.Equal(t, []*types.Block{a, b, c}, m.blocks)
}
// TestChannelManagerReturnsErrReorgWhenDrained ensures that the channel manager
// ChannelManagerReturnsErrReorgWhenDrained ensures that the channel manager
// detects a reorg even if it does not have any blocks inside it.
func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
func ChannelManagerReturnsErrReorgWhenDrained(t *testing.T, batchType uint) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
......@@ -61,7 +92,11 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&rollup.Config{},
)
m.Clear()
a := newMiniL2Block(0)
x := newMiniL2BlockWithNumberParent(0, big.NewInt(1), common.Hash{0xff})
......@@ -76,8 +111,8 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) {
require.ErrorIs(t, m.AddL2Block(x), ErrReorg)
}
// TestChannelManager_Clear tests clearing the channel manager.
func TestChannelManager_Clear(t *testing.T) {
// ChannelManager_Clear tests clearing the channel manager.
func ChannelManager_Clear(t *testing.T, batchType uint) {
require := require.New(t)
// Create a channel manager
......@@ -96,7 +131,10 @@ func TestChannelManager_Clear(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
// Channel Manager state should be empty by default
require.Empty(m.blocks)
......@@ -104,9 +142,11 @@ func TestChannelManager_Clear(t *testing.T) {
require.Nil(m.currentChannel)
require.Empty(m.channelQueue)
require.Empty(m.txChannels)
// Set the last block
m.Clear()
// Add a block to the channel manager
a, _ := derivetest.RandomL2Block(rng, 4)
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
newL1Tip := a.Hash()
l1BlockID := eth.BlockID{
Hash: a.Hash(),
......@@ -153,7 +193,7 @@ func TestChannelManager_Clear(t *testing.T) {
require.Empty(m.txChannels)
}
func TestChannelManager_TxResend(t *testing.T) {
func ChannelManager_TxResend(t *testing.T, batchType uint) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlError)
......@@ -165,9 +205,13 @@ func TestChannelManager_TxResend(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a, _ := derivetest.RandomL2Block(rng, 4)
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
require.NoError(m.AddL2Block(a))
......@@ -195,9 +239,9 @@ func TestChannelManager_TxResend(t *testing.T) {
require.Len(fs, 1)
}
// TestChannelManagerCloseBeforeFirstUse ensures that the channel manager
// ChannelManagerCloseBeforeFirstUse ensures that the channel manager
// will not produce any frames if closed immediately.
func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
func ChannelManagerCloseBeforeFirstUse(t *testing.T, batchType uint) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlCrit)
......@@ -209,9 +253,13 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
TargetFrameSize: 0,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a, _ := derivetest.RandomL2Block(rng, 4)
a := derivetest.RandomL2BlockWithChainId(rng, 4, defaultTestRollupConfig.L2ChainID)
m.Close()
......@@ -222,10 +270,10 @@ func TestChannelManagerCloseBeforeFirstUse(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to contain no tx data")
}
// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
// ChannelManagerCloseNoPendingChannel ensures that the channel manager
// can gracefully close with no pending channels, and will not emit any new
// channel frames.
func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
func ChannelManagerCloseNoPendingChannel(t *testing.T, batchType uint) {
require := require.New(t)
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
......@@ -237,7 +285,11 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
TargetNumFrames: 1,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a := newMiniL2Block(0)
b := newMiniL2BlockWithNumberParent(0, big.NewInt(1), a.Hash())
......@@ -261,25 +313,37 @@ func TestChannelManagerCloseNoPendingChannel(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to return no new tx data")
}
// TestChannelManagerCloseNoPendingChannel ensures that the channel manager
// ChannelManagerCloseNoPendingChannel ensures that the channel manager
// can gracefully close with a pending channel, and will not produce any
// new channel frames after this point.
func TestChannelManagerClosePendingChannel(t *testing.T) {
func ChannelManagerClosePendingChannel(t *testing.T, batchType uint) {
require := require.New(t)
// The number of batch txs depends on compression of the random data, hence the static test RNG seed.
// Example of a different RNG seed that creates fewer than 2 frames: 1698700588902821588
rng := rand.New(rand.NewSource(123))
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
MaxFrameSize: 1000,
MaxFrameSize: 10000,
ChannelTimeout: 1000,
CompressorConfig: compressor.Config{
TargetNumFrames: 100,
TargetFrameSize: 1000,
TargetNumFrames: 1,
TargetFrameSize: 10000,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
},
&defaultTestRollupConfig,
)
m.Clear()
a := newMiniL2Block(50_000)
b := newMiniL2BlockWithNumberParent(10, big.NewInt(1), a.Hash())
numTx := 20 // Adjust number of txs to make 2 frames
a := derivetest.RandomL2BlockWithChainId(rng, numTx, defaultTestRollupConfig.L2ChainID)
b := derivetest.RandomL2BlockWithChainId(rng, 10, defaultTestRollupConfig.L2ChainID)
bHeader := b.Header()
bHeader.Number = new(big.Int).Add(a.Number(), big.NewInt(1))
bHeader.ParentHash = a.Hash()
b = b.WithSeal(bHeader)
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
......@@ -306,11 +370,12 @@ func TestChannelManagerClosePendingChannel(t *testing.T) {
require.ErrorIs(err, io.EOF, "Expected closed channel manager to produce no more tx data")
}
// TestChannelManagerCloseAllTxsFailed ensures that the channel manager
// ChannelManagerCloseAllTxsFailed ensures that the channel manager
// can gracefully close after producing transaction frames if none of these
// have successfully landed on chain.
func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
require := require.New(t)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics,
ChannelConfig{
......@@ -321,9 +386,12 @@ func TestChannelManagerCloseAllTxsFailed(t *testing.T) {
TargetFrameSize: 1000,
ApproxComprRatio: 1.0,
},
})
BatchType: batchType,
}, &defaultTestRollupConfig,
)
m.Clear()
a := newMiniL2Block(50_000)
a := derivetest.RandomL2BlockWithChainId(rng, 50000, defaultTestRollupConfig.L2ChainID)
err := m.AddL2Block(a)
require.NoError(err, "Failed to add L2 block")
......
......@@ -5,6 +5,7 @@ import (
"testing"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
......@@ -20,7 +21,8 @@ func TestChannelTimeout(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{
ChannelTimeout: 100,
})
}, &rollup.Config{})
m.Clear()
// Pending channel is nil so it cannot be timed out
require.Nil(t, m.currentChannel)
......@@ -61,7 +63,8 @@ func TestChannelTimeout(t *testing.T) {
// TestChannelNextTxData checks the nextTxData function.
func TestChannelNextTxData(t *testing.T) {
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear()
// Nil pending channel should return EOF
returnedTxData, err := m.nextTxData(nil)
......@@ -109,7 +112,8 @@ func TestChannelTxConfirmed(t *testing.T) {
// channels on confirmation. This would result in [TxConfirmed]
// clearing confirmed transactions, and resetting the pendingChannels map
ChannelTimeout: 10,
})
}, &rollup.Config{})
m.Clear()
// Let's add a valid pending transaction to the channel manager
// So we can demonstrate that TxConfirmed's correctness
......@@ -157,7 +161,8 @@ func TestChannelTxConfirmed(t *testing.T) {
func TestChannelTxFailed(t *testing.T) {
// Create a channel manager
log := testlog.Logger(t, log.LvlCrit)
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{})
m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}, &rollup.Config{})
m.Clear()
// Let's add a valid pending transaction to the channel
// manager so we can demonstrate correctness
......
......@@ -52,6 +52,8 @@ type CLIConfig struct {
Stopped bool
BatchType uint
TxMgrConfig txmgr.CLIConfig
LogConfig oplog.CLIConfig
MetricsConfig opmetrics.CLIConfig
......@@ -93,6 +95,7 @@ func NewConfig(ctx *cli.Context) *CLIConfig {
MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name),
Stopped: ctx.Bool(flags.StoppedFlag.Name),
BatchType: ctx.Uint(flags.BatchTypeFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
......
......@@ -74,7 +74,7 @@ type BatchSubmitter struct {
func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
return &BatchSubmitter{
DriverSetup: setup,
state: NewChannelManager(setup.Log, setup.Metr, setup.Channel),
state: NewChannelManager(setup.Log, setup.Metr, setup.Channel, setup.RollupCfg),
}
}
......
......@@ -173,6 +173,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
SubSafetyMargin: cfg.SubSafetyMargin,
MaxFrameSize: cfg.MaxL1TxSize - 1, // subtract 1 byte for version
CompressorConfig: cfg.CompressorConfig.Config(),
BatchType: cfg.BatchType,
}
if err := bs.Channel.Check(); err != nil {
return fmt.Errorf("invalid channel configuration: %w", err)
......@@ -289,8 +290,10 @@ func (bs *BatcherService) Stop(ctx context.Context) error {
bs.Log.Info("Stopping batcher")
var result error
if err := bs.driver.StopBatchSubmittingIfRunning(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop batch submitting: %w", err))
if bs.driver != nil {
if err := bs.driver.StopBatchSubmittingIfRunning(ctx); err != nil {
result = errors.Join(result, fmt.Errorf("failed to stop batch submitting: %w", err))
}
}
if bs.rpcServer != nil {
......@@ -327,7 +330,7 @@ func (bs *BatcherService) Stop(ctx context.Context) error {
if result == nil {
bs.stopped.Store(true)
bs.driver.Log.Info("Batch Submitter stopped")
bs.Log.Info("Batch Submitter stopped")
}
return result
}
......
......@@ -76,6 +76,12 @@ var (
Usage: "Initialize the batcher in a stopped state. The batcher can be started using the admin_startBatcher RPC",
EnvVars: prefixEnvVars("STOPPED"),
}
BatchTypeFlag = &cli.UintFlag{
Name: "batch-type",
Usage: "The batch type. 0 for SingularBatch and 1 for SpanBatch.",
Value: 0,
EnvVars: prefixEnvVars("BATCH_TYPE"),
}
// Legacy Flags
SequencerHDPathFlag = txmgr.SequencerHDPathFlag
)
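The new batch-type flag selects the channel encoding. A hedged sketch of how a caller might branch on it, reusing the three-argument derive.NewChannelOut call that appears in the op-e2e changes below; the span-batch-builder parameter and error message are assumptions, not part of this diff:
// Sketch: choose the channel encoding from the configured batch type.
func newChannelOutFor(batchType uint, c derive.Compressor, sbb *derive.SpanBatchBuilder) (derive.ChannelOut, error) {
	switch batchType {
	case derive.SingularBatchType:
		// Singular batches need no builder, hence the nil third argument.
		return derive.NewChannelOut(derive.SingularBatchType, c, nil)
	case derive.SpanBatchType:
		return derive.NewChannelOut(derive.SpanBatchType, c, sbb)
	default:
		return nil, fmt.Errorf("unrecognized batch type: %d", batchType)
	}
}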
......@@ -94,6 +100,7 @@ var optionalFlags = []cli.Flag{
MaxL1TxSizeBytesFlag,
StoppedFlag,
SequencerHDPathFlag,
BatchTypeFlag,
}
func init() {
......
......@@ -9,7 +9,7 @@ import (
"github.com/ethereum-optimism/optimism/op-bindings/solc"
)
const AlphabetVMStorageLayoutJSON = "{\"storage\":[{\"astId\":1000,\"contract\":\"test/FaultDisputeGame.t.sol:AlphabetVM\",\"label\":\"oracle\",\"offset\":0,\"slot\":\"0\",\"type\":\"t_contract(IPreimageOracle)1001\"}],\"types\":{\"t_contract(IPreimageOracle)1001\":{\"encoding\":\"inplace\",\"label\":\"contract IPreimageOracle\",\"numberOfBytes\":\"20\"}}}"
const AlphabetVMStorageLayoutJSON = "{\"storage\":[{\"astId\":1000,\"contract\":\"test/mocks/AlphabetVM.sol:AlphabetVM\",\"label\":\"oracle\",\"offset\":0,\"slot\":\"0\",\"type\":\"t_contract(IPreimageOracle)1001\"}],\"types\":{\"t_contract(IPreimageOracle)1001\":{\"encoding\":\"inplace\",\"label\":\"contract IPreimageOracle\",\"numberOfBytes\":\"20\"}}}"
var AlphabetVMStorageLayout = new(solc.StorageLayout)
......
This diff is collapsed.
......@@ -185,6 +185,8 @@ type DeployConfig struct {
EIP1559Elasticity uint64 `json:"eip1559Elasticity"`
// EIP1559Denominator is the denominator of EIP1559 base fee market.
EIP1559Denominator uint64 `json:"eip1559Denominator"`
// EIP1559DenominatorCanyon is the denominator of EIP1559 base fee market when Canyon is active.
EIP1559DenominatorCanyon uint64 `json:"eip1559DenominatorCanyon"`
// SystemConfigStartBlock represents the block at which the op-node should start syncing
// from. It is an override to set this value on legacy networks where it is not set by
// default. It can be removed once all networks have this value set in their storage.
......@@ -318,6 +320,9 @@ func (d *DeployConfig) Check() error {
if d.EIP1559Denominator == 0 {
return fmt.Errorf("%w: EIP1559Denominator cannot be 0", ErrInvalidDeployConfig)
}
if d.L2GenesisCanyonTimeOffset != nil && d.EIP1559DenominatorCanyon == 0 {
return fmt.Errorf("%w: EIP1559DenominatorCanyon cannot be 0 if Canyon is activated", ErrInvalidDeployConfig)
}
if d.EIP1559Elasticity == 0 {
return fmt.Errorf("%w: EIP1559Elasticity cannot be 0", ErrInvalidDeployConfig)
}
......
......@@ -31,6 +31,10 @@ func NewL2Genesis(config *DeployConfig, block *types.Block) (*core.Genesis, erro
if eip1559Denom == 0 {
eip1559Denom = 50
}
eip1559DenomCanyon := config.EIP1559DenominatorCanyon
if eip1559DenomCanyon == 0 {
eip1559DenomCanyon = 250
}
eip1559Elasticity := config.EIP1559Elasticity
if eip1559Elasticity == 0 {
eip1559Elasticity = 10
......@@ -61,8 +65,9 @@ func NewL2Genesis(config *DeployConfig, block *types.Block) (*core.Genesis, erro
CanyonTime: config.CanyonTime(block.Time()),
ShanghaiTime: config.CanyonTime(block.Time()),
Optimism: &params.OptimismConfig{
EIP1559Denominator: eip1559Denom,
EIP1559Elasticity: eip1559Elasticity,
EIP1559Denominator: eip1559Denom,
EIP1559Elasticity: eip1559Elasticity,
EIP1559DenominatorCanyon: eip1559DenomCanyon,
},
}
......
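The hunks above default the EIP-1559 denominator to 50 before Canyon and 250 after. A rough sketch of the standard EIP-1559 update rule shows why the denominator matters (simplified integer math, not the geth implementation): the maximum downward step per block is 1/denominator, so 2% at 50 and 0.4% at 250, which smooths base fee decay after Canyon.
func nextBaseFee(parent, gasUsed, gasTarget, denominator uint64) uint64 {
	switch {
	case gasUsed == gasTarget:
		return parent // on-target blocks leave the base fee unchanged
	case gasUsed > gasTarget:
		delta := parent * (gasUsed - gasTarget) / gasTarget / denominator
		if delta < 1 {
			delta = 1 // increases are bounded below by 1 wei
		}
		return parent + delta
	default:
		delta := parent * (gasTarget - gasUsed) / gasTarget / denominator
		return parent - delta
	}
}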
......@@ -62,6 +62,7 @@
"governanceTokenOwner": "0x0000000000000000000000000000000000000333",
"deploymentWaitConfirmations": 1,
"eip1559Denominator": 8,
"eip1559DenominatorCanyon": 12,
"eip1559Elasticity": 2,
"fundDevAccounts": true,
"faultGameAbsolutePrestate": "0x0000000000000000000000000000000000000000000000000000000000000000",
......
ARG OP_STACK_GO_BUILDER=us-docker.pkg.dev/oplabs-tools-artifacts/images/op_stack_go:latest
ARG OP_STACK_GO_BUILDER=us-docker.pkg.dev/oplabs-tools-artifacts/images/op-stack-go:latest
FROM $OP_STACK_GO_BUILDER as builder
# See "make golang-docker" and /ops/docker/op-stack-go
......
......@@ -7,7 +7,6 @@ import (
"time"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
......@@ -17,8 +16,8 @@ import (
var (
l1EthRpc = "http://example.com:8545"
gameFactoryAddressValue = "0xbb00000000000000000000000000000000000000"
cannonNetwork = chaincfg.AvailableNetworks()[0]
otherCannonNetwork = chaincfg.AvailableNetworks()[1]
cannonNetwork = "op-mainnet"
otherCannonNetwork = "op-goerli"
cannonBin = "./bin/cannon"
cannonServer = "./bin/op-program"
cannonPreState = "./pre.json"
......
......@@ -141,6 +141,7 @@ func (m *gameMonitor) MonitorGames(ctx context.Context) error {
for {
select {
case <-ctx.Done():
m.l1HeadsSub.Unsubscribe()
return nil
case err, ok := <-m.l1HeadsSub.Err():
if !ok {
......
......@@ -100,7 +100,8 @@ func TestMonitorGames(t *testing.T) {
defer cancel()
go func() {
waitErr := wait.For(context.Background(), 100*time.Millisecond, func() (bool, error) {
// Wait for the subscription to be created
waitErr := wait.For(context.Background(), 5*time.Second, func() (bool, error) {
return mockHeadSource.sub != nil, nil
})
require.NoError(t, waitErr)
......
......@@ -61,8 +61,11 @@ type ChannelOutIface interface {
OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, error)
}
// Compile-time check for ChannelOutIface interface implementation for the ChannelOut type.
var _ ChannelOutIface = (*derive.ChannelOut)(nil)
// Compile-time check for ChannelOutIface interface implementation for the SingularChannelOut type.
var _ ChannelOutIface = (*derive.SingularChannelOut)(nil)
// Compile-time check for ChannelOutIface interface implementation for the SpanChannelOut type.
var _ ChannelOutIface = (*derive.SpanChannelOut)(nil)
// Compile-time check for ChannelOutIface interface implementation for the GarbageChannelOut type.
var _ ChannelOutIface = (*GarbageChannelOut)(nil)
......@@ -252,13 +255,13 @@ func blockToBatch(block *types.Block) (*derive.BatchData, error) {
return nil, fmt.Errorf("could not parse the L1 Info deposit: %w", err)
}
return &derive.BatchData{
SingularBatch: derive.SingularBatch{
ParentHash: block.ParentHash(),
EpochNum: rollup.Epoch(l1Info.Number),
EpochHash: l1Info.BlockHash,
Timestamp: block.Time(),
Transactions: opaqueTxs,
},
}, nil
singularBatch := &derive.SingularBatch{
ParentHash: block.ParentHash(),
EpochNum: rollup.Epoch(l1Info.Number),
EpochHash: l1Info.BlockHash,
Timestamp: block.Time(),
Transactions: opaqueTxs,
}
return derive.NewBatchData(singularBatch), nil
}
......@@ -67,7 +67,7 @@ func (s *L1Miner) ActL1StartBlock(timeDelta uint64) Action {
MixDigest: common.Hash{}, // TODO: maybe randomize this (prev-randao value)
}
if s.l1Cfg.Config.IsLondon(header.Number) {
header.BaseFee = eip1559.CalcBaseFee(s.l1Cfg.Config, parent)
header.BaseFee = eip1559.CalcBaseFee(s.l1Cfg.Config, parent, header.Time)
// At the transition, double the gas limit so the gas target is equal to the old gas limit.
if !s.l1Cfg.Config.IsLondon(parent.Number) {
header.GasLimit = parent.GasLimit * s.l1Cfg.Config.ElasticityMultiplier()
......@@ -95,14 +95,12 @@ func (s *L1Miner) ActL1IncludeTx(from common.Address) Action {
t.InvalidAction("no tx inclusion when not building l1 block")
return
}
i := s.pendingIndices[from]
txs, q := s.eth.TxPool().ContentFrom(from)
if uint64(len(txs)) <= i {
t.Fatalf("no pending txs from %s, and have %d unprocessable queued txs from this account", from, len(q))
getPendingIndex := func(from common.Address) uint64 {
return s.pendingIndices[from]
}
tx := txs[i]
tx := firstValidTx(t, from, getPendingIndex, s.eth.TxPool().ContentFrom, s.EthClient().NonceAt)
s.IncludeTx(t, tx)
s.pendingIndices[from] = i + 1 // won't retry the tx
s.pendingIndices[from] = s.pendingIndices[from] + 1 // won't retry the tx
}
}
......
......@@ -140,7 +140,7 @@ func (s *L2Batcher) Buffer(t Testing) error {
ApproxComprRatio: 1,
})
require.NoError(t, e, "failed to create compressor")
ch, err = derive.NewChannelOut(c)
ch, err = derive.NewChannelOut(derive.SingularBatchType, c, nil)
}
require.NoError(t, err, "failed to create channel")
s.l2ChannelOut = ch
......
package actions
import (
"context"
"errors"
"time"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait"
"github.com/ethereum-optimism/optimism/op-program/client/l2/engineapi"
"github.com/stretchr/testify/require"
......@@ -179,22 +176,8 @@ func (e *L2Engine) ActL2IncludeTx(from common.Address) Action {
return
}
var i uint64
var txs []*types.Transaction
var q []*types.Transaction
// Wait for the tx to be in the pending tx queue
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := wait.For(ctx, time.Second, func() (bool, error) {
i = e.engineApi.PendingIndices(from)
txs, q = e.eth.TxPool().ContentFrom(from)
return uint64(len(txs)) > i, nil
})
require.NoError(t, err,
"no pending txs from %s, and have %d unprocessable queued txs from this account: %w", from, len(q), err)
tx := txs[i]
err = e.engineApi.IncludeTx(tx, from)
tx := firstValidTx(t, from, e.engineApi.PendingIndices, e.eth.TxPool().ContentFrom, e.EthClient().NonceAt)
err := e.engineApi.IncludeTx(tx, from)
if errors.Is(err, engineapi.ErrNotBuildingBlock) {
t.InvalidAction(err.Error())
} else if errors.Is(err, engineapi.ErrUsesTooMuchGas) {
......
package actions
import (
"context"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)
// firstValidTx finds the first transaction that is valid for inclusion from the specified address.
// It uses a waiter and filters out already-included transactions to avoid race conditions with the
// asynchronous updates to the transaction pool.
func firstValidTx(
t Testing,
from common.Address,
pendingIndices func(common.Address) uint64,
contentFrom func(common.Address) ([]*types.Transaction, []*types.Transaction),
nonceAt func(context.Context, common.Address, *big.Int) (uint64, error),
) *types.Transaction {
var i uint64
var txs []*types.Transaction
var q []*types.Transaction
// Wait for the tx to be in the pending tx queue
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := wait.For(ctx, time.Second, func() (bool, error) {
i = pendingIndices(from)
txs, q = contentFrom(from)
// Remove any transactions that have already been included in the head block
// The tx pool only prunes included transactions asynchronously, so they may still be in the list
nonce, err := nonceAt(ctx, from, nil)
if err != nil {
return false, err
}
for len(txs) > 0 && txs[0].Nonce() < nonce {
t.Logf("Removing already included transaction from list of length %v", len(txs))
txs = txs[1:]
}
return uint64(len(txs)) > i, nil
})
require.NoError(t, err,
"no pending txs from %s, and have %d unprocessable queued txs from this account: %w", from, len(q), err)
return txs[i]
}
......@@ -3,6 +3,7 @@ package disputegame
import (
"context"
"errors"
"time"
"github.com/ethereum-optimism/optimism/op-challenger/game/fault/types"
"github.com/ethereum/go-ethereum/common"
......@@ -96,7 +97,9 @@ func (d *DishonestHelper) ExhaustDishonestClaims(ctx context.Context) {
var numClaimsSeen int64
for {
newCount, err := d.WaitForNewClaim(ctx, numClaimsSeen)
// Use a short timeout since we don't know whether the challenger will respond,
// and this is only designed for the alphabet game where the response should be fast.
newCount, err := d.waitForNewClaim(ctx, numClaimsSeen, 30*time.Second)
if errors.Is(err, context.DeadlineExceeded) {
// we assume that the honest challenger has stopped responding
// There's nothing to respond to.
......
......@@ -18,6 +18,8 @@ import (
"github.com/stretchr/testify/require"
)
const defaultTimeout = 5 * time.Minute
type FaultGameHelper struct {
t *testing.T
require *require.Assertions
......@@ -42,7 +44,7 @@ func (g *FaultGameHelper) GameDuration(ctx context.Context) time.Duration {
// This does not check that the number of claims is exactly the specified count to avoid intermittent failures
// where a challenger posts an additional claim before this method sees the number of claims it was waiting for.
func (g *FaultGameHelper) WaitForClaimCount(ctx context.Context, count int64) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
err := wait.For(ctx, time.Second, func() (bool, error) {
actual, err := g.game.ClaimDataLen(&bind.CallOpts{Context: ctx})
......@@ -70,7 +72,7 @@ func (g *FaultGameHelper) MaxDepth(ctx context.Context) int64 {
}
func (g *FaultGameHelper) waitForClaim(ctx context.Context, errorMsg string, predicate func(claim ContractClaim) bool) {
timedCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
timedCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
err := wait.For(timedCtx, time.Second, func() (bool, error) {
count, err := g.game.ClaimDataLen(&bind.CallOpts{Context: timedCtx})
......@@ -95,7 +97,7 @@ func (g *FaultGameHelper) waitForClaim(ctx context.Context, errorMsg string, pre
}
func (g *FaultGameHelper) waitForNoClaim(ctx context.Context, errorMsg string, predicate func(claim ContractClaim) bool) {
timedCtx, cancel := context.WithTimeout(ctx, 3*time.Minute)
timedCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
err := wait.For(timedCtx, time.Second, func() (bool, error) {
count, err := g.game.ClaimDataLen(&bind.CallOpts{Context: timedCtx})
......@@ -193,7 +195,7 @@ func (g *FaultGameHelper) Status(ctx context.Context) Status {
func (g *FaultGameHelper) WaitForGameStatus(ctx context.Context, expected Status) {
g.t.Logf("Waiting for game %v to have status %v", g.addr, expected)
timedCtx, cancel := context.WithTimeout(ctx, time.Minute)
timedCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
err := wait.For(timedCtx, time.Second, func() (bool, error) {
ctx, cancel := context.WithTimeout(timedCtx, 30*time.Second)
......@@ -302,7 +304,10 @@ func (g *FaultGameHelper) ChallengeRootClaim(ctx context.Context, performMove Mo
}
func (g *FaultGameHelper) WaitForNewClaim(ctx context.Context, checkPoint int64) (int64, error) {
timedCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
return g.waitForNewClaim(ctx, checkPoint, defaultTimeout)
}
func (g *FaultGameHelper) waitForNewClaim(ctx context.Context, checkPoint int64, timeout time.Duration) (int64, error) {
timedCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
var newClaimLen int64
err := wait.For(timedCtx, time.Second, func() (bool, error) {
......
......@@ -2,6 +2,7 @@ package op_e2e
import (
"encoding/json"
"errors"
"math/big"
"os"
"os/exec"
......@@ -51,6 +52,11 @@ func (eec *ExternalEthClient) Close() error {
select {
case <-time.After(5 * time.Second):
eec.Session.Kill()
select {
case <-time.After(30 * time.Second):
return errors.New("external client failed to terminate")
case <-eec.Session.Exited:
}
case <-eec.Session.Exited:
}
return nil
......
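The Close change above escalates in two stages: wait five seconds for a voluntary exit, then kill and allow up to thirty seconds before reporting failure. A generic sketch of the pattern (names are hypothetical):
func closeWithEscalation(exited <-chan struct{}, kill func()) error {
	select {
	case <-exited:
		return nil // clean shutdown within the grace period
	case <-time.After(5 * time.Second):
		kill() // grace period elapsed; force-terminate
	}
	select {
	case <-exited:
		return nil
	case <-time.After(30 * time.Second):
		return errors.New("external client failed to terminate")
	}
}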
......@@ -18,7 +18,7 @@ import (
)
func TestMultipleCannonGames(t *testing.T) {
InitParallel(t)
InitParallel(t, UsesCannon, UseExecutor(0))
ctx := context.Background()
sys, l1Client := startFaultDisputeSystem(t)
......@@ -78,7 +78,7 @@ func TestMultipleCannonGames(t *testing.T) {
}
func TestMultipleGameTypes(t *testing.T) {
InitParallel(t)
InitParallel(t, UsesCannon, UseExecutor(0))
ctx := context.Background()
sys, l1Client := startFaultDisputeSystem(t)
......@@ -113,7 +113,7 @@ func TestMultipleGameTypes(t *testing.T) {
}
func TestChallengerCompleteDisputeGame(t *testing.T) {
InitParallel(t)
InitParallel(t, UseExecutor(1))
tests := []struct {
name string
......@@ -182,7 +182,7 @@ func TestChallengerCompleteDisputeGame(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
InitParallel(t)
InitParallel(t, UseExecutor(1))
ctx := context.Background()
sys, l1Client := startFaultDisputeSystem(t)
......@@ -219,7 +219,7 @@ func TestChallengerCompleteDisputeGame(t *testing.T) {
}
func TestChallengerCompleteExhaustiveDisputeGame(t *testing.T) {
InitParallel(t)
InitParallel(t, UseExecutor(1))
testCase := func(t *testing.T, isRootCorrect bool) {
ctx := context.Background()
......@@ -267,17 +267,17 @@ func TestChallengerCompleteExhaustiveDisputeGame(t *testing.T) {
}
t.Run("RootCorrect", func(t *testing.T) {
InitParallel(t)
InitParallel(t, UseExecutor(1))
testCase(t, true)
})
t.Run("RootIncorrect", func(t *testing.T) {
InitParallel(t)
InitParallel(t, UseExecutor(1))
testCase(t, false)
})
}
func TestCannonDisputeGame(t *testing.T) {
InitParallel(t)
InitParallel(t, UsesCannon, UseExecutor(1))
tests := []struct {
name string
......@@ -290,7 +290,7 @@ func TestCannonDisputeGame(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
InitParallel(t)
InitParallel(t, UseExecutor(1))
ctx := context.Background()
sys, l1Client := startFaultDisputeSystem(t)
......@@ -328,7 +328,7 @@ func TestCannonDisputeGame(t *testing.T) {
}
func TestCannonDefendStep(t *testing.T) {
InitParallel(t)
InitParallel(t, UsesCannon, UseExecutor(1))
ctx := context.Background()
sys, l1Client := startFaultDisputeSystem(t)
......@@ -370,7 +370,7 @@ func TestCannonDefendStep(t *testing.T) {
}
func TestCannonProposedOutputRootInvalid(t *testing.T) {
InitParallel(t)
InitParallel(t, UsesCannon, UseExecutor(0))
// honestStepsFail attempts to perform both an attack and defend step using the correct trace.
honestStepsFail := func(ctx context.Context, game *disputegame.CannonGameHelper, correctTrace *disputegame.HonestHelper, parentClaimIdx int64) {
// Attack step should fail
......@@ -421,7 +421,7 @@ func TestCannonProposedOutputRootInvalid(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
InitParallel(t)
InitParallel(t, UseExecutor(0))
ctx := context.Background()
sys, l1Client, game, correctTrace := setupDisputeGameForInvalidOutputRoot(t, test.outputRoot)
......@@ -448,7 +448,7 @@ func TestCannonProposedOutputRootInvalid(t *testing.T) {
}
func TestCannonPoisonedPostState(t *testing.T) {
InitParallel(t)
InitParallel(t, UsesCannon, UseExecutor(0))
ctx := context.Background()
sys, l1Client := startFaultDisputeSystem(t)
......@@ -558,8 +558,7 @@ func setupDisputeGameForInvalidOutputRoot(t *testing.T, outputRoot common.Hash)
}
func TestCannonChallengeWithCorrectRoot(t *testing.T) {
InitParallel(t)
InitParallel(t, UsesCannon, UseExecutor(0))
ctx := context.Background()
sys, l1Client := startFaultDisputeSystem(t)
t.Cleanup(sys.Close)
......
......@@ -2,14 +2,70 @@ package op_e2e
import (
"os"
"strconv"
"testing"
)
var enableParallelTesting bool = os.Getenv("OP_E2E_DISABLE_PARALLEL") != "true"
func InitParallel(t *testing.T) {
type testopts struct {
executor uint64
}
func InitParallel(t *testing.T, args ...func(t *testing.T, opts *testopts)) {
t.Helper()
if enableParallelTesting {
t.Parallel()
}
opts := &testopts{}
for _, arg := range args {
arg(t, opts)
}
checkExecutor(t, opts.executor)
}
func UsesCannon(t *testing.T, opts *testopts) {
if os.Getenv("OP_E2E_CANNON_ENABLED") == "false" {
t.Skip("Skipping cannon test")
}
}
// UseExecutor allows manually splitting tests between circleci executors
//
// Tests run on the first executor by default, but can be moved to the second with:
// InitParallel(t, UseExecutor(1))
// A test assigned to an executor index greater than the number available automatically uses the last executor.
// Executor indexes start from 0.
func UseExecutor(assignedIdx uint64) func(t *testing.T, opts *testopts) {
return func(t *testing.T, opts *testopts) {
opts.executor = assignedIdx
}
}
func checkExecutor(t *testing.T, assignedIdx uint64) {
envTotal := os.Getenv("CIRCLE_NODE_TOTAL")
envIdx := os.Getenv("CIRCLE_NODE_INDEX")
if envTotal == "" || envIdx == "" {
// Not using test splitting, so ignore assigned executor
t.Logf("Running test. Test splitting not in use.")
return
}
total, err := strconv.ParseUint(envTotal, 10, 0)
if err != nil {
t.Fatalf("Could not parse CIRCLE_NODE_TOTAL env var %v: %v", envTotal, err)
}
idx, err := strconv.ParseUint(envIdx, 10, 0)
if err != nil {
t.Fatalf("Could not parse CIRCLE_NODE_INDEX env var %v: %v", envIdx, err)
}
if assignedIdx >= total && idx == total-1 {
t.Logf("Running test. Current executor (%v) is the last executor and assigned executor (%v) >= total executors (%v).", idx, assignedIdx, total)
return
}
if idx == assignedIdx {
t.Logf("Running test. Assigned executor (%v) matches current executor (%v) of total (%v)", assignedIdx, idx, total)
return
}
t.Skipf("Skipping test. Assigned executor %v, current executor %v of total %v", assignedIdx, idx, total)
}
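Putting the helpers together, a test opts into the cannon-gated, executor-pinned behavior like this (the test name is hypothetical; the call matches the usage in the dispute-game tests above):
func TestExampleCannonScenario(t *testing.T) {
	// Runs in parallel, is skipped when OP_E2E_CANNON_ENABLED=false, and is
	// pinned to the second CircleCI executor when test splitting is active.
	InitParallel(t, UsesCannon, UseExecutor(1))
	// ... test body ...
}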
......@@ -102,7 +102,7 @@ func NewOpGeth(t *testing.T, ctx context.Context, cfg *SystemConfig) (*OpGeth, e
)
require.Nil(t, err)
l2Client, err := ethclient.Dial(node.HTTPEndpoint())
l2Client, err := ethclient.Dial(selectEndpoint(node))
require.Nil(t, err)
genesisPayload, err := eth.BlockAsPayload(l2GenesisBlock, cfg.DeployConfig.CanyonTime(l2GenesisBlock.Time()))
......
......@@ -49,6 +49,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/p2p"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
proposermetrics "github.com/ethereum-optimism/optimism/op-proposer/metrics"
l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer"
......@@ -202,6 +203,9 @@ type SystemConfig struct {
// Target L1 tx size for the batcher transactions
BatcherTargetL1TxSizeBytes uint64
// Max L1 tx size for the batcher transactions
BatcherMaxL1TxSizeBytes uint64
// SupportL1TimeTravel determines if the L1 node supports quickly skipping forward in time
SupportL1TimeTravel bool
}
......@@ -679,13 +683,21 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
return nil, fmt.Errorf("unable to start l2 output submitter: %w", err)
}
batchType := derive.SingularBatchType
if os.Getenv("OP_E2E_USE_SPAN_BATCH") == "true" {
batchType = derive.SpanBatchType
}
batcherMaxL1TxSizeBytes := cfg.BatcherMaxL1TxSizeBytes
if batcherMaxL1TxSizeBytes == 0 {
batcherMaxL1TxSizeBytes = 240_000
}
batcherCLIConfig := &bss.CLIConfig{
L1EthRpc: sys.EthInstances["l1"].WSEndpoint(),
L2EthRpc: sys.EthInstances["sequencer"].WSEndpoint(),
RollupRpc: sys.RollupNodes["sequencer"].HTTPEndpoint(),
MaxPendingTransactions: 0,
MaxChannelDuration: 1,
MaxL1TxSize: 240_000,
MaxL1TxSize: batcherMaxL1TxSizeBytes,
CompressorConfig: compressor.CLIConfig{
TargetL1TxSizeBytes: cfg.BatcherTargetL1TxSizeBytes,
TargetNumFrames: 1,
......@@ -698,7 +710,8 @@ func (cfg SystemConfig) Start(t *testing.T, _opts ...SystemConfigOption) (*Syste
Level: log.LvlInfo,
Format: oplog.FormatText,
},
Stopped: sys.cfg.DisableBatcher, // Batch submitter may be enabled later
Stopped: sys.cfg.DisableBatcher, // Batch submitter may be enabled later
BatchType: uint(batchType),
}
// Batch Submitter
batcher, err := bss.BatcherServiceFromCLIConfig(context.Background(), "0.0.1", batcherCLIConfig, sys.cfg.Loggers["batcher"])
......@@ -758,9 +771,12 @@ func (sys *System) newMockNetPeer() (host.Host, error) {
return sys.Mocknet.AddPeerWithPeerstore(p, eps)
}
func UseHTTP() bool {
return os.Getenv("OP_E2E_USE_HTTP") == "true"
}
func selectEndpoint(node EthInstance) string {
useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true"
if useHTTP {
if UseHTTP() {
log.Info("using HTTP client")
return node.HTTPEndpoint()
}
......@@ -785,9 +801,8 @@ type WSOrHTTPEndpoint interface {
}
func configureL2(rollupNodeCfg *rollupNode.Config, l2Node WSOrHTTPEndpoint, jwtSecret [32]byte) {
useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true"
l2EndpointConfig := l2Node.WSAuthEndpoint()
if useHTTP {
if UseHTTP() {
l2EndpointConfig = l2Node.HTTPAuthEndpoint()
}
......
......@@ -1553,3 +1553,14 @@ func TestRequiredProtocolVersionChangeAndHalt(t *testing.T) {
require.NoError(t, err)
t.Log("verified that op-geth closed!")
}
func TestIncorrectBatcherConfiguration(t *testing.T) {
InitParallel(t)
cfg := DefaultSystemConfig(t)
// make the batcher configuration invalid
cfg.BatcherMaxL1TxSizeBytes = 1
_, err := cfg.Start(t)
require.Error(t, err, "Expected error on invalid batcher configuration")
}
ARG OP_STACK_GO_BUILDER=us-docker.pkg.dev/oplabs-tools-artifacts/images/op_stack_go:latest
ARG OP_STACK_GO_BUILDER=us-docker.pkg.dev/oplabs-tools-artifacts/images/op-stack-go:latest
FROM $OP_STACK_GO_BUILDER as builder
# See "make golang-docker" and /ops/docker/op-stack-go
......
ARG OP_STACK_GO_BUILDER=us-docker.pkg.dev/oplabs-tools-artifacts/images/op_stack_go:latest
ARG OP_STACK_GO_BUILDER=us-docker.pkg.dev/oplabs-tools-artifacts/images/op-stack-go:latest
FROM $OP_STACK_GO_BUILDER as builder
# See "make golang-docker" and /ops/docker/op-stack-go
......
......@@ -33,14 +33,7 @@ var L2ChainIDToNetworkDisplayName = func() map[string]string {
}()
// AvailableNetworks returns the selection of network configurations that is available by default.
// Other configurations that are part of the superchain-registry can be used with the --beta.network flag.
func AvailableNetworks() []string {
return []string{"op-mainnet", "op-goerli", "op-sepolia"}
}
// BetaAvailableNetworks returns all available network configurations in the superchain-registry.
// This set of configurations is experimental, and may change at any time.
func BetaAvailableNetworks() []string {
var networks []string
for _, cfg := range superchain.OPChains {
networks = append(networks, cfg.Chain+"-"+cfg.Superchain)
......@@ -48,20 +41,6 @@ func BetaAvailableNetworks() []string {
return networks
}
func IsAvailableNetwork(name string, beta bool) bool {
name = handleLegacyName(name)
available := AvailableNetworks()
if beta {
available = BetaAvailableNetworks()
}
for _, v := range available {
if v == name {
return true
}
}
return false
}
func handleLegacyName(name string) string {
switch name {
case "goerli":
......@@ -91,7 +70,7 @@ func ChainByName(name string) *superchain.ChainConfig {
func GetRollupConfig(name string) (*rollup.Config, error) {
chainCfg := ChainByName(name)
if chainCfg == nil {
return nil, fmt.Errorf("invalid network %s", name)
return nil, fmt.Errorf("invalid network: %q", name)
}
rollupCfg, err := rollup.LoadOPStackRollupConfig(chainCfg.ChainID)
if err != nil {
......
......@@ -27,8 +27,6 @@ func TestGetRollupConfig(t *testing.T) {
}
for name, expectedCfg := range configsByName {
require.True(t, IsAvailableNetwork(name, false))
gotCfg, err := GetRollupConfig(name)
require.NoError(t, err)
......@@ -95,6 +93,7 @@ var goerliCfg = rollup.Config{
DepositContractAddress: common.HexToAddress("0x5b47E1A08Ea6d985D6649300584e6722Ec4B1383"),
L1SystemConfigAddress: common.HexToAddress("0xAe851f927Ee40dE99aaBb7461C00f9622ab91d60"),
RegolithTime: u64Ptr(1679079600),
CanyonTime: u64Ptr(1699981200),
ProtocolVersionsAddress: common.HexToAddress("0x0C24F5098774aA366827D667494e9F889f7cFc08"),
}
......@@ -126,6 +125,7 @@ var sepoliaCfg = rollup.Config{
DepositContractAddress: common.HexToAddress("0x16fc5058f25648194471939df75cf27a2fdc48bc"),
L1SystemConfigAddress: common.HexToAddress("0x034edd2a225f7f429a63e0f1d2084b9e0a93b538"),
RegolithTime: u64Ptr(0),
CanyonTime: u64Ptr(1699981200),
ProtocolVersionsAddress: common.HexToAddress("0x79ADD5713B383DAa0a138d3C4780C7A1804a8090"),
}
......
......@@ -19,12 +19,12 @@ import (
)
type ChannelWithMetadata struct {
ID derive.ChannelID `json:"id"`
IsReady bool `json:"is_ready"`
InvalidFrames bool `json:"invalid_frames"`
InvalidBatches bool `json:"invalid_batches"`
Frames []FrameWithMetadata `json:"frames"`
Batches []derive.SingularBatch `json:"batches"`
ID derive.ChannelID `json:"id"`
IsReady bool `json:"is_ready"`
InvalidFrames bool `json:"invalid_frames"`
InvalidBatches bool `json:"invalid_batches"`
Frames []FrameWithMetadata `json:"frames"`
Batches []derive.BatchData `json:"batches"`
}
type FrameWithMetadata struct {
......@@ -104,17 +104,17 @@ func processFrames(cfg *rollup.Config, id derive.ChannelID, frames []FrameWithMe
}
}
var batches []derive.SingularBatch
var batches []derive.BatchData
invalidBatches := false
if ch.IsReady() {
br, err := derive.BatchReader(cfg, ch.Reader(), eth.L1BlockRef{})
br, err := derive.BatchReader(ch.Reader())
if err == nil {
for batch, err := br(); err != io.EOF; batch, err = br() {
if err != nil {
fmt.Printf("Error reading batch for channel %v. Err: %v\n", id.String(), err)
invalidBatches = true
} else {
batches = append(batches, batch.Batch.SingularBatch)
batches = append(batches, *batch)
}
}
} else {
......
......@@ -82,6 +82,13 @@ var (
return &out
}(),
}
L1RethDBPath = &cli.StringFlag{
Name: "l1.rethdb",
Usage: "The L1 RethDB path, used to fetch receipts for L1 blocks. Only applicable when using the `reth_db` RPC kind with `l1.rpckind`.",
EnvVars: prefixEnvVars("L1_RETHDB"),
Required: false,
Hidden: true,
}
L1RPCRateLimit = &cli.Float64Flag{
Name: "l1.rpc-rate-limit",
Usage: "Optional self-imposed global rate-limit on L1 RPC requests, specified in requests / second. Disabled if set to 0.",
......@@ -237,11 +244,10 @@ var (
Value: false,
}
BetaExtraNetworks = &cli.BoolFlag{
Name: "beta.extra-networks",
Usage: fmt.Sprintf("Beta feature: enable selection of a predefined-network from the superchain-registry. "+
"The superchain-registry is experimental, and the availability of configurations may change."+
"Available networks: %s", strings.Join(chaincfg.BetaAvailableNetworks(), ", ")),
Name: "beta.extra-networks",
Usage: "Legacy flag, ignored, all superchain-registry networks are enabled by default.",
EnvVars: prefixEnvVars("BETA_EXTRA_NETWORKS"),
Hidden: true, // hidden, this is deprecated, the flag is not used anymore.
}
RollupHalt = &cli.StringFlag{
Name: "rollup.halt",
......@@ -254,9 +260,10 @@ var (
EnvVars: prefixEnvVars("ROLLUP_LOAD_PROTOCOL_VERSIONS"),
}
CanyonOverrideFlag = &cli.Uint64Flag{
Name: "override.canyon",
Usage: "Manually specify the Canyon fork timestamp, overriding the bundled setting",
Hidden: true,
Name: "override.canyon",
Usage: "Manually specify the Canyon fork timestamp, overriding the bundled setting",
EnvVars: prefixEnvVars("OVERRIDE_CANYON"),
Hidden: false,
}
)
......@@ -303,6 +310,7 @@ var optionalFlags = []cli.Flag{
RollupHalt,
RollupLoadProtocolVersions,
CanyonOverrideFlag,
L1RethDBPath,
}
// Flags contains the list of configuration options available to the binary.
......
......@@ -60,6 +60,9 @@ type Config struct {
// Cancel to request a premature shutdown of the node itself, e.g. when halting. This may be nil.
Cancel context.CancelCauseFunc
// [OPTIONAL] The reth DB path to read receipts from
RethDBPath string
}
type RPCConfig struct {
......
......@@ -156,6 +156,9 @@ func (n *OpNode) initL1(ctx context.Context, cfg *Config) error {
return fmt.Errorf("failed to get L1 RPC client: %w", err)
}
// Set the RethDB path in the EthClientConfig, if there is one configured.
rpcCfg.EthClientConfig.RethDBPath = cfg.RethDBPath
n.l1Source, err = sources.NewL1Client(
client.NewInstrumentedRPC(l1Node, n.metrics), n.log, n.metrics.L1SourceCache, rpcCfg)
if err != nil {
......
......@@ -2,6 +2,7 @@ package store
import (
"context"
"sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
......@@ -17,17 +18,18 @@ const (
var scoresBase = ds.NewKey("/peers/scores")
// LastUpdate requires atomic update operations. Use the helper functions SetLastUpdated and LastUpdated to modify and access this field.
type scoreRecord struct {
PeerScores PeerScores `json:"peerScores"`
LastUpdate int64 `json:"lastUpdate"` // unix timestamp in seconds
PeerScores PeerScores `json:"peerScores"`
}
func (s *scoreRecord) SetLastUpdated(t time.Time) {
s.LastUpdate = t.Unix()
atomic.StoreInt64(&s.LastUpdate, t.Unix())
}
func (s *scoreRecord) LastUpdated() time.Time {
return time.Unix(s.LastUpdate, 0)
return time.Unix(atomic.LoadInt64(&s.LastUpdate), 0)
}
func (s *scoreRecord) MarshalBinary() (data []byte, err error) {
......
......@@ -32,7 +32,7 @@ type AttributesQueue struct {
config *rollup.Config
builder AttributesBuilder
prev *BatchQueue
batch *BatchData
batch *SingularBatch
}
func NewAttributesQueue(log log.Logger, cfg *rollup.Config, builder AttributesBuilder, prev *BatchQueue) *AttributesQueue {
......@@ -71,7 +71,7 @@ func (aq *AttributesQueue) NextAttributes(ctx context.Context, l2SafeHead eth.L2
// createNextAttributes transforms a batch into payload attributes. This sets `NoTxPool` and appends the batched transactions
// to the attributes transaction list
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *BatchData, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
func (aq *AttributesQueue) createNextAttributes(ctx context.Context, batch *SingularBatch, l2SafeHead eth.L2BlockRef) (*eth.PayloadAttributes, error) {
// sanity check parent hash
if batch.ParentHash != l2SafeHead.Hash {
return nil, NewResetError(fmt.Errorf("valid batch has bad parent hash %s, expected %s", batch.ParentHash, l2SafeHead.Hash))
......
......@@ -42,13 +42,13 @@ func TestAttributesQueue(t *testing.T) {
safeHead.L1Origin = l1Info.ID()
safeHead.Time = l1Info.InfoTime
batch := NewSingularBatchData(SingularBatch{
batch := SingularBatch{
ParentHash: safeHead.Hash,
EpochNum: rollup.Epoch(l1Info.InfoNum),
EpochHash: l1Info.InfoHash,
Timestamp: safeHead.Time + cfg.BlockTime,
Transactions: []eth.Data{eth.Data("foobar"), eth.Data("example")},
})
}
parentL1Cfg := eth.SystemConfig{
BatcherAddr: common.Address{42},
......@@ -80,7 +80,7 @@ func TestAttributesQueue(t *testing.T) {
aq := NewAttributesQueue(testlog.Logger(t, log.LvlError), cfg, attrBuilder, nil)
actual, err := aq.createNextAttributes(context.Background(), batch, safeHead)
actual, err := aq.createNextAttributes(context.Background(), &batch, safeHead)
require.NoError(t, err)
require.Equal(t, attrs, *actual)
......
......@@ -25,9 +25,9 @@ var encodeBufferPool = sync.Pool{
const (
// SingularBatchType is the first version of Batch format, representing a single L2 block.
SingularBatchType = iota
SingularBatchType = 0
// SpanBatchType is the Batch version used after SpanBatch hard fork, representing a span of L2 blocks.
SpanBatchType
SpanBatchType = 1
)
// Batch contains information to build one or multiple L2 blocks.
......@@ -39,12 +39,20 @@ type Batch interface {
LogContext(log.Logger) log.Logger
}
// BatchData is a composition type that contains raw data of each batch version.
// It has encoding & decoding methods to implement typed encoding.
// BatchData is used to represent the typed encoding & decoding,
// and wraps around a single interface, InnerBatchData.
// Further fields, such as a cache, can be added in the future without embedding each type of InnerBatchData.
// The design is similar to op-geth's types.Transaction struct.
type BatchData struct {
BatchType int
SingularBatch
RawSpanBatch
inner InnerBatchData
}
// InnerBatchData is the underlying data of a BatchData.
// This is implemented by SingularBatch and RawSpanBatch.
type InnerBatchData interface {
GetBatchType() int
encode(w io.Writer) error
decode(r *bytes.Reader) error
}
// EncodeRLP implements rlp.Encoder
......@@ -58,6 +66,10 @@ func (b *BatchData) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, buf.Bytes())
}
func (bd *BatchData) GetBatchType() uint8 {
return uint8(bd.inner.GetBatchType())
}
// MarshalBinary returns the canonical encoding of the batch.
func (b *BatchData) MarshalBinary() ([]byte, error) {
var buf bytes.Buffer
......@@ -67,16 +79,10 @@ func (b *BatchData) MarshalBinary() ([]byte, error) {
// encodeTyped encodes batch type and payload for each batch type.
func (b *BatchData) encodeTyped(buf *bytes.Buffer) error {
switch b.BatchType {
case SingularBatchType:
buf.WriteByte(SingularBatchType)
return rlp.Encode(buf, &b.SingularBatch)
case SpanBatchType:
buf.WriteByte(SpanBatchType)
return b.RawSpanBatch.encode(buf)
default:
return fmt.Errorf("unrecognized batch type: %d", b.BatchType)
if err := buf.WriteByte(b.GetBatchType()); err != nil {
return err
}
return b.inner.encode(buf)
}
// DecodeRLP implements rlp.Decoder
......@@ -99,35 +105,28 @@ func (b *BatchData) UnmarshalBinary(data []byte) error {
return b.decodeTyped(data)
}
// decodeTyped decodes batch type and payload for each batch type.
// decodeTyped decodes a typed BatchData
func (b *BatchData) decodeTyped(data []byte) error {
if len(data) == 0 {
return fmt.Errorf("batch too short")
return errors.New("batch too short")
}
var inner InnerBatchData
switch data[0] {
case SingularBatchType:
b.BatchType = SingularBatchType
return rlp.DecodeBytes(data[1:], &b.SingularBatch)
inner = new(SingularBatch)
case SpanBatchType:
b.BatchType = int(data[0])
return b.RawSpanBatch.decodeBytes(data[1:])
inner = new(RawSpanBatch)
default:
return fmt.Errorf("unrecognized batch type: %d", data[0])
}
}
// NewSingularBatchData creates new BatchData with SingularBatch
func NewSingularBatchData(singularBatch SingularBatch) *BatchData {
return &BatchData{
BatchType: SingularBatchType,
SingularBatch: singularBatch,
if err := inner.decode(bytes.NewReader(data[1:])); err != nil {
return err
}
b.inner = inner
return nil
}
// NewSpanBatchData creates new BatchData with SpanBatch
func NewSpanBatchData(spanBatch RawSpanBatch) *BatchData {
return &BatchData{
BatchType: SpanBatchType,
RawSpanBatch: spanBatch,
}
// NewBatchData creates a new BatchData
func NewBatchData(inner InnerBatchData) *BatchData {
return &BatchData{inner: inner}
}
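NewBatchData replaces the two type-specific constructors. A short round-trip sketch under the new API, from outside the package (imports elided; values arbitrary):
func exampleBatchDataRoundTrip() {
	bd := derive.NewBatchData(&derive.SingularBatch{
		EpochNum:  1,
		Timestamp: 1700000000,
	})
	enc, err := bd.MarshalBinary() // one type byte, then the version-specific payload
	if err != nil {
		panic(err)
	}
	var dec derive.BatchData
	if err := dec.UnmarshalBinary(enc); err != nil {
		panic(err)
	}
	fmt.Println(dec.GetBatchType() == derive.SingularBatchType) // prints true
}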
This diff is collapsed.
......@@ -6,15 +6,16 @@ import (
"math/rand"
"testing"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testutils"
)
func RandomRawSpanBatch(rng *rand.Rand, chainId *big.Int) *RawSpanBatch {
......@@ -52,8 +53,8 @@ func RandomRawSpanBatch(rng *rand.Rand, chainId *big.Int) *RawSpanBatch {
spanBatchPrefix: spanBatchPrefix{
relTimestamp: uint64(rng.Uint32()),
l1OriginNum: rng.Uint64(),
parentCheck: testutils.RandomData(rng, 20),
l1OriginCheck: testutils.RandomData(rng, 20),
parentCheck: [20]byte(testutils.RandomData(rng, 20)),
l1OriginCheck: [20]byte(testutils.RandomData(rng, 20)),
},
spanBatchPayload: spanBatchPayload{
blockCount: blockCount,
......@@ -141,40 +142,42 @@ func TestBatchRoundTrip(t *testing.T) {
chainID := new(big.Int).SetUint64(rng.Uint64())
batches := []*BatchData{
{
SingularBatch: SingularBatch{
NewBatchData(
&SingularBatch{
ParentHash: common.Hash{},
EpochNum: 0,
Timestamp: 0,
Transactions: []hexutil.Bytes{},
},
},
{
SingularBatch: SingularBatch{
),
NewBatchData(
&SingularBatch{
ParentHash: common.Hash{31: 0x42},
EpochNum: 1,
Timestamp: 1647026951,
Transactions: []hexutil.Bytes{[]byte{0, 0, 0}, []byte{0x76, 0xfd, 0x7c}},
},
},
NewSingularBatchData(*RandomSingularBatch(rng, 5, chainID)),
NewSingularBatchData(*RandomSingularBatch(rng, 7, chainID)),
NewSpanBatchData(*RandomRawSpanBatch(rng, chainID)),
NewSpanBatchData(*RandomRawSpanBatch(rng, chainID)),
NewSpanBatchData(*RandomRawSpanBatch(rng, chainID)),
),
NewBatchData(RandomSingularBatch(rng, 5, chainID)),
NewBatchData(RandomSingularBatch(rng, 7, chainID)),
NewBatchData(RandomRawSpanBatch(rng, chainID)),
NewBatchData(RandomRawSpanBatch(rng, chainID)),
NewBatchData(RandomRawSpanBatch(rng, chainID)),
}
for i, batch := range batches {
enc, err := batch.MarshalBinary()
assert.NoError(t, err)
require.NoError(t, err)
var dec BatchData
err = dec.UnmarshalBinary(enc)
assert.NoError(t, err)
if dec.BatchType == SpanBatchType {
_, err := dec.RawSpanBatch.derive(blockTime, genesisTimestamp, chainID)
assert.NoError(t, err)
require.NoError(t, err)
if dec.GetBatchType() == SpanBatchType {
rawSpanBatch, ok := dec.inner.(*RawSpanBatch)
require.True(t, ok)
_, err := rawSpanBatch.derive(blockTime, genesisTimestamp, chainID)
require.NoError(t, err)
}
assert.Equal(t, batch, &dec, "Batch not equal test case %v", i)
require.Equal(t, batch, &dec, "Batch not equal test case %v", i)
}
}
......@@ -185,43 +188,45 @@ func TestBatchRoundTripRLP(t *testing.T) {
chainID := new(big.Int).SetUint64(rng.Uint64())
batches := []*BatchData{
{
SingularBatch: SingularBatch{
NewBatchData(
&SingularBatch{
ParentHash: common.Hash{},
EpochNum: 0,
Timestamp: 0,
Transactions: []hexutil.Bytes{},
},
},
{
SingularBatch: SingularBatch{
),
NewBatchData(
&SingularBatch{
ParentHash: common.Hash{31: 0x42},
EpochNum: 1,
Timestamp: 1647026951,
Transactions: []hexutil.Bytes{[]byte{0, 0, 0}, []byte{0x76, 0xfd, 0x7c}},
},
},
NewSingularBatchData(*RandomSingularBatch(rng, 5, chainID)),
NewSingularBatchData(*RandomSingularBatch(rng, 7, chainID)),
NewSpanBatchData(*RandomRawSpanBatch(rng, chainID)),
NewSpanBatchData(*RandomRawSpanBatch(rng, chainID)),
NewSpanBatchData(*RandomRawSpanBatch(rng, chainID)),
),
NewBatchData(RandomSingularBatch(rng, 5, chainID)),
NewBatchData(RandomSingularBatch(rng, 7, chainID)),
NewBatchData(RandomRawSpanBatch(rng, chainID)),
NewBatchData(RandomRawSpanBatch(rng, chainID)),
NewBatchData(RandomRawSpanBatch(rng, chainID)),
}
for i, batch := range batches {
var buf bytes.Buffer
err := batch.EncodeRLP(&buf)
assert.NoError(t, err)
require.NoError(t, err)
result := buf.Bytes()
var dec BatchData
r := bytes.NewReader(result)
s := rlp.NewStream(r, 0)
err = dec.DecodeRLP(s)
assert.NoError(t, err)
if dec.BatchType == SpanBatchType {
_, err := dec.RawSpanBatch.derive(blockTime, genesisTimestamp, chainID)
assert.NoError(t, err)
require.NoError(t, err)
if dec.GetBatchType() == SpanBatchType {
rawSpanBatch, ok := dec.inner.(*RawSpanBatch)
require.True(t, ok)
_, err := rawSpanBatch.derive(blockTime, genesisTimestamp, chainID)
require.NoError(t, err)
}
assert.Equal(t, batch, &dec, "Batch not equal test case %v", i)
require.Equal(t, batch, &dec, "Batch not equal test case %v", i)
}
}
......@@ -17,13 +17,13 @@ func FuzzBatchRoundTrip(f *testing.F) {
typeProvider := fuzz.NewFromGoFuzz(fuzzedData).NilChance(0).MaxDepth(10000).NumElements(0, 0x100).AllowUnexportedFields(true)
fuzzerutils.AddFuzzerFunctions(typeProvider)
var singularBatch SingularBatch
typeProvider.Fuzz(&singularBatch)
// Create our batch data from fuzzed data
var batchData BatchData
typeProvider.Fuzz(&batchData)
// force batchdata to only contain singular batch
batchData.BatchType = SingularBatchType
batchData.RawSpanBatch = RawSpanBatch{}
batchData.inner = &singularBatch
// Encode our batch data
enc, err := batchData.MarshalBinary()
......
This diff is collapsed.
......@@ -35,6 +35,7 @@ type Engine interface {
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayload, error)
L2BlockRefByLabel(ctx context.Context, label eth.BlockLabel) (eth.L2BlockRef, error)
L2BlockRefByHash(ctx context.Context, l2Hash common.Hash) (eth.L2BlockRef, error)
L2BlockRefByNumber(ctx context.Context, num uint64) (eth.L2BlockRef, error)
SystemConfigL2Fetcher
}
......
This diff is collapsed.