Skip to content

Commit

Permalink
#40: Added Topology Monitor Service that checks running topologies to…
Browse files Browse the repository at this point in the history
… ensure they stay up even after a cluster outage.
  • Loading branch information
juliencruz committed Aug 30, 2015
1 parent 393d0cd commit edd5aef
Show file tree
Hide file tree
Showing 13 changed files with 610 additions and 347 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ nbactions.xml
.DS_Store
*.DS_Store
*.iml
.idea
.idea/
2 changes: 1 addition & 1 deletion streamflow-core/streamflow-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>13.0</version>
<version>18.0</version>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ public void run() {
localCluster.submitTopology(topology.getId(), stormConfig, stormTopology);
}
} catch (AlreadyAliveException ex) {
LOG.error("The specified topology is already running on the cluster: {}", ex);
LOG.error("The specified topology is already running on the cluster:", ex);
} catch (InvalidTopologyException ex) {
LOG.error("The specified topology is invalid: " + ex);
} catch (FrameworkException ex) {
LOG.error("The topology was unable to load a dependent framework: {}", ex);
LOG.error("The topology was unable to load a dependent framework:", ex);
} catch (Exception ex) {
LOG.error("The topology threw an uncaught exception: {}", ex);
LOG.error("The topology threw an uncaught exception:", ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* Copyright 2014 Lockheed Martin Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package streamflow.model.config;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class MonitorConfig implements Serializable {

static Logger LOG = LoggerFactory.getLogger(MonitorConfig.class);

private boolean enabled = false;

private int pollingInterval = 60;

public MonitorConfig() {
}

public boolean isEnabled() {
return Boolean.parseBoolean(
System.getProperty("monitor.enabled", Boolean.toString(enabled)));
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public int getPollingInterval() {
if (System.getProperty("monitor.pollingInterval") != null) {
try {
pollingInterval = Integer.parseInt(System.getProperty("monitor.pollingInterval"));
} catch (Exception ex) {
}
}
return pollingInterval;
}

public void setPollingInterval(int pollingInterval) {
this.pollingInterval = pollingInterval;
}

@Override
public int hashCode() {
int hash = 3;
hash = 43 * hash + (this.enabled ? 1 : 0);
hash = 43 * hash + this.pollingInterval;
return hash;
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final MonitorConfig other = (MonitorConfig) obj;
if (this.enabled != other.enabled) {
return false;
}
if (this.pollingInterval != other.pollingInterval) {
return false;
}
return true;
}

@Override
public String toString() {
return "MonitorConfig{" + "enabled=" + enabled + ", pollingInterval=" + pollingInterval + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ public class StreamflowConfig implements Serializable {
private LoggerConfig logger = new LoggerConfig();

private AuthConfig auth = new AuthConfig();


private MonitorConfig monitor = new MonitorConfig();

private LocalClusterConfig localCluster = new LocalClusterConfig();

private List<Cluster> clusters = new ArrayList<>();
Expand Down Expand Up @@ -84,6 +86,14 @@ public void setAuth(AuthConfig auth) {
this.auth = auth;
}

public MonitorConfig getMonitor() {
return monitor;
}

public void setMonitor(MonitorConfig monitor) {
this.monitor = monitor;
}

public LocalClusterConfig getLocalCluster() {
return localCluster;
}
Expand Down Expand Up @@ -116,6 +126,7 @@ public int hashCode() {
hash = 29 * hash + (this.datastore != null ? this.datastore.hashCode() : 0);
hash = 29 * hash + (this.logger != null ? this.logger.hashCode() : 0);
hash = 29 * hash + (this.auth != null ? this.auth.hashCode() : 0);
hash = 29 * hash + (this.monitor != null ? this.monitor.hashCode() : 0);
hash = 29 * hash + (this.localCluster != null ? this.localCluster.hashCode() : 0);
hash = 29 * hash + (this.clusters != null ? this.clusters.hashCode() : 0);
return hash;
Expand Down Expand Up @@ -150,6 +161,10 @@ public boolean equals(Object obj) {
|| !this.auth.equals(other.auth))) {
return false;
}
if (this.monitor != other.monitor && (this.monitor == null
|| !this.monitor.equals(other.monitor))) {
return false;
}
if (this.localCluster != other.localCluster && (this.localCluster == null
|| !this.localCluster.equals(other.localCluster))) {
return false;
Expand All @@ -165,7 +180,7 @@ public boolean equals(Object obj) {
public String toString() {
return "StreamFlowConfig{" + "server=" + server + ", proxy=" + proxy
+ ", datastore=" + datastore + ", logger=" + logger
+ ", auth=" + auth + ", localCluster=" + localCluster
+ ", auth=" + auth + ", monitor=" + monitor + ", localCluster=" + localCluster
+ ", clusters=" + clusters + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package streamflow.server.config;

import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Service;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import streamflow.model.config.MonitorConfig;
import streamflow.model.config.StreamflowConfig;
import streamflow.server.service.TopologyMonitorService;
import streamflow.util.config.ConfigLoader;

import java.util.Set;

public class GuavaServiceModule extends AbstractModule {

@Override
protected void configure() {
MonitorConfig monitorConfig = ConfigLoader.getConfig().getMonitor();
if (monitorConfig.isEnabled()) {
bind(TopologyMonitorService.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
*/
package streamflow.server.config;

//import com.google.common.util.concurrent.ServiceManager;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceServletContextListener;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.annotation.WebListener;

import org.apache.storm.guava.util.concurrent.ServiceManager;
import streamflow.datastore.config.DatastoreModule;
import streamflow.engine.config.EngineModule;
import streamflow.server.service.TopologyMonitorService;
import streamflow.service.config.ServiceModule;
import streamflow.util.config.ConfigModule;
import org.apache.shiro.guice.web.ShiroWebModule;
Expand All @@ -39,9 +43,17 @@ protected Injector getInjector() {
StreamflowEnvironment.setStreamflowHome(System.getenv("STREAMFLOW_HOME"));
StreamflowEnvironment.initialize();

return Guice.createInjector(new ConfigModule(), new DatastoreModule(),
new ServiceModule(), new EngineModule(), new JerseyModule(),
Injector injector = Guice.createInjector(new ConfigModule(), new DatastoreModule(),
new ServiceModule(), new GuavaServiceModule(), new EngineModule(), new JerseyModule(),
new SecurityModule(servletContext), ShiroWebModule.guiceFilterModule());

// Initialize the service manager to manage daemon services
//ServiceManager manager = injector.getInstance(ServiceManager.class);
//manager.startAsync().awaitHealthy();
TopologyMonitorService topologyMonitorService = injector.getInstance(TopologyMonitorService.class);
topologyMonitorService.startAsync().awaitRunning();

return injector;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package streamflow.server.service;

import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import streamflow.model.Topology;
import streamflow.model.config.MonitorConfig;
import streamflow.service.TopologyService;

import java.util.concurrent.TimeUnit;

@Singleton
public class TopologyMonitorService extends AbstractScheduledService {

public static final Logger LOG = LoggerFactory.getLogger(TopologyMonitorService.class);

private TopologyService topologyService;
private MonitorConfig monitorConfig;

@Inject
public TopologyMonitorService(TopologyService topologyService, MonitorConfig monitorConfig) {
this.topologyService = topologyService;
this.monitorConfig = monitorConfig;
}

@Override
protected void startUp() throws Exception {
LOG.info("Topology Status Monitor Started...");
}

@Override
protected void runOneIteration() throws Exception {
// Iterate over all of the topologies for each user to check live status
for (Topology topology : topologyService.listAllTopologies()) {
try {
// Get the current live status of the topology
String topologyStatusDesired = topology.getStatus();
String topologyStatusActual = topologyService.getTopology(topology.getId(), topology.getUserId()).getStatus();

if (topologyStatusDesired.equalsIgnoreCase("ACTIVE")) {
// Topology should be submitted, but isn't active so resubmit to desired state
if (!topologyStatusActual.equalsIgnoreCase("ACTIVE")) {
LOG.warn("Topology has a desired state of ACTIVE but is not currently deployed. "
+ "Redeploying topology... ID = " + topology.getId() + ", Name = " + topology.getName());

// Resubmit the topology using the same settings as originally submitted
Topology submittedTopology = topologyService.submitTopology(
topology.getId(), topology.getUserId(), topology.getClusterId(),
topology.getLogLevel(), topology.getClassLoaderPolicy());

if (topology != null && topology.getStatus().equalsIgnoreCase("ACTIVE")) {
LOG.info("Topology redeploy succeeded: ID = " + topology.getId() + ", Name = " + topology.getName());
} else {
LOG.error("Topology redeploy failed: ID = " + topology.getId() + ", Name = " + topology.getName());
}
}
}
} catch (Exception ex) {
LOG.error("An exception occurred while checking topology status: ID = "
+ topology.getId() + ", Name = " + topology.getName(), ex);
}
}
}

@Override
protected Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(monitorConfig.getPollingInterval(), monitorConfig.getPollingInterval(), TimeUnit.SECONDS);
}
}
Loading

0 comments on commit edd5aef

Please sign in to comment.