filepoller_test.go 2.13 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
package preimage

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestFilePoller_Read(t *testing.T) {
	chanA, chanB, err := CreateBidirectionalChannel()
	require.NoError(t, err)
	ctx := context.Background()
	chanAPoller := NewFilePoller(ctx, chanA, time.Millisecond*100)

	go func() {
18
		_, _ = chanB.Write([]byte("hello"))
19
		time.Sleep(time.Second * 1)
20
		_, _ = chanB.Write([]byte("world"))
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
	}()
	var buf [10]byte
	n, err := chanAPoller.Read(buf[:])
	require.Equal(t, 10, n)
	require.NoError(t, err)
}

func TestFilePoller_Write(t *testing.T) {
	chanA, chanB, err := CreateBidirectionalChannel()
	require.NoError(t, err)
	ctx := context.Background()
	chanAPoller := NewFilePoller(ctx, chanA, time.Millisecond*100)

	bufch := make(chan []byte, 1)
	go func() {
		var buf [10]byte
37
		_, _ = chanB.Read(buf[:5])
38
		time.Sleep(time.Second * 1)
39
		_, _ = chanB.Read(buf[5:])
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
		bufch <- buf[:]
		close(bufch)
	}()
	buf := []byte("helloworld")
	n, err := chanAPoller.Write(buf)
	require.Equal(t, 10, n)
	require.NoError(t, err)
	select {
	case <-time.After(time.Second * 60):
		t.Fatal("timed out waiting for read")
	case readbuf := <-bufch:
		require.Equal(t, buf, readbuf)
	}
}

func TestFilePoller_ReadCancel(t *testing.T) {
	chanA, chanB, err := CreateBidirectionalChannel()
	require.NoError(t, err)
	ctx, cancel := context.WithCancel(context.Background())
	chanAPoller := NewFilePoller(ctx, chanA, time.Millisecond*100)

	go func() {
62
		_, _ = chanB.Write([]byte("hello"))
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
		cancel()
	}()
	var buf [10]byte
	n, err := chanAPoller.Read(buf[:])
	require.Equal(t, 5, n)
	require.ErrorIs(t, err, context.Canceled)
}

func TestFilePoller_WriteCancel(t *testing.T) {
	chanA, chanB, err := CreateBidirectionalChannel()
	require.NoError(t, err)
	ctx, cancel := context.WithCancel(context.Background())
	chanAPoller := NewFilePoller(ctx, chanA, time.Millisecond*100)

	go func() {
		var buf [5]byte
79
		_, _ = chanB.Read(buf[:])
80 81 82 83 84 85 86
		cancel()
	}()
	// use a large buffer to overflow the kernel buffer provided to pipe(2) so the write actually blocks
	buf := make([]byte, 1024*1024)
	_, err = chanAPoller.Write(buf)
	require.ErrorIs(t, err, context.Canceled)
}