Commit d3cb9821 authored by mergify[bot], committed by GitHub

Merge branch 'develop' into felipe/consensus-finalized-safe

parents 329ad4e4 06245265
@@ -190,6 +190,6 @@ require (
nhooyr.io/websocket v1.8.7 // indirect
)
-replace github.com/ethereum/go-ethereum v1.11.6 => github.com/ethereum-optimism/op-geth v1.101105.2-0.20230502202351-9cc072e922f6
+replace github.com/ethereum/go-ethereum v1.11.6 => github.com/ethereum-optimism/op-geth v1.101105.2-0.20230526154603-bdab05ca786f
//replace github.com/ethereum/go-ethereum v1.11.6 => ../go-ethereum
@@ -151,8 +151,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3 h1:RWHKLhCrQThMfch+QJ1Z8veEq5ZO3DfIhZ7xgRP9WTc=
github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3/go.mod h1:QziizLAiF0KqyLdNJYD7O5cpDlaFMNZzlxYNcWsJUxs=
-github.com/ethereum-optimism/op-geth v1.101105.2-0.20230502202351-9cc072e922f6 h1:Fh9VBmDCwjVn8amx1Dfrx+hIh16C/FDkS17EN25MGO8=
-github.com/ethereum-optimism/op-geth v1.101105.2-0.20230502202351-9cc072e922f6/go.mod h1:X9t7oeerFMU9/zMIjZKT/jbIca+O05QqtBTLjL+XVeA=
+github.com/ethereum-optimism/op-geth v1.101105.2-0.20230526154603-bdab05ca786f h1:+yZN8K/4AIN5f+gazMwZAeqzDG2EL2GydMrccjnEK+A=
+github.com/ethereum-optimism/op-geth v1.101105.2-0.20230526154603-bdab05ca786f/go.mod h1:X9t7oeerFMU9/zMIjZKT/jbIca+O05QqtBTLjL+XVeA=
github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fjl/memsize v0.0.1 h1:+zhkb+dhUgx0/e+M8sF0QqiouvMQUiKR+QYvdxIOKcQ=
...
docker-compose.dev.yml
.env
# @eth-optimism/indexer
## Getting started
### Run the indexer against Goerli
- Install Docker
- `cp example.env .env`
- Fill in `.env`
- Run `docker-compose up` to start the indexer against the Optimism Goerli network

### Run the indexer with Go
See `flags.go` for a reference of the command-line flags to pass to `go run`.

### Run the indexer against a devnet
TODO: add the indexer to the Optimism devnet compose file (previously removed for breaking CI).

### Run the indexer against a custom configuration
`docker-compose.dev.yml` is git-ignored. Fill in your own docker-compose file here.
version: '3.8'

services:
  postgres:
    image: postgres:latest
    environment:
      - POSTGRES_USER=db_username
      - POSTGRES_PASSWORD=db_password
      - POSTGRES_DB=db_name
      - PGDATA=/data/postgres
      - POSTGRES_HOST_AUTH_METHOD=trust
    healthcheck:
      test: [ "CMD-SHELL", "pg_isready -q -U db_username -d db_name" ]
    ports:
      - "5434:5432"
    volumes:
      - postgres_data:/data/postgres

  indexer:
    build:
      context: ..
      dockerfile: indexer/Dockerfile
    healthcheck:
      test: wget localhost:8080/healthz -q -O - > /dev/null 2>&1
    environment:
      # Note that you must index Goerli with INDEXER_BEDROCK=false first, then
      # reindex with INDEXER_BEDROCK=true or seed the database
      - INDEXER_BEDROCK=${INDEXER_BEDROCK_GOERLI:-true}
      - INDEXER_BUILD_ENV=${INDEXER_BUILD_ENV:-development}
      - INDEXER_DB_PORT=${INDEXER_DB_PORT:-5432}
      - INDEXER_DB_USER=${INDEXER_DB_USER:-db_username}
      - INDEXER_DB_PASSWORD=${INDEXER_DB_PASSWORD:-db_password}
      - INDEXER_DB_NAME=${INDEXER_DB_NAME:-db_name}
      - INDEXER_DB_HOST=${INDEXER_DB_HOST:-postgres}
      - INDEXER_CHAIN_ID=${INDEXER_CHAIN_ID:-5}
      - INDEXER_L1_ETH_RPC=$INDEXER_L1_ETH_RPC
      - INDEXER_L2_ETH_RPC=$INDEXER_L2_ETH_RPC
      - INDEXER_REST_HOSTNAME=0.0.0.0
      - INDEXER_REST_PORT=8080
      - INDEXER_BEDROCK_L1_STANDARD_BRIDGE=0x636Af16bf2f682dD3109e60102b8E1A089FedAa8
      - INDEXER_BEDROCK_OPTIMISM_PORTAL=0xB7040fd32359688346A3D1395a42114cf8E3b9b2
    ports:
      - 8080:8080
    depends_on:
      postgres:
        condition: service_healthy

volumes:
  postgres_data:
# Fill me in with Goerli URIs and run docker-compose up to run the indexer against Goerli
INDEXER_L1_ETH_RPC=FILL_ME_IN
INDEXER_L2_ETH_RPC=FILL_ME_IN
@@ -212,7 +212,14 @@ func TestGPOParamsChange(gt *testing.T) {
receipt := alice.LastTxReceipt(t)
require.Equal(t, basefee, receipt.L1GasPrice, "L1 gas price matches basefee of L1 origin")
require.NotZero(t, receipt.L1GasUsed, "L2 tx uses L1 data")
-l1Cost := types.L1Cost(receipt.L1GasUsed.Uint64(), basefee, big.NewInt(2100), big.NewInt(1000_000))
+require.Equal(t,
+	new(big.Float).Mul(
+		new(big.Float).SetInt(basefee),
+		new(big.Float).Mul(new(big.Float).SetInt(receipt.L1GasUsed), receipt.FeeScalar),
+	),
+	new(big.Float).SetInt(receipt.L1Fee), "fee field in receipt matches gas used times scalar times basefee")
+// receipt.L1GasUsed includes the overhead already, so subtract that before passing it into the L1 cost func
+l1Cost := types.L1Cost(receipt.L1GasUsed.Uint64()-2100, basefee, big.NewInt(2100), big.NewInt(1000_000))
require.Equal(t, l1Cost, receipt.L1Fee, "L1 fee is computed with standard GPO params")
require.Equal(t, "1", receipt.FeeScalar.String(), "1000_000 divided by 6 decimals = float(1)")
@@ -268,7 +275,8 @@ func TestGPOParamsChange(gt *testing.T) {
receipt = alice.LastTxReceipt(t)
require.Equal(t, basefeeGPOUpdate, receipt.L1GasPrice, "L1 gas price matches basefee of L1 origin")
require.NotZero(t, receipt.L1GasUsed, "L2 tx uses L1 data")
-l1Cost = types.L1Cost(receipt.L1GasUsed.Uint64(), basefeeGPOUpdate, big.NewInt(1000), big.NewInt(2_300_000))
+// subtract overhead from L1GasUsed receipt field, types.L1Cost applies it again
+l1Cost = types.L1Cost(receipt.L1GasUsed.Uint64()-1000, basefeeGPOUpdate, big.NewInt(1000), big.NewInt(2_300_000))
require.Equal(t, l1Cost, receipt.L1Fee, "L1 fee is computed with updated GPO params")
require.Equal(t, "2.3", receipt.FeeScalar.String(), "2_300_000 divided by 6 decimals = float(2.3)")
@@ -288,7 +296,8 @@ func TestGPOParamsChange(gt *testing.T) {
receipt = alice.LastTxReceipt(t)
require.Equal(t, basefee, receipt.L1GasPrice, "L1 gas price matches basefee of L1 origin")
require.NotZero(t, receipt.L1GasUsed, "L2 tx uses L1 data")
-l1Cost = types.L1Cost(receipt.L1GasUsed.Uint64(), basefee, big.NewInt(1000), big.NewInt(2_300_000))
+// subtract overhead from L1GasUsed receipt field, types.L1Cost applies it again
+l1Cost = types.L1Cost(receipt.L1GasUsed.Uint64()-1000, basefee, big.NewInt(1000), big.NewInt(2_300_000))
require.Equal(t, l1Cost, receipt.L1Fee, "L1 fee is computed with updated GPO params")
require.Equal(t, "2.3", receipt.FeeScalar.String(), "2_300_000 divided by 6 decimals = float(2.3)")
}
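For readers following the assertions above: `receipt.L1GasUsed` already folds in the fixed overhead, which is why the test subtracts it before calling `types.L1Cost`, and `receipt.FeeScalar` is the raw scalar divided by 10^6 (so 1_000_000 reads as 1 and 2_300_000 as 2.3). The snippet below is a self-contained sketch of that arithmetic, not the op-geth implementation; the helper name and example numbers are illustrative only.

```go
package main

import (
	"fmt"
	"math/big"
)

// l1CostSketch mirrors the shape of the fee the test expects from types.L1Cost:
// (rollupDataGas + overhead) * l1Basefee * scalar / 1e6.
func l1CostSketch(rollupDataGas uint64, l1Basefee, overhead, scalar *big.Int) *big.Int {
	gas := new(big.Int).Add(new(big.Int).SetUint64(rollupDataGas), overhead)
	fee := new(big.Int).Mul(gas, l1Basefee)
	fee.Mul(fee, scalar)
	return fee.Div(fee, big.NewInt(1_000_000))
}

func main() {
	basefee := big.NewInt(7)        // illustrative L1 basefee (wei)
	overhead := big.NewInt(2100)    // GPO overhead, as in the first assertion above
	scalar := big.NewInt(1_000_000) // FeeScalar = 1_000_000 / 1e6 = 1.0

	l1GasUsedFromReceipt := uint64(3000) // receipt field, overhead already included
	fee := l1CostSketch(l1GasUsedFromReceipt-overhead.Uint64(), basefee, overhead, scalar)
	fmt.Println(fee) // 3000 * 7 * 1.0 = 21000, i.e. basefee * L1GasUsed * FeeScalar
}
```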
...
@@ -100,6 +100,126 @@ func TestInvalidDepositInFCU(t *testing.T) {
require.Equal(t, 0, balance.Cmp(common.Big0))
}
// TestGethOnlyPendingBlockIsLatest walks through an engine-API block building job,
// and asserts that the pending block is set to match the latest block at every stage,
// for stability and tx-privacy.
func TestGethOnlyPendingBlockIsLatest(t *testing.T) {
InitParallel(t)
cfg := DefaultSystemConfig(t)
cfg.DeployConfig.FundDevAccounts = true
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
opGeth, err := NewOpGeth(t, ctx, &cfg)
require.NoError(t, err)
defer opGeth.Close()
checkPending := func(stage string, number uint64) {
// TODO(CLI-4044): pending-block ID change
pendingBlock, err := opGeth.L2Client.BlockByNumber(ctx, big.NewInt(-1))
require.NoError(t, err, "failed to fetch pending block at stage "+stage)
require.Equal(t, number, pendingBlock.NumberU64(), "pending block must have expected number")
latestBlock, err := opGeth.L2Client.BlockByNumber(ctx, nil)
require.NoError(t, err, "failed to fetch latest block at stage "+stage)
require.Equal(t, pendingBlock.Hash(), latestBlock.Hash(), "pending and latest do not match at stage "+stage)
}
checkPending("genesis", 0)
amount := big.NewInt(42) // send 42 wei
aliceStartBalance, err := opGeth.L2Client.PendingBalanceAt(ctx, cfg.Secrets.Addresses().Alice)
require.NoError(t, err)
require.True(t, aliceStartBalance.Cmp(big.NewInt(0)) > 0, "alice must be funded")
checkPendingBalance := func() {
pendingBalance, err := opGeth.L2Client.PendingBalanceAt(ctx, cfg.Secrets.Addresses().Alice)
require.NoError(t, err)
require.Equal(t, pendingBalance, aliceStartBalance, "pending balance must still be the same")
}
startBlock, err := opGeth.L2Client.BlockByNumber(ctx, nil)
require.NoError(t, err)
signer := types.LatestSigner(opGeth.L2ChainConfig)
tip := big.NewInt(7_000_000_000) // 7 gwei tip
tx := types.MustSignNewTx(cfg.Secrets.Alice, signer, &types.DynamicFeeTx{
ChainID: big.NewInt(int64(cfg.DeployConfig.L2ChainID)),
Nonce: 0,
GasTipCap: tip,
GasFeeCap: new(big.Int).Add(startBlock.BaseFee(), tip),
Gas: 1_000_000,
To: &cfg.Secrets.Addresses().Bob,
Value: amount,
Data: nil,
})
require.NoError(t, opGeth.L2Client.SendTransaction(ctx, tx), "send tx to make pending work different")
checkPending("prepared", 0)
rpcClient, err := opGeth.node.Attach()
require.NoError(t, err)
defer rpcClient.Close()
// Wait for tx to be in tx-pool, for it to be picked up in block building
var txPoolStatus struct {
Pending hexutil.Uint64 `json:"pending"`
}
for i := 0; i < 5; i++ {
require.NoError(t, rpcClient.CallContext(ctx, &txPoolStatus, "txpool_status"))
if txPoolStatus.Pending == 0 {
time.Sleep(time.Second)
} else {
break
}
}
require.NotZero(t, txPoolStatus.Pending, "must have pending tx in pool")
checkPending("in-pool", 0)
checkPendingBalance()
// start building a block
attrs, err := opGeth.CreatePayloadAttributes()
require.NoError(t, err)
attrs.NoTxPool = false // we want to include a tx
fc := eth.ForkchoiceState{
HeadBlockHash: opGeth.L2Head.BlockHash,
SafeBlockHash: opGeth.L2Head.BlockHash,
}
res, err := opGeth.l2Engine.ForkchoiceUpdate(ctx, &fc, attrs)
require.NoError(t, err)
checkPending("building", 0)
checkPendingBalance()
// Now we have to wait until the block-building job picks up the tx from the tx-pool.
// See the goroutine spun up in the buildPayload() func in payload_building.go in the miner package.
// We can't inspect the job directly, and we don't want to finish block-building prematurely, so we just wait.
time.Sleep(time.Second * 4) // conservatively wait 4 seconds, CI might lag during block building
// retrieve the block
payload, err := opGeth.l2Engine.GetPayload(ctx, *res.PayloadID)
require.NoError(t, err)
checkPending("retrieved", 0)
require.Len(t, payload.Transactions, 2, "must include L1 info tx and tx from alice")
checkPendingBalance()
// process the block
status, err := opGeth.l2Engine.NewPayload(ctx, payload)
require.NoError(t, err)
require.Equal(t, eth.ExecutionValid, status.Status)
checkPending("processed", 0)
checkPendingBalance()
// make the block canonical
fc = eth.ForkchoiceState{
HeadBlockHash: payload.BlockHash,
SafeBlockHash: payload.BlockHash,
}
res, err = opGeth.l2Engine.ForkchoiceUpdate(ctx, &fc, nil)
require.NoError(t, err)
require.Equal(t, eth.ExecutionValid, res.PayloadStatus.Status)
checkPending("canonical", 1)
}
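The test above is long because it checks the pending block at every stage; the engine-API round-trip it drives boils down to four calls. The condensed sketch below restates that sequence using the same op-e2e helpers seen in the test (the *OpGeth receiver type name is assumed from NewOpGeth and may differ); it is an illustration, not additional test coverage.

```go
// buildOneBlock is a condensed restatement of the block-building flow exercised above.
func buildOneBlock(t *testing.T, ctx context.Context, opGeth *OpGeth) {
	// 1. Start a build job on top of the current head via a forkchoice update with attributes.
	attrs, err := opGeth.CreatePayloadAttributes()
	require.NoError(t, err)
	attrs.NoTxPool = false // allow txs from the tx-pool into the block
	fc := eth.ForkchoiceState{
		HeadBlockHash: opGeth.L2Head.BlockHash,
		SafeBlockHash: opGeth.L2Head.BlockHash,
	}
	res, err := opGeth.l2Engine.ForkchoiceUpdate(ctx, &fc, attrs)
	require.NoError(t, err)

	// 2. Retrieve the payload produced by the build job.
	payload, err := opGeth.l2Engine.GetPayload(ctx, *res.PayloadID)
	require.NoError(t, err)

	// 3. Execute/validate the payload.
	status, err := opGeth.l2Engine.NewPayload(ctx, payload)
	require.NoError(t, err)
	require.Equal(t, eth.ExecutionValid, status.Status)

	// 4. Canonicalize it with a second forkchoice update, this time without attributes.
	fc = eth.ForkchoiceState{HeadBlockHash: payload.BlockHash, SafeBlockHash: payload.BlockHash}
	res, err = opGeth.l2Engine.ForkchoiceUpdate(ctx, &fc, nil)
	require.NoError(t, err)
	require.Equal(t, eth.ExecutionValid, res.PayloadStatus.Status)
}
```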
func TestPreregolith(t *testing.T) {
InitParallel(t)
futureTimestamp := hexutil.Uint64(4)
...
@@ -679,7 +679,7 @@ func (sys *System) newMockNetPeer() (host.Host, error) {
_ = ps.AddPubKey(p, sk.GetPublic())
ds := sync.MutexWrap(ds.NewMapDatastore())
-eps, err := store.NewExtendedPeerstore(context.Background(), log.Root(), clock.SystemClock, ps, ds)
+eps, err := store.NewExtendedPeerstore(context.Background(), log.Root(), clock.SystemClock, ps, ds, 24*time.Hour)
if err != nil {
return nil, err
}
...
@@ -241,12 +241,14 @@ func TestPendingGasLimit(t *testing.T) {
cfg.GethOptions["sequencer"] = []GethOption{
func(ethCfg *ethconfig.Config, nodeCfg *node.Config) error {
ethCfg.Miner.GasCeil = 10_000_000
+ethCfg.Miner.RollupComputePendingBlock = true
return nil
},
}
cfg.GethOptions["verifier"] = []GethOption{
func(ethCfg *ethconfig.Config, nodeCfg *node.Config) error {
ethCfg.Miner.GasCeil = 9_000_000
+ethCfg.Miner.RollupComputePendingBlock = true
return nil
},
}
@@ -1230,6 +1232,14 @@ func TestFees(t *testing.T) {
require.Nil(t, err)
require.Equal(t, l1Fee, gpoL1Fee, "l1 fee mismatch")
+require.Equal(t, receipt.L1Fee, l1Fee, "l1 fee in receipt is correct")
+require.Equal(t,
+	new(big.Float).Mul(
+		new(big.Float).SetInt(l1Header.BaseFee),
+		new(big.Float).Mul(new(big.Float).SetInt(receipt.L1GasUsed), receipt.FeeScalar),
+	),
+	new(big.Float).SetInt(receipt.L1Fee), "fee field in receipt matches gas used times scalar times basefee")
// Calculate total fee
baseFeeRecipientDiff.Add(baseFeeRecipientDiff, coinbaseDiff)
totalFee := new(big.Int).Add(baseFeeRecipientDiff, l1FeeRecipientDiff)
@@ -1371,3 +1381,45 @@ func latestBlock(t *testing.T, client *ethclient.Client) uint64 {
require.Nil(t, err, "Error getting latest block")
return blockAfter
}
// TestPendingBlockIsLatest tests that we serve the latest block as the pending block
func TestPendingBlockIsLatest(t *testing.T) {
InitParallel(t)
cfg := DefaultSystemConfig(t)
sys, err := cfg.Start()
require.Nil(t, err, "Error starting up system")
defer sys.Close()
l2Seq := sys.Clients["sequencer"]
t.Run("block", func(t *testing.T) {
for i := 0; i < 10; i++ {
// TODO(CLI-4044): pending-block ID change
pending, err := l2Seq.BlockByNumber(context.Background(), big.NewInt(-1))
require.NoError(t, err)
latest, err := l2Seq.BlockByNumber(context.Background(), nil)
require.NoError(t, err)
if pending.NumberU64() == latest.NumberU64() {
require.Equal(t, pending.Hash(), latest.Hash(), "pending must exactly match latest block")
return
}
// re-try until we have the same number, as the requests are not an atomic bundle, and the sequencer may create a block.
}
t.Fatal("failed to get pending block with same number as latest block")
})
t.Run("header", func(t *testing.T) {
for i := 0; i < 10; i++ {
// TODO(CLI-4044): pending-block ID change
pending, err := l2Seq.HeaderByNumber(context.Background(), big.NewInt(-1))
require.NoError(t, err)
latest, err := l2Seq.HeaderByNumber(context.Background(), nil)
require.NoError(t, err)
if pending.Number.Uint64() == latest.Number.Uint64() {
require.Equal(t, pending.Hash(), latest.Hash(), "pending must exactly match latest header")
return
}
// re-try until we have the same number, as the requests are not an atomic bundle, and the sequencer may create a block.
}
t.Fatal("failed to get pending header with same number as latest header")
})
}
@@ -141,7 +141,8 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host
return nil, fmt.Errorf("failed to open peerstore: %w", err)
}
-ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store)
+peerScoreParams := conf.PeerScoringParams()
+ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store, peerScoreParams.RetainScore)
if err != nil {
return nil, fmt.Errorf("failed to open extended peerstore: %w", err)
}
...
@@ -76,7 +76,7 @@ func getNetHosts(testSuite *PeerScoresTestSuite, ctx context.Context, n int) []h
log := testlog.Logger(testSuite.T(), log.LvlError)
for i := 0; i < n; i++ {
swarm := tswarm.GenSwarm(testSuite.T())
-eps, err := store.NewExtendedPeerstore(ctx, log, clock.SystemClock, swarm.Peerstore(), sync.MutexWrap(ds.NewMapDatastore()))
+eps, err := store.NewExtendedPeerstore(ctx, log, clock.SystemClock, swarm.Peerstore(), sync.MutexWrap(ds.NewMapDatastore()), 1*time.Hour)
netw := &customPeerstoreNetwork{swarm, eps}
require.NoError(testSuite.T(), err)
h := bhost.NewBlankHost(netw)
@@ -99,7 +99,7 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts []
dataStore := sync.MutexWrap(ds.NewMapDatastore())
peerStore, err := pstoreds.NewPeerstore(context.Background(), dataStore, pstoreds.DefaultOpts())
require.NoError(testSuite.T(), err)
-extPeerStore, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, peerStore, dataStore)
+extPeerStore, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, peerStore, dataStore, 1*time.Hour)
require.NoError(testSuite.T(), err)
scorer := NewScorer(
...
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
@@ -19,12 +20,12 @@ type extendedStore struct {
*ipBanBook
}
-func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching) (ExtendedPeerstore, error) {
+func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching, scoreRetention time.Duration) (ExtendedPeerstore, error) {
cab, ok := peerstore.GetCertifiedAddrBook(ps)
if !ok {
return nil, errors.New("peerstore should also be a certified address book")
}
-sb, err := newScoreBook(ctx, logger, clock, store)
+sb, err := newScoreBook(ctx, logger, clock, store, scoreRetention)
if err != nil {
return nil, fmt.Errorf("create scorebook: %w", err)
}
...
@@ -102,6 +102,9 @@ func (d *recordsBook[K, V]) deleteRecord(key K) error {
func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
if val, ok := d.cache.Get(key); ok {
+if d.hasExpired(val) {
+return v, UnknownRecordErr
+}
return val, nil
}
data, err := d.store.Get(d.ctx, d.dsKey(key))
@@ -114,6 +117,9 @@ func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
if err := v.UnmarshalBinary(data); err != nil {
return v, fmt.Errorf("invalid value for key %v: %w", key, err)
}
+if d.hasExpired(v) {
+return v, UnknownRecordErr
+}
d.cache.Add(key, v)
return v, nil
}
@@ -142,9 +148,9 @@ func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) error {
}
// prune deletes entries from the store that are older than the configured prune expiration.
-// Note that the expiry period is not a strict TTL. Entries that are eligible for deletion may still be present
-// either because the prune function hasn't yet run or because they are still preserved in the in-memory cache after
-// having been deleted from the database.
+// Entries that are eligible for deletion may still be present either because the prune function hasn't yet run or
+// because they are still preserved in the in-memory cache after having been deleted from the database.
+// Such expired entries are filtered out in getRecord
func (d *recordsBook[K, V]) prune() error {
results, err := d.store.Query(d.ctx, query.Query{
Prefix: d.dsBaseKey.String(),
@@ -168,7 +174,7 @@ func (d *recordsBook[K, V]) prune() error {
if err := v.UnmarshalBinary(result.Value); err != nil {
return err
}
-if v.LastUpdated().Add(d.recordExpiry).Before(d.clock.Now()) {
+if d.hasExpired(v) {
if pending > maxPruneBatchSize {
if err := batch.Commit(d.ctx); err != nil {
return err
@@ -191,6 +197,10 @@ func (d *recordsBook[K, V]) prune() error {
return nil
}
+func (d *recordsBook[K, V]) hasExpired(v V) bool {
+return v.LastUpdated().Add(d.recordExpiry).Before(d.clock.Now())
+}
func (d *recordsBook[K, V]) Close() {
d.cancelFn()
d.bgTasks.Wait()
...
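The reworked comment above captures the key design point of this change: pruning is lazy, so reads must independently reject records older than the retention window, whether they come from the LRU cache or from disk. Below is a small, self-contained sketch of that read-time-expiry pattern with illustrative types; it is not the actual recordsBook implementation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errUnknownRecord = errors.New("unknown record")

// record stands in for a stored score record; only lastUpdated matters for expiry.
type record struct {
	lastUpdated time.Time
	value       float64
}

// book stands in for the records book: a retention window, a clock, and stored records.
type book struct {
	retention time.Duration
	now       func() time.Time
	records   map[string]record
}

func (b *book) hasExpired(r record) bool {
	return r.lastUpdated.Add(b.retention).Before(b.now())
}

// get mirrors getRecord: even if a stale record is still stored (or still cached),
// it is reported as unknown once it has aged past the retention window.
func (b *book) get(key string) (record, error) {
	r, ok := b.records[key]
	if !ok || b.hasExpired(r) {
		return record{}, errUnknownRecord
	}
	return r, nil
}

func main() {
	now := time.Now()
	b := &book{
		retention: 24 * time.Hour,
		now:       func() time.Time { return now },
		records: map[string]record{
			"fresh": {lastUpdated: now.Add(-1 * time.Hour), value: 123.45},
			"stale": {lastUpdated: now.Add(-25 * time.Hour), value: 67.8},
		},
	}
	fmt.Println(b.get("fresh")) // returned normally
	fmt.Println(b.get("stale")) // treated as unknown, even before a prune pass deletes it
}
```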
@@ -12,8 +12,7 @@ import (
)
const (
scoreCacheSize = 100
-scoreRecordExpiryPeriod = 24 * time.Hour
)
var scoresBase = ds.NewKey("/peers/scores")
@@ -56,8 +55,8 @@ func peerIDKey(id peer.ID) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(id)))
}
-func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*scoreBook, error) {
+func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching, retain time.Duration) (*scoreBook, error) {
-book, err := newRecordsBook[peer.ID, *scoreRecord](ctx, logger, clock, store, scoreCacheSize, scoreRecordExpiryPeriod, scoresBase, newScoreRecord, peerIDKey)
+book, err := newRecordsBook[peer.ID, *scoreRecord](ctx, logger, clock, store, scoreCacheSize, retain, scoresBase, newScoreRecord, peerIDKey)
if err != nil {
return nil, err
}
...
@@ -81,7 +81,7 @@ func TestPrune(t *testing.T) {
logger := testlog.Logger(t, log.LvlInfo)
store := sync.MutexWrap(ds.NewMapDatastore())
clock := clock.NewDeterministicClock(time.UnixMilli(1000))
-book, err := newScoreBook(ctx, logger, clock, store)
+book, err := newScoreBook(ctx, logger, clock, store, 24*time.Hour)
require.NoError(t, err)
hasScoreRecorded := func(id peer.ID) bool {
@@ -135,7 +135,7 @@ func TestPruneMultipleBatches(t *testing.T) {
defer cancelFunc()
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(1000))
-book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()))
+book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()), 24*time.Hour)
require.NoError(t, err)
hasScoreRecorded := func(id peer.ID) bool {
@@ -159,6 +159,31 @@ func TestPruneMultipleBatches(t *testing.T) {
}
}
// Check that scores that are eligible for pruning are not returned, even if they haven't yet been removed
func TestIgnoreOutdatedScores(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(1000))
retentionPeriod := 24 * time.Hour
book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()), retentionPeriod)
require.NoError(t, err)
require.NoError(t, book.SetScore("a", &GossipScores{Total: 123.45}))
clock.AdvanceTime(retentionPeriod + 1)
// Not available from cache
scores, err := book.GetPeerScores("a")
require.NoError(t, err)
require.Equal(t, scores, PeerScores{})
book.book.cache.Purge()
// Not available from disk
scores, err = book.GetPeerScores("a")
require.NoError(t, err)
require.Equal(t, scores, PeerScores{})
}
func assertPeerScores(t *testing.T, store ExtendedPeerstore, id peer.ID, expected PeerScores) {
result, err := store.GetPeerScores(id)
require.NoError(t, err)
@@ -174,8 +199,8 @@ func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) Extend
ps, err := pstoreds.NewPeerstore(context.Background(), store, pstoreds.DefaultOpts())
require.NoError(t, err, "Failed to create peerstore")
logger := testlog.Logger(t, log.LvlInfo)
-clock := clock.NewDeterministicClock(time.UnixMilli(100))
-eps, err := NewExtendedPeerstore(context.Background(), logger, clock, ps, store)
+c := clock.NewDeterministicClock(time.UnixMilli(100))
+eps, err := NewExtendedPeerstore(context.Background(), logger, c, ps, store, 24*time.Hour)
require.NoError(t, err)
t.Cleanup(func() {
_ = eps.Close()
...
@@ -49,6 +49,7 @@ RUN apt-get update && \
ln -s /usr/local/go/bin/gofmt /usr/local/bin/gofmt && \
bash nodesource_setup.sh && \
apt-get install -y nodejs && \
+npm i -g npm@8.11.0 && \
npm i -g yarn && \
npm i -g depcheck && \
pip install slither-analyzer==0.9.1 && \
...