Commit 740a51d7 authored by acud's avatar acud Committed by GitHub

pullsync: coalesce batch writes (#1386)

parent aa45aa15
...@@ -202,33 +202,53 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8 ...@@ -202,33 +202,53 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
// thus, the following loop will not get executed and the method // thus, the following loop will not get executed and the method
// returns immediately with the topmost value on the offer, which // returns immediately with the topmost value on the offer, which
// will seal the interval and request the next one // will seal the interval and request the next one
err = nil
var chunksToPut []swarm.Chunk
for ; ctr > 0; ctr-- { for ; ctr > 0; ctr-- {
var delivery pb.Delivery var delivery pb.Delivery
if err = r.ReadMsgWithContext(ctx, &delivery); err != nil { if err = r.ReadMsgWithContext(ctx, &delivery); err != nil {
return 0, ru.Ruid, fmt.Errorf("read delivery: %w", err) // this is not a fatal error and we should write
// a partial batch if some chunks have been received.
err = fmt.Errorf("read delivery: %w", err)
break
} }
addr := swarm.NewAddress(delivery.Address) addr := swarm.NewAddress(delivery.Address)
if _, ok := wantChunks[addr.String()]; !ok { if _, ok := wantChunks[addr.String()]; !ok {
// this is fatal for the entire batch, return the
// error and don't write the partial batch.
return 0, ru.Ruid, ErrUnsolicitedChunk return 0, ru.Ruid, ErrUnsolicitedChunk
} }
delete(wantChunks, addr.String()) delete(wantChunks, addr.String())
s.metrics.DbOpsCounter.Inc()
s.metrics.DeliveryCounter.Inc() s.metrics.DeliveryCounter.Inc()
chunk := swarm.NewChunk(addr, delivery.Data) chunk := swarm.NewChunk(addr, delivery.Data)
if cac.Valid(chunk) { if cac.Valid(chunk) {
go s.unwrap(chunk) go s.unwrap(chunk)
} else if !soc.Valid(chunk) { } else if !soc.Valid(chunk) {
// this is fatal for the entire batch, return the
// error and don't write the partial batch.
return 0, ru.Ruid, swarm.ErrInvalidChunk return 0, ru.Ruid, swarm.ErrInvalidChunk
} }
chunksToPut = append(chunksToPut, chunk)
if err = s.storage.Put(ctx, storage.ModePutSync, chunk); err != nil { }
return 0, ru.Ruid, fmt.Errorf("delivery put: %w", err) if len(chunksToPut) > 0 {
s.metrics.DbOpsCounter.Inc()
if ierr := s.storage.Put(ctx, storage.ModePutSync, chunksToPut...); ierr != nil {
if err != nil {
ierr = fmt.Errorf(", sync err: %w", err)
}
return 0, ru.Ruid, fmt.Errorf("delivery put: %w", ierr)
} }
} }
// there might have been an error in the for loop above,
// return it if it indeed happened
if err != nil {
return 0, ru.Ruid, err
}
return offer.Topmost, ru.Ruid, nil return offer.Topmost, ru.Ruid, nil
} }
......
...@@ -133,8 +133,8 @@ func TestIncoming_WantAll(t *testing.T) { ...@@ -133,8 +133,8 @@ func TestIncoming_WantAll(t *testing.T) {
// should have all // should have all
haveChunks(t, clientDb, addrs...) haveChunks(t, clientDb, addrs...)
if p := clientDb.PutCalls(); p != 5 { if p := clientDb.PutCalls(); p != 1 {
t.Fatalf("want %d puts but got %d", 5, p) t.Fatalf("want %d puts but got %d", 1, p)
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment