Commit f5f57c19 authored by Axel Kingsley's avatar Axel Kingsley

Add Metadata Store to Extended Peerstore

parent 6e371b4b
......@@ -27,6 +27,7 @@ import (
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
)
......@@ -355,10 +356,22 @@ func (n *NodeP2P) DiscoveryProcess(ctx context.Context, log log.Logger, cfg *rol
if err != nil {
continue
}
// record metadata to the peerstore if it is an extended peerstore
if eps, ok := pstore.(store.ExtendedPeerstore); ok {
_, err := eps.SetPeerMetadata(info.ID, store.PeerMetadata{
ENR: found.String(),
OPStackID: dat.chainID,
})
if err != nil {
log.Warn("failed to set peer metadata", "peer", info.ID, "err", err)
}
}
// We add the addresses to the peerstore, and update the address TTL.
//After that we stop using the address, assuming it may not be valid anymore (until we rediscover the node)
pstore.AddAddrs(info.ID, info.Addrs, discoveredAddrTTL)
_ = pstore.AddPubKey(info.ID, pub)
// Tag the peer, we'd rather have the connection manager prune away old peers,
// or peers on different chains, or anyone we have not seen via discovery.
// There is no tag score decay yet, so just set it to 42.
......
......@@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum-optimism/optimism/op-node/metrics"
"github.com/ethereum-optimism/optimism/op-node/p2p/store"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testlog"
......@@ -333,6 +334,23 @@ func TestDiscovery(t *testing.T) {
peersOfB = append(peersOfB, c.RemotePeer())
}
}
// For each node, check that they have recorded metadata about the other nodes during discovery
for _, n1 := range []*NodeP2P{nodeA, nodeB, nodeC} {
eps, ok := n1.Host().Peerstore().(store.ExtendedPeerstore)
require.True(t, ok)
for _, n2 := range []*NodeP2P{nodeA, nodeB, nodeC} {
if n1 == n2 {
continue
}
md, err := eps.GetPeerMetadata(n2.Host().ID())
require.NoError(t, err)
// we don't scrutinize the ENR itself, just that it exists
require.NotEmpty(t, md.ENR)
require.Equal(t, uint64(901), md.OPStackID)
}
}
}
// Most tests should use mocknets instead of using the actual local host network
......
......@@ -115,6 +115,10 @@ func dumpPeer(id peer.ID, nw network.Network, pstore peerstore.Peerstore, connMg
if dat, err := eps.GetPeerScores(id); err == nil {
info.PeerScores = dat
}
if md, err := eps.GetPeerMetadata(id); err == nil {
info.ENR = md.ENR
info.ChainID = md.OPStackID
}
}
if dat, err := pstore.Get(id, "ProtocolVersion"); err == nil {
protocolVersion, ok := dat.(string)
......@@ -128,12 +132,6 @@ func dumpPeer(id peer.ID, nw network.Network, pstore peerstore.Peerstore, connMg
info.UserAgent = agentVersion
}
}
if dat, err := pstore.Get(id, "ENR"); err == nil {
enodeData, ok := dat.(*enode.Node)
if ok {
info.ENR = enodeData.String()
}
}
// include the /p2p/ address component in all of the addresses for convenience of the API user.
p2pAddrs, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{ID: id, Addrs: pstore.Addrs(id)})
if err == nil {
......@@ -152,12 +150,6 @@ func dumpPeer(id peer.ID, nw network.Network, pstore peerstore.Peerstore, connMg
info.Direction = c.Stat().Direction
break
}
if dat, err := pstore.Get(id, "optimismChainID"); err == nil {
chID, ok := dat.(uint64)
if ok {
info.ChainID = chID
}
}
info.Latency = pstore.LatencyEWMA(id)
if connMgr != nil {
info.Protected = connMgr.IsProtected(id, "")
......
......@@ -18,6 +18,7 @@ type extendedStore struct {
*scoreBook
*peerBanBook
*ipBanBook
*metadataBook
}
func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching, scoreRetention time.Duration) (ExtendedPeerstore, error) {
......@@ -40,17 +41,26 @@ func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Cl
return nil, fmt.Errorf("create IP ban book: %w", err)
}
ib.startGC()
md, err := newMetadataBook(ctx, logger, clock, store)
if err != nil {
return nil, fmt.Errorf("create metadata book: %w", err)
}
md.startGC()
return &extendedStore{
Peerstore: ps,
CertifiedAddrBook: cab,
scoreBook: sb,
peerBanBook: pb,
ipBanBook: ib,
metadataBook: md,
}, nil
}
func (s *extendedStore) Close() error {
s.scoreBook.Close()
s.peerBanBook.Close()
s.ipBanBook.Close()
s.metadataBook.Close()
return s.Peerstore.Close()
}
......
......@@ -120,6 +120,13 @@ type IPBanStore interface {
GetIPBanExpiration(ip net.IP) (time.Time, error)
}
type MetadataStore interface {
// SetPeerMetadata sets the metadata for the specified peer
SetPeerMetadata(id peer.ID, md PeerMetadata) (PeerMetadata, error)
// GetPeerMetadata returns the metadata for the specified peer
GetPeerMetadata(id peer.ID) (PeerMetadata, error)
}
// ExtendedPeerstore defines a type-safe API to work with additional peer metadata based on a libp2p peerstore.Peerstore
type ExtendedPeerstore interface {
peerstore.Peerstore
......@@ -127,4 +134,5 @@ type ExtendedPeerstore interface {
peerstore.CertifiedAddrBook
PeerBanStore
IPBanStore
MetadataStore
}
package store
import (
"context"
"encoding/json"
"sync/atomic"
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/log"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/peer"
)
const (
mdCacheSize = 100
mdRecordExpiration = time.Hour * 24 * 7
)
var metadataBase = ds.NewKey("/peers/md")
// LastUpdate requires atomic update operations. Use the helper functions SetLastUpdated and LastUpdated to modify and access this field.
type metadataRecord struct {
LastUpdate int64 `json:"lastUpdate"` // unix timestamp in seconds
PeerMetadata PeerMetadata `json:"peerMetadata"`
}
type PeerMetadata struct {
ENR string `json:"enr"`
OPStackID uint64 `json:"opStackID"`
}
func (m *metadataRecord) SetLastUpdated(t time.Time) {
atomic.StoreInt64(&m.LastUpdate, t.Unix())
}
func (m *metadataRecord) LastUpdated() time.Time {
return time.Unix(atomic.LoadInt64(&m.LastUpdate), 0)
}
func (m *metadataRecord) MarshalBinary() (data []byte, err error) {
return json.Marshal(m)
}
func (m *metadataRecord) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, m)
}
type metadataBook struct {
book *recordsBook[peer.ID, *metadataRecord]
}
func newMetadataRecord() *metadataRecord {
return new(metadataRecord)
}
func newMetadataBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*metadataBook, error) {
book, err := newRecordsBook[peer.ID, *metadataRecord](ctx, logger, clock, store, mdCacheSize, mdRecordExpiration, metadataBase, newMetadataRecord, peerIDKey)
if err != nil {
return nil, err
}
return &metadataBook{book: book}, nil
}
func (m *metadataBook) startGC() {
m.book.startGC()
}
func (m *metadataBook) GetPeerMetadata(id peer.ID) (PeerMetadata, error) {
record, err := m.book.getRecord(id)
// If the record is not found, return an empty PeerMetadata
if err == UnknownRecordErr {
return PeerMetadata{}, nil
}
if err != nil {
return PeerMetadata{}, err
}
return record.PeerMetadata, nil
}
// Apply simply overwrites the record with the new one.
// presently, metadata is only collected during peering, so this is fine.
// if in the future this data can be updated or expanded, this function will need to be updated.
func (md *metadataRecord) Apply(rec *metadataRecord) {
*rec = *md
}
func (m *metadataBook) SetPeerMetadata(id peer.ID, md PeerMetadata) (PeerMetadata, error) {
rec := newMetadataRecord()
rec.PeerMetadata = md
rec.SetLastUpdated(m.book.clock.Now())
v, err := m.book.SetRecord(id, rec)
return v.PeerMetadata, err
}
func (m *metadataBook) Close() {
m.book.Close()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment