Skip to content

Commit

Permalink
Merge pull request #3856 from Sud0x67/useCachedInformer
Browse files Browse the repository at this point in the history
Enable the reuse of cached informers on creation by `SharedInformerFactory`.
  • Loading branch information
k8s-ci-robot authored Jan 24, 2025
2 parents 9dcc17a + 8973a20 commit f69b343
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,24 @@
import io.kubernetes.client.util.Watchable;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
import io.kubernetes.client.util.generic.options.ListOptions;
import okhttp3.Call;
import org.apache.commons.collections4.MapUtils;

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import okhttp3.Call;
import org.apache.commons.collections4.MapUtils;

/** SharedInformerFactory class constructs and caches informers for api types. */
/**
* SharedInformerFactory class constructs and caches informers for api types.
* For each api type, only the first created informer is stored in the cache.
* If reuseExistingCachedInformer is set to true, It returns the cached informer when creating
* an informer for the same api type.
* Otherwise, this factory creates a new informer on each invocation.
*/
public class SharedInformerFactory {

protected Map<Type, SharedIndexInformer> informers;
Expand All @@ -47,35 +54,54 @@ public class SharedInformerFactory {
private ExecutorService informerExecutor;

private ApiClient apiClient;
private boolean reuseExistingCachedInformer;

/** Constructor w/ default thread pool. */
/** DEPRECATE: In favor of explicit apiClient constructor to avoid misguiding */
@Deprecated
public SharedInformerFactory() {
this(Configuration.getDefaultApiClient().setReadTimeout(0), Executors.newCachedThreadPool());
this(Configuration.getDefaultApiClient().setReadTimeout(0), Executors.newCachedThreadPool(), false);
}

/** Constructor w/ api client specified and default thread pool. */
/**
* Constructor w/ api client specified, default thread pool and reuseExistingCachedInformer is false.
*/
public SharedInformerFactory(ApiClient apiClient) {
this(apiClient, Executors.newCachedThreadPool());
this(apiClient, false);
}

/**
* Constructor w/ api client specified and default thread pool
* @param apiClient specific api client
* @param reuseCache Determines whether to utilize an existing cached informer.
* If true, the method returns the first registered informer for multiple creations of the same apiClassType,
* else, each method calls results in the creation of a new informer,
* while only the first informer is stored in the cache.
*/
public SharedInformerFactory(ApiClient apiClient, boolean reuseCache) {
this(apiClient, Executors.newCachedThreadPool(), reuseCache);
}

/**
* Constructor w/ thread pool specified.
*
* @param threadPool specified thread pool
* DEPRECATE: In favor of explicit apiClient constructor to avoid misguiding.
*/
@Deprecated
public SharedInformerFactory(ExecutorService threadPool) {
this(Configuration.getDefaultApiClient().setReadTimeout(0), threadPool);
this(Configuration.getDefaultApiClient().setReadTimeout(0),threadPool, false);
}

/**
* Constructor w/ api client and thread pool specified.
*
* @param client specific api client
* @param threadPool specified thread pool
* @param reuseCache Determines whether to utilize an existing cached informer.
* If true, the method returns the first registered informer for multiple creations of the same apiClassType,
* else, each method calls results in the creation of a new informer,
* while only the first informer is stored in the cache.
*/
public SharedInformerFactory(ApiClient client, ExecutorService threadPool) {
public SharedInformerFactory(ApiClient client, ExecutorService threadPool, boolean reuseCache) {
if (client.getReadTimeout() != 0) {
throw new IllegalArgumentException("read timeout of ApiClient must be zero");
}
Expand All @@ -84,6 +110,7 @@ public SharedInformerFactory(ApiClient client, ExecutorService threadPool) {
informerExecutor = threadPool;
informers = new HashMap<>();
startedInformers = new HashMap<>();
reuseExistingCachedInformer = reuseCache;
}

/**
Expand All @@ -105,8 +132,7 @@ SharedIndexInformer<ApiType> sharedIndexInformerFor(
}

/**
* Constructs and returns a shared index informer w/ resync period specified. But the informer
* cache will not be overwritten i.e. only the first registered informer will be kept.
* Constructs and returns a shared index informer w/ resync period specified.
*
* @param <ApiType> the type parameter
* @param <ApiListType> the type parameter
Expand Down Expand Up @@ -151,9 +177,7 @@ SharedIndexInformer<ApiType> sharedIndexInformerFor(
}

/**
* Constructs and returns a shared index informer by specifying lister-watcher. But the informer
* cache will not be overwritten on multiple call w/ the the same apiTypeClass i.e. only the first
* registered informer will be kept.
* Constructs and returns a shared index informer by specifying lister-watcher.
*
* @param <ApiType> the type parameter
* @param <ApiListType> the type parameter
Expand All @@ -176,18 +200,22 @@ SharedIndexInformer<ApiType> sharedIndexInformerFor(
Class<ApiType> apiTypeClass,
long resyncPeriodInMillis,
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {
Type apiType = TypeToken.get(apiTypeClass).getType();

if(informers.containsKey(apiType) && reuseExistingCachedInformer) {
return informers.get(apiType);
}

SharedIndexInformer<ApiType> informer =
new DefaultSharedIndexInformer<>(
apiTypeClass, listerWatcher, resyncPeriodInMillis, new Cache<>(), exceptionHandler);
this.informers.putIfAbsent(TypeToken.get(apiTypeClass).getType(), informer);

this.informers.putIfAbsent(apiType, informer);
return informer;
}

/**
* Constructs and returns a shared index informer by specifying a generic api instance. But the
* informer cache will not be overwritten on multiple call w/ the the same apiTypeClass i.e. only
* the first registered informer will be kept.
* Constructs and returns a shared index informer by specifying a generic api instance.
*
* @param <ApiType> the type parameter
* @param <ApiListType> the type parameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,7 @@
*/
package io.kubernetes.client.informer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ListMeta;
Expand All @@ -28,15 +21,26 @@
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.util.CallGeneratorParams;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
import io.kubernetes.client.util.generic.KubernetesApiResponse;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;
import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class SharedInformerFactoryTest {

Expand Down Expand Up @@ -96,4 +100,19 @@ void namespaceScopedNewInformerUsingGenericApi() {
await().timeout(Duration.ofSeconds(2)).until(podInformer::hasSynced);
verify(genericKubernetesApi, atLeastOnce()).list(eq("default"), any(ListOptions.class));
}

@Test
void createInformerMutipleTimesUseCache() throws IOException {
ApiClient client = Config.defaultClient();
SharedInformerFactory factory = new SharedInformerFactory(client, true);
SharedInformer<V1Pod> podInformer = null;
for (int i = 0; i < 10; i++) {
podInformer =
factory.sharedIndexInformerFor(genericKubernetesApi, V1Pod.class, 0, "default");
}
SharedInformer<V1Pod> cachedInformer = factory.getExistingSharedIndexInformer(V1Pod.class);
assertThat(cachedInformer).isNotNull();
assertThat(cachedInformer == podInformer).isTrue();
}

}

0 comments on commit f69b343

Please sign in to comment.