Skip to content

Commit

Permalink
fix: update sysinfo and fix fd leak on linux (#4163)
Browse files Browse the repository at this point in the history
* fix: update sysinfo and fix fd leak on linux

* chore: unify kill local process into a process mod
  • Loading branch information
fraidev authored Sep 3, 2024
1 parent b2b8eed commit 5e97695
Show file tree
Hide file tree
Showing 13 changed files with 211 additions and 160 deletions.
58 changes: 50 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ sha2 = { version = "0.10" }
siphasher = "1.0.0"
static_assertions = "1.1.0"
syn = "2.0"
sysinfo = { version = "0.30.13", default-features = false }
sysinfo = { version = "0.31.4", default-features = false, features = ["system"] }
tar = { version = "0.4.38", default-features = false }
tempfile = "3.4.0"
thiserror = "1.0.30"
Expand Down
5 changes: 3 additions & 2 deletions crates/cdk/src/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,9 @@ mod local_index {
table.load_preset(comfy_table::presets::NOTHING);
table.set_header(LIST_TABLE_HEADERS);

sysinfo::set_open_files_limit(0);
let mut system = sysinfo::System::new();
system.refresh_processes();
system.refresh_processes(sysinfo::ProcessesToUpdate::All);
for connector in self.entries {
let status = self.operator.status(&connector)?;
let Entry::Local {
Expand Down Expand Up @@ -524,7 +525,7 @@ mod local_index {
impl Default for LocalProcesses {
fn default() -> Self {
let mut system: sysinfo::System = Default::default();
system.refresh_processes();
system.refresh_processes(sysinfo::ProcessesToUpdate::All);
Self { system }
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ duct = { workspace = true, optional = true }
comfy-table = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
tar = { workspace = true , optional = true }
sysinfo = { workspace = true, default-features = false }
sysinfo = { workspace = true, default-features = false, features = ["system", "network", "disk"] }


# External Fluvio dependencies
Expand Down
5 changes: 3 additions & 2 deletions crates/fluvio-cluster/src/check/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,10 +686,11 @@ struct LocalClusterCheck;
#[async_trait]
impl ClusterCheck for LocalClusterCheck {
async fn perform_check(&self, _pb: &ProgressRenderer) -> CheckResult {
sysinfo::set_open_files_limit(0);
let mut sys = System::new();
sys.refresh_processes(); // Only load what we need.
sys.refresh_processes(sysinfo::ProcessesToUpdate::All); // Only load what we need.
let proc_count = sys
.processes_by_exact_name("fluvio-run")
.processes_by_exact_name("fluvio-run".as_ref())
.map(|x| println!(" found existing fluvio-run process. pid: {}", x.pid()))
.count();
if proc_count > 0 {
Expand Down
25 changes: 16 additions & 9 deletions crates/fluvio-cluster/src/cli/diagnostics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use crate::cli::ClusterCliError;
use crate::cli::start::default_log_directory;
use crate::start::local::{DEFAULT_DATA_DIR as DEFAULT_LOCAL_DIR, DEFAULT_METADATA_SUB_DIR};

const FLUVIO_PROCESS_NAME: &str = "fluvio";

#[derive(Parser, Debug)]
pub struct DiagnosticsOpt {
#[arg(long)]
Expand Down Expand Up @@ -155,7 +157,7 @@ impl DiagnosticsOpt {
// Filter for only Fluvio pods
let pods = pods
.split(' ')
.filter(|pod| pod.contains("fluvio"))
.filter(|pod| pod.contains(FLUVIO_PROCESS_NAME))
.collect::<Vec<_>>();

for &pod in &pods {
Expand Down Expand Up @@ -196,7 +198,7 @@ impl DiagnosticsOpt {
// Filter for only Fluvio services
let objects = objects
.split(' ')
.filter(|obj| !filter_fluvio || obj.contains("fluvio"))
.filter(|obj| !filter_fluvio || obj.contains(FLUVIO_PROCESS_NAME))
.map(|name| name.trim())
.collect::<Vec<_>>();

Expand Down Expand Up @@ -262,6 +264,7 @@ impl DiagnosticsOpt {
Ok(())
};

sysinfo::set_open_files_limit(0);
let mut sys = System::new_all();
let mut net = Networks::new();

Expand Down Expand Up @@ -452,13 +455,17 @@ impl ProcessInfo {
let mut processes = Vec::new();

for (pid, process) in sys.processes() {
if process.name().contains("fluvio") {
processes.push(ProcessInfo {
pid: pid.as_u32(),
name: process.name().to_string(),
disk_usage: format!("{:?}", process.disk_usage()),
cmd: format!("{:?}", process.cmd()),
});
let process_name = process.name().to_str();

if let Some(process_name) = process_name {
if process_name.contains(FLUVIO_PROCESS_NAME) {
processes.push(ProcessInfo {
pid: pid.as_u32(),
name: process_name.to_string(),
disk_usage: format!("{:?}", process.disk_usage()),
cmd: format!("{:?}", process.cmd()),
});
}
}
}

Expand Down
58 changes: 10 additions & 48 deletions crates/fluvio-cluster/src/cli/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::fs::remove_file;
use std::process::Command;

use anyhow::bail;
use anyhow::Result;
use clap::Parser;
use tracing::debug;
use sysinfo::System;

use fluvio_types::defaults::SPU_MONITORING_UNIX_SOCKET;
use fluvio_command::CommandExt;

use crate::process;
use crate::render::ProgressRenderer;
use crate::cli::ClusterCliError;
use crate::progress::ProgressBarFactory;
Expand All @@ -35,7 +34,7 @@ impl ShutdownOpt {

match installation_type {
InstallationType::Local | InstallationType::LocalK8 | InstallationType::ReadOnly => {
Self::kill_local_processes(&installation_type, &pb).await?;
Self::shutdown_local(&installation_type, &pb).await?;
}
InstallationType::Cloud => {
let profile = config.config().current_profile_name().unwrap_or("none");
Expand All @@ -49,60 +48,23 @@ impl ShutdownOpt {
Ok(())
}

async fn kill_local_processes(
async fn shutdown_local(
installation_type: &InstallationType,
pb: &ProgressRenderer,
) -> Result<()> {
pb.set_message("Uninstalling fluvio local components");

let kill_proc = |name: &str, command_args: Option<&[String]>| {
let mut sys = System::new();
sys.refresh_processes(); // Only load what we need.
for process in sys.processes_by_exact_name(name) {
if let Some(cmd_args) = command_args {
let proc_cmds = process.cmd();
if cmd_args.len() > proc_cmds.len() {
continue; // Ignore procs with less command_args than the target.
}
if cmd_args.iter().ne(proc_cmds[..cmd_args.len()].iter()) {
continue; // Ignore procs which don't match.
}
}
if !process.kill() {
// This will fail if called on a proc running as root, so only log failure.
debug!(
"Sysinto process.kill() returned false. pid: {}, name: {}: user: {:?}",
process.pid(),
process.name(),
process.user_id(),
);
}
}
};
kill_proc("fluvio", Some(&["cluster".into(), "run".into()]));
kill_proc("fluvio", Some(&["run".into()]));
kill_proc("fluvio-run", None);
process::kill_local_processes(pb).await?;

if let InstallationType::LocalK8 = installation_type {
let _ = Self::remove_custom_objects("spus", true);
}

// remove monitoring socket
match remove_file(SPU_MONITORING_UNIX_SOCKET) {
Ok(_) => {
pb.println(format!(
"Removed spu monitoring socket: {SPU_MONITORING_UNIX_SOCKET}"
));
}
Err(io_err) if io_err.kind() == std::io::ErrorKind::NotFound => {
debug!("SPU monitoring socket not found: {SPU_MONITORING_UNIX_SOCKET}");
}
Err(err) => {
pb.println(format!(
"SPU monitoring socket {SPU_MONITORING_UNIX_SOCKET}, can't be removed: {err}"
));
}
}
process::delete_fs(
Some(SPU_MONITORING_UNIX_SOCKET),
"SPU monitoring socket",
true,
Some(pb),
);

pb.println("Uninstalled fluvio local components");
pb.finish_and_clear();
Expand Down
Loading

0 comments on commit 5e97695

Please sign in to comment.