Skip to content

Commit

Permalink
Merge pull request #4352 from atlanhq/cassandrapoliciesoptimisation
Browse files Browse the repository at this point in the history
MLH-73 | Cassandra optimisation. Optimise edgeLabel fetching only to root level vertices
  • Loading branch information
sriram-atlan authored Mar 6, 2025
2 parents 9b9f364 + f63491c commit a1d977e
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 38 deletions.
1 change: 1 addition & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ on:
- development
- master
- lineageondemand
- cassandrapoliciesoptimisation

jobs:
build:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public enum AtlasConfiguration {
ATLAS_ELASTICSEARCH_NON_UI_SEARCH_CLUSTER_URL("atlas.index.elasticsearch.nonui.cluster.url","atlas-elasticsearch2-non-ui-search.atlas.svc.cluster.local:9200"),
ATLAS_INDEXSEARCH_ENABLE_API_LIMIT("atlas.indexsearch.enable.api.limit", false),
ATLAS_INDEXSEARCH_ENABLE_JANUS_OPTIMISATION("atlas.indexsearch.enable.janus.optimization", false),
ATLAS_INDEXSEARCH_ENABLE_JANUS_OPTIMISATION_FOR_RELATIONS("atlas.indexsearch.enable.janus.optimization.for.relationship", false),
ATLAS_MAINTENANCE_MODE("atlas.maintenance.mode", false),
DELTA_BASED_REFRESH_ENABLED("atlas.authorizer.enable.delta_based_refresh", false),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2065,30 +2065,6 @@ private static Set<String> parseLabelsString(String labels) {

return ret;
}

/**
* Get all the active edges
* @param vertex entity vertex
* @return Iterator of children edges
*/
public static Iterator<AtlasJanusEdge> getOnlyActiveEdges(AtlasVertex vertex, AtlasEdgeDirection direction) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("GraphHelper.getOnlyActiveEdges");

try {
return vertex.query()
.direction(direction)
.has(STATE_PROPERTY_KEY, ACTIVE_STATE_VALUE)
.edges()
.iterator();
} catch (Exception e) {
LOG.error("Error while getting active edges of vertex", e);
throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, e);
}
finally {
RequestContext.get().endMetricRecord(metricRecorder);
}
}

public Set<AbstractMap.SimpleEntry<String,String>> retrieveEdgeLabelsAndTypeName(AtlasVertex vertex) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("GraphHelper.retrieveEdgeLabelsAndTypeName");

Expand All @@ -2109,7 +2085,7 @@ public Set<AbstractMap.SimpleEntry<String,String>> retrieveEdgeLabelsAndTypeName

return new AbstractMap.SimpleEntry<>(labelStr, typeNameStr);
})
.filter(entry -> !entry.getKey().isEmpty() && !entry.getValue().isEmpty())
.filter(entry -> !entry.getKey().isEmpty())
.distinct()
.collect(Collectors.toSet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,26 +285,36 @@ public AtlasObjectId toAtlasObjectId(AtlasVertex entityVertex) throws AtlasBaseE
AtlasObjectId ret = null;
String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);

boolean enableJanusOptimisation = AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_JANUS_OPTIMISATION_FOR_RELATIONS.getBoolean();
Map<String, Object> referenceVertexProperties = null;
if (entityType != null) {
Map<String, Object> uniqueAttributes = new HashMap<>();

Set<String> relationAttributes = RequestContext.get().getRelationAttrsForSearch();
if (enableJanusOptimisation) {
//don't fetch edge labels for a relation attribute
referenceVertexProperties = preloadProperties(entityVertex, entityType, relationAttributes, false);
}
for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) {
Object attrValue = getVertexAttribute(entityVertex, attribute);
Object attrValue = getVertexAttributePreFetchCache(entityVertex, attribute, referenceVertexProperties);

if (attrValue != null) {
uniqueAttributes.put(attribute.getName(), attrValue);
}
}

Map<String, Object> attributes = new HashMap<>();
Set<String> relationAttributes = RequestContext.get().getRelationAttrsForSearch();
if (CollectionUtils.isNotEmpty(relationAttributes)) {
for (String attributeName : relationAttributes) {
AtlasAttribute attribute = entityType.getAttribute(attributeName);
if (attribute != null
&& !uniqueAttributes.containsKey(attributeName)) {
Object attrValue = getVertexAttribute(entityVertex, attribute);
Object attrValue = null;
if (enableJanusOptimisation) {
attrValue = getVertexAttributePreFetchCache(entityVertex, attribute, referenceVertexProperties);
} else {
attrValue = getVertexAttribute(entityVertex, attribute);
}

if (attrValue != null) {
attributes.put(attribute.getName(), attrValue);
}
Expand Down Expand Up @@ -1002,7 +1012,7 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex)
return mapVertexToAtlasEntityHeader(entityVertex, Collections.<String>emptySet());
}

private Map<String, Object> preloadProperties(AtlasVertex entityVertex, AtlasEntityType entityType, Set<String> attributes) throws AtlasBaseException {
private Map<String, Object> preloadProperties(AtlasVertex entityVertex, AtlasEntityType entityType, Set<String> attributes, boolean fetchEdgeLabels) throws AtlasBaseException {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("preloadProperties");

try {
Expand All @@ -1014,11 +1024,14 @@ private Map<String, Object> preloadProperties(AtlasVertex entityVertex, AtlasEnt
// Execute the traversal to fetch properties
Iterator<VertexProperty<Object>> traversal = ((AtlasJanusVertex)entityVertex).getWrappedElement().properties();

// retrieve all the valid relationships for this entityType
Map<String, Set<String>> relationshipsLookup = fetchEdgeNames(entityType);

// Fetch edges in both directions
retrieveEdgeLabels(entityVertex, attributes, relationshipsLookup, propertiesMap);
// if the vertex in scope is root then call below otherwise skip
// we don't support relation attributes of a relation
if (fetchEdgeLabels) {
// retrieve all the valid relationships for this entityType
Map<String, Set<String>> relationshipsLookup = fetchEdgeNames(entityType);
retrieveEdgeLabels(entityVertex, attributes, relationshipsLookup, propertiesMap);
}

// Iterate through the resulting VertexProperty objects
while (traversal.hasNext()) {
Expand Down Expand Up @@ -1132,12 +1145,25 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex,
&& typeName.equals("S3Bucket");

boolean shouldPrefetch = RequestContext.get().isInvokedByIndexSearch()
&& !isPolicyAttribute(attributes)
&& !isS3Bucket
&& AtlasConfiguration.ATLAS_INDEXSEARCH_ENABLE_JANUS_OPTIMISATION.getBoolean();

// remove isPolicyAttribute from shouldPrefetch check
// prefetch properties for policies
// if there is some exception in fetching properties,
// then we will fetch properties again without prefetch

if (shouldPrefetch) {
return mapVertexToAtlasEntityHeaderWithPrefetch(entityVertex, attributes);
try {
return mapVertexToAtlasEntityHeaderWithPrefetch(entityVertex, attributes);
} catch (AtlasBaseException e) {
if (isPolicyAttribute(attributes)) {
RequestContext.get().endMetricRecord(RequestContext.get().startMetricRecord("policiesPrefetchFailed"));
LOG.error("Error fetching properties for entity vertex: {}. Retrying without prefetch", entityVertex.getId(), e);
return mapVertexToAtlasEntityHeaderWithoutPrefetch(entityVertex, attributes);
}
throw e;
}
} else {
return mapVertexToAtlasEntityHeaderWithoutPrefetch(entityVertex, attributes);
}
Expand Down Expand Up @@ -1235,7 +1261,7 @@ private AtlasEntityHeader mapVertexToAtlasEntityHeaderWithPrefetch(AtlasVertex e
//pre-fetching the properties
String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); //properties.get returns null
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); // this is not costly
Map<String, Object> properties = preloadProperties(entityVertex, entityType, attributes);
Map<String, Object> properties = preloadProperties(entityVertex, entityType, attributes, true);

String guid = (String) properties.get(Constants.GUID_PROPERTY_KEY);

Expand Down

0 comments on commit a1d977e

Please sign in to comment.