Skip to content

Commit 2acb3f3

Browse files
csvirixstefankafalhambra-hivemqmetacosm
authored
feat: primary resource caching for followup reconciliation(s) (#2761)
Signed-off-by: Attila Mészáros <[email protected]> Signed-off-by: Chris Laprun <[email protected]> Co-authored-by: Martin Stefanko <[email protected]> Co-authored-by: Antonio <[email protected]> Co-authored-by: Chris Laprun <[email protected]> Co-authored-by: Chris Laprun <[email protected]>
1 parent 4225645 commit 2acb3f3

File tree

20 files changed

+917
-39
lines changed

20 files changed

+917
-39
lines changed

docs/content/en/docs/documentation/reconciler.md

+114
Original file line numberDiff line numberDiff line change
@@ -169,3 +169,117 @@ You can specify the name of the finalizer to use for your `Reconciler` using the
169169
annotation. If you do not specify a finalizer name, one will be automatically generated for you.
170170

171171
From v5, by default, the finalizer is added using Server Side Apply. See also `UpdateControl` in docs.
172+
173+
### Making sure the primary resource is up to date for the next reconciliation
174+
175+
It is typical to want to update the status subresource with the information that is available during the reconciliation.
176+
This is sometimes referred to as the last observed state. When the primary resource is updated, though, the framework
177+
does not cache the resource directly, relying instead on the propagation of the update to the underlying informer's
178+
cache. It can, therefore, happen that, if other events trigger other reconciliations before the informer cache gets
179+
updated, your reconciler does not see the latest version of the primary resource. While this might not typically be a
180+
problem in most cases, as caches eventually become consistent, depending on your reconciliation logic, you might still
181+
require the latest status version possible, for example if the status subresource is used as a communication mechanism,
182+
see [Representing Allocated Values](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#representing-allocated-values)
183+
from the Kubernetes docs for more details.
184+
185+
The framework provides utilities to help with these use cases with
186+
[`PrimaryUpdateAndCacheUtils`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java).
187+
These utility methods come in two flavors:
188+
189+
#### Using internal cache
190+
191+
In almost all cases for this purpose, you can use internal caches:
192+
193+
```java
194+
@Override
195+
public UpdateControl<StatusPatchCacheCustomResource> reconcile(
196+
StatusPatchCacheCustomResource resource, Context<StatusPatchCacheCustomResource> context) {
197+
198+
// omitted logic
199+
200+
// update with SSA requires a fresh copy
201+
var freshCopy = createFreshCopy(primary);
202+
freshCopy.getStatus().setValue(statusWithState());
203+
204+
var updatedResource = PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(resource, freshCopy, context);
205+
206+
return UpdateControl.noUpdate();
207+
}
208+
```
209+
210+
In the background `PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus` puts the result of the update into an internal
211+
cache and will make sure that the next reconciliation will contain the most recent version of the resource. Note that it
212+
is not necessarily the version of the resource you got as response from the update, it can be newer since other parties
213+
can do additional updates meanwhile, but if not explicitly modified, it will contain the up-to-date status.
214+
215+
See related integration test [here](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal).
216+
217+
This approach works with the default configuration of the framework and should be good to go in most of the cases.
218+
Without going further into the details, this won't work if `ConfigurationService.parseResourceVersionsForEventFilteringAndCaching`
219+
is set to `false` (more precisely there are some edge cases when it won't work). For that case framework provides the following solution:
220+
221+
#### Fallback approach: using `PrimaryResourceCache` cache
222+
223+
As an alternative, for very rare cases when `ConfigurationService.parseResourceVersionsForEventFilteringAndCaching`
224+
needs to be set to `false` you can use an explicit caching approach:
225+
226+
```java
227+
228+
// We on purpose don't use the provided predicate to show what a custom one could look like.
229+
private final PrimaryResourceCache<StatusPatchPrimaryCacheCustomResource> cache =
230+
new PrimaryResourceCache<>(
231+
(statusPatchCacheCustomResourcePair, statusPatchCacheCustomResource) ->
232+
statusPatchCacheCustomResource.getStatus().getValue()
233+
>= statusPatchCacheCustomResourcePair.afterUpdate().getStatus().getValue());
234+
235+
@Override
236+
public UpdateControl<StatusPatchPrimaryCacheCustomResource> reconcile(
237+
StatusPatchPrimaryCacheCustomResource primary,
238+
Context<StatusPatchPrimaryCacheCustomResource> context) {
239+
240+
// cache will compare the current and the cached resource and return the more recent. (And evict the old)
241+
primary = cache.getFreshResource(primary);
242+
243+
// omitted logic
244+
245+
var freshCopy = createFreshCopy(primary);
246+
247+
freshCopy.getStatus().setValue(statusWithState());
248+
249+
var updated =
250+
PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(primary, freshCopy, context, cache);
251+
252+
return UpdateControl.noUpdate();
253+
}
254+
255+
@Override
256+
public DeleteControl cleanup(
257+
StatusPatchPrimaryCacheCustomResource resource,
258+
Context<StatusPatchPrimaryCacheCustomResource> context)
259+
throws Exception {
260+
// cleanup the cache on resource deletion
261+
cache.cleanup(resource);
262+
return DeleteControl.defaultDelete();
263+
}
264+
265+
```
266+
267+
[`PrimaryResourceCache`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java)
268+
is designed for this purpose. As shown in the example above, it is up to you to provide a predicate to determine if the
269+
resource is more recent than the one available. In other words, when to evict the resource from the cache. Typically, as
270+
shown in
271+
the [integration test](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache)
272+
you can have a counter in status to check on that.
273+
274+
Since all of this happens explicitly, you cannot use this approach for managed dependent resources and workflows and
275+
will need to use the unmanaged approach instead. This is due to the fact that managed dependent resources always get
276+
their associated primary resource from the underlying informer event source cache.
277+
278+
#### Additional remarks
279+
280+
As shown in the integration tests, there is no optimistic locking used when updating the
281+
[resource](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java#L41)
282+
(in other words `metadata.resourceVersion` is set to `null`). This is desired since you don't want the patch to fail on
283+
update.
284+
285+
In addition, you can configure the [Fabric8 client retry](https://github.com/fabric8io/kubernetes-client?tab=readme-ov-file#configuring-the-client).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
package io.javaoperatorsdk.operator.api.reconciler;
2+
3+
import java.util.function.Supplier;
4+
import java.util.function.UnaryOperator;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import io.fabric8.kubernetes.api.model.HasMetadata;
10+
import io.fabric8.kubernetes.client.dsl.base.PatchContext;
11+
import io.fabric8.kubernetes.client.dsl.base.PatchType;
12+
import io.javaoperatorsdk.operator.api.reconciler.support.PrimaryResourceCache;
13+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
14+
15+
/**
16+
* Utility methods to patch the primary resource state and store it to the related cache, to make
17+
* sure that fresh resource is present for the next reconciliation. The main use case for such
18+
* updates is to store state is resource status. Use of optimistic locking is not desired for such
19+
* updates, since we don't want to patch fail and lose information that we want to store.
20+
*/
21+
public class PrimaryUpdateAndCacheUtils {
22+
23+
private PrimaryUpdateAndCacheUtils() {}
24+
25+
private static final Logger log = LoggerFactory.getLogger(PrimaryUpdateAndCacheUtils.class);
26+
27+
/**
28+
* Makes sure that the up-to-date primary resource will be present during the next reconciliation.
29+
* Using update (PUT) method.
30+
*
31+
* @param primary resource
32+
* @param context of reconciliation
33+
* @return updated resource
34+
* @param <P> primary resource type
35+
*/
36+
public static <P extends HasMetadata> P updateAndCacheStatus(P primary, Context<P> context) {
37+
logWarnIfResourceVersionPresent(primary);
38+
return patchAndCacheStatus(
39+
primary, context, () -> context.getClient().resource(primary).updateStatus());
40+
}
41+
42+
/**
43+
* Makes sure that the up-to-date primary resource will be present during the next reconciliation.
44+
* Using JSON Merge patch.
45+
*
46+
* @param primary resource
47+
* @param context of reconciliation
48+
* @return updated resource
49+
* @param <P> primary resource type
50+
*/
51+
public static <P extends HasMetadata> P patchAndCacheStatus(P primary, Context<P> context) {
52+
logWarnIfResourceVersionPresent(primary);
53+
return patchAndCacheStatus(
54+
primary, context, () -> context.getClient().resource(primary).patchStatus());
55+
}
56+
57+
/**
58+
* Makes sure that the up-to-date primary resource will be present during the next reconciliation.
59+
* Using JSON Patch.
60+
*
61+
* @param primary resource
62+
* @param context of reconciliation
63+
* @return updated resource
64+
* @param <P> primary resource type
65+
*/
66+
public static <P extends HasMetadata> P editAndCacheStatus(
67+
P primary, Context<P> context, UnaryOperator<P> operation) {
68+
logWarnIfResourceVersionPresent(primary);
69+
return patchAndCacheStatus(
70+
primary, context, () -> context.getClient().resource(primary).editStatus(operation));
71+
}
72+
73+
/**
74+
* Makes sure that the up-to-date primary resource will be present during the next reconciliation.
75+
*
76+
* @param primary resource
77+
* @param context of reconciliation
78+
* @param patch free implementation of cache
79+
* @return the updated resource.
80+
* @param <P> primary resource type
81+
*/
82+
public static <P extends HasMetadata> P patchAndCacheStatus(
83+
P primary, Context<P> context, Supplier<P> patch) {
84+
var updatedResource = patch.get();
85+
context
86+
.eventSourceRetriever()
87+
.getControllerEventSource()
88+
.handleRecentResourceUpdate(ResourceID.fromResource(primary), updatedResource, primary);
89+
return updatedResource;
90+
}
91+
92+
/**
93+
* Makes sure that the up-to-date primary resource will be present during the next reconciliation.
94+
* Using Server Side Apply.
95+
*
96+
* @param primary resource
97+
* @param freshResourceWithStatus - fresh resource with target state
98+
* @param context of reconciliation
99+
* @return the updated resource.
100+
* @param <P> primary resource type
101+
*/
102+
public static <P extends HasMetadata> P ssaPatchAndCacheStatus(
103+
P primary, P freshResourceWithStatus, Context<P> context) {
104+
logWarnIfResourceVersionPresent(freshResourceWithStatus);
105+
var res =
106+
context
107+
.getClient()
108+
.resource(freshResourceWithStatus)
109+
.subresource("status")
110+
.patch(
111+
new PatchContext.Builder()
112+
.withForce(true)
113+
.withFieldManager(context.getControllerConfiguration().fieldManager())
114+
.withPatchType(PatchType.SERVER_SIDE_APPLY)
115+
.build());
116+
117+
context
118+
.eventSourceRetriever()
119+
.getControllerEventSource()
120+
.handleRecentResourceUpdate(ResourceID.fromResource(primary), res, primary);
121+
return res;
122+
}
123+
124+
/**
125+
* Patches the resource and adds it to the {@link PrimaryResourceCache}.
126+
*
127+
* @param primary resource
128+
* @param freshResourceWithStatus - fresh resource with target state
129+
* @param context of reconciliation
130+
* @param cache - resource cache managed by user
131+
* @return the updated resource.
132+
* @param <P> primary resource type
133+
*/
134+
public static <P extends HasMetadata> P ssaPatchAndCacheStatus(
135+
P primary, P freshResourceWithStatus, Context<P> context, PrimaryResourceCache<P> cache) {
136+
logWarnIfResourceVersionPresent(freshResourceWithStatus);
137+
return patchAndCacheStatus(
138+
primary,
139+
cache,
140+
() ->
141+
context
142+
.getClient()
143+
.resource(freshResourceWithStatus)
144+
.subresource("status")
145+
.patch(
146+
new PatchContext.Builder()
147+
.withForce(true)
148+
.withFieldManager(context.getControllerConfiguration().fieldManager())
149+
.withPatchType(PatchType.SERVER_SIDE_APPLY)
150+
.build()));
151+
}
152+
153+
/**
154+
* Patches the resource with JSON Patch and adds it to the {@link PrimaryResourceCache}.
155+
*
156+
* @param primary resource
157+
* @param context of reconciliation
158+
* @param cache - resource cache managed by user
159+
* @return the updated resource.
160+
* @param <P> primary resource type
161+
*/
162+
public static <P extends HasMetadata> P editAndCacheStatus(
163+
P primary, Context<P> context, PrimaryResourceCache<P> cache, UnaryOperator<P> operation) {
164+
logWarnIfResourceVersionPresent(primary);
165+
return patchAndCacheStatus(
166+
primary, cache, () -> context.getClient().resource(primary).editStatus(operation));
167+
}
168+
169+
/**
170+
* Patches the resource with JSON Merge patch and adds it to the {@link PrimaryResourceCache}
171+
* provided.
172+
*
173+
* @param primary resource
174+
* @param context of reconciliation
175+
* @param cache - resource cache managed by user
176+
* @return the updated resource.
177+
* @param <P> primary resource type
178+
*/
179+
public static <P extends HasMetadata> P patchAndCacheStatus(
180+
P primary, Context<P> context, PrimaryResourceCache<P> cache) {
181+
logWarnIfResourceVersionPresent(primary);
182+
return patchAndCacheStatus(
183+
primary, cache, () -> context.getClient().resource(primary).patchStatus());
184+
}
185+
186+
/**
187+
* Updates the resource and adds it to the {@link PrimaryResourceCache}.
188+
*
189+
* @param primary resource
190+
* @param context of reconciliation
191+
* @param cache - resource cache managed by user
192+
* @return the updated resource.
193+
* @param <P> primary resource type
194+
*/
195+
public static <P extends HasMetadata> P updateAndCacheStatus(
196+
P primary, Context<P> context, PrimaryResourceCache<P> cache) {
197+
logWarnIfResourceVersionPresent(primary);
198+
return patchAndCacheStatus(
199+
primary, cache, () -> context.getClient().resource(primary).updateStatus());
200+
}
201+
202+
/**
203+
* Updates the resource using the user provided implementation anc caches the result.
204+
*
205+
* @param primary resource
206+
* @param cache resource cache managed by user
207+
* @param patch implementation of resource update*
208+
* @return the updated resource.
209+
* @param <P> primary resource type
210+
*/
211+
public static <P extends HasMetadata> P patchAndCacheStatus(
212+
P primary, PrimaryResourceCache<P> cache, Supplier<P> patch) {
213+
var updatedResource = patch.get();
214+
cache.cacheResource(primary, updatedResource);
215+
return updatedResource;
216+
}
217+
218+
private static <P extends HasMetadata> void logWarnIfResourceVersionPresent(P primary) {
219+
if (primary.getMetadata().getResourceVersion() != null) {
220+
log.warn(
221+
"The metadata.resourceVersion of primary resource is NOT null, "
222+
+ "using optimistic locking is discouraged for this purpose. ");
223+
}
224+
}
225+
}

0 commit comments

Comments
 (0)