worker.go 2.2 KB
Newer Older
1 2 3 4 5 6 7 8
package cross

import (
	"context"
	"errors"
	"sync"
	"time"

9
	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
	"github.com/ethereum/go-ethereum/log"
)

// Worker repeatedly runs a work function from a background goroutine.
// The work is re-run whenever the worker is poked through OnNewData, and
// also on a fixed poll interval so that missed pokes are recovered from.
type Worker struct {
	log log.Logger

	// workFn is the function to call to process the scope
	workFn workFn

	// poke is a channel with capacity of 1, full if there is work to do.
	// pollDuration is the period of the ticker that re-triggers the work
	// even when no poke arrived.
	poke         chan struct{}
	pollDuration time.Duration

	// lifetime management of the worker goroutine:
	// ctx is handed to workFn and is cancelled by Close through cancel,
	// wg tracks the goroutine launched by StartBackground.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// workFn is the unit of work executed by the Worker on each iteration.
// It is opaque to the worker, and is set by the constructor.
// The worker passes its own lifetime context as ctx; a non-nil error
// returned to the background loop is logged there and the loop keeps running,
// unless the error matches the cancellation of that context.
type workFn func(ctx context.Context) error

// NewWorker creates a new worker to process updates
func NewWorker(log log.Logger, workFn workFn) *Worker {
	ctx, cancel := context.WithCancel(context.Background())
	out := &Worker{
		log:  log,
		poke: make(chan struct{}, 1),
		// The data may have changed, and we may have missed a poke, so re-attempt regularly.
41
		pollDuration: 250 * time.Millisecond,
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
		ctx:          ctx,
		cancel:       cancel,
	}
	out.workFn = workFn
	return out
}

// StartBackground launches the worker loop in its own goroutine.
// The goroutine is registered with the wait group before it starts,
// so that Close can wait for it to finish.
func (s *Worker) StartBackground() {
	s.wg.Add(1)
	go func() {
		s.worker()
	}()
}

// ProcessWork runs the work function once, synchronously on the calling
// goroutine, with the worker's lifetime context. The error of the work
// function is returned unchanged.
func (s *Worker) ProcessWork() error {
	run := s.workFn
	return run(s.ctx)
}

func (s *Worker) worker() {
	defer s.wg.Done()

	delay := time.NewTicker(s.pollDuration)
	for {
		if s.ctx.Err() != nil { // check if we are closing down
			return
		}

		// do the work
		err := s.workFn(s.ctx)
		if err != nil {
			if errors.Is(err, s.ctx.Err()) {
				return
			}
73
			if errors.Is(err, types.ErrFuture) {
74
				s.log.Debug("Worker awaits additional blocks", "err", err)
75 76 77
			} else {
				s.log.Warn("Failed to process work", "err", err)
			}
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
		}

		// await next time we process, or detect shutdown
		select {
		case <-s.ctx.Done():
			delay.Stop()
			return
		case <-s.poke:
			s.log.Debug("Continuing cross-safe verification after hint of new data")
			continue
		case <-delay.C:
			s.log.Debug("Checking for cross-safe updates")
			continue
		}
	}
}

95
func (s *Worker) OnNewData() {
96 97 98 99 100 101 102 103 104 105 106 107
	// signal that we have something to process
	select {
	case s.poke <- struct{}{}:
	default:
		// already requested an update
	}
}

// Close cancels the worker context and then blocks until the goroutine
// registered by StartBackground, if any, has returned.
func (s *Worker) Close() {
	// Deferred so the wait always happens after the cancellation below.
	defer s.wg.Wait()
	s.cancel()
}