// Copyright 2020 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package node defines the concept of a Bee node
// by bootstrapping and injecting all necessary
// dependencies.
package node

import (
	"context"
12
	"crypto/ecdsa"
acud's avatar
acud committed
13
	"errors"
Janos Guljas's avatar
Janos Guljas committed
14 15 16
	"fmt"
	"io"
	"log"
17
	"math/big"
Janos Guljas's avatar
Janos Guljas committed
18 19
	"net"
	"net/http"
20
	"os"
21
	"path/filepath"
22
	"runtime"
23
	"sync"
24
	"syscall"
25
	"time"
Janos Guljas's avatar
Janos Guljas committed
26

27
	"github.com/ethereum/go-ethereum/common"
28
	"github.com/ethereum/go-ethereum/core/types"
29
	"github.com/ethereum/go-ethereum/ethclient"
30
	"github.com/ethersphere/bee/pkg/accounting"
31
	"github.com/ethersphere/bee/pkg/addressbook"
32
	"github.com/ethersphere/bee/pkg/api"
33
	"github.com/ethersphere/bee/pkg/config"
34
	"github.com/ethersphere/bee/pkg/crypto"
35
	"github.com/ethersphere/bee/pkg/debugapi"
36
	"github.com/ethersphere/bee/pkg/feeds/factory"
37
	"github.com/ethersphere/bee/pkg/hive"
38
	"github.com/ethersphere/bee/pkg/localstore"
39 40
	"github.com/ethersphere/bee/pkg/logging"
	"github.com/ethersphere/bee/pkg/metrics"
41
	"github.com/ethersphere/bee/pkg/netstore"
42
	"github.com/ethersphere/bee/pkg/p2p"
43 44
	"github.com/ethersphere/bee/pkg/p2p/libp2p"
	"github.com/ethersphere/bee/pkg/pingpong"
Peter Mrekaj's avatar
Peter Mrekaj committed
45
	"github.com/ethersphere/bee/pkg/pinning"
acud's avatar
acud committed
46 47 48 49 50
	"github.com/ethersphere/bee/pkg/postage"
	"github.com/ethersphere/bee/pkg/postage/batchservice"
	"github.com/ethersphere/bee/pkg/postage/batchstore"
	"github.com/ethersphere/bee/pkg/postage/listener"
	"github.com/ethersphere/bee/pkg/postage/postagecontract"
51
	"github.com/ethersphere/bee/pkg/pricer"
52
	"github.com/ethersphere/bee/pkg/pricing"
Zahoor Mohamed's avatar
Zahoor Mohamed committed
53
	"github.com/ethersphere/bee/pkg/pss"
54 55 56
	"github.com/ethersphere/bee/pkg/puller"
	"github.com/ethersphere/bee/pkg/pullsync"
	"github.com/ethersphere/bee/pkg/pullsync/pullstorage"
57 58
	"github.com/ethersphere/bee/pkg/pusher"
	"github.com/ethersphere/bee/pkg/pushsync"
Zahoor Mohamed's avatar
Zahoor Mohamed committed
59
	"github.com/ethersphere/bee/pkg/recovery"
60
	"github.com/ethersphere/bee/pkg/resolver/multiresolver"
61
	"github.com/ethersphere/bee/pkg/retrieval"
62
	"github.com/ethersphere/bee/pkg/settlement/pseudosettle"
63
	"github.com/ethersphere/bee/pkg/settlement/swap"
64
	"github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
65
	"github.com/ethersphere/bee/pkg/settlement/swap/priceoracle"
66
	"github.com/ethersphere/bee/pkg/shed"
67
	"github.com/ethersphere/bee/pkg/steward"
68
	"github.com/ethersphere/bee/pkg/storage"
69
	"github.com/ethersphere/bee/pkg/swarm"
Zahoor Mohamed's avatar
Zahoor Mohamed committed
70
	"github.com/ethersphere/bee/pkg/tags"
71
	"github.com/ethersphere/bee/pkg/topology"
72 73
	"github.com/ethersphere/bee/pkg/topology/kademlia"
	"github.com/ethersphere/bee/pkg/topology/lightnode"
74
	"github.com/ethersphere/bee/pkg/tracing"
75
	"github.com/ethersphere/bee/pkg/transaction"
76
	"github.com/ethersphere/bee/pkg/traversal"
77
	"github.com/hashicorp/go-multierror"
78
	ma "github.com/multiformats/go-multiaddr"
79
	"github.com/sirupsen/logrus"
80
	"golang.org/x/crypto/sha3"
81
	"golang.org/x/sync/errgroup"
Janos Guljas's avatar
Janos Guljas committed
82 83 84
)

type Bee struct {
luxq's avatar
luxq committed
85
	persistCfg               *PersistConfig
86
	p2pService               io.Closer
87
	p2pHalter                p2p.Halter
88 89 90 91 92 93 94 95 96 97 98
	p2pCancel                context.CancelFunc
	apiCloser                io.Closer
	apiServer                *http.Server
	debugAPIServer           *http.Server
	resolverCloser           io.Closer
	errorLogWriter           *io.PipeWriter
	tracerCloser             io.Closer
	tagsCloser               io.Closer
	stateStoreCloser         io.Closer
	localstoreCloser         io.Closer
	topologyCloser           io.Closer
99
	topologyHalter           topology.Halter
100 101
	pusherCloser             io.Closer
	pullerCloser             io.Closer
102
	accountingCloser         io.Closer
103 104 105 106
	pullSyncCloser           io.Closer
	pssCloser                io.Closer
	ethClientCloser          func()
	transactionMonitorCloser io.Closer
107
	transactionCloser        io.Closer
108
	recoveryHandleCleanup    func()
acud's avatar
acud committed
109
	listenerCloser           io.Closer
110
	postageServiceCloser     io.Closer
111
	priceOracleCloser        io.Closer
112
	hiveCloser               io.Closer
113 114
	shutdownInProgress       bool
	shutdownMutex            sync.Mutex
Janos Guljas's avatar
Janos Guljas committed
115 116 117
}

type Options struct {
118
	DataDir                    string
119
	CacheCapacity              uint64
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
	DBOpenFilesLimit           uint64
	DBWriteBufferSize          uint64
	DBBlockCacheCapacity       uint64
	DBDisableSeeksCompaction   bool
	APIAddr                    string
	DebugAPIAddr               string
	Addr                       string
	NATAddr                    string
	EnableWS                   bool
	EnableQUIC                 bool
	WelcomeMessage             string
	Bootnodes                  []string
	CORSAllowedOrigins         []string
	Logger                     logging.Logger
	TracingEnabled             bool
	TracingEndpoint            string
	TracingServiceName         string
	GlobalPinningEnabled       bool
	PaymentThreshold           string
	PaymentTolerance           string
	PaymentEarly               string
	ResolverConnectionCfgs     []multiresolver.ConnectionConfig
142
	RetrievalCaching           bool
143 144 145 146 147 148 149 150 151
	GatewayMode                bool
	BootnodeMode               bool
	SwapEndpoint               string
	SwapFactoryAddress         string
	SwapLegacyFactoryAddresses []string
	SwapInitialDeposit         string
	SwapEnable                 bool
	FullNodeMode               bool
	Transaction                string
152
	BlockHash                  string
153 154 155
	PostageContractAddress     string
	PriceOracleAddress         string
	BlockTime                  uint64
156
	DeployGasPrice             string
157
	WarmupTime                 time.Duration
158
	ChainID                    int64
159 160 161
	Resync                     bool
	BlockProfile               bool
	MutexProfile               bool
Janos Guljas's avatar
Janos Guljas committed
162 163
}

164
const (
	// refreshRate is handed to the accounting and pseudosettle services and
	// is the unit from which the accepted payment threshold bounds are
	// derived (minimum 2*refreshRate, maximum 24*refreshRate).
	refreshRate = int64(4500000)
	// basePrice is the price given to the fixed pricer.
	basePrice = 10000
)

169
func NewBee(addr string, publicKey *ecdsa.PublicKey, signer crypto.Signer, networkID uint64, logger logging.Logger, libp2pPrivateKey, pssPrivateKey *ecdsa.PrivateKey, o *Options) (b *Bee, err error) {
170 171 172 173 174 175 176 177 178
	tracer, tracerCloser, err := tracing.NewTracer(&tracing.Options{
		Enabled:     o.TracingEnabled,
		Endpoint:    o.TracingEndpoint,
		ServiceName: o.TracingServiceName,
	})
	if err != nil {
		return nil, fmt.Errorf("tracer: %w", err)
	}

Janos Guljas's avatar
Janos Guljas committed
179
	p2pCtx, p2pCancel := context.WithCancel(context.Background())
180 181 182 183 184 185 186 187
	defer func() {
		// if there's been an error on this function
		// we'd like to cancel the p2p context so that
		// incoming connections will not be possible
		if err != nil {
			p2pCancel()
		}
	}()
Janos Guljas's avatar
Janos Guljas committed
188

189 190 191 192 193 194
	// light nodes have zero warmup time for pull/pushsync protocols
	warmupTime := o.WarmupTime
	if !o.FullNodeMode {
		warmupTime = 0
	}

195
	b = &Bee{
Janos Guljas's avatar
Janos Guljas committed
196 197
		p2pCancel:      p2pCancel,
		errorLogWriter: logger.WriterLevel(logrus.ErrorLevel),
198
		tracerCloser:   tracerCloser,
Janos Guljas's avatar
Janos Guljas committed
199
	}
luxq's avatar
luxq committed
200
	b.initPersistConfig(o.DataDir)
Janos Guljas's avatar
Janos Guljas committed
201

202 203 204 205 206 207 208 209
	stateStore, err := InitStateStore(logger, o.DataDir)
	if err != nil {
		return nil, err
	}
	b.stateStoreCloser = stateStore

	addressbook := addressbook.New(stateStore)

210 211 212 213 214 215 216 217 218 219 220 221
	var (
		swapBackend        *ethclient.Client
		overlayEthAddress  common.Address
		chainID            int64
		transactionService transaction.Service
		transactionMonitor transaction.Monitor
		chequebookFactory  chequebook.Factory
		chequebookService  chequebook.Service
		chequeStore        chequebook.ChequeStore
		cashoutService     chequebook.CashoutService
		pollingInterval    = time.Duration(o.BlockTime) * time.Second
	)
222 223 224 225 226 227 228 229 230 231 232 233 234 235
	swapBackend, overlayEthAddress, chainID, transactionMonitor, transactionService, err = InitChain(
		p2pCtx,
		logger,
		stateStore,
		o.SwapEndpoint,
		signer,
		pollingInterval,
	)
	if err != nil {
		return nil, fmt.Errorf("init chain: %w", err)
	}
	b.ethClientCloser = swapBackend.Close
	b.transactionCloser = tracerCloser
	b.transactionMonitorCloser = transactionMonitor
236

237 238
	if o.ChainID != -1 && o.ChainID != chainID {
		return nil, fmt.Errorf("connected to wrong ethereum network: got chainID %d, want %d", chainID, o.ChainID)
239 240
	}

241
	var debugAPIService *debugapi.Service
242 243 244 245 246
	if o.DebugAPIAddr != "" {
		overlayEthAddress, err := signer.EthereumAddress()
		if err != nil {
			return nil, fmt.Errorf("eth address: %w", err)
		}
247 248 249 250 251 252 253 254 255

		if o.MutexProfile {
			_ = runtime.SetMutexProfileFraction(1)
		}

		if o.BlockProfile {
			runtime.SetBlockProfileRate(1)
		}

256
		// set up basic debug api endpoints for debugging and /health endpoint
257
		debugAPIService = debugapi.New(*publicKey, pssPrivateKey.PublicKey, overlayEthAddress, logger, tracer, o.CORSAllowedOrigins, big.NewInt(int64(o.BlockTime)), transactionService)
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282

		debugAPIListener, err := net.Listen("tcp", o.DebugAPIAddr)
		if err != nil {
			return nil, fmt.Errorf("debug api listener: %w", err)
		}

		debugAPIServer := &http.Server{
			IdleTimeout:       30 * time.Second,
			ReadHeaderTimeout: 3 * time.Second,
			Handler:           debugAPIService,
			ErrorLog:          log.New(b.errorLogWriter, "", 0),
		}

		go func() {
			logger.Infof("debug api address: %s", debugAPIListener.Addr())

			if err := debugAPIServer.Serve(debugAPIListener); err != nil && err != http.ErrServerClosed {
				logger.Debugf("debug api server: %v", err)
				logger.Error("unable to serve debug api")
			}
		}()

		b.debugAPIServer = debugAPIServer
	}

283 284 285 286 287 288 289
	// Sync the with the given Ethereum backend:
	isSynced, _, err := transaction.IsSynced(p2pCtx, swapBackend, maxDelay)
	if err != nil {
		return nil, fmt.Errorf("is synced: %w", err)
	}
	if !isSynced {
		logger.Infof("waiting to sync with the Ethereum backend")
290

291 292 293
		err := transaction.WaitSynced(p2pCtx, logger, swapBackend, maxDelay)
		if err != nil {
			return nil, fmt.Errorf("waiting backend sync: %w", err)
294
		}
acud's avatar
acud committed
295 296 297
	}

	if o.SwapEnable {
298 299 300 301 302 303
		chequebookFactory, err = InitChequebookFactory(
			logger,
			swapBackend,
			chainID,
			transactionService,
			o.SwapFactoryAddress,
304
			o.SwapLegacyFactoryAddresses,
305
		)
306
		if err != nil {
307
			return nil, err
308 309
		}

310 311
		if err = chequebookFactory.VerifyBytecode(p2pCtx); err != nil {
			return nil, fmt.Errorf("factory fail: %w", err)
312 313
		}

314 315
		chequebookService, err = InitChequebookService(
			p2pCtx,
316
			logger,
317 318 319
			stateStore,
			signer,
			chainID,
320 321
			swapBackend,
			overlayEthAddress,
322 323 324
			transactionService,
			chequebookFactory,
			o.SwapInitialDeposit,
325
			o.DeployGasPrice,
326
		)
327 328 329
		if err != nil {
			return nil, err
		}
330

331
		chequeStore, cashoutService = initChequeStoreCashout(
332 333 334 335 336 337 338
			stateStore,
			swapBackend,
			chequebookFactory,
			chainID,
			overlayEthAddress,
			transactionService,
		)
339 340
	}

341 342 343 344 345 346 347 348 349
	pubKey, _ := signer.PublicKey()
	if err != nil {
		return nil, err
	}

	var (
		blockHash []byte
		txHash    []byte
	)
350

351
	txHash, err = GetTxHash(stateStore, logger, o.Transaction)
352
	if err != nil {
353
		return nil, fmt.Errorf("invalid transaction hash: %w", err)
354 355
	}

luxq's avatar
luxq committed
356 357 358 359
	if o.BlockHash == "" {
		o.BlockHash = b.GetNextBlock(common.Bytes2Hex(txHash))
	}

360 361 362 363 364
	blockHash, err = GetTxNextBlock(p2pCtx, logger, swapBackend, transactionMonitor, pollingInterval, txHash, o.BlockHash)
	if err != nil {
		return nil, fmt.Errorf("invalid block hash: %w", err)
	}

luxq's avatar
luxq committed
365 366 367
	if err := b.UpdateTxNextBlock(common.Bytes2Hex(txHash), common.Bytes2Hex(blockHash)); err != nil {
		logger.Errorf("update txNextBlock failed: %v\n", err)
	}
luxq's avatar
luxq committed
368

369 370 371 372 373 374 375 376 377
	swarmAddress, err := crypto.NewOverlayAddress(*pubKey, networkID, blockHash)

	err = CheckOverlayWithStore(swarmAddress, stateStore)
	if err != nil {
		return nil, err
	}

	lightNodes := lightnode.NewContainer(swarmAddress)

378
	senderMatcher := transaction.NewMatcher(swapBackend, types.NewLondonSigner(big.NewInt(chainID)), stateStore)
379 380

	p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, lightNodes, senderMatcher, logger, tracer, libp2p.Options{
381 382
		PrivateKey:     libp2pPrivateKey,
		NATAddr:        o.NATAddr,
383 384
		EnableWS:       o.EnableWS,
		EnableQUIC:     o.EnableQUIC,
385
		WelcomeMessage: o.WelcomeMessage,
386
		FullNode:       o.FullNodeMode,
387
		Transaction:    txHash,
388
	})
Janos Guljas's avatar
Janos Guljas committed
389 390 391
	if err != nil {
		return nil, fmt.Errorf("p2p service: %w", err)
	}
392
	b.p2pService = p2ps
393
	b.p2pHalter = p2ps
Janos Guljas's avatar
Janos Guljas committed
394

395 396 397 398 399 400 401 402 403 404 405
	var unreserveFn func([]byte, uint8) (uint64, error)
	var evictFn = func(b []byte) error {
		_, err := unreserveFn(b, swarm.MaxPO+1)
		return err
	}

	batchStore, err := batchstore.New(stateStore, evictFn, logger)
	if err != nil {
		return nil, fmt.Errorf("batchstore: %w", err)
	}

acud's avatar
acud committed
406 407 408 409
	// localstore depends on batchstore
	var path string

	if o.DataDir != "" {
410
		logger.Infof("using datadir in: '%s'", o.DataDir)
acud's avatar
acud committed
411 412 413
		path = filepath.Join(o.DataDir, "localstore")
	}
	lo := &localstore.Options{
414
		Capacity:               o.CacheCapacity,
415 416
		ReserveCapacity:        uint64(batchstore.Capacity),
		UnreserveFunc:          batchStore.Unreserve,
acud's avatar
acud committed
417 418 419 420 421 422
		OpenFilesLimit:         o.DBOpenFilesLimit,
		BlockCacheCapacity:     o.DBBlockCacheCapacity,
		WriteBufferSize:        o.DBWriteBufferSize,
		DisableSeeksCompaction: o.DBDisableSeeksCompaction,
	}

423
	storer, err := localstore.New(path, swarmAddress.Bytes(), stateStore, lo, logger)
acud's avatar
acud committed
424 425 426 427
	if err != nil {
		return nil, fmt.Errorf("localstore: %w", err)
	}
	b.localstoreCloser = storer
428
	unreserveFn = storer.UnreserveBatch
acud's avatar
acud committed
429 430

	validStamp := postage.ValidStamp(batchStore)
431
	post, err := postage.NewService(stateStore, batchStore, chainID)
432 433 434 435
	if err != nil {
		return nil, fmt.Errorf("postage service load: %w", err)
	}
	b.postageServiceCloser = post
acud's avatar
acud committed
436 437 438 439

	var (
		postageContractService postagecontract.Interface
		batchSvc               postage.EventUpdater
440
		eventListener          postage.Listener
acud's avatar
acud committed
441 442
	)

443
	var postageSyncStart uint64 = 0
444 445 446 447 448
	chainCfg, found := config.GetChainConfig(chainID)
	postageContractAddress, startBlock := chainCfg.PostageStamp, chainCfg.StartBlock
	if o.PostageContractAddress != "" {
		if !common.IsHexAddress(o.PostageContractAddress) {
			return nil, errors.New("malformed postage stamp address")
449
		}
450 451 452 453 454 455 456
		postageContractAddress = common.HexToAddress(o.PostageContractAddress)
	} else if !found {
		return nil, errors.New("no known postage stamp addresses for this network")
	}
	if found {
		postageSyncStart = startBlock
	}
acud's avatar
acud committed
457

458 459
	eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime, &pidKiller{node: b})
	b.listenerCloser = eventListener
acud's avatar
acud committed
460

461
	batchSvc, err = batchservice.New(stateStore, batchStore, logger, eventListener, overlayEthAddress.Bytes(), post, sha3.New256, o.Resync)
462 463 464
	if err != nil {
		return nil, err
	}
acud's avatar
acud committed
465

466 467 468
	erc20Address, err := postagecontract.LookupERC20Address(p2pCtx, transactionService, postageContractAddress)
	if err != nil {
		return nil, err
acud's avatar
acud committed
469 470
	}

471 472 473 474 475 476
	postageContractService = postagecontract.New(
		overlayEthAddress,
		postageContractAddress,
		erc20Address,
		transactionService,
		post,
477
		batchStore,
478 479 480 481 482 483 484 485 486 487 488 489 490
	)

	if natManager := p2ps.NATManager(); natManager != nil {
		// wait for nat manager to init
		logger.Debug("initializing NAT manager")
		select {
		case <-natManager.Ready():
			// this is magic sleep to give NAT time to sync the mappings
			// this is a hack, kind of alchemy and should be improved
			time.Sleep(3 * time.Second)
			logger.Debug("NAT manager initialized")
		case <-time.After(10 * time.Second):
			logger.Warning("NAT manager init timeout")
491
		}
492 493
	}

Janos Guljas's avatar
Janos Guljas committed
494
	// Construct protocols.
495
	pingPong := pingpong.New(p2ps, logger, tracer)
Janos Guljas's avatar
Janos Guljas committed
496 497 498 499 500

	if err = p2ps.AddProtocol(pingPong.Protocol()); err != nil {
		return nil, fmt.Errorf("pingpong service: %w", err)
	}

501
	hive := hive.New(p2ps, addressbook, networkID, logger)
502 503 504
	if err = p2ps.AddProtocol(hive.Protocol()); err != nil {
		return nil, fmt.Errorf("hive service: %w", err)
	}
505
	b.hiveCloser = hive
506

507
	var bootnodes []ma.Multiaddr
508

509 510 511 512 513 514
	for _, a := range o.Bootnodes {
		addr, err := ma.NewMultiaddr(a)
		if err != nil {
			logger.Debugf("multiaddress fail %s: %v", a, err)
			logger.Warningf("invalid bootnode address %s", a)
			continue
515
		}
516 517

		bootnodes = append(bootnodes, addr)
518 519
	}

520 521
	var swapService *swap.Service

522 523 524 525 526
	metricsDB, err := shed.NewDBWrap(stateStore.DB())
	if err != nil {
		return nil, fmt.Errorf("unable to create metrics storage for kademlia: %w", err)
	}

527
	kad := kademlia.New(swarmAddress, addressbook, hive, p2ps, metricsDB, logger, kademlia.Options{Bootnodes: bootnodes, BootnodeMode: o.BootnodeMode})
528
	b.topologyCloser = kad
529
	b.topologyHalter = kad
530 531
	hive.SetAddPeersHandler(kad.AddPeers)
	p2ps.SetPickyNotifier(kad)
acud's avatar
acud committed
532 533
	batchStore.SetRadiusSetter(kad)

534
	if batchSvc != nil {
535 536 537 538
		syncedChan, err := batchSvc.Start(postageSyncStart)
		if err != nil {
			return nil, fmt.Errorf("unable to start batch service: %w", err)
		}
539 540
		// wait for the postage contract listener to sync
		logger.Info("waiting to sync postage contract data, this may take a while... more info available in Debug loglevel")
acud's avatar
acud committed
541

542 543 544 545 546
		// arguably this is not a very nice solution since we dont support
		// interrupts at this stage of the application lifecycle. some changes
		// would be needed on the cmd level to support context cancellation at
		// this stage
		<-syncedChan
547

548
	}
549 550

	minThreshold := big.NewInt(2 * refreshRate)
551
	maxThreshold := big.NewInt(24 * refreshRate)
552

553 554 555 556 557
	paymentThreshold, ok := new(big.Int).SetString(o.PaymentThreshold, 10)
	if !ok {
		return nil, fmt.Errorf("invalid payment threshold: %s", paymentThreshold)
	}

558
	pricer := pricer.NewFixedPricer(swarmAddress, basePrice)
559

560 561 562
	if paymentThreshold.Cmp(minThreshold) < 0 {
		return nil, fmt.Errorf("payment threshold below minimum generally accepted value, need at least %s", minThreshold)
	}
563

564 565 566 567
	if paymentThreshold.Cmp(maxThreshold) > 0 {
		return nil, fmt.Errorf("payment threshold above maximum generally accepted value, needs to be reduced to at most %s", maxThreshold)
	}

568
	pricing := pricing.New(p2ps, logger, paymentThreshold, minThreshold)
569 570 571 572 573 574 575 576 577 578 579 580 581 582

	if err = p2ps.AddProtocol(pricing.Protocol()); err != nil {
		return nil, fmt.Errorf("pricing service: %w", err)
	}

	addrs, err := p2ps.Addresses()
	if err != nil {
		return nil, fmt.Errorf("get server addresses: %w", err)
	}

	for _, addr := range addrs {
		logger.Debugf("p2p address: %s", addr)
	}

583 584 585 586 587 588 589 590
	paymentTolerance, ok := new(big.Int).SetString(o.PaymentTolerance, 10)
	if !ok {
		return nil, fmt.Errorf("invalid payment tolerance: %s", paymentTolerance)
	}
	paymentEarly, ok := new(big.Int).SetString(o.PaymentEarly, 10)
	if !ok {
		return nil, fmt.Errorf("invalid payment early: %s", paymentEarly)
	}
591

592 593 594 595 596 597 598
	acc, err := accounting.NewAccounting(
		paymentThreshold,
		paymentTolerance,
		paymentEarly,
		logger,
		stateStore,
		pricing,
599
		big.NewInt(refreshRate),
600
		p2ps,
601 602 603 604
	)
	if err != nil {
		return nil, fmt.Errorf("accounting: %w", err)
	}
605
	b.accountingCloser = acc
606

607 608 609 610 611 612 613
	pseudosettleService := pseudosettle.New(p2ps, logger, stateStore, acc, big.NewInt(refreshRate), p2ps)
	if err = p2ps.AddProtocol(pseudosettleService.Protocol()); err != nil {
		return nil, fmt.Errorf("pseudosettle service: %w", err)
	}

	acc.SetRefreshFunc(pseudosettleService.Pay)

614
	if o.SwapEnable {
615 616
		var priceOracle priceoracle.Service
		swapService, priceOracle, err = InitSwap(
617 618 619 620 621 622 623 624
			p2ps,
			logger,
			stateStore,
			networkID,
			overlayEthAddress,
			chequebookService,
			chequeStore,
			cashoutService,
625
			acc,
626 627 628
			o.PriceOracleAddress,
			chainID,
			transactionService,
629 630 631
		)
		if err != nil {
			return nil, err
632
		}
633
		b.priceOracleCloser = priceOracle
634
		acc.SetPayFunc(swapService.Pay)
635 636
	}

637
	pricing.SetPaymentThresholdObserver(acc)
Janos Guljas's avatar
Janos Guljas committed
638

639
	retrieve := retrieval.New(swarmAddress, storer, p2ps, kad, logger, acc, pricer, tracer, o.RetrievalCaching, validStamp)
acud's avatar
acud committed
640 641
	tagService := tags.NewTags(stateStore, logger)
	b.tagsCloser = tagService
642

643
	pssService := pss.New(pssPrivateKey, logger)
acud's avatar
acud committed
644
	b.pssCloser = pssService
645

Zahoor Mohamed's avatar
Zahoor Mohamed committed
646 647 648
	var ns storage.Storer
	if o.GlobalPinningEnabled {
		// create recovery callback for content repair
649
		recoverFunc := recovery.NewCallback(pssService)
acud's avatar
acud committed
650
		ns = netstore.New(storer, validStamp, recoverFunc, retrieve, logger)
Zahoor Mohamed's avatar
Zahoor Mohamed committed
651
	} else {
acud's avatar
acud committed
652
		ns = netstore.New(storer, validStamp, nil, retrieve, logger)
Zahoor Mohamed's avatar
Zahoor Mohamed committed
653
	}
654

655
	traversalService := traversal.New(ns)
656

Peter Mrekaj's avatar
Peter Mrekaj committed
657 658
	pinningService := pinning.NewService(storer, stateStore, traversalService)

659
	pushSyncProtocol := pushsync.New(swarmAddress, blockHash, p2ps, storer, kad, tagService, o.FullNodeMode, pssService.TryUnwrap, validStamp, logger, acc, pricer, signer, tracer, warmupTime)
660

Zahoor Mohamed's avatar
Zahoor Mohamed committed
661
	// set the pushSyncer in the PSS
acud's avatar
acud committed
662
	pssService.SetPushSyncer(pushSyncProtocol)
Zahoor Mohamed's avatar
Zahoor Mohamed committed
663 664 665 666

	if o.GlobalPinningEnabled {
		// register function for chunk repair upon receiving a trojan message
		chunkRepairHandler := recovery.NewRepairHandler(ns, logger, pushSyncProtocol)
667
		b.recoveryHandleCleanup = pssService.Register(recovery.Topic, chunkRepairHandler)
Zahoor Mohamed's avatar
Zahoor Mohamed committed
668 669
	}

670
	pusherService := pusher.New(networkID, storer, kad, pushSyncProtocol, validStamp, tagService, logger, tracer, warmupTime)
671
	b.pusherCloser = pusherService
672

673 674
	pullStorage := pullstorage.New(storer)

acud's avatar
acud committed
675
	pullSyncProtocol := pullsync.New(p2ps, pullStorage, pssService.TryUnwrap, validStamp, logger)
676
	b.pullSyncCloser = pullSyncProtocol
677

678 679
	var pullerService *puller.Puller
	if o.FullNodeMode {
680
		pullerService := puller.New(stateStore, kad, pullSyncProtocol, logger, puller.Options{}, warmupTime)
681 682
		b.pullerCloser = pullerService
	}
683

684 685 686
	retrieveProtocolSpec := retrieve.Protocol()
	pushSyncProtocolSpec := pushSyncProtocol.Protocol()
	pullSyncProtocolSpec := pullSyncProtocol.Protocol()
687

688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705
	if o.FullNodeMode {
		logger.Info("starting in full mode")
	} else {
		logger.Info("starting in light mode")
		p2p.WithBlocklistStreams(p2p.DefaultBlocklistTime, retrieveProtocolSpec)
		p2p.WithBlocklistStreams(p2p.DefaultBlocklistTime, pushSyncProtocolSpec)
		p2p.WithBlocklistStreams(p2p.DefaultBlocklistTime, pullSyncProtocolSpec)
	}

	if err = p2ps.AddProtocol(retrieveProtocolSpec); err != nil {
		return nil, fmt.Errorf("retrieval service: %w", err)
	}
	if err = p2ps.AddProtocol(pushSyncProtocolSpec); err != nil {
		return nil, fmt.Errorf("pushsync service: %w", err)
	}
	if err = p2ps.AddProtocol(pullSyncProtocolSpec); err != nil {
		return nil, fmt.Errorf("pullsync protocol: %w", err)
	}
706

707 708 709 710
	multiResolver := multiresolver.NewMultiResolver(
		multiresolver.WithConnectionConfigs(o.ResolverConnectionCfgs),
		multiresolver.WithLogger(o.Logger),
	)
711 712
	b.resolverCloser = multiResolver

Janos Guljas's avatar
Janos Guljas committed
713 714 715
	var apiService api.Service
	if o.APIAddr != "" {
		// API server
716
		feedFactory := factory.New(ns)
717
		steward := steward.New(storer, traversalService, retrieve, pushSyncProtocol)
718
		apiService = api.New(tagService, ns, multiResolver, pssService, traversalService, pinningService, feedFactory, post, postageContractService, steward, signer, logger, tracer, api.Options{
719 720
			CORSAllowedOrigins: o.CORSAllowedOrigins,
			GatewayMode:        o.GatewayMode,
721
			WsPingPeriod:       60 * time.Second,
722
		})
Janos Guljas's avatar
Janos Guljas committed
723 724 725 726 727 728
		apiListener, err := net.Listen("tcp", o.APIAddr)
		if err != nil {
			return nil, fmt.Errorf("api listener: %w", err)
		}

		apiServer := &http.Server{
729 730 731 732
			IdleTimeout:       30 * time.Second,
			ReadHeaderTimeout: 3 * time.Second,
			Handler:           apiService,
			ErrorLog:          log.New(b.errorLogWriter, "", 0),
Janos Guljas's avatar
Janos Guljas committed
733 734 735 736 737 738
		}

		go func() {
			logger.Infof("api address: %s", apiListener.Addr())

			if err := apiServer.Serve(apiListener); err != nil && err != http.ErrServerClosed {
739 740
				logger.Debugf("api server: %v", err)
				logger.Error("unable to serve api")
Janos Guljas's avatar
Janos Guljas committed
741 742 743 744
			}
		}()

		b.apiServer = apiServer
745
		b.apiCloser = apiService
Janos Guljas's avatar
Janos Guljas committed
746 747
	}

748
	if debugAPIService != nil {
Janos Guljas's avatar
Janos Guljas committed
749 750 751
		// register metrics from components
		debugAPIService.MustRegisterMetrics(p2ps.Metrics()...)
		debugAPIService.MustRegisterMetrics(pingPong.Metrics()...)
752
		debugAPIService.MustRegisterMetrics(acc.Metrics()...)
753
		debugAPIService.MustRegisterMetrics(storer.Metrics()...)
754
		debugAPIService.MustRegisterMetrics(kad.Metrics()...)
755 756 757 758 759

		if pullerService != nil {
			debugAPIService.MustRegisterMetrics(pullerService.Metrics()...)
		}

760
		debugAPIService.MustRegisterMetrics(pushSyncProtocol.Metrics()...)
761 762
		debugAPIService.MustRegisterMetrics(pusherService.Metrics()...)
		debugAPIService.MustRegisterMetrics(pullSyncProtocol.Metrics()...)
763
		debugAPIService.MustRegisterMetrics(pullStorage.Metrics()...)
764
		debugAPIService.MustRegisterMetrics(retrieve.Metrics()...)
765
		debugAPIService.MustRegisterMetrics(lightNodes.Metrics()...)
766
		debugAPIService.MustRegisterMetrics(hive.Metrics()...)
767

acud's avatar
acud committed
768 769 770 771
		if bs, ok := batchStore.(metrics.Collector); ok {
			debugAPIService.MustRegisterMetrics(bs.Metrics()...)
		}

772 773 774 775 776 777
		if eventListener != nil {
			if ls, ok := eventListener.(metrics.Collector); ok {
				debugAPIService.MustRegisterMetrics(ls.Metrics()...)
			}
		}

acud's avatar
acud committed
778 779
		if pssServiceMetrics, ok := pssService.(metrics.Collector); ok {
			debugAPIService.MustRegisterMetrics(pssServiceMetrics.Metrics()...)
780
		}
781

Janos Guljas's avatar
Janos Guljas committed
782 783 784
		if apiService != nil {
			debugAPIService.MustRegisterMetrics(apiService.Metrics()...)
		}
785 786 787
		if l, ok := logger.(metrics.Collector); ok {
			debugAPIService.MustRegisterMetrics(l.Metrics()...)
		}
Janos Guljas's avatar
Janos Guljas committed
788

789 790 791 792
		debugAPIService.MustRegisterMetrics(pseudosettleService.Metrics()...)

		if swapService != nil {
			debugAPIService.MustRegisterMetrics(swapService.Metrics()...)
793 794
		}

795
		// inject dependencies and configure full debug api http path routes
796
		debugAPIService.Configure(swarmAddress, p2ps, pingPong, kad, lightNodes, storer, tagService, acc, pseudosettleService, o.SwapEnable, swapService, chequebookService, batchStore, post, postageContractService, traversalService)
Janos Guljas's avatar
Janos Guljas committed
797 798
	}

799 800
	if err := kad.Start(p2pCtx); err != nil {
		return nil, err
801
	}
802
	p2ps.Ready()
803

Janos Guljas's avatar
Janos Guljas committed
804 805 806 807
	return b, nil
}

func (b *Bee) Shutdown(ctx context.Context) error {
808
	var mErr error
809

810 811 812 813 814 815 816 817 818
	// if a shutdown is already in process, return here
	b.shutdownMutex.Lock()
	if b.shutdownInProgress {
		b.shutdownMutex.Unlock()
		return ErrShutdownInProgress
	}
	b.shutdownInProgress = true
	b.shutdownMutex.Unlock()

819 820 821 822 823 824 825
	// halt kademlia while shutting down other
	// components.
	b.topologyHalter.Halt()

	// halt p2p layer from accepting new connections
	// while shutting down other components
	b.p2pHalter.Halt()
826 827 828 829 830 831 832 833
	// tryClose is a convenient closure which decrease
	// repetitive io.Closer tryClose procedure.
	tryClose := func(c io.Closer, errMsg string) {
		if c == nil {
			return
		}
		if err := c.Close(); err != nil {
			mErr = multierror.Append(mErr, fmt.Errorf("%s: %w", errMsg, err))
834 835 836
		}
	}

837 838
	tryClose(b.apiCloser, "api")

Janos Guljas's avatar
Janos Guljas committed
839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855
	var eg errgroup.Group
	if b.apiServer != nil {
		eg.Go(func() error {
			if err := b.apiServer.Shutdown(ctx); err != nil {
				return fmt.Errorf("api server: %w", err)
			}
			return nil
		})
	}
	if b.debugAPIServer != nil {
		eg.Go(func() error {
			if err := b.debugAPIServer.Shutdown(ctx); err != nil {
				return fmt.Errorf("debug api server: %w", err)
			}
			return nil
		})
	}
856

Janos Guljas's avatar
Janos Guljas committed
857
	if err := eg.Wait(); err != nil {
858
		mErr = multierror.Append(mErr, err)
Janos Guljas's avatar
Janos Guljas committed
859 860
	}

861 862 863
	if b.recoveryHandleCleanup != nil {
		b.recoveryHandleCleanup()
	}
864
	var wg sync.WaitGroup
865
	wg.Add(6)
866 867 868 869 870 871 872 873 874 875 876 877
	go func() {
		defer wg.Done()
		tryClose(b.pssCloser, "pss")
	}()
	go func() {
		defer wg.Done()
		tryClose(b.pusherCloser, "pusher")
	}()
	go func() {
		defer wg.Done()
		tryClose(b.pullerCloser, "puller")
	}()
878 879 880 881
	go func() {
		defer wg.Done()
		tryClose(b.accountingCloser, "accounting")
	}()
882

Janos Guljas's avatar
Janos Guljas committed
883
	b.p2pCancel()
884 885 886 887
	go func() {
		defer wg.Done()
		tryClose(b.pullSyncCloser, "pull sync")
	}()
888 889 890 891
	go func() {
		defer wg.Done()
		tryClose(b.hiveCloser, "pull sync")
	}()
892 893 894

	wg.Wait()

895
	tryClose(b.p2pService, "p2p server")
896
	tryClose(b.priceOracleCloser, "price oracle service")
897 898 899 900 901

	wg.Add(3)
	go func() {
		defer wg.Done()
		tryClose(b.transactionMonitorCloser, "transaction monitor")
902
		tryClose(b.transactionCloser, "transaction")
903 904 905 906 907 908 909 910 911 912 913
	}()
	go func() {
		defer wg.Done()
		tryClose(b.listenerCloser, "listener")
	}()
	go func() {
		defer wg.Done()
		tryClose(b.postageServiceCloser, "postage service")
	}()

	wg.Wait()
914

915 916 917 918
	if c := b.ethClientCloser; c != nil {
		c()
	}

919 920
	tryClose(b.tracerCloser, "tracer")
	tryClose(b.tagsCloser, "tag persistence")
921
	tryClose(b.topologyCloser, "topology driver")
922 923 924 925
	tryClose(b.stateStoreCloser, "statestore")
	tryClose(b.localstoreCloser, "localstore")
	tryClose(b.errorLogWriter, "error log writer")
	tryClose(b.resolverCloser, "resolver service")
926

927
	return mErr
Janos Guljas's avatar
Janos Guljas committed
928
}
929

930 931 932 933 934 935 936 937 938 939 940
// pidKiller is used to issue a forced shutdown of the node from sub modules. The issue with using the
// node's Shutdown method is that it only shuts down the node and does not exit the start process,
// which is waiting on the os.Signals. This is not desirable, but currently the bee node cannot handle
// rate-limited blockchain API calls properly. We shut down the node in this case to allow the
// user to rectify the API issues (by adjusting limits or using a different endpoint). Unfortunately
// there is no platform-agnostic way to trigger os.Signals in Go, which is why the Process.Kill
// approach is used; it works on Windows as well.
type pidKiller struct {
	// node is the Bee instance that is shut down before the process is killed.
	node *Bee
}

941 942
// ErrShutdownInProgress is returned by Bee.Shutdown when a shutdown has
// already been started by an earlier call.
var ErrShutdownInProgress = errors.New("shutdown in progress")

943 944 945 946 947 948 949 950 951 952 953
// Shutdown shuts the wrapped node down and then kills the current process.
// If the node's Shutdown fails, or the current process cannot be looked up,
// that error is returned and the process is not killed.
func (p *pidKiller) Shutdown(ctx context.Context) error {
	if shutdownErr := p.node.Shutdown(ctx); shutdownErr != nil {
		return shutdownErr
	}

	// look up our own process so it can be killed once the node is down
	self, findErr := os.FindProcess(syscall.Getpid())
	if findErr != nil {
		return findErr
	}

	return self.Kill()
}