14
14
import java .util .Objects ;
15
15
import java .util .Optional ;
16
16
import java .util .TreeMap ;
17
+ import java .util .concurrent .ConcurrentHashMap ;
17
18
import java .util .function .BiConsumer ;
18
19
import java .util .stream .Collectors ;
19
20
import java .util .stream .StreamSupport ;
64
65
import io .hyperfoil .tools .horreum .api .data .ValidationError ;
65
66
import io .hyperfoil .tools .horreum .api .services .RunService ;
66
67
import io .hyperfoil .tools .horreum .api .services .SchemaService ;
68
+ import io .hyperfoil .tools .horreum .api .services .TestService ;
67
69
import io .hyperfoil .tools .horreum .bus .AsyncEventChannels ;
68
70
import io .hyperfoil .tools .horreum .datastore .BackendResolver ;
69
71
import io .hyperfoil .tools .horreum .datastore .Datastore ;
@@ -152,6 +154,8 @@ WHEN jsonb_typeof(data) = 'array' THEN ?1 IN (SELECT jsonb_array_elements(data)-
152
154
@ Inject
153
155
Session session ;
154
156
157
+ private final ConcurrentHashMap <Integer , TestService .RecalculationStatus > transformations = new ConcurrentHashMap <>();
158
+
155
159
@ Transactional
156
160
@ WithRoles (extras = Roles .HORREUM_SYSTEM )
157
161
void onTestDeleted (int testId ) {
@@ -1118,14 +1122,38 @@ public void recalculateAll(String fromStr, String toStr) {
1118
1122
}
1119
1123
}
1120
1124
1125
+ /**
1126
+ * Transforms the data for a given run by applying applicable schemas and transformers.
1127
+ * It ensures any existing datasets for the run are removed before creating new ones,
1128
+ * handles timeouts for ongoing transformations, and creates datasets with the transformed data.
1129
+ *
1130
+ * @param runId the ID of the run to transform
1131
+ * @param isRecalculation flag indicating if this is a recalculation
1132
+ * @return the number of datasets created, or 0 if the run is invalid, the run is not found, or a transformation for it is already in progress
1133
+ */
1121
1134
@ WithRoles (extras = Roles .HORREUM_SYSTEM )
1122
1135
@ Transactional
1123
1136
int transform (int runId , boolean isRecalculation ) {
1124
1137
if (runId < 1 ) {
1125
1138
log .errorf ("Transformation parameters error: run %s" , runId );
1126
1139
return 0 ;
1127
1140
}
1141
+
1128
1142
log .debugf ("Transforming run ID %d, recalculation? %s" , runId , Boolean .toString (isRecalculation ));
1143
+ int numDatasets = 0 ;
1144
+
1145
+ // check whether there is an ongoing transformation on the same runId
1146
+ TestService .RecalculationStatus status = new TestService .RecalculationStatus (1 );
1147
+ TestService .RecalculationStatus prev = transformations .putIfAbsent (runId , status );
1148
+ // ensure the transformation is removed; with this approach we should be sure
1149
+ // it gets removed even if a transaction-level exception occurs, e.g., a timeout
1150
+ Util .registerTxSynchronization (tm , txStatus -> transformations .remove (runId , status ));
1151
+ if (prev != null ) {
1152
+ // there is an ongoing transformation that has recently been initiated
1153
+ log .warnf ("Transformation for run %d already in progress" , runId );
1154
+ return numDatasets ;
1155
+ }
1156
+
1129
1157
// We need to make sure all old datasets are gone before creating new; otherwise we could
1130
1158
// break the runid,ordinal uniqueness constraint
1131
1159
for (DatasetDAO old : DatasetDAO .<DatasetDAO > list ("run.id" , runId )) {
@@ -1138,9 +1166,8 @@ int transform(int runId, boolean isRecalculation) {
1138
1166
RunDAO run = RunDAO .findById (runId );
1139
1167
if (run == null ) {
1140
1168
log .errorf ("Cannot load run ID %d for transformation" , runId );
1141
- return 0 ;
1169
+ return numDatasets ; // this is 0
1142
1170
}
1143
- int ordinal = 0 ;
1144
1171
Map <Integer , JsonNode > transformerResults = new TreeMap <>();
1145
1172
// naked nodes (those produced by implicit identity transformers) are all added to each dataset
1146
1173
List <JsonNode > nakedNodes = new ArrayList <>();
@@ -1247,7 +1274,8 @@ int transform(int runId, boolean isRecalculation) {
1247
1274
}
1248
1275
} else if (!result .isContainerNode () || (result .isObject () && !result .has ("$schema" )) ||
1249
1276
(result .isArray ()
1250
- && StreamSupport .stream (result .spliterator (), false ).anyMatch (item -> !item .has ("$schema" )))) {
1277
+ && StreamSupport .stream (result .spliterator (), false )
1278
+ .anyMatch (item -> !item .has ("$schema" )))) {
1251
1279
logMessage (run , PersistentLogDAO .WARN , "Dataset will contain element without a schema." );
1252
1280
}
1253
1281
JsonNode existing = transformerResults .get (transformerId );
@@ -1285,12 +1313,14 @@ int transform(int runId, boolean isRecalculation) {
1285
1313
}
1286
1314
nakedNodes .add (node );
1287
1315
logMessage (run , PersistentLogDAO .DEBUG ,
1288
- "This test (%d) does not use any transformer for schema %s (key %s), passing as-is." , run .testid , uri ,
1316
+ "This test (%d) does not use any transformer for schema %s (key %s), passing as-is." , run .testid ,
1317
+ uri ,
1289
1318
key );
1290
1319
}
1291
1320
}
1292
1321
if (schemasAndTransformers > 0 ) {
1293
- int max = transformerResults .values ().stream ().filter (JsonNode ::isArray ).mapToInt (JsonNode ::size ).max ().orElse (1 );
1322
+ int max = transformerResults .values ().stream ().filter (JsonNode ::isArray ).mapToInt (JsonNode ::size ).max ()
1323
+ .orElse (1 );
1294
1324
1295
1325
for (int position = 0 ; position < max ; position += 1 ) {
1296
1326
ArrayNode all = instance .arrayNode (max + nakedNodes .size ());
@@ -1305,7 +1335,7 @@ int transform(int runId, boolean isRecalculation) {
1305
1335
String message = String .format (
1306
1336
"Transformer %d produced an array of %d elements but other transformer " +
1307
1337
"produced %d elements; dataset %d/%d might be missing some data." ,
1308
- entry .getKey (), node .size (), max , run .id , ordinal );
1338
+ entry .getKey (), node .size (), max , run .id , numDatasets );
1309
1339
logMessage (run , PersistentLogDAO .WARN , "%s" , message );
1310
1340
log .warnf (message );
1311
1341
}
@@ -1316,18 +1346,18 @@ int transform(int runId, boolean isRecalculation) {
1316
1346
}
1317
1347
}
1318
1348
nakedNodes .forEach (all ::add );
1319
- createDataset (new DatasetDAO (run , ordinal ++, run .description , all ), isRecalculation );
1349
+ createDataset (new DatasetDAO (run , numDatasets ++, run .description , all ), isRecalculation );
1320
1350
}
1321
1351
mediator .validateRun (run .id );
1322
- return ordinal ;
1323
1352
} else {
1353
+ numDatasets = 1 ;
1324
1354
logMessage (run , PersistentLogDAO .INFO , "No applicable schema, dataset will be empty." );
1325
1355
createDataset (new DatasetDAO (
1326
1356
run , 0 , "Empty Dataset for run data without any schema." ,
1327
1357
instance .arrayNode ()), isRecalculation );
1328
1358
mediator .validateRun (run .id );
1329
- return 1 ;
1330
1359
}
1360
+ return numDatasets ;
1331
1361
}
1332
1362
1333
1363
private String limitLength (String str ) {
0 commit comments