[draft] state sync changes #12501

Draft · wants to merge 12 commits into base: 2.4.0-test-shard-shuffling
18 changes: 11 additions & 7 deletions chain/chunks/src/logic.rs
@@ -14,6 +14,7 @@ use near_primitives::{
},
types::{AccountId, ShardId},
};
use std::collections::HashMap;
use tracing::{debug, debug_span, error};

pub fn need_receipt(
@@ -54,18 +55,21 @@ pub fn get_shards_cares_about_this_or_next_epoch(
block_header: &BlockHeader,
shard_tracker: &ShardTracker,
epoch_manager: &dyn EpochManagerAdapter,
) -> Vec<ShardId> {
) -> HashMap<ShardId, bool> {
epoch_manager
.shard_ids(&block_header.epoch_id())
.unwrap()
.into_iter()
.filter(|&shard_id| {
cares_about_shard_this_or_next_epoch(
account_id,
block_header.prev_hash(),
.map(|shard_id| {
(
shard_id,
is_me,
shard_tracker,
cares_about_shard_this_or_next_epoch(
account_id,
block_header.prev_hash(),
shard_id,
is_me,
shard_tracker,
),
)
})
.collect()
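For context on the new return shape: instead of a filtered `Vec<ShardId>`, the helper now reports every shard of the epoch together with a flag saying whether the node cares about it this or next epoch. A minimal sketch of how a caller might consume that map, with `ShardId` simplified to `u64` (the helper and values below are illustrative, not from the PR):

```rust
use std::collections::HashMap;

type ShardId = u64;

// Hypothetical consumer of the new return shape: every shard in the epoch is
// present as a key, and the bool says whether this node cares about it this
// or next epoch. Callers that only want the tracked shards filter on the value.
fn tracked_shards(cares: &HashMap<ShardId, bool>) -> Vec<ShardId> {
    cares
        .iter()
        .filter_map(|(&shard_id, &cared)| cared.then_some(shard_id))
        .collect()
}

fn main() {
    let cares: HashMap<ShardId, bool> =
        HashMap::from([(0, true), (1, false), (2, true)]);
    // Shards mapped to `false` can still get a header-only sync,
    // while `true` shards get a full state sync.
    let mut tracked = tracked_shards(&cares);
    tracked.sort();
    assert_eq!(tracked, vec![0, 2]);
}
```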
21 changes: 14 additions & 7 deletions chain/client/src/client.rs
@@ -115,6 +115,8 @@ pub struct CatchupState {
pub sync_status: StateSyncStatus,
/// Manages going back to apply chunks after state has been downloaded.
pub catchup: BlocksCatchUpState,
/// Determines which shards should be synced
pub tracking_shards: HashMap<ShardId, bool>,
}

pub struct Client {
@@ -2552,11 +2554,20 @@ impl Client {
let sync_hash = self.get_catchup_sync_hash(&mut state_sync_info, &epoch_first_block)?;
let Some(sync_hash) = sync_hash else { continue };

let CatchupState { state_sync, sync_status: status, catchup } = self
let CatchupState { state_sync, sync_status: status, catchup, tracking_shards } = self
.catchup_state_syncs
.entry(sync_hash)
.or_insert_with(|| {
tracing::debug!(target: "client", ?epoch_first_block, ?sync_hash, "inserting new state sync");
let mut tracking_shards = self.epoch_manager
.shard_ids(&block_header.epoch_id())
.unwrap()
.into_iter()
.map(|shard_id| (shard_id, false))
.collect::<HashMap<ShardId, bool>>();
for shard_id in state_sync_info.shards() {
tracking_shards.insert(*shard_id, true);
}
CatchupState {
state_sync: StateSync::new(
self.clock.clone(),
@@ -2580,6 +2591,7 @@
computation_tasks: Vec::new(),
},
catchup: BlocksCatchUpState::new(sync_hash, *epoch_id),
tracking_shards,
}
});

@@ -2588,12 +2600,7 @@
// Initialize the new shard sync to contain the shards to split at
// first. It will get updated with the shard sync download status
// for other shards later.
match state_sync.run(
sync_hash,
status,
highest_height_peers,
state_sync_info.shards(),
)? {
match state_sync.run(sync_hash, status, highest_height_peers, tracking_shards)? {
StateSyncResult::InProgress => {}
StateSyncResult::Completed => {
debug!(target: "catchup", "state sync completed now catch up blocks");
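The `tracking_shards` map is seeded with every shard of the epoch set to `false` and the shards that actually need a full state sync flipped to `true`. A standalone sketch of that construction, with `ShardId` simplified to `u64` and the epoch-manager / `state_sync_info` lookups replaced by plain iterators (assumed simplification):

```rust
use std::collections::HashMap;

type ShardId = u64;

// Simplified version of the map construction in catch-up: start with every
// shard in the epoch marked `false` (header-only), then flip the shards that
// need a full state sync to `true`.
fn build_tracking_shards(
    all_shards: impl IntoIterator<Item = ShardId>,
    shards_to_sync: &[ShardId],
) -> HashMap<ShardId, bool> {
    let mut tracking: HashMap<ShardId, bool> =
        all_shards.into_iter().map(|shard_id| (shard_id, false)).collect();
    for shard_id in shards_to_sync {
        tracking.insert(*shard_id, true);
    }
    tracking
}

fn main() {
    let tracking = build_tracking_shards(0..4, &[1, 3]);
    assert_eq!(tracking[&0], false);
    assert_eq!(tracking[&1], true);
    assert_eq!(tracking[&2], false);
    assert_eq!(tracking[&3], true);
}
```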
2 changes: 1 addition & 1 deletion chain/client/src/metrics.rs
@@ -473,7 +473,7 @@ pub(crate) static STATE_SYNC_STAGE: LazyLock<IntGaugeVec> = LazyLock::new(|| {

pub(crate) static STATE_SYNC_DOWNLOAD_RESULT: LazyLock<IntCounterVec> = LazyLock::new(|| {
try_create_int_counter_vec(
"near_state_sync_header_download_result",
"near_state_sync_download_result",
"Count of number of state sync downloads by type (header, part),
source (network, external), and result (timeout, error, success)",
&["shard_id", "type", "source", "result"],
49 changes: 19 additions & 30 deletions chain/client/src/sync/state/downloader.rs
@@ -122,16 +122,16 @@ impl StateSyncDownloader {
.boxed()
}

/// Ensures that the shard part is downloaded and validated. If the part exists on disk,
/// just returns. Otherwise, downloads the part, validates it, and retries if needed.
/// Makes a single attempt to obtain a shard part. If the part exists on disk,
/// just returns. Otherwise, downloads the part and validates it.
///
/// This method will only return an error if the download cannot be completed even
/// with retries, or if the download is cancelled.
pub fn ensure_shard_part_downloaded(
/// This method will return an error if the download fails or is cancelled.
pub fn ensure_shard_part_downloaded_single_attempt(
&self,
shard_id: ShardId,
sync_hash: CryptoHash,
part_id: u64,
attempt_count: usize,
header: ShardStateSyncResponseHeader,
cancel: CancellationToken,
) -> BoxFuture<'static, Result<(), near_chain::Error>> {
@@ -151,15 +151,14 @@
return Ok(());
}

let i = AtomicUsize::new(0); // for easier Rust async capture
let attempt = || async {
let source = if fallback_source.is_some()
&& i.load(Ordering::Relaxed) >= num_attempts_before_fallback
{
fallback_source.as_ref().unwrap().as_ref()
} else {
preferred_source.as_ref()
};
let source =
if fallback_source.is_some() && attempt_count >= num_attempts_before_fallback {
fallback_source.as_ref().unwrap().as_ref()
} else {
preferred_source.as_ref()
};

let part = source
.download_shard_part(
shard_id,
@@ -187,25 +186,15 @@
Ok(())
};

loop {
match attempt().await {
Ok(()) => return Ok(()),
Err(err) => {
handle.set_status(&format!(
"Error: {}, will retry in {}",
err, retry_timeout
));
let deadline = clock.now() + retry_timeout;
tokio::select! {
_ = cancel.cancelled() => {
return Err(near_chain::Error::Other("Cancelled".to_owned()));
}
_ = clock.sleep_until(deadline) => {}
}
}
let res = attempt().await;
if res.is_err() {
let deadline = clock.now() + retry_timeout;
tokio::select! {
_ = clock.sleep_until(deadline) => {}
_ = cancel.cancelled() => {}
}
i.fetch_add(1, Ordering::Relaxed);
}
res
}
.instrument(tracing::debug_span!("StateSyncDownloader::ensure_shard_part_downloaded"))
.boxed()
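Since the retry loop now lives in the caller, the downloader picks the source from the attempt number it is handed rather than from an internal counter. A minimal sketch of that selection logic, assuming sources can be represented as plain strings (the real code uses trait objects behind `Arc`):

```rust
// Choose between the preferred and fallback download sources based on how
// many attempts have already been made; the fallback is only used if it is
// configured at all.
fn pick_source<'a>(
    preferred: &'a str,
    fallback: Option<&'a str>,
    attempt_count: usize,
    num_attempts_before_fallback: usize,
) -> &'a str {
    match fallback {
        Some(fallback) if attempt_count >= num_attempts_before_fallback => fallback,
        _ => preferred,
    }
}

fn main() {
    // First two attempts go to the peer network, later ones to external storage.
    assert_eq!(pick_source("network", Some("external"), 0, 2), "network");
    assert_eq!(pick_source("network", Some("external"), 1, 2), "network");
    assert_eq!(pick_source("network", Some("external"), 2, 2), "external");
    // With no fallback configured, the preferred source is always used.
    assert_eq!(pick_source("network", None, 5, 2), "network");
}
```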
18 changes: 15 additions & 3 deletions chain/client/src/sync/state/external.rs
@@ -15,6 +15,8 @@ use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;

const EXTERNAL_REQUEST_COOLDOWN: time::Duration = Duration::seconds(60);

/// Logic for downloading state sync headers and parts from an external source.
pub(super) struct StateSyncDownloadSourceExternal {
pub clock: Clock,
@@ -46,12 +48,22 @@ impl StateSyncDownloadSourceExternal {
Err(near_chain::Error::Other("Timeout".to_owned()))
}
_ = cancellation.cancelled() => {
increment_download_count(shard_id, typ, "external", "error");
increment_download_count(shard_id, typ, "external", "cancelled");
Err(near_chain::Error::Other("Cancelled".to_owned()))
}
result = fut => {
// A download error typically indicates that the file is not available yet. At the
// start of the epoch it takes a while for dumpers to populate the external storage
// with state files. This cooldown prevents spamming requests during that time.
let deadline = clock.now() + EXTERNAL_REQUEST_COOLDOWN;
tokio::select! {
_ = clock.sleep_until(deadline) => {}
_ = cancellation.cancelled() => {}
}

result.map_err(|e| {
increment_download_count(shard_id, typ, "network", "error");
increment_download_count(shard_id, typ, "external", "download_error");
tracing::debug!(target: "sync", "Failed to download with error {}", e);
near_chain::Error::Other(format!("Failed to download: {}", e))
})
}
@@ -94,7 +106,7 @@ impl StateSyncDownloadSource for StateSyncDownloadSourceExternal {
)
.await?;
let header = ShardStateSyncResponseHeader::try_from_slice(&data).map_err(|e| {
increment_download_count(shard_id, "header", "external", "error");
increment_download_count(shard_id, "header", "external", "parse_error");
near_chain::Error::Other(format!("Failed to parse header: {}", e))
})?;

Expand Down
8 changes: 5 additions & 3 deletions chain/client/src/sync/state/mod.rs
@@ -8,6 +8,7 @@ mod util;

use crate::metrics;
use crate::sync::external::{create_bucket_readonly, ExternalConnection};
use crate::sync::state::shard::StateToSync;
use chain_requests::ChainSenderForStateSync;
use downloader::StateSyncDownloader;
use external::StateSyncDownloadSourceExternal;
@@ -212,7 +213,7 @@ impl StateSync {
sync_hash: CryptoHash,
sync_status: &mut StateSyncStatus,
highest_height_peers: &[HighestHeightPeerInfo],
tracking_shards: &[ShardId],
tracking_shards: &HashMap<ShardId, bool>,
) -> Result<StateSyncResult, near_chain::Error> {
let _span =
tracing::debug_span!(target: "sync", "run_sync", sync_type = "StateSync").entered();
@@ -223,7 +224,7 @@
);

let mut all_done = true;
for shard_id in tracking_shards {
for (shard_id, is_tracking) in tracking_shards {
let key = (sync_hash, *shard_id);
let status = match self.shard_syncs.entry(key) {
Entry::Occupied(mut entry) => match entry.get_mut().result.try_recv() {
@@ -256,6 +257,7 @@
self.store.clone(),
*shard_id,
sync_hash,
if *is_tracking { StateToSync::Full } else { StateToSync::HeaderOnly },
self.downloader.clone(),
self.runtime.clone(),
self.epoch_manager.clone(),
@@ -288,7 +290,7 @@
// If a shard completed syncing, we just remove it. We will not be syncing it again the next time around,
// because we would've marked it as completed in the status for that shard.
self.shard_syncs.retain(|(existing_sync_hash, existing_shard_id), _v| {
tracking_shards.contains(existing_shard_id) && existing_sync_hash == &sync_hash
tracking_shards.contains_key(existing_shard_id) && existing_sync_hash == &sync_hash
});

sync_status.download_tasks = self.downloading_task_tracker.statuses();
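With the map in hand, every shard gets a sync task, but untracked shards only fetch the header. A simplified sketch of that dispatch, reusing the `StateToSync` enum added in `shard.rs` but with stand-in types elsewhere (assumed simplification):

```rust
use std::collections::HashMap;

type ShardId = u64;

// Mirrors the per-shard dispatch in `StateSync::run`: shards the node tracks
// get a full state sync, the rest download the header only.
#[derive(Debug, Clone, PartialEq)]
enum StateToSync {
    HeaderOnly,
    Full,
}

fn plan_shard_syncs(tracking_shards: &HashMap<ShardId, bool>) -> Vec<(ShardId, StateToSync)> {
    let mut plan: Vec<_> = tracking_shards
        .iter()
        .map(|(&shard_id, &is_tracking)| {
            let mode = if is_tracking { StateToSync::Full } else { StateToSync::HeaderOnly };
            (shard_id, mode)
        })
        .collect();
    plan.sort_by_key(|(shard_id, _)| *shard_id);
    plan
}

fn main() {
    let tracking = HashMap::from([(0, true), (1, false)]);
    assert_eq!(
        plan_shard_syncs(&tracking),
        vec![(0, StateToSync::Full), (1, StateToSync::HeaderOnly)]
    );
}
```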
8 changes: 4 additions & 4 deletions chain/client/src/sync/state/network.rs
@@ -189,12 +189,12 @@ impl StateSyncDownloadSourcePeer {
match request_sender.send_async(network_request).await {
Ok(response) => {
if let NetworkResponses::RouteNotFound = response.as_network_response() {
increment_download_count(key.shard_id, typ, "network", "error");
increment_download_count(key.shard_id, typ, "network", "route_not_found");
return Err(near_chain::Error::Other("Route not found".to_owned()));
}
}
Err(e) => {
increment_download_count(key.shard_id, typ, "network", "error");
increment_download_count(key.shard_id, typ, "network", "failed_to_send");
return Err(near_chain::Error::Other(format!("Failed to send request: {}", e)));
}
}
@@ -206,7 +206,7 @@
Err(near_chain::Error::Other("Timeout".to_owned()))
}
_ = cancel.cancelled() => {
increment_download_count(key.shard_id, typ, "network", "error");
increment_download_count(key.shard_id, typ, "network", "cancelled");
Err(near_chain::Error::Other("Cancelled".to_owned()))
}
result = receiver => {
Expand All @@ -216,7 +216,7 @@ impl StateSyncDownloadSourcePeer {
Ok(result)
}
Err(_) => {
increment_download_count(key.shard_id, typ, "network", "error");
increment_download_count(key.shard_id, typ, "network", "sender_dropped");
Err(near_chain::Error::Other("Sender dropped".to_owned()))
},
}
66 changes: 52 additions & 14 deletions chain/client/src/sync/state/shard.rs
@@ -18,10 +18,18 @@ use near_primitives::types::{EpochId, ShardId};
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
use near_store::flat::{FlatStorageReadyStatus, FlatStorageStatus};
use near_store::{DBCol, ShardUId, Store};
use rand::prelude::SliceRandom;
use rand::thread_rng;
use std::sync::{Arc, Mutex};
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;

#[derive(Debug, Clone)]
pub enum StateToSync {
HeaderOnly,
Full,
}

pub(super) struct StateSyncShardHandle {
pub status: Arc<Mutex<ShardSyncStatus>>,
pub result: oneshot::Receiver<Result<(), near_chain::Error>>,
@@ -56,6 +64,7 @@ pub(super) async fn run_state_sync_for_shard(
store: Store,
shard_id: ShardId,
sync_hash: CryptoHash,
state_to_sync: StateToSync,
downloader: Arc<StateSyncDownloader>,
runtime: Arc<dyn RuntimeAdapter>,
epoch_manager: Arc<dyn EpochManagerAdapter>,
@@ -68,6 +77,12 @@
tracing::info!("Running state sync for shard {}", shard_id);
*status.lock().unwrap() = ShardSyncStatus::StateDownloadHeader;
let header = downloader.ensure_shard_header(shard_id, sync_hash, cancel.clone()).await?;
match state_to_sync {
StateToSync::HeaderOnly => {
return Ok(());
}
StateToSync::Full => {}
};
let state_root = header.chunk_prev_state_root();
let num_parts = header.num_state_parts();
let block_header =
@@ -82,20 +97,43 @@

return_if_cancelled!(cancel);
*status.lock().unwrap() = ShardSyncStatus::StateDownloadParts;
tokio_stream::iter(0..num_parts)
.map(|part_id| {
let future = downloader.ensure_shard_part_downloaded(
shard_id,
sync_hash,
part_id,
header.clone(),
cancel.clone(),
);
respawn_for_parallelism(&*future_spawner, "state sync download part", future)
})
.buffer_unordered(MAX_PARALLELISM_PER_SHARD_FOR_FAIRNESS)
.try_collect::<Vec<_>>()
.await?;
let mut parts_to_download: Vec<u64> = (0..num_parts).collect();
{
// Peer selection is designed such that requests for the same state parts are made to the
// same hosts, allowing the system to benefit from caching of parts on the host side. At
// the start of an epoch a number of nodes begin state sync at the same time. If we don't
// shuffle the order in which parts are requested, they will all request the same parts
// around the same time, producing spikes of traffic to particular hosts.
let mut rng = thread_rng();
parts_to_download.shuffle(&mut rng);
}
let mut attempt_count = 0;
while !parts_to_download.is_empty() {
let results = tokio_stream::iter(parts_to_download.clone())
.map(|part_id| {
let future = downloader.ensure_shard_part_downloaded_single_attempt(
shard_id,
sync_hash,
part_id,
attempt_count,
header.clone(),
cancel.clone(),
);
respawn_for_parallelism(&*future_spawner, "state sync download part", future)
})
.buffered(MAX_PARALLELISM_PER_SHARD_FOR_FAIRNESS)
.collect::<Vec<_>>()
.await;
attempt_count += 1;
// Update the list of parts_to_download retaining only the ones that failed
parts_to_download = results
.iter()
.enumerate()
.filter_map(|(task_index, res)| {
res.as_ref().err().map(|_| parts_to_download[task_index])
})
.collect();
}

return_if_cancelled!(cancel);
*status.lock().unwrap() = ShardSyncStatus::StateApplyInProgress;
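The new download loop shuffles the part ids once and then, on each pass, retries only the parts that failed. A self-contained sketch of that control flow, with the async single-attempt downloader replaced by a plain closure (assumed simplification):

```rust
use rand::prelude::SliceRandom;
use rand::thread_rng;

// Shuffle the part ids once so that peers starting state sync at the same
// time do not all request the same parts from the same hosts, then keep
// retrying only the parts whose previous attempt failed. `try_download`
// stands in for the real single-attempt downloader.
fn download_all_parts(num_parts: u64, mut try_download: impl FnMut(u64, usize) -> bool) {
    let mut parts_to_download: Vec<u64> = (0..num_parts).collect();
    parts_to_download.shuffle(&mut thread_rng());

    let mut attempt_count = 0;
    while !parts_to_download.is_empty() {
        let results: Vec<bool> = parts_to_download
            .iter()
            .map(|&part_id| try_download(part_id, attempt_count))
            .collect();
        attempt_count += 1;
        // Retain only the parts whose download failed on this attempt.
        parts_to_download = parts_to_download
            .iter()
            .zip(results)
            .filter_map(|(&part_id, ok)| (!ok).then_some(part_id))
            .collect();
    }
}

fn main() {
    // Every part fails once and then succeeds, so the loop takes two passes.
    download_all_parts(4, |part_id, attempt| {
        println!("part {part_id}, attempt {attempt}");
        attempt >= 1
    });
}
```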