Skip to content

Commit 5bf516f

Browse files
author
Johan 't Hart
committed
Add possibility for callee instance to return an observable
#238
1 parent fe62d3e commit 5bf516f

File tree

5 files changed

+109
-14
lines changed

5 files changed

+109
-14
lines changed

src/netstandard/Tests/WampSharp.Tests.Wampv2/Integration/RpcProgressTests.cs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,29 @@ public async Task ProgressiveCallsCallerProgress()
4141
Assert.That(callback.Task.Result, Is.EqualTo(10));
4242
}
4343

44+
[Test]
45+
public async Task ProgressiveCallsCallerProgressObservable()
46+
{
47+
WampPlayground playground = new WampPlayground();
48+
49+
CallerCallee dualChannel = await playground.GetCallerCalleeDualChannel();
50+
IWampChannel calleeChannel = dualChannel.CalleeChannel;
51+
IWampChannel callerChannel = dualChannel.CallerChannel;
52+
53+
await calleeChannel.RealmProxy.Services.RegisterCallee(new LongOpObsService());
54+
55+
MyCallback callback = new MyCallback();
56+
57+
callerChannel.RealmProxy.RpcCatalog.Invoke
58+
(callback,
59+
new CallOptions() { ReceiveProgress = true },
60+
"com.myapp.longop",
61+
new object[] { 10, false });
62+
63+
Assert.That(callback.Task.Result, Is.EqualTo(-1));
64+
CollectionAssert.AreEquivalent(Enumerable.Range(0, 10), callback.ProgressiveResults);
65+
}
66+
4467
[Test]
4568
public async Task ProgressiveCallsCalleeProxyProgress()
4669
{
@@ -206,7 +229,7 @@ public class MyCallback : IWampRawRpcOperationClientCallback
206229

207230
public void Result<TMessage>(IWampFormatter<TMessage> formatter, ResultDetails details)
208231
{
209-
throw new NotImplementedException();
232+
mTask.SetResult(-1); // -1 indicates no final return value
210233
}
211234

212235
public void Result<TMessage>(IWampFormatter<TMessage> formatter, ResultDetails details, TMessage[] arguments)

src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/LocalRpcOperation.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ protected void CallResult(IWampRawRpcOperationRouterCallback caller, YieldOption
7777
{
7878
caller.Result(ObjectFormatter, options, arguments, argumentKeywords);
7979
}
80-
else if (!this.HasResult)
80+
else if (!this.HasResult || arguments == null)
8181
{
8282
caller.Result(ObjectFormatter, options);
8383
}
@@ -93,7 +93,7 @@ protected IEnumerable<object> UnpackParameters<TMessage>(IWampFormatter<TMessage
9393
{
9494
ArgumentUnpacker unpacker = new ArgumentUnpacker(Parameters);
9595

96-
IEnumerable<object> result =
96+
IEnumerable<object> result =
9797
unpacker.UnpackParameters(formatter, arguments, argumentsKeywords);
9898

9999
return result;

src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/Reflection/OperationExtractor.cs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ protected IWampRpcOperation CreateRpcMethod(Func<object> instanceProvider, ICall
5757
string procedureUri =
5858
interceptor.GetProcedureUri(method);
5959

60-
if (!typeof (Task).IsAssignableFrom(method.ReturnType))
60+
if (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(IObservable<>))
61+
{
62+
return CreateProgressiveObservableOperation(instanceProvider, method, procedureUri);
63+
}
64+
else if (!typeof (Task).IsAssignableFrom(method.ReturnType))
6165
{
6266
MethodInfoValidation.ValidateSyncMethod(method);
6367
return new SyncMethodInfoRpcOperation(instanceProvider, method, procedureUri);
@@ -97,5 +101,25 @@ private static IWampRpcOperation CreateProgressiveOperation(Func<object> instanc
97101

98102
return operation;
99103
}
104+
105+
private static IWampRpcOperation CreateProgressiveObservableOperation(Func<object> instanceProvider, MethodInfo method, string procedureUri)
106+
{
107+
//return new ProgressiveObservableMethodInfoRpcOperation<returnType>
108+
// (instance, method, procedureUri);
109+
110+
Type returnType = method.ReturnType.GetGenericArguments()[0];
111+
112+
Type operationType =
113+
typeof(ProgressiveObservableMethodInfoRpcOperation<>)
114+
.MakeGenericType(returnType);
115+
116+
IWampRpcOperation operation =
117+
(IWampRpcOperation)Activator.CreateInstance(operationType,
118+
instanceProvider,
119+
method,
120+
procedureUri);
121+
122+
return operation;
123+
}
100124
}
101-
}
125+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Reflection;
5+
using System.Threading;
6+
using WampSharp.Core.Utilities;
7+
using WampSharp.Core.Serialization;
8+
using WampSharp.V2.Core.Contracts;
9+
10+
namespace WampSharp.V2.Rpc
11+
{
12+
public class ProgressiveObservableMethodInfoRpcOperation<T> : SyncMethodInfoRpcOperation
13+
{
14+
public ProgressiveObservableMethodInfoRpcOperation(Func<object> instanceProvider, MethodInfo method, string procedureName) :
15+
base(instanceProvider, method, procedureName)
16+
{
17+
}
18+
19+
protected override void OnResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary<string, object> outputs)
20+
{
21+
((IObservable<T>)result).Subscribe(
22+
it => CallResult(caller, it, outputs, new YieldOptions { Progress = true }),
23+
ex =>
24+
{
25+
if (ex is WampException wampex)
26+
HandleException(caller, wampex);
27+
else
28+
HandleException(caller, ex);
29+
},
30+
// An observable does not emit any value when completing, so result without arguments
31+
() => caller.Result(ObjectFormatter, new YieldOptions())
32+
);
33+
}
34+
}
35+
}

src/netstandard/WampSharp/WAMP2/V2/Rpc/Callee/SyncLocalRpcOperation.cs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,30 +29,43 @@ protected override IWampCancellableInvocation InnerInvoke<TMessage>(IWampRawRpcO
2929
argumentsKeywords,
3030
out IDictionary<string, object> outputs);
3131

32-
CallResult(caller, result, outputs);
32+
OnResult(caller, result, outputs);
3333
}
3434
catch (WampException ex)
3535
{
36-
mLogger.ErrorFormat(ex, "An error occurred while calling {ProcedureUri}", this.Procedure);
37-
IWampErrorCallback callback = new WampRpcErrorCallback(caller);
38-
callback.Error(ex);
36+
HandleException(caller, ex);
3937
}
4038
catch (Exception ex)
4139
{
42-
WampException wampException = ConvertExceptionToRuntimeException(ex);
43-
IWampErrorCallback callback = new WampRpcErrorCallback(caller);
44-
callback.Error(wampException);
40+
HandleException(caller, ex);
4541
}
4642

4743
return null;
4844
}
4945

46+
protected void HandleException(IWampRawRpcOperationRouterCallback caller, WampException ex)
47+
{
48+
mLogger.ErrorFormat(ex, "An error occurred while calling {ProcedureUri}", this.Procedure);
49+
IWampErrorCallback callback = new WampRpcErrorCallback(caller);
50+
callback.Error(ex);
51+
}
52+
53+
protected void HandleException(IWampRawRpcOperationRouterCallback caller, Exception ex)
54+
{
55+
WampException wampException = ConvertExceptionToRuntimeException(ex);
56+
IWampErrorCallback callback = new WampRpcErrorCallback(caller);
57+
callback.Error(wampException);
58+
}
59+
60+
protected virtual void OnResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary<string, object> outputs) =>
61+
CallResult(caller, result, outputs);
62+
5063
protected void CallResult(IWampRawRpcOperationRouterCallback caller, object result, IDictionary<string, object> outputs, YieldOptions yieldOptions = null)
5164
{
5265
yieldOptions = yieldOptions ?? new YieldOptions();
5366
object[] resultArguments = GetResultArguments(result);
5467

55-
IDictionary<string, object> argumentKeywords =
68+
IDictionary<string, object> argumentKeywords =
5669
GetResultArgumentKeywords(result, outputs);
5770

5871
CallResult(caller,
@@ -69,4 +82,4 @@ protected virtual IDictionary<string, object> GetResultArgumentKeywords(object r
6982
protected abstract object InvokeSync<TMessage>
7083
(IWampRawRpcOperationRouterCallback caller, IWampFormatter<TMessage> formatter, InvocationDetails details, TMessage[] arguments, IDictionary<string, TMessage> argumentsKeywords, out IDictionary<string, object> outputs);
7184
}
72-
}
85+
}

0 commit comments

Comments
 (0)