worker_test.go 3.5 KB
Newer Older
1 2 3 4
package cross

import (
	"context"
5
	"sync/atomic"
6 7 8 9 10 11 12 13 14 15 16
	"testing"
	"time"

	"github.com/ethereum-optimism/optimism/op-service/testlog"
	"github.com/ethereum/go-ethereum/log"
	"github.com/stretchr/testify/require"
)

func TestWorker(t *testing.T) {
	logger := testlog.Logger(t, log.LevelDebug)
	t.Run("do work", func(t *testing.T) {
17
		var count int32
18
		w := NewWorker(logger, func(ctx context.Context) error {
19
			atomic.AddInt32(&count, 1)
20 21 22 23 24
			return nil
		})
		t.Cleanup(w.Close)
		// when ProcessWork is called, the workFn is called once
		require.NoError(t, w.ProcessWork())
25
		require.EqualValues(t, 1, atomic.LoadInt32(&count))
26 27
	})
	t.Run("background worker", func(t *testing.T) {
28
		var count int32
29
		w := NewWorker(logger, func(ctx context.Context) error {
30
			atomic.AddInt32(&count, 1)
31 32 33 34 35 36 37 38 39
			return nil
		})
		t.Cleanup(w.Close)
		// set a long poll duration so the worker does not auto-run
		w.pollDuration = 100 * time.Second
		// when StartBackground is called, the worker runs in the background
		// the count should increment once
		w.StartBackground()
		require.Eventually(t, func() bool {
40
			return atomic.LoadInt32(&count) == 1
41 42 43
		}, 2*time.Second, 100*time.Millisecond)
	})
	t.Run("background worker OnNewData", func(t *testing.T) {
44
		var count int32
45
		w := NewWorker(logger, func(ctx context.Context) error {
46
			atomic.AddInt32(&count, 1)
47 48 49 50 51 52 53 54 55
			return nil
		})
		t.Cleanup(w.Close)
		// set a long poll duration so the worker does not auto-run
		w.pollDuration = 100 * time.Second
		// when StartBackground is called, the worker runs in the background
		// the count should increment once
		w.StartBackground()
		require.Eventually(t, func() bool {
56
			return atomic.LoadInt32(&count) == 1
57 58
		}, 2*time.Second, 100*time.Millisecond)
		// when OnNewData is called, the worker runs again
59
		w.OnNewData()
60
		require.Eventually(t, func() bool {
61
			return atomic.LoadInt32(&count) == 2
62 63 64
		}, 2*time.Second, 100*time.Millisecond)
		// and due to the long poll duration, the worker does not run again
		require.Never(t, func() bool {
65 66
			return atomic.LoadInt32(&count) > 2
		}, time.Second, 100*time.Millisecond)
67 68
	})
	t.Run("background fast poll", func(t *testing.T) {
69
		var count int32
70
		w := NewWorker(logger, func(ctx context.Context) error {
71
			atomic.AddInt32(&count, 1)
72 73 74 75 76 77
			return nil
		})
		t.Cleanup(w.Close)
		// set a long poll duration so the worker does not auto-run
		w.pollDuration = 100 * time.Millisecond
		// when StartBackground is called, the worker runs in the background
78
		// the count should increment rapidly and reach at least 10 in 1 second
79 80
		w.StartBackground()
		require.Eventually(t, func() bool {
81
			return atomic.LoadInt32(&count) >= 10
82 83 84
		}, 2*time.Second, 100*time.Millisecond)
	})
	t.Run("close", func(t *testing.T) {
85
		var count int32
86
		w := NewWorker(logger, func(ctx context.Context) error {
87
			atomic.AddInt32(&count, 1)
88 89 90 91 92 93
			return nil
		})
		t.Cleanup(w.Close) // close on cleanup in case of early error
		// set a long poll duration so the worker does not auto-run
		w.pollDuration = 100 * time.Millisecond
		// when StartBackground is called, the worker runs in the background
94
		// the count should increment rapidly and reach at least 10 in 1 second
95 96
		w.StartBackground()
		require.Eventually(t, func() bool {
97
			return atomic.LoadInt32(&count) >= 10
98
		}, 10*time.Second, time.Second)
99 100 101
		// once the worker is closed, it stops running
		// and the count does not increment
		w.Close()
102
		stopCount := atomic.LoadInt32(&count)
103
		require.Never(t, func() bool {
104 105
			return atomic.LoadInt32(&count) != stopCount
		}, time.Second, 100*time.Millisecond)
106 107
	})
}