Skip to content

Commit

Permalink
added protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
dev-mlb committed Nov 27, 2023
1 parent 823f2e2 commit 800dc8e
Show file tree
Hide file tree
Showing 13 changed files with 308 additions and 152 deletions.
88 changes: 24 additions & 64 deletions src/main/java/emissary/core/sentinel/Sentinel.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@

import emissary.config.ConfigUtil;
import emissary.config.Configurator;
import emissary.core.Factory;
import emissary.core.IMobileAgent;
import emissary.core.Namespace;
import emissary.core.NamespaceException;
import emissary.core.sentinel.rules.Notify;
import emissary.core.sentinel.rules.Rule;
import emissary.directory.DirectoryPlace;
import emissary.directory.KeyManipulator;
import emissary.core.sentinel.protocols.Protocol;
import emissary.pool.MobileAgentFactory;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -30,20 +27,14 @@
*/
public class Sentinel implements Runnable {

protected static final Logger logger = LoggerFactory.getLogger(Sentinel.class);
protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public static final String DEFAULT_NAMESPACE_NAME = "Sentinel";

protected static final String DEFAULT_RULE = "DEFAULT";

// key: place, value: rule (time limits, thresholds)
final Map<String, Rule> rules = new ConcurrentHashMap<>();

// key: agent name, value: how long Sentinel has observed the mobile agent
final Map<String, Tracker> trackers = new ConcurrentHashMap<>();

// key: place simple name, value: number of agents in place
final Map<String, Integer> placeAgentCounts = new ConcurrentHashMap<>();
final Map<String, Protocol> protocols = new ConcurrentHashMap<>();

Configurator config;

Expand Down Expand Up @@ -112,7 +103,7 @@ public void run() {

@Override
public String toString() {
return "Watching agents with " + rules.values();
return "Watching agents with " + protocols.values();
}

/**
Expand All @@ -135,35 +126,23 @@ protected void init() {
if (this.enabled) {
this.pollingInterval = config.findIntEntry("POLLING_INTERVAL_MINUTES", 5);

logger.trace("Loading rules...");
for (String ruleId : config.findEntries("RULE_ID")) {
logger.trace("Sentinel protocols initializing...");
for (Map.Entry<String, String> proto : config.findStringMatchMap("PROTOCOL_", true, true).entrySet()) {
try {
validate(ruleId);
Map<String, String> map = config.findStringMatchMap(ruleId + "_");
String rule = map.getOrDefault("RULE", Notify.class.getName());
Rule ruleImpl = (Rule) Factory.create(rule, ruleId, map.get("TIME_LIMIT_MINUTES"), map.get("THRESHOLD"));
logger.info("Sentinel loaded {}", ruleImpl);
this.rules.put(ruleId, ruleImpl);
String protocolId = proto.getKey();
String config = proto.getValue();
Protocol protocol = new Protocol(config);
logger.info("Sentinel initiated {}", protocol);
if (protocol.isEnabled()) {
this.protocols.put(protocolId, protocol);
}
} catch (Exception e) {
logger.warn("Unable to configure Sentinel for {}: {}", ruleId, e.getMessage());
logger.warn("Unable to configure Sentinel Protocol {}: {}", proto, e.getMessage());
}
}

// if no rules, create a default one
if (!this.rules.containsKey(DEFAULT_RULE)) {
logger.warn("Default rule not found, creating one...");
this.rules.put(DEFAULT_RULE, new Notify(DEFAULT_RULE, 60L, 1.0));
}
}
}

protected void validate(String place) throws NamespaceException {
// validate that the place exists
if (!DEFAULT_RULE.equalsIgnoreCase(place)) {
DirectoryPlace directoryPlace = Namespace.lookup(DirectoryPlace.class).iterator().next();
if (directoryPlace.getEntries().stream()
.noneMatch(entry -> place.equalsIgnoreCase(KeyManipulator.getServiceClassname(entry.getFullKey())))) {
throw new IllegalStateException("Place not found in the directory");
if (this.protocols.isEmpty()) {
this.enabled = false;
logger.warn("Sentinel initializing failed: no protocols found");
}
}
}
Expand All @@ -174,13 +153,14 @@ protected void validate(String place) throws NamespaceException {
* @throws NamespaceException if there is a problem looking up resources in the {@link Namespace}
*/
protected void watch() throws NamespaceException {
placeAgentCounts.clear();
List<String> agentKeys =
Namespace.keySet().stream().filter(k -> k.startsWith(MobileAgentFactory.AGENT_NAME)).sorted().collect(Collectors.toList());
List<String> agentKeys = Namespace.keySet().stream()
.filter(k -> k.startsWith(MobileAgentFactory.AGENT_NAME))
.sorted()
.collect(Collectors.toList());
for (String agentKey : agentKeys) {
watch(agentKey);
}
check();
protocols.values().forEach(protocol -> protocol.run(trackers));
}

/**
Expand All @@ -201,33 +181,13 @@ protected void watch(String agentKey) throws NamespaceException {
trackedAgent.resetTimer();
}
trackedAgent.incrementTimer(pollingInterval);
String placeSimpleName = getPlaceSimpleName(mobileAgent.getLastPlaceProcessed());
placeAgentCounts.put(placeSimpleName, placeAgentCounts.getOrDefault(placeSimpleName, 0) + 1);
logger.debug("Agent acquired {}", trackedAgent);
} else {
trackedAgent.clear();
logger.debug("Agent not in use [{}]", agentKey);
}
}

/**
* Run the configured rules over the watched mobile-agents
*
* @throws NamespaceException if there is a problem looking up resources in the {@link Namespace}
*/
protected void check() throws NamespaceException {
logger.debug("Checking agents {}", placeAgentCounts);
for (Map.Entry<String, Integer> item : placeAgentCounts.entrySet()) {
Rule rule = rules.getOrDefault(item.getKey(), rules.get(DEFAULT_RULE));
logger.trace("Found {} for {}", rule, item.getKey());
rule.run(trackers, item.getKey(), item.getValue());
}
}

protected static String getPlaceSimpleName(String lastPlaceProcessed) {
return StringUtils.substringAfterLast(lastPlaceProcessed, "/");
}

public static class Tracker {
private final String agentName;
private String agentId;
Expand Down Expand Up @@ -272,7 +232,7 @@ public void setPlaceName(String placeName) {
}

public String getPlaceSimpleName() {
return Sentinel.getPlaceSimpleName(this.placeName);
return Protocol.getPlaceSimpleName(this.placeName);
}

public long getTimer() {
Expand Down
147 changes: 147 additions & 0 deletions src/main/java/emissary/core/sentinel/protocols/Protocol.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package emissary.core.sentinel.protocols;

import emissary.config.ConfigUtil;
import emissary.config.Configurator;
import emissary.core.Factory;
import emissary.core.Namespace;
import emissary.core.NamespaceException;
import emissary.core.sentinel.Sentinel;
import emissary.core.sentinel.protocols.actions.Action;
import emissary.core.sentinel.protocols.actions.Notify;
import emissary.core.sentinel.protocols.rules.AllMaxTime;
import emissary.core.sentinel.protocols.rules.Rule;
import emissary.directory.DirectoryPlace;
import emissary.directory.KeyManipulator;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;

public class Protocol {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected static final String DEFAULT_RULE = "DEFAULT";
private Configurator config;
private boolean enabled = false;
private Action action;
private final Map<String, Rule> rules = new ConcurrentHashMap<>(); // key: place, value: rule
private final Map<String, Integer> placeAgentCounts = new ConcurrentHashMap<>(); // key: place, value: number of agents in place

public Protocol(String conf) {
configure(conf);
}

public boolean isEnabled() {
return enabled;
}

public Map<String, Rule> getRules() {
return Collections.unmodifiableMap(rules);
}

public Map<String, Integer> getPlaceAgentCounts() {
return Collections.unmodifiableMap(placeAgentCounts);
}

public Action getAction() {
return action;
}

/**
* Run the configured rules over the watched mobile-agents
*
* @throws NamespaceException if there is a problem looking up resources in the {@link Namespace}
*/
public void run(Map<String, Sentinel.Tracker> trackers) {
placeAgentCounts.clear();
for (Sentinel.Tracker tracker : trackers.values()) {
String placeKey = getPlaceKey(tracker);
if (StringUtils.isNotBlank(placeKey)) {
placeAgentCounts.put(placeKey, placeAgentCounts.getOrDefault(placeKey, 0) + 1);
}
}
logger.debug("Checking agents {}", placeAgentCounts);
for (Map.Entry<String, Integer> item : placeAgentCounts.entrySet()) {
Rule rule = rules.getOrDefault(item.getKey(), rules.get(DEFAULT_RULE));
logger.trace("Found {} for {}", rule, item.getKey());
check(trackers, item.getKey(), item.getValue());
}
}

public String getPlaceKey(Sentinel.Tracker tracker) {
return getPlaceSimpleName(tracker.getPlaceName());
}

public static String getPlaceSimpleName(String place) {
return StringUtils.substringAfterLast(place, "/");
}

protected void configure(String conf) {
try {
this.config = ConfigUtil.getConfigInfo(conf);
init();
} catch (IOException e) {
logger.warn("Cannot read " + conf + ", skipping");
}
}

protected void init() {
this.enabled = config.findBooleanEntry("ENABLED", false);
if (enabled) {
logger.trace("Loading rules...");
for (String ruleId : config.findEntries("RULE_ID")) {
try {
validate(ruleId);
Map<String, String> map = config.findStringMatchMap(ruleId + "_");
String rule = map.getOrDefault("RULE", AllMaxTime.class.getName());
Rule ruleImpl = (Rule) Factory.create(rule, ruleId, map.get("TIME_LIMIT_MINUTES"), map.get("THRESHOLD"));
logger.debug("Sentinel loaded {}", ruleImpl);
this.rules.put(ruleId, ruleImpl);
} catch (Exception e) {
logger.warn("Unable to configure Sentinel for {}: {}", ruleId, e.getMessage());
}
}

// if no rules then disable protocol
if (this.rules.isEmpty()) {
this.enabled = false;
return;
}

String action = config.findStringEntry("ACTION", Notify.class.getName());
this.action = (Action) Factory.create(action);
}
}

protected void validate(String place) throws NamespaceException {
// validate that the place exists
if (!DEFAULT_RULE.equalsIgnoreCase(place)) {
DirectoryPlace directoryPlace = Namespace.lookup(DirectoryPlace.class).iterator().next();
if (directoryPlace.getEntries().stream()
.noneMatch(entry -> place.equalsIgnoreCase(KeyManipulator.getServiceClassname(entry.getFullKey())))) {
throw new IllegalStateException("Place not found in the directory");
}
}
}

protected void check(Map<String, Sentinel.Tracker> tracker, String placeSimpleName, Integer count) {
if (getRules().values().stream().allMatch(rule -> rule.condition(tracker, placeSimpleName, count))) {
getAction().trigger(tracker, placeSimpleName, count);
}
}

@Override
public String toString() {
return new StringJoiner(", ", Protocol.class.getSimpleName() + "[", "]")
.add("rules=" + rules)
.add("action=" + action)
.toString();
}
}
28 changes: 28 additions & 0 deletions src/main/java/emissary/core/sentinel/protocols/actions/Action.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package emissary.core.sentinel.protocols.actions;

import emissary.core.sentinel.Sentinel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.Map;

public abstract class Action {

protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

/**
* Try to terminate the JVM
*
* @param trackers the listing of agents, places, and filenames that's currently processing
* @param placeSimpleName the place name currently processing on one or more mobile agents
* @param count number of mobile agents stuck on the place
*/
public abstract void trigger(Map<String, Sentinel.Tracker> trackers, String placeSimpleName, Integer count);

@Override
public String toString() {
return getClass().getName();
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
package emissary.core.sentinel.rules;
package emissary.core.sentinel.protocols.actions;

import emissary.core.sentinel.Sentinel;

import java.util.Map;

public class Exit extends Rule {

public Exit(String place, long timeLimit, double threshold) {
super(place, timeLimit, threshold);
}

public Exit(String place, String timeLimit, String threshold) {
super(place, timeLimit, threshold);
}
public class Exit extends Action {

/**
* Try to terminate the JVM
Expand All @@ -22,7 +14,7 @@ public Exit(String place, String timeLimit, String threshold) {
* @param count number of mobile agents stuck on the place
*/
@Override
public void action(Map<String, Sentinel.Tracker> trackers, String placeSimpleName, Integer count) {
public void trigger(Map<String, Sentinel.Tracker> trackers, String placeSimpleName, Integer count) {
logger.error("Sentinel detected {} unrecoverable agent(s) running [{}], exiting now!!", count, placeSimpleName);
logger.debug("{}", trackers);
System.exit(1);
Expand Down
Loading

0 comments on commit 800dc8e

Please sign in to comment.