diff --git a/.gitignore b/.gitignore
index 6a7917c..561e014 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,4 +4,4 @@ nbactions.xml
.DS_Store
*.DS_Store
*.iml
-.idea
+.idea/
diff --git a/pom.xml b/pom.xml
index ec5189f..5ce491b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,7 @@
UTF-8
- 0.9.3
+ 0.9.5
9.0.0.RC2
2.4.1
1.17.1
diff --git a/streamflow-core/streamflow-engine/pom.xml b/streamflow-core/streamflow-engine/pom.xml
index dacf9a3..7eb27cd 100644
--- a/streamflow-core/streamflow-engine/pom.xml
+++ b/streamflow-core/streamflow-engine/pom.xml
@@ -50,7 +50,7 @@
com.google.guava
guava
- 13.0
+ 18.0
com.google.inject
diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java
index 123ab04..09d9149 100644
--- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java
+++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java
@@ -130,10 +130,13 @@ public boolean killTopology(Topology topology, int waitTimeSecs, boolean async)
killed = waitForTopologyRemoval(topology, waitTimeSecs + KILL_BUFFER_SECS);
}
+ } catch (NotAliveException ex) {
+ // Topology is not running on the cluster so just ignore
+ killed = true;
} catch (Exception ex) {
LOG.error("Exception occurred while killing the remote topology: ID = " +
topology.getId() + ", Reason = " + ex.getMessage());
-
+ ex.printStackTrace();
killed = false;
}
}
@@ -252,7 +255,7 @@ public TopologyInfo getTopologyInfo(Topology topology) {
} catch (NotAliveException ex) {
LOG.error("The requested topology was not found in the cluster: ID = " + stormTopologyId);
} catch (TException ex) {
- LOG.error("Exception while retrieving the remote topology info: ", ex);
+ LOG.error("Exception while retrieving the remote topology info: ", ex.getMessage());
} finally {
tTransport.close();
}
@@ -267,167 +270,169 @@ public TopologyInfo getTopologyInfo(Topology topology) {
*/
TopologyInfo topologyInfo = new TopologyInfo();
- topologyInfo.setId(info.get_id());
- topologyInfo.setName(info.get_name());
- topologyInfo.setStatus(info.get_status());
- topologyInfo.setUptimeSecs(info.get_uptime_secs());
- topologyInfo.setTopologyConf(topologyConf);
-
- for (Map.Entry> error
- : info.get_errors().entrySet()) {
- List errorInfoList = new ArrayList<>();
- for (backtype.storm.generated.ErrorInfo ei : error.getValue()) {
- ErrorInfo errorInfo = new ErrorInfo();
- errorInfo.setError(ei.get_error());
- errorInfo.setErrorTimeSecs(ei.get_error_time_secs());
- errorInfo.setHost(ei.get_host());
- errorInfo.setPort(ei.get_port());
-
- errorInfoList.add(errorInfo);
+ if (info != null) {
+ topologyInfo.setId(info.get_id());
+ topologyInfo.setName(info.get_name());
+ topologyInfo.setStatus(info.get_status());
+ topologyInfo.setUptimeSecs(info.get_uptime_secs());
+ topologyInfo.setTopologyConf(topologyConf);
+
+ for (Map.Entry> error
+ : info.get_errors().entrySet()) {
+ List errorInfoList = new ArrayList<>();
+ for (backtype.storm.generated.ErrorInfo ei : error.getValue()) {
+ ErrorInfo errorInfo = new ErrorInfo();
+ errorInfo.setError(ei.get_error());
+ errorInfo.setErrorTimeSecs(ei.get_error_time_secs());
+ errorInfo.setHost(ei.get_host());
+ errorInfo.setPort(ei.get_port());
+
+ errorInfoList.add(errorInfo);
+ }
+
+ topologyInfo.getErrors().put(error.getKey(), errorInfoList);
}
- topologyInfo.getErrors().put(error.getKey(), errorInfoList);
- }
+ List executorSummaries = new ArrayList<>();
+ for (backtype.storm.generated.ExecutorSummary es : info.get_executors()) {
+ ExecutorSummary executor = new ExecutorSummary();
+ executor.setComponentId(es.get_component_id());
+ executor.setHost(es.get_host());
+ executor.setPort(es.get_port());
+ executor.setUptimeSecs(es.get_uptime_secs());
+
+ backtype.storm.generated.ExecutorInfo ei = es.get_executor_info();
+ if (ei != null) {
+ ExecutorInfo executorInfo = new ExecutorInfo();
+ executorInfo.setTaskStart(ei.get_task_start());
+ executorInfo.setTaskEnd(ei.get_task_end());
+
+ executor.setExecutorInfo(executorInfo);
+ }
- List executorSummaries = new ArrayList<>();
- for (backtype.storm.generated.ExecutorSummary es : info.get_executors()) {
- ExecutorSummary executor = new ExecutorSummary();
- executor.setComponentId(es.get_component_id());
- executor.setHost(es.get_host());
- executor.setPort(es.get_port());
- executor.setUptimeSecs(es.get_uptime_secs());
-
- backtype.storm.generated.ExecutorInfo ei = es.get_executor_info();
- if (ei != null) {
- ExecutorInfo executorInfo = new ExecutorInfo();
- executorInfo.setTaskStart(ei.get_task_start());
- executorInfo.setTaskEnd(ei.get_task_end());
-
- executor.setExecutorInfo(executorInfo);
- }
+ backtype.storm.generated.ExecutorStats eStats = es.get_stats();
+ if (eStats != null) {
+ ExecutorStats stats = new ExecutorStats();
+ stats.setEmitted(eStats.get_emitted());
+ stats.setTransferred(eStats.get_transferred());
- backtype.storm.generated.ExecutorStats eStats = es.get_stats();
- if (eStats != null) {
- ExecutorStats stats = new ExecutorStats();
- stats.setEmitted(eStats.get_emitted());
- stats.setTransferred(eStats.get_transferred());
+ backtype.storm.generated.ExecutorSpecificStats ess = eStats.get_specific();
+ if (ess != null) {
+ ExecutorSpecificStats specific = new ExecutorSpecificStats();
- backtype.storm.generated.ExecutorSpecificStats ess = eStats.get_specific();
- if (ess != null) {
- ExecutorSpecificStats specific = new ExecutorSpecificStats();
+ if (ess.is_set_bolt()) {
+ backtype.storm.generated.BoltStats bs = ess.get_bolt();
+ if (bs != null) {
+ BoltStats boltStats = new BoltStats();
- if (ess.is_set_bolt()) {
- backtype.storm.generated.BoltStats bs = ess.get_bolt();
- if (bs != null) {
- BoltStats boltStats = new BoltStats();
+ for (Map.Entry> ae
+ : bs.get_acked().entrySet()) {
+ Map ackedMap = new HashMap<>();
- for (Map.Entry> ae
- : bs.get_acked().entrySet()) {
- Map ackedMap = new HashMap<>();
+ for (Map.Entry aem
+ : ae.getValue().entrySet()) {
+ backtype.storm.generated.GlobalStreamId gsi = aem.getKey();
- for (Map.Entry aem
- : ae.getValue().entrySet()) {
- backtype.storm.generated.GlobalStreamId gsi = aem.getKey();
+ String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
- String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
+ ackedMap.put(globalStreamId, aem.getValue());
+ }
- ackedMap.put(globalStreamId, aem.getValue());
+ boltStats.getAcked().put(ae.getKey(), ackedMap);
}
- boltStats.getAcked().put(ae.getKey(), ackedMap);
- }
+ for (Map.Entry> fe
+ : bs.get_failed().entrySet()) {
+ Map failedMap = new HashMap<>();
- for (Map.Entry> fe
- : bs.get_failed().entrySet()) {
- Map failedMap = new HashMap<>();
+ for (Map.Entry fem
+ : fe.getValue().entrySet()) {
+ backtype.storm.generated.GlobalStreamId gsi = fem.getKey();
- for (Map.Entry fem
- : fe.getValue().entrySet()) {
- backtype.storm.generated.GlobalStreamId gsi = fem.getKey();
+ String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
- String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
+ failedMap.put(globalStreamId, fem.getValue());
+ }
- failedMap.put(globalStreamId, fem.getValue());
+ boltStats.getFailed().put(fe.getKey(), failedMap);
}
- boltStats.getFailed().put(fe.getKey(), failedMap);
- }
+ for (Map.Entry> ee
+ : bs.get_executed().entrySet()) {
+ Map executedMap = new HashMap<>();
- for (Map.Entry> ee
- : bs.get_executed().entrySet()) {
- Map executedMap = new HashMap<>();
+ for (Map.Entry eem
+ : ee.getValue().entrySet()) {
+ backtype.storm.generated.GlobalStreamId gsi = eem.getKey();
- for (Map.Entry eem
- : ee.getValue().entrySet()) {
- backtype.storm.generated.GlobalStreamId gsi = eem.getKey();
+ String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
- String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
+ executedMap.put(globalStreamId, eem.getValue());
+ }
- executedMap.put(globalStreamId, eem.getValue());
+ boltStats.getExecuted().put(ee.getKey(), executedMap);
}
- boltStats.getExecuted().put(ee.getKey(), executedMap);
- }
+ for (Map.Entry> ema
+ : bs.get_execute_ms_avg().entrySet()) {
+ Map executedMap = new HashMap<>();
- for (Map.Entry> ema
- : bs.get_execute_ms_avg().entrySet()) {
- Map executedMap = new HashMap<>();
+ for (Map.Entry emam
+ : ema.getValue().entrySet()) {
+ backtype.storm.generated.GlobalStreamId gsi = emam.getKey();
- for (Map.Entry emam
- : ema.getValue().entrySet()) {
- backtype.storm.generated.GlobalStreamId gsi = emam.getKey();
+ String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
- String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
+ executedMap.put(globalStreamId, emam.getValue());
+ }
- executedMap.put(globalStreamId, emam.getValue());
+ boltStats.getExecuteMsAvg().put(ema.getKey(), executedMap);
}
- boltStats.getExecuteMsAvg().put(ema.getKey(), executedMap);
- }
+ for (Map.Entry> pma
+ : bs.get_process_ms_avg().entrySet()) {
+ Map processMap = new HashMap<>();
- for (Map.Entry> pma
- : bs.get_process_ms_avg().entrySet()) {
- Map processMap = new HashMap<>();
+ for (Map.Entry pmam
+ : pma.getValue().entrySet()) {
+ backtype.storm.generated.GlobalStreamId gsi = pmam.getKey();
- for (Map.Entry pmam
- : pma.getValue().entrySet()) {
- backtype.storm.generated.GlobalStreamId gsi = pmam.getKey();
+ String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
- String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId();
+ processMap.put(globalStreamId, pmam.getValue());
+ }
- processMap.put(globalStreamId, pmam.getValue());
+ boltStats.getProcessMsAvg().put(pma.getKey(), processMap);
}
- boltStats.getProcessMsAvg().put(pma.getKey(), processMap);
+ specific.setBolt(boltStats);
}
-
- specific.setBolt(boltStats);
}
- }
- if (ess.is_set_spout()) {
- backtype.storm.generated.SpoutStats ss = ess.get_spout();
- if (ss != null) {
- SpoutStats spoutStats = new SpoutStats();
- spoutStats.setAcked(ss.get_acked());
- spoutStats.setFailed(ss.get_failed());
- spoutStats.setCompleteMsAvg(ss.get_complete_ms_avg());
+ if (ess.is_set_spout()) {
+ backtype.storm.generated.SpoutStats ss = ess.get_spout();
+ if (ss != null) {
+ SpoutStats spoutStats = new SpoutStats();
+ spoutStats.setAcked(ss.get_acked());
+ spoutStats.setFailed(ss.get_failed());
+ spoutStats.setCompleteMsAvg(ss.get_complete_ms_avg());
- specific.setSpout(spoutStats);
+ specific.setSpout(spoutStats);
+ }
}
+
+ stats.setSpecific(specific);
}
- stats.setSpecific(specific);
+ executor.setStats(stats);
}
- executor.setStats(stats);
+ executorSummaries.add(executor);
}
- executorSummaries.add(executor);
+ topologyInfo.setExecutors(executorSummaries);
}
- topologyInfo.setExecutors(executorSummaries);
-
return topologyInfo;
}
diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/topology/TopologySubmitter.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/topology/TopologySubmitter.java
index 8ba4ea5..b8f242c 100644
--- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/topology/TopologySubmitter.java
+++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/topology/TopologySubmitter.java
@@ -102,13 +102,13 @@ public void run() {
localCluster.submitTopology(topology.getId(), stormConfig, stormTopology);
}
} catch (AlreadyAliveException ex) {
- LOG.error("The specified topology is already running on the cluster: {}", ex);
+ LOG.error("The specified topology is already running on the cluster:", ex);
} catch (InvalidTopologyException ex) {
LOG.error("The specified topology is invalid: " + ex);
} catch (FrameworkException ex) {
- LOG.error("The topology was unable to load a dependent framework: {}", ex);
+ LOG.error("The topology was unable to load a dependent framework:", ex);
} catch (Exception ex) {
- LOG.error("The topology threw an uncaught exception: {}", ex);
+ LOG.error("The topology threw an uncaught exception:", ex);
}
}
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/Cluster.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/Cluster.java
index d0f2a95..ec98a5c 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/Cluster.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/Cluster.java
@@ -30,7 +30,7 @@ public class Cluster implements Serializable {
private Integer nimbusPort = 6627;
- private String version = "0.9.1";
+ private String version = "0.9.5";
private String logServerHost = "localhost";
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/MonitorConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/MonitorConfig.java
new file mode 100644
index 0000000..0c911d2
--- /dev/null
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/MonitorConfig.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright 2014 Lockheed Martin Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package streamflow.model.config;
+
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MonitorConfig implements Serializable {
+
+ static Logger LOG = LoggerFactory.getLogger(MonitorConfig.class);
+
+ private boolean enabled = false;
+
+ private int pollingInterval = 60;
+
+ public MonitorConfig() {
+ }
+
+ public boolean isEnabled() {
+ return Boolean.parseBoolean(
+ System.getProperty("monitor.enabled", Boolean.toString(enabled)));
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public int getPollingInterval() {
+ if (System.getProperty("monitor.pollingInterval") != null) {
+ try {
+ pollingInterval = Integer.parseInt(System.getProperty("monitor.pollingInterval"));
+ } catch (Exception ex) {
+ }
+ }
+ return pollingInterval;
+ }
+
+ public void setPollingInterval(int pollingInterval) {
+ this.pollingInterval = pollingInterval;
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 3;
+ hash = 43 * hash + (this.enabled ? 1 : 0);
+ hash = 43 * hash + this.pollingInterval;
+ return hash;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final MonitorConfig other = (MonitorConfig) obj;
+ if (this.enabled != other.enabled) {
+ return false;
+ }
+ if (this.pollingInterval != other.pollingInterval) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "MonitorConfig{" + "enabled=" + enabled + ", pollingInterval=" + pollingInterval + '}';
+ }
+}
diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java
index 9756c9d..76dc7d0 100644
--- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java
+++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java
@@ -34,7 +34,9 @@ public class StreamflowConfig implements Serializable {
private LoggerConfig logger = new LoggerConfig();
private AuthConfig auth = new AuthConfig();
-
+
+ private MonitorConfig monitor = new MonitorConfig();
+
private LocalClusterConfig localCluster = new LocalClusterConfig();
private List clusters = new ArrayList<>();
@@ -84,6 +86,14 @@ public void setAuth(AuthConfig auth) {
this.auth = auth;
}
+ public MonitorConfig getMonitor() {
+ return monitor;
+ }
+
+ public void setMonitor(MonitorConfig monitor) {
+ this.monitor = monitor;
+ }
+
public LocalClusterConfig getLocalCluster() {
return localCluster;
}
@@ -116,6 +126,7 @@ public int hashCode() {
hash = 29 * hash + (this.datastore != null ? this.datastore.hashCode() : 0);
hash = 29 * hash + (this.logger != null ? this.logger.hashCode() : 0);
hash = 29 * hash + (this.auth != null ? this.auth.hashCode() : 0);
+ hash = 29 * hash + (this.monitor != null ? this.monitor.hashCode() : 0);
hash = 29 * hash + (this.localCluster != null ? this.localCluster.hashCode() : 0);
hash = 29 * hash + (this.clusters != null ? this.clusters.hashCode() : 0);
return hash;
@@ -150,6 +161,10 @@ public boolean equals(Object obj) {
|| !this.auth.equals(other.auth))) {
return false;
}
+ if (this.monitor != other.monitor && (this.monitor == null
+ || !this.monitor.equals(other.monitor))) {
+ return false;
+ }
if (this.localCluster != other.localCluster && (this.localCluster == null
|| !this.localCluster.equals(other.localCluster))) {
return false;
@@ -165,7 +180,7 @@ public boolean equals(Object obj) {
public String toString() {
return "StreamFlowConfig{" + "server=" + server + ", proxy=" + proxy
+ ", datastore=" + datastore + ", logger=" + logger
- + ", auth=" + auth + ", localCluster=" + localCluster
+ + ", auth=" + auth + ", monitor=" + monitor + ", localCluster=" + localCluster
+ ", clusters=" + clusters + '}';
}
}
diff --git a/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/GuavaServiceModule.java b/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/GuavaServiceModule.java
new file mode 100644
index 0000000..e78f1cb
--- /dev/null
+++ b/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/GuavaServiceModule.java
@@ -0,0 +1,23 @@
+package streamflow.server.config;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.Service;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import streamflow.model.config.MonitorConfig;
+import streamflow.model.config.StreamflowConfig;
+import streamflow.server.service.TopologyMonitorService;
+import streamflow.util.config.ConfigLoader;
+
+import java.util.Set;
+
+public class GuavaServiceModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ MonitorConfig monitorConfig = ConfigLoader.getConfig().getMonitor();
+ if (monitorConfig.isEnabled()) {
+ bind(TopologyMonitorService.class);
+ }
+ }
+}
diff --git a/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/WebConfig.java b/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/WebConfig.java
index f283d20..cd75871 100644
--- a/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/WebConfig.java
+++ b/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/WebConfig.java
@@ -15,14 +15,18 @@
*/
package streamflow.server.config;
+//import com.google.common.util.concurrent.ServiceManager;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceServletContextListener;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.annotation.WebListener;
+
+import org.apache.storm.guava.util.concurrent.ServiceManager;
import streamflow.datastore.config.DatastoreModule;
import streamflow.engine.config.EngineModule;
+import streamflow.server.service.TopologyMonitorService;
import streamflow.service.config.ServiceModule;
import streamflow.util.config.ConfigModule;
import org.apache.shiro.guice.web.ShiroWebModule;
@@ -39,9 +43,17 @@ protected Injector getInjector() {
StreamflowEnvironment.setStreamflowHome(System.getenv("STREAMFLOW_HOME"));
StreamflowEnvironment.initialize();
- return Guice.createInjector(new ConfigModule(), new DatastoreModule(),
- new ServiceModule(), new EngineModule(), new JerseyModule(),
+ Injector injector = Guice.createInjector(new ConfigModule(), new DatastoreModule(),
+ new ServiceModule(), new GuavaServiceModule(), new EngineModule(), new JerseyModule(),
new SecurityModule(servletContext), ShiroWebModule.guiceFilterModule());
+
+ // Initialize the service manager to manage daemon services
+ //ServiceManager manager = injector.getInstance(ServiceManager.class);
+ //manager.startAsync().awaitHealthy();
+ TopologyMonitorService topologyMonitorService = injector.getInstance(TopologyMonitorService.class);
+ topologyMonitorService.startAsync().awaitRunning();
+
+ return injector;
}
@Override
diff --git a/streamflow-core/streamflow-server/src/main/java/streamflow/server/service/TopologyMonitorService.java b/streamflow-core/streamflow-server/src/main/java/streamflow/server/service/TopologyMonitorService.java
new file mode 100644
index 0000000..cf4670a
--- /dev/null
+++ b/streamflow-core/streamflow-server/src/main/java/streamflow/server/service/TopologyMonitorService.java
@@ -0,0 +1,71 @@
+package streamflow.server.service;
+
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import streamflow.model.Topology;
+import streamflow.model.config.MonitorConfig;
+import streamflow.service.TopologyService;
+
+import java.util.concurrent.TimeUnit;
+
+@Singleton
+public class TopologyMonitorService extends AbstractScheduledService {
+
+ public static final Logger LOG = LoggerFactory.getLogger(TopologyMonitorService.class);
+
+ private TopologyService topologyService;
+ private MonitorConfig monitorConfig;
+
+ @Inject
+ public TopologyMonitorService(TopologyService topologyService, MonitorConfig monitorConfig) {
+ this.topologyService = topologyService;
+ this.monitorConfig = monitorConfig;
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ LOG.info("Topology Status Monitor Started...");
+ }
+
+ @Override
+ protected void runOneIteration() throws Exception {
+ // Iterate over all of the topologies for each user to check live status
+ for (Topology topology : topologyService.listAllTopologies()) {
+ try {
+ // Get the current live status of the topology
+ String topologyStatusDesired = topology.getStatus();
+ String topologyStatusActual = topologyService.getTopology(topology.getId(), topology.getUserId()).getStatus();
+
+ if (topologyStatusDesired.equalsIgnoreCase("ACTIVE")) {
+ // Topology should be submitted, but isn't active so resubmit to desired state
+ if (!topologyStatusActual.equalsIgnoreCase("ACTIVE")) {
+ LOG.warn("Topology has a desired state of ACTIVE but is not currently deployed. "
+ + "Redeploying topology... ID = " + topology.getId() + ", Name = " + topology.getName());
+
+ // Resubmit the topology using the same settings as originally submitted
+ Topology submittedTopology = topologyService.submitTopology(
+ topology.getId(), topology.getUserId(), topology.getClusterId(),
+ topology.getLogLevel(), topology.getClassLoaderPolicy());
+
+ if (topology != null && topology.getStatus().equalsIgnoreCase("ACTIVE")) {
+ LOG.info("Topology redeploy succeeded: ID = " + topology.getId() + ", Name = " + topology.getName());
+ } else {
+ LOG.error("Topology redeploy failed: ID = " + topology.getId() + ", Name = " + topology.getName());
+ }
+ }
+ }
+ } catch (Exception ex) {
+ LOG.error("An exception occurred while checking topology status: ID = "
+ + topology.getId() + ", Name = " + topology.getName(), ex);
+ }
+ }
+ }
+
+ @Override
+ protected Scheduler scheduler() {
+ return Scheduler.newFixedRateSchedule(monitorConfig.getPollingInterval(), monitorConfig.getPollingInterval(), TimeUnit.SECONDS);
+ }
+}
diff --git a/streamflow-core/streamflow-server/src/main/webapp/app/topology/topology.js b/streamflow-core/streamflow-server/src/main/webapp/app/topology/topology.js
index 969eb3b..1fa95c3 100644
--- a/streamflow-core/streamflow-server/src/main/webapp/app/topology/topology.js
+++ b/streamflow-core/streamflow-server/src/main/webapp/app/topology/topology.js
@@ -554,7 +554,9 @@ topologyModule.controller('TopologyPropertiesNewController', [
'storm.messaging.netty.max_wait_ms',
'storm.messaging.netty.min_wait_ms',
'storm.messaging.netty.transfer.batch.size',
+ 'storm.messaging.netty.socket.backlog',
'storm.messaging.netty.flush.check.interval.ms',
+ 'storm.messaging.netty.authentication',
'topology.enable.message.timeouts',
'topology.debug',
'topology.workers',
@@ -584,8 +586,10 @@ topologyModule.controller('TopologyPropertiesNewController', [
'topology.kryo.factory',
'topology.tuple.serializer',
'topology.trident.batch.emit.interval.millis',
+ 'storm.group.mapping.service.cache.duration.secs',
'topology.classpath',
- 'topology.environment'
+ 'topology.environment',
+ 'topology.bolts.outgoing.overflow.buffer.enable'
];
$scope.add = function() {
diff --git a/streamflow-core/streamflow-service/src/main/java/streamflow/service/FrameworkService.java b/streamflow-core/streamflow-service/src/main/java/streamflow/service/FrameworkService.java
index 1333051..af854a6 100644
--- a/streamflow-core/streamflow-service/src/main/java/streamflow/service/FrameworkService.java
+++ b/streamflow-core/streamflow-service/src/main/java/streamflow/service/FrameworkService.java
@@ -1,17 +1,15 @@
/**
* Copyright 2014 Lockheed Martin Corporation
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*/
package streamflow.service;
@@ -23,7 +21,6 @@
import java.util.Date;
import java.util.Enumeration;
import java.util.List;
-import java.util.UUID;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;
@@ -156,182 +153,233 @@ public byte[] getFrameworkJar(String frameworkId) {
return frameworkJarContent;
}
-
+
/**
* Process the annotations found in a framework jar
+ *
* @param jarFile
* @return a FrameworkConfig or null if no annotations were found
*/
- public FrameworkConfig processFrameworkAnnotations(File jarFile){
- FrameworkConfig config = new FrameworkConfig();
- ArrayList components = new ArrayList();
- String frameworkLevel = null;
- boolean foundFrameworkAnnotations = false;
- ZipFile zipFile = null;
-
- try {
- zipFile = new ZipFile(jarFile);
-
- Enumeration extends ZipEntry> entries = zipFile.entries();
- while(entries.hasMoreElements()){
- ZipEntry entry = entries.nextElement();
- String entryName = entry.getName();
- if(entry.isDirectory()){
- if(frameworkLevel != null){
- if(entryName.startsWith(frameworkLevel) == false) frameworkLevel = null;
- }
- ZipEntry packageInfoEntry = zipFile.getEntry(entryName+"package-info.class");
- if(packageInfoEntry != null){
- InputStream fileInputStream = zipFile.getInputStream(packageInfoEntry);
- DataInputStream dstream = new DataInputStream(fileInputStream);
- ClassFile cf = new ClassFile(dstream);
- String cfName = cf.getName();
- AnnotationsAttribute attr = (AnnotationsAttribute)cf.getAttribute(AnnotationsAttribute.visibleTag);
- Annotation annotation = attr.getAnnotation("streamflow.annotations.Framework");
- if(annotation == null) continue;
-
- frameworkLevel = cfName;
- foundFrameworkAnnotations = true;
- StringMemberValue frameworkLabel = (StringMemberValue) annotation.getMemberValue("label");
- if(frameworkLabel != null) config.setLabel(frameworkLabel.getValue());
- StringMemberValue frameworkName = (StringMemberValue) annotation.getMemberValue("name");
- if(frameworkName != null) config.setName(frameworkName.getValue());
- StringMemberValue frameworkVersion = (StringMemberValue) annotation.getMemberValue("version");
- if(frameworkVersion != null) config.setVersion(frameworkVersion.getValue());
-
- Annotation descriptionAnnotation = attr.getAnnotation("streamflow.annotations.Description");
- if(descriptionAnnotation != null){
- StringMemberValue frameworkDescription = (StringMemberValue) descriptionAnnotation.getMemberValue("value");
- if(frameworkDescription != null) config.setDescription(frameworkDescription.getValue());
- }
-
-
- }
- } else if(frameworkLevel != null && entryName.endsWith(".class") && entryName.endsWith("package-info.class") == false){
- ZipEntry packageInfoEntry = zipFile.getEntry(entryName);
- InputStream fileInputStream = zipFile.getInputStream(packageInfoEntry);
- DataInputStream dstream = new DataInputStream(fileInputStream);
- ClassFile cf = new ClassFile(dstream);
- String cfName = cf.getName();
- AnnotationsAttribute attr = (AnnotationsAttribute)cf.getAttribute(AnnotationsAttribute.visibleTag);
- if(attr == null) continue;
- Annotation componentAnnotation = attr.getAnnotation("streamflow.annotations.Component");
-
- if(componentAnnotation == null) continue;
-
- ComponentConfig component = new ComponentConfig();
- component.setMainClass(cf.getName());
- StringMemberValue componentLabel = (StringMemberValue) componentAnnotation.getMemberValue("label");
- if(componentLabel != null) component.setLabel(componentLabel.getValue());
- StringMemberValue componentName = (StringMemberValue) componentAnnotation.getMemberValue("name");
- if(componentName != null) component.setName(componentName.getValue());
- StringMemberValue componentType = (StringMemberValue) componentAnnotation.getMemberValue("type");
- if(componentType != null) component.setType(componentType.getValue());
- StringMemberValue componentIcon = (StringMemberValue) componentAnnotation.getMemberValue("icon");
- if(componentIcon != null) component.setIcon(componentIcon.getValue());
-
- Annotation componentDescriptionAnnotation = attr.getAnnotation("streamflow.annotations.Description");
- if(componentDescriptionAnnotation != null){
- StringMemberValue componentDescription = (StringMemberValue) componentDescriptionAnnotation.getMemberValue("value");
- if(componentDescription != null) component.setDescription(componentDescription.getValue());
- }
-
- Annotation componentInputsAnnotation = attr.getAnnotation("streamflow.annotations.ComponentInputs");
- if(componentInputsAnnotation != null){
- ArrayList inputs = new ArrayList();
- ArrayMemberValue componentInputs = (ArrayMemberValue) componentInputsAnnotation.getMemberValue("value");
- for(MemberValue value : componentInputs.getValue()){
- AnnotationMemberValue annotationMember = (AnnotationMemberValue)value;
- Annotation annotationValue = annotationMember.getValue();
- StringMemberValue keyAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("key");
- StringMemberValue descriptionAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("description");
- ComponentInterface inputInterface = new ComponentInterface();
- if(keyAnnotationValue != null) inputInterface.setKey(keyAnnotationValue.getValue());
- if(descriptionAnnotationValue != null) inputInterface.setDescription(descriptionAnnotationValue.getValue());
- inputs.add(inputInterface);
- }
-
- component.setInputs(inputs);
- }
-
- Annotation componentOutputsAnnotation = attr.getAnnotation("streamflow.annotations.ComponentOutputs");
- if(componentOutputsAnnotation != null){
- ArrayList outputs = new ArrayList();
- ArrayMemberValue componentOutputs = (ArrayMemberValue) componentOutputsAnnotation.getMemberValue("value");
- for(MemberValue value : componentOutputs.getValue()){
- AnnotationMemberValue annotationMember = (AnnotationMemberValue)value;
- Annotation annotationValue = annotationMember.getValue();
- StringMemberValue keyAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("key");
- StringMemberValue descriptionAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("description");
- ComponentInterface outputInterface = new ComponentInterface();
- if(keyAnnotationValue != null) outputInterface.setKey(keyAnnotationValue.getValue());
- if(descriptionAnnotationValue != null) outputInterface.setDescription(descriptionAnnotationValue.getValue());
- outputs.add(outputInterface);
- }
-
- component.setOutputs(outputs);
- }
-
-
- List memberMethods = cf.getMethods();
- if(memberMethods != null){
- ArrayList properties = new ArrayList();
-
- for(MethodInfo method : memberMethods){
- AnnotationsAttribute methodAttr = (AnnotationsAttribute) method.getAttribute(AnnotationsAttribute.visibleTag);
- if(methodAttr == null) continue;
- Annotation propertyAnnotation = methodAttr.getAnnotation("streamflow.annotations.ComponentProperty");
- if(propertyAnnotation == null) continue;
-
- ComponentProperty property = new ComponentProperty();
-
- StringMemberValue propertyName = (StringMemberValue) propertyAnnotation.getMemberValue("name");
- if(propertyName != null) property.setName(propertyName.getValue());
- StringMemberValue propertylabel = (StringMemberValue) propertyAnnotation.getMemberValue("label");
- if(propertylabel != null) property.setLabel(propertylabel.getValue());
- StringMemberValue propertyType = (StringMemberValue) propertyAnnotation.getMemberValue("type");
- if(propertyType != null) property.setType(propertyType.getValue());
- StringMemberValue propertyDefaultValue = (StringMemberValue) propertyAnnotation.getMemberValue("defaultValue");
- if(propertyDefaultValue != null) property.setDefaultValue(propertyDefaultValue.getValue());
- BooleanMemberValue propertyRequired = (BooleanMemberValue)propertyAnnotation.getMemberValue("required");
- if(propertyRequired != null) property.setRequired(propertyRequired.getValue());
-
- Annotation methodDescriptionAnnotation = methodAttr.getAnnotation("streamflow.annotations.Description");
- if(methodDescriptionAnnotation != null){
- StringMemberValue methodDescription = (StringMemberValue) methodDescriptionAnnotation.getMemberValue("value");
- if(methodDescription != null) property.setDescription(methodDescription.getValue());
- }
- properties.add(property);
- }
- component.setProperties(properties);
-
- }
-
- components.add(component);
- }
- }
-
- config.setComponents(components);
-
- // return null if no framework annotations were located
- if(foundFrameworkAnnotations == false) return null;
-
- return config;
- } catch (IOException ex){
+ public FrameworkConfig processFrameworkAnnotations(File jarFile) {
+ FrameworkConfig config = new FrameworkConfig();
+ ArrayList components = new ArrayList();
+ String frameworkLevel = null;
+ boolean foundFrameworkAnnotations = false;
+ ZipFile zipFile = null;
+
+ try {
+ zipFile = new ZipFile(jarFile);
+
+ Enumeration extends ZipEntry> entries = zipFile.entries();
+ while (entries.hasMoreElements()) {
+ ZipEntry entry = entries.nextElement();
+ String entryName = entry.getName();
+ if (entry.isDirectory()) {
+ if (frameworkLevel != null) {
+ if (entryName.startsWith(frameworkLevel) == false) {
+ frameworkLevel = null;
+ }
+ }
+ ZipEntry packageInfoEntry = zipFile.getEntry(entryName + "package-info.class");
+ if (packageInfoEntry != null) {
+ InputStream fileInputStream = zipFile.getInputStream(packageInfoEntry);
+ DataInputStream dstream = new DataInputStream(fileInputStream);
+ ClassFile cf = new ClassFile(dstream);
+ String cfName = cf.getName();
+ AnnotationsAttribute attr = (AnnotationsAttribute) cf.getAttribute(AnnotationsAttribute.visibleTag);
+ Annotation annotation = attr.getAnnotation("streamflow.annotations.Framework");
+ if (annotation == null) {
+ continue;
+ }
+
+ frameworkLevel = cfName;
+ foundFrameworkAnnotations = true;
+ StringMemberValue frameworkLabel = (StringMemberValue) annotation.getMemberValue("label");
+ if (frameworkLabel != null) {
+ config.setLabel(frameworkLabel.getValue());
+ }
+ StringMemberValue frameworkName = (StringMemberValue) annotation.getMemberValue("name");
+ if (frameworkName != null) {
+ config.setName(frameworkName.getValue());
+ }
+ StringMemberValue frameworkVersion = (StringMemberValue) annotation.getMemberValue("version");
+ if (frameworkVersion != null) {
+ config.setVersion(frameworkVersion.getValue());
+ }
+
+ Annotation descriptionAnnotation = attr.getAnnotation("streamflow.annotations.Description");
+ if (descriptionAnnotation != null) {
+ StringMemberValue frameworkDescription = (StringMemberValue) descriptionAnnotation.getMemberValue("value");
+ if (frameworkDescription != null) {
+ config.setDescription(frameworkDescription.getValue());
+ }
+ }
+
+ }
+ } else if (frameworkLevel != null && entryName.endsWith(".class") && entryName.endsWith("package-info.class") == false) {
+ ZipEntry packageInfoEntry = zipFile.getEntry(entryName);
+ InputStream fileInputStream = zipFile.getInputStream(packageInfoEntry);
+ DataInputStream dstream = new DataInputStream(fileInputStream);
+ ClassFile cf = new ClassFile(dstream);
+ String cfName = cf.getName();
+ AnnotationsAttribute attr = (AnnotationsAttribute) cf.getAttribute(AnnotationsAttribute.visibleTag);
+ if (attr == null) {
+ continue;
+ }
+ Annotation componentAnnotation = attr.getAnnotation("streamflow.annotations.Component");
+
+ if (componentAnnotation == null) {
+ continue;
+ }
+
+ ComponentConfig component = new ComponentConfig();
+ component.setMainClass(cf.getName());
+ StringMemberValue componentLabel = (StringMemberValue) componentAnnotation.getMemberValue("label");
+ if (componentLabel != null) {
+ component.setLabel(componentLabel.getValue());
+ }
+ StringMemberValue componentName = (StringMemberValue) componentAnnotation.getMemberValue("name");
+ if (componentName != null) {
+ component.setName(componentName.getValue());
+ }
+ StringMemberValue componentType = (StringMemberValue) componentAnnotation.getMemberValue("type");
+ if (componentType != null) {
+ component.setType(componentType.getValue());
+ }
+ StringMemberValue componentIcon = (StringMemberValue) componentAnnotation.getMemberValue("icon");
+ if (componentIcon != null) {
+ component.setIcon(componentIcon.getValue());
+ }
+
+ Annotation componentDescriptionAnnotation = attr.getAnnotation("streamflow.annotations.Description");
+ if (componentDescriptionAnnotation != null) {
+ StringMemberValue componentDescription = (StringMemberValue) componentDescriptionAnnotation.getMemberValue("value");
+ if (componentDescription != null) {
+ component.setDescription(componentDescription.getValue());
+ }
+ }
+
+ Annotation componentInputsAnnotation = attr.getAnnotation("streamflow.annotations.ComponentInputs");
+ if (componentInputsAnnotation != null) {
+ ArrayList inputs = new ArrayList();
+ ArrayMemberValue componentInputs = (ArrayMemberValue) componentInputsAnnotation.getMemberValue("value");
+ for (MemberValue value : componentInputs.getValue()) {
+ AnnotationMemberValue annotationMember = (AnnotationMemberValue) value;
+ Annotation annotationValue = annotationMember.getValue();
+ StringMemberValue keyAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("key");
+ StringMemberValue descriptionAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("description");
+ ComponentInterface inputInterface = new ComponentInterface();
+ if (keyAnnotationValue != null) {
+ inputInterface.setKey(keyAnnotationValue.getValue());
+ }
+ if (descriptionAnnotationValue != null) {
+ inputInterface.setDescription(descriptionAnnotationValue.getValue());
+ }
+ inputs.add(inputInterface);
+ }
+
+ component.setInputs(inputs);
+ }
+
+ Annotation componentOutputsAnnotation = attr.getAnnotation("streamflow.annotations.ComponentOutputs");
+ if (componentOutputsAnnotation != null) {
+ ArrayList outputs = new ArrayList();
+ ArrayMemberValue componentOutputs = (ArrayMemberValue) componentOutputsAnnotation.getMemberValue("value");
+ for (MemberValue value : componentOutputs.getValue()) {
+ AnnotationMemberValue annotationMember = (AnnotationMemberValue) value;
+ Annotation annotationValue = annotationMember.getValue();
+ StringMemberValue keyAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("key");
+ StringMemberValue descriptionAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("description");
+ ComponentInterface outputInterface = new ComponentInterface();
+ if (keyAnnotationValue != null) {
+ outputInterface.setKey(keyAnnotationValue.getValue());
+ }
+ if (descriptionAnnotationValue != null) {
+ outputInterface.setDescription(descriptionAnnotationValue.getValue());
+ }
+ outputs.add(outputInterface);
+ }
+
+ component.setOutputs(outputs);
+ }
+
+ List memberMethods = cf.getMethods();
+ if (memberMethods != null) {
+ ArrayList properties = new ArrayList();
+
+ for (MethodInfo method : memberMethods) {
+ AnnotationsAttribute methodAttr = (AnnotationsAttribute) method.getAttribute(AnnotationsAttribute.visibleTag);
+ if (methodAttr == null) {
+ continue;
+ }
+ Annotation propertyAnnotation = methodAttr.getAnnotation("streamflow.annotations.ComponentProperty");
+ if (propertyAnnotation == null) {
+ continue;
+ }
+
+ ComponentProperty property = new ComponentProperty();
+
+ StringMemberValue propertyName = (StringMemberValue) propertyAnnotation.getMemberValue("name");
+ if (propertyName != null) {
+ property.setName(propertyName.getValue());
+ }
+ StringMemberValue propertylabel = (StringMemberValue) propertyAnnotation.getMemberValue("label");
+ if (propertylabel != null) {
+ property.setLabel(propertylabel.getValue());
+ }
+ StringMemberValue propertyType = (StringMemberValue) propertyAnnotation.getMemberValue("type");
+ if (propertyType != null) {
+ property.setType(propertyType.getValue());
+ }
+ StringMemberValue propertyDefaultValue = (StringMemberValue) propertyAnnotation.getMemberValue("defaultValue");
+ if (propertyDefaultValue != null) {
+ property.setDefaultValue(propertyDefaultValue.getValue());
+ }
+ BooleanMemberValue propertyRequired = (BooleanMemberValue) propertyAnnotation.getMemberValue("required");
+ if (propertyRequired != null) {
+ property.setRequired(propertyRequired.getValue());
+ }
+
+ Annotation methodDescriptionAnnotation = methodAttr.getAnnotation("streamflow.annotations.Description");
+ if (methodDescriptionAnnotation != null) {
+ StringMemberValue methodDescription = (StringMemberValue) methodDescriptionAnnotation.getMemberValue("value");
+ if (methodDescription != null) {
+ property.setDescription(methodDescription.getValue());
+ }
+ }
+ properties.add(property);
+ }
+ component.setProperties(properties);
+
+ }
+
+ components.add(component);
+ }
+ }
+
+ config.setComponents(components);
+
+ // return null if no framework annotations were located
+ if (foundFrameworkAnnotations == false) {
+ return null;
+ }
+
+ return config;
+ } catch (IOException ex) {
LOG.error("Error while parsing framework annotations: ", ex);
-
+
throw new EntityInvalidException("Error while parsing framework annotations: "
- + ex.getMessage());
- } finally {
- if(zipFile != null){
- try {
- zipFile.close();
- } catch (IOException e) {
- LOG.error("Error while closing framework zip");
- }
- }
- }
-
+ + ex.getMessage());
+ } finally {
+ if (zipFile != null) {
+ try {
+ zipFile.close();
+ } catch (IOException e) {
+ LOG.error("Error while closing framework zip");
+ }
+ }
+ }
+
}
public FileInfo getFrameworkFileInfo(String frameworkId) {
@@ -348,22 +396,19 @@ public FileInfo getFrameworkFileInfo(String frameworkId) {
public Framework processFrameworkJar(byte[] frameworkJar, boolean isPublic) {
Framework framework = null;
-
+
try {
String frameworkHash = DigestUtils.md5Hex(frameworkJar);
-
+
// Write out a temporary file for the jar so it can be processed
File tempFrameworkFile = new File(StreamflowEnvironment.getFrameworksDir(),
frameworkHash + ".jar");
FileUtils.writeByteArrayToFile(tempFrameworkFile, frameworkJar);
-
+
FrameworkConfig frameworkConfig = processFrameworkConfig(tempFrameworkFile);
if (frameworkConfig != null) {
-
-
-
String frameworkId = frameworkConfig.getName();
// If the framework already exists, delete it first to clear out children
@@ -400,13 +445,13 @@ public Framework processFrameworkJar(byte[] frameworkJar, boolean isPublic) {
LOG.error("Exception while processing the framework jar", ex);
throw new EntityInvalidException(
- "Exception while processing the framework framework: Exception = "
- + ex.getMessage());
+ "Exception while processing the framework framework: Exception = "
+ + ex.getMessage());
}
return framework;
}
-
+
public String storeFrameworkJar(byte[] frameworkJar) {
FileInfo frameworkFile = new FileInfo();
frameworkFile.setFileName(IDUtils.randomUUID());
@@ -420,18 +465,18 @@ public String storeFrameworkJar(byte[] frameworkJar) {
if (frameworkFile == null) {
throw new ServiceException("Unable to save framework jar file");
}
-
+
return frameworkFile.getId();
}
-
+
public FrameworkConfig processFrameworkConfig(File tempFrameworkFile) {
FrameworkConfig frameworkConfig = null;
try {
JarFile frameworkJarFile = new JarFile(tempFrameworkFile.getAbsoluteFile());
-
+
JarEntry frameworkYamlEntry = frameworkJarFile.getJarEntry("STREAMFLOW-INF/framework.yml");
-
+
JarEntry frameworkJsonEntry = frameworkJarFile.getJarEntry("STREAMFLOW-INF/framework.json");
if (frameworkYamlEntry != null) {
@@ -444,27 +489,27 @@ public FrameworkConfig processFrameworkConfig(File tempFrameworkFile) {
} else if (frameworkJsonEntry != null) {
String frameworkJson = IOUtils.toString(
frameworkJarFile.getInputStream(frameworkJsonEntry));
-
+
// Attempt to deserialize the inbuilt streams-framework.json
frameworkConfig = jsonMapper.readValue(
frameworkJson, FrameworkConfig.class);
} else {
- frameworkConfig = processFrameworkAnnotations(tempFrameworkFile);
- if(frameworkConfig == null){
- throw new EntityInvalidException(
- "The framework configuration file was not found in the framework jar");
- }
+ frameworkConfig = processFrameworkAnnotations(tempFrameworkFile);
+ if (frameworkConfig == null) {
+ throw new EntityInvalidException(
+ "The framework configuration file was not found in the framework jar");
+ }
}
} catch (IOException ex) {
LOG.error("Error while loaded the framework configuration: ", ex);
-
+
throw new EntityInvalidException("Error while loading the framework configuration: "
- + ex.getMessage());
+ + ex.getMessage());
}
-
+
return frameworkConfig;
}
-
+
public void processFrameworkComponents(Framework framework, FrameworkConfig frameworkConfig, File frameworkFile) {
for (ComponentConfig componentConfig : frameworkConfig.getComponents()) {
Component component = new Component();
@@ -480,7 +525,7 @@ public void processFrameworkComponents(Framework framework, FrameworkConfig fram
componentService.addComponent(component);
}
}
-
+
public void processFrameworkResources(Framework framework, FrameworkConfig frameworkConfig) {
for (ResourceConfig resourceConfig : frameworkConfig.getResources()) {
Resource resource = new Resource();
@@ -495,7 +540,7 @@ public void processFrameworkResources(Framework framework, FrameworkConfig frame
resourceService.addResource(resource);
}
}
-
+
public void processFrameworkSerializations(Framework framework, FrameworkConfig frameworkConfig) {
// Keep track of the order or the serializations specified in the config
int serializationPriority = 0;
@@ -513,7 +558,7 @@ public void processFrameworkSerializations(Framework framework, FrameworkConfig
serializationService.addSerialization(serialization);
}
}
-
+
public String loadFrameworkComponentIcon(ComponentConfig componentConfig, File frameworkFile) {
String iconId = null;
byte[] iconData = null;
@@ -521,7 +566,7 @@ public String loadFrameworkComponentIcon(ComponentConfig componentConfig, File f
if (componentConfig.getIcon() != null) {
try {
JarFile frameworkJarFile = new JarFile(frameworkFile);
-
+
JarEntry iconEntry = frameworkJarFile.getJarEntry(componentConfig.getIcon());
if (iconEntry != null) {
iconData = IOUtils.toByteArray(frameworkJarFile.getInputStream(iconEntry));
@@ -535,31 +580,31 @@ public String loadFrameworkComponentIcon(ComponentConfig componentConfig, File f
try {
if (componentConfig.getType().equalsIgnoreCase(Component.STORM_SPOUT_TYPE)) {
iconData = IOUtils.toByteArray(Thread.currentThread()
- .getContextClassLoader().getResourceAsStream("icons/storm-spout.png"));
+ .getContextClassLoader().getResourceAsStream("icons/storm-spout.png"));
} else if (componentConfig.getType().equalsIgnoreCase(Component.STORM_BOLT_TYPE)) {
iconData = IOUtils.toByteArray(Thread.currentThread()
- .getContextClassLoader().getResourceAsStream("icons/storm-bolt.png"));
+ .getContextClassLoader().getResourceAsStream("icons/storm-bolt.png"));
} else {
iconData = IOUtils.toByteArray(Thread.currentThread()
- .getContextClassLoader().getResourceAsStream("icons/storm-trident.png"));
+ .getContextClassLoader().getResourceAsStream("icons/storm-trident.png"));
}
} catch (IOException ex) {
LOG.error("Error occurred while loading the default component icon: ", ex);
}
}
-
+
if (iconData != null) {
FileInfo iconFile = new FileInfo();
iconFile.setFileName(iconFile.getFileName());
iconFile.setFileType("image/png");
iconFile.setFileSize(iconData.length);
iconFile.setContentHash(DigestUtils.md5Hex(iconData));
-
+
iconFile = fileService.saveFile(iconFile, iconData);
iconId = iconFile.getId();
}
-
+
return iconId;
}
}
diff --git a/streamflow-core/streamflow-service/src/main/java/streamflow/service/LogService.java b/streamflow-core/streamflow-service/src/main/java/streamflow/service/LogService.java
index 5744b7a..95a1cda 100644
--- a/streamflow-core/streamflow-service/src/main/java/streamflow/service/LogService.java
+++ b/streamflow-core/streamflow-service/src/main/java/streamflow/service/LogService.java
@@ -166,7 +166,9 @@ public void clearTopologyLog(Topology topology, Cluster cluster) {
try {
// Delete the local log file from the server
- FileUtils.forceDelete(logFile);
+ if (logFile.exists()) {
+ FileUtils.forceDelete(logFile);
+ }
} catch (IOException ex) {
LOG.error("Error deleting local topology log file: " + logFile.getAbsolutePath());
}
diff --git a/streamflow-core/streamflow-service/src/main/java/streamflow/service/TopologyService.java b/streamflow-core/streamflow-service/src/main/java/streamflow/service/TopologyService.java
index 1f7eaa7..77e0ef4 100644
--- a/streamflow-core/streamflow-service/src/main/java/streamflow/service/TopologyService.java
+++ b/streamflow-core/streamflow-service/src/main/java/streamflow/service/TopologyService.java
@@ -112,6 +112,10 @@ public TopologyService(TopologyDao topologyDao, ComponentService componentServic
this.streamflowConfig = streamflowConfig;
}
+ public List listAllTopologies() {
+ return topologyDao.findAll();
+ }
+
public List listTopologies(String userId) {
List topologies = topologyDao.findAll(userId);
@@ -398,7 +402,7 @@ public Topology initializeTopologyObject(Topology topology) {
} catch (ServiceException ex) {
LOG.error("Exception while initializing the topology config object", ex);
- throw new ServiceException("Exception while intializing the "
+ throw new ServiceException("Exception while initializing the "
+ "Topology config object: " + ex.getMessage());
}
}
@@ -418,9 +422,9 @@ public String generateTopologyJar(Topology topology, Cluster cluster) {
jarBuilder.open();
// Keep track of already added dependencies
- HashSet frameworkDependencies = new HashSet();
+ HashSet frameworkDependencies = new HashSet<>();
- HashSet processedSerializations = new HashSet();
+ HashSet processedSerializations = new HashSet<>();
TopologyConfig topologyConfig = topology.getDeployedConfig();
@@ -663,7 +667,7 @@ private void clearTopologyProject(String projectId) {
FileUtils.forceDelete(new File(StreamflowEnvironment.getTopologiesDir(),
projectId + ".jar"));
} catch (IOException ex) {
- LOG.error("Exception while clearing the topology project: ", ex);
+ //LOG.error("Exception while clearing the topology project: ", ex);
}
}
}
diff --git a/streamflow-core/streamflow-service/src/main/resources/icons/storm-bolt.png b/streamflow-core/streamflow-service/src/main/resources/icons/storm-bolt.png
index 57e410e..7f2e842 100644
Binary files a/streamflow-core/streamflow-service/src/main/resources/icons/storm-bolt.png and b/streamflow-core/streamflow-service/src/main/resources/icons/storm-bolt.png differ
diff --git a/streamflow-core/streamflow-service/src/main/resources/icons/storm-spout.png b/streamflow-core/streamflow-service/src/main/resources/icons/storm-spout.png
index 11d8d57..7ce1401 100644
Binary files a/streamflow-core/streamflow-service/src/main/resources/icons/storm-spout.png and b/streamflow-core/streamflow-service/src/main/resources/icons/storm-spout.png differ
diff --git a/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java b/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java
index 5feeb81..5927608 100644
--- a/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java
+++ b/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java
@@ -16,15 +16,9 @@
package streamflow.util.config;
import com.google.inject.AbstractModule;
-import streamflow.model.config.AuthConfig;
-import streamflow.model.config.DatastoreConfig;
-import streamflow.model.config.StreamflowConfig;
-import streamflow.model.config.LoggerConfig;
-import streamflow.model.config.ProxyConfig;
-import streamflow.model.config.ServerConfig;
+import streamflow.model.config.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import streamflow.model.config.LocalClusterConfig;
public class ConfigModule extends AbstractModule {
@@ -37,6 +31,7 @@ protected void configure() {
bind(StreamflowConfig.class).toInstance(streamflowConfig);
bind(ServerConfig.class).toInstance(streamflowConfig.getServer());
bind(AuthConfig.class).toInstance(streamflowConfig.getAuth());
+ bind(MonitorConfig.class).toInstance(streamflowConfig.getMonitor());
bind(ProxyConfig.class).toInstance(streamflowConfig.getProxy());
bind(LoggerConfig.class).toInstance(streamflowConfig.getLogger());
bind(LocalClusterConfig.class).toInstance(streamflowConfig.getLocalCluster());
diff --git a/streamflow-frameworks/twitter-framework/src/main/resources/STREAMFLOW-INF/framework.yml b/streamflow-frameworks/twitter-framework/src/main/resources/STREAMFLOW-INF/framework.yml
new file mode 100644
index 0000000..df29380
--- /dev/null
+++ b/streamflow-frameworks/twitter-framework/src/main/resources/STREAMFLOW-INF/framework.yml
@@ -0,0 +1,44 @@
+name: twitter-framework
+label: Twitter Framework
+version: ${project.version}
+description: Spouts and Bolts supporting Twitter functionality
+
+components:
+ - name: twitter-sample-spout
+ label: Twitter Sample Spout
+ type: storm-spout
+ description: Utilizes Twitter Streaming API to stream of 1% Twitter data for analysis. Twitter OAuth credentials for you application are required for use.
+ mainClass: streamflow.spout.twitter.TwitterSampleSpout
+ icon: icons/twitter.png
+ properties:
+ - name: oauth-consumer-key
+ label: OAuth Consumer Key
+ type: text
+ description: Twitter OAuth Consumer Key
+ defaultValue:
+ required: true
+ - name: oauth-consumer-secret
+ label: OAuth Consumer Secret
+ type: text
+ description: Twitter OAuth Consumer Secret
+ defaultValue:
+ required: true
+ - name: oauth-access-token
+ label: OAuth Access Token
+ type: text
+ description: Twitter OAuth Access Token
+ defaultValue:
+ required: true
+ - name: oauth-access-token-secret
+ label: OAuth Access Token Secret
+ type: text
+ description: Twitter OAuth Access Token Secret
+ defaultValue:
+ required: true
+ outputs:
+ - key: default
+ description: Twitter Status
+
+
+#serializations:
+# - typeClass: twitter4j.Status