diff --git a/BentoApplication.java b/BentoApplication.java
index 3569c4b..616902b 100644
--- a/BentoApplication.java
+++ b/BentoApplication.java
@@ -5,6 +5,9 @@
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
 
 @SpringBootApplication(scanBasePackages = {"gov.nih.nci"})
 public class BentoApplication extends SpringBootServletInitializer {
diff --git a/ServletInitializer.java b/ServletInitializer.java
deleted file mode 100644
index 95173c0..0000000
--- a/ServletInitializer.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package gov.nih.nci.bento;
-
-import org.springframework.boot.builder.SpringApplicationBuilder;
-import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
-
-public class ServletInitializer extends SpringBootServletInitializer {
-    @Override
-    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
-        return application.sources(BentoApplication.class);
-    }
-}
diff --git a/constants/Const.java b/constants/Const.java
index da5d9fe..7d71f24 100644
--- a/constants/Const.java
+++ b/constants/Const.java
@@ -4,7 +4,7 @@ public class Const {
 
     public static class ES_UNITS {
         public static final int DEFAULT_SIZE = 10;
-        public static final int MAX_SIZE = 10000;
+        public static final int MAX_SIZE = 200000;
         public static final String GS_HIGHLIGHT_DELIMITER = "$";
         public static final String KEYWORD = ".keyword";
     }
@@ -18,7 +18,7 @@ public static class ES_PARAMS {
 
         public static final String SORT_DIRECTION = "sort_direction";
         public static final String CASE_IDS = "case_ids";
-        public static final int AGGS_SIZE = 1000;
+        public static final int AGGS_SIZE = 200000;
         public static final String INPUT = "input";
         public static final String NESTED_FILTER = "FILTER_INFO";
     }
diff --git a/controller/GraphQLController.java b/controller/GraphQLController.java
index 2af29e1..b9fedeb 100644
--- a/controller/GraphQLController.java
+++ b/controller/GraphQLController.java
@@ -116,7 +116,7 @@ private ResponseEntity<String> getGraphQLResponse(HttpEntity<String> httpEntity,
         JsonElement rawVar = jsonObject.get("variables");
         variables = gson.fromJson(rawVar, Map.class);
         // Verify that all parameter inputs are less than 1000 values
-        int maxValues = 1000;
+        int maxValues = 200000;
         for (String key: variables.keySet()){
             Object valuesObject = variables.get(key);
             if (!(valuesObject instanceof List)){
diff --git a/model/search/mapper/TypeMapperImpl.java b/model/search/mapper/TypeMapperImpl.java
index 1311ba6..63911b6 100644
--- a/model/search/mapper/TypeMapperImpl.java
+++ b/model/search/mapper/TypeMapperImpl.java
@@ -5,7 +5,7 @@
 import org.apache.lucene.search.TotalHits;
 import org.jetbrains.annotations.NotNull;
 import org.opensearch.action.search.SearchResponse;
-import org.opensearch.common.text.Text;
+import org.opensearch.core.common.text.Text;
 import org.opensearch.search.SearchHit;
 import org.opensearch.search.aggregations.Aggregation;
 import org.opensearch.search.aggregations.Aggregations;
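A note on the TypeMapperImpl change: OpenSearch 2.x relocated Text from org.opensearch.common.text to org.opensearch.core.common.text; only the import path changes, the API is the same. A minimal sketch of the highlight-fragment handling this class performs with it, assuming a hypothetical helper and field name (neither is part of this diff):

import org.opensearch.core.common.text.Text; // relocated in OpenSearch 2.x
import org.opensearch.search.SearchHit;
import org.opensearch.search.fetch.subphase.highlight.HighlightField;

class HighlightSketch {
    // Hypothetical helper: first highlight fragment of a hit, or null if none.
    static String firstFragment(SearchHit hit, String field) {
        HighlightField highlight = hit.getHighlightFields().get(field);
        if (highlight == null) {
            return null;
        }
        Text[] fragments = highlight.fragments();
        return fragments.length > 0 ? fragments[0].string() : null;
    }
}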
diff --git a/model/search/yaml/type/GlobalTypeYaml.java b/model/search/yaml/type/GlobalTypeYaml.java
index f0351e6..641adec 100644
--- a/model/search/yaml/type/GlobalTypeYaml.java
+++ b/model/search/yaml/type/GlobalTypeYaml.java
@@ -15,6 +15,7 @@
 import org.apache.logging.log4j.Logger;
 import org.opensearch.action.search.SearchRequest;
 import org.springframework.core.io.ClassPathResource;
+import org.yaml.snakeyaml.LoaderOptions;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.Constructor;
 
@@ -31,7 +32,8 @@ public class GlobalTypeYaml extends AbstractYamlType {
 
     private List readYamlFile(ClassPathResource resource) throws IOException {
         logger.info(String.format("%s Yaml global file query loading...", accessType.toString()));
-        Yaml groupYaml = new Yaml(new Constructor(GroupTypeQuery.class));
+        LoaderOptions options = new LoaderOptions();
+        Yaml groupYaml = new Yaml(new Constructor(GroupTypeQuery.class, options));
         GroupTypeQuery groupTypeQuery = groupYaml.load(resource.getInputStream());
         return groupTypeQuery.getQueries();
     }
diff --git a/model/search/yaml/type/GroupTypeYaml.java b/model/search/yaml/type/GroupTypeYaml.java
index 82c2ac5..7729f1c 100644
--- a/model/search/yaml/type/GroupTypeYaml.java
+++ b/model/search/yaml/type/GroupTypeYaml.java
@@ -13,6 +13,7 @@
 import org.apache.logging.log4j.Logger;
 import org.opensearch.action.search.SearchRequest;
 import org.springframework.core.io.ClassPathResource;
+import org.yaml.snakeyaml.LoaderOptions;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.Constructor;
 
@@ -40,7 +41,8 @@ private void setGlobalRangeFields(GroupTypeQuery groupTypeQuery) {
 
     private List readYamlFile(ClassPathResource resource) throws IOException {
         logger.info(String.format("%s Yaml group file query loading...", accessType.toString()));
-        Yaml groupYaml = new Yaml(new Constructor(GroupTypeQuery.class));
+        LoaderOptions options = new LoaderOptions();
+        Yaml groupYaml = new Yaml(new Constructor(GroupTypeQuery.class, options));
         GroupTypeQuery groupTypeQuery = groupYaml.load(resource.getInputStream());
         setGlobalRangeFields(groupTypeQuery);
         return groupTypeQuery.getQueries();
diff --git a/model/search/yaml/type/SingleTypeYaml.java b/model/search/yaml/type/SingleTypeYaml.java
index eaf1e0e..5b3aa7d 100644
--- a/model/search/yaml/type/SingleTypeYaml.java
+++ b/model/search/yaml/type/SingleTypeYaml.java
@@ -15,6 +15,7 @@
 import org.apache.logging.log4j.Logger;
 import org.opensearch.action.search.SearchRequest;
 import org.springframework.core.io.ClassPathResource;
+import org.yaml.snakeyaml.LoaderOptions;
 import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.Constructor;
 
@@ -30,7 +31,8 @@ public class SingleTypeYaml extends AbstractYamlType {
 
     private List readYamlFile(ClassPathResource resource) throws IOException {
         logger.info(String.format("%s Yaml single file query loading...", accessType.toString()));
-        Yaml yaml = new Yaml(new Constructor(SingleTypeQuery.class));
+        LoaderOptions options = new LoaderOptions();
+        Yaml yaml = new Yaml(new Constructor(SingleTypeQuery.class, options));
        SingleTypeQuery singleTypeQuery = yaml.load(resource.getInputStream());
         return singleTypeQuery.getQueries();
     }
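All three YAML readers above move to SnakeYAML's two-argument Constructor(Class, LoaderOptions), the form SnakeYAML 2.x requires (the one-argument constructor no longer exists there). A standalone sketch of the same loading pattern, assuming a hypothetical QueryConfig root bean in place of GroupTypeQuery/SingleTypeQuery:

import org.yaml.snakeyaml.LoaderOptions;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.Constructor;

import java.io.InputStream;

public class YamlLoaderSketch {
    // Hypothetical root bean; stands in for GroupTypeQuery/SingleTypeQuery.
    public static class QueryConfig {
        private String name;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    public static QueryConfig load(InputStream in) {
        // SnakeYAML 2.x requires LoaderOptions; the defaults preserve prior behavior.
        LoaderOptions options = new LoaderOptions();
        Yaml yaml = new Yaml(new Constructor(QueryConfig.class, options));
        return yaml.load(in);
    }
}

Default LoaderOptions keep the old behavior; the object also exposes parsing limits (for example setCodePointLimit) that matter when loading very large YAML files.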
diff --git a/service/ESService.java b/service/ESService.java
index eb249af..dbad6c7 100644
--- a/service/ESService.java
+++ b/service/ESService.java
@@ -1,11 +1,13 @@
 package gov.nih.nci.bento.service;
 
 import com.google.gson.*;
+
 import gov.nih.nci.bento.model.ConfigurationDAO;
 import gov.nih.nci.bento.model.search.MultipleRequests;
 import gov.nih.nci.bento.service.connector.AWSClient;
 import gov.nih.nci.bento.service.connector.AbstractClient;
 import gov.nih.nci.bento.service.connector.DefaultClient;
+
 import org.apache.http.util.EntityUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -20,12 +22,18 @@
 import java.io.IOException;
 import java.util.*;
 
+// import java.lang.management.ManagementFactory;
+// import java.lang.management.MemoryMXBean;
+// import java.lang.management.MemoryUsage;
+
 @Service("ESService")
 public class ESService {
     public static final String SCROLL_ENDPOINT = "/_search/scroll";
     public static final String JSON_OBJECT = "jsonObject";
     public static final String AGGS = "aggs";
-    public static final int MAX_ES_SIZE = 10000;
+    public static final int MAX_ES_SIZE = 200000; // Do not return more than this number of records
+    public static final int SCROLL_THRESHOLD = 10000; // Use scroll when trying to retrieve past this number of records
+    public static final int SCROLL_SIZE = 10000; // How big each scroll should be
 
     private static final Logger logger = LogManager.getLogger(RedisService.class);
     private RestClient client;
@@ -57,6 +65,32 @@ public JsonObject send(Request request) throws IOException{
         return getJSonFromResponse(response);
     }
 
+    // public void checkMemoryInit() {
+    //     // Get the Java Runtime object
+    //     Runtime runtime = Runtime.getRuntime();
+
+    //     // Get the maximum heap size (in bytes)
+    //     long maxMemory = runtime.maxMemory();
+    //     // Get the initial heap size (in bytes)
+    //     long initialMemory = runtime.totalMemory();
+    //     // Get the current available memory (in bytes)
+    //     long freeMemory = runtime.freeMemory();
+
+    //     // Convert to MB for better readability
+    //     System.out.println("Initial Heap Size: " + (initialMemory / (1024 * 1024)) + " MB");
+    //     System.out.println("Maximum Heap Size: " + (maxMemory / (1024 * 1024)) + " MB");
+    //     System.out.println("Free Memory: " + (freeMemory / (1024 * 1024)) + " MB");
+    // }
+
+    // public void checkMemoryNow() {
+    //     // Optionally log the memory usage using MemoryMXBean
+    //     MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+    //     MemoryUsage heapMemoryUsage = memoryMXBean.getHeapMemoryUsage();
+
+    //     System.out.println("Used Heap Memory: " + (heapMemoryUsage.getUsed() / (1024 * 1024)) + " MB");
+    //     System.out.println("Committed Heap Memory: " + (heapMemoryUsage.getCommitted() / (1024 * 1024)) + " MB");
+    // }
+
     public Map<String, Object> elasticMultiSend(@NotNull List<MultipleRequests> requests) throws IOException {
         try {
             MultiSearchRequest multiRequests = new MultiSearchRequest();
@@ -123,7 +157,7 @@ public Map<String, Object> buildListQuery(Map<String, Object> params, Set<Strin
@@ ... @@ public List<Map<String, Object>> collectRangeAggs(JsonObject jsonObject, String[]
         return data;
     }
 
-
-
     public List<String> collectBucketKeys(JsonArray buckets) {
         List<String> keys = new ArrayList<>();
         for (var bucket: buckets) {
@@ -274,7 +306,7 @@ public List<String> collectBucketKeys(JsonArray buckets) {
 
     public List<String> collectField(Request request, String fieldName) throws IOException {
         List<String> results = new ArrayList<>();
 
-        request.addParameter("scroll", "10S");
+        request.addParameter("scroll", "1m");
         JsonObject jsonObject = send(request);
         JsonArray searchHits = jsonObject.getAsJsonObject("hits").getAsJsonArray("hits");
@@ -288,7 +320,7 @@ public List<String> collectField(Request request, String fieldName) throws IOExc
             Request scrollRequest = new Request("POST", SCROLL_ENDPOINT);
             String scrollId = jsonObject.get("_scroll_id").getAsString();
             Map<String, Object> scrollQuery = Map.of(
-                    "scroll", "10S",
+                    "scroll", "1m",
                     "scroll_id", scrollId
             );
             scrollRequest.setJsonEntity(gson.toJson(scrollQuery));
@@ -319,65 +351,112 @@ public List<Map<String, Object>> collectPage(Request request, Map<String, Objec
 
     public List<Map<String, Object>> collectPage(Request request, Map<String, Object> query, String[][] properties, int pageSize, int offset) throws IOException {
-        // data over limit of Elasticsearch, have to use roll API
+        // Make sure page size is less than max allowed size
         if (pageSize > MAX_ES_SIZE) {
             throw new IOException("Parameter 'first' must not exceeded " + MAX_ES_SIZE);
         }
-        if (pageSize + offset > MAX_ES_SIZE) {
+
+        // Check whether to use scroll
+        if (pageSize + offset > SCROLL_THRESHOLD) {
             return collectPageWithScroll(request, query, properties, pageSize, offset);
         }
 
         // data within limit can use just from/size
         query.put("size", pageSize);
         query.put("from", offset);
-        request.setJsonEntity(gson.toJson(query));
+        String queryJson = gson.toJson(query);
+        request.setJsonEntity(queryJson);
 
         JsonObject jsonObject = send(request);
         return collectPage(jsonObject, properties, pageSize);
     }
 
-    // offset MUST be multiple of pageSize, otherwise the page won't be complete
+    /**
+     * Uses scroll to obtain results
+     * @param request The OpenSearch request
+     * @param query The query to be sent in the body of the OpenSearch request
+     * @param properties The OpenSearch properties to retrieve
+     * @param pageSize The desired number of results to obtain
+     * @param offset The desired offset of the results
+     * @return The requested page of results
+     * @throws IOException
+     */
     private List<Map<String, Object>> collectPageWithScroll(
             Request request, Map<String, Object> query, String[][] properties, int pageSize, int offset) throws IOException {
-        final int optimumSize = ( MAX_ES_SIZE / pageSize ) * pageSize;
-        if (offset % pageSize != 0) {
-            throw new IOException("'offset' must be multiple of 'first'!");
-        }
-        query.put("size", optimumSize);
-        request.setJsonEntity(gson.toJson(query));
-        request.addParameter("scroll", "10S");
-        JsonObject page = rollToPage(request, offset);
-        return collectPage(page, properties, pageSize, offset % optimumSize);
-    }
+        query.put("size", SCROLL_SIZE);
+        String jsonizedQuery = gson.toJson(query);
+        request.setJsonEntity(jsonizedQuery);
+        request.addParameter("scroll", "1m");
+        // JsonObject page = rollToPage(request, pageSize, offset);
+        // return collectPage(page, properties, pageSize, offset % SCROLL_SIZE);
+        JsonArray page = rollToPage(request, pageSize, offset);
+        // checkMemoryInit();
+        return collectScrollPage(page, properties, pageSize, offset % SCROLL_SIZE);
+    }
+
+    /**
+     * Sends consecutive scroll requests to get the desired number of records
+     * @param request The OpenSearch request
+     * @param pageSize How many records to obtain
+     * @param offset How many records to skip
+     * @return A JsonArray of the collected search hits
+     * @throws IOException
+     */
+    private JsonArray rollToPage(Request request, int pageSize, int offset) throws IOException {
+        // Variables involved with the return object
+        JsonArray allHits = new JsonArray(); // All the hits gathered so far
+        // JsonObject outerHits = new JsonObject(); // Helper JSON object for the results
+        // JsonObject results = new JsonObject(); // The results to return
+
+        // Variables used for scrolling
+        Request clearScrollRequest = new Request("DELETE", SCROLL_ENDPOINT);
+        int numCumulativeHits = 0; // Number of hits gathered so far
+        String scrollId = null;
+        Request scrollRequest = request;
 
-    private JsonObject rollToPage(Request request, int offset) throws IOException {
-        int rolledRecords = 0;
-        JsonObject jsonObject = send(request);
-        String scrollId = jsonObject.get("_scroll_id").getAsString();
-        JsonArray searchHits = jsonObject.getAsJsonObject("hits").getAsJsonArray("hits");
-        rolledRecords += searchHits.size();
+        // Send scroll requests
+        while (numCumulativeHits < pageSize + offset) {
+            logger.info("Current records: " + numCumulativeHits + ". Collecting more records...");
 
-        while (rolledRecords <= offset && searchHits.size() > 0) {
-            // Keep roll until correct page
-            logger.info("Current records: " + rolledRecords + " collecting...");
-            Request scrollRequest = new Request("POST", SCROLL_ENDPOINT);
+            // Execute the scroll request
+            JsonObject scrollResults = send(scrollRequest);
+            JsonArray searchHits = scrollResults.getAsJsonObject("hits").getAsJsonArray("hits");
+            int numScrollHits = searchHits.size();
+            numCumulativeHits += numScrollHits;
+            scrollId = scrollResults.get("_scroll_id").getAsString();
+
+            logger.info("...collected " + numScrollHits + " records. Current records: " + numCumulativeHits);
+
+            // Stop scrolling if there are no records left
+            if (numScrollHits <= 0) {
+                break;
+            }
+
+            // Only add the hits if we've reached the scroll window of the desired results
+            if (numCumulativeHits > offset) {
+                allHits.addAll(searchHits);
+            }
+
+            // Form the next scroll request
+            scrollRequest = new Request("POST", SCROLL_ENDPOINT);
             Map<String, Object> scrollQuery = Map.of(
-                    "scroll", "10S",
+                    "scroll", "1m",
                     "scroll_id", scrollId
             );
-            scrollRequest.setJsonEntity(gson.toJson(scrollQuery));
-            jsonObject = send(scrollRequest);
-            scrollId = jsonObject.get("_scroll_id").getAsString();
-            searchHits = jsonObject.getAsJsonObject("hits").getAsJsonArray("hits");
-            rolledRecords += searchHits.size();
+            String scrollQueryJson = gson.toJson(scrollQuery);
+            scrollRequest.setJsonEntity(scrollQueryJson);
         }
 
-        // Now return page
-        scrollId = jsonObject.get("_scroll_id").getAsString();
-        Request clearScrollRequest = new Request("DELETE", SCROLL_ENDPOINT);
-        clearScrollRequest.setJsonEntity("{\"scroll_id\":\"" + scrollId +"\"}");
-        send(clearScrollRequest);
-        return jsonObject;
+        // Close the scroll context
+        if (scrollId != null) {
+            clearScrollRequest.setJsonEntity("{\"scroll_id\":\"" + scrollId +"\"}");
+            send(clearScrollRequest);
+        }
+
+        // Format the return object
+        // outerHits.add("hits", allHits);
+        // results.add("hits", outerHits);
+        return allHits;
     }
 
     // Collect a page of data, result will be of pageSize or less if not enough data remains
@@ -423,6 +502,30 @@ public List<Map<String, Object>> collectPage(JsonObject jsonObject, String[][] p
         return data;
     }
 
+    public List<Map<String, Object>> collectScrollPage(JsonArray searchHits, String[][] properties, int pageSize, int offset) throws IOException {
+        List<Map<String, Object>> data = new ArrayList<>();
+
+        //JsonArray searchHits = jsonObject.getAsJsonObject("hits").getAsJsonArray("hits");
+        for (int i = 0; i < searchHits.size(); i++) {
+            // skip offset number of documents
+            if (i + 1 <= offset) {
+                continue;
+            }
+            Map<String, Object> row = new HashMap<>();
+            for (String[] prop: properties) {
+                String propName = prop[0];
+                String dataField = prop[1];
+                JsonElement element = searchHits.get(i).getAsJsonObject().get("_source").getAsJsonObject().get(dataField);
+                row.put(propName, getValue(element));
+            }
+            data.add(row);
+            if (data.size() >= pageSize) {
+                break;
+            }
+        }
+        return data;
+    }
+
     public List<Map<String, Object>> getFilteredGroupCount(Map<String, Object> params, String endpoint, String aggregationField) throws IOException {
         return getFilteredGroupCount(params, endpoint, new String[]{aggregationField});
     }
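Taken together, the ESService changes replace the old "roll until the page" logic with a bounded scroll: requests whose pageSize + offset stay at or below SCROLL_THRESHOLD (10,000) are served by a plain from/size search, while deeper windows scroll in SCROLL_SIZE batches under a one-minute keep-alive, keep only whole batches past the offset, and trim the remainder (offset % SCROLL_SIZE) in collectScrollPage. A hedged usage sketch of the public entry point follows; the endpoint, query, and property mapping are illustrative, not taken from this diff:

import gov.nih.nci.bento.service.ESService;
import org.opensearch.client.Request;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CollectPageUsageSketch {
    // Hypothetical query builder; collectPage mutates the map ("size"/"from"),
    // so each call gets a fresh, mutable copy.
    private static Map<String, Object> matchAllQuery() {
        Map<String, Object> query = new HashMap<>();
        query.put("query", Map.of("match_all", Map.of()));
        return query;
    }

    static void example(ESService esService) throws IOException {
        String[][] properties = {{"subjectId", "subject_id"}};

        // pageSize + offset <= SCROLL_THRESHOLD: served by a plain from/size search.
        List<Map<String, Object>> shallow = esService.collectPage(
                new Request("GET", "/subjects/_search"), matchAllQuery(), properties, 100, 9_900);

        // pageSize + offset > SCROLL_THRESHOLD: collectPage delegates to
        // collectPageWithScroll, which scrolls in 10,000-hit batches and clears
        // the scroll context when done.
        List<Map<String, Object>> deep = esService.collectPage(
                new Request("GET", "/subjects/_search"), matchAllQuery(), properties, 100, 50_000);
    }
}

One consequence of this design: batches before the offset window are discarded as they stream past, so memory is bounded by roughly the requested page plus one scroll batch rather than by the absolute offset.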