Skip to content

Commit 5d1b4e4

Browse files
Handle path finding algos
1 parent 8dad799 commit 5d1b4e4

File tree

6 files changed

+284
-307
lines changed

6 files changed

+284
-307
lines changed

applications/algorithms/machinery/src/main/java/org/neo4j/gds/applications/algorithms/machinery/Neo4jDatabaseRelationshipWriter.java

Lines changed: 72 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,28 @@
1919
*/
2020
package org.neo4j.gds.applications.algorithms.machinery;
2121

22+
import org.neo4j.gds.api.ExportedRelationship;
2223
import org.neo4j.gds.api.Graph;
2324
import org.neo4j.gds.api.IdMap;
2425
import org.neo4j.gds.api.ResultStore;
26+
import org.neo4j.gds.api.nodeproperties.ValueType;
2527
import org.neo4j.gds.api.properties.relationships.RelationshipWithPropertyConsumer;
2628
import org.neo4j.gds.applications.algorithms.metadata.RelationshipsWritten;
29+
import org.neo4j.gds.core.concurrency.Concurrency;
2730
import org.neo4j.gds.core.utils.logging.LoggerForProgressTrackingAdapter;
2831
import org.neo4j.gds.core.utils.progress.JobId;
2932
import org.neo4j.gds.core.utils.progress.TaskRegistryFactory;
3033
import org.neo4j.gds.core.utils.progress.tasks.TaskProgressTracker;
3134
import org.neo4j.gds.core.write.RelationshipExporter;
3235
import org.neo4j.gds.core.write.RelationshipExporterBuilder;
36+
import org.neo4j.gds.core.write.RelationshipStreamExporter;
37+
import org.neo4j.gds.core.write.RelationshipStreamExporterBuilder;
3338
import org.neo4j.gds.logging.Log;
3439
import org.neo4j.gds.termination.TerminationFlag;
3540

41+
import java.util.List;
3642
import java.util.Optional;
43+
import java.util.stream.Stream;
3744

3845
final class Neo4jDatabaseRelationshipWriter {
3946

@@ -50,23 +57,23 @@ static RelationshipsWritten writeRelationshipsFromGraph(
5057
Optional<ResultStore> resultStore,
5158
RelationshipWithPropertyConsumer relationshipConsumer,
5259
JobId jobId
53-
){
60+
) {
5461

5562
var progressTracker = new TaskProgressTracker(
56-
RelationshipExporter.baseTask(taskName, graph.relationshipCount()),
57-
new LoggerForProgressTrackingAdapter(log),
58-
RelationshipExporterBuilder.TYPED_DEFAULT_WRITE_CONCURRENCY,
59-
taskRegistryFactory
60-
);
63+
RelationshipExporter.baseTask(taskName, graph.relationshipCount()),
64+
new LoggerForProgressTrackingAdapter(log),
65+
RelationshipExporterBuilder.TYPED_DEFAULT_WRITE_CONCURRENCY,
66+
taskRegistryFactory
67+
);
6168

62-
var exporter = relationshipExporterBuilder
63-
.withIdMappingOperator(rootIdMap::toOriginalNodeId)
64-
.withGraph(graph)
65-
.withTerminationFlag(algorithmTerminationFlag)
66-
.withProgressTracker(progressTracker)
67-
.withResultStore(resultStore)
68-
.withJobId(jobId)
69-
.build();
69+
var exporter = relationshipExporterBuilder
70+
.withIdMappingOperator(rootIdMap::toOriginalNodeId)
71+
.withGraph(graph)
72+
.withTerminationFlag(algorithmTerminationFlag)
73+
.withProgressTracker(progressTracker)
74+
.withResultStore(resultStore)
75+
.withJobId(jobId)
76+
.build();
7077

7178
try {
7279
exporter.write(
@@ -84,5 +91,56 @@ static RelationshipsWritten writeRelationshipsFromGraph(
8491
return new RelationshipsWritten(graph.relationshipCount());
8592
}
8693

94+
static RelationshipsWritten writeRelationshipsFromStream(
95+
String writeRelationshipType,
96+
List<String> properties,
97+
List<ValueType> valueTypes,
98+
TaskRegistryFactory taskRegistryFactory,
99+
RelationshipStreamExporterBuilder relationshipStreamExporterBuilder,
100+
Stream<ExportedRelationship> relationshipStream,
101+
IdMap rootIdMap,
102+
Log log,
103+
String taskName,
104+
TerminationFlag algorithmTerminationFlag,
105+
Optional<ResultStore> maybeResultStore,
106+
JobId jobId
107+
) {
108+
109+
var progressTracker = new TaskProgressTracker(
110+
RelationshipStreamExporter.baseTask(taskName),
111+
new LoggerForProgressTrackingAdapter(log),
112+
new Concurrency(1),
113+
taskRegistryFactory
114+
);
115+
116+
// When we are writing to the result store, the result stream might not be consumed
117+
// inside the current transaction. This causes the stream to immediately return an empty stream
118+
// as the termination flag, which is bound to the current transaction is set to true. We therefore
119+
// need to collect the stream and trigger an eager computation.
120+
var maybeCollectedStream = maybeResultStore
121+
.map(__ -> relationshipStream.toList().stream())
122+
.orElse(relationshipStream);
123+
124+
// configure the exporter
125+
var relationshipStreamExporter = relationshipStreamExporterBuilder
126+
.withResultStore(maybeResultStore)
127+
.withIdMappingOperator(rootIdMap::toOriginalNodeId)
128+
.withProgressTracker(progressTracker)
129+
.withRelationships(maybeCollectedStream)
130+
.withTerminationFlag(algorithmTerminationFlag)
131+
.withJobId(jobId)
132+
.build();
133+
134+
var relationshipsWritten = relationshipStreamExporter.write(
135+
writeRelationshipType,
136+
properties,
137+
valueTypes
138+
);
139+
140+
return new RelationshipsWritten(relationshipsWritten);
141+
}
142+
143+
87144
private Neo4jDatabaseRelationshipWriter() {}
88145
}
146+

applications/algorithms/machinery/src/main/java/org/neo4j/gds/applications/algorithms/machinery/WriteRelationshipService.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,19 @@
1919
*/
2020
package org.neo4j.gds.applications.algorithms.machinery;
2121

22+
import org.neo4j.gds.api.ExportedRelationship;
2223
import org.neo4j.gds.api.Graph;
2324
import org.neo4j.gds.api.IdMap;
2425
import org.neo4j.gds.api.ResultStore;
26+
import org.neo4j.gds.api.nodeproperties.ValueType;
2527
import org.neo4j.gds.api.properties.relationships.RelationshipWithPropertyConsumer;
2628
import org.neo4j.gds.applications.algorithms.metadata.RelationshipsWritten;
2729
import org.neo4j.gds.core.utils.progress.JobId;
2830
import org.neo4j.gds.logging.Log;
2931

32+
import java.util.List;
3033
import java.util.Optional;
34+
import java.util.stream.Stream;
3135

3236
public class WriteRelationshipService {
3337
private final Log log;
@@ -43,19 +47,20 @@ public WriteRelationshipService(Log log, RequestScopedDependencies requestScoped
4347
public RelationshipsWritten writeFromGraph(
4448
String writeRelationshipType,
4549
String writeProperty,
46-
Graph graph,
50+
Graph writeGraph,
4751
IdMap rootIdMap,
4852
String taskName,
4953
Optional<ResultStore> resultStore,
5054
RelationshipWithPropertyConsumer relationshipWithPropertyConsumer,
5155
JobId jobId
5256
) {
57+
5358
return Neo4jDatabaseRelationshipWriter.writeRelationshipsFromGraph(
5459
writeRelationshipType,
5560
writeProperty,
5661
requestScopedDependencies.taskRegistryFactory(),
5762
writeContext.relationshipExporterBuilder(),
58-
graph,
63+
writeGraph,
5964
rootIdMap,
6065
log,
6166
taskName,
@@ -65,4 +70,33 @@ public RelationshipsWritten writeFromGraph(
6570
jobId
6671
);
6772
}
73+
74+
public RelationshipsWritten writeFromRelationshipStream(
75+
String writeRelationshipType,
76+
List<String> properties,
77+
List<ValueType> valueTypes,
78+
Stream<ExportedRelationship> relationshipStream,
79+
IdMap rootIdMap,
80+
String taskName,
81+
Optional<ResultStore> resultStore,
82+
JobId jobId
83+
){
84+
85+
return Neo4jDatabaseRelationshipWriter.writeRelationshipsFromStream(
86+
writeRelationshipType,
87+
properties,
88+
valueTypes,
89+
requestScopedDependencies.taskRegistryFactory(),
90+
writeContext.relationshipStreamExporterBuilder(),
91+
relationshipStream,
92+
rootIdMap,
93+
log,
94+
taskName,
95+
requestScopedDependencies.terminationFlag(),
96+
resultStore,
97+
jobId
98+
);
99+
}
100+
101+
68102
}

applications/algorithms/path-finding/src/main/java/org/neo4j/gds/applications/algorithms/pathfinding/BellmanFordWriteStep.java

Lines changed: 21 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -19,50 +19,26 @@
1919
*/
2020
package org.neo4j.gds.applications.algorithms.pathfinding;
2121

22-
import org.neo4j.gds.api.ExportedRelationship;
2322
import org.neo4j.gds.api.Graph;
2423
import org.neo4j.gds.api.GraphStore;
25-
import org.neo4j.gds.api.IdMap;
2624
import org.neo4j.gds.api.ResultStore;
27-
import org.neo4j.gds.api.nodeproperties.ValueType;
28-
import org.neo4j.gds.applications.algorithms.machinery.RequestScopedDependencies;
29-
import org.neo4j.gds.applications.algorithms.machinery.WriteContext;
25+
import org.neo4j.gds.applications.algorithms.machinery.WriteRelationshipService;
3026
import org.neo4j.gds.applications.algorithms.machinery.WriteStep;
3127
import org.neo4j.gds.applications.algorithms.metadata.RelationshipsWritten;
32-
import org.neo4j.gds.core.concurrency.Concurrency;
33-
import org.neo4j.gds.core.utils.logging.LoggerForProgressTrackingAdapter;
3428
import org.neo4j.gds.core.utils.progress.JobId;
35-
import org.neo4j.gds.core.utils.progress.tasks.TaskProgressTracker;
36-
import org.neo4j.gds.core.write.RelationshipStreamExporter;
37-
import org.neo4j.gds.logging.Log;
38-
import org.neo4j.gds.paths.PathResult;
3929
import org.neo4j.gds.paths.bellmanford.AllShortestPathsBellmanFordWriteConfig;
4030
import org.neo4j.gds.paths.bellmanford.BellmanFordResult;
41-
import org.neo4j.values.storable.Value;
42-
import org.neo4j.values.storable.Values;
43-
44-
import java.util.List;
45-
46-
import static org.neo4j.gds.paths.dijkstra.config.ShortestPathDijkstraWriteConfig.COSTS_KEY;
47-
import static org.neo4j.gds.paths.dijkstra.config.ShortestPathDijkstraWriteConfig.NODE_IDS_KEY;
48-
import static org.neo4j.gds.paths.dijkstra.config.ShortestPathDijkstraWriteConfig.TOTAL_COST_KEY;
4931

5032
class BellmanFordWriteStep implements WriteStep<BellmanFordResult, RelationshipsWritten> {
51-
private final Log log;
52-
private final RequestScopedDependencies requestScopedDependencies;
53-
private final WriteContext writeContext;
33+
private final WriteRelationshipService writeRelationshipService;
5434
private final AllShortestPathsBellmanFordWriteConfig configuration;
5535

5636
BellmanFordWriteStep(
57-
Log log,
58-
RequestScopedDependencies requestScopedDependencies,
59-
WriteContext writeContext,
37+
WriteRelationshipService writeRelationshipService,
6038
AllShortestPathsBellmanFordWriteConfig configuration
6139
) {
62-
this.log = log;
63-
this.requestScopedDependencies = requestScopedDependencies;
6440
this.configuration = configuration;
65-
this.writeContext = writeContext;
41+
this.writeRelationshipService = writeRelationshipService;
6642
}
6743

6844
@Override
@@ -73,126 +49,33 @@ public RelationshipsWritten execute(
7349
BellmanFordResult result,
7450
JobId jobId
7551
) {
76-
var writeRelationshipType = configuration.writeRelationshipType();
7752

7853
var writeNodeIds = configuration.writeNodeIds();
7954
var writeCosts = configuration.writeCosts();
8055

56+
var specification = new PathFindingWriteRelationshipSpecification(graph,writeNodeIds,writeCosts);
57+
var keys = specification.createKeys();
58+
var types= specification.createTypes();
59+
8160
var paths = result.shortestPaths();
8261
if (configuration.writeNegativeCycles() && result.containsNegativeCycle()) {
8362
paths = result.negativeCycles();
8463
}
85-
86-
var relationshipStream = paths.mapPaths(
87-
pathResult -> new ExportedRelationship(
88-
pathResult.sourceNode(),
89-
pathResult.targetNode(),
90-
createValues(graph, pathResult, writeNodeIds, writeCosts)
91-
)
92-
);
93-
94-
var progressTracker = new TaskProgressTracker(
95-
RelationshipStreamExporter.baseTask("Write shortest Paths"),
96-
new LoggerForProgressTrackingAdapter(log),
97-
new Concurrency(1),
98-
requestScopedDependencies.taskRegistryFactory()
99-
);
100-
101-
var exporter = writeContext.relationshipStreamExporterBuilder()
102-
.withIdMappingOperator(graph::toOriginalNodeId)
103-
.withRelationships(relationshipStream)
104-
.withTerminationFlag(requestScopedDependencies.terminationFlag())
105-
.withProgressTracker(progressTracker)
106-
.withResultStore(configuration.resolveResultStore(resultStore))
107-
.withJobId(configuration.jobId())
108-
.build();
109-
110-
// effect
111-
var relationshipsWritten = exporter.write(
112-
writeRelationshipType,
113-
createKeys(writeNodeIds, writeCosts),
114-
createTypes(writeNodeIds, writeCosts)
115-
);
116-
117-
// reporting
118-
return new RelationshipsWritten(relationshipsWritten);
119-
}
120-
121-
private Value[] createValues(IdMap idMap, PathResult pathResult, boolean writeNodeIds, boolean writeCosts) {
122-
if (writeNodeIds && writeCosts) {
123-
return new Value[]{
124-
Values.doubleValue(pathResult.totalCost()),
125-
Values.longArray(toOriginalIds(idMap, pathResult.nodeIds())),
126-
Values.doubleArray(pathResult.costs())
127-
};
128-
}
129-
if (writeNodeIds) {
130-
return new Value[]{
131-
Values.doubleValue(pathResult.totalCost()),
132-
Values.longArray(toOriginalIds(idMap, pathResult.nodeIds())),
133-
};
134-
}
135-
if (writeCosts) {
136-
return new Value[]{
137-
Values.doubleValue(pathResult.totalCost()),
138-
Values.doubleArray(pathResult.costs())
139-
};
140-
}
141-
return new Value[]{
142-
Values.doubleValue(pathResult.totalCost()),
143-
};
144-
}
145-
146-
private long[] toOriginalIds(IdMap idMap, long[] internalIds) {
147-
for (int i = 0; i < internalIds.length; i++) {
148-
internalIds[i] = idMap.toOriginalNodeId(internalIds[i]);
149-
}
150-
return internalIds;
151-
}
152-
153-
private List<String> createKeys(boolean writeNodeIds, boolean writeCosts) {
154-
if (writeNodeIds && writeCosts) {
155-
return List.of(
156-
TOTAL_COST_KEY,
157-
NODE_IDS_KEY,
158-
COSTS_KEY
159-
);
160-
}
161-
if (writeNodeIds) {
162-
return List.of(
163-
TOTAL_COST_KEY,
164-
NODE_IDS_KEY
165-
);
166-
}
167-
if (writeCosts) {
168-
return List.of(
169-
TOTAL_COST_KEY,
170-
COSTS_KEY
64+
try (
65+
var relationshipStream = paths.mapPaths(specification::createRelationship);
66+
) {
67+
68+
return writeRelationshipService.writeFromRelationshipStream(
69+
configuration.writeRelationshipType(),
70+
keys,
71+
types,
72+
relationshipStream,
73+
graph.rootIdMap(),
74+
"Write shortest Paths",
75+
configuration.resolveResultStore(resultStore),
76+
configuration.jobId()
17177
);
17278
}
173-
return List.of(TOTAL_COST_KEY);
17479
}
17580

176-
private List<ValueType> createTypes(boolean writeNodeIds, boolean writeCosts) {
177-
if (writeNodeIds && writeCosts) {
178-
return List.of(
179-
ValueType.DOUBLE,
180-
ValueType.LONG_ARRAY,
181-
ValueType.DOUBLE_ARRAY
182-
);
183-
}
184-
if (writeNodeIds) {
185-
return List.of(
186-
ValueType.DOUBLE,
187-
ValueType.LONG_ARRAY
188-
);
189-
}
190-
if (writeCosts) {
191-
return List.of(
192-
ValueType.DOUBLE,
193-
ValueType.DOUBLE_ARRAY
194-
);
195-
}
196-
return List.of(ValueType.DOUBLE);
197-
}
19881
}

0 commit comments

Comments
 (0)