feat(storage): support online cache resize via risectl (#19677)
Signed-off-by: MrCroxx <mrcroxx@outlook.com>
MrCroxx authored Dec 9, 2024
1 parent f3ed1de commit 7005c05
Showing 16 changed files with 177 additions and 29 deletions.
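In short: compute nodes gain a `ResizeCache` RPC on their `ConfigService`, and `risectl` gains a matching Hummock subcommand, so the foyer meta/data caches can be resized online without restarting the node. Assuming clap's default kebab-case naming for the `ResizeCache` variant added below, the invocation would look something like `risectl hummock resize-cache --meta-cache-capacity-mb 4096 --data-cache-capacity-mb 8192` (illustrative; flags per `src/ctl/src/lib.rs` below).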
16 changes: 8 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -80,7 +80,7 @@ license = "Apache-2.0"
repository = "https://github.com/risingwavelabs/risingwave"

[workspace.dependencies]
foyer = { version = "0.13.0", features = ["tracing", "nightly", "prometheus"] }
foyer = { version = "0.13.1", features = ["tracing", "nightly", "prometheus"] }
apache-avro = { git = "https://github.com/risingwavelabs/avro", rev = "25113ba88234a9ae23296e981d8302c290fdaa4b", features = [
    "snappy",
    "zstandard",
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.dashboard.py
@@ -1736,7 +1736,7 @@ def section_streaming_actors(outer_panels: Panels):
"it's very likely to be the performance bottleneck",
[
panels.target(
# Here we use `min` but actually no much difference. Any of the sampled epoches makes sense.
# Here we use `min` but actually no much difference. Any of the sampled epochs makes sense.
f"min({metric('stream_actor_current_epoch')} != 0) by (fragment_id)",
"fragment {{fragment_id}}",
),
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions proto/compute.proto
@@ -12,6 +12,14 @@ message ShowConfigResponse {
  string stream_config = 2;
}

message ResizeCacheRequest {
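  // Capacities are in bytes; 0 means "leave this cache's capacity unchanged" (see the server-side checks below).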
  uint64 meta_cache_capacity = 1;
  uint64 data_cache_capacity = 2;
}

message ResizeCacheResponse {}

service ConfigService {
  rpc ShowConfig(ShowConfigRequest) returns (ShowConfigResponse);
  rpc ResizeCache(ResizeCacheRequest) returns (ResizeCacheResponse);
}
52 changes: 50 additions & 2 deletions src/compute/src/rpc/service/config_service.rs
@@ -13,16 +13,24 @@
// limitations under the License.
use std::sync::Arc;

use foyer::HybridCache;
use risingwave_batch::task::BatchManager;
use risingwave_common::error::tonic::ToTonicStatus;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_pb::compute::config_service_server::ConfigService;
-use risingwave_pb::compute::{ShowConfigRequest, ShowConfigResponse};
+use risingwave_pb::compute::{
+    ResizeCacheRequest, ResizeCacheResponse, ShowConfigRequest, ShowConfigResponse,
+};
use risingwave_storage::hummock::{Block, Sstable, SstableBlockIndex};
use risingwave_stream::task::LocalStreamManager;
use thiserror_ext::AsReport;
use tonic::{Code, Request, Response, Status};

pub struct ConfigServiceImpl {
    batch_mgr: Arc<BatchManager>,
    stream_mgr: LocalStreamManager,
    meta_cache: Option<HybridCache<HummockSstableObjectId, Box<Sstable>>>,
    block_cache: Option<HybridCache<SstableBlockIndex, Box<Block>>>,
}

#[async_trait::async_trait]
@@ -42,13 +50,53 @@ impl ConfigService for ConfigServiceImpl {
        };
        Ok(Response::new(show_config_response))
    }

    async fn resize_cache(
        &self,
        request: Request<ResizeCacheRequest>,
    ) -> Result<Response<ResizeCacheResponse>, Status> {
        let req = request.into_inner();

        if let Some(meta_cache) = &self.meta_cache
            && req.meta_cache_capacity > 0
        {
            match meta_cache.memory().resize(req.meta_cache_capacity as _) {
                Ok(_) => tracing::info!(
                    "resize meta cache capacity to {:?}",
                    req.meta_cache_capacity
                ),
                Err(e) => return Err(Status::internal(e.to_report_string())),
            }
        }

        if let Some(block_cache) = &self.block_cache
            && req.data_cache_capacity > 0
        {
            match block_cache.memory().resize(req.data_cache_capacity as _) {
                Ok(_) => tracing::info!(
                    "resize data cache capacity to {:?}",
                    req.data_cache_capacity
                ),
                Err(e) => return Err(Status::internal(e.to_report_string())),
            }
        }

        Ok(Response::new(ResizeCacheResponse {}))
    }
}

impl ConfigServiceImpl {
-    pub fn new(batch_mgr: Arc<BatchManager>, stream_mgr: LocalStreamManager) -> Self {
+    pub fn new(
+        batch_mgr: Arc<BatchManager>,
+        stream_mgr: LocalStreamManager,
+        meta_cache: Option<HybridCache<HummockSstableObjectId, Box<Sstable>>>,
+        block_cache: Option<HybridCache<SstableBlockIndex, Box<Block>>>,
+    ) -> Self {
        Self {
            batch_mgr,
            stream_mgr,
            meta_cache,
            block_cache,
        }
    }
}
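Note: `resize_cache` adjusts only the in-memory side of each hybrid cache (via `HybridCache::memory()`); a capacity of 0 in the request skips that cache, and a failed resize is surfaced to the caller as a gRPC internal error.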
6 changes: 3 additions & 3 deletions src/compute/src/server.rs
@@ -404,10 +404,10 @@ pub async fn compute_node_serve(
    let monitor_srv = MonitorServiceImpl::new(
        stream_mgr.clone(),
        config.server.clone(),
-        meta_cache,
-        block_cache,
+        meta_cache.clone(),
+        block_cache.clone(),
    );
-    let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone());
+    let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr.clone(), meta_cache, block_cache);
    let health_srv = HealthServiceImpl::new();

    let telemetry_manager = TelemetryManager::new(
2 changes: 2 additions & 0 deletions src/ctl/src/cmd_impl/hummock.rs
@@ -22,6 +22,7 @@ mod compaction_group;
mod list_version_deltas;
mod migrate_legacy_object;
mod pause_resume;
mod resize_cache;
mod tiered_cache_tracing;
mod trigger_full_gc;
mod trigger_manual_compaction;
@@ -31,6 +32,7 @@ pub use compaction_group::*;
pub use list_version_deltas::*;
pub use migrate_legacy_object::migrate_legacy_object;
pub use pause_resume::*;
pub use resize_cache::*;
pub use tiered_cache_tracing::*;
pub use trigger_full_gc::*;
pub use trigger_manual_compaction::*;
64 changes: 64 additions & 0 deletions src/ctl/src/cmd_impl/hummock/resize_cache.rs
@@ -0,0 +1,64 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::process::exit;

use futures::future::try_join_all;
use risingwave_pb::compute::ResizeCacheRequest;
use risingwave_pb::meta::GetClusterInfoResponse;
use risingwave_rpc_client::ComputeClient;
use thiserror_ext::AsReport;

use crate::common::CtlContext;

/// Print the formatted message and exit the whole risectl process with a non-zero status.
macro_rules! fail {
    ($($arg:tt)*) => {{
        println!($($arg)*);
        exit(1);
    }};
}

pub async fn resize_cache(
    context: &CtlContext,
    meta_cache_capacity: Option<u64>,
    data_cache_capacity: Option<u64>,
) -> anyhow::Result<()> {
    let meta_client = context.meta_client().await?;

    let GetClusterInfoResponse { worker_nodes, .. } = match meta_client.get_cluster_info().await {
        Ok(resp) => resp,
        Err(e) => {
            fail!("Failed to get cluster info: {}", e.as_report());
        }
    };

    let futures = worker_nodes.iter().map(|worker| async {
        let addr = worker.get_host().expect("worker host must be set");
        let client = ComputeClient::new(addr.into())
            .await
            .unwrap_or_else(|_| panic!("Cannot open client to compute node {addr:?}"));
        client
            .resize_cache(ResizeCacheRequest {
                meta_cache_capacity: meta_cache_capacity.unwrap_or(0),
                data_cache_capacity: data_cache_capacity.unwrap_or(0),
            })
            .await
    });

    if let Err(e) = try_join_all(futures).await {
        fail!("Failed to resize cache: {}", e.as_report())
    }

    Ok(())
}
18 changes: 18 additions & 0 deletions src/ctl/src/lib.rs
@@ -282,6 +282,12 @@ enum HummockCommands {
        #[clap(long, default_value = "100")]
        concurrency: u32,
    },
    ResizeCache {
        #[clap(long)]
        meta_cache_capacity_mb: Option<u64>,
        #[clap(long)]
        data_cache_capacity_mb: Option<u64>,
    },
}

#[derive(Subcommand)]
@@ -732,6 +738,18 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
        }) => {
            migrate_legacy_object(url, source_dir, target_dir, concurrency).await?;
        }
        Commands::Hummock(HummockCommands::ResizeCache {
            meta_cache_capacity_mb,
            data_cache_capacity_mb,
        }) => {
            const MIB: u64 = 1024 * 1024;
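            // e.g. --meta-cache-capacity-mb 4096 becomes 4096 * MIB = 4 GiB, since the RPC expects bytes.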
            cmd_impl::hummock::resize_cache(
                context,
                meta_cache_capacity_mb.map(|v| v * MIB),
                data_cache_capacity_mb.map(|v| v * MIB),
            )
            .await?
        }
        Commands::Table(TableCommands::Scan {
            mv_name,
            data_dir,
14 changes: 13 additions & 1 deletion src/rpc_client/src/compute_client.rs
@@ -25,7 +25,9 @@ use risingwave_common::util::tracing::TracingContext;
use risingwave_pb::batch_plan::{PlanFragment, TaskId, TaskOutputId};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::compute::config_service_client::ConfigServiceClient;
-use risingwave_pb::compute::{ShowConfigRequest, ShowConfigResponse};
+use risingwave_pb::compute::{
+    ResizeCacheRequest, ResizeCacheResponse, ShowConfigRequest, ShowConfigResponse,
+};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::{
    AnalyzeHeapRequest, AnalyzeHeapResponse, GetBackPressureRequest, GetBackPressureResponse,
@@ -277,6 +279,16 @@ impl ComputeClient {
            .map_err(RpcError::from_compute_status)?
            .into_inner())
    }

    pub async fn resize_cache(&self, request: ResizeCacheRequest) -> Result<ResizeCacheResponse> {
        Ok(self
            .config_client
            .to_owned()
            .resize_cache(request)
            .await
            .map_err(RpcError::from_compute_status)?
            .into_inner())
    }
}

#[async_trait]
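For reference, a minimal sketch of calling the new RPC against a single compute node, mirroring the ctl helper above (the `HostAddr` import path, the helper name, and the 4 GiB figure are illustrative assumptions, not part of this commit):

use risingwave_common::util::addr::HostAddr;
use risingwave_pb::compute::ResizeCacheRequest;
use risingwave_rpc_client::ComputeClient;

// Hypothetical helper: grow one node's data cache to 4 GiB while leaving the
// meta cache as-is (0 tells the server to keep the current capacity).
async fn resize_data_cache_to_4g(addr: HostAddr) -> anyhow::Result<()> {
    let client = ComputeClient::new(addr).await?;
    client
        .resize_cache(ResizeCacheRequest {
            meta_cache_capacity: 0,
            data_cache_capacity: 4 * 1024 * 1024 * 1024,
        })
        .await?;
    Ok(())
}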
4 changes: 0 additions & 4 deletions src/storage/src/hummock/sstable_store.rs
@@ -611,10 +611,6 @@
        );
    }

-    pub fn get_meta_memory_usage(&self) -> u64 {
-        self.meta_cache.memory().usage() as _
-    }

    pub fn get_prefetch_memory_usage(&self) -> usize {
        self.prefetch_buffer_usage.load(Ordering::Acquire)
    }
10 changes: 5 additions & 5 deletions src/storage/src/hummock/utils.rs
@@ -685,7 +685,7 @@ impl HummockMemoryCollector {

impl MemoryCollector for HummockMemoryCollector {
    fn get_meta_memory_usage(&self) -> u64 {
-        self.sstable_store.get_meta_memory_usage()
+        self.sstable_store.meta_cache().memory().usage() as _
    }

    fn get_data_memory_usage(&self) -> u64 {
Expand All @@ -701,13 +701,13 @@ impl MemoryCollector for HummockMemoryCollector {
    }

    fn get_meta_cache_memory_usage_ratio(&self) -> f64 {
-        self.sstable_store.get_meta_memory_usage() as f64
-            / (self.storage_memory_config.meta_cache_capacity_mb * 1024 * 1024) as f64
+        self.sstable_store.meta_cache().memory().usage() as f64
+            / self.sstable_store.meta_cache().memory().capacity() as f64
    }

    fn get_block_cache_memory_usage_ratio(&self) -> f64 {
-        self.get_data_memory_usage() as f64
-            / (self.storage_memory_config.block_cache_capacity_mb * 1024 * 1024) as f64
+        self.sstable_store.block_cache().memory().usage() as f64
+            / self.sstable_store.block_cache().memory().capacity() as f64
    }

    fn get_shared_buffer_usage_ratio(&self) -> f64 {
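With this change, the usage ratios are computed against each cache's live capacity (`memory().capacity()`) rather than the static `storage_memory_config` values, so the dashboard ratios should remain correct after an online resize.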
