19
19
*/
20
20
package org .neo4j .gds .applications .algorithms .machinery ;
21
21
22
+ import org .neo4j .gds .api .ExportedRelationship ;
22
23
import org .neo4j .gds .api .Graph ;
23
24
import org .neo4j .gds .api .IdMap ;
24
25
import org .neo4j .gds .api .ResultStore ;
26
+ import org .neo4j .gds .api .nodeproperties .ValueType ;
25
27
import org .neo4j .gds .api .properties .relationships .RelationshipWithPropertyConsumer ;
26
28
import org .neo4j .gds .applications .algorithms .metadata .RelationshipsWritten ;
29
+ import org .neo4j .gds .core .concurrency .Concurrency ;
27
30
import org .neo4j .gds .core .utils .logging .LoggerForProgressTrackingAdapter ;
28
31
import org .neo4j .gds .core .utils .progress .JobId ;
29
32
import org .neo4j .gds .core .utils .progress .TaskRegistryFactory ;
30
33
import org .neo4j .gds .core .utils .progress .tasks .TaskProgressTracker ;
31
34
import org .neo4j .gds .core .write .RelationshipExporter ;
32
35
import org .neo4j .gds .core .write .RelationshipExporterBuilder ;
36
+ import org .neo4j .gds .core .write .RelationshipStreamExporter ;
37
+ import org .neo4j .gds .core .write .RelationshipStreamExporterBuilder ;
33
38
import org .neo4j .gds .logging .Log ;
34
39
import org .neo4j .gds .termination .TerminationFlag ;
35
40
41
+ import java .util .List ;
36
42
import java .util .Optional ;
43
+ import java .util .stream .Stream ;
37
44
38
45
final class Neo4jDatabaseRelationshipWriter {
39
- static RelationshipsWritten writeRelationship (
46
+
47
+ static RelationshipsWritten writeRelationshipsFromGraph (
40
48
String writeRelationshipType ,
41
49
String writeProperty ,
42
50
TaskRegistryFactory taskRegistryFactory ,
@@ -49,24 +57,23 @@ static RelationshipsWritten writeRelationship(
49
57
Optional <ResultStore > resultStore ,
50
58
RelationshipWithPropertyConsumer relationshipConsumer ,
51
59
JobId jobId
52
- ){
60
+ ) {
53
61
54
62
var progressTracker = new TaskProgressTracker (
55
- RelationshipExporter .baseTask (taskName , graph .relationshipCount ()),
56
- new LoggerForProgressTrackingAdapter (log ),
57
- RelationshipExporterBuilder .TYPED_DEFAULT_WRITE_CONCURRENCY ,
58
- taskRegistryFactory
59
- );
60
-
61
- var exporter = relationshipExporterBuilder
62
- .withIdMappingOperator (rootIdMap ::toOriginalNodeId )
63
- .withGraph (graph )
64
- .withTerminationFlag (algorithmTerminationFlag )
65
- .withProgressTracker (progressTracker )
66
- .withResultStore (resultStore )
67
- .withJobId (jobId )
68
- .build ();
63
+ RelationshipExporter .baseTask (taskName , graph .relationshipCount ()),
64
+ new LoggerForProgressTrackingAdapter (log ),
65
+ RelationshipExporterBuilder .TYPED_DEFAULT_WRITE_CONCURRENCY ,
66
+ taskRegistryFactory
67
+ );
69
68
69
+ var exporter = relationshipExporterBuilder
70
+ .withIdMappingOperator (rootIdMap ::toOriginalNodeId )
71
+ .withGraph (graph )
72
+ .withTerminationFlag (algorithmTerminationFlag )
73
+ .withProgressTracker (progressTracker )
74
+ .withResultStore (resultStore )
75
+ .withJobId (jobId )
76
+ .build ();
70
77
71
78
try {
72
79
exporter .write (
@@ -82,8 +89,58 @@ static RelationshipsWritten writeRelationship(
82
89
}
83
90
84
91
return new RelationshipsWritten (graph .relationshipCount ());
92
+ }
85
93
94
+ static RelationshipsWritten writeRelationshipsFromStream (
95
+ String writeRelationshipType ,
96
+ List <String > properties ,
97
+ List <ValueType > valueTypes ,
98
+ TaskRegistryFactory taskRegistryFactory ,
99
+ RelationshipStreamExporterBuilder relationshipStreamExporterBuilder ,
100
+ Stream <ExportedRelationship > relationshipStream ,
101
+ IdMap rootIdMap ,
102
+ Log log ,
103
+ String taskName ,
104
+ TerminationFlag algorithmTerminationFlag ,
105
+ Optional <ResultStore > maybeResultStore ,
106
+ JobId jobId
107
+ ) {
108
+
109
+ var progressTracker = new TaskProgressTracker (
110
+ RelationshipStreamExporter .baseTask (taskName ),
111
+ new LoggerForProgressTrackingAdapter (log ),
112
+ new Concurrency (1 ),
113
+ taskRegistryFactory
114
+ );
115
+
116
+ // When we are writing to the result store, the result stream might not be consumed
117
+ // inside the current transaction. This causes the stream to immediately return an empty stream
118
+ // as the termination flag, which is bound to the current transaction is set to true. We therefore
119
+ // need to collect the stream and trigger an eager computation.
120
+ var maybeCollectedStream = maybeResultStore
121
+ .map (__ -> relationshipStream .toList ().stream ())
122
+ .orElse (relationshipStream );
123
+
124
+ // configure the exporter
125
+ var relationshipStreamExporter = relationshipStreamExporterBuilder
126
+ .withResultStore (maybeResultStore )
127
+ .withIdMappingOperator (rootIdMap ::toOriginalNodeId )
128
+ .withProgressTracker (progressTracker )
129
+ .withRelationships (maybeCollectedStream )
130
+ .withTerminationFlag (algorithmTerminationFlag )
131
+ .withJobId (jobId )
132
+ .build ();
133
+
134
+ var relationshipsWritten = relationshipStreamExporter .write (
135
+ writeRelationshipType ,
136
+ properties ,
137
+ valueTypes
138
+ );
139
+
140
+ return new RelationshipsWritten (relationshipsWritten );
86
141
}
87
142
143
+
88
144
private Neo4jDatabaseRelationshipWriter () {}
89
145
}
146
+
0 commit comments