Skip to content

Commit 6d27bd9

Browse files
authored
feat: enhance Kubernetes client with watch functionality (#1667)
* feat: enhance Kubernetes client with watch functionality * refactor: simplify watch event handling in Kubernetes client example * refactor: update Kubernetes watch functionality to use new event handling methods and add async enumerable support * fix * fix * fix: correct usage of Pod list items in client example and update Obsolete attribute formatting * fix: update client example to use correct Pod list method and improve Obsolete attribute formatting * refactor: enhance type resolution for list items in TypeHelper by adding TryGetItemTypeFromSchema method * feat: mark Watch methods as obsolete to prepare for future deprecation * fix * refactor: update WatcherExt class to internal and remove obsolete attributes; improve example method signature in Program.cs * refactor: change WatcherExt class from internal to public and mark methods as obsolete for future deprecation
1 parent ca5d9f4 commit 6d27bd9

File tree

9 files changed

+647
-90
lines changed

9 files changed

+647
-90
lines changed

examples/clientset/Program.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
// See https://aka.ms/new-console-template for more information
2-
using k8s;
3-
using k8s.ClientSets;
1+
using k8s;
42
using k8s.Models;
3+
using k8s.ClientSets;
54
using System.Threading.Tasks;
65

76
namespace clientset
@@ -13,7 +12,7 @@ private static async Task Main(string[] args)
1312
var config = KubernetesClientConfiguration.BuildConfigFromConfigFile();
1413
var client = new Kubernetes(config);
1514

16-
ClientSet clientSet = new ClientSet(client);
15+
var clientSet = new ClientSet(client);
1716
var list = await clientSet.CoreV1.Pod.ListAsync("default").ConfigureAwait(false);
1817
foreach (var item in list)
1918
{
@@ -22,6 +21,12 @@ private static async Task Main(string[] args)
2221

2322
var pod = await clientSet.CoreV1.Pod.GetAsync("test", "default").ConfigureAwait(false);
2423
System.Console.WriteLine(pod?.Metadata?.Name);
24+
25+
var watch = clientSet.CoreV1.Pod.WatchListAsync("default");
26+
await foreach (var (_, item) in watch.ConfigureAwait(false))
27+
{
28+
System.Console.WriteLine(item.Metadata.Name);
29+
}
2530
}
2631
}
27-
}
32+
}

examples/watch/Program.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using k8s;
2-
using k8s.Models;
32
using System;
43
using System.Threading;
54
using System.Threading.Tasks;
@@ -8,9 +7,10 @@
87

98
IKubernetes client = new Kubernetes(config);
109

11-
var podlistResp = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
10+
var podlistResp = client.CoreV1.WatchListNamespacedPodAsync("default");
11+
1212
// C# 8 required https://docs.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8
13-
await foreach (var (type, item) in podlistResp.WatchAsync<V1Pod, V1PodList>().ConfigureAwait(false))
13+
await foreach (var (type, item) in podlistResp.ConfigureAwait(false))
1414
{
1515
Console.WriteLine("==on watch event==");
1616
Console.WriteLine(type);
@@ -22,8 +22,7 @@
2222
void WatchUsingCallback(IKubernetes client)
2323
#pragma warning restore CS8321 // Remove unused private members
2424
{
25-
var podlistResp = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
26-
using (podlistResp.Watch<V1Pod, V1PodList>((type, item) =>
25+
using (var podlistResp = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) =>
2726
{
2827
Console.WriteLine("==on watch event==");
2928
Console.WriteLine(type);
@@ -37,4 +36,4 @@ void WatchUsingCallback(IKubernetes client)
3736
Console.CancelKeyPress += (sender, eventArgs) => ctrlc.Set();
3837
ctrlc.Wait();
3938
}
40-
}
39+
}

src/KubernetesClient/WatcherExt.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public static class WatcherExt
1616
/// The action to invoke when the server closes the connection.
1717
/// </param>
1818
/// <returns>a watch object</returns>
19+
[Obsolete("This method will be deprecated in future versions. Please use the Watch method instead.")]
1920
public static Watcher<T> Watch<T, L>(
2021
this Task<HttpOperationResponse<L>> responseTask,
2122
Action<WatchEventType, T> onEvent,
@@ -52,6 +53,7 @@ private static Func<Task<TextReader>> MakeStreamReaderCreator<T, L>(Task<HttpOpe
5253
/// The action to invoke when the server closes the connection.
5354
/// </param>
5455
/// <returns>a watch object</returns>
56+
[Obsolete("This method will be deprecated in future versions. Please use the Watch method instead.")]
5557
public static Watcher<T> Watch<T, L>(
5658
this HttpOperationResponse<L> response,
5759
Action<WatchEventType, T> onEvent,
@@ -71,6 +73,7 @@ public static Watcher<T> Watch<T, L>(
7173
/// <param name="onError">a callback when any exception was caught during watching</param>
7274
/// <param name="cancellationToken">cancellation token</param>
7375
/// <returns>IAsyncEnumerable of watch events</returns>
76+
[Obsolete("This method will be deprecated in future versions. Please use the WatchAsync method instead.")]
7477
public static IAsyncEnumerable<(WatchEventType, T)> WatchAsync<T, L>(
7578
this Task<HttpOperationResponse<L>> responseTask,
7679
Action<Exception> onError = null,

src/LibKubernetesGenerator/ParamHelper.cs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using Scriban.Runtime;
44
using System;
55
using System.Linq;
6+
using System.Collections.Generic;
67

78
namespace LibKubernetesGenerator
89
{
@@ -21,6 +22,8 @@ public void RegisterHelper(ScriptObject scriptObject)
2122
{
2223
scriptObject.Import(nameof(GetModelCtorParam), new Func<JsonSchema, string>(GetModelCtorParam));
2324
scriptObject.Import(nameof(IfParamContains), IfParamContains);
25+
scriptObject.Import(nameof(FilterParameters), FilterParameters);
26+
scriptObject.Import(nameof(GetParameterValueForWatch), new Func<OpenApiParameter, bool, string, string>(GetParameterValueForWatch));
2427
}
2528

2629
public static bool IfParamContains(OpenApiOperation operation, string name)
@@ -39,6 +42,23 @@ public static bool IfParamContains(OpenApiOperation operation, string name)
3942
return found;
4043
}
4144

45+
public static IEnumerable<OpenApiParameter> FilterParameters(OpenApiOperation operation, string excludeParam)
46+
{
47+
return operation.Parameters.Where(p => p.Name != excludeParam);
48+
}
49+
50+
public string GetParameterValueForWatch(OpenApiParameter parameter, bool watch, string init = "false")
51+
{
52+
if (parameter.Name == "watch")
53+
{
54+
return watch ? "true" : "false";
55+
}
56+
else
57+
{
58+
return generalNameHelper.GetDotNetNameOpenApiParameter(parameter, init);
59+
}
60+
}
61+
4262
public string GetModelCtorParam(JsonSchema schema)
4363
{
4464
return string.Join(", ", schema.Properties.Values
@@ -57,4 +77,4 @@ public string GetModelCtorParam(JsonSchema schema)
5777
}));
5878
}
5979
}
60-
}
80+
}

src/LibKubernetesGenerator/TypeHelper.cs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ private string GetDotNetType(JsonSchema schema, JsonSchemaProperty parent)
122122
return $"IDictionary<string, {GetDotNetType(schema.AdditionalPropertiesSchema, parent)}>";
123123
}
124124

125-
126125
if (schema?.Reference != null)
127126
{
128127
return classNameHelper.GetClassNameForSchemaDefinition(schema.Reference);
@@ -245,6 +244,16 @@ string toType()
245244
}
246245

247246
break;
247+
case "T":
248+
var itemType = TryGetItemTypeFromSchema(response);
249+
if (itemType != null)
250+
{
251+
return itemType;
252+
}
253+
254+
break;
255+
case "TList":
256+
return t;
248257
}
249258

250259
return t;
@@ -283,5 +292,26 @@ public static bool IfType(JsonSchemaProperty property, string type)
283292

284293
return false;
285294
}
295+
296+
private string TryGetItemTypeFromSchema(OpenApiResponse response)
297+
{
298+
var listSchema = response?.Schema?.Reference;
299+
if (listSchema?.Properties?.TryGetValue("items", out var itemsProperty) != true)
300+
{
301+
return null;
302+
}
303+
304+
if (itemsProperty.Reference != null)
305+
{
306+
return classNameHelper.GetClassNameForSchemaDefinition(itemsProperty.Reference);
307+
}
308+
309+
if (itemsProperty.Item?.Reference != null)
310+
{
311+
return classNameHelper.GetClassNameForSchemaDefinition(itemsProperty.Item.Reference);
312+
}
313+
314+
return null;
315+
}
286316
}
287-
}
317+
}

src/LibKubernetesGenerator/templates/Client.cs.template

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ public partial class {{name}}Client : ResourceClient
1717
}
1818

1919
{{for api in apis }}
20+
{{~ $filteredParams = FilterParameters api.operation "watch" ~}}
2021
/// <summary>
2122
/// {{ToXmlDoc api.operation.description}}
2223
/// </summary>
23-
{{ for parameter in api.operation.parameters}}
24+
{{ for parameter in $filteredParams}}
2425
/// <param name="{{GetDotNetNameOpenApiParameter parameter "false"}}">
2526
/// {{ToXmlDoc parameter.description}}
2627
/// </param>
@@ -29,15 +30,15 @@ public partial class {{name}}Client : ResourceClient
2930
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
3031
/// </param>
3132
public async Task{{GetReturnType api.operation "<>"}} {{GetActionName api.operation name "Async"}}(
32-
{{ for parameter in api.operation.parameters}}
33+
{{ for parameter in $filteredParams}}
3334
{{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}},
3435
{{ end }}
3536
CancellationToken cancellationToken = default(CancellationToken))
3637
{
3738
{{if IfReturnType api.operation "stream"}}
3839
var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}(
3940
{{ for parameter in api.operation.parameters}}
40-
{{GetDotNetNameOpenApiParameter parameter "false"}},
41+
{{GetParameterValueForWatch parameter false}},
4142
{{end}}
4243
null,
4344
cancellationToken);
@@ -47,7 +48,7 @@ public partial class {{name}}Client : ResourceClient
4748
{{if IfReturnType api.operation "obj"}}
4849
using (var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}(
4950
{{ for parameter in api.operation.parameters}}
50-
{{GetDotNetNameOpenApiParameter parameter "false"}},
51+
{{GetParameterValueForWatch parameter false}},
5152
{{end}}
5253
null,
5354
cancellationToken).ConfigureAwait(false))
@@ -58,7 +59,7 @@ public partial class {{name}}Client : ResourceClient
5859
{{if IfReturnType api.operation "void"}}
5960
using (var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}(
6061
{{ for parameter in api.operation.parameters}}
61-
{{GetDotNetNameOpenApiParameter parameter "false"}},
62+
{{GetParameterValueForWatch parameter false}},
6263
{{end}}
6364
null,
6465
cancellationToken).ConfigureAwait(false))
@@ -71,7 +72,7 @@ public partial class {{name}}Client : ResourceClient
7172
/// <summary>
7273
/// {{ToXmlDoc api.operation.description}}
7374
/// </summary>
74-
{{ for parameter in api.operation.parameters}}
75+
{{ for parameter in $filteredParams}}
7576
/// <param name="{{GetDotNetNameOpenApiParameter parameter "false"}}">
7677
/// {{ToXmlDoc parameter.description}}
7778
/// </param>
@@ -80,14 +81,14 @@ public partial class {{name}}Client : ResourceClient
8081
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
8182
/// </param>
8283
public async Task<T> {{GetActionName api.operation name "Async"}}<T>(
83-
{{ for parameter in api.operation.parameters}}
84+
{{ for parameter in $filteredParams}}
8485
{{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "false"}},
8586
{{ end }}
8687
CancellationToken cancellationToken = default(CancellationToken))
8788
{
8889
using (var _result = await Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}<T>(
8990
{{ for parameter in api.operation.parameters}}
90-
{{GetDotNetNameOpenApiParameter parameter "false"}},
91+
{{GetParameterValueForWatch parameter false}},
9192
{{end}}
9293
null,
9394
cancellationToken).ConfigureAwait(false))
@@ -96,5 +97,69 @@ public partial class {{name}}Client : ResourceClient
9697
}
9798
}
9899
{{end}}
100+
101+
#if !K8S_AOT
102+
{{if IfParamContains api.operation "watch"}}
103+
/// <summary>
104+
/// Watch {{ToXmlDoc api.operation.description}}
105+
/// </summary>
106+
{{ for parameter in $filteredParams}}
107+
/// <param name="{{GetDotNetNameOpenApiParameter parameter "false"}}">
108+
/// {{ToXmlDoc parameter.description}}
109+
/// </param>
110+
{{ end }}
111+
/// <param name="onEvent">Callback when any event raised from api server</param>
112+
/// <param name="onError">Callback when any exception was caught during watching</param>
113+
/// <param name="onClosed">Callback when the server closes the connection</param>
114+
public Watcher<{{GetReturnType api.operation "T"}}> Watch{{GetActionName api.operation name ""}}(
115+
{{ for parameter in $filteredParams}}
116+
{{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}},
117+
{{ end }}
118+
Action<WatchEventType, {{GetReturnType api.operation "T"}}> onEvent = null,
119+
Action<Exception> onError = null,
120+
Action onClosed = null)
121+
{
122+
if (onEvent == null) throw new ArgumentNullException(nameof(onEvent));
123+
124+
var responseTask = Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}(
125+
{{ for parameter in api.operation.parameters}}
126+
{{GetParameterValueForWatch parameter true}},
127+
{{ end }}
128+
null,
129+
CancellationToken.None);
130+
131+
return responseTask.Watch<{{GetReturnType api.operation "T"}}, {{GetReturnType api.operation "TList"}}>(
132+
onEvent, onError, onClosed);
133+
}
134+
135+
/// <summary>
136+
/// Watch {{ToXmlDoc api.operation.description}} as async enumerable
137+
/// </summary>
138+
{{ for parameter in $filteredParams}}
139+
/// <param name="{{GetDotNetNameOpenApiParameter parameter "false"}}">
140+
/// {{ToXmlDoc parameter.description}}
141+
/// </param>
142+
{{ end }}
143+
/// <param name="onError">Callback when any exception was caught during watching</param>
144+
/// <param name="cancellationToken">Cancellation token</param>
145+
public IAsyncEnumerable<(WatchEventType, {{GetReturnType api.operation "T"}})> Watch{{GetActionName api.operation name "Async"}}(
146+
{{ for parameter in $filteredParams}}
147+
{{GetDotNetTypeOpenApiParameter parameter}} {{GetDotNetNameOpenApiParameter parameter "true"}},
148+
{{ end }}
149+
Action<Exception> onError = null,
150+
CancellationToken cancellationToken = default)
151+
{
152+
var responseTask = Client.{{group}}.{{GetOperationId api.operation "WithHttpMessagesAsync"}}(
153+
{{ for parameter in api.operation.parameters}}
154+
{{GetParameterValueForWatch parameter true}},
155+
{{ end }}
156+
null,
157+
cancellationToken);
158+
159+
return responseTask.WatchAsync<{{GetReturnType api.operation "T"}}, {{GetReturnType api.operation "TList"}}>(
160+
onError, cancellationToken);
161+
}
99162
{{end}}
100-
}
163+
#endif
164+
{{end}}
165+
}

0 commit comments

Comments
 (0)