From 5e9769562142e44b0381477fa25a0365ba02c5a9 Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Mon, 2 Sep 2024 22:07:19 -0300 Subject: [PATCH] fix: update sysinfo and fix fd leak on linux (#4163) * fix: update sysinfo and fix fd leak on linux * chore: unify kill local process into a process mod --- Cargo.lock | 58 ++++++++-- Cargo.toml | 2 +- crates/cdk/src/deploy.rs | 5 +- crates/fluvio-cluster/Cargo.toml | 2 +- crates/fluvio-cluster/src/check/mod.rs | 5 +- crates/fluvio-cluster/src/cli/diagnostics.rs | 25 +++-- crates/fluvio-cluster/src/cli/shutdown.rs | 58 ++-------- crates/fluvio-cluster/src/delete.rs | 88 +-------------- crates/fluvio-cluster/src/lib.rs | 1 + crates/fluvio-cluster/src/process/mod.rs | 112 +++++++++++++++++++ crates/fluvio-sc/src/start.rs | 1 + crates/fluvio-spu/src/start.rs | 1 + crates/fluvio-test/src/main.rs | 13 ++- 13 files changed, 211 insertions(+), 160 deletions(-) create mode 100644 crates/fluvio-cluster/src/process/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 775db0b2ad..d9c9bd253c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4261,7 +4261,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.52.0", ] [[package]] @@ -7057,15 +7057,14 @@ dependencies = [ [[package]] name = "sysinfo" -version = "0.30.13" +version = "0.31.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a5b4ddaee55fb2bea2bf0e5000747e5f5c0de765e5a5ff87f4cd106439f4bb3" +checksum = "355dbe4f8799b304b05e1b0f05fc59b2a18d36645cf169607da45bde2f69a1be" dependencies = [ - "cfg-if", "core-foundation-sys", "libc", + "memchr", "ntapi", - "once_cell", "windows", ] @@ -8350,11 +8349,11 @@ dependencies = [ [[package]] name = "windows" -version = "0.52.0" +version = "0.57.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +checksum = "12342cb4d8e3b046f3d80effd474a7a02447231330ef77d71daa6fbc40681143" dependencies = [ - "windows-core", + "windows-core 0.57.0", "windows-targets 0.52.6", ] @@ -8367,6 +8366,49 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ed2439a290666cd67ecce2b0ffaad89c2a56b976b736e6ece670297897832d" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-result", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-implement" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.71", +] + +[[package]] +name = "windows-interface" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.71", +] + +[[package]] +name = "windows-result" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e383302e8ec8515204254685643de10811af0ed97ea37210dc26fb0032647f8" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.45.0" diff --git a/Cargo.toml b/Cargo.toml index eb5cc714f7..6f4e9711bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -131,7 +131,7 @@ sha2 = { version = "0.10" } siphasher = "1.0.0" static_assertions = "1.1.0" syn = "2.0" -sysinfo = { version = "0.30.13", default-features = false } +sysinfo = { version = "0.31.4", default-features = false, features = ["system"] } tar = { version = "0.4.38", default-features = false } tempfile = "3.4.0" thiserror = "1.0.30" diff --git a/crates/cdk/src/deploy.rs b/crates/cdk/src/deploy.rs index 84a77713ce..8a24999cec 100644 --- a/crates/cdk/src/deploy.rs +++ b/crates/cdk/src/deploy.rs @@ -472,8 +472,9 @@ mod local_index { table.load_preset(comfy_table::presets::NOTHING); table.set_header(LIST_TABLE_HEADERS); + sysinfo::set_open_files_limit(0); let mut system = sysinfo::System::new(); - system.refresh_processes(); + system.refresh_processes(sysinfo::ProcessesToUpdate::All); for connector in self.entries { let status = self.operator.status(&connector)?; let Entry::Local { @@ -524,7 +525,7 @@ mod local_index { impl Default for LocalProcesses { fn default() -> Self { let mut system: sysinfo::System = Default::default(); - system.refresh_processes(); + system.refresh_processes(sysinfo::ProcessesToUpdate::All); Self { system } } } diff --git a/crates/fluvio-cluster/Cargo.toml b/crates/fluvio-cluster/Cargo.toml index 027c45ca73..76d4b51f54 100644 --- a/crates/fluvio-cluster/Cargo.toml +++ b/crates/fluvio-cluster/Cargo.toml @@ -58,7 +58,7 @@ duct = { workspace = true, optional = true } comfy-table = { workspace = true, optional = true } flate2 = { workspace = true, optional = true } tar = { workspace = true , optional = true } -sysinfo = { workspace = true, default-features = false } +sysinfo = { workspace = true, default-features = false, features = ["system", "network", "disk"] } # External Fluvio dependencies diff --git a/crates/fluvio-cluster/src/check/mod.rs b/crates/fluvio-cluster/src/check/mod.rs index 89d77eaf7c..e09fa9e8f6 100644 --- a/crates/fluvio-cluster/src/check/mod.rs +++ b/crates/fluvio-cluster/src/check/mod.rs @@ -686,10 +686,11 @@ struct LocalClusterCheck; #[async_trait] impl ClusterCheck for LocalClusterCheck { async fn perform_check(&self, _pb: &ProgressRenderer) -> CheckResult { + sysinfo::set_open_files_limit(0); let mut sys = System::new(); - sys.refresh_processes(); // Only load what we need. + sys.refresh_processes(sysinfo::ProcessesToUpdate::All); // Only load what we need. let proc_count = sys - .processes_by_exact_name("fluvio-run") + .processes_by_exact_name("fluvio-run".as_ref()) .map(|x| println!(" found existing fluvio-run process. pid: {}", x.pid())) .count(); if proc_count > 0 { diff --git a/crates/fluvio-cluster/src/cli/diagnostics.rs b/crates/fluvio-cluster/src/cli/diagnostics.rs index c7cc1edc89..4788f0a783 100644 --- a/crates/fluvio-cluster/src/cli/diagnostics.rs +++ b/crates/fluvio-cluster/src/cli/diagnostics.rs @@ -17,6 +17,8 @@ use crate::cli::ClusterCliError; use crate::cli::start::default_log_directory; use crate::start::local::{DEFAULT_DATA_DIR as DEFAULT_LOCAL_DIR, DEFAULT_METADATA_SUB_DIR}; +const FLUVIO_PROCESS_NAME: &str = "fluvio"; + #[derive(Parser, Debug)] pub struct DiagnosticsOpt { #[arg(long)] @@ -155,7 +157,7 @@ impl DiagnosticsOpt { // Filter for only Fluvio pods let pods = pods .split(' ') - .filter(|pod| pod.contains("fluvio")) + .filter(|pod| pod.contains(FLUVIO_PROCESS_NAME)) .collect::>(); for &pod in &pods { @@ -196,7 +198,7 @@ impl DiagnosticsOpt { // Filter for only Fluvio services let objects = objects .split(' ') - .filter(|obj| !filter_fluvio || obj.contains("fluvio")) + .filter(|obj| !filter_fluvio || obj.contains(FLUVIO_PROCESS_NAME)) .map(|name| name.trim()) .collect::>(); @@ -262,6 +264,7 @@ impl DiagnosticsOpt { Ok(()) }; + sysinfo::set_open_files_limit(0); let mut sys = System::new_all(); let mut net = Networks::new(); @@ -452,13 +455,17 @@ impl ProcessInfo { let mut processes = Vec::new(); for (pid, process) in sys.processes() { - if process.name().contains("fluvio") { - processes.push(ProcessInfo { - pid: pid.as_u32(), - name: process.name().to_string(), - disk_usage: format!("{:?}", process.disk_usage()), - cmd: format!("{:?}", process.cmd()), - }); + let process_name = process.name().to_str(); + + if let Some(process_name) = process_name { + if process_name.contains(FLUVIO_PROCESS_NAME) { + processes.push(ProcessInfo { + pid: pid.as_u32(), + name: process_name.to_string(), + disk_usage: format!("{:?}", process.disk_usage()), + cmd: format!("{:?}", process.cmd()), + }); + } } } diff --git a/crates/fluvio-cluster/src/cli/shutdown.rs b/crates/fluvio-cluster/src/cli/shutdown.rs index a97a89dc64..f1c9b88cd3 100644 --- a/crates/fluvio-cluster/src/cli/shutdown.rs +++ b/crates/fluvio-cluster/src/cli/shutdown.rs @@ -1,15 +1,14 @@ -use std::fs::remove_file; use std::process::Command; use anyhow::bail; use anyhow::Result; use clap::Parser; use tracing::debug; -use sysinfo::System; use fluvio_types::defaults::SPU_MONITORING_UNIX_SOCKET; use fluvio_command::CommandExt; +use crate::process; use crate::render::ProgressRenderer; use crate::cli::ClusterCliError; use crate::progress::ProgressBarFactory; @@ -35,7 +34,7 @@ impl ShutdownOpt { match installation_type { InstallationType::Local | InstallationType::LocalK8 | InstallationType::ReadOnly => { - Self::kill_local_processes(&installation_type, &pb).await?; + Self::shutdown_local(&installation_type, &pb).await?; } InstallationType::Cloud => { let profile = config.config().current_profile_name().unwrap_or("none"); @@ -49,60 +48,23 @@ impl ShutdownOpt { Ok(()) } - async fn kill_local_processes( + async fn shutdown_local( installation_type: &InstallationType, pb: &ProgressRenderer, ) -> Result<()> { - pb.set_message("Uninstalling fluvio local components"); - - let kill_proc = |name: &str, command_args: Option<&[String]>| { - let mut sys = System::new(); - sys.refresh_processes(); // Only load what we need. - for process in sys.processes_by_exact_name(name) { - if let Some(cmd_args) = command_args { - let proc_cmds = process.cmd(); - if cmd_args.len() > proc_cmds.len() { - continue; // Ignore procs with less command_args than the target. - } - if cmd_args.iter().ne(proc_cmds[..cmd_args.len()].iter()) { - continue; // Ignore procs which don't match. - } - } - if !process.kill() { - // This will fail if called on a proc running as root, so only log failure. - debug!( - "Sysinto process.kill() returned false. pid: {}, name: {}: user: {:?}", - process.pid(), - process.name(), - process.user_id(), - ); - } - } - }; - kill_proc("fluvio", Some(&["cluster".into(), "run".into()])); - kill_proc("fluvio", Some(&["run".into()])); - kill_proc("fluvio-run", None); + process::kill_local_processes(pb).await?; if let InstallationType::LocalK8 = installation_type { let _ = Self::remove_custom_objects("spus", true); } // remove monitoring socket - match remove_file(SPU_MONITORING_UNIX_SOCKET) { - Ok(_) => { - pb.println(format!( - "Removed spu monitoring socket: {SPU_MONITORING_UNIX_SOCKET}" - )); - } - Err(io_err) if io_err.kind() == std::io::ErrorKind::NotFound => { - debug!("SPU monitoring socket not found: {SPU_MONITORING_UNIX_SOCKET}"); - } - Err(err) => { - pb.println(format!( - "SPU monitoring socket {SPU_MONITORING_UNIX_SOCKET}, can't be removed: {err}" - )); - } - } + process::delete_fs( + Some(SPU_MONITORING_UNIX_SOCKET), + "SPU monitoring socket", + true, + Some(pb), + ); pb.println("Uninstalled fluvio local components"); pb.finish_and_clear(); diff --git a/crates/fluvio-cluster/src/delete.rs b/crates/fluvio-cluster/src/delete.rs index 431f81cd45..0904d90350 100644 --- a/crates/fluvio-cluster/src/delete.rs +++ b/crates/fluvio-cluster/src/delete.rs @@ -1,22 +1,17 @@ -use std::path::Path; use std::process::Command; -use std::fs::{remove_dir_all, remove_file}; use derive_builder::Builder; use k8_client::meta_client::MetadataClient; use tracing::{info, warn, debug, instrument}; -use sysinfo::System; use fluvio_command::CommandExt; -use fluvio_types::defaults::SPU_MONITORING_UNIX_SOCKET; use crate::helm::HelmClient; use crate::charts::{APP_CHART_NAME, SYS_CHART_NAME}; use crate::progress::ProgressBarFactory; use crate::render::ProgressRenderer; -use crate::DEFAULT_NAMESPACE; +use crate::{process, DEFAULT_NAMESPACE}; use crate::error::UninstallError; -use crate::start::local::{DEFAULT_DATA_DIR, LOCAL_CONFIG_PATH}; use anyhow::Result; /// Uninstalls different flavors of fluvio @@ -146,90 +141,17 @@ impl ClusterUninstaller { async fn uninstall_local(&self) -> Result<()> { let pb = self.pb_factory.create()?; - pb.set_message("Uninstalling fluvio local components"); - - let kill_proc = |name: &str, command_args: Option<&[String]>| { - let mut sys = System::new(); - sys.refresh_processes(); // Only load what we need. - for process in sys.processes_by_exact_name(name) { - if let Some(cmd_args) = command_args { - // First command is the executable so cut that out. - let proc_cmds = &process.cmd(); - if cmd_args.len() > proc_cmds.len() { - continue; // Ignore procs with less command_args than the target. - } - if cmd_args.iter().ne(proc_cmds[..cmd_args.len()].iter()) { - continue; // Ignore procs which don't match. - } - } - if !process.kill() { - // This will fail if called on a proc running as root, so only log failure. - debug!( - "Sysinto process.kill() returned false. pid: {}, name: {}: user: {:?}", - process.pid(), - process.name(), - process.user_id(), - ); - } - } - }; - kill_proc("fluvio", Some(&["cluster".into(), "run".into()])); - kill_proc("fluvio", Some(&["run".into()])); - kill_proc("fluvio-run", None); - - fn delete_fs>( - path: Option, - tag: &'static str, - is_file: bool, - pb: Option<&ProgressRenderer>, - ) { - match path { - Some(path) => { - let path_ref = path.as_ref(); - match if is_file { - remove_file(path_ref) - } else { - remove_dir_all(path_ref) - } { - Ok(_) => { - debug!("Removed {}: {}", tag, path_ref.display()); - if let Some(pb) = pb { - pb.println(format!("Removed {}", tag)) - } - } - Err(err) => { - warn!("{} can't be removed: {}", tag, err); - if let Some(pb) = pb { - pb.println(format!("{tag}, can't be removed: {err}")) - } - } - } - } - None => { - warn!("Unable to find {}, cannot remove", tag); - } - } - } + process::kill_local_processes(&pb).await?; // delete fluvio file debug!("Removing fluvio directory"); - delete_fs(DEFAULT_DATA_DIR.as_ref(), "data dir", false, None); + process::delete_data_dir(); // delete local cluster config file - delete_fs( - LOCAL_CONFIG_PATH.as_ref(), - "local cluster config", - true, - None, - ); + process::delete_local_config(); // remove monitoring socket - delete_fs( - Some(SPU_MONITORING_UNIX_SOCKET), - "SPU monitoring socket", - true, - Some(&pb), - ); + process::delete_spu_socket(); pb.println("Uninstalled fluvio local components"); pb.finish_and_clear(); diff --git a/crates/fluvio-cluster/src/lib.rs b/crates/fluvio-cluster/src/lib.rs index 023b3f5606..2fe76da357 100644 --- a/crates/fluvio-cluster/src/lib.rs +++ b/crates/fluvio-cluster/src/lib.rs @@ -34,6 +34,7 @@ mod delete; mod error; mod progress; pub mod runtime; +mod process; /// extensions #[cfg(feature = "cli")] diff --git a/crates/fluvio-cluster/src/process/mod.rs b/crates/fluvio-cluster/src/process/mod.rs new file mode 100644 index 0000000000..ffd51459e1 --- /dev/null +++ b/crates/fluvio-cluster/src/process/mod.rs @@ -0,0 +1,112 @@ +use std::ffi::OsString; +use std::fs::{remove_dir_all, remove_file}; +use std::path::Path; + +use fluvio_types::defaults::SPU_MONITORING_UNIX_SOCKET; +use sysinfo::System; +use anyhow::Result; + +use tracing::{debug, warn}; + +use crate::render::ProgressRenderer; +use crate::start::local::{DEFAULT_DATA_DIR, LOCAL_CONFIG_PATH}; + +pub async fn kill_local_processes(pb: &ProgressRenderer) -> Result<()> { + pb.set_message("Uninstalling fluvio local components"); + + let kill_proc = |name: &str, command_args: Option<&[String]>| { + sysinfo::set_open_files_limit(0); + let mut sys = System::new(); + sys.refresh_processes(sysinfo::ProcessesToUpdate::All); // Only load what we need. + for process in sys.processes_by_exact_name(name.as_ref()) { + if let Some(cmd_args) = command_args { + let proc_cmds = process.cmd(); + if cmd_args.len() > proc_cmds.len() { + continue; // Ignore procs with less command_args than the target. + } + if cmd_args + .iter() + .map(OsString::from) + .collect::>() + .iter() + .ne(proc_cmds[..cmd_args.len()].iter()) + { + continue; // Ignore procs which don't match. + } + } + if !process.kill() { + // This will fail if called on a proc running as root, so only log failure. + debug!( + "Sysinto process.kill() returned false. pid: {}, name: {}: user: {:?}", + process.pid(), + process.name().to_str().unwrap_or("unknown"), + process.user_id(), + ); + } + } + }; + kill_proc("fluvio", Some(&["cluster".into(), "run".into()])); + kill_proc("fluvio", Some(&["run".into()])); + kill_proc("fluvio-run", None); + + Ok(()) +} + +pub fn delete_fs>( + path: Option, + tag: &'static str, + is_file: bool, + pb: Option<&ProgressRenderer>, +) { + match path { + Some(path) => { + let path_ref = path.as_ref(); + match if is_file { + remove_file(path_ref) + } else { + remove_dir_all(path_ref) + } { + Ok(_) => { + debug!("Removed {}: {}", tag, path_ref.display()); + if let Some(pb) = pb { + pb.println(format!("Removed {}", tag)) + } + } + Err(io_err) if io_err.kind() == std::io::ErrorKind::NotFound => { + debug!("{} not found: {}", tag, path_ref.display()); + } + Err(err) => { + warn!("{} can't be removed: {}", tag, err); + if let Some(pb) = pb { + pb.println(format!("{tag}, can't be removed: {err}")) + } + } + } + } + None => { + warn!("Unable to find {}, cannot remove", tag); + } + } +} + +pub fn delete_spu_socket() { + delete_fs( + Some(SPU_MONITORING_UNIX_SOCKET), + "SPU monitoring socket", + true, + None, + ); +} + +pub fn delete_local_config() { + delete_fs( + LOCAL_CONFIG_PATH.as_ref(), + "local cluster config", + true, + None, + ); +} + +pub fn delete_data_dir() { + delete_fs(DEFAULT_DATA_DIR.as_ref(), "data dir", false, None); +} diff --git a/crates/fluvio-sc/src/start.rs b/crates/fluvio-sc/src/start.rs index 635fc9e1e1..950b812014 100644 --- a/crates/fluvio-sc/src/start.rs +++ b/crates/fluvio-sc/src/start.rs @@ -70,6 +70,7 @@ pub fn main_loop(opt: ScOpt) { fn inspect_system() { use sysinfo::System; + sysinfo::set_open_files_limit(0); let mut sys = System::new_all(); sys.refresh_all(); info!(version = crate::VERSION, "Platform"); diff --git a/crates/fluvio-spu/src/start.rs b/crates/fluvio-spu/src/start.rs index ce0c4d13ae..0b6e91d590 100644 --- a/crates/fluvio-spu/src/start.rs +++ b/crates/fluvio-spu/src/start.rs @@ -31,6 +31,7 @@ pub fn main_loop(opt: SpuOpt) { println!("starting spu server (id:{})", spu_config.id); + sysinfo::set_open_files_limit(0); let mut sys = System::new_all(); sys.refresh_all(); info!(version = crate::VERSION, "Platform"); diff --git a/crates/fluvio-test/src/main.rs b/crates/fluvio-test/src/main.rs index 14c4c9f1a6..7b2dd38ad5 100644 --- a/crates/fluvio-test/src/main.rs +++ b/crates/fluvio-test/src/main.rs @@ -94,10 +94,9 @@ fn run_test( // println!("supported signals: {:?}", System::SUPPORTED_SIGNALS); let root_pid = get_current_pid().expect("Unable to get current pid"); debug!(?root_pid, "current root pid"); + sysinfo::set_open_files_limit(0); let mut sys = System::new(); - if !sys.refresh_process(root_pid) { - panic!("Unable to refresh root"); - } + sys.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[root_pid])); let root_process = sys.process(root_pid).expect("Unable to get root process"); let _child_pid = match fork::fork() { Ok(fork::Fork::Parent(child_pid)) => child_pid, @@ -198,8 +197,9 @@ fn run_test( /// kill all children of the root processes fn kill_child_processes(root_process: &Process) { let root_pid = root_process.pid(); + sysinfo::set_open_files_limit(0); let mut sys2 = System::new(); - sys2.refresh_processes(); + sys2.refresh_processes(sysinfo::ProcessesToUpdate::All); let g_id = root_process.group_id(); let processes = sys2.processes(); @@ -220,7 +220,7 @@ fn kill_child_processes(root_process: &Process) { for (pid, process) in processes { if pid != &root_pid && process.group_id() == g_id && is_root(process, root_pid, processes) { - println!("killing child test pid {} name {}", pid, process.name()); + println!("killing child test pid {} name {:?}", pid, process.name()); process.kill(); } } @@ -295,8 +295,9 @@ fn create_spinning_indicator() -> Option { fn get_parent_pid() -> sysinfo::Pid { let pid = get_current_pid().expect("Unable to get current pid"); + sysinfo::set_open_files_limit(0); let mut sys2 = System::new(); - sys2.refresh_processes(); + sys2.refresh_processes(sysinfo::ProcessesToUpdate::All); let current_process = sys2.process(pid).expect("Current process not found"); current_process.parent().expect("Parent process not found") }