diff --git a/src/main/java/emissary/core/sentinel/Sentinel.java b/src/main/java/emissary/core/sentinel/Sentinel.java index 10cd9d8704..f2f1fd1a2d 100644 --- a/src/main/java/emissary/core/sentinel/Sentinel.java +++ b/src/main/java/emissary/core/sentinel/Sentinel.java @@ -2,14 +2,10 @@ import emissary.config.ConfigUtil; import emissary.config.Configurator; -import emissary.core.Factory; import emissary.core.IMobileAgent; import emissary.core.Namespace; import emissary.core.NamespaceException; -import emissary.core.sentinel.rules.Notify; -import emissary.core.sentinel.rules.Rule; -import emissary.directory.DirectoryPlace; -import emissary.directory.KeyManipulator; +import emissary.core.sentinel.protocols.Protocol; import emissary.pool.MobileAgentFactory; import org.apache.commons.lang3.StringUtils; @@ -17,6 +13,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.util.List; import java.util.Map; import java.util.Objects; @@ -30,20 +27,14 @@ */ public class Sentinel implements Runnable { - protected static final Logger logger = LoggerFactory.getLogger(Sentinel.class); + protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String DEFAULT_NAMESPACE_NAME = "Sentinel"; - protected static final String DEFAULT_RULE = "DEFAULT"; - - // key: place, value: rule (time limits, thresholds) - final Map rules = new ConcurrentHashMap<>(); - // key: agent name, value: how long Sentinel has observed the mobile agent final Map trackers = new ConcurrentHashMap<>(); - // key: place simple name, value: number of agents in place - final Map placeAgentCounts = new ConcurrentHashMap<>(); + final Map protocols = new ConcurrentHashMap<>(); Configurator config; @@ -112,7 +103,7 @@ public void run() { @Override public String toString() { - return "Watching agents with " + rules.values(); + return "Watching agents with " + protocols.values(); } /** @@ -135,35 +126,23 @@ protected void init() { if (this.enabled) { this.pollingInterval = config.findIntEntry("POLLING_INTERVAL_MINUTES", 5); - logger.trace("Loading rules..."); - for (String ruleId : config.findEntries("RULE_ID")) { + logger.trace("Sentinel protocols initializing..."); + for (Map.Entry proto : config.findStringMatchMap("PROTOCOL_", true, true).entrySet()) { try { - validate(ruleId); - Map map = config.findStringMatchMap(ruleId + "_"); - String rule = map.getOrDefault("RULE", Notify.class.getName()); - Rule ruleImpl = (Rule) Factory.create(rule, ruleId, map.get("TIME_LIMIT_MINUTES"), map.get("THRESHOLD")); - logger.info("Sentinel loaded {}", ruleImpl); - this.rules.put(ruleId, ruleImpl); + String protocolId = proto.getKey(); + String config = proto.getValue(); + Protocol protocol = new Protocol(config); + logger.info("Sentinel initiated {}", protocol); + if (protocol.isEnabled()) { + this.protocols.put(protocolId, protocol); + } } catch (Exception e) { - logger.warn("Unable to configure Sentinel for {}: {}", ruleId, e.getMessage()); + logger.warn("Unable to configure Sentinel Protocol {}: {}", proto, e.getMessage()); } } - - // if no rules, create a default one - if (!this.rules.containsKey(DEFAULT_RULE)) { - logger.warn("Default rule not found, creating one..."); - this.rules.put(DEFAULT_RULE, new Notify(DEFAULT_RULE, 60L, 1.0)); - } - } - } - - protected void validate(String place) throws NamespaceException { - // validate that the place exists - if (!DEFAULT_RULE.equalsIgnoreCase(place)) { - DirectoryPlace directoryPlace = Namespace.lookup(DirectoryPlace.class).iterator().next(); - if (directoryPlace.getEntries().stream() - .noneMatch(entry -> place.equalsIgnoreCase(KeyManipulator.getServiceClassname(entry.getFullKey())))) { - throw new IllegalStateException("Place not found in the directory"); + if (this.protocols.isEmpty()) { + this.enabled = false; + logger.warn("Sentinel initializing failed: no protocols found"); } } } @@ -174,13 +153,14 @@ protected void validate(String place) throws NamespaceException { * @throws NamespaceException if there is a problem looking up resources in the {@link Namespace} */ protected void watch() throws NamespaceException { - placeAgentCounts.clear(); - List agentKeys = - Namespace.keySet().stream().filter(k -> k.startsWith(MobileAgentFactory.AGENT_NAME)).sorted().collect(Collectors.toList()); + List agentKeys = Namespace.keySet().stream() + .filter(k -> k.startsWith(MobileAgentFactory.AGENT_NAME)) + .sorted() + .collect(Collectors.toList()); for (String agentKey : agentKeys) { watch(agentKey); } - check(); + protocols.values().forEach(protocol -> protocol.run(trackers)); } /** @@ -201,8 +181,6 @@ protected void watch(String agentKey) throws NamespaceException { trackedAgent.resetTimer(); } trackedAgent.incrementTimer(pollingInterval); - String placeSimpleName = getPlaceSimpleName(mobileAgent.getLastPlaceProcessed()); - placeAgentCounts.put(placeSimpleName, placeAgentCounts.getOrDefault(placeSimpleName, 0) + 1); logger.debug("Agent acquired {}", trackedAgent); } else { trackedAgent.clear(); @@ -210,24 +188,6 @@ protected void watch(String agentKey) throws NamespaceException { } } - /** - * Run the configured rules over the watched mobile-agents - * - * @throws NamespaceException if there is a problem looking up resources in the {@link Namespace} - */ - protected void check() throws NamespaceException { - logger.debug("Checking agents {}", placeAgentCounts); - for (Map.Entry item : placeAgentCounts.entrySet()) { - Rule rule = rules.getOrDefault(item.getKey(), rules.get(DEFAULT_RULE)); - logger.trace("Found {} for {}", rule, item.getKey()); - rule.run(trackers, item.getKey(), item.getValue()); - } - } - - protected static String getPlaceSimpleName(String lastPlaceProcessed) { - return StringUtils.substringAfterLast(lastPlaceProcessed, "/"); - } - public static class Tracker { private final String agentName; private String agentId; @@ -272,7 +232,7 @@ public void setPlaceName(String placeName) { } public String getPlaceSimpleName() { - return Sentinel.getPlaceSimpleName(this.placeName); + return Protocol.getPlaceSimpleName(this.placeName); } public long getTimer() { diff --git a/src/main/java/emissary/core/sentinel/protocols/Protocol.java b/src/main/java/emissary/core/sentinel/protocols/Protocol.java new file mode 100644 index 0000000000..04250cdd97 --- /dev/null +++ b/src/main/java/emissary/core/sentinel/protocols/Protocol.java @@ -0,0 +1,147 @@ +package emissary.core.sentinel.protocols; + +import emissary.config.ConfigUtil; +import emissary.config.Configurator; +import emissary.core.Factory; +import emissary.core.Namespace; +import emissary.core.NamespaceException; +import emissary.core.sentinel.Sentinel; +import emissary.core.sentinel.protocols.actions.Action; +import emissary.core.sentinel.protocols.actions.Notify; +import emissary.core.sentinel.protocols.rules.AllMaxTime; +import emissary.core.sentinel.protocols.rules.Rule; +import emissary.directory.DirectoryPlace; +import emissary.directory.KeyManipulator; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.Collections; +import java.util.Map; +import java.util.StringJoiner; +import java.util.concurrent.ConcurrentHashMap; + +public class Protocol { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected static final String DEFAULT_RULE = "DEFAULT"; + private Configurator config; + private boolean enabled = false; + private Action action; + private final Map rules = new ConcurrentHashMap<>(); // key: place, value: rule + private final Map placeAgentCounts = new ConcurrentHashMap<>(); // key: place, value: number of agents in place + + public Protocol(String conf) { + configure(conf); + } + + public boolean isEnabled() { + return enabled; + } + + public Map getRules() { + return Collections.unmodifiableMap(rules); + } + + public Map getPlaceAgentCounts() { + return Collections.unmodifiableMap(placeAgentCounts); + } + + public Action getAction() { + return action; + } + + /** + * Run the configured rules over the watched mobile-agents + * + * @throws NamespaceException if there is a problem looking up resources in the {@link Namespace} + */ + public void run(Map trackers) { + placeAgentCounts.clear(); + for (Sentinel.Tracker tracker : trackers.values()) { + String placeKey = getPlaceKey(tracker); + if (StringUtils.isNotBlank(placeKey)) { + placeAgentCounts.put(placeKey, placeAgentCounts.getOrDefault(placeKey, 0) + 1); + } + } + logger.debug("Checking agents {}", placeAgentCounts); + for (Map.Entry item : placeAgentCounts.entrySet()) { + Rule rule = rules.getOrDefault(item.getKey(), rules.get(DEFAULT_RULE)); + logger.trace("Found {} for {}", rule, item.getKey()); + check(trackers, item.getKey(), item.getValue()); + } + } + + public String getPlaceKey(Sentinel.Tracker tracker) { + return getPlaceSimpleName(tracker.getPlaceName()); + } + + public static String getPlaceSimpleName(String place) { + return StringUtils.substringAfterLast(place, "/"); + } + + protected void configure(String conf) { + try { + this.config = ConfigUtil.getConfigInfo(conf); + init(); + } catch (IOException e) { + logger.warn("Cannot read " + conf + ", skipping"); + } + } + + protected void init() { + this.enabled = config.findBooleanEntry("ENABLED", false); + if (enabled) { + logger.trace("Loading rules..."); + for (String ruleId : config.findEntries("RULE_ID")) { + try { + validate(ruleId); + Map map = config.findStringMatchMap(ruleId + "_"); + String rule = map.getOrDefault("RULE", AllMaxTime.class.getName()); + Rule ruleImpl = (Rule) Factory.create(rule, ruleId, map.get("TIME_LIMIT_MINUTES"), map.get("THRESHOLD")); + logger.debug("Sentinel loaded {}", ruleImpl); + this.rules.put(ruleId, ruleImpl); + } catch (Exception e) { + logger.warn("Unable to configure Sentinel for {}: {}", ruleId, e.getMessage()); + } + } + + // if no rules then disable protocol + if (this.rules.isEmpty()) { + this.enabled = false; + return; + } + + String action = config.findStringEntry("ACTION", Notify.class.getName()); + this.action = (Action) Factory.create(action); + } + } + + protected void validate(String place) throws NamespaceException { + // validate that the place exists + if (!DEFAULT_RULE.equalsIgnoreCase(place)) { + DirectoryPlace directoryPlace = Namespace.lookup(DirectoryPlace.class).iterator().next(); + if (directoryPlace.getEntries().stream() + .noneMatch(entry -> place.equalsIgnoreCase(KeyManipulator.getServiceClassname(entry.getFullKey())))) { + throw new IllegalStateException("Place not found in the directory"); + } + } + } + + protected void check(Map tracker, String placeSimpleName, Integer count) { + if (getRules().values().stream().allMatch(rule -> rule.condition(tracker, placeSimpleName, count))) { + getAction().trigger(tracker, placeSimpleName, count); + } + } + + @Override + public String toString() { + return new StringJoiner(", ", Protocol.class.getSimpleName() + "[", "]") + .add("rules=" + rules) + .add("action=" + action) + .toString(); + } +} diff --git a/src/main/java/emissary/core/sentinel/protocols/actions/Action.java b/src/main/java/emissary/core/sentinel/protocols/actions/Action.java new file mode 100644 index 0000000000..dabf1eaa63 --- /dev/null +++ b/src/main/java/emissary/core/sentinel/protocols/actions/Action.java @@ -0,0 +1,28 @@ +package emissary.core.sentinel.protocols.actions; + +import emissary.core.sentinel.Sentinel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.Map; + +public abstract class Action { + + protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * Try to terminate the JVM + * + * @param trackers the listing of agents, places, and filenames that's currently processing + * @param placeSimpleName the place name currently processing on one or more mobile agents + * @param count number of mobile agents stuck on the place + */ + public abstract void trigger(Map trackers, String placeSimpleName, Integer count); + + @Override + public String toString() { + return getClass().getName(); + } +} diff --git a/src/main/java/emissary/core/sentinel/rules/Exit.java b/src/main/java/emissary/core/sentinel/protocols/actions/Exit.java similarity index 58% rename from src/main/java/emissary/core/sentinel/rules/Exit.java rename to src/main/java/emissary/core/sentinel/protocols/actions/Exit.java index 9e7f6cbf5a..3c8897cd22 100644 --- a/src/main/java/emissary/core/sentinel/rules/Exit.java +++ b/src/main/java/emissary/core/sentinel/protocols/actions/Exit.java @@ -1,18 +1,10 @@ -package emissary.core.sentinel.rules; +package emissary.core.sentinel.protocols.actions; import emissary.core.sentinel.Sentinel; import java.util.Map; -public class Exit extends Rule { - - public Exit(String place, long timeLimit, double threshold) { - super(place, timeLimit, threshold); - } - - public Exit(String place, String timeLimit, String threshold) { - super(place, timeLimit, threshold); - } +public class Exit extends Action { /** * Try to terminate the JVM @@ -22,7 +14,7 @@ public Exit(String place, String timeLimit, String threshold) { * @param count number of mobile agents stuck on the place */ @Override - public void action(Map trackers, String placeSimpleName, Integer count) { + public void trigger(Map trackers, String placeSimpleName, Integer count) { logger.error("Sentinel detected {} unrecoverable agent(s) running [{}], exiting now!!", count, placeSimpleName); logger.debug("{}", trackers); System.exit(1); diff --git a/src/main/java/emissary/core/sentinel/rules/Kill.java b/src/main/java/emissary/core/sentinel/protocols/actions/Kill.java similarity index 61% rename from src/main/java/emissary/core/sentinel/rules/Kill.java rename to src/main/java/emissary/core/sentinel/protocols/actions/Kill.java index 2f82bd907c..a51e9dc4f7 100644 --- a/src/main/java/emissary/core/sentinel/rules/Kill.java +++ b/src/main/java/emissary/core/sentinel/protocols/actions/Kill.java @@ -1,18 +1,11 @@ -package emissary.core.sentinel.rules; +package emissary.core.sentinel.protocols.actions; import emissary.core.sentinel.Sentinel; import emissary.server.EmissaryServer; import java.util.Map; -public class Kill extends Rule { - public Kill(String place, long timeLimit, double threshold) { - super(place, timeLimit, threshold); - } - - public Kill(String place, String timeLimit, String threshold) { - super(place, timeLimit, threshold); - } +public class Kill extends Action { /** * Force a shutdown of the system @@ -22,7 +15,7 @@ public Kill(String place, String timeLimit, String threshold) { * @param count number of mobile agents stuck on the place */ @Override - public void action(Map trackers, String placeSimpleName, Integer count) { + public void trigger(Map trackers, String placeSimpleName, Integer count) { logger.error("Sentinel detected {} unrecoverable agent(s) running [{}], initiating forceful shutdown...", count, placeSimpleName); logger.debug("{}", trackers); EmissaryServer.stopServerForce(); diff --git a/src/main/java/emissary/core/sentinel/rules/Notify.java b/src/main/java/emissary/core/sentinel/protocols/actions/Notify.java similarity index 56% rename from src/main/java/emissary/core/sentinel/rules/Notify.java rename to src/main/java/emissary/core/sentinel/protocols/actions/Notify.java index 56b7ff97b6..3b968330dc 100644 --- a/src/main/java/emissary/core/sentinel/rules/Notify.java +++ b/src/main/java/emissary/core/sentinel/protocols/actions/Notify.java @@ -1,17 +1,10 @@ -package emissary.core.sentinel.rules; +package emissary.core.sentinel.protocols.actions; import emissary.core.sentinel.Sentinel; import java.util.Map; -public class Notify extends Rule { - public Notify(String place, long timeLimit, double threshold) { - super(place, timeLimit, threshold); - } - - public Notify(String place, String timeLimit, String threshold) { - super(place, timeLimit, threshold); - } +public class Notify extends Action { /** * Log the problem agents/threads @@ -21,7 +14,7 @@ public Notify(String place, String timeLimit, String threshold) { * @param count number of mobile agents stuck on the place */ @Override - public void action(Map trackers, String placeSimpleName, Integer count) { + public void trigger(Map trackers, String placeSimpleName, Integer count) { logger.warn("Sentinel detected {} locked agent(s) running [{}]", count, placeSimpleName); logger.debug("{}", trackers); } diff --git a/src/main/java/emissary/core/sentinel/rules/Recover.java b/src/main/java/emissary/core/sentinel/protocols/actions/Recover.java similarity index 76% rename from src/main/java/emissary/core/sentinel/rules/Recover.java rename to src/main/java/emissary/core/sentinel/protocols/actions/Recover.java index 91d32becd0..d44475caf4 100644 --- a/src/main/java/emissary/core/sentinel/rules/Recover.java +++ b/src/main/java/emissary/core/sentinel/protocols/actions/Recover.java @@ -1,4 +1,4 @@ -package emissary.core.sentinel.rules; +package emissary.core.sentinel.protocols.actions; import emissary.core.IMobileAgent; import emissary.core.Namespace; @@ -8,14 +8,7 @@ import java.util.Map; import java.util.stream.Collectors; -public class Recover extends Rule { - public Recover(String place, long timeLimit, double threshold) { - super(place, timeLimit, threshold); - } - - public Recover(String place, String timeLimit, String threshold) { - super(place, timeLimit, threshold); - } +public class Recover extends Action { /** * Attempts to recover the mobile agents by interrupting the thread @@ -25,7 +18,7 @@ public Recover(String place, String timeLimit, String threshold) { * @param counter number of mobile agents stuck on the place */ @Override - public void action(Map tracker, String placeSimpleName, Integer counter) { + public void trigger(Map tracker, String placeSimpleName, Integer counter) { logger.warn("Sentinel detected {} locked agent(s) running [{}], attempting recovery...", counter, placeSimpleName); List agentNames = tracker.values().stream() .filter(t -> t.getPlaceSimpleName().equalsIgnoreCase(placeSimpleName)) diff --git a/src/main/java/emissary/core/sentinel/rules/Stop.java b/src/main/java/emissary/core/sentinel/protocols/actions/Stop.java similarity index 61% rename from src/main/java/emissary/core/sentinel/rules/Stop.java rename to src/main/java/emissary/core/sentinel/protocols/actions/Stop.java index 3f3c6df25c..0c6c3d7caf 100644 --- a/src/main/java/emissary/core/sentinel/rules/Stop.java +++ b/src/main/java/emissary/core/sentinel/protocols/actions/Stop.java @@ -1,18 +1,11 @@ -package emissary.core.sentinel.rules; +package emissary.core.sentinel.protocols.actions; import emissary.core.sentinel.Sentinel; import emissary.server.EmissaryServer; import java.util.Map; -public class Stop extends Rule { - public Stop(String place, long timeLimit, double threshold) { - super(place, timeLimit, threshold); - } - - public Stop(String place, String timeLimit, String threshold) { - super(place, timeLimit, threshold); - } +public class Stop extends Action { /** * Attempt a graceful shutdown of the system @@ -22,7 +15,7 @@ public Stop(String place, String timeLimit, String threshold) { * @param count number of mobile agents stuck on the place */ @Override - public void action(Map trackers, String placeSimpleName, Integer count) { + public void trigger(Map trackers, String placeSimpleName, Integer count) { logger.error("Sentinel detected {} unrecoverable agent(s) running [{}], initiating graceful shutdown...", count, placeSimpleName); logger.debug("{}", trackers); EmissaryServer.stopServer(); diff --git a/src/main/java/emissary/core/sentinel/protocols/rules/AllMaxTime.java b/src/main/java/emissary/core/sentinel/protocols/rules/AllMaxTime.java new file mode 100644 index 0000000000..53f41b636c --- /dev/null +++ b/src/main/java/emissary/core/sentinel/protocols/rules/AllMaxTime.java @@ -0,0 +1,33 @@ +package emissary.core.sentinel.protocols.rules; + +import emissary.core.sentinel.Sentinel; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.Map; + +/** + * Looks at the place that has been running for the least amount of time. + */ +public class AllMaxTime extends Rule { + + protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public AllMaxTime(String place, long timeLimit, double threshold) { + super(place, timeLimit, threshold); + } + + public AllMaxTime(String place, String timeLimit, String threshold) { + super(place, timeLimit, threshold); + } + + protected boolean overTimeLimit(Map trackers, String placeSimpleName) { + return trackers.values().stream() + .filter(t -> StringUtils.equalsIgnoreCase(t.getPlaceSimpleName(), placeSimpleName)) + .allMatch(tracker -> tracker.getTimer() >= getTimeLimit()); + } + +} diff --git a/src/main/java/emissary/core/sentinel/protocols/rules/AnyMaxTime.java b/src/main/java/emissary/core/sentinel/protocols/rules/AnyMaxTime.java new file mode 100644 index 0000000000..fd87904c29 --- /dev/null +++ b/src/main/java/emissary/core/sentinel/protocols/rules/AnyMaxTime.java @@ -0,0 +1,33 @@ +package emissary.core.sentinel.protocols.rules; + +import emissary.core.sentinel.Sentinel; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.Map; + +/** + * Looks at the place that has been running for the most amount of time. + */ +public class AnyMaxTime extends Rule { + + protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public AnyMaxTime(String place, long timeLimit, double threshold) { + super(place, timeLimit, threshold); + } + + public AnyMaxTime(String place, String timeLimit, String threshold) { + super(place, timeLimit, threshold); + } + + protected boolean overTimeLimit(Map trackers, String placeSimpleName) { + return trackers.values().stream() + .filter(t -> StringUtils.equalsIgnoreCase(t.getPlaceSimpleName(), placeSimpleName)) + .anyMatch(tracker -> tracker.getTimer() >= getTimeLimit()); + } + +} diff --git a/src/main/java/emissary/core/sentinel/rules/Rule.java b/src/main/java/emissary/core/sentinel/protocols/rules/Rule.java similarity index 52% rename from src/main/java/emissary/core/sentinel/rules/Rule.java rename to src/main/java/emissary/core/sentinel/protocols/rules/Rule.java index 6bb259ae6b..4430147dd4 100644 --- a/src/main/java/emissary/core/sentinel/rules/Rule.java +++ b/src/main/java/emissary/core/sentinel/protocols/rules/Rule.java @@ -1,4 +1,4 @@ -package emissary.core.sentinel.rules; +package emissary.core.sentinel.protocols.rules; import emissary.core.NamespaceException; import emissary.core.sentinel.Sentinel; @@ -9,7 +9,6 @@ import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; -import java.util.Comparator; import java.util.Map; import java.util.StringJoiner; @@ -49,40 +48,26 @@ public double getThreshold() { return threshold; } - public void run(Map tracker, String placeSimpleName, Integer count) throws NamespaceException { - if (condition(tracker, placeSimpleName, count)) { - action(tracker, placeSimpleName, count); - } - } - - public boolean condition(Map trackers, String placeSimpleName, Integer count) throws NamespaceException { + public boolean condition(Map trackers, String placeSimpleName, Integer count) { return overThreshold(count) && overTimeLimit(trackers, placeSimpleName); } - public abstract void action(Map trackers, String placeSimpleName, Integer count); - - protected boolean overThreshold(Integer count) throws NamespaceException { - int poolSize = AgentPool.lookup().getCurrentPoolSize(); - logger.trace("Testing threshold for place={}, counter={}, poolSize={}, threshold={}", place, count, poolSize, threshold); - return (double) count / poolSize >= getThreshold(); - } - - protected boolean overTimeLimit(Map trackers, String placeSimpleName) { - long maxTimeInPlace = getMaxTimeInPlace(trackers, placeSimpleName); - logger.trace("Testing time limit for place={}, maxTimeInPlace={}, timeLimit={}", place, maxTimeInPlace, timeLimit); - return maxTimeInPlace >= getTimeLimit(); + protected boolean overThreshold(Integer count) { + try { + int poolSize = AgentPool.lookup().getCurrentPoolSize(); + logger.trace("Testing threshold for place={}, counter={}, poolSize={}, threshold={}", place, count, poolSize, threshold); + return (double) count / poolSize >= getThreshold(); + } catch (NamespaceException ne) { + throw new IllegalStateException(ne); + } } - protected long getMaxTimeInPlace(Map trackers, String placeSimpleName) { - return trackers.values().stream() - .filter(t -> StringUtils.equalsIgnoreCase(t.getPlaceSimpleName(), placeSimpleName)) - .map(Sentinel.Tracker::getTimer) - .max(Comparator.naturalOrder()).orElse(0L); - } + protected abstract boolean overTimeLimit(Map trackers, String placeSimpleName); @Override public String toString() { - return new StringJoiner(", ", "Rule:" + place + "[", "]") + return new StringJoiner(", ", getClass().getSimpleName() + "[", "]") + .add("place=" + place) .add("timeLimit=" + timeLimit) .add("threshold=" + threshold) .toString(); diff --git a/src/main/resources/emissary/core/sentinel/Sentinel.cfg b/src/main/resources/emissary/core/sentinel/Sentinel.cfg index 0de34be4c5..7ac8ef6878 100644 --- a/src/main/resources/emissary/core/sentinel/Sentinel.cfg +++ b/src/main/resources/emissary/core/sentinel/Sentinel.cfg @@ -1,12 +1,4 @@ ENABLED = false POLLING_INTERVAL_MINUTES = 5 -RULE_ID = "DEFAULT" -DEFAULT_TIME_LIMIT_MINUTES = "60" -DEFAULT_THRESHOLD = "1.0" - -# Sample rule -# RULE_ID = "DelayPlace" -# DelayPlace_RULE = "emissary.core.sentinel.rules.Kill" -# DelayPlace_TIME_LIMIT_MINUTES = "5" -# DelayPlace_THRESHOLD = "0.75" +PROTOCOL_DEFAULT = emissary.core.sentinel.protocols.DEFAULT.cfg diff --git a/src/main/resources/emissary/core/sentinel/protocols/DEFAULT.cfg b/src/main/resources/emissary/core/sentinel/protocols/DEFAULT.cfg new file mode 100644 index 0000000000..a24b6bf9cf --- /dev/null +++ b/src/main/resources/emissary/core/sentinel/protocols/DEFAULT.cfg @@ -0,0 +1,14 @@ +ENABLED = false + +ACTION = "emissary.core.sentinel.protocols.actions.Notify" + +RULE_ID = "DEFAULT" +DEFAULT_TIME_LIMIT_MINUTES = "60" +DEFAULT_THRESHOLD = "1.0" + +# Sample rule +# RULE_ID = "DelayPlace" +# DelayPlace_RULE = "emissary.core.sentinel.protocols.rules.AnyMaxTime" +# DelayPlace_TIME_LIMIT_MINUTES = "1" +# DelayPlace_THRESHOLD = "0.75" +