Skip to content

Commit

Permalink
0.1.7rc2
Browse files Browse the repository at this point in the history
  • Loading branch information
maksimryndin committed May 22, 2024
1 parent 7d4e685 commit d85e59d
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 17 deletions.
19 changes: 10 additions & 9 deletions src/services/system/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::{
};
use std::time::Duration;
use tokio::sync::mpsc::{self};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;

pub const SYSTEM_SERVICE_NAME: &str = "system";
Expand Down Expand Up @@ -104,13 +103,14 @@ impl SystemService {
}
}

#[cfg(target_os = "linux")]
async fn ssh_observer(
is_shutdown: Arc<AtomicBool>,
sender: mpsc::Sender<TaskResult>,
messenger: Sender,
) {
tracing::info!("starting ssh monitoring");
let (tx, rx) = oneshot::channel::<()>();
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
std::thread::Builder::new()
.name("ssh-observer".into())
.spawn(move || ssh::process_sshd_log(is_shutdown, sender, messenger, tx))
Expand Down Expand Up @@ -187,7 +187,8 @@ impl Service for SystemService {

fn get_example_rules(&self) -> Vec<Datarow> {
let mut rows = Vec::with_capacity(3 + self.mounts.len() + 2 * self.process_names.len());
if cfg!(target_os = "linux") {
#[cfg(target_os = "linux")]
{
rows.push(
Rule {
log_name: ssh::SSH_LOG.to_string(),
Expand Down Expand Up @@ -314,16 +315,16 @@ impl Service for SystemService {
let scrape_interval = self.scrape_interval;
let scrape_timeout = self.scrape_timeout;

let mut tasks = if cfg!(target_os = "linux") {
let mut tasks = vec![];
#[cfg(target_os = "linux")]
{
let cloned_is_shutdown = is_shutdown.clone();
let cloned_sender = sender.clone();
let cloned_messenger = messenger.clone();
vec![tokio::spawn(async move {
tasks.push(tokio::spawn(async move {
Self::ssh_observer(cloned_is_shutdown, cloned_sender, cloned_messenger).await;
})]
} else {
vec![]
};
}));
}

tasks.push(tokio::spawn(async move {
Self::sys_observer(
Expand Down
117 changes: 109 additions & 8 deletions src/services/system/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use chrono::{NaiveDateTime, Utc};
use lazy_static::lazy_static;
use logwatcher::{LogWatcher, LogWatcherAction, LogWatcherEvent};
use regex::Regex;
use std::collections::HashMap;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
Expand All @@ -16,6 +17,7 @@ use tracing::Level;
pub const SSH_LOG: &str = "ssh";
pub const SSH_LOG_STATUS: &str = "status";
pub const SSH_LOG_STATUS_CONNECTED: &str = "connected";
pub const SSH_LOG_STATUS_TERMINATED: &str = "terminated";

pub(super) fn process_sshd_log(
is_shutdown: Arc<AtomicBool>,
Expand All @@ -39,11 +41,25 @@ pub(super) fn process_sshd_log(
}
};

let mut connections = HashMap::new();

log_watcher.watch(&mut move |result| {
let result = match result {
Ok(event) => match event {
LogWatcherEvent::Line(line) => match parse(&line) {
Some(line) => Ok(Data::Single(line)),
Some(mut datarow) => {
lookup_connection(&mut datarow, &mut connections);
let Datavalue::Text(ref status) = datarow.data[4].1 else {
panic!("assert: ssh status is parsed")
};
if status == SSH_LOG_STATUS_CONNECTED && connections.len() > 100 {
let message =
format!("there are {} active ssh connections", connections.len());
tracing::warn!("{}", message);
messenger.send_nonblock(Notification::new(message, Level::WARN));
}
Ok(Data::Single(datarow))
}
None => {
return LogWatcherAction::None;
}
Expand All @@ -66,13 +82,69 @@ pub(super) fn process_sshd_log(
"assert: ssh monitoring messages queue shouldn't be closed before shutdown signal"
);
}

LogWatcherAction::None
});

let _ = tx.send(());
tracing::info!("exiting ssh monitoring thread");
}

struct Connection {
user_ip: String,
user_port: u32,
user_key: String,
}

fn lookup_connection(datarow: &mut Datarow, connections: &mut HashMap<u32, Connection>) {
let Datavalue::Text(ref status) = datarow.data[4].1 else {
panic!("assert: ssh status is parsed")
};
if status != SSH_LOG_STATUS_CONNECTED && status != SSH_LOG_STATUS_TERMINATED {
return;
}
let Datavalue::IntegerID(id) = datarow.data[0].1 else {
panic!("assert: ssh id is parsed")
};

// For terminated
if let Some(Connection {
user_ip,
user_port,
user_key,
}) = connections.remove(&id)
{
datarow.data[2].1 = Datavalue::Text(user_ip);
datarow.data[3].1 = Datavalue::IntegerID(user_port);
datarow.data[5].1 = Datavalue::Text(user_key);
return;
}

if status == SSH_LOG_STATUS_TERMINATED {
return;
}

// For connected
let Datavalue::Text(ref ip) = datarow.data[2].1 else {
panic!("assert: connected ssh user has an ip")
};
let Datavalue::IntegerID(port) = datarow.data[3].1 else {
panic!("assert: connected ssh user has a port")
};
let Datavalue::Text(ref key) = datarow.data[5].1 else {
panic!("assert: connected ssh user has a pubkey")
};

connections.insert(
id,
Connection {
user_ip: ip.to_string(),
user_port: port,
user_key: key.to_string(),
},
);
}

fn parse(line: &str) -> Option<Datarow> {
lazy_static! {
static ref RE: Regex = Regex::new(
Expand Down Expand Up @@ -107,9 +179,9 @@ fn parse(line: &str) -> Option<Datarow> {
})
.expect("assert: can get auth log datetime");
let id = cap.name("id").and_then(|d| d.as_str().parse().ok())?;
let ip = cap.name("ip").map(|d| d.as_str());
let ip = cap.name("ip").map(|d| d.as_str().to_string());
let port = cap.name("port").and_then(|d| d.as_str().parse().ok());
let key = cap.name("key").map(|d| d.as_str());
let key = cap.name("key").map(|d| d.as_str().to_string());

let (username, status) = if let Some(username) = cap.name("username_rejected") {
(Datavalue::Text(username.as_str().to_string()), "rejected")
Expand All @@ -119,7 +191,10 @@ fn parse(line: &str) -> Option<Datarow> {
SSH_LOG_STATUS_CONNECTED,
)
} else if let Some(username) = cap.name("username_terminated") {
(Datavalue::Text(username.as_str().to_string()), "terminated")
(
Datavalue::Text(username.as_str().to_string()),
SSH_LOG_STATUS_TERMINATED,
)
} else {
(Datavalue::NotAvailable, "timeout")
};
Expand All @@ -132,8 +207,7 @@ fn parse(line: &str) -> Option<Datarow> {
("user".to_string(), username),
(
"ip".to_string(),
ip.map(|ip| Datavalue::Text(ip.to_string()))
.unwrap_or(Datavalue::NotAvailable),
ip.map(Datavalue::Text).unwrap_or(Datavalue::NotAvailable),
),
(
"port".to_string(),
Expand All @@ -146,8 +220,7 @@ fn parse(line: &str) -> Option<Datarow> {
),
(
"pubkey".to_string(),
key.map(|key| Datavalue::Text(key.to_string()))
.unwrap_or(Datavalue::NotAvailable),
key.map(Datavalue::Text).unwrap_or(Datavalue::NotAvailable),
),
],
))
Expand Down Expand Up @@ -238,4 +311,32 @@ mod tests {
let line = "May 21 19:26:17 server1 sshd[59895]: pam_unix(sshd:session): session opened for user los(uid=1000) by (uid=0)";
assert!(parse(line).is_none());
}

#[test]
fn lookup_connections() {
let mut connections = HashMap::new();

let line = "May 21 11:22:18 server1 sshd[136063]: Accepted publickey for ubuntu from 77.222.27.80 port 17827 ssh2: RSA SHA256:D726XJ0DkstyhsyH2rAbfYuIaeBOa3Su2l2WWbyXnXs";
let mut parsed = parse(line).unwrap();
lookup_connection(&mut parsed, &mut connections);
assert_eq!(connections.len(), 1);

let line = "May 21 11:22:56 server1 sshd[136063]: pam_unix(sshd:session): session closed for user ubuntu";
let mut parsed = parse(line).unwrap();
lookup_connection(&mut parsed, &mut connections);
assert_eq!(connections.len(), 0);

assert_eq!(parsed.data[0].1, Datavalue::IntegerID(136063));
assert_eq!(parsed.data[1].1, Datavalue::Text("ubuntu".to_string()));
assert_eq!(
parsed.data[2].1,
Datavalue::Text("77.222.27.80".to_string())
);
assert_eq!(parsed.data[3].1, Datavalue::IntegerID(17827));
assert_eq!(parsed.data[4].1, Datavalue::Text("terminated".to_string()));
assert_eq!(
parsed.data[5].1,
Datavalue::Text("RSA SHA256:D726XJ0DkstyhsyH2rAbfYuIaeBOa3Su2l2WWbyXnXs".to_string())
);
}
}

0 comments on commit d85e59d

Please sign in to comment.