Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNS resolution for IPv6 #3023

Merged
merged 24 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,21 @@ In order to test IPv6 on a local cluster on macOS, you can use Kind:
3. `kind create cluster --config kind-config.yaml`
4. When you run `kubectl get svc -o wide --all-namespaces` you should see IPv6 addresses.

In order to use an agent image from your local Docker image store, you can load the image into the kind cluster's nodes with:

```
kind load docker-image test:latest
```

In order to test on EKS, I used this blueprint: https://github.com/aws-ia/terraform-aws-eks-blueprints/tree/main/patterns/ipv6-eks-cluster

After creating the cluster, I had to give myself permissions to the K8s objects; I did that via the AWS console (in the browser).
Feel free to add instructions on how to make that "manual" step unnecessary.

IPv6 tests (they currently don't run in the CI):
- steal_http_ipv6_traffic
- connect_to_kubernetes_api_service_over_ipv6


### Cleanup

Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.d/2958.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support for in-cluster DNS resolution of IPv6 addresses.
68 changes: 59 additions & 9 deletions mirrord/agent/src/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{future, path::PathBuf, time::Duration};
use futures::{stream::FuturesOrdered, StreamExt};
use hickory_resolver::{system_conf::parse_resolv_conf, Hosts, Resolver};
use mirrord_protocol::{
dns::{DnsLookup, GetAddrInfoRequest, GetAddrInfoResponse},
dns::{DnsLookup, GetAddrInfoRequest, GetAddrInfoRequestV2, GetAddrInfoResponse},
DnsLookupError, RemoteResult, ResolveErrorKindInternal, ResponseError,
};
use tokio::{
Expand All @@ -21,9 +21,24 @@ use crate::{
watched_task::TaskStatus,
};

/// A DNS lookup request received from a client, in either of the two request
/// versions the agent accepts.
///
/// Wrapping both versions in one type lets a single code path carry the
/// request; [`ClientGetAddrInfoRequest::into_v2`] normalizes it to the V2 form.
#[derive(Debug)]
pub(crate) enum ClientGetAddrInfoRequest {
/// Request in the original (V1) format.
V1(GetAddrInfoRequest),
/// Request in the V2 format.
V2(GetAddrInfoRequestV2),
}

impl ClientGetAddrInfoRequest {
pub(crate) fn into_v2(self) -> GetAddrInfoRequestV2 {
match self {
ClientGetAddrInfoRequest::V1(old_req) => old_req.into(),
ClientGetAddrInfoRequest::V2(v2_req) => v2_req,
}
}
}

#[derive(Debug)]
pub(crate) struct DnsCommand {
request: GetAddrInfoRequest,
request: ClientGetAddrInfoRequest,
response_tx: oneshot::Sender<RemoteResult<DnsLookup>>,
}

Expand All @@ -34,6 +49,7 @@ pub(crate) struct DnsWorker {
request_rx: Receiver<DnsCommand>,
attempts: usize,
timeout: Duration,
support_ipv6: bool,
}

impl DnsWorker {
Expand All @@ -45,7 +61,11 @@ impl DnsWorker {
/// # Note
///
/// `pid` is used to find the correct path of `etc` directory.
pub(crate) fn new(pid: Option<u64>, request_rx: Receiver<DnsCommand>) -> Self {
pub(crate) fn new(
pid: Option<u64>,
request_rx: Receiver<DnsCommand>,
support_ipv6: bool,
) -> Self {
let etc_path = pid
.map(|pid| {
PathBuf::from("/proc")
Expand All @@ -66,6 +86,7 @@ impl DnsWorker {
.ok()
.and_then(|attempts| attempts.parse().ok())
.unwrap_or(1),
support_ipv6,
}
}

Expand All @@ -79,9 +100,10 @@ impl DnsWorker {
#[tracing::instrument(level = Level::TRACE, ret, err(level = Level::TRACE))]
async fn do_lookup(
etc_path: PathBuf,
host: String,
request: GetAddrInfoRequestV2,
attempts: usize,
timeout: Duration,
support_ipv6: bool,
) -> RemoteResult<DnsLookup> {
// Prepares the `Resolver` after reading some `/etc` DNS files.
//
Expand All @@ -94,13 +116,32 @@ impl DnsWorker {
let hosts_conf = fs::read(hosts_path).await?;

let (config, mut options) = parse_resolv_conf(resolv_conf)?;
tracing::debug!(?config, ?options, "parsed config options");
options.server_ordering_strategy =
hickory_resolver::config::ServerOrderingStrategy::UserProvidedOrder;
options.timeout = timeout;
options.attempts = attempts;
options.ip_strategy = hickory_resolver::config::LookupIpStrategy::Ipv4Only;
options.ip_strategy = if support_ipv6 {
tracing::debug!("IPv6 support enabled. Respecting client IP family.");
request
.family
.try_into()
.inspect_err(|e| {
tracing::error!(%e,
"Unknown address family in addrinfo request. Using IPv4 and IPv6.")
})
// If the agent gets some new, unknown address family variant, it's the
// client's fault, so the agent queries both IPv4 and IPv6, and if that's not
// good enough for the client, the client can error out.
.unwrap_or(hickory_resolver::config::LookupIpStrategy::Ipv4AndIpv6)
} else {
tracing::debug!("IPv6 support disabled. Resolving IPv4 only.");
hickory_resolver::config::LookupIpStrategy::Ipv4Only
};
tracing::debug!(?config, ?options, "updated config options");

let mut resolver = Resolver::tokio(config, options);
tracing::debug!(?resolver, "tokio resolver");

let mut hosts = Hosts::default();
hosts.read_hosts_conf(hosts_conf.as_slice())?;
Expand All @@ -111,9 +152,10 @@ impl DnsWorker {

let lookup = resolver
.inspect_err(|fail| tracing::error!(?fail, "Failed to build DNS resolver"))?
.lookup_ip(host)
.lookup_ip(request.node)
.await
.inspect(|lookup| tracing::trace!(?lookup, "Lookup finished"))?
.inspect(|lookup| tracing::trace!(?lookup, "Lookup finished"))
.inspect_err(|e| tracing::trace!(%e, "lookup failed"))?
.into();

Ok(lookup)
Expand All @@ -125,8 +167,16 @@ impl DnsWorker {
let etc_path = self.etc_path.clone();
let timeout = self.timeout;
let attempts = self.attempts;
let support_ipv6 = self.support_ipv6;
let lookup_future = async move {
let result = Self::do_lookup(etc_path, message.request.node, attempts, timeout).await;
let result = Self::do_lookup(
etc_path,
message.request.into_v2(),
attempts,
timeout,
support_ipv6,
)
.await;

if let Err(result) = message.response_tx.send(result) {
tracing::error!(?result, "Failed to send query response");
Expand Down Expand Up @@ -174,7 +224,7 @@ impl DnsApi {
/// Results of scheduled requests are available via [`Self::recv`] (order is preserved).
pub(crate) async fn make_request(
&mut self,
request: GetAddrInfoRequest,
request: ClientGetAddrInfoRequest,
) -> Result<(), AgentError> {
let (response_tx, response_rx) = oneshot::channel();

Expand Down
14 changes: 11 additions & 3 deletions mirrord/agent/src/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
};

use client_connection::AgentTlsConnector;
use dns::{DnsCommand, DnsWorker};
use dns::{ClientGetAddrInfoRequest, DnsCommand, DnsWorker};
use futures::TryFutureExt;
use mirrord_protocol::{ClientMessage, DaemonMessage, GetEnvVarsRequest, LogMessage};
use sniffer::tcp_capture::RawSocketTcpCapture;
Expand Down Expand Up @@ -433,7 +433,14 @@ impl ClientConnectionHandler {
.await?
}
ClientMessage::GetAddrInfoRequest(request) => {
self.dns_api.make_request(request).await?;
self.dns_api
.make_request(ClientGetAddrInfoRequest::V1(request))
.await?;
}
ClientMessage::GetAddrInfoRequestV2(request) => {
self.dns_api
.make_request(ClientGetAddrInfoRequest::V2(request))
.await?;
}
ClientMessage::Ping => self.respond(DaemonMessage::Pong).await?,
ClientMessage::Tcp(message) => {
Expand Down Expand Up @@ -613,7 +620,8 @@ async fn start_agent(args: Args) -> Result<()> {
let cancellation_token = cancellation_token.clone();
let watched_task = WatchedTask::new(
DnsWorker::TASK_NAME,
DnsWorker::new(state.container_pid(), dns_command_rx).run(cancellation_token),
DnsWorker::new(state.container_pid(), dns_command_rx, args.ipv6)
.run(cancellation_token),
);
let status = watched_task.status();
let task = run_thread_in_namespace(
Expand Down
2 changes: 1 addition & 1 deletion mirrord/config/src/feature/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
util::MirrordToggleableConfig,
};

const IPV6_ENV_VAR: &str = "MIRRORD_INCOMING_ENABLE_IPV6";
const IPV6_ENV_VAR: &str = "MIRRORD_ENABLE_IPV6";

pub mod dns;
pub mod filter;
Expand Down
8 changes: 4 additions & 4 deletions mirrord/intproxy/protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use bincode::{Decode, Encode};
use mirrord_protocol::{
dns::{GetAddrInfoRequest, GetAddrInfoResponse},
dns::{GetAddrInfoRequestV2, GetAddrInfoResponse},
file::*,
outgoing::SocketAddress,
tcp::StealType,
Expand Down Expand Up @@ -44,7 +44,7 @@ pub enum LayerToProxyMessage {
/// A file operation request.
File(FileRequest),
/// A DNS request.
GetAddrInfo(GetAddrInfoRequest),
GetAddrInfo(GetAddrInfoRequestV2),
/// A request to initiate a new outgoing connection.
OutgoingConnect(OutgoingConnectRequest),
/// Requests related to incoming connections.
Expand Down Expand Up @@ -210,7 +210,7 @@ pub enum ProxyToLayerMessage {
NewSession(LayerId),
/// A response to layer's [`FileRequest`].
File(FileResponse),
/// A response to layer's [`GetAddrInfoRequest`].
/// A response to layer's [`GetAddrInfoRequestV2`].
GetAddrInfo(GetAddrInfoResponse),
/// A response to layer's [`OutgoingConnectRequest`].
OutgoingConnect(RemoteResult<OutgoingConnectResponse>),
Expand Down Expand Up @@ -428,7 +428,7 @@ impl_request!(
);

impl_request!(
req = GetAddrInfoRequest,
req = GetAddrInfoRequestV2,
res = GetAddrInfoResponse,
req_path = LayerToProxyMessage::GetAddrInfo,
res_path = ProxyToLayerMessage::GetAddrInfo,
Expand Down
2 changes: 1 addition & 1 deletion mirrord/intproxy/src/background_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! The proxy utilizes multiple background tasks to split the code into more self-contained parts.
//! Structs in this module aim to ease managing their state.
//!
//! Each background task implement the [`BackgroundTask`] trait, which specifies its properties and
//! Each background task implements the [`BackgroundTask`] trait, which specifies its properties and
//! allows for managing groups of related tasks with one [`BackgroundTasks`] instance.

use std::{collections::HashMap, fmt, future::Future, hash::Hash};
Expand Down
7 changes: 7 additions & 0 deletions mirrord/intproxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,13 @@ impl IntProxy {
.send(FilesProxyMessage::ProtocolVersion(protocol_version.clone()))
.await;

self.task_txs
.simple
.send(SimpleProxyMessage::ProtocolVersion(
protocol_version.clone(),
))
.await;

self.task_txs
.incoming
.send(IncomingProxyMessage::AgentProtocolVersion(protocol_version))
Expand Down
47 changes: 41 additions & 6 deletions mirrord/intproxy/src/proxies/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use std::collections::HashMap;

use mirrord_intproxy_protocol::{LayerId, MessageId, ProxyToLayerMessage};
use mirrord_protocol::{
dns::{GetAddrInfoRequest, GetAddrInfoResponse},
dns::{AddressFamily, GetAddrInfoRequestV2, GetAddrInfoResponse, ADDRINFO_V2_VERSION},
ClientMessage, DaemonMessage, GetEnvVarsRequest, RemoteResult,
};
use semver::Version;
use thiserror::Error;

use crate::{
Expand All @@ -20,10 +21,12 @@ use crate::{

#[derive(Debug)]
pub enum SimpleProxyMessage {
AddrInfoReq(MessageId, LayerId, GetAddrInfoRequest),
AddrInfoReq(MessageId, LayerId, GetAddrInfoRequestV2),
AddrInfoRes(GetAddrInfoResponse),
GetEnvReq(MessageId, LayerId, GetEnvVarsRequest),
GetEnvRes(RemoteResult<HashMap<String, String>>),
/// Protocol version was negotiated with the agent.
ProtocolVersion(Version),
}

#[derive(Error, Debug)]
Expand All @@ -34,10 +37,27 @@ pub struct SimpleProxyError(#[from] UnexpectedAgentMessage);
/// Run as a [`BackgroundTask`].
#[derive(Default)]
pub struct SimpleProxy {
/// For [`GetAddrInfoRequest`]s.
/// For [`GetAddrInfoRequestV2`]s.
addr_info_reqs: RequestQueue,
/// For [`GetEnvVarsRequest`]s.
get_env_reqs: RequestQueue,
/// [`mirrord_protocol`] version negotiated with the agent.
/// Determines whether we can use `GetAddrInfoRequestV2`.
protocol_version: Option<Version>,
}

impl SimpleProxy {
    /// Stores the [`mirrord_protocol`] version negotiated with the agent,
    /// overwriting any previously stored version.
    #[tracing::instrument(skip(self), level = tracing::Level::TRACE)]
    fn set_protocol_version(&mut self, version: Version) {
        self.protocol_version = Some(version);
    }

    /// Tells whether a V2 addrinfo request may be sent to the agent.
    ///
    /// Returns `true` only when a protocol version has been stored and it
    /// satisfies [`ADDRINFO_V2_VERSION`]; with no stored version the answer is
    /// `false`.
    fn addr_info_v2(&self) -> bool {
        match &self.protocol_version {
            Some(negotiated) => ADDRINFO_V2_VERSION.matches(negotiated),
            None => false,
        }
    }
}

impl BackgroundTask for SimpleProxy {
Expand All @@ -52,9 +72,23 @@ impl BackgroundTask for SimpleProxy {
match msg {
SimpleProxyMessage::AddrInfoReq(message_id, session_id, req) => {
self.addr_info_reqs.push_back(message_id, session_id);
message_bus
.send(ClientMessage::GetAddrInfoRequest(req))
.await;
if self.addr_info_v2() {
message_bus
.send(ClientMessage::GetAddrInfoRequestV2(req))
.await;
} else {
// The request is about to be downgraded to the V1 format. Warn the user when
// they explicitly asked for IPv6-only resolution.
if matches!(req.family, AddressFamily::Ipv6Only) {
    // A trailing `\` in a string literal drops the newline AND all leading
    // whitespace of the next line, so the separating space must come
    // before the backslash or the words run together.
    tracing::warn!(
        "The agent version you're using does not support DNS \
        queries for IPv6 addresses. This version will only fetch IPv4 \
        address. Please update to a newer agent image for better IPv6 \
        support."
    )
}
message_bus
.send(ClientMessage::GetAddrInfoRequest(req.into()))
t4lz marked this conversation as resolved.
Show resolved Hide resolved
.await;
}
}
SimpleProxyMessage::AddrInfoRes(res) => {
let (message_id, layer_id) =
Expand Down Expand Up @@ -88,6 +122,7 @@ impl BackgroundTask for SimpleProxy {
})
.await
}
SimpleProxyMessage::ProtocolVersion(version) => self.set_protocol_version(version),
}
}

Expand Down
4 changes: 3 additions & 1 deletion mirrord/layer/src/file/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use mirrord_protocol::{
ResponseError,
};
use rand::distributions::{Alphanumeric, DistString};
use tracing::{error, trace, Level};
#[cfg(debug_assertions)]
use tracing::Level;
use tracing::{error, trace};

use super::{hooks::FN_OPEN, open_dirs::OPEN_DIRS, *};
#[cfg(target_os = "linux")]
Expand Down
Loading
Loading