Commit fdabf5d9 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into aj/integrate-extended-peerstore

parents 9369371c 5bce421e
......@@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/peerstore"
)
......@@ -15,15 +17,16 @@ type extendedStore struct {
*scoreBook
}
func NewExtendedPeerstore(ctx context.Context, ps peerstore.Peerstore, store ds.Batching) (ExtendedPeerstore, error) {
func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching) (ExtendedPeerstore, error) {
cab, ok := peerstore.GetCertifiedAddrBook(ps)
if !ok {
return nil, errors.New("peerstore should also be a certified address book")
}
sb, err := newScoreBook(ctx, store)
sb, err := newScoreBook(ctx, logger, clock, store)
if err != nil {
return nil, fmt.Errorf("create scorebook: %w", err)
}
sb.startGC()
return &extendedStore{
Peerstore: ps,
CertifiedAddrBook: cab,
......@@ -31,4 +34,9 @@ func NewExtendedPeerstore(ctx context.Context, ps peerstore.Peerstore, store ds.
}, nil
}
func (s *extendedStore) Close() error {
s.scoreBook.Close()
return s.Peerstore.Close()
}
var _ ExtendedPeerstore = (*extendedStore)(nil)
package store
import (
"context"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
)
const (
gcPeriod = 2 * time.Hour
)
type gcAction func() error
func startGc(ctx context.Context, logger log.Logger, clock clock.Clock, bgTasks *sync.WaitGroup, action gcAction) {
bgTasks.Add(1)
go func() {
defer bgTasks.Done()
gcTimer := clock.NewTicker(gcPeriod)
defer gcTimer.Stop()
for {
select {
case <-gcTimer.Ch():
if err := action(); err != nil {
logger.Warn("GC failed", "err", err)
}
case <-ctx.Done():
return
}
}
}()
}
package store
import (
"context"
"sync"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func TestScheduleGcPeriodically(t *testing.T) {
var bgTasks sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()
// Wait for the gc background process to complete after cancelling the context
bgTasks.Wait()
}()
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(5000))
called := make(chan struct{}, 10)
action := func() error {
called <- struct{}{}
return nil
}
waitForGc := func(failMsg string) {
timeout, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
select {
case <-timeout.Done():
t.Fatal(failMsg)
case <-called:
require.Len(t, called, 0, "should only run once after gc period")
}
}
startGc(ctx, logger, clock, &bgTasks, action)
timeout, tCancel := context.WithTimeout(ctx, 10*time.Second)
defer tCancel()
require.True(t, clock.WaitForNewPendingTask(timeout), "did not schedule pending GC")
require.Len(t, called, 0, "should not run immediately")
clock.AdvanceTime(gcPeriod)
waitForGc("should run gc after first time period")
clock.AdvanceTime(gcPeriod)
waitForGc("should run gc again after second time period")
}
......@@ -5,71 +5,101 @@ import (
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
lru "github.com/hashicorp/golang-lru/v2"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-base32"
)
type scoreBook struct {
ctx context.Context
store ds.Batching
cache *lru.Cache[peer.ID, PeerScores]
sync.RWMutex
}
const (
scoreDataV0 = "0"
scoreCacheSize = 100
expiryPeriod = 24 * time.Hour
maxPruneBatchSize = 20
)
var scoresBase = ds.NewKey("/peers/scores")
const (
scoreDataV0 = "0"
scoreCacheSize = 100
)
type scoreRecord struct {
PeerScores
lastUpdate time.Time
}
type scoreBook struct {
ctx context.Context
cancelFn context.CancelFunc
clock clock.Clock
log log.Logger
bgTasks sync.WaitGroup
store ds.Batching
cache *lru.Cache[peer.ID, scoreRecord]
sync.RWMutex
}
func newScoreBook(ctx context.Context, store ds.Batching) (*scoreBook, error) {
cache, err := lru.New[peer.ID, PeerScores](scoreCacheSize)
func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*scoreBook, error) {
cache, err := lru.New[peer.ID, scoreRecord](scoreCacheSize)
if err != nil {
return nil, fmt.Errorf("creating cache: %w", err)
}
return &scoreBook{
ctx: ctx,
store: store,
cache: cache,
}, nil
ctx, cancelFn := context.WithCancel(ctx)
book := scoreBook{
ctx: ctx,
cancelFn: cancelFn,
clock: clock,
log: logger,
store: store,
cache: cache,
}
return &book, nil
}
func (d *scoreBook) startGC() {
startGc(d.ctx, d.log, d.clock, &d.bgTasks, d.prune)
}
func (d *scoreBook) GetPeerScores(id peer.ID) (PeerScores, error) {
d.RLock()
defer d.RUnlock()
return d.getPeerScoresNoLock(id)
record, err := d.getRecord(id)
if err != nil {
return PeerScores{}, nil
}
return record.PeerScores, err
}
func (d *scoreBook) getPeerScoresNoLock(id peer.ID) (PeerScores, error) {
scores, ok := d.cache.Get(id)
if ok {
func (d *scoreBook) getRecord(id peer.ID) (scoreRecord, error) {
if scores, ok := d.cache.Get(id); ok {
return scores, nil
}
data, err := d.store.Get(d.ctx, scoreKey(id))
if errors.Is(err, ds.ErrNotFound) {
return PeerScores{}, nil
return scoreRecord{}, nil
} else if err != nil {
return PeerScores{}, fmt.Errorf("load scores for peer %v: %w", id, err)
return scoreRecord{}, fmt.Errorf("load scores for peer %v: %w", id, err)
}
scores, err = deserializeScoresV0(data)
record, err := deserializeScoresV0(data)
if err != nil {
return PeerScores{}, fmt.Errorf("invalid score data for peer %v: %w", id, err)
return scoreRecord{}, fmt.Errorf("invalid score data for peer %v: %w", id, err)
}
d.cache.Add(id, scores)
return scores, nil
d.cache.Add(id, record)
return record, nil
}
func (d *scoreBook) SetScore(id peer.ID, scoreType ScoreType, score float64) error {
d.Lock()
defer d.Unlock()
scores, err := d.getPeerScoresNoLock(id)
scores, err := d.getRecord(id)
if err != nil {
return err
}
scores.lastUpdate = d.clock.Now()
scores.Gossip = score
switch scoreType {
case TypeGossip:
scores.Gossip = score
......@@ -88,6 +118,61 @@ func (d *scoreBook) SetScore(id peer.ID, scoreType ScoreType, score float64) err
return nil
}
// prune deletes entries from the store that are older than expiryPeriod.
// Note that the expiry period is not a strict TTL. Entries that are eligible for deletion may still be present
// either because the prune function hasn't yet run or because they are still preserved in the in-memory cache after
// having been deleted from the database.
func (d *scoreBook) prune() error {
results, err := d.store.Query(d.ctx, query.Query{
Prefix: scoresBase.String(),
})
if err != nil {
return err
}
pending := 0
batch, err := d.store.Batch(d.ctx)
if err != nil {
return err
}
for result := range results.Next() {
// Bail out if the context is done
select {
case <-d.ctx.Done():
return d.ctx.Err()
default:
}
record, err := deserializeScoresV0(result.Value)
if err != nil {
return err
}
if record.lastUpdate.Add(expiryPeriod).Before(d.clock.Now()) {
if pending > maxPruneBatchSize {
if err := batch.Commit(d.ctx); err != nil {
return err
}
batch, err = d.store.Batch(d.ctx)
if err != nil {
return err
}
pending = 0
}
pending++
if err := batch.Delete(d.ctx, ds.NewKey(result.Key)); err != nil {
return err
}
}
}
if err := batch.Commit(d.ctx); err != nil {
return err
}
return nil
}
func (d *scoreBook) Close() {
d.cancelFn()
d.bgTasks.Wait()
}
func scoreKey(id peer.ID) ds.Key {
return scoresBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(id))).ChildString(scoreDataV0)
}
......@@ -2,8 +2,13 @@ package store
import (
"context"
"strconv"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p/core/peer"
......@@ -71,6 +76,95 @@ func TestUnknownScoreType(t *testing.T) {
require.ErrorContains(t, err, "unknown score type")
}
func TestCloseCompletes(t *testing.T) {
store := createMemoryStore(t)
require.NoError(t, store.Close())
}
func TestPrune(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
logger := testlog.Logger(t, log.LvlInfo)
store := sync.MutexWrap(ds.NewMapDatastore())
clock := clock.NewDeterministicClock(time.UnixMilli(1000))
book, err := newScoreBook(ctx, logger, clock, store)
require.NoError(t, err)
hasScoreRecorded := func(id peer.ID) bool {
scores, err := book.GetPeerScores(id)
require.NoError(t, err)
return scores != PeerScores{}
}
firstStore := clock.Now()
// Set some scores all 30 minutes apart so they have different expiry times
require.NoError(t, book.SetScore("aaaa", TypeGossip, 123.45))
clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("bbbb", TypeGossip, 123.45))
clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("cccc", TypeGossip, 123.45))
clock.AdvanceTime(30 * time.Minute)
require.NoError(t, book.SetScore("dddd", TypeGossip, 123.45))
clock.AdvanceTime(30 * time.Minute)
// Update bbbb again which should extend its expiry
require.NoError(t, book.SetScore("bbbb", TypeGossip, 123.45))
require.True(t, hasScoreRecorded("aaaa"))
require.True(t, hasScoreRecorded("bbbb"))
require.True(t, hasScoreRecorded("cccc"))
require.True(t, hasScoreRecorded("dddd"))
elapsedTime := clock.Now().Sub(firstStore)
timeToFirstExpiry := expiryPeriod - elapsedTime
// Advance time until the score for aaaa should be pruned.
clock.AdvanceTime(timeToFirstExpiry + 1)
require.NoError(t, book.prune())
// Clear the cache so reads have to come from the database
book.cache.Purge()
require.False(t, hasScoreRecorded("aaaa"), "should have pruned aaaa record")
// Advance time so cccc, dddd and the original bbbb entry should be pruned
clock.AdvanceTime(90 * time.Minute)
require.NoError(t, book.prune())
// Clear the cache so reads have to come from the database
book.cache.Purge()
require.False(t, hasScoreRecorded("cccc"), "should have pruned cccc record")
require.False(t, hasScoreRecorded("dddd"), "should have pruned cccc record")
require.True(t, hasScoreRecorded("bbbb"), "should not prune bbbb record")
}
func TestPruneMultipleBatches(t *testing.T) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(1000))
book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()))
require.NoError(t, err)
hasScoreRecorded := func(id peer.ID) bool {
scores, err := book.GetPeerScores(id)
require.NoError(t, err)
return scores != PeerScores{}
}
// Set scores for more peers than the max batch size
peerCount := maxPruneBatchSize*3 + 5
for i := 0; i < peerCount; i++ {
require.NoError(t, book.SetScore(peer.ID(strconv.Itoa(i)), TypeGossip, 123.45))
}
clock.AdvanceTime(expiryPeriod + 1)
require.NoError(t, book.prune())
// Clear the cache so reads have to come from the database
book.cache.Purge()
for i := 0; i < peerCount; i++ {
require.Falsef(t, hasScoreRecorded(peer.ID(strconv.Itoa(i))), "Should prune record peer %v", i)
}
}
func assertPeerScores(t *testing.T, store ExtendedPeerstore, id peer.ID, expected PeerScores) {
result, err := store.GetPeerScores(id)
require.NoError(t, err)
......@@ -85,7 +179,12 @@ func createMemoryStore(t *testing.T) ExtendedPeerstore {
func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) ExtendedPeerstore {
ps, err := pstoreds.NewPeerstore(context.Background(), store, pstoreds.DefaultOpts())
require.NoError(t, err, "Failed to create peerstore")
eps, err := NewExtendedPeerstore(context.Background(), ps, store)
logger := testlog.Logger(t, log.LvlInfo)
clock := clock.NewDeterministicClock(time.UnixMilli(100))
eps, err := NewExtendedPeerstore(context.Background(), logger, clock, ps, store)
require.NoError(t, err)
t.Cleanup(func() {
_ = eps.Close()
})
return eps
}
......@@ -3,23 +3,34 @@ package store
import (
"bytes"
"encoding/binary"
"time"
)
func serializeScoresV0(scores PeerScores) ([]byte, error) {
func serializeScoresV0(scores scoreRecord) ([]byte, error) {
var b bytes.Buffer
err := binary.Write(&b, binary.BigEndian, scores.Gossip)
err := binary.Write(&b, binary.BigEndian, scores.lastUpdate.UnixMilli())
if err != nil {
return nil, err
}
err = binary.Write(&b, binary.BigEndian, scores.Gossip)
if err != nil {
return nil, err
}
return b.Bytes(), nil
}
func deserializeScoresV0(data []byte) (PeerScores, error) {
var scores PeerScores
func deserializeScoresV0(data []byte) (scoreRecord, error) {
var scores scoreRecord
r := bytes.NewReader(data)
err := binary.Read(r, binary.BigEndian, &scores.Gossip)
var lastUpdate int64
err := binary.Read(r, binary.BigEndian, &lastUpdate)
if err != nil {
return scoreRecord{}, err
}
scores.lastUpdate = time.UnixMilli(lastUpdate)
err = binary.Read(r, binary.BigEndian, &scores.Gossip)
if err != nil {
return PeerScores{}, err
return scoreRecord{}, err
}
return scores, nil
}
package store
import (
"strconv"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
)
func TestRoundtripScoresV0(t *testing.T) {
scores := PeerScores{
Gossip: 1234.52382,
scores := scoreRecord{
PeerScores: PeerScores{Gossip: 1234.52382},
lastUpdate: time.UnixMilli(1923841),
}
data, err := serializeScoresV0(scores)
require.NoError(t, err)
......@@ -24,19 +27,20 @@ func TestRoundtripScoresV0(t *testing.T) {
// A new entry should be added to this test each time any fields are changed to ensure it can always be deserialized
func TestParseHistoricSerializationsV0(t *testing.T) {
tests := []struct {
name string
data []byte
expected PeerScores
expected scoreRecord
}{
{
name: "GossipOnly",
data: common.Hex2Bytes("40934A18644523F6"),
expected: PeerScores{Gossip: 1234.52382},
data: common.Hex2Bytes("00000000001D5B0140934A18644523F6"),
expected: scoreRecord{
PeerScores: PeerScores{Gossip: 1234.52382},
lastUpdate: time.UnixMilli(1923841),
},
},
}
for _, test := range tests {
for idx, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Run(strconv.Itoa(idx), func(t *testing.T) {
result, err := deserializeScoresV0(test.data)
require.NoError(t, err)
require.Equal(t, test.expected, result)
......
......@@ -29,7 +29,7 @@ DeployerWhitelist_Test:test_owner_succeeds() (gas: 7582)
DeployerWhitelist_Test:test_storageSlots_succeeds() (gas: 33395)
DisputeGameFactory_Test:test_owner_succeeds() (gas: 7582)
DisputeGameFactory_Test:test_setImplementation_notOwner_reverts() (gas: 11191)
DisputeGameFactory_Test:test_setImplementation_succeeds() (gas: 32635)
DisputeGameFactory_Test:test_setImplementation_succeeds() (gas: 38765)
DisputeGameFactory_Test:test_transferOwnership_notOwner_reverts() (gas: 10979)
DisputeGameFactory_Test:test_transferOwnership_succeeds() (gas: 13180)
FeeVault_Test:test_constructor_succeeds() (gas: 10736)
......
......@@ -13,7 +13,9 @@ import { IDisputeGameFactory } from "./IDisputeGameFactory.sol";
* @notice The Bond Manager serves as an escrow for permissionless output proposal bonds.
*/
contract BondManager {
// The Bond Type
/**
* @notice The Bond Type
*/
struct Bond {
address owner;
uint256 expiration;
......@@ -58,6 +60,13 @@ contract BondManager {
*/
IDisputeGameFactory public immutable DISPUTE_GAME_FACTORY;
/**
* @notice Amount of gas used to transfer ether when splitting the bond.
* This is a reasonable amount of gas for a transfer, even to a smart contract.
* The number of participants is bound of by the block gas limit.
*/
uint256 private constant TRANSFER_GAS = 30_000;
/**
* @notice Instantiates the bond maanger with the registered dispute game factory.
* @param _disputeGameFactory is the dispute game factory.
......@@ -147,13 +156,14 @@ contract BondManager {
uint256 len = _claimRecipients.length;
uint256 proportionalAmount = b.amount / len;
for (uint256 i = 0; i < len; i++) {
bool success = SafeCall.send(
payable(_claimRecipients[i]),
gasleft() / len,
proportionalAmount
);
require(success, "BondManager: Failed to send Ether.");
// Send the proportional amount to each recipient. Do not revert if a send fails as that
// will prevent other recipients from receiving their share.
for (uint256 i; i < len; i++) {
SafeCall.send({
_target: payable(_claimRecipients[i]),
_gas: TRANSFER_GAS,
_value: proportionalAmount
});
}
}
......
......@@ -113,6 +113,7 @@ contract DisputeGameFactory is Ownable, IDisputeGameFactory {
*/
function setImplementation(GameType gameType, IDisputeGame impl) external onlyOwner {
gameImpls[gameType] = impl;
emit ImplementationSet(address(impl), gameType);
}
/**
......
......@@ -23,6 +23,13 @@ interface IDisputeGameFactory {
Claim indexed rootClaim
);
/**
* @notice Emitted when a new game implementation added to the factory
* @param impl The implementation contract for the given `GameType`.
* @param gameType The type of the DisputeGame.
*/
event ImplementationSet(address indexed impl, GameType indexed gameType);
/**
* @notice `games` queries an internal a mapping that maps the hash of
* `gameType ++ rootClaim ++ extraData` to the deployed `DisputeGame` clone.
......
......@@ -18,6 +18,8 @@ contract DisputeGameFactory_Test is Test {
Claim indexed rootClaim
);
event ImplementationSet(address indexed impl, GameType indexed gameType);
function setUp() public {
factory = new DisputeGameFactory(address(this));
fakeClone = new FakeClone();
......@@ -105,6 +107,9 @@ contract DisputeGameFactory_Test is Test {
// There should be no implementation for the `GameType.FAULT` enum value, it has not been set.
assertEq(address(factory.gameImpls(GameType.FAULT)), address(0));
vm.expectEmit(true, true, true, true, address(factory));
emit ImplementationSet(address(1), GameType.FAULT);
// Set the implementation for the `GameType.FAULT` enum value.
factory.setImplementation(GameType.FAULT, IDisputeGame(address(1)));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment