diff --git a/heron/cli/src/python/BUILD b/heron/cli/src/python/BUILD index b9d22b5f20e..111ec46baaf 100644 --- a/heron/cli/src/python/BUILD +++ b/heron/cli/src/python/BUILD @@ -13,6 +13,7 @@ pex_library( "jars.py", "kill.py", "opts.py", + "ps.py", "restart.py", "submit.py", "utils.py", diff --git a/heron/cli/src/python/jars.py b/heron/cli/src/python/jars.py index 6cd80c102ba..78450944c72 100644 --- a/heron/cli/src/python/jars.py +++ b/heron/cli/src/python/jars.py @@ -10,7 +10,7 @@ def pick(dirname, pattern): return file_list[0] if file_list else None ################################################################################ -# Get the topology jars - TODO, make the jars independent version free +# Get the topology jars ################################################################################ def topology_jars(): jars = [ @@ -18,6 +18,15 @@ def topology_jars(): ] return jars +################################################################################ +# Get the command jars +################################################################################ +def command_jars(): + jars = [ + os.path.join(utils.get_heron_lib_dir(), "commands", "*") + ] + return jars + ################################################################################ # Get the scheduler jars ################################################################################ diff --git a/heron/cli/src/python/main.py b/heron/cli/src/python/main.py index 3f6d16dc8de..b93fbe2c791 100644 --- a/heron/cli/src/python/main.py +++ b/heron/cli/src/python/main.py @@ -23,6 +23,7 @@ import heron.cli.src.python.activate as activate import heron.cli.src.python.deactivate as deactivate import heron.cli.src.python.kill as kill +import heron.cli.src.python.ps as ps import heron.cli.src.python.restart as restart import heron.cli.src.python.submit as submit import heron.cli.src.python.utils as utils @@ -71,6 +72,7 @@ def create_parser(): deactivate.create_parser(subparsers) help.create_parser(subparsers) kill.create_parser(subparsers) + ps.create_parser(subparsers) restart.create_parser(subparsers) submit.create_parser(subparsers) version.create_parser(subparsers) @@ -93,6 +95,9 @@ def run(command, parser, command_args, unknown_args): elif command == 'restart': return restart.run(command, parser, command_args, unknown_args) + elif command == 'ps': + return ps.run(command, parser, command_args, unknown_args) + elif command == 'submit': return submit.run(command, parser, command_args, unknown_args) diff --git a/heron/cli/src/python/ps.py b/heron/cli/src/python/ps.py new file mode 100644 index 00000000000..b3558a77a0e --- /dev/null +++ b/heron/cli/src/python/ps.py @@ -0,0 +1,70 @@ +#!/usr/bin/python2.7 + +import argparse +import atexit +import base64 +import contextlib +import glob +import logging +import logging.handlers +import os +import shutil +import sys +import subprocess +import tarfile +import tempfile + +from heron.common.src.python.color import Log + +import heron.cli.src.python.args as args +import heron.cli.src.python.execute as execute +import heron.cli.src.python.jars as jars +import heron.cli.src.python.utils as utils + +def create_parser(subparsers): + parser = subparsers.add_parser( + 'ps', + help='List all topologies', + usage = "%(prog)s [options] cluster/[role]/[environ]", + add_help = False) + + args.add_titles(parser) + args.add_cluster_role_env(parser) + + args.add_config(parser) + args.add_verbose(parser) + + parser.set_defaults(subcommand='ps') + return parser + +def run(command, parser, cl_args, unknown_args): + + try: + config_overrides = utils.parse_cmdline_override(cl_args) + + new_args = [ + "--cluster", cl_args['cluster'], + "--role", cl_args['role'], + "--environment", cl_args['environ'], + "--heron_home", utils.get_heron_dir(), + "--config_path", cl_args['config_path'], + "--config_overrides", base64.b64encode(config_overrides), + "--command", "ps", + ] + + lib_jars = utils.get_heron_libs(jars.command_jars() + jars.statemgr_jars()) + + # invoke the runtime manager to kill the topology + execute.heron_class( + 'com.twitter.heron.command.CommandHandlerMain', + lib_jars, + extra_jars=[], + args= new_args + ) + + except Exception as ex: + print ex + Log.error('Failed to get list of topologies') + return False + + return True diff --git a/heron/commands/src/java/BUILD b/heron/commands/src/java/BUILD new file mode 100644 index 00000000000..dca980242c5 --- /dev/null +++ b/heron/commands/src/java/BUILD @@ -0,0 +1,41 @@ +package(default_visibility = ["//visibility:public"]) + +load("/tools/rules/heron_deps", "heron_java_proto_files") + +common_deps_files = [ + "//heron/common/src/java:common-java", + "//3rdparty/commons:commons-cli-java", + "//3rdparty/guava:guava-java", +] + +spi_deps_files = [ + "//heron/spi/src/java:common-spi-java", + "//heron/spi/src/java:statemgr-spi-java", + "//heron/spi/src/java:utils-spi-java", +] + +commands_deps_files = \ + common_deps_files + \ + heron_java_proto_files() + \ + spi_deps_files + +java_library( + name = 'commands-java', + srcs = glob( + ["**/*.java"], + ), + deps = commands_deps_files, +) + +java_binary( + name = 'commands-unshaded', + srcs = glob(["**/*.java"]), + deps = commands_deps_files, +) + +genrule( + name = "heron-commands", + srcs = [":commands-unshaded_deploy.jar"], + outs = ["heron-commands.jar"], + cmd = "cp $< $@", +) diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandler.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandler.java new file mode 100644 index 00000000000..b646cdddf60 --- /dev/null +++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandler.java @@ -0,0 +1,40 @@ +package com.twitter.heron.command; + +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.twitter.heron.spi.common.Config; +import com.twitter.heron.spi.common.Context; + +public abstract class CommandHandler { + + // static config read from the config files + protected Config config; + + // runtime config gathered during execution + protected Config runtime; + + /** + * Construct the command handler with static and runtime config + */ + CommandHandler(Config config, Config runtime) { + this.config = config; + this.runtime = runtime; + } + + /** + * Execute any conditions before the command execution + */ + public abstract boolean beforeExecution() throws Exception; + + /** + * Execute any cleanup after the command execution + */ + public abstract boolean afterExecution() throws Exception; + + /** + * Execute the command + */ + public abstract boolean execute() throws Exception; +} diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandlerConfig.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerConfig.java new file mode 100644 index 00000000000..a940fa66fe8 --- /dev/null +++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerConfig.java @@ -0,0 +1,79 @@ +package com.twitter.heron.command; + +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.twitter.heron.api.generated.TopologyAPI; +import com.twitter.heron.common.basics.FileUtils; +import com.twitter.heron.proto.scheduler.Scheduler; + +import com.twitter.heron.spi.common.ClusterConfig; +import com.twitter.heron.spi.common.ClusterDefaults; +import com.twitter.heron.spi.common.Config; +import com.twitter.heron.spi.common.Context; +import com.twitter.heron.spi.common.Keys; + +import org.apache.commons.cli.CommandLine; + +/** + * For loading command handler config + */ +public class CommandHandlerConfig { + private static final Logger LOG = Logger.getLogger(CommandHandlerConfig.class.getName()); + + /** + * Load the defaults config + * + * @return config, the defaults config + */ + protected static Config defaultConfigs(String heronHome, String configPath) { + Config config = Config.newBuilder() + .putAll(ClusterDefaults.getDefaults()) + .putAll(ClusterConfig.loadCommandsConfig(heronHome, configPath)) + .build(); + return config; + } + + /** + * Load the config parameters from the command line + * + * @param cluster, name of the cluster + * @param role, user role + * @param environ, user provided environment/tag + * + * @return config, the command line config + */ + protected static Config commandLineConfigs(String cluster, String role, String environ) { + Config config = Config.newBuilder() + .put(Keys.cluster(), cluster) + .put(Keys.role(), role) + .put(Keys.environ(), environ) + .build(); + return config; + } + + /** + * Load the config from static config files + * + * @param commandLine, the command line args provided + * + * @return config, the static config + */ + protected static Config loadConfig(CommandLine commandLine) { + + String cluster = commandLine.getOptionValue("cluster"); + String role = commandLine.getOptionValue("role"); + String environ = commandLine.getOptionValue("environment"); + String heronHome = commandLine.getOptionValue("heron_home"); + String configPath = commandLine.getOptionValue("config_path"); + + // build the config by expanding all the variables + Config config = Config.expand( + Config.newBuilder() + .putAll(defaultConfigs(heronHome, configPath)) + .putAll(commandLineConfigs(cluster, role, environ)) + .build()); + return config; + } +} diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandlerFactory.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerFactory.java new file mode 100644 index 00000000000..c9328615199 --- /dev/null +++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerFactory.java @@ -0,0 +1,20 @@ +package com.twitter.heron.command; + +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.twitter.heron.spi.common.Config; +import com.twitter.heron.spi.common.Context; + +public class CommandHandlerFactory { + private static final Logger LOG = Logger.getLogger(CommandHandlerFactory.class.getName()); + + public static CommandHandler makeCommand(String command, Config config, Config runtime) { + if (command.equalsIgnoreCase("ps")) + return new ListTopologiesHandler(config, runtime); + + LOG.info("Invalid command " + command); + return null; + } +} diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandlerMain.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerMain.java new file mode 100644 index 00000000000..9bbd1924ac3 --- /dev/null +++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerMain.java @@ -0,0 +1,95 @@ +package com.twitter.heron.command; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.twitter.heron.spi.common.ClusterConfig; +import com.twitter.heron.spi.common.ClusterDefaults; +import com.twitter.heron.spi.common.Config; +import com.twitter.heron.spi.common.Context; +import com.twitter.heron.spi.common.Keys; +import com.twitter.heron.spi.statemgr.IStateManager; +import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor; + +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.HelpFormatter; + +public class CommandHandlerMain { + private static final Logger LOG = Logger.getLogger(CommandHandlerMain.class.getName()); + + public static void main(String[] args) + throws ClassNotFoundException, IllegalAccessException, + InstantiationException, IOException, ParseException { + + // parse the help options first. + Options options = CommandHandlerOptions.constructOptions(); + Options helpOptions = CommandHandlerOptions.constructHelpOptions(); + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse(helpOptions, args, true);; + + if(cmd.hasOption("h")) { + CommandHandlerOptions.usage(options); + System.exit(0); + } + + // Now parse the required options + try { + cmd = parser.parse(options, args); + } catch(ParseException e) { + LOG.severe("Error parsing command line options: " + e.getMessage()); + CommandHandlerOptions.usage(options); + System.exit(1); + } + + // get the command to be executed + String command = cmd.getOptionValue("command"); + + // load the static config + Config config = CommandHandlerConfig.loadConfig(cmd); + LOG.info("Static config loaded successfully "); + LOG.info(config.toString()); + + // create an instance of state manager + String statemgrClass = Context.stateManagerClass(config); + IStateManager statemgr = (IStateManager) Class.forName(statemgrClass).newInstance(); + + try { + // initialize the statemgr + statemgr.initialize(config); + + // build the runtime config + Config runtime = Config.newBuilder() + .put(Keys.schedulerStateManagerAdaptor(), new SchedulerStateManagerAdaptor(statemgr)) + .build(); + + // instantiate the command handler + CommandHandler commandHandler = CommandHandlerFactory.makeCommand(command, config, runtime); + + // execute preconditions for command execution + commandHandler.beforeExecution(); + + commandHandler.execute(); + + // execute post conditions for command execution + commandHandler.afterExecution(); + + } catch (Exception e) { + e.printStackTrace(); + LOG.severe("Unable to execute command " + command); + System.exit(1); + + } finally { + // close the state manager + statemgr.close(); + } + + System.exit(0); + } +} diff --git a/heron/commands/src/java/com/twitter/heron/command/CommandHandlerOptions.java b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerOptions.java new file mode 100644 index 00000000000..81feabbe83c --- /dev/null +++ b/heron/commands/src/java/com/twitter/heron/command/CommandHandlerOptions.java @@ -0,0 +1,105 @@ +package com.twitter.heron.command; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.HelpFormatter; + +public class CommandHandlerOptions { + private static final Logger LOG = Logger.getLogger(CommandHandlerOptions.class.getName()); + + // Print usage options + protected static void usage(Options options) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp( "CommandHandlerOptions", options ); + } + + // Construct all required command line options + protected static Options constructOptions() { + Options options = new Options(); + + Option cluster = Option.builder("c") + .desc("Cluster name in which the topology needs to run on") + .longOpt("cluster") + .hasArgs() + .argName("cluster") + .required() + .build(); + + Option role = Option.builder("r") + .desc("Role under which the topology needs to run") + .longOpt("role") + .hasArgs() + .argName("role") + .required() + .build(); + + Option environment = Option.builder("e") + .desc("Environment under which the topology needs to run") + .longOpt("environment") + .hasArgs() + .argName("environment") + .required() + .build(); + + Option heronHome = Option.builder("d") + .desc("Directory where heron is installed") + .longOpt("heron_home") + .hasArgs() + .argName("heron home dir") + .required() + .build(); + + Option configPath = Option.builder("p") + .desc("Path of the config files") + .longOpt("config_path") + .hasArgs() + .argName("config path") + .required() + .build(); + + // TODO: Need to figure out the exact format + Option configOverrides = Option.builder("o") + .desc("Command line config overrides") + .longOpt("config_overrides") + .hasArgs() + .argName("config overrides") + .build(); + + Option command = Option.builder("m") + .desc("Command to run") + .longOpt("command") + .hasArgs() + .required() + .argName("command to run") + .build(); + + options.addOption(cluster); + options.addOption(role); + options.addOption(environment); + options.addOption(configPath); + options.addOption(configOverrides); + options.addOption(command); + options.addOption(heronHome); + + return options; + } + + // construct command line help options + protected static Options constructHelpOptions() { + Options options = new Options(); + Option help = Option.builder("h") + .desc("List all options and their description") + .longOpt("help") + .build(); + + options.addOption(help); + return options; + } +} diff --git a/heron/commands/src/java/com/twitter/heron/command/ListTopologiesHandler.java b/heron/commands/src/java/com/twitter/heron/command/ListTopologiesHandler.java new file mode 100644 index 00000000000..a42727ec014 --- /dev/null +++ b/heron/commands/src/java/com/twitter/heron/command/ListTopologiesHandler.java @@ -0,0 +1,94 @@ +package com.twitter.heron.command; + +import java.util.Arrays; +import java.util.List; +import java.util.ArrayList; +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import java.util.concurrent.TimeUnit; +import com.twitter.heron.spi.common.Config; +import com.twitter.heron.spi.common.Context; +import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor; +import com.twitter.heron.spi.utils.Runtime; + +import com.twitter.heron.proto.system.ExecutionEnvironment; + +public class ListTopologiesHandler extends CommandHandler { + private static final Logger LOG = Logger.getLogger(ListTopologiesHandler.class.getName()); + + /** + * Construct the command handler with static and runtime config + */ + ListTopologiesHandler(Config config, Config runtime) { + super(config, runtime); + } + + /** + * Execute any conditions before the command execution + */ + public boolean beforeExecution() { + return true; + } + + /** + * Execute any cleanup after the command execution + */ + public boolean afterExecution() { + return true; + } + + /** + * Execute the command + */ + public boolean execute() throws Exception { + + // get the state manager instance + SchedulerStateManagerAdaptor stateManager = Runtime.schedulerStateManagerAdaptor(runtime); + + // get the execution states of all topologies + List executionStates; + executionStates = stateManager.getAllExecutionStates().get(5, TimeUnit.SECONDS); + + // if no topologies are found, return right away + if (executionStates.isEmpty()) { + System.out.println("No topologies found"); + return true; + } + + // print the header + System.out.format("%-12.12s %-12.12s %-12.12s %-15.15s %-15.15s %s\n", + "CLUSTER", "ROLE", "ENVIRON", "USER", "TIME", "NAME"); + + long now = System.currentTimeMillis()/1000; + for (ExecutionEnvironment.ExecutionState es: executionStates) { + + // calculate the difference in time in seconds + long delta = now - es.getSubmissionTime(); + + // calculate (and subtract) whole days + long days = (long)Math.floor(delta / 86400); + delta -= days * 86400; + + // calculate (and subtract) whole hours + long hours = ((long)Math.floor(delta / 3600)) % 24; + delta -= hours * 3600; + + // calculate (and subtract) whole minutes + long minutes = ((long)Math.floor(delta / 60)) % 60; + delta -= minutes * 60; + + // what's left is seconds + long seconds = delta % 60; // in theory the modulus is not required + + String timeString = String.format("%03d:%02d:%02d:%02d", days, hours, minutes, seconds); + + System.out.format("%-12.12s %-12.12s %-12.12s %-15.15s %-15.15s %s\n", + es.getCluster(), es.getRole(), es.getEnviron(), + es.getSubmissionUser(), timeString, es.getTopologyName()); + } + + return true; + } +} diff --git a/heron/spi/src/java/com/twitter/heron/spi/common/ClusterConfig.java b/heron/spi/src/java/com/twitter/heron/spi/common/ClusterConfig.java index b1beef54769..3f9155b6c10 100644 --- a/heron/spi/src/java/com/twitter/heron/spi/common/ClusterConfig.java +++ b/heron/spi/src/java/com/twitter/heron/spi/common/ClusterConfig.java @@ -146,6 +146,15 @@ public static Config loadConfig(String heronHome, String configPath) { return cb.build(); } + public static Config loadCommandsConfig(String heronHome, String configPath) { + Config homeConfig = loadBasicConfig(heronHome, configPath); + + Config.Builder cb = Config.newBuilder() + .putAll(homeConfig) + .putAll(loadStateManagerConfig(Context.stateManagerFile(homeConfig))); + return cb.build(); + } + public static Config loadSandboxConfig() { Config sandboxConfig = loadBasicSandboxConfig(); diff --git a/heron/spi/src/java/com/twitter/heron/spi/statemgr/IStateManager.java b/heron/spi/src/java/com/twitter/heron/spi/statemgr/IStateManager.java index 17ee0a6732f..ca8e71c470d 100644 --- a/heron/spi/src/java/com/twitter/heron/spi/statemgr/IStateManager.java +++ b/heron/spi/src/java/com/twitter/heron/spi/statemgr/IStateManager.java @@ -1,5 +1,6 @@ package com.twitter.heron.spi.statemgr; +import java.util.List; import com.google.common.util.concurrent.ListenableFuture; import com.twitter.heron.api.generated.TopologyAPI; @@ -151,6 +152,13 @@ ListenableFuture getTopology( ListenableFuture getExecutionState( WatchCallback watcher, String topologyName); + /** + * Get the execution states of all topologies + * + * @return List + */ + ListenableFuture> getAllExecutionStates(); + /** * Set the location of Tmaster. * @param location diff --git a/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java b/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java index 6ef49ece876..44b40071386 100644 --- a/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java +++ b/heron/spi/src/java/com/twitter/heron/spi/statemgr/SchedulerStateManagerAdaptor.java @@ -1,5 +1,6 @@ package com.twitter.heron.spi.statemgr; +import java.util.List; import com.google.common.util.concurrent.ListenableFuture; import com.twitter.heron.api.generated.TopologyAPI; @@ -161,6 +162,15 @@ public ListenableFuture getExecutionState( return delegate.getExecutionState(null, topologyName); } + /** + * Get the execution states of all topologies + * + * @return List + */ + public ListenableFuture> getAllExecutionStates() { + return delegate.getAllExecutionStates(); + } + /** * Get the physical plan for the given topology * diff --git a/heron/statemgrs/src/java/com/twitter/heron/statemgr/NullStateManager.java b/heron/statemgrs/src/java/com/twitter/heron/statemgr/NullStateManager.java index fc44031efb4..d76cfb61245 100644 --- a/heron/statemgrs/src/java/com/twitter/heron/statemgr/NullStateManager.java +++ b/heron/statemgrs/src/java/com/twitter/heron/statemgr/NullStateManager.java @@ -1,5 +1,6 @@ package com.twitter.heron.statemgr; +import java.util.List; import java.util.Map; import com.google.common.util.concurrent.ListenableFuture; @@ -108,6 +109,11 @@ public ListenableFuture getExecutionState(W return nullFuture; } + @Override + public ListenableFuture> getAllExecutionStates() { + return nullFuture; + } + @Override public ListenableFuture getPhysicalPlan(WatchCallback watcher, String topologyName) { return nullFuture; diff --git a/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java b/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java index 432f6b822da..07f59760695 100644 --- a/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java +++ b/heron/statemgrs/src/java/com/twitter/heron/statemgr/localfs/LocalFileSystemStateManager.java @@ -1,6 +1,9 @@ package com.twitter.heron.statemgr.localfs; +import java.io.File; +import java.util.List; import java.util.Map; +import java.util.LinkedList; import java.util.logging.Logger; import com.google.common.util.concurrent.ListenableFuture; @@ -193,9 +196,32 @@ public ListenableFuture getExecutionState(W executionState = ExecutionEnvironment.ExecutionState.parseFrom(data); future.set(executionState); } catch (InvalidProtocolBufferException e) { - future.setException(new RuntimeException("Could not parse SchedulerLocation", e)); + future.setException(new RuntimeException("Could not parse ExecutionState", e)); + } + + return future; + } + + @Override + public ListenableFuture> getAllExecutionStates() { + SettableFuture> future = SettableFuture.create(); + ExecutionEnvironment.ExecutionState executionState; + List executionStates = new LinkedList(); + + File folder = new File(getExecutionStateDir()); + File[] files = folder.listFiles(); + for (File file: files) { + byte[] data = FileUtils.readFromFile(file.getAbsolutePath()); + try { + executionState = ExecutionEnvironment.ExecutionState.parseFrom(data); + executionStates.add(executionState); + } catch (InvalidProtocolBufferException e) { + future.setException(new RuntimeException("Could not parse ExecutionState", e)); + return future; + } } + future.set(executionStates); return future; } diff --git a/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java b/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java index 7cabfb9d6a0..e1176b82707 100644 --- a/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java +++ b/heron/statemgrs/src/java/com/twitter/heron/statemgr/zookeeper/curator/CuratorStateManager.java @@ -1,5 +1,7 @@ package com.twitter.heron.statemgr.zookeeper.curator; +import java.util.List; +import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; @@ -265,6 +267,13 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex return topologyFuture; } + @Override + public ListenableFuture> getAllExecutionStates() { + final SettableFuture> executionStateFuture = SettableFuture.create(); + executionStateFuture.set(new LinkedList()); + return executionStateFuture; + } + @Override public ListenableFuture getExecutionState(WatchCallback watcher, String topologyName) { final SettableFuture executionStateFuture = SettableFuture.create(); diff --git a/scripts/centos5/BUILD b/scripts/centos5/BUILD index e38fd99eb3a..0f2dc2a666e 100644 --- a/scripts/centos5/BUILD +++ b/scripts/centos5/BUILD @@ -83,6 +83,7 @@ genrule( ":conf-local-uploader", ":hcli", ":hexamples", + ":hcommands", ":hscheduler", ":hlscheduler", ":hrrpacking", @@ -112,6 +113,7 @@ genrule( "--cp $(location conf-local-uploader) conf/local/uploader.yaml", "--cp $(location heron-core) dist/heron-core.tar.gz", "--cp $(location hexamples) examples/heron-examples.jar", + "--cp $(location hcommands) lib/commands/heron-commands.jar", "--cp $(location hscheduler) lib/scheduler/heron-scheduler.jar", "--cp $(location hlscheduler) lib/scheduler/heron-local-scheduler.jar", "--cp $(location hrrpacking) lib/packing/heron-roundrobin-packing.jar", @@ -237,6 +239,11 @@ filegroup( srcs = ["//heron/instance/src/java:aurora-logging-properties"], ) +filegroup( + name = "hcommands", + srcs = ["//heron/commands/src/java:heron-commands"], +) + filegroup( name = "hscheduler", srcs = ["//heron/newscheduler/src/java:heron-scheduler"], diff --git a/scripts/packages/BUILD b/scripts/packages/BUILD index fb78c8b83f1..596cc844dfd 100644 --- a/scripts/packages/BUILD +++ b/scripts/packages/BUILD @@ -8,6 +8,7 @@ load("/tools/rules/heron_api", "heron_api_files") load("/tools/rules/heron_client", "heron_client_bin_files") load("/tools/rules/heron_client", "heron_client_conf_files") load("/tools/rules/heron_client", "heron_client_lib_3rdparty_files") +load("/tools/rules/heron_client", "heron_client_lib_commands_files") load("/tools/rules/heron_client", "heron_client_lib_scheduler_files") load("/tools/rules/heron_client", "heron_client_lib_packing_files") load("/tools/rules/heron_client", "heron_client_lib_statemgr_files") @@ -132,6 +133,12 @@ pkg_tar( files = heron_client_bin_files(), ) +pkg_tar( + name = "heron-client-lib-commands", + package_dir = "lib/commands", + files = heron_client_lib_commands_files(), +) + pkg_tar( name = "heron-client-lib-scheduler", package_dir = "lib/scheduler", @@ -207,8 +214,9 @@ pkg_tar( ":heron-client-dist", ":heron-client-examples", ":heron-client-lib-3rdparty", - ":heron-client-lib-scheduler", + ":heron-client-lib-commands", ":heron-client-lib-packing", + ":heron-client-lib-scheduler", ":heron-client-lib-statemgr", ":heron-client-lib-uploader", ], diff --git a/tools/rules/heron_client.bzl b/tools/rules/heron_client.bzl index 0b019780a4e..9df787b0c14 100644 --- a/tools/rules/heron_client.bzl +++ b/tools/rules/heron_client.bzl @@ -21,6 +21,11 @@ def heron_client_aurora_files(): "//heron/config/src/yaml:conf-aurora-yaml", ] +def heron_client_lib_commands_files(): + return [ + "//heron/commands/src/java:heron-commands", + ] + def heron_client_lib_scheduler_files(): return [ "//heron/newscheduler/src/java:heron-scheduler",