diff --git a/.gitignore b/.gitignore
index 3a3033b..e7e8768 100644
--- a/.gitignore
+++ b/.gitignore
@@ -207,4 +207,6 @@ UpgradeLog*.htm
.dotnet/
# Tools
-tools/
\ No newline at end of file
+tools/
+
+.vs/
\ No newline at end of file
diff --git a/build.ps1 b/build.ps1
index 96be84d..a30bd52 100644
--- a/build.ps1
+++ b/build.ps1
@@ -30,11 +30,11 @@ Param(
)
$FakeVersion = "4.57.4"
-$NUnitVersion = "3.6.0"
+$NUnitVersion = "3.9.0"
$DotNetChannel = "preview";
-$DotNetVersion = "1.0.0";
-$DotNetInstallerUri = "https://raw.githubusercontent.com/dotnet/cli/rel/1.0.0/scripts/obtain/dotnet-install.ps1";
-$NugetVersion = "4.1.0";
+$DotNetVersion = "3.0.100";
+$DotNetInstallerUri = "https://dot.net/v1/dotnet-install.ps1";
+$NugetVersion = "5.3.1";
$NugetUrl = "https://dist.nuget.org/win-x86-commandline/v$NugetVersion/nuget.exe"
# Make sure tools folder exists
diff --git a/src/api/Reactive.Streams/Reactive.Streams.csproj b/src/api/Reactive.Streams/Reactive.Streams.csproj
index aad21f6..13324c1 100644
--- a/src/api/Reactive.Streams/Reactive.Streams.csproj
+++ b/src/api/Reactive.Streams/Reactive.Streams.csproj
@@ -6,7 +6,7 @@
CC0 1.0 Universal
1.0.3
Reactive Streams
- netstandard1.0;net45
+ netstandard1.0;net45;netcoreapp3.0
reactive;stream
https://github.com/reactive-streams/reactive-streams-dotnet
http://creativecommons.org/publicdomain/zero/1.0/
diff --git a/src/examples/Reactive.Streams.Example.Unicast.Tests/Reactive.Streams.Example.Unicast.Tests.csproj b/src/examples/Reactive.Streams.Example.Unicast.Tests/Reactive.Streams.Example.Unicast.Tests.csproj
index f9bf5fc..d8114db 100644
--- a/src/examples/Reactive.Streams.Example.Unicast.Tests/Reactive.Streams.Example.Unicast.Tests.csproj
+++ b/src/examples/Reactive.Streams.Example.Unicast.Tests/Reactive.Streams.Example.Unicast.Tests.csproj
@@ -1,7 +1,7 @@
-
+
Reactive.Streams.Example.Unicast.Tests
- net45
+ net45;netcoreapp3.0
win7-x64
@@ -13,8 +13,8 @@
-
-
+
+
diff --git a/src/examples/Reactive.Streams.Example.Unicast/AsyncIterablePublisher.cs b/src/examples/Reactive.Streams.Example.Unicast/AsyncIterablePublisher.cs
index 2564b2c..8388759 100644
--- a/src/examples/Reactive.Streams.Example.Unicast/AsyncIterablePublisher.cs
+++ b/src/examples/Reactive.Streams.Example.Unicast/AsyncIterablePublisher.cs
@@ -1,12 +1,13 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Threading.Tasks;
namespace Reactive.Streams.Example.Unicast
{
public class AsyncIterablePublisher : IPublisher
- {
+ {
// These represent the protocol of the `AsyncIterablePublishers` SubscriptionImpls
private interface ISignal { }
@@ -61,7 +62,7 @@ public AsyncIterablePublisher(IEnumerable elements) : this(elements, DefaultB
private AsyncIterablePublisher(IEnumerable elements, int batchSize)
{
- if(elements == null)
+ if (elements == null)
throw new ArgumentNullException(nameof(elements));
if (batchSize < 1)
throw new ArgumentNullException(nameof(batchSize), "batchSize must be greater than zero!");
@@ -121,13 +122,13 @@ public SubscriptionImplementation(IEnumerable elements, int batchSize, ISubsc
if (_inboundSignals.TryDequeue(out signal) && !_cancelled) // to make sure that we follow rule 1.8, 3.6 and 3.7
{
// Below we simply unpack the `Signal`s and invoke the corresponding method
- if(signal is RequestSignal)
+ if (signal is RequestSignal)
DoRequest(((RequestSignal)signal).N);
else if (signal == SendSignal.Instance)
DoSend();
- else if(signal == CancelSignal.Instance)
+ else if (signal == CancelSignal.Instance)
DoCancel();
- else if(signal == SubscribeSignal.Instance)
+ else if (signal == SubscribeSignal.Instance)
DoSubscribe();
}
}
@@ -144,12 +145,12 @@ public SubscriptionImplementation(IEnumerable elements, int batchSize, ISubsc
private void DoRequest(long n)
{
if (n < 1)
- TerminateDueTo(new ArgumentException(_subscriber +" violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements"));
+ TerminateDueTo(new ArgumentException(_subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements"));
else if (_demand + n < 1)
{
// As governed by rule 3.17, when demand overflows `long.MaxValue` we treat the signalled demand as "effectively unbounded"
_demand = long.MaxValue;
- // Here we protect from the overflow and treat it as "effectively unbounded"
+ // Here we protect from the overflow and treat it as "effectively unbounded"
DoSend(); // Then we proceed with sending data downstream
}
else
@@ -250,13 +251,13 @@ private void DoSend()
try
{
next = _enumerator.Current;
- // We have already checked `MoveNext` when subscribing, so we can fall back to testing -after- `Current` is called.
+ // We have already checked `MoveNext` when subscribing, so we can fall back to testing -after- `Current` is called.
hasNext = _enumerator.MoveNext(); // Need to keep track of End-of-Stream
}
catch (Exception ex)
{
TerminateDueTo(ex);
- // If `Current` or `MoveNext` throws (they can, since it is user-provided), we need to treat the stream as errored as per rule 1.4
+ // If `Current` or `MoveNext` throws (they can, since it is user-provided), we need to treat the stream as errored as per rule 1.4
return;
}
diff --git a/src/examples/Reactive.Streams.Example.Unicast/AtomicBoolean.cs b/src/examples/Reactive.Streams.Example.Unicast/AtomicBoolean.cs
index f721e60..c5f856b 100644
--- a/src/examples/Reactive.Streams.Example.Unicast/AtomicBoolean.cs
+++ b/src/examples/Reactive.Streams.Example.Unicast/AtomicBoolean.cs
@@ -30,8 +30,7 @@ public bool Value
{
get
{
- Interlocked.MemoryBarrier();
- return _value == TrueValue;
+ return Volatile.Read(ref _value) == TrueValue;
}
set
{
diff --git a/src/examples/Reactive.Streams.Example.Unicast/Reactive.Streams.Example.Unicast.csproj b/src/examples/Reactive.Streams.Example.Unicast/Reactive.Streams.Example.Unicast.csproj
index 84be131..eca2707 100644
--- a/src/examples/Reactive.Streams.Example.Unicast/Reactive.Streams.Example.Unicast.csproj
+++ b/src/examples/Reactive.Streams.Example.Unicast/Reactive.Streams.Example.Unicast.csproj
@@ -2,7 +2,7 @@
Reactive.Streams.Example.Unicast
- netstandard1.4;net45
+ netstandard1.4;net45;netcoreapp3.0
true
diff --git a/src/tck/Reactive.Streams.TCK.Tests/Reactive.Streams.TCK.Tests.csproj b/src/tck/Reactive.Streams.TCK.Tests/Reactive.Streams.TCK.Tests.csproj
index 4444511..87a6d78 100644
--- a/src/tck/Reactive.Streams.TCK.Tests/Reactive.Streams.TCK.Tests.csproj
+++ b/src/tck/Reactive.Streams.TCK.Tests/Reactive.Streams.TCK.Tests.csproj
@@ -1,7 +1,7 @@
-
+
Reactive.Streams.TCK.Tests
- net45
+ net45;netcoreapp3.0
win7-x64
@@ -13,8 +13,8 @@
-
-
+
+
diff --git a/src/tck/Reactive.Streams.TCK/PublisherVerification.cs b/src/tck/Reactive.Streams.TCK/PublisherVerification.cs
index 4d3e1ca..af5dc03 100644
--- a/src/tck/Reactive.Streams.TCK/PublisherVerification.cs
+++ b/src/tck/Reactive.Streams.TCK/PublisherVerification.cs
@@ -129,7 +129,7 @@ public void Required_createPublisher1MustProduceAStreamOfExactly1Element()
ActivePublisherTest(1, true, publisher =>
{
var subscriber = _environment.NewManualSubscriber(publisher);
- Assert.True(requestNextElementOrEndOfStream(publisher, subscriber).HasValue, $"Publisher {publisher} produced no elements");
+ InternalAssertTrue(requestNextElementOrEndOfStream(publisher, subscriber).HasValue, $"Publisher {publisher} produced no elements");
subscriber.RequestEndOfStream();
});
}
@@ -145,20 +145,20 @@ public void Required_createPublisher3MustProduceAStreamOfExactly3Elements()
ActivePublisherTest(3, true, publisher =>
{
var subscriber = _environment.NewManualSubscriber(publisher);
- Assert.True(requestNextElementOrEndOfStream(publisher, subscriber).HasValue, $"Publisher {publisher} produced no elements");
- Assert.True(requestNextElementOrEndOfStream(publisher, subscriber).HasValue, $"Publisher {publisher} produced only 1 element");
- Assert.True(requestNextElementOrEndOfStream(publisher, subscriber).HasValue, $"Publisher {publisher} produced only 3 element");
+ InternalAssertTrue(requestNextElementOrEndOfStream(publisher, subscriber).HasValue, $"Publisher {publisher} produced no elements");
+ InternalAssertTrue(requestNextElementOrEndOfStream(publisher, subscriber).HasValue, $"Publisher {publisher} produced only 1 element");
+ InternalAssertTrue(requestNextElementOrEndOfStream(publisher, subscriber).HasValue, $"Publisher {publisher} produced only 3 element");
subscriber.RequestEndOfStream();
});
}
[Test]
public void Required_validate_maxElementsFromPublisher()
- => Assert.True(MaxElementsFromPublisher >= 0, "maxElementsFromPublisher MUST return a number >= 0");
+ => InternalAssertTrue(MaxElementsFromPublisher >= 0, "maxElementsFromPublisher MUST return a number >= 0");
[Test]
public void Required_validate_boundedDepthOfOnNextAndRequestRecursion()
- => Assert.True(BoundedDepthOfOnNextAndRequestRecursion >= 1, "boundedDepthOfOnNextAndRequestRecursion must return a number >= 1");
+ => InternalAssertTrue(BoundedDepthOfOnNextAndRequestRecursion >= 1, "boundedDepthOfOnNextAndRequestRecursion must return a number >= 1");
////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
@@ -642,8 +642,8 @@ public void Optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequen
check2.Add(z2);
check2.Add(z3);
- Assert.AreEqual(r, check1, $"Publisher {publisher} did not produce the same element sequence for subscribers 1 and 2");
- Assert.AreEqual(r, check2, $"Publisher {publisher} did not produce the same element sequence for subscribers 1 and 3");
+ InternalAssertAreEqual(r, check1, $"Publisher {publisher} did not produce the same element sequence for subscribers 1 and 2");
+ InternalAssertAreEqual(r, check2, $"Publisher {publisher} did not produce the same element sequence for subscribers 1 and 3");
});
@@ -670,10 +670,18 @@ public void Optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequen
// a similar test *with* completion checking is implemented
- Assert.AreEqual(received1, received2, "Expected elements to be signaled in the same sequence to 1st and 2nd subscribers");
- Assert.AreEqual(received2, received3, "Expected elements to be signaled in the same sequence to 2st and 3nd subscribers");
+ InternalAssertAreEqual(received1, received2, "Expected elements to be signaled in the same sequence to 1st and 2nd subscribers");
+ InternalAssertAreEqual(received2, received3, "Expected elements to be signaled in the same sequence to 2st and 3nd subscribers");
});
+ private void InternalAssertAreEqual(object one, object two, string message)
+ {
+ if (!object.Equals(one, two))
+ {
+ throw new AssertionException(message + "\r\nExpected: " + one + "\r\nBut was: " + two);
+ }
+ }
+
// Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#1.11
[Test]
public void Optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequenceToAllOfItsSubscribersWhenRequestingManyUpfrontAndCompleteAsExpected()
@@ -697,8 +705,8 @@ public void Optional_spec111_multicast_mustProduceTheSameElementsInTheSameSequen
sub2.ExpectCompletion();
sub3.ExpectCompletion();
- Assert.AreEqual(received1, received2, "Expected elements to be signaled in the same sequence to 1st and 2nd subscribers");
- Assert.AreEqual(received2, received3, "Expected elements to be signaled in the same sequence to 2st and 3nd subscribers");
+ InternalAssertAreEqual(received1, received2, "Expected elements to be signaled in the same sequence to 1st and 2nd subscribers");
+ InternalAssertAreEqual(received2, received3, "Expected elements to be signaled in the same sequence to 2st and 3nd subscribers");
});
///////////////////// SUBSCRIPTION TESTS //////////////////////////////////
@@ -965,7 +973,7 @@ public void Required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling
}
// if the Publisher tries to emit more elements than was requested (and/or ignores cancellation) this will throw
- Assert.True(onNextSignalled <= totalDemand,
+ InternalAssertTrue(onNextSignalled <= totalDemand,
$"Publisher signalled {onNextSignalled} elements, which is more than the signalled demand: {totalDemand}");
} while (stillbeeingSignalled);
});
@@ -973,6 +981,14 @@ public void Required_spec312_cancelMustMakeThePublisherToEventuallyStopSignaling
_environment.VerifyNoAsyncErrorsNoDelay();
}
+ private void InternalAssertTrue(bool condition, string message)
+ {
+ if (!condition)
+ {
+ throw new AssertionException(message);
+ }
+ }
+
// Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#3.13
[Test]
public void Required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber()
diff --git a/src/tck/Reactive.Streams.TCK/Reactive.Streams.TCK.csproj b/src/tck/Reactive.Streams.TCK/Reactive.Streams.TCK.csproj
index 1f9e4da..2bda278 100644
--- a/src/tck/Reactive.Streams.TCK/Reactive.Streams.TCK.csproj
+++ b/src/tck/Reactive.Streams.TCK/Reactive.Streams.TCK.csproj
@@ -6,7 +6,7 @@
CC0 1.0 Universal
1.0.3
Reactive Streams
- net45
+ net45;netcoreapp3.0
reactive;stream
https://github.com/reactive-streams/reactive-streams-dotnet
http://creativecommons.org/publicdomain/zero/1.0/
@@ -19,7 +19,7 @@
-
+
diff --git a/src/tck/Reactive.Streams.TCK/SubscriberBlackboxVerification.cs b/src/tck/Reactive.Streams.TCK/SubscriberBlackboxVerification.cs
index a093679..7f7baeb 100644
--- a/src/tck/Reactive.Streams.TCK/SubscriberBlackboxVerification.cs
+++ b/src/tck/Reactive.Streams.TCK/SubscriberBlackboxVerification.cs
@@ -301,7 +301,7 @@ public void Required_spec213_blackbox_onSubscribe_mustThrowNullPointerExceptionW
gotNpe = true;
}
- Assert.True(gotNpe, "OnSubscribe(null) did not throw ArgumentNullException");
+ InternalAssertTrue(gotNpe, "OnSubscribe(null) did not throw ArgumentNullException");
Environment.VerifyNoAsyncErrorsNoDelay();
});
@@ -327,7 +327,7 @@ public void Required_spec213_blackbox_onNext_mustThrowNullPointerExceptionWhenPa
gotNpe = true;
}
- Assert.True(gotNpe, "OnNext(null) did not throw ArgumentNullException");
+ InternalAssertTrue(gotNpe, "OnNext(null) did not throw ArgumentNullException");
Environment.VerifyNoAsyncErrorsNoDelay();
});
@@ -349,10 +349,18 @@ public void Required_spec213_blackbox_onError_mustThrowNullPointerExceptionWhenP
gotNpe = true;
}
- Assert.True(gotNpe, "OnError(null) did not throw ArgumentNullException");
+ InternalAssertTrue(gotNpe, "OnError(null) did not throw ArgumentNullException");
Environment.VerifyNoAsyncErrorsNoDelay();
});
+ private void InternalAssertTrue(bool condition, string message)
+ {
+ if (!condition)
+ {
+ throw new AssertionException(message);
+ }
+ }
+
private sealed class Spec213DummySubscription : ISubscription
{
public void Request(long n)
diff --git a/src/tck/Reactive.Streams.TCK/TestEnvironment.cs b/src/tck/Reactive.Streams.TCK/TestEnvironment.cs
index 4f9829f..627fadf 100644
--- a/src/tck/Reactive.Streams.TCK/TestEnvironment.cs
+++ b/src/tck/Reactive.Streams.TCK/TestEnvironment.cs
@@ -145,14 +145,7 @@ public static long EnvironmentDefaultNoSignalsTimeoutMilliseconds()
///
public void Flop(string message)
{
- try
- {
- Assert.Fail(message);
- }
- catch (Exception ex)
- {
- AsyncErrors.Enqueue(ex);
- }
+ AsyncErrors.Enqueue(new AssertionException(message));
}
///
@@ -169,14 +162,7 @@ public void Flop(string message)
///
public void Flop(Exception exception, string message)
{
- try
- {
- Assert.Fail(message, exception);
- }
- catch (Exception)
- {
- AsyncErrors.Enqueue(exception);
- }
+ AsyncErrors.Enqueue(exception);
}
///
@@ -193,14 +179,7 @@ public void Flop(Exception exception, string message)
///
public void Flop(Exception exception)
{
- try
- {
- Assert.Fail(exception.Message, exception);
- }
- catch (Exception)
- {
- AsyncErrors.Enqueue(exception);
- }
+ AsyncErrors.Enqueue(exception);
}
///
@@ -217,17 +196,9 @@ public void Flop(Exception exception)
///
public T FlopAndFail(string message)
{
- try
- {
- Assert.Fail(message);
- }
- catch (Exception ex)
- {
- AsyncErrors.Enqueue(ex);
- Assert.Fail(message, ex);
- }
-
- return default(T); // unreachable, the previous block will always exit by throwing
+ var ae = new AssertionException(message);
+ AsyncErrors.Enqueue(ae);
+ throw new AssertionException(message, ae);
}
@@ -307,7 +278,7 @@ public void VerifyNoAsyncErrorsNoDelay()
if (exception != null)
throw exception;
- Assert.Fail($"Async error during test execution: {error.Message}", error);
+ throw new AssertionException($"Async error during test execution: {error.Message}", error);
}
}
@@ -738,8 +709,17 @@ public void ExpectCancelling(long timeoutMilliseconds) =>
public class Promise
{
private readonly TestEnvironment _environment;
- private readonly BlockingCollection _blockingCollection = new BlockingCollection();
- private Option _value;
+ private readonly CountdownEvent cde = new CountdownEvent(1);
+ private PromiseNode _node;
+
+ private sealed class PromiseNode
+ {
+ internal readonly T value;
+ internal PromiseNode(T value)
+ {
+ this.value = value;
+ }
+ }
public Promise(TestEnvironment environment)
{
@@ -757,20 +737,33 @@ public T Value
{
get
{
- if (IsCompleted())
- return _value.Value;
+ var n = Volatile.Read(ref _node);
+ if (n != null)
+ return n.value;
_environment.Flop("Cannot access promise value before completion");
return default(T);
}
}
- public bool IsCompleted() => _value.HasValue;
-
+ public bool IsCompleted()
+ {
+ return Volatile.Read(ref _node) != null;
+ }
///
/// Allows using ExpectCompletion to await for completion of the value and complete it _then_
///
- public void Complete(T value) => _blockingCollection.Add(value);
+ public void Complete(T value)
+ {
+ if (Interlocked.CompareExchange(ref _node, new PromiseNode(value), null) == null)
+ {
+ cde.Signal();
+ }
+ else
+ {
+ _environment.Flop("Already completed");
+ }
+ }
///
/// Completes the promise right away, it is not possible to ExpectCompletion on a Promise completed this way
@@ -778,18 +771,19 @@ public T Value
public void CompleteImmediatly(T value)
{
Complete(value); // complete!
- _value = value; // immediatly!
}
public void ExpectCompletion(long timeoutMilliseconds, string errorMessage)
{
if (!IsCompleted())
{
- T value;
- if (!_blockingCollection.TryTake(out value, TimeSpan.FromMilliseconds(timeoutMilliseconds)))
- _environment.Flop($"{errorMessage} within {timeoutMilliseconds} ms");
- else
- _value = value;
+ if (!cde.Wait(TimeSpan.FromMilliseconds(timeoutMilliseconds)))
+ {
+ if (!IsCompleted())
+ {
+ _environment.Flop($"{errorMessage} within {timeoutMilliseconds} ms");
+ }
+ }
}
}
}
@@ -883,7 +877,10 @@ public void ExpectCompletion(long timeoutMilliseconds, string errorMessage)
public E ExpectError(long timeoutMilliseconds, string errorMessage) where E : Exception
{
- Thread.Sleep(TimeSpan.FromMilliseconds(timeoutMilliseconds));
+ if (_environment.AsyncErrors.IsEmpty)
+ {
+ Thread.Sleep(TimeSpan.FromMilliseconds(timeoutMilliseconds));
+ }
if (_environment.AsyncErrors.IsEmpty)
return _environment.FlopAndFail($"{errorMessage} within {timeoutMilliseconds} ms");