Commit 16276759 authored by luxq's avatar luxq

add new inject point

parent fbccbf30
......@@ -20,10 +20,15 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.attacker.AttackService;
import tech.pegasys.teku.attacker.AttackerCommand;
import tech.pegasys.teku.attacker.AttackerResponse;
import tech.pegasys.teku.bls.BLSSignature;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.metrics.Validator.DutyType;
......@@ -120,39 +125,103 @@ public class AttestationProductionDuty implements Duty {
final UInt64 slot,
final ForkInfo forkInfo,
final Int2ObjectMap<ScheduledCommittee> validatorsByCommitteeIndex) {
AttackService attack = new AttackService();
return validatorsByCommitteeIndex.int2ObjectEntrySet().stream()
.flatMap(
entry ->
produceAttestationsForCommittee(slot, forkInfo, entry.getIntKey(), entry.getValue())
.stream());
.flatMap(
entry ->
produceAttestationsForCommittee(slot, forkInfo, entry.getIntKey(), entry.getValue()).stream()
)
.map(attestationFuture -> {
if (attack.enabled()) {
return attack.attestBeforePropose(slot.longValue(), "","").thenCompose(
response -> {
if (response.getCmd() == AttackerCommand.CMD_SKIP || response.getCmd()==AttackerCommand.CMD_RETURN
|| response.getCmd() == AttackerCommand.CMD_CONTINUE) {
try {
return SafeFuture.completedFuture(ProductionResult.failure(
attestationFuture.get().getValidatorPublicKeys(),
new IllegalStateException(
"Attestation propose skipped due to attack command.")));
} catch (ExecutionException | InterruptedException e) {
// do nothing.
return attestationFuture;
}
}else if (response.getCmd() == AttackerCommand.CMD_NULL) {
return attestationFuture;
} else {
return attestationFuture;
}
});
} else {
return attestationFuture;
}
})
.map(attestationFuture -> {
if (attack.enabled()) {
return attack.attestBeforeBroadcast(slot.longValue()).thenCompose(
response -> {
if (response.getCmd() == AttackerCommand.CMD_SKIP || response.getCmd()==AttackerCommand.CMD_RETURN
|| response.getCmd() == AttackerCommand.CMD_CONTINUE) {
try {
return SafeFuture.completedFuture(ProductionResult.failure(
attestationFuture.get().getValidatorPublicKeys(),
new IllegalStateException(
"Attestation broad skipped due to attack command.")));
} catch (ExecutionException | InterruptedException e) {
// do nothing.
return attestationFuture;
}
}else if (response.getCmd() == AttackerCommand.CMD_NULL) {
return attestationFuture;
} else {
return attestationFuture;
}
});
} else {
return attestationFuture;
}
});
}
private List<SafeFuture<ProductionResult<Attestation>>> produceAttestationsForCommittee(
final UInt64 slot,
final ForkInfo forkInfo,
final int committeeIndex,
final ScheduledCommittee committee) {
final UInt64 slot,
final ForkInfo forkInfo,
final int committeeIndex,
final ScheduledCommittee committee) {
final SafeFuture<Optional<AttestationData>> unsignedAttestationFuture =
validatorDutyMetrics.record(
() -> validatorApiChannel.createAttestationData(slot, committeeIndex),
this,
CREATE_TOTAL);
validatorDutyMetrics.record(
() -> validatorApiChannel.createAttestationData(slot, committeeIndex),
this,
CREATE_TOTAL);
unsignedAttestationFuture.propagateTo(committee.getAttestationDataFuture());
final SignedAttestationProducer signedAttestationProducer =
selectSignedAttestationProducer(slot);
selectSignedAttestationProducer(slot);
AttackService attack = new AttackService();
return committee.getValidators().stream()
.map(
validator ->
signAttestationForValidatorInCommittee(
slot,
forkInfo,
committeeIndex,
validator,
signedAttestationProducer,
unsignedAttestationFuture))
.toList();
.map(
validator ->
attack.attestBeforeSign(slot.longValue(),validator.publicKey().toHexString(), "")
.thenCompose(response -> {
if (response.getCmd() == AttackerCommand.CMD_SKIP ||
response.getCmd() == AttackerCommand.CMD_RETURN ||
response.getCmd() == AttackerCommand.CMD_EXIT ) {
return SafeFuture.completedFuture(
ProductionResult.failure(
validator.publicKey(),
new IllegalStateException(
"Attestation signing skipped due to attack command.")));
} else {
return signAttestationForValidatorInCommittee(
slot,
forkInfo,
committeeIndex,
validator,
signedAttestationProducer,
unsignedAttestationFuture);
}
}))
.toList();
}
private SignedAttestationProducer selectSignedAttestationProducer(final UInt64 slot) {
......@@ -185,6 +254,31 @@ public class AttestationProductionDuty implements Duty {
final ValidatorWithAttestationDutyInfo validator,
final SignedAttestationProducer signedAttestationProducer,
final SafeFuture<Optional<AttestationData>> attestationDataFuture) {
// AttackService attack = new AttackService();
// if (attack.enabled()) {
// try {
// AttackerResponse res =
// attack.blockGetNewParentRoot(slot.longValue(), "", "").get();
// switch (res.getCmd()) {
// case CMD_EXIT:
// case CMD_ABORT:
// System.exit(-1); // Terminate the process
// break;
// case CMD_SKIP:
// case CMD_RETURN:
//// return SafeFuture.completedFuture(Optional.empty());
// case CMD_NULL:
// break;
// case CMD_CONTINUE:
// // Do nothing
// break;
// default:
// // Do nothing.
// }
// } catch (Exception e) {
//// return SafeFuture.completedFuture(Optional.empty());
// }
// }
return attestationDataFuture
.thenCompose(
maybeUnsignedAttestation ->
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment