Commit 4f5cd706 authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into feat/beta-1-artifacts

parents b41a4018 26b1d32e
......@@ -10,8 +10,8 @@ import (
"github.com/ethereum/go-ethereum/log"
)
type NextDataProvider interface {
NextData(ctx context.Context) ([]byte, error)
type NextFrameProvider interface {
NextFrame(ctx context.Context) (Frame, error)
Origin() eth.L1BlockRef
}
......@@ -34,14 +34,14 @@ type ChannelBank struct {
channels map[ChannelID]*Channel // channels by ID
channelQueue []ChannelID // channels in FIFO order
prev NextDataProvider
prev NextFrameProvider
fetcher L1Fetcher
}
var _ ResetableStage = (*ChannelBank)(nil)
// NewChannelBank creates a ChannelBank, which should be Reset(origin) before use.
func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextDataProvider, fetcher L1Fetcher) *ChannelBank {
func NewChannelBank(log log.Logger, cfg *rollup.Config, prev NextFrameProvider, fetcher L1Fetcher) *ChannelBank {
return &ChannelBank{
log: log,
cfg: cfg,
......@@ -73,22 +73,12 @@ func (cb *ChannelBank) prune() {
}
// IngestData adds new L1 data to the channel bank.
// Read() should be called repeatedly first, until everything has been read, before adding new data.\
func (cb *ChannelBank) IngestData(data []byte) {
// Read() should be called repeatedly first, until everything has been read, before adding new data.
func (cb *ChannelBank) IngestFrame(f Frame) {
origin := cb.Origin()
cb.log.Debug("channel bank got new data", "origin", origin, "data_len", len(data))
log := log.New("origin", origin, "channel", f.ID, "length", len(f.Data), "frame_number", f.FrameNumber)
log.Debug("channel bank got new data")
// TODO: Why is the prune here?
cb.prune()
frames, err := ParseFrames(data)
if err != nil {
cb.log.Warn("malformed frame", "err", err)
return
}
// Process each frame
for _, f := range frames {
currentCh, ok := cb.channels[f.ID]
if !ok {
// create new channel if it doesn't exist yet
......@@ -99,16 +89,18 @@ func (cb *ChannelBank) IngestData(data []byte) {
// check if the channel is not timed out
if currentCh.OpenBlockNumber()+cb.cfg.ChannelTimeout < origin.Number {
cb.log.Warn("channel is timed out, ignore frame", "channel", f.ID, "frame", f.FrameNumber)
continue
log.Warn("channel is timed out, ignore frame")
return
}
cb.log.Trace("ingesting frame", "channel", f.ID, "frame_number", f.FrameNumber, "length", len(f.Data))
log.Trace("ingesting frame")
if err := currentCh.AddFrame(f, origin); err != nil {
cb.log.Warn("failed to ingest frame into channel", "channel", f.ID, "frame_number", f.FrameNumber, "err", err)
continue
}
log.Warn("failed to ingest frame into channel", "err", err)
return
}
// Prune after the frame is loaded.
cb.prune()
}
// Read the raw data of the first channel, if it's timed-out or closed.
......@@ -156,12 +148,12 @@ func (cb *ChannelBank) NextData(ctx context.Context) ([]byte, error) {
}
// Then load data into the channel bank
if data, err := cb.prev.NextData(ctx); err == io.EOF {
if frame, err := cb.prev.NextFrame(ctx); err == io.EOF {
return nil, io.EOF
} else if err != nil {
return nil, err
} else {
cb.IngestData(data)
cb.IngestFrame(frame)
return nil, NotEnoughData
}
}
......
package derive
import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"strconv"
......@@ -21,7 +19,7 @@ import (
type fakeChannelBankInput struct {
origin eth.L1BlockRef
data []struct {
data []byte
frame Frame
err error
}
}
......@@ -30,34 +28,28 @@ func (f *fakeChannelBankInput) Origin() eth.L1BlockRef {
return f.origin
}
func (f *fakeChannelBankInput) NextData(_ context.Context) ([]byte, error) {
func (f *fakeChannelBankInput) NextFrame(_ context.Context) (Frame, error) {
out := f.data[0]
f.data = f.data[1:]
return out.data, out.err
return out.frame, out.err
}
func (f *fakeChannelBankInput) AddOutput(data []byte, err error) {
func (f *fakeChannelBankInput) AddFrame(frame Frame, err error) {
f.data = append(f.data, struct {
data []byte
frame Frame
err error
}{data: data, err: err})
}{frame: frame, err: err})
}
// ExpectNextFrameData takes a set of test frame & turns into the raw data
// for reading into the channel bank via `NextData`
func (f *fakeChannelBankInput) AddFrames(frames ...testFrame) {
data := new(bytes.Buffer)
data.WriteByte(DerivationVersion0)
for _, frame := range frames {
ff := frame.ToFrame()
if err := ff.MarshalBinary(data); err != nil {
panic(fmt.Errorf("error in making frame during test: %w", err))
f.AddFrame(frame.ToFrame(), nil)
}
}
f.AddOutput(data.Bytes(), nil)
}
var _ NextDataProvider = (*fakeChannelBankInput)(nil)
var _ NextFrameProvider = (*fakeChannelBankInput)(nil)
// format: <channelID-data>:<frame-number>:<content><optional-last-frame-marker "!">
// example: "abc:0:helloworld!"
......@@ -105,17 +97,22 @@ func TestChannelBankSimple(t *testing.T) {
input := &fakeChannelBankInput{origin: a}
input.AddFrames("a:0:first", "a:2:third!")
input.AddFrames("a:1:second")
input.AddOutput(nil, io.EOF)
input.AddFrame(Frame{}, io.EOF)
cfg := &rollup.Config{ChannelTimeout: 10}
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil)
// Load the first + third frame
// Load the first frame
out, err := cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the third frame
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the second frame
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
......@@ -140,21 +137,29 @@ func TestChannelBankDuplicates(t *testing.T) {
input.AddFrames("a:0:first", "a:2:third!")
input.AddFrames("a:0:altfirst", "a:2:altthird!")
input.AddFrames("a:1:second")
input.AddOutput(nil, io.EOF)
input.AddFrame(Frame{}, io.EOF)
cfg := &rollup.Config{ChannelTimeout: 10}
cb := NewChannelBank(testlog.Logger(t, log.LvlCrit), cfg, input, nil)
// Load the first + third frame
// Load the first frame
out, err := cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the third frame
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the duplicate frames
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
out, err = cb.NextData(context.Background())
require.ErrorIs(t, err, NotEnoughData)
require.Equal(t, []byte(nil), out)
// Load the second frame
out, err = cb.NextData(context.Background())
......
......@@ -9,7 +9,6 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
......@@ -404,21 +403,28 @@ func (eq *EngineQueue) forceNextSafeAttributes(ctx context.Context) error {
case BlockInsertPrestateErr:
return NewResetError(fmt.Errorf("need reset to resolve pre-state problem: %w", err))
case BlockInsertPayloadErr:
eq.log.Warn("could not process payload derived from L1 data", "err", err)
// filter everything but the deposits
var deposits []hexutil.Bytes
eq.log.Warn("could not process payload derived from L1 data, dropping batch", "err", err)
// Count the number of deposits to see if the tx list is deposit only.
depositCount := 0
for _, tx := range attrs.Transactions {
if len(tx) > 0 && tx[0] == types.DepositTxType {
deposits = append(deposits, tx)
depositCount += 1
}
}
if len(attrs.Transactions) > len(deposits) {
eq.log.Warn("dropping sequencer transactions from payload for re-attempt, batcher may have included invalid transactions",
"txs", len(attrs.Transactions), "deposits", len(deposits), "parent", eq.safeHead)
eq.safeAttributes[0].Transactions = deposits
return nil
}
// Deposit transaction execution errors are suppressed in the execution engine, but if the
// block is somehow invalid, there is nothing we can do to recover & we should exit.
// TODO: Can this be triggered by an empty batch with invalid data (like parent hash or gas limit?)
if len(attrs.Transactions) == depositCount {
eq.log.Error("deposit only block was invalid", "parent", eq.safeHead, "err", err)
return NewCriticalError(fmt.Errorf("failed to process block with only deposit transactions: %w", err))
}
// drop the payload without inserting it
eq.safeAttributes = eq.safeAttributes[1:]
// suppress the error b/c we want to retry with the next batch from the batch queue
// If there is no valid batch the node will eventually force a deposit only block. If
// the deposit only block fails, this will return the critical error above.
return nil
default:
return NewCriticalError(fmt.Errorf("unknown InsertHeadBlock error type %d: %w", errType, err))
}
......
package derive
import (
"context"
"io"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum/go-ethereum/log"
)
var _ NextFrameProvider = &FrameQueue{}
type NextDataProvider interface {
NextData(context.Context) ([]byte, error)
Origin() eth.L1BlockRef
}
type FrameQueue struct {
log log.Logger
frames []Frame
prev NextDataProvider
}
func NewFrameQueue(log log.Logger, prev NextDataProvider) *FrameQueue {
return &FrameQueue{
log: log,
prev: prev,
}
}
func (fq *FrameQueue) Origin() eth.L1BlockRef {
return fq.prev.Origin()
}
func (fq *FrameQueue) NextFrame(ctx context.Context) (Frame, error) {
// Find more frames if we need to
if len(fq.frames) == 0 {
if data, err := fq.prev.NextData(ctx); err != nil {
return Frame{}, err
} else {
if new, err := ParseFrames(data); err == nil {
fq.frames = append(fq.frames, new...)
} else {
fq.log.Warn("Failed to parse frames", "origin", fq.prev.Origin(), "err", err)
}
}
}
// If we did not add more frames but still have more data, retry this function.
if len(fq.frames) == 0 {
return Frame{}, NotEnoughData
}
ret := fq.frames[0]
fq.frames = fq.frames[1:]
return ret, nil
}
func (fq *FrameQueue) Reset(ctx context.Context, base eth.L1BlockRef) error {
fq.frames = fq.frames[:0]
return io.EOF
}
......@@ -67,7 +67,8 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
l1Traversal := NewL1Traversal(log, l1Fetcher)
dataSrc := NewDataSourceFactory(log, cfg, l1Fetcher) // auxiliary stage for L1Retrieval
l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
bank := NewChannelBank(log, cfg, l1Src, l1Fetcher)
frameQueue := NewFrameQueue(log, l1Src)
bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher)
chInReader := NewChannelInReader(log, bank)
batchQueue := NewBatchQueue(log, cfg, chInReader)
attributesQueue := NewAttributesQueue(log, cfg, l1Fetcher, batchQueue)
......
......@@ -16,7 +16,7 @@
"scripts": {
"build:forge": "forge build",
"build:differential": "tsc scripts/differential-testing.ts --outDir dist --moduleResolution node --esModuleInterop",
"prebuild": "yarn ts-node scripts/verifyFoundryInstall.ts",
"prebuild": "yarn ts-node scripts/verify-foundry-install.ts",
"build": "hardhat compile && yarn autogen:artifacts && yarn build:ts && yarn typechain",
"build:ts": "tsc -p tsconfig.json",
"autogen:artifacts": "ts-node scripts/generate-artifacts.ts",
......
/data/evm-messages.json
/data/slots.json
/data/evm-addresses.json
......@@ -14,6 +14,33 @@ program
.description('CLI for querying Bedrock migration data')
.version(version)
program
.command('parse-state-dump')
.description('parses state dump to json')
.option('--file <file>', 'path to state dump file')
.action(async (options) => {
const iface = getContractInterface('OVM_L2ToL1MessagePasser')
const dump = fs.readFileSync(options.file, 'utf-8')
const addrs: string[] = []
const msgs: any[] = []
for (const line of dump.split('\n')) {
if (line.startsWith('ETH')) {
addrs.push(line.split('|')[1].replace('\r', ''))
} else if (line.startsWith('MSG')) {
const msg = '0x' + line.split('|')[2].replace('\r', '')
const parsed = iface.decodeFunctionData('passMessageToL1', msg)
msgs.push({
who: line.split('|')[1],
msg: parsed._message,
})
}
}
fs.writeFileSync('./data/evm-addresses.json', JSON.stringify(addrs, null, 2))
fs.writeFileSync('./data/evm-messages.json', JSON.stringify(msgs, null, 2))
})
program
.command('evm-sent-messages')
.description('queries messages sent after the EVM upgrade')
......
......@@ -520,6 +520,14 @@ As currently implemented, each step in this stage performs the following actions
frame are discarded.
- Concatenate the data of the *contiguous frame sequence* (in sequential order) and push it to the next stage.
The ordering of these actions is very important to be consistent across nodes & pipeline resets. The rollup node
must attempt to do the following in order to maintain a consistent channel bank even in the presence of pruning.
1. Attempt to read as many channels as possible from the channel bank.
2. Load in a single frame
3. Check if channel bank needs to be pruned & do so if needed.
4. Go to step 1 once the channel bank is under it's size limit.
> **TODO** Instead of waiting on the first seen channel (which might not contain the oldest batches, meaning buffering
> further down the pipeline), we could process any channel in the queue that is ready. We could do this by checking for
> channel readiness upon writing into the bank, and moving ready channel to the front of the queue.
......@@ -537,8 +545,9 @@ During the *Batch Buffering* stage, we reorder batches by their timestamps. If b
slots][g-time-slot] and a valid batch with a higher timestamp exists, this stage also generates empty batches to fill
the gaps.
Batches are pushed to the next stage whenever there is one or more sequential batch(es) directly following the timestamp
Batches are pushed to the next stage whenever there is one sequential batch directly following the timestamp
of the current [safe L2 head][g-safe-l2-head] (the last block that can be derived from the canonical L1 chain).
The parent hash of the batch must also match the hash of the current safe L2 head.
Note that the presence of any gaps in the batches derived from L1 means that this stage will need to buffer for a whole
[sequencing window][g-sequencing-window] before it can generate empty batches (because the missing batch(es) could have
......@@ -645,6 +654,12 @@ If consolidation fails, the unsafe L2 head is reset to the safe L2 head.
If the safe and unsafe L2 heads are identical (whether because of failed consolidation or not), we send the block to the
execution engine to be converted into a proper L2 block, which will become both the new L2 safe and unsafe head.
If a payload attributes created from a batch cannot be inserted into the chain because of a validation error (i.e. there
was an invalid transaction or state transition in the block) the batch should be dropped & the safe head should not be
advanced. The engine queue will attempt to use the next batch for that timestamp from the batch queue. If no valid batch
is found, the rollup node will create a deposit only batch which should always pass validation because deposits are
always valid.
Interaction with the execution engine via the execution engine API is detailed in the [Communication with the Execution
Engine][exec-engine-comm] section.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment