-
Notifications
You must be signed in to change notification settings - Fork 1k
/
Copy pathreassign.rs
126 lines (109 loc) · 3.75 KB
/
reassign.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use std::sync::Arc;
use anyhow::anyhow;
use graph::components::store::DeploymentLocator;
use graph::components::store::StoreEvent;
use graph::prelude::AssignmentChange;
use graph::prelude::NodeId;
use graph_store_postgres::command_support::catalog;
use graph_store_postgres::command_support::catalog::Site;
use graph_store_postgres::connection_pool::ConnectionPool;
use graph_store_postgres::NotificationSender;
use thiserror::Error;
use crate::deployment::DeploymentSelector;
use crate::deployment::DeploymentVersionSelector;
use crate::GraphmanError;
pub struct Deployment {
locator: DeploymentLocator,
site: Site,
}
impl Deployment {
pub fn locator(&self) -> &DeploymentLocator {
&self.locator
}
pub fn assigned_node(
&self,
primary_pool: ConnectionPool,
) -> Result<Option<NodeId>, GraphmanError> {
let primary_conn = primary_pool.get().map_err(GraphmanError::from)?;
let mut catalog_conn = catalog::Connection::new(primary_conn);
let node = catalog_conn
.assigned_node(&self.site)
.map_err(GraphmanError::from)?;
Ok(node)
}
}
#[derive(Debug, Error)]
pub enum ReassignDeploymentError {
#[error("deployment '{0}' is already assigned to '{1}'")]
AlreadyAssigned(String, String),
#[error(transparent)]
Common(#[from] GraphmanError),
}
#[derive(Clone, Debug)]
pub enum ReassignResult {
EmptyResponse,
CompletedWithWarnings(Vec<String>),
}
pub fn load_deployment(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
) -> Result<Deployment, ReassignDeploymentError> {
let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?;
let locator = crate::deployment::load_deployment_locator(
&mut primary_conn,
deployment,
&DeploymentVersionSelector::All,
)?;
let mut catalog_conn = catalog::Connection::new(primary_conn);
let site = catalog_conn
.locate_site(locator.clone())
.map_err(GraphmanError::from)?
.ok_or_else(|| {
GraphmanError::Store(anyhow!("deployment site not found for '{locator}'"))
})?;
Ok(Deployment { locator, site })
}
pub fn reassign_deployment(
primary_pool: ConnectionPool,
notification_sender: Arc<NotificationSender>,
deployment: &Deployment,
node: &NodeId,
curr_node: Option<NodeId>,
) -> Result<ReassignResult, ReassignDeploymentError> {
let primary_conn = primary_pool.get().map_err(GraphmanError::from)?;
let mut catalog_conn = catalog::Connection::new(primary_conn);
let changes: Vec<AssignmentChange> = match &curr_node {
Some(curr) => {
if &curr == &node {
vec![]
} else {
catalog_conn
.reassign_subgraph(&deployment.site, &node)
.map_err(GraphmanError::from)?
}
}
None => catalog_conn
.assign_subgraph(&deployment.site, &node)
.map_err(GraphmanError::from)?,
};
if changes.is_empty() {
return Err(ReassignDeploymentError::AlreadyAssigned(
deployment.locator.to_string(),
node.to_string(),
));
}
catalog_conn
.send_store_event(¬ification_sender, &StoreEvent::new(changes))
.map_err(GraphmanError::from)?;
let mirror = catalog::Mirror::primary_only(primary_pool);
let count = mirror
.assignments(&node)
.map_err(GraphmanError::from)?
.len();
if count == 1 {
let warning_msg = format!("This is the only deployment assigned to '{}'. Please make sure that the node ID is spelled correctly.",node.as_str());
Ok(ReassignResult::CompletedWithWarnings(vec![warning_msg]))
} else {
Ok(ReassignResult::EmptyResponse)
}
}