Skip to content

Commit

Permalink
Simple copy-target Reuse (#2749)
Browse files Browse the repository at this point in the history
* Simple copy-target reuse

* Order

* Add check for session_id

* Changelog

* More Docs
  • Loading branch information
DmitryDodzin authored Sep 12, 2024
1 parent 54fcc95 commit e61fc82
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add patch to allow a user to reuse copy-target and fix issue where prelauch commands in intellij prevented execution.
79 changes: 61 additions & 18 deletions mirrord/operator/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use conn_wrapper::ConnectionWrapper;
use error::{OperatorApiError, OperatorApiResult, OperatorOperation};
use http::{request::Request, HeaderName, HeaderValue};
use k8s_openapi::api::apps::v1::Deployment;
use kube::{api::PostParams, Api, Client, Config, Resource};
use kube::{
api::{ListParams, PostParams},
Api, Client, Config, Resource,
};
use mirrord_analytics::{AnalyticsHash, AnalyticsOperatorProperties, Reporter};
use mirrord_auth::{
certificate::Certificate,
Expand Down Expand Up @@ -669,7 +672,16 @@ impl OperatorApi<PreparedClientCert> {
let scale_down = config.feature.copy_target.scale_down;
let namespace = self.target_namespace(config);
let copied = self
.copy_target(target, scale_down, namespace, &config.feature.split_queues)
.copy_target(
target,
scale_down,
namespace,
config
.feature
.split_queues
.is_set()
.then(|| config.feature.split_queues.clone()),
)
.await?;

copy_subtask.success(Some("target copied"));
Expand Down Expand Up @@ -722,8 +734,14 @@ impl OperatorApi<PreparedClientCert> {
Ok(OperatorSessionConnection { session, tx, rx })
}

/// Creates a new [`CopyTargetCrd`] resource using the operator.
/// This should create a new dummy pod out of the given [`Target`].
/// Returns client cert's public key in a base64 encoded string (no padding same like in
/// operator logic)
pub fn get_user_id_str(&self) -> String {
general_purpose::STANDARD_NO_PAD.encode(self.client_cert.cert.public_key_data())
}

/// Creates a new or reuses existing [`CopyTargetCrd`] resource using the operator.
/// If new this should create a new dummy pod out of the given [`Target`].
///
/// # Note
///
Expand All @@ -735,22 +753,47 @@ impl OperatorApi<PreparedClientCert> {
target: Target,
scale_down: bool,
namespace: &str,
split_queues: &SplitQueuesConfig,
split_queues: Option<SplitQueuesConfig>,
) -> OperatorApiResult<CopyTargetCrd> {
let name = TargetCrd::urlfied_name(&target);

let requested = CopyTargetCrd::new(
&name,
CopyTargetSpec {
target,
idle_ttl: Some(Self::COPIED_POD_IDLE_TTL),
scale_down,
split_queues: Some(split_queues.clone()),
},
);
let user_id = self.get_user_id_str();

let copy_target_api: Api<CopyTargetCrd> = Api::namespaced(self.client.clone(), namespace);

let existing_copy_targets =
copy_target_api
.list(&ListParams::default())
.await
.map_err(|error| OperatorApiError::KubeError {
error,
operation: OperatorOperation::CopyingTarget,
})?;

Api::namespaced(self.client.clone(), namespace)
.create(&PostParams::default(), &requested)
let copy_target_name = TargetCrd::urlfied_name(&target);
let copy_target_spec = CopyTargetSpec {
target,
idle_ttl: Some(Self::COPIED_POD_IDLE_TTL),
scale_down,
split_queues,
};

if let Some(copy_target) = existing_copy_targets.items.into_iter().find(|copy_target| {
copy_target.spec == copy_target_spec
&& copy_target
.status
.as_ref()
.map(|status| status.creator_session.user_id.as_ref() == Some(&user_id))
.unwrap_or(false)
}) {
tracing::debug!(?copy_target, "reusing copy_target");

return Ok(copy_target);
}

copy_target_api
.create(
&PostParams::default(),
&CopyTargetCrd::new(&copy_target_name, copy_target_spec),
)
.await
.map_err(|error| OperatorApiError::KubeError {
error,
Expand Down
11 changes: 10 additions & 1 deletion mirrord/operator/src/crd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ pub struct Session {
pub target: String,
pub namespace: Option<String>,
pub locked_ports: Option<Vec<(u16, String, Option<String>)>>,
pub user_id: Option<String>,
}

/// Resource used to access the operator's session management routes.
Expand Down Expand Up @@ -292,12 +293,13 @@ impl From<&OperatorFeatures> for NewOperatorFeature {

/// This [`Resource`](kube::Resource) represents a copy pod created from an existing [`Target`]
/// (operator's copy pod feature).
#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[kube(
group = "operator.metalbear.co",
version = "v1",
kind = "CopyTarget",
root = "CopyTargetCrd",
status = "CopyTargetStatus",
namespaced
)]
pub struct CopyTargetSpec {
Expand All @@ -313,6 +315,13 @@ pub struct CopyTargetSpec {
pub split_queues: Option<SplitQueuesConfig>,
}

/// This is the `status` field for [`CopyTargetCrd`].
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
pub struct CopyTargetStatus {
/// The session object of the original session that created this CopyTarget
pub creator_session: Session,
}

/// Features and operations that can be blocked by a `MirrordPolicy`.
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)]
#[serde(rename_all = "kebab-case")] // StealWithoutFilter -> steal-without-filter in yaml.
Expand Down

0 comments on commit e61fc82

Please sign in to comment.