Skip to content
This repository was archived by the owner on Jun 11, 2024. It is now read-only.

Commit bc72e51

Browse files
authored
Fetch addresses from elasticsearch in batch (#60)
* minor lints and fixes * addresses: rewrite find_address lazily * pois: lazy building of POIs * fetch ES addresses in batch * lazy_es: rename PartialResult into LazyEs * addresses: less aggressive error handling * lazy_es: log amount of requests * lazy_es: add a bit of documentation * lazy_es: limit size of multi-search * apply small review comments * lazy_es: use self-instantiated http client instead of custom rubber * addresses: set MAX_REVERSE_DISTANCE to 500m * lazy_es: use the same http client until the progress is done * addresses: panic when ES raises an error
1 parent 7c95193 commit bc72e51

File tree

6 files changed

+464
-193
lines changed

6 files changed

+464
-193
lines changed

Cargo.toml

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ par-map = "0.1.4"
1919
num_cpus = "1.13"
2020
once_cell = "1.4"
2121
reqwest = "0.10"
22-
serde = {version = "1", features = ["rc"]}
23-
serde_json = "1"
22+
serde = { version = "1", features = ["rc"] }
23+
serde_json = { version = "1", features = ["raw_value"] }
24+
2425

2526
[dev-dependencies]
2627
retry = "*"

src/addresses.rs

+133-97
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
use itertools::Itertools;
2-
use mimir::rubber::Rubber;
32
use mimir::Poi;
43
use mimirsbrunn::admin_geofinder::AdminGeoFinder;
54
use mimirsbrunn::labels::format_addr_name_and_label;
65
use mimirsbrunn::labels::format_street_label;
76
use mimirsbrunn::utils::find_country_codes;
8-
use reqwest::StatusCode;
97
use serde::Deserialize;
8+
use serde_json::json;
109
use std::ops::Deref;
1110
use std::sync::Arc;
1211

12+
use crate::lazy_es::{parse_es_response, LazyEs};
13+
1314
// Prefixes used in ids for Address objects derived from OSM tags
1415
const FAFNIR_ADDR_NAMESPACE: &str = "addr_poi:";
1516
const FAFNIR_STREET_NAMESPACE: &str = "street_poi:";
17+
const MAX_REVERSE_DISTANCE: &str = "500m";
1618

1719
/// Check if a mimir address originates from OSM data.
1820
pub fn is_addr_derived_from_tags(addr: &mimir::Address) -> bool {
@@ -37,36 +39,33 @@ pub enum CurPoiAddress {
3739
/// Get current value of address associated with a POI in the ES database if
3840
/// any, together with current coordinates of the POI that have been used to
3941
/// perform a reverse
40-
pub fn get_current_addr(rubber: &mut Rubber, poi_index: &str, osm_id: &str) -> CurPoiAddress {
41-
let query = format!(
42-
"{}/poi/{}/_source?_source_include=address,coord",
43-
poi_index, osm_id
44-
);
45-
42+
pub fn get_current_addr<'a>(poi_index: &str, osm_id: &'a str) -> LazyEs<'a, CurPoiAddress> {
4643
#[derive(Deserialize)]
47-
struct FetchPOI {
44+
struct FetchPoi {
4845
coord: mimir::Coord,
4946
address: Option<mimir::Address>,
5047
}
5148

52-
rubber
53-
.get(&query)
54-
.map_err(|err| warn!("query to elasticsearch failed: {:?}", err))
55-
.ok()
56-
.and_then(|res| {
57-
if res.status() != StatusCode::NOT_FOUND {
58-
res.json()
59-
.map_err(|err| {
60-
warn!(
61-
"failed to parse ES response while reading old address for {}: {:?}",
62-
osm_id, err
63-
)
64-
})
65-
.ok()
66-
.map(|poi_json: FetchPOI| {
67-
let coord = poi_json.coord;
49+
LazyEs::NeedEsQuery {
50+
header: json!({ "index": poi_index }),
51+
query: json!({
52+
"_source": ["address", "coord"],
53+
"query": {"terms": {"_id": [osm_id]}}
54+
}),
55+
progress: Box::new(move |es_response| {
56+
LazyEs::Value({
57+
let hits = parse_es_response(es_response)
58+
.expect("got error from ES while reading old address");
59+
60+
assert!(hits.len() <= 1);
6861

69-
if let Some(address) = poi_json.address {
62+
hits.into_iter()
63+
.next()
64+
.map(|hit| {
65+
let poi: FetchPoi = hit.source;
66+
let coord = poi.coord;
67+
68+
if let Some(address) = poi.address {
7069
CurPoiAddress::Some {
7170
coord,
7271
address: Box::new(address),
@@ -75,11 +74,49 @@ pub fn get_current_addr(rubber: &mut Rubber, poi_index: &str, osm_id: &str) -> C
7574
CurPoiAddress::None { coord }
7675
}
7776
})
78-
} else {
79-
None
77+
.unwrap_or(CurPoiAddress::NotFound)
78+
})
79+
}),
80+
}
81+
}
82+
83+
/// Get addresses close to input coordinates.
84+
pub fn get_addr_from_coords<'a>(coord: &mimir::Coord) -> LazyEs<'a, Vec<mimir::Place>> {
85+
let indexes = mimir::rubber::get_indexes(false, &[], &[], &["house", "street"]);
86+
87+
LazyEs::NeedEsQuery {
88+
header: json!({
89+
"index": indexes,
90+
"ignore_unavailable": true
91+
}),
92+
query: json!({
93+
"query": {
94+
"bool": {
95+
"should": mimir::rubber::build_proximity_with_boost(coord, 1.),
96+
"must": {
97+
"geo_distance": {
98+
"distance": MAX_REVERSE_DISTANCE,
99+
"coord": {
100+
"lat": coord.lat(),
101+
"lon": coord.lon()
102+
}
103+
}
104+
}
105+
}
80106
}
81-
})
82-
.unwrap_or(CurPoiAddress::NotFound)
107+
}),
108+
progress: Box::new(|es_response| {
109+
LazyEs::Value(
110+
parse_es_response(es_response)
111+
.expect("got error from ES while performing reverse")
112+
.into_iter()
113+
.filter_map(|hit| {
114+
mimir::rubber::make_place(hit.doc_type, Some(Box::new(hit.source)), None)
115+
})
116+
.collect(),
117+
)
118+
}),
119+
}
83120
}
84121

85122
fn build_new_addr(
@@ -159,102 +196,101 @@ fn build_new_addr(
159196
/// We also search for the admins that contains the coordinates of the poi
160197
/// and add them as the address's admins.
161198
///
162-
/// If try_skip_reverse is set to try, it will reuse the address already
199+
/// If try_skip_reverse is set to true, it will reuse the address already
163200
/// attached to a POI in the ES database.
164-
pub fn find_address(
165-
poi: &Poi,
166-
geofinder: &AdminGeoFinder,
167-
rubber: &mut Rubber,
201+
pub fn find_address<'p>(
202+
poi: &'p Poi,
203+
geofinder: &'p AdminGeoFinder,
168204
poi_index: &str,
169205
try_skip_reverse: bool,
170-
) -> Option<mimir::Address> {
206+
) -> LazyEs<'p, Option<mimir::Address>> {
171207
if poi
172208
.properties
173209
.iter()
174210
.any(|p| p.key == "poi_class" && p.value == "locality")
175211
{
176212
// We don't want to add address on hamlets.
177-
return None;
213+
return LazyEs::Value(None);
178214
}
215+
179216
let osm_addr_tag = ["addr:housenumber", "contact:housenumber"]
180217
.iter()
181-
.filter_map(|k| {
218+
.find_map(|k| {
182219
poi.properties
183220
.iter()
184221
.find(|p| &p.key == k)
185222
.map(|p| &p.value)
186-
})
187-
.next();
223+
});
188224

189-
let osm_street_tag = ["addr:street", "contact:street"]
190-
.iter()
191-
.filter_map(|k| {
192-
poi.properties
193-
.iter()
194-
.find(|p| &p.key == k)
195-
.map(|p| &p.value)
196-
})
197-
.next();
225+
let osm_street_tag = ["addr:street", "contact:street"].iter().find_map(|k| {
226+
poi.properties
227+
.iter()
228+
.find(|p| &p.key == k)
229+
.map(|p| &p.value)
230+
});
198231

199232
match (osm_addr_tag, osm_street_tag) {
200-
(Some(house_number_tag), Some(street_tag)) => Some(build_new_addr(
233+
(Some(house_number_tag), Some(street_tag)) => LazyEs::Value(Some(build_new_addr(
201234
house_number_tag,
202235
street_tag,
203236
poi,
204237
geofinder.get(&poi.coord),
205-
)),
206-
(None, Some(street_tag)) => {
207-
if let Ok(addrs) = rubber.get_address(&poi.coord) {
208-
for addr in addrs.into_iter() {
209-
if let Some(address) = addr.address() {
210-
match address {
211-
mimir::Address::Street(_) => continue,
212-
mimir::Address::Addr(ref a) => {
213-
if a.street.name != *street_tag {
214-
continue;
215-
}
216-
}
217-
}
218-
return Some(address);
238+
))),
239+
(None, Some(street_tag)) => get_addr_from_coords(&poi.coord).map(move |addrs| {
240+
addrs
241+
.into_iter()
242+
.find_map(|p| {
243+
let as_address = p.address();
244+
245+
match &as_address {
246+
Some(mimir::Address::Addr(a)) if a.street.name == *street_tag => as_address,
247+
_ => None,
219248
}
220-
}
221-
}
222-
Some(build_new_addr(
223-
"",
224-
street_tag,
225-
poi,
226-
geofinder.get(&poi.coord),
227-
))
228-
}
249+
})
250+
.or_else(|| {
251+
Some(build_new_addr(
252+
"",
253+
street_tag,
254+
poi,
255+
geofinder.get(&poi.coord),
256+
))
257+
})
258+
}),
229259
_ => {
260+
let lazy_es_address = get_addr_from_coords(&poi.coord).map(|places| {
261+
Some(
262+
places
263+
.into_iter()
264+
.next()?
265+
.address()
266+
.expect("`get_address_from_coords` returned a non-address object"),
267+
)
268+
});
269+
230270
if try_skip_reverse {
231-
// Fetch the address already attached to the POI to avoid computing an unnecessary
232-
// reverse.
233-
let changed_coords = |old_coord: mimir::Coord| {
234-
(old_coord.lon() - poi.coord.lon()).abs() > 1e-6
235-
|| (old_coord.lat() - poi.coord.lat()).abs() > 1e-6
236-
};
271+
// Fetch the address already attached to the POI to avoid computing an
272+
// unnecessary reverse.
273+
get_current_addr(poi_index, &poi.id).then(move |current_address| {
274+
let changed_coords = |old_coord: mimir::Coord| {
275+
(old_coord.lon() - poi.coord.lon()).abs() > 1e-6
276+
|| (old_coord.lat() - poi.coord.lat()).abs() > 1e-6
277+
};
237278

238-
match get_current_addr(rubber, poi_index, &poi.id) {
239-
CurPoiAddress::None { coord } if !changed_coords(coord) => return None,
240-
CurPoiAddress::Some { coord, address }
241-
if !is_addr_derived_from_tags(&address) && !changed_coords(coord) =>
242-
{
243-
return Some(*address);
279+
match current_address {
280+
CurPoiAddress::None { coord } if !changed_coords(coord) => {
281+
LazyEs::Value(None)
282+
}
283+
CurPoiAddress::Some { coord, address }
284+
if !is_addr_derived_from_tags(&address) && !changed_coords(coord) =>
285+
{
286+
LazyEs::Value(Some(*address))
287+
}
288+
_ => lazy_es_address,
244289
}
245-
_ => {}
246-
}
247-
}
248-
249-
rubber
250-
.get_address(&poi.coord)
251-
.map_err(|e| warn!("`get_address` returned ES error for {}: {}", poi.id, e))
252-
.ok()
253-
.and_then(|addrs| addrs.into_iter().next())
254-
.map(|addr| {
255-
addr.address()
256-
.expect("`get_address` returned a non-address object")
257290
})
291+
} else {
292+
lazy_es_address
293+
}
258294
}
259295
}
260296
}

0 commit comments

Comments
 (0)