Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable the reuse of cached informers on creation by SharedInformerFactory. #3856

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,24 @@
import io.kubernetes.client.util.Watchable;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
import io.kubernetes.client.util.generic.options.ListOptions;
import okhttp3.Call;
import org.apache.commons.collections4.MapUtils;

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import okhttp3.Call;
import org.apache.commons.collections4.MapUtils;

/** SharedInformerFactory class constructs and caches informers for api types. */
/**
* SharedInformerFactory class constructs and caches informers for api types.
* For each api type, only the first created informer is stored in the cache.
* If reuseExistingCachedInformer is set to true, It returns the cached informer when creating
* an informer for the same api type.
* Otherwise, this factory creates a new informer on each invocation.
*/
public class SharedInformerFactory {

protected Map<Type, SharedIndexInformer> informers;
Expand All @@ -47,35 +54,54 @@ public class SharedInformerFactory {
private ExecutorService informerExecutor;

private ApiClient apiClient;
private boolean reuseExistingCachedInformer;

/** Constructor w/ default thread pool. */
/** DEPRECATE: In favor of explicit apiClient constructor to avoid misguiding */
@Deprecated
public SharedInformerFactory() {
this(Configuration.getDefaultApiClient().setReadTimeout(0), Executors.newCachedThreadPool());
this(Configuration.getDefaultApiClient().setReadTimeout(0), Executors.newCachedThreadPool(), false);
Sud0x67 marked this conversation as resolved.
Show resolved Hide resolved
}

/** Constructor w/ api client specified and default thread pool. */
/**
* Constructor w/ api client specified, default thread pool and reuseExistingCachedInformer is false.
*/
public SharedInformerFactory(ApiClient apiClient) {
this(apiClient, Executors.newCachedThreadPool());
this(apiClient, false);
}

/**
* Constructor w/ api client specified and default thread pool
* @param apiClient specific api client
* @param reuseCache Determines whether to utilize an existing cached informer.
* If true, the method returns the first registered informer for multiple creations of the same apiClassType,
* else, each method calls results in the creation of a new informer,
* while only the first informer is stored in the cache.
*/
public SharedInformerFactory(ApiClient apiClient, boolean reuseCache) {
this(apiClient, Executors.newCachedThreadPool(), reuseCache);
}

/**
* Constructor w/ thread pool specified.
*
* @param threadPool specified thread pool
* DEPRECATE: In favor of explicit apiClient constructor to avoid misguiding.
*/
@Deprecated
public SharedInformerFactory(ExecutorService threadPool) {
this(Configuration.getDefaultApiClient().setReadTimeout(0), threadPool);
this(Configuration.getDefaultApiClient().setReadTimeout(0),threadPool, false);
}

/**
* Constructor w/ api client and thread pool specified.
*
* @param client specific api client
* @param threadPool specified thread pool
* @param reuseCache Determines whether to utilize an existing cached informer.
* If true, the method returns the first registered informer for multiple creations of the same apiClassType,
* else, each method calls results in the creation of a new informer,
* while only the first informer is stored in the cache.
*/
public SharedInformerFactory(ApiClient client, ExecutorService threadPool) {
public SharedInformerFactory(ApiClient client, ExecutorService threadPool, boolean reuseCache) {
if (client.getReadTimeout() != 0) {
throw new IllegalArgumentException("read timeout of ApiClient must be zero");
}
Expand All @@ -84,6 +110,7 @@ public SharedInformerFactory(ApiClient client, ExecutorService threadPool) {
informerExecutor = threadPool;
informers = new HashMap<>();
startedInformers = new HashMap<>();
reuseExistingCachedInformer = reuseCache;
}

/**
Expand All @@ -105,8 +132,7 @@ SharedIndexInformer<ApiType> sharedIndexInformerFor(
}

/**
* Constructs and returns a shared index informer w/ resync period specified. But the informer
* cache will not be overwritten i.e. only the first registered informer will be kept.
* Constructs and returns a shared index informer w/ resync period specified.
*
* @param <ApiType> the type parameter
* @param <ApiListType> the type parameter
Expand Down Expand Up @@ -151,9 +177,7 @@ SharedIndexInformer<ApiType> sharedIndexInformerFor(
}

/**
* Constructs and returns a shared index informer by specifying lister-watcher. But the informer
* cache will not be overwritten on multiple call w/ the the same apiTypeClass i.e. only the first
* registered informer will be kept.
* Constructs and returns a shared index informer by specifying lister-watcher.
*
* @param <ApiType> the type parameter
* @param <ApiListType> the type parameter
Expand All @@ -176,18 +200,22 @@ SharedIndexInformer<ApiType> sharedIndexInformerFor(
Class<ApiType> apiTypeClass,
long resyncPeriodInMillis,
BiConsumer<Class<ApiType>, Throwable> exceptionHandler) {
Type apiType = TypeToken.get(apiTypeClass).getType();

if(informers.containsKey(apiType) && reuseExistingCachedInformer) {
return informers.get(apiType);
}

SharedIndexInformer<ApiType> informer =
new DefaultSharedIndexInformer<>(
apiTypeClass, listerWatcher, resyncPeriodInMillis, new Cache<>(), exceptionHandler);
this.informers.putIfAbsent(TypeToken.get(apiTypeClass).getType(), informer);

this.informers.putIfAbsent(apiType, informer);
return informer;
}

/**
* Constructs and returns a shared index informer by specifying a generic api instance. But the
* informer cache will not be overwritten on multiple call w/ the the same apiTypeClass i.e. only
* the first registered informer will be kept.
* Constructs and returns a shared index informer by specifying a generic api instance.
*
* @param <ApiType> the type parameter
* @param <ApiListType> the type parameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,7 @@
*/
package io.kubernetes.client.informer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ListMeta;
Expand All @@ -28,15 +21,26 @@
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.util.CallGeneratorParams;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
import io.kubernetes.client.util.generic.KubernetesApiResponse;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;
import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class SharedInformerFactoryTest {

Expand Down Expand Up @@ -96,4 +100,19 @@ void namespaceScopedNewInformerUsingGenericApi() {
await().timeout(Duration.ofSeconds(2)).until(podInformer::hasSynced);
verify(genericKubernetesApi, atLeastOnce()).list(eq("default"), any(ListOptions.class));
}

@Test
void createInformerMutipleTimesUseCache() throws IOException {
ApiClient client = Config.defaultClient();
SharedInformerFactory factory = new SharedInformerFactory(client, true);
SharedInformer<V1Pod> podInformer = null;
for (int i = 0; i < 10; i++) {
podInformer =
factory.sharedIndexInformerFor(genericKubernetesApi, V1Pod.class, 0, "default");
}
SharedInformer<V1Pod> cachedInformer = factory.getExistingSharedIndexInformer(V1Pod.class);
assertThat(cachedInformer).isNotNull();
assertThat(cachedInformer == podInformer).isTrue();
}

}
Loading