diff --git a/commons/src/main/java/org/soluvas/commons/tenant/TenantBeans.java b/commons/src/main/java/org/soluvas/commons/tenant/TenantBeans.java index 65710bc34..ecabe1033 100644 --- a/commons/src/main/java/org/soluvas/commons/tenant/TenantBeans.java +++ b/commons/src/main/java/org/soluvas/commons/tenant/TenantBeans.java @@ -2,9 +2,14 @@ import java.lang.reflect.Method; import java.util.Collections; -import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.inject.Inject; @@ -19,11 +24,17 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; +import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.eventbus.EventBus; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; /** * Manages the lifecycle of tenant-scoped beans, using the same implementation class for all tenants. @@ -47,7 +58,8 @@ public abstract class TenantBeans implements TenantRepositoryListener { /** * Stores the currently managed tenant beans. */ - private final Map beanMap = new LinkedHashMap<>(); +// private final Map beanMap = new LinkedHashMap<>(); + private final Map beanMap = new ConcurrentHashMap<>(); private Method initMethod; private Method destroyMethod; private final Class implClass; @@ -58,6 +70,15 @@ public abstract class TenantBeans implements TenantRepositoryListener { private EventBus appEventBus; @Autowired(required=false) private TenantRepository tenantRepo; + /** + * Ticket: https://github.com/soluvas/soluvas-framework/issues/66 + * + * Blocked by SPR-8471 + * + *

Perhaps wait for Spring 4? + */ +// @Inject @Cpu + private final ListeningExecutorService cpuExecutor = MoreExecutors.sameThreadExecutor(); /** * @param implClass Must be the implementation class, because {@code init()} and {@code destroy()} @@ -86,12 +107,29 @@ public TenantBeans(Class implClass) { @PostConstruct public void init() { - ImmutableMap initialTenantMap = tenantConfig.tenantMap(); + final ImmutableMap initialTenantMap = tenantConfig.tenantMap(); log.info("Initializing {} {} tenant beans: {}", initialTenantMap.size(), implClass.getSimpleName(), initialTenantMap.keySet()); - for (final Map.Entry tenant : initialTenantMap.entrySet()) { - final String tenantId = tenant.getKey(); - createAndPut(tenantId, tenant.getValue()); +// for (final Map.Entry tenant : initialTenantMap.entrySet()) { +// final String tenantId = tenant.getKey(); +// createAndPut(tenantId, tenant.getValue()); +// } + final ImmutableList tasks = FluentIterable.from(initialTenantMap.entrySet()) + .transform(new Function, CreateAndPut>() { + @Override @Nullable + public TenantBeans.CreateAndPut apply( + @Nullable Entry input) { + return new CreateAndPut(input.getKey(), input.getValue()); + } + }).toList(); + try { + final List> futures = (List) cpuExecutor.invokeAll(tasks); + final List initializedTenantIds = Futures.allAsList(futures).get(); + log.info("Initialized {} {} tenant beans: {}", + initializedTenantIds.size(), implClass.getSimpleName(), initializedTenantIds); + } catch (InterruptedException | ExecutionException e) { + throw new TenantException(e, "Cannot initialize %s %s tenant beans: %s", + initialTenantMap.size(), implClass.getSimpleName(), initialTenantMap.keySet()); } if (tenantRepo != null) { @@ -100,53 +138,137 @@ public void init() { } @PreDestroy - public synchronized void destroy() { - final ImmutableList tenantIdsRev = ImmutableList.copyOf(beanMap.keySet()).reverse(); - log.info("Shutting down {} {} beans in reverse order: {}", - beanMap.size(), implClass.getSimpleName(), tenantIdsRev); - for (String tenantId : tenantIdsRev) { - removeAndDestroy(tenantId); + public void destroy() { + final ImmutableList tenantIdsToRemove = ImmutableList.copyOf(beanMap.keySet()); + log.info("Shutting down {} {} beans: {}", + beanMap.size(), implClass.getSimpleName(), tenantIdsToRemove); + + final ImmutableList tasks = FluentIterable.from(tenantIdsToRemove) + .transform(new Function() { + @Override @Nullable + public RemoveAndDestroy apply(String input) { + return new RemoveAndDestroy(input); + } + }).toList(); + try { + final List> futures = (List) cpuExecutor.invokeAll(tasks); + final List initializedTenantIds = Futures.allAsList(futures).get(); + log.info("Shutted down {} {} tenant beans: {}", + initializedTenantIds.size(), implClass.getSimpleName(), initializedTenantIds); + } catch (InterruptedException | ExecutionException e) { + throw new TenantException(e, "Cannot shutdown %s %s tenant beans: %s", + tenantIdsToRemove.size(), implClass.getSimpleName(), tenantIdsToRemove); } + +// for (String tenantId : tenantIdsRev) { +// removeAndDestroy(tenantId); +// } } - protected synchronized final void createAndPut(String tenantId, AppManifest appManifest) { - try { - log.debug("Initializing '{}' {} {}", - tenantId, implClass.getSimpleName(), initMethod != null ? "using init method " + initMethod : "without init method"); - final T bean = create(tenantId, appManifest); - if (initMethod != null) { - initMethod.invoke(bean); + protected class CreateAndPut implements Callable { + + private final String tenantId; + private final AppManifest appManifest; + + public CreateAndPut(String tenantId, AppManifest appManifest) { + super(); + this.tenantId = tenantId; + this.appManifest = appManifest; + } + + @Override + public String call() throws Exception { + try { + log.debug("Initializing '{}' {} {}", + tenantId, implClass.getSimpleName(), initMethod != null ? "using init method " + initMethod : "without init method"); + final T bean = create(tenantId, appManifest); + if (initMethod != null) { + initMethod.invoke(bean); + } + beanMap.put(tenantId, bean); + onCreated(tenantId, appManifest, bean); + return tenantId; + } catch (Exception e) { + throw new TenantException("Cannot initialize " + implClass.getSimpleName() + " bean for '" + tenantId + "': " + e, e); } - beanMap.put(tenantId, bean); - onCreated(tenantId, appManifest, bean); - } catch (Exception e) { - throw new CommonsException("Cannot initialize " + implClass.getSimpleName() + " bean for '" + tenantId + "': " + e, e); } + } - protected synchronized final void removeAndDestroy(String tenantId) { - try { - log.info("Shutting down '{}' {} {}", - tenantId, implClass.getSimpleName(), destroyMethod != null ? "using destroymethod " + destroyMethod : "without destroy method"); - final T bean = beanMap.get(tenantId); - if (bean != null) { - onDestroying(tenantId, bean); - beanMap.remove(tenantId); - if (destroyMethod != null) { - destroyMethod.invoke(bean); + protected class RemoveAndDestroy implements Callable { + + private final String tenantId; + + public RemoveAndDestroy(String tenantId) { + super(); + this.tenantId = tenantId; + } + + @Override + public String call() throws Exception { + try { + log.info("Shutting down '{}' {} {}", + tenantId, implClass.getSimpleName(), destroyMethod != null ? "using destroymethod " + destroyMethod : "without destroy method"); + final T bean = beanMap.get(tenantId); + if (bean != null) { + onDestroying(tenantId, bean); + beanMap.remove(tenantId); + if (destroyMethod != null) { + destroyMethod.invoke(bean); + } + } else { + log.warn("Not removing non-existing {} bean for tenant '{}'", + implClass.getSimpleName(), tenantId); } - } else { - log.warn("Not removing non-existing {} bean for tenant '{}'", - implClass.getSimpleName(), tenantId); + return tenantId; + } catch (Exception e) { + throw new TenantException("Cannot destroy " + implClass.getSimpleName() + " bean for '" + tenantId + "'", e); } - } catch (Exception e) { - throw new CommonsException("Cannot destroy " + implClass.getSimpleName() + " bean for '" + tenantId + "'", e); } + } +// protected synchronized final void createAndPut(String tenantId, AppManifest appManifest) { +// try { +// log.debug("Initializing '{}' {} {}", +// tenantId, implClass.getSimpleName(), initMethod != null ? "using init method " + initMethod : "without init method"); +// final T bean = create(tenantId, appManifest); +// if (initMethod != null) { +// initMethod.invoke(bean); +// } +// beanMap.put(tenantId, bean); +// onCreated(tenantId, appManifest, bean); +// } catch (Exception e) { +// throw new CommonsException("Cannot initialize " + implClass.getSimpleName() + " bean for '" + tenantId + "': " + e, e); +// } +// } + +// protected synchronized final void removeAndDestroy(String tenantId) { +// try { +// log.info("Shutting down '{}' {} {}", +// tenantId, implClass.getSimpleName(), destroyMethod != null ? "using destroymethod " + destroyMethod : "without destroy method"); +// final T bean = beanMap.get(tenantId); +// if (bean != null) { +// onDestroying(tenantId, bean); +// beanMap.remove(tenantId); +// if (destroyMethod != null) { +// destroyMethod.invoke(bean); +// } +// } else { +// log.warn("Not removing non-existing {} bean for tenant '{}'", +// implClass.getSimpleName(), tenantId); +// } +// } catch (Exception e) { +// throw new TenantException("Cannot destroy " + implClass.getSimpleName() + " bean for '" + tenantId + "'", e); +// } +// } + /** * Creates the bean for the specified tenant, do not call {@code init()}, * {@link TenantBeans} will call it. + * + *

Thread safety: This method must be thread safe! + * * @param tenantId * @param appManifest * @return @@ -212,20 +334,62 @@ public T get(String tenantId) { @Override public void onTenantsStarting(TenantsStarting starting) throws Exception { - for (final Map.Entry entry : starting.getAddeds().entrySet()) { - final String tenantId = entry.getKey(); - final AppManifest appManifest = entry.getValue(); - createAndPut(tenantId, appManifest); +// for (final Map.Entry entry : starting.getAddeds().entrySet()) { +// final String tenantId = entry.getKey(); +// final AppManifest appManifest = entry.getValue(); +// createAndPut(tenantId, appManifest); +// } + + log.info("Starting {} {} tenant beans: {}", + starting.getAddeds().size(), implClass.getSimpleName(), starting.getAddeds().keySet()); + final ImmutableList tasks = FluentIterable.from(starting.getAddeds().entrySet()) + .transform(new Function, CreateAndPut>() { + @Override @Nullable + public TenantBeans.CreateAndPut apply( + @Nullable Entry input) { + return new CreateAndPut(input.getKey(), input.getValue()); + } + }).toList(); + try { + final List> futures = (List) cpuExecutor.invokeAll(tasks); + final List initializedTenantIds = Futures.allAsList(futures).get(); + log.info("Started {} {} tenant beans: {}", + initializedTenantIds.size(), implClass.getSimpleName(), initializedTenantIds); + } catch (InterruptedException | ExecutionException e) { + throw new TenantException(e, "Cannot start %s %s tenant beans: %s", + starting.getAddeds().size(), implClass.getSimpleName(), starting.getAddeds().keySet()); } } @Override public void onTenantsStopping(TenantsStopping stopping) throws Exception { - for (final Map.Entry entry : stopping.getTenants().entrySet()) { - final String tenantId = entry.getKey(); - final AppManifest appManifest = entry.getValue(); - removeAndDestroy(tenantId); +// for (final Map.Entry entry : stopping.getTenants().entrySet()) { +// final String tenantId = entry.getKey(); +// final AppManifest appManifest = entry.getValue(); +// removeAndDestroy(tenantId); +// } + + final ImmutableList tenantIdsToRemove = ImmutableList.copyOf(stopping.getTenants().keySet()); + log.info("Stopping {} {} beans: {}", + beanMap.size(), implClass.getSimpleName(), tenantIdsToRemove); + + final ImmutableList tasks = FluentIterable.from(tenantIdsToRemove) + .transform(new Function() { + @Override @Nullable + public RemoveAndDestroy apply(String input) { + return new RemoveAndDestroy(input); + } + }).toList(); + try { + final List> futures = (List) cpuExecutor.invokeAll(tasks); + final List initializedTenantIds = Futures.allAsList(futures).get(); + log.info("Stopped {} {} tenant beans: {}", + initializedTenantIds.size(), implClass.getSimpleName(), initializedTenantIds); + } catch (InterruptedException | ExecutionException e) { + throw new TenantException(e, "Cannot stop %s %s tenant beans: %s", + tenantIdsToRemove.size(), implClass.getSimpleName(), tenantIdsToRemove); } + } } diff --git a/commons/src/main/java/org/soluvas/commons/tenant/TenantUtils.java b/commons/src/main/java/org/soluvas/commons/tenant/TenantUtils.java index b5bdbd3d0..3520a3fef 100644 --- a/commons/src/main/java/org/soluvas/commons/tenant/TenantUtils.java +++ b/commons/src/main/java/org/soluvas/commons/tenant/TenantUtils.java @@ -152,6 +152,10 @@ public static T selectBean(TenantSelector tenantSelector, Map map * @return */ public static T selectBean(String tenantId, Map map, Class clazz) { + // ImmutableMap is important to make it threadsafe +// return Preconditions.checkNotNull(ImmutableMap.copyOf(map).get(tenantId), +// "No %s for tenant '%s'. %s available: %s", +// clazz.getSimpleName(), tenantId, map.size(), map.keySet()); return Preconditions.checkNotNull(map.get(tenantId), "No %s for tenant '%s'. %s available: %s", clazz.getSimpleName(), tenantId, map.size(), map.keySet());