Commit f269f3f3 authored by Adrian Sutton's avatar Adrian Sutton Committed by GitHub

op-supervisor: Write logs to logdb (#11046)

Executing messages aren't extracted yet but basic log information is hashed and written.
parent 49647f2b
...@@ -65,7 +65,7 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg ...@@ -65,7 +65,7 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to 'Rewind' the database: %w", err) return nil, fmt.Errorf("failed to 'Rewind' the database: %w", err)
} }
monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, rpcClient, block) monitor, err := source.NewChainMonitor(ctx, logger, cm, chainID, rpc, rpcClient, logDB, block)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err) return nil, fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
} }
......
package db package db
import ( import (
"encoding/hex"
"errors" "errors"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
...@@ -20,6 +21,10 @@ func TruncateHash(hash common.Hash) TruncatedHash { ...@@ -20,6 +21,10 @@ func TruncateHash(hash common.Hash) TruncatedHash {
return truncated return truncated
} }
func (h TruncatedHash) String() string {
return hex.EncodeToString(h[:])
}
type ExecutingMessage struct { type ExecutingMessage struct {
Chain uint32 Chain uint32
BlockNum uint64 BlockNum uint64
......
...@@ -10,7 +10,6 @@ import ( ...@@ -10,7 +10,6 @@ import (
"github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/sources/caching" "github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
...@@ -31,7 +30,7 @@ type ChainMonitor struct { ...@@ -31,7 +30,7 @@ type ChainMonitor struct {
headMonitor *HeadMonitor headMonitor *HeadMonitor
} }
func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID types.ChainID, rpc string, client client.RPC, block uint64) (*ChainMonitor, error) { func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID types.ChainID, rpc string, client client.RPC, store LogStorage, block uint64) (*ChainMonitor, error) {
logger = logger.New("chainID", chainID) logger = logger.New("chainID", chainID)
cl, err := newClient(ctx, logger, m, rpc, client, pollInterval, trustRpc, rpcKind) cl, err := newClient(ctx, logger, m, rpc, client, pollInterval, trustRpc, rpcKind)
if err != nil { if err != nil {
...@@ -42,7 +41,8 @@ func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID ...@@ -42,7 +41,8 @@ func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID
Number: block, Number: block,
} }
fetchReceipts := newLogFetcher(cl, &loggingReceiptProcessor{logger}) processLogs := newLogProcessor(store)
fetchReceipts := newLogFetcher(cl, processLogs)
unsafeBlockProcessor := NewChainProcessor(logger, cl, startingHead, fetchReceipts) unsafeBlockProcessor := NewChainProcessor(logger, cl, startingHead, fetchReceipts)
unsafeProcessors := []HeadProcessor{unsafeBlockProcessor} unsafeProcessors := []HeadProcessor{unsafeBlockProcessor}
...@@ -64,15 +64,6 @@ func (c *ChainMonitor) Stop() error { ...@@ -64,15 +64,6 @@ func (c *ChainMonitor) Stop() error {
return c.headMonitor.Stop() return c.headMonitor.Stop()
} }
type loggingReceiptProcessor struct {
log log.Logger
}
func (n *loggingReceiptProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpts ethTypes.Receipts) error {
n.log.Info("Process unsafe block", "block", block, "rcpts", len(rcpts))
return nil
}
func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient client.RPC, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) { func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient client.RPC, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) {
c, err := client.NewRPCWithClient(ctx, logger, rpc, rpcClient, pollRate) c, err := client.NewRPCWithClient(ctx, logger, rpc, rpcClient, pollRate)
if err != nil { if err != nil {
......
package source
import (
"context"
"fmt"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
)
type LogStorage interface {
AddLog(logHash db.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *db.ExecutingMessage) error
}
type logProcessor struct {
logStore LogStorage
}
func newLogProcessor(logStore LogStorage) *logProcessor {
return &logProcessor{logStore}
}
func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpts types.Receipts) error {
for _, rcpt := range rcpts {
for _, l := range rcpt.Logs {
logHash := logToHash(l)
err := p.logStore.AddLog(logHash, block.ID(), block.Time, uint32(l.Index), nil)
if err != nil {
// TODO(optimism#11044): Need to roll back to the start of the block....
return fmt.Errorf("failed to add log %d from block %v: %w", l.Index, block.ID(), err)
}
}
}
return nil
}
func logToHash(l *types.Log) db.TruncatedHash {
payloadHash := crypto.Keccak256(logToPayload(l))
msg := make([]byte, 0, 2*common.HashLength)
msg = append(msg, l.Address.Bytes()...)
msg = append(msg, payloadHash...)
return db.TruncateHash(crypto.Keccak256Hash(msg))
}
func logToPayload(l *types.Log) []byte {
msg := make([]byte, 0)
for _, topic := range l.Topics {
msg = append(msg, topic.Bytes()...)
}
msg = append(msg, l.Data...)
return msg
}
package source
import (
"context"
"testing"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)
func TestLogProcessor(t *testing.T) {
ctx := context.Background()
block1 := eth.L1BlockRef{Number: 100, Hash: common.Hash{0x11}, Time: 1111}
t.Run("NoOutputWhenLogsAreEmpty", func(t *testing.T) {
store := &stubLogStorage{}
processor := newLogProcessor(store)
err := processor.ProcessLogs(ctx, block1, types.Receipts{})
require.NoError(t, err)
require.Empty(t, store.logs)
})
t.Run("OutputLogs", func(t *testing.T) {
rcpts := types.Receipts{
{
Logs: []*types.Log{
{
Address: common.Address{0x11},
Topics: []common.Hash{{0xaa}},
Data: []byte{0xbb},
},
{
Address: common.Address{0x22},
Topics: []common.Hash{{0xcc}},
Data: []byte{0xdd},
},
},
},
{
Logs: []*types.Log{
{
Address: common.Address{0x33},
Topics: []common.Hash{{0xee}},
Data: []byte{0xff},
},
},
},
}
store := &stubLogStorage{}
processor := newLogProcessor(store)
err := processor.ProcessLogs(ctx, block1, rcpts)
require.NoError(t, err)
expected := []storedLog{
{
block: block1.ID(),
timestamp: block1.Time,
logIdx: 0,
logHash: logToHash(rcpts[0].Logs[0]),
execMsg: nil,
},
{
block: block1.ID(),
timestamp: block1.Time,
logIdx: 0,
logHash: logToHash(rcpts[0].Logs[1]),
execMsg: nil,
},
{
block: block1.ID(),
timestamp: block1.Time,
logIdx: 0,
logHash: logToHash(rcpts[1].Logs[0]),
execMsg: nil,
},
}
require.Equal(t, expected, store.logs)
})
}
func TestToLogHash(t *testing.T) {
mkLog := func() *types.Log {
return &types.Log{
Address: common.Address{0xaa, 0xbb},
Topics: []common.Hash{
{0xcc},
{0xdd},
},
Data: []byte{0xee, 0xff, 0x00},
BlockNumber: 12345,
TxHash: common.Hash{0x11, 0x22, 0x33},
TxIndex: 4,
BlockHash: common.Hash{0x44, 0x55},
Index: 8,
Removed: false,
}
}
relevantMods := []func(l *types.Log){
func(l *types.Log) { l.Address = common.Address{0xab, 0xcd} },
func(l *types.Log) { l.Topics = append(l.Topics, common.Hash{0x12, 0x34}) },
func(l *types.Log) { l.Topics = l.Topics[:len(l.Topics)-1] },
func(l *types.Log) { l.Topics[0] = common.Hash{0x12, 0x34} },
func(l *types.Log) { l.Data = append(l.Data, 0x56) },
func(l *types.Log) { l.Data = l.Data[:len(l.Data)-1] },
func(l *types.Log) { l.Data[0] = 0x45 },
}
irrelevantMods := []func(l *types.Log){
func(l *types.Log) { l.BlockNumber = 987 },
func(l *types.Log) { l.TxHash = common.Hash{0xab, 0xcd} },
func(l *types.Log) { l.TxIndex = 99 },
func(l *types.Log) { l.BlockHash = common.Hash{0xab, 0xcd} },
func(l *types.Log) { l.Index = 98 },
func(l *types.Log) { l.Removed = true },
}
refHash := logToHash(mkLog())
// The log hash is stored in the database so test that it matches the actual value.
// If this changes compatibility with existing databases may be affected
expectedRefHash := db.TruncateHash(common.HexToHash("0x4e1dc08fddeb273275f787762cdfe945cf47bb4e80a1fabbc7a825801e81b73f"))
require.Equal(t, expectedRefHash, refHash, "reference hash changed, check that database compatibility is not broken")
// Check that the hash is changed when any data it should include changes
for i, mod := range relevantMods {
l := mkLog()
mod(l)
hash := logToHash(l)
require.NotEqualf(t, refHash, hash, "expected relevant modification %v to affect the hash but it did not", i)
}
// Check that the hash is not changed when any data it should not include changes
for i, mod := range irrelevantMods {
l := mkLog()
mod(l)
hash := logToHash(l)
require.Equal(t, refHash, hash, "expected irrelevant modification %v to not affect the hash but it did", i)
}
}
type stubLogStorage struct {
logs []storedLog
}
func (s *stubLogStorage) AddLog(logHash db.TruncatedHash, block eth.BlockID, timestamp uint64, logIdx uint32, execMsg *db.ExecutingMessage) error {
s.logs = append(s.logs, storedLog{
block: block,
timestamp: timestamp,
logIdx: logIdx,
logHash: logHash,
execMsg: execMsg,
})
return nil
}
type storedLog struct {
block eth.BlockID
timestamp uint64
logIdx uint32
logHash db.TruncatedHash
execMsg *db.ExecutingMessage
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment