Commit 367fbe7b authored by jianhua.zhang's avatar jianhua.zhang

区块数据、账户余额抓取数据失败后,将数据重新push到队列中

parent 6699e8fb
...@@ -65,9 +65,17 @@ public class Bootstrapper { ...@@ -65,9 +65,17 @@ public class Bootstrapper {
public synchronized void start() { public synchronized void start() {
this.init(); this.init();
this.startEngine(); this.startEngine();
// 区块同步到redis
this.executorService.scheduleWithFixedDelay(() -> this.lastBlockService.sync(), 0, 1, TimeUnit.SECONDS); this.executorService.scheduleWithFixedDelay(() -> this.lastBlockService.sync(), 0, 1, TimeUnit.SECONDS);
// 账户总数统计
this.executorService.scheduleWithFixedDelay(() -> this.censusService.updateTotalAccount(), 0, 5, TimeUnit.SECONDS); this.executorService.scheduleWithFixedDelay(() -> this.censusService.updateTotalAccount(), 0, 5, TimeUnit.SECONDS);
this.executorService.scheduleWithFixedDelay(() -> this.lastBlockService.refresh(), 0, 500, TimeUnit.MILLISECONDS);
// 抓取最新区块高度
this.executorService.scheduleWithFixedDelay(() -> this.lastBlockService.refresh(), 0, 1, TimeUnit.SECONDS);
// 统计前一天区块、交易数据
this.executorService.scheduleWithFixedDelay(() -> this.transactionService.censusBlockByLastDay(), getInitialDelay(), 24 * 60 * 60, TimeUnit.SECONDS); this.executorService.scheduleWithFixedDelay(() -> this.transactionService.censusBlockByLastDay(), getInitialDelay(), 24 * 60 * 60, TimeUnit.SECONDS);
} }
......
...@@ -30,7 +30,7 @@ import java.util.Date; ...@@ -30,7 +30,7 @@ import java.util.Date;
import java.util.List; import java.util.List;
/** /**
* <core>API接口</core> * <core>波场浏览器API接口</core>
* *
* @author sky * @author sky
* @date 2020/11/02 * @date 2020/11/02
...@@ -122,11 +122,12 @@ public class TransactionControllerV1 { ...@@ -122,11 +122,12 @@ public class TransactionControllerV1 {
*/ */
if (condition.length()== Constant.TX_ID_LEN) { if (condition.length()== Constant.TX_ID_LEN) {
ResDataModel<AddessInfoModel> resDataModel = new ResDataModel<>(); ResDataModel<AddessInfoModel> resDataModel = new ResDataModel<>();
TransInfoModel model = TransInfoModel.getInstance();
TransactionExample example = new TransactionExample(); TransactionExample example = new TransactionExample();
example.createCriteria().andTxIdEqualTo(condition); example.createCriteria().andTxIdEqualTo(condition);
List<Transaction> txList = this.transactionService.getByExample(example); List<Transaction> txList = this.transactionService.getByExample(example);
if (!CollectionUtils.isEmpty(txList)) { List<TransInfoModel> modelList = transferTransInfoModel(txList);
TransInfoModel model = modelList.get(0);
/*if (!CollectionUtils.isEmpty(txList)) {
Transaction trans = txList.get(0); Transaction trans = txList.get(0);
model.setHash(trans.getTxId()); model.setHash(trans.getTxId());
model.setBlockHash(trans.getBlockId()); model.setBlockHash(trans.getBlockId());
...@@ -142,7 +143,7 @@ public class TransactionControllerV1 { ...@@ -142,7 +143,7 @@ public class TransactionControllerV1 {
Long tt = trans.getTimestamp()/1000; Long tt = trans.getTimestamp()/1000;
model.setTimestamp(tt.toString()); model.setTimestamp(tt.toString());
} }
} }*/
resDataModel.setData(model); resDataModel.setData(model);
resDataModel.setT(HomeSearchTypeEnum.TRANSACTION_INFO.getCode()); resDataModel.setT(HomeSearchTypeEnum.TRANSACTION_INFO.getCode());
return ResponseKit.success(resDataModel); return ResponseKit.success(resDataModel);
...@@ -239,7 +240,7 @@ public class TransactionControllerV1 { ...@@ -239,7 +240,7 @@ public class TransactionControllerV1 {
public ApiResponse lastTransList() { public ApiResponse lastTransList() {
PageInfo<Transaction> pageInfo = this.transactionService.getByPageWithCategory(null, null, new TransactionExample()); PageInfo<Transaction> pageInfo = this.transactionService.getByPageWithCategory(null, null, new TransactionExample());
List<TransInfoModel> modelList = transferTransInfoModel(pageInfo); List<TransInfoModel> modelList = transferTransInfoModel(pageInfo.getList());
return ResponseKit.success(modelList); return ResponseKit.success(modelList);
} }
...@@ -253,7 +254,7 @@ public class TransactionControllerV1 { ...@@ -253,7 +254,7 @@ public class TransactionControllerV1 {
public ApiResponse allTransList(@PathVariable("pageNo") Integer pageNo, @PathVariable("pageSize") Integer pageSize) { public ApiResponse allTransList(@PathVariable("pageNo") Integer pageNo, @PathVariable("pageSize") Integer pageSize) {
PageInfo<Transaction> pageInfo = this.transactionService.getByPageWithCategory(pageNo, pageSize, new TransactionExample()); PageInfo<Transaction> pageInfo = this.transactionService.getByPageWithCategory(pageNo, pageSize, new TransactionExample());
List<TransInfoModel> modelList = transferTransInfoModel(pageInfo); List<TransInfoModel> modelList = transferTransInfoModel(pageInfo.getList());
ResDataModel<TransInfoModel> resDataModel = new ResDataModel<>(); ResDataModel<TransInfoModel> resDataModel = new ResDataModel<>();
resDataModel.setTotal(Integer.valueOf(pageInfo.getTotal()+"")); resDataModel.setTotal(Integer.valueOf(pageInfo.getTotal()+""));
resDataModel.setData(modelList); resDataModel.setData(modelList);
...@@ -261,11 +262,10 @@ public class TransactionControllerV1 { ...@@ -261,11 +262,10 @@ public class TransactionControllerV1 {
return ResponseKit.success(resDataModel); return ResponseKit.success(resDataModel);
} }
private List<TransInfoModel> transferTransInfoModel(PageInfo<Transaction> pageInfo) { private List<TransInfoModel> transferTransInfoModel(List<Transaction> txList) {
List<TransInfoModel> modelList = new ArrayList<>(); List<TransInfoModel> modelList = new ArrayList<>();
List<Long> numberList = new ArrayList<>(); if (!CollectionUtils.isEmpty(txList)) {
if (!CollectionUtils.isEmpty(pageInfo.getList())) { txList.forEach(o -> {
pageInfo.getList().forEach(o -> {
TransInfoModel model = TransInfoModel.getInstance(); TransInfoModel model = TransInfoModel.getInstance();
model.setHash(o.getTxId()); model.setHash(o.getTxId());
model.setBlockHash(o.getTxId()); model.setBlockHash(o.getTxId());
...@@ -285,7 +285,6 @@ public class TransactionControllerV1 { ...@@ -285,7 +285,6 @@ public class TransactionControllerV1 {
model.setTimestamp(tt.toString()); model.setTimestamp(tt.toString());
} }
modelList.add(model); modelList.add(model);
numberList.add(o.getNumber());
}); });
} }
...@@ -328,7 +327,7 @@ public class TransactionControllerV1 { ...@@ -328,7 +327,7 @@ public class TransactionControllerV1 {
example.createCriteria().andNumberEqualTo(Long.valueOf(block)); example.createCriteria().andNumberEqualTo(Long.valueOf(block));
PageInfo<Transaction> pageInfo = this.transactionService.getByPageWithCategory(pageNo, pageSize, example); PageInfo<Transaction> pageInfo = this.transactionService.getByPageWithCategory(pageNo, pageSize, example);
List<TransInfoModel> modelList = transferTransInfoModel(pageInfo); List<TransInfoModel> modelList = transferTransInfoModel(pageInfo.getList());
ResDataModel<TransInfoModel> resDataModel = new ResDataModel<>(); ResDataModel<TransInfoModel> resDataModel = new ResDataModel<>();
resDataModel.setTotal(Integer.valueOf(pageInfo.getTotal()+"")); resDataModel.setTotal(Integer.valueOf(pageInfo.getTotal()+""));
resDataModel.setData(modelList); resDataModel.setData(modelList);
...@@ -399,7 +398,7 @@ public class TransactionControllerV1 { ...@@ -399,7 +398,7 @@ public class TransactionControllerV1 {
PageInfo<Transaction> pageInfo = this.transactionService.selectPageByAddress(address, 1, 25); PageInfo<Transaction> pageInfo = this.transactionService.selectPageByAddress(address, 1, 25);
List<TransInfoModel> modelList = new ArrayList<>(); List<TransInfoModel> modelList = new ArrayList<>();
if (!CollectionUtils.isEmpty(pageInfo.getList())) { if (!CollectionUtils.isEmpty(pageInfo.getList())) {
modelList = transferTransInfoModel(pageInfo); modelList = transferTransInfoModel(pageInfo.getList());
} }
return ResponseKit.success(modelList); return ResponseKit.success(modelList);
......
...@@ -104,6 +104,9 @@ public class Engine { ...@@ -104,6 +104,9 @@ public class Engine {
this.executor.execute(this.addressFetcher); this.executor.execute(this.addressFetcher);
} }
/*
统计数据抓取、处理
*/
this.censusDataHandler = new CensusDataHandler(); this.censusDataHandler = new CensusDataHandler();
this.censusDataFetcher = new CensusDataFetcher<>(this.censusDataHandler); this.censusDataFetcher = new CensusDataFetcher<>(this.censusDataHandler);
this.executor.execute(this.censusDataFetcher); this.executor.execute(this.censusDataFetcher);
......
...@@ -40,7 +40,8 @@ public interface LastBlockService { ...@@ -40,7 +40,8 @@ public interface LastBlockService {
void refresh(); void refresh();
/** /**
* 根据DB cur_block_num 同步固定批次的区块号 * 同步固定数量的区块号到Redis
* 如果Redis上没有缓存过区块号,从DB中获取,后续直接操作Redis
*/ */
void sync(); void sync();
......
...@@ -95,9 +95,7 @@ public class LastBlockServiceImpl implements LastBlockService { ...@@ -95,9 +95,7 @@ public class LastBlockServiceImpl implements LastBlockService {
} }
/**
* 根据DB cur_block_num 同步固定批次的区块号
*/
@Override @Override
public void sync() { public void sync() {
LastBlock lastBlock = this.lastBlockRepository.selectOneByExample(null); LastBlock lastBlock = this.lastBlockRepository.selectOneByExample(null);
...@@ -105,13 +103,16 @@ public class LastBlockServiceImpl implements LastBlockService { ...@@ -105,13 +103,16 @@ public class LastBlockServiceImpl implements LastBlockService {
return; return;
} }
/*
如果redis未缓存blockNum,从DB中获取
*/
BigDecimal bg; BigDecimal bg;
BigDecimal diff; BigDecimal diff;
long startNum; long startNum;
long endNum; long endNum;
/*
Redis获取已同步区块高度block_num
如果已存在,根据此值计算:(最新区块高度-Redis:block_num)将区间值push到队列中,已被后续抓取数据做准备
如果不存在,从DB中提取已抓取的数据值(最新区块高度-DB:block_num)将区间值push到队列中,已被后续抓取数据做准备
*/
String redisBlockNum = this.stringRedisTemplate.opsForValue().get(Constant.BLOCK_NUM_KEY); String redisBlockNum = this.stringRedisTemplate.opsForValue().get(Constant.BLOCK_NUM_KEY);
if (StringUtils.isEmpty(redisBlockNum)) { if (StringUtils.isEmpty(redisBlockNum)) {
startNum = lastBlock.getCurBlockNum(); startNum = lastBlock.getCurBlockNum();
...@@ -144,7 +145,7 @@ public class LastBlockServiceImpl implements LastBlockService { ...@@ -144,7 +145,7 @@ public class LastBlockServiceImpl implements LastBlockService {
this.stringRedisTemplate.opsForValue().set(Constant.BLOCK_NUM_KEY, lastBlock.getLastBlockNum().toString()); this.stringRedisTemplate.opsForValue().set(Constant.BLOCK_NUM_KEY, lastBlock.getLastBlockNum().toString());
} }
// 将区块高度同步到redis list // 将区块高度同步到redis 队列
this.stringRedisTemplate.executePipelined(new RedisCallback<Object>() { this.stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
@Override @Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException { public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
......
...@@ -67,6 +67,8 @@ public class TransactionServiceImpl implements TransactionService { ...@@ -67,6 +67,8 @@ public class TransactionServiceImpl implements TransactionService {
dataList.forEach(o -> { dataList.forEach(o -> {
TronBlockHeader tronBlockHeader = o.getBlock_header(); TronBlockHeader tronBlockHeader = o.getBlock_header();
// 区块头设置
BlockHeader header = transferBlockHeader(o.getBlockID(), tronBlockHeader); BlockHeader header = transferBlockHeader(o.getBlockID(), tronBlockHeader);
List<Transactions> trList = o.getTransactions(); List<Transactions> trList = o.getTransactions();
if (CollectionUtils.isEmpty(trList)) { if (CollectionUtils.isEmpty(trList)) {
...@@ -105,6 +107,7 @@ public class TransactionServiceImpl implements TransactionService { ...@@ -105,6 +107,7 @@ public class TransactionServiceImpl implements TransactionService {
if (transactionList.size() != 0) { if (transactionList.size() != 0) {
censusTransNumber = transactionList.size(); censusTransNumber = transactionList.size();
transactionList.forEach(o -> { transactionList.forEach(o -> {
// hex 数据转换成base58 // hex 数据转换成base58
String ownerAddress; String ownerAddress;
String toAddress; String toAddress;
...@@ -140,6 +143,7 @@ public class TransactionServiceImpl implements TransactionService { ...@@ -140,6 +143,7 @@ public class TransactionServiceImpl implements TransactionService {
this.lastBlockRepository.updateCurBlockNumById(lastBlock); this.lastBlockRepository.updateCurBlockNumById(lastBlock);
log.info("已同步数据区块高度num:{}",headerList.get(0).getNumber()); log.info("已同步数据区块高度num:{}",headerList.get(0).getNumber());
// 账户地址push到Redis,未后续抓取账户余额做准备
List<Address> records = transferAddress(base58Set); List<Address> records = transferAddress(base58Set);
if (!CollectionUtils.isEmpty(records)) { if (!CollectionUtils.isEmpty(records)) {
this.addressRepository.batchInsertOnDuplicateKey(records); this.addressRepository.batchInsertOnDuplicateKey(records);
...@@ -263,8 +267,15 @@ public class TransactionServiceImpl implements TransactionService { ...@@ -263,8 +267,15 @@ public class TransactionServiceImpl implements TransactionService {
return hexList; return hexList;
} }
/**
* 区块大小、区块交易数量push到Redis,未后续统计做准备
*
* @param blockSize
* @param transNumber
*/
private void censusFlushRedis(String blockSize, Integer transNumber) { private void censusFlushRedis(String blockSize, Integer transNumber) {
if (!StringUtils.isEmpty(blockSize) && !blockSize.equals("0")) { String zero = "0";
if (!StringUtils.isEmpty(blockSize) && !blockSize.equals(zero)) {
this.stringRedisTemplate.opsForList().leftPush(Constant.CENSUS_BLOCK_SIZE, blockSize); this.stringRedisTemplate.opsForList().leftPush(Constant.CENSUS_BLOCK_SIZE, blockSize);
} }
......
...@@ -35,7 +35,7 @@ public class TronServiceImpl extends BaseCommonService implements TronService { ...@@ -35,7 +35,7 @@ public class TronServiceImpl extends BaseCommonService implements TronService {
String str = this.analysisBlock(GET_BLOCK_BYNUM, blockNum); String str = this.analysisBlock(GET_BLOCK_BYNUM, blockNum);
if (StringUtils.isEmpty(str)) { if (StringUtils.isEmpty(str)) {
stringRedisTemplate.opsForList().leftPush(Constant.BLOCK_NUM_LIST_KEY, blockNum.toString()); stringRedisTemplate.opsForList().leftPush(Constant.BLOCK_NUM_LIST_KEY, blockNum.toString());
log.warn("区块高度blockNum:{}抓取失败,重新放置到队列中.", blockNum); log.warn("区块高度blockNum:{}抓取失败,重新放置到队列{}中.", blockNum, Constant.BLOCK_NUM_LIST_KEY);
return null; return null;
} }
...@@ -75,7 +75,7 @@ public class TronServiceImpl extends BaseCommonService implements TronService { ...@@ -75,7 +75,7 @@ public class TronServiceImpl extends BaseCommonService implements TronService {
String str = this.execute(request); String str = this.execute(request);
if (StringUtils.isEmpty(str)) { if (StringUtils.isEmpty(str)) {
this.stringRedisTemplate.opsForSet().add(Constant.ADDRESS_KEY, address); this.stringRedisTemplate.opsForSet().add(Constant.ADDRESS_KEY, address);
log.warn("账户余额address:{}抓取失败,重新放置到队列中.", address); log.warn("账户余额address:{}抓取失败,重新放置到队列{}中.", address, Constant.ADDRESS_KEY);
return null; return null;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment