Commit 0c86d9ca authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into refactor/invariants-burn

parents 02623baa c5405c67
package challenger
import (
"context"
"sync"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-service/backoff"
)
// logStore manages log subscriptions.
type logStore struct {
// The log filter query
query ethereum.FilterQuery
// core sync mutex for log store
// this locks the entire log store
mu sync.Mutex
logList []types.Log
logMap map[common.Hash][]types.Log
// Log subscription
subscription *Subscription
// Client to query for logs
client ethereum.LogFilterer
// Logger
log log.Logger
}
// NewLogStore creates a new log store.
func NewLogStore(query ethereum.FilterQuery, client ethereum.LogFilterer, log log.Logger) *logStore {
return &logStore{
query: query,
mu: sync.Mutex{},
logList: make([]types.Log, 0),
logMap: make(map[common.Hash][]types.Log),
subscription: NewSubscription(query, client, log),
client: client,
log: log,
}
}
// Subscribed returns true if the subscription has started.
func (l *logStore) Subscribed() bool {
return l.subscription.Started()
}
// Query returns the log filter query.
func (l *logStore) Query() ethereum.FilterQuery {
return l.query
}
// Client returns the log filter client.
func (l *logStore) Client() ethereum.LogFilterer {
return l.client
}
// GetLogs returns all logs in the log store.
func (l *logStore) GetLogs() []types.Log {
l.mu.Lock()
defer l.mu.Unlock()
logs := make([]types.Log, len(l.logList))
copy(logs, l.logList)
return logs
}
// GetLogByBlockHash returns all logs in the log store for a given block hash.
func (l *logStore) GetLogByBlockHash(blockHash common.Hash) []types.Log {
l.mu.Lock()
defer l.mu.Unlock()
logs := make([]types.Log, len(l.logMap[blockHash]))
copy(logs, l.logMap[blockHash])
return logs
}
// Subscribe starts the subscription.
// This function spawns a new goroutine.
func (l *logStore) Subscribe(ctx context.Context) error {
err := l.subscription.Subscribe()
if err != nil {
l.log.Error("failed to subscribe", "err", err)
return err
}
go l.dispatchLogs(ctx)
return nil
}
// Quit stops all log store asynchronous tasks.
func (l *logStore) Quit() {
l.subscription.Quit()
}
// buildBackoffStrategy builds a [backoff.Strategy].
func (l *logStore) buildBackoffStrategy() backoff.Strategy {
return &backoff.ExponentialStrategy{
Min: 1000,
Max: 20_000,
MaxJitter: 250,
}
}
// resubscribe attempts to re-establish the log store internal
// subscription with a backoff strategy.
func (l *logStore) resubscribe(ctx context.Context) error {
l.log.Info("log store resubscribing with backoff")
backoffStrategy := l.buildBackoffStrategy()
return backoff.DoCtx(ctx, 10, backoffStrategy, func() error {
if l.subscription == nil {
l.log.Error("subscription zeroed out")
return nil
}
err := l.subscription.Subscribe()
if err == nil {
l.log.Info("subscription reconnected", "id", l.subscription.ID())
}
return err
})
}
// insertLog inserts a log into the log store.
func (l *logStore) insertLog(log types.Log) {
l.mu.Lock()
l.logList = append(l.logList, log)
l.logMap[log.BlockHash] = append(l.logMap[log.BlockHash], log)
l.mu.Unlock()
}
// dispatchLogs dispatches logs to the log store.
// This function is intended to be run as a goroutine.
func (l *logStore) dispatchLogs(ctx context.Context) {
for {
select {
case err := <-l.subscription.sub.Err():
l.log.Error("log subscription error", "err", err)
for {
err = l.resubscribe(ctx)
if err == nil {
break
}
}
case log := <-l.subscription.logs:
l.insertLog(log)
case <-l.subscription.quit:
l.log.Info("received quit signal from subscription", "id", l.subscription.ID())
return
}
}
}
package challenger
import (
"context"
"errors"
"testing"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/stretchr/testify/require"
)
type mockLogStoreClient struct {
sub mockSubscription
logs chan<- types.Log
subcount int
}
func newMockLogStoreClient() *mockLogStoreClient {
return &mockLogStoreClient{
sub: mockSubscription{
errorChan: make(chan error),
},
}
}
func (m *mockLogStoreClient) FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log, error) {
panic("this should not be called by the Subscription.Subscribe method")
}
func (m *mockLogStoreClient) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, logs chan<- types.Log) (ethereum.Subscription, error) {
m.subcount = m.subcount + 1
m.logs = logs
return m.sub, nil
}
var (
ErrTestError = errors.New("test error")
)
// errLogStoreClient implements the [ethereum.LogFilter] interface for testing.
type errLogStoreClient struct{}
func (m errLogStoreClient) FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log, error) {
panic("this should not be called by the Subscription.Subscribe method")
}
func (m errLogStoreClient) SubscribeFilterLogs(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error) {
return nil, ErrTestError
}
type mockSubscription struct {
errorChan chan error
}
func (m mockSubscription) Err() <-chan error {
return m.errorChan
}
func (m mockSubscription) Unsubscribe() {}
func newLogStore(t *testing.T) (*logStore, *mockLogStoreClient) {
query := ethereum.FilterQuery{}
client := newMockLogStoreClient()
log := testlog.Logger(t, log.LvlError)
return NewLogStore(query, client, log), client
}
func newErrorLogStore(t *testing.T, client *errLogStoreClient) (*logStore, *errLogStoreClient) {
query := ethereum.FilterQuery{}
log := testlog.Logger(t, log.LvlError)
return NewLogStore(query, client, log), client
}
func TestLogStore_NewLogStore_NotSubscribed(t *testing.T) {
logStore, _ := newLogStore(t)
require.False(t, logStore.Subscribed())
}
func TestLogStore_NewLogStore_EmptyLogs(t *testing.T) {
logStore, _ := newLogStore(t)
require.Empty(t, logStore.GetLogs())
require.Empty(t, logStore.GetLogByBlockHash(common.Hash{}))
}
func TestLogStore_Subscribe_EstablishesSubscription(t *testing.T) {
logStore, client := newLogStore(t)
defer logStore.Quit()
require.Equal(t, 0, client.subcount)
require.False(t, logStore.Subscribed())
require.NoError(t, logStore.Subscribe(context.Background()))
require.True(t, logStore.Subscribed())
require.Equal(t, 1, client.subcount)
}
func TestLogStore_Subscribe_ReceivesLogs(t *testing.T) {
logStore, client := newLogStore(t)
defer logStore.Quit()
require.NoError(t, logStore.Subscribe(context.Background()))
mockLog := types.Log{
BlockHash: common.HexToHash("0x1"),
}
client.logs <- mockLog
timeout, tCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer tCancel()
err := e2eutils.WaitFor(timeout, 500*time.Millisecond, func() (bool, error) {
result := logStore.GetLogByBlockHash(mockLog.BlockHash)
return result[0].BlockHash == mockLog.BlockHash, nil
})
require.NoError(t, err)
}
func TestLogStore_Subscribe_SubscriptionErrors(t *testing.T) {
logStore, client := newLogStore(t)
defer logStore.Quit()
require.NoError(t, logStore.Subscribe(context.Background()))
client.sub.errorChan <- ErrTestError
timeout, tCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer tCancel()
err := e2eutils.WaitFor(timeout, 500*time.Millisecond, func() (bool, error) {
subcount := client.subcount == 2
started := logStore.subscription.Started()
return subcount && started, nil
})
require.NoError(t, err)
}
func TestLogStore_Subscribe_NoClient_Panics(t *testing.T) {
require.Panics(t, func() {
logStore, _ := newErrorLogStore(t, nil)
_ = logStore.Subscribe(context.Background())
})
}
func TestLogStore_Subscribe_ErrorSubscribing(t *testing.T) {
logStore, _ := newErrorLogStore(t, &errLogStoreClient{})
require.False(t, logStore.Subscribed())
require.EqualError(t, logStore.Subscribe(context.Background()), ErrTestError.Error())
}
func TestLogStore_Quit_ResetsSubscription(t *testing.T) {
logStore, _ := newLogStore(t)
require.False(t, logStore.Subscribed())
require.NoError(t, logStore.Subscribe(context.Background()))
require.True(t, logStore.Subscribed())
logStore.Quit()
require.False(t, logStore.Subscribed())
}
func TestLogStore_Quit_NoSubscription_Panics(t *testing.T) {
require.Panics(t, func() {
logStore, _ := newErrorLogStore(t, nil)
logStore.Quit()
})
}
package challenger
import (
"context"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
// SubscriptionId is a unique subscription ID.
type SubscriptionId uint64
// Increment returns the next subscription ID.
func (s *SubscriptionId) Increment() SubscriptionId {
*s++
return *s
}
// Subscription wraps an [ethereum.Subscription] to provide a restart.
type Subscription struct {
// The subscription ID
id SubscriptionId
// The current subscription
sub ethereum.Subscription
// If the subscription is started
started bool
// The query used to create the subscription
query ethereum.FilterQuery
// The log channel
logs chan types.Log
// The quit channel
quit chan struct{}
// Filter client used to open the log subscription
client ethereum.LogFilterer
// Logger
log log.Logger
}
// NewSubscription creates a new subscription.
func NewSubscription(query ethereum.FilterQuery, client ethereum.LogFilterer, log log.Logger) *Subscription {
return &Subscription{
id: SubscriptionId(0),
sub: nil,
started: false,
query: query,
logs: make(chan types.Log),
quit: make(chan struct{}),
client: client,
log: log,
}
}
// ID returns the subscription ID.
func (s *Subscription) ID() SubscriptionId {
return s.id
}
// Started returns true if the subscription has started.
func (s *Subscription) Started() bool {
return s.started
}
// Subscribe constructs the subscription.
func (s *Subscription) Subscribe() error {
s.log.Info("Subscribing to", "query", s.query.Topics, "id", s.id)
sub, err := s.client.SubscribeFilterLogs(context.Background(), s.query, s.logs)
if err != nil {
s.log.Error("failed to subscribe to logs", "err", err)
return err
}
s.sub = sub
s.started = true
return nil
}
// Quit closes the subscription.
func (s *Subscription) Quit() {
s.log.Info("Quitting subscription", "id", s.id)
s.sub.Unsubscribe()
s.quit <- struct{}{}
s.started = false
s.log.Info("Quit subscription", "id", s.id)
}
package challenger
import (
"context"
"errors"
"math"
"testing"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum-optimism/optimism/op-node/testlog"
"github.com/stretchr/testify/require"
)
type mockLogFilterClient struct{}
func (m mockLogFilterClient) FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log, error) {
panic("this should not be called by the Subscription.Subscribe method")
}
func (m mockLogFilterClient) SubscribeFilterLogs(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error) {
return nil, nil
}
func newSubscription(t *testing.T, client *mockLogFilterClient) (*Subscription, *mockLogFilterClient) {
query := ethereum.FilterQuery{}
log := testlog.Logger(t, log.LvlError)
return NewSubscription(query, client, log), client
}
func FuzzSubscriptionId_Increment(f *testing.F) {
maxUint64 := uint64(math.MaxUint64)
f.Fuzz(func(t *testing.T, id uint64) {
if id >= maxUint64 {
t.Skip("skipping due to overflow")
} else {
subId := SubscriptionId(id)
require.Equal(t, subId.Increment(), SubscriptionId(id+1))
}
})
}
func TestSubscription_Subscribe_NilClient_Panics(t *testing.T) {
defer func() {
if recover() == nil {
t.Error("expected nil client to panic")
}
}()
subscription, _ := newSubscription(t, nil)
require.NoError(t, subscription.Subscribe())
}
func TestSubscription_Subscribe(t *testing.T) {
subscription, _ := newSubscription(t, &mockLogFilterClient{})
require.NoError(t, subscription.Subscribe())
require.True(t, subscription.Started())
}
var ErrSubscriptionFailed = errors.New("failed to subscribe to logs")
type errLogFilterClient struct{}
func (m errLogFilterClient) FilterLogs(context.Context, ethereum.FilterQuery) ([]types.Log, error) {
panic("this should not be called by the Subscription.Subscribe method")
}
func (m errLogFilterClient) SubscribeFilterLogs(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error) {
return nil, ErrSubscriptionFailed
}
func TestSubscription_Subscribe_SubscriptionErrors(t *testing.T) {
query := ethereum.FilterQuery{}
log := testlog.Logger(t, log.LvlError)
subscription := Subscription{
query: query,
client: errLogFilterClient{},
log: log,
}
require.EqualError(t, subscription.Subscribe(), ErrSubscriptionFailed.Error())
}
pragma solidity 0.8.15;
import { Test } from "forge-std/Test.sol";
import { StdInvariant } from "forge-std/StdInvariant.sol";
import { AddressAliasHelper } from "../../vendor/AddressAliasHelper.sol";
contract AddressAliasHelper_Converter {
bool public failedRoundtrip;
/**
* @dev Allows the actor to convert L1 to L2 addresses and vice versa.
*/
function convertRoundTrip(address addr) external {
// Alias our address
address aliasedAddr = AddressAliasHelper.applyL1ToL2Alias(addr);
// Unalias our address
address undoneAliasAddr = AddressAliasHelper.undoL1ToL2Alias(aliasedAddr);
// If our round trip aliasing did not return the original result, set our state.
if (addr != undoneAliasAddr) {
failedRoundtrip = true;
}
}
}
contract AddressAliasHelper_AddressAliasing_Invariant is StdInvariant, Test {
AddressAliasHelper_Converter internal actor;
function setUp() public {
// Create a converter actor.
actor = new AddressAliasHelper_Converter();
targetContract(address(actor));
bytes4[] memory selectors = new bytes4[](1);
selectors[0] = actor.convertRoundTrip.selector;
FuzzSelector memory selector = FuzzSelector({ addr: address(actor), selectors: selectors });
targetSelector(selector);
}
/**
* @custom:invariant Address aliases are always able to be undone.
*
* Asserts that an address that has been aliased with `applyL1ToL2Alias` can always
* be unaliased with `undoL1ToL2Alias`.
*/
function invariant_round_trip_aliasing() external {
// ASSERTION: The round trip aliasing done in testRoundTrip(...) should never fail.
assertEq(actor.failedRoundtrip(), false);
}
}
# `AddressAliasHelper` Invariants
## Address aliases are always able to be undone.
**Test:** [`AddressAliasHelper.t.sol#L48`](../contracts/test/invariants/AddressAliasHelper.t.sol#L48)
Asserts that an address that has been aliased with `applyL1ToL2Alias` can always be unaliased with `undoL1ToL2Alias`.
......@@ -6,6 +6,7 @@ This directory contains documentation for all defined invariant tests within `co
<!-- START autoTOC -->
## Table of Contents
- [AddressAliasHelper](./AddressAliasHelper.md)
- [AddressAliasing](./AddressAliasing.md)
- [Burn.Eth](./Burn.Eth.md)
- [Burn.Gas](./Burn.Gas.md)
......
......@@ -5,7 +5,12 @@ This tool implements `proxyd`, an RPC request router and proxy. It does the foll
1. Whitelists RPC methods.
2. Routes RPC methods to groups of backend services.
3. Automatically retries failed backend requests.
4. Provides metrics the measure request latency, error rates, and the like.
4. Track backend consensus (`latest`, `safe`, `finalized` blocks), peer count and sync state.
5. Re-write requests and responses to enforce consensus.
6. Load balance requests across backend services.
7. Cache immutable responses from backends.
8. Provides metrics the measure request latency, error rates, and the like.
## Usage
......@@ -15,6 +20,74 @@ To configure `proxyd` for use, you'll need to create a configuration file to def
Once you have a config file, start the daemon via `proxyd <path-to-config>.toml`.
## Consensus awareness
Starting on v4.0.0, `proxyd` is aware of the consensus state of its backends. This helps minimize chain reorgs experienced by clients.
To enable this behavior, you must set `consensus_aware` value to `true` in the backend group.
When consensus awareness is enabled, `proxyd` will poll the backends for their states and resolve a consensus group based on:
* the common ancestor `latest` block, i.e. if a backend is experiencing a fork, the fork won't be visible to the clients
* the lowest `safe` block
* the lowest `finalized` block
* peer count
* sync state
The backend group then acts as a round-robin load balancer distributing traffic equally across healthy backends in the consensus group, increasing the availability of the proxy.
A backend is considered healthy if it meets the following criteria:
* not banned
* avg 1-min moving window error rate ≤ configurable threshold
* avg 1-min moving window latency ≤ configurable threshold
* peer count ≥ configurable threshold
* `latest` block lag ≤ configurable threshold
* last state update ≤ configurable threshold
* not currently syncing
When a backend is experiencing inconsistent consensus, high error rates or high latency,
the backend will be banned for a configurable amount of time (default 5 minutes)
and won't receive any traffic during this period.
## Tag rewrite
When consensus awareness is enabled, `proxyd` will enforce the consensus state transparently for all the clients.
For example, if a client requests the `eth_getBlockByNumber` method with the `latest` tag,
`proxyd` will rewrite the request to use the resolved latest block from the consensus group
and forward it to the backend.
The following request methods are rewritten:
* `eth_getLogs`
* `eth_newFilter`
* `eth_getBalance`
* `eth_getCode`
* `eth_getTransactionCount`
* `eth_call`
* `eth_getStorageAt`
* `eth_getBlockTransactionCountByNumber`
* `eth_getUncleCountByBlockNumber`
* `eth_getBlockByNumber`
* `eth_getTransactionByBlockNumberAndIndex`
* `eth_getUncleByBlockNumberAndIndex`
And `eth_blockNumber` response is overridden with current block consensus.
## Cacheable methods
Cache use Redis and can be enabled for the following immutable methods:
* `eth_chainId`
* `net_version`
* `eth_getBlockTransactionCountByHash`
* `eth_getUncleCountByBlockHash`
* `eth_getBlockByHash`
* `eth_getTransactionByBlockHashAndIndex`
* `eth_getUncleByBlockHashAndIndex`
## Metrics
See `metrics.go` for a list of all available metrics.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment