Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
N
nebula
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
exchain
nebula
Commits
4667823d
Unverified
Commit
4667823d
authored
Aug 25, 2023
by
mergify[bot]
Committed by
GitHub
Aug 25, 2023
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'develop' into indexer.configurable_polling
parents
8f17a974
96562692
Changes
11
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
748 additions
and
227 deletions
+748
-227
monitor.go
op-challenger/fault/monitor.go
+15
-56
monitor_test.go
op-challenger/fault/monitor_test.go
+20
-151
coordinator.go
op-challenger/fault/scheduler/coordinator.go
+143
-0
coordinator_test.go
op-challenger/fault/scheduler/coordinator_test.go
+290
-0
scheduler.go
op-challenger/fault/scheduler/scheduler.go
+89
-0
scheduler_test.go
op-challenger/fault/scheduler/scheduler_test.go
+70
-0
types.go
op-challenger/fault/scheduler/types.go
+22
-0
worker.go
op-challenger/fault/scheduler/worker.go
+22
-0
worker_test.go
op-challenger/fault/scheduler/worker_test.go
+59
-0
service.go
op-challenger/fault/service.go
+16
-16
filechan.go
op-program/io/filechan.go
+2
-4
No files found.
op-challenger/fault/monitor.go
View file @
4667823d
...
...
@@ -2,20 +2,17 @@ package fault
import
(
"context"
"errors"
"fmt"
"math/big"
"time"
"github.com/ethereum-optimism/optimism/op-challenger/fault/scheduler"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
type
gamePlayer
interface
{
ProgressGame
(
ctx
context
.
Context
)
bool
}
type
playerCreator
func
(
address
common
.
Address
,
dir
string
)
(
gamePlayer
,
error
)
type
blockNumberFetcher
func
(
ctx
context
.
Context
)
(
uint64
,
error
)
// gameSource loads information about the games available to play
...
...
@@ -23,43 +20,37 @@ type gameSource interface {
FetchAllGamesAtBlock
(
ctx
context
.
Context
,
earliest
uint64
,
blockNumber
*
big
.
Int
)
([]
FaultDisputeGame
,
error
)
}
type
gameDiskAllocator
interface
{
DirForGame
(
common
.
Address
)
string
RemoveAllExcept
([]
common
.
Address
)
error
type
gameScheduler
interface
{
Schedule
([]
common
.
Address
)
error
}
type
gameMonitor
struct
{
logger
log
.
Logger
clock
clock
.
Clock
diskManager
gameDiskAllocator
source
gameSource
scheduler
gameScheduler
gameWindow
time
.
Duration
createPlayer
playerCreator
fetchBlockNumber
blockNumberFetcher
allowedGames
[]
common
.
Address
players
map
[
common
.
Address
]
gamePlayer
}
func
newGameMonitor
(
logger
log
.
Logger
,
gameWindow
time
.
Duration
,
cl
clock
.
Clock
,
disk
gameDiskAllocator
,
source
gameSource
,
scheduler
gameScheduler
,
gameWindow
time
.
Duration
,
fetchBlockNumber
blockNumberFetcher
,
allowedGames
[]
common
.
Address
,
source
gameSource
,
createGame
playerCreator
,
)
*
gameMonitor
{
return
&
gameMonitor
{
logger
:
logger
,
clock
:
cl
,
diskManager
:
disk
,
scheduler
:
scheduler
,
source
:
source
,
gameWindow
:
gameWindow
,
createPlayer
:
createGame
,
fetchBlockNumber
:
fetchBlockNumber
,
allowedGames
:
allowedGames
,
players
:
make
(
map
[
common
.
Address
]
gamePlayer
),
}
}
...
...
@@ -92,54 +83,22 @@ func (m *gameMonitor) progressGames(ctx context.Context, blockNum uint64) error
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to load games: %w"
,
err
)
}
requiredGames
:=
make
(
map
[
common
.
Address
]
bool
)
var
keepGameData
[]
common
.
Address
var
gamesToPlay
[]
common
.
Address
for
_
,
game
:=
range
games
{
if
!
m
.
allowedGame
(
game
.
Proxy
)
{
m
.
logger
.
Debug
(
"Skipping game not on allow list"
,
"game"
,
game
.
Proxy
)
continue
}
requiredGames
[
game
.
Proxy
]
=
true
player
,
err
:=
m
.
fetchOrCreateGamePlayer
(
game
)
if
err
!=
nil
{
m
.
logger
.
Error
(
"Error while progressing game"
,
"game"
,
game
.
Proxy
,
"err"
,
err
)
continue
}
done
:=
player
.
ProgressGame
(
ctx
)
if
!
done
{
// We only keep resources on disk for games that are incomplete.
// Games that are complete have their data removed as soon as possible to save disk space.
// We keep the player in memory to avoid recreating it on every update but will no longer
// need the resources on disk because there are no further actions required on the game.
keepGameData
=
append
(
keepGameData
,
game
.
Proxy
)
}
gamesToPlay
=
append
(
gamesToPlay
,
game
.
Proxy
)
}
if
err
:=
m
.
diskManager
.
RemoveAllExcept
(
keepGameData
);
err
!=
nil
{
m
.
logger
.
Error
(
"Unable to cleanup game data"
,
"err"
,
err
)
}
// Remove the player for any game that's no longer being returned from the list of active games
for
addr
:=
range
m
.
players
{
if
_
,
ok
:=
requiredGames
[
addr
];
ok
{
// Game still required
continue
}
delete
(
m
.
players
,
addr
)
if
err
:=
m
.
scheduler
.
Schedule
(
gamesToPlay
);
errors
.
Is
(
err
,
scheduler
.
ErrBusy
)
{
m
.
logger
.
Info
(
"Scheduler still busy with previous update"
)
}
else
if
err
!=
nil
{
return
fmt
.
Errorf
(
"failed to schedule games: %w"
,
err
)
}
return
nil
}
func
(
m
*
gameMonitor
)
fetchOrCreateGamePlayer
(
gameData
FaultDisputeGame
)
(
gamePlayer
,
error
)
{
if
player
,
ok
:=
m
.
players
[
gameData
.
Proxy
];
ok
{
return
player
,
nil
}
player
,
err
:=
m
.
createPlayer
(
gameData
.
Proxy
,
m
.
diskManager
.
DirForGame
(
gameData
.
Proxy
))
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to create game player %v: %w"
,
gameData
.
Proxy
,
err
)
}
m
.
players
[
gameData
.
Proxy
]
=
player
return
player
,
nil
}
func
(
m
*
gameMonitor
)
MonitorGames
(
ctx
context
.
Context
)
error
{
m
.
logger
.
Info
(
"Monitoring fault dispute games"
)
...
...
op-challenger/fault/monitor_test.go
View file @
4667823d
...
...
@@ -6,33 +6,31 @@ import (
"testing"
"time"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum-optimism/optimism/op-service/clock"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"github.com/ethereum-optimism/optimism/op-node/testlog"
)
func
TestMonitorMinGameTimestamp
(
t
*
testing
.
T
)
{
t
.
Parallel
()
t
.
Run
(
"zero game window returns zero"
,
func
(
t
*
testing
.
T
)
{
monitor
,
_
,
_
,
_
:=
setupMonitorTest
(
t
,
[]
common
.
Address
{})
monitor
,
_
,
_
:=
setupMonitorTest
(
t
,
[]
common
.
Address
{})
monitor
.
gameWindow
=
time
.
Duration
(
0
)
require
.
Equal
(
t
,
monitor
.
minGameTimestamp
(),
uint64
(
0
))
})
t
.
Run
(
"non-zero game window with zero clock"
,
func
(
t
*
testing
.
T
)
{
monitor
,
_
,
_
,
_
:=
setupMonitorTest
(
t
,
[]
common
.
Address
{})
monitor
,
_
,
_
:=
setupMonitorTest
(
t
,
[]
common
.
Address
{})
monitor
.
gameWindow
=
time
.
Minute
monitor
.
clock
=
clock
.
NewDeterministicClock
(
time
.
Unix
(
0
,
0
))
require
.
Equal
(
t
,
monitor
.
minGameTimestamp
(),
uint64
(
0
))
})
t
.
Run
(
"minimum computed correctly"
,
func
(
t
*
testing
.
T
)
{
monitor
,
_
,
_
,
_
:=
setupMonitorTest
(
t
,
[]
common
.
Address
{})
monitor
,
_
,
_
:=
setupMonitorTest
(
t
,
[]
common
.
Address
{})
monitor
.
gameWindow
=
time
.
Minute
frozen
:=
time
.
Unix
(
int64
(
time
.
Hour
.
Seconds
()),
0
)
monitor
.
clock
=
clock
.
NewDeterministicClock
(
frozen
)
...
...
@@ -42,7 +40,7 @@ func TestMonitorMinGameTimestamp(t *testing.T) {
}
func
TestMonitorExitsWhenContextDone
(
t
*
testing
.
T
)
{
monitor
,
_
,
_
,
_
:=
setupMonitorTest
(
t
,
[]
common
.
Address
{{}})
monitor
,
_
,
_
:=
setupMonitorTest
(
t
,
[]
common
.
Address
{{}})
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
cancel
()
err
:=
monitor
.
MonitorGames
(
ctx
)
...
...
@@ -50,7 +48,7 @@ func TestMonitorExitsWhenContextDone(t *testing.T) {
}
func
TestMonitorCreateAndProgressGameAgents
(
t
*
testing
.
T
)
{
monitor
,
source
,
games
,
_
:=
setupMonitorTest
(
t
,
[]
common
.
Address
{})
monitor
,
source
,
sched
:=
setupMonitorTest
(
t
,
[]
common
.
Address
{})
addr1
:=
common
.
Address
{
0xaa
}
addr2
:=
common
.
Address
{
0xbb
}
...
...
@@ -67,22 +65,14 @@ func TestMonitorCreateAndProgressGameAgents(t *testing.T) {
require
.
NoError
(
t
,
monitor
.
progressGames
(
context
.
Background
(),
uint64
(
1
)))
require
.
Len
(
t
,
games
.
created
,
2
,
"should create game agents"
)
require
.
Contains
(
t
,
games
.
created
,
addr1
)
require
.
Contains
(
t
,
games
.
created
,
addr2
)
require
.
Equal
(
t
,
1
,
games
.
created
[
addr1
]
.
progressCount
)
require
.
Equal
(
t
,
1
,
games
.
created
[
addr2
]
.
progressCount
)
// The stub will fail the test if a game is created with the same address multiple times
require
.
NoError
(
t
,
monitor
.
progressGames
(
context
.
Background
(),
uint64
(
2
)),
"should only create games once"
)
require
.
Equal
(
t
,
2
,
games
.
created
[
addr1
]
.
progressCount
)
require
.
Equal
(
t
,
2
,
games
.
created
[
addr2
]
.
progressCount
)
require
.
Len
(
t
,
sched
.
scheduled
,
1
)
require
.
Equal
(
t
,
[]
common
.
Address
{
addr1
,
addr2
},
sched
.
scheduled
[
0
])
}
func
TestMonitorOnly
Creat
eSpecifiedGame
(
t
*
testing
.
T
)
{
func
TestMonitorOnly
Schedul
eSpecifiedGame
(
t
*
testing
.
T
)
{
addr1
:=
common
.
Address
{
0xaa
}
addr2
:=
common
.
Address
{
0xbb
}
monitor
,
source
,
games
,
_
:=
setupMonitorTest
(
t
,
[]
common
.
Address
{
addr2
})
monitor
,
source
,
sched
:=
setupMonitorTest
(
t
,
[]
common
.
Address
{
addr2
})
source
.
games
=
[]
FaultDisputeGame
{
{
...
...
@@ -97,104 +87,21 @@ func TestMonitorOnlyCreateSpecifiedGame(t *testing.T) {
require
.
NoError
(
t
,
monitor
.
progressGames
(
context
.
Background
(),
uint64
(
1
)))
require
.
Len
(
t
,
games
.
created
,
1
,
"should only create allowed game"
)
require
.
Contains
(
t
,
games
.
created
,
addr2
)
require
.
NotContains
(
t
,
games
.
created
,
addr1
)
require
.
Equal
(
t
,
1
,
games
.
created
[
addr2
]
.
progressCount
)
require
.
Len
(
t
,
sched
.
scheduled
,
1
)
require
.
Equal
(
t
,
[]
common
.
Address
{
addr2
},
sched
.
scheduled
[
0
])
}
func
TestDeletePlayersWhenNoLongerInListOfGames
(
t
*
testing
.
T
)
{
addr1
:=
common
.
Address
{
0xaa
}
addr2
:=
common
.
Address
{
0xbb
}
monitor
,
source
,
games
,
_
:=
setupMonitorTest
(
t
,
nil
)
allGames
:=
[]
FaultDisputeGame
{
{
Proxy
:
addr1
,
Timestamp
:
9999
,
},
{
Proxy
:
addr2
,
Timestamp
:
9999
,
},
}
source
.
games
=
allGames
require
.
NoError
(
t
,
monitor
.
progressGames
(
context
.
Background
(),
uint64
(
1
)))
require
.
Len
(
t
,
games
.
created
,
2
)
require
.
Contains
(
t
,
games
.
created
,
addr1
)
require
.
Contains
(
t
,
games
.
created
,
addr2
)
// First game is now old enough it's not returned in the list of active games
source
.
games
=
source
.
games
[
1
:
]
require
.
NoError
(
t
,
monitor
.
progressGames
(
context
.
Background
(),
uint64
(
2
)))
require
.
Len
(
t
,
games
.
created
,
2
)
require
.
Contains
(
t
,
games
.
created
,
addr1
)
require
.
Contains
(
t
,
games
.
created
,
addr2
)
// Forget that we created the first game so it can be recreated if needed
delete
(
games
.
created
,
addr1
)
// First game now reappears (inexplicably but usefully for our testing)
source
.
games
=
allGames
require
.
NoError
(
t
,
monitor
.
progressGames
(
context
.
Background
(),
uint64
(
3
)))
// A new player is created for it because the original was deleted
require
.
Len
(
t
,
games
.
created
,
2
)
require
.
Contains
(
t
,
games
.
created
,
addr1
)
require
.
Contains
(
t
,
games
.
created
,
addr2
)
require
.
Equal
(
t
,
1
,
games
.
created
[
addr1
]
.
progressCount
)
}
func
TestCleanupResourcesOfCompletedGames
(
t
*
testing
.
T
)
{
addr1
:=
common
.
Address
{
0xaa
}
addr2
:=
common
.
Address
{
0xbb
}
monitor
,
source
,
games
,
disk
:=
setupMonitorTest
(
t
,
[]
common
.
Address
{})
games
.
createCompleted
=
addr1
source
.
games
=
[]
FaultDisputeGame
{
{
Proxy
:
addr1
,
Timestamp
:
1999
,
},
{
Proxy
:
addr2
,
Timestamp
:
9999
,
},
}
err
:=
monitor
.
progressGames
(
context
.
Background
(),
uint64
(
1
))
require
.
NoError
(
t
,
err
)
require
.
Len
(
t
,
games
.
created
,
2
,
"should create game agents"
)
require
.
Contains
(
t
,
games
.
created
,
addr1
)
require
.
Contains
(
t
,
games
.
created
,
addr2
)
require
.
Equal
(
t
,
1
,
games
.
created
[
addr1
]
.
progressCount
)
require
.
Equal
(
t
,
1
,
games
.
created
[
addr2
]
.
progressCount
)
require
.
Contains
(
t
,
disk
.
gameDirExists
,
addr1
,
"should have allocated a game dir for game 1"
)
require
.
False
(
t
,
disk
.
gameDirExists
[
addr1
],
"should have then deleted the game 1 dir"
)
require
.
Contains
(
t
,
disk
.
gameDirExists
,
addr2
,
"should have allocated a game dir for game 2"
)
require
.
True
(
t
,
disk
.
gameDirExists
[
addr2
],
"should not have deleted the game 2 dir"
)
}
func
setupMonitorTest
(
t
*
testing
.
T
,
allowedGames
[]
common
.
Address
)
(
*
gameMonitor
,
*
stubGameSource
,
*
createdGames
,
*
stubDiskManager
)
{
func
setupMonitorTest
(
t
*
testing
.
T
,
allowedGames
[]
common
.
Address
)
(
*
gameMonitor
,
*
stubGameSource
,
*
stubScheduler
)
{
logger
:=
testlog
.
Logger
(
t
,
log
.
LvlDebug
)
source
:=
&
stubGameSource
{}
games
:=
&
createdGames
{
t
:
t
,
created
:
make
(
map
[
common
.
Address
]
*
stubGame
),
}
i
:=
uint64
(
1
)
fetchBlockNum
:=
func
(
ctx
context
.
Context
)
(
uint64
,
error
)
{
i
++
return
i
,
nil
}
disk
:=
&
stubDiskManager
{
gameDirExists
:
make
(
map
[
common
.
Address
]
bool
),
}
monitor
:=
newGameMonitor
(
logger
,
time
.
Duration
(
0
),
clock
.
SystemClock
,
disk
,
fetchBlockNum
,
allowedGames
,
source
,
games
.
CreateGame
)
return
monitor
,
source
,
games
,
disk
sched
:=
&
stubScheduler
{}
monitor
:=
newGameMonitor
(
logger
,
clock
.
SystemClock
,
source
,
sched
,
time
.
Duration
(
0
),
fetchBlockNum
,
allowedGames
)
return
monitor
,
source
,
sched
}
type
stubGameSource
struct
{
...
...
@@ -205,49 +112,11 @@ func (s *stubGameSource) FetchAllGamesAtBlock(ctx context.Context, earliest uint
return
s
.
games
,
nil
}
type
stubGame
struct
{
addr
common
.
Address
progressCount
int
done
bool
dir
string
type
stubScheduler
struct
{
scheduled
[][]
common
.
Address
}
func
(
g
*
stubGame
)
ProgressGame
(
ctx
context
.
Context
)
bool
{
g
.
progressCount
++
return
g
.
done
}
type
createdGames
struct
{
t
*
testing
.
T
createCompleted
common
.
Address
created
map
[
common
.
Address
]
*
stubGame
}
func
(
c
*
createdGames
)
CreateGame
(
addr
common
.
Address
,
dir
string
)
(
gamePlayer
,
error
)
{
if
_
,
exists
:=
c
.
created
[
addr
];
exists
{
c
.
t
.
Fatalf
(
"game %v already exists"
,
addr
)
}
game
:=
&
stubGame
{
addr
:
addr
,
done
:
addr
==
c
.
createCompleted
,
dir
:
dir
,
}
c
.
created
[
addr
]
=
game
return
game
,
nil
}
type
stubDiskManager
struct
{
gameDirExists
map
[
common
.
Address
]
bool
}
func
(
s
*
stubDiskManager
)
DirForGame
(
addr
common
.
Address
)
string
{
s
.
gameDirExists
[
addr
]
=
true
return
addr
.
Hex
()
}
func
(
s
*
stubDiskManager
)
RemoveAllExcept
(
addrs
[]
common
.
Address
)
error
{
for
address
:=
range
s
.
gameDirExists
{
s
.
gameDirExists
[
address
]
=
slices
.
Contains
(
addrs
,
address
)
}
func
(
s
*
stubScheduler
)
Schedule
(
games
[]
common
.
Address
)
error
{
s
.
scheduled
=
append
(
s
.
scheduled
,
games
)
return
nil
}
op-challenger/fault/scheduler/coordinator.go
0 → 100644
View file @
4667823d
package
scheduler
import
(
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"golang.org/x/exp/slices"
)
var
errUnknownGame
=
errors
.
New
(
"unknown game"
)
type
PlayerCreator
func
(
address
common
.
Address
,
dir
string
)
(
GamePlayer
,
error
)
type
gameState
struct
{
player
GamePlayer
inflight
bool
resolved
bool
}
// coordinator manages the set of current games, queues games to be played (on separate worker threads) and
// cleans up data files once a game is resolved.
// All function calls must be made on the same thread.
type
coordinator
struct
{
// jobQueue is the outgoing queue for jobs being sent to workers for progression
jobQueue
chan
<-
job
// resultQueue is the incoming queue of jobs that have been completed by workers
resultQueue
<-
chan
job
logger
log
.
Logger
createPlayer
PlayerCreator
states
map
[
common
.
Address
]
*
gameState
disk
DiskManager
}
// schedule takes the current list of games to attempt to progress, filters out games that have previous
// progressions already in-flight and schedules jobs to progress on the outbound jobQueue.
// To avoid deadlock, it may process results from the inbound resultQueue while adding jobs to the outbound jobQueue.
// Returns an error if a game couldn't be scheduled because of an error. It will continue attempting to progress
// all games even if an error occurs with one game.
func
(
c
*
coordinator
)
schedule
(
ctx
context
.
Context
,
games
[]
common
.
Address
)
error
{
// First remove any game states we no longer require
for
addr
,
state
:=
range
c
.
states
{
if
!
state
.
inflight
&&
!
slices
.
Contains
(
games
,
addr
)
{
delete
(
c
.
states
,
addr
)
}
}
var
errs
[]
error
// Next collect all the jobs to schedule and ensure all games are recorded in the states map.
// Otherwise, results may start being processed before all games are recorded, resulting in existing
// data directories potentially being deleted for games that are required.
var
jobs
[]
job
for
_
,
addr
:=
range
games
{
if
j
,
err
:=
c
.
createJob
(
addr
);
err
!=
nil
{
errs
=
append
(
errs
,
err
)
}
else
if
j
!=
nil
{
jobs
=
append
(
jobs
,
*
j
)
}
}
// Finally, enqueue the jobs
for
_
,
j
:=
range
jobs
{
errs
=
append
(
errs
,
c
.
enqueueJob
(
ctx
,
j
))
}
return
errors
.
Join
(
errs
...
)
}
// createJob updates the state for the specified game and returns the job to enqueue for it, if any
// Returns (nil, nil) when there is no error and no job to enqueue
func
(
c
*
coordinator
)
createJob
(
game
common
.
Address
)
(
*
job
,
error
)
{
state
,
ok
:=
c
.
states
[
game
]
if
!
ok
{
state
=
&
gameState
{}
c
.
states
[
game
]
=
state
}
if
state
.
inflight
{
c
.
logger
.
Debug
(
"Not rescheduling already in-flight game"
,
"game"
,
game
)
return
nil
,
nil
}
// Create the player separately to the state so we retry creating it if it fails on the first attempt.
if
state
.
player
==
nil
{
player
,
err
:=
c
.
createPlayer
(
game
,
c
.
disk
.
DirForGame
(
game
))
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to create game player: %w"
,
err
)
}
state
.
player
=
player
}
state
.
inflight
=
true
return
&
job
{
addr
:
game
,
player
:
state
.
player
},
nil
}
func
(
c
*
coordinator
)
enqueueJob
(
ctx
context
.
Context
,
j
job
)
error
{
for
{
select
{
case
c
.
jobQueue
<-
j
:
return
nil
case
result
:=
<-
c
.
resultQueue
:
if
err
:=
c
.
processResult
(
result
);
err
!=
nil
{
c
.
logger
.
Error
(
"Failed to process result"
,
"err"
,
err
)
}
case
<-
ctx
.
Done
()
:
return
ctx
.
Err
()
}
}
}
func
(
c
*
coordinator
)
processResult
(
j
job
)
error
{
state
,
ok
:=
c
.
states
[
j
.
addr
]
if
!
ok
{
return
fmt
.
Errorf
(
"game %v received unexpected result: %w"
,
j
.
addr
,
errUnknownGame
)
}
state
.
inflight
=
false
state
.
resolved
=
j
.
resolved
c
.
deleteResolvedGameFiles
()
return
nil
}
func
(
c
*
coordinator
)
deleteResolvedGameFiles
()
{
var
keepGames
[]
common
.
Address
for
addr
,
state
:=
range
c
.
states
{
if
!
state
.
resolved
||
state
.
inflight
{
keepGames
=
append
(
keepGames
,
addr
)
}
}
if
err
:=
c
.
disk
.
RemoveAllExcept
(
keepGames
);
err
!=
nil
{
c
.
logger
.
Error
(
"Unable to cleanup game data"
,
"err"
,
err
)
}
}
func
newCoordinator
(
logger
log
.
Logger
,
jobQueue
chan
<-
job
,
resultQueue
<-
chan
job
,
createPlayer
PlayerCreator
,
disk
DiskManager
)
*
coordinator
{
return
&
coordinator
{
logger
:
logger
,
jobQueue
:
jobQueue
,
resultQueue
:
resultQueue
,
createPlayer
:
createPlayer
,
disk
:
disk
,
states
:
make
(
map
[
common
.
Address
]
*
gameState
),
}
}
op-challenger/fault/scheduler/coordinator_test.go
0 → 100644
View file @
4667823d
package
scheduler
import
(
"context"
"fmt"
"testing"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
)
func
TestScheduleNewGames
(
t
*
testing
.
T
)
{
c
,
workQueue
,
_
,
games
,
disk
:=
setupCoordinatorTest
(
t
,
10
)
gameAddr1
:=
common
.
Address
{
0xaa
}
gameAddr2
:=
common
.
Address
{
0xbb
}
gameAddr3
:=
common
.
Address
{
0xcc
}
ctx
:=
context
.
Background
()
require
.
NoError
(
t
,
c
.
schedule
(
ctx
,
[]
common
.
Address
{
gameAddr1
,
gameAddr2
,
gameAddr3
}))
require
.
Len
(
t
,
workQueue
,
3
,
"should schedule job for each game"
)
require
.
Len
(
t
,
games
.
created
,
3
,
"should have created players"
)
var
players
[]
GamePlayer
for
i
:=
0
;
i
<
len
(
games
.
created
);
i
++
{
j
:=
<-
workQueue
players
=
append
(
players
,
j
.
player
)
}
for
addr
,
player
:=
range
games
.
created
{
require
.
Equal
(
t
,
disk
.
DirForGame
(
addr
),
player
.
dir
,
"should use allocated directory"
)
require
.
Containsf
(
t
,
players
,
player
,
"should have created a job for player %v"
,
addr
)
}
}
func
TestSkipSchedulingInflightGames
(
t
*
testing
.
T
)
{
c
,
workQueue
,
_
,
_
,
_
:=
setupCoordinatorTest
(
t
,
10
)
gameAddr1
:=
common
.
Address
{
0xaa
}
ctx
:=
context
.
Background
()
// Schedule the game once
require
.
NoError
(
t
,
c
.
schedule
(
ctx
,
[]
common
.
Address
{
gameAddr1
}))
require
.
Len
(
t
,
workQueue
,
1
,
"should schedule game"
)
// And then attempt to schedule again
require
.
NoError
(
t
,
c
.
schedule
(
ctx
,
[]
common
.
Address
{
gameAddr1
}))
require
.
Len
(
t
,
workQueue
,
1
,
"should not reschedule in-flight game"
)
}
func
TestExitWhenContextDoneWhileSchedulingJob
(
t
*
testing
.
T
)
{
// No space in buffer to schedule a job
c
,
workQueue
,
_
,
_
,
_
:=
setupCoordinatorTest
(
t
,
0
)
gameAddr1
:=
common
.
Address
{
0xaa
}
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
cancel
()
// Context is cancelled
// Should not block because the context is done.
err
:=
c
.
schedule
(
ctx
,
[]
common
.
Address
{
gameAddr1
})
require
.
ErrorIs
(
t
,
err
,
context
.
Canceled
)
require
.
Empty
(
t
,
workQueue
,
"should not have been able to schedule game"
)
}
func
TestScheduleGameAgainAfterCompletion
(
t
*
testing
.
T
)
{
c
,
workQueue
,
_
,
_
,
_
:=
setupCoordinatorTest
(
t
,
10
)
gameAddr1
:=
common
.
Address
{
0xaa
}
ctx
:=
context
.
Background
()
// Schedule the game once
require
.
NoError
(
t
,
c
.
schedule
(
ctx
,
[]
common
.
Address
{
gameAddr1
}))
require
.
Len
(
t
,
workQueue
,
1
,
"should schedule game"
)
// Read the job
j
:=
<-
workQueue
require
.
Len
(
t
,
workQueue
,
0
)
// Process the result
require
.
NoError
(
t
,
c
.
processResult
(
j
))
// And then attempt to schedule again
require
.
NoError
(
t
,
c
.
schedule
(
ctx
,
[]
common
.
Address
{
gameAddr1
}))
require
.
Len
(
t
,
workQueue
,
1
,
"should reschedule completed game"
)
}
func
TestResultForUnknownGame
(
t
*
testing
.
T
)
{
c
,
_
,
_
,
_
,
_
:=
setupCoordinatorTest
(
t
,
10
)
err
:=
c
.
processResult
(
job
{
addr
:
common
.
Address
{
0xaa
}})
require
.
ErrorIs
(
t
,
err
,
errUnknownGame
)
}
func
TestProcessResultsWhileJobQueueFull
(
t
*
testing
.
T
)
{
c
,
workQueue
,
resultQueue
,
games
,
disk
:=
setupCoordinatorTest
(
t
,
0
)
gameAddr1
:=
common
.
Address
{
0xaa
}
gameAddr2
:=
common
.
Address
{
0xbb
}
gameAddr3
:=
common
.
Address
{
0xcc
}
ctx
:=
context
.
Background
()
// Create pre-existing data for all three games
disk
.
DirForGame
(
gameAddr1
)
disk
.
DirForGame
(
gameAddr2
)
disk
.
DirForGame
(
gameAddr3
)
resultsSent
:=
make
(
chan
any
)
go
func
()
{
defer
close
(
resultsSent
)
// Process three jobs then exit
for
i
:=
0
;
i
<
3
;
i
++
{
j
:=
<-
workQueue
resultQueue
<-
j
}
}()
// Even though work queue length is only 1, should be able to schedule all three games
// by reading and processing results
require
.
NoError
(
t
,
c
.
schedule
(
ctx
,
[]
common
.
Address
{
gameAddr1
,
gameAddr2
,
gameAddr3
}))
require
.
Len
(
t
,
games
.
created
,
3
,
"should have created 3 games"
)
loop
:
for
{
select
{
case
<-
resultQueue
:
// Drain any remaining results
case
<-
resultsSent
:
break
loop
}
}
// Check that pre-existing directories weren't deleted.
// This would fail if we start processing results before we've added all the required games to the state
require
.
Empty
(
t
,
disk
.
deletedDirs
,
"should not have deleted any directories"
)
}
func
TestDeleteDataForResolvedGames
(
t
*
testing
.
T
)
{
c
,
workQueue
,
_
,
_
,
disk
:=
setupCoordinatorTest
(
t
,
10
)
gameAddr1
:=
common
.
Address
{
0xaa
}
gameAddr2
:=
common
.
Address
{
0xbb
}
gameAddr3
:=
common
.
Address
{
0xcc
}
ctx
:=
context
.
Background
()
// First get game 3 marked as resolved
require
.
NoError
(
t
,
c
.
schedule
(
ctx
,
[]
common
.
Address
{
gameAddr3
}))
require
.
Len
(
t
,
workQueue
,
1
)
j
:=
<-
workQueue
j
.
resolved
=
true
require
.
NoError
(
t
,
c
.
processResult
(
j
))
// But ensure its data directory is marked as existing
disk
.
DirForGame
(
gameAddr3
)
gameAddrs
:=
[]
common
.
Address
{
gameAddr1
,
gameAddr2
,
gameAddr3
}
require
.
NoError
(
t
,
c
.
schedule
(
ctx
,
gameAddrs
))
require
.
Len
(
t
,
workQueue
,
len
(
gameAddrs
),
"should schedule all games"
)
// Game 1 progresses and is still in progress
// Game 2 progresses and is now resolved
// Game 3 hasn't yet progressed (update is still in flight)
for
i
:=
0
;
i
<
len
(
gameAddrs
)
-
1
;
i
++
{
j
:=
<-
workQueue
j
.
resolved
=
j
.
addr
==
gameAddr2
require
.
NoError
(
t
,
c
.
processResult
(
j
))
}
require
.
True
(
t
,
disk
.
gameDirExists
[
gameAddr1
],
"game 1 data should be preserved (not resolved)"
)
require
.
False
(
t
,
disk
.
gameDirExists
[
gameAddr2
],
"game 2 data should be deleted"
)
require
.
True
(
t
,
disk
.
gameDirExists
[
gameAddr3
],
"game 3 data should be preserved (inflight)"
)
}
func
TestDoNotDeleteDataForGameThatFailedToCreatePlayer
(
t
*
testing
.
T
)
{
c
,
workQueue
,
_
,
games
,
disk
:=
setupCoordinatorTest
(
t
,
10
)
gameAddr1
:=
common
.
Address
{
0xaa
}
gameAddr2
:=
common
.
Address
{
0xbb
}
ctx
:=
context
.
Background
()
games
.
creationFails
=
gameAddr1
gameAddrs
:=
[]
common
.
Address
{
gameAddr1
,
gameAddr2
}
err
:=
c
.
schedule
(
ctx
,
gameAddrs
)
require
.
Error
(
t
,
err
)
// Game 1 won't be scheduled because the player failed to be created
require
.
Len
(
t
,
workQueue
,
1
,
"should schedule game 2"
)
// Process game 2 result
require
.
NoError
(
t
,
c
.
processResult
(
<-
workQueue
))
require
.
True
(
t
,
disk
.
gameDirExists
[
gameAddr1
],
"game 1 data should be preserved"
)
require
.
True
(
t
,
disk
.
gameDirExists
[
gameAddr2
],
"game 2 data should be preserved"
)
// Should create player for game 1 next time its scheduled
games
.
creationFails
=
common
.
Address
{}
require
.
NoError
(
t
,
c
.
schedule
(
ctx
,
gameAddrs
))
require
.
Len
(
t
,
workQueue
,
len
(
gameAddrs
),
"should schedule all games"
)
j
:=
<-
workQueue
require
.
Equal
(
t
,
gameAddr1
,
j
.
addr
,
"first job should be for first game"
)
require
.
NotNil
(
t
,
j
.
player
,
"should have created player for game 1"
)
}
func
TestDropOldGameStates
(
t
*
testing
.
T
)
{
c
,
workQueue
,
_
,
_
,
_
:=
setupCoordinatorTest
(
t
,
10
)
gameAddr1
:=
common
.
Address
{
0xaa
}
gameAddr2
:=
common
.
Address
{
0xbb
}
gameAddr3
:=
common
.
Address
{
0xcc
}
gameAddr4
:=
common
.
Address
{
0xdd
}
ctx
:=
context
.
Background
()
// Start tracking game 1, 2 and 3
require
.
NoError
(
t
,
c
.
schedule
(
ctx
,
[]
common
.
Address
{
gameAddr1
,
gameAddr2
,
gameAddr3
}))
require
.
Len
(
t
,
workQueue
,
3
,
"should schedule games"
)
// Complete processing of games 1 and 2, leaving 3 in flight
require
.
NoError
(
t
,
c
.
processResult
(
<-
workQueue
))
require
.
NoError
(
t
,
c
.
processResult
(
<-
workQueue
))
// Next update only has games 2 and 4
require
.
NoError
(
t
,
c
.
schedule
(
ctx
,
[]
common
.
Address
{
gameAddr2
,
gameAddr4
}))
require
.
NotContains
(
t
,
c
.
states
,
gameAddr1
,
"should drop state for game 1"
)
require
.
Contains
(
t
,
c
.
states
,
gameAddr2
,
"should keep state for game 2 (still active)"
)
require
.
Contains
(
t
,
c
.
states
,
gameAddr3
,
"should keep state for game 3 (inflight)"
)
require
.
Contains
(
t
,
c
.
states
,
gameAddr4
,
"should create state for game 4"
)
}
func
setupCoordinatorTest
(
t
*
testing
.
T
,
bufferSize
int
)
(
*
coordinator
,
<-
chan
job
,
chan
job
,
*
createdGames
,
*
stubDiskManager
)
{
logger
:=
testlog
.
Logger
(
t
,
log
.
LvlInfo
)
workQueue
:=
make
(
chan
job
,
bufferSize
)
resultQueue
:=
make
(
chan
job
,
bufferSize
)
games
:=
&
createdGames
{
t
:
t
,
created
:
make
(
map
[
common
.
Address
]
*
stubGame
),
}
disk
:=
&
stubDiskManager
{
gameDirExists
:
make
(
map
[
common
.
Address
]
bool
)}
c
:=
newCoordinator
(
logger
,
workQueue
,
resultQueue
,
games
.
CreateGame
,
disk
)
return
c
,
workQueue
,
resultQueue
,
games
,
disk
}
type
stubGame
struct
{
addr
common
.
Address
progressCount
int
done
bool
dir
string
}
func
(
g
*
stubGame
)
ProgressGame
(
_
context
.
Context
)
bool
{
g
.
progressCount
++
return
g
.
done
}
type
createdGames
struct
{
t
*
testing
.
T
createCompleted
common
.
Address
creationFails
common
.
Address
created
map
[
common
.
Address
]
*
stubGame
}
func
(
c
*
createdGames
)
CreateGame
(
addr
common
.
Address
,
dir
string
)
(
GamePlayer
,
error
)
{
if
c
.
creationFails
==
addr
{
return
nil
,
fmt
.
Errorf
(
"refusing to create player for game: %v"
,
addr
)
}
if
_
,
exists
:=
c
.
created
[
addr
];
exists
{
c
.
t
.
Fatalf
(
"game %v already exists"
,
addr
)
}
game
:=
&
stubGame
{
addr
:
addr
,
done
:
addr
==
c
.
createCompleted
,
dir
:
dir
,
}
c
.
created
[
addr
]
=
game
return
game
,
nil
}
type
stubDiskManager
struct
{
gameDirExists
map
[
common
.
Address
]
bool
deletedDirs
[]
common
.
Address
}
func
(
s
*
stubDiskManager
)
DirForGame
(
addr
common
.
Address
)
string
{
s
.
gameDirExists
[
addr
]
=
true
return
addr
.
Hex
()
}
func
(
s
*
stubDiskManager
)
RemoveAllExcept
(
addrs
[]
common
.
Address
)
error
{
for
address
:=
range
s
.
gameDirExists
{
keep
:=
slices
.
Contains
(
addrs
,
address
)
s
.
gameDirExists
[
address
]
=
keep
if
!
keep
{
s
.
deletedDirs
=
append
(
s
.
deletedDirs
,
address
)
}
}
return
nil
}
op-challenger/fault/scheduler/scheduler.go
0 → 100644
View file @
4667823d
package
scheduler
import
(
"context"
"errors"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)
var
ErrBusy
=
errors
.
New
(
"busy scheduling previous update"
)
type
Scheduler
struct
{
logger
log
.
Logger
coordinator
*
coordinator
maxConcurrency
int
scheduleQueue
chan
[]
common
.
Address
jobQueue
chan
job
resultQueue
chan
job
wg
sync
.
WaitGroup
cancel
func
()
}
func
NewScheduler
(
logger
log
.
Logger
,
disk
DiskManager
,
maxConcurrency
int
,
createPlayer
PlayerCreator
)
*
Scheduler
{
// Size job and results queues to be fairly small so backpressure is applied early
// but with enough capacity to keep the workers busy
jobQueue
:=
make
(
chan
job
,
maxConcurrency
*
2
)
resultQueue
:=
make
(
chan
job
,
maxConcurrency
*
2
)
// scheduleQueue has a size of 1 so backpressure quickly propagates to the caller
// allowing them to potentially skip update cycles.
scheduleQueue
:=
make
(
chan
[]
common
.
Address
,
1
)
return
&
Scheduler
{
logger
:
logger
,
coordinator
:
newCoordinator
(
logger
,
jobQueue
,
resultQueue
,
createPlayer
,
disk
),
maxConcurrency
:
maxConcurrency
,
scheduleQueue
:
scheduleQueue
,
jobQueue
:
jobQueue
,
resultQueue
:
resultQueue
,
}
}
func
(
s
*
Scheduler
)
Start
(
ctx
context
.
Context
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
s
.
cancel
=
cancel
for
i
:=
0
;
i
<
s
.
maxConcurrency
;
i
++
{
s
.
wg
.
Add
(
1
)
go
progressGames
(
ctx
,
s
.
jobQueue
,
s
.
resultQueue
,
&
s
.
wg
)
}
s
.
wg
.
Add
(
1
)
go
s
.
loop
(
ctx
)
}
func
(
s
*
Scheduler
)
Close
()
error
{
s
.
cancel
()
s
.
wg
.
Wait
()
return
nil
}
func
(
s
*
Scheduler
)
Schedule
(
games
[]
common
.
Address
)
error
{
select
{
case
s
.
scheduleQueue
<-
games
:
return
nil
default
:
return
ErrBusy
}
}
func
(
s
*
Scheduler
)
loop
(
ctx
context
.
Context
)
{
defer
s
.
wg
.
Done
()
for
{
select
{
case
<-
ctx
.
Done
()
:
return
case
games
:=
<-
s
.
scheduleQueue
:
if
err
:=
s
.
coordinator
.
schedule
(
ctx
,
games
);
err
!=
nil
{
s
.
logger
.
Error
(
"Failed to schedule game updates"
,
"games"
,
games
,
"err"
,
err
)
}
case
j
:=
<-
s
.
resultQueue
:
if
err
:=
s
.
coordinator
.
processResult
(
j
);
err
!=
nil
{
s
.
logger
.
Error
(
"Error while processing game result"
,
"game"
,
j
.
addr
,
"err"
,
err
)
}
}
}
}
op-challenger/fault/scheduler/scheduler_test.go
0 → 100644
View file @
4667823d
package
scheduler
import
(
"context"
"testing"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"
)
func
TestSchedulerProcessesGames
(
t
*
testing
.
T
)
{
logger
:=
testlog
.
Logger
(
t
,
log
.
LvlInfo
)
ctx
:=
context
.
Background
()
createPlayer
:=
func
(
addr
common
.
Address
,
dir
string
)
(
GamePlayer
,
error
)
{
return
&
stubPlayer
{},
nil
}
removeExceptCalls
:=
make
(
chan
[]
common
.
Address
)
disk
:=
&
trackingDiskManager
{
removeExceptCalls
:
removeExceptCalls
}
s
:=
NewScheduler
(
logger
,
disk
,
2
,
createPlayer
)
s
.
Start
(
ctx
)
gameAddr1
:=
common
.
Address
{
0xaa
}
gameAddr2
:=
common
.
Address
{
0xbb
}
gameAddr3
:=
common
.
Address
{
0xcc
}
games
:=
[]
common
.
Address
{
gameAddr1
,
gameAddr2
,
gameAddr3
}
require
.
NoError
(
t
,
s
.
Schedule
(
games
))
// All jobs should be executed and completed, the last step being to clean up disk resources
for
i
:=
0
;
i
<
len
(
games
);
i
++
{
kept
:=
<-
removeExceptCalls
require
.
Len
(
t
,
kept
,
len
(
games
),
"should keep all games"
)
for
_
,
game
:=
range
games
{
require
.
Containsf
(
t
,
kept
,
game
,
"should keep game %v"
,
game
)
}
}
require
.
NoError
(
t
,
s
.
Close
())
}
func
TestReturnBusyWhenScheduleQueueFull
(
t
*
testing
.
T
)
{
logger
:=
testlog
.
Logger
(
t
,
log
.
LvlInfo
)
createPlayer
:=
func
(
addr
common
.
Address
,
dir
string
)
(
GamePlayer
,
error
)
{
return
&
stubPlayer
{},
nil
}
removeExceptCalls
:=
make
(
chan
[]
common
.
Address
)
disk
:=
&
trackingDiskManager
{
removeExceptCalls
:
removeExceptCalls
}
s
:=
NewScheduler
(
logger
,
disk
,
2
,
createPlayer
)
// Scheduler not started - first call fills the queue
require
.
NoError
(
t
,
s
.
Schedule
([]
common
.
Address
{{
0xaa
}}))
// Second call should return busy
err
:=
s
.
Schedule
([]
common
.
Address
{{
0xaa
}})
require
.
ErrorIs
(
t
,
err
,
ErrBusy
)
}
type
trackingDiskManager
struct
{
removeExceptCalls
chan
[]
common
.
Address
}
func
(
t
*
trackingDiskManager
)
DirForGame
(
addr
common
.
Address
)
string
{
return
addr
.
Hex
()
}
func
(
t
*
trackingDiskManager
)
RemoveAllExcept
(
addrs
[]
common
.
Address
)
error
{
t
.
removeExceptCalls
<-
addrs
return
nil
}
op-challenger/fault/scheduler/types.go
0 → 100644
View file @
4667823d
package
scheduler
import
(
"context"
"github.com/ethereum/go-ethereum/common"
)
type
GamePlayer
interface
{
ProgressGame
(
ctx
context
.
Context
)
bool
}
type
DiskManager
interface
{
DirForGame
(
addr
common
.
Address
)
string
RemoveAllExcept
(
addrs
[]
common
.
Address
)
error
}
type
job
struct
{
addr
common
.
Address
player
GamePlayer
resolved
bool
}
op-challenger/fault/scheduler/worker.go
0 → 100644
View file @
4667823d
package
scheduler
import
(
"context"
"sync"
)
// progressGames accepts jobs from in channel, calls ProgressGame on the job.player and returns the job
// with updated job.resolved via the out channel.
// The loop exits when the ctx is done. wg.Done() is called when the function returns.
func
progressGames
(
ctx
context
.
Context
,
in
<-
chan
job
,
out
chan
<-
job
,
wg
*
sync
.
WaitGroup
)
{
defer
wg
.
Done
()
for
{
select
{
case
<-
ctx
.
Done
()
:
return
case
j
:=
<-
in
:
j
.
resolved
=
j
.
player
.
ProgressGame
(
ctx
)
out
<-
j
}
}
}
op-challenger/fault/scheduler/worker_test.go
0 → 100644
View file @
4667823d
package
scheduler
import
(
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func
TestWorkerShouldProcessJobsUntilContextDone
(
t
*
testing
.
T
)
{
in
:=
make
(
chan
job
,
2
)
out
:=
make
(
chan
job
,
2
)
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
var
wg
sync
.
WaitGroup
wg
.
Add
(
1
)
go
progressGames
(
ctx
,
in
,
out
,
&
wg
)
in
<-
job
{
player
:
&
stubPlayer
{
done
:
false
},
}
in
<-
job
{
player
:
&
stubPlayer
{
done
:
true
},
}
result1
:=
readWithTimeout
(
t
,
out
)
result2
:=
readWithTimeout
(
t
,
out
)
require
.
Equal
(
t
,
result1
.
resolved
,
false
)
require
.
Equal
(
t
,
result2
.
resolved
,
true
)
// Cancel the context which should exit the worker
cancel
()
wg
.
Wait
()
}
type
stubPlayer
struct
{
done
bool
}
func
(
s
*
stubPlayer
)
ProgressGame
(
ctx
context
.
Context
)
bool
{
return
s
.
done
}
func
readWithTimeout
[
T
any
](
t
*
testing
.
T
,
ch
<-
chan
T
)
T
{
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
10
*
time
.
Second
)
defer
cancel
()
select
{
case
<-
ctx
.
Done
()
:
var
val
T
t
.
Fatal
(
"Did not receive event from channel"
)
return
val
// Won't be reached but makes the compiler happy
case
val
:=
<-
ch
:
return
val
}
}
op-challenger/fault/service.go
View file @
4667823d
...
...
@@ -7,6 +7,7 @@ import (
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-challenger/config"
"github.com/ethereum-optimism/optimism/op-challenger/fault/scheduler"
"github.com/ethereum-optimism/optimism/op-challenger/fault/types"
"github.com/ethereum-optimism/optimism/op-challenger/metrics"
"github.com/ethereum-optimism/optimism/op-challenger/version"
...
...
@@ -19,24 +20,22 @@ import (
"github.com/ethereum/go-ethereum/log"
)
// Service exposes top-level fault dispute game challenger functionality.
type
Service
interface
{
// MonitorGame monitors the fault dispute game and attempts to progress it.
MonitorGame
(
context
.
Context
)
error
}
// TODO(CLI-4342): Make this a cli option
const
maxConcurrency
=
4
type
Loader
interface
{
FetchAbsolutePrestateHash
(
ctx
context
.
Context
)
([]
byte
,
error
)
}
type
s
ervice
struct
{
type
S
ervice
struct
{
logger
log
.
Logger
metrics
metrics
.
Metricer
monitor
*
gameMonitor
sched
*
scheduler
.
Scheduler
}
// NewService creates a new Service.
func
NewService
(
ctx
context
.
Context
,
logger
log
.
Logger
,
cfg
*
config
.
Config
)
(
*
s
ervice
,
error
)
{
func
NewService
(
ctx
context
.
Context
,
logger
log
.
Logger
,
cfg
*
config
.
Config
)
(
*
S
ervice
,
error
)
{
cl
:=
clock
.
SystemClock
m
:=
metrics
.
NewMetrics
()
txMgr
,
err
:=
txmgr
.
NewSimpleTxManager
(
"challenger"
,
logger
,
&
m
.
TxMetrics
,
cfg
.
TxMgrConfig
)
...
...
@@ -77,25 +76,24 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*se
loader
:=
NewGameLoader
(
factory
)
disk
:=
newDiskManager
(
cfg
.
Datadir
)
monitor
:=
newGameMonito
r
(
sched
:=
scheduler
.
NewSchedule
r
(
logger
,
cfg
.
GameWindow
,
cl
,
disk
,
client
.
BlockNumber
,
cfg
.
GameAllowlist
,
loader
,
func
(
addr
common
.
Address
,
dir
string
)
(
gamePlayer
,
error
)
{
maxConcurrency
,
func
(
addr
common
.
Address
,
dir
string
)
(
scheduler
.
GamePlayer
,
error
)
{
return
NewGamePlayer
(
ctx
,
logger
,
cfg
,
dir
,
addr
,
txMgr
,
client
)
})
monitor
:=
newGameMonitor
(
logger
,
cl
,
loader
,
sched
,
cfg
.
GameWindow
,
client
.
BlockNumber
,
cfg
.
GameAllowlist
)
m
.
RecordInfo
(
version
.
SimpleWithMeta
)
m
.
RecordUp
()
return
&
s
ervice
{
return
&
S
ervice
{
logger
:
logger
,
metrics
:
m
,
monitor
:
monitor
,
sched
:
sched
,
},
nil
}
...
...
@@ -117,6 +115,8 @@ func ValidateAbsolutePrestate(ctx context.Context, trace types.TraceProvider, lo
}
// MonitorGame monitors the fault dispute game and attempts to progress it.
func
(
s
*
service
)
MonitorGame
(
ctx
context
.
Context
)
error
{
func
(
s
*
Service
)
MonitorGame
(
ctx
context
.
Context
)
error
{
s
.
sched
.
Start
(
ctx
)
defer
s
.
sched
.
Close
()
return
s
.
monitor
.
MonitorGames
(
ctx
)
}
op-program/io/filechan.go
View file @
4667823d
package
io
import
(
"errors"
"io"
"os"
)
...
...
@@ -41,10 +42,7 @@ func (rw *ReadWritePair) Writer() *os.File {
}
func
(
rw
*
ReadWritePair
)
Close
()
error
{
if
err
:=
rw
.
r
.
Close
();
err
!=
nil
{
return
err
}
return
rw
.
w
.
Close
()
return
errors
.
Join
(
rw
.
r
.
Close
(),
rw
.
w
.
Close
())
}
// CreateBidirectionalChannel creates a pair of FileChannels that are connected to each other.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment