Skip to content

Commit cac88c8

Browse files
committed
[CSAPI] Add paging support for fois and obs
1 parent b71d81a commit cac88c8

File tree

1 file changed

+141
-29
lines changed

1 file changed

+141
-29
lines changed

sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java

Lines changed: 141 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@ public CompletableFuture<ISystemWithDesc> getSystemById(String id, ResourceForma
375375
}
376376
});
377377
}
378+
378379

379380
public CompletableFuture<ISystemWithDesc> getSystemByUid(String uid, ResourceFormat format) throws ExecutionException, InterruptedException
380381
{
@@ -424,6 +425,7 @@ public CompletableFuture<String> addSystem(ISystemWithDesc system)
424425
throw new IllegalStateException("Error initializing binding", e);
425426
}
426427
}
428+
427429

428430
public CompletableFuture<Integer> updateSystem(String systemID, ISystemWithDesc system)
429431
{
@@ -445,6 +447,7 @@ public CompletableFuture<Integer> updateSystem(String systemID, ISystemWithDesc
445447
throw new IllegalStateException("Error initializing binding", e);
446448
}
447449
}
450+
448451

449452
public CompletableFuture<String> addSubSystem(String systemID, ISystemWithDesc system)
450453
{
@@ -466,6 +469,7 @@ public CompletableFuture<String> addSubSystem(String systemID, ISystemWithDesc s
466469
throw new IllegalStateException("Error initializing binding", e);
467470
}
468471
}
472+
469473

470474
public CompletableFuture<Set<String>> addSystems(ISystemWithDesc... systems)
471475
{
@@ -532,6 +536,7 @@ public CompletableFuture<String> addSamplingFeature(String systemId, IFeature fe
532536
throw new IllegalStateException("Error initializing binding", e);
533537
}
534538
}
539+
535540

536541
public CompletableFuture<Integer> updateSamplingFeature(String featureId, IFeature feature)
537542
{
@@ -553,6 +558,7 @@ public CompletableFuture<Integer> updateSamplingFeature(String featureId, IFeatu
553558
throw new IllegalStateException("Error initializing binding", e);
554559
}
555560
}
561+
556562

557563
public CompletableFuture<IFeature> getSamplingFeatureById(String featureId)
558564
{
@@ -572,10 +578,58 @@ public CompletableFuture<IFeature> getSamplingFeatureById(String featureId)
572578
}
573579
});
574580
}
581+
582+
583+
public CompletableFuture<IFeature> getSamplingFeatureByUid(String uid, ResourceFormat format)
584+
{
585+
return sendGetRequest(endpoint.resolve(SF_COLLECTION + "?id=" + uid), format, body -> {
586+
try
587+
{
588+
var ctx = new RequestContext(body);
589+
590+
// use modified binding since the response contains a feature collection
591+
var binding = new FoiBindingGeoJson(ctx, null, null, true) {
592+
protected JsonReader getJsonReader(InputStream is) throws IOException
593+
{
594+
var reader = super.getJsonReader(is);
595+
skipToCollectionItems(reader);
596+
return reader;
597+
}
598+
};
599+
600+
return binding.deserialize();
601+
}
602+
catch (IOException e)
603+
{
604+
throw new CompletionException(e);
605+
}
606+
});
607+
}
608+
575609

576610
public CompletableFuture<Stream<IFeature>> getSystemSamplingFeatures(String systemId, ResourceFormat format)
577611
{
578-
var request = SYSTEMS_COLLECTION + "/" + systemId + "/" + SF_COLLECTION + "?f=" + format + "&limit=10000";
612+
return getSystemSamplingFeatures(systemId, format, 100);
613+
}
614+
615+
616+
public CompletableFuture<Stream<IFeature>> getSystemSamplingFeatures(String systemId, ResourceFormat format, int maxPageSize)
617+
{
618+
return getResourcesWithPaging((pageSize, offset) -> {
619+
try {
620+
return getSystemSamplingFeatures(systemId, format, pageSize, offset).get();
621+
}
622+
catch (Exception e) {
623+
throw new IOException("Error loading sampling features", e);
624+
}
625+
}, maxPageSize);
626+
}
627+
628+
629+
protected CompletableFuture<Stream<IFeature>> getSystemSamplingFeatures(String systemId, ResourceFormat format, int pageSize, int offset)
630+
{
631+
var request = SYSTEMS_COLLECTION + "/" + systemId + "/" + SF_COLLECTION + "?f=" + format + "&limit=" + pageSize + "&offset=" + offset;
632+
log.debug("{}", request);
579633

580634
return sendGetRequest(endpoint.resolve(request), format, body -> {
581635
try
@@ -644,32 +698,6 @@ public Spliterator<IFeature> trySplit()
644698
});
645699
}
646700

647-
public CompletableFuture<IFeature> getSamplingFeatureByUid(String uid, ResourceFormat format)
648-
{
649-
return sendGetRequest(endpoint.resolve(SF_COLLECTION + "?id=" + uid), format, body -> {
650-
try
651-
{
652-
var ctx = new RequestContext(body);
653-
654-
// use modified binding since the response contains a feature collection
655-
var binding = new FoiBindingGeoJson(ctx, null, null, true) {
656-
protected JsonReader getJsonReader(InputStream is) throws IOException
657-
{
658-
var reader = super.getJsonReader(is);
659-
skipToCollectionItems(reader);
660-
return reader;
661-
}
662-
};
663-
664-
return binding.deserialize();
665-
}
666-
catch (IOException e)
667-
{
668-
throw new CompletionException(e);
669-
}
670-
});
671-
}
672-
673701

674702
/*-------------*/
675703
/* Datastreams */
@@ -709,6 +737,7 @@ public CompletableFuture<IDataStreamInfo> getDatastreamById(String id, ResourceF
709737

710738
}
711739

740+
712741
public CompletableFuture<IDataStreamInfo> getDatastreamSchema(String id, ResourceFormat obsFormat, ResourceFormat format)
713742
{
714743
var obsFormatStr = URLEncoder.encode(obsFormat.getMimeType(), StandardCharsets.UTF_8);
@@ -730,6 +759,7 @@ public CompletableFuture<IDataStreamInfo> getDatastreamSchema(String id, Resourc
730759
}
731760
});
732761
}
762+
733763

734764
public CompletableFuture<String> addDataStream(String systemId, IDataStreamInfo datastream)
735765
{
@@ -955,10 +985,28 @@ public CompletableFuture<String> pushObs(String dataStreamId, IDataStreamInfo da
955985
}
956986
}
957987

958-
988+
959989
public CompletableFuture<Stream<IObsData>> getObservations(String dsId, IDataStreamInfo dsInfo, TemporalFilter timeFilter, Set<String> foiIds, ResourceFormat format)
960990
{
961-
var request = DATASTREAMS_COLLECTION + "/" + dsId + "/observations?f=" + format + "&limit=10000";
991+
return getObservations(dsId, dsInfo, timeFilter, foiIds, format, 100);
992+
}
993+
994+
995+
public CompletableFuture<Stream<IObsData>> getObservations(String dsId, IDataStreamInfo dsInfo, TemporalFilter timeFilter, Set<String> foiIds, ResourceFormat format, int maxPageSize)
996+
{
997+
return getResourcesWithPaging((pageSize, offset) -> {
998+
try {
999+
return getObservations(dsId, dsInfo, timeFilter, foiIds, format, pageSize, offset).get();
1000+
}
1001+
catch (Exception e) {
1002+
throw new IOException("Error loading observations", e);
1003+
}
1004+
}, maxPageSize);
1005+
}
1006+
1007+
protected CompletableFuture<Stream<IObsData>> getObservations(String dsId, IDataStreamInfo dsInfo, TemporalFilter timeFilter, Set<String> foiIds, ResourceFormat format, int pageSize, int offset)
1008+
{
1009+
var request = DATASTREAMS_COLLECTION + "/" + dsId + "/observations?f=" + format + "&limit=" + pageSize + "&offset=" + offset;
9621010

9631011
if (foiIds != null)
9641012
request += "&foi=" + String.join(",", foiIds);
@@ -1316,6 +1364,70 @@ protected CompletableFuture<Set<String>> sendBatchPostRequestFallback(URI collec
13161364
}
13171365

13181366

1367+
interface PageLoadFunction<T>
1368+
{
1369+
Stream<T> loadPage(int offset, int pageSize) throws IOException;
1370+
}
1371+
1372+
1373+
protected <T> CompletableFuture<Stream<T>> getResourcesWithPaging(PageLoadFunction<T> pageLoader, int pageSize)
1374+
{
1375+
var resourceStream = StreamSupport.stream(new Spliterator<T>() {
1376+
Spliterator<T> currentBatch;
1377+
int offset = 0;
1378+
1379+
@Override
1380+
public int characteristics()
1381+
{
1382+
return Spliterator.ORDERED | Spliterator.DISTINCT;
1383+
}
1384+
1385+
@Override
1386+
public long estimateSize()
1387+
{
1388+
return Long.MAX_VALUE;
1389+
}
1390+
1391+
@Override
1392+
public boolean tryAdvance(Consumer<? super T> consumer)
1393+
{
1394+
boolean hasNext = false;
1395+
if (currentBatch != null)
1396+
hasNext = currentBatch.tryAdvance(consumer);
1397+
1398+
if (!hasNext)
1399+
{
1400+
try {
1401+
log.debug("Loading batch {}={}", offset, offset+pageSize);
1402+
var batch = pageLoader.loadPage(pageSize, offset);
1403+
if (batch == null)
1404+
return false;
1405+
1406+
offset += pageSize;
1407+
currentBatch = batch.spliterator();
1408+
hasNext = currentBatch.tryAdvance(consumer);
1409+
}
1410+
catch (Exception e)
1411+
{
1412+
throw new IllegalStateException("Error loading next page", e);
1413+
}
1414+
}
1415+
1416+
return hasNext;
1417+
}
1418+
1419+
@Override
1420+
public Spliterator<T> trySplit()
1421+
{
1422+
return null;
1423+
}
1424+
1425+
}, false);
1426+
1427+
return CompletableFuture.completedFuture(resourceStream);
1428+
}
1429+
1430+
13191431
protected void skipToCollectionItems(JsonReader reader) throws IOException
13201432
{
13211433
// skip to array of collection items

0 commit comments

Comments
 (0)