Commit 773e476b authored by Axel Kingsley's avatar Axel Kingsley Committed by GitHub

op-supervisor: Include executing message info when storing logs. (#11369)

* Rebase: op-supervisor: Include executing message info when storing logs.

Takes from aj/parse-exec-msg and makes the following updates:
- uses upstream ABI definitions for identifier hash
- removes the core recording functionality for the moment
- fixes up inconsistent typing and merge conflicts due to rearranged packages

* Incorporate new ABI format

* remove trailing newline in contract

---------
Co-authored-by: default avatarAdrian Sutton <adrian@oplabs.co>
parent d098cf8c
package contracts
import (
"bytes"
"errors"
"fmt"
"io"
"math/big"
"github.com/ethereum-optimism/optimism/op-service/predeploys"
"github.com/ethereum-optimism/optimism/op-service/solabi"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum-optimism/optimism/packages/contracts-bedrock/snapshots"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
)
const (
eventExecutingMessage = "ExecutingMessage"
)
var (
ErrEventNotFound = errors.New("event not found")
)
type contractIdentifier struct {
// Origin represents the address that initiated the message
// it is used in combination with the MsgHash to uniquely identify a message
// and is hashed into the log hash, not stored directly.
Origin common.Address
LogIndex *big.Int
BlockNumber *big.Int
ChainId *big.Int
Timestamp *big.Int
}
type CrossL2Inbox struct {
contract *batching.BoundContract
}
func NewCrossL2Inbox() *CrossL2Inbox {
abi := snapshots.LoadCrossL2InboxABI()
return &CrossL2Inbox{
contract: batching.NewBoundContract(abi, predeploys.CrossL2InboxAddr),
}
}
func (i *CrossL2Inbox) DecodeExecutingMessageLog(l *ethTypes.Log) (backendTypes.ExecutingMessage, error) {
if l.Address != i.contract.Addr() {
return backendTypes.ExecutingMessage{}, fmt.Errorf("%w: log not from CrossL2Inbox", ErrEventNotFound)
}
// use DecodeEvent to check the name of the event
// but the actual decoding is done manually to extract the contract identifier
name, _, err := i.contract.DecodeEvent(l)
if errors.Is(err, batching.ErrUnknownEvent) {
return backendTypes.ExecutingMessage{}, fmt.Errorf("%w: %v", ErrEventNotFound, err.Error())
} else if err != nil {
return backendTypes.ExecutingMessage{}, fmt.Errorf("failed to decode event: %w", err)
}
if name != eventExecutingMessage {
return backendTypes.ExecutingMessage{}, fmt.Errorf("%w: event %v not an ExecutingMessage event", ErrEventNotFound, name)
}
// the second topic is the hash of the payload (the first is the event ID)
msgHash := l.Topics[1]
// the first 32 bytes of the data are the msgHash, so we skip them
identifierBytes := bytes.NewReader(l.Data[32:])
identifier, err := identifierFromBytes(identifierBytes)
if err != nil {
return backendTypes.ExecutingMessage{}, fmt.Errorf("failed to read contract identifier: %w", err)
}
chainID, err := types.ChainIDFromBig(identifier.ChainId).ToUInt32()
if err != nil {
return backendTypes.ExecutingMessage{}, fmt.Errorf("failed to convert chain ID %v to uint32: %w", identifier.ChainId, err)
}
hash := payloadHashToLogHash(msgHash, identifier.Origin)
return backendTypes.ExecutingMessage{
Chain: chainID,
Hash: hash,
BlockNum: identifier.BlockNumber.Uint64(),
LogIdx: uint32(identifier.LogIndex.Uint64()),
Timestamp: identifier.Timestamp.Uint64(),
}, nil
}
// identifierFromBytes reads a contract identifier from a byte stream.
// it follows the spec and matches the CrossL2Inbox.json definition,
// rather than relying on reflection, as that can be error-prone regarding struct ordering
func identifierFromBytes(identifierBytes io.Reader) (contractIdentifier, error) {
origin, err := solabi.ReadAddress(identifierBytes)
if err != nil {
return contractIdentifier{}, fmt.Errorf("failed to read origin address: %w", err)
}
originAddr := common.BytesToAddress(origin[:])
blockNumber, err := solabi.ReadUint256(identifierBytes)
if err != nil {
return contractIdentifier{}, fmt.Errorf("failed to read block number: %w", err)
}
logIndex, err := solabi.ReadUint256(identifierBytes)
if err != nil {
return contractIdentifier{}, fmt.Errorf("failed to read log index: %w", err)
}
timestamp, err := solabi.ReadUint256(identifierBytes)
if err != nil {
return contractIdentifier{}, fmt.Errorf("failed to read timestamp: %w", err)
}
chainID, err := solabi.ReadUint256(identifierBytes)
if err != nil {
return contractIdentifier{}, fmt.Errorf("failed to read chain ID: %w", err)
}
return contractIdentifier{
Origin: originAddr,
BlockNumber: blockNumber,
LogIndex: logIndex,
Timestamp: timestamp,
ChainId: chainID,
}, nil
}
// payloadHashToLogHash converts the payload hash to the log hash
// it is the concatenation of the log's address and the hash of the log's payload,
// which is then hashed again. This is the hash that is stored in the log storage.
// The logHash can then be used to traverse from the executing message
// to the log the referenced initiating message.
// TODO: this function is duplicated between contracts and backend/source/log_processor.go
// to avoid a circular dependency. It should be reorganized to avoid this duplication.
func payloadHashToLogHash(payloadHash common.Hash, addr common.Address) backendTypes.TruncatedHash {
msg := make([]byte, 0, 2*common.HashLength)
msg = append(msg, addr.Bytes()...)
msg = append(msg, payloadHash.Bytes()...)
return backendTypes.TruncateHash(crypto.Keccak256Hash(msg))
}
package contracts
import (
"bytes"
"math/big"
"testing"
"github.com/ethereum-optimism/optimism/op-service/predeploys"
"github.com/ethereum-optimism/optimism/op-service/sources/batching"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/packages/contracts-bedrock/snapshots"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/require"
)
func TestDecodeExecutingMessageEvent(t *testing.T) {
inbox := NewCrossL2Inbox()
payload := bytes.Repeat([]byte{0xaa, 0xbb}, 50)
payloadHash := crypto.Keccak256Hash(payload)
expected := backendTypes.ExecutingMessage{
Chain: 42424,
BlockNum: 12345,
LogIdx: 98,
Timestamp: 9578295,
}
contractIdent := contractIdentifier{
Origin: common.Address{0xbb, 0xcc},
ChainId: new(big.Int).SetUint64(uint64(expected.Chain)),
BlockNumber: new(big.Int).SetUint64(expected.BlockNum),
Timestamp: new(big.Int).SetUint64(expected.Timestamp),
LogIndex: new(big.Int).SetUint64(uint64(expected.LogIdx)),
}
expected.Hash = payloadHashToLogHash(payloadHash, contractIdent.Origin)
abi := snapshots.LoadCrossL2InboxABI()
validData, err := abi.Events[eventExecutingMessage].Inputs.Pack(payloadHash, contractIdent)
require.NoError(t, err)
createValidLog := func() *ethTypes.Log {
//protoHack := bytes.Repeat([]byte{0x00}, 32*5)
return &ethTypes.Log{
Address: predeploys.CrossL2InboxAddr,
Topics: []common.Hash{abi.Events[eventExecutingMessage].ID, payloadHash},
Data: validData,
}
}
t.Run("ParseValid", func(t *testing.T) {
l := createValidLog()
result, err := inbox.DecodeExecutingMessageLog(l)
require.NoError(t, err)
require.Equal(t, expected, result)
})
t.Run("IgnoreIncorrectContract", func(t *testing.T) {
l := createValidLog()
l.Address = common.Address{0xff}
_, err := inbox.DecodeExecutingMessageLog(l)
require.ErrorIs(t, err, ErrEventNotFound)
})
t.Run("IgnoreWrongEvent", func(t *testing.T) {
l := createValidLog()
l.Topics[0] = common.Hash{0xbb}
_, err := inbox.DecodeExecutingMessageLog(l)
require.ErrorIs(t, err, ErrEventNotFound)
})
t.Run("ErrorOnInvalidEvent", func(t *testing.T) {
l := createValidLog()
l.Data = []byte{0xbb, 0xcc}
_, err := inbox.DecodeExecutingMessageLog(l)
require.ErrorIs(t, err, batching.ErrInvalidEvent)
})
}
......@@ -2,10 +2,12 @@ package source
import (
"context"
"errors"
"fmt"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source/contracts"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
supTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
......@@ -13,23 +15,46 @@ import (
)
type LogStorage interface {
AddLog(chain supTypes.ChainID, logHash types.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *types.ExecutingMessage) error
AddLog(chain supTypes.ChainID, logHash backendTypes.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error
}
type EventDecoder interface {
DecodeExecutingMessageLog(log *ethTypes.Log) (backendTypes.ExecutingMessage, error)
}
type logProcessor struct {
chain supTypes.ChainID
logStore LogStorage
chain supTypes.ChainID
logStore LogStorage
eventDecoder EventDecoder
}
func newLogProcessor(chain supTypes.ChainID, logStore LogStorage) *logProcessor {
return &logProcessor{chain: chain, logStore: logStore}
return &logProcessor{
chain: chain,
logStore: logStore,
eventDecoder: contracts.NewCrossL2Inbox(),
}
}
// ProcessLogs processes logs from a block and stores them in the log storage
// for any logs that are related to executing messages, they are decoded and stored
func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpts ethTypes.Receipts) error {
for _, rcpt := range rcpts {
for _, l := range rcpt.Logs {
logHash := logToHash(l)
err := p.logStore.AddLog(p.chain, logHash, block.ID(), block.Time, uint32(l.Index), nil)
// log hash represents the hash of *this* log as a potentially initiating message
logHash := logToLogHash(l)
var execMsg *backendTypes.ExecutingMessage
msg, err := p.eventDecoder.DecodeExecutingMessageLog(l)
if err != nil && !errors.Is(err, contracts.ErrEventNotFound) {
return fmt.Errorf("failed to decode executing message log: %w", err)
} else if err == nil {
// if the log is an executing message, store the message
execMsg = &msg
}
// executing messages have multiple entries in the database
// they should start with the initiating message and then include the execution
fmt.Println("p.chain", p.chain)
err = p.logStore.AddLog(p.chain, logHash, block.ID(), block.Time, uint32(l.Index), execMsg)
if err != nil {
return fmt.Errorf("failed to add log %d from block %v: %w", l.Index, block.ID(), err)
}
......@@ -38,15 +63,20 @@ func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpt
return nil
}
func logToHash(l *ethTypes.Log) types.TruncatedHash {
payloadHash := crypto.Keccak256(logToPayload(l))
msg := make([]byte, 0, 2*common.HashLength)
msg = append(msg, l.Address.Bytes()...)
msg = append(msg, payloadHash...)
return types.TruncateHash(crypto.Keccak256Hash(msg))
// logToLogHash transforms a log into a hash that represents the log.
// it is the concatenation of the log's address and the hash of the log's payload,
// which is then hashed again. This is the hash that is stored in the log storage.
// The address is hashed into the payload hash to save space in the log storage,
// and because they represent paired data.
func logToLogHash(l *ethTypes.Log) backendTypes.TruncatedHash {
payloadHash := crypto.Keccak256(logToMessagePayload(l))
return payloadHashToLogHash(common.Hash(payloadHash), l.Address)
}
func logToPayload(l *ethTypes.Log) []byte {
// logToMessagePayload is the data that is hashed to get the logHash
// it is the concatenation of the log's topics and data
// the implementation is based on the interop messaging spec
func logToMessagePayload(l *ethTypes.Log) []byte {
msg := make([]byte, 0)
for _, topic := range l.Topics {
msg = append(msg, topic.Bytes()...)
......@@ -54,3 +84,15 @@ func logToPayload(l *ethTypes.Log) []byte {
msg = append(msg, l.Data...)
return msg
}
// payloadHashToLogHash converts the payload hash to the log hash
// it is the concatenation of the log's address and the hash of the log's payload,
// which is then hashed. This is the hash that is stored in the log storage.
// The logHash can then be used to traverse from the executing message
// to the log the referenced initiating message.
func payloadHashToLogHash(payloadHash common.Hash, addr common.Address) backendTypes.TruncatedHash {
msg := make([]byte, 0, 2*common.HashLength)
msg = append(msg, addr.Bytes()...)
msg = append(msg, payloadHash.Bytes()...)
return backendTypes.TruncateHash(crypto.Keccak256Hash(msg))
}
......@@ -6,7 +6,8 @@ import (
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
"github.com/ethereum-optimism/optimism/op-service/predeploys"
backendTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/types"
supTypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
......@@ -63,26 +64,66 @@ func TestLogProcessor(t *testing.T) {
block: block1.ID(),
timestamp: block1.Time,
logIdx: 0,
logHash: logToHash(rcpts[0].Logs[0]),
logHash: logToLogHash(rcpts[0].Logs[0]),
execMsg: nil,
},
{
block: block1.ID(),
timestamp: block1.Time,
logIdx: 0,
logHash: logToHash(rcpts[0].Logs[1]),
logHash: logToLogHash(rcpts[0].Logs[1]),
execMsg: nil,
},
{
block: block1.ID(),
timestamp: block1.Time,
logIdx: 0,
logHash: logToHash(rcpts[1].Logs[0]),
logHash: logToLogHash(rcpts[1].Logs[0]),
execMsg: nil,
},
}
require.Equal(t, expected, store.logs)
})
t.Run("IncludeExecutingMessage", func(t *testing.T) {
rcpts := ethTypes.Receipts{
{
Logs: []*ethTypes.Log{
{
Address: predeploys.CrossL2InboxAddr,
Topics: []common.Hash{},
Data: []byte{0xff},
},
},
},
}
execMsg := backendTypes.ExecutingMessage{
Chain: 4,
BlockNum: 6,
LogIdx: 8,
Timestamp: 10,
Hash: backendTypes.TruncatedHash{0xaa},
}
store := &stubLogStorage{}
processor := newLogProcessor(supTypes.ChainID{4}, store)
processor.eventDecoder = EventDecoderFn(func(l *ethTypes.Log) (backendTypes.ExecutingMessage, error) {
require.Equal(t, rcpts[0].Logs[0], l)
return execMsg, nil
})
err := processor.ProcessLogs(ctx, block1, rcpts)
require.NoError(t, err)
expected := []storedLog{
{
block: block1.ID(),
timestamp: block1.Time,
logIdx: 0,
logHash: logToLogHash(rcpts[0].Logs[0]),
execMsg: &execMsg,
},
}
require.Equal(t, expected, store.logs)
})
}
func TestToLogHash(t *testing.T) {
......@@ -119,24 +160,24 @@ func TestToLogHash(t *testing.T) {
func(l *ethTypes.Log) { l.Index = 98 },
func(l *ethTypes.Log) { l.Removed = true },
}
refHash := logToHash(mkLog())
refHash := logToLogHash(mkLog())
// The log hash is stored in the database so test that it matches the actual value.
// If this changes compatibility with existing databases may be affected
expectedRefHash := types.TruncateHash(common.HexToHash("0x4e1dc08fddeb273275f787762cdfe945cf47bb4e80a1fabbc7a825801e81b73f"))
// If this changes, compatibility with existing databases may be affected
expectedRefHash := backendTypes.TruncateHash(common.HexToHash("0x4e1dc08fddeb273275f787762cdfe945cf47bb4e80a1fabbc7a825801e81b73f"))
require.Equal(t, expectedRefHash, refHash, "reference hash changed, check that database compatibility is not broken")
// Check that the hash is changed when any data it should include changes
for i, mod := range relevantMods {
l := mkLog()
mod(l)
hash := logToHash(l)
hash := logToLogHash(l)
require.NotEqualf(t, refHash, hash, "expected relevant modification %v to affect the hash but it did not", i)
}
// Check that the hash is not changed when any data it should not include changes
for i, mod := range irrelevantMods {
l := mkLog()
mod(l)
hash := logToHash(l)
hash := logToLogHash(l)
require.Equal(t, refHash, hash, "expected irrelevant modification %v to not affect the hash but it did", i)
}
}
......@@ -145,7 +186,7 @@ type stubLogStorage struct {
logs []storedLog
}
func (s *stubLogStorage) AddLog(chainID supTypes.ChainID, logHash types.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *types.ExecutingMessage) error {
func (s *stubLogStorage) AddLog(chainID supTypes.ChainID, logHash backendTypes.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *backendTypes.ExecutingMessage) error {
if logProcessorChainID != chainID {
return fmt.Errorf("chain id mismatch, expected %v but got %v", logProcessorChainID, chainID)
}
......@@ -163,6 +204,12 @@ type storedLog struct {
block eth.BlockID
timestamp uint64
logIdx uint32
logHash types.TruncatedHash
execMsg *types.ExecutingMessage
logHash backendTypes.TruncatedHash
execMsg *backendTypes.ExecutingMessage
}
type EventDecoderFn func(*ethTypes.Log) (backendTypes.ExecutingMessage, error)
func (f EventDecoderFn) DecodeExecutingMessageLog(log *ethTypes.Log) (backendTypes.ExecutingMessage, error) {
return f(log)
}
......@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
"github.com/holiman/uint256"
......@@ -17,7 +18,7 @@ type Identifier struct {
BlockNumber uint64
LogIndex uint64
Timestamp uint64
ChainID uint256.Int // flat, not a pointer, to make Identifier safe as map key
ChainID ChainID // flat, not a pointer, to make Identifier safe as map key
}
type identifierMarshaling struct {
......@@ -47,7 +48,7 @@ func (id *Identifier) UnmarshalJSON(input []byte) error {
id.BlockNumber = uint64(dec.BlockNumber)
id.LogIndex = uint64(dec.LogIndex)
id.Timestamp = uint64(dec.Timestamp)
id.ChainID = (uint256.Int)(dec.ChainID)
id.ChainID = (ChainID)(dec.ChainID)
return nil
}
......@@ -102,3 +103,15 @@ func ChainIDFromUInt64(i uint64) ChainID {
func (id ChainID) String() string {
return ((*uint256.Int)(&id)).Dec()
}
func (id ChainID) ToUInt32() (uint32, error) {
v := (*uint256.Int)(&id)
if !v.IsUint64() {
return 0, fmt.Errorf("ChainID too large for uint32: %v", id)
}
v64 := v.Uint64()
if v64 > math.MaxUint32 {
return 0, fmt.Errorf("ChainID too large for uint32: %v", id)
}
return uint32(v64), nil
}
......@@ -3,6 +3,7 @@ package types
import (
"encoding/json"
"math"
"math/big"
"testing"
"github.com/stretchr/testify/require"
......@@ -22,9 +23,8 @@ func FuzzRoundtripIdentifierJSONMarshal(f *testing.F) {
BlockNumber: blockNumber,
LogIndex: logIndex,
Timestamp: timestamp,
ChainID: uint256.Int{},
ChainID: ChainIDFromBig(new(big.Int).SetBytes(chainID)),
}
id.ChainID.SetBytes(chainID)
raw, err := json.Marshal(&id)
require.NoError(t, err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment