node.go 25.1 KB
Newer Older
1
// Copyright 2020 The Swarm Authors. All rights reserved.
Janos Guljas's avatar
Janos Guljas committed
2 3 4
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

5 6 7
// Package node defines the concept of a Bee node
// by bootstrapping and injecting all necessary
// dependencies.
Janos Guljas's avatar
Janos Guljas committed
8 9 10 11
package node

import (
	"context"
12
	"crypto/ecdsa"
13
	"encoding/hex"
acud's avatar
acud committed
14
	"errors"
Janos Guljas's avatar
Janos Guljas committed
15 16 17
	"fmt"
	"io"
	"log"
18
	"math/big"
Janos Guljas's avatar
Janos Guljas committed
19 20
	"net"
	"net/http"
21
	"os"
22
	"path/filepath"
23
	"strings"
24
	"sync"
25
	"syscall"
26
	"time"
Janos Guljas's avatar
Janos Guljas committed
27

28
	"github.com/ethereum/go-ethereum/common"
29
	"github.com/ethereum/go-ethereum/core/types"
30
	"github.com/ethereum/go-ethereum/ethclient"
31
	"github.com/ethersphere/bee/pkg/accounting"
32
	"github.com/ethersphere/bee/pkg/addressbook"
33
	"github.com/ethersphere/bee/pkg/api"
34
	"github.com/ethersphere/bee/pkg/crypto"
35
	"github.com/ethersphere/bee/pkg/debugapi"
36
	"github.com/ethersphere/bee/pkg/feeds/factory"
37
	"github.com/ethersphere/bee/pkg/hive"
38
	"github.com/ethersphere/bee/pkg/localstore"
39 40
	"github.com/ethersphere/bee/pkg/logging"
	"github.com/ethersphere/bee/pkg/metrics"
41
	"github.com/ethersphere/bee/pkg/netstore"
42
	"github.com/ethersphere/bee/pkg/p2p"
43 44
	"github.com/ethersphere/bee/pkg/p2p/libp2p"
	"github.com/ethersphere/bee/pkg/pingpong"
Peter Mrekaj's avatar
Peter Mrekaj committed
45
	"github.com/ethersphere/bee/pkg/pinning"
acud's avatar
acud committed
46 47 48 49 50
	"github.com/ethersphere/bee/pkg/postage"
	"github.com/ethersphere/bee/pkg/postage/batchservice"
	"github.com/ethersphere/bee/pkg/postage/batchstore"
	"github.com/ethersphere/bee/pkg/postage/listener"
	"github.com/ethersphere/bee/pkg/postage/postagecontract"
51
	"github.com/ethersphere/bee/pkg/pricer"
52
	"github.com/ethersphere/bee/pkg/pricing"
Zahoor Mohamed's avatar
Zahoor Mohamed committed
53
	"github.com/ethersphere/bee/pkg/pss"
54 55 56
	"github.com/ethersphere/bee/pkg/puller"
	"github.com/ethersphere/bee/pkg/pullsync"
	"github.com/ethersphere/bee/pkg/pullsync/pullstorage"
57 58
	"github.com/ethersphere/bee/pkg/pusher"
	"github.com/ethersphere/bee/pkg/pushsync"
Zahoor Mohamed's avatar
Zahoor Mohamed committed
59
	"github.com/ethersphere/bee/pkg/recovery"
60
	"github.com/ethersphere/bee/pkg/resolver/multiresolver"
61
	"github.com/ethersphere/bee/pkg/retrieval"
62
	"github.com/ethersphere/bee/pkg/settlement/pseudosettle"
63
	"github.com/ethersphere/bee/pkg/settlement/swap"
64
	"github.com/ethersphere/bee/pkg/settlement/swap/chequebook"
65
	"github.com/ethersphere/bee/pkg/shed"
66
	"github.com/ethersphere/bee/pkg/steward"
67
	"github.com/ethersphere/bee/pkg/storage"
68
	"github.com/ethersphere/bee/pkg/swarm"
Zahoor Mohamed's avatar
Zahoor Mohamed committed
69
	"github.com/ethersphere/bee/pkg/tags"
70
	"github.com/ethersphere/bee/pkg/topology"
71 72
	"github.com/ethersphere/bee/pkg/topology/kademlia"
	"github.com/ethersphere/bee/pkg/topology/lightnode"
73
	"github.com/ethersphere/bee/pkg/tracing"
74
	"github.com/ethersphere/bee/pkg/transaction"
75
	"github.com/ethersphere/bee/pkg/traversal"
76
	"github.com/hashicorp/go-multierror"
77
	ma "github.com/multiformats/go-multiaddr"
78 79
	"github.com/sirupsen/logrus"
	"golang.org/x/sync/errgroup"
Janos Guljas's avatar
Janos Guljas committed
80 81 82
)

type Bee struct {
83
	p2pService               io.Closer
84
	p2pHalter                p2p.Halter
85 86 87 88 89 90 91 92 93 94 95
	p2pCancel                context.CancelFunc
	apiCloser                io.Closer
	apiServer                *http.Server
	debugAPIServer           *http.Server
	resolverCloser           io.Closer
	errorLogWriter           *io.PipeWriter
	tracerCloser             io.Closer
	tagsCloser               io.Closer
	stateStoreCloser         io.Closer
	localstoreCloser         io.Closer
	topologyCloser           io.Closer
96
	topologyHalter           topology.Halter
97 98 99 100 101 102 103
	pusherCloser             io.Closer
	pullerCloser             io.Closer
	pullSyncCloser           io.Closer
	pssCloser                io.Closer
	ethClientCloser          func()
	transactionMonitorCloser io.Closer
	recoveryHandleCleanup    func()
acud's avatar
acud committed
104
	listenerCloser           io.Closer
105
	postageServiceCloser     io.Closer
Janos Guljas's avatar
Janos Guljas committed
106 107 108
}

type Options struct {
109
	DataDir                    string
110
	CacheCapacity              uint64
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
	DBOpenFilesLimit           uint64
	DBWriteBufferSize          uint64
	DBBlockCacheCapacity       uint64
	DBDisableSeeksCompaction   bool
	APIAddr                    string
	DebugAPIAddr               string
	Addr                       string
	NATAddr                    string
	EnableWS                   bool
	EnableQUIC                 bool
	WelcomeMessage             string
	Bootnodes                  []string
	CORSAllowedOrigins         []string
	Logger                     logging.Logger
	Standalone                 bool
	TracingEnabled             bool
	TracingEndpoint            string
	TracingServiceName         string
	GlobalPinningEnabled       bool
	PaymentThreshold           string
	PaymentTolerance           string
	PaymentEarly               string
	ResolverConnectionCfgs     []multiresolver.ConnectionConfig
	GatewayMode                bool
	BootnodeMode               bool
	SwapEndpoint               string
	SwapFactoryAddress         string
	SwapLegacyFactoryAddresses []string
	SwapInitialDeposit         string
	SwapEnable                 bool
	FullNodeMode               bool
	Transaction                string
	PostageContractAddress     string
	PriceOracleAddress         string
	BlockTime                  uint64
146
	DeployGasPrice             string
Janos Guljas's avatar
Janos Guljas committed
147 148
}

149 150 151 152 153
const (
	// refreshRate is handed, as a big.Int, to both the accounting and the
	// pseudosettle services in NewBee.
	refreshRate int64 = 1000000000000
	// basePrice is the price given to the fixed pricer in NewBee.
	basePrice = 1000000000
)

154
func NewBee(addr string, swarmAddress swarm.Address, publicKey ecdsa.PublicKey, signer crypto.Signer, networkID uint64, logger logging.Logger, libp2pPrivateKey, pssPrivateKey *ecdsa.PrivateKey, o Options) (b *Bee, err error) {
155 156 157 158 159 160 161 162 163
	tracer, tracerCloser, err := tracing.NewTracer(&tracing.Options{
		Enabled:     o.TracingEnabled,
		Endpoint:    o.TracingEndpoint,
		ServiceName: o.TracingServiceName,
	})
	if err != nil {
		return nil, fmt.Errorf("tracer: %w", err)
	}

Janos Guljas's avatar
Janos Guljas committed
164
	p2pCtx, p2pCancel := context.WithCancel(context.Background())
165 166 167 168 169 170 171 172
	defer func() {
		// if there's been an error on this function
		// we'd like to cancel the p2p context so that
		// incoming connections will not be possible
		if err != nil {
			p2pCancel()
		}
	}()
Janos Guljas's avatar
Janos Guljas committed
173

174
	b = &Bee{
Janos Guljas's avatar
Janos Guljas committed
175 176
		p2pCancel:      p2pCancel,
		errorLogWriter: logger.WriterLevel(logrus.ErrorLevel),
177
		tracerCloser:   tracerCloser,
Janos Guljas's avatar
Janos Guljas committed
178 179
	}

180
	var debugAPIService *debugapi.Service
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
	if o.DebugAPIAddr != "" {
		overlayEthAddress, err := signer.EthereumAddress()
		if err != nil {
			return nil, fmt.Errorf("eth address: %w", err)
		}
		// set up basic debug api endpoints for debugging and /health endpoint
		debugAPIService = debugapi.New(swarmAddress, publicKey, pssPrivateKey.PublicKey, overlayEthAddress, logger, tracer, o.CORSAllowedOrigins)

		debugAPIListener, err := net.Listen("tcp", o.DebugAPIAddr)
		if err != nil {
			return nil, fmt.Errorf("debug api listener: %w", err)
		}

		debugAPIServer := &http.Server{
			IdleTimeout:       30 * time.Second,
			ReadHeaderTimeout: 3 * time.Second,
			Handler:           debugAPIService,
			ErrorLog:          log.New(b.errorLogWriter, "", 0),
		}

		go func() {
			logger.Infof("debug api address: %s", debugAPIListener.Addr())

			if err := debugAPIServer.Serve(debugAPIListener); err != nil && err != http.ErrServerClosed {
				logger.Debugf("debug api server: %v", err)
				logger.Error("unable to serve debug api")
			}
		}()

		b.debugAPIServer = debugAPIServer
	}

213 214 215
	stateStore, err := InitStateStore(logger, o.DataDir)
	if err != nil {
		return nil, err
216 217
	}
	b.stateStoreCloser = stateStore
218

219 220 221 222 223
	err = CheckOverlayWithStore(swarmAddress, stateStore)
	if err != nil {
		return nil, err
	}

224 225
	addressbook := addressbook.New(stateStore)

226 227 228 229 230 231 232 233 234 235
	var (
		swapBackend        *ethclient.Client
		overlayEthAddress  common.Address
		chainID            int64
		transactionService transaction.Service
		transactionMonitor transaction.Monitor
		chequebookFactory  chequebook.Factory
		chequebookService  chequebook.Service
		chequeStore        chequebook.ChequeStore
		cashoutService     chequebook.CashoutService
acud's avatar
acud committed
236
	)
237 238 239 240 241 242 243
	if !o.Standalone {
		swapBackend, overlayEthAddress, chainID, transactionMonitor, transactionService, err = InitChain(
			p2pCtx,
			logger,
			stateStore,
			o.SwapEndpoint,
			signer,
244
			o.BlockTime,
245 246 247 248 249 250
		)
		if err != nil {
			return nil, fmt.Errorf("init chain: %w", err)
		}
		b.ethClientCloser = swapBackend.Close
		b.transactionMonitorCloser = transactionMonitor
acud's avatar
acud committed
251 252 253
	}

	if o.SwapEnable {
254 255 256 257 258 259
		chequebookFactory, err = InitChequebookFactory(
			logger,
			swapBackend,
			chainID,
			transactionService,
			o.SwapFactoryAddress,
260
			o.SwapLegacyFactoryAddresses,
261
		)
262
		if err != nil {
263
			return nil, err
264 265
		}

266 267
		if err = chequebookFactory.VerifyBytecode(p2pCtx); err != nil {
			return nil, fmt.Errorf("factory fail: %w", err)
268 269
		}

270 271
		chequebookService, err = InitChequebookService(
			p2pCtx,
272
			logger,
273 274 275
			stateStore,
			signer,
			chainID,
276 277
			swapBackend,
			overlayEthAddress,
278 279 280
			transactionService,
			chequebookFactory,
			o.SwapInitialDeposit,
281
			o.DeployGasPrice,
282
		)
283 284 285
		if err != nil {
			return nil, err
		}
286

287
		chequeStore, cashoutService = initChequeStoreCashout(
288 289 290 291 292 293 294
			stateStore,
			swapBackend,
			chequebookFactory,
			chainID,
			overlayEthAddress,
			transactionService,
		)
295 296
	}

297
	lightNodes := lightnode.NewContainer(swarmAddress)
298

299
	txHash, err := getTxHash(stateStore, logger, o)
300
	if err != nil {
301
		return nil, fmt.Errorf("invalid transaction hash: %w", err)
302 303 304 305 306
	}

	senderMatcher := transaction.NewMatcher(swapBackend, types.NewEIP155Signer(big.NewInt(chainID)))

	p2ps, err := libp2p.New(p2pCtx, signer, networkID, swarmAddress, addr, addressbook, stateStore, lightNodes, senderMatcher, logger, tracer, libp2p.Options{
307 308
		PrivateKey:     libp2pPrivateKey,
		NATAddr:        o.NATAddr,
309 310
		EnableWS:       o.EnableWS,
		EnableQUIC:     o.EnableQUIC,
311
		Standalone:     o.Standalone,
312
		WelcomeMessage: o.WelcomeMessage,
313
		FullNode:       o.FullNodeMode,
314
		Transaction:    txHash,
315
	})
Janos Guljas's avatar
Janos Guljas committed
316 317 318
	if err != nil {
		return nil, fmt.Errorf("p2p service: %w", err)
	}
319
	b.p2pService = p2ps
320
	b.p2pHalter = p2ps
Janos Guljas's avatar
Janos Guljas committed
321

acud's avatar
acud committed
322 323 324 325
	// localstore depends on batchstore
	var path string

	if o.DataDir != "" {
326
		logger.Infof("using datadir in: '%s'", o.DataDir)
acud's avatar
acud committed
327 328 329
		path = filepath.Join(o.DataDir, "localstore")
	}
	lo := &localstore.Options{
330
		Capacity:               o.CacheCapacity,
acud's avatar
acud committed
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
		OpenFilesLimit:         o.DBOpenFilesLimit,
		BlockCacheCapacity:     o.DBBlockCacheCapacity,
		WriteBufferSize:        o.DBWriteBufferSize,
		DisableSeeksCompaction: o.DBDisableSeeksCompaction,
	}

	storer, err := localstore.New(path, swarmAddress.Bytes(), lo, logger)
	if err != nil {
		return nil, fmt.Errorf("localstore: %w", err)
	}
	b.localstoreCloser = storer

	batchStore, err := batchstore.New(stateStore, storer.UnreserveBatch)
	if err != nil {
		return nil, fmt.Errorf("batchstore: %w", err)
	}
	validStamp := postage.ValidStamp(batchStore)
348 349 350 351 352
	post, err := postage.NewService(stateStore, chainID)
	if err != nil {
		return nil, fmt.Errorf("postage service load: %w", err)
	}
	b.postageServiceCloser = post
acud's avatar
acud committed
353 354 355 356

	var (
		postageContractService postagecontract.Interface
		batchSvc               postage.EventUpdater
357
		eventListener          postage.Listener
acud's avatar
acud committed
358 359
	)

360
	var postageSyncStart uint64 = 0
acud's avatar
acud committed
361
	if !o.Standalone {
362
		postageContractAddress, startBlock, found := listener.DiscoverAddresses(chainID)
acud's avatar
acud committed
363 364 365 366 367
		if o.PostageContractAddress != "" {
			if !common.IsHexAddress(o.PostageContractAddress) {
				return nil, errors.New("malformed postage stamp address")
			}
			postageContractAddress = common.HexToAddress(o.PostageContractAddress)
368
		} else if !found {
acud's avatar
acud committed
369 370
			return nil, errors.New("no known postage stamp addresses for this network")
		}
371 372 373
		if found {
			postageSyncStart = startBlock
		}
acud's avatar
acud committed
374

375
		eventListener = listener.New(logger, swapBackend, postageContractAddress, o.BlockTime, &pidKiller{node: b})
acud's avatar
acud committed
376 377
		b.listenerCloser = eventListener

378
		batchSvc = batchservice.New(stateStore, batchStore, logger, eventListener)
acud's avatar
acud committed
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393

		erc20Address, err := postagecontract.LookupERC20Address(p2pCtx, transactionService, postageContractAddress)
		if err != nil {
			return nil, err
		}

		postageContractService = postagecontract.New(
			overlayEthAddress,
			postageContractAddress,
			erc20Address,
			transactionService,
			post,
		)
	}

394 395 396 397 398 399 400 401 402 403 404 405 406
	if !o.Standalone {
		if natManager := p2ps.NATManager(); natManager != nil {
			// wait for nat manager to init
			logger.Debug("initializing NAT manager")
			select {
			case <-natManager.Ready():
				// this is magic sleep to give NAT time to sync the mappings
				// this is a hack, kind of alchemy and should be improved
				time.Sleep(3 * time.Second)
				logger.Debug("NAT manager initialized")
			case <-time.After(10 * time.Second):
				logger.Warning("NAT manager init timeout")
			}
407
		}
408 409
	}

Janos Guljas's avatar
Janos Guljas committed
410
	// Construct protocols.
411
	pingPong := pingpong.New(p2ps, logger, tracer)
Janos Guljas's avatar
Janos Guljas committed
412 413 414 415 416

	if err = p2ps.AddProtocol(pingPong.Protocol()); err != nil {
		return nil, fmt.Errorf("pingpong service: %w", err)
	}

417
	hive := hive.New(p2ps, addressbook, networkID, logger)
418 419 420 421
	if err = p2ps.AddProtocol(hive.Protocol()); err != nil {
		return nil, fmt.Errorf("hive service: %w", err)
	}

422
	var bootnodes []ma.Multiaddr
423 424 425 426 427 428 429 430 431 432 433 434
	if o.Standalone {
		logger.Info("Starting node in standalone mode, no p2p connections will be made or accepted")
	} else {
		for _, a := range o.Bootnodes {
			addr, err := ma.NewMultiaddr(a)
			if err != nil {
				logger.Debugf("multiaddress fail %s: %v", a, err)
				logger.Warningf("invalid bootnode address %s", a)
				continue
			}

			bootnodes = append(bootnodes, addr)
435 436 437
		}
	}

438 439
	var swapService *swap.Service

440 441 442 443 444 445
	metricsDB, err := shed.NewDBWrap(stateStore.DB())
	if err != nil {
		return nil, fmt.Errorf("unable to create metrics storage for kademlia: %w", err)
	}

	kad := kademlia.New(swarmAddress, addressbook, hive, p2ps, metricsDB, logger, kademlia.Options{Bootnodes: bootnodes, StandaloneMode: o.Standalone, BootnodeMode: o.BootnodeMode})
446
	b.topologyCloser = kad
447
	b.topologyHalter = kad
448 449
	hive.SetAddPeersHandler(kad.AddPeers)
	p2ps.SetPickyNotifier(kad)
acud's avatar
acud committed
450 451
	batchStore.SetRadiusSetter(kad)

452
	if batchSvc != nil {
453 454 455 456
		syncedChan, err := batchSvc.Start(postageSyncStart)
		if err != nil {
			return nil, fmt.Errorf("unable to start batch service: %w", err)
		}
457 458
		// wait for the postage contract listener to sync
		logger.Info("waiting to sync postage contract data, this may take a while... more info available in Debug loglevel")
acud's avatar
acud committed
459

460 461 462 463 464
		// arguably this is not a very nice solution since we dont support
		// interrupts at this stage of the application lifecycle. some changes
		// would be needed on the cmd level to support context cancellation at
		// this stage
		<-syncedChan
465

466
	}
467 468 469 470 471
	paymentThreshold, ok := new(big.Int).SetString(o.PaymentThreshold, 10)
	if !ok {
		return nil, fmt.Errorf("invalid payment threshold: %s", paymentThreshold)
	}

472
	pricer := pricer.NewFixedPricer(swarmAddress, basePrice)
473

474 475 476
	minThreshold := pricer.MostExpensive()

	pricing := pricing.New(p2ps, logger, paymentThreshold, minThreshold)
477 478 479 480 481 482 483 484 485 486 487 488 489 490

	if err = p2ps.AddProtocol(pricing.Protocol()); err != nil {
		return nil, fmt.Errorf("pricing service: %w", err)
	}

	addrs, err := p2ps.Addresses()
	if err != nil {
		return nil, fmt.Errorf("get server addresses: %w", err)
	}

	for _, addr := range addrs {
		logger.Debugf("p2p address: %s", addr)
	}

491 492 493 494 495 496 497 498
	paymentTolerance, ok := new(big.Int).SetString(o.PaymentTolerance, 10)
	if !ok {
		return nil, fmt.Errorf("invalid payment tolerance: %s", paymentTolerance)
	}
	paymentEarly, ok := new(big.Int).SetString(o.PaymentEarly, 10)
	if !ok {
		return nil, fmt.Errorf("invalid payment early: %s", paymentEarly)
	}
499

500 501 502 503 504 505 506
	acc, err := accounting.NewAccounting(
		paymentThreshold,
		paymentTolerance,
		paymentEarly,
		logger,
		stateStore,
		pricing,
507
		big.NewInt(refreshRate),
508 509 510 511 512
	)
	if err != nil {
		return nil, fmt.Errorf("accounting: %w", err)
	}

513 514 515 516 517 518 519
	pseudosettleService := pseudosettle.New(p2ps, logger, stateStore, acc, big.NewInt(refreshRate), p2ps)
	if err = p2ps.AddProtocol(pseudosettleService.Protocol()); err != nil {
		return nil, fmt.Errorf("pseudosettle service: %w", err)
	}

	acc.SetRefreshFunc(pseudosettleService.Pay)

520
	if o.SwapEnable {
521 522 523 524 525 526 527 528 529
		swapService, err = InitSwap(
			p2ps,
			logger,
			stateStore,
			networkID,
			overlayEthAddress,
			chequebookService,
			chequeStore,
			cashoutService,
530
			acc,
531 532 533
		)
		if err != nil {
			return nil, err
534
		}
535
		acc.SetPayFunc(swapService.Pay)
536 537
	}

538
	pricing.SetPaymentThresholdObserver(acc)
Janos Guljas's avatar
Janos Guljas committed
539

540
	retrieve := retrieval.New(swarmAddress, storer, p2ps, kad, logger, acc, pricer, tracer)
acud's avatar
acud committed
541 542
	tagService := tags.NewTags(stateStore, logger)
	b.tagsCloser = tagService
543

544
	pssService := pss.New(pssPrivateKey, logger)
acud's avatar
acud committed
545
	b.pssCloser = pssService
546

Zahoor Mohamed's avatar
Zahoor Mohamed committed
547 548 549
	var ns storage.Storer
	if o.GlobalPinningEnabled {
		// create recovery callback for content repair
550
		recoverFunc := recovery.NewCallback(pssService)
acud's avatar
acud committed
551
		ns = netstore.New(storer, validStamp, recoverFunc, retrieve, logger)
Zahoor Mohamed's avatar
Zahoor Mohamed committed
552
	} else {
acud's avatar
acud committed
553
		ns = netstore.New(storer, validStamp, nil, retrieve, logger)
Zahoor Mohamed's avatar
Zahoor Mohamed committed
554
	}
555

556
	traversalService := traversal.New(ns)
557

Peter Mrekaj's avatar
Peter Mrekaj committed
558 559
	pinningService := pinning.NewService(storer, stateStore, traversalService)

acud's avatar
acud committed
560
	pushSyncProtocol := pushsync.New(swarmAddress, p2ps, storer, kad, tagService, o.FullNodeMode, pssService.TryUnwrap, validStamp, logger, acc, pricer, signer, tracer)
561

Zahoor Mohamed's avatar
Zahoor Mohamed committed
562
	// set the pushSyncer in the PSS
acud's avatar
acud committed
563
	pssService.SetPushSyncer(pushSyncProtocol)
Zahoor Mohamed's avatar
Zahoor Mohamed committed
564 565 566 567

	if o.GlobalPinningEnabled {
		// register function for chunk repair upon receiving a trojan message
		chunkRepairHandler := recovery.NewRepairHandler(ns, logger, pushSyncProtocol)
568
		b.recoveryHandleCleanup = pssService.Register(recovery.Topic, chunkRepairHandler)
Zahoor Mohamed's avatar
Zahoor Mohamed committed
569 570
	}

571 572
	pusherService := pusher.New(networkID, storer, kad, pushSyncProtocol, tagService, logger, tracer)
	b.pusherCloser = pusherService
573

574 575
	pullStorage := pullstorage.New(storer)

acud's avatar
acud committed
576
	pullSyncProtocol := pullsync.New(p2ps, pullStorage, pssService.TryUnwrap, validStamp, logger)
577
	b.pullSyncCloser = pullSyncProtocol
578

579 580 581 582 583
	var pullerService *puller.Puller
	if o.FullNodeMode {
		pullerService := puller.New(stateStore, kad, pullSyncProtocol, logger, puller.Options{})
		b.pullerCloser = pullerService
	}
584

585 586 587
	retrieveProtocolSpec := retrieve.Protocol()
	pushSyncProtocolSpec := pushSyncProtocol.Protocol()
	pullSyncProtocolSpec := pullSyncProtocol.Protocol()
588

589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606
	if o.FullNodeMode {
		logger.Info("starting in full mode")
	} else {
		logger.Info("starting in light mode")
		p2p.WithBlocklistStreams(p2p.DefaultBlocklistTime, retrieveProtocolSpec)
		p2p.WithBlocklistStreams(p2p.DefaultBlocklistTime, pushSyncProtocolSpec)
		p2p.WithBlocklistStreams(p2p.DefaultBlocklistTime, pullSyncProtocolSpec)
	}

	if err = p2ps.AddProtocol(retrieveProtocolSpec); err != nil {
		return nil, fmt.Errorf("retrieval service: %w", err)
	}
	if err = p2ps.AddProtocol(pushSyncProtocolSpec); err != nil {
		return nil, fmt.Errorf("pushsync service: %w", err)
	}
	if err = p2ps.AddProtocol(pullSyncProtocolSpec); err != nil {
		return nil, fmt.Errorf("pullsync protocol: %w", err)
	}
607

608 609 610 611
	multiResolver := multiresolver.NewMultiResolver(
		multiresolver.WithConnectionConfigs(o.ResolverConnectionCfgs),
		multiresolver.WithLogger(o.Logger),
	)
612 613
	b.resolverCloser = multiResolver

Janos Guljas's avatar
Janos Guljas committed
614 615 616
	var apiService api.Service
	if o.APIAddr != "" {
		// API server
617
		feedFactory := factory.New(ns)
618 619
		steward := steward.New(storer, traversalService, pushSyncProtocol)
		apiService = api.New(tagService, ns, multiResolver, pssService, traversalService, pinningService, feedFactory, post, postageContractService, steward, signer, logger, tracer, api.Options{
620 621
			CORSAllowedOrigins: o.CORSAllowedOrigins,
			GatewayMode:        o.GatewayMode,
622
			WsPingPeriod:       60 * time.Second,
623
		})
Janos Guljas's avatar
Janos Guljas committed
624 625 626 627 628 629
		apiListener, err := net.Listen("tcp", o.APIAddr)
		if err != nil {
			return nil, fmt.Errorf("api listener: %w", err)
		}

		apiServer := &http.Server{
630 631 632 633
			IdleTimeout:       30 * time.Second,
			ReadHeaderTimeout: 3 * time.Second,
			Handler:           apiService,
			ErrorLog:          log.New(b.errorLogWriter, "", 0),
Janos Guljas's avatar
Janos Guljas committed
634 635 636 637 638 639
		}

		go func() {
			logger.Infof("api address: %s", apiListener.Addr())

			if err := apiServer.Serve(apiListener); err != nil && err != http.ErrServerClosed {
640 641
				logger.Debugf("api server: %v", err)
				logger.Error("unable to serve api")
Janos Guljas's avatar
Janos Guljas committed
642 643 644 645
			}
		}()

		b.apiServer = apiServer
646
		b.apiCloser = apiService
Janos Guljas's avatar
Janos Guljas committed
647 648
	}

649
	if debugAPIService != nil {
Janos Guljas's avatar
Janos Guljas committed
650 651 652
		// register metrics from components
		debugAPIService.MustRegisterMetrics(p2ps.Metrics()...)
		debugAPIService.MustRegisterMetrics(pingPong.Metrics()...)
653
		debugAPIService.MustRegisterMetrics(acc.Metrics()...)
654
		debugAPIService.MustRegisterMetrics(storer.Metrics()...)
655 656 657 658 659

		if pullerService != nil {
			debugAPIService.MustRegisterMetrics(pullerService.Metrics()...)
		}

660
		debugAPIService.MustRegisterMetrics(pushSyncProtocol.Metrics()...)
661 662
		debugAPIService.MustRegisterMetrics(pusherService.Metrics()...)
		debugAPIService.MustRegisterMetrics(pullSyncProtocol.Metrics()...)
663
		debugAPIService.MustRegisterMetrics(pullStorage.Metrics()...)
664
		debugAPIService.MustRegisterMetrics(retrieve.Metrics()...)
665

acud's avatar
acud committed
666 667 668 669
		if bs, ok := batchStore.(metrics.Collector); ok {
			debugAPIService.MustRegisterMetrics(bs.Metrics()...)
		}

670 671 672 673 674 675
		if eventListener != nil {
			if ls, ok := eventListener.(metrics.Collector); ok {
				debugAPIService.MustRegisterMetrics(ls.Metrics()...)
			}
		}

acud's avatar
acud committed
676 677
		if pssServiceMetrics, ok := pssService.(metrics.Collector); ok {
			debugAPIService.MustRegisterMetrics(pssServiceMetrics.Metrics()...)
678
		}
679

Janos Guljas's avatar
Janos Guljas committed
680 681 682
		if apiService != nil {
			debugAPIService.MustRegisterMetrics(apiService.Metrics()...)
		}
683 684 685
		if l, ok := logger.(metrics.Collector); ok {
			debugAPIService.MustRegisterMetrics(l.Metrics()...)
		}
Janos Guljas's avatar
Janos Guljas committed
686

687 688 689 690
		debugAPIService.MustRegisterMetrics(pseudosettleService.Metrics()...)

		if swapService != nil {
			debugAPIService.MustRegisterMetrics(swapService.Metrics()...)
691 692
		}

693
		// inject dependencies and configure full debug api http path routes
694
		debugAPIService.Configure(p2ps, pingPong, kad, lightNodes, storer, tagService, acc, pseudosettleService, o.SwapEnable, swapService, chequebookService, batchStore)
Janos Guljas's avatar
Janos Guljas committed
695 696
	}

697 698
	if err := kad.Start(p2pCtx); err != nil {
		return nil, err
699
	}
700
	p2ps.Ready()
701

Janos Guljas's avatar
Janos Guljas committed
702 703 704 705
	return b, nil
}

func (b *Bee) Shutdown(ctx context.Context) error {
706
	var mErr error
707

708 709 710 711 712 713 714 715
	// halt kademlia while shutting down other
	// components.
	b.topologyHalter.Halt()

	// halt p2p layer from accepting new connections
	// while shutting down other components
	b.p2pHalter.Halt()

716 717 718 719 720 721 722 723
	// tryClose is a convenient closure which decrease
	// repetitive io.Closer tryClose procedure.
	tryClose := func(c io.Closer, errMsg string) {
		if c == nil {
			return
		}
		if err := c.Close(); err != nil {
			mErr = multierror.Append(mErr, fmt.Errorf("%s: %w", errMsg, err))
724 725 726
		}
	}

727 728
	tryClose(b.apiCloser, "api")

Janos Guljas's avatar
Janos Guljas committed
729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745
	var eg errgroup.Group
	if b.apiServer != nil {
		eg.Go(func() error {
			if err := b.apiServer.Shutdown(ctx); err != nil {
				return fmt.Errorf("api server: %w", err)
			}
			return nil
		})
	}
	if b.debugAPIServer != nil {
		eg.Go(func() error {
			if err := b.debugAPIServer.Shutdown(ctx); err != nil {
				return fmt.Errorf("debug api server: %w", err)
			}
			return nil
		})
	}
746

Janos Guljas's avatar
Janos Guljas committed
747
	if err := eg.Wait(); err != nil {
748
		mErr = multierror.Append(mErr, err)
Janos Guljas's avatar
Janos Guljas committed
749 750
	}

751 752 753
	if b.recoveryHandleCleanup != nil {
		b.recoveryHandleCleanup()
	}
754 755 756 757 758 759 760 761 762 763 764 765 766 767
	var wg sync.WaitGroup
	wg.Add(4)
	go func() {
		defer wg.Done()
		tryClose(b.pssCloser, "pss")
	}()
	go func() {
		defer wg.Done()
		tryClose(b.pusherCloser, "pusher")
	}()
	go func() {
		defer wg.Done()
		tryClose(b.pullerCloser, "puller")
	}()
768

Janos Guljas's avatar
Janos Guljas committed
769
	b.p2pCancel()
770 771 772 773 774 775 776
	go func() {
		defer wg.Done()
		tryClose(b.pullSyncCloser, "pull sync")
	}()

	wg.Wait()

777
	tryClose(b.p2pService, "p2p server")
778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793

	wg.Add(3)
	go func() {
		defer wg.Done()
		tryClose(b.transactionMonitorCloser, "transaction monitor")
	}()
	go func() {
		defer wg.Done()
		tryClose(b.listenerCloser, "listener")
	}()
	go func() {
		defer wg.Done()
		tryClose(b.postageServiceCloser, "postage service")
	}()

	wg.Wait()
794

795 796 797 798
	if c := b.ethClientCloser; c != nil {
		c()
	}

799 800
	tryClose(b.tracerCloser, "tracer")
	tryClose(b.tagsCloser, "tag persistence")
801
	tryClose(b.topologyCloser, "topology driver")
802 803 804 805
	tryClose(b.stateStoreCloser, "statestore")
	tryClose(b.localstoreCloser, "localstore")
	tryClose(b.errorLogWriter, "error log writer")
	tryClose(b.resolverCloser, "resolver service")
806

807
	return mErr
Janos Guljas's avatar
Janos Guljas committed
808
}
809

