Commit acdf4a00 authored by wuxinyang's avatar wuxinyang

本次更新添加交易分层,交易并行池,以及process执行器

parent 363b6ec1
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="ChangeListManager"> <component name="ChangeListManager">
<list default="true" id="b5e0db37-7f3c-47e2-965e-a9d467d53f11" name="Default Changelist" comment=""> <list default="true" id="b5e0db37-7f3c-47e2-965e-a9d467d53f11" name="Default Changelist" comment="" />
<change afterPath="$PROJECT_DIR$/core/transcut/transactionMark.go" afterDir="false" />
<change afterPath="$PROJECT_DIR$/core/transcut/transactionMark_test.go" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" /> <option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" /> <option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" /> <option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" /> <option name="LAST_RESOLUTION" value="IGNORE" />
</component> </component>
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Go File" />
</list>
</option>
</component>
<component name="GOROOT" path="/usr/local/go" /> <component name="GOROOT" path="/usr/local/go" />
<component name="Git.Settings"> <component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" /> <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
...@@ -23,6 +27,7 @@ ...@@ -23,6 +27,7 @@
<option name="showLibraryContents" value="true" /> <option name="showLibraryContents" value="true" />
</component> </component>
<component name="PropertiesComponent"> <component name="PropertiesComponent">
<property name="DefaultGoTemplateProperty" value="Go File" />
<property name="RunOnceActivity.OpenProjectViewOnStart" value="true" /> <property name="RunOnceActivity.OpenProjectViewOnStart" value="true" />
<property name="WebServerToolWindowFactoryState" value="false" /> <property name="WebServerToolWindowFactoryState" value="false" />
<property name="go.import.settings.migrated" value="true" /> <property name="go.import.settings.migrated" value="true" />
...@@ -37,6 +42,7 @@ ...@@ -37,6 +42,7 @@
<property name="node.js.selected.package.tslint" value="(autodetect)" /> <property name="node.js.selected.package.tslint" value="(autodetect)" />
<property name="nodejs_interpreter_path.stuck_in_default_project" value="undefined stuck path" /> <property name="nodejs_interpreter_path.stuck_in_default_project" value="undefined stuck path" />
<property name="nodejs_npm_path_reset_for_default_project" value="true" /> <property name="nodejs_npm_path_reset_for_default_project" value="true" />
<property name="settings.editor.selected.configurable" value="preferences.lookFeel" />
</component> </component>
<component name="RecentsManager"> <component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS"> <key name="CopyFile.RECENT_KEYS">
...@@ -51,9 +57,13 @@ ...@@ -51,9 +57,13 @@
<integration-enabled>true</integration-enabled> <integration-enabled>true</integration-enabled>
</component> </component>
<component name="WindowStateProjectService"> <component name="WindowStateProjectService">
<state x="421" y="205" width="597" height="548" key="find.popup" timestamp="1665369622158"> <state x="229" y="86" key="SettingsEditor" timestamp="1669866464036">
<screen x="0" y="0" width="1440" height="900" />
</state>
<state x="229" y="86" key="SettingsEditor/0.0.1440.900@0.0.1440.900" timestamp="1669866464036" />
<state x="421" y="205" width="597" height="548" key="find.popup" timestamp="1669801139898">
<screen x="0" y="0" width="1440" height="900" /> <screen x="0" y="0" width="1440" height="900" />
</state> </state>
<state x="421" y="205" width="597" height="548" key="find.popup/0.0.1440.900@0.0.1440.900" timestamp="1665369622158" /> <state x="421" y="205" width="597" height="548" key="find.popup/0.0.1440.900@0.0.1440.900" timestamp="1669801139898" />
</component> </component>
</project> </project>
\ No newline at end of file
...@@ -74,6 +74,9 @@ func NewEVMTxContext(msg Message) vm.TxContext { ...@@ -74,6 +74,9 @@ func NewEVMTxContext(msg Message) vm.TxContext {
return vm.TxContext{ return vm.TxContext{
Origin: msg.From(), Origin: msg.From(),
GasPrice: new(big.Int).Set(msg.GasPrice()), GasPrice: new(big.Int).Set(msg.GasPrice()),
//added by echo
ReadSet: make(map[common.Hash][]byte),
WriteSet: make(map[common.Hash][]byte),
} }
} }
......
package executionpool
import (
"context"
"go-ethereum-advance/common"
"go-ethereum-advance/core"
"go-ethereum-advance/core/state"
"go-ethereum-advance/core/transcut"
"go-ethereum-advance/core/types"
"go-ethereum-advance/core/vm"
"go-ethereum-advance/params"
"log"
"math/big"
"sync"
"time"
)
//type Task func(msg types.Message, config *params.ChainConfig, bc core.ChainContext, author *common.Address, gp *core.GasPool, statedb *state.StateDB, blockNumber *big.Int, blockHash common.Hash, tx *types.Transaction, usedGas *uint64, evm *vm.EVM) (*types.Receipt, error)
type Task struct {
item *transcut.Item
config *params.ChainConfig
bc core.ChainContext
author *common.Address
gp *core.GasPool
statedb *state.StateDB
blockNumber *big.Int
blockHash common.Hash
usedGas *uint64
evm *vm.EVM
level int //批次
}
func NewTask(config *params.ChainConfig,bc core.ChainContext,author *common.Address,gp *core.GasPool,statedb *state.StateDB,blockNumber *big.Int,blockHash common.Hash,item *transcut.Item,usedGas *uint64,evm *vm.EVM,level int) *Task {
return &Task{
item: item,
config: config,
bc: bc,
author: author,
gp: gp,
statedb: statedb,
blockNumber: blockNumber,
blockHash: blockHash,
usedGas: usedGas,
evm: evm,
level:level,
}
}
func (t *Task)DoTask() (*types.Receipt, error) {
return core.ApplyTransactionForExecPool(t.item.MessageInfo(),t.config,t.bc,t.author,t.gp,t.statedb,t.blockNumber,t.blockHash,t.item.TxInfo(),t.usedGas,t.evm,t.level)
}
type Pool struct {
MaxWorkerIdleTime time.Duration // worker 最大空闲时间
MaxWorkerNum int32 // 协程最大数量
TaskEntryChan chan *Task // 任务入列
Workers []*worker // 已创建worker
FreeWorkerChan chan *worker // 空闲worker
Lock sync.Mutex
ResultChan chan *Result
}
const (
WorkerStatusStop = 1
WorkerStatusLive = 0
)
const (
MaxWaitingTime = 3
)
type worker struct {
Pool *Pool
StartTime time.Time // 开始时间
TaskChan chan *Task // 执行队列
LastWorkTime time.Time // 最后执行时间
Ctx context.Context
Cancel context.CancelFunc
Status int32 // 被过期删掉的标记
//ResultChan chan *Result //执行结果
}
type Result struct {
Receipt *types.Receipt //
Err error
Tx *types.Transaction
}
// 初始化
func NewPool(num int) *Pool {
p := &Pool{
MaxWorkerIdleTime: MaxWaitingTime * time.Second,
MaxWorkerNum: int32(num),
TaskEntryChan: make(chan *Task, 2000),
FreeWorkerChan: make(chan *worker, 2000),
ResultChan: make(chan *Result, num),
}
// 分发任务
go p.dispatchTask()
//清理空闲worker
go p.fireWorker()
return p
}
// 定期清理空闲worker
func (p *Pool) fireWorker() {
for {
select {
case <-time.After(MaxWaitingTime * time.Second):
for k, w := range p.Workers {
if time.Now().Sub(w.LastWorkTime) > p.MaxWorkerIdleTime {
log.Printf("overtime %v %p", k, w)
// 终止协程
w.Cancel()
// 清理Free
w.Status = WorkerStatusStop
}
}
p.Lock.Lock()
p.Workers = p.cleanWorker(p.Workers)
p.Lock.Unlock()
}
}
}
// 递归清理无用worker
func (p *Pool) cleanWorker(workers []*worker) []*worker {
for k, w := range workers {
if time.Now().Sub(w.LastWorkTime) > p.MaxWorkerIdleTime {
workers = append(workers[:k], workers[k+1:]...) // 删除中间1个元素
return p.cleanWorker(workers)
}
}
return workers
}
// 分发任务
func (p *Pool) dispatchTask() {
for {
select {
case t := <-p.TaskEntryChan:
log.Printf("dispatch task %p", t)
// 获取worker
w := p.fetchWorker()
// 将任务扔给worker
w.accept(t)
}
}
}
// 获取可用worker
func (p *Pool) fetchWorker() *worker {
for {
select {
// 获取空闲worker
case w := <-p.FreeWorkerChan:
if w.Status == WorkerStatusLive {
return w
}
default:
// 创建新的worker
if int32(len(p.Workers)) < p.MaxWorkerNum {
w := &worker{
Pool: p,
StartTime: time.Now(),
LastWorkTime: time.Now(),
TaskChan: make(chan *Task, 1),
Ctx: context.Background(),
Status: WorkerStatusLive,
}
ctx, cancel := context.WithCancel(w.Ctx)
w.Cancel = cancel
// 接到任务自己去执行
go w.execute(ctx)
p.Lock.Lock()
p.Workers = append(p.Workers, w)
p.Lock.Unlock()
p.FreeWorkerChan <- w
log.Printf("worker create %p", w)
}
}
}
}
// 添加任务
func (p *Pool) addTask(t Task) {
// 将任务放到入口任务队列
p.TaskEntryChan <- &t
}
// 接受任务
func (w *worker) accept(t *Task) {
// 每个worker自己的工作队列
w.TaskChan <- t
}
// 执行任务
func (w *worker) execute(ctx context.Context) {
for {
select {
case t := <-w.TaskChan:
// 执行
result,err := t.DoTask()
// 记录工作状态
w.Pool.ResultChan <- &Result{Receipt: result,Err: err,Tx:t.item.TxInfo()}
w.LastWorkTime = time.Now()
w.Pool.FreeWorkerChan <- w
case <-ctx.Done():
log.Printf("worker done %p", w)
return
}
}
}
// 执行
func (p *Pool)AddTask(t Task) {
p.addTask(t)
}
\ No newline at end of file
package process
import (
"go-ethereum-advance/common"
"go-ethereum-advance/consensus"
"go-ethereum-advance/core"
"go-ethereum-advance/core/executionpool"
"go-ethereum-advance/core/state"
"go-ethereum-advance/core/transcut"
"go-ethereum-advance/core/types"
"go-ethereum-advance/core/vm"
"go-ethereum-advance/log"
"go-ethereum-advance/params"
)
type Result struct {
Receipt types.Receipt //
Ed ErrorDetail
}
type Process interface {
//执行批次交易并返回结果以及读写集
ExecBenchTxs(block *types.Block,statedb *state.StateDB,txs []*types.Transaction, gp *core.GasPool,cfg vm.Config) []*Result
//执行单一交易并返回结果以及读写集
ExecSingleTx(block *types.Block,tx *types.Transaction,chainConf *params.ChainConfig,author *common.Address, gp *core.GasPool,usedGass *uint64) *Result
//检查传递进来的读写集是否有冲突
RWSetCanBeWritten(c *vm.CacheSet) ([]common.Hash,bool)
}
type Processor struct {
config *params.ChainConfig // Chain configuration options
bc *core.BlockChain // Canonical block chain
engine consensus.Engine // Consensus engine used for block rewards
}
type ErrorDetail struct {
err error
tx *types.Transaction
}
func NewProcessor(config *params.ChainConfig, bc *core.BlockChain, engine consensus.Engine) *Processor {
return &Processor{
config: config,
bc: bc,
engine: engine,
}
}
func (p *Processor) ExecBenchTxs(block *types.Block,statedb *state.StateDB,txs []*types.Transaction,authors []*common.Address ,gp *core.GasPool,cfg vm.Config) []*Result {
var (
results []*Result
header = block.Header()
usedGas = new(uint64)
blockHash = block.Hash()
blockNumber = block.Number()
errs []*ErrorDetail
)
blockContext := core.NewEVMBlockContext(header, p.bc, nil)
vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
cache := transcut.NewFromMarkCache()
for i, tx := range txs {
msg, err := tx.AsMessage(types.MakeSigner(p.config, header.Number), header.BaseFee)
if err != nil {
ed := &ErrorDetail{
err: err,
tx: tx,
}
errs = append(errs,ed)
log.Error("tx asMessage failed","err",err.Error(),"tx info",tx.Hash().String())
continue
}
item := transcut.NewItem(tx,msg,authors[i])
from := msg.From()
cache.SetMarkCache(from,item)
}
//TODO modify by echo
level := cache.GetMaxLevel()
for i:=0;i < int(level);i++ {
items := cache.GetLevelItemWithMark(uint32(i))
pool := executionpool.NewPool(len(items))
for _,item := range items{
task := executionpool.NewTask(p.config,p.bc,nil,gp,statedb,blockNumber,blockHash,item,usedGas,vmenv,i)
pool.AddTask(*task)
}
//需要等待第一批次的交易都执行完自开启下一批次
for i:=0;i< len(items);i++ {
select {
case v := <- pool.ResultChan:
ed := ErrorDetail{
err: v.Err,
tx: v.Tx,
}
result := &Result{
Receipt: *v.Receipt,
Ed: ed,
}
results = append(results,result)
}
}
}
return results
}
func (p *Processor) ExecSingleTx(config *params.ChainConfig, bc core.ChainContext, author *common.Address, gp *core.GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, cfg vm.Config) *Result {
var (
usedGas = new(uint64)
)
receipt,err := core.ApplyTransaction(config,bc,author,gp,statedb,header,tx,usedGas,cfg)
if err != nil {
log.Error("ExecSingleTx err","err",err.Error(),"tx info",tx.Hash().String())
return &Result{
Receipt: nil,
Ed: ErrorDetail{
err: err,
tx: tx,
},
}
}
return &Result{
Receipt: *receipt,
Ed: ErrorDetail{
err: nil,
tx: tx,
},
}
}
type WrittenBefore struct {
written map[common.Hash]struct{}
}
/**
对同一批次级别的读写集进行检测
有读写冲突的交易需要拿出来单独执行、
所谓读写冲突是,在当前的读集中在之前的写集给予修改
**/
func (p *Processor) RWSetCanBeWritten(c *vm.CacheSet) ([]common.Hash,bool) {
maxLevel := c.GetMaxLevel()
res := make([]common.Hash,0)
flag := true
for i:=0;i<maxLevel;i++ {
tmpCache := c.GetRecordsWithLevel(i)
wb := make(map[common.Hash]struct{})
for _,key := range tmpCache.Order {
record,ok := tmpCache.GetRecordWithTxHash(key)
if !ok {
log.Info("没有找到对应的读写集","level",i,"txInfo",key.String())
continue
}
for k,_ := range record.ReadRecord {
_,ok := wb[k]
if ok {
//在读集中发现了之前有写集修改过的记录
res = append(res,key)
}
}
for key,_ := range record.WriteRecord {
wb[key] = struct{}{}
}
}
}
if len(res) > 0 {
flag = false
}
return res,flag
}
...@@ -18,16 +18,17 @@ package core ...@@ -18,16 +18,17 @@ package core
import ( import (
"fmt" "fmt"
"math/big"
"go-ethereum-advance/common" "go-ethereum-advance/common"
"go-ethereum-advance/consensus" "go-ethereum-advance/consensus"
"go-ethereum-advance/consensus/misc" "go-ethereum-advance/consensus/misc"
"go-ethereum-advance/core/executionpool"
"go-ethereum-advance/core/state" "go-ethereum-advance/core/state"
"go-ethereum-advance/core/transcut"
"go-ethereum-advance/core/types" "go-ethereum-advance/core/types"
"go-ethereum-advance/core/vm" "go-ethereum-advance/core/vm"
"go-ethereum-advance/crypto" "go-ethereum-advance/crypto"
"go-ethereum-advance/params" "go-ethereum-advance/params"
"math/big"
) )
// StateProcessor is a basic Processor, which takes care of transitioning // StateProcessor is a basic Processor, which takes care of transitioning
...@@ -73,25 +74,112 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg ...@@ -73,25 +74,112 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
blockContext := NewEVMBlockContext(header, p.bc, nil) blockContext := NewEVMBlockContext(header, p.bc, nil)
vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
// Iterate over and process the individual transactions // Iterate over and process the individual transactions
cache := transcut.NewFromMarkCache()
for i, tx := range block.Transactions() { for i, tx := range block.Transactions() {
msg, err := tx.AsMessage(types.MakeSigner(p.config, header.Number), header.BaseFee) msg, err := tx.AsMessage(types.MakeSigner(p.config, header.Number), header.BaseFee)
if err != nil { if err != nil {
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
} }
item := transcut.NewItem(tx,msg,nil)
from := msg.From()
cache.SetMarkCache(from,item)
statedb.Prepare(tx.Hash(), i) statedb.Prepare(tx.Hash(), i)
receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, blockNumber, blockHash, tx, usedGas, vmenv) //TODO modify by echo
if err != nil { //receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, blockNumber, blockHash, tx, usedGas, vmenv)
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) //if err != nil {
// return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
//}
//receipts = append(receipts, receipt)
//allLogs = append(allLogs, receipt.Logs...)
}
//TODO modify by echo
level := cache.GetMaxLevel()
for i:=0;i < int(level);i++ {
items := cache.GetLevelItemWithMark(uint32(i))
pool := executionpool.NewPool(len(items))
for _,item := range items{
task := executionpool.NewTask(p.config,p.bc,nil,gp,statedb,blockNumber,blockHash,item,usedGas,vmenv,i)
pool.AddTask(*task)
}
//需要等待第一批次的交易都执行完自开启下一批次
for i:=0;i< len(items);i++ {
select {
case v := <- pool.ResultChan:
receipt := v.Receipt
err := v.Err
tx := v.Tx
if err != nil {
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
}
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
}
} }
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
} }
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards) // Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles()) p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles())
return receipts, allLogs, *usedGas, nil return receipts, allLogs, *usedGas, nil
} }
//TODO added by echo
func ApplyTransactionForExecPool(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, blockNumber *big.Int, blockHash common.Hash, tx *types.Transaction, usedGas *uint64, evm *vm.EVM,level int) (*types.Receipt, error) {
// Create a new context to be used in the EVM environment.
txContext := NewEVMTxContext(msg)
evm.Reset(txContext, statedb)
// Apply the transaction to the current state (included in the env).
result, err := ApplyMessage(evm, msg, gp)
if err != nil {
return nil, err
}
// Update the state with pending changes.
var root []byte
if config.IsByzantium(blockNumber) {
statedb.Finalise(true)
} else {
root = statedb.IntermediateRoot(config.IsEIP158(blockNumber)).Bytes()
}
*usedGas += result.UsedGas
//added by echo
for key,value := range txContext.ReadSet {
vm.TempRWSet.SetCacheWithReadRecord(tx.Hash(),key,value,level)
}
for key,value := range txContext.WriteSet {
vm.TempRWSet.SetCacheWithWriteRecord(tx.Hash(),key,value,level)
}
// Create a new receipt for the transaction, storing the intermediate root and gas used
// by the tx.
receipt := &types.Receipt{Type: tx.Type(), PostState: root, CumulativeGasUsed: *usedGas}
if result.Failed() {
receipt.Status = types.ReceiptStatusFailed
} else {
receipt.Status = types.ReceiptStatusSuccessful
}
receipt.TxHash = tx.Hash()
receipt.GasUsed = result.UsedGas
// If the transaction created a contract, store the creation address in the receipt.
if msg.To() == nil {
receipt.ContractAddress = crypto.CreateAddress(evm.TxContext.Origin, tx.Nonce())
}
// Set the receipt logs and create the bloom filter.
receipt.Logs = statedb.GetLogs(tx.Hash(), blockHash)
receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
receipt.BlockHash = blockHash
receipt.BlockNumber = blockNumber
receipt.TransactionIndex = uint(statedb.TxIndex())
return receipt, err
}
func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, blockNumber *big.Int, blockHash common.Hash, tx *types.Transaction, usedGas *uint64, evm *vm.EVM) (*types.Receipt, error) { func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, blockNumber *big.Int, blockHash common.Hash, tx *types.Transaction, usedGas *uint64, evm *vm.EVM) (*types.Receipt, error) {
// Create a new context to be used in the EVM environment. // Create a new context to be used in the EVM environment.
txContext := NewEVMTxContext(msg) txContext := NewEVMTxContext(msg)
......
...@@ -125,8 +125,26 @@ import ( ...@@ -125,8 +125,26 @@ import (
/**from的标记**/ /**from的标记**/
type Item struct { type Item struct {
tx *types.Transaction txInfo *types.Transaction
mark uint32 msg types.Message
author *common.Address
mark uint32
}
func NewItem(tx *types.Transaction,msg types.Message,author *common.Address) *Item {
return &Item{
txInfo: tx,
msg: msg,
author: author,
}
}
func (i *Item)TxInfo() *types.Transaction {
return i.txInfo
}
func (i *Item)MessageInfo() types.Message {
return i.msg
} }
type FromMarkCache struct { type FromMarkCache struct {
...@@ -161,7 +179,7 @@ func (f *FromMarkCache) SetMarkCache(addr common.Address,data *Item) { ...@@ -161,7 +179,7 @@ func (f *FromMarkCache) SetMarkCache(addr common.Address,data *Item) {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
v,ok := f.fromCache[addr] v,ok := f.fromCache[addr]
dataTo := data.tx.To() dataTo := data.txInfo.To()
if ok { if ok {
data.mark = f.maxLevel+1 data.mark = f.maxLevel+1
f.maxLevel = data.mark f.maxLevel = data.mark
...@@ -199,13 +217,27 @@ func (f *FromMarkCache) GetLevelTxsWithMark(index uint32) []*types.Transaction { ...@@ -199,13 +217,27 @@ func (f *FromMarkCache) GetLevelTxsWithMark(index uint32) []*types.Transaction {
for _,v := range f.fromCache{ for _,v := range f.fromCache{
for _,data := range v{ for _,data := range v{
if data.mark == index { if data.mark == index {
arr = append(arr,data.tx) arr = append(arr,data.txInfo)
} }
} }
} }
return arr return arr
} }
/**找某个层级的所有item**/
func (f *FromMarkCache) GetLevelItemWithMark(index uint32) []*Item {
f.lock.RLock()
defer f.lock.RUnlock()
arr := make([]*Item,0)
for _,v := range f.fromCache{
for _,data := range v{
if data.mark == index {
arr = append(arr,data)
}
}
}
return arr
}
func (f *FromMarkCache) GetMaxLevel() uint32 { func (f *FromMarkCache) GetMaxLevel() uint32 {
return f.maxLevel+1 return f.maxLevel+1
......
package transcut package transcut
import ( import (
"github.com/ethereum/go-ethereum/common" "go-ethereum-advance/common"
"github.com/ethereum/go-ethereum/core/types" "go-ethereum-advance/core/types"
"math/big" "math/big"
"testing" "testing"
) )
...@@ -82,7 +82,7 @@ func TestNewFromMarkCache(t *testing.T) { ...@@ -82,7 +82,7 @@ func TestNewFromMarkCache(t *testing.T) {
for i,tx := range txs{ for i,tx := range txs{
var key common.Address var key common.Address
item := &Item{ item := &Item{
tx:tx, txInfo:tx,
} }
switch i { switch i {
case 0: case 0:
...@@ -126,7 +126,7 @@ func TestFromMarkCache_GetLevelTxsWithMark(t *testing.T) { ...@@ -126,7 +126,7 @@ func TestFromMarkCache_GetLevelTxsWithMark(t *testing.T) {
for i,tx := range txs { for i,tx := range txs {
var key common.Address var key common.Address
item := &Item{ item := &Item{
tx:tx, txInfo:tx,
} }
switch i { switch i {
......
...@@ -48,4 +48,5 @@ type Processor interface { ...@@ -48,4 +48,5 @@ type Processor interface {
// the transaction messages using the statedb and applying any rewards to both // the transaction messages using the statedb and applying any rewards to both
// the processor (coinbase) and any included uncles. // the processor (coinbase) and any included uncles.
Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error)
} }
package vm
import (
"bytes"
"go-ethereum-advance/common"
"sort"
)
var TempRWSet = NewCacheSet()
type RWRecord struct {
ReadRecord map[common.Hash][]byte
WriteRecord map[common.Hash][]byte
level int
}
func NewRWRecord() *RWRecord {
return &RWRecord{
ReadRecord: make(map[common.Hash][]byte),
WriteRecord: make(map[common.Hash][]byte),
}
}
func (rw *RWRecord) Level() int {
return rw.level
}
func (rw *RWRecord) SetReadRecord(rKey common.Hash,rValue []byte) {
rw.ReadRecord[rKey] = rValue
}
func (rw *RWRecord) SetWriteRecord(wKey common.Hash,wValue []byte) {
rw.WriteRecord[wKey] = wValue
}
func (rw *RWRecord) SetCurrentRecordLevel(level int) {
rw.level = level
}
type CacheSet struct {
cache map[common.Hash]*RWRecord
Order []common.Hash
maxLevel int
}
func NewCacheSet() *CacheSet {
return &CacheSet{
cache: make(map[common.Hash]*RWRecord),
Order: make([]common.Hash,0),
}
}
func (c *CacheSet) GetMaxLevel() int {
return c.maxLevel
}
func (c *CacheSet) SetCache(txHash common.Hash,record *RWRecord) {
_,ok := c.cache[txHash]
if !ok {
c.cache[txHash] = record
c.Order = append(c.Order,txHash)
}
}
func (c *CacheSet) SetCacheWithReadRecord(txHash common.Hash,key common.Hash,value []byte,level int) {
record,ok := c.cache[txHash]
if !ok {
record = NewRWRecord()
c.Order = append(c.Order,txHash)
}
record.SetReadRecord(key,value)
record.SetCurrentRecordLevel(level)
if c.maxLevel < level {
c.maxLevel = level
}
c.SetCache(txHash,record)
}
func (c *CacheSet) SetCacheWithWriteRecord(txHash common.Hash,key common.Hash,value []byte,level int) {
record,ok := c.cache[txHash]
if !ok {
record = NewRWRecord()
c.Order = append(c.Order,txHash)
}
record.SetWriteRecord(key,value)
record.SetCurrentRecordLevel(level)
if c.maxLevel < level {
c.maxLevel = level
}
c.SetCache(txHash,record)
}
func (c *CacheSet) GetRecordsWithLevel(level int) *CacheSet {
newC := NewCacheSet()
for txHash,record := range c.cache{
if record.Level() == level {
newC.SetCache(txHash,record)
}
}
//按照交易哈希进行排序
sort.Sort(newC)
return newC
}
func (c *CacheSet) GetRecordWithTxHash(txHash common.Hash) (*RWRecord,bool) {
v,ok := c.cache[txHash]
return v,ok
}
func (c *CacheSet) Len() int {
return len(c.Order)
}
func (c *CacheSet) Less(i,j int) bool {
iItem := c.Order[i]
jItem := c.Order[j]
//0 if a==b, -1 if a < b, and +1 if a > b.
v := bytes.Compare(iItem.Bytes(),jItem.Bytes())
if v == -1 {
return true
}
return false
}
func (c *CacheSet) Swap(i,j int) {
c.Order[i],c.Order[j] = c.Order[j],c.Order[i]
}
\ No newline at end of file
...@@ -13,7 +13,6 @@ import ( ...@@ -13,7 +13,6 @@ import (
type contractRelationship struct {} type contractRelationship struct {}
func (c *contractRelationship) RequiredGas(input []byte) uint64 { func (c *contractRelationship) RequiredGas(input []byte) uint64 {
return uint64(len(input)+31)/32*params.Sha256PerWordGas + params.Sha256BaseGas return uint64(len(input)+31)/32*params.Sha256PerWordGas + params.Sha256BaseGas
} }
......
...@@ -84,6 +84,9 @@ type TxContext struct { ...@@ -84,6 +84,9 @@ type TxContext struct {
// Message information // Message information
Origin common.Address // Provides information for ORIGIN Origin common.Address // Provides information for ORIGIN
GasPrice *big.Int // Provides information for GASPRICE GasPrice *big.Int // Provides information for GASPRICE
//TODO added by echo
ReadSet map[common.Hash][]byte //读集
WriteSet map[common.Hash][]byte //写集
} }
// EVM is the Ethereum Virtual Machine base object and provides // EVM is the Ethereum Virtual Machine base object and provides
......
...@@ -517,6 +517,8 @@ func opSload(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([]by ...@@ -517,6 +517,8 @@ func opSload(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([]by
loc := scope.Stack.peek() loc := scope.Stack.peek()
hash := common.Hash(loc.Bytes32()) hash := common.Hash(loc.Bytes32())
val := interpreter.evm.StateDB.GetState(scope.Contract.Address(), hash) val := interpreter.evm.StateDB.GetState(scope.Contract.Address(), hash)
//added by echo
interpreter.evm.ReadSet[hash] = val.Bytes()
loc.SetBytes(val.Bytes()) loc.SetBytes(val.Bytes())
return nil, nil return nil, nil
} }
...@@ -529,6 +531,8 @@ func opSstore(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([]b ...@@ -529,6 +531,8 @@ func opSstore(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([]b
val := scope.Stack.pop() val := scope.Stack.pop()
interpreter.evm.StateDB.SetState(scope.Contract.Address(), interpreter.evm.StateDB.SetState(scope.Contract.Address(),
loc.Bytes32(), val.Bytes32()) loc.Bytes32(), val.Bytes32())
//added by echo
interpreter.evm.WriteSet[loc.Bytes32()] = val.Bytes()
return nil, nil return nil, nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment