From 3bfac638c390e085722f0a8a7b5d1c0aa64d8cf8 Mon Sep 17 00:00:00 2001 From: Steven Hawkins Date: Wed, 27 Mar 2024 09:45:12 -0400 Subject: [PATCH] fix: using tombstones to account for rapid deletion (#2317) closes: #2314 Signed-off-by: Steven Hawkins --- .../informer/ManagedInformerEventSource.java | 6 +- .../informer/TemporaryResourceCache.java | 94 +++++++++++++++---- .../informer/InformerEventSourceTest.java | 2 +- .../informer/TemporaryResourceCacheTest.java | 51 +++++++++- 4 files changed, 125 insertions(+), 28 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index ec72da0529..7139395e76 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -51,17 +51,17 @@ protected ManagedInformerEventSource( @Override public void onAdd(R resource) { - temporaryResourceCache.onEvent(resource, false); + temporaryResourceCache.onAddOrUpdateEvent(resource); } @Override public void onUpdate(R oldObj, R newObj) { - temporaryResourceCache.onEvent(newObj, false); + temporaryResourceCache.onAddOrUpdateEvent(newObj); } @Override public void onDelete(R obj, boolean deletedFinalStateUnknown) { - temporaryResourceCache.onEvent(obj, deletedFinalStateUnknown); + temporaryResourceCache.onDeleteEvent(obj, deletedFinalStateUnknown); } protected InformerManager manager() { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index fd9a8ad565..b905629e69 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -1,10 +1,8 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; @@ -18,8 +16,8 @@ /** *

* Temporal cache is used to solve the problem for {@link KubernetesDependentResource} that is, when - * a create or update is executed the subsequent getResource opeeration might not return the - * up-to-date resource from informer cache, since it is not received yet by webhook. + * a create or update is executed the subsequent getResource operation might not return the + * up-to-date resource from informer cache, since it is not received yet. *

*

* The idea of the solution is, that since an update (for create is simpler) was done successfully, @@ -36,31 +34,78 @@ */ public class TemporaryResourceCache { + static class ExpirationCache { + private final LinkedHashMap cache; + private final int ttlMs; + + public ExpirationCache(int maxEntries, int ttlMs) { + this.ttlMs = ttlMs; + this.cache = new LinkedHashMap<>() { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > maxEntries; + } + }; + } + + public void add(K key) { + clean(); + cache.putIfAbsent(key, System.currentTimeMillis()); + } + + public boolean contains(K key) { + clean(); + return cache.get(key) != null; + } + + void clean() { + if (!cache.isEmpty()) { + long currentTimeMillis = System.currentTimeMillis(); + var iter = cache.entrySet().iterator(); + // the order will already be from oldest to newest, clean a fixed number of entries to + // amortize the cost amongst multiple calls + for (int i = 0; i < 10 && iter.hasNext(); i++) { + var entry = iter.next(); + if (currentTimeMillis - entry.getValue() > ttlMs) { + iter.remove(); + } + } + } + } + } + private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); - private static final int MAX_RESOURCE_VERSIONS = 256; private final Map cache = new ConcurrentHashMap<>(); + + // keep up to the last million deletions for up to 10 minutes + private final ExpirationCache tombstones = new ExpirationCache<>(1000000, 1200000); private final ManagedInformerEventSource managedInformerEventSource; private final boolean parseResourceVersions; - private final Set knownResourceVersions; + private final ExpirationCache knownResourceVersions; public TemporaryResourceCache(ManagedInformerEventSource managedInformerEventSource, boolean parseResourceVersions) { this.managedInformerEventSource = managedInformerEventSource; this.parseResourceVersions = parseResourceVersions; if (parseResourceVersions) { - knownResourceVersions = Collections.newSetFromMap(new LinkedHashMap() { - @Override - protected boolean removeEldestEntry(java.util.Map.Entry eldest) { - return size() >= MAX_RESOURCE_VERSIONS; - } - }); + // keep up to the 50000 add/updates for up to 5 minutes + knownResourceVersions = new ExpirationCache<>(50000, 600000); } else { knownResourceVersions = null; } } - public synchronized void onEvent(T resource, boolean unknownState) { + public synchronized void onDeleteEvent(T resource, boolean unknownState) { + tombstones.add(resource.getMetadata().getUid()); + onEvent(resource, unknownState); + } + + public synchronized void onAddOrUpdateEvent(T resource) { + onEvent(resource, false); + } + + synchronized void onEvent(T resource, boolean unknownState) { cache.computeIfPresent(ResourceID.fromResource(resource), (id, cached) -> (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null : cached); @@ -84,20 +129,33 @@ public synchronized void putResource(T newResource, String previousResourceVersi var cachedResource = getResourceFromCache(resourceId) .orElse(managedInformerEventSource.get(resourceId).orElse(null)); - if ((previousResourceVersion == null && cachedResource == null) + boolean moveAhead = false; + if (previousResourceVersion == null && cachedResource == null) { + if (tombstones.contains(newResource.getMetadata().getUid())) { + log.debug( + "Won't resurrect uid {} for resource id: {}", + newResource.getMetadata().getUid(), resourceId); + return; + } + // we can skip further checks as this is a simple add and there's no previous entry to + // consider + moveAhead = true; + } + + if (moveAhead || (cachedResource != null && (cachedResource.getMetadata().getResourceVersion().equals(previousResourceVersion)) || isLaterResourceVersion(resourceId, newResource, cachedResource))) { log.debug( "Temporarily moving ahead to target version {} for resource id: {}", newResource.getMetadata().getResourceVersion(), resourceId); - putToCache(newResource, resourceId); + cache.put(resourceId, newResource); } else if (cache.remove(resourceId) != null) { log.debug("Removed an obsolete resource from cache for id: {}", resourceId); } } - public boolean isKnownResourceVersion(T resource) { + public synchronized boolean isKnownResourceVersion(T resource) { return knownResourceVersions != null && knownResourceVersions.contains(resource.getMetadata().getResourceVersion()); } @@ -123,10 +181,6 @@ private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T c return false; } - private void putToCache(T resource, ResourceID resourceID) { - cache.put(resourceID == null ? ResourceID.fromResource(resource) : resourceID, resource); - } - public synchronized Optional getResourceFromCache(ResourceID resourceID) { return Optional.ofNullable(cache.get(resourceID)); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index ce3c52076d..5b8aac89e6 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -120,7 +120,7 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() { informerEventSource.onUpdate(cachedDeployment, testDeployment()); verify(eventHandlerMock, times(1)).handleEvent(any()); - verify(temporaryResourceCacheMock, times(1)).onEvent(testDeployment(), false); + verify(temporaryResourceCacheMock, times(1)).onAddOrUpdateEvent(testDeployment()); } @Test diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java index d641736739..60eb7245a9 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java @@ -2,7 +2,9 @@ import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -10,6 +12,7 @@ import io.fabric8.kubernetes.api.model.ConfigMapBuilder; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.ExpirationCache; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -81,7 +84,7 @@ void addOperationNotAddsTheResourceIfInformerCacheNotEmpty() { void removesResourceFromCache() { ConfigMap testResource = propagateTestResourceToCache(); - temporaryResourceCache.onEvent(testResource(), false); + temporaryResourceCache.onAddOrUpdateEvent(testResource()); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) .isNotPresent(); @@ -96,20 +99,59 @@ void resourceVersionParsing() { ConfigMap testResource = propagateTestResourceToCache(); // an event with a newer version will not remove - temporaryResourceCache.onEvent(new ConfigMapBuilder(testResource).editMetadata() - .withResourceVersion("1").endMetadata().build(), false); + temporaryResourceCache.onAddOrUpdateEvent(new ConfigMapBuilder(testResource).editMetadata() + .withResourceVersion("1").endMetadata().build()); assertThat(temporaryResourceCache.isKnownResourceVersion(testResource)).isTrue(); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) .isPresent(); // anything else will remove - temporaryResourceCache.onEvent(testResource(), false); + temporaryResourceCache.onAddOrUpdateEvent(testResource()); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) .isNotPresent(); } + @Test + void rapidDeletion() { + var testResource = testResource(); + + temporaryResourceCache.onAddOrUpdateEvent(testResource); + temporaryResourceCache.onDeleteEvent(new ConfigMapBuilder(testResource).editMetadata() + .withResourceVersion("3").endMetadata().build(), false); + temporaryResourceCache.putAddedResource(testResource); + + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) + .isEmpty(); + } + + @Test + void expirationCacheMax() { + ExpirationCache cache = new ExpirationCache<>(2, Integer.MAX_VALUE); + + cache.add(1); + cache.add(2); + cache.add(3); + + assertThat(cache.contains(1)).isFalse(); + assertThat(cache.contains(2)).isTrue(); + assertThat(cache.contains(3)).isTrue(); + } + + @Test + void expirationCacheTtl() { + ExpirationCache cache = new ExpirationCache<>(2, 1); + + cache.add(1); + cache.add(2); + + Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(cache.contains(1)).isFalse(); + assertThat(cache.contains(2)).isFalse(); + }); + } + private ConfigMap propagateTestResourceToCache() { var testResource = testResource(); when(informerEventSource.get(any())).thenReturn(Optional.empty()); @@ -127,6 +169,7 @@ ConfigMap testResource() { configMap.getMetadata().setName("test"); configMap.getMetadata().setNamespace("default"); configMap.getMetadata().setResourceVersion(RESOURCE_VERSION); + configMap.getMetadata().setUid("test-uid"); return configMap; }