Skip to content

Commit bb7043a

Browse files
authored
Plug ingest v2 into ES bulk API (#4374)
1 parent f6a09db commit bb7043a

File tree

20 files changed

+780
-142
lines changed

20 files changed

+780
-142
lines changed

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-config/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ pub use source_config::{
5353
load_source_config_from_user_config, FileSourceParams, GcpPubSubSourceParams,
5454
KafkaSourceParams, KinesisSourceParams, PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint,
5555
SourceConfig, SourceInputFormat, SourceParams, TransformConfig, VecSourceParams,
56-
VoidSourceParams, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_SOURCE_ID,
56+
VoidSourceParams, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
5757
};
5858
use tracing::warn;
5959

@@ -64,8 +64,8 @@ pub use crate::metastore_config::{
6464
MetastoreBackend, MetastoreConfig, MetastoreConfigs, PostgresMetastoreConfig,
6565
};
6666
pub use crate::node_config::{
67-
IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, SplitCacheLimits,
68-
DEFAULT_QW_CONFIG_PATH,
67+
enable_ingest_v2, IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig,
68+
SplitCacheLimits, DEFAULT_QW_CONFIG_PATH,
6969
};
7070
use crate::source_config::serialize::{SourceConfigV0_7, VersionedSourceConfig};
7171
pub use crate::storage_config::{

quickwit/quickwit-config/src/node_config/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use std::time::Duration;
2929
use anyhow::{bail, ensure};
3030
use bytesize::ByteSize;
3131
use http::HeaderMap;
32+
use once_cell::sync::Lazy;
3233
use quickwit_common::net::HostAddr;
3334
use quickwit_common::uri::Uri;
3435
use quickwit_proto::indexing::CpuCapacity;
@@ -212,6 +213,12 @@ impl Default for IngestApiConfig {
212213
}
213214
}
214215

216+
/// Returns true if the ingest API v2 is enabled.
217+
pub fn enable_ingest_v2() -> bool {
218+
static ENABLE_INGEST_V2: Lazy<bool> = Lazy::new(|| env::var("QW_ENABLE_INGEST_V2").is_ok());
219+
*ENABLE_INGEST_V2
220+
}
221+
215222
impl IngestApiConfig {
216223
pub fn replication_factor(&self) -> anyhow::Result<NonZeroUsize> {
217224
if let Ok(replication_factor_str) = env::var("QW_INGEST_REPLICATION_FACTOR") {

quickwit/quickwit-config/src/source_config/mod.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub use serialize::load_source_config_from_user_config;
3434
// For backward compatibility.
3535
use serialize::VersionedSourceConfig;
3636

37-
use crate::TestableForRegression;
37+
use crate::{enable_ingest_v2, TestableForRegression};
3838

3939
/// Reserved source ID for the `quickwit index ingest` CLI command.
4040
pub const CLI_INGEST_SOURCE_ID: &str = "_ingest-cli-source";
@@ -44,10 +44,13 @@ pub const INGEST_API_SOURCE_ID: &str = "_ingest-api-source";
4444

4545
/// Reserved source ID used for native Quickwit ingest.
4646
/// (this is for ingest v2)
47-
pub const INGEST_SOURCE_ID: &str = "_ingest-source";
47+
pub const INGEST_V2_SOURCE_ID: &str = "_ingest-source";
4848

49-
pub const RESERVED_SOURCE_IDS: &[&str] =
50-
&[CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_SOURCE_ID];
49+
pub const RESERVED_SOURCE_IDS: &[&str] = &[
50+
CLI_INGEST_SOURCE_ID,
51+
INGEST_API_SOURCE_ID,
52+
INGEST_V2_SOURCE_ID,
53+
];
5154

5255
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
5356
#[serde(into = "VersionedSourceConfig")]
@@ -125,10 +128,10 @@ impl SourceConfig {
125128
/// Creates an ingest source v2.
126129
pub fn ingest_v2_default() -> Self {
127130
Self {
128-
source_id: INGEST_SOURCE_ID.to_string(),
131+
source_id: INGEST_V2_SOURCE_ID.to_string(),
129132
max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"),
130133
desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
131-
enabled: false,
134+
enabled: enable_ingest_v2(),
132135
source_params: SourceParams::Ingest,
133136
transform_config: None,
134137
input_format: SourceInputFormat::Json,

quickwit/quickwit-control-plane/src/control_plane.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ impl EventSubscriber<ShardPositionsUpdate> for ControlPlaneEventSubscriber {
569569
mod tests {
570570
use mockall::Sequence;
571571
use quickwit_actors::{AskError, Observe, SupervisorMetrics};
572-
use quickwit_config::{IndexConfig, SourceParams, INGEST_SOURCE_ID};
572+
use quickwit_config::{IndexConfig, SourceParams, INGEST_V2_SOURCE_ID};
573573
use quickwit_indexing::IndexingService;
574574
use quickwit_metastore::{
575575
CreateIndexRequestExt, IndexMetadata, ListIndexesMetadataResponseExt,
@@ -893,14 +893,14 @@ mod tests {
893893

894894
let subrequest = &request.subrequests[0];
895895
assert_eq!(subrequest.index_uid, "test-index:0");
896-
assert_eq!(subrequest.source_id, INGEST_SOURCE_ID);
896+
assert_eq!(subrequest.source_id, INGEST_V2_SOURCE_ID);
897897

898898
let subresponses = vec![ListShardsSubresponse {
899899
index_uid: "test-index:0".to_string(),
900-
source_id: INGEST_SOURCE_ID.to_string(),
900+
source_id: INGEST_V2_SOURCE_ID.to_string(),
901901
shards: vec![Shard {
902902
index_uid: "test-index:0".to_string(),
903-
source_id: INGEST_SOURCE_ID.to_string(),
903+
source_id: INGEST_V2_SOURCE_ID.to_string(),
904904
shard_id: 1,
905905
shard_state: ShardState::Open as i32,
906906
..Default::default()
@@ -925,7 +925,7 @@ mod tests {
925925
subrequests: vec![GetOrCreateOpenShardsSubrequest {
926926
subrequest_id: 0,
927927
index_id: "test-index".to_string(),
928-
source_id: INGEST_SOURCE_ID.to_string(),
928+
source_id: INGEST_V2_SOURCE_ID.to_string(),
929929
}],
930930
closed_shards: Vec::new(),
931931
unavailable_leaders: Vec::new(),
@@ -939,7 +939,7 @@ mod tests {
939939

940940
let subresponse = &get_open_shards_response.successes[0];
941941
assert_eq!(subresponse.index_uid, "test-index:0");
942-
assert_eq!(subresponse.source_id, INGEST_SOURCE_ID);
942+
assert_eq!(subresponse.source_id, INGEST_V2_SOURCE_ID);
943943
assert_eq!(subresponse.open_shards.len(), 1);
944944
assert_eq!(subresponse.open_shards[0].shard_id, 1);
945945

@@ -1111,15 +1111,15 @@ mod tests {
11111111
assert_eq!(delete_shards_request.subrequests.len(), 1);
11121112
let subrequest = &delete_shards_request.subrequests[0];
11131113
assert_eq!(subrequest.index_uid, index_uid_clone);
1114-
assert_eq!(subrequest.source_id, INGEST_SOURCE_ID);
1114+
assert_eq!(subrequest.source_id, INGEST_V2_SOURCE_ID);
11151115
assert_eq!(&subrequest.shard_ids[..], &[17]);
11161116
Ok(DeleteShardsResponse {})
11171117
},
11181118
);
11191119

11201120
let mut shard = Shard {
11211121
index_uid: index_0.index_uid.to_string(),
1122-
source_id: INGEST_SOURCE_ID.to_string(),
1122+
source_id: INGEST_V2_SOURCE_ID.to_string(),
11231123
shard_id: 17,
11241124
leader_id: "test_node".to_string(),
11251125
..Default::default()
@@ -1132,7 +1132,7 @@ mod tests {
11321132
let list_shards_resp = ListShardsResponse {
11331133
subresponses: vec![ListShardsSubresponse {
11341134
index_uid: index_uid_clone.to_string(),
1135-
source_id: INGEST_SOURCE_ID.to_string(),
1135+
source_id: INGEST_V2_SOURCE_ID.to_string(),
11361136
shards: vec![shard],
11371137
next_shard_id: 18,
11381138
}],
@@ -1152,7 +1152,7 @@ mod tests {
11521152
);
11531153
let source_uid = SourceUid {
11541154
index_uid: index_0.index_uid.clone(),
1155-
source_id: INGEST_SOURCE_ID.to_string(),
1155+
source_id: INGEST_V2_SOURCE_ID.to_string(),
11561156
};
11571157

11581158
// This update should not trigger anything in the control plane.
@@ -1246,7 +1246,7 @@ mod tests {
12461246
assert_eq!(delete_shards_request.subrequests.len(), 1);
12471247
let subrequest = &delete_shards_request.subrequests[0];
12481248
assert_eq!(subrequest.index_uid, index_uid_clone);
1249-
assert_eq!(subrequest.source_id, INGEST_SOURCE_ID);
1249+
assert_eq!(subrequest.source_id, INGEST_V2_SOURCE_ID);
12501250
assert_eq!(&subrequest.shard_ids[..], &[17]);
12511251
Ok(DeleteShardsResponse {})
12521252
},
@@ -1258,7 +1258,7 @@ mod tests {
12581258
let list_shards_resp = ListShardsResponse {
12591259
subresponses: vec![ListShardsSubresponse {
12601260
index_uid: index_uid_clone.to_string(),
1261-
source_id: INGEST_SOURCE_ID.to_string(),
1261+
source_id: INGEST_V2_SOURCE_ID.to_string(),
12621262
shards: vec![],
12631263
next_shard_id: 18,
12641264
}],
@@ -1278,7 +1278,7 @@ mod tests {
12781278
);
12791279
let source_uid = SourceUid {
12801280
index_uid: index_0.index_uid.clone(),
1281-
source_id: INGEST_SOURCE_ID.to_string(),
1281+
source_id: INGEST_V2_SOURCE_ID.to_string(),
12821282
};
12831283

12841284
// This update should not trigger anything in the control plane.
@@ -1332,7 +1332,7 @@ mod tests {
13321332
let list_shards_resp = ListShardsResponse {
13331333
subresponses: vec![ListShardsSubresponse {
13341334
index_uid: index_uid_clone.to_string(),
1335-
source_id: INGEST_SOURCE_ID.to_string(),
1335+
source_id: INGEST_V2_SOURCE_ID.to_string(),
13361336
shards: vec![Shard {
13371337
index_uid: index_uid_clone.to_string(),
13381338
source_id: source.source_id.to_string(),
@@ -1428,7 +1428,7 @@ mod tests {
14281428
mock_metastore.expect_delete_source().return_once(
14291429
move |delete_source_request: DeleteSourceRequest| {
14301430
assert_eq!(delete_source_request.index_uid, index_uid_clone.to_string());
1431-
assert_eq!(&delete_source_request.source_id, INGEST_SOURCE_ID);
1431+
assert_eq!(&delete_source_request.source_id, INGEST_V2_SOURCE_ID);
14321432
Ok(EmptyResponse {})
14331433
},
14341434
);
@@ -1454,7 +1454,7 @@ mod tests {
14541454
let list_shards_resp = ListShardsResponse {
14551455
subresponses: vec![ListShardsSubresponse {
14561456
index_uid: index_uid_clone.to_string(),
1457-
source_id: INGEST_SOURCE_ID.to_string(),
1457+
source_id: INGEST_V2_SOURCE_ID.to_string(),
14581458
shards: vec![Shard {
14591459
index_uid: index_uid_clone.to_string(),
14601460
source_id: source.source_id.to_string(),
@@ -1485,7 +1485,7 @@ mod tests {
14851485
control_plane_mailbox
14861486
.ask(DeleteSourceRequest {
14871487
index_uid: index_0.index_uid.to_string(),
1488-
source_id: INGEST_SOURCE_ID.to_string(),
1488+
source_id: INGEST_V2_SOURCE_ID.to_string(),
14891489
})
14901490
.await
14911491
.unwrap()

quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ mod tests {
672672

673673
use std::collections::BTreeSet;
674674

675-
use quickwit_config::{SourceConfig, SourceParams, INGEST_SOURCE_ID};
675+
use quickwit_config::{SourceConfig, SourceParams, INGEST_V2_SOURCE_ID};
676676
use quickwit_ingest::{RateMibPerSec, ShardInfo};
677677
use quickwit_metastore::IndexMetadata;
678678
use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest;
@@ -1344,7 +1344,7 @@ mod tests {
13441344
.returning(|request| {
13451345
assert_eq!(request.subrequests.len(), 1);
13461346
assert_eq!(request.subrequests[0].index_uid, "test-index:0");
1347-
assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID);
1347+
assert_eq!(request.subrequests[0].source_id, INGEST_V2_SOURCE_ID);
13481348
assert_eq!(request.subrequests[0].leader_id, "test-ingester");
13491349
assert_eq!(request.subrequests[0].next_shard_id, 1);
13501350

@@ -1355,17 +1355,17 @@ mod tests {
13551355
mock_metastore.expect_open_shards().returning(|request| {
13561356
assert_eq!(request.subrequests.len(), 1);
13571357
assert_eq!(request.subrequests[0].index_uid, "test-index:0");
1358-
assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID);
1358+
assert_eq!(request.subrequests[0].source_id, INGEST_V2_SOURCE_ID);
13591359
assert_eq!(request.subrequests[0].leader_id, "test-ingester");
13601360
assert_eq!(request.subrequests[0].next_shard_id, 1);
13611361

13621362
let subresponses = vec![metastore::OpenShardsSubresponse {
13631363
subrequest_id: 0,
13641364
index_uid: "test-index:0".into(),
1365-
source_id: INGEST_SOURCE_ID.to_string(),
1365+
source_id: INGEST_V2_SOURCE_ID.to_string(),
13661366
opened_shards: vec![Shard {
13671367
index_uid: "test-index:0".into(),
1368-
source_id: INGEST_SOURCE_ID.to_string(),
1368+
source_id: INGEST_V2_SOURCE_ID.to_string(),
13691369
shard_id: 1,
13701370
leader_id: "test-ingester".to_string(),
13711371
shard_state: ShardState::Open as i32,
@@ -1387,7 +1387,7 @@ mod tests {
13871387
);
13881388

13891389
let index_uid: IndexUid = "test-index:0".into();
1390-
let source_id: SourceId = INGEST_SOURCE_ID.to_string();
1390+
let source_id: SourceId = INGEST_V2_SOURCE_ID.to_string();
13911391

13921392
let source_uid = SourceUid {
13931393
index_uid: index_uid.clone(),
@@ -1425,7 +1425,7 @@ mod tests {
14251425
.returning(|request| {
14261426
assert_eq!(request.shards.len(), 1);
14271427
assert_eq!(request.shards[0].index_uid, "test-index:0");
1428-
assert_eq!(request.shards[0].source_id, INGEST_SOURCE_ID);
1428+
assert_eq!(request.shards[0].source_id, INGEST_V2_SOURCE_ID);
14291429
assert_eq!(request.shards[0].shard_id, 1);
14301430
assert_eq!(request.shards[0].leader_id, "test-ingester");
14311431

@@ -1434,7 +1434,7 @@ mod tests {
14341434
ingester_mock.expect_init_shards().returning(|request| {
14351435
assert_eq!(request.shards.len(), 1);
14361436
assert_eq!(request.shards[0].index_uid, "test-index:0");
1437-
assert_eq!(request.shards[0].source_id, INGEST_SOURCE_ID);
1437+
assert_eq!(request.shards[0].source_id, INGEST_V2_SOURCE_ID);
14381438
assert_eq!(request.shards[0].shard_id, 1);
14391439
assert_eq!(request.shards[0].leader_id, "test-ingester");
14401440

quickwit/quickwit-control-plane/src/model/mod.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ impl ControlPlaneModel {
345345

346346
#[cfg(test)]
347347
mod tests {
348-
use quickwit_config::{SourceConfig, SourceParams, INGEST_SOURCE_ID};
348+
use quickwit_config::{SourceConfig, SourceParams, INGEST_V2_SOURCE_ID};
349349
use quickwit_metastore::IndexMetadata;
350350
use quickwit_proto::ingest::{Shard, ShardState};
351351
use quickwit_proto::metastore::ListIndexesMetadataResponse;
@@ -381,14 +381,14 @@ mod tests {
381381
assert_eq!(request.subrequests.len(), 2);
382382

383383
assert_eq!(request.subrequests[0].index_uid, "test-index-0:0");
384-
assert_eq!(request.subrequests[0].source_id, INGEST_SOURCE_ID);
384+
assert_eq!(request.subrequests[0].source_id, INGEST_V2_SOURCE_ID);
385385
assert_eq!(
386386
request.subrequests[0].shard_state(),
387387
ShardState::Unspecified
388388
);
389389

390390
assert_eq!(request.subrequests[1].index_uid, "test-index-1:0");
391-
assert_eq!(request.subrequests[1].source_id, INGEST_SOURCE_ID);
391+
assert_eq!(request.subrequests[1].source_id, INGEST_V2_SOURCE_ID);
392392
assert_eq!(
393393
request.subrequests[1].shard_state(),
394394
ShardState::Unspecified
@@ -397,11 +397,11 @@ mod tests {
397397
let subresponses = vec![
398398
metastore::ListShardsSubresponse {
399399
index_uid: "test-index-0:0".to_string(),
400-
source_id: INGEST_SOURCE_ID.to_string(),
400+
source_id: INGEST_V2_SOURCE_ID.to_string(),
401401
shards: vec![Shard {
402402
shard_id: 42,
403403
index_uid: "test-index-0:0".to_string(),
404-
source_id: INGEST_SOURCE_ID.to_string(),
404+
source_id: INGEST_V2_SOURCE_ID.to_string(),
405405
shard_state: ShardState::Open as i32,
406406
leader_id: "node1".to_string(),
407407
..Default::default()
@@ -410,7 +410,7 @@ mod tests {
410410
},
411411
metastore::ListShardsSubresponse {
412412
index_uid: "test-index-1:0".to_string(),
413-
source_id: INGEST_SOURCE_ID.to_string(),
413+
source_id: INGEST_V2_SOURCE_ID.to_string(),
414414
shards: Vec::new(),
415415
next_shard_id: 1,
416416
},
@@ -443,7 +443,7 @@ mod tests {
443443

444444
let source_uid_0 = SourceUid {
445445
index_uid: "test-index-0:0".into(),
446-
source_id: INGEST_SOURCE_ID.to_string(),
446+
source_id: INGEST_V2_SOURCE_ID.to_string(),
447447
};
448448
let shards: Vec<&ShardEntry> = model
449449
.shard_table
@@ -458,7 +458,7 @@ mod tests {
458458

459459
let source_uid_1 = SourceUid {
460460
index_uid: "test-index-1:0".into(),
461-
source_id: INGEST_SOURCE_ID.to_string(),
461+
source_id: INGEST_V2_SOURCE_ID.to_string(),
462462
};
463463
let shards: Vec<&ShardEntry> = model
464464
.shard_table

quickwit/quickwit-ingest/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ bytes = { workspace = true }
1212
bytesize = { workspace = true }
1313
dyn-clone = { workspace = true }
1414
flume = { workspace = true }
15+
fnv = { workspace = true }
1516
futures = { workspace = true }
1617
http = { workspace = true }
1718
hyper = { workspace = true }

0 commit comments

Comments
 (0)