Commit f856cf27 authored by Alok Nerurkar's avatar Alok Nerurkar Committed by GitHub

fix: shutdown node on errors from blockchain endpoint (#1868)

parent 95b8441b
...@@ -18,8 +18,10 @@ import ( ...@@ -18,8 +18,10 @@ import (
"math/big" "math/big"
"net" "net"
"net/http" "net/http"
"os"
"path/filepath" "path/filepath"
"strings" "strings"
"syscall"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -365,7 +367,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, ...@@ -365,7 +367,7 @@ func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey,
postageSyncStart = startBlock postageSyncStart = startBlock
} }
eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime) eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime, &pidKiller{node: b})
b.listenerCloser = eventListener b.listenerCloser = eventListener
batchSvc = batchservice.New(stateStore, batchStore, logger, eventListener) batchSvc = batchservice.New(stateStore, batchStore, logger, eventListener)
...@@ -791,3 +793,26 @@ func getTxHash(stateStore storage.StateStorer, logger logging.Logger, o Options) ...@@ -791,3 +793,26 @@ func getTxHash(stateStore storage.StateStorer, logger logging.Logger, o Options)
logger.Infof("using the chequebook transaction hash %x", txHash) logger.Infof("using the chequebook transaction hash %x", txHash)
return txHash.Bytes(), nil return txHash.Bytes(), nil
} }
// pidKiller is used to issue a forced shut down of the node from sub modules. The issue with using the
// node's Shutdown method is that it only shuts down the node and does not exit the start process
// which is waiting on the os.Signals. This is not desirable, but currently bee node cannot handle
// rate-limiting blockchain API calls properly. We will shut down the node in this case to allow the
// user to rectify the API issues (by adjusting limits or using a different one). There is no platform
// agnostic way to trigger os.Signals in go unfortunately. Which is why we will use the process.Kill
// approach which works on windows as well.
type pidKiller struct {
node *Bee
}
func (p *pidKiller) Shutdown(ctx context.Context) error {
err := p.node.Shutdown(ctx)
if err != nil {
return err
}
ps, err := os.FindProcess(syscall.Getpid())
if err != nil {
return err
}
return ps.Kill()
}
...@@ -47,6 +47,12 @@ type BlockHeightContractFilterer interface { ...@@ -47,6 +47,12 @@ type BlockHeightContractFilterer interface {
BlockNumber(context.Context) (uint64, error) BlockNumber(context.Context) (uint64, error)
} }
// Shutdowner interface is passed to the listener to shutdown the node if we hit
// error while listening for blockchain events.
type Shutdowner interface {
Shutdown(context.Context) error
}
type listener struct { type listener struct {
logger logging.Logger logger logging.Logger
ev BlockHeightContractFilterer ev BlockHeightContractFilterer
...@@ -56,6 +62,7 @@ type listener struct { ...@@ -56,6 +62,7 @@ type listener struct {
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
metrics metrics metrics metrics
shutdowner Shutdowner
} }
func New( func New(
...@@ -63,6 +70,7 @@ func New( ...@@ -63,6 +70,7 @@ func New(
ev BlockHeightContractFilterer, ev BlockHeightContractFilterer,
postageStampAddress common.Address, postageStampAddress common.Address,
blockTime uint64, blockTime uint64,
shutdowner Shutdowner,
) postage.Listener { ) postage.Listener {
return &listener{ return &listener{
logger: logger, logger: logger,
...@@ -71,6 +79,7 @@ func New( ...@@ -71,6 +79,7 @@ func New(
postageStampAddress: postageStampAddress, postageStampAddress: postageStampAddress,
quit: make(chan struct{}), quit: make(chan struct{}),
metrics: newMetrics(), metrics: newMetrics(),
shutdowner: shutdowner,
} }
} }
...@@ -243,7 +252,13 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru ...@@ -243,7 +252,13 @@ func (l *listener) Listen(from uint64, updater postage.EventUpdater) <-chan stru
go func() { go func() {
err := listenf() err := listenf()
if err != nil { if err != nil {
l.logger.Errorf("event listener sync: %v", err) l.logger.Errorf("failed syncing event listener, shutting down node err: %v", err)
if l.shutdowner != nil {
err = l.shutdowner.Shutdown(context.Background())
if err != nil {
l.logger.Errorf("failed shutting down node: %v", err)
}
}
} }
}() }()
......
...@@ -7,8 +7,10 @@ package listener_test ...@@ -7,8 +7,10 @@ package listener_test
import ( import (
"bytes" "bytes"
"context" "context"
"errors"
"io/ioutil" "io/ioutil"
"math/big" "math/big"
"sync"
"testing" "testing"
"time" "time"
...@@ -45,7 +47,7 @@ func TestListener(t *testing.T) { ...@@ -45,7 +47,7 @@ func TestListener(t *testing.T) {
c.toLog(496), c.toLog(496),
), ),
) )
l := listener.New(logger, mf, postageStampAddress, 1) l := listener.New(logger, mf, postageStampAddress, 1, nil)
l.Listen(0, ev) l.Listen(0, ev)
select { select {
...@@ -76,7 +78,7 @@ func TestListener(t *testing.T) { ...@@ -76,7 +78,7 @@ func TestListener(t *testing.T) {
topup.toLog(496), topup.toLog(496),
), ),
) )
l := listener.New(logger, mf, postageStampAddress, 1) l := listener.New(logger, mf, postageStampAddress, 1, nil)
l.Listen(0, ev) l.Listen(0, ev)
select { select {
...@@ -107,7 +109,7 @@ func TestListener(t *testing.T) { ...@@ -107,7 +109,7 @@ func TestListener(t *testing.T) {
depthIncrease.toLog(496), depthIncrease.toLog(496),
), ),
) )
l := listener.New(logger, mf, postageStampAddress, 1) l := listener.New(logger, mf, postageStampAddress, 1, nil)
l.Listen(0, ev) l.Listen(0, ev)
select { select {
...@@ -136,7 +138,7 @@ func TestListener(t *testing.T) { ...@@ -136,7 +138,7 @@ func TestListener(t *testing.T) {
priceUpdate.toLog(496), priceUpdate.toLog(496),
), ),
) )
l := listener.New(logger, mf, postageStampAddress, 1) l := listener.New(logger, mf, postageStampAddress, 1, nil)
l.Listen(0, ev) l.Listen(0, ev)
select { select {
case e := <-evC: case e := <-evC:
...@@ -188,7 +190,7 @@ func TestListener(t *testing.T) { ...@@ -188,7 +190,7 @@ func TestListener(t *testing.T) {
), ),
WithBlockNumber(blockNumber), WithBlockNumber(blockNumber),
) )
l := listener.New(logger, mf, postageStampAddress, 1) l := listener.New(logger, mf, postageStampAddress, 1, nil)
l.Listen(0, ev) l.Listen(0, ev)
select { select {
...@@ -250,6 +252,47 @@ func TestListener(t *testing.T) { ...@@ -250,6 +252,47 @@ func TestListener(t *testing.T) {
t.Fatal("timed out waiting for block number update") t.Fatal("timed out waiting for block number update")
} }
}) })
t.Run("shutdown on error event", func(t *testing.T) {
shutdowner := &countShutdowner{}
ev, _ := newEventUpdaterMock()
mf := newMockFilterer(
WithBlockNumberError(errors.New("dummy error")),
)
l := listener.New(logger, mf, postageStampAddress, 1, shutdowner)
l.Listen(0, ev)
start := time.Now()
for {
time.Sleep(time.Millisecond * 100)
if shutdowner.NoOfCalls() == 1 {
break
}
if time.Since(start) > time.Second*5 {
t.Fatal("expected shutdown call by now")
}
}
})
}
type countShutdowner struct {
mtx sync.Mutex
shutdownCalls int
}
func (c *countShutdowner) NoOfCalls() int {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.shutdownCalls
}
func (c *countShutdowner) Shutdown(_ context.Context) error {
c.mtx.Lock()
defer c.mtx.Unlock()
c.shutdownCalls++
return nil
} }
func newEventUpdaterMock() (*updater, chan interface{}) { func newEventUpdaterMock() (*updater, chan interface{}) {
...@@ -309,6 +352,7 @@ type mockFilterer struct { ...@@ -309,6 +352,7 @@ type mockFilterer struct {
subscriptionEvents []types.Log subscriptionEvents []types.Log
sub *sub sub *sub
blockNumber uint64 blockNumber uint64
blockNumberError error
} }
func newMockFilterer(opts ...Option) *mockFilterer { func newMockFilterer(opts ...Option) *mockFilterer {
...@@ -333,6 +377,12 @@ func WithBlockNumber(blockNumber uint64) Option { ...@@ -333,6 +377,12 @@ func WithBlockNumber(blockNumber uint64) Option {
}) })
} }
func WithBlockNumberError(err error) Option {
return optionFunc(func(s *mockFilterer) {
s.blockNumberError = err
})
}
func (m *mockFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { func (m *mockFilterer) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) {
return m.filterLogEvents, nil return m.filterLogEvents, nil
} }
...@@ -352,6 +402,9 @@ func (m *mockFilterer) Close() { ...@@ -352,6 +402,9 @@ func (m *mockFilterer) Close() {
} }
func (m *mockFilterer) BlockNumber(context.Context) (uint64, error) { func (m *mockFilterer) BlockNumber(context.Context) (uint64, error) {
if m.blockNumberError != nil {
return 0, m.blockNumberError
}
return m.blockNumber, nil return m.blockNumber, nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment