diff --git a/GrpcDotNetNamedPipes.PerfTests/GrpcDotNetNamedPipes.PerfTests.csproj b/GrpcDotNetNamedPipes.PerfTests/GrpcDotNetNamedPipes.PerfTests.csproj
index 354a359..0bec2b2 100644
--- a/GrpcDotNetNamedPipes.PerfTests/GrpcDotNetNamedPipes.PerfTests.csproj
+++ b/GrpcDotNetNamedPipes.PerfTests/GrpcDotNetNamedPipes.PerfTests.csproj
@@ -10,8 +10,11 @@
-
-
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
diff --git a/GrpcDotNetNamedPipes.Tests/GrpcDotNetNamedPipes.Tests.csproj b/GrpcDotNetNamedPipes.Tests/GrpcDotNetNamedPipes.Tests.csproj
index 4876e8a..e6d16f5 100644
--- a/GrpcDotNetNamedPipes.Tests/GrpcDotNetNamedPipes.Tests.csproj
+++ b/GrpcDotNetNamedPipes.Tests/GrpcDotNetNamedPipes.Tests.csproj
@@ -9,14 +9,17 @@
-
-
-
-
-
+
+
+
+
+
-
-
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
diff --git a/GrpcDotNetNamedPipes.Tests/Helpers/NamedPipeChannelContextFactory.cs b/GrpcDotNetNamedPipes.Tests/Helpers/NamedPipeChannelContextFactory.cs
index b69d0d9..5572c06 100644
--- a/GrpcDotNetNamedPipes.Tests/Helpers/NamedPipeChannelContextFactory.cs
+++ b/GrpcDotNetNamedPipes.Tests/Helpers/NamedPipeChannelContextFactory.cs
@@ -24,7 +24,7 @@ public class NamedPipeChannelContextFactory : ChannelContextFactory
public ChannelContext Create(NamedPipeServerOptions options, ITestOutputHelper output)
{
var impl = new TestServiceImpl();
- var server = new NamedPipeServer(_pipeName, options, output != null ? output.WriteLine : null);
+ var server = new NamedPipeServer(_pipeName, options, output != null ? output.WriteLine : null, output != null ? output.WriteLine : null);
TestService.BindService(server.ServiceBinder, impl);
server.Start();
return new ChannelContext
@@ -44,6 +44,7 @@ public override TestService.TestServiceClient CreateClient(ITestOutputHelper out
{
var channel = new NamedPipeChannel(".", _pipeName,
new NamedPipeChannelOptions { ConnectionTimeout = _connectionTimeout },
+ output != null ? output.WriteLine : null,
output != null ? output.WriteLine : null);
channel.PipeCallback = PipeCallback;
return new TestService.TestServiceClient(channel);
diff --git a/GrpcDotNetNamedPipes/GrpcDotNetNamedPipes.csproj b/GrpcDotNetNamedPipes/GrpcDotNetNamedPipes.csproj
index 50b11b1..057db92 100644
--- a/GrpcDotNetNamedPipes/GrpcDotNetNamedPipes.csproj
+++ b/GrpcDotNetNamedPipes/GrpcDotNetNamedPipes.csproj
@@ -5,8 +5,8 @@
11
true
- 2.1.0
- 2.1.0
+ 2.2.0
+ 2.2.0
2.0.0.0
Ben Olden-Cooligan
@@ -25,9 +25,9 @@
-
-
-
+
+
+
diff --git a/GrpcDotNetNamedPipes/Internal/Helpers/ConnectionLogger.cs b/GrpcDotNetNamedPipes/Internal/Helpers/ConnectionLogger.cs
index c1c183a..e492ce1 100644
--- a/GrpcDotNetNamedPipes/Internal/Helpers/ConnectionLogger.cs
+++ b/GrpcDotNetNamedPipes/Internal/Helpers/ConnectionLogger.cs
@@ -21,15 +21,17 @@ internal class ConnectionLogger
private static int _lastId;
private static int NextId() => Interlocked.Increment(ref _lastId);
- public static ConnectionLogger Client(Action log) => new(log, "CLIENT", log != null ? NextId() : 0);
- public static ConnectionLogger Server(Action log) => new(log, "SERVER", 0);
+ public static ConnectionLogger Client(Action traceLog, Action errorLog) => new(traceLog, errorLog, "CLIENT", traceLog != null ? NextId() : 0);
+ public static ConnectionLogger Server(Action traceLog, Action errorLog) => new(traceLog, errorLog, "SERVER", 0);
- private readonly Action _log;
+ private readonly Action _traceLog;
+ private readonly Action _errorLog;
private readonly string _type;
- private ConnectionLogger(Action log, string type, int id)
+ private ConnectionLogger(Action traceLog, Action errorLog, string type, int id)
{
- _log = log;
+ _traceLog = traceLog;
+ _errorLog = errorLog;
_type = type;
ConnectionId = id;
}
@@ -38,8 +40,15 @@ private ConnectionLogger(Action log, string type, int id)
public void Log(string message)
{
- if (_log == null) return;
+ if (_traceLog == null) return;
var id = ConnectionId > 0 ? ConnectionId.ToString() : "?";
- _log($"[{_type}][{id}] {message}");
+ _traceLog($"[{_type}][{id}] {message}");
+ }
+
+ public void LogError(string message)
+ {
+ if(_errorLog == null) return;
+ var id = ConnectionId > 0 ? ConnectionId.ToString() : "?";
+ _errorLog($"[{_type}][{id}] {message}");
}
}
\ No newline at end of file
diff --git a/GrpcDotNetNamedPipes/Internal/PipeReader.cs b/GrpcDotNetNamedPipes/Internal/PipeReader.cs
index dce42c5..7ca30c5 100644
--- a/GrpcDotNetNamedPipes/Internal/PipeReader.cs
+++ b/GrpcDotNetNamedPipes/Internal/PipeReader.cs
@@ -49,11 +49,11 @@ await _transport.Read(_messageHandler).ConfigureAwait(false))
}
catch (EndOfPipeException)
{
- _logger.Log("End of pipe");
+ _logger.LogError("End of pipe");
}
catch (Exception error)
{
- _logger.Log("Pipe read error");
+ _logger.LogError("Pipe read error");
_onError?.Invoke(error);
}
finally
diff --git a/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs b/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs
index 812536b..c6a5603 100644
--- a/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs
+++ b/GrpcDotNetNamedPipes/Internal/ServerConnectionContext.cs
@@ -21,9 +21,10 @@ internal class ServerConnectionContext : TransportMessageHandler, IDisposable
private readonly ConnectionLogger _logger;
private readonly Dictionary> _methodHandlers;
private readonly PayloadQueue _payloadQueue;
-
+ private readonly TaskFactory _taskFactory;
+
public ServerConnectionContext(NamedPipeServerStream pipeStream, ConnectionLogger logger,
- Dictionary> methodHandlers)
+ Dictionary> methodHandlers, TaskFactory taskFactory)
{
CallContext = new NamedPipeCallContext(this);
PipeStream = pipeStream;
@@ -32,6 +33,7 @@ public ServerConnectionContext(NamedPipeServerStream pipeStream, ConnectionLogge
_methodHandlers = methodHandlers;
_payloadQueue = new PayloadQueue();
CancellationTokenSource = new CancellationTokenSource();
+ _taskFactory = taskFactory;
}
public NamedPipeServerStream PipeStream { get; }
@@ -58,11 +60,13 @@ public IServerStreamWriter CreateResponseStream(Marshaller
return new ResponseStreamWriterImpl(Transport, CancellationToken.None, responseMarshaller,
() => IsCompleted);
}
-
+
public override void HandleRequestInit(string methodFullName, DateTime? deadline)
{
Deadline = new Deadline(deadline);
- Task.Run(async () => await _methodHandlers[methodFullName](this).ConfigureAwait(false));
+ // Note the use of .ConfigureAwait(false) here...
+ // https://blog.stephencleary.com/2012/07/dont-block-on-async-code.html
+ _taskFactory.StartNew(async () => await _methodHandlers[methodFullName](this).ConfigureAwait(false));
}
public override void HandleHeaders(Metadata headers) => RequestHeaders = headers;
@@ -75,22 +79,31 @@ public override void HandleRequestInit(string methodFullName, DateTime? deadline
public void Error(Exception ex)
{
- _logger.Log("RPC error");
IsCompleted = true;
if (Deadline != null && Deadline.IsExpired)
{
+ _logger.LogError("RPC Warning: Deadline Exceeded");
WriteTrailers(StatusCode.DeadlineExceeded, "");
}
else if (CancellationTokenSource.IsCancellationRequested)
{
+
+ _logger.LogError("RPC Warning: Cancelled");
WriteTrailers(StatusCode.Cancelled, "");
}
else if (ex is RpcException rpcException)
{
+
+ _logger.LogError($"RPC Exception: {rpcException.Message} at {rpcException.StackTrace}");
+ if (rpcException.InnerException != null)
+ _logger.LogError($"Inner exception: {rpcException.InnerException.Message} at {rpcException.InnerException.StackTrace}");
WriteTrailers(rpcException.StatusCode, rpcException.Status.Detail);
}
else
{
+ _logger.LogError($"RPC Exception (unknown): {ex.Message} at {ex.StackTrace}");
+ if (ex.InnerException != null)
+ _logger.LogError($"Inner exception: {ex.InnerException.Message} at {ex.InnerException.StackTrace}");
WriteTrailers(StatusCode.Unknown, "Exception was thrown by handler.");
}
}
diff --git a/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs b/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs
index e3806f7..1318d58 100644
--- a/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs
+++ b/GrpcDotNetNamedPipes/Internal/ServerStreamPool.cs
@@ -18,7 +18,7 @@ namespace GrpcDotNetNamedPipes.Internal;
internal class ServerStreamPool : IDisposable
{
- private const int PoolSize = 4;
+ private readonly int PoolSize = 4;
private const int FallbackMin = 100;
private const int FallbackMax = 10_000;
@@ -37,6 +37,8 @@ public ServerStreamPool(string pipeName, NamedPipeServerOptions options,
_options = options;
_handleConnection = handleConnection;
_invokeError = invokeError;
+ if (options.ThreadPoolSize > 0)
+ PoolSize = options.ThreadPoolSize;
}
private NamedPipeServerStream CreatePipeServer()
@@ -161,7 +163,8 @@ private void RunHandleConnection(NamedPipeServerStream pipeServer)
try
{
await _handleConnection(pipeServer);
- pipeServer.Disconnect();
+ if (pipeServer.IsConnected)
+ pipeServer.Disconnect();
}
catch (Exception error)
{
diff --git a/GrpcDotNetNamedPipes/NamedPipeChannel.cs b/GrpcDotNetNamedPipes/NamedPipeChannel.cs
index e7c1e57..e6540dc 100644
--- a/GrpcDotNetNamedPipes/NamedPipeChannel.cs
+++ b/GrpcDotNetNamedPipes/NamedPipeChannel.cs
@@ -22,6 +22,7 @@ public class NamedPipeChannel : CallInvoker
private readonly string _pipeName;
private readonly NamedPipeChannelOptions _options;
private readonly Action _log;
+ private readonly Action _errorLog;
public NamedPipeChannel(string serverName, string pipeName)
: this(serverName, pipeName, new NamedPipeChannelOptions())
@@ -29,16 +30,17 @@ public NamedPipeChannel(string serverName, string pipeName)
}
public NamedPipeChannel(string serverName, string pipeName, NamedPipeChannelOptions options)
- : this(serverName, pipeName, options, null)
+ : this(serverName, pipeName, options, null, null)
{
}
- internal NamedPipeChannel(string serverName, string pipeName, NamedPipeChannelOptions options, Action log)
+ public NamedPipeChannel(string serverName, string pipeName, NamedPipeChannelOptions options, Action log, Action errorLog)
{
_serverName = serverName;
_pipeName = pipeName;
_options = options;
_log = log;
+ _errorLog = errorLog;
}
internal Action PipeCallback { get; set; }
@@ -62,7 +64,7 @@ private ClientConnectionContext CreateConnectionContext(
try
{
bool isServerUnary = method.Type == MethodType.Unary || method.Type == MethodType.ClientStreaming;
- var logger = ConnectionLogger.Client(_log);
+ var logger = ConnectionLogger.Client(_log, _errorLog);
var ctx = new ClientConnectionContext(stream, callOptions, isServerUnary, _options.ConnectionTimeout,
logger);
ctx.InitCall(method, request);
diff --git a/GrpcDotNetNamedPipes/NamedPipeServer.cs b/GrpcDotNetNamedPipes/NamedPipeServer.cs
index 7a396f7..a7cc28b 100644
--- a/GrpcDotNetNamedPipes/NamedPipeServer.cs
+++ b/GrpcDotNetNamedPipes/NamedPipeServer.cs
@@ -19,7 +19,9 @@ namespace GrpcDotNetNamedPipes;
public class NamedPipeServer : IDisposable
{
private readonly ServerStreamPool _pool;
+ private readonly TaskFactory _taskFactory;
private readonly Action _log;
+ private readonly Action _errorLog;
private readonly Dictionary> _methodHandlers = new();
public NamedPipeServer(string pipeName)
@@ -28,15 +30,17 @@ public NamedPipeServer(string pipeName)
}
public NamedPipeServer(string pipeName, NamedPipeServerOptions options)
- : this(pipeName, options, null)
+ : this(pipeName, options, null, null)
{
}
- internal NamedPipeServer(string pipeName, NamedPipeServerOptions options, Action log)
+ public NamedPipeServer(string pipeName, NamedPipeServerOptions options, Action log, Action errorLog)
{
_pool = new ServerStreamPool(pipeName, options, HandleConnection, InvokeError);
_log = log;
+ _errorLog = errorLog;
ServiceBinder = new ServiceBinderImpl(this);
+ _taskFactory = options.TaskFactory ?? new TaskFactory();
}
public ServiceBinderBase ServiceBinder { get; }
@@ -65,8 +69,8 @@ public void Dispose()
private async Task HandleConnection(NamedPipeServerStream pipeStream)
{
- var logger = ConnectionLogger.Server(_log);
- var ctx = new ServerConnectionContext(pipeStream, logger, _methodHandlers);
+ var logger = ConnectionLogger.Server(_log, _errorLog);
+ var ctx = new ServerConnectionContext(pipeStream, logger, _methodHandlers, _taskFactory);
await Task.Run(new PipeReader(pipeStream, ctx, logger, ctx.Dispose, InvokeError).ReadLoop);
}
diff --git a/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs b/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs
index 52e7461..0051e10 100644
--- a/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs
+++ b/GrpcDotNetNamedPipes/NamedPipeServerOptions.cs
@@ -31,4 +31,17 @@ public class NamedPipeServerOptions
///
public PipeSecurity PipeSecurity { get; set; }
#endif
+
+ ///
+ /// Gets or sets a Custom Task Factory to control how tasks are serviced.
+ /// For example, causing threads to be processsed in FIFO rather than LIFO
+ /// by using TaskCreationOptions.preferFairness
+ ///
+ public TaskFactory TaskFactory { get; set; }
+
+ ///
+ /// Gets or sets a count of threads to use for the listener.
+ /// If you need to address a synchronous code execution issue, try increasing
+ ///
+ public int ThreadPoolSize { get; set; } = 4;
}
\ No newline at end of file