Skip to content

Commit

Permalink
Merge pull request #3908 from atlanhq/plt-2973-divert-request
Browse files Browse the repository at this point in the history
PLT-2979 : (feat)add request isolation from client level in indexsearch
  • Loading branch information
sumandas0 authored Mar 4, 2025
2 parents 40c3f0b + 729a9a3 commit 3f6b2d4
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public class AtlasElasticsearchDatabase {

private static volatile RestHighLevelClient searchClient;
private static volatile RestClient lowLevelClient;

private static volatile RestClient esUiClusterClient;
private static volatile RestClient esNonUiClusterClient;
public static final String INDEX_BACKEND_CONF = "atlas.graph.index.search.hostname";

public static List<HttpHost> getHttpHosts() throws AtlasException {
Expand Down Expand Up @@ -101,4 +104,64 @@ public static RestClient getLowLevelClient() {
}
return lowLevelClient;
}

public static RestClient getUiClusterClient() {
if (!AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_REQUEST_ISOLATION.getBoolean()) {
return null;
}

if (esUiClusterClient == null) {
synchronized (AtlasElasticsearchDatabase.class) {
if (esUiClusterClient == null) {
try {
HttpHost UiHost = HttpHost.create(AtlasConfiguration.ATLAS_ELASTICSEARCH_UI_SEARCH_CLUSTER_URL.getString());

RestClientBuilder builder = RestClient.builder(UiHost);
builder.setHttpClientConfigCallback(httpAsyncClientBuilder ->
httpAsyncClientBuilder.setKeepAliveStrategy(((httpResponse, httpContext) -> 3600000)));
builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
.setConnectTimeout(AtlasConfiguration.INDEX_CLIENT_CONNECTION_TIMEOUT.getInt())
.setSocketTimeout(AtlasConfiguration.INDEX_CLIENT_SOCKET_TIMEOUT.getInt()));

esUiClusterClient = builder.build();
} catch (Exception e) {
LOG.error("Failed to initialize UI cluster client", e);
return null;
}
}
}
}
return esUiClusterClient;
}

public static RestClient getNonUiClusterClient() {
if (!AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_REQUEST_ISOLATION.getBoolean()) {
return null;
}

if (esNonUiClusterClient == null) {
synchronized (AtlasElasticsearchDatabase.class) {
if (esNonUiClusterClient == null) {
try {
HttpHost nonUiHost = HttpHost.create(AtlasConfiguration.ATLAS_ELASTICSEARCH_NON_UI_SEARCH_CLUSTER_URL.getString());

RestClientBuilder builder = RestClient.builder(nonUiHost);
builder.setHttpClientConfigCallback(httpAsyncClientBuilder ->
httpAsyncClientBuilder.setKeepAliveStrategy(((httpResponse, httpContext) -> 3600000)));
builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
.setConnectTimeout(AtlasConfiguration.INDEX_CLIENT_CONNECTION_TIMEOUT.getInt())
.setSocketTimeout(AtlasConfiguration.INDEX_CLIENT_SOCKET_TIMEOUT.getInt()));

esNonUiClusterClient = builder.build();
} catch (Exception e) {
LOG.error("Failed to initialize Non-Ui cluster client", e);
return null;
}
}
}
}
return esNonUiClusterClient;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,14 @@
public class AtlasElasticsearchQuery implements AtlasIndexQuery<AtlasJanusVertex, AtlasJanusEdge> {
private static final Logger LOG = LoggerFactory.getLogger(AtlasElasticsearchQuery.class);

private static final String CLIENT_ORIGIN_PRODUCT = "product_webapp";

private AtlasJanusGraph graph;
private RestHighLevelClient esClient;
private RestClient lowLevelRestClient;

private RestClient esUiClusterClient;
private RestClient esNonUiClusterClient;
private String index;
private SearchSourceBuilder sourceBuilder;
private SearchResponse searchResponse;
Expand All @@ -80,9 +85,11 @@ public AtlasElasticsearchQuery(AtlasJanusGraph graph, RestHighLevelClient esClie
this.sourceBuilder = sourceBuilder;
}

public AtlasElasticsearchQuery(AtlasJanusGraph graph, RestClient restClient, String index, SearchParams searchParams) {
public AtlasElasticsearchQuery(AtlasJanusGraph graph, RestClient restClient, String index, SearchParams searchParams, RestClient esUiClusterClient, RestClient esNonUiClusterClient) {
this(graph, index);
this.lowLevelRestClient = restClient;
this.esUiClusterClient = esUiClusterClient;
this.esNonUiClusterClient = esNonUiClusterClient;
this.searchParams = searchParams;
}

Expand All @@ -92,8 +99,8 @@ private AtlasElasticsearchQuery(AtlasJanusGraph graph, String index) {
searchResponse = null;
}

public AtlasElasticsearchQuery(AtlasJanusGraph graph, String index, RestClient restClient) {
this(graph, restClient, index, null);
public AtlasElasticsearchQuery(AtlasJanusGraph graph, String index, RestClient restClient, RestClient esUiClusterClient, RestClient esNonUiClusterClient) {
this(graph, restClient, index, null, esUiClusterClient, esNonUiClusterClient);
}

public AtlasElasticsearchQuery(String index, RestClient restClient) {
Expand All @@ -107,6 +114,35 @@ private SearchRequest getSearchRequest(String index, SearchSourceBuilder sourceB
return searchRequest;
}

/**
* Returns the appropriate Elasticsearch RestClient based on client origin and configuration settings if isolation is enabled.
*
* @return RestClient configured for either UI or Non-UI cluster, falling back to low-level client
*/
private RestClient getESClient() {
if (!AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_REQUEST_ISOLATION.getBoolean()) {
return lowLevelRestClient;
}

try {
String clientOrigin = RequestContext.get().getClientOrigin();
if (clientOrigin == null) {
return lowLevelRestClient;
}
if (CLIENT_ORIGIN_PRODUCT.equals(clientOrigin)) {
return Optional.ofNullable(esUiClusterClient)
.orElse(lowLevelRestClient);
} else {
return Optional.ofNullable(esNonUiClusterClient)
.orElse(lowLevelRestClient);
}

} catch (Exception e) {
LOG.error("Error determining ES client, falling back to low-level client", e);
return lowLevelRestClient;
}
}

private Iterator<Result<AtlasJanusVertex, AtlasJanusEdge>> runQuery(SearchRequest searchRequest) {
Iterator<Result<AtlasJanusVertex, AtlasJanusEdge>> result = null;

Expand Down Expand Up @@ -134,7 +170,7 @@ private DirectIndexQueryResult runQueryWithLowLevelClient(SearchParams searchPar
try {
if(searchParams.isCallAsync() || AtlasConfiguration.ENABLE_ASYNC_INDEXSEARCH.getBoolean()) {
return performAsyncDirectIndexQuery(searchParams);
} else{
} else {
String responseString = performDirectIndexQuery(searchParams.getQuery(), false);
if (LOG.isDebugEnabled()) {
LOG.debug("runQueryWithLowLevelClient.response : {}", responseString);
Expand Down Expand Up @@ -350,7 +386,7 @@ public void onFailure(Exception exception) {
}
};

lowLevelRestClient.performRequestAsync(request, responseListener);
getESClient().performRequestAsync(request, responseListener);

return future;
}
Expand All @@ -372,7 +408,7 @@ public void onFailure(Exception exception) {
}
}
};
lowLevelRestClient.performRequestAsync(request, responseListener);
getESClient().performRequestAsync(request, responseListener);
}

private Future<AsyncQueryResult> submitAsyncSearch(SearchParams searchParams, String KeepAliveTime, boolean source) {
Expand Down Expand Up @@ -424,7 +460,7 @@ public void onFailure(Exception exception) {
}
};

lowLevelRestClient.performRequestAsync(request, responseListener);
getESClient().performRequestAsync(request, responseListener);

return future;
}
Expand All @@ -444,7 +480,7 @@ private String performDirectIndexQuery(String query, boolean source) throws Atla

Response response;
try {
response = lowLevelRestClient.performRequest(request);
response = getESClient().performRequest(request);
} catch (ResponseException rex) {
if (rex.getResponse().getStatusLine().getStatusCode() == 404) {
LOG.warn(String.format("ES index with name %s not found", index));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getLowLevelClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getUiClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getNonUiClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase.getGraphInstance;
import static org.apache.atlas.type.Constants.STATE_PROPERTY_KEY;

Expand All @@ -120,6 +122,9 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
private final RestHighLevelClient elasticsearchClient;
private final RestClient restClient;

private final RestClient esUiClusterClient;
private final RestClient esNonUiClusterClient;

private final ThreadLocal<GremlinGroovyScriptEngine> scriptEngine = ThreadLocal.withInitial(() -> {
DefaultImportCustomizer.Builder builder = DefaultImportCustomizer.build()
.addClassImports(java.util.function.Function.class)
Expand All @@ -129,10 +134,10 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
});

public AtlasJanusGraph() {
this(getGraphInstance(), getClient(), getLowLevelClient());
this(getGraphInstance(), getClient(), getLowLevelClient(), getUiClusterClient(), getNonUiClusterClient());
}

public AtlasJanusGraph(JanusGraph graphInstance, RestHighLevelClient elasticsearchClient, RestClient restClient) {
public AtlasJanusGraph(JanusGraph graphInstance, RestHighLevelClient elasticsearchClient, RestClient restClient, RestClient esUiClusterClient, RestClient esNonUiClusterClient) {
//determine multi-properties once at startup
JanusGraphManagement mgmt = null;

Expand All @@ -155,6 +160,8 @@ public AtlasJanusGraph(JanusGraph graphInstance, RestHighLevelClient elasticsear
janusGraph = (StandardJanusGraph) graphInstance;
this.restClient = restClient;
this.elasticsearchClient = elasticsearchClient;
this.esUiClusterClient = esUiClusterClient;
this.esNonUiClusterClient = esNonUiClusterClient;
}

@Override
Expand Down Expand Up @@ -326,7 +333,7 @@ public AtlasIndexQuery<AtlasJanusVertex, AtlasJanusEdge> elasticsearchQuery(Stri
LOG.error("restClient is not initiated, failed to run query on ES");
throw new AtlasBaseException(INDEX_SEARCH_FAILED, "restClient is not initiated");
}
return new AtlasElasticsearchQuery(this, restClient, INDEX_PREFIX + indexName, searchParams);
return new AtlasElasticsearchQuery(this, restClient, INDEX_PREFIX + indexName, searchParams, esUiClusterClient, esNonUiClusterClient);
}

@Override
Expand Down Expand Up @@ -382,7 +389,7 @@ public AtlasIndexQuery elasticsearchQuery(String indexName) throws AtlasBaseExce
LOG.error("restClient is not initiated, failed to run query on ES");
throw new AtlasBaseException(INDEX_SEARCH_FAILED, "restClient is not initiated");
}
return new AtlasElasticsearchQuery(this, indexName, restClient);
return new AtlasElasticsearchQuery(this, indexName, restClient, esUiClusterClient, esNonUiClusterClient);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@

import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getLowLevelClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getUiClusterClient;
import static org.apache.atlas.repository.graphdb.janus.AtlasElasticsearchDatabase.getNonUiClusterClient;


import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
import static org.apache.atlas.ApplicationProperties.INDEX_RECOVERY_CONF;
Expand Down Expand Up @@ -354,7 +357,7 @@ public AtlasGraph<AtlasJanusVertex, AtlasJanusEdge> getGraph() {

@Override
public AtlasGraph<AtlasJanusVertex, AtlasJanusEdge> getGraphBulkLoading() {
return new AtlasJanusGraph(getBulkLoadingGraphInstance(), getClient(), getLowLevelClient());
return new AtlasJanusGraph(getBulkLoadingGraphInstance(), getClient(), getLowLevelClient(), getUiClusterClient(), getNonUiClusterClient());
}

private static void startEmbeddedSolr() throws AtlasException {
Expand Down
3 changes: 3 additions & 0 deletions intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ public enum AtlasConfiguration {
ENABLE_ASYNC_INDEXSEARCH("atlas.indexsearch.async.enable", false),
ATLAS_INDEXSEARCH_QUERY_SIZE_MAX_LIMIT("atlas.indexsearch.query.size.max.limit", 100000),
ATLAS_INDEXSEARCH_LIMIT_UTM_TAGS("atlas.indexsearch.limit.ignore.utm.tags", ""),
ATLAS_INDEXSEARCH_ENABLE_REQUEST_ISOLATION("atlas.indexsearch.request.isolation.enable", false),
ATLAS_ELASTICSEARCH_UI_SEARCH_CLUSTER_URL("atlas.index.elasticsearch.ui.cluster.url","atlas-elasticsearch2-ui-search.atlas.svc.cluster.local:9200"),
ATLAS_ELASTICSEARCH_NON_UI_SEARCH_CLUSTER_URL("atlas.index.elasticsearch.nonui.cluster.url","atlas-elasticsearch2-non-ui-search.atlas.svc.cluster.local:9200"),
ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false),
ATLAS_INDEXSEARCH_ENABLE_JANUS_OPTIMISATION("atlas.indexsearch.enable.janus.optimization", false),
ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false),
Expand Down

0 comments on commit 3f6b2d4

Please sign in to comment.