Commit 1029095c authored by mergify[bot]'s avatar mergify[bot] Committed by GitHub

Merge branch 'develop' into aj/fpp-engine-unknown-block

parents 7742c972 52779a2b
...@@ -600,7 +600,7 @@ jobs: ...@@ -600,7 +600,7 @@ jobs:
- run: - run:
name: run lint name: run lint
command: | command: |
golangci-lint run -E goimports,sqlclosecheck,bodyclose,asciicheck,misspell,errorlint --timeout 2m -e "errors.As" -e "errors.Is" ./... golangci-lint run -E goimports,sqlclosecheck,bodyclose,asciicheck,misspell,errorlint --timeout 5m -e "errors.As" -e "errors.Is" ./...
working_directory: <<parameters.module>> working_directory: <<parameters.module>>
go-test: go-test:
......
...@@ -82,7 +82,7 @@ func (s *channelManager) TxFailed(id txID) { ...@@ -82,7 +82,7 @@ func (s *channelManager) TxFailed(id txID) {
} }
s.metr.RecordBatchTxFailed() s.metr.RecordBatchTxFailed()
if s.closed && len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 { if s.closed && len(s.confirmedTransactions) == 0 && len(s.pendingTransactions) == 0 && s.pendingChannel != nil {
s.log.Info("Channel has no submitted transactions, clearing for shutdown", "chID", s.pendingChannel.ID()) s.log.Info("Channel has no submitted transactions, clearing for shutdown", "chID", s.pendingChannel.ID())
s.clearPendingChannel() s.clearPendingChannel()
} }
......
...@@ -191,7 +191,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) { ...@@ -191,7 +191,7 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) {
if err != nil { if err != nil {
l.log.Warn("Error calculating L2 block range", "err", err) l.log.Warn("Error calculating L2 block range", "err", err)
return return
} else if start.Number == end.Number { } else if start.Number >= end.Number {
return return
} }
......
...@@ -58,12 +58,14 @@ type Metrics struct { ...@@ -58,12 +58,14 @@ type Metrics struct {
PendingBlocksCount prometheus.GaugeVec PendingBlocksCount prometheus.GaugeVec
BlocksAddedCount prometheus.Gauge BlocksAddedCount prometheus.Gauge
ChannelInputBytes prometheus.GaugeVec ChannelInputBytes prometheus.GaugeVec
ChannelReadyBytes prometheus.Gauge ChannelReadyBytes prometheus.Gauge
ChannelOutputBytes prometheus.Gauge ChannelOutputBytes prometheus.Gauge
ChannelClosedReason prometheus.Gauge ChannelClosedReason prometheus.Gauge
ChannelNumFrames prometheus.Gauge ChannelNumFrames prometheus.Gauge
ChannelComprRatio prometheus.Histogram ChannelComprRatio prometheus.Histogram
ChannelInputBytesTotal prometheus.Counter
ChannelOutputBytesTotal prometheus.Counter
BatcherTxEvs opmetrics.EventVec BatcherTxEvs opmetrics.EventVec
} }
...@@ -144,6 +146,16 @@ func NewMetrics(procName string) *Metrics { ...@@ -144,6 +146,16 @@ func NewMetrics(procName string) *Metrics {
Help: "Compression ratios of closed channel.", Help: "Compression ratios of closed channel.",
Buckets: append([]float64{0.1, 0.2}, prometheus.LinearBuckets(0.3, 0.05, 14)...), Buckets: append([]float64{0.1, 0.2}, prometheus.LinearBuckets(0.3, 0.05, 14)...),
}), }),
ChannelInputBytesTotal: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "input_bytes_total",
Help: "Total number of bytes to a channel.",
}),
ChannelOutputBytesTotal: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "output_bytes_total",
Help: "Total number of compressed output bytes from a channel.",
}),
BatcherTxEvs: opmetrics.NewEventVec(factory, ns, "", "batcher_tx", "BatcherTx", []string{"stage"}), BatcherTxEvs: opmetrics.NewEventVec(factory, ns, "", "batcher_tx", "BatcherTx", []string{"stage"}),
} }
...@@ -219,6 +231,8 @@ func (m *Metrics) RecordChannelClosed(id derive.ChannelID, numPendingBlocks int, ...@@ -219,6 +231,8 @@ func (m *Metrics) RecordChannelClosed(id derive.ChannelID, numPendingBlocks int,
m.ChannelNumFrames.Set(float64(numFrames)) m.ChannelNumFrames.Set(float64(numFrames))
m.ChannelInputBytes.WithLabelValues(StageClosed).Set(float64(inputBytes)) m.ChannelInputBytes.WithLabelValues(StageClosed).Set(float64(inputBytes))
m.ChannelOutputBytes.Set(float64(outputComprBytes)) m.ChannelOutputBytes.Set(float64(outputComprBytes))
m.ChannelInputBytesTotal.Add(float64(inputBytes))
m.ChannelOutputBytesTotal.Add(float64(outputComprBytes))
var comprRatio float64 var comprRatio float64
if inputBytes > 0 { if inputBytes > 0 {
......
...@@ -24,6 +24,7 @@ type IterativeBatchCall[K any, V any] struct { ...@@ -24,6 +24,7 @@ type IterativeBatchCall[K any, V any] struct {
makeRequest func(K) (V, rpc.BatchElem) makeRequest func(K) (V, rpc.BatchElem)
getBatch BatchCallContextFn getBatch BatchCallContextFn
getSingle CallContextFn
requestsValues []V requestsValues []V
scheduled chan rpc.BatchElem scheduled chan rpc.BatchElem
...@@ -35,6 +36,7 @@ func NewIterativeBatchCall[K any, V any]( ...@@ -35,6 +36,7 @@ func NewIterativeBatchCall[K any, V any](
requestsKeys []K, requestsKeys []K,
makeRequest func(K) (V, rpc.BatchElem), makeRequest func(K) (V, rpc.BatchElem),
getBatch BatchCallContextFn, getBatch BatchCallContextFn,
getSingle CallContextFn,
batchSize int) *IterativeBatchCall[K, V] { batchSize int) *IterativeBatchCall[K, V] {
if len(requestsKeys) < batchSize { if len(requestsKeys) < batchSize {
...@@ -47,6 +49,7 @@ func NewIterativeBatchCall[K any, V any]( ...@@ -47,6 +49,7 @@ func NewIterativeBatchCall[K any, V any](
out := &IterativeBatchCall[K, V]{ out := &IterativeBatchCall[K, V]{
completed: 0, completed: 0,
getBatch: getBatch, getBatch: getBatch,
getSingle: getSingle,
requestsKeys: requestsKeys, requestsKeys: requestsKeys,
batchSize: batchSize, batchSize: batchSize,
makeRequest: makeRequest, makeRequest: makeRequest,
...@@ -119,11 +122,23 @@ func (ibc *IterativeBatchCall[K, V]) Fetch(ctx context.Context) error { ...@@ -119,11 +122,23 @@ func (ibc *IterativeBatchCall[K, V]) Fetch(ctx context.Context) error {
break break
} }
if err := ibc.getBatch(ctx, batch); err != nil { if len(batch) == 0 {
for _, r := range batch { return nil
ibc.scheduled <- r }
if ibc.batchSize == 1 {
first := batch[0]
if err := ibc.getSingle(ctx, &first.Result, first.Method, first.Args...); err != nil {
ibc.scheduled <- first
return err
}
} else {
if err := ibc.getBatch(ctx, batch); err != nil {
for _, r := range batch {
ibc.scheduled <- r
}
return fmt.Errorf("failed batch-retrieval: %w", err)
} }
return fmt.Errorf("failed batch-retrieval: %w", err)
} }
var result error var result error
for _, elem := range batch { for _, elem := range batch {
......
...@@ -34,7 +34,8 @@ type batchTestCase struct { ...@@ -34,7 +34,8 @@ type batchTestCase struct {
batchSize int batchSize int
batchCalls []batchCall batchCalls []batchCall
singleCalls []elemCall
mock.Mock mock.Mock
} }
...@@ -53,7 +54,14 @@ func (tc *batchTestCase) GetBatch(ctx context.Context, b []rpc.BatchElem) error ...@@ -53,7 +54,14 @@ func (tc *batchTestCase) GetBatch(ctx context.Context, b []rpc.BatchElem) error
if ctx.Err() != nil { if ctx.Err() != nil {
return ctx.Err() return ctx.Err()
} }
return tc.Mock.MethodCalled("get", b).Get(0).([]error)[0] return tc.Mock.MethodCalled("getBatch", b).Get(0).([]error)[0]
}
func (tc *batchTestCase) GetSingle(ctx context.Context, result any, method string, args ...any) error {
if ctx.Err() != nil {
return ctx.Err()
}
return tc.Mock.MethodCalled("getSingle", (*(result.(*interface{}))).(*string), method, args[0]).Get(0).([]error)[0]
} }
var mockErr = errors.New("mockErr") var mockErr = errors.New("mockErr")
...@@ -64,7 +72,7 @@ func (tc *batchTestCase) Run(t *testing.T) { ...@@ -64,7 +72,7 @@ func (tc *batchTestCase) Run(t *testing.T) {
keys[i] = i keys[i] = i
} }
makeMock := func(bci int, bc batchCall) func(args mock.Arguments) { makeBatchMock := func(bci int, bc batchCall) func(args mock.Arguments) {
return func(args mock.Arguments) { return func(args mock.Arguments) {
batch := args[0].([]rpc.BatchElem) batch := args[0].([]rpc.BatchElem)
for i, elem := range batch { for i, elem := range batch {
...@@ -94,10 +102,30 @@ func (tc *batchTestCase) Run(t *testing.T) { ...@@ -94,10 +102,30 @@ func (tc *batchTestCase) Run(t *testing.T) {
}) })
} }
if len(bc.elems) > 0 { if len(bc.elems) > 0 {
tc.On("get", batch).Once().Run(makeMock(bci, bc)).Return([]error{bc.rpcErr}) // wrap to preserve nil as type of error tc.On("getBatch", batch).Once().Run(makeBatchMock(bci, bc)).Return([]error{bc.rpcErr}) // wrap to preserve nil as type of error
}
}
makeSingleMock := func(eci int, ec elemCall) func(args mock.Arguments) {
return func(args mock.Arguments) {
result := args[0].(*string)
id := args[2].(int)
require.Equal(t, ec.id, id, "element should match expected element")
if ec.err {
*result = ""
} else {
*result = fmt.Sprintf("mock result id %d", id)
}
} }
} }
iter := NewIterativeBatchCall[int, *string](keys, makeTestRequest, tc.GetBatch, tc.batchSize) // mock the results of unbatched calls
for eci, ec := range tc.singleCalls {
var ret error
if ec.err {
ret = mockErr
}
tc.On("getSingle", new(string), "testing_foobar", ec.id).Once().Run(makeSingleMock(eci, ec)).Return([]error{ret})
}
iter := NewIterativeBatchCall[int, *string](keys, makeTestRequest, tc.GetBatch, tc.GetSingle, tc.batchSize)
for i, bc := range tc.batchCalls { for i, bc := range tc.batchCalls {
ctx := context.Background() ctx := context.Background()
if bc.makeCtx != nil { if bc.makeCtx != nil {
...@@ -116,6 +144,20 @@ func (tc *batchTestCase) Run(t *testing.T) { ...@@ -116,6 +144,20 @@ func (tc *batchTestCase) Run(t *testing.T) {
} }
} }
} }
for i, ec := range tc.singleCalls {
ctx := context.Background()
err := iter.Fetch(ctx)
if err == io.EOF {
require.Equal(t, i, len(tc.singleCalls)-1, "EOF only on last call")
} else {
require.False(t, iter.Complete())
if ec.err {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}
}
require.True(t, iter.Complete(), "batch iter should be complete after the expected calls") require.True(t, iter.Complete(), "batch iter should be complete after the expected calls")
out, err := iter.Result() out, err := iter.Result()
require.NoError(t, err) require.NoError(t, err)
...@@ -154,6 +196,37 @@ func TestFetchBatched(t *testing.T) { ...@@ -154,6 +196,37 @@ func TestFetchBatched(t *testing.T) {
}, },
}, },
}, },
{
name: "single element",
items: 1,
batchSize: 4,
singleCalls: []elemCall{
{id: 0, err: false},
},
},
{
name: "unbatched",
items: 4,
batchSize: 1,
singleCalls: []elemCall{
{id: 0, err: false},
{id: 1, err: false},
{id: 2, err: false},
{id: 3, err: false},
},
},
{
name: "unbatched with retry",
items: 4,
batchSize: 1,
singleCalls: []elemCall{
{id: 0, err: false},
{id: 1, err: true},
{id: 2, err: false},
{id: 3, err: false},
{id: 1, err: false},
},
},
{ {
name: "split", name: "split",
items: 5, items: 5,
...@@ -240,7 +313,7 @@ func TestFetchBatched(t *testing.T) { ...@@ -240,7 +313,7 @@ func TestFetchBatched(t *testing.T) {
}, },
{ {
name: "context timeout", name: "context timeout",
items: 1, items: 2,
batchSize: 3, batchSize: 3,
batchCalls: []batchCall{ batchCalls: []batchCall{
{ {
...@@ -255,6 +328,7 @@ func TestFetchBatched(t *testing.T) { ...@@ -255,6 +328,7 @@ func TestFetchBatched(t *testing.T) {
{ {
elems: []elemCall{ elems: []elemCall{
{id: 0, err: false}, {id: 0, err: false},
{id: 1, err: false},
}, },
err: "", err: "",
}, },
......
...@@ -373,6 +373,7 @@ func (job *receiptsFetchingJob) runFetcher(ctx context.Context) error { ...@@ -373,6 +373,7 @@ func (job *receiptsFetchingJob) runFetcher(ctx context.Context) error {
job.txHashes, job.txHashes,
makeReceiptRequest, makeReceiptRequest,
job.client.BatchCallContext, job.client.BatchCallContext,
job.client.CallContext,
job.maxBatchSize, job.maxBatchSize,
) )
} }
......
...@@ -67,10 +67,10 @@ type ETHBackend interface { ...@@ -67,10 +67,10 @@ type ETHBackend interface {
// NonceAt returns the account nonce of the given account. // NonceAt returns the account nonce of the given account.
// The block number can be nil, in which case the nonce is taken from the latest known block. // The block number can be nil, in which case the nonce is taken from the latest known block.
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
// PendingNonce returns the pending nonce. // PendingNonceAt returns the pending nonce.
PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error)
/// EstimateGas returns an estimate of the amount of gas needed to execute the given // EstimateGas returns an estimate of the amount of gas needed to execute the given
/// transaction against the current pending block. // transaction against the current pending block.
EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error)
} }
......
...@@ -207,7 +207,7 @@ function deleteL2Outputs(uint256 _l2OutputIndex) external ...@@ -207,7 +207,7 @@ function deleteL2Outputs(uint256 _l2OutputIndex) external
/** /**
* @notice Computes the block number of the next L2 block that needs to be checkpointed. * @notice Computes the block number of the next L2 block that needs to be checkpointed.
*/ */
function getNextBlockNumber() public view returns (uint256) function nextBlockNumber() public view returns (uint256)
``` ```
### Configuration ### Configuration
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment