Skip to content

Commit

Permalink
Merge pull request #3 from maca88/small-improvements
Browse files Browse the repository at this point in the history
Small improvements
  • Loading branch information
lepicekmichal authored May 12, 2024
2 parents 5ca0fcb + 55bc5aa commit 454ad1d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.filter
Expand All @@ -13,7 +13,6 @@ import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.launch
import kotlinx.serialization.InternalSerializationApi
import kotlinx.serialization.SerializationException
Expand All @@ -26,9 +25,9 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication()

protected abstract val scope: CoroutineScope

protected abstract val receivedInvocations: SharedFlow<HubMessage.Invocation>
protected abstract val receivedCompletions: SharedFlow<HubMessage.Completion>
protected abstract val receivedStreamItems: SharedFlow<HubMessage.StreamItem>
private val receivedInvocations = MutableSharedFlow<HubMessage.Invocation>()
private val receivedCompletions = MutableSharedFlow<HubMessage.Completion>()
private val receivedStreamItems = MutableSharedFlow<HubMessage.StreamItem>()

private val resultProviderRegistry: MutableSet<String> = mutableSetOf()

Expand Down Expand Up @@ -96,9 +95,9 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication()
try {
it.result.fromJson(resultType)
} catch (ex: SerializationException) {
throw RuntimeException("Completion result could not be parsed as ${resultType.simpleName}: ${it.result}")
throw RuntimeException("Completion result could not be parsed as ${resultType.simpleName}: ${it.result}", ex)
} catch (ex: IllegalArgumentException) {
throw RuntimeException("${resultType.simpleName} could not be initialized from the completion result: ${it.result}")
throw RuntimeException("${resultType.simpleName} could not be initialized from the completion result: ${it.result}", ex)
}
},
)
Expand Down Expand Up @@ -162,9 +161,9 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication()
try {
it.item.fromJson(itemType)
} catch (ex: SerializationException) {
throw RuntimeException("Completion result could not be parsed as ${itemType.simpleName}: ${it.item}")
throw RuntimeException("Completion result could not be parsed as ${itemType.simpleName}: ${it.item}", ex)
} catch (ex: IllegalArgumentException) {
throw RuntimeException("${itemType.simpleName} could not be initialized from the completion result: ${it.item}")
throw RuntimeException("${itemType.simpleName} could not be initialized from the completion result: ${it.item}", ex)
}
}
.collect { if (!isClosedForSend) send(it) }
Expand All @@ -189,6 +188,31 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication()
}
}

protected suspend fun processReceivedInvocation(message: HubMessage.Invocation) {
if (message is HubMessage.Invocation.Blocking && !resultProviderRegistry.contains(message.target)) {
logger.log(
severity = Logger.Severity.WARNING,
message = "There is no result provider for '${message.target}' despite server expecting it.",
cause = null,
)

complete(
HubMessage.Completion.Error(
invocationId = message.invocationId,
error = "Client did not provide a result.",
)
)
}

receivedInvocations.emit(message)
}

protected suspend fun processReceivedStreamItem(message: HubMessage.StreamItem) =
receivedStreamItems.emit(message)

protected suspend fun processReceivedCompletion(message: HubMessage.Completion) =
receivedCompletions.emit(message)

final override fun <T : Any> Flow<HubMessage.Invocation>.handleIncomingInvocation(
resultType: KClass<T>,
callback: suspend (HubMessage.Invocation) -> T,
Expand Down Expand Up @@ -245,36 +269,16 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication()
}

final override fun on(target: String, hasResult: Boolean): Flow<HubMessage.Invocation> {
if (hasResult && resultProviderRegistry.contains(target)) {
if (hasResult && !resultProviderRegistry.add(target)) {
throw IllegalStateException("There can be only one function for returning result on blocking invocation (method: $target)")
}
return receivedInvocations
.run {
if (!hasResult) this
else this
.onSubscription { resultProviderRegistry.add(target) }
.onCompletion { resultProviderRegistry.remove(target) }
}
.filter { it.target == target }
.run {
if (hasResult) this
else this.onEach {
if (it is HubMessage.Invocation.Blocking) {
logger.log(
severity = Logger.Severity.WARNING,
message = "There is no result provider for ${it.target} despite server expecting it.",
cause = null,
)

complete(
HubMessage.Completion.Error(
invocationId = it.invocationId,
error = "Client did not provide a result."
),
)
}
}
}
.onEach { logger.log(Logger.Severity.INFO, "Received invocation: $it", null) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ class HubConnection private constructor(
}
}

override val receivedInvocations = MutableSharedFlow<HubMessage.Invocation>()
override val receivedStreamItems = MutableSharedFlow<HubMessage.StreamItem>()
override val receivedCompletions = MutableSharedFlow<HubMessage.Completion>()

private val _connectionState: MutableStateFlow<HubConnectionState> = MutableStateFlow(HubConnectionState.DISCONNECTED)
val connectionState: StateFlow<HubConnectionState> = _connectionState.asStateFlow()

Expand Down Expand Up @@ -387,12 +383,12 @@ class HubConnection private constructor(
else stop(message.error)
}

is HubMessage.Invocation -> receivedInvocations.emit(message)
is HubMessage.Invocation -> processReceivedInvocation(message)
is HubMessage.StreamInvocation -> Unit // not supported yet
is HubMessage.Ping -> Unit
is HubMessage.CancelInvocation -> Unit // this should not happen according to standard
is HubMessage.StreamItem -> receivedStreamItems.emit(message)
is HubMessage.Completion -> receivedCompletions.emit(message)
is HubMessage.StreamItem -> processReceivedStreamItem(message)
is HubMessage.Completion -> processReceivedCompletion(message)
}
}
}
Expand Down

0 comments on commit 454ad1d

Please sign in to comment.