Commit 45b81a2b authored by OptimismBot's avatar OptimismBot Committed by GitHub

Merge pull request #5777 from ethereum-optimism/p2p-ban-utils

op-node: persist p2p bans with expiration time, update ban-expiry gater
parents 0ed3f67c e0a1ed61
......@@ -2,15 +2,17 @@ package gating
import (
"errors"
"net"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-service/clock"
)
type UnbanMetrics interface {
......@@ -18,12 +20,10 @@ type UnbanMetrics interface {
RecordIPUnban()
}
var UnknownExpiry = errors.New("unknown ban expiry")
//go:generate mockery --name ExpiryStore --output mocks/ --with-expecter=true
type ExpiryStore interface {
PeerBanExpiry(id peer.ID) (time.Time, error)
IPBanExpiry(ip net.IP) (time.Time, error)
store.IPBanStore
store.PeerBanStore
}
// ExpiryConnectionGater enhances a BlockingConnectionGater by implementing ban-expiration
......@@ -47,11 +47,11 @@ func AddBanExpiry(gater BlockingConnectionGater, store ExpiryStore, log log.Logg
func (g *ExpiryConnectionGater) peerBanExpiryCheck(p peer.ID) (allow bool) {
// if the peer is blocked, check if it's time to unblock
expiry, err := g.store.PeerBanExpiry(p)
expiry, err := g.store.GetPeerBanExpiration(p)
if errors.Is(err, store.UnknownBanErr) {
return true // peer is allowed if it has not been banned
}
if err != nil {
if errors.Is(err, UnknownExpiry) {
return false // peer is permanently banned if no expiry time is set.
}
g.log.Warn("failed to load peer-ban expiry time", "peer_id", p, "err", err)
return false
}
......@@ -59,7 +59,7 @@ func (g *ExpiryConnectionGater) peerBanExpiryCheck(p peer.ID) (allow bool) {
return false
}
g.log.Info("peer-ban expired, unbanning peer", "peer_id", p, "expiry", expiry)
if err := g.BlockingConnectionGater.UnblockPeer(p); err != nil {
if err := g.store.SetPeerBanExpiration(p, time.Time{}); err != nil {
g.log.Warn("failed to unban peer", "peer_id", p, "err", err)
return false // if we ignored the error, then the inner connection-gater would drop them
}
......@@ -73,18 +73,12 @@ func (g *ExpiryConnectionGater) addrBanExpiryCheck(ma multiaddr.Multiaddr) (allo
g.log.Error("tried to check multi-addr with bad IP", "addr", ma)
return false
}
// Check if it's a subnet-wide ban first. Subnet-bans do not expire.
for _, ipnet := range g.BlockingConnectionGater.ListBlockedSubnets() {
if ipnet.Contains(ip) {
return false // peer is still in banned subnet
}
}
// if just the IP is blocked, check if it's time to unblock
expiry, err := g.store.IPBanExpiry(ip)
expiry, err := g.store.GetIPBanExpiration(ip)
if errors.Is(err, store.UnknownBanErr) {
return true // IP is allowed if it has not been banned
}
if err != nil {
if errors.Is(err, UnknownExpiry) {
return false // IP is permanently banned if no expiry time is set.
}
g.log.Warn("failed to load IP-ban expiry time", "ip", ip, "err", err)
return false
}
......@@ -92,7 +86,7 @@ func (g *ExpiryConnectionGater) addrBanExpiryCheck(ma multiaddr.Multiaddr) (allo
return false
}
g.log.Info("IP-ban expired, unbanning IP", "ip", ip, "expiry", expiry)
if err := g.BlockingConnectionGater.UnblockAddr(ip); err != nil {
if err := g.store.SetIPBanExpiration(ip, time.Time{}); err != nil {
g.log.Warn("failed to unban IP", "ip", ip, "err", err)
return false // if we ignored the error, then the inner connection-gater would drop them
}
......@@ -101,31 +95,24 @@ func (g *ExpiryConnectionGater) addrBanExpiryCheck(ma multiaddr.Multiaddr) (allo
}
func (g *ExpiryConnectionGater) InterceptPeerDial(p peer.ID) (allow bool) {
// if not allowed, and not expired, then do not allow the dial
return g.BlockingConnectionGater.InterceptPeerDial(p) || g.peerBanExpiryCheck(p)
if !g.BlockingConnectionGater.InterceptPeerDial(p) {
return false
}
return g.peerBanExpiryCheck(p)
}
func (g *ExpiryConnectionGater) InterceptAddrDial(id peer.ID, ma multiaddr.Multiaddr) (allow bool) {
if !g.BlockingConnectionGater.InterceptAddrDial(id, ma) {
// Check if it was intercepted because of a peer ban
if !g.BlockingConnectionGater.InterceptPeerDial(id) {
if !g.peerBanExpiryCheck(id) {
return false // peer is still peer-banned
}
if g.BlockingConnectionGater.InterceptAddrDial(id, ma) { // allow dial if peer-ban was everything
return true
}
}
// intercepted because of addr ban still, check if it is expired
if !g.addrBanExpiryCheck(ma) {
return false // peer is still addr-banned
}
return false
}
return true
return g.peerBanExpiryCheck(id) && g.addrBanExpiryCheck(ma)
}
func (g *ExpiryConnectionGater) InterceptAccept(mas network.ConnMultiaddrs) (allow bool) {
return g.BlockingConnectionGater.InterceptAccept(mas) || g.addrBanExpiryCheck(mas.RemoteMultiaddr())
if !g.BlockingConnectionGater.InterceptAccept(mas) {
return false
}
return g.addrBanExpiryCheck(mas.RemoteMultiaddr())
}
func (g *ExpiryConnectionGater) InterceptSecured(direction network.Direction, id peer.ID, mas network.ConnMultiaddrs) (allow bool) {
......@@ -133,7 +120,10 @@ func (g *ExpiryConnectionGater) InterceptSecured(direction network.Direction, id
if direction == network.DirOutbound {
return true
}
if !g.BlockingConnectionGater.InterceptSecured(direction, id, mas) {
return false
}
// InterceptSecured is called after InterceptAccept, we already checked the addrs.
// This leaves just the peer-ID expiry to check on inbound connections.
return g.BlockingConnectionGater.InterceptSecured(direction, id, mas) || g.peerBanExpiryCheck(id)
return g.peerBanExpiryCheck(id)
}
This diff is collapsed.
......@@ -25,8 +25,8 @@ func (_m *ExpiryStore) EXPECT() *ExpiryStore_Expecter {
return &ExpiryStore_Expecter{mock: &_m.Mock}
}
// IPBanExpiry provides a mock function with given fields: ip
func (_m *ExpiryStore) IPBanExpiry(ip net.IP) (time.Time, error) {
// GetIPBanExpiration provides a mock function with given fields: ip
func (_m *ExpiryStore) GetIPBanExpiration(ip net.IP) (time.Time, error) {
ret := _m.Called(ip)
var r0 time.Time
......@@ -49,36 +49,36 @@ func (_m *ExpiryStore) IPBanExpiry(ip net.IP) (time.Time, error) {
return r0, r1
}
// ExpiryStore_IPBanExpiry_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IPBanExpiry'
type ExpiryStore_IPBanExpiry_Call struct {
// ExpiryStore_GetIPBanExpiration_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetIPBanExpiration'
type ExpiryStore_GetIPBanExpiration_Call struct {
*mock.Call
}
// IPBanExpiry is a helper method to define mock.On call
// GetIPBanExpiration is a helper method to define mock.On call
// - ip net.IP
func (_e *ExpiryStore_Expecter) IPBanExpiry(ip interface{}) *ExpiryStore_IPBanExpiry_Call {
return &ExpiryStore_IPBanExpiry_Call{Call: _e.mock.On("IPBanExpiry", ip)}
func (_e *ExpiryStore_Expecter) GetIPBanExpiration(ip interface{}) *ExpiryStore_GetIPBanExpiration_Call {
return &ExpiryStore_GetIPBanExpiration_Call{Call: _e.mock.On("GetIPBanExpiration", ip)}
}
func (_c *ExpiryStore_IPBanExpiry_Call) Run(run func(ip net.IP)) *ExpiryStore_IPBanExpiry_Call {
func (_c *ExpiryStore_GetIPBanExpiration_Call) Run(run func(ip net.IP)) *ExpiryStore_GetIPBanExpiration_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(net.IP))
})
return _c
}
func (_c *ExpiryStore_IPBanExpiry_Call) Return(_a0 time.Time, _a1 error) *ExpiryStore_IPBanExpiry_Call {
func (_c *ExpiryStore_GetIPBanExpiration_Call) Return(_a0 time.Time, _a1 error) *ExpiryStore_GetIPBanExpiration_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *ExpiryStore_IPBanExpiry_Call) RunAndReturn(run func(net.IP) (time.Time, error)) *ExpiryStore_IPBanExpiry_Call {
func (_c *ExpiryStore_GetIPBanExpiration_Call) RunAndReturn(run func(net.IP) (time.Time, error)) *ExpiryStore_GetIPBanExpiration_Call {
_c.Call.Return(run)
return _c
}
// PeerBanExpiry provides a mock function with given fields: id
func (_m *ExpiryStore) PeerBanExpiry(id peer.ID) (time.Time, error) {
// GetPeerBanExpiration provides a mock function with given fields: id
func (_m *ExpiryStore) GetPeerBanExpiration(id peer.ID) (time.Time, error) {
ret := _m.Called(id)
var r0 time.Time
......@@ -101,30 +101,116 @@ func (_m *ExpiryStore) PeerBanExpiry(id peer.ID) (time.Time, error) {
return r0, r1
}
// ExpiryStore_PeerBanExpiry_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PeerBanExpiry'
type ExpiryStore_PeerBanExpiry_Call struct {
// ExpiryStore_GetPeerBanExpiration_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPeerBanExpiration'
type ExpiryStore_GetPeerBanExpiration_Call struct {
*mock.Call
}
// PeerBanExpiry is a helper method to define mock.On call
// GetPeerBanExpiration is a helper method to define mock.On call
// - id peer.ID
func (_e *ExpiryStore_Expecter) PeerBanExpiry(id interface{}) *ExpiryStore_PeerBanExpiry_Call {
return &ExpiryStore_PeerBanExpiry_Call{Call: _e.mock.On("PeerBanExpiry", id)}
func (_e *ExpiryStore_Expecter) GetPeerBanExpiration(id interface{}) *ExpiryStore_GetPeerBanExpiration_Call {
return &ExpiryStore_GetPeerBanExpiration_Call{Call: _e.mock.On("GetPeerBanExpiration", id)}
}
func (_c *ExpiryStore_PeerBanExpiry_Call) Run(run func(id peer.ID)) *ExpiryStore_PeerBanExpiry_Call {
func (_c *ExpiryStore_GetPeerBanExpiration_Call) Run(run func(id peer.ID)) *ExpiryStore_GetPeerBanExpiration_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID))
})
return _c
}
func (_c *ExpiryStore_PeerBanExpiry_Call) Return(_a0 time.Time, _a1 error) *ExpiryStore_PeerBanExpiry_Call {
func (_c *ExpiryStore_GetPeerBanExpiration_Call) Return(_a0 time.Time, _a1 error) *ExpiryStore_GetPeerBanExpiration_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *ExpiryStore_PeerBanExpiry_Call) RunAndReturn(run func(peer.ID) (time.Time, error)) *ExpiryStore_PeerBanExpiry_Call {
func (_c *ExpiryStore_GetPeerBanExpiration_Call) RunAndReturn(run func(peer.ID) (time.Time, error)) *ExpiryStore_GetPeerBanExpiration_Call {
_c.Call.Return(run)
return _c
}
// SetIPBanExpiration provides a mock function with given fields: ip, expiry
func (_m *ExpiryStore) SetIPBanExpiration(ip net.IP, expiry time.Time) error {
ret := _m.Called(ip, expiry)
var r0 error
if rf, ok := ret.Get(0).(func(net.IP, time.Time) error); ok {
r0 = rf(ip, expiry)
} else {
r0 = ret.Error(0)
}
return r0
}
// ExpiryStore_SetIPBanExpiration_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetIPBanExpiration'
type ExpiryStore_SetIPBanExpiration_Call struct {
*mock.Call
}
// SetIPBanExpiration is a helper method to define mock.On call
// - ip net.IP
// - expiry time.Time
func (_e *ExpiryStore_Expecter) SetIPBanExpiration(ip interface{}, expiry interface{}) *ExpiryStore_SetIPBanExpiration_Call {
return &ExpiryStore_SetIPBanExpiration_Call{Call: _e.mock.On("SetIPBanExpiration", ip, expiry)}
}
func (_c *ExpiryStore_SetIPBanExpiration_Call) Run(run func(ip net.IP, expiry time.Time)) *ExpiryStore_SetIPBanExpiration_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(net.IP), args[1].(time.Time))
})
return _c
}
func (_c *ExpiryStore_SetIPBanExpiration_Call) Return(_a0 error) *ExpiryStore_SetIPBanExpiration_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *ExpiryStore_SetIPBanExpiration_Call) RunAndReturn(run func(net.IP, time.Time) error) *ExpiryStore_SetIPBanExpiration_Call {
_c.Call.Return(run)
return _c
}
// SetPeerBanExpiration provides a mock function with given fields: id, expiry
func (_m *ExpiryStore) SetPeerBanExpiration(id peer.ID, expiry time.Time) error {
ret := _m.Called(id, expiry)
var r0 error
if rf, ok := ret.Get(0).(func(peer.ID, time.Time) error); ok {
r0 = rf(id, expiry)
} else {
r0 = ret.Error(0)
}
return r0
}
// ExpiryStore_SetPeerBanExpiration_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetPeerBanExpiration'
type ExpiryStore_SetPeerBanExpiration_Call struct {
*mock.Call
}
// SetPeerBanExpiration is a helper method to define mock.On call
// - id peer.ID
// - expiry time.Time
func (_e *ExpiryStore_Expecter) SetPeerBanExpiration(id interface{}, expiry interface{}) *ExpiryStore_SetPeerBanExpiration_Call {
return &ExpiryStore_SetPeerBanExpiration_Call{Call: _e.mock.On("SetPeerBanExpiration", id, expiry)}
}
func (_c *ExpiryStore_SetPeerBanExpiration_Call) Run(run func(id peer.ID, expiry time.Time)) *ExpiryStore_SetPeerBanExpiration_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(peer.ID), args[1].(time.Time))
})
return _c
}
func (_c *ExpiryStore_SetPeerBanExpiration_Call) Return(_a0 error) *ExpiryStore_SetPeerBanExpiration_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *ExpiryStore_SetPeerBanExpiration_Call) RunAndReturn(run func(peer.ID, time.Time) error) *ExpiryStore_SetPeerBanExpiration_Call {
_c.Call.Return(run)
return _c
}
......
......@@ -4,7 +4,12 @@ import (
"context"
"errors"
"fmt"
"net"
"strconv"
"time"
"github.com/libp2p/go-libp2p/core/peer"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/metrics"
......@@ -29,6 +34,8 @@ type NodeP2P struct {
gater gating.BlockingConnectionGater // p2p gater, to ban/unban peers with, may be nil even with p2p enabled
scorer Scorer // writes score-updates to the peerstore and keeps metrics of score changes
connMgr connmgr.ConnManager // p2p conn manager, to keep a reliable number of peers, may be nil even with p2p enabled
store store.ExtendedPeerstore // peerstore of host, with extra bindings for scoring and banning
log log.Logger
// the below components are all optional, and may be nil. They require the host to not be nil.
dv5Local *enode.LocalNode // p2p discovery identity
dv5Udp *discover.UDPv5 // p2p discovery service
......@@ -61,6 +68,8 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.
func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) error {
bwc := p2pmetrics.NewBandwidthCounter()
n.log = log
var err error
// nil if disabled.
n.host, err = setup.Host(log, bwc, metrics)
......@@ -105,6 +114,7 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l
if !ok {
return fmt.Errorf("cannot init without extended peerstore: %w", err)
}
n.store = eps
n.scorer = NewScorer(rollupCfg, eps, metrics, setup.PeerBandScorer(), log)
n.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(_ network.Network, conn network.Conn) {
......@@ -184,6 +194,36 @@ func (n *NodeP2P) ConnectionManager() connmgr.ConnManager {
return n.connMgr
}
func (n *NodeP2P) BanPeer(id peer.ID, expiration time.Time) error {
if err := n.store.SetPeerBanExpiration(id, expiration); err != nil {
return fmt.Errorf("failed to set peer ban expiry: %w", err)
}
if err := n.host.Network().ClosePeer(id); err != nil {
return fmt.Errorf("failed to close peer connection: %w", err)
}
return nil
}
func (n *NodeP2P) BanIP(ip net.IP, expiration time.Time) error {
if err := n.store.SetIPBanExpiration(ip, expiration); err != nil {
return fmt.Errorf("failed to set IP ban expiry: %w", err)
}
// kick all peers that match this IP
for _, conn := range n.host.Network().Conns() {
addr := conn.RemoteMultiaddr()
remoteIP, err := manet.ToIP(addr)
if err != nil {
continue
}
if remoteIP.Equal(ip) {
if err := conn.Close(); err != nil {
n.log.Error("failed to close connection to peer with banned IP", "peer", conn.RemotePeer(), "ip", ip)
}
}
}
return nil
}
func (n *NodeP2P) Close() error {
var result *multierror.Error
if n.dv5Udp != nil {
......
......@@ -15,6 +15,8 @@ type extendedStore struct {
peerstore.Peerstore
peerstore.CertifiedAddrBook
*scoreBook
*peerBanBook
*ipBanBook
}
func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching) (ExtendedPeerstore, error) {
......@@ -27,10 +29,22 @@ func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Cl
return nil, fmt.Errorf("create scorebook: %w", err)
}
sb.startGC()
pb, err := newPeerBanBook(ctx, logger, clock, store)
if err != nil {
return nil, fmt.Errorf("create peer ban book: %w", err)
}
pb.startGC()
ib, err := newIPBanBook(ctx, logger, clock, store)
if err != nil {
return nil, fmt.Errorf("create IP ban book: %w", err)
}
ib.startGC()
return &extendedStore{
Peerstore: ps,
CertifiedAddrBook: cab,
scoreBook: sb,
peerBanBook: pb,
ipBanBook: ib,
}, nil
}
......
package store
import (
"errors"
"net"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
)
......@@ -44,9 +48,29 @@ type ScoreDiff interface {
Apply(score *scoreRecord)
}
var UnknownBanErr = errors.New("unknown ban")
type PeerBanStore interface {
// SetPeerBanExpiration create the peer ban with expiration time.
// If expiry == time.Time{} then the ban is deleted.
SetPeerBanExpiration(id peer.ID, expiry time.Time) error
// GetPeerBanExpiration gets the peer ban expiration time, or UnknownBanErr error if none exists.
GetPeerBanExpiration(id peer.ID) (time.Time, error)
}
type IPBanStore interface {
// SetIPBanExpiration create the IP ban with expiration time.
// If expiry == time.Time{} then the ban is deleted.
SetIPBanExpiration(ip net.IP, expiry time.Time) error
// GetIPBanExpiration gets the IP ban expiration time, or UnknownBanErr error if none exists.
GetIPBanExpiration(ip net.IP) (time.Time, error)
}
// ExtendedPeerstore defines a type-safe API to work with additional peer metadata based on a libp2p peerstore.Peerstore
type ExtendedPeerstore interface {
peerstore.Peerstore
ScoreDatastore
peerstore.CertifiedAddrBook
PeerBanStore
IPBanStore
}
package store
import (
"context"
"encoding/json"
"net"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
)
const (
ipBanCacheSize = 100
ipBanRecordExpiration = time.Hour * 24 * 7
)
var ipBanExpirationsBase = ds.NewKey("/ips/ban_expiration")
type ipBanRecord struct {
Expiry int64 `json:"expiry"` // unix timestamp in seconds
LastUpdate int64 `json:"lastUpdate"` // unix timestamp in seconds
}
func (s *ipBanRecord) SetLastUpdated(t time.Time) {
s.LastUpdate = t.Unix()
}
func (s *ipBanRecord) LastUpdated() time.Time {
return time.Unix(s.LastUpdate, 0)
}
func (s *ipBanRecord) MarshalBinary() (data []byte, err error) {
return json.Marshal(s)
}
func (s *ipBanRecord) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, s)
}
type ipBanUpdate time.Time
func (p ipBanUpdate) Apply(rec *ipBanRecord) {
rec.Expiry = time.Time(p).Unix()
}
type ipBanBook struct {
book *recordsBook[string, *ipBanRecord]
}
func newIPBanRecord() *ipBanRecord {
return new(ipBanRecord)
}
func ipKey(ip string) ds.Key {
return ds.NewKey(ip)
}
func newIPBanBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*ipBanBook, error) {
book, err := newRecordsBook[string, *ipBanRecord](ctx, logger, clock, store, ipBanCacheSize, ipBanRecordExpiration, ipBanExpirationsBase, newIPBanRecord, ipKey)
if err != nil {
return nil, err
}
return &ipBanBook{book: book}, nil
}
func (d *ipBanBook) startGC() {
d.book.startGC()
}
func (d *ipBanBook) GetIPBanExpiration(ip net.IP) (time.Time, error) {
rec, err := d.book.getRecord(ip.To16().String())
if err == UnknownRecordErr {
return time.Time{}, UnknownBanErr
}
if err != nil {
return time.Time{}, err
}
return time.Unix(rec.Expiry, 0), nil
}
func (d *ipBanBook) SetIPBanExpiration(ip net.IP, expirationTime time.Time) error {
if expirationTime == (time.Time{}) {
return d.book.deleteRecord(ip.To16().String())
}
return d.book.SetRecord(ip.To16().String(), ipBanUpdate(expirationTime))
}
func (d *ipBanBook) Close() {
d.book.Close()
}
package store
import (
"context"
"net"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"
)
func TestGetUnknownIPBan(t *testing.T) {
book := createMemoryIPBanBook(t)
defer book.Close()
exp, err := book.GetIPBanExpiration(net.IPv4(1, 2, 3, 4))
require.Same(t, UnknownBanErr, err)
require.Equal(t, time.Time{}, exp)
}
func TestRoundTripIPBan(t *testing.T) {
book := createMemoryIPBanBook(t)
defer book.Close()
expiry := time.Unix(2484924, 0)
ip := net.IPv4(1, 2, 3, 4)
require.NoError(t, book.SetIPBanExpiration(ip, expiry))
result, err := book.GetIPBanExpiration(ip)
require.NoError(t, err)
require.Equal(t, result, expiry)
}
func createMemoryIPBanBook(t *testing.T) *ipBanBook {
store := sync.MutexWrap(ds.NewMapDatastore())
logger := testlog.Logger(t, log.LvlInfo)
c := clock.NewDeterministicClock(time.UnixMilli(100))
book, err := newIPBanBook(context.Background(), logger, c, store)
require.NoError(t, err)
return book
}
package store
import (
"context"
"encoding/json"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/peer"
)
const (
peerBanCacheSize = 100
peerBanRecordExpiration = time.Hour * 24 * 7
)
var peerBanExpirationsBase = ds.NewKey("/peers/ban_expiration")
type peerBanRecord struct {
Expiry int64 `json:"expiry"` // unix timestamp in seconds
LastUpdate int64 `json:"lastUpdate"` // unix timestamp in seconds
}
func (s *peerBanRecord) SetLastUpdated(t time.Time) {
s.LastUpdate = t.Unix()
}
func (s *peerBanRecord) LastUpdated() time.Time {
return time.Unix(s.LastUpdate, 0)
}
func (s *peerBanRecord) MarshalBinary() (data []byte, err error) {
return json.Marshal(s)
}
func (s *peerBanRecord) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, s)
}
type peerBanUpdate time.Time
func (p peerBanUpdate) Apply(rec *peerBanRecord) {
rec.Expiry = time.Time(p).Unix()
}
type peerBanBook struct {
book *recordsBook[peer.ID, *peerBanRecord]
}
func newPeerBanRecord() *peerBanRecord {
return new(peerBanRecord)
}
func newPeerBanBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*peerBanBook, error) {
book, err := newRecordsBook[peer.ID, *peerBanRecord](ctx, logger, clock, store, peerBanCacheSize, peerBanRecordExpiration, peerBanExpirationsBase, newPeerBanRecord, peerIDKey)
if err != nil {
return nil, err
}
return &peerBanBook{book: book}, nil
}
func (d *peerBanBook) startGC() {
d.book.startGC()
}
func (d *peerBanBook) GetPeerBanExpiration(id peer.ID) (time.Time, error) {
rec, err := d.book.getRecord(id)
if err == UnknownRecordErr {
return time.Time{}, UnknownBanErr
}
if err != nil {
return time.Time{}, err
}
return time.Unix(rec.Expiry, 0), nil
}
func (d *peerBanBook) SetPeerBanExpiration(id peer.ID, expirationTime time.Time) error {
if expirationTime == (time.Time{}) {
return d.book.deleteRecord(id)
}
return d.book.SetRecord(id, peerBanUpdate(expirationTime))
}
func (d *peerBanBook) Close() {
d.book.Close()
}
package store
import (
"context"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"
)
func TestGetUnknownPeerBan(t *testing.T) {
book := createMemoryPeerBanBook(t)
defer book.Close()
exp, err := book.GetPeerBanExpiration("a")
require.Same(t, UnknownBanErr, err)
require.Equal(t, time.Time{}, exp)
}
func TestRoundTripPeerBan(t *testing.T) {
book := createMemoryPeerBanBook(t)
defer book.Close()
expiry := time.Unix(2484924, 0)
require.NoError(t, book.SetPeerBanExpiration("a", expiry))
result, err := book.GetPeerBanExpiration("a")
require.NoError(t, err)
require.Equal(t, result, expiry)
}
func createMemoryPeerBanBook(t *testing.T) *peerBanBook {
store := sync.MutexWrap(ds.NewMapDatastore())
logger := testlog.Logger(t, log.LvlInfo)
c := clock.NewDeterministicClock(time.UnixMilli(100))
book, err := newPeerBanBook(context.Background(), logger, c, store)
require.NoError(t, err)
return book
}
package store
import (
"context"
"encoding"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
lru "github.com/hashicorp/golang-lru/v2"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
)
const (
maxPruneBatchSize = 20
)
type record interface {
SetLastUpdated(time.Time)
LastUpdated() time.Time
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
}
type recordDiff[V record] interface {
Apply(v V)
}
var UnknownRecordErr = errors.New("unknown record")
// recordsBook is a generic K-V store to embed in the extended-peerstore.
// It prunes old entries to keep the store small.
// The recordsBook can be wrapped to customize typing more.
type recordsBook[K ~string, V record] struct {
ctx context.Context
cancelFn context.CancelFunc
clock clock.Clock
log log.Logger
bgTasks sync.WaitGroup
store ds.Batching
cache *lru.Cache[K, V]
newRecord func() V
dsBaseKey ds.Key
dsEntryKey func(K) ds.Key
recordExpiry time.Duration // pruning is disabled if this is 0
sync.RWMutex
}
func newRecordsBook[K ~string, V record](ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching, cacheSize int, recordExpiry time.Duration,
dsBaseKey ds.Key, newRecord func() V, dsEntryKey func(K) ds.Key) (*recordsBook[K, V], error) {
cache, err := lru.New[K, V](cacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create records cache: %w", err)
}
ctx, cancelFn := context.WithCancel(ctx)
book := &recordsBook[K, V]{
ctx: ctx,
cancelFn: cancelFn,
clock: clock,
log: logger,
store: store,
cache: cache,
newRecord: newRecord,
dsBaseKey: dsBaseKey,
dsEntryKey: dsEntryKey,
recordExpiry: recordExpiry,
}
return book, nil
}
func (d *recordsBook[K, V]) startGC() {
if d.recordExpiry == 0 {
return
}
startGc(d.ctx, d.log, d.clock, &d.bgTasks, d.prune)
}
func (d *recordsBook[K, V]) GetRecord(key K) (V, error) {
d.RLock()
defer d.RUnlock()
rec, err := d.getRecord(key)
return rec, err
}
func (d *recordsBook[K, V]) dsKey(key K) ds.Key {
return d.dsBaseKey.Child(d.dsEntryKey(key))
}
func (d *recordsBook[K, V]) deleteRecord(key K) error {
d.cache.Remove(key)
err := d.store.Delete(d.ctx, d.dsKey(key))
if errors.Is(err, ds.ErrNotFound) {
return nil
}
return fmt.Errorf("failed to delete entry with key %v: %w", key, err)
}
func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
if val, ok := d.cache.Get(key); ok {
return val, nil
}
data, err := d.store.Get(d.ctx, d.dsKey(key))
if errors.Is(err, ds.ErrNotFound) {
return v, UnknownRecordErr
} else if err != nil {
return v, fmt.Errorf("failed to load value of key %v: %w", key, err)
}
v = d.newRecord()
if err := v.UnmarshalBinary(data); err != nil {
return v, fmt.Errorf("invalid value for key %v: %w", key, err)
}
d.cache.Add(key, v)
return v, nil
}
func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) error {
d.Lock()
defer d.Unlock()
rec, err := d.getRecord(key)
if err == UnknownRecordErr { // instantiate new record if it does not exist yet
rec = d.newRecord()
} else if err != nil {
return err
}
rec.SetLastUpdated(d.clock.Now())
diff.Apply(rec)
data, err := rec.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to encode record for key %v: %w", key, err)
}
err = d.store.Put(d.ctx, d.dsKey(key), data)
if err != nil {
return fmt.Errorf("storing updated record for key %v: %w", key, err)
}
d.cache.Add(key, rec)
return nil
}
// prune deletes entries from the store that are older than the configured prune expiration.
// Note that the expiry period is not a strict TTL. Entries that are eligible for deletion may still be present
// either because the prune function hasn't yet run or because they are still preserved in the in-memory cache after
// having been deleted from the database.
func (d *recordsBook[K, V]) prune() error {
results, err := d.store.Query(d.ctx, query.Query{
Prefix: d.dsBaseKey.String(),
})
if err != nil {
return err
}
pending := 0
batch, err := d.store.Batch(d.ctx)
if err != nil {
return err
}
for result := range results.Next() {
// Bail out if the context is done
select {
case <-d.ctx.Done():
return d.ctx.Err()
default:
}
v := d.newRecord()
if err := v.UnmarshalBinary(result.Value); err != nil {
return err
}
if v.LastUpdated().Add(d.recordExpiry).Before(d.clock.Now()) {
if pending > maxPruneBatchSize {
if err := batch.Commit(d.ctx); err != nil {
return err
}
batch, err = d.store.Batch(d.ctx)
if err != nil {
return err
}
pending = 0
}
pending++
if err := batch.Delete(d.ctx, ds.NewKey(result.Key)); err != nil {
return err
}
}
}
if err := batch.Commit(d.ctx); err != nil {
return err
}
return nil
}
func (d *recordsBook[K, V]) Close() {
d.cancelFn()
d.bgTasks.Wait()
}
......@@ -2,25 +2,18 @@ package store
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
lru "github.com/hashicorp/golang-lru/v2"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-base32"
)
const (
scoreDataV0 = "0"
scoreCacheSize = 100
expiryPeriod = 24 * time.Hour
maxPruneBatchSize = 20
scoreCacheSize = 100
scoreRecordExpiryPeriod = 24 * time.Hour
)
var scoresBase = ds.NewKey("/peers/scores")
......@@ -30,143 +23,66 @@ type scoreRecord struct {
LastUpdate int64 `json:"lastUpdate"` // unix timestamp in seconds
}
type scoreBook struct {
ctx context.Context
cancelFn context.CancelFunc
clock clock.Clock
log log.Logger
bgTasks sync.WaitGroup
store ds.Batching
cache *lru.Cache[peer.ID, scoreRecord]
sync.RWMutex
func (s *scoreRecord) SetLastUpdated(t time.Time) {
s.LastUpdate = t.Unix()
}
func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*scoreBook, error) {
cache, err := lru.New[peer.ID, scoreRecord](scoreCacheSize)
func (s *scoreRecord) LastUpdated() time.Time {
return time.Unix(s.LastUpdate, 0)
}
func (s *scoreRecord) MarshalBinary() (data []byte, err error) {
return serializeScoresV0(*s)
}
func (s *scoreRecord) UnmarshalBinary(data []byte) error {
r, err := deserializeScoresV0(data)
if err != nil {
return nil, fmt.Errorf("creating cache: %w", err)
return err
}
*s = r
return nil
}
ctx, cancelFn := context.WithCancel(ctx)
book := scoreBook{
ctx: ctx,
cancelFn: cancelFn,
clock: clock,
log: logger,
store: store,
cache: cache,
}
return &book, nil
type scoreBook struct {
book *recordsBook[peer.ID, *scoreRecord]
}
func (d *scoreBook) startGC() {
startGc(d.ctx, d.log, d.clock, &d.bgTasks, d.prune)
func newScoreRecord() *scoreRecord {
return new(scoreRecord)
}
func (d *scoreBook) GetPeerScores(id peer.ID) (PeerScores, error) {
d.RLock()
defer d.RUnlock()
record, err := d.getRecord(id)
if err != nil {
return PeerScores{}, nil
}
return record.PeerScores, err
func peerIDKey(id peer.ID) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(id)))
}
func (d *scoreBook) getRecord(id peer.ID) (scoreRecord, error) {
if scores, ok := d.cache.Get(id); ok {
return scores, nil
}
data, err := d.store.Get(d.ctx, scoreKey(id))
if errors.Is(err, ds.ErrNotFound) {
return scoreRecord{}, nil
} else if err != nil {
return scoreRecord{}, fmt.Errorf("load scores for peer %v: %w", id, err)
}
record, err := deserializeScoresV0(data)
func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*scoreBook, error) {
book, err := newRecordsBook[peer.ID, *scoreRecord](ctx, logger, clock, store, scoreCacheSize, scoreRecordExpiryPeriod, scoresBase, newScoreRecord, peerIDKey)
if err != nil {
return scoreRecord{}, fmt.Errorf("invalid score data for peer %v: %w", id, err)
return nil, err
}
d.cache.Add(id, record)
return record, nil
return &scoreBook{book: book}, nil
}
func (d *scoreBook) SetScore(id peer.ID, diff ScoreDiff) error {
d.Lock()
defer d.Unlock()
scores, err := d.getRecord(id)
if err != nil {
return err
}
scores.LastUpdate = d.clock.Now().Unix()
diff.Apply(&scores)
data, err := serializeScoresV0(scores)
if err != nil {
return fmt.Errorf("encode scores for peer %v: %w", id, err)
}
err = d.store.Put(d.ctx, scoreKey(id), data)
if err != nil {
return fmt.Errorf("storing updated scores for peer %v: %w", id, err)
}
d.cache.Add(id, scores)
return nil
func (d *scoreBook) startGC() {
d.book.startGC()
}
// prune deletes entries from the store that are older than expiryPeriod.
// Note that the expiry period is not a strict TTL. Entries that are eligible for deletion may still be present
// either because the prune function hasn't yet run or because they are still preserved in the in-memory cache after
// having been deleted from the database.
func (d *scoreBook) prune() error {
results, err := d.store.Query(d.ctx, query.Query{
Prefix: scoresBase.String(),
})
if err != nil {
return err
func (d *scoreBook) GetPeerScores(id peer.ID) (PeerScores, error) {
record, err := d.book.getRecord(id)
if err == UnknownRecordErr {
return PeerScores{}, nil // return zeroed scores by default
}
pending := 0
batch, err := d.store.Batch(d.ctx)
if err != nil {
return err
}
for result := range results.Next() {
// Bail out if the context is done
select {
case <-d.ctx.Done():
return d.ctx.Err()
default:
}
record, err := deserializeScoresV0(result.Value)
if err != nil {
return err
}
if time.Unix(record.LastUpdate, 0).Add(expiryPeriod).Before(d.clock.Now()) {
if pending > maxPruneBatchSize {
if err := batch.Commit(d.ctx); err != nil {
return err
}
batch, err = d.store.Batch(d.ctx)
if err != nil {
return err
}
pending = 0
}
pending++
if err := batch.Delete(d.ctx, ds.NewKey(result.Key)); err != nil {
return err
}
}
}
if err := batch.Commit(d.ctx); err != nil {
return err
return PeerScores{}, err
}
return nil
return record.PeerScores, nil
}
func (d *scoreBook) Close() {
d.cancelFn()
d.bgTasks.Wait()
func (d *scoreBook) SetScore(id peer.ID, diff ScoreDiff) error {
return d.book.SetRecord(id, diff)
}
func scoreKey(id peer.ID) ds.Key {
return scoresBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(id))).ChildString(scoreDataV0)
func (d *scoreBook) Close() {
d.book.Close()
}
......@@ -110,19 +110,19 @@ func TestPrune(t *testing.T) {
require.True(t, hasScoreRecorded("dddd"))
elapsedTime := clock.Now().Sub(firstStore)
timeToFirstExpiry := expiryPeriod - elapsedTime
timeToFirstExpiry := book.book.recordExpiry - elapsedTime
// Advance time until the score for aaaa should be pruned.
clock.AdvanceTime(timeToFirstExpiry + 1)
require.NoError(t, book.prune())
require.NoError(t, book.book.prune())
// Clear the cache so reads have to come from the database
book.cache.Purge()
book.book.cache.Purge()
require.False(t, hasScoreRecorded("aaaa"), "should have pruned aaaa record")
// Advance time so cccc, dddd and the original bbbb entry should be pruned
clock.AdvanceTime(90 * time.Minute)
require.NoError(t, book.prune())
require.NoError(t, book.book.prune())
// Clear the cache so reads have to come from the database
book.cache.Purge()
book.book.cache.Purge()
require.False(t, hasScoreRecorded("cccc"), "should have pruned cccc record")
require.False(t, hasScoreRecorded("dddd"), "should have pruned cccc record")
......@@ -149,10 +149,10 @@ func TestPruneMultipleBatches(t *testing.T) {
for i := 0; i < peerCount; i++ {
require.NoError(t, book.SetScore(peer.ID(strconv.Itoa(i)), &GossipScores{Total: 123.45}))
}
clock.AdvanceTime(expiryPeriod + 1)
require.NoError(t, book.prune())
clock.AdvanceTime(book.book.recordExpiry + 1)
require.NoError(t, book.book.prune())
// Clear the cache so reads have to come from the database
book.cache.Purge()
book.book.cache.Purge()
for i := 0; i < peerCount; i++ {
require.Falsef(t, hasScoreRecorded(peer.ID(strconv.Itoa(i))), "Should prune record peer %v", i)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment