Skip to content

Commit ad94124

Browse files
committed
fix invalid bulk index format
1 parent 3f50f3c commit ad94124

File tree

6 files changed

+152
-4
lines changed

6 files changed

+152
-4
lines changed

src/elastic/src/client/requests/bulk/operation.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -189,15 +189,15 @@ where
189189
}
190190
}
191191

192-
pub fn index(self, doc: TDocument) -> BulkOperation<Doc<TDocument>> {
192+
pub fn index(self, doc: TDocument) -> BulkOperation<TDocument> {
193193
BulkOperation {
194194
action: Action::Index,
195195
header: BulkHeader {
196196
index: Some(Index::from(doc.index().into_owned())),
197197
ty: Some(Type::from(doc.ty().into_owned())),
198198
id: doc.partial_id().map(|id| Id::from(id.into_owned())),
199199
},
200-
inner: Some(Doc::value(doc)),
200+
inner: Some(doc),
201201
}
202202
}
203203

@@ -256,15 +256,15 @@ where
256256
.script_fluent(builder)
257257
}
258258

259-
pub fn create(self, doc: TDocument) -> BulkOperation<Doc<TDocument>> {
259+
pub fn create(self, doc: TDocument) -> BulkOperation<TDocument> {
260260
BulkOperation {
261261
action: Action::Create,
262262
header: BulkHeader {
263263
index: Some(Index::from(doc.index().into_owned())),
264264
ty: Some(Type::from(doc.ty().into_owned())),
265265
id: doc.partial_id().map(|id| Id::from(id.into_owned())),
266266
},
267-
inner: Some(Doc::value(doc)),
267+
inner: Some(doc),
268268
}
269269
}
270270

tests/run/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ tokio-threadpool = "~0.1"
1717
tokio-timer = "~0.2"
1818
term-painter = "~0.2"
1919
clap = "~2"
20+
log = "~0.4"
2021
env_logger = "~0.6"

tests/run/src/bulk/index_get.rs

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use elastic::error::Error;
2+
use elastic::prelude::*;
3+
use futures::Future;
4+
use run_tests::IntegrationTest;
5+
6+
#[derive(Debug, Clone, Copy)]
7+
pub struct IndexGet;
8+
9+
#[derive(Debug, PartialEq, Serialize, Deserialize, ElasticType)]
10+
#[elastic(ty = "document", index = "bulk_index")]
11+
pub struct Doc {
12+
#[elastic(id)]
13+
id: String,
14+
title: String,
15+
timestamp: Date<DefaultDateMapping>,
16+
}
17+
18+
impl IntegrationTest for IndexGet {
19+
type Response = GetResponse<Doc>;
20+
21+
fn kind() -> &'static str {
22+
"bulk"
23+
}
24+
fn name() -> &'static str {
25+
"index"
26+
}
27+
28+
// Ensure the index doesn't exist
29+
fn prepare(&self, client: AsyncClient) -> Box<Future<Item = (), Error = Error>> {
30+
let delete_res = client.index(Doc::static_index()).delete().send().map(|_| ());
31+
32+
Box::new(delete_res)
33+
}
34+
35+
// Index some bulk documents
36+
fn request(&self, client: AsyncClient) -> Box<Future<Item = Self::Response, Error = Error>> {
37+
let ops = (0..10).into_iter().map(|i| bulk().index(Doc {
38+
id: i.to_string(),
39+
title: "A document title".to_owned(),
40+
timestamp: Date::build(2017, 03, 24, 13, 44, 0, 0),
41+
}));
42+
43+
let bulk_res = client
44+
.bulk()
45+
.extend(ops)
46+
.params_fluent(|p| p.url_param("refresh", true))
47+
.send();
48+
49+
let get_res = client.document().get("4").send();
50+
51+
Box::new(bulk_res.and_then(|_| get_res))
52+
}
53+
54+
// Ensure the response contains the expected document
55+
fn assert_ok(&self, res: &Self::Response) -> bool {
56+
res.document().map(|doc| &*doc.id) == Some("4")
57+
}
58+
}

tests/run/src/bulk/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use run_tests::{
44
};
55

66
mod delete;
7+
mod index_get;
78
mod index_create;
89
mod stream;
910
mod stream_tiny_size_limit;
@@ -13,6 +14,7 @@ mod stream_tiny_timeout;
1314
pub fn tests() -> Vec<Test> {
1415
vec![
1516
Box::new(|client| test(client, delete::Delete)),
17+
Box::new(|client| test(client, index_get::IndexGet)),
1618
Box::new(|client| test(client, index_create::IndexCreate)),
1719
Box::new(|client| test(client, stream::BulkStream)),
1820
Box::new(|client| test(client, stream_tiny_size_limit::BulkStreamTinySize)),

tests/run/src/document/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod compile_test;
77

88
mod delete;
99
mod simple_index_get;
10+
mod simple_mapping;
1011
mod update_no_index;
1112
mod update_with_doc;
1213
mod update_with_inline_script;
@@ -15,6 +16,7 @@ mod update_with_script;
1516
pub fn tests() -> Vec<Test> {
1617
vec![
1718
Box::new(|client| test(client, simple_index_get::SimpleIndexGet)),
19+
Box::new(|client| test(client, simple_mapping::SimpleMapping)),
1820
Box::new(|client| test(client, update_with_doc::UpdateWithDoc)),
1921
Box::new(|client| test(client, update_with_script::UpdateWithScript)),
2022
Box::new(|client| test(client, update_with_inline_script::UpdateWithInlineScript)),
+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
use elastic::error::Error;
2+
use elastic::prelude::*;
3+
use futures::Future;
4+
use run_tests::IntegrationTest;
5+
6+
use serde_json::Value;
7+
8+
#[derive(Debug, Clone, Copy)]
9+
pub struct SimpleMapping;
10+
11+
#[derive(Debug, PartialEq, Serialize, Deserialize, ElasticType)]
12+
#[elastic(ty = "document", index = "simple_mapping")]
13+
pub struct Doc {
14+
#[elastic(id)]
15+
id: String,
16+
timestamp: Date<DefaultDateMapping>,
17+
}
18+
19+
impl IntegrationTest for SimpleMapping {
20+
type Response = Value;
21+
22+
fn kind() -> &'static str {
23+
"document"
24+
}
25+
fn name() -> &'static str {
26+
"simple mapping"
27+
}
28+
29+
// Ensure the index doesn't exist
30+
fn prepare(&self, client: AsyncClient) -> Box<Future<Item = (), Error = Error>> {
31+
let delete_res = client
32+
.index(Doc::static_index())
33+
.delete()
34+
.send()
35+
.map(|_| ());
36+
37+
Box::new(delete_res)
38+
}
39+
40+
// Put the document mapping, then get it back from Elasticsearch
41+
fn request(&self, client: AsyncClient) -> Box<Future<Item = Self::Response, Error = Error>> {
42+
let create_index = client.index(Doc::static_index()).create().send().map(|_| ());
43+
44+
let put_mapping = client
45+
.document::<Doc>()
46+
.put_mapping()
47+
.send();
48+
49+
let get_mapping = client
50+
.request(IndicesGetMappingRequest::for_index_ty(Doc::static_index(), Doc::static_ty()))
51+
.send()
52+
.and_then(|res| res.into_response::<Value>());
53+
54+
Box::new(create_index.and_then(|_| put_mapping).and_then(|_| get_mapping))
55+
}
56+
57+
// Ensure the response contains the expected document
58+
fn assert_ok(&self, res: &Self::Response) -> bool {
59+
let expected = json!({
60+
"simple_mapping": {
61+
"mappings": {
62+
"document": {
63+
"properties": {
64+
"id": {
65+
"type": "text",
66+
"fields": {
67+
"keyword": {
68+
"type": "keyword",
69+
"ignore_above": 256
70+
}
71+
}
72+
},
73+
"timestamp": {
74+
"type": "date",
75+
"format": "basic_date_time"
76+
}
77+
}
78+
}
79+
}
80+
}
81+
});
82+
83+
res.to_string() == expected.to_string()
84+
}
85+
}

0 commit comments

Comments
 (0)