Skip to content

Commit 7412a58

Browse files
authored
Merge pull request #9 from dotnetcore/master
merge
2 parents 9e9146e + 7ac5611 commit 7412a58

File tree

18 files changed

+461
-19
lines changed

18 files changed

+461
-19
lines changed

src/Surging.Core/Surging.Core.CPlatform/Configurations/ProtocolPortOptions.cs

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ public class ProtocolPortOptions
88

99
public int WSPort { get; set; }
1010

11+
public int GrpcPort { get; set; }
12+
1113
public int UdpPort { get; set; }
1214
}
1315
}

src/Surging.Core/Surging.Core.CPlatform/Transport/Implementation/RpcContext.cs

+13-9
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ public ConcurrentDictionary<String, Object> GetContextParameters()
1616
}
1717

1818
[MethodImpl(MethodImplOptions.AggressiveInlining)]
19-
public void SetAttachment(string key, object value)
19+
public void SetAttachment(string key,object value)
2020
{
21-
contextParameters.AddOrUpdate(key, value, (k, v) => value);
21+
contextParameters.AddOrUpdate(key, value,(k,v)=>value);
2222
}
2323

2424
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -34,21 +34,25 @@ public void SetContextParameters(ConcurrentDictionary<String, Object> contextPar
3434
this.contextParameters = contextParameters;
3535
}
3636

37-
private static ThreadLocal<RpcContext> rpcContextThreadLocal = new ThreadLocal<RpcContext>(() =>
38-
{
39-
RpcContext context = new RpcContext();
40-
context.SetContextParameters(new ConcurrentDictionary<string, object>());
41-
return context;
42-
});
37+
private static AsyncLocal<RpcContext> rpcContextThreadLocal=new AsyncLocal<RpcContext>();
4338

4439
public static RpcContext GetContext()
4540
{
41+
var context = rpcContextThreadLocal.Value;
42+
43+
if (context == null)
44+
{
45+
context = new RpcContext();
46+
context.SetContextParameters(new ConcurrentDictionary<string, object>());
47+
rpcContextThreadLocal.Value = context;
48+
}
49+
4650
return rpcContextThreadLocal.Value;
4751
}
4852

4953
public static void RemoveContext()
5054
{
51-
rpcContextThreadLocal.Dispose();
55+
rpcContextThreadLocal.Value = null;
5256
}
5357

5458
private RpcContext()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
using Autofac;
2+
using Microsoft.Extensions.Logging;
3+
using Surging.Core.CPlatform;
4+
using Surging.Core.CPlatform.Module;
5+
using Surging.Core.CPlatform.Runtime.Server;
6+
using Surging.Core.CPlatform.Runtime.Server.Implementation;
7+
using Surging.Core.Grpc.Runtime;
8+
using Surging.Core.Grpc.Runtime.Implementation;
9+
10+
namespace Surging.Core.Grpc
11+
{
12+
public class GrpcModule : EnginePartModule
13+
{
14+
public override void Initialize(AppModuleContext context)
15+
{
16+
base.Initialize(context);
17+
}
18+
19+
protected override void RegisterBuilder(ContainerBuilderWrapper builder)
20+
{
21+
builder.Register(provider =>
22+
{
23+
return new DefaultGrpcServiceEntryProvider(
24+
provider.Resolve<IServiceEntryProvider>(),
25+
provider.Resolve<ILogger<DefaultGrpcServiceEntryProvider>>(),
26+
provider.Resolve<CPlatformContainer>()
27+
);
28+
}).As(typeof(IGrpcServiceEntryProvider)).SingleInstance();
29+
if (AppConfig.ServerOptions.Protocol == CommunicationProtocol.WS)
30+
{
31+
RegisterDefaultProtocol(builder);
32+
}
33+
else if (AppConfig.ServerOptions.Protocol == CommunicationProtocol.None)
34+
{
35+
RegisterGrpcProtocol(builder);
36+
}
37+
}
38+
39+
private static void RegisterDefaultProtocol(ContainerBuilderWrapper builder)
40+
{
41+
42+
builder.Register(provider =>
43+
{
44+
return new GrpcServerMessageListener(
45+
provider.Resolve<ILogger<GrpcServerMessageListener>>(),
46+
provider.Resolve<IGrpcServiceEntryProvider>()
47+
);
48+
}).SingleInstance();
49+
builder.Register(provider =>
50+
{
51+
var messageListener = provider.Resolve<GrpcServerMessageListener>();
52+
return new DefaultServiceHost(async endPoint =>
53+
{
54+
await messageListener.StartAsync(endPoint);
55+
return messageListener;
56+
}, null);
57+
58+
}).As<IServiceHost>();
59+
}
60+
61+
private static void RegisterGrpcProtocol(ContainerBuilderWrapper builder)
62+
{
63+
builder.Register(provider =>
64+
{
65+
return new GrpcServerMessageListener(provider.Resolve<ILogger<GrpcServerMessageListener>>(),
66+
provider.Resolve<IGrpcServiceEntryProvider>()
67+
);
68+
}).SingleInstance();
69+
builder.Register(provider =>
70+
{
71+
var messageListener = provider.Resolve<GrpcServerMessageListener>();
72+
return new GrpcServiceHost(async endPoint =>
73+
{
74+
await messageListener.StartAsync(endPoint);
75+
return messageListener;
76+
});
77+
78+
}).As<IServiceHost>();
79+
}
80+
}
81+
}
82+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
using Grpc.Core;
2+
using Microsoft.Extensions.Logging;
3+
using Surging.Core.CPlatform.Messages;
4+
using Surging.Core.CPlatform.Transport;
5+
using Surging.Core.Grpc.Runtime;
6+
using System;
7+
using System.Collections.Generic;
8+
using System.Net;
9+
using System.Text;
10+
using System.Threading.Tasks;
11+
12+
namespace Surging.Core.Grpc
13+
{
14+
public class GrpcServerMessageListener: IMessageListener, IDisposable
15+
{
16+
private Server _server;
17+
private readonly ILogger<GrpcServerMessageListener> _logger;
18+
private readonly IGrpcServiceEntryProvider _grpcServiceEntryProvider;
19+
20+
public GrpcServerMessageListener(ILogger<GrpcServerMessageListener> logger,
21+
IGrpcServiceEntryProvider grpcServiceEntryProvider)
22+
{
23+
_logger = logger;
24+
_grpcServiceEntryProvider = grpcServiceEntryProvider;
25+
}
26+
public Task StartAsync(EndPoint endPoint)
27+
{
28+
var ipEndPoint = endPoint as IPEndPoint;
29+
_server = new Server() { Ports = { new ServerPort(ipEndPoint.Address.ToString(), ipEndPoint.Port, ServerCredentials.Insecure) } };
30+
31+
try
32+
{
33+
var entries = _grpcServiceEntryProvider.GetEntries();
34+
35+
var serverServiceDefinitions = new List<ServerServiceDefinition>();
36+
foreach (var entry in entries)
37+
{
38+
39+
var baseType = entry.Type.BaseType.BaseType;
40+
var definitionType = baseType?.DeclaringType;
41+
42+
var methodInfo = definitionType?.GetMethod("BindService", new Type[] { baseType });
43+
if (methodInfo != null)
44+
{
45+
var serviceDescriptor = methodInfo.Invoke(null, new object[] { entry.Behavior }) as ServerServiceDefinition;
46+
if (serviceDescriptor != null)
47+
{
48+
_server.Services.Add(serviceDescriptor);
49+
continue;
50+
}
51+
}
52+
}
53+
_server.Start();
54+
if (_logger.IsEnabled(LogLevel.Debug))
55+
_logger.LogDebug($"Grpc服务主机启动成功,监听地址:{endPoint}。");
56+
}
57+
catch
58+
{
59+
_logger.LogError($"Grpc服务主机启动失败,监听地址:{endPoint}。 ");
60+
}
61+
return Task.CompletedTask;
62+
}
63+
64+
public Server Server
65+
{
66+
get
67+
{
68+
return _server;
69+
}
70+
}
71+
72+
public event ReceivedDelegate Received;
73+
74+
public Task OnReceived(IMessageSender sender, TransportMessage message)
75+
{
76+
return Task.CompletedTask;
77+
}
78+
79+
public void Dispose()
80+
{
81+
_server.ShutdownAsync();
82+
}
83+
}
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
using Surging.Core.CPlatform;
2+
using Surging.Core.CPlatform.Runtime.Server.Implementation;
3+
using Surging.Core.CPlatform.Transport;
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Net;
7+
using System.Text;
8+
using System.Threading.Tasks;
9+
10+
namespace Surging.Core.Grpc
11+
{
12+
public class GrpcServiceHost : ServiceHostAbstract
13+
{
14+
#region Field
15+
16+
private readonly Func<EndPoint, Task<IMessageListener>> _messageListenerFactory;
17+
private IMessageListener _serverMessageListener;
18+
19+
#endregion Field
20+
21+
public GrpcServiceHost(Func<EndPoint, Task<IMessageListener>> messageListenerFactory) : base(null)
22+
{
23+
_messageListenerFactory = messageListenerFactory;
24+
}
25+
26+
#region Overrides of ServiceHostAbstract
27+
28+
/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
29+
public override void Dispose()
30+
{
31+
(_serverMessageListener as IDisposable)?.Dispose();
32+
}
33+
34+
/// <summary>
35+
/// 启动主机。
36+
/// </summary>
37+
/// <param name="endPoint">主机终结点。</param>
38+
/// <returns>一个任务。</returns>
39+
public override async Task StartAsync(EndPoint endPoint)
40+
{
41+
if (_serverMessageListener != null)
42+
return;
43+
_serverMessageListener = await _messageListenerFactory(endPoint);
44+
45+
}
46+
47+
public override async Task StartAsync(string ip, int port)
48+
{
49+
if (_serverMessageListener != null)
50+
return;
51+
_serverMessageListener = await _messageListenerFactory(new IPEndPoint(IPAddress.Parse(ip), AppConfig.ServerOptions.Ports.GrpcPort));
52+
}
53+
54+
#endregion Overrides of ServiceHostAbstract
55+
}
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using Surging.Core.CPlatform.Ioc;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Text;
5+
6+
namespace Surging.Core.Grpc.Runtime
7+
{
8+
public class GrpcServiceEntry
9+
{
10+
11+
public Type Type { get; set; }
12+
13+
public IServiceBehavior Behavior { get; set; }
14+
}
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace Surging.Core.Grpc.Runtime
6+
{
7+
public interface IGrpcServiceEntryProvider
8+
{
9+
List<GrpcServiceEntry> GetEntries();
10+
}
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
using Microsoft.Extensions.Logging;
2+
using Surging.Core.CPlatform;
3+
using Surging.Core.CPlatform.Ioc;
4+
using Surging.Core.CPlatform.Runtime.Server;
5+
using Surging.Core.CPlatform.Runtime.Server.Implementation.ServiceDiscovery.Attributes;
6+
using System;
7+
using System.Collections.Generic;
8+
using System.Linq;
9+
using System.Reflection;
10+
using System.Text;
11+
12+
namespace Surging.Core.Grpc.Runtime.Implementation
13+
{
14+
public class DefaultGrpcServiceEntryProvider: IGrpcServiceEntryProvider
15+
{
16+
#region Field
17+
18+
private readonly IEnumerable<Type> _types;
19+
private readonly ILogger<DefaultGrpcServiceEntryProvider> _logger;
20+
private readonly CPlatformContainer _serviceProvider;
21+
private List<GrpcServiceEntry> _grpcServiceEntries;
22+
23+
#endregion Field
24+
25+
#region Constructor
26+
27+
public DefaultGrpcServiceEntryProvider(IServiceEntryProvider serviceEntryProvider,
28+
ILogger<DefaultGrpcServiceEntryProvider> logger,
29+
CPlatformContainer serviceProvider)
30+
{
31+
_types = serviceEntryProvider.GetTypes();
32+
_logger = logger;
33+
_serviceProvider = serviceProvider;
34+
}
35+
36+
#endregion Constructor
37+
38+
#region Implementation of IUdpServiceEntryProvider
39+
40+
/// <summary>
41+
/// 获取服务条目集合。
42+
/// </summary>
43+
/// <returns>服务条目集合。</returns>
44+
public List<GrpcServiceEntry> GetEntries()
45+
{
46+
var services = _types.ToArray();
47+
if (_grpcServiceEntries == null)
48+
{
49+
_grpcServiceEntries = new List<GrpcServiceEntry>();
50+
foreach (var service in services)
51+
{
52+
var entry = CreateServiceEntry(service);
53+
if (entry != null)
54+
{
55+
_grpcServiceEntries.Add(entry);
56+
}
57+
}
58+
if (_logger.IsEnabled(LogLevel.Debug))
59+
{
60+
_logger.LogDebug($"发现了以下grpc服务:{string.Join(",", _grpcServiceEntries.Select(i => i.Type.FullName))}。"); ;
61+
}
62+
}
63+
return _grpcServiceEntries;
64+
}
65+
66+
public GrpcServiceEntry CreateServiceEntry(Type service)
67+
{
68+
GrpcServiceEntry result = null;
69+
var objInstance = _serviceProvider.GetInstances(service);
70+
var behavior = objInstance as IServiceBehavior;
71+
if (behavior != null)
72+
result = new GrpcServiceEntry
73+
{
74+
Behavior = behavior,
75+
Type = behavior.GetType()
76+
};
77+
return result;
78+
}
79+
#endregion
80+
}
81+
}

0 commit comments

Comments
 (0)