Skip to content

Commit

Permalink
META-4232 self-cyclic relation count handled & atlas UI fix
Browse files Browse the repository at this point in the history
  • Loading branch information
BurakAltas committed Feb 17, 2023
1 parent 2fa5dc9 commit 814af91
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
4 changes: 2 additions & 2 deletions dashboardv2/public/js/views/graph/LineageLayoutView.js
Original file line number Diff line number Diff line change
Expand Up @@ -534,10 +534,10 @@ define(['require',
inputLimit = relationCount.inputRelationCount;
outputLimit = relationCount.outputRelationCount + Globals.lineageNodeCount;
}
this.lineageOnDemandPayload[parentId] = { direction: "BOTH", inputRelationsLimit: inputLimit, outputRelationsLimit: outputLimit, depth: Globals.lineageDepth };
this.lineageOnDemandPayload["constraints"][parentId] = { direction: "BOTH", inputRelationsLimit: inputLimit, outputRelationsLimit: outputLimit, depth: Globals.lineageDepth };
}
var queryParameter = {}
queryParameter["constraints"] = this.lineageOnDemandPayload;
queryParameter["constraints"] = this.lineageOnDemandPayload["constraints"];
this.fetchGraphData({ queryParam: queryParameter, 'legends': false });
},
validateInputOutputLimit: function(parentId, btnType) {
Expand Down
4 changes: 2 additions & 2 deletions dashboardv3/public/js/views/graph/LineageLayoutView.js
Original file line number Diff line number Diff line change
Expand Up @@ -534,10 +534,10 @@ define(['require',
inputLimit = relationCount.inputRelationCount;
outputLimit = relationCount.outputRelationCount + Globals.lineageNodeCount;
}
this.lineageOnDemandPayload[parentId] = { direction: "BOTH", inputRelationsLimit: inputLimit, outputRelationsLimit: outputLimit, depth: Globals.lineageDepth };
this.lineageOnDemandPayload["constraints"][parentId] = { direction: "BOTH", inputRelationsLimit: inputLimit, outputRelationsLimit: outputLimit, depth: Globals.lineageDepth };
}
var queryParameter = {}
queryParameter["constraints"] = this.lineageOnDemandPayload;
queryParameter["constraints"] = this.lineageOnDemandPayload["constraints"];
this.fetchGraphData({ queryParam: queryParameter, 'legends': false });
},
validateInputOutputLimit: function(parentId, btnType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,11 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag


if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
traverseEdgesOnDemand(datasetVertex, true, depth, new HashSet<>(), atlasLineageOnDemandContext, ret);
traverseEdgesOnDemand(datasetVertex, true, depth, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, AtlasLineageOnDemandInfo.LineageDirection.INPUT);
}

if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
traverseEdgesOnDemand(datasetVertex, false, depth, new HashSet<>(), atlasLineageOnDemandContext, ret);
traverseEdgesOnDemand(datasetVertex, false, depth, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, AtlasLineageOnDemandInfo.LineageDirection.OUTPUT);
}
} else {
AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid);
Expand All @@ -311,13 +311,13 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag
if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterable<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE);

traverseEdgesOnDemand(processEdges, true, depth, atlasLineageOnDemandContext, ret, processVertex);
traverseEdgesOnDemand(processEdges, true, depth, atlasLineageOnDemandContext, ret, processVertex, guid, AtlasLineageOnDemandInfo.LineageDirection.INPUT);
}

if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) {
Iterable<AtlasEdge> processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE);

traverseEdgesOnDemand(processEdges, false, depth, atlasLineageOnDemandContext, ret, processVertex);
traverseEdgesOnDemand(processEdges, false, depth, atlasLineageOnDemandContext, ret, processVertex, guid, AtlasLineageOnDemandInfo.LineageDirection.OUTPUT);
}

}
Expand All @@ -326,15 +326,15 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag
return ret;
}

private void traverseEdgesOnDemand(Iterable<AtlasEdge> processEdges, boolean isInput, int depth, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex) throws AtlasBaseException {
private void traverseEdgesOnDemand(Iterable<AtlasEdge> processEdges, boolean isInput, int depth, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtlasLineageOnDemandInfo.LineageDirection direction) throws AtlasBaseException {
for (AtlasEdge processEdge : processEdges) {
boolean isInputEdge = processEdge.getLabel().equalsIgnoreCase(PROCESS_INPUTS_EDGE);

if (checkForOffset(processEdge, processVertex, atlasLineageOnDemandContext, ret)) {
continue;
}

if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth)) {
if (incrementAndCheckIfRelationsLimitReached(processEdge, isInputEdge, atlasLineageOnDemandContext, ret, depth, baseGuid, direction)) {
break;
} else {
addEdgeToResult(processEdge, ret, atlasLineageOnDemandContext);
Expand All @@ -349,11 +349,11 @@ private void traverseEdgesOnDemand(Iterable<AtlasEdge> processEdges, boolean isI
ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains));
}

traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, new HashSet<>(), atlasLineageOnDemandContext, ret);
traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, direction);
}
}

private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, Set<String> visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret) throws AtlasBaseException {
private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, Set<String> visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtlasLineageOnDemandInfo.LineageDirection direction) throws AtlasBaseException {
if (depth != 0) { // base condition of recursion for depth
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("traverseEdgesOnDemand");

Expand All @@ -375,7 +375,7 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
continue;
}

if (incrementAndCheckIfRelationsLimitReached(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth)) {
if (incrementAndCheckIfRelationsLimitReached(incomingEdge, !isInput, atlasLineageOnDemandContext, ret, depth, baseGuid, direction)) {
break;
} else {
addEdgeToResult(incomingEdge, ret, atlasLineageOnDemandContext);
Expand All @@ -396,14 +396,14 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i
continue;
}

if (incrementAndCheckIfRelationsLimitReached(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth)) {
if (incrementAndCheckIfRelationsLimitReached(outgoingEdge, isInput, atlasLineageOnDemandContext, ret, depth, baseGuid, direction)) {
break;
} else {
addEdgeToResult(outgoingEdge, ret, atlasLineageOnDemandContext);
}

if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) {
traverseEdgesOnDemand(entityVertex, isInput, depth - 1, visitedVertices, atlasLineageOnDemandContext, ret); // execute inner depth
traverseEdgesOnDemand(entityVertex, isInput, depth - 1, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, direction); // execute inner depth
}
}
}
Expand Down Expand Up @@ -431,7 +431,7 @@ private static String getId(AtlasVertex vertex) {
return vertex.getIdForDisplay();
}

private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge atlasEdge, boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth) {
private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge atlasEdge, boolean isInput, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, int depth, String baseGuid, AtlasLineageOnDemandInfo.LineageDirection direction) {
AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("incrementAndCheckIfRelationsLimitReached");

if (lineageContainsVisitedEdgeV2(ret, atlasEdge)) {
Expand Down Expand Up @@ -466,7 +466,9 @@ private boolean incrementAndCheckIfRelationsLimitReached(AtlasEdge atlasEdge, bo
outLineageInfo.setHasMoreOutputs(true);
hasRelationsLimitReached = true;
} else {
outLineageInfo.incrementOutputRelationsCount();
if (! (direction.equals(AtlasLineageOnDemandInfo.LineageDirection.INPUT) && outGuid.equals(baseGuid))) {
outLineageInfo.incrementOutputRelationsCount();
}
}

// Handle horizontal pagination
Expand Down

0 comments on commit 814af91

Please sign in to comment.