Commit b9523cb7 authored by Sam Calder-Mason's avatar Sam Calder-Mason Committed by GitHub

feat: Add Xatu Sentry (#466)

Adds support for [Xatu](https://github.com/ethpandaops/xatu) in Sentry
mode. Similarly to Ethereum Metrics Exporter, Xatu Sentry can be enabled
globally, or per client pair.

Note: This PR only enables "Sentry". Xatu Server/Clickhouse etc will
need to run outside of Kurtosis for this to be useful.

---------
Co-authored-by: default avatarBarnabas Busa <busa.barnabas@gmail.com>
parent 0bd29dd7
......@@ -231,6 +231,10 @@ participants:
# Defaults to false
ethereum_metrics_exporter_enabled: false
# Enables Xatu Sentry for this participant. Can be set globally.
# Defaults to false
xatu_sentry_enabled: false
# Count of nodes to spin up for this participant
# Default to 1
count: 1
......@@ -466,6 +470,31 @@ mev_params:
# Optional parameters to send to the custom_flood script that sends reliable payloads
custom_flood_params:
interval_between_transactions: 1
# Enables Xatu Sentry for all participants
# Defaults to false
xatu_sentry_enabled: false
# Xatu Sentry params
xatu_sentry_params:
# The image to use for Xatu Sentry
xatu_sentry_image: ethpandaops/xatu:latest
# GRPC Endpoint of Xatu Server to send events to
xatu_server_addr: localhost:8080
# Enables TLS to Xatu Server
xatu_server_tls: false
# Headers to add on to Xatu Server requests
xatu_server_headers: {}
# Beacon event stream topics to subscribe to
beacon_subscriptions:
- attestation
- block
- chain_reorg
- finalized_checkpoint
- head
- voluntary_exit
- contribution_and_proof
- blob_sidecar
```
#### Example configurations
......
......@@ -61,6 +61,7 @@ def run(plan, args={}):
mev_params = args_with_right_defaults.mev_params
parallel_keystore_generation = args_with_right_defaults.parallel_keystore_generation
persistent = args_with_right_defaults.persistent
xatu_sentry_params = args_with_right_defaults.xatu_sentry_params
grafana_datasource_config_template = read_file(
static_files.GRAFANA_DATASOURCE_CONFIG_TEMPLATE_FILEPATH
......@@ -94,6 +95,7 @@ def run(plan, args={}):
args_with_right_defaults.global_client_log_level,
jwt_file,
persistent,
xatu_sentry_params,
parallel_keystore_generation,
)
......@@ -107,12 +109,14 @@ def run(plan, args={}):
all_el_client_contexts = []
all_cl_client_contexts = []
all_ethereum_metrics_exporter_contexts = []
all_xatu_sentry_contexts = []
for participant in all_participants:
all_el_client_contexts.append(participant.el_client_context)
all_cl_client_contexts.append(participant.cl_client_context)
all_ethereum_metrics_exporter_contexts.append(
participant.ethereum_metrics_exporter_context
)
all_xatu_sentry_contexts.append(participant.xatu_sentry_context)
# Generate validator ranges
validator_ranges_config_template = read_file(
......@@ -405,6 +409,7 @@ def run(plan, args={}):
all_cl_client_contexts,
prometheus_additional_metrics_jobs,
all_ethereum_metrics_exporter_contexts,
all_xatu_sentry_contexts,
)
plan.print("Launching grafana...")
......
......@@ -15,6 +15,7 @@ participants:
validator_count: null
snooper_enabled: false
ethereum_metrics_exporter_enabled: false
xatu_sentry_enabled: false
el_min_cpu: 0
el_max_cpu: 0
el_min_mem: 0
......@@ -76,3 +77,4 @@ mev_params:
mev_flood_seconds_per_bundle: 15
grafana_additional_dashboards: []
persistent: false
xatu_sentry_enabled: false
......@@ -56,6 +56,7 @@ ATTR_TO_BE_SKIPPED_AT_ROOT = (
"goomy_blob_params",
"tx_spammer_params",
"custom_flood_params",
"xatu_sentry_params",
)
......@@ -75,6 +76,7 @@ def input_parser(plan, input_args):
result["disable_peer_scoring"] = False
result["goomy_blob_params"] = get_default_goomy_blob_params()
result["assertoor_params"] = get_default_assertoor_params()
result["xatu_sentry_params"] = get_default_xatu_sentry_params()
result["persistent"] = False
for attr in input_args:
......@@ -103,6 +105,10 @@ def input_parser(plan, input_args):
for sub_attr in input_args["assertoor_params"]:
sub_value = input_args["assertoor_params"][sub_attr]
result["assertoor_params"][sub_attr] = sub_value
elif attr == "xatu_sentry_params":
for sub_attr in input_args["xatu_sentry_params"]:
sub_value = input_args["xatu_sentry_params"][sub_attr]
result["xatu_sentry_params"][sub_attr] = sub_value
if result.get("disable_peer_scoring"):
result = enrich_disable_peer_scoring(result)
......@@ -165,6 +171,7 @@ def input_parser(plan, input_args):
ethereum_metrics_exporter_enabled=participant[
"ethereum_metrics_exporter_enabled"
],
xatu_sentry_enabled=participant["xatu_sentry_enabled"],
prometheus_config=struct(
scrape_interval=participant["prometheus_config"]["scrape_interval"],
labels=participant["prometheus_config"]["labels"],
......@@ -249,10 +256,18 @@ def input_parser(plan, input_args):
mev_type=result["mev_type"],
snooper_enabled=result["snooper_enabled"],
ethereum_metrics_exporter_enabled=result["ethereum_metrics_exporter_enabled"],
xatu_sentry_enabled=result["xatu_sentry_enabled"],
parallel_keystore_generation=result["parallel_keystore_generation"],
grafana_additional_dashboards=result["grafana_additional_dashboards"],
disable_peer_scoring=result["disable_peer_scoring"],
persistent=result["persistent"],
xatu_sentry_params=struct(
xatu_sentry_image=result["xatu_sentry_params"]["xatu_sentry_image"],
xatu_server_addr=result["xatu_sentry_params"]["xatu_server_addr"],
xatu_server_headers=result["xatu_sentry_params"]["xatu_server_headers"],
beacon_subscriptions=result["xatu_sentry_params"]["beacon_subscriptions"],
xatu_server_tls=result["xatu_sentry_params"]["xatu_server_tls"],
),
)
......@@ -333,6 +348,8 @@ def parse_network_params(input_args):
"ethereum_metrics_exporter_enabled"
]
xatu_sentry_enabled = participant["xatu_sentry_enabled"]
blobber_enabled = participant["blobber_enabled"]
if blobber_enabled:
# unless we are running lighthouse, we don't support blobber
......@@ -352,6 +369,11 @@ def parse_network_params(input_args):
"ethereum_metrics_exporter_enabled"
] = default_ethereum_metrics_exporter_enabled
if xatu_sentry_enabled == False:
default_xatu_sentry_enabled = result["xatu_sentry_enabled"]
if default_xatu_sentry_enabled:
participant["xatu_sentry_enabled"] = default_xatu_sentry_enabled
validator_count = participant["validator_count"]
if validator_count == None:
default_validator_count = result["network_params"][
......@@ -442,6 +464,7 @@ def default_input_args():
"global_client_log_level": "info",
"snooper_enabled": False,
"ethereum_metrics_exporter_enabled": False,
"xatu_sentry_enabled": False,
"parallel_keystore_generation": False,
"disable_peer_scoring": False,
}
......@@ -501,6 +524,7 @@ def default_participant():
"validator_count": None,
"snooper_enabled": False,
"ethereum_metrics_exporter_enabled": False,
"xatu_sentry_enabled": False,
"count": 1,
"prometheus_config": {
"scrape_interval": "15s",
......@@ -551,6 +575,25 @@ def get_default_assertoor_params():
}
def get_default_xatu_sentry_params():
return {
"xatu_sentry_image": "ethpandaops/xatu:latest",
"xatu_server_addr": "localhost:8080",
"xatu_server_headers": {},
"xatu_server_tls": False,
"beacon_subscriptions": [
"attestation",
"block",
"chain_reorg",
"finalized_checkpoint",
"head",
"voluntary_exit",
"contribution_and_proof",
"blob_sidecar",
],
}
def get_default_custom_flood_params():
# this is a simple script that increases the balance of the coinbase address at a cadence
return {"interval_between_transactions": 1}
......
......@@ -5,6 +5,7 @@ def new_participant(
cl_client_context,
snooper_engine_context,
ethereum_metrics_exporter_context,
xatu_sentry_context,
):
return struct(
el_client_type=el_client_type,
......@@ -13,4 +14,5 @@ def new_participant(
cl_client_context=cl_client_context,
snooper_engine_context=snooper_engine_context,
ethereum_metrics_exporter_context=ethereum_metrics_exporter_context,
xatu_sentry_context=xatu_sentry_context,
)
......@@ -31,6 +31,8 @@ ethereum_metrics_exporter = import_module(
"./ethereum_metrics_exporter/ethereum_metrics_exporter_launcher.star"
)
xatu_sentry = import_module("./xatu_sentry/xatu_sentry_launcher.star")
genesis_constants = import_module(
"./prelaunch_data_generator/genesis_constants/genesis_constants.star"
)
......@@ -61,6 +63,7 @@ def launch_participant_network(
global_log_level,
jwt_file,
persistent,
xatu_sentry_params,
parallel_keystore_generation=False,
):
num_participants = len(participants)
......@@ -365,6 +368,7 @@ def launch_participant_network(
all_snooper_engine_contexts = []
all_cl_client_contexts = []
all_ethereum_metrics_exporter_contexts = []
all_xatu_sentry_contexts = []
preregistered_validator_keys_for_nodes = (
validator_data.per_node_keystores
if network_params.network == "kurtosis"
......@@ -513,6 +517,29 @@ def launch_participant_network(
all_ethereum_metrics_exporter_contexts.append(ethereum_metrics_exporter_context)
xatu_sentry_context = None
if participant.xatu_sentry_enabled:
pair_name = "{0}-{1}-{2}".format(index_str, cl_client_type, el_client_type)
xatu_sentry_service_name = "xatu-sentry-{0}".format(pair_name)
xatu_sentry_context = xatu_sentry.launch(
plan,
xatu_sentry_service_name,
cl_client_context,
xatu_sentry_params,
network_params,
pair_name,
)
plan.print(
"Successfully added {0} xatu sentry participants".format(
xatu_sentry_context
)
)
all_xatu_sentry_contexts.append(xatu_sentry_context)
plan.print("Successfully added {0} CL participants".format(num_participants))
all_participants = []
......@@ -533,6 +560,10 @@ def launch_participant_network(
ethereum_metrics_exporter_context = all_ethereum_metrics_exporter_contexts[
index
]
xatu_sentry_context = None
if participant.xatu_sentry_enabled:
xatu_sentry_context = all_xatu_sentry_contexts[index]
participant_entry = participant_module.new_participant(
el_client_type,
......@@ -541,6 +572,7 @@ def launch_participant_network(
cl_client_context,
snooper_engine_context,
ethereum_metrics_exporter_context,
xatu_sentry_context,
)
all_participants.append(participant_entry)
......
......@@ -25,12 +25,14 @@ def launch_prometheus(
cl_client_contexts,
additional_metrics_jobs,
ethereum_metrics_exporter_contexts,
xatu_sentry_contexts,
):
metrics_jobs = get_metrics_jobs(
el_client_contexts,
cl_client_contexts,
additional_metrics_jobs,
ethereum_metrics_exporter_contexts,
xatu_sentry_contexts,
)
prometheus_url = prometheus.run(
plan, metrics_jobs, MIN_CPU, MAX_CPU, MIN_MEMORY, MAX_MEMORY
......@@ -44,6 +46,7 @@ def get_metrics_jobs(
cl_client_contexts,
additional_metrics_jobs,
ethereum_metrics_exporter_contexts,
xatu_sentry_contexts,
):
metrics_jobs = []
# Adding execution clients metrics jobs
......@@ -159,6 +162,23 @@ def get_metrics_jobs(
},
)
)
# Adding Xatu Sentry metrics jobs
for context in xatu_sentry_contexts:
if context != None:
metrics_jobs.append(
new_metrics_job(
job_name="xatu-sentry-{0}".format(context.pair_name),
endpoint="{}:{}".format(
context.ip_addr,
context.metrics_port_num,
),
metrics_path="/metrics",
labels={
"pair": context.pair_name,
},
)
)
# Adding additional metrics jobs
for job in additional_metrics_jobs:
if job == None:
......
......@@ -31,6 +31,12 @@ ASSERTOOR_TESTS_CONFIG_DIRPATH = (
STATIC_FILES_DIRPATH + ASSERTOOR_CONFIG_DIRPATH + "/tests"
)
# xatu-sentry config
XATU_SENTRY_CONFIG_DIRPATH = "/xatu-sentry-config"
XATU_SENTRY_CONFIG_TEMPLATE_FILEPATH = (
STATIC_FILES_DIRPATH + XATU_SENTRY_CONFIG_DIRPATH + "/config.yaml.tmpl"
)
# Grafana config
GRAFANA_CONFIG_DIRPATH = "/grafana-config"
GRAFANA_DATASOURCE_CONFIG_TEMPLATE_FILEPATH = (
......
def new_xatu_sentry_context(
ip_addr,
metrics_port_num,
pair_name,
):
return struct(
ip_addr=ip_addr,
metrics_port_num=metrics_port_num,
pair_name=pair_name,
)
shared_utils = import_module("../shared_utils/shared_utils.star")
static_files = import_module("../static_files/static_files.star")
xatu_sentry_context = import_module("../xatu_sentry/xatu_sentry_context.star")
HTTP_PORT_ID = "http"
METRICS_PORT_NUMBER = 9090
XATU_SENTRY_CONFIG_MOUNT_DIRPATH_ON_SERVICE = "/config"
XATU_SENTRY_CONFIG_FILENAME = "config.yaml"
# The min/max CPU/memory that xatu-sentry can use
MIN_CPU = 10
MAX_CPU = 1000
MIN_MEMORY = 16
MAX_MEMORY = 1024
def launch(
plan,
xatu_sentry_service_name,
cl_client_context,
xatu_sentry_params,
network_params,
pair_name,
):
config_template = read_file(static_files.XATU_SENTRY_CONFIG_TEMPLATE_FILEPATH)
template_data = new_config_template_data(
str(METRICS_PORT_NUMBER),
pair_name,
"http://{}:{}".format(
cl_client_context.ip_addr,
cl_client_context.http_port_num,
),
xatu_sentry_params.xatu_server_addr,
network_params.network,
xatu_sentry_params.beacon_subscriptions,
xatu_sentry_params.xatu_server_headers,
xatu_sentry_params.xatu_server_tls,
)
template_and_data = shared_utils.new_template_and_data(
config_template, template_data
)
template_and_data_by_rel_dest_filepath = {}
config_name = "{}-{}".format(xatu_sentry_service_name, XATU_SENTRY_CONFIG_FILENAME)
template_and_data_by_rel_dest_filepath[config_name] = template_and_data
config_files_artifact_name = plan.render_templates(
template_and_data_by_rel_dest_filepath, config_name
)
config_file_path = shared_utils.path_join(
XATU_SENTRY_CONFIG_MOUNT_DIRPATH_ON_SERVICE,
config_name,
)
xatu_sentry_service = plan.add_service(
xatu_sentry_service_name,
ServiceConfig(
image=xatu_sentry_params.xatu_sentry_image,
ports={
HTTP_PORT_ID: shared_utils.new_port_spec(
METRICS_PORT_NUMBER,
shared_utils.TCP_PROTOCOL,
shared_utils.HTTP_APPLICATION_PROTOCOL,
)
},
files={
XATU_SENTRY_CONFIG_MOUNT_DIRPATH_ON_SERVICE: config_files_artifact_name,
},
cmd=[
"sentry",
"--config",
config_file_path,
],
min_cpu=MIN_CPU,
max_cpu=MAX_CPU,
min_memory=MIN_MEMORY,
max_memory=MAX_MEMORY,
),
)
return xatu_sentry_context.new_xatu_sentry_context(
xatu_sentry_service.ip_address,
METRICS_PORT_NUMBER,
pair_name,
)
def new_config_template_data(
metrics_port,
beacon_node_name,
beacon_node_addr,
xatu_server_addr,
network_name,
beacon_subscriptions,
xatu_server_headers,
xatu_server_tls,
):
return {
"MetricsPort": metrics_port,
"BeaconNodeName": beacon_node_name,
"BeaconNodeAddress": beacon_node_addr,
"XatuServerAddress": xatu_server_addr,
"EthereumNetworkName": network_name,
"BeaconSubscriptions": beacon_subscriptions,
"XatuServerHeaders": xatu_server_headers,
"XatuServerTLS": xatu_server_tls,
}
logging: "debug"
metricsAddr: ":{{ .MetricsPort }}"
name: "{{ .BeaconNodeName }}"
ntpServer: time.google.com
ethereum:
beaconNodeAddress: "{{ .BeaconNodeAddress }}"
overrideNetworkName: "{{ .EthereumNetworkName }}"
beaconSubscriptions:
{{- range .BeaconSubscriptions }}
- "{{ . }}"
{{- end }}
forkChoice:
enabled: false
onReOrgEvent:
enabled: false
interval:
enabled: false
every: 30s
at:
enabled: false
slotTimes:
- 4s
attestationData:
enabled: false
allCommittees: false
interval:
enabled: false
every: 30s
at:
enabled: false
slotTimes:
- 4s
beaconCommittees:
enabled: true
outputs:
- name: xatu-server
type: xatu
config:
address: "{{ .XatuServerAddress }}"
tls: {{ .XatuServerTLS }}
maxQueueSize: 51200
batchTimeout: 1s
exportTimeout: 10s
maxExportBatchSize: 256
{{- if .XatuServerHeaders }}
headers:
{{- range $key, $value := .XatuServerHeaders }}
{{ $key }}: "{{ $value }}"
{{- end }}
{{- end }}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment