Commit 77158cb6 authored by jianhua.zhang's avatar jianhua.zhang

抓取合约事件功能模块开发

parent ad2abe74
package com.wuban.tron.explore.constant;
import okhttp3.MediaType;
import org.springframework.beans.factory.annotation.Value;
/**
* <core>常量定义类</core>
......@@ -16,11 +15,6 @@ public class Constant {
*/
public static final int SUCCESS_CODE = 200;
/**
* 波场API地址
*/
public static final String HOST = "https://api.shasta.trongrid.io";
/**
* 请求数据类型:JSON
*/
......@@ -47,10 +41,14 @@ public class Constant {
*/
public static final String ADDRESS_KEY = "address";
/**
* 合约地址:根据此地址抓取合约事件
*/
public static final String CONTRACT_ADDRESS_KEY = "contract_address";
/**
* 数据统计:根据此数据(账户数量、交易数量、区块大小)更新DB统计信息
*/
public static final String CENSUS_ADDRESS = "census_address";
public static final String CENSUS_TRANS = "census_trans";
public static final String CENSUS_BLOCK_SIZE = "census_block_size";
......@@ -69,8 +67,6 @@ public class Constant {
*/
public static final int TX_ID_LEN = 64;
public static final String EXCUTOR_NAME_ACCOUNT = "抓取余额Pool";
public static final String EXCUTOR_NAME_BLOCK = "抓取区块Pool";
}
package com.wuban.tron.explore.controller;
import com.wuban.tron.explore.fetch.PersistThreadPoolV2;
import com.wuban.tron.explore.param.request.CensusRequest;
import com.wuban.tron.explore.service.TransactionService;
import lombok.RequiredArgsConstructor;
......@@ -29,11 +30,11 @@ public class CensusController {
private final TransactionService transactionService;
private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
private final PersistThreadPoolV2 pool = PersistThreadPoolV2.getInstance();
@RequestMapping(method = RequestMethod.POST)
public void census(@RequestBody @Valid CensusRequest reqParam) {
this.pool.execute(() -> this.transactionService.censusBlockByDate(reqParam.getStartDate(), reqParam.getEndDate()));
this.pool.getPool().execute(() -> this.transactionService.censusBlockByDate(reqParam.getStartDate(), reqParam.getEndDate()));
}
}
package com.wuban.tron.explore.controller;
import com.alibaba.fastjson.JSON;
import com.wuban.tron.explore.domain.TronTransEvent;
import com.wuban.tron.explore.domain.TronTransEventResult;
import com.wuban.tron.explore.domain.TronTransEventResultType;
import com.wuban.tron.explore.entity.Contract;
import com.wuban.tron.explore.entity.ContractEvent;
import com.wuban.tron.explore.entity.SolidityVersion;
import com.wuban.tron.explore.entity.example.ContractEventExample;
import com.wuban.tron.explore.entity.example.ContractExample;
import com.wuban.tron.explore.param.request.ContractCompilerRequest;
import com.wuban.tron.explore.param.request.ContractRequest;
import com.wuban.tron.explore.param.response.ContractModel;
import com.wuban.tron.explore.service.ContractCompilerService;
import com.wuban.tron.explore.service.ContractService;
import com.wuban.tron.explore.service.SolidityVersionService;
import com.wuban.tron.explore.service.TronService;
import com.wuban.tron.explore.service.*;
import com.wuban.tron.explore.util.ApiResponse;
import com.wuban.tron.explore.util.BizExceptionEnum;
import com.wuban.tron.explore.util.ResponseKit;
......@@ -17,12 +20,14 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import org.tron.common.utils.ByteArray;
import org.tron.walletserver.WalletApi;
import javax.validation.Valid;
import java.util.ArrayList;
import java.util.List;
/**
......@@ -45,6 +50,8 @@ public class ContractController {
private final SolidityVersionService solidityVersionService;
private final ContractEventService contractEventService;
@RequestMapping(value="getVersion", method = RequestMethod.GET)
public ApiResponse getVersion() {
List<SolidityVersion> list = this.solidityVersionService.selectByExample(null);
......@@ -144,8 +151,39 @@ public class ContractController {
return ResponseKit.success(code);
}
@RequestMapping(value="getEvent/{address}", method = RequestMethod.GET)
public ApiResponse getEvent(@PathVariable("address") String address) {
ContractEventExample example = new ContractEventExample();
example.createCriteria().andContractAddressEqualTo(address);
List<ContractEvent> list = this.contractEventService.selectByExample(example);
List<TronTransEvent> retList = new ArrayList<>();
if (!CollectionUtils.isEmpty(list)) {
list.forEach(o -> {
TronTransEvent event = new TronTransEvent();
event.setBlock_number(o.getBlockNumber());
event.setBlock_timestamp(o.getBlockTimestamp());
event.setCaller_contract_address(o.getCallerContractAddress());
event.setEvent_index(o.getEventIndex());
event.setEvent_name(o.getEventName());
event.setContract_address(o.getContractAddress());
event.setTransaction_id(o.getTransactionId());
if (!StringUtils.isEmpty(o.getResult())) {
TronTransEventResult result = JSON.parseObject(o.getResult(), TronTransEventResult.class);
event.setResult(result);
}
if(!StringUtils.isEmpty(o.getResultType())) {
TronTransEventResultType resultType = JSON.parseObject(o.getResultType(), TronTransEventResultType.class);
event.setResult_type(resultType);
}
retList.add(event);
});
}
return ResponseKit.success(retList);
}
......
......@@ -45,4 +45,12 @@ public interface ContractEventRepository {
*/
void deleteByExample(@Param("example") ContractEventExample example);
/**
* 根据检索条件获取数据
*
* @param example
* @return
*/
List<ContractEvent> selectByExample(@Param("example") ContractEventExample example);
}
package com.wuban.tron.explore.fetch;
import com.wuban.tron.explore.constant.Constant;
import com.wuban.tron.explore.domain.TronTransEvent;
import com.wuban.tron.explore.handler.AddressBalanceHandler;
import com.wuban.tron.explore.handler.ContractEventHandler;
import com.wuban.tron.explore.handler.IAddressBalanceHandler;
import com.wuban.tron.explore.handler.IContractEventHandler;
import com.wuban.tron.explore.service.TronService;
import com.wuban.tron.explore.util.SpringContextUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* <core>合约事件抓取者服务接口方法实现类</core>
*
* @author sky
* @date 2020/12/4
*/
@Slf4j
public class ContractEventFetcher<T> extends AbstractJob implements IContractEventFetcher {
private TronService tronService;
private IContractEventHandler handler;
private StringRedisTemplate stringRedisTemplate;
ContractEventFetcher(final ContractEventHandler handler) {
this.handler = handler;
tronService = SpringContextUtil.getBean(TronService.class);
stringRedisTemplate = SpringContextUtil.getBean(StringRedisTemplate.class);
}
@Override
public List fetch() {
List<TronTransEvent> dataList = new ArrayList<>();
try {
// Redis 获取合约地址
String address = this.stringRedisTemplate.opsForSet().pop(Constant.CONTRACT_ADDRESS_KEY);
if (!StringUtils.isEmpty(address)) {
List<TronTransEvent> data = tronService.getContractEvent(address);
if (data != null) {
dataList.addAll(data);
}
}
} catch (Exception e) {
log.info(e.getMessage(), e);
}
return dataList;
}
@Override
public void send(List list) throws InterruptedException {
if (CollectionUtils.isEmpty(list)) {
return;
}
this.handler.receive(list);
}
@Override
public boolean execute() {
try {
final List<TronTransEvent> list = fetch();
if (CollectionUtils.isEmpty(list)) {
return false;
}
send(list);
return true;
} catch (final Exception e) {
log.error("fetch exception", e);
return false;
}
}
}
package com.wuban.tron.explore.fetch;
import com.wuban.tron.explore.domain.TronTransEvent;
import com.wuban.tron.explore.entity.Census;
import com.wuban.tron.explore.handler.AddressBalanceHandler;
import com.wuban.tron.explore.handler.BlockDataHandler;
import com.wuban.tron.explore.domain.TronAccount;
import com.wuban.tron.explore.domain.TronResponseData;
import com.wuban.tron.explore.handler.CensusDataHandler;
import com.wuban.tron.explore.handler.ContractEventHandler;
import lombok.extern.slf4j.Slf4j;
/**
......@@ -57,9 +59,19 @@ public class Engine {
*/
private CensusDataHandler censusDataHandler;
public static final int BLOCK_FETCHER_NUM = 3;
/**
* 合约事件猎手
*/
private ContractEventFetcher<TronTransEvent> contractEventFetcher;
/**
* 合约事件处理者
*/
private ContractEventHandler contractEventHandler;
public static final int BLOCK_FETCHER_NUM = 2;
public static final int BALANCE_FETCHER_NUM = 3;
public static final int BALANCE_FETCHER_NUM = 2;
public Engine(String name) {
this.executor = new Executor(name);
......@@ -111,6 +123,14 @@ public class Engine {
this.censusDataFetcher = new CensusDataFetcher<>(this.censusDataHandler);
this.executor.execute(this.censusDataFetcher);
this.executor.execute(this.censusDataHandler);
/*
合约事件数据抓取、处理
*/
this.contractEventHandler = new ContractEventHandler();
this.contractEventFetcher = new ContractEventFetcher<>(this.contractEventHandler);
this.executor.execute(this.contractEventFetcher);
this.executor.execute(this.contractEventHandler);
}
......
package com.wuban.tron.explore.fetch;
import java.util.List;
/**
* <core>合约事件数据抓取者服务接口方法</core>
*
* @author sky
* @date 2020/12/4
*/
public interface IContractEventFetcher<T> {
/**
* 抓取数据
*
* @return
*/
List<T> fetch();
/**
* 发送数据
*
* @param list
* @throws InterruptedException
*/
void send(final List<T> list) throws InterruptedException;
}
......@@ -5,6 +5,7 @@ import com.wuban.tron.explore.domain.TronFreeze;
import com.wuban.tron.explore.entity.Address;
import com.wuban.tron.explore.fetch.AbstractJob;
import com.wuban.tron.explore.fetch.PersistThreadPool;
import com.wuban.tron.explore.fetch.PersistThreadPoolV2;
import com.wuban.tron.explore.service.AddressService;
import com.wuban.tron.explore.util.BigDecimalUtil;
import com.wuban.tron.explore.util.SpringContextUtil;
......@@ -28,14 +29,14 @@ public class AddressBalanceHandler extends AbstractJob implements IAddressBalanc
private AddressService addressService;
private PersistThreadPool threadPool;
private PersistThreadPoolV2 threadPool;
private final LinkedBlockingQueue<List<TronAccount>> dataList;
public AddressBalanceHandler() {
this.dataList = new LinkedBlockingQueue<>();
addressService = SpringContextUtil.getBean(AddressService.class);
threadPool = new PersistThreadPool();
threadPool = PersistThreadPoolV2.getInstance();
}
@Override
......
......@@ -3,6 +3,7 @@ package com.wuban.tron.explore.handler;
import com.wuban.tron.explore.domain.TronResponseData;
import com.wuban.tron.explore.fetch.AbstractJob;
import com.wuban.tron.explore.fetch.PersistThreadPool;
import com.wuban.tron.explore.fetch.PersistThreadPoolV2;
import com.wuban.tron.explore.service.TransactionService;
import com.wuban.tron.explore.util.SpringContextUtil;
import lombok.extern.slf4j.Slf4j;
......@@ -22,14 +23,14 @@ public class BlockDataHandler extends AbstractJob implements IBlockDataHandler {
private TransactionService transactionService;
private PersistThreadPool threadPool;
private PersistThreadPoolV2 threadPool;
private final LinkedBlockingQueue<List<TronResponseData>> dataList;
public BlockDataHandler() {
this.dataList = new LinkedBlockingQueue<>();
transactionService = SpringContextUtil.getBean(TransactionService.class);
threadPool = new PersistThreadPool();
threadPool = PersistThreadPoolV2.getInstance();
}
@Override
......
......@@ -3,6 +3,7 @@ package com.wuban.tron.explore.handler;
import com.wuban.tron.explore.entity.Census;
import com.wuban.tron.explore.fetch.AbstractJob;
import com.wuban.tron.explore.fetch.PersistThreadPool;
import com.wuban.tron.explore.fetch.PersistThreadPoolV2;
import com.wuban.tron.explore.service.CensusService;
import com.wuban.tron.explore.util.SpringContextUtil;
import lombok.extern.slf4j.Slf4j;
......@@ -22,14 +23,14 @@ public class CensusDataHandler extends AbstractJob implements ICensusDataHandler
private CensusService service;
private PersistThreadPool threadPool;
private PersistThreadPoolV2 threadPool;
private final LinkedBlockingQueue<List<Census>> dataList;
public CensusDataHandler() {
this.dataList = new LinkedBlockingQueue<>();
service = SpringContextUtil.getBean(CensusService.class);
threadPool = new PersistThreadPool();
threadPool = PersistThreadPoolV2.getInstance();
}
@Override
......
package com.wuban.tron.explore.handler;
import com.wuban.tron.explore.domain.TronTransEvent;
import com.wuban.tron.explore.fetch.AbstractJob;
import com.wuban.tron.explore.fetch.PersistThreadPool;
import com.wuban.tron.explore.fetch.PersistThreadPoolV2;
import com.wuban.tron.explore.service.ContractEventService;
import com.wuban.tron.explore.util.SpringContextUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
/**
* <core>合约事件处理者服务接口方法实现类</core>
*
* @author sky
* @date 2020/12/4
*/
@Slf4j
public class ContractEventHandler extends AbstractJob implements IContractEventHandler<TronTransEvent> {
private ContractEventService contractEventService;
private PersistThreadPoolV2 threadPool;
private final LinkedBlockingQueue<List<TronTransEvent>> dataList;
public ContractEventHandler() {
this.dataList = new LinkedBlockingQueue<>();
contractEventService = SpringContextUtil.getBean(ContractEventService.class);
threadPool = PersistThreadPoolV2.getInstance();
}
@Override
public boolean execute() {
final List<TronTransEvent> list = this.dataList.poll();
if(list != null && list.size() != 0) {
threadPool.getPool().execute(() -> flush(list));
}
return true;
}
@Override
public void receive(List<TronTransEvent> e) throws InterruptedException {
if (!CollectionUtils.isEmpty(e)) {
this.dataList.put(e);
}
}
@Override
public void flush(List<TronTransEvent> e) {
if (!CollectionUtils.isEmpty(e)) {
this.contractEventService.batchInsert(e);
}
}
}
package com.wuban.tron.explore.handler;
import java.util.List;
/**
* <core>合约事件处理者服务接口方法</core>
*
* @author sky
* @date 2020/12/4
*/
public interface IContractEventHandler<T> {
/**
* 接收合约事件
*
* @param e 区块数据
* @throws InterruptedException 异常
*/
void receive(List<T> e) throws InterruptedException;
/**
* 数据刷入DB
*
* @param e 合约事件数据
*/
void flush(List<T> e);
}
......@@ -2,6 +2,7 @@ package com.wuban.tron.explore.service;
import com.wuban.tron.explore.domain.TronTransEvent;
import com.wuban.tron.explore.entity.ContractEvent;
import com.wuban.tron.explore.entity.example.ContractEventExample;
import java.util.List;
......@@ -28,4 +29,12 @@ public interface ContractEventService {
*/
int batchInsert(List<TronTransEvent> list);
/**
* 根据检索条件获取数据
*
* @param example
* @return
*/
List<ContractEvent> selectByExample(ContractEventExample example);
}
......@@ -70,5 +70,16 @@ public class ContractEventServiceImpl implements ContractEventService {
return this.contractEventRepository.batchInsert(list);
}
/**
* 根据检索条件获取数据
*
* @param example
* @return
*/
@Override
public List<ContractEvent> selectByExample(ContractEventExample example) {
return this.contractEventRepository.selectByExample(example);
}
}
......@@ -95,46 +95,22 @@ public class TransactionServiceImpl implements TransactionService {
headerList.add(header);
});
Integer censusTransNumber = 0;
String censusBlockSize = headerList.get(0).getBlockBytes();
/*
区块头、区块交易、hex持久化
*/
this.blockHeaderRepository.batchInsert(headerList);
Set<String> hexSet = new HashSet<>();
Set<String> base58Set = new HashSet<>();
Integer censusTransNumber = 0;
if (transactionList.size() != 0) {
censusTransNumber = transactionList.size();
transactionList.forEach(o -> {
// hex 数据转换成base58
String ownerAddress;
String toAddress;
String contractAddress;
if (!StringUtils.isEmpty(o.getOwnerAddress())) {
hexSet.add(o.getOwnerAddress());
ownerAddress = WalletApi.encode58Check(ByteArray.fromHexString(o.getOwnerAddress()));
o.setOwnerAddress(ownerAddress);
base58Set.add(ownerAddress);
}
if (!StringUtils.isEmpty(o.getToAddress())) {
hexSet.add(o.getToAddress());
toAddress = WalletApi.encode58Check(ByteArray.fromHexString(o.getToAddress()));
o.setToAddress(toAddress);
base58Set.add(toAddress);
}
if (!StringUtils.isEmpty(o.getContractAddress())) {
hexSet.add(o.getContractAddress());
contractAddress = WalletApi.encode58Check(ByteArray.fromHexString(o.getContractAddress()));
o.setContractAddress(contractAddress);
base58Set.add(contractAddress);
}
});
this.transactionRepository.batchInsert(transactionList);
this.transactionHexRepository.batchInsert(hexList);
}
Map<String, Set<String>> map = getAddress(transactionList);
Set<String> hexSet = map.get("hexSet");
Set<String> base58Set = map.get("base58Set");
Set<String> conAddrSet = map.get("conAddrSet");
// 更新区块高度
LastBlock lastBlock = new LastBlock();
......@@ -147,8 +123,9 @@ public class TransactionServiceImpl implements TransactionService {
List<Address> records = transferAddress(base58Set);
if (!CollectionUtils.isEmpty(records)) {
this.addressRepository.batchInsertOnDuplicateKey(records);
addressPushRedis(hexSet);
}
addressPushRedis(hexSet);
conAddrPushRedis(conAddrSet);
// 数据统计
this.censusFlushRedis(censusBlockSize, censusTransNumber);
......@@ -156,11 +133,80 @@ public class TransactionServiceImpl implements TransactionService {
}
/**
* 获取账户、合约地址
* hexSet:hex地址
* base58Set:base58地址
* conAddrSet:合约地址
* @param tranList 交易列表
* @return
*/
private Map<String, Set<String>> getAddress(List<Transaction> tranList) {
Map<String, Set<String>> map = new HashMap<>(3);
Set<String> hexSet = new HashSet<>();
Set<String> base58Set = new HashSet<>();
Set<String> conAddrSet = new HashSet<>();
tranList.forEach(o -> {
// hex 数据转换成base58
String ownerAddress;
String toAddress;
String contractAddress;
if (!StringUtils.isEmpty(o.getOwnerAddress())) {
hexSet.add(o.getOwnerAddress());
ownerAddress = WalletApi.encode58Check(ByteArray.fromHexString(o.getOwnerAddress()));
o.setOwnerAddress(ownerAddress);
base58Set.add(ownerAddress);
}
if (!StringUtils.isEmpty(o.getToAddress())) {
hexSet.add(o.getToAddress());
toAddress = WalletApi.encode58Check(ByteArray.fromHexString(o.getToAddress()));
o.setToAddress(toAddress);
base58Set.add(toAddress);
}
if (!StringUtils.isEmpty(o.getContractAddress())) {
hexSet.add(o.getContractAddress());
contractAddress = WalletApi.encode58Check(ByteArray.fromHexString(o.getContractAddress()));
o.setContractAddress(contractAddress);
base58Set.add(contractAddress);
conAddrSet.add(contractAddress);
}
});
map.put("hexSet", hexSet);
map.put("base58Set", base58Set);
map.put("conAddrSet", conAddrSet);
return map;
}
/**
* 合约地址同步到redis
* @param set
*/
private void conAddrPushRedis(Set<String> set) {
if (CollectionUtils.isEmpty(set)) {
return;
}
stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
StringRedisConnection conn = (StringRedisConnection)redisConnection;
set.forEach(o -> conn.sAdd(Constant.CONTRACT_ADDRESS_KEY, o));
return null;
}
});
}
/**
* 账户地址同步到redis
* @param set
*/
private void addressPushRedis(Set<String> set) {
if (CollectionUtils.isEmpty(set)) {
return;
}
stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
......
......@@ -91,9 +91,9 @@ public class TronServiceImpl extends BaseCommonService implements TronService {
return null;
}
System.out.println("处理前:》》》》》"+str);
//System.out.println("处理前:》》》》》"+str);
String ret = clearData(str);
System.out.println("处理后:》》》》》"+ret);
//System.out.println("处理后:》》》》》"+ret);
return JSONArray.parseArray(ret, TronTransEvent.class);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment