Skip to content

Commit 4d0f062

Browse files
committed
Converts the dispatcher service from APM to TAP
1 parent 7c9be66 commit 4d0f062

File tree

1 file changed

+22
-71
lines changed

1 file changed

+22
-71
lines changed

src/ServiceBusRelayHost/DispatcherService.cs

+22-71
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ namespace WebApi.Explorations.ServiceBusIntegration
1919
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
2020
internal class DispatcherService
2121
{
22-
//private readonly HttpServer _server;
2322
private readonly HttpMessageInvoker _serverInvoker;
2423
private readonly HttpServiceBusConfiguration _config;
2524

@@ -42,27 +41,22 @@ public DispatcherService(HttpServer server, HttpServiceBusConfiguration config)
4241
"Expires",
4342
"Last-Modified"
4443
};
45-
4644

47-
4845
[WebGet(UriTemplate = "*")]
4946
[OperationContract(AsyncPattern = true)]
50-
public IAsyncResult BeginGet(AsyncCallback callback, object state)
47+
public async Task<Message> GetAsync()
5148
{
5249
var context = WebOperationContext.Current;
53-
return DispatchToHttpServer(context.IncomingRequest, null, context.OutgoingResponse, _config.BufferRequestContent, callback, state);
54-
}
55-
56-
public Message EndGet(IAsyncResult ar)
57-
{
58-
var t = ar as Task<Stream>;
59-
var stream = t.Result;
50+
var request = MakeHttpRequestMessageFrom(context.IncomingRequest, null, _config.BufferRequestContent);
51+
var response = await _serverInvoker.SendAsync(request, CancellationToken.None);
52+
CopyHttpResponseMessageToOutgoingResponse(response, context.OutgoingResponse);
53+
var stream = response.Content != null ? await response.Content.ReadAsStreamAsync() : null;
6054
return StreamMessageHelper.CreateMessage(MessageVersion.None, "GETRESPONSE", stream ?? new MemoryStream());
6155
}
62-
56+
6357
[WebInvoke(UriTemplate = "*", Method = "*")]
6458
[OperationContract(AsyncPattern = true)]
65-
public IAsyncResult BeginInvoke(Message msg, AsyncCallback callback, object state)
59+
public async Task<Message> InvokeAsync(Message msg)
6660
{
6761
var context = WebOperationContext.Current;
6862
object value;
@@ -74,71 +68,29 @@ public IAsyncResult BeginInvoke(Message msg, AsyncCallback callback, object stat
7468
{
7569
s = StreamMessageHelper.GetStream(msg);
7670
}
77-
}else{
78-
var ms = new MemoryStream();
79-
using(var xw = XmlDictionaryWriter.CreateTextWriter(ms, Encoding.UTF8, false))
80-
{
81-
msg.WriteBodyContents(xw);
82-
}
83-
ms.Seek(0, SeekOrigin.Begin);
84-
s = ms;
71+
}
72+
else
73+
{
74+
var ms = new MemoryStream();
75+
using (var xw = XmlDictionaryWriter.CreateTextWriter(ms, Encoding.UTF8, false))
76+
{
77+
msg.WriteBodyContents(xw);
8578
}
86-
return DispatchToHttpServer(context.IncomingRequest, s, context.OutgoingResponse, _config.BufferRequestContent, callback, state);
87-
}
88-
89-
public Message EndInvoke(IAsyncResult ar)
90-
{
91-
var t = ar as Task<Stream>;
92-
var stream = t.Result;
79+
ms.Seek(0, SeekOrigin.Begin);
80+
s = ms;
81+
}
82+
var request = MakeHttpRequestMessageFrom(context.IncomingRequest, s, _config.BufferRequestContent);
83+
var response = await _serverInvoker.SendAsync(request, CancellationToken.None);
84+
CopyHttpResponseMessageToOutgoingResponse(response, context.OutgoingResponse);
85+
var stream = response.Content != null ? await response.Content.ReadAsStreamAsync() : null;
9386
return StreamMessageHelper.CreateMessage(MessageVersion.None, "GETRESPONSE", stream ?? new MemoryStream());
9487
}
9588

96-
private IAsyncResult DispatchToHttpServer(
97-
IncomingWebRequestContext incomingRequest,
98-
Stream body,
99-
OutgoingWebResponseContext outgoingResponse,
100-
bool bufferBody,
101-
AsyncCallback callback,
102-
object state)
103-
{
104-
var request = MakeHttpRequestMessageFrom(incomingRequest, body, bufferBody);
105-
var tcs = new TaskCompletionSource<Stream>(state);
106-
_serverInvoker.SendAsync(request, new CancellationToken())
107-
.ContinueWith(t =>
108-
{
109-
var response = t.Result;
110-
CopyHttpResponseMessageToOutgoingResponse(response, outgoingResponse);
111-
Action<Task<Stream>> complete = (t2) =>
112-
{
113-
if (t2.IsFaulted)
114-
tcs.TrySetException(
115-
t2.Exception.InnerExceptions);
116-
else if (t2.IsCanceled) tcs.TrySetCanceled();
117-
else tcs.TrySetResult(t2.Result);
118-
119-
if (callback != null) callback(tcs.Task);
120-
};
121-
122-
if (response.Content == null)
123-
{
124-
tcs.TrySetResult(null);
125-
if (callback != null) callback(tcs.Task);
126-
}
127-
else
128-
{
129-
response.Content.ReadAsStreamAsync()
130-
.ContinueWith(complete);
131-
}
132-
});
133-
return tcs.Task;
134-
}
135-
136-
private static HttpRequestMessage MakeHttpRequestMessageFrom(IncomingWebRequestContext oreq, Stream body, bool bufferBody)
89+
private static HttpRequestMessage MakeHttpRequestMessageFrom(IncomingWebRequestContext oreq, Stream body, bool bufferBody)
13790
{
13891
var nreq = new HttpRequestMessage(new HttpMethod(oreq.Method), oreq.UriTemplateMatch.RequestUri);
13992
foreach (var name in oreq.Headers.AllKeys.Where(name => !_httpContentHeaders.Contains(name)))
14093
{
141-
//nreq.Headers.TryAddWithoutValidation(name, oreq.Headers.Get(name).Split(',').Select(s => s.Trim()));
14294
nreq.Headers.TryAddWithoutValidation(name, oreq.Headers.Get(name));
14395
}
14496
if (body != null)
@@ -155,7 +107,6 @@ private static HttpRequestMessage MakeHttpRequestMessageFrom(IncomingWebRequestC
155107

156108
foreach (var name in oreq.Headers.AllKeys.Where(name => _httpContentHeaders.Contains(name)))
157109
{
158-
//nreq.Content.Headers.TryAddWithoutValidation(name, oreq.Headers.Get(name).Split(',').Select(s => s.Trim()));
159110
nreq.Content.Headers.TryAddWithoutValidation(name, oreq.Headers.Get(name));
160111
}
161112
}

0 commit comments

Comments
 (0)