Commit 3695b730 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into 07-11-feat_Add_ts_contract_package_for_TS

parents f0be885a 4604825f
package op_e2e
import (
"time"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/catalyst"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
// fakePoS is a testing-only utility to attach to Geth,
// to build a fake proof-of-stake L1 chain with fixed block time and basic lagging safe/finalized blocks.
type fakePoS struct {
clock clock.Clock
eth *eth.Ethereum
log log.Logger
blockTime uint64
finalizedDistance uint64
safeDistance uint64
engineAPI *catalyst.ConsensusAPI
sub ethereum.Subscription
}
func (f *fakePoS) Start() error {
if advancing, ok := f.clock.(*clock.AdvancingClock); ok {
advancing.Start()
}
f.sub = event.NewSubscription(func(quit <-chan struct{}) error {
// poll every half a second: enough to catch up with any block time when ticks are missed
t := f.clock.NewTicker(time.Second / 2)
for {
select {
case now := <-t.Ch():
chain := f.eth.BlockChain()
head := chain.CurrentBlock()
finalized := chain.CurrentFinalBlock()
if finalized == nil { // fallback to genesis if nothing is finalized
finalized = chain.Genesis().Header()
}
safe := chain.CurrentSafeBlock()
if safe == nil { // fallback to finalized if nothing is safe
safe = finalized
}
if head.Number.Uint64() > f.finalizedDistance { // progress finalized block, if we can
finalized = f.eth.BlockChain().GetHeaderByNumber(head.Number.Uint64() - f.finalizedDistance)
}
if head.Number.Uint64() > f.safeDistance { // progress safe block, if we can
safe = f.eth.BlockChain().GetHeaderByNumber(head.Number.Uint64() - f.safeDistance)
}
// start building the block as soon as we are past the current head time
if head.Time >= uint64(now.Unix()) {
continue
}
newBlockTime := head.Time + f.blockTime
if time.Unix(int64(newBlockTime), 0).Add(5 * time.Minute).Before(f.clock.Now()) {
// We're a long way behind, let's skip some blocks...
newBlockTime = uint64(f.clock.Now().Unix())
}
res, err := f.engineAPI.ForkchoiceUpdatedV1(engine.ForkchoiceStateV1{
HeadBlockHash: head.Hash(),
SafeBlockHash: safe.Hash(),
FinalizedBlockHash: finalized.Hash(),
}, &engine.PayloadAttributes{
Timestamp: newBlockTime,
Random: common.Hash{},
SuggestedFeeRecipient: head.Coinbase,
})
if err != nil {
f.log.Error("failed to start building L1 block", "err", err)
continue
}
if res.PayloadID == nil {
f.log.Error("failed to start block building", "res", res)
continue
}
// wait with sealing, if we are not behind already
delay := time.Unix(int64(newBlockTime), 0).Sub(f.clock.Now())
tim := f.clock.NewTimer(delay)
select {
case <-tim.Ch():
// no-op
case <-quit:
tim.Stop()
return nil
}
payload, err := f.engineAPI.GetPayloadV1(*res.PayloadID)
if err != nil {
f.log.Error("failed to finish building L1 block", "err", err)
continue
}
if _, err := f.engineAPI.NewPayloadV1(*payload); err != nil {
f.log.Error("failed to insert built L1 block", "err", err)
continue
}
if _, err := f.engineAPI.ForkchoiceUpdatedV1(engine.ForkchoiceStateV1{
HeadBlockHash: payload.BlockHash,
SafeBlockHash: safe.Hash(),
FinalizedBlockHash: finalized.Hash(),
}, nil); err != nil {
f.log.Error("failed to make built L1 block canonical", "err", err)
continue
}
case <-quit:
return nil
}
}
})
return nil
}
func (f *fakePoS) Stop() error {
f.sub.Unsubscribe()
if advancing, ok := f.clock.(*clock.AdvancingClock); ok {
advancing.Stop()
}
return nil
}
package op_e2e
import (
"context"
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/stretchr/testify/require"
)
func TestTimeTravel(t *testing.T) {
InitParallel(t)
cfg := DefaultSystemConfig(t)
delete(cfg.Nodes, "verifier")
cfg.SupportL1TimeTravel = true
sys, err := cfg.Start()
require.Nil(t, err, "Error starting up system")
defer sys.Close()
l1Client := sys.Clients["l1"]
preTravel, err := l1Client.BlockByNumber(context.Background(), nil)
require.NoError(t, err)
sys.TimeTravelClock.AdvanceTime(24 * time.Hour)
// Check that the L1 chain reaches the new time reasonably quickly (ie without taking a week)
// It should be able to jump straight to the new time with just a single block
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
err = e2eutils.WaitFor(ctx, time.Second, func() (bool, error) {
postTravel, err := l1Client.BlockByNumber(context.Background(), nil)
if err != nil {
return false, err
}
diff := time.Duration(postTravel.Time()-preTravel.Time()) * time.Second
return diff.Hours() > 23, nil
})
require.NoError(t, err)
}
...@@ -9,9 +9,9 @@ import ( ...@@ -9,9 +9,9 @@ import (
"time" "time"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
...@@ -21,7 +21,6 @@ import ( ...@@ -21,7 +21,6 @@ import (
"github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
...@@ -110,7 +109,7 @@ func waitForBlock(number *big.Int, client *ethclient.Client, timeout time.Durati ...@@ -110,7 +109,7 @@ func waitForBlock(number *big.Int, client *ethclient.Client, timeout time.Durati
} }
} }
func initL1Geth(cfg *SystemConfig, genesis *core.Genesis, opts ...GethOption) (*node.Node, *eth.Ethereum, error) { func initL1Geth(cfg *SystemConfig, genesis *core.Genesis, c clock.Clock, opts ...GethOption) (*node.Node, *eth.Ethereum, error) {
ethConfig := &ethconfig.Config{ ethConfig := &ethconfig.Config{
NetworkId: cfg.DeployConfig.L1ChainID, NetworkId: cfg.DeployConfig.L1ChainID,
Genesis: genesis, Genesis: genesis,
...@@ -134,6 +133,7 @@ func initL1Geth(cfg *SystemConfig, genesis *core.Genesis, opts ...GethOption) (* ...@@ -134,6 +133,7 @@ func initL1Geth(cfg *SystemConfig, genesis *core.Genesis, opts ...GethOption) (*
// Instead of running a whole beacon node, we run this fake-proof-of-stake sidecar that sequences L1 blocks using the Engine API. // Instead of running a whole beacon node, we run this fake-proof-of-stake sidecar that sequences L1 blocks using the Engine API.
l1Node.RegisterLifecycle(&fakePoS{ l1Node.RegisterLifecycle(&fakePoS{
clock: c,
eth: l1Eth, eth: l1Eth,
log: log.Root(), // geth logger is global anyway. Would be nice to replace with a local logger though. log: log.Root(), // geth logger is global anyway. Would be nice to replace with a local logger though.
blockTime: cfg.DeployConfig.L1BlockTime, blockTime: cfg.DeployConfig.L1BlockTime,
...@@ -146,104 +146,6 @@ func initL1Geth(cfg *SystemConfig, genesis *core.Genesis, opts ...GethOption) (* ...@@ -146,104 +146,6 @@ func initL1Geth(cfg *SystemConfig, genesis *core.Genesis, opts ...GethOption) (*
return l1Node, l1Eth, nil return l1Node, l1Eth, nil
} }
// fakePoS is a testing-only utility to attach to Geth,
// to build a fake proof-of-stake L1 chain with fixed block time and basic lagging safe/finalized blocks.
type fakePoS struct {
eth *eth.Ethereum
log log.Logger
blockTime uint64
finalizedDistance uint64
safeDistance uint64
engineAPI *catalyst.ConsensusAPI
sub ethereum.Subscription
}
func (f *fakePoS) Start() error {
f.sub = event.NewSubscription(func(quit <-chan struct{}) error {
// poll every half a second: enough to catch up with any block time when ticks are missed
t := time.NewTicker(time.Second / 2)
for {
select {
case now := <-t.C:
chain := f.eth.BlockChain()
head := chain.CurrentBlock()
finalized := chain.CurrentFinalBlock()
if finalized == nil { // fallback to genesis if nothing is finalized
finalized = chain.Genesis().Header()
}
safe := chain.CurrentSafeBlock()
if safe == nil { // fallback to finalized if nothing is safe
safe = finalized
}
if head.Number.Uint64() > f.finalizedDistance { // progress finalized block, if we can
finalized = f.eth.BlockChain().GetHeaderByNumber(head.Number.Uint64() - f.finalizedDistance)
}
if head.Number.Uint64() > f.safeDistance { // progress safe block, if we can
safe = f.eth.BlockChain().GetHeaderByNumber(head.Number.Uint64() - f.safeDistance)
}
// start building the block as soon as we are past the current head time
if head.Time >= uint64(now.Unix()) {
continue
}
res, err := f.engineAPI.ForkchoiceUpdatedV1(engine.ForkchoiceStateV1{
HeadBlockHash: head.Hash(),
SafeBlockHash: safe.Hash(),
FinalizedBlockHash: finalized.Hash(),
}, &engine.PayloadAttributes{
Timestamp: head.Time + f.blockTime,
Random: common.Hash{},
SuggestedFeeRecipient: head.Coinbase,
})
if err != nil {
f.log.Error("failed to start building L1 block", "err", err)
continue
}
if res.PayloadID == nil {
f.log.Error("failed to start block building", "res", res)
continue
}
// wait with sealing, if we are not behind already
delay := time.Until(time.Unix(int64(head.Time+f.blockTime), 0))
tim := time.NewTimer(delay)
select {
case <-tim.C:
// no-op
case <-quit:
tim.Stop()
return nil
}
payload, err := f.engineAPI.GetPayloadV1(*res.PayloadID)
if err != nil {
f.log.Error("failed to finish building L1 block", "err", err)
continue
}
if _, err := f.engineAPI.NewPayloadV1(*payload); err != nil {
f.log.Error("failed to insert built L1 block", "err", err)
continue
}
if _, err := f.engineAPI.ForkchoiceUpdatedV1(engine.ForkchoiceStateV1{
HeadBlockHash: payload.BlockHash,
SafeBlockHash: safe.Hash(),
FinalizedBlockHash: finalized.Hash(),
}, nil); err != nil {
f.log.Error("failed to make built L1 block canonical", "err", err)
continue
}
case <-quit:
return nil
}
}
})
return nil
}
func (f *fakePoS) Stop() error {
f.sub.Unsubscribe()
return nil
}
func defaultNodeConfig(name string, jwtPath string) *node.Config { func defaultNodeConfig(name string, jwtPath string) *node.Config {
return &node.Config{ return &node.Config{
Name: name, Name: name,
......
...@@ -241,6 +241,9 @@ type SystemConfig struct { ...@@ -241,6 +241,9 @@ type SystemConfig struct {
// Target L1 tx size for the batcher transactions // Target L1 tx size for the batcher transactions
BatcherTargetL1TxSizeBytes uint64 BatcherTargetL1TxSizeBytes uint64
// SupportL1TimeTravel determines if the L1 node supports quickly skipping forward in time
SupportL1TimeTravel bool
} }
type System struct { type System struct {
...@@ -258,6 +261,13 @@ type System struct { ...@@ -258,6 +261,13 @@ type System struct {
L2OutputSubmitter *l2os.L2OutputSubmitter L2OutputSubmitter *l2os.L2OutputSubmitter
BatchSubmitter *bss.BatchSubmitter BatchSubmitter *bss.BatchSubmitter
Mocknet mocknet.Mocknet Mocknet mocknet.Mocknet
// TimeTravelClock is nil unless SystemConfig.SupportL1TimeTravel was set to true
// It provides access to the clock instance used by the L1 node. Calling TimeTravelClock.AdvanceBy
// allows tests to quickly time travel L1 into the future.
// Note that this time travel may occur in a single block, creating a very large difference in the Time
// on sequential blocks.
TimeTravelClock *clock.AdvancingClock
} }
func (sys *System) NodeEndpoint(name string) string { func (sys *System) NodeEndpoint(name string) string {
...@@ -339,6 +349,12 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -339,6 +349,12 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
} }
}() }()
c := clock.SystemClock
if cfg.SupportL1TimeTravel {
sys.TimeTravelClock = clock.NewAdvancingClock(100 * time.Millisecond)
c = sys.TimeTravelClock
}
l1Genesis, err := genesis.BuildL1DeveloperGenesis(cfg.DeployConfig) l1Genesis, err := genesis.BuildL1DeveloperGenesis(cfg.DeployConfig)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -412,7 +428,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { ...@@ -412,7 +428,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) {
sys.RollupConfig = &defaultConfig sys.RollupConfig = &defaultConfig
// Initialize nodes // Initialize nodes
l1Node, l1Backend, err := initL1Geth(&cfg, l1Genesis, cfg.GethOptions["l1"]...) l1Node, l1Backend, err := initL1Geth(&cfg, l1Genesis, c, cfg.GethOptions["l1"]...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
package clock
import (
"sync/atomic"
"time"
)
type AdvancingClock struct {
*DeterministicClock
systemTime Clock
ticker Ticker
advanceEvery time.Duration
quit chan interface{}
running atomic.Bool
lastTick time.Time
}
// NewAdvancingClock creates a clock that, when started, advances at the same rate as the system clock but
// can also be advanced arbitrary amounts using the AdvanceTime method.
// Unlike the system clock, time does not progress smoothly but only increments when AdvancedTime is called or
// approximately after advanceEvery duration has elapsed. When advancing based on the system clock, the total time
// the system clock has advanced is added to the current time, preventing time differences from building up over time.
func NewAdvancingClock(advanceEvery time.Duration) *AdvancingClock {
now := SystemClock.Now()
return &AdvancingClock{
DeterministicClock: NewDeterministicClock(now),
systemTime: SystemClock,
advanceEvery: advanceEvery,
quit: make(chan interface{}),
lastTick: now,
}
}
func (c *AdvancingClock) Start() {
if !c.running.CompareAndSwap(false, true) {
// Already running
return
}
c.ticker = c.systemTime.NewTicker(c.advanceEvery)
go func() {
for {
select {
case now := <-c.ticker.Ch():
c.onTick(now)
case <-c.quit:
return
}
}
}()
}
func (c *AdvancingClock) Stop() {
if !c.running.CompareAndSwap(true, false) {
// Already stopped
return
}
c.quit <- nil
}
func (c *AdvancingClock) onTick(now time.Time) {
if !now.After(c.lastTick) {
// Time hasn't progressed for some reason, so do nothing
return
}
// Advance time by however long it has been since the last update.
// Ensures we don't drift from system time by more and more over time
advanceBy := now.Sub(c.lastTick)
c.AdvanceTime(advanceBy)
c.lastTick = now
}
package clock
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestAdvancingClock_AdvancesByTimeBetweenTicks(t *testing.T) {
clock, realTime := newTestAdvancingClock(1 * time.Second)
clock.Start()
defer clock.Stop()
eventTicker := clock.NewTicker(1 * time.Second)
start := clock.Now()
realTime.AdvanceTime(1 * time.Second)
require.Equal(t, start.Add(1*time.Second), <-eventTicker.Ch(), "should trigger events when advancing")
require.Equal(t, start.Add(1*time.Second), clock.Now(), "Should advance on single tick")
start = clock.Now()
realTime.AdvanceTime(15 * time.Second)
require.Equal(t, start.Add(15*time.Second), <-eventTicker.Ch(), "should trigger events when advancing")
require.Equal(t, start.Add(15*time.Second), clock.Now(), "Should advance by time between ticks")
}
func TestAdvancingClock_Stop(t *testing.T) {
clock, realTime := newTestAdvancingClock(1 * time.Second)
clock.Start()
defer clock.Stop()
eventTicker := clock.NewTicker(1 * time.Second)
// Stop the clock again
clock.Stop()
start := clock.Now()
realTime.AdvanceTime(15 * time.Second)
clock.Start()
// Trigger the next tick
realTime.AdvanceTime(1 * time.Second)
// Time advances by the whole time the clock was stopped
// Note: if events were triggered while the clock was stopped, this event would be for the wrong time
require.Equal(t, start.Add(16*time.Second), <-eventTicker.Ch(), "should trigger events again after restarting")
require.Equal(t, start.Add(16*time.Second), clock.Now(), "Should advance by time between ticks after restarting")
}
func newTestAdvancingClock(advanceEvery time.Duration) (*AdvancingClock, *DeterministicClock) {
systemTime := NewDeterministicClock(time.UnixMilli(1000))
clock := &AdvancingClock{
DeterministicClock: NewDeterministicClock(time.UnixMilli(5000)),
systemTime: systemTime,
advanceEvery: advanceEvery,
quit: make(chan interface{}),
lastTick: systemTime.Now(),
}
return clock, systemTime
}
...@@ -13,6 +13,8 @@ type Clock interface { ...@@ -13,6 +13,8 @@ type Clock interface {
// It is equivalent to time.After // It is equivalent to time.After
After(d time.Duration) <-chan time.Time After(d time.Duration) <-chan time.Time
AfterFunc(d time.Duration, f func()) Timer
// NewTicker returns a new Ticker containing a channel that will send // NewTicker returns a new Ticker containing a channel that will send
// the current time on the channel after each tick. The period of the // the current time on the channel after each tick. The period of the
// ticks is specified by the duration argument. The ticker will adjust // ticks is specified by the duration argument. The ticker will adjust
...@@ -20,6 +22,10 @@ type Clock interface { ...@@ -20,6 +22,10 @@ type Clock interface {
// The duration d must be greater than zero; if not, NewTicker will // The duration d must be greater than zero; if not, NewTicker will
// panic. Stop the ticker to release associated resources. // panic. Stop the ticker to release associated resources.
NewTicker(d time.Duration) Ticker NewTicker(d time.Duration) Ticker
// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
NewTimer(d time.Duration) Timer
} }
// A Ticker holds a channel that delivers "ticks" of a clock at intervals // A Ticker holds a channel that delivers "ticks" of a clock at intervals
...@@ -38,6 +44,25 @@ type Ticker interface { ...@@ -38,6 +44,25 @@ type Ticker interface {
Reset(d time.Duration) Reset(d time.Duration)
} }
// Timer represents a single event.
type Timer interface {
// Ch returns the channel for the ticker. Equivalent to time.Timer.C
Ch() <-chan time.Time
// Stop prevents the Timer from firing.
// It returns true if the call stops the timer, false if the timer has already
// expired or been stopped.
// Stop does not close the channel, to prevent a read from the channel succeeding
// incorrectly.
//
// For a timer created with AfterFunc(d, f), if t.Stop returns false, then the timer
// has already expired and the function f has been started in its own goroutine;
// Stop does not wait for f to complete before returning.
// If the caller needs to know whether f is completed, it must coordinate
// with f explicitly.
Stop() bool
}
// SystemClock provides an instance of Clock that uses the system clock via methods in the time package. // SystemClock provides an instance of Clock that uses the system clock via methods in the time package.
var SystemClock Clock = systemClock{} var SystemClock Clock = systemClock{}
...@@ -63,3 +88,19 @@ func (t *SystemTicker) Ch() <-chan time.Time { ...@@ -63,3 +88,19 @@ func (t *SystemTicker) Ch() <-chan time.Time {
func (s systemClock) NewTicker(d time.Duration) Ticker { func (s systemClock) NewTicker(d time.Duration) Ticker {
return &SystemTicker{time.NewTicker(d)} return &SystemTicker{time.NewTicker(d)}
} }
func (s systemClock) NewTimer(d time.Duration) Timer {
return &SystemTimer{time.NewTimer(d)}
}
type SystemTimer struct {
*time.Timer
}
func (t *SystemTimer) Ch() <-chan time.Time {
return t.C
}
func (s systemClock) AfterFunc(d time.Duration, f func()) Timer {
return &SystemTimer{time.AfterFunc(d, f)}
}
...@@ -29,6 +29,43 @@ func (t task) fire(now time.Time) bool { ...@@ -29,6 +29,43 @@ func (t task) fire(now time.Time) bool {
return false return false
} }
type timer struct {
f func()
ch chan time.Time
due time.Time
stopped bool
run bool
sync.Mutex
}
func (t *timer) isDue(now time.Time) bool {
t.Lock()
defer t.Unlock()
return !t.due.After(now)
}
func (t *timer) fire(now time.Time) bool {
t.Lock()
defer t.Unlock()
if !t.stopped {
t.f()
t.run = true
}
return false
}
func (t *timer) Ch() <-chan time.Time {
return t.ch
}
func (t *timer) Stop() bool {
t.Lock()
defer t.Unlock()
r := !t.stopped && !t.run
t.stopped = true
return r
}
type ticker struct { type ticker struct {
c Clock c Clock
ch chan time.Time ch chan time.Time
...@@ -70,8 +107,12 @@ func (t *ticker) fire(now time.Time) bool { ...@@ -70,8 +107,12 @@ func (t *ticker) fire(now time.Time) bool {
if t.stopped { if t.stopped {
return false return false
} }
t.ch <- now // Publish without blocking and only update due time if we publish successfully
select {
case t.ch <- now:
t.nextDue = now.Add(t.period) t.nextDue = now.Add(t.period)
default:
}
return true return true
} }
...@@ -110,6 +151,18 @@ func (s *DeterministicClock) After(d time.Duration) <-chan time.Time { ...@@ -110,6 +151,18 @@ func (s *DeterministicClock) After(d time.Duration) <-chan time.Time {
return ch return ch
} }
func (s *DeterministicClock) AfterFunc(d time.Duration, f func()) Timer {
s.lock.Lock()
defer s.lock.Unlock()
timer := &timer{f: f, due: s.now.Add(d)}
if d.Nanoseconds() == 0 {
timer.fire(s.now)
} else {
s.addPending(timer)
}
return timer
}
func (s *DeterministicClock) NewTicker(d time.Duration) Ticker { func (s *DeterministicClock) NewTicker(d time.Duration) Ticker {
if d <= 0 { if d <= 0 {
panic("Continuously firing tickers are a really bad idea") panic("Continuously firing tickers are a really bad idea")
...@@ -127,6 +180,21 @@ func (s *DeterministicClock) NewTicker(d time.Duration) Ticker { ...@@ -127,6 +180,21 @@ func (s *DeterministicClock) NewTicker(d time.Duration) Ticker {
return t return t
} }
func (s *DeterministicClock) NewTimer(d time.Duration) Timer {
s.lock.Lock()
defer s.lock.Unlock()
ch := make(chan time.Time, 1)
t := &timer{
f: func() {
ch <- s.now
},
ch: ch,
due: s.now.Add(d),
}
s.addPending(t)
return t
}
func (s *DeterministicClock) addPending(t action) { func (s *DeterministicClock) addPending(t action) {
s.pending = append(s.pending, t) s.pending = append(s.pending, t)
select { select {
......
...@@ -2,6 +2,8 @@ package clock ...@@ -2,6 +2,8 @@ package clock
import ( import (
"context" "context"
"sync"
"sync/atomic"
"testing" "testing"
"time" "time"
...@@ -62,6 +64,64 @@ func TestAfter(t *testing.T) { ...@@ -62,6 +64,64 @@ func TestAfter(t *testing.T) {
}) })
} }
func TestAfterFunc(t *testing.T) {
t.Run("ZeroExecutesImmediately", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
timer := clock.AfterFunc(0, func() { ran.Store(true) })
require.True(t, ran.Load(), "duration should already have been reached")
require.False(t, timer.Stop(), "Stop should return false after executing")
})
t.Run("CompletesWhenTimeAdvances", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
timer := clock.AfterFunc(500*time.Millisecond, func() { ran.Store(true) })
require.False(t, ran.Load(), "should not complete immediately")
clock.AdvanceTime(499 * time.Millisecond)
require.False(t, ran.Load(), "should not complete before time is due")
clock.AdvanceTime(1 * time.Millisecond)
require.True(t, ran.Load(), "should complete when time is reached")
require.False(t, timer.Stop(), "Stop should return false after executing")
})
t.Run("CompletesWhenTimeAdvancesPastDue", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
timer := clock.AfterFunc(500*time.Millisecond, func() { ran.Store(true) })
require.False(t, ran.Load(), "should not complete immediately")
clock.AdvanceTime(9000 * time.Millisecond)
require.True(t, ran.Load(), "should complete when time is reached")
require.False(t, timer.Stop(), "Stop should return false after executing")
})
t.Run("RegisterAsPending", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
clock.AfterFunc(500*time.Millisecond, func() { ran.Store(true) })
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFunc()
require.True(t, clock.WaitForNewPendingTask(ctx), "should have added a new pending task")
})
t.Run("DoNotRunIfStopped", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ran := new(atomic.Bool)
timer := clock.AfterFunc(500*time.Millisecond, func() { ran.Store(true) })
require.False(t, ran.Load(), "should not complete immediately")
require.True(t, timer.Stop(), "Stop should return true on first call")
require.False(t, timer.Stop(), "Stop should return false on subsequent calls")
clock.AdvanceTime(9000 * time.Millisecond)
require.False(t, ran.Load(), "should not run when time is reached")
})
}
func TestNewTicker(t *testing.T) { func TestNewTicker(t *testing.T) {
t.Run("FiresAfterEachDuration", func(t *testing.T) { t.Run("FiresAfterEachDuration", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000)) clock := NewDeterministicClock(time.UnixMilli(1000))
...@@ -97,6 +157,38 @@ func TestNewTicker(t *testing.T) { ...@@ -97,6 +157,38 @@ func TestNewTicker(t *testing.T) {
require.Len(t, ticker.Ch(), 0, "should not fire until due again") require.Len(t, ticker.Ch(), 0, "should not fire until due again")
}) })
t.Run("SkipsFiringWhenProcessingIsSlow", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second)
// Fire once to fill the channel queue
clock.AdvanceTime(5 * time.Second)
firstEventTime := clock.Now()
var startProcessing sync.WaitGroup
startProcessing.Add(1)
processedTicks := make(chan time.Time)
go func() {
startProcessing.Wait()
// Read two events then exit
for i := 0; i < 2; i++ {
event := <-ticker.Ch()
processedTicks <- event
}
}()
// Advance time further before processing of events has started
// Can't publish any further events to the channel but shouldn't block
clock.AdvanceTime(30 * time.Second)
// Allow processing to start
startProcessing.Done()
require.Equal(t, firstEventTime, <-processedTicks, "Should process first event")
clock.AdvanceTime(5 * time.Second)
require.Equal(t, clock.Now(), <-processedTicks, "Should skip to latest time")
})
t.Run("StopFiring", func(t *testing.T) { t.Run("StopFiring", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000)) clock := NewDeterministicClock(time.UnixMilli(1000))
ticker := clock.NewTicker(5 * time.Second) ticker := clock.NewTicker(5 * time.Second)
...@@ -158,6 +250,46 @@ func TestNewTicker(t *testing.T) { ...@@ -158,6 +250,46 @@ func TestNewTicker(t *testing.T) {
}) })
} }
func TestNewTimer(t *testing.T) {
t.Run("FireOnceAfterDuration", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
timer := clock.NewTimer(5 * time.Second)
require.Len(t, timer.Ch(), 0, "should not fire immediately")
clock.AdvanceTime(4 * time.Second)
require.Len(t, timer.Ch(), 0, "should not fire before due")
clock.AdvanceTime(1 * time.Second)
require.Len(t, timer.Ch(), 1, "should fire when due")
require.Equal(t, clock.Now(), <-timer.Ch(), "should post current time")
clock.AdvanceTime(6 * time.Second)
require.Len(t, timer.Ch(), 0, "should not fire when due again")
})
t.Run("StopBeforeExecuted", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
timer := clock.NewTimer(5 * time.Second)
require.True(t, timer.Stop())
clock.AdvanceTime(10 * time.Second)
require.Len(t, timer.Ch(), 0, "should not fire after stop")
})
t.Run("StopAfterExecuted", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000))
timer := clock.NewTimer(5 * time.Second)
clock.AdvanceTime(10 * time.Second)
require.Len(t, timer.Ch(), 1, "should fire when due")
require.Equal(t, clock.Now(), <-timer.Ch(), "should post current time")
require.False(t, timer.Stop())
})
}
func TestWaitForPending(t *testing.T) { func TestWaitForPending(t *testing.T) {
t.Run("DoNotBlockWhenAlreadyPending", func(t *testing.T) { t.Run("DoNotBlockWhenAlreadyPending", func(t *testing.T) {
clock := NewDeterministicClock(time.UnixMilli(1000)) clock := NewDeterministicClock(time.UnixMilli(1000))
......
...@@ -19,3 +19,9 @@ test: ...@@ -19,3 +19,9 @@ test:
lint: lint:
go vet ./... go vet ./...
.PHONY: test .PHONY: test
tls:
kubectl get secrets op-ufm-client-tls -o yaml | yq '.data."tls.key"' | base64 --decode > tls/tls.key
kubectl get secrets op-ufm-client-tls -o yaml | yq '.data."tls.crt"' | base64 --decode > tls/tls.crt
kubectl get secrets op-ufm-client-tls -o yaml | yq '.data."ca.crt"' | base64 --decode > tls/ca.crt
.PHONY: tls
...@@ -3,6 +3,7 @@ module github.com/ethereum-optimism/optimism/op-ufm ...@@ -3,6 +3,7 @@ module github.com/ethereum-optimism/optimism/op-ufm
go 1.20 go 1.20
require ( require (
cloud.google.com/go/kms v1.12.1
github.com/BurntSushi/toml v1.3.2 github.com/BurntSushi/toml v1.3.2
github.com/ethereum-optimism/optimism/op-service v0.10.14 github.com/ethereum-optimism/optimism/op-service v0.10.14
github.com/ethereum-optimism/optimism/op-signer v0.1.1 github.com/ethereum-optimism/optimism/op-signer v0.1.1
...@@ -11,9 +12,13 @@ require ( ...@@ -11,9 +12,13 @@ require (
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0 github.com/prometheus/client_golang v1.16.0
github.com/rs/cors v1.9.0 github.com/rs/cors v1.9.0
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130
) )
require ( require (
cloud.google.com/go/compute v1.20.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.0 // indirect
github.com/DataDog/zstd v1.5.2 // indirect github.com/DataDog/zstd v1.5.2 // indirect
github.com/VictoriaMetrics/fastcache v1.10.0 // indirect github.com/VictoriaMetrics/fastcache v1.10.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
...@@ -32,8 +37,13 @@ require ( ...@@ -32,8 +37,13 @@ require (
github.com/go-stack/stack v1.8.1 // indirect github.com/go-stack/stack v1.8.1 // indirect
github.com/gofrs/flock v0.8.1 // indirect github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c // indirect github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c // indirect
...@@ -55,10 +65,18 @@ require ( ...@@ -55,10 +65,18 @@ require (
github.com/tklauser/numcpus v0.5.0 // indirect github.com/tklauser/numcpus v0.5.0 // indirect
github.com/urfave/cli v1.22.9 // indirect github.com/urfave/cli v1.22.9 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/crypto v0.1.0 // indirect go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect
golang.org/x/sys v0.8.0 // indirect golang.org/x/net v0.12.0 // indirect
golang.org/x/text v0.8.0 // indirect golang.org/x/oauth2 v0.10.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
google.golang.org/api v0.132.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc v1.56.2 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
) )
This diff is collapsed.
...@@ -5,8 +5,6 @@ import ( ...@@ -5,8 +5,6 @@ import (
"time" "time"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics" "github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
...@@ -33,9 +31,7 @@ func Dial(providerName string, url string) (*InstrumentedEthClient, error) { ...@@ -33,9 +31,7 @@ func Dial(providerName string, url string) (*InstrumentedEthClient, error) {
func (i *InstrumentedEthClient) TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) { func (i *InstrumentedEthClient) TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) {
start := time.Now() start := time.Now()
log.Debug(">> TransactionByHash", "hash", hash, "provider", i.providerName)
tx, isPending, err := i.c.TransactionByHash(ctx, hash) tx, isPending, err := i.c.TransactionByHash(ctx, hash)
log.Debug("<< TransactionByHash", "tx", tx, "isPending", isPending, "err", err, "hash", hash, "provider", i.providerName)
if err != nil { if err != nil {
if !i.ignorableErrors(err) { if !i.ignorableErrors(err) {
metrics.RecordError(i.providerName, "ethclient.TransactionByHash") metrics.RecordError(i.providerName, "ethclient.TransactionByHash")
......
package main
import (
"context"
"crypto/x509/pkix"
"encoding/asn1"
"encoding/pem"
"fmt"
"os"
kms "cloud.google.com/go/kms/apiv1"
"cloud.google.com/go/kms/apiv1/kmspb"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)
func main() {
println("kmstool - usage: kmstool <key>")
if len(os.Args) < 2 {
panic("missing <key>")
}
keyName := os.Args[1]
ctx := context.Background()
client, err := kms.NewKeyManagementClient(ctx)
if err != nil {
panic(fmt.Sprintf("failed to create kms client: %w", err))
}
defer client.Close()
addr, err := resolveAddr(ctx, client, keyName)
if err != nil {
panic(fmt.Sprintf("failed to retrieve the key: %w", err))
}
fmt.Printf("ethereum addr: %s", addr)
println()
println()
}
func resolveAddr(ctx context.Context, client *kms.KeyManagementClient, keyName string) (common.Address, error) {
resp, err := client.GetPublicKey(ctx, &kmspb.GetPublicKeyRequest{Name: keyName})
if err != nil {
return common.Address{}, fmt.Errorf("google kms public key %q lookup: %w", keyName, err)
}
block, _ := pem.Decode([]byte(resp.Pem))
if block == nil {
return common.Address{}, fmt.Errorf("google kms public key %q pem empty: %.130q", keyName, resp.Pem)
}
var info struct {
AlgID pkix.AlgorithmIdentifier
Key asn1.BitString
}
_, err = asn1.Unmarshal(block.Bytes, &info)
if err != nil {
return common.Address{}, fmt.Errorf("google kms public key %q pem block %q: %v", keyName, block.Type, err)
}
wantAlg := asn1.ObjectIdentifier{1, 2, 840, 10045, 2, 1}
if gotAlg := info.AlgID.Algorithm; !gotAlg.Equal(wantAlg) {
return common.Address{}, fmt.Errorf("google kms public key %q asn.1 algorithm %s intead of %s", keyName, gotAlg, wantAlg)
}
return pubKeyAddr(info.Key.Bytes), nil
}
// PubKeyAddr returns the Ethereum address for the (uncompressed) key bytes.
func pubKeyAddr(bytes []byte) common.Address {
digest := crypto.Keccak256(bytes[1:])
var addr common.Address
copy(addr[:], digest[12:])
return addr
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment