Commit 0fee34b6 authored by Sam Stokes's avatar Sam Stokes Committed by GitHub

op-service: add configurable client timeout (#12074)

* op-service: add configurable timeouts

* op-service: fix lazy_dial
parent 718b9b01
......@@ -49,7 +49,7 @@ func (l *LazyRPC) dial(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}
l.inner = &BaseRPCClient{c: underlying}
l.inner = NewBaseRPCClient(underlying)
return nil
}
......@@ -66,6 +66,7 @@ func (l *LazyRPC) CallContext(ctx context.Context, result any, method string, ar
if err := l.dial(ctx); err != nil {
return err
}
fmt.Println("checkpoin 1")
return l.inner.CallContext(ctx, result, method, args...)
}
......
......@@ -36,10 +36,26 @@ type rpcConfig struct {
limit float64
burst int
lazy bool
callTimeout time.Duration
batchCallTimeout time.Duration
}
type RPCOption func(cfg *rpcConfig) error
func WithCallTimeout(d time.Duration) RPCOption {
return func(cfg *rpcConfig) error {
cfg.callTimeout = d
return nil
}
}
func WithBatchCallTimeout(d time.Duration) RPCOption {
return func(cfg *rpcConfig) error {
cfg.batchCallTimeout = d
return nil
}
}
// WithDialBackoff configures the number of attempts for the initial dial to the RPC,
// attempts are executed with an exponential backoff strategy.
func WithDialBackoff(attempts int) RPCOption {
......@@ -98,6 +114,12 @@ func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption)
if cfg.backoffAttempts < 1 { // default to at least 1 attempt, or it always fails to dial.
cfg.backoffAttempts = 1
}
if cfg.callTimeout == 0 {
cfg.callTimeout = 10 * time.Second
}
if cfg.batchCallTimeout == 0 {
cfg.batchCallTimeout = 20 * time.Second
}
var wrapped RPC
if cfg.lazy {
......@@ -107,7 +129,7 @@ func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption)
if err != nil {
return nil, err
}
wrapped = &BaseRPCClient{c: underlying}
wrapped = &BaseRPCClient{c: underlying, callTimeout: cfg.callTimeout, batchCallTimeout: cfg.batchCallTimeout}
}
if cfg.limit != 0 {
......@@ -172,10 +194,12 @@ func IsURLAvailable(ctx context.Context, address string) bool {
// It sets a timeout of 10s on CallContext & 20s on BatchCallContext made through it.
type BaseRPCClient struct {
c *rpc.Client
batchCallTimeout time.Duration
callTimeout time.Duration
}
func NewBaseRPCClient(c *rpc.Client) *BaseRPCClient {
return &BaseRPCClient{c: c}
return &BaseRPCClient{c: c, callTimeout: 10 * time.Second, batchCallTimeout: 20 * time.Second}
}
func (b *BaseRPCClient) Close() {
......@@ -183,13 +207,13 @@ func (b *BaseRPCClient) Close() {
}
func (b *BaseRPCClient) CallContext(ctx context.Context, result any, method string, args ...any) error {
cCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
cCtx, cancel := context.WithTimeout(ctx, b.callTimeout)
defer cancel()
return b.c.CallContext(cCtx, result, method, args...)
}
func (b *BaseRPCClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error {
cCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
cCtx, cancel := context.WithTimeout(ctx, b.batchCallTimeout)
defer cancel()
return b.c.BatchCallContext(cCtx, batch)
}
......
......@@ -53,6 +53,7 @@ type EngineAPIClient struct {
RPC client.RPC
log log.Logger
evp EngineVersionProvider
timeout time.Duration
}
type EngineVersionProvider interface {
......@@ -66,6 +67,16 @@ func NewEngineAPIClient(rpc client.RPC, l log.Logger, evp EngineVersionProvider)
RPC: rpc,
log: l,
evp: evp,
timeout: time.Second * 5,
}
}
func NewEngineAPIClientWithTimeout(rpc client.RPC, l log.Logger, evp EngineVersionProvider, timeout time.Duration) *EngineAPIClient {
return &EngineAPIClient{
RPC: rpc,
log: l,
evp: evp,
timeout: timeout,
}
}
......@@ -84,7 +95,7 @@ func (s *EngineAPIClient) ForkchoiceUpdate(ctx context.Context, fc *eth.Forkchoi
llog := s.log.New("state", fc) // local logger
tlog := llog.New("attr", attributes) // trace logger
tlog.Trace("Sharing forkchoice-updated signal")
fcCtx, cancel := context.WithTimeout(ctx, time.Second*5)
fcCtx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
var result eth.ForkchoiceUpdatedResult
method := s.evp.ForkchoiceUpdatedVersion(attributes)
......@@ -120,7 +131,7 @@ func (s *EngineAPIClient) NewPayload(ctx context.Context, payload *eth.Execution
e := s.log.New("block_hash", payload.BlockHash)
e.Trace("sending payload for execution")
execCtx, cancel := context.WithTimeout(ctx, time.Second*5)
execCtx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
var result eth.PayloadStatusV1
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment