Skip to content

Commit

Permalink
[catpowder] Feature: Add port-based filtering to catpowder backend
Browse files Browse the repository at this point in the history
  • Loading branch information
kyleholohan committed Oct 24, 2024
1 parent dafa989 commit cd3e281
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 30 deletions.
8 changes: 7 additions & 1 deletion src/rust/catpowder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@
mod win;

#[cfg(target_os = "windows")]
pub use win::runtime::SharedCatpowderRuntime;
pub use win::{
runtime::SharedCatpowderRuntime,
transport::SharedCatpowderTransport,
};

#[cfg(target_os = "linux")]
mod linux;

#[cfg(target_os = "linux")]
pub use linux::LinuxRuntime as SharedCatpowderRuntime;

#[cfg(target_os = "linux")]
pub use crate::inetstack::SharedInetStack as SharedCatpowderTransport;
1 change: 1 addition & 0 deletions src/rust/catpowder/win/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ mod socket;
//======================================================================================================================

pub mod runtime;
pub mod transport;
22 changes: 21 additions & 1 deletion src/rust/catpowder/win/ring/rule/rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

use crate::{
catpowder::win::{ring::rule::params::XdpRedirectParams, socket::XdpSocket},
inetstack::protocols::Protocol,
runtime::libxdp,
};
use ::std::mem;
Expand All @@ -24,7 +25,7 @@ pub struct XdpRule(libxdp::XDP_RULE);
//======================================================================================================================

impl XdpRule {
/// Creates a new XDP rule for the target socket.
/// Creates a new XDP rule for the target socket which filters all traffic.
pub fn new(socket: &XdpSocket) -> Self {
let redirect: XdpRedirectParams = XdpRedirectParams::new(socket);
let rule: libxdp::XDP_RULE = unsafe {
Expand All @@ -39,4 +40,23 @@ impl XdpRule {
};
Self(rule)
}

/// Creates a new XDP rule for the target socket which filters for a specific (protocol, port) combination.
pub fn new_for_dest(socket: &XdpSocket, protocol: Protocol, port: u16) -> Self {
let redirect: XdpRedirectParams = XdpRedirectParams::new(socket);
let rule: libxdp::XDP_RULE = unsafe {
let mut rule: libxdp::XDP_RULE = std::mem::zeroed();
rule.Match = match protocol {
Protocol::Udp => libxdp::_XDP_MATCH_TYPE_XDP_MATCH_UDP_DST,
Protocol::Tcp => libxdp::_XDP_MATCH_TYPE_XDP_MATCH_TCP_DST,
};
rule.Action = libxdp::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_REDIRECT;
*rule.Pattern.Port.as_mut() = port;
// Perform bitwise copy from redirect to rule, as this is a union field.
*rule.__bindgen_anon_1.Redirect.as_mut() = mem::transmute_copy(redirect.as_ref());

rule
};
Self(rule)
}
}
36 changes: 26 additions & 10 deletions src/rust/catpowder/win/ring/rx_ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
},
socket::XdpSocket,
},
inetstack::protocols::Protocol,
runtime::{fail::Fail, libxdp, limits},
};
use ::std::{cell::RefCell, rc::Rc};
Expand All @@ -26,6 +27,10 @@ use ::std::{cell::RefCell, rc::Rc};

/// A ring for receiving packets.
pub struct RxRing {
/// Index of the interface for the ring.
ifindex: u32,
/// Index of the queue for the ring.
queueid: u32,
/// A user memory region where receive buffers are stored.
mem: Rc<RefCell<UmemReg>>,
/// A ring for receiving packets.
Expand All @@ -35,7 +40,7 @@ pub struct RxRing {
/// Underlying XDP socket.
_socket: XdpSocket, // NOTE: we keep this here to prevent the socket from being dropped.
/// Underlying XDP program.
_program: XdpProgram, // NOTE: we keep this here to prevent the program from being dropped.
_program: Option<XdpProgram>, // NOTE: we keep this here to prevent the program from being dropped.
}

//======================================================================================================================
Expand Down Expand Up @@ -111,25 +116,36 @@ impl RxRing {
unsafe { *b = 0 };
rx_fill_ring.producer_submit(length);

Ok(Self {
ifindex,
queueid,
mem,
rx_ring,
rx_fill_ring,
_socket: socket,
_program: None,
})
}

/// Update the RxRing to use the specified rules for filtering.
pub fn reprogram(&mut self, api: &mut XdpApi, rules: &[(Protocol, u16)]) -> Result<(), Fail> {
// Create XDP program.
trace!("creating xdp program");
const XDP_INSPECT_RX: libxdp::XDP_HOOK_ID = libxdp::XDP_HOOK_ID {
Layer: libxdp::_XDP_HOOK_LAYER_XDP_HOOK_L2,
Direction: libxdp::_XDP_HOOK_DATAPATH_DIRECTION_XDP_HOOK_RX,
SubLayer: libxdp::_XDP_HOOK_SUBLAYER_XDP_HOOK_INSPECT,
};
let rules: Vec<XdpRule> = vec![XdpRule::new(&socket)];
let program: XdpProgram = XdpProgram::new(api, &rules, ifindex, &XDP_INSPECT_RX, queueid, 0)?;
let mut xdp_rules: Vec<XdpRule> = Vec::with_capacity(rules.len());
for (protocol, port) in rules.iter() {
xdp_rules.push(XdpRule::new_for_dest(&self._socket, *protocol, *port));
}

let program: XdpProgram = XdpProgram::new(api, &xdp_rules, self.ifindex, &XDP_INSPECT_RX, self.queueid, 0)?;
trace!("xdp program created");

Ok(Self {
mem,
rx_ring,
rx_fill_ring,
_socket: socket,
_program: program,
})
self._program = Some(program);
Ok(())
}

/// Reserves a consumer slot in the rx ring.
Expand Down
77 changes: 75 additions & 2 deletions src/rust/catpowder/win/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
},
demi_sgarray_t, demi_sgaseg_t,
demikernel::config::Config,
inetstack::protocols::{layer1::PhysicalLayer, MAX_HEADER_SIZE},
inetstack::protocols::{layer1::PhysicalLayer, layer4::Socket, Protocol, MAX_HEADER_SIZE},
runtime::{
fail::Fail,
libxdp,
Expand All @@ -24,6 +24,7 @@ use crate::{
use ::arrayvec::ArrayVec;
use ::libc::c_void;
use ::std::{borrow::BorrowMut, mem};
use std::{borrow::Borrow, net::SocketAddr};
use windows::Win32::{
Foundation::ERROR_INSUFFICIENT_BUFFER,
System::SystemInformation::{
Expand All @@ -44,6 +45,7 @@ struct CatpowderRuntimeInner {
api: XdpApi,
tx: TxRing,
rx_rings: Vec<RxRing>,
rules: Vec<(Protocol, u16)>,
}
//======================================================================================================================
// Implementations
Expand Down Expand Up @@ -85,7 +87,78 @@ impl SharedCatpowderRuntime {
}
trace!("Created {} RX rings.", rx_rings.len());

Ok(Self(SharedObject::new(CatpowderRuntimeInner { api, tx, rx_rings })))
Ok(Self(SharedObject::new(CatpowderRuntimeInner {
api,
tx,
rx_rings,
rules: vec![],
})))
}

/// Reprograms the runtime with the current rule set. If new_rules is None, the current ruleset
/// is used. If roll_back is true, the ruleset for each updated queue is rolled back to the
/// previous rule set if any error occurs during reprogramming. stop_idx is the index of the
/// last queue to reprogram, useful when rolling back partially updated rule sets.
fn reprogram(
&mut self,
new_rules: Option<&Vec<(Protocol, u16)>>,
roll_back: bool,
stop_idx: usize,
) -> Result<(), Fail> {
let inner_self: &mut CatpowderRuntimeInner = self.0.borrow_mut();
let mut err: Option<Fail> = None;

for (idx, rx) in inner_self.rx_rings.iter_mut().take(stop_idx).enumerate() {
if let Err(e) = rx.reprogram(&mut inner_self.api, &new_rules.unwrap_or(&inner_self.rules)) {
if roll_back {
if let Err(sub_err) = self.reprogram(None, false, idx) {
error!("Failed to roll back rule set: {:?}", sub_err);
}
return Err(e);
} else {
err = Some(e);
}
}
}

err.map_or(Ok(()), |e| Err(e))
}

/// Updates the rule set to a new rule vector, triggering a reprogram of the XDP runtime. If
/// roll_back is false, the rule set is updated even if the reprogram fails.
fn update_rule_set(&mut self, new_rules: Vec<(Protocol, u16)>, roll_back: bool) -> Result<(), Fail> {
let result: Result<(), Fail> = self.reprogram(Some(&new_rules), roll_back, self.0.borrow().rx_rings.len());
if result.is_ok() || !roll_back {
self.0.borrow_mut().rules = new_rules;
}
result
}

/// Adds a rule to the current rule set, triggering a reprogram of the XDP runtime.
fn add_rule(&mut self, protocol: Protocol, port: u16) -> Result<(), Fail> {
let inner_self: &mut CatpowderRuntimeInner = self.0.borrow_mut();
let mut rules: Vec<(Protocol, u16)> = inner_self.rules.clone();
rules.push((protocol, port));
self.update_rule_set(rules, true)
}

/// Removes a rule from the current rule set, triggering a reprogram of the XDP runtime.
pub fn remove_rule(&mut self, protocol: Protocol, port: u16) -> Result<(), Fail> {
let inner_self: &mut CatpowderRuntimeInner = self.0.borrow_mut();
let mut rules: Vec<(Protocol, u16)> = inner_self.rules.clone();
rules.retain(|(p, prt)| *p != protocol || *prt != port);
self.update_rule_set(rules, false)
}

/// Add a rule for a port binding, triggering a reprogram of the XDP runtime. On success,
pub fn bind(&mut self, socket: &Socket, local: SocketAddr) -> Result<(Protocol, u16), Fail> {
let protocol: Protocol = match socket {
Socket::Udp(_) => Protocol::Udp,
Socket::Tcp(_) => Protocol::Tcp,
};

self.add_rule(protocol, local.port())?;
Ok((protocol, local.port()))
}
}

Expand Down
Loading

0 comments on commit cd3e281

Please sign in to comment.