Skip to content

Commit b0a71aa

Browse files
authored
refactor: simply management of tmp table in session. (#18162)
* chore: unify log format. * chore: log when create temp table. * chore: warn about overlapped query in a session. * refactor: improve refreshing of server side state. * feat: return session state only when changed. * refactor: simply management of tmp table in session. * ci: update tests. * dynamic refresh to ease tests. * chore: log restoring of temp_tbl_mgr. * fix: clear_m_cte_temp_table when bind fail. * fix * add test
1 parent 3c9e817 commit b0a71aa

File tree

14 files changed

+289
-251
lines changed

14 files changed

+289
-251
lines changed

src/query/service/src/catalogs/default/session_catalog.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ use databend_storages_common_table_meta::table_id_ranges::is_temp_table_id;
113113

114114
use crate::catalogs::default::MutableCatalog;
115115
use crate::catalogs::Catalog;
116+
use crate::servers::http::v1::ClientSessionManager;
117+
116118
#[derive(Clone, Debug)]
117119
pub struct SessionCatalog {
118120
inner: MutableCatalog,
@@ -421,8 +423,8 @@ impl Catalog for SessionCatalog {
421423
}
422424

423425
async fn create_table(&self, req: CreateTableReq) -> Result<CreateTableReply> {
424-
match req.table_meta.options.get(OPT_KEY_TEMP_PREFIX) {
425-
Some(_) => self.temp_tbl_mgr.lock().create_table(req),
426+
match req.table_meta.options.get(OPT_KEY_TEMP_PREFIX).cloned() {
427+
Some(prefix) => self.temp_tbl_mgr.lock().create_table(req, prefix.clone()),
426428
None => self.inner.create_table(req).await,
427429
}
428430
}
@@ -434,6 +436,8 @@ impl Catalog for SessionCatalog {
434436
)
435437
.await?
436438
{
439+
ClientSessionManager::instance()
440+
.remove_temp_tbl_mgr(req.temp_prefix, self.temp_tbl_mgr.clone());
437441
return Ok(reply);
438442
}
439443
self.inner.drop_table_by_id(req).await

src/query/service/src/interpreters/interpreter_table_create.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ use crate::interpreters::hook::vacuum_hook::hook_vacuum_temp_files;
8080
use crate::interpreters::InsertInterpreter;
8181
use crate::interpreters::Interpreter;
8282
use crate::pipelines::PipelineBuildResult;
83+
use crate::servers::http::v1::ClientSessionManager;
8384
use crate::sessions::QueryContext;
8485
use crate::sessions::TableContext;
8586
use crate::sql::plans::Insert;
@@ -376,10 +377,24 @@ impl CreateTableInterpreter {
376377
}
377378

378379
let reply = catalog.create_table(req.clone()).await?;
380+
if let Some(prefix) = req.table_meta.options.get(OPT_KEY_TEMP_PREFIX).cloned() {
381+
let session = self.ctx.get_current_session();
382+
if let Some(id) = session.get_client_session_id() {
383+
let client_session_manager = ClientSessionManager::instance();
384+
client_session_manager.add_temp_tbl_mgr(prefix, session.temp_tbl_mgr().clone());
385+
client_session_manager
386+
.refresh_session_handle(
387+
self.ctx.get_tenant(),
388+
self.ctx.get_current_user()?.name,
389+
&id,
390+
)
391+
.await?;
392+
}
393+
}
379394

380395
if !req.table_meta.options.contains_key(OPT_KEY_TEMP_PREFIX) && !catalog.is_external() {
381396
// iceberg table do not need to generate ownership.
382-
// grant the ownership of the table to the current role, the above req.table_meta.owner could be removed in future.
397+
// grant the ownership of the table to the current role, the above req.table_meta.owner could be removed in the future.
383398
if let Some(current_role) = self.ctx.get_current_role() {
384399
let tenant = self.ctx.get_tenant();
385400
let db = catalog.get_database(&tenant, &self.plan.database).await?;

src/query/service/src/interpreters/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ mod util;
173173

174174
pub use access::ManagementModeAccess;
175175
pub use common::InterpreterQueryLog;
176+
pub use hook::vacuum_hook::hook_clear_m_cte_temp_table;
176177
pub use hook::HookOperator;
177178
pub use interpreter::interpreter_plan_sql;
178179
pub use interpreter::Interpreter;

src/query/service/src/servers/http/middleware/session.rs

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ use crate::servers::HttpHandlerKind;
8181
use crate::sessions::SessionManager;
8282
const USER_AGENT: &str = "User-Agent";
8383
const TRACE_PARENT: &str = "traceparent";
84-
const COOKIE_LAST_ACCESS_TIME: &str = "last_access_time";
84+
const COOKIE_LAST_REFRESH_TIME: &str = "last_refresh_time";
8585
const COOKIE_SESSION_ID: &str = "session_id";
8686
const COOKIE_COOKIE_ENABLED: &str = "cookie_enabled";
8787
#[derive(Debug, Copy, Clone)]
@@ -408,7 +408,6 @@ impl<E> HTTPSessionEndpoint<E> {
408408
if cookie_session_id.is_some() {
409409
login_history.disable_write = true;
410410
}
411-
412411
let client_session_id = match (&authed_client_session_id, &cookie_session_id) {
413412
(Some(id1), Some(id2)) => {
414413
if id1 != id2 {
@@ -420,7 +419,9 @@ impl<E> HTTPSessionEndpoint<E> {
420419
Some(id1.clone())
421420
}
422421
(Some(id), None) => {
423-
req.cookie().add(make_cookie(COOKIE_SESSION_ID, id));
422+
if cookie_enabled {
423+
req.cookie().add(make_cookie(COOKIE_SESSION_ID, id));
424+
}
424425
Some(id.clone())
425426
}
426427
(None, Some(id)) => Some(id.clone()),
@@ -439,35 +440,38 @@ impl<E> HTTPSessionEndpoint<E> {
439440

440441
if let Some(id) = &client_session_id {
441442
session.set_client_session_id(id.clone());
442-
let last_access_time = req
443+
}
444+
445+
if cookie_enabled {
446+
let last_refresh_time = req
443447
.cookie()
444-
.get(COOKIE_LAST_ACCESS_TIME)
448+
.get(COOKIE_LAST_REFRESH_TIME)
445449
.map(|s| s.value_str().to_string());
446-
if let Some(ts) = &last_access_time {
450+
451+
let need_update = if let Some(ts) = &last_refresh_time {
447452
let ts = ts.parse::<u64>().map_err(|_| {
448453
ErrorCode::BadArguments(format!(
449-
"[HTTP-SESSION] Invalid last_access_time value: {}",
454+
"[HTTP-SESSION] Invalid last_refresh_time value: {}",
450455
ts
451456
))
452457
})?;
453458
let ts = SystemTime::UNIX_EPOCH + Duration::from_secs(ts);
454-
if let Err(err) = ts.elapsed() {
455-
log::error!(
456-
"[HTTP-SESSION] Invalid last_access_time: detected clock drift or incorrect timestamp, difference: {:?}",
457-
err.duration()
458-
);
459-
};
460-
ClientSessionManager::instance()
461-
.refresh_state(session.get_current_tenant(), id, &user_name, &ts)
462-
.await?;
459+
if let Some(id) = &client_session_id {
460+
ClientSessionManager::instance()
461+
.refresh_state(session.get_current_tenant(), id, &user_name, &ts)
462+
.await?
463+
} else {
464+
true
465+
}
466+
} else {
467+
true
468+
};
469+
if need_update {
470+
let ts = unix_ts().as_secs().to_string();
471+
req.cookie().add(make_cookie(COOKIE_LAST_REFRESH_TIME, ts));
463472
}
464473
}
465474

466-
if cookie_enabled {
467-
let ts = unix_ts().as_secs().to_string();
468-
req.cookie().add(make_cookie(COOKIE_LAST_ACCESS_TIME, ts));
469-
}
470-
471475
let session = session_manager.register_session(session)?;
472476

473477
let deduplicate_label = req

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ impl QueryResponse {
170170
id: String,
171171
r: HttpQueryResponseInternal,
172172
is_final: bool,
173-
) -> impl IntoResponse {
173+
) -> (impl IntoResponse, bool) {
174174
let state = r.state.clone();
175175
let (data, next_uri) = if is_final {
176176
(Arc::new(BlocksSerializer::empty()), None)
@@ -220,7 +220,12 @@ impl QueryResponse {
220220
};
221221
let rows = data.num_rows();
222222

223-
Json(QueryResponse {
223+
let next_is_final = next_uri
224+
.as_ref()
225+
.map(|u| u.ends_with("final"))
226+
.unwrap_or(false);
227+
228+
let resp = Json(QueryResponse {
224229
data,
225230
state: state.state,
226231
schema: state.schema.clone(),
@@ -241,7 +246,8 @@ impl QueryResponse {
241246
})
242247
.with_header(HEADER_QUERY_ID, id.clone())
243248
.with_header(HEADER_QUERY_STATE, state.state.to_string())
244-
.with_header(HEADER_QUERY_PAGE_ROWS, rows)
249+
.with_header(HEADER_QUERY_PAGE_ROWS, rows);
250+
(resp, next_is_final)
245251
}
246252
}
247253

@@ -286,7 +292,7 @@ async fn query_final_handler(
286292
// it is safe to set these 2 fields to None, because client now check for null/None first.
287293
response.session = None;
288294
response.state.affect = None;
289-
Ok(QueryResponse::from_internal(query_id, response, true))
295+
Ok(QueryResponse::from_internal(query_id, response, true).0)
290296
}
291297
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
292298
}
@@ -347,7 +353,7 @@ async fn query_state_handler(
347353
let response = query
348354
.get_response_state_only()
349355
.map_err(HttpErrorCode::server_error)?;
350-
Ok(QueryResponse::from_internal(query_id, response, false))
356+
Ok(QueryResponse::from_internal(query_id, response, false).0)
351357
}
352358
}
353359
None => Err(query_id_not_found(&query_id, &ctx.node_id)),
@@ -398,7 +404,11 @@ async fn query_page_handler(
398404
)
399405
})?;
400406
query.update_expire_time(false).await;
401-
Ok(QueryResponse::from_internal(query_id, resp, false))
407+
let (resp, next_is_final) = QueryResponse::from_internal(query_id, resp, false);
408+
if next_is_final {
409+
query.wait_for_final()
410+
}
411+
Ok(resp)
402412
}
403413
}
404414
};
@@ -491,7 +501,12 @@ pub(crate) async fn query_handler(
491501
&query.id, &resp.state, rows, next_page, mask_connection_info(&sql)
492502
);
493503
query.update_expire_time(false).await;
494-
Ok(QueryResponse::from_internal(query.id.to_string(), resp, false).into_response())
504+
let (resp, next_is_final) =
505+
QueryResponse::from_internal(query.id.to_string(), resp, false);
506+
if next_is_final {
507+
query.wait_for_final()
508+
}
509+
Ok(resp.into_response())
495510
}
496511
}
497512
};

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use databend_common_expression::DataSchemaRef;
2727
use databend_common_expression::Scalar;
2828
use databend_common_io::prelude::FormatSettings;
2929
use databend_common_settings::Settings;
30-
use databend_storages_common_session::TempTblMgrRef;
3130
use databend_storages_common_session::TxnManagerRef;
3231
use futures::StreamExt;
3332
use log::debug;
@@ -156,7 +155,6 @@ pub struct ExecutorSessionState {
156155
pub secondary_roles: Option<Vec<String>>,
157156
pub settings: Arc<Settings>,
158157
pub txn_manager: TxnManagerRef,
159-
pub temp_tbl_mgr: TempTblMgrRef,
160158
pub variables: HashMap<String, Scalar>,
161159
pub last_query_ids: Vec<String>,
162160
pub last_query_result_cache_key: String,
@@ -182,7 +180,6 @@ impl ExecutorSessionState {
182180
secondary_roles: session.get_secondary_roles(),
183181
settings: session.get_settings(),
184182
txn_manager: session.txn_mgr(),
185-
temp_tbl_mgr: session.temp_tbl_mgr(),
186183
variables: session.get_all_variables(),
187184
last_query_ids,
188185
last_query_result_cache_key,

0 commit comments

Comments
 (0)