Commit a1cb4f0b authored by metacertain's avatar metacertain Committed by GitHub

fix: only in neighborhood replication (#2237)

parent 67a4212e
...@@ -322,6 +322,10 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo ...@@ -322,6 +322,10 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
return nil, ErrWarmup return nil, ErrWarmup
} }
if !ps.topologyDriver.IsWithinDepth(ch.Address()) {
return nil, ErrNoPush
}
count := 0 count := 0
// Push the chunk to some peers in the neighborhood in parallel for replication. // Push the chunk to some peers in the neighborhood in parallel for replication.
// Any errors here should NOT impact the rest of the handler. // Any errors here should NOT impact the rest of the handler.
......
...@@ -51,6 +51,9 @@ var ( ...@@ -51,6 +51,9 @@ var (
defaultSigner = cryptomock.New(cryptomock.WithSignFunc(func([]byte) ([]byte, error) { defaultSigner = cryptomock.New(cryptomock.WithSignFunc(func([]byte) ([]byte, error) {
return nil, nil return nil, nil
})) }))
WithinDepthMock = mock.WithIsWithinFunc(func(addr swarm.Address) bool {
return true
})
) )
// TestPushClosest inserts a chunk as uploaded chunk in db. This triggers sending a chunk to the closest node // TestPushClosest inserts a chunk as uploaded chunk in db. This triggers sending a chunk to the closest node
...@@ -66,7 +69,7 @@ func TestPushClosest(t *testing.T) { ...@@ -66,7 +69,7 @@ func TestPushClosest(t *testing.T) {
// peer is the node responding to the chunk receipt message // peer is the node responding to the chunk receipt message
// mock should return ErrWantSelf since there's no one to forward to // mock should return ErrWantSelf since there's no one to forward to
psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer.Close() defer storerPeer.Close()
recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode)) recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode))
...@@ -129,23 +132,19 @@ func TestReplicateBeforeReceipt(t *testing.T) { ...@@ -129,23 +132,19 @@ func TestReplicateBeforeReceipt(t *testing.T) {
defer storerEmpty.Close() defer storerEmpty.Close()
emptyRecorder := streamtest.New(streamtest.WithProtocols(psEmpty.Protocol()), streamtest.WithBaseAddr(secondPeer)) emptyRecorder := streamtest.New(streamtest.WithProtocols(psEmpty.Protocol()), streamtest.WithBaseAddr(secondPeer))
wFunc := func(addr swarm.Address) bool {
return true
}
// node that is connected to closestPeer // node that is connected to closestPeer
// will receieve chunk from closestPeer // will receieve chunk from closestPeer
psSecond, storerSecond, _, secondAccounting := createPushSyncNode(t, secondPeer, defaultPrices, emptyRecorder, nil, defaultSigner, mock.WithPeers(emptyPeer), mock.WithIsWithinFunc(wFunc)) psSecond, storerSecond, _, secondAccounting := createPushSyncNode(t, secondPeer, defaultPrices, emptyRecorder, nil, defaultSigner, mock.WithPeers(emptyPeer), WithinDepthMock)
defer storerSecond.Close() defer storerSecond.Close()
secondRecorder := streamtest.New(streamtest.WithProtocols(psSecond.Protocol()), streamtest.WithBaseAddr(closestPeer)) secondRecorder := streamtest.New(streamtest.WithProtocols(psSecond.Protocol()), streamtest.WithBaseAddr(closestPeer))
psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf)) psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer.Close() defer storerPeer.Close()
recorder := streamtest.New(streamtest.WithProtocols(psStorer.Protocol()), streamtest.WithBaseAddr(pivotNode)) recorder := streamtest.New(streamtest.WithProtocols(psStorer.Protocol()), streamtest.WithBaseAddr(pivotNode))
// pivot node needs the streamer since the chunk is intercepted by // pivot node needs the streamer since the chunk is intercepted by
// the chunk worker, then gets sent by opening a new stream // the chunk worker, then gets sent by opening a new stream
psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithClosestPeer(closestPeer)) psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithPeers(closestPeer))
defer storerPivot.Close() defer storerPivot.Close()
// Trigger the sending of chunk to the closest node // Trigger the sending of chunk to the closest node
...@@ -238,13 +237,13 @@ func TestFailToReplicateBeforeReceipt(t *testing.T) { ...@@ -238,13 +237,13 @@ func TestFailToReplicateBeforeReceipt(t *testing.T) {
defer storerSecond.Close() defer storerSecond.Close()
secondRecorder := streamtest.New(streamtest.WithProtocols(psSecond.Protocol()), streamtest.WithBaseAddr(closestPeer)) secondRecorder := streamtest.New(streamtest.WithProtocols(psSecond.Protocol()), streamtest.WithBaseAddr(closestPeer))
psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf)) psStorer, storerPeer, _, storerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, secondRecorder, nil, defaultSigner, mock.WithPeers(secondPeer), mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer.Close() defer storerPeer.Close()
recorder := streamtest.New(streamtest.WithProtocols(psStorer.Protocol()), streamtest.WithBaseAddr(pivotNode)) recorder := streamtest.New(streamtest.WithProtocols(psStorer.Protocol()), streamtest.WithBaseAddr(pivotNode))
// pivot node needs the streamer since the chunk is intercepted by // pivot node needs the streamer since the chunk is intercepted by
// the chunk worker, then gets sent by opening a new stream // the chunk worker, then gets sent by opening a new stream
psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithClosestPeer(closestPeer)) psPivot, storerPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, mock.WithPeers(closestPeer))
defer storerPivot.Close() defer storerPivot.Close()
// Trigger the sending of chunk to the closest node // Trigger the sending of chunk to the closest node
...@@ -324,7 +323,7 @@ func TestPushChunkToClosest(t *testing.T) { ...@@ -324,7 +323,7 @@ func TestPushChunkToClosest(t *testing.T) {
// peer is the node responding to the chunk receipt message // peer is the node responding to the chunk receipt message
// mock should return ErrWantSelf since there's no one to forward to // mock should return ErrWantSelf since there's no one to forward to
psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, chanFunc(callbackC), defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) psPeer, storerPeer, _, peerAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, chanFunc(callbackC), defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer.Close() defer storerPeer.Close()
recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode)) recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode))
...@@ -415,7 +414,7 @@ func TestPushChunkToNextClosest(t *testing.T) { ...@@ -415,7 +414,7 @@ func TestPushChunkToNextClosest(t *testing.T) {
psPeer1, storerPeer1, _, peerAccounting1 := createPushSyncNode(t, peer1, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) psPeer1, storerPeer1, _, peerAccounting1 := createPushSyncNode(t, peer1, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer1.Close() defer storerPeer1.Close()
psPeer2, storerPeer2, _, peerAccounting2 := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) psPeer2, storerPeer2, _, peerAccounting2 := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer2.Close() defer storerPeer2.Close()
var fail = true var fail = true
...@@ -546,7 +545,7 @@ func TestPushChunkToClosestFailedAttemptRetry(t *testing.T) { ...@@ -546,7 +545,7 @@ func TestPushChunkToClosestFailedAttemptRetry(t *testing.T) {
psPeer3, storerPeer3, _, peerAccounting3 := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) psPeer3, storerPeer3, _, peerAccounting3 := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer3.Close() defer storerPeer3.Close()
psPeer4, storerPeer4, _, peerAccounting4 := createPushSyncNode(t, peer4, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) psPeer4, storerPeer4, _, peerAccounting4 := createPushSyncNode(t, peer4, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer4.Close() defer storerPeer4.Close()
recorder := streamtest.New( recorder := streamtest.New(
...@@ -665,19 +664,19 @@ func TestHandler(t *testing.T) { ...@@ -665,19 +664,19 @@ func TestHandler(t *testing.T) {
closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
// Create the closest peer // Create the closest peer
psClosestPeer, closestStorerPeerDB, _, closestAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) psClosestPeer, closestStorerPeerDB, _, closestAccounting := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer closestStorerPeerDB.Close() defer closestStorerPeerDB.Close()
closestRecorder := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()), streamtest.WithBaseAddr(pivotPeer)) closestRecorder := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()), streamtest.WithBaseAddr(pivotPeer))
// creating the pivot peer // creating the pivot peer
psPivot, storerPivotDB, _, pivotAccounting := createPushSyncNode(t, pivotPeer, defaultPrices, closestRecorder, nil, defaultSigner, mock.WithClosestPeer(closestPeer)) psPivot, storerPivotDB, _, pivotAccounting := createPushSyncNode(t, pivotPeer, defaultPrices, closestRecorder, nil, defaultSigner, mock.WithPeers(closestPeer))
defer storerPivotDB.Close() defer storerPivotDB.Close()
pivotRecorder := streamtest.New(streamtest.WithProtocols(psPivot.Protocol()), streamtest.WithBaseAddr(triggerPeer)) pivotRecorder := streamtest.New(streamtest.WithProtocols(psPivot.Protocol()), streamtest.WithBaseAddr(triggerPeer))
// Creating the trigger peer // Creating the trigger peer
psTriggerPeer, triggerStorerDB, _, triggerAccounting := createPushSyncNode(t, triggerPeer, defaultPrices, pivotRecorder, nil, defaultSigner, mock.WithClosestPeer(pivotPeer)) psTriggerPeer, triggerStorerDB, _, triggerAccounting := createPushSyncNode(t, triggerPeer, defaultPrices, pivotRecorder, nil, defaultSigner, mock.WithPeers(pivotPeer))
defer triggerStorerDB.Close() defer triggerStorerDB.Close()
receipt, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk) receipt, err := psTriggerPeer.PushChunkToClosest(context.Background(), chunk)
...@@ -753,13 +752,13 @@ func TestSignsReceipt(t *testing.T) { ...@@ -753,13 +752,13 @@ func TestSignsReceipt(t *testing.T) {
closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
// Create the closest peer // Create the closest peer
psClosestPeer, closestStorerPeerDB, _, _ := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, signer, mock.WithClosestPeerErr(topology.ErrWantSelf)) psClosestPeer, closestStorerPeerDB, _, _ := createPushSyncNode(t, closestPeer, defaultPrices, nil, nil, signer, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer closestStorerPeerDB.Close() defer closestStorerPeerDB.Close()
closestRecorder := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()), streamtest.WithBaseAddr(pivotPeer)) closestRecorder := streamtest.New(streamtest.WithProtocols(psClosestPeer.Protocol()), streamtest.WithBaseAddr(pivotPeer))
// creating the pivot peer who will act as a forwarder node with a higher price (17) // creating the pivot peer who will act as a forwarder node with a higher price (17)
psPivot, storerPivotDB, _, _ := createPushSyncNode(t, pivotPeer, defaultPrices, closestRecorder, nil, signer, mock.WithClosestPeer(closestPeer)) psPivot, storerPivotDB, _, _ := createPushSyncNode(t, pivotPeer, defaultPrices, closestRecorder, nil, signer, mock.WithPeers(closestPeer))
defer storerPivotDB.Close() defer storerPivotDB.Close()
receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk) receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
...@@ -783,87 +782,6 @@ func TestSignsReceipt(t *testing.T) { ...@@ -783,87 +782,6 @@ func TestSignsReceipt(t *testing.T) {
t.Fatal("receipt block hash do not match") t.Fatal("receipt block hash do not match")
} }
} }
func createPushSyncNode(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags, accounting.Interface) {
t.Helper()
mockAccounting := accountingmock.NewAccounting()
ps, mstorer, ts := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, mockAccounting, mockOpts...)
return ps, mstorer, ts, mockAccounting
}
func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, acct accounting.Interface, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags) {
t.Helper()
logger := logging.New(ioutil.Discard, 0)
storer := mocks.NewStorer()
mockTopology := mock.NewTopologyDriver(mockOpts...)
mockStatestore := statestore.NewStateStore()
mtag := tags.NewTags(mockStatestore, logger)
mockPricer := pricermock.NewMockService(prices.price, prices.peerPrice)
recorderDisconnecter := streamtest.NewRecorderDisconnecter(recorder)
if unwrap == nil {
unwrap = func(swarm.Chunk) {}
}
validStamp := func(ch swarm.Chunk, stamp []byte) (swarm.Chunk, error) {
return ch, nil
}
return pushsync.New(addr, blockHash.Bytes(), recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, acct, mockPricer, signer, nil, 0), storer, mtag
}
func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {
t.Helper()
records := recorder.WaitRecords(t, peer, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName, 1, 5)
if data != nil {
messages, err := protobuf.ReadMessages(
bytes.NewReader(records[0].In()),
func() protobuf.Message { return new(pb.Delivery) },
)
if err != nil {
t.Fatal(err)
}
if messages == nil {
t.Fatal("nil rcvd. for message")
}
if len(messages) > 1 {
t.Fatal("too many messages")
}
delivery := messages[0].(*pb.Delivery)
if !bytes.Equal(delivery.Address, add.Bytes()) {
t.Fatalf("chunk address mismatch")
}
if !bytes.Equal(delivery.Data, data) {
t.Fatalf("chunk data mismatch")
}
} else {
messages, err := protobuf.ReadMessages(
bytes.NewReader(records[0].In()),
func() protobuf.Message { return new(pb.Receipt) },
)
if err != nil {
t.Fatal(err)
}
if messages == nil {
t.Fatal("nil rcvd. for message")
}
if len(messages) > 1 {
t.Fatal("too many messages")
}
receipt := messages[0].(*pb.Receipt)
receiptAddress := swarm.NewAddress(receipt.Address)
if !receiptAddress.Equal(add) {
t.Fatalf("receipt address mismatch")
}
}
}
func TestPeerSkipList(t *testing.T) { func TestPeerSkipList(t *testing.T) {
skipList := pushsync.NewPeerSkipList() skipList := pushsync.NewPeerSkipList()
...@@ -915,7 +833,7 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) { ...@@ -915,7 +833,7 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
psPeer2, storerPeer2, _, _ := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) psPeer2, storerPeer2, _, _ := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer2.Close() defer storerPeer2.Close()
psPeer3, storerPeer3, _, _ := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf)) psPeer3, storerPeer3, _, _ := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf), WithinDepthMock)
defer storerPeer3.Close() defer storerPeer3.Close()
var ( var (
...@@ -984,6 +902,86 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) { ...@@ -984,6 +902,86 @@ func TestPushChunkToClosestSkipFailed(t *testing.T) {
} }
} }
func createPushSyncNode(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags, accounting.Interface) {
t.Helper()
mockAccounting := accountingmock.NewAccounting()
ps, mstorer, ts := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, mockAccounting, mockOpts...)
return ps, mstorer, ts, mockAccounting
}
func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, acct accounting.Interface, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags) {
t.Helper()
logger := logging.New(ioutil.Discard, 0)
storer := mocks.NewStorer()
mockTopology := mock.NewTopologyDriver(mockOpts...)
mockStatestore := statestore.NewStateStore()
mtag := tags.NewTags(mockStatestore, logger)
mockPricer := pricermock.NewMockService(prices.price, prices.peerPrice)
recorderDisconnecter := streamtest.NewRecorderDisconnecter(recorder)
if unwrap == nil {
unwrap = func(swarm.Chunk) {}
}
validStamp := func(ch swarm.Chunk, stamp []byte) (swarm.Chunk, error) {
return ch, nil
}
return pushsync.New(addr, blockHash.Bytes(), recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, acct, mockPricer, signer, nil, 0), storer, mtag
}
func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {
t.Helper()
records := recorder.WaitRecords(t, peer, pushsync.ProtocolName, pushsync.ProtocolVersion, pushsync.StreamName, 1, 5)
if data != nil {
messages, err := protobuf.ReadMessages(
bytes.NewReader(records[0].In()),
func() protobuf.Message { return new(pb.Delivery) },
)
if err != nil {
t.Fatal(err)
}
if messages == nil {
t.Fatal("nil rcvd. for message")
}
if len(messages) > 1 {
t.Fatal("too many messages")
}
delivery := messages[0].(*pb.Delivery)
if !bytes.Equal(delivery.Address, add.Bytes()) {
t.Fatalf("chunk address mismatch")
}
if !bytes.Equal(delivery.Data, data) {
t.Fatalf("chunk data mismatch")
}
} else {
messages, err := protobuf.ReadMessages(
bytes.NewReader(records[0].In()),
func() protobuf.Message { return new(pb.Receipt) },
)
if err != nil {
t.Fatal(err)
}
if messages == nil {
t.Fatal("nil rcvd. for message")
}
if len(messages) > 1 {
t.Fatal("too many messages")
}
receipt := messages[0].(*pb.Receipt)
receiptAddress := swarm.NewAddress(receipt.Address)
if !receiptAddress.Equal(add) {
t.Fatalf("receipt address mismatch")
}
}
}
func chanFunc(c chan<- struct{}) func(swarm.Chunk) { func chanFunc(c chan<- struct{}) func(swarm.Chunk) {
return func(_ swarm.Chunk) { return func(_ swarm.Chunk) {
c <- struct{}{} c <- struct{}{}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment