Skip to content

Commit ec9a640

Browse files
committed
feat(workflow): add support for HTTP branches interceptor in workflows
1 parent a3e4cda commit ec9a640

File tree

10 files changed

+237
-23
lines changed

10 files changed

+237
-23
lines changed

src/Dtmcli/Constant.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ internal static class Constant
44
{
55
internal const string DtmClientHttpName = "dtmClient";
66
internal const string BranchClientHttpName = "branchClient";
7-
7+
internal const string WorkflowBranchClientHttpName = "WF";
8+
89
internal static class Request
910
{
1011
internal const string CONTENT_TYPE = "application/json";

src/Dtmcli/DtmClient.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public async Task<TransGlobal> Query(string gid, CancellationToken cancellationT
148148
var client = _httpClientFactory.CreateClient(Constant.DtmClientHttpName);
149149
var response = await client.GetAsync(url, cancellationToken).ConfigureAwait(false);
150150
var dtmContent = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
151-
DtmImp.Utils.CheckStatus(response.StatusCode, dtmContent);
151+
DtmImp.Utils.CheckStatusCode(response.StatusCode);
152152
return JsonSerializer.Deserialize<TransGlobal>(dtmContent, _jsonOptions);
153153
}
154154

@@ -167,7 +167,7 @@ public async Task<string> QueryStatus(string gid, CancellationToken cancellation
167167
var client = _httpClientFactory.CreateClient(Constant.DtmClientHttpName);
168168
var response = await client.GetAsync(url, cancellationToken).ConfigureAwait(false);
169169
var dtmContent = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
170-
DtmImp.Utils.CheckStatus(response.StatusCode, dtmContent);
170+
DtmImp.Utils.CheckStatusCode(response.StatusCode);
171171
var graph = JsonSerializer.Deserialize<TransGlobalForStatus>(dtmContent, _jsonOptions);
172172
return graph.Transaction == null
173173
? string.Empty

src/Dtmcli/DtmImp/Utils.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ public static void CheckStatus(HttpStatusCode status, string dtmResult)
4040
}
4141
}
4242

43+
public static void CheckStatusCode(HttpStatusCode status)
44+
{
45+
if (status != HttpStatusCode.OK)
46+
{
47+
throw new DtmException(string.Format(CheckStatusMsgFormat, status.ToString(), string.Empty));
48+
}
49+
}
50+
4351
/// <summary>
4452
/// OrString return the first not null or not empty string
4553
/// </summary>

src/Dtmcli/TransGlobal.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class DtmTransaction
2828
{
2929
[JsonPropertyName("id")] public int Id { get; set; }
3030

31-
[JsonPropertyName("create_time")] public DateTimeOffset CreateTime { get; set; }
31+
[JsonPropertyName("create_time")] public DateTimeOffset? CreateTime { get; set; }
3232

3333
[JsonPropertyName("update_time")] public DateTimeOffset UpdateTime { get; set; }
3434

@@ -64,9 +64,9 @@ public class DtmBranch
6464
{
6565
[JsonPropertyName("id")] public int Id { get; set; }
6666

67-
[JsonPropertyName("create_time")] public DateTimeOffset CreateTime { get; set; }
67+
[JsonPropertyName("create_time")] public DateTimeOffset? CreateTime { get; set; }
6868

69-
[JsonPropertyName("update_time")] public DateTimeOffset UpdateTime { get; set; }
69+
[JsonPropertyName("update_time")] public DateTimeOffset? UpdateTime { get; set; }
7070

7171
[JsonPropertyName("gid")] public string Gid { get; set; }
7272

src/Dtmworkflow/ServiceCollectionExtensions.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ public static IServiceCollection AddDtmWorkflow(this IServiceCollection services
2323
services.TryAddSingleton<IWorkflowFactory, WorkflowFactory>();
2424
services.TryAddSingleton<WorkflowGlobalTransaction>();
2525

26+
AddHttpClient(services);
27+
2628
return services;
2729
}
2830

@@ -33,8 +35,22 @@ public static IServiceCollection AddDtmWorkflow(this IServiceCollection services
3335

3436
services.TryAddSingleton<IWorkflowFactory, WorkflowFactory>();
3537
services.TryAddSingleton<WorkflowGlobalTransaction>();
38+
39+
AddHttpClient(services);
3640

3741
return services;
3842
}
43+
44+
private static void AddHttpClient(IServiceCollection services /*, DtmOptions options*/)
45+
{
46+
services.AddHttpClient(Dtmcli.Constant.WorkflowBranchClientHttpName, client =>
47+
{
48+
// TODO DtmOptions
49+
// client.Timeout = TimeSpan.FromMilliseconds(options.BranchTimeout);
50+
}).AddHttpMessageHandler<WorkflowHttpInterceptor>();
51+
52+
// TODO how to inject workflow instance?
53+
services.AddTransient<WorkflowHttpInterceptor>();
54+
}
3955
}
40-
}
56+
}

src/Dtmworkflow/Workflow.Imp.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ internal async Task<byte[]> Process(WfFunc2 handler, byte[] data)
6464
}
6565

6666
err = Utils.GrpcError2DtmError(err);
67-
67+
6868
if (err != null && err is not DtmCommon.DtmFailureException) throw err;
6969

7070
try
@@ -193,7 +193,7 @@ private StepResult StepResultFromGrpc(IMessage reply, Exception err)
193193
return sr;
194194
}
195195

196-
private HttpResponseMessage StepResultToHttp(StepResult r)
196+
public HttpResponseMessage StepResultToHttp(StepResult r)
197197
{
198198
if (r.Error != null)
199199
{
@@ -203,7 +203,7 @@ private HttpResponseMessage StepResultToHttp(StepResult r)
203203
return Utils.NewJSONResponse(HttpStatusCode.OK, r.Data);
204204
}
205205

206-
private StepResult StepResultFromHTTP(HttpResponseMessage resp, Exception err)
206+
public StepResult StepResultFromHTTP(HttpResponseMessage resp, Exception err)
207207
{
208208
var sr = new StepResult
209209
{
@@ -212,7 +212,7 @@ private StepResult StepResultFromHTTP(HttpResponseMessage resp, Exception err)
212212

213213
if (err == null)
214214
{
215-
// HTTPResp2DtmError
215+
(sr.Data, sr.Error) = Utils.HTTPResp2DtmError(resp); // TODO go 使用了 this.Options.HTTPResp2DtmError(resp), 方便定制
216216
sr.Status = WfErrorToStatus(sr.Error);
217217
}
218218

@@ -234,9 +234,9 @@ private string WfErrorToStatus(Exception err)
234234
}
235235

236236

237-
private async Task<StepResult> RecordedDo(Func<DtmCommon.BranchBarrier, Task<StepResult>> fn)
237+
public async Task<StepResult> RecordedDo(Func<DtmCommon.BranchBarrier, Task<StepResult>> fn)
238238
{
239-
var sr = await this.RecordedDoInner(fn);
239+
StepResult sr = await this.RecordedDoInner(fn);
240240

241241
// do not compensate the failed branch if !CompensateErrorBranch
242242
if (this.Options.CompensateErrorBranch && sr.Status == DtmCommon.Constant.StatusFailed)

src/Dtmworkflow/Workflow.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
using Microsoft.Extensions.Logging;
44
using System;
55
using System.Collections.Generic;
6+
using System.Net.Http;
67
using System.Threading.Tasks;
8+
using Microsoft.Extensions.DependencyInjection;
79

810
namespace Dtmworkflow
911
{
@@ -32,7 +34,13 @@ public Workflow(IDtmClient httpClient, IDtmgRPCClient grpcClient, Dtmcli.IBranch
3234

3335
public System.Net.Http.HttpClient NewRequest()
3436
{
35-
return _httpClient.GetHttpClient("WF");
37+
if(true)
38+
return new HttpClient(new WorkflowHttpInterceptor(this));
39+
else
40+
{
41+
var client = _httpClient.GetHttpClient("WF");
42+
return client;
43+
}
3644
}
3745

3846
/// <summary>
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using System;
2+
using System.Net.Http;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace Dtmworkflow;
7+
8+
internal class WorkflowHttpInterceptor : DelegatingHandler
9+
{
10+
private readonly Workflow _wf;
11+
12+
public WorkflowHttpInterceptor(Workflow wf)
13+
{
14+
this._wf = wf;
15+
InnerHandler = new HttpClientHandler();
16+
}
17+
18+
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
19+
{
20+
Func<DtmCommon.BranchBarrier, Task<StepResult>> origin = async (barrier) =>
21+
{
22+
var response = await base.SendAsync(request, cancellationToken);
23+
return _wf.StepResultFromHTTP(response, null);
24+
};
25+
26+
StepResult sr;
27+
// in phase 2, do not save, because it is saved outer
28+
if (_wf.WorkflowImp.CurrentOp != DtmCommon.Constant.OpAction)
29+
{
30+
sr = await origin(null);
31+
}
32+
else
33+
{
34+
sr = await _wf.RecordedDo(origin);
35+
}
36+
37+
return _wf.StepResultToHttp(sr);
38+
}
39+
}

tests/BusiGrpcService/Program.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
{
99
// Setup a HTTP/2 endpoint without TLS.
1010
options.ListenLocalhost(5005, o => o.Protocols = HttpProtocols.Http2);
11+
// test for workflow http branch
12+
options.ListenLocalhost(5006, o => o.Protocols = HttpProtocols.Http1);
1113
});
1214

1315
builder.Services.AddGrpc();
@@ -20,6 +22,15 @@
2022

2123
// Configure the HTTP request pipeline.
2224
app.MapGrpcService<BusiApiService>();
25+
26+
// test for workflow http branch
27+
app.MapGet("/test-http-ok1", () => "SUCCESS");
28+
app.MapGet("/test-http-ok2", () => "SUCCESS");
29+
app.MapGet("/409", context =>
30+
{
31+
context.Response.StatusCode = 409;
32+
return context.Response.WriteAsync("i am body, the http branch is 409"); // FAILURE
33+
});
2334
app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
2435

2536
app.Run();

0 commit comments

Comments
 (0)