Skip to content

Commit

Permalink
Resolve some PeerDAS todos (#6434)
Browse files Browse the repository at this point in the history
* Resolve some PeerDAS todos
  • Loading branch information
dapplion authored Nov 5, 2024
1 parent 3838897 commit d8dbda3
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 19 deletions.
12 changes: 2 additions & 10 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1188,22 +1188,14 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}

fn on_sampling_result(&mut self, requester: SamplingRequester, result: SamplingResult) {
// TODO(das): How is a consumer of sampling results?
// - Fork-choice for trailing DA
// - Single lookups to complete import requirements
// - Range sync to complete import requirements? Can sampling for syncing lag behind and
// accumulate in fork-choice?

match requester {
SamplingRequester::ImportedBlock(block_root) => {
debug!(self.log, "Sampling result"; "block_root" => %block_root, "result" => ?result);

// TODO(das): Consider moving SamplingResult to the beacon_chain crate and import
// here. No need to add too much enum variants, just whatever the beacon_chain or
// fork-choice needs to make a decision. Currently the fork-choice only needs to
// be notified of successful samplings, i.e. sampling failures don't trigger pruning
match result {
Ok(_) => {
// Notify the fork-choice of a successful sampling result to mark the block
// branch as safe.
if let Err(e) = self
.network
.beacon_processor()
Expand Down
2 changes: 0 additions & 2 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.log.clone(),
);

// TODO(das): start request
// Note that you can only send, but not handle a response here
match request.continue_requests(self) {
Ok(_) => {
Expand All @@ -779,7 +778,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.custody_by_root_requests.insert(requester, request);
Ok(LookupRequestResult::RequestSent(req_id))
}
// TODO(das): handle this error properly
Err(e) => Err(RpcRequestSendError::CustodyRequestError(e)),
}
}
Expand Down
15 changes: 8 additions & 7 deletions beacon_node/network/src/sync/peer_sampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ pub type SamplingResult = Result<(), SamplingError>;
type DataColumnSidecarList<E> = Vec<Arc<DataColumnSidecar<E>>>;

pub struct Sampling<T: BeaconChainTypes> {
// TODO(das): stalled sampling request are never cleaned up
requests: HashMap<SamplingRequester, ActiveSamplingRequest<T>>,
sampling_config: SamplingConfig,
log: slog::Logger,
Expand Down Expand Up @@ -313,8 +312,8 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
.iter()
.position(|data| &data.index == column_index)
else {
// Peer does not have the requested data.
// TODO(das) what to do?
// Peer does not have the requested data, mark peer as "dont have" and try
// again with a different peer.
debug!(self.log,
"Sampling peer claims to not have the data";
"block_root" => %self.block_root,
Expand Down Expand Up @@ -373,7 +372,9 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
sampling_request_id,
},
) {
// TODO(das): Beacon processor is overloaded, what should we do?
// Beacon processor is overloaded, drop sampling attempt. Failing to sample
// is not a permanent state so we should recover once the node has capacity
// and receives a descendant block.
error!(self.log,
"Dropping sampling";
"block" => %self.block_root,
Expand All @@ -391,8 +392,8 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
);
metrics::inc_counter_vec(&metrics::SAMPLE_DOWNLOAD_RESULT, &[metrics::FAILURE]);

// Error downloading, maybe penalize peer and retry again.
// TODO(das) with different peer or different peer?
// Error downloading, malicious network errors are already penalized before
// reaching this function. Mark the peer as failed and try again with another.
for column_index in column_indexes {
let Some(request) = self.column_requests.get_mut(column_index) else {
warn!(self.log,
Expand Down Expand Up @@ -453,7 +454,7 @@ impl<T: BeaconChainTypes> ActiveSamplingRequest<T> {
debug!(self.log, "Sample verification failure"; "block_root" => %self.block_root, "column_indexes" => ?column_indexes, "reason" => ?err);
metrics::inc_counter_vec(&metrics::SAMPLE_VERIFY_RESULT, &[metrics::FAILURE]);

// TODO(das): Peer sent invalid data, penalize and try again from different peer
// Peer sent invalid data, penalize and try again from different peer
// TODO(das): Count individual failures
for column_index in column_indexes {
let Some(request) = self.column_requests.get_mut(column_index) else {
Expand Down

0 comments on commit d8dbda3

Please sign in to comment.