Commit d4300c89 authored by Matthew Slipper's avatar Matthew Slipper Committed by GitHub

Merge pull request #1814 from cfromknecht/deadlock-followup

fix: minor improvements to empty sequencer block handling
parents 639e5b13 5febe10f
---
'@eth-optimism/l2geth': patch
---
fixes empty block detection and removes empty worker tasks
...@@ -110,7 +110,6 @@ const ( ...@@ -110,7 +110,6 @@ const (
// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
type newWorkReq struct { type newWorkReq struct {
interrupt *int32 interrupt *int32
noempty bool
timestamp int64 timestamp int64
} }
...@@ -307,12 +306,12 @@ func (w *worker) newWorkLoop(recommit time.Duration) { ...@@ -307,12 +306,12 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
<-timer.C // discard the initial tick <-timer.C // discard the initial tick
// commit aborts in-flight transaction execution with given signal and resubmits a new one. // commit aborts in-flight transaction execution with given signal and resubmits a new one.
commit := func(noempty bool, s int32) { commit := func(s int32) {
if interrupt != nil { if interrupt != nil {
atomic.StoreInt32(interrupt, s) atomic.StoreInt32(interrupt, s)
} }
interrupt = new(int32) interrupt = new(int32)
w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp} w.newWorkCh <- &newWorkReq{interrupt: interrupt, timestamp: timestamp}
timer.Reset(recommit) timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0) atomic.StoreInt32(&w.newTxs, 0)
} }
...@@ -352,7 +351,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { ...@@ -352,7 +351,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
select { select {
case <-w.startCh: case <-w.startCh:
clearPending(w.chain.CurrentBlock().NumberU64()) clearPending(w.chain.CurrentBlock().NumberU64())
commit(false, commitInterruptNewHead) commit(commitInterruptNewHead)
// Remove this code for the OVM implementation. It is responsible for // Remove this code for the OVM implementation. It is responsible for
// cleaning up memory with the call to `clearPending`, so be sure to // cleaning up memory with the call to `clearPending`, so be sure to
...@@ -361,7 +360,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { ...@@ -361,7 +360,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
case <-w.chainHeadCh: case <-w.chainHeadCh:
clearPending(head.Block.NumberU64()) clearPending(head.Block.NumberU64())
timestamp = time.Now().Unix() timestamp = time.Now().Unix()
commit(false, commitInterruptNewHead) commit(commitInterruptNewHead)
*/ */
case <-timer.C: case <-timer.C:
...@@ -373,7 +372,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { ...@@ -373,7 +372,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
timer.Reset(recommit) timer.Reset(recommit)
continue continue
} }
commit(true, commitInterruptResubmit) commit(commitInterruptResubmit)
} }
case interval := <-w.resubmitIntervalCh: case interval := <-w.resubmitIntervalCh:
...@@ -421,7 +420,7 @@ func (w *worker) mainLoop() { ...@@ -421,7 +420,7 @@ func (w *worker) mainLoop() {
for { for {
select { select {
case req := <-w.newWorkCh: case req := <-w.newWorkCh:
w.commitNewWork(req.interrupt, req.noempty, req.timestamp) w.commitNewWork(req.interrupt, req.timestamp)
case ev := <-w.chainSideCh: case ev := <-w.chainSideCh:
// Short circuit for duplicate side blocks // Short circuit for duplicate side blocks
...@@ -458,7 +457,7 @@ func (w *worker) mainLoop() { ...@@ -458,7 +457,7 @@ func (w *worker) mainLoop() {
uncles = append(uncles, uncle.Header()) uncles = append(uncles, uncle.Header())
return false return false
}) })
w.commit(uncles, nil, true, start) w.commit(uncles, nil, start)
} }
} }
// Read from the sync service and mine single txs // Read from the sync service and mine single txs
...@@ -544,7 +543,7 @@ func (w *worker) mainLoop() { ...@@ -544,7 +543,7 @@ func (w *worker) mainLoop() {
// If clique is running in dev mode(period is 0), disable // If clique is running in dev mode(period is 0), disable
// advance sealing here. // advance sealing here.
if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 { if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 {
w.commitNewWork(nil, true, time.Now().Unix()) w.commitNewWork(nil, time.Now().Unix())
} }
} }
atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))
...@@ -784,11 +783,6 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin ...@@ -784,11 +783,6 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
} }
var coalescedLogs []*types.Log var coalescedLogs []*types.Log
// UsingOVM
// Keep track of the number of transactions being added to the block.
// Blocks should only have a single transaction. This value is used to
// compute a success return value
var txCount int
for { for {
// In the following three cases, we will interrupt the execution of the transaction. // In the following three cases, we will interrupt the execution of the transaction.
...@@ -809,7 +803,7 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin ...@@ -809,7 +803,7 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
inc: true, inc: true,
} }
} }
return atomic.LoadInt32(interrupt) == commitInterruptNewHead return w.current.tcount == 0 || atomic.LoadInt32(interrupt) == commitInterruptNewHead
} }
// If we don't have enough gas for any further transactions then we're done // If we don't have enough gas for any further transactions then we're done
if w.current.gasPool.Gas() < params.TxGas { if w.current.gasPool.Gas() < params.TxGas {
...@@ -822,8 +816,6 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin ...@@ -822,8 +816,6 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
break break
} }
txCount++
// Error may be ignored here. The error has already been checked // Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool. // during transaction acceptance is the transaction pool.
// //
...@@ -891,7 +883,7 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin ...@@ -891,7 +883,7 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
if interrupt != nil { if interrupt != nil {
w.resubmitAdjustCh <- &intervalAdjust{inc: false} w.resubmitAdjustCh <- &intervalAdjust{inc: false}
} }
return txCount == 0 return w.current.tcount == 0
} }
// commitNewTx is an OVM addition that mines a block with a single tx in it. // commitNewTx is an OVM addition that mines a block with a single tx in it.
...@@ -956,11 +948,11 @@ func (w *worker) commitNewTx(tx *types.Transaction) error { ...@@ -956,11 +948,11 @@ func (w *worker) commitNewTx(tx *types.Transaction) error {
if w.commitTransactions(txs, w.coinbase, nil) { if w.commitTransactions(txs, w.coinbase, nil) {
return errors.New("Cannot commit transaction in miner") return errors.New("Cannot commit transaction in miner")
} }
return w.commit(nil, w.fullTaskHook, true, tstart) return w.commit(nil, w.fullTaskHook, tstart)
} }
// commitNewWork generates several new sealing tasks based on the parent block. // commitNewWork generates several new sealing tasks based on the parent block.
func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) { func (w *worker) commitNewWork(interrupt *int32, timestamp int64) {
w.mu.RLock() w.mu.RLock()
defer w.mu.RUnlock() defer w.mu.RUnlock()
...@@ -1036,12 +1028,6 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) ...@@ -1036,12 +1028,6 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
commitUncles(w.localUncles) commitUncles(w.localUncles)
commitUncles(w.remoteUncles) commitUncles(w.remoteUncles)
if !noempty {
// Create an empty block based on temporary copied state for sealing in advance without waiting block
// execution finished.
w.commit(uncles, nil, false, tstart)
}
// Fill the block with all available pending transactions. // Fill the block with all available pending transactions.
pending, err := w.eth.TxPool().Pending() pending, err := w.eth.TxPool().Pending()
if err != nil { if err != nil {
...@@ -1073,12 +1059,12 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) ...@@ -1073,12 +1059,12 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
return return
} }
} }
w.commit(uncles, w.fullTaskHook, true, tstart) w.commit(uncles, w.fullTaskHook, tstart)
} }
// commit runs any post-transaction state modifications, assembles the final block // commit runs any post-transaction state modifications, assembles the final block
// and commits new work if consensus engine is running. // and commits new work if consensus engine is running.
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error { func (w *worker) commit(uncles []*types.Header, interval func(), start time.Time) error {
// Deep copy receipts here to avoid interaction between different tasks. // Deep copy receipts here to avoid interaction between different tasks.
receipts := make([]*types.Receipt, len(w.current.receipts)) receipts := make([]*types.Receipt, len(w.current.receipts))
for i, l := range w.current.receipts { for i, l := range w.current.receipts {
...@@ -1090,6 +1076,15 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st ...@@ -1090,6 +1076,15 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
if err != nil { if err != nil {
return err return err
} }
// As a sanity check, ensure all new blocks have exactly one
// transaction. This check is done here just in case any of our
// higher-evel checks failed to catch empty blocks passed to commit.
txs := block.Transactions()
if len(txs) != 1 {
return fmt.Errorf("Block created with %d transactions rather than 1 at %d", len(txs), block.NumberU64())
}
if w.isRunning() { if w.isRunning() {
if interval != nil { if interval != nil {
interval() interval()
...@@ -1106,10 +1101,6 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st ...@@ -1106,10 +1101,6 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
} }
feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether))) feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
txs := block.Transactions()
if len(txs) != 1 {
return fmt.Errorf("Block created with not %d transactions at %d", len(txs), block.NumberU64())
}
tx := txs[0] tx := txs[0]
bn := tx.L1BlockNumber() bn := tx.L1BlockNumber()
if bn == nil { if bn == nil {
...@@ -1122,9 +1113,7 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st ...@@ -1122,9 +1113,7 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
log.Info("Worker has exited") log.Info("Worker has exited")
} }
} }
if update { w.updateSnapshot()
w.updateSnapshot()
}
return nil return nil
} }
......
...@@ -328,15 +328,35 @@ func TestStreamUncleBlock(t *testing.T) { ...@@ -328,15 +328,35 @@ func TestStreamUncleBlock(t *testing.T) {
taskIndex := 0 taskIndex := 0
w.newTaskHook = func(task *task) { w.newTaskHook = func(task *task) {
if task.block.NumberU64() == 2 { if task.block.NumberU64() == 2 {
// The first task is an empty task, the second // The first task has 1 pending tx, the second one has 1
// one has 1 pending tx, the third one has 1 tx // tx and 1 uncle.
// and 1 uncle. numTxs := len(task.block.Transactions())
if taskIndex == 2 { numUncles := len(task.block.Uncles())
switch taskIndex {
case 0:
if numTxs != 1 {
t.Errorf("expected 1 tx in first task, got: %d", numTxs)
}
if numUncles != 0 {
t.Errorf("expected no uncles in first task, got: %d", numUncles)
}
case 1:
if numTxs != 1 {
t.Errorf("expected 1 tx in second task, got: %d", numTxs)
}
if numUncles != 1 {
t.Errorf("expected 1 uncle in second task, got: %d", numUncles)
}
have := task.block.Header().UncleHash have := task.block.Header().UncleHash
want := types.CalcUncleHash([]*types.Header{b.uncleBlock.Header()}) want := types.CalcUncleHash([]*types.Header{b.uncleBlock.Header()})
if have != want { if have != want {
t.Errorf("uncle hash mismatch: have %s, want %s", have.Hex(), want.Hex()) t.Errorf("uncle hash mismatch: have %s, want %s", have.Hex(), want.Hex())
} }
default:
t.Errorf("only expected two tasks")
} }
taskCh <- struct{}{} taskCh <- struct{}{}
taskIndex += 1 taskIndex += 1
...@@ -350,12 +370,10 @@ func TestStreamUncleBlock(t *testing.T) { ...@@ -350,12 +370,10 @@ func TestStreamUncleBlock(t *testing.T) {
} }
w.start() w.start()
for i := 0; i < 2; i += 1 { select {
select { case <-taskCh:
case <-taskCh: case <-time.NewTimer(time.Second).C:
case <-time.NewTimer(time.Second).C: t.Error("new task timeout")
t.Error("new task timeout")
}
} }
w.postSideBlock(core.ChainSideEvent{Block: b.uncleBlock}) w.postSideBlock(core.ChainSideEvent{Block: b.uncleBlock})
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment