Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HTTP route filter for setting client IP headers #1817

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 53 additions & 40 deletions linkerd/app/inbound/src/policy/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ where
None => err!(self.mk_route_not_found()),
Some(Routes::Http(routes)) => {
let (permit, mtch, route) = try_fut!(self.authorize(&routes, &req));
try_fut!(apply_http_filters(mtch, route, &mut req));
try_fut!(self.apply_http_filters(mtch, route, &mut req));
permit
}
Some(Routes::Grpc(routes)) => {
let (permit, _, route) = try_fut!(self.authorize(&routes, &req));
try_fut!(apply_grpc_filters(route, &mut req));
try_fut!(self.apply_grpc_filters(route, &mut req));
permit
}
};
Expand Down Expand Up @@ -252,59 +252,72 @@ impl<T, N> HttpPolicyService<T, N> {
.route_not_found(labels, self.connection.dst, self.connection.tls.clone());
HttpRouteNotFound(()).into()
}
}

fn apply_http_filters<B>(
r#match: http::RouteMatch,
route: &http::Policy,
req: &mut ::http::Request<B>,
) -> Result<()> {
// TODO Do any metrics apply here?
for filter in &route.filters {
match filter {
http::Filter::InjectFailure(fail) => {
if let Some(http::filter::FailureResponse { status, message }) = fail.apply() {
return Err(HttpRouteInjectedFailure { status, message }.into());
fn apply_http_filters<B>(
&self,
r#match: http::RouteMatch,
route: &http::Policy,
req: &mut ::http::Request<B>,
) -> Result<()> {
// TODO Do any metrics apply here?
for filter in &route.filters {
match filter {
http::Filter::InjectFailure(fail) => {
if let Some(http::filter::FailureResponse { status, message }) = fail.apply() {
return Err(HttpRouteInjectedFailure { status, message }.into());
}
}
}

http::Filter::Redirect(redir) => match redir.apply(req.uri(), &r#match) {
Ok(Some(http::filter::Redirection { status, location })) => {
return Err(HttpRouteRedirect { status, location }.into());
}
http::Filter::Redirect(redir) => match redir.apply(req.uri(), &r#match) {
Ok(Some(http::filter::Redirection { status, location })) => {
return Err(HttpRouteRedirect { status, location }.into());
}

Err(invalid) => {
return Err(HttpRouteInvalidRedirect(invalid).into());
}
Err(invalid) => {
return Err(HttpRouteInvalidRedirect(invalid).into());
}

Ok(None) => {
tracing::debug!("Ignoring irrelevant redirect");
}
},

Ok(None) => {
tracing::debug!("Ignoring irrelevant redirect");
http::Filter::RequestHeaders(rh) => {
rh.apply(req.headers_mut());
}
},

http::Filter::RequestHeaders(rh) => {
rh.apply(req.headers_mut());
http::Filter::ClientIpHeaders(c) => {
c.apply(self.connection.client.ip(), req.headers_mut());
}
}
}

Ok(())
}

Ok(())
}
fn apply_grpc_filters<B>(
&self,
route: &grpc::Policy,
req: &mut ::http::Request<B>,
) -> Result<()> {
for filter in &route.filters {
match filter {
grpc::Filter::InjectFailure(fail) => {
if let Some(grpc::filter::FailureResponse { code, message }) = fail.apply() {
return Err(GrpcRouteInjectedFailure { code, message }.into());
}
}

fn apply_grpc_filters<B>(route: &grpc::Policy, req: &mut ::http::Request<B>) -> Result<()> {
for filter in &route.filters {
match filter {
grpc::Filter::InjectFailure(fail) => {
if let Some(grpc::filter::FailureResponse { code, message }) = fail.apply() {
return Err(GrpcRouteInjectedFailure { code, message }.into());
grpc::Filter::RequestHeaders(rh) => {
rh.apply(req.headers_mut());
}
}

grpc::Filter::RequestHeaders(rh) => {
rh.apply(req.headers_mut());
grpc::Filter::ClientIpHeaders(c) => {
c.apply(self.connection.client.ip(), req.headers_mut());
}
}
}
}

Ok(())
Ok(())
}
}
83 changes: 75 additions & 8 deletions linkerd/app/inbound/src/policy/http/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;
use crate::policy::{Authentication, Authorization, Meta, Protocol, ServerPolicy};
use linkerd_app_core::{svc::Service, Infallible};
use std::sync::Arc;
use std::{net, sync::Arc};

macro_rules! conn {
($client:expr, $dst:expr) => {{
Expand All @@ -15,10 +15,12 @@ macro_rules! conn {
}
}};
() => {{
conn!([192, 168, 3, 3], [192, 168, 3, 4])
conn!(CLIENT_IP, [192, 168, 3, 4])
}};
}

const CLIENT_IP: net::IpAddr = net::IpAddr::V4(net::Ipv4Addr::new(192, 168, 3, 3));

macro_rules! new_svc {
($proto:expr, $conn:expr, $rsp:expr) => {{
let (policy, tx) = AllowPolicy::for_test(
Expand Down Expand Up @@ -81,7 +83,7 @@ async fn http_route() {
policy: Policy {
authorizations: Arc::new([Authorization {
authentication: Authentication::Unauthenticated,
networks: vec![std::net::IpAddr::from([192, 168, 3, 3]).into()],
networks: vec![CLIENT_IP.into()],
meta: Arc::new(Meta::Resource {
group: "policy.linkerd.io".into(),
kind: "server".into(),
Expand Down Expand Up @@ -162,7 +164,7 @@ async fn http_filter_header() {
policy: Policy {
authorizations: Arc::new([Authorization {
authentication: Authentication::Unauthenticated,
networks: vec![std::net::IpAddr::from([192, 168, 3, 3]).into()],
networks: vec![CLIENT_IP.into()],
meta: Arc::new(Meta::Resource {
group: "policy.linkerd.io".into(),
kind: "server".into(),
Expand Down Expand Up @@ -206,6 +208,71 @@ async fn http_filter_header() {
assert_eq!(permit.labels.route.route, rmeta);
}

#[tokio::test(flavor = "current_thread")]
async fn http_filter_client_ip() {
use linkerd_server_policy::http::{filter, r#match::MatchRequest, Filter, Policy, Route, Rule};

let rmeta = Arc::new(Meta::Resource {
group: "gateway.networking.k8s.io".into(),
kind: "httproute".into(),
name: "testrt".into(),
});
let proto = Protocol::Http1(Arc::new([Route {
hosts: vec![],
rules: vec![Rule {
matches: vec![MatchRequest {
method: Some(::http::Method::GET),
..MatchRequest::default()
}],
policy: Policy {
authorizations: Arc::new([Authorization {
authentication: Authentication::Unauthenticated,
networks: vec![CLIENT_IP.into()],
meta: Arc::new(Meta::Resource {
group: "policy.linkerd.io".into(),
kind: "server".into(),
name: "testsaz".into(),
}),
}]),
filters: vec![Filter::ClientIpHeaders(filter::ClientIpHeaders {
headers: vec![(
"X-Forwarded-For".parse().unwrap(),
filter::client_ip_headers::Action::Add,
)],
})],
meta: rmeta.clone(),
},
}],
}]));
let inner = |permit: HttpRoutePermit, req: ::http::Request<hyper::Body>| -> Result<_> {
assert_eq!(req.headers().len(), 1);
assert_eq!(
req.headers().get("X-Forwarded-For"),
Some(&CLIENT_IP.to_string().parse().unwrap())
);
let mut rsp = ::http::Response::builder()
.body(hyper::Body::default())
.unwrap();
rsp.extensions_mut().insert(permit);
Ok(rsp)
};
let (mut svc, _tx) = new_svc!(proto, conn!(), inner);

let rsp = svc
.call(
::http::Request::builder()
.body(hyper::Body::default())
.unwrap(),
)
.await
.expect("serves");
let permit = rsp
.extensions()
.get::<HttpRoutePermit>()
.expect("permitted");
assert_eq!(permit.labels.route.route, rmeta);
}

#[tokio::test(flavor = "current_thread")]
async fn http_filter_inject_failure() {
use linkerd_server_policy::http::{filter, r#match::MatchRequest, Filter, Policy, Route, Rule};
Expand All @@ -225,7 +292,7 @@ async fn http_filter_inject_failure() {
policy: Policy {
authorizations: Arc::new([Authorization {
authentication: Authentication::Unauthenticated,
networks: vec![std::net::IpAddr::from([192, 168, 3, 3]).into()],
networks: vec![CLIENT_IP.into()],
meta: Arc::new(Meta::Resource {
group: "policy.linkerd.io".into(),
kind: "server".into(),
Expand Down Expand Up @@ -291,7 +358,7 @@ async fn grpc_route() {
policy: Policy {
authorizations: Arc::new([Authorization {
authentication: Authentication::Unauthenticated,
networks: vec![std::net::IpAddr::from([192, 168, 3, 3]).into()],
networks: vec![CLIENT_IP.into()],
meta: Arc::new(Meta::Resource {
group: "policy.linkerd.io".into(),
kind: "server".into(),
Expand Down Expand Up @@ -389,7 +456,7 @@ async fn grpc_filter_header() {
policy: Policy {
authorizations: Arc::new([Authorization {
authentication: Authentication::Unauthenticated,
networks: vec![std::net::IpAddr::from([192, 168, 3, 3]).into()],
networks: vec![CLIENT_IP.into()],
meta: Arc::new(Meta::Resource {
group: "policy.linkerd.io".into(),
kind: "server".into(),
Expand Down Expand Up @@ -462,7 +529,7 @@ async fn grpc_filter_inject_failure() {
policy: Policy {
authorizations: Arc::new([Authorization {
authentication: Authentication::Unauthenticated,
networks: vec![std::net::IpAddr::from([192, 168, 3, 3]).into()],
networks: vec![CLIENT_IP.into()],
meta: Arc::new(Meta::Resource {
group: "policy.linkerd.io".into(),
kind: "server".into(),
Expand Down
2 changes: 1 addition & 1 deletion linkerd/http-route/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ regex = "1"
rand = "0.8"
thiserror = "1"
tracing = "0.1"
url = "2"
url = "2"
2 changes: 2 additions & 0 deletions linkerd/http-route/src/http/filter.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
pub mod client_ip_headers;
pub mod inject_failure;
pub mod modify_header;
pub mod redirect;

pub use self::{
client_ip_headers::ClientIpHeaders,
inject_failure::{Distribution, FailureResponse, InjectFailure},
modify_header::ModifyHeader,
redirect::{InvalidRedirect, RedirectRequest, Redirection},
Expand Down
40 changes: 40 additions & 0 deletions linkerd/http-route/src/http/filter/client_ip_headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use http::header::{HeaderMap, HeaderName, HeaderValue};
use std::net::IpAddr;

/// Adds or sets HTTP headers containing the client's IP address.
///
/// This is typically used to add headers such as
/// `Forwarded-For`, `X-Forwarded-For`, and friends.
#[derive(Clone, Debug, Default, Hash, PartialEq, Eq)]
pub struct ClientIpHeaders {
pub headers: Vec<(HeaderName, Action)>,
}

#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
pub enum Action {
Add,
Set,
}

// === impl ForwardedFor ===

impl ClientIpHeaders {
pub fn apply(&self, client_addr: IpAddr, headers: &mut HeaderMap) {
if self.headers.is_empty() {
return;
}

let value = HeaderValue::try_from(client_addr.to_string())
.expect("an IP address should format as a valid header value");
for (header, action) in &self.headers {
match action {
Action::Add => {
headers.append(header.clone(), value.clone());
}
Action::Set => {
headers.insert(header.clone(), value.clone());
}
}
}
}
}
1 change: 1 addition & 0 deletions linkerd/server-policy/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub type Rule = grpc::Rule<Policy>;
pub enum Filter {
InjectFailure(filter::InjectFailure),
RequestHeaders(http::filter::ModifyHeader),
ClientIpHeaders(http::filter::ClientIpHeaders),
}

#[inline]
Expand Down
1 change: 1 addition & 0 deletions linkerd/server-policy/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub enum Filter {
InjectFailure(filter::InjectFailure),
Redirect(filter::RedirectRequest),
RequestHeaders(filter::ModifyHeader),
ClientIpHeaders(filter::ClientIpHeaders),
}

#[inline]
Expand Down