Skip to content

Commit

Permalink
fix: using tombstones to account for rapid deletion (#2317)
Browse files Browse the repository at this point in the history
closes: #2314

Signed-off-by: Steven Hawkins <shawkins@redhat.com>
  • Loading branch information
shawkins authored Mar 27, 2024
1 parent e5cb5b8 commit 3bfac63
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ protected ManagedInformerEventSource(

@Override
public void onAdd(R resource) {
temporaryResourceCache.onEvent(resource, false);
temporaryResourceCache.onAddOrUpdateEvent(resource);
}

@Override
public void onUpdate(R oldObj, R newObj) {
temporaryResourceCache.onEvent(newObj, false);
temporaryResourceCache.onAddOrUpdateEvent(newObj);
}

@Override
public void onDelete(R obj, boolean deletedFinalStateUnknown) {
temporaryResourceCache.onEvent(obj, deletedFinalStateUnknown);
temporaryResourceCache.onDeleteEvent(obj, deletedFinalStateUnknown);
}

protected InformerManager<R, C> manager() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
Expand All @@ -18,8 +16,8 @@
/**
* <p>
* Temporal cache is used to solve the problem for {@link KubernetesDependentResource} that is, when
* a create or update is executed the subsequent getResource opeeration might not return the
* up-to-date resource from informer cache, since it is not received yet by webhook.
* a create or update is executed the subsequent getResource operation might not return the
* up-to-date resource from informer cache, since it is not received yet.
* </p>
* <p>
* The idea of the solution is, that since an update (for create is simpler) was done successfully,
Expand All @@ -36,31 +34,78 @@
*/
public class TemporaryResourceCache<T extends HasMetadata> {

static class ExpirationCache<K> {
private final LinkedHashMap<K, Long> cache;
private final int ttlMs;

public ExpirationCache(int maxEntries, int ttlMs) {
this.ttlMs = ttlMs;
this.cache = new LinkedHashMap<>() {
@Override
protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
return size() > maxEntries;
}
};
}

public void add(K key) {
clean();
cache.putIfAbsent(key, System.currentTimeMillis());
}

public boolean contains(K key) {
clean();
return cache.get(key) != null;
}

void clean() {
if (!cache.isEmpty()) {
long currentTimeMillis = System.currentTimeMillis();
var iter = cache.entrySet().iterator();
// the order will already be from oldest to newest, clean a fixed number of entries to
// amortize the cost amongst multiple calls
for (int i = 0; i < 10 && iter.hasNext(); i++) {
var entry = iter.next();
if (currentTimeMillis - entry.getValue() > ttlMs) {
iter.remove();
}
}
}
}
}

private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
private static final int MAX_RESOURCE_VERSIONS = 256;

private final Map<ResourceID, T> cache = new ConcurrentHashMap<>();

// keep up to the last million deletions for up to 10 minutes
private final ExpirationCache<String> tombstones = new ExpirationCache<>(1000000, 1200000);
private final ManagedInformerEventSource<T, ?, ?> managedInformerEventSource;
private final boolean parseResourceVersions;
private final Set<String> knownResourceVersions;
private final ExpirationCache<String> knownResourceVersions;

public TemporaryResourceCache(ManagedInformerEventSource<T, ?, ?> managedInformerEventSource,
boolean parseResourceVersions) {
this.managedInformerEventSource = managedInformerEventSource;
this.parseResourceVersions = parseResourceVersions;
if (parseResourceVersions) {
knownResourceVersions = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
@Override
protected boolean removeEldestEntry(java.util.Map.Entry<String, Boolean> eldest) {
return size() >= MAX_RESOURCE_VERSIONS;
}
});
// keep up to the 50000 add/updates for up to 5 minutes
knownResourceVersions = new ExpirationCache<>(50000, 600000);
} else {
knownResourceVersions = null;
}
}

public synchronized void onEvent(T resource, boolean unknownState) {
public synchronized void onDeleteEvent(T resource, boolean unknownState) {
tombstones.add(resource.getMetadata().getUid());
onEvent(resource, unknownState);
}

public synchronized void onAddOrUpdateEvent(T resource) {
onEvent(resource, false);
}

synchronized void onEvent(T resource, boolean unknownState) {
cache.computeIfPresent(ResourceID.fromResource(resource),
(id, cached) -> (unknownState || !isLaterResourceVersion(id, cached, resource)) ? null
: cached);
Expand All @@ -84,20 +129,33 @@ public synchronized void putResource(T newResource, String previousResourceVersi
var cachedResource = getResourceFromCache(resourceId)
.orElse(managedInformerEventSource.get(resourceId).orElse(null));

if ((previousResourceVersion == null && cachedResource == null)
boolean moveAhead = false;
if (previousResourceVersion == null && cachedResource == null) {
if (tombstones.contains(newResource.getMetadata().getUid())) {
log.debug(
"Won't resurrect uid {} for resource id: {}",
newResource.getMetadata().getUid(), resourceId);
return;
}
// we can skip further checks as this is a simple add and there's no previous entry to
// consider
moveAhead = true;
}

if (moveAhead
|| (cachedResource != null
&& (cachedResource.getMetadata().getResourceVersion().equals(previousResourceVersion))
|| isLaterResourceVersion(resourceId, newResource, cachedResource))) {
log.debug(
"Temporarily moving ahead to target version {} for resource id: {}",
newResource.getMetadata().getResourceVersion(), resourceId);
putToCache(newResource, resourceId);
cache.put(resourceId, newResource);
} else if (cache.remove(resourceId) != null) {
log.debug("Removed an obsolete resource from cache for id: {}", resourceId);
}
}

public boolean isKnownResourceVersion(T resource) {
public synchronized boolean isKnownResourceVersion(T resource) {
return knownResourceVersions != null
&& knownResourceVersions.contains(resource.getMetadata().getResourceVersion());
}
Expand All @@ -123,10 +181,6 @@ private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T c
return false;
}

private void putToCache(T resource, ResourceID resourceID) {
cache.put(resourceID == null ? ResourceID.fromResource(resource) : resourceID, resource);
}

public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
return Optional.ofNullable(cache.get(resourceID));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() {
informerEventSource.onUpdate(cachedDeployment, testDeployment());

verify(eventHandlerMock, times(1)).handleEvent(any());
verify(temporaryResourceCacheMock, times(1)).onEvent(testDeployment(), false);
verify(temporaryResourceCacheMock, times(1)).onAddOrUpdateEvent(testDeployment());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.ExpirationCache;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -81,7 +84,7 @@ void addOperationNotAddsTheResourceIfInformerCacheNotEmpty() {
void removesResourceFromCache() {
ConfigMap testResource = propagateTestResourceToCache();

temporaryResourceCache.onEvent(testResource(), false);
temporaryResourceCache.onAddOrUpdateEvent(testResource());

assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
.isNotPresent();
Expand All @@ -96,20 +99,59 @@ void resourceVersionParsing() {
ConfigMap testResource = propagateTestResourceToCache();

// an event with a newer version will not remove
temporaryResourceCache.onEvent(new ConfigMapBuilder(testResource).editMetadata()
.withResourceVersion("1").endMetadata().build(), false);
temporaryResourceCache.onAddOrUpdateEvent(new ConfigMapBuilder(testResource).editMetadata()
.withResourceVersion("1").endMetadata().build());

assertThat(temporaryResourceCache.isKnownResourceVersion(testResource)).isTrue();
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
.isPresent();

// anything else will remove
temporaryResourceCache.onEvent(testResource(), false);
temporaryResourceCache.onAddOrUpdateEvent(testResource());

assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
.isNotPresent();
}

@Test
void rapidDeletion() {
var testResource = testResource();

temporaryResourceCache.onAddOrUpdateEvent(testResource);
temporaryResourceCache.onDeleteEvent(new ConfigMapBuilder(testResource).editMetadata()
.withResourceVersion("3").endMetadata().build(), false);
temporaryResourceCache.putAddedResource(testResource);

assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
.isEmpty();
}

@Test
void expirationCacheMax() {
ExpirationCache<Integer> cache = new ExpirationCache<>(2, Integer.MAX_VALUE);

cache.add(1);
cache.add(2);
cache.add(3);

assertThat(cache.contains(1)).isFalse();
assertThat(cache.contains(2)).isTrue();
assertThat(cache.contains(3)).isTrue();
}

@Test
void expirationCacheTtl() {
ExpirationCache<Integer> cache = new ExpirationCache<>(2, 1);

cache.add(1);
cache.add(2);

Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(cache.contains(1)).isFalse();
assertThat(cache.contains(2)).isFalse();
});
}

private ConfigMap propagateTestResourceToCache() {
var testResource = testResource();
when(informerEventSource.get(any())).thenReturn(Optional.empty());
Expand All @@ -127,6 +169,7 @@ ConfigMap testResource() {
configMap.getMetadata().setName("test");
configMap.getMetadata().setNamespace("default");
configMap.getMetadata().setResourceVersion(RESOURCE_VERSION);
configMap.getMetadata().setUid("test-uid");
return configMap;
}

Expand Down

0 comments on commit 3bfac63

Please sign in to comment.