Skip to content

Commit

Permalink
sentinel updates and add tests (#631)
Browse files Browse the repository at this point in the history
  • Loading branch information
dev-mlb authored Dec 12, 2023
1 parent a245269 commit 642fa24
Show file tree
Hide file tree
Showing 15 changed files with 476 additions and 78 deletions.
55 changes: 26 additions & 29 deletions src/main/java/emissary/core/sentinel/Sentinel.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void run() {
// Delay this loop
try {
Thread.sleep(TimeUnit.MINUTES.toMillis(pollingInterval));
logger.debug("Sentinel is still watching");
watch();
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -190,10 +191,10 @@ protected void watch(String agentKey) throws NamespaceException {
Tracker trackedAgent = trackers.computeIfAbsent(mobileAgent.getName(), Tracker::new);
if (mobileAgent.isInUse()) {
if (!Objects.equals(mobileAgent.agentID(), trackedAgent.getAgentId())
|| !Objects.equals(mobileAgent.getLastPlaceProcessed(), trackedAgent.getPlaceName())) {
|| !Objects.equals(mobileAgent.getLastPlaceProcessed(), trackedAgent.getDirectoryEntryKey())) {
trackedAgent.clear();
trackedAgent.setAgentId(mobileAgent.agentID());
trackedAgent.setPlaceName(mobileAgent.getLastPlaceProcessed());
trackedAgent.resetTimer();
trackedAgent.setDirectoryEntryKey(mobileAgent.getLastPlaceProcessed());
}
trackedAgent.incrementTimer(pollingInterval);
logger.trace("Agent acquired {}", trackedAgent);
Expand All @@ -203,11 +204,11 @@ protected void watch(String agentKey) throws NamespaceException {
}
}

public static class Tracker {
public static class Tracker implements Comparable<Tracker> {
private final String agentName;
private String agentId;
private String shortName;
private String placeName;
private String directoryEntryKey;
private long timer = -1;

public Tracker(String agentName) {
Expand All @@ -224,8 +225,7 @@ public String getAgentId() {

public void setAgentId(String agentId) {
if (StringUtils.contains(agentId, "No_AgentID_Set")) {
this.agentId = "";
this.shortName = "";
clear();
} else {
this.agentId = agentId;
if (StringUtils.contains(agentId, "Agent-")) {
Expand All @@ -242,28 +242,20 @@ public static String getShortName(String agentId) {
return StringUtils.substringAfter(StringUtils.substringAfter(agentId, "Agent-"), "-");
}

public String getPlaceName() {
return placeName;
}

public void setPlaceName(String placeName) {
this.placeName = placeName;
}

public String getPlaceSimpleName() {
return getPlaceSimpleName(this.placeName);
public String getDirectoryEntryKey() {
return directoryEntryKey;
}

public String getPlaceAndShortName() {
return getPlaceAndShortName(this.placeName, this.shortName);
public void setDirectoryEntryKey(String directoryEntryKey) {
this.directoryEntryKey = directoryEntryKey;
}

public static String getPlaceSimpleName(String place) {
return StringUtils.defaultString(StringUtils.substringAfterLast(place, "/"), "");
public String getPlaceName() {
return getPlaceName(this.directoryEntryKey);
}

public static String getPlaceAndShortName(String place, String shortName) {
return getPlaceSimpleName(place) + "/" + shortName;
public static String getPlaceName(String directoryEntryKey) {
return StringUtils.defaultString(StringUtils.substringAfterLast(directoryEntryKey, "/"), "");
}

public long getTimer() {
Expand All @@ -285,17 +277,22 @@ public void incrementTimer(long time) {
public void clear() {
this.agentId = "";
this.shortName = "";
this.placeName = "";
this.directoryEntryKey = "";
resetTimer();
}

@Override
public int compareTo(Tracker o) {
return this.agentName.compareTo(o.agentName);
}

@Override
public String toString() {
return new StringJoiner(", ", "[", "]")
.add("agentName='" + agentName + "'")
.add("placeName='" + placeName + "'")
.add("shortName='" + shortName + "'")
.add("timer=" + timer + " minute(s)")
return new StringJoiner(", ", "{", "}")
.add("\"agentName\":\"" + agentName + "\"")
.add("\"directoryEntry\":\"" + directoryEntryKey + "\"")
.add("\"shortName\":\"" + shortName + "\"")
.add("\"timeInMinutes\":" + timer)
.toString();
}
}
Expand Down
47 changes: 21 additions & 26 deletions src/main/java/emissary/core/sentinel/protocols/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;

import static emissary.core.sentinel.Sentinel.Tracker.getPlaceSimpleName;

/**
* This protocol buckets places that are running in mobile agents and then looks at max and min time in place and the
* number of agents that are potentially "stuck." After places are bucketed, the place stats are run against the
Expand All @@ -40,6 +38,8 @@ public class Protocol {
protected final Map<String, Rule> rules = new ConcurrentHashMap<>();
protected Action action;

Protocol() {}

public Protocol(String conf) {
configure(conf);
}
Expand All @@ -53,32 +53,16 @@ public boolean isEnabled() {
*/
public void run(Map<String, Sentinel.Tracker> trackers) {

Map<String, PlaceAgentStats> placeAgentStats = new ConcurrentHashMap<>();
for (Sentinel.Tracker tracker : trackers.values()) {
String placeKey = getPlaceKey(tracker);
if (StringUtils.isNotBlank(placeKey)) {
placeAgentStats.put(placeKey, placeAgentStats.getOrDefault(placeKey, new PlaceAgentStats(placeKey)).update(tracker.getTimer()));
}
}

Map<String, PlaceAgentStats> placeAgentStats = generatePlaceAgentStats(trackers);
if (!placeAgentStats.isEmpty()) {
logger.debug("Running rules on agents {}", placeAgentStats);
if (rules.values().stream().allMatch(rule -> rule.condition(placeAgentStats.values()))) {
logger.warn("Sentinel rules matched -- {}", rules.values());
action.trigger(trackers);
}
}
}

/**
* Get the place key, i.e. the simple name
*
* @param tracker agents, places, and filenames that's currently processing
* @return the place key
*/
public String getPlaceKey(Sentinel.Tracker tracker) {
return getPlaceSimpleName(tracker.getPlaceName());
}

/**
* Get the Configurator
*
Expand All @@ -87,7 +71,7 @@ public String getPlaceKey(Sentinel.Tracker tracker) {
protected void configure(String conf) {
try {
this.config = ConfigUtil.getConfigInfo(conf);
init();
init(this.config);
} catch (IOException e) {
logger.warn("Cannot read {}, skipping!!", conf);
}
Expand All @@ -96,7 +80,7 @@ protected void configure(String conf) {
/**
* Initialize rule set and action
*/
protected void init() {
protected void init(Configurator config) {
this.enabled = config.findBooleanEntry("ENABLED", false);
if (enabled) {

Expand All @@ -110,7 +94,7 @@ protected void init() {
}
Map<String, String> map = config.findStringMatchMap(ruleId + "_");
String rule = map.getOrDefault("RULE", AllMaxTime.class.getName());
Rule ruleImpl = (Rule) Factory.create(rule, validate(map.get("PLACE_MATCHER")), map.get("TIME_LIMIT_MINUTES"),
Rule ruleImpl = (Rule) Factory.create(rule, ruleId, validate(map.get("PLACE_MATCHER")), map.get("TIME_LIMIT_MINUTES"),
map.get("PLACE_THRESHOLD"));
logger.debug("Sentinel loaded rule[{}] - {}", ruleId, ruleImpl);
this.rules.put(ruleId, ruleImpl);
Expand Down Expand Up @@ -143,11 +127,22 @@ protected String validate(String place) throws NamespaceException {
return place;
}

protected Map<String, PlaceAgentStats> generatePlaceAgentStats(Map<String, Sentinel.Tracker> trackers) {
Map<String, PlaceAgentStats> placeAgentStats = new ConcurrentHashMap<>();
for (Sentinel.Tracker tracker : trackers.values()) {
String placeKey = tracker.getPlaceName();
if (StringUtils.isNotBlank(placeKey)) {
placeAgentStats.put(placeKey, placeAgentStats.getOrDefault(placeKey, new PlaceAgentStats(placeKey)).update(tracker.getTimer()));
}
}
return placeAgentStats;
}

@Override
public String toString() {
return new StringJoiner(", ", "[", "]")
.add("rules=" + rules)
.add("action=" + action)
return new StringJoiner(", ", "{", "}")
.add("\"rules\":" + rules.values())
.add("\"action\":" + action)
.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Action {
public abstract class Action {

protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

Expand All @@ -17,13 +19,15 @@ public class Action {
*
* @param trackers the listing of agents, places, and filenames that's currently processing
*/
public void trigger(Map<String, Sentinel.Tracker> trackers) {
public abstract void trigger(Map<String, Sentinel.Tracker> trackers);

public List<Sentinel.Tracker> format(Map<String, Sentinel.Tracker> trackers) {
return trackers.values().stream().sorted().collect(Collectors.toList());
}

@Override
public String toString() {
return getClass().getSimpleName();
return "\"" + getClass().getSimpleName() + "\"";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class Exit extends Action {

@Override
public void trigger(Map<String, Sentinel.Tracker> trackers) {
logger.error("Sentinel detected unrecoverable agents {}, exiting now!!", trackers.values());
logger.error("Sentinel detected unrecoverable agents, exiting now -- {}", format(trackers));
System.exit(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class Kill extends Action {

@Override
public void trigger(Map<String, Sentinel.Tracker> trackers) {
logger.error("Sentinel detected unrecoverable agents {}, initiating forceful shutdown...", trackers.values());
logger.error("Sentinel detected unrecoverable agents, initiating forceful shutdown -- {}", format(trackers));
CompletableFuture.runAsync(EmissaryServer::stopServerForce);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ public class Notify extends Action {

@Override
public void trigger(Map<String, Sentinel.Tracker> trackers) {
logger.warn("Sentinel detected locked agents {}", trackers.values());
logger.warn("Sentinel detected possible locked agents -- {}", format(trackers));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class Stop extends Action {

@Override
public void trigger(Map<String, Sentinel.Tracker> trackers) {
logger.error("Sentinel detected unrecoverable agents {}, initiating graceful shutdown...", trackers.values());
logger.error("Sentinel detected unrecoverable agents, initiating graceful shutdown -- {}", format(trackers));
CompletableFuture.runAsync(EmissaryServer::stopServer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ public class AllMaxTime extends Rule {

protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public AllMaxTime(String place, long timeLimit, double threshold) {
super(place, timeLimit, threshold);
public AllMaxTime(String name, String place, long timeLimit, double threshold) {
super(name, place, timeLimit, threshold);
}

public AllMaxTime(String place, String timeLimit, String threshold) {
super(place, timeLimit, threshold);
public AllMaxTime(String name, String place, String timeLimit, String threshold) {
super(name, place, timeLimit, threshold);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ public class AnyMaxTime extends Rule {

protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public AnyMaxTime(String place, long timeLimit, double threshold) {
super(place, timeLimit, threshold);
public AnyMaxTime(String name, String place, long timeLimit, double threshold) {
super(name, place, timeLimit, threshold);
}

public AnyMaxTime(String place, String timeLimit, String threshold) {
super(place, timeLimit, threshold);
public AnyMaxTime(String name, String place, String timeLimit, String threshold) {
super(name, place, timeLimit, threshold);
}

/**
Expand Down
25 changes: 17 additions & 8 deletions src/main/java/emissary/core/sentinel/protocols/rules/Rule.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public abstract class Rule {

protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

// the name of the rule
protected final String name;

// the place name to test the condition
protected final Pattern place;

Expand All @@ -26,8 +29,11 @@ public abstract class Rule {
// percentage of mobile agents that are stuck on the same place before sounding the alarm
protected final double threshold;

public Rule(String place, long timeLimit, double threshold) {
logger.trace("Creating rule for place={}, timeLimit={}, threshold={}", place, timeLimit, threshold);
public Rule(String name, String place, long timeLimit, double threshold) {
logger.trace("Creating rule for name={}, place={}, timeLimit={}, threshold={}", name, place, timeLimit, threshold);
if (StringUtils.isBlank(name)) {
throw new IllegalArgumentException("Invalid name [" + name + "]");
}
if (StringUtils.isBlank(place)) {
throw new IllegalArgumentException("Invalid place pattern [" + place + "]");
}
Expand All @@ -37,13 +43,14 @@ public Rule(String place, long timeLimit, double threshold) {
if (threshold <= 0.0 || threshold > 1.0) {
throw new IllegalArgumentException("Invalid threshold [" + threshold + "], expected a value > 0.0 or <= 1.0");
}
this.name = name;
this.place = Pattern.compile(place);
this.timeLimit = timeLimit;
this.threshold = threshold;
}

public Rule(String place, String timeLimit, String threshold) {
this(place, StringUtils.isBlank(timeLimit) ? 60L : Long.parseLong(timeLimit),
public Rule(String name, String place, String timeLimit, String threshold) {
this(name, place, StringUtils.isBlank(timeLimit) ? 60L : Long.parseLong(timeLimit),
StringUtils.isBlank(threshold) ? 1.0 : Double.parseDouble(threshold));
}

Expand Down Expand Up @@ -92,10 +99,12 @@ protected int getAgentCount() {

@Override
public String toString() {
return new StringJoiner(", ", getClass().getSimpleName() + "[", "]")
.add("place=" + place)
.add("timeLimit=" + timeLimit)
.add("threshold=" + threshold)
return new StringJoiner(", ", "{", "}")
.add("\"name\":\"" + name + "\"")
.add("\"rule\":\"" + getClass().getSimpleName() + "\"")
.add("\"place\":\"" + place + "\"")
.add("\"timeLimitInMinutes\":" + timeLimit)
.add("\"threshold\":" + threshold)
.toString();
}

Expand Down
Loading

0 comments on commit 642fa24

Please sign in to comment.