Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
4dddba09
Unverified
Commit
4dddba09
authored
Jul 19, 2023
by
Adrian Sutton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
op-e2e: Proof of concept for time travelling L1
parent
6c5d8c62
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
353 additions
and
104 deletions
+353
-104
fakepos.go
op-e2e/fakepos.go
+124
-0
faultproof_test.go
op-e2e/faultproof_test.go
+41
-0
geth.go
op-e2e/geth.go
+3
-101
setup.go
op-e2e/setup.go
+17
-1
advancing.go
op-service/clock/advancing.go
+71
-0
advancing_test.go
op-service/clock/advancing_test.go
+58
-0
deterministic.go
op-service/clock/deterministic.go
+6
-2
deterministic_test.go
op-service/clock/deterministic_test.go
+33
-0
No files found.
op-e2e/fakepos.go
0 → 100644
View file @
4dddba09
package
op_e2e
import
(
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/catalyst"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
// fakePoS is a testing-only utility to attach to Geth,
// to build a fake proof-of-stake L1 chain with fixed block time and basic lagging safe/finalized blocks.
type
fakePoS
struct
{
clock
clock
.
Clock
eth
*
eth
.
Ethereum
log
log
.
Logger
blockTime
uint64
finalizedDistance
uint64
safeDistance
uint64
engineAPI
*
catalyst
.
ConsensusAPI
sub
ethereum
.
Subscription
}
func
(
f
*
fakePoS
)
Start
()
error
{
if
advancing
,
ok
:=
f
.
clock
.
(
*
clock
.
AdvancingClock
);
ok
{
advancing
.
Start
()
}
f
.
sub
=
event
.
NewSubscription
(
func
(
quit
<-
chan
struct
{})
error
{
// poll every half a second: enough to catch up with any block time when ticks are missed
t
:=
f
.
clock
.
NewTicker
(
time
.
Second
/
2
)
for
{
select
{
case
now
:=
<-
t
.
Ch
()
:
chain
:=
f
.
eth
.
BlockChain
()
head
:=
chain
.
CurrentBlock
()
finalized
:=
chain
.
CurrentFinalBlock
()
if
finalized
==
nil
{
// fallback to genesis if nothing is finalized
finalized
=
chain
.
Genesis
()
.
Header
()
}
safe
:=
chain
.
CurrentSafeBlock
()
if
safe
==
nil
{
// fallback to finalized if nothing is safe
safe
=
finalized
}
if
head
.
Number
.
Uint64
()
>
f
.
finalizedDistance
{
// progress finalized block, if we can
finalized
=
f
.
eth
.
BlockChain
()
.
GetHeaderByNumber
(
head
.
Number
.
Uint64
()
-
f
.
finalizedDistance
)
}
if
head
.
Number
.
Uint64
()
>
f
.
safeDistance
{
// progress safe block, if we can
safe
=
f
.
eth
.
BlockChain
()
.
GetHeaderByNumber
(
head
.
Number
.
Uint64
()
-
f
.
safeDistance
)
}
// start building the block as soon as we are past the current head time
if
head
.
Time
>=
uint64
(
now
.
Unix
())
{
continue
}
newBlockTime
:=
head
.
Time
+
f
.
blockTime
if
time
.
Unix
(
int64
(
newBlockTime
),
0
)
.
Add
(
5
*
time
.
Minute
)
.
Before
(
f
.
clock
.
Now
())
{
// We're a long way behind, let's skip some blocks...
newBlockTime
=
uint64
(
f
.
clock
.
Now
()
.
Unix
())
}
res
,
err
:=
f
.
engineAPI
.
ForkchoiceUpdatedV1
(
engine
.
ForkchoiceStateV1
{
HeadBlockHash
:
head
.
Hash
(),
SafeBlockHash
:
safe
.
Hash
(),
FinalizedBlockHash
:
finalized
.
Hash
(),
},
&
engine
.
PayloadAttributes
{
Timestamp
:
newBlockTime
,
Random
:
common
.
Hash
{},
SuggestedFeeRecipient
:
head
.
Coinbase
,
})
if
err
!=
nil
{
f
.
log
.
Error
(
"failed to start building L1 block"
,
"err"
,
err
)
continue
}
if
res
.
PayloadID
==
nil
{
f
.
log
.
Error
(
"failed to start block building"
,
"res"
,
res
)
continue
}
// wait with sealing, if we are not behind already
delay
:=
time
.
Unix
(
int64
(
newBlockTime
),
0
)
.
Sub
(
f
.
clock
.
Now
())
tim
:=
f
.
clock
.
NewTimer
(
delay
)
select
{
case
<-
tim
.
Ch
()
:
// no-op
case
<-
quit
:
tim
.
Stop
()
return
nil
}
payload
,
err
:=
f
.
engineAPI
.
GetPayloadV1
(
*
res
.
PayloadID
)
if
err
!=
nil
{
f
.
log
.
Error
(
"failed to finish building L1 block"
,
"err"
,
err
)
continue
}
if
_
,
err
:=
f
.
engineAPI
.
NewPayloadV1
(
*
payload
);
err
!=
nil
{
f
.
log
.
Error
(
"failed to insert built L1 block"
,
"err"
,
err
)
continue
}
if
_
,
err
:=
f
.
engineAPI
.
ForkchoiceUpdatedV1
(
engine
.
ForkchoiceStateV1
{
HeadBlockHash
:
payload
.
BlockHash
,
SafeBlockHash
:
safe
.
Hash
(),
FinalizedBlockHash
:
finalized
.
Hash
(),
},
nil
);
err
!=
nil
{
f
.
log
.
Error
(
"failed to make built L1 block canonical"
,
"err"
,
err
)
continue
}
case
<-
quit
:
return
nil
}
}
})
return
nil
}
func
(
f
*
fakePoS
)
Stop
()
error
{
f
.
sub
.
Unsubscribe
()
if
advancing
,
ok
:=
f
.
clock
.
(
*
clock
.
AdvancingClock
);
ok
{
advancing
.
Stop
()
}
return
nil
}
op-e2e/faultproof_test.go
0 → 100644
View file @
4dddba09
package
op_e2e
import
(
"context"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/stretchr/testify/require"
)
func
TestTimeTravel
(
t
*
testing
.
T
)
{
InitParallel
(
t
)
cfg
:=
DefaultSystemConfig
(
t
)
delete
(
cfg
.
Nodes
,
"verifier"
)
cfg
.
SupportL1TimeTravel
=
true
sys
,
err
:=
cfg
.
Start
()
require
.
Nil
(
t
,
err
,
"Error starting up system"
)
defer
sys
.
Close
()
l1Client
:=
sys
.
Clients
[
"l1"
]
preTravel
,
err
:=
l1Client
.
BlockByNumber
(
context
.
Background
(),
nil
)
require
.
NoError
(
t
,
err
)
sys
.
TimeTravelClock
.
AdvanceTime
(
24
*
time
.
Hour
)
// Check that the L1 chain reaches the new time reasonably quickly (ie without taking a week)
// It should be able to jump straight to the new time with just a single block
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
3
*
time
.
Minute
)
defer
cancel
()
err
=
e2eutils
.
WaitFor
(
ctx
,
time
.
Second
,
func
()
(
bool
,
error
)
{
postTravel
,
err
:=
l1Client
.
BlockByNumber
(
context
.
Background
(),
nil
)
if
err
!=
nil
{
return
false
,
err
}
diff
:=
time
.
Duration
(
postTravel
.
Time
()
-
preTravel
.
Time
())
*
time
.
Second
return
diff
.
Hours
()
>
23
,
nil
})
require
.
NoError
(
t
,
err
)
}
op-e2e/geth.go
View file @
4dddba09
...
@@ -9,9 +9,9 @@ import (
...
@@ -9,9 +9,9 @@ import (
"time"
"time"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core"
...
@@ -21,7 +21,6 @@ import (
...
@@ -21,7 +21,6 @@ import (
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/node"
...
@@ -110,7 +109,7 @@ func waitForBlock(number *big.Int, client *ethclient.Client, timeout time.Durati
...
@@ -110,7 +109,7 @@ func waitForBlock(number *big.Int, client *ethclient.Client, timeout time.Durati
}
}
}
}
func
initL1Geth
(
cfg
*
SystemConfig
,
genesis
*
core
.
Genesis
,
opts
...
GethOption
)
(
*
node
.
Node
,
*
eth
.
Ethereum
,
error
)
{
func
initL1Geth
(
cfg
*
SystemConfig
,
genesis
*
core
.
Genesis
,
c
clock
.
Clock
,
opts
...
GethOption
)
(
*
node
.
Node
,
*
eth
.
Ethereum
,
error
)
{
ethConfig
:=
&
ethconfig
.
Config
{
ethConfig
:=
&
ethconfig
.
Config
{
NetworkId
:
cfg
.
DeployConfig
.
L1ChainID
,
NetworkId
:
cfg
.
DeployConfig
.
L1ChainID
,
Genesis
:
genesis
,
Genesis
:
genesis
,
...
@@ -134,6 +133,7 @@ func initL1Geth(cfg *SystemConfig, genesis *core.Genesis, opts ...GethOption) (*
...
@@ -134,6 +133,7 @@ func initL1Geth(cfg *SystemConfig, genesis *core.Genesis, opts ...GethOption) (*
// Instead of running a whole beacon node, we run this fake-proof-of-stake sidecar that sequences L1 blocks using the Engine API.
// Instead of running a whole beacon node, we run this fake-proof-of-stake sidecar that sequences L1 blocks using the Engine API.
l1Node
.
RegisterLifecycle
(
&
fakePoS
{
l1Node
.
RegisterLifecycle
(
&
fakePoS
{
clock
:
c
,
eth
:
l1Eth
,
eth
:
l1Eth
,
log
:
log
.
Root
(),
// geth logger is global anyway. Would be nice to replace with a local logger though.
log
:
log
.
Root
(),
// geth logger is global anyway. Would be nice to replace with a local logger though.
blockTime
:
cfg
.
DeployConfig
.
L1BlockTime
,
blockTime
:
cfg
.
DeployConfig
.
L1BlockTime
,
...
@@ -146,104 +146,6 @@ func initL1Geth(cfg *SystemConfig, genesis *core.Genesis, opts ...GethOption) (*
...
@@ -146,104 +146,6 @@ func initL1Geth(cfg *SystemConfig, genesis *core.Genesis, opts ...GethOption) (*
return
l1Node
,
l1Eth
,
nil
return
l1Node
,
l1Eth
,
nil
}
}
// fakePoS is a testing-only utility to attach to Geth,
// to build a fake proof-of-stake L1 chain with fixed block time and basic lagging safe/finalized blocks.
type
fakePoS
struct
{
eth
*
eth
.
Ethereum
log
log
.
Logger
blockTime
uint64
finalizedDistance
uint64
safeDistance
uint64
engineAPI
*
catalyst
.
ConsensusAPI
sub
ethereum
.
Subscription
}
func
(
f
*
fakePoS
)
Start
()
error
{
f
.
sub
=
event
.
NewSubscription
(
func
(
quit
<-
chan
struct
{})
error
{
// poll every half a second: enough to catch up with any block time when ticks are missed
t
:=
time
.
NewTicker
(
time
.
Second
/
2
)
for
{
select
{
case
now
:=
<-
t
.
C
:
chain
:=
f
.
eth
.
BlockChain
()
head
:=
chain
.
CurrentBlock
()
finalized
:=
chain
.
CurrentFinalBlock
()
if
finalized
==
nil
{
// fallback to genesis if nothing is finalized
finalized
=
chain
.
Genesis
()
.
Header
()
}
safe
:=
chain
.
CurrentSafeBlock
()
if
safe
==
nil
{
// fallback to finalized if nothing is safe
safe
=
finalized
}
if
head
.
Number
.
Uint64
()
>
f
.
finalizedDistance
{
// progress finalized block, if we can
finalized
=
f
.
eth
.
BlockChain
()
.
GetHeaderByNumber
(
head
.
Number
.
Uint64
()
-
f
.
finalizedDistance
)
}
if
head
.
Number
.
Uint64
()
>
f
.
safeDistance
{
// progress safe block, if we can
safe
=
f
.
eth
.
BlockChain
()
.
GetHeaderByNumber
(
head
.
Number
.
Uint64
()
-
f
.
safeDistance
)
}
// start building the block as soon as we are past the current head time
if
head
.
Time
>=
uint64
(
now
.
Unix
())
{
continue
}
res
,
err
:=
f
.
engineAPI
.
ForkchoiceUpdatedV1
(
engine
.
ForkchoiceStateV1
{
HeadBlockHash
:
head
.
Hash
(),
SafeBlockHash
:
safe
.
Hash
(),
FinalizedBlockHash
:
finalized
.
Hash
(),
},
&
engine
.
PayloadAttributes
{
Timestamp
:
head
.
Time
+
f
.
blockTime
,
Random
:
common
.
Hash
{},
SuggestedFeeRecipient
:
head
.
Coinbase
,
})
if
err
!=
nil
{
f
.
log
.
Error
(
"failed to start building L1 block"
,
"err"
,
err
)
continue
}
if
res
.
PayloadID
==
nil
{
f
.
log
.
Error
(
"failed to start block building"
,
"res"
,
res
)
continue
}
// wait with sealing, if we are not behind already
delay
:=
time
.
Until
(
time
.
Unix
(
int64
(
head
.
Time
+
f
.
blockTime
),
0
))
tim
:=
time
.
NewTimer
(
delay
)
select
{
case
<-
tim
.
C
:
// no-op
case
<-
quit
:
tim
.
Stop
()
return
nil
}
payload
,
err
:=
f
.
engineAPI
.
GetPayloadV1
(
*
res
.
PayloadID
)
if
err
!=
nil
{
f
.
log
.
Error
(
"failed to finish building L1 block"
,
"err"
,
err
)
continue
}
if
_
,
err
:=
f
.
engineAPI
.
NewPayloadV1
(
*
payload
);
err
!=
nil
{
f
.
log
.
Error
(
"failed to insert built L1 block"
,
"err"
,
err
)
continue
}
if
_
,
err
:=
f
.
engineAPI
.
ForkchoiceUpdatedV1
(
engine
.
ForkchoiceStateV1
{
HeadBlockHash
:
payload
.
BlockHash
,
SafeBlockHash
:
safe
.
Hash
(),
FinalizedBlockHash
:
finalized
.
Hash
(),
},
nil
);
err
!=
nil
{
f
.
log
.
Error
(
"failed to make built L1 block canonical"
,
"err"
,
err
)
continue
}
case
<-
quit
:
return
nil
}
}
})
return
nil
}
func
(
f
*
fakePoS
)
Stop
()
error
{
f
.
sub
.
Unsubscribe
()
return
nil
}
func
defaultNodeConfig
(
name
string
,
jwtPath
string
)
*
node
.
Config
{
func
defaultNodeConfig
(
name
string
,
jwtPath
string
)
*
node
.
Config
{
return
&
node
.
Config
{
return
&
node
.
Config
{
Name
:
name
,
Name
:
name
,
...
...
op-e2e/setup.go
View file @
4dddba09
...
@@ -241,6 +241,9 @@ type SystemConfig struct {
...
@@ -241,6 +241,9 @@ type SystemConfig struct {
// Target L1 tx size for the batcher transactions
// Target L1 tx size for the batcher transactions
BatcherTargetL1TxSizeBytes
uint64
BatcherTargetL1TxSizeBytes
uint64
// SupportL1TimeTravel determines if the L1 node supports quickly skipping forward in time
SupportL1TimeTravel
bool
}
}
type
System
struct
{
type
System
struct
{
...
@@ -258,6 +261,13 @@ type System struct {
...
@@ -258,6 +261,13 @@ type System struct {
L2OutputSubmitter
*
l2os
.
L2OutputSubmitter
L2OutputSubmitter
*
l2os
.
L2OutputSubmitter
BatchSubmitter
*
bss
.
BatchSubmitter
BatchSubmitter
*
bss
.
BatchSubmitter
Mocknet
mocknet
.
Mocknet
Mocknet
mocknet
.
Mocknet
// TimeTravelClock is nil unless SystemConfig.SupportL1TimeTravel was set to true
// It provides access to the clock instance used by the L1 node. Calling TimeTravelClock.AdvanceBy
// allows tests to quickly time travel L1 into the future.
// Note that this time travel may occur in a single block, creating a very large difference in the Time
// on sequential blocks.
TimeTravelClock
*
clock
.
AdvancingClock
}
}
func
(
sys
*
System
)
NodeEndpoint
(
name
string
)
string
{
func
(
sys
*
System
)
NodeEndpoint
(
name
string
)
string
{
...
@@ -339,6 +349,12 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
...
@@ -339,6 +349,12 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
}
}
}()
}()
c
:=
clock
.
SystemClock
if
cfg
.
SupportL1TimeTravel
{
sys
.
TimeTravelClock
=
clock
.
NewAdvancingClock
(
100
*
time
.
Millisecond
)
c
=
sys
.
TimeTravelClock
}
l1Genesis
,
err
:=
genesis
.
BuildL1DeveloperGenesis
(
cfg
.
DeployConfig
)
l1Genesis
,
err
:=
genesis
.
BuildL1DeveloperGenesis
(
cfg
.
DeployConfig
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
...
@@ -412,7 +428,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
...
@@ -412,7 +428,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
sys
.
RollupConfig
=
&
defaultConfig
sys
.
RollupConfig
=
&
defaultConfig
// Initialize nodes
// Initialize nodes
l1Node
,
l1Backend
,
err
:=
initL1Geth
(
&
cfg
,
l1Genesis
,
cfg
.
GethOptions
[
"l1"
]
...
)
l1Node
,
l1Backend
,
err
:=
initL1Geth
(
&
cfg
,
l1Genesis
,
c
,
c
fg
.
GethOptions
[
"l1"
]
...
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
}
}
...
...
op-service/clock/advancing.go
0 → 100644
View file @
4dddba09
package
clock
import
(
"sync/atomic"
"time"
)
type
AdvancingClock
struct
{
*
DeterministicClock
systemTime
Clock
ticker
Ticker
advanceEvery
time
.
Duration
quit
chan
interface
{}
running
atomic
.
Bool
lastTick
time
.
Time
}
// NewAdvancingClock creates a clock that, when started, advances at the same rate as the system clock but
// can also be advanced arbitrary amounts using the AdvanceTime method.
// Unlike the system clock, time does not progress smoothly but only increments when AdvancedTime is called or
// approximately after advanceEvery duration has elapsed. When advancing based on the system clock, the total time
// the system clock has advanced is added to the current time, preventing time differences from building up over time.
func
NewAdvancingClock
(
advanceEvery
time
.
Duration
)
*
AdvancingClock
{
now
:=
SystemClock
.
Now
()
return
&
AdvancingClock
{
DeterministicClock
:
NewDeterministicClock
(
now
),
systemTime
:
SystemClock
,
advanceEvery
:
advanceEvery
,
quit
:
make
(
chan
interface
{}),
lastTick
:
now
,
}
}
func
(
c
*
AdvancingClock
)
Start
()
{
if
!
c
.
running
.
CompareAndSwap
(
false
,
true
)
{
// Already running
return
}
c
.
ticker
=
c
.
systemTime
.
NewTicker
(
c
.
advanceEvery
)
go
func
()
{
for
{
select
{
case
now
:=
<-
c
.
ticker
.
Ch
()
:
c
.
onTick
(
now
)
case
<-
c
.
quit
:
return
}
}
}()
}
func
(
c
*
AdvancingClock
)
Stop
()
{
if
!
c
.
running
.
CompareAndSwap
(
true
,
false
)
{
// Already stopped
return
}
c
.
quit
<-
nil
}
func
(
c
*
AdvancingClock
)
onTick
(
now
time
.
Time
)
{
if
!
now
.
After
(
c
.
lastTick
)
{
// Time hasn't progressed for some reason, so do nothing
return
}
// Advance time by however long it has been since the last update.
// Ensures we don't drift from system time by more and more over time
advanceBy
:=
now
.
Sub
(
c
.
lastTick
)
c
.
AdvanceTime
(
advanceBy
)
c
.
lastTick
=
now
}
op-service/clock/advancing_test.go
0 → 100644
View file @
4dddba09
package
clock
import
(
"testing"
"time"
"github.com/stretchr/testify/require"
)
func
TestAdvancingClock_AdvancesByTimeBetweenTicks
(
t
*
testing
.
T
)
{
clock
,
realTime
:=
newTestAdvancingClock
(
1
*
time
.
Second
)
clock
.
Start
()
defer
clock
.
Stop
()
eventTicker
:=
clock
.
NewTicker
(
1
*
time
.
Second
)
start
:=
clock
.
Now
()
realTime
.
AdvanceTime
(
1
*
time
.
Second
)
require
.
Equal
(
t
,
start
.
Add
(
1
*
time
.
Second
),
<-
eventTicker
.
Ch
(),
"should trigger events when advancing"
)
require
.
Equal
(
t
,
start
.
Add
(
1
*
time
.
Second
),
clock
.
Now
(),
"Should advance on single tick"
)
start
=
clock
.
Now
()
realTime
.
AdvanceTime
(
15
*
time
.
Second
)
require
.
Equal
(
t
,
start
.
Add
(
15
*
time
.
Second
),
<-
eventTicker
.
Ch
(),
"should trigger events when advancing"
)
require
.
Equal
(
t
,
start
.
Add
(
15
*
time
.
Second
),
clock
.
Now
(),
"Should advance by time between ticks"
)
}
func
TestAdvancingClock_Stop
(
t
*
testing
.
T
)
{
clock
,
realTime
:=
newTestAdvancingClock
(
1
*
time
.
Second
)
clock
.
Start
()
defer
clock
.
Stop
()
eventTicker
:=
clock
.
NewTicker
(
1
*
time
.
Second
)
// Stop the clock again
clock
.
Stop
()
start
:=
clock
.
Now
()
realTime
.
AdvanceTime
(
15
*
time
.
Second
)
clock
.
Start
()
// Trigger the next tick
realTime
.
AdvanceTime
(
1
*
time
.
Second
)
// Time advances by the whole time the clock was stopped
// Note: if events were triggered while the clock was stopped, this event would be for the wrong time
require
.
Equal
(
t
,
start
.
Add
(
16
*
time
.
Second
),
<-
eventTicker
.
Ch
(),
"should trigger events again after restarting"
)
require
.
Equal
(
t
,
start
.
Add
(
16
*
time
.
Second
),
clock
.
Now
(),
"Should advance by time between ticks after restarting"
)
}
func
newTestAdvancingClock
(
advanceEvery
time
.
Duration
)
(
*
AdvancingClock
,
*
DeterministicClock
)
{
systemTime
:=
NewDeterministicClock
(
time
.
UnixMilli
(
1000
))
clock
:=
&
AdvancingClock
{
DeterministicClock
:
NewDeterministicClock
(
time
.
UnixMilli
(
5000
)),
systemTime
:
systemTime
,
advanceEvery
:
advanceEvery
,
quit
:
make
(
chan
interface
{}),
lastTick
:
systemTime
.
Now
(),
}
return
clock
,
systemTime
}
op-service/clock/deterministic.go
View file @
4dddba09
...
@@ -107,8 +107,12 @@ func (t *ticker) fire(now time.Time) bool {
...
@@ -107,8 +107,12 @@ func (t *ticker) fire(now time.Time) bool {
if
t
.
stopped
{
if
t
.
stopped
{
return
false
return
false
}
}
t
.
ch
<-
now
// Publish without blocking and only update due time if we publish successfully
t
.
nextDue
=
now
.
Add
(
t
.
period
)
select
{
case
t
.
ch
<-
now
:
t
.
nextDue
=
now
.
Add
(
t
.
period
)
default
:
}
return
true
return
true
}
}
...
...
op-service/clock/deterministic_test.go
View file @
4dddba09
...
@@ -2,6 +2,7 @@ package clock
...
@@ -2,6 +2,7 @@ package clock
import
(
import
(
"context"
"context"
"sync"
"sync/atomic"
"sync/atomic"
"testing"
"testing"
"time"
"time"
...
@@ -156,6 +157,38 @@ func TestNewTicker(t *testing.T) {
...
@@ -156,6 +157,38 @@ func TestNewTicker(t *testing.T) {
require
.
Len
(
t
,
ticker
.
Ch
(),
0
,
"should not fire until due again"
)
require
.
Len
(
t
,
ticker
.
Ch
(),
0
,
"should not fire until due again"
)
})
})
t
.
Run
(
"SkipsFiringWhenProcessingIsSlow"
,
func
(
t
*
testing
.
T
)
{
clock
:=
NewDeterministicClock
(
time
.
UnixMilli
(
1000
))
ticker
:=
clock
.
NewTicker
(
5
*
time
.
Second
)
// Fire once to fill the channel queue
clock
.
AdvanceTime
(
5
*
time
.
Second
)
firstEventTime
:=
clock
.
Now
()
var
startProcessing
sync
.
WaitGroup
startProcessing
.
Add
(
1
)
processedTicks
:=
make
(
chan
time
.
Time
)
go
func
()
{
startProcessing
.
Wait
()
// Read two events then exit
for
i
:=
0
;
i
<
2
;
i
++
{
event
:=
<-
ticker
.
Ch
()
processedTicks
<-
event
}
}()
// Advance time further before processing of events has started
// Can't publish any further events to the channel but shouldn't block
clock
.
AdvanceTime
(
30
*
time
.
Second
)
// Allow processing to start
startProcessing
.
Done
()
require
.
Equal
(
t
,
firstEventTime
,
<-
processedTicks
,
"Should process first event"
)
clock
.
AdvanceTime
(
5
*
time
.
Second
)
require
.
Equal
(
t
,
clock
.
Now
(),
<-
processedTicks
,
"Should skip to latest time"
)
})
t
.
Run
(
"StopFiring"
,
func
(
t
*
testing
.
T
)
{
t
.
Run
(
"StopFiring"
,
func
(
t
*
testing
.
T
)
{
clock
:=
NewDeterministicClock
(
time
.
UnixMilli
(
1000
))
clock
:=
NewDeterministicClock
(
time
.
UnixMilli
(
1000
))
ticker
:=
clock
.
NewTicker
(
5
*
time
.
Second
)
ticker
:=
clock
.
NewTicker
(
5
*
time
.
Second
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment