diff --git a/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/application/DataStreamBatch.kt b/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/application/DataStreamBatch.kt index 906540509..9dd906145 100644 --- a/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/application/DataStreamBatch.kt +++ b/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/application/DataStreamBatch.kt @@ -3,6 +3,7 @@ package dk.cachet.carp.data.application import dk.cachet.carp.common.application.data.Data +import dk.cachet.carp.common.application.data.DataType import kotlinx.serialization.* import kotlinx.serialization.builtins.* import kotlinx.serialization.descriptors.* @@ -90,7 +91,7 @@ class MutableDataStreamBatch : DataStreamBatch /** * Append all data stream sequences contained in [batch] to this batch. * - * @throws IllegalArgumentException when the start of any of the sequences contained in [batch] + * @throws IllegalArgumentException when the start of the sequences contained in [batch] * precede the end of a previously appended sequence to the same data stream. */ fun appendBatch( batch: DataStreamBatch ) diff --git a/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/application/DataStreamService.kt b/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/application/DataStreamService.kt index ceceb522a..ae5b6f867 100644 --- a/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/application/DataStreamService.kt +++ b/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/application/DataStreamService.kt @@ -2,10 +2,13 @@ package dk.cachet.carp.data.application import dk.cachet.carp.common.application.UUID +import dk.cachet.carp.common.application.data.DataType import dk.cachet.carp.common.application.services.ApiVersion import dk.cachet.carp.common.application.services.ApplicationService import dk.cachet.carp.common.application.services.IntegrationEvent -import kotlinx.serialization.* +import kotlinx.datetime.Instant +import kotlinx.serialization.Required +import kotlinx.serialization.Serializable /** @@ -60,6 +63,33 @@ interface DataStreamService : ApplicationService, + deviceRoleNames: Set? = null, + dataTypes: Set? = null, + from: Instant? = null, + to: Instant? = null + ): DataStreamBatch + + /** * Stop accepting incoming data for all data streams for each of the [studyDeploymentIds]. * diff --git a/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceDecorator.kt b/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceDecorator.kt index cab4d76d6..e532fad55 100644 --- a/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceDecorator.kt +++ b/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceDecorator.kt @@ -1,6 +1,7 @@ package dk.cachet.carp.data.infrastructure import dk.cachet.carp.common.application.UUID +import dk.cachet.carp.common.application.data.DataType import dk.cachet.carp.common.infrastructure.services.ApplicationServiceDecorator import dk.cachet.carp.common.infrastructure.services.ApplicationServiceInvoker import dk.cachet.carp.common.infrastructure.services.Command @@ -8,6 +9,7 @@ import dk.cachet.carp.data.application.DataStreamBatch import dk.cachet.carp.data.application.DataStreamId import dk.cachet.carp.data.application.DataStreamService import dk.cachet.carp.data.application.DataStreamsConfiguration +import kotlinx.datetime.Instant class DataStreamServiceDecorator( @@ -32,6 +34,22 @@ class DataStreamServiceDecorator( toSequenceIdInclusive: Long? ) = invoke( DataStreamServiceRequest.GetDataStream( dataStream, fromSequenceId, toSequenceIdInclusive ) ) + override suspend fun getBatchForStudyDeployments( + studyDeploymentIds: Set, + deviceRoleNames: Set?, + dataTypes: Set?, + from: Instant?, + to: Instant? + ) = invoke( + DataStreamServiceRequest.GetBatchForStudyDeployments( + studyDeploymentIds, + deviceRoleNames, + dataTypes, + from, + to + ) + ) + override suspend fun closeDataStreams( studyDeploymentIds: Set ) = invoke( DataStreamServiceRequest.CloseDataStreams( studyDeploymentIds ) ) @@ -51,5 +69,12 @@ object DataStreamServiceInvoker : ApplicationServiceInvoker service.closeDataStreams( studyDeploymentIds ) is DataStreamServiceRequest.RemoveDataStreams -> service.removeDataStreams( studyDeploymentIds ) + is DataStreamServiceRequest.GetBatchForStudyDeployments -> service.getBatchForStudyDeployments( + studyDeploymentIds, + deviceRoleNames, + dataTypes, + from, + to + ) } } diff --git a/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceRequest.kt b/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceRequest.kt index cabd742c5..18a91e4f4 100644 --- a/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceRequest.kt +++ b/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceRequest.kt @@ -1,6 +1,7 @@ package dk.cachet.carp.data.infrastructure import dk.cachet.carp.common.application.UUID +import dk.cachet.carp.common.application.data.DataType import dk.cachet.carp.common.application.services.ApiVersion import dk.cachet.carp.common.infrastructure.serialization.ignoreTypeParameters import dk.cachet.carp.common.infrastructure.services.ApplicationServiceRequest @@ -9,6 +10,7 @@ import dk.cachet.carp.data.application.DataStreamBatchSerializer import dk.cachet.carp.data.application.DataStreamId import dk.cachet.carp.data.application.DataStreamService import dk.cachet.carp.data.application.DataStreamsConfiguration +import kotlinx.datetime.Instant import kotlinx.serialization.* import kotlin.js.JsExport @@ -54,15 +56,25 @@ sealed class DataStreamServiceRequest : ApplicationServiceRequest ) : - DataStreamServiceRequest() + data class GetBatchForStudyDeployments( + val studyDeploymentIds: Set, + val deviceRoleNames: Set? = null, + val dataTypes: Set? = null, + val from: Instant? = null, + val to: Instant? = null + ) : DataStreamServiceRequest() + { + override fun getResponseSerializer() = DataStreamBatchSerializer + } + + @Serializable + data class CloseDataStreams( val studyDeploymentIds: Set ) : DataStreamServiceRequest() { override fun getResponseSerializer() = serializer() } @Serializable - data class RemoveDataStreams( val studyDeploymentIds: Set ) : - DataStreamServiceRequest>() + data class RemoveDataStreams( val studyDeploymentIds: Set ) : DataStreamServiceRequest>() { override fun getResponseSerializer() = serializer>() } diff --git a/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/InMemoryDataStreamService.kt b/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/InMemoryDataStreamService.kt index 63ab284c4..96d59a37b 100644 --- a/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/InMemoryDataStreamService.kt +++ b/carp.data.core/src/commonMain/kotlin/dk/cachet/carp/data/infrastructure/InMemoryDataStreamService.kt @@ -2,14 +2,19 @@ package dk.cachet.carp.data.infrastructure import dk.cachet.carp.common.application.UUID import dk.cachet.carp.common.application.data.Data +import dk.cachet.carp.common.application.data.DataType import dk.cachet.carp.common.application.intersect import dk.cachet.carp.common.domain.ExtractUniqueKeyMap import dk.cachet.carp.data.application.DataStreamBatch import dk.cachet.carp.data.application.DataStreamId +import dk.cachet.carp.data.application.DataStreamSequence import dk.cachet.carp.data.application.DataStreamService import dk.cachet.carp.data.application.DataStreamsConfiguration +import dk.cachet.carp.data.application.Measurement import dk.cachet.carp.data.application.MutableDataStreamBatch import dk.cachet.carp.data.application.MutableDataStreamSequence +import dk.cachet.carp.data.application.applyToTimestamp +import kotlinx.datetime.Instant /** @@ -20,14 +25,18 @@ class InMemoryDataStreamService : DataStreamService private val configuredDataStreams: ExtractUniqueKeyMap = ExtractUniqueKeyMap( { configuration -> configuration.studyDeploymentId } ) { - studyDeploymentId -> - IllegalStateException( - "Data streams for deployment with \"$studyDeploymentId\" have already been configured." - ) + studyDeploymentId -> + IllegalStateException( + "Data streams for deployment with \"$studyDeploymentId\" have already been configured." + ) } private val stoppedStudyDeploymentIds: MutableSet = mutableSetOf() private val dataStreams: MutableDataStreamBatch = MutableDataStreamBatch() + companion object + { + private const val MICROSECONDS_TO_MILLISECONDS = 1000L + } /** * Start accepting data for a study deployment for data streams configured in [configuration]. @@ -52,15 +61,15 @@ class InMemoryDataStreamService : DataStreamService override suspend fun appendToDataStreams( studyDeploymentId: UUID, batch: DataStreamBatch ) { require( batch.sequences.all { it.dataStream.studyDeploymentId == studyDeploymentId } ) - { "The study deployment ID of one or more sequences in `batch` doesn't match `studyDeploymentId`." } + { "The study deployment ID of one or more sequences in `batch` doesn't match `studyDeploymentId`." } val configuration = configuredDataStreams[ studyDeploymentId ] requireNotNull( configuration ) { "No data streams configured for this study deployment." } require( batch.sequences.all { it.dataStream in configuration.expectedDataStreamIds } ) - { "The batch contains a sequence with a data stream which wasn't configured for this study deployment." } + { "The batch contains a sequence with a data stream which wasn't configured for this study deployment." } check( studyDeploymentId !in stoppedStudyDeploymentIds ) - { "Data streams for this study deployment have been closed." } + { "Data streams for this study deployment have been closed." } dataStreams.appendBatch( batch ) } @@ -86,9 +95,9 @@ class InMemoryDataStreamService : DataStreamService val configuration = configuredDataStreams[ dataStream.studyDeploymentId ] requireNotNull( configuration ) { "No data streams configured for this study deployment." } require( dataStream in configuration.expectedDataStreamIds ) - { "The batch contains a sequence with a data stream which wasn't configured for this study deployment." } + { "The batch contains a sequence with a data stream which wasn't configured for this study deployment." } require( fromSequenceId >= 0 && (toSequenceIdInclusive == null || toSequenceIdInclusive >= fromSequenceId) ) - { "The starting sequence ID is negative or the end sequence ID is smaller than the starting ID." } + { "The starting sequence ID is negative or the end sequence ID is smaller than the starting ID." } return dataStreams.sequences .filter { it.dataStream == dataStream } @@ -102,7 +111,7 @@ class InMemoryDataStreamService : DataStreamService val startOffset = subRange.first - it.range.first val exclusiveEnd = startOffset + subRange.last - subRange.first + 1 check( startOffset <= Int.MAX_VALUE && exclusiveEnd <= Int.MAX_VALUE ) - { "Exceeded capacity of measurements which can be held in memory." } + { "Exceeded capacity of measurements which can be held in memory." } appendMeasurements( it.measurements.subList( startOffset.toInt(), exclusiveEnd.toInt() ) ) } } @@ -112,14 +121,179 @@ class InMemoryDataStreamService : DataStreamService } /** - * Stop accepting incoming data for all data streams for each of the [studyDeploymentIds]. + * Retrieve collected data points for the specified study deployments, optionally filtered by device role names, data types, and time range. + * + * @param studyDeploymentIds The set of study deployment IDs to query. Must not be empty. + * @param deviceRoleNames Optional set of device role names (e.g., "phone") to include. If null or empty, data for all device roles is returned. + * @param dataTypes Optional set of [DataType]s to include. If null or empty, data for all data types is returned. + * @param from Optional absolute start time for filtering (inclusive). If null, no lower bound is applied. + * @param to Optional absolute end time for filtering (exclusive). If null, no upper bound is applied. + * + * @return A [DataStreamBatch] containing matching data points. + */ + override suspend fun getBatchForStudyDeployments( + studyDeploymentIds: Set, + deviceRoleNames: Set?, + dataTypes: Set?, + from: Instant?, + to: Instant? + ): DataStreamBatch + { + // 1) Apply basic filters and clip by time to contiguous chunks + val prelim = dataStreams.sequences + .filter { matchesBasicFilters(it, studyDeploymentIds, deviceRoleNames, dataTypes) } + .flatMap { seq -> clipByTimeToChunks(seq, from, to).asSequence() } // 0..N sequences per original + .toList() + + // 2) Group by data stream and sort sequences by start of range to ensure adherence to DataStream interface + val byStream = prelim.groupBy { it.dataStream } + .mapValues { (_, seqs) -> seqs.sortedBy { it.range.first } } + + // 3) Append per stream to preserve contract + val batch = MutableDataStreamBatch() + byStream.forEach { (_, seqs) -> + seqs.forEach { batch.appendSequence(it) } + } + return batch + } + + /** + * Checks if a sequence matches the basic filters (deployment ID, device role, data type). + */ + private fun matchesBasicFilters( + sequence: DataStreamSequence<*>, + studyDeploymentIds: Set, + deviceRoleNames: Set?, + dataTypes: Set? + ): Boolean + { + // Check deployment match first - most restrictive filter + if (sequence.dataStream.studyDeploymentId !in studyDeploymentIds) + { + return false + } + + // Then check device role + if (!deviceRoleNames.isNullOrEmpty() && + sequence.dataStream.deviceRoleName !in deviceRoleNames + ) + { + return false + } + + // Finally check data type + if (!dataTypes.isNullOrEmpty() && + sequence.dataStream.dataType !in dataTypes + ) + { + return false + } + + return true + } + + /** + * Returns 0...N clipped sequences for [sequence] that intersect [from, to). + * Builds contiguous chunks by index where the time predicate holds. + * Each chunk becomes its own sequence with an adjusted firstSequenceId. + * This approach works even if time is not monotonic. + */ + private fun clipByTimeToChunks( + sequence: DataStreamSequence<*>, + from: Instant?, + to: Instant? + ): List> + { + if (from == null && to == null) return listOf(sequence) + + val ms = sequence.measurements + if (ms.isEmpty()) return emptyList() + + val chunks = mutableListOf>() + var startIdx = -1 + + fun keep( mIdx: Int ): Boolean = + isWithinTimeRange(ms[mIdx], sequence, from, to) + + for (i in ms.indices) + { + val inRange = keep(i) + if (inRange) + { + if (startIdx == -1) startIdx = i + } else if (startIdx != -1) + { + // close chunk [startIdx, i) + chunks += buildSlice(sequence, startIdx, i) // i is exclusive + startIdx = -1 + } + } + if (startIdx != -1) chunks += buildSlice(sequence, startIdx, ms.size) + + return chunks + } + + /** + * Builds a slice of a sequence from [fromIdx] to [toIdxExclusive]. + */ + private fun buildSlice( + sequence: DataStreamSequence<*>, + fromIdx: Int, + toIdxExclusive: Int + ): DataStreamSequence<*> + { + val newFirst = sequence.firstSequenceId + fromIdx + val slice = sequence.measurements.subList(fromIdx, toIdxExclusive) + return MutableDataStreamSequence( + sequence.dataStream, + newFirst, + sequence.triggerIds, + sequence.syncPoint + ).apply { appendMeasurements(slice) } + } + + /** + * Checks if a measurement falls within the specified time range [from, to). + * Uses half-open interval to avoid double-counting on boundaries. + * + * @param from Inclusive start time + * @param to Exclusive end time + */ + private fun isWithinTimeRange( + measurement: Measurement<*>, + sequence: DataStreamSequence<*>, + from: Instant?, + to: Instant? + ): Boolean + { + // Convert sensor timestamps to absolute time using the sync point + val absoluteStartTime = Instant.fromEpochMilliseconds( + sequence.syncPoint.applyToTimestamp(measurement.sensorStartTime) / MICROSECONDS_TO_MILLISECONDS + ) + + val absoluteEndTime = measurement.sensorEndTime?.let { endTime -> + Instant.fromEpochMilliseconds( + sequence.syncPoint.applyToTimestamp(endTime) / MICROSECONDS_TO_MILLISECONDS + ) + } ?: absoluteStartTime + + // Accept a measurement if its [start, end] intersects [from, to) + val afterFrom = from == null || absoluteEndTime >= from + val beforeTo = to == null || absoluteStartTime < to + + return afterFrom && beforeTo + } + + /** + * Stop accepting data for the specified [studyDeploymentIds]. * - * @throws IllegalArgumentException when no data streams were ever opened for any of the [studyDeploymentIds]. + * @throws IllegalArgumentException when one or more of the specified [studyDeploymentIds] + * do not have configured data streams. */ override suspend fun closeDataStreams( studyDeploymentIds: Set ) { require( studyDeploymentIds.all { configuredDataStreams[ it ] != null } ) - { "No data streams configured for this study deployment." } + { "No data streams configured for this study deployment." } stoppedStudyDeploymentIds.addAll( studyDeploymentIds ) } diff --git a/carp.data.core/src/commonTest/kotlin/dk/cachet/carp/data/application/DataStreamServiceTest.kt b/carp.data.core/src/commonTest/kotlin/dk/cachet/carp/data/application/DataStreamServiceTest.kt index 63a53c478..ec1a30164 100644 --- a/carp.data.core/src/commonTest/kotlin/dk/cachet/carp/data/application/DataStreamServiceTest.kt +++ b/carp.data.core/src/commonTest/kotlin/dk/cachet/carp/data/application/DataStreamServiceTest.kt @@ -165,12 +165,88 @@ interface DataStreamServiceTest fun removeDataStream_ignores_unknown_ids() = runTest { val service = createServiceWithOpenStubDataPointStream() - val unknownDeploymentid = UUID.randomUUID() - val removed = service.removeDataStreams( setOf( stubDeploymentId, unknownDeploymentid ) ) + val unknownDeploymentId = UUID.randomUUID() + val removed = service.removeDataStreams( setOf( stubDeploymentId, unknownDeploymentId ) ) assertEquals( setOf( stubDeploymentId ), removed ) } + @Test + fun getBatchForStudyDeployments_succeeds() = runTest { + val service = createServiceWithOpenStubDataPointStream() + val batch = MutableDataStreamBatch().apply { + appendSequence( createStubSequence( 0, StubDataPoint(), StubDataPoint() ) ) + } + service.appendToDataStreams( stubDeploymentId, batch ) + + val collectedData = service.getBatchForStudyDeployments( setOf( stubDeploymentId ) ) + + assertNotNull( collectedData ) + assertTrue( collectedData.sequences.toList().isNotEmpty() ) + } + + @Test + fun getBatchForStudyDeployments_succeeds_with_empty_result() = runTest { + val service = createServiceWithOpenStubDataPointStream() + + val collectedData = service.getBatchForStudyDeployments( setOf( stubDeploymentId ) ) + + assertNotNull( collectedData ) + assertTrue( collectedData.sequences.toList().isEmpty() ) + } + + @Test + fun getBatchForStudyDeployments_succeeds_with_mixed_valid_invalid_deployment_ids() = runTest { + val service = createServiceWithOpenStubDataPointStream() + + val unknownDeploymentId = UUID.randomUUID() + val collectedData = service.getBatchForStudyDeployments( setOf( stubDeploymentId, unknownDeploymentId ) ) + + assertNotNull( collectedData ) + // Should handle mixed valid/invalid deployment IDs gracefully + assertTrue( collectedData.sequences.toList().isEmpty() ) // No data added yet + } + + @Test + fun getBatchForStudyDeployments_does_not_fail_when_all_deployment_ids_are_invalid() = runTest { + val service = createService() + + val unknownDeploymentId = UUID.randomUUID() + val batch = service.getBatchForStudyDeployments(setOf(stubDeploymentId, unknownDeploymentId)) + + assertNotNull(batch) + } + + @Test + fun getBatchForStudyDeployments_succeeds_with_empty_deployment_set() = runTest { + val service = createService() + + val collectedData = service.getBatchForStudyDeployments( emptySet() ) + + assertNotNull( collectedData ) + assertTrue( collectedData.sequences.toList().isEmpty() ) + } + + @Test + fun getBatchForStudyDeployments_with_filters_succeeds() = runTest { + val service = createServiceWithOpenStubDataPointStream() + val batch = MutableDataStreamBatch().apply { + appendSequence( createStubSequence( 0, StubDataPoint() ) ) + } + service.appendToDataStreams( stubDeploymentId, batch ) + + // Test that optional parameters don't break the API + val collectedData = service.getBatchForStudyDeployments( + setOf( stubDeploymentId ), + deviceRoleNames = setOf( stubSequenceDeviceRoleName ), + dataTypes = setOf( STUB_DATA_POINT_TYPE ), + from = now, + to = now + ) + + assertNotNull( collectedData ) + // API should succeed regardless of filter results + } /** * Create a data stream service and open [stubDataPointStream] for [stubDeploymentId]. diff --git a/carp.data.core/src/commonTest/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceInfrastructureTest.kt b/carp.data.core/src/commonTest/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceInfrastructureTest.kt index bc244bb1f..322644620 100644 --- a/carp.data.core/src/commonTest/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceInfrastructureTest.kt +++ b/carp.data.core/src/commonTest/kotlin/dk/cachet/carp/data/infrastructure/DataStreamServiceInfrastructureTest.kt @@ -26,7 +26,13 @@ class DataStreamServiceRequestsTest : ApplicationServiceRequestsTest + sequence.measurements.forEach { measurement -> + val absoluteStartTime = Instant.fromEpochMilliseconds( + sequence.syncPoint.applyToTimestamp(measurement.sensorStartTime) / 1000L + ) + assertTrue(absoluteStartTime in fromTime..( + dataStreamId, + 0L, + listOf(1), + SyncPoint.UnixEpoch + ) + + val measurements = mutableListOf>() + repeat(count) { i -> + val measurement = Measurement( + sensorStartTime = startTimestamp + i * 1000L, // 1ms apart in microseconds + sensorEndTime = null, + dataType = dataType, + data = NoData + ) + measurements.add(measurement) + } + + sequence.appendMeasurements(measurements) + + val batch = MutableDataStreamBatch() + batch.appendSequence(sequence) + return batch + } + + @Test + fun getBatchForStudyDeployments_preserves_order_and_non_overlap_per_stream() = runTest { + val d1 = DataStreamId(deploymentId1, "device1", dataType1) + val cfg = DataStreamsConfiguration( + deploymentId1, + setOf( + DataStreamsConfiguration.ExpectedDataStream.fromDataStreamId(d1) + ) + ) + service.openDataStreams(cfg) + + // Two appends produce two consecutive sequences in the same stream. + // First batch: seqIds 0..2 (3 measurements) + val batchA = createTestBatchWithSequenceId(deploymentId1, "device1", dataType1, 1_000_000L, 3, 0L) + // Second batch: seqIds 3..4 (2 measurements) - must start after first batch ends + val batchB = createTestBatchWithSequenceId(deploymentId1, "device1", dataType1, 4_000_000L, 2, 3L) + service.appendToDataStreams(deploymentId1, batchA) + service.appendToDataStreams(deploymentId1, batchB) + + val result = service.getBatchForStudyDeployments( + studyDeploymentIds = setOf(deploymentId1), + deviceRoleNames = setOf("device1"), + dataTypes = setOf(dataType1), + from = null, to = null + ) + + // Group per stream and verify ordering & non-overlap. + val byStream = result.sequences.groupBy { it.dataStream } + byStream.forEach { (_, seqs) -> + // sorted by sequenceId range start + val sorted = seqs.toList().sortedBy { it.range.first } + assertEquals(sorted, seqs.toList()) + + // non-overlapping: last.end < next.start + for (i in 0 until sorted.size - 1) + { + val a = sorted[i].range + val b = sorted[i + 1].range + assertEquals( + a.last + 1, + b.first, + "Sequences must be immediately consecutive or disjoint without overlap" + ) + } + } + } + + /** + * Helper function to create a test batch with sequential timestamps and custom firstSequenceId. + */ + private fun createTestBatchWithSequenceId( + deploymentId: UUID, + deviceRoleName: String, + dataType: DataType, + startTimestamp: Long, + count: Int, + firstSequenceId: Long + ): DataStreamBatch + { + val dataStreamId = DataStreamId(deploymentId, deviceRoleName, dataType) + val sequence = MutableDataStreamSequence( + dataStreamId, + firstSequenceId, + listOf(1), + SyncPoint.UnixEpoch + ) + + val measurements = mutableListOf>() + repeat(count) { i -> + val measurement = Measurement( + sensorStartTime = startTimestamp + i * 1000L, // 1ms apart in microseconds + sensorEndTime = null, + dataType = dataType, + data = NoData + ) + measurements.add(measurement) + } + + sequence.appendMeasurements(measurements) + + val batch = MutableDataStreamBatch() + batch.appendSequence(sequence) + return batch + } + + @Test + fun getBatchForStudyDeployments_clipping_shifts_firstSequenceId() = runTest { + val d1 = DataStreamId(deploymentId1, "device1", dataType1) + service.openDataStreams( + DataStreamsConfiguration( + deploymentId1, + setOf( + DataStreamsConfiguration.ExpectedDataStream.fromDataStreamId(d1) + ) + ) + ) + // Build a single sequence with 5 points, seqIds 0..4 + // Use microsecond timestamps: 1_000_000, 1_001_000, 1_002_000, 1_003_000, 1_004_000 (1000ms, 1001ms, 1002ms, 1003ms, 1004ms) + val batch = createTestBatch(deploymentId1, "device1", dataType1, 1_000_000L, 5) + service.appendToDataStreams(deploymentId1, batch) + + // Choose a fromTime that trims the first 2 points. + // fromTime = 1002ms = 1_002_000 microseconds, so it keeps measurements at indices 2, 3, 4 + val fromTime = Instant.fromEpochMilliseconds(1_002L) + val toTime = null + + val result = service.getBatchForStudyDeployments( + studyDeploymentIds = setOf(deploymentId1), + deviceRoleNames = null, + dataTypes = null, + from = fromTime, to = toTime + ) + + // Expect exactly one clipped sequence starting at firstSequenceId = 2 + val seqs = result.sequences.toList() + assertEquals(1, seqs.size) + assertEquals(2L, seqs[0].firstSequenceId) + assertEquals(3, seqs[0].measurements.size) // ids 2,3,4 remain + + assertPerStreamOrderAndNonOverlap(result) + } + + private fun createNonMonotonicBatch( + deploymentId: UUID, + dataType: DataType + ): DataStreamBatch + { + val id = DataStreamId(deploymentId, "device1", dataType) + val seq = MutableDataStreamSequence(id, 0L, listOf(1), SyncPoint.UnixEpoch) + // Deliberately wobble timestamps (microseconds) + // Use microsecond values that correspond to 1001ms, 1000ms, 1003ms, 1002.5ms, 1004ms + val base = 1_000_000L // 1000 milliseconds in microseconds + val ts = listOf(base + 1000, base + 0, base + 3000, base + 2500, base + 4000) + val ms = ts.map { t -> + Measurement(sensorStartTime = t, sensorEndTime = null, dataType = dataType, data = NoData) + } + seq.appendMeasurements(ms) + return MutableDataStreamBatch().apply { appendSequence(seq) } + } + + @Test + fun getBatchForStudyDeployments_handles_non_monotonic_timestamps() = runTest { + val id = DataStreamId(deploymentId1, "device1", dataType1) + service.openDataStreams( + DataStreamsConfiguration( + deploymentId1, + setOf( + DataStreamsConfiguration.ExpectedDataStream.fromDataStreamId(id) + ) + ) + ) + service.appendToDataStreams(deploymentId1, createNonMonotonicBatch(deploymentId1, dataType1)) + + // Select a middle window that should pick a contiguous chunk by index (chunking logic) + val fromTime = Instant.fromEpochMilliseconds(1_002L) + val toTime = Instant.fromEpochMilliseconds(1_004L) + + val result = service.getBatchForStudyDeployments( + studyDeploymentIds = setOf(deploymentId1), + deviceRoleNames = null, + dataTypes = null, + from = fromTime, to = toTime + ) + + val seqs = result.sequences.toList() + assertTrue(seqs.isNotEmpty()) + + assertPerStreamOrderAndNonOverlap(result) + } + + private fun assertPerStreamOrderAndNonOverlap( batch: DataStreamBatch ) + { + val byStream = batch.sequences.groupBy { it.dataStream } + byStream.forEach { + (_, seqs) -> + val sorted = seqs.toList().sortedBy { it.range.first } + assertEquals(sorted, seqs.toList()) + for (i in 0 until sorted.size - 1) + { + val a = sorted[i].range + val b = sorted[i + 1].range + assertTrue(a.last + 1 <= b.first, "Overlapping sequences in stream") + } + } + } +} diff --git a/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_does_not_fail_when_all_deployment_ids_are_invalid.json b/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_does_not_fail_when_all_deployment_ids_are_invalid.json new file mode 100644 index 000000000..60b882d47 --- /dev/null +++ b/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_does_not_fail_when_all_deployment_ids_are_invalid.json @@ -0,0 +1,16 @@ +[ + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetBatchForStudyDeployments", + "apiVersion": "1.1", + "studyDeploymentIds": [ + "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "085b7f5a-c7cc-4c73-9e33-a09a169338d2" + ] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": [] + } +] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_succeeds.json b/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_succeeds.json new file mode 100644 index 000000000..fb47def40 --- /dev/null +++ b/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_succeeds.json @@ -0,0 +1,112 @@ +[ + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.OpenDataStreams", + "apiVersion": "1.1", + "configuration": { + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "expectedDataStreams": [ + { + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + } + ] + } + }, + "precedingEvents": [], + "publishedEvents": [], + "response": {} + }, + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.AppendToDataStreams", + "apiVersion": "1.1", + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "batch": [ + { + "dataStream": { + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + }, + "firstSequenceId": 0, + "measurements": [ + { + "sensorStartTime": 0, + "data": { + "__type": "dk.cachet.carp.stubpoint", + "data": "Stub" + } + }, + { + "sensorStartTime": 0, + "data": { + "__type": "dk.cachet.carp.stubpoint", + "data": "Stub" + } + } + ], + "triggerIds": [ + 1 + ], + "syncPoint": { + "synchronizedOn": "2025-10-28T10:39:22.311612700Z", + "sensorTimestampAtSyncPoint": 1761647962311612, + "relativeClockSpeed": 1.0 + } + } + ] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": {} + }, + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetBatchForStudyDeployments", + "apiVersion": "1.1", + "studyDeploymentIds": [ + "7c824b0d-1143-46db-8e66-0fa4f2a34af9" + ] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": [ + { + "dataStream": { + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + }, + "firstSequenceId": 0, + "measurements": [ + { + "sensorStartTime": 0, + "data": { + "__type": "dk.cachet.carp.stubpoint", + "data": "Stub" + } + }, + { + "sensorStartTime": 0, + "data": { + "__type": "dk.cachet.carp.stubpoint", + "data": "Stub" + } + } + ], + "triggerIds": [ + 1 + ], + "syncPoint": { + "synchronizedOn": "2025-10-28T10:39:22.311612700Z", + "sensorTimestampAtSyncPoint": 1761647962311612, + "relativeClockSpeed": 1.0 + } + } + ] + } +] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_succeeds_with_empty_deployment_set.json b/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_succeeds_with_empty_deployment_set.json new file mode 100644 index 000000000..ac70c5d56 --- /dev/null +++ b/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_succeeds_with_empty_deployment_set.json @@ -0,0 +1,13 @@ +[ + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetBatchForStudyDeployments", + "apiVersion": "1.1", + "studyDeploymentIds": [] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": [] + } +] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_succeeds_with_empty_result.json b/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_succeeds_with_empty_result.json new file mode 100644 index 000000000..1e647fa7d --- /dev/null +++ b/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_succeeds_with_empty_result.json @@ -0,0 +1,34 @@ +[ + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.OpenDataStreams", + "apiVersion": "1.1", + "configuration": { + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "expectedDataStreams": [ + { + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + } + ] + } + }, + "precedingEvents": [], + "publishedEvents": [], + "response": {} + }, + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetBatchForStudyDeployments", + "apiVersion": "1.1", + "studyDeploymentIds": [ + "7c824b0d-1143-46db-8e66-0fa4f2a34af9" + ] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": [] + } +] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_succeeds_with_mixed_valid_invalid_deployment_ids.json b/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_succeeds_with_mixed_valid_invalid_deployment_ids.json new file mode 100644 index 000000000..220e7cbc8 --- /dev/null +++ b/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_succeeds_with_mixed_valid_invalid_deployment_ids.json @@ -0,0 +1,35 @@ +[ + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.OpenDataStreams", + "apiVersion": "1.1", + "configuration": { + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "expectedDataStreams": [ + { + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + } + ] + } + }, + "precedingEvents": [], + "publishedEvents": [], + "response": {} + }, + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetBatchForStudyDeployments", + "apiVersion": "1.1", + "studyDeploymentIds": [ + "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "571a3355-e602-4cd1-8abf-a4ed66b9c223" + ] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": [] + } +] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_with_filters_succeeds.json b/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_with_filters_succeeds.json new file mode 100644 index 000000000..6ac08aa14 --- /dev/null +++ b/carp.data.core/src/commonTest/resources/getBatchForStudyDeployments_with_filters_succeeds.json @@ -0,0 +1,80 @@ +[ + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.OpenDataStreams", + "apiVersion": "1.1", + "configuration": { + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "expectedDataStreams": [ + { + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + } + ] + } + }, + "precedingEvents": [], + "publishedEvents": [], + "response": {} + }, + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.AppendToDataStreams", + "apiVersion": "1.1", + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "batch": [ + { + "dataStream": { + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + }, + "firstSequenceId": 0, + "measurements": [ + { + "sensorStartTime": 0, + "data": { + "__type": "dk.cachet.carp.stubpoint", + "data": "Stub" + } + } + ], + "triggerIds": [ + 1 + ], + "syncPoint": { + "synchronizedOn": "2025-10-28T10:39:22.311612700Z", + "sensorTimestampAtSyncPoint": 1761647962311612, + "relativeClockSpeed": 1.0 + } + } + ] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": {} + }, + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetBatchForStudyDeployments", + "apiVersion": "1.1", + "studyDeploymentIds": [ + "7c824b0d-1143-46db-8e66-0fa4f2a34af9" + ], + "deviceRoleNames": [ + "Device" + ], + "dataTypes": [ + "dk.cachet.carp.stubpoint" + ], + "from": "2025-10-28T10:39:22.311612700Z", + "to": "2025-10-28T10:39:22.311612700Z" + }, + "precedingEvents": [], + "publishedEvents": [], + "response": [] + } +] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/getDataStream_fails_for_unopened_streams.json b/carp.data.core/src/commonTest/resources/getDataStream_fails_for_unopened_streams.json new file mode 100644 index 000000000..d9f9374af --- /dev/null +++ b/carp.data.core/src/commonTest/resources/getDataStream_fails_for_unopened_streams.json @@ -0,0 +1,18 @@ +[ + { + "outcome": "Failed", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetDataStream", + "apiVersion": "1.1", + "dataStream": { + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + }, + "fromSequenceId": 0 + }, + "precedingEvents": [], + "publishedEvents": [], + "exceptionType": "IllegalArgumentException" + } +] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/getDataStream_fails_for_wrong_sequence_ids.json b/carp.data.core/src/commonTest/resources/getDataStream_fails_for_wrong_sequence_ids.json new file mode 100644 index 000000000..7f75d694f --- /dev/null +++ b/carp.data.core/src/commonTest/resources/getDataStream_fails_for_wrong_sequence_ids.json @@ -0,0 +1,100 @@ +[ + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.OpenDataStreams", + "apiVersion": "1.1", + "configuration": { + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "expectedDataStreams": [ + { + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + } + ] + } + }, + "precedingEvents": [], + "publishedEvents": [], + "response": {} + }, + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.AppendToDataStreams", + "apiVersion": "1.1", + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "batch": [ + { + "dataStream": { + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + }, + "firstSequenceId": 0, + "measurements": [ + { + "sensorStartTime": 0, + "data": { + "__type": "dk.cachet.carp.stubpoint", + "data": "Stub" + } + }, + { + "sensorStartTime": 0, + "data": { + "__type": "dk.cachet.carp.stubpoint", + "data": "Stub" + } + } + ], + "triggerIds": [ + 1 + ], + "syncPoint": { + "synchronizedOn": "2025-10-28T10:39:22.311612700Z", + "sensorTimestampAtSyncPoint": 1761647962311612, + "relativeClockSpeed": 1.0 + } + } + ] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": {} + }, + { + "outcome": "Failed", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetDataStream", + "apiVersion": "1.1", + "dataStream": { + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + }, + "fromSequenceId": -1, + "toSequenceIdInclusive": 10 + }, + "precedingEvents": [], + "publishedEvents": [], + "exceptionType": "IllegalArgumentException" + }, + { + "outcome": "Failed", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetDataStream", + "apiVersion": "1.1", + "dataStream": { + "studyDeploymentId": "7c824b0d-1143-46db-8e66-0fa4f2a34af9", + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + }, + "fromSequenceId": 1, + "toSequenceIdInclusive": 0 + }, + "precedingEvents": [], + "publishedEvents": [], + "exceptionType": "IllegalArgumentException" + } +] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_does_not_fail_when_all_deployment_ids_are_invalid.json b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_does_not_fail_when_all_deployment_ids_are_invalid.json new file mode 100644 index 000000000..f9da24083 --- /dev/null +++ b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_does_not_fail_when_all_deployment_ids_are_invalid.json @@ -0,0 +1,16 @@ +[ + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetBatchForStudyDeployments", + "apiVersion": "1.1", + "studyDeploymentIds": [ + "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", + "6fe2484d-7829-48fa-8a2e-5a07ae40b9df" + ] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": [] + } +] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_succeeds.json b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_succeeds.json new file mode 100644 index 000000000..dec97f634 --- /dev/null +++ b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_succeeds.json @@ -0,0 +1,112 @@ +[ + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.OpenDataStreams", + "apiVersion": "1.1", + "configuration": { + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", + "expectedDataStreams": [ + { + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + } + ] + } + }, + "precedingEvents": [], + "publishedEvents": [], + "response": {} + }, + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.AppendToDataStreams", + "apiVersion": "1.1", + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", + "batch": [ + { + "dataStream": { + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + }, + "firstSequenceId": 0, + "measurements": [ + { + "sensorStartTime": 0, + "data": { + "__type": "dk.cachet.carp.stubpoint", + "data": "Stub" + } + }, + { + "sensorStartTime": 0, + "data": { + "__type": "dk.cachet.carp.stubpoint", + "data": "Stub" + } + } + ], + "triggerIds": [ + 1 + ], + "syncPoint": { + "synchronizedOn": "2025-10-27T15:48:33.455846700Z", + "sensorTimestampAtSyncPoint": 1761580113455846, + "relativeClockSpeed": 1.0 + } + } + ] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": {} + }, + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetBatchForStudyDeployments", + "apiVersion": "1.1", + "studyDeploymentIds": [ + "dfd7a5be-6276-4cea-87a9-97b44e6a53ee" + ] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": [ + { + "dataStream": { + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + }, + "firstSequenceId": 0, + "measurements": [ + { + "sensorStartTime": 0, + "data": { + "__type": "dk.cachet.carp.stubpoint", + "data": "Stub" + } + }, + { + "sensorStartTime": 0, + "data": { + "__type": "dk.cachet.carp.stubpoint", + "data": "Stub" + } + } + ], + "triggerIds": [ + 1 + ], + "syncPoint": { + "synchronizedOn": "2025-10-27T15:48:33.455846700Z", + "sensorTimestampAtSyncPoint": 1761580113455846, + "relativeClockSpeed": 1.0 + } + } + ] + } +] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_succeeds_with_empty_deployment_set.json b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_succeeds_with_empty_deployment_set.json new file mode 100644 index 000000000..ac70c5d56 --- /dev/null +++ b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_succeeds_with_empty_deployment_set.json @@ -0,0 +1,13 @@ +[ + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetBatchForStudyDeployments", + "apiVersion": "1.1", + "studyDeploymentIds": [] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": [] + } +] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_succeeds_with_empty_result.json b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_succeeds_with_empty_result.json new file mode 100644 index 000000000..f5e5a514f --- /dev/null +++ b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_succeeds_with_empty_result.json @@ -0,0 +1,34 @@ +[ + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.OpenDataStreams", + "apiVersion": "1.1", + "configuration": { + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", + "expectedDataStreams": [ + { + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + } + ] + } + }, + "precedingEvents": [], + "publishedEvents": [], + "response": {} + }, + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetBatchForStudyDeployments", + "apiVersion": "1.1", + "studyDeploymentIds": [ + "dfd7a5be-6276-4cea-87a9-97b44e6a53ee" + ] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": [] + } +] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_succeeds_with_mixed_valid_invalid_deployment_ids.json b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_succeeds_with_mixed_valid_invalid_deployment_ids.json new file mode 100644 index 000000000..ce6757c2b --- /dev/null +++ b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_succeeds_with_mixed_valid_invalid_deployment_ids.json @@ -0,0 +1,35 @@ +[ + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.OpenDataStreams", + "apiVersion": "1.1", + "configuration": { + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", + "expectedDataStreams": [ + { + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + } + ] + } + }, + "precedingEvents": [], + "publishedEvents": [], + "response": {} + }, + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetBatchForStudyDeployments", + "apiVersion": "1.1", + "studyDeploymentIds": [ + "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", + "f1cb0ddd-f5ef-402e-8962-592838e22937" + ] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": [] + } +] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_with_filters_succeeds.json b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_with_filters_succeeds.json new file mode 100644 index 000000000..8d096aa06 --- /dev/null +++ b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getBatchForStudyDeployments_with_filters_succeeds.json @@ -0,0 +1,80 @@ +[ + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.OpenDataStreams", + "apiVersion": "1.1", + "configuration": { + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", + "expectedDataStreams": [ + { + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + } + ] + } + }, + "precedingEvents": [], + "publishedEvents": [], + "response": {} + }, + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.AppendToDataStreams", + "apiVersion": "1.1", + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", + "batch": [ + { + "dataStream": { + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", + "deviceRoleName": "Device", + "dataType": "dk.cachet.carp.stubpoint" + }, + "firstSequenceId": 0, + "measurements": [ + { + "sensorStartTime": 0, + "data": { + "__type": "dk.cachet.carp.stubpoint", + "data": "Stub" + } + } + ], + "triggerIds": [ + 1 + ], + "syncPoint": { + "synchronizedOn": "2025-10-27T15:48:33.455846700Z", + "sensorTimestampAtSyncPoint": 1761580113455846, + "relativeClockSpeed": 1.0 + } + } + ] + }, + "precedingEvents": [], + "publishedEvents": [], + "response": {} + }, + { + "outcome": "Succeeded", + "request": { + "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetBatchForStudyDeployments", + "apiVersion": "1.1", + "studyDeploymentIds": [ + "dfd7a5be-6276-4cea-87a9-97b44e6a53ee" + ], + "deviceRoleNames": [ + "Device" + ], + "dataTypes": [ + "dk.cachet.carp.stubpoint" + ], + "from": "2025-10-27T15:48:33.455846700Z", + "to": "2025-10-27T15:48:33.455846700Z" + }, + "precedingEvents": [], + "publishedEvents": [], + "response": [] + } +] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getDataStream_fails_for_unopened_streams.json b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getDataStream_fails_for_unopened_streams.json index 0c2c80ab5..caff256e0 100644 --- a/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getDataStream_fails_for_unopened_streams.json +++ b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getDataStream_fails_for_unopened_streams.json @@ -5,16 +5,14 @@ "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetDataStream", "apiVersion": "1.1", "dataStream": { - "studyDeploymentId": "3de35fb5-c39e-4fea-8ccc-3a70ca618ce7", + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", "deviceRoleName": "Device", "dataType": "dk.cachet.carp.stubpoint" }, "fromSequenceId": 0 }, - "precedingEvents": [ - ], - "publishedEvents": [ - ], + "precedingEvents": [], + "publishedEvents": [], "exceptionType": "IllegalArgumentException" } ] \ No newline at end of file diff --git a/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getDataStream_fails_for_wrong_sequence_ids.json b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getDataStream_fails_for_wrong_sequence_ids.json index db4eec4ce..43026456b 100644 --- a/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getDataStream_fails_for_wrong_sequence_ids.json +++ b/carp.data.core/src/commonTest/resources/test-requests/DataStreamService/1.1/DataStreamServiceTest/getDataStream_fails_for_wrong_sequence_ids.json @@ -5,7 +5,7 @@ "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.OpenDataStreams", "apiVersion": "1.1", "configuration": { - "studyDeploymentId": "3de35fb5-c39e-4fea-8ccc-3a70ca618ce7", + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", "expectedDataStreams": [ { "deviceRoleName": "Device", @@ -14,23 +14,20 @@ ] } }, - "precedingEvents": [ - ], - "publishedEvents": [ - ], - "response": { - } + "precedingEvents": [], + "publishedEvents": [], + "response": {} }, { "outcome": "Succeeded", "request": { "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.AppendToDataStreams", "apiVersion": "1.1", - "studyDeploymentId": "3de35fb5-c39e-4fea-8ccc-3a70ca618ce7", + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", "batch": [ { "dataStream": { - "studyDeploymentId": "3de35fb5-c39e-4fea-8ccc-3a70ca618ce7", + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", "deviceRoleName": "Device", "dataType": "dk.cachet.carp.stubpoint" }, @@ -55,19 +52,16 @@ 1 ], "syncPoint": { - "synchronizedOn": "2022-10-13T11:43:38.577859500Z", - "sensorTimestampAtSyncPoint": 1665661418577859, + "synchronizedOn": "2025-10-27T15:48:33.455846700Z", + "sensorTimestampAtSyncPoint": 1761580113455846, "relativeClockSpeed": 1.0 } } ] }, - "precedingEvents": [ - ], - "publishedEvents": [ - ], - "response": { - } + "precedingEvents": [], + "publishedEvents": [], + "response": {} }, { "outcome": "Failed", @@ -75,17 +69,15 @@ "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetDataStream", "apiVersion": "1.1", "dataStream": { - "studyDeploymentId": "3de35fb5-c39e-4fea-8ccc-3a70ca618ce7", + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", "deviceRoleName": "Device", "dataType": "dk.cachet.carp.stubpoint" }, "fromSequenceId": -1, "toSequenceIdInclusive": 10 }, - "precedingEvents": [ - ], - "publishedEvents": [ - ], + "precedingEvents": [], + "publishedEvents": [], "exceptionType": "IllegalArgumentException" }, { @@ -94,17 +86,15 @@ "__type": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetDataStream", "apiVersion": "1.1", "dataStream": { - "studyDeploymentId": "3de35fb5-c39e-4fea-8ccc-3a70ca618ce7", + "studyDeploymentId": "dfd7a5be-6276-4cea-87a9-97b44e6a53ee", "deviceRoleName": "Device", "dataType": "dk.cachet.carp.stubpoint" }, "fromSequenceId": 1, "toSequenceIdInclusive": 0 }, - "precedingEvents": [ - ], - "publishedEvents": [ - ], + "precedingEvents": [], + "publishedEvents": [], "exceptionType": "IllegalArgumentException" } ] \ No newline at end of file diff --git a/docs/carp-data.md b/docs/carp-data.md index ff9377c9a..11aac5737 100644 --- a/docs/carp-data.md +++ b/docs/carp-data.md @@ -29,5 +29,6 @@ Store and retrieve [`DataStreamPoint`](../carp.data.core/src/commonMain/kotlin/d | `openDataStreams` | Start accepting data for a specific study deployment. | manage deployment: `studyDeploymentId`| | | `appendToDataStreams` | Append a batch of data point sequences to corresponding data streams. | in deployment: `studyDeploymentId` | | | `getDataStream` | Retrieve all data points in data stream that fall within the requested range. | in deployment: `dataStream.studyDeploymentId` | | +| `getBatchForStudyDeployments` | Fetch collected data across multiple study deployments with optional filtering by device roles, data types, and time range. | in deployment: (all) `studyDeploymentIds` | | | `closeDataStreams` | Stop accepting data for specified study deployments. | manage deployment: (all) `studyDeploymentId` | | -| `removeDataStreams` | Close data streams and remove all data for specified study deployments. | manage deployment: (all) `studyDeploymentId` | | +| `removeDataStreams` | Close data streams and remove all data for specified study deployments. | manage deployment: (all) `studyDeploymentId` | | diff --git a/rpc/schemas/data/DataStreamService/DataStreamServiceRequest.json b/rpc/schemas/data/DataStreamService/DataStreamServiceRequest.json index 3c7fbecc3..0b3c9d890 100644 --- a/rpc/schemas/data/DataStreamService/DataStreamServiceRequest.json +++ b/rpc/schemas/data/DataStreamService/DataStreamServiceRequest.json @@ -5,7 +5,8 @@ { "$ref": "#AppendToDataStreams" }, { "$ref": "#GetDataStream" }, { "$ref": "#CloseDataStreams" }, - { "$ref": "#RemoveDataStreams" } + { "$ref": "#RemoveDataStreams" }, + { "$ref": "#GetBatchForStudyDeployments" } ], "$defs": { "ApiVersion": { "const": "1.1" }, @@ -57,6 +58,29 @@ "$ref": "../DataStreamBatch.json" } }, + "GetBatchForStudyDeployments": { + "$anchor": "GetBatchForStudyDeployments", + "type": "object", + "properties": { + "__type": { "const": "dk.cachet.carp.data.infrastructure.DataStreamServiceRequest.GetBatchForStudyDeployments" }, + "apiVersion": { "$ref": "#/$defs/ApiVersion" }, + "studyDeploymentIds": { + "type": "array", + "items": { "type": "string", "format": "uuid" } + }, + "dataTypes": { + "type": ["array", "null"], + "items": {"$ref": "../common/NamespacedId.json" } + }, + "from": { "type": "string", "format": "date-time" }, + "to": { "type": "string", "format": "date-time" } + }, + "required": [ "__type", "apiVersion", "studyDeploymentIds" ], + "Response": { + "$anchor": "GetBatchForStudyDeployments-Response", + "$ref": "../DataStreamBatch.json" + } + }, "CloseDataStreams": { "$anchor": "CloseDataStreams", "type": "object", diff --git a/rpc/src/main/kotlin/dk/cachet/carp/rpc/GenerateExampleRequests.kt b/rpc/src/main/kotlin/dk/cachet/carp/rpc/GenerateExampleRequests.kt index 93c4df393..9aa5601e4 100644 --- a/rpc/src/main/kotlin/dk/cachet/carp/rpc/GenerateExampleRequests.kt +++ b/rpc/src/main/kotlin/dk/cachet/carp/rpc/GenerateExampleRequests.kt @@ -154,7 +154,6 @@ private val studyLiveStatus = StudyStatus.Live( studyId, studyName, studyCreated // Deployment data matching the example protocol. private val deploymentId = UUID( "c9cc5317-48da-45f2-958e-58bc07f34681" ) -private val deploymentName = "Boaty McBoatface" private val deploymentIds = setOf( deploymentId, UUID( "d4a9bba4-860e-4c58-a356-8a91605dc1ee" ) ) private val deploymentCreatedOn = Instant.fromEpochSeconds( 1642504000 ) private val participantId = UUID( "32880e82-01c9-40cf-a6ed-17ff3348f251" ) @@ -166,7 +165,6 @@ private val studyInvitation = StudyInvitation( ApplicationData( "{\"trialGroup\", \"A\"}" ) ) private val participantAssignedRoles = AssignedTo.Roles( setOf( participantRole.role ) ) -private val roleAssignment = setOf( AssignedParticipantRoles( participantId, participantAssignedRoles ) ) private val participantInvitation = ParticipantInvitation( participantId, participantAssignedRoles, @@ -385,44 +383,17 @@ private val exampleRequests: Map, LoggedRequest.Succeeded<*>> = map RecruitmentService::inviteNewParticipantGroup to example( request = RecruitmentServiceRequest.InviteNewParticipantGroup( studyId, - setOf( AssignedParticipantRoles( participantId, participantAssignedRoles ) ), - deploymentName + setOf( AssignedParticipantRoles( participantId, participantAssignedRoles ) ) ), - response = ParticipantGroupStatus.Invited( - deploymentId, - participants, - roleAssignment, - participantGroupInvitedOn, - invitedDeploymentStatus, - deploymentName - ) + response = ParticipantGroupStatus.Invited( deploymentId, participants, setOf( AssignedParticipantRoles( participantId, participantAssignedRoles ) ), participantGroupInvitedOn, invitedDeploymentStatus ) ), RecruitmentService::getParticipantGroupStatusList to example( request = RecruitmentServiceRequest.GetParticipantGroupStatusList( studyId ), - response = listOf( - ParticipantGroupStatus.Running( - deploymentId, - participants, - roleAssignment, - participantGroupInvitedOn, - runningDeploymentStatus, - runningDeploymentStatus.startedOn, - deploymentName - ) - ) + response = listOf( ParticipantGroupStatus.Running( deploymentId, participants, setOf( AssignedParticipantRoles( participantId, participantAssignedRoles ) ), participantGroupInvitedOn, runningDeploymentStatus, runningDeploymentStatus.startedOn ) ) ), RecruitmentService::stopParticipantGroup to example( request = RecruitmentServiceRequest.StopParticipantGroup( studyId, deploymentId ), - response = ParticipantGroupStatus.Stopped( - deploymentId, - participants, - roleAssignment, - participantGroupInvitedOn, - stoppedDeploymentStatus, - stoppedDeploymentStatus.startedOn, - stoppedDeploymentStatus.stoppedOn, - deploymentName - ) + response = ParticipantGroupStatus.Stopped( deploymentId, participants, setOf( AssignedParticipantRoles( participantId, participantAssignedRoles ) ), participantGroupInvitedOn, stoppedDeploymentStatus, stoppedDeploymentStatus.startedOn, stoppedDeploymentStatus.stoppedOn ) ), // DeploymentService @@ -512,6 +483,16 @@ private val exampleRequests: Map, LoggedRequest.Succeeded<*>> = map request = DataStreamServiceRequest.GetDataStream( phoneGeoDataStream, 0, 100 ), response = MutableDataStreamBatch().apply { appendSequence( geoDataSequence ) } ), + DataStreamService::getBatchForStudyDeployments to example( + request = DataStreamServiceRequest.GetBatchForStudyDeployments( + studyDeploymentIds = deploymentIds, + dataTypes = null, + from = deploymentCreatedOn, + to = deploymentCreatedOn + 1.days + ), + response = phoneDataStreamBatch + ), + DataStreamService::closeDataStreams to example( request = DataStreamServiceRequest.CloseDataStreams( deploymentIds ) ),