810 811 812 813
func getTxHash(stateStore storage.StateStorer, logger logging.Logger, o Options) ([]byte, error) {
	if o.Standalone {
		return nil, nil // in standalone mode tx hash is not used
	}
814 815 816 817 818 819 820 821 822 823 824 825

	if o.Transaction != "" {
		txHashTrimmed := strings.TrimPrefix(o.Transaction, "0x")
		if len(txHashTrimmed) != 64 {
			return nil, errors.New("invalid length")
		}
		txHash, err := hex.DecodeString(txHashTrimmed)
		if err != nil {
			return nil, err
		}
		logger.Infof("using the provided transaction hash %x", txHash)
		return txHash, nil
826 827 828 829 830
	}

	var txHash common.Hash
	key := chequebook.ChequebookDeploymentKey
	if err := stateStore.Get(key, &txHash); err != nil {
831 832 833
		if errors.Is(err, storage.ErrNotFound) {
			return nil, errors.New("chequebook deployment transaction hash not found. Please specify the transaction hash manually.")
		}
834 835 836
		return nil, err
	}

837
	logger.Infof("using the chequebook transaction hash %x", txHash)
838 839
	return txHash.Bytes(), nil
}
840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862

// pidKiller lets sub modules force a shut down of the whole node process.
//
// Calling the node's Shutdown method alone is not enough: it stops the node
// but does not exit the start process, which keeps waiting on os.Signals.
// This is not desirable, but currently the bee node cannot handle
// rate-limited blockchain API calls properly, so the node is shut down in
// that case to let the user rectify the API issues (by adjusting limits or
// using a different endpoint). Unfortunately there is no platform agnostic
// way to trigger os.Signals in Go, which is why the process.Kill approach
// is used; it works on Windows as well.
type pidKiller struct {
	node *Bee
}

// Shutdown shuts the wrapped node down and then kills the current process.
// If the node's Shutdown or the process lookup fails, that error is
// returned and the process is left running.
func (p *pidKiller) Shutdown(ctx context.Context) error {
	if err := p.node.Shutdown(ctx); err != nil {
		return err
	}

	self, err := os.FindProcess(syscall.Getpid())
	if err != nil {
		return err
	}
	return self.Kill()
}