Skip to content

Commit e5b30c0

Browse files
authored
Merge branch 'main' into subexpression
2 parents f0ced89 + 43cc181 commit e5b30c0

File tree

85 files changed

+1545
-690
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+1545
-690
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
<br>
2424

25-
<img src="https://github.com/databendlabs/databend/assets/172204/9997d8bc-6462-4dbd-90e3-527cf50a709c" alt="databend" />
25+
<img src="https://github.com/user-attachments/assets/4c288d5c-9365-44f7-8cde-b2c7ebe15622" alt="databend" />
2626

2727
## Why Databend?
2828

src/common/base/src/base/dma.rs

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -305,12 +305,7 @@ pub type AsyncDmaFile = DmaFile<AsyncFile>;
305305

306306
impl AsyncDmaFile {
307307
async fn open_fd(path: impl AsRef<Path>, dio: bool) -> io::Result<AsyncFile> {
308-
let flags = if cfg!(target_os = "linux") && dio {
309-
OFlags::DIRECT.bits() as i32
310-
} else {
311-
0
312-
};
313-
308+
let flags = flags_direct_or_empty(dio).bits() as i32;
314309
AsyncFile::options()
315310
.read(true)
316311
.custom_flags(flags)
@@ -319,12 +314,7 @@ impl AsyncDmaFile {
319314
}
320315

321316
async fn create_fd(path: impl AsRef<Path>, dio: bool) -> io::Result<AsyncFile> {
322-
let flags = if cfg!(target_os = "linux") && dio {
323-
OFlags::EXCL | OFlags::DIRECT
324-
} else {
325-
OFlags::EXCL
326-
};
327-
317+
let flags = flags_direct_or_empty(dio) | OFlags::EXCL;
328318
AsyncFile::options()
329319
.write(true)
330320
.create(true)
@@ -421,25 +411,34 @@ impl AsyncDmaFile {
421411
}
422412
}
423413

414+
#[cfg(target_os = "linux")]
415+
fn flags_direct_or_empty(dio: bool) -> OFlags {
416+
if dio {
417+
OFlags::DIRECT
418+
} else {
419+
OFlags::empty()
420+
}
421+
}
422+
423+
#[cfg(not(target_os = "linux"))]
424+
fn flags_direct_or_empty(_dio: bool) -> OFlags {
425+
OFlags::empty()
426+
}
427+
424428
pub type SyncDmaFile = DmaFile<OwnedFd>;
425429

426430
impl SyncDmaFile {
427431
fn open_fd(path: impl rustix::path::Arg, dio: bool) -> io::Result<OwnedFd> {
428-
let flags = if cfg!(target_os = "linux") && dio {
429-
OFlags::RDONLY | OFlags::DIRECT
430-
} else {
431-
OFlags::RDONLY
432-
};
432+
let flags = OFlags::RDONLY | flags_direct_or_empty(dio);
433433
rustix::fs::open(path, flags, rustix::fs::Mode::empty()).map_err(io::Error::from)
434434
}
435435

436436
fn create_fd(path: impl rustix::path::Arg, dio: bool) -> io::Result<OwnedFd> {
437-
let flags = if cfg!(target_os = "linux") && dio {
438-
OFlags::EXCL | OFlags::CREATE | OFlags::TRUNC | OFlags::WRONLY | OFlags::DIRECT
439-
} else {
440-
OFlags::EXCL | OFlags::CREATE | OFlags::TRUNC | OFlags::WRONLY
441-
};
442-
437+
let flags = OFlags::EXCL
438+
| OFlags::CREATE
439+
| OFlags::TRUNC
440+
| OFlags::WRONLY
441+
| flags_direct_or_empty(dio);
443442
rustix::fs::open(path, flags, rustix::fs::Mode::from_raw_mode(0o666))
444443
.map_err(io::Error::from)
445444
}

src/common/exception/src/exception_code.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,8 @@ build_exceptions! {
289289
AlterTableError(1132),
290290
/// Constraint error
291291
ConstraintError(1133),
292+
/// Unknown row policy
293+
UnknownMaskPolicy(1134),
292294
}
293295

294296
// Sequence Errors [1124-1126, 3101-3102]

src/meta/api/src/auto_increment_nextval_impl.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use databend_common_meta_app::tenant::ToTenant;
1919
use databend_common_meta_kvapi::kvapi;
2020
use databend_common_meta_kvapi::kvapi::Key;
2121
use databend_common_meta_types::anyerror::func_name;
22-
use databend_common_meta_types::protobuf::FetchAddU64Response;
22+
use databend_common_meta_types::protobuf::FetchIncreaseU64Response;
2323
use databend_common_meta_types::MetaError;
2424
use databend_common_meta_types::TxnOp;
2525
use databend_common_meta_types::TxnRequest;
@@ -46,7 +46,7 @@ where KV: kvapi::KVApi<Error = MetaError> + ?Sized
4646
self,
4747
tenant: impl ToTenant,
4848
count: u64,
49-
) -> Result<Result<FetchAddU64Response, AutoIncrementError>, MetaTxnError> {
49+
) -> Result<Result<FetchIncreaseU64Response, AutoIncrementError>, MetaTxnError> {
5050
debug!("{}", func_name!());
5151

5252
// Key for the sequence number value.
@@ -69,7 +69,7 @@ where KV: kvapi::KVApi<Error = MetaError> + ?Sized
6969
);
7070
debug_assert!(succ);
7171

72-
let resp = responses[0].try_as_fetch_add_u64().unwrap();
72+
let resp = responses[0].try_as_fetch_increase_u64().unwrap();
7373
let got_delta = resp.delta();
7474

7575
if got_delta < delta {

src/meta/api/src/data_mask_api.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use databend_common_meta_app::data_mask::CreateDatamaskReq;
1717
use databend_common_meta_app::data_mask::DataMaskId;
1818
use databend_common_meta_app::data_mask::DataMaskNameIdent;
1919
use databend_common_meta_app::data_mask::DatamaskMeta;
20+
use databend_common_meta_app::tenant::Tenant;
2021
use databend_common_meta_types::MetaError;
2122
use databend_common_meta_types::SeqV;
2223

@@ -40,4 +41,10 @@ pub trait DatamaskApi: Send + Sync {
4041
&self,
4142
name_ident: &DataMaskNameIdent,
4243
) -> Result<Option<SeqV<DatamaskMeta>>, MetaError>;
44+
45+
async fn get_data_mask_by_id(
46+
&self,
47+
tenant: &Tenant,
48+
policy_id: u64,
49+
) -> Result<Option<SeqV<DatamaskMeta>>, MetaError>;
4350
}

src/meta/api/src/data_mask_api_impl.rs

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent;
2323
use databend_common_meta_app::data_mask::MaskpolicyTableIdList;
2424
use databend_common_meta_app::id_generator::IdGenerator;
2525
use databend_common_meta_app::schema::CreateOption;
26-
use databend_common_meta_app::schema::TableId;
26+
use databend_common_meta_app::tenant::Tenant;
2727
use databend_common_meta_app::KeyWithTenant;
2828
use databend_common_meta_kvapi::kvapi;
2929
use databend_common_meta_types::MetaError;
@@ -40,7 +40,6 @@ use crate::txn_backoff::txn_backoff;
4040
use crate::txn_condition_util::txn_cond_eq_seq;
4141
use crate::txn_core_util::send_txn;
4242
use crate::txn_core_util::txn_delete_exact;
43-
use crate::txn_core_util::txn_replace_exact;
4443
use crate::txn_op_builder_util::txn_op_put_pb;
4544

4645
/// DatamaskApi is implemented upon kvapi::KVApi.
@@ -82,8 +81,6 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
8281

8382
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
8483

85-
clear_table_column_mask_policy(self, name_ident, &mut txn).await?;
86-
8784
curr_seq = seq_id.seq;
8885
}
8986
};
@@ -111,8 +108,9 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
111108
let id_list = MaskpolicyTableIdList::default();
112109
txn.condition.push(txn_cond_eq_seq(name_ident, curr_seq));
113110
txn.if_then.extend(vec![
114-
txn_op_put_pb(name_ident, &id, None)?, // name -> db_id
115-
txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta
111+
txn_op_put_pb(name_ident, &id, None)?, // name -> db_id
112+
txn_op_put_pb(&id_ident, &meta, None)?, // id -> meta
113+
// TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later
116114
txn_op_put_pb(&id_list_key, &id_list, None)?, // data mask name -> id_list
117115
]);
118116

@@ -143,7 +141,6 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
143141
let mut trials = txn_backoff(None, func_name!());
144142
loop {
145143
trials.next().unwrap()?.await;
146-
147144
let mut txn = TxnRequest::default();
148145

149146
let res = self.get_id_and_value(name_ident).await?;
@@ -157,9 +154,8 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
157154

158155
txn_delete_exact(&mut txn, name_ident, seq_id.seq);
159156
txn_delete_exact(&mut txn, &id_ident, seq_meta.seq);
160-
157+
// TODO: Tentative retention for compatibility MaskPolicyTableIdListIdent related logic. It can be directly deleted later
161158
clear_table_column_mask_policy(self, name_ident, &mut txn).await?;
162-
163159
let (succ, _responses) = send_txn(self, txn).await?;
164160
debug!(succ = succ;"{}", func_name!());
165161

@@ -179,6 +175,20 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {
179175

180176
Ok(res.map(|(_, seq_meta)| seq_meta))
181177
}
178+
179+
async fn get_data_mask_by_id(
180+
&self,
181+
tenant: &Tenant,
182+
policy_id: u64,
183+
) -> Result<Option<SeqV<DatamaskMeta>>, MetaError> {
184+
debug!(req :? =(policy_id); "DatamaskApi: {}", func_name!());
185+
186+
let id = DataMaskId::new(policy_id);
187+
let id_ident = DataMaskIdIdent::new_generic(tenant, id);
188+
189+
let res = self.get_pb(&id_ident).await?;
190+
Ok(res)
191+
}
182192
}
183193

184194
async fn clear_table_column_mask_policy(
@@ -195,30 +205,5 @@ async fn clear_table_column_mask_policy(
195205
};
196206

197207
txn_delete_exact(txn, &id_list_key, seq_id_list.seq);
198-
199-
// remove mask policy from table meta
200-
for table_id in seq_id_list.data.id_list.into_iter() {
201-
let tbid = TableId { table_id };
202-
203-
let seq_meta = kv_api.get_pb(&tbid).await?;
204-
205-
let Some(seq_meta) = seq_meta else {
206-
continue;
207-
};
208-
209-
let (seq, mut meta) = (seq_meta.seq, seq_meta.data);
210-
211-
if let Some(column_mask_policy) = meta.column_mask_policy {
212-
let new_column_mask_policy = column_mask_policy
213-
.into_iter()
214-
.filter(|(_, name)| name != name_ident.name())
215-
.collect();
216-
217-
meta.column_mask_policy = Some(new_column_mask_policy);
218-
219-
txn_replace_exact(txn, &tbid, seq, &meta)?;
220-
}
221-
}
222-
223208
Ok(())
224209
}

0 commit comments

Comments
 (0)