From 0640c167b5c7fa6f5dd02876a880ec3c500eaea7 Mon Sep 17 00:00:00 2001 From: Sumit Maheshwari Date: Sat, 11 Mar 2017 22:34:04 -0800 Subject: [PATCH 01/10] SDK-178: Support for File and Partition Sensors. --- bin/qds.py | 19 ++++++++++- qds_sdk/sensors.py | 75 ++++++++++++++++++++++++++++++++++++++++++++ tests/test_sensor.py | 36 +++++++++++++++++++++ 3 files changed, 129 insertions(+), 1 deletion(-) create mode 100644 qds_sdk/sensors.py create mode 100644 tests/test_sensor.py diff --git a/bin/qds.py b/bin/qds.py index 23ae2e54..e2cc440d 100755 --- a/bin/qds.py +++ b/bin/qds.py @@ -16,6 +16,7 @@ from qds_sdk.nezha import NezhaCmdLine from qds_sdk.user import UserCmdLine from qds_sdk.template import TemplateCmdLine +from qds_sdk.sensors import * import os import sys @@ -37,6 +38,11 @@ "prestocmd": PrestoCommand } +SensorClasses = { + "filesensor": FileSensor, + "partitionsensor": PartitionSensor +} + usage_str = ( "Usage: qds.py [options] \n" "\nCommand subcommands:\n" @@ -86,7 +92,9 @@ "\nNezha subcommand:\n" " nezha --help\n" "\nUser subcommad:\n" - " user --help\n") + " user --help\n" + "\nSensor subcommand:\n" + " --help\n") def usage(parser=None): @@ -195,6 +203,12 @@ def cmdmain(cmd, args): return globals()[action + "action"](cmdclass, args) +def sensormain(sensor, args): + sensor_class = SensorClasses[sensor] + print(SensorCmdLine.check(sensor_class, args)) + return 0 + + def checkargs_cluster_id_label(args): if len(args) != 1: sys.stderr.write("expecting single argument cluster id or cluster label\n") @@ -560,6 +574,9 @@ def main(): if a0 in CommandClasses: return cmdmain(a0, args) + if a0 in SensorClasses: + return sensormain(a0, args) + if a0 == "account": return accountmain(args) diff --git a/qds_sdk/sensors.py b/qds_sdk/sensors.py new file mode 100644 index 00000000..9c19dd9c --- /dev/null +++ b/qds_sdk/sensors.py @@ -0,0 +1,75 @@ +""" +The sensors module contains the base definition for a generic +sensor call and the implementation of all the specific sensors +""" + +from __future__ import print_function +from qds_sdk.qubole import Qubole +from qds_sdk.resource import Resource +from argparse import ArgumentParser + +import logging +import json + +log = logging.getLogger("qds_sensors") + + +class SensorCmdLine: + + @staticmethod + def check(sensor_class, args): + """ + Method to call Sensor.check after parsing args from cmdline + :param sensor_class: sensor class + :param args: inline arguments + :return: True or False + """ + parser = SensorCmdLine.parsers(sensor_class) + parsed = parser.parse_args(args) + return sensor_class.check(json.loads(parsed.data)) + + @staticmethod + def parsers(sensor_class): + argparser = ArgumentParser(prog=sensor_class.usage, description=sensor_class.description) + subparsers = argparser.add_subparsers() + + #Check + check = subparsers.add_parser("check", help="Check a Sensor") + check.add_argument("-d", "--data", dest="data", required=True, + help="String containing a valid json object") + check.set_defaults(func=Sensor.check) + return argparser + + +class Sensor(Resource): + """ + qds_sdk.Sensor is the base Qubole sensor class. Different types of Qubole + sensors can subclass this. 
+ """ + + @classmethod + def check(cls, data): + """ + Method to call the sensors api with json payload + :param data: valid json object + :return: True or False + """ + conn = Qubole.agent() + return conn.post(cls.rest_entity_path, data=data)['status'] + + +class FileSensor(Sensor): + rest_entity_path = "sensors/file_sensor" + + usage = ("qds.py filesensor check -d 'json string'") + description = "File Sensor client for Qubole Data Services" + + +class PartitionSensor(Sensor): + rest_entity_path = "sensors/partition_sensor" + + usage = ("qds.py partitionsensor check -d 'json string'") + description = "Hive Partition Sensor client for Qubole Data Services" + + + diff --git a/tests/test_sensor.py b/tests/test_sensor.py new file mode 100644 index 00000000..0108266d --- /dev/null +++ b/tests/test_sensor.py @@ -0,0 +1,36 @@ +import sys +import os + +if sys.version_info > (2, 7, 0): + import unittest +else: + import unittest2 as unittest +from mock import * + +sys.path.append(os.path.join(os.path.dirname(__file__), '../bin')) +import qds +from qds_sdk.connection import Connection +from test_base import print_command +from test_base import QdsCliTestCase + + +class TestSensorCheck(QdsCliTestCase): + def test_file_sensor(self): + sys.argv = ['qds.py', 'filesensor', 'check', '-d', '{"files":["s3://dev.canopydata.com/airflow"]}'] + print_command() + Connection._api_call = Mock(return_value={'status': True}) + qds.main() + Connection._api_call.assert_called_with( + "POST", "sensors/file_sensor", {'files':['s3://dev.canopydata.com/airflow']}) + + + def test_partition_sensor(self): + sys.argv = ['qds.py', 'partitionsensor', 'check', '-d', '{"schema" : "default", "table" : "nation_s3_rcfile_p", "columns" : [{"column" : "p", "values" : [1, 2]}]}'] + print_command() + Connection._api_call = Mock(return_value={'status': True}) + qds.main() + Connection._api_call.assert_called_with( + "POST", "sensors/partition_sensor", {"schema" : "default", "table" : "nation_s3_rcfile_p", "columns" : [{"column" : "p", "values" : [1, 2]}]}) + +if __name__ == '__main__': + unittest.main() From 896d7250d6c66968d02e141b062c3595df505fe4 Mon Sep 17 00:00:00 2001 From: Harsh Shah Date: Thu, 13 Apr 2017 14:02:49 -0700 Subject: [PATCH 02/10] Add license to setup file for the sdk. Squashed commit of the following: commit 1b67072292f7bd0336b654e36ddb590428ae143c Author: Anwesha Das Date: Thu Aug 25 11:15:41 2016 +0530 Adds license information to the setup function As suggested in https://packaging.python.org/distributing/#license --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index a004c492..0a5df4ab 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,7 @@ def read(fname): author="Qubole", author_email="dev@qubole.com", description=("Python SDK for coding to the Qubole Data Service API"), + license="Apache License 2.0", keywords="qubole sdk api", url="https://github.com/qubole/qds-sdk-py", packages=['qds_sdk'], From 2bbc2c6d3b42d47a2f9a0190083f8e7af5678504 Mon Sep 17 00:00:00 2001 From: Tanish Gupta Date: Fri, 14 Apr 2017 23:39:02 +0530 Subject: [PATCH 03/10] new: SDK-168: ACM-181: Add support for clusters module. * Add eleastic ip support for master. * Add ha config to clusters. * Add options to include headers for get_results. 
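
For example, after this change the getresult subcommand accepts an optional
second argument controlling header output (the command id below is
illustrative):

    qds.py hivecmd getresult 12345 true     # include column headers in the results
    qds.py hivecmd getresult 12345          # default: headers are not included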
--- bin/qds.py | 14 +++++++++----- qds_sdk/commands.py | 32 ++++++++++++++++++++++++++++++-- tests/test_actions.py | 2 +- tests/test_command.py | 27 ++++++++++++++++++++++++++- 4 files changed, 66 insertions(+), 9 deletions(-) diff --git a/bin/qds.py b/bin/qds.py index e2cc440d..c308a4a0 100755 --- a/bin/qds.py +++ b/bin/qds.py @@ -51,7 +51,8 @@ " run [cmd-specific-args .. ] : submit cmd & wait. print results\n" " check : print the cmd object for this id\n" " cancel : cancels the cmd with this id\n" - " getresult : get the results for the cmd with this id\n" + " getresult : id -> get the results for the cmd with this id\n" + " include_header -> to include headers in results(true/false)\n" " getlog : get the logs for the cmd with this id\n" "\nCluster subcommand:\n" " cluster \n" @@ -121,10 +122,10 @@ def submitaction(cmdclass, args): return 0 -def _getresult(cmdclass, cmd): +def _getresult(cmdclass, cmd, args=[]): if Command.is_success(cmd.status): log.info("Fetching results for %s, Id: %s" % (cmdclass.__name__, cmd.id)) - cmd.get_results(sys.stdout, delim='\t') + cmd.get_results(sys.stdout, delim='\t', qlog=cmd.qlog, arguments=args) return 0 else: log.error("Cannot fetch results - command Id: %s failed with status: %s" % (cmd.id, cmd.status)) @@ -164,9 +165,12 @@ def cancelaction(cmdclass, args): def getresultaction(cmdclass, args): - checkargs_id(args) + if len(args) > 2: + sys.stderr.write("expecting not more than 2 arguments\n") + usage() + cmd = cmdclass.find(args.pop(0)) - return _getresult(cmdclass, cmd) + return _getresult(cmdclass, cmd, args) def getlogaction(cmdclass, args): diff --git a/qds_sdk/commands.py b/qds_sdk/commands.py index a9cde5f3..a79ec248 100644 --- a/qds_sdk/commands.py +++ b/qds_sdk/commands.py @@ -192,7 +192,7 @@ def get_jobs_id(cls, id): return r.text - def get_results(self, fp=sys.stdout, inline=True, delim=None, fetch=True): + def get_results(self, fp=sys.stdout, inline=True, delim=None, fetch=True, qlog=None, arguments=[]): """ Fetches the result for the command represented by this object @@ -212,7 +212,14 @@ def get_results(self, fp=sys.stdout, inline=True, delim=None, fetch=True): conn = Qubole.agent() - r = conn.get(result_path, {'inline': inline}) + include_header = "false" + if len(arguments) == 1: + include_header = arguments.pop(0) + if include_header not in ('true', 'false'): + raise ParseError("incude_header can be either true or false") + + + r = conn.get(result_path, {'inline': inline, 'include_headers': include_header}) if r.get('inline'): if sys.version_info < (3, 0, 0): fp.write(r['results'].encode('utf8')) @@ -235,12 +242,19 @@ def get_results(self, fp=sys.stdout, inline=True, delim=None, fetch=True): log.info("Starting download from result locations: [%s]" % ",".join(r['result_location'])) #fetch latest value of num_result_dir num_result_dir = Command.find(self.id).num_result_dir + for s3_path in r['result_location']: + + # If column/header names are not able to fetch then use include header as true + if include_header.lower() == "true" and qlog is not None: + write_headers(qlog, fp) + # In Python 3, # If the delim is None, fp should be in binary mode because # boto expects it to be. # If the delim is not None, then both text and binary modes # work. 
+ _download_to_local(boto_conn, s3_path, fp, num_result_dir, delim=delim, skip_data_avail_check=isinstance(self, PrestoCommand)) else: @@ -1247,6 +1261,20 @@ def _read_iteratively(key_instance, fp, delim): # Stream closes itself when the exception is raised return +def write_headers(qlog,fp): + col_names = [] + qlog = json.loads(qlog) + if qlog["QBOL-QUERY-SCHEMA"] is not None: + qlog_hash = qlog["QBOL-QUERY-SCHEMA"]["-1"] if qlog["QBOL-QUERY-SCHEMA"]["-1"] is not None else qlog["QBOL-QUERY-SCHEMA"][qlog["QBOL-QUERY-SCHEMA"].keys[0]] + + for qlog_item in qlog_hash: + col_names.append(qlog_item["ColumnName"]) + + col_names = "\t".join(col_names) + col_names += "\n" + + fp.write(col_names) + def _download_to_local(boto_conn, s3_path, fp, num_result_dir, delim=None, skip_data_avail_check=False): ''' diff --git a/tests/test_actions.py b/tests/test_actions.py index ddd782f3..c4d5fbd1 100644 --- a/tests/test_actions.py +++ b/tests/test_actions.py @@ -109,7 +109,7 @@ def test_results(self): qds.main() Connection._api_call.assert_has_calls( [call("GET", "actions/123", params=None), - call("GET", "commands/123/results", params={'inline': True})]) + call("GET", "commands/123/results", params={'inline': True, 'include_headers': 'false'})]) if __name__ == '__main__': diff --git a/tests/test_command.py b/tests/test_command.py index 10ccd040..c937ef7b 100644 --- a/tests/test_command.py +++ b/tests/test_command.py @@ -5,7 +5,7 @@ import unittest else: import unittest2 as unittest -from mock import Mock +from mock import * from tempfile import NamedTemporaryFile sys.path.append(os.path.join(os.path.dirname(__file__), '../bin')) import qds @@ -1459,6 +1459,31 @@ def test_submit_with_tags(self): 'command_type': 'DbTapQueryCommand', 'can_notify': False}) +class TestGetResultsCommand(QdsCliTestCase): + + def test_result_with_enable_header_true(self): + sys.argv = ['qds.py', 'hivecmd', 'getresult', '314591', 'true'] + print_command() + + # This mock include return values of both commands/:id and commands/:id/results get calls + Connection._api_call = Mock(return_value={'id' : 314591, + 'results': '123', + 'inline': True, + 'qlog': "column names", + 'meta_data': {'results_resource': 'commands/314591/results'}, + 'status': 'done'}) + qds.main() + Connection._api_call.assert_has_calls( + [call("GET", "commands/314591", params=None), + call("GET", "commands/314591/results", params={'inline': True, 'include_headers': 'true'})]) + + def test_result_failed_more_than_two_arguments(self): + sys.argv = ['qds.py', 'hivecmd', 'getresult', '314591', 'true', "extra_arg"] + print_command() + + with self.assertRaises(SystemExit): + qds.main() + if __name__ == '__main__': unittest.main() From 0e575812cc7e6f62eace35c6d88c0a58b9fed69f Mon Sep 17 00:00:00 2001 From: Tanish Gupta Date: Sun, 16 Apr 2017 00:45:27 -0700 Subject: [PATCH 04/10] fix: dev: SDK-177: Support for fetching command properties --- bin/qds.py | 21 +++++++++++++++++---- tests/test_command.py | 26 +++++++++++++++++--------- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/bin/qds.py b/bin/qds.py index c308a4a0..f48cc97e 100755 --- a/bin/qds.py +++ b/bin/qds.py @@ -49,7 +49,9 @@ " \n" " submit [cmd-specific-args .. ] : submit cmd & print id\n" " run [cmd-specific-args .. ] : submit cmd & wait. 
print results\n" - " check : print the cmd object for this id\n" + " check : id -> print the cmd object for this id\n" + " include-query-properties(true/false) -> to include query properties like\n" + " tags, comments and user actions\n" " cancel : cancels the cmd with this id\n" " getresult : id -> get the results for the cmd with this id\n" " include_header -> to include headers in results(true/false)\n" @@ -143,9 +145,20 @@ def runaction(cmdclass, args): def checkaction(cmdclass, args): - checkargs_id(args) - o = cmdclass.find(args.pop(0)) - print(str(o)) + if len(args) > 2: + sys.stderr.write("expecting not more than 2 arguments\n") + usage() + + conn = Qubole.agent() + id = args.pop(0) + include_query_properties="false" + if len(args) == 1: + include_query_properties=args.pop(0) + if include_query_properties not in ('true', 'false'): + raise ParseError("include-query-properties can be either true or false") + + r = conn.get(cmdclass.element_path(id), {'include_query_properties': include_query_properties}) + print(str(r)) return 0 diff --git a/tests/test_command.py b/tests/test_command.py index c937ef7b..4a552c40 100644 --- a/tests/test_command.py +++ b/tests/test_command.py @@ -22,63 +22,71 @@ def test_hivecmd(self): print_command() Connection._api_call = Mock(return_value={}) qds.main() - Connection._api_call.assert_called_with("GET", "commands/123", params=None) + Connection._api_call.assert_called_with("GET", "commands/123", params={'include_query_properties': 'false'}) def test_sparkcmd(self): sys.argv = ['qds.py', 'sparkcmd', 'check', '123'] print_command() Connection._api_call = Mock(return_value={}) qds.main() - Connection._api_call.assert_called_with("GET", "commands/123", params=None) + Connection._api_call.assert_called_with("GET", "commands/123", params={'include_query_properties': 'false'}) def test_hadoopcmd(self): sys.argv = ['qds.py', 'hadoopcmd', 'check', '123'] print_command() Connection._api_call = Mock(return_value={}) qds.main() - Connection._api_call.assert_called_with("GET", "commands/123", params=None) + Connection._api_call.assert_called_with("GET", "commands/123", params={'include_query_properties': 'false'}) def test_prestocmd(self): sys.argv = ['qds.py', 'prestocmd', 'check', '123'] print_command() Connection._api_call = Mock(return_value={}) qds.main() - Connection._api_call.assert_called_with("GET", "commands/123", params=None) + Connection._api_call.assert_called_with("GET", "commands/123", params={'include_query_properties': 'false'}) def test_pigcmd(self): sys.argv = ['qds.py', 'pigcmd', 'check', '123'] print_command() Connection._api_call = Mock(return_value={}) qds.main() - Connection._api_call.assert_called_with("GET", "commands/123", params=None) + Connection._api_call.assert_called_with("GET", "commands/123", params={'include_query_properties': 'false'}) def test_shellcmd(self): sys.argv = ['qds.py', 'shellcmd', 'check', '123'] print_command() Connection._api_call = Mock(return_value={}) qds.main() - Connection._api_call.assert_called_with("GET", "commands/123", params=None) + Connection._api_call.assert_called_with("GET", "commands/123", params={'include_query_properties': 'false'}) def test_dbexportcmd(self): sys.argv = ['qds.py', 'dbexportcmd', 'check', '123'] print_command() Connection._api_call = Mock(return_value={}) qds.main() - Connection._api_call.assert_called_with("GET", "commands/123", params=None) + Connection._api_call.assert_called_with("GET", "commands/123", params={'include_query_properties': 'false'}) def test_dbimportcmd(self): 
sys.argv = ['qds.py', 'dbimportcmd', 'check', '123'] print_command() Connection._api_call = Mock(return_value={}) qds.main() - Connection._api_call.assert_called_with("GET", "commands/123", params=None) + Connection._api_call.assert_called_with("GET", "commands/123", params={'include_query_properties': 'false'}) def test_dbtapquerycmd(self): sys.argv = ['qds.py', 'dbtapquerycmd', 'check', '123'] print_command() Connection._api_call = Mock(return_value={}) qds.main() - Connection._api_call.assert_called_with("GET", "commands/123", params=None) + Connection._api_call.assert_called_with("GET", "commands/123", params={'include_query_properties': 'false'}) + + def test_includequeryproperty(self): + sys.argv = ['qds.py', 'hivecmd', 'check', '123', 'true'] + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with("GET", "commands/123", params={'include_query_properties': 'true'}) + class TestCommandCancel(QdsCliTestCase): From cb29fd258faf29a851b7d1c0b4abd3a1f6076b92 Mon Sep 17 00:00:00 2001 From: Tarun Goyal Date: Mon, 8 May 2017 21:30:06 +0530 Subject: [PATCH 05/10] fix: dev: MW-604: Supports remote script location. --- qds_sdk/commands.py | 42 +++++++++---------- tests/test_command.py | 96 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 114 insertions(+), 24 deletions(-) diff --git a/qds_sdk/commands.py b/qds_sdk/commands.py index a79ec248..26b8cd88 100644 --- a/qds_sdk/commands.py +++ b/qds_sdk/commands.py @@ -536,6 +536,20 @@ def validate_script_location(cls, options): if options.language is not None: raise ParseError("Both script location and language cannot be specified together", cls.optparser.format_help()) # for now, aws script_location is not supported and throws an error + fileName, fileExtension = os.path.splitext(options.script_location) + # getting the language of the program from the file extension + if fileExtension == ".py": + options.language = "python" + elif fileExtension == ".scala": + options.language = "scala" + elif fileExtension == ".R": + options.language = "R" + elif fileExtension == ".sql": + options.language = "sql" + else: + raise ParseError("Invalid program type %s. Please choose one from python, scala, R or sql." % str(fileExtension), + cls.optparser.format_help()) + if ((options.script_location.find("s3://") != 0) and (options.script_location.find("s3n://") != 0)): @@ -548,30 +562,14 @@ def validate_script_location(cls, options): str(e), cls.optparser.format_help()) - - fileName, fileExtension = os.path.splitext(options.script_location) - # getting the language of the program from the file extension - if fileExtension == ".py": - options.language = "python" - elif fileExtension == ".scala": - options.language = "scala" - elif fileExtension == ".R": - options.language = "R" - elif fileExtension == ".sql": - options.language = "sql" + + options.script_location = None + if options.language == "sql": + options.sql = q + options.language = None else: - raise ParseError("Invalid program type %s. Please choose one from python, scala, R or sql." 
% str(fileExtension), - cls.optparser.format_help()) - else: - raise ParseError("Invalid location, Please choose a local file location", - cls.optparser.format_help()) + options.program = q - options.script_location = None - if options.language == "sql": - options.sql = q - options.language = None - else: - options.program = q @classmethod def parse(cls, args): diff --git a/tests/test_command.py b/tests/test_command.py index 4a552c40..e5d19247 100644 --- a/tests/test_command.py +++ b/tests/test_command.py @@ -435,8 +435,100 @@ def test_submit_notebook_with_program(self): with self.assertRaises(qds_sdk.exception.ParseError): qds.main() - def test_submit_script_location_aws(self): - sys.argv = ['qds.py', 'sparkcmd', 'submit', '--script_location', 's3://bucket/path-to-script'] + def test_submit_script_location_aws_python(self): + sys.argv = ['qds.py', 'sparkcmd', 'submit', '--script_location', 's3://bucket/path-to-script.py'] + print_command() + Connection._api_call = Mock(return_value={'id': 1234}) + qds.main() + Connection._api_call.assert_called_with('POST', 'commands', + {'macros': None, + 'label': None, + 'language': "python", + 'tags': None, + 'name': None, + 'sql': None, + 'program': None, + 'app_id': None, + 'cmdline':None, + 'command_type': 'SparkCommand', + 'arguments': None, + 'user_program_arguments': None, + 'can_notify': False, + 'script_location': 's3://bucket/path-to-script.py', + 'note_id' : None, + 'retry': 0}) + + def test_submit_script_location_aws_scala(self): + sys.argv = ['qds.py', 'sparkcmd', 'submit', '--script_location', 's3://bucket/path-to-script.scala'] + print_command() + Connection._api_call = Mock(return_value={'id': 1234}) + qds.main() + Connection._api_call.assert_called_with('POST', 'commands', + {'macros': None, + 'label': None, + 'language': "scala", + 'tags': None, + 'name': None, + 'sql': None, + 'program': None, + 'app_id': None, + 'cmdline':None, + 'command_type': 'SparkCommand', + 'arguments': None, + 'user_program_arguments': None, + 'can_notify': False, + 'script_location': 's3://bucket/path-to-script.scala', + 'note_id' : None, + 'retry': 0}) + + def test_submit_script_location_aws_R(self): + sys.argv = ['qds.py', 'sparkcmd', 'submit', '--script_location', 's3://bucket/path-to-script.R'] + print_command() + Connection._api_call = Mock(return_value={'id': 1234}) + qds.main() + Connection._api_call.assert_called_with('POST', 'commands', + {'macros': None, + 'label': None, + 'language': "R", + 'tags': None, + 'name': None, + 'sql': None, + 'program': None, + 'app_id': None, + 'cmdline':None, + 'command_type': 'SparkCommand', + 'arguments': None, + 'user_program_arguments': None, + 'can_notify': False, + 'script_location': 's3://bucket/path-to-script.R', + 'note_id' : None, + 'retry': 0}) + + def test_submit_script_location_aws_sql(self): + sys.argv = ['qds.py', 'sparkcmd', 'submit', '--script_location', 's3://bucket/path-to-script.sql'] + print_command() + Connection._api_call = Mock(return_value={'id': 1234}) + qds.main() + Connection._api_call.assert_called_with('POST', 'commands', + {'macros': None, + 'label': None, + 'language': "sql", + 'tags': None, + 'name': None, + 'sql': None, + 'program': None, + 'app_id': None, + 'cmdline':None, + 'command_type': 'SparkCommand', + 'arguments': None, + 'user_program_arguments': None, + 'can_notify': False, + 'script_location': 's3://bucket/path-to-script.sql', + 'note_id' : None, + 'retry': 0}) + + def test_submit_script_location_aws_java(self): + sys.argv = ['qds.py', 'sparkcmd', 'submit', 
'--script_location', 's3://bucket/path-to-script.java'] print_command() with self.assertRaises(qds_sdk.exception.ParseError): qds.main() From d38ec8594d4396525fee848c4862cb00f98866bc Mon Sep 17 00:00:00 2001 From: Tanish Gupta Date: Fri, 12 May 2017 00:20:49 +0530 Subject: [PATCH 06/10] new: dev: MW-172-1: Supports cluster api v2. This version is cloud agnostic and works for all clouds supported on QDS. --- bin/qds.py | 38 ++- qds_sdk/cloud/__init__.py | 0 qds_sdk/cloud/aws_cloud.py | 166 +++++++++ qds_sdk/cloud/azure_cloud.py | 206 +++++++++++ qds_sdk/cloud/cloud.py | 7 + qds_sdk/cloud/oracle_bmc_cloud.py | 203 +++++++++++ qds_sdk/cluster.py | 22 +- qds_sdk/clusterv2.py | 545 ++++++++++++++++++++++++++++++ qds_sdk/engine.py | 196 +++++++++++ qds_sdk/qubole.py | 34 +- qds_sdk/util.py | 30 +- setup.py | 2 +- tests/test_cluster.py | 16 +- tests/test_clusterv2.py | 387 +++++++++++++++++++++ 14 files changed, 1817 insertions(+), 35 deletions(-) create mode 100644 qds_sdk/cloud/__init__.py create mode 100755 qds_sdk/cloud/aws_cloud.py create mode 100755 qds_sdk/cloud/azure_cloud.py create mode 100755 qds_sdk/cloud/cloud.py create mode 100755 qds_sdk/cloud/oracle_bmc_cloud.py mode change 100644 => 100755 qds_sdk/cluster.py create mode 100755 qds_sdk/clusterv2.py create mode 100644 qds_sdk/engine.py mode change 100644 => 100755 qds_sdk/util.py create mode 100644 tests/test_clusterv2.py diff --git a/bin/qds.py b/bin/qds.py index f48cc97e..3339ccd8 100755 --- a/bin/qds.py +++ b/bin/qds.py @@ -1,7 +1,6 @@ #!/bin/env python from __future__ import print_function -from qds_sdk.qubole import Qubole from qds_sdk.commands import * from qds_sdk.cluster import * import qds_sdk.exception @@ -16,8 +15,8 @@ from qds_sdk.nezha import NezhaCmdLine from qds_sdk.user import UserCmdLine from qds_sdk.template import TemplateCmdLine +from qds_sdk.clusterv2 import ClusterCmdLine from qds_sdk.sensors import * - import os import sys import traceback @@ -239,7 +238,6 @@ def cluster_create_action(clusterclass, args, api_version=1.2): print(json.dumps(result, indent=4)) return 0 - def cluster_update_action(clusterclass, args, api_version=1.2): arguments = clusterclass._parse_create_update(args, "update", api_version) cluster_info = _create_cluster_info(arguments, api_version) @@ -472,6 +470,23 @@ def clustermain(args, api_version): else: return globals()["cluster_" + action + "_action"](clusterclass, args) +def clustermainv2(args): + action = args[0] + actionset = set( + ["create", "delete", "update", "clone", "list", "start", "terminate", "status", "reassign_label", "add_node", + "remove_node", "update_node", "snapshot", "restore_point", "get_snapshot_schedule", + "update_snapshot_schedule"]) + + result = None + if action not in actionset: + sys.stderr.write("action must be one of <%s>\n" % "|".join(actionset)) + usage() + elif action in set(["create", "update", "clone"]): + result = ClusterCmdLine.run(args) + else: + result = globals()["cluster_" + action + "_action"](Cluster, args) + print(result) + def accountmain(args): result = AccountCmdLine.run(args) print(result) @@ -516,7 +531,7 @@ def nezhamain(args): def templatemain(args): result = TemplateCmdLine.run(args) print(result) - + def main(): optparser = OptionParser(usage=usage_str) @@ -541,6 +556,10 @@ def main(): default=False, help="skip verification of server SSL certificate. 
Insecure: use with caution.") + optparser.add_option("--cloud_name", dest="cloud_name", + default=os.getenv('CLOUD_PROVIDER'), + help="cloud", choices=["AWS", "AZURE", "ORACLE_BMC"]) + optparser.add_option("-v", dest="verbose", action="store_true", default=False, help="verbose mode - info level logging") @@ -572,6 +591,9 @@ def main(): if options.poll_interval is None: options.poll_interval = 5 + if options.cloud_name is None: + options.cloud_name = "AWS" + if options.skip_ssl_cert_check is None: options.skip_ssl_cert_check = False elif options.skip_ssl_cert_check: @@ -581,7 +603,8 @@ def main(): api_url=options.api_url, version=options.api_version, poll_interval=options.poll_interval, - skip_ssl_cert_check=options.skip_ssl_cert_check) + skip_ssl_cert_check=options.skip_ssl_cert_check, + cloud_name=options.cloud_name) if len(args) < 1: sys.stderr.write("Missing first argument containing subcommand\n") @@ -599,7 +622,10 @@ def main(): if a0 == "cluster": api_version_number = float(options.api_version[1:]) - return clustermain(args, api_version_number) + if api_version_number >= 2.0: + return clustermainv2(args) + else: + return clustermain(args, api_version_number) if a0 == "action": return actionmain(args) diff --git a/qds_sdk/cloud/__init__.py b/qds_sdk/cloud/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/qds_sdk/cloud/aws_cloud.py b/qds_sdk/cloud/aws_cloud.py new file mode 100755 index 00000000..bfae5811 --- /dev/null +++ b/qds_sdk/cloud/aws_cloud.py @@ -0,0 +1,166 @@ +from qds_sdk.cloud.cloud import Cloud +class AwsCloud(Cloud): + ''' + qds_sdk.cloud.AwsCloud is the class which stores information about aws cloud config settings. + The objects of this class can be used to set aws cloud_config settings while create/update/clone a cluster. + ''' + + def __init__(self): + self.compute_config = {} + self.location = {} + self.network_config = {} + self.storage_config = {} + + def set_cloud_config(self, + compute_access_key=None, + compute_secret_key=None, + use_account_compute_creds=None, + aws_region=None, + aws_availability_zone=None, + role_instance_profile=None, + vpc_id=None, + subnet_id=None, + persistent_security_groups=None, + bastion_node_public_dns=None, + master_elastic_ip=None): + ''' + + Args: + compute_access_key: The access key for customer's aws account. This + is required for creating the cluster. + + compute_secret_key: The secret access key for customer's aws + account. This is required for creating the cluster. + + use_account_compute_creds: Set it to true to use the account’s compute + credentials for all clusters of the account.The default value is false + + aws_region: The AWS region in which the cluster is created. The default value is, us-east-1. + Valid values are, us-east-1, us-west-1, us-west-2, eu-west-1, sa-east1, ap-southeast-1, + and ap-northeast-1. + Doc: http://docs.qubole.com/en/latest/rest-api/cluster_api/create-new-cluster.html#ec2-settings + + aws_availability_zone: The preferred availability zone in which the cluster must be created. The default value is Any. + + role_instance_profile: IAM Role instance profile to attach on cluster + + vpc_id: The ID of the vpc in which the cluster is created. + In this vpc, the enableDnsHostnames parameter must be set to true. + + subnet_id: The ID of the subnet in which the cluster is created. This subnet must belong to the + above VPC and it can be a public/private subnet + + persistent_security_groups: security group to associate with each node of the cluster. 
+ Typically used to provide access to external hosts + + bastion_node_public_dns: Specify the Bastion host public DNS name if private subnet is provided. + Do not specify this value for a public subnet. + + master_elastic_ip: It is the Elastic IP address for attaching to the cluster master + + ''' + + self.set_compute_config(use_account_compute_creds, compute_access_key, + compute_secret_key, role_instance_profile) + self.set_location(aws_region, aws_availability_zone) + self.set_network_config(bastion_node_public_dns, persistent_security_groups, + master_elastic_ip, vpc_id, subnet_id) + + def set_compute_config(self, + use_account_compute_creds=None, + compute_access_key=None, + compute_secret_key=None, + role_instance_profile=None): + self.compute_config['use_account_compute_creds'] = use_account_compute_creds + self.compute_config['compute_access_key'] = compute_access_key + self.compute_config['compute_secret_key'] = compute_secret_key + self.compute_config['role_instance_profile'] = role_instance_profile + + def set_location(self, + aws_region=None, + aws_availability_zone=None): + self.location['aws_region'] = aws_region + self.location['aws_availability_zone'] = aws_availability_zone + + def set_network_config(self, + bastion_node_public_dns=None, + persistent_security_groups=None, + master_elastic_ip=None, + vpc_id=None, + subnet_id=None): + self.network_config['bastion_node_public_dns'] = bastion_node_public_dns + self.network_config['persistent_security_groups'] = persistent_security_groups + self.network_config['master_elastic_ip'] = master_elastic_ip + self.network_config['vpc_id'] = vpc_id + self.network_config['subnet_id'] = subnet_id + + def set_cloud_config_from_arguments(self, arguments): + self.set_cloud_config(compute_access_key=arguments.compute_access_key, + compute_secret_key=arguments.compute_secret_key, + use_account_compute_creds=arguments.use_account_compute_creds, + aws_region=arguments.aws_region, + aws_availability_zone=arguments.aws_availability_zone, + role_instance_profile=arguments.role_instance_profile, + vpc_id=arguments.vpc_id, + subnet_id=arguments.subnet_id, + persistent_security_groups=arguments.persistent_security_groups, + bastion_node_public_dns=arguments.bastion_node_public_dns, + master_elastic_ip=arguments.master_elastic_ip) + + def create_parser(self, argparser): + # compute settings parser + compute_config = argparser.add_argument_group("compute config settings") + compute_creds = compute_config.add_mutually_exclusive_group() + compute_creds.add_argument("--enable-account-compute-creds", + dest="use_account_compute_creds", + action="store_true", + default=None, + help="to use account compute credentials") + compute_creds.add_argument("--disable-account-compute-creds", + dest="use_account_compute_creds", + action="store_false", + default=None, + help="to disable account compute credentials") + compute_config.add_argument("--compute-access-key", + dest="compute_access_key", + default=None, + help="access key for aws cluster") + compute_config.add_argument("--compute-secret-key", + dest="compute_secret_key", + default=None, + help="secret key for aws cluster") + compute_config.add_argument("--role-instance-profile", + dest="role_instance_profile", + help="IAM Role instance profile to attach on cluster", ) + + # location settings parser + location_group = argparser.add_argument_group("location config setttings") + location_group.add_argument("--aws-region", + dest="aws_region", + choices=["us-east-1", "us-west-2", "ap-northeast-1", "sa-east-1", + 
"eu-west-1", "ap-southeast-1", "us-west-1"], + help="aws region to create the cluster in", ) + location_group.add_argument("--aws-availability-zone", + dest="aws_availability_zone", + help="availability zone to" + + " create the cluster in", ) + + # network settings parser + network_config_group = argparser.add_argument_group("network config settings") + network_config_group.add_argument("--vpc-id", + dest="vpc_id", + help="vpc to create the cluster in", ) + network_config_group.add_argument("--subnet-id", + dest="subnet_id", + help="subnet to create the cluster in", ) + network_config_group.add_argument("--bastion-node-public-dns", + dest="bastion_node_public_dns", + help="public dns name of the bastion node. Required only if cluster is in private subnet of a EC2-VPC", ) + network_config_group.add_argument("--persistent-security-groups", + dest="persistent_security_groups", + help="a security group to attach with each" + + " node of the cluster. Typically used" + + " to provide access to external hosts", ) + network_config_group.add_argument("--master-elastic-ip", + dest="master_elastic_ip", + help="master elastic ip for cluster") \ No newline at end of file diff --git a/qds_sdk/cloud/azure_cloud.py b/qds_sdk/cloud/azure_cloud.py new file mode 100755 index 00000000..02de7a2b --- /dev/null +++ b/qds_sdk/cloud/azure_cloud.py @@ -0,0 +1,206 @@ +from qds_sdk.cloud.cloud import Cloud +class AzureCloud(Cloud): + ''' + qds_sdk.cloud.AzureCloud is the class which stores information about azure cloud config settings. + The objects of this class can be use to set azure cloud_config settings while create/update/clone a cluster. + ''' + + def __init__(self): + self.compute_config = {} + self.location = {} + self.network_config = {} + self.storage_config = {} + + def set_cloud_config(self, + compute_client_id=None, + compute_client_secret=None, + compute_subscription_id=None, + compute_tenant_id=None, + use_account_compute_creds=None, + location=None, + storage_access_key=None, + storage_account_name=None, + disk_storage_account_name=None, + disk_storage_account_resource_group_name=None, + persistent_security_groups=None, + bastion_node_public_dns=None, + vnet_name=None, + subnet_name=None, + vnet_resource_group_name=None, + master_elastic_ip=None): + ''' + + Args: + compute_client_id: Client id for azure cluster + + compute_client_secret: Client secret key for azure cluster + + compute_subscription_id: Subscription id for azure cluster + + compute_tenant_id: Tenant id for azure cluster + + use_account_compute_creds: Set it to true to use the account’s compute + credentials for all clusters of the account.The default value is false + + location: Location for azure cluster + + storage_access_key: Storage access key for azure cluster + + storage_account_name: Storage account name for azure cluster + + disk_storage_account_name: Disk storage account name for azure cluster + + disk_storage_account_resource_group_name: Disk storage account resource group + namefor azure cluster + + persistent_security_groups: security group to associate with each node of the cluster. + Typically used to provide access to external hosts + + bastion_node_public_dns: public dns name of the bastion node. 
+ Required only if cluster is in private subnet of a EC2-VPC + + vnet_name: vnet name for azure + + subnet_name: subnet name for azure + + vnet_resource_group_name: vnet resource group name for azure + + master_elastic_ip: It is the Elastic IP address for attaching to the cluster master + + ''' + + self.set_compute_config(use_account_compute_creds, compute_tenant_id, + compute_subscription_id, compute_client_id, + compute_client_secret) + self.set_location(location) + self.set_network_config(bastion_node_public_dns, persistent_security_groups, + master_elastic_ip, vnet_name, subnet_name, + vnet_resource_group_name) + self.set_storage_config(storage_access_key, storage_account_name, + disk_storage_account_name, + disk_storage_account_resource_group_name) + + def set_compute_config(self, + use_account_compute_creds=None, + compute_tenant_id=None, + compute_subscription_id=None, + compute_client_id=None, + compute_client_secret=None): + self.compute_config['use_account_compute_creds'] = use_account_compute_creds + self.compute_config['compute_tenant_id'] = compute_tenant_id + self.compute_config['compute_subscription_id'] = compute_subscription_id + self.compute_config['compute_client_id'] = compute_client_id + self.compute_config['compute_client_secret'] = compute_client_secret + + def set_location(self, + location=None): + self.location['location'] = location + + def set_network_config(self, + bastion_node_public_dns=None, + persistent_security_groups=None, + master_elastic_ip=None, + vnet_name=None, + subnet_name=None, + vnet_resource_group_name=None): + self.network_config['bastion_node_public_dns'] = bastion_node_public_dns + self.network_config['persistent_security_groups'] = persistent_security_groups + self.network_config['master_elastic_ip'] = master_elastic_ip + self.network_config['vnet_name'] = vnet_name + self.network_config['subnet_name'] = subnet_name + self.network_config['vnet_resource_group_name'] = vnet_resource_group_name + + def set_storage_config(self, + storage_access_key=None, + storage_account_name=None, + disk_storage_account_name=None, + disk_storage_account_resource_group_name=None): + self.storage_config['storage_access_key'] = storage_access_key + self.storage_config['storage_account_name'] = storage_account_name + self.storage_config['disk_storage_account_name'] = disk_storage_account_name + self.storage_config['disk_storage_account_resource_group_name'] \ + = disk_storage_account_resource_group_name + + def set_cloud_config_from_arguments(self, arguments): + self.set_cloud_config(compute_client_id=arguments.compute_client_id, + compute_client_secret=arguments.compute_client_secret, + compute_subscription_id=arguments.compute_subscription_id, + compute_tenant_id=arguments.compute_tenant_id, + use_account_compute_creds=arguments.use_account_compute_creds, + location=arguments.location, + storage_access_key=arguments.storage_access_key, + storage_account_name=arguments.storage_account_name, + disk_storage_account_name=arguments.disk_storage_account_name, + disk_storage_account_resource_group_name=arguments.disk_storage_account_resource_group_name, + vnet_name=arguments.vnet_name, + subnet_name=arguments.subnet_name, + vnet_resource_group_name=arguments.vnet_resource_group_name) + + def create_parser(self, argparser): + # compute settings parser + compute_config = argparser.add_argument_group("compute config settings") + compute_creds = compute_config.add_mutually_exclusive_group() + compute_creds.add_argument("--enable-account-compute-creds", + 
dest="use_account_compute_creds", + action="store_true", + default=None, + help="to use account compute credentials") + compute_creds.add_argument("--disable-account-compute-creds", + dest="use_account_compute_creds", + action="store_false", + default=None, + help="to disable account compute credentials") + compute_config.add_argument("--compute-client-id", + dest="compute_client_id", + default=None, + help="client id for azure cluster") + compute_config.add_argument("--compute-client-secret", + dest="compute_client_secret", + default=None, + help="client secret key for azure cluster") + compute_config.add_argument("--compute-tenant-id", + dest="compute_tenant_id", + default=None, + help="tenant id for azure cluster") + compute_config.add_argument("--compute-subscription-id", + dest="compute_subscription_id", + default=None, + help="Subscription id for azure cluster") + + # location settings parser + location_group = argparser.add_argument_group("location config settings") + location_group.add_argument("--location", + dest="location", + default=None, + help="location for azure cluster") + + # network settings parser + network_config_group = argparser.add_argument_group("network config settings") + network_config_group.add_argument("--vnet-name", + dest="vnet_name", + help="vnet name for azure", ) + network_config_group.add_argument("--subnet-name", + dest="subnet_name", + help="subnet name for azure") + network_config_group.add_argument("--vnet-resource-group-name", + dest="vnet_resource_group_name", + help="vnet resource group name for azure") + + # storage config settings parser + storage_config = argparser.add_argument_group("storage config settings") + storage_config.add_argument("--storage-access-key", + dest="storage_access_key", + default=None, + help="storage access key for azure cluster") + storage_config.add_argument("--storage-account-name", + dest="storage_account_name", + default=None, + help="storage account name for azure cluster") + storage_config.add_argument("--disk-storage-account-name", + dest="disk_storage_account_name", + default=None, + help="disk storage account name for azure cluster") + storage_config.add_argument("--disk-storage-account-resource-group-name", + dest="disk_storage_account_resource_group_name", + default=None, + help="disk storage account resource group for azure cluster") \ No newline at end of file diff --git a/qds_sdk/cloud/cloud.py b/qds_sdk/cloud/cloud.py new file mode 100755 index 00000000..49053d87 --- /dev/null +++ b/qds_sdk/cloud/cloud.py @@ -0,0 +1,7 @@ +class Cloud: + + def create_parser(self, argparser): + return NotImplemented + + def set_cloud_config_from_arguments(self, arguments): + return NotImplemented diff --git a/qds_sdk/cloud/oracle_bmc_cloud.py b/qds_sdk/cloud/oracle_bmc_cloud.py new file mode 100755 index 00000000..68a7e538 --- /dev/null +++ b/qds_sdk/cloud/oracle_bmc_cloud.py @@ -0,0 +1,203 @@ +from qds_sdk.cloud.cloud import Cloud +class OracleBmcCloud(Cloud): + ''' + qds_sdk.cloud.OracleBmcCloud is the class which stores information about oracle bmc cloud config settings. + The objects of this class can be used to set oracle_bmc cloud_config settings while create/update/clone a cluster. 
+ ''' + + def __init__(self): + self.compute_config = {} + self.location = {} + self.network_config = {} + self.storage_config = {} + + def set_cloud_config(self, + compute_tenant_id=None, + compute_user_id=None, + compute_key_finger_print=None, + compute_api_private_rsa_key=None, + use_account_compute_creds=None, + subnet_id=None, + oracle_region=None, + oracle_availability_domain=None, + compartment_id=None, + image_id=None, + vcn_id=None, + storage_tenant_id=None, + storage_user_id=None, + storage_key_finger_print=None, + storage_api_private_rsa_key=None): + ''' + + Args: + compute_tenant_id: compute tenant id for oracle cluster + + compute_user_id: compute user id for oracle cluster + + compute_key_finger_print: compute key fingerprint for oracle cluster + + compute_api_private_rsa_key: compute api private rsa key for oracle cluster + + use_account_compute_creds: Set it to true to use the account’s compute + credentials for all clusters of the account.The default value is false + + subnet_id: subnet id for oracle + + oracle_region: region to create the cluster in + + oracle_availability_domain: availability zone to create the cluster in + + compartment_id: compartment id for oracle cluster + + image_id: image id for oracle cloud + + vcn_id: vcn to create the cluster in + + storage_tenant_id: tenant id for oracle cluster + + storage_user_id: storage user id for oracle cluster + + storage_key_finger_print: storage key fingerprint for oracle cluster + + storage_api_private_rsa_key: storage api private rsa key for oracle cluster + + ''' + + self.set_compute_config(use_account_compute_creds, compute_tenant_id, + compute_user_id, compute_key_finger_print, + compute_api_private_rsa_key) + self.set_location(oracle_region, oracle_availability_domain) + self.set_network_config(vcn_id, subnet_id, + compartment_id, image_id) + self.set_storage_config(storage_tenant_id, storage_user_id, + storage_key_finger_print, storage_api_private_rsa_key) + + def set_compute_config(self, + use_account_compute_creds=None, + compute_tenant_id=None, + compute_user_id=None, + compute_key_finger_print=None, + compute_api_private_rsa_key=None): + self.compute_config['use_account_compute_creds'] = use_account_compute_creds + self.compute_config['compute_tenant_id'] = compute_tenant_id + self.compute_config['compute_user_id'] = compute_user_id + self.compute_config['compute_key_finger_print'] = compute_key_finger_print + self.compute_config['compute_api_private_rsa_key'] = compute_api_private_rsa_key + + def set_location(self, + oracle_region=None, + oracle_availability_domain=None): + self.location['region'] = oracle_region + self.location['availability_domain'] = oracle_availability_domain + + def set_network_config(self, + vcn_id=None, + subnet_id=None, + compartment_id=None, + image_id=None): + self.network_config['vcn_id'] = vcn_id + self.network_config['subnet_id'] = subnet_id + self.network_config['compartment_id'] = compartment_id + self.network_config['image_id'] = image_id + + def set_storage_config(self, + storage_tenant_id=None, + storage_user_id=None, + storage_key_finger_print=None, + storage_api_private_rsa_key=None): + self.storage_config['storage_tenant_id'] = storage_tenant_id + self.storage_config['storage_user_id'] = storage_user_id + self.storage_config['storage_key_finger_print'] = storage_key_finger_print + self.storage_config['storage_api_private_rsa_key'] = storage_api_private_rsa_key + + def set_cloud_config_from_arguments(self, arguments): + 
self.set_cloud_config(compute_tenant_id=arguments.compute_tenant_id, + compute_user_id=arguments.compute_user_id, + compute_key_finger_print=arguments.compute_key_finger_print, + compute_api_private_rsa_key=arguments.compute_api_private_rsa_key, + use_account_compute_creds=arguments.use_account_compute_creds, + subnet_id=arguments.subnet_id, + oracle_region=arguments.region, + oracle_availability_domain=arguments.availability_domain, + compartment_id=arguments.compartment_id, + image_id=arguments.image_id, + vcn_id=arguments.vcn_id, + storage_tenant_id=arguments.storage_tenant_id, + storage_user_id=arguments.storage_user_id, + storage_key_finger_print=arguments.storage_key_finger_print, + storage_api_private_rsa_key=arguments.storage_api_private_rsa_key) + + def create_parser(self, argparser): + # compute settings parser + compute_config = argparser.add_argument_group("compute config settings") + compute_creds = compute_config.add_mutually_exclusive_group() + compute_creds.add_argument("--enable-account-compute-creds", + dest="use_account_compute_creds", + action="store_true", + default=None, + help="to use account compute credentials") + compute_creds.add_argument("--disable-account-compute-creds", + dest="use_account_compute_creds", + action="store_false", + default=None, + help="to disable account compute credentials") + compute_config.add_argument("--compute-tenant-id", + dest="compute_tenant_id", + default=None, + help="tenant id for oracle cluster") + compute_config.add_argument("--compute-user-id", + dest="compute_user_id", + default=None, + help="compute user id for oracle cluster") + compute_config.add_argument("--compute-key-finger-print", + dest="compute_key_finger_print", + default=None, + help="compute key fingerprint for oracle cluster") + compute_config.add_argument("--compute-api-private-rsa-key", + dest="compute_api_private_rsa_key", + default=None, + help="compute api private rsa key for oracle cluster") + + # location settings parser + location_group = argparser.add_argument_group("location config settings") + location_group.add_argument("--oracle-region", + dest="region", + help="region to create the cluster in", ) + location_group.add_argument("--oracle-availability-zone", + dest="availability_domain", + help="availability zone to" + + " create the cluster in", ) + + # network settings parser + network_config_group = argparser.add_argument_group("network config settings") + network_config_group.add_argument("--compartment-id", + dest="compartment_id", + help="compartment id for oracle cluster") + network_config_group.add_argument("--image-id", + dest="image_id", + help="image id for oracle cloud") + network_config_group.add_argument("--vcn-id", + dest="vcn_id", + help="vcn to create the cluster in", ) + network_config_group.add_argument("--subnet-id", + dest="subnet_id", + help="subnet id for oracle") + + # storage config settings parser + storage_config = argparser.add_argument_group("storage config settings") + storage_config.add_argument("--storage-tenant-id", + dest="storage_tenant_id", + default=None, + help="tenant id for oracle cluster") + storage_config.add_argument("--storage-user-id", + dest="storage_user_id", + default=None, + help="storage user id for oracle cluster") + storage_config.add_argument("--storage-key-finger-print", + dest="storage_key_finger_print", + default=None, + help="storage key fingerprint for oracle cluster") + storage_config.add_argument("--storage-api-private-rsa-key", + dest="storage_api_private_rsa_key", + default=None, + 
help="storage api private rsa key for oracle cluster") \ No newline at end of file diff --git a/qds_sdk/cluster.py b/qds_sdk/cluster.py old mode 100644 new mode 100755 index d149e1e6..c096e85a --- a/qds_sdk/cluster.py +++ b/qds_sdk/cluster.py @@ -6,6 +6,7 @@ from qds_sdk.qubole import Qubole from qds_sdk.resource import Resource from argparse import ArgumentParser +from qds_sdk import util import logging import json @@ -895,7 +896,7 @@ def minimal_payload(self): creating or updating a cluster. """ payload = {"cluster": self.__dict__} - return _make_minimal(payload) + return util._make_minimal(payload) class ClusterInfoV13(): """ @@ -1189,21 +1190,4 @@ def minimal_payload(self): """ payload_dict = self.__dict__ payload_dict.pop("api_version", None) - return _make_minimal(payload_dict) - - -def _make_minimal(dictionary): - """ - This function removes all the keys whose value is either None or an empty - dictionary. - """ - new_dict = {} - for key, value in dictionary.items(): - if value is not None: - if isinstance(value, dict): - new_value = _make_minimal(value) - if new_value: - new_dict[key] = new_value - else: - new_dict[key] = value - return new_dict + return util._make_minimal(payload_dict) diff --git a/qds_sdk/clusterv2.py b/qds_sdk/clusterv2.py new file mode 100755 index 00000000..9d9fd79d --- /dev/null +++ b/qds_sdk/clusterv2.py @@ -0,0 +1,545 @@ +from qds_sdk.qubole import Qubole +from qds_sdk.resource import Resource +from qds_sdk.cloud.cloud import Cloud +from qds_sdk.engine import Engine +from qds_sdk import util +import argparse +import json + +def str2bool(v): + return v.lower() in ("yes", "true", "t", "1") + +class ClusterCmdLine: + + @staticmethod + def parsers(action): + argparser = argparse.ArgumentParser( + prog="qds.py cluster", + description="Cluster Operations for Qubole Data Service.") + subparsers = argparser.add_subparsers(title="Cluster operations") + + if action == "create": + create = subparsers.add_parser("create", help="Create a new cluster") + ClusterCmdLine.create_update_clone_parser(create, action="create") + create.set_defaults(func=ClusterV2.create) + + if action == "update": + update = subparsers.add_parser("update", help="Update the settings of an existing cluster") + ClusterCmdLine.create_update_clone_parser(update, action="update") + update.set_defaults(func=ClusterV2.update) + + if action == "clone": + clone = subparsers.add_parser("clone", help="Clone a cluster from an existing one") + ClusterCmdLine.create_update_clone_parser(clone, action="clone") + clone.set_defaults(func=ClusterV2.clone) + + return argparser + + @staticmethod + def create_update_clone_parser(subparser, action=None): + # cloud config parser + cloud = Qubole.get_cloud() + cloud.create_parser(subparser) + + # cluster info parser + ClusterInfoV2.cluster_info_parser(subparser, action) + + # engine config parser + Engine.engine_parser(subparser) + + @staticmethod + def run(args): + parser = ClusterCmdLine.parsers(args[0]) + arguments = parser.parse_args(args) + customer_ssh_key = util._read_file(arguments.customer_ssh_key_file) + + # This will set cluster info and monitoring settings + cluster_info = ClusterInfoV2(arguments.label) + cluster_info.set_cluster_info(disallow_cluster_termination=arguments.disallow_cluster_termination, + enable_ganglia_monitoring=arguments.enable_ganglia_monitoring, + datadog_api_token=arguments.datadog_api_token, + datadog_app_token=arguments.datadog_app_token, + node_bootstrap=arguments.node_bootstrap_file, + 
master_instance_type=arguments.master_instance_type, + slave_instance_type=arguments.slave_instance_type, + min_nodes=arguments.initial_nodes, + max_nodes=arguments.max_nodes, + slave_request_type=arguments.slave_request_type, + fallback_to_ondemand=arguments.fallback_to_ondemand, + custom_tags=arguments.custom_tags, + heterogeneous_config=arguments.heterogeneous_config, + maximum_bid_price_percentage=arguments.maximum_bid_price_percentage, + timeout_for_request=arguments.timeout_for_request, + maximum_spot_instance_percentage=arguments.maximum_spot_instance_percentage, + stable_maximum_bid_price_percentage=arguments.stable_maximum_bid_price_percentage, + stable_timeout_for_request=arguments.stable_timeout_for_request, + stable_spot_fallback=arguments.stable_spot_fallback, + idle_cluster_timeout=arguments.idle_cluster_timeout, + disk_count=arguments.count, + disk_type=arguments.disk_type, + disk_size=arguments.size, + upscaling_config=arguments.upscaling_config, + enable_encryption=arguments.encrypted_ephemerals, + customer_ssh_key=customer_ssh_key) + + # This will set cloud config settings + cloud_config = Qubole.get_cloud() + cloud_config.set_cloud_config_from_arguments(arguments) + + # This will set engine settings + engine_config = Engine(flavour=arguments.flavour) + engine_config.set_engine_config_settings(arguments) + + cluster_request = ClusterCmdLine.get_cluster_request_parameters(cluster_info, cloud_config, engine_config) + + action = args[0] + if action == "create": + return arguments.func(cluster_request) + else: + return arguments.func(arguments.cluster_id_label, cluster_request) + + @staticmethod + def get_cluster_request_parameters(cluster_info, cloud_config, engine_config): + ''' + Use this to return final minimal request from cluster_info, cloud_config or engine_config objects + Alternatively call util._make_minimal if only one object needs to be implemented + ''' + + cluster_request = {} + cloud_config = util._make_minimal(cloud_config.__dict__) + if bool(cloud_config): cluster_request['cloud_config'] = cloud_config + + engine_config = util._make_minimal(engine_config.__dict__) + if bool(engine_config): cluster_request['engine_config'] = engine_config + + cluster_request.update(util._make_minimal(cluster_info.__dict__)) + return cluster_request + +class ClusterInfoV2(object): + """ + qds_sdk.ClusterInfoV2 is the class which stores information about a cluster_info. + You can use objects of this class to create/update/clone a cluster. + """ + + def __init__(self, label): + """ + Args: + `label`: A list of labels that identify the cluster. At least one label + must be provided when creating a cluster. 
+ """ + self.cluster_info = {} + self.cluster_info['label'] = label + self.monitoring = {} + self.internal = {} # right now not supported + + def set_cluster_info(self, + disallow_cluster_termination=None, + enable_ganglia_monitoring=None, + datadog_api_token=None, + datadog_app_token=None, + node_bootstrap=None, + master_instance_type=None, + slave_instance_type=None, + min_nodes=None, + max_nodes=None, + slave_request_type=None, + fallback_to_ondemand=None, + custom_tags=None, + heterogeneous_config=None, + maximum_bid_price_percentage=None, + timeout_for_request=None, + maximum_spot_instance_percentage=None, + stable_maximum_bid_price_percentage=None, + stable_timeout_for_request=None, + stable_spot_fallback=None, + idle_cluster_timeout=None, + disk_count=None, + disk_type=None, + disk_size=None, + upscaling_config=None, + enable_encryption=None, + customer_ssh_key=None, + cluster_name=None, + force_tunnel=None): + """ + Args: + + `disallow_cluster_termination`: Set this to True if you don't want + qubole to auto-terminate idle clusters. Use this option with + extreme caution. + + `enable_ganglia_monitoring`: Set this to True if you want to enable + ganglia monitoring for the cluster. + + `node_bootstrap`: name of the node bootstrap file for this + cluster. It should be in stored in S3 at + /scripts/hadoop/ + + `master_instance_type`: The instance type to use for the Hadoop master + node. + + `slave_instance_type`: The instance type to use for the Hadoop slave + nodes. + + `min_nodes`: Number of nodes to start the cluster with. + + `max_nodes`: Maximum number of nodes the cluster may be auto-scaled up + to. + + `slave_request_type`: Purchasing option for slave instances. + Valid values: "ondemand", "hybrid", "spot". + + `fallback_to_ondemand`: Fallback to on-demand nodes if spot nodes could not be + obtained. Valid only if slave_request_type is 'spot'. + + `maximum_bid_price_percentage`: ( Valid only when `slave_request_type` + is hybrid or spot.) Maximum value to bid for spot + instances, expressed as a percentage of the base price + for the slave node instance type. + + `timeout_for_request`: Timeout for a spot instance request (Unit: + minutes) + + `maximum_spot_instance_percentage`: Maximum percentage of instances + that may be purchased from the AWS Spot market. Valid only when + slave_request_type is "hybrid". + + `stable_maximum_bid_price_percentage`: Maximum value to bid for stable node spot + instances, expressed as a percentage of the base price + (applies to both master and slave nodes). + + `stable_timeout_for_request`: Timeout for a stable node spot instance request (Unit: + minutes) + + `stable_spot_fallback`: Whether to fallback to on-demand instances for + stable nodes if spot instances are not available + + `disk_count`: Number of EBS volumes to attach + to each instance of the cluster. + + `disk_type`: Type of the EBS volume. Valid + values are 'standard' (magnetic) and 'ssd'. + + `disk_size`: Size of each EBS volume, in GB. + + `enable_encryption`: Encrypt the ephemeral drives on the instance. + + `customer_ssh_key`: SSH key to use to login to the instances. + + `idle_cluster_timeout`: The buffer time (range in 0-6 hrs) after a cluster goes idle + and gets terminated, given cluster auto termination is on and no cluster specific + timeout has been set (default is 2 hrs) + + `heterogeneous_config` : Configuring heterogeneous nodes in Hadoop 2 and Spark clusters. 
+ It implies that slave nodes can be of different instance types + + `custom_tags` : Custom tags to be set on all instances + of the cluster. Specified as JSON object (key-value pairs) + + `datadog_api_token` : Specify the Datadog API token to use the Datadog monitoring service + + `datadog_app_token` : Specify the Datadog APP token to use the Datadog monitoring service + + Doc: For getting details about arguments + http://docs.qubole.com/en/latest/rest-api/cluster_api/create-new-cluster.html#parameters + + """ + self.cluster_info['master_instance_type'] = master_instance_type + self.cluster_info['slave_instance_type'] = slave_instance_type + self.cluster_info['min_nodes'] = min_nodes + self.cluster_info['max_nodes'] = max_nodes + self.cluster_info['cluster_name'] = cluster_name + self.cluster_info['node_bootstrap'] = node_bootstrap + self.cluster_info['disallow_cluster_termination'] = disallow_cluster_termination + self.cluster_info['force_tunnel'] = force_tunnel + self.cluster_info['fallback_to_ondemand'] = fallback_to_ondemand + self.cluster_info['customer_ssh_key'] = customer_ssh_key + if custom_tags and custom_tags.strip(): + try: + self.cluster_info['custom_tags'] = json.loads(custom_tags.strip()) + except Exception as e: + raise Exception("Invalid JSON string for custom ec2 tags: %s" % e.message) + + self.cluster_info['heterogeneous_config'] = heterogeneous_config + self.cluster_info['slave_request_type'] = slave_request_type + self.cluster_info['idle_cluster_timeout'] = idle_cluster_timeout + self.cluster_info['spot_settings'] = {} + + self.set_spot_instance_settings(maximum_bid_price_percentage, timeout_for_request, maximum_spot_instance_percentage) + self.set_stable_spot_bid_settings(stable_maximum_bid_price_percentage, stable_timeout_for_request, stable_spot_fallback) + self.set_data_disk(disk_size, disk_count, disk_type, upscaling_config, enable_encryption) + self.set_monitoring(enable_ganglia_monitoring, datadog_api_token, datadog_app_token) + + def set_datadog_setting(self, + datadog_api_token=None, + datadog_app_token=None): + self.monitoring['datadog'] = {} + self.monitoring['datadog']['datadog_api_token'] = datadog_api_token + self.monitoring['datadog']['datadog_app_token'] = datadog_app_token + + def set_monitoring(self, + enable_ganglia_monitoring=None, + datadog_api_token=None, + datadog_app_token=None): + self.monitoring['ganglia'] = enable_ganglia_monitoring + self.set_datadog_setting(datadog_api_token, datadog_app_token) + + def set_spot_instance_settings(self, + maximum_bid_price_percentage=None, + timeout_for_request=None, + maximum_spot_instance_percentage=None): + self.cluster_info['spot_settings']['spot_instance_settings'] = {} + self.cluster_info['spot_settings']['spot_instance_settings']['maximum_bid_price_percentage'] = \ + maximum_bid_price_percentage + self.cluster_info['spot_settings']['spot_instance_settings']['timeout_for_request'] = timeout_for_request + self.cluster_info['spot_settings']['spot_instance_settings']['maximum_spot_instance_percentage'] = \ + maximum_spot_instance_percentage + + def set_stable_spot_bid_settings(self, + stable_maximum_bid_price_percentage=None, + stable_timeout_for_request=None, + stable_spot_fallback=None): + self.cluster_info['spot_settings']['stable_spot_bid_settings'] = {} + self.cluster_info['spot_settings']['stable_spot_bid_settings']['maximum_bid_price_percentage'] = \ + stable_maximum_bid_price_percentage + self.cluster_info['spot_settings']['stable_spot_bid_settings']['timeout_for_request'] = \ + 
stable_timeout_for_request + self.cluster_info['spot_settings']['stable_spot_bid_settings']['stable_spot_fallback'] = \ + stable_spot_fallback + + def set_data_disk(self, + disk_size=None, + disk_count=None, + disk_type=None, + upscaling_config=None, + enable_encryption=None): + self.cluster_info['datadisk'] = {} + self.cluster_info['datadisk']['size'] = disk_size + self.cluster_info['datadisk']['count'] = disk_count + self.cluster_info['datadisk']['type'] = disk_type + self.cluster_info['datadisk']['upscaling_config'] = upscaling_config + self.cluster_info['datadisk']['encryption'] = enable_encryption + + @staticmethod + def cluster_info_parser(argparser, action): + create_required = False + label_required = False + if action == "create": + create_required = True + elif action == "update": + argparser.add_argument("cluster_id_label", + help="id/label of the cluster to update") + elif action == "clone": + argparser.add_argument("cluster_id_label", + help="id/label of the cluster to update") + label_required = True + + argparser.add_argument("--label", dest="label", + nargs="+", required=(create_required or label_required), + help="list of labels for the cluster" + + " (atleast one label is required)") + cluster_info = argparser.add_argument_group("cluster_info") + cluster_info.add_argument("--master-instance-type", + dest="master_instance_type", + help="instance type to use for the hadoop" + + " master node") + cluster_info.add_argument("--slave-instance-type", + dest="slave_instance_type", + help="instance type to use for the hadoop" + + " slave nodes") + cluster_info.add_argument("--min-nodes", + dest="initial_nodes", + type=int, + help="number of nodes to start the" + + " cluster with", ) + cluster_info.add_argument("--max-nodes", + dest="max_nodes", + type=int, + help="maximum number of nodes the cluster" + + " may be auto-scaled up to") + cluster_info.add_argument("--idle-cluster-timeout", + dest="idle_cluster_timeout", + help="cluster termination timeout for idle cluster") + cluster_info.add_argument("--node-bootstrap-file", + dest="node_bootstrap_file", + help="""name of the node bootstrap file for this cluster. It + should be in stored in S3 at + /scripts/hadoop/NODE_BOOTSTRAP_FILE + """, ) + termination = cluster_info.add_mutually_exclusive_group() + termination.add_argument("--disallow-cluster-termination", + dest="disallow_cluster_termination", + action="store_true", + default=None, + help="don't auto-terminate idle clusters," + + " use this with extreme caution", ) + termination.add_argument("--allow-cluster-termination", + dest="disallow_cluster_termination", + action="store_false", + default=None, + help="auto-terminate idle clusters,") + fallback_to_ondemand_group = cluster_info.add_mutually_exclusive_group() + fallback_to_ondemand_group.add_argument("--fallback-to-ondemand", + dest="fallback_to_ondemand", + action="store_true", + default=None, + help="Fallback to on-demand nodes if spot nodes" + + " could not be obtained. Valid only if slave_request_type is spot", ) + fallback_to_ondemand_group.add_argument("--no-fallback-to-ondemand", + dest="fallback_to_ondemand", + action="store_false", + default=None, + help="Dont Fallback to on-demand nodes if spot nodes" + + " could not be obtained. 
Valid only if slave_request_type is spot", ) + cluster_info.add_argument("--customer-ssh-key", + dest="customer_ssh_key_file", + help="location for ssh key to use to" + + " login to the instance") + cluster_info.add_argument("--custom-tags", + dest="custom_tags", + help="""Custom tags to be set on all instances + of the cluster. Specified as JSON object (key-value pairs) + e.g. --custom-ec2-tags '{"key1":"value1", "key2":"value2"}' + """, ) + + # datadisk settings + datadisk_group = argparser.add_argument_group("data disk settings") + datadisk_group.add_argument("--count", + dest="count", + type=int, + help="Number of EBS volumes to attach to" + + " each instance of the cluster", ) + datadisk_group.add_argument("--disk-type", + dest="disk_type", + choices=["standard", "gp2"], + help="Type of the volume attached to the instances. Valid values are " + + "'standard' (magnetic) and 'gp2' (ssd).") + datadisk_group.add_argument("--size", + dest="size", + type=int, + help="Size of each EBS volume, in GB", ) + datadisk_group.add_argument("--upscaling-config", + dest="upscaling_config", + help="Upscaling config to be attached with the instances.", ) + ephemerals = datadisk_group.add_mutually_exclusive_group() + ephemerals.add_argument("--encrypted-ephemerals", + dest="encrypted_ephemerals", + action="store_true", + default=None, + help="encrypt the ephemeral drives on" + + " the instance", ) + ephemerals.add_argument("--no-encrypted-ephemerals", + dest="encrypted_ephemerals", + action="store_false", + default=None, + help="don't encrypt the ephemeral drives on" + + " the instance", ) + + cluster_info.add_argument("--heterogeneous-config", + dest="heterogeneous_config", + help="heterogeneous config for the cluster") + + cluster_info.add_argument("--slave-request-type", + dest="slave_request_type", + choices=["ondemand", "spot", "hybrid"], + help="purchasing option for slave instaces", ) + + # spot settings + spot_instance_group = argparser.add_argument_group("spot instance settings" + + " (valid only when slave-request-type is hybrid or spot)") + spot_instance_group.add_argument("--maximum-bid-price-percentage", + dest="maximum_bid_price_percentage", + type=float, + help="maximum value to bid for spot instances" + + " expressed as a percentage of the base" + + " price for the slave node instance type", ) + spot_instance_group.add_argument("--timeout-for-spot-request", + dest="timeout_for_request", + type=int, + help="timeout for a spot instance request" + + " unit: minutes") + spot_instance_group.add_argument("--maximum-spot-instance-percentage", + dest="maximum_spot_instance_percentage", + type=int, + help="maximum percentage of instances that may" + + " be purchased from the aws spot market," + + " valid only when slave-request-type" + + " is 'hybrid'", ) + + stable_spot_group = argparser.add_argument_group("stable spot instance settings") + stable_spot_group.add_argument("--stable-maximum-bid-price-percentage", + dest="stable_maximum_bid_price_percentage", + type=float, + help="maximum value to bid for stable node spot instances" + + " expressed as a percentage of the base" + + " price for the master and slave node instance types", ) + stable_spot_group.add_argument("--stable-timeout-for-spot-request", + dest="stable_timeout_for_request", + type=int, + help="timeout for a stable node spot instance request" + + " unit: minutes") + stable_spot_group.add_argument("--stable-allow-fallback", + dest="stable_spot_fallback", default=None, + type=str2bool, + help="whether to fallback to on-demand 
instances for stable nodes" + + " if spot instances aren't available") + # monitoring settings + monitoring_group = argparser.add_argument_group("monitoring settings") + ganglia = monitoring_group.add_mutually_exclusive_group() + ganglia.add_argument("--enable-ganglia-monitoring", + dest="enable_ganglia_monitoring", + action="store_true", + default=None, + help="enable ganglia monitoring for the" + + " cluster", ) + ganglia.add_argument("--disable-ganglia-monitoring", + dest="enable_ganglia_monitoring", + action="store_false", + default=None, + help="disable ganglia monitoring for the" + + " cluster", ) + + datadog_group = argparser.add_argument_group("datadog settings") + datadog_group.add_argument("--datadog-api-token", + dest="datadog_api_token", + default=None, + help="fernet key for airflow cluster", ) + datadog_group.add_argument("--datadog-app-token", + dest="datadog_app_token", + default=None, + help="overrides for airflow cluster", ) + +class ClusterV2(Resource): + + rest_entity_path = "clusters" + + @classmethod + def create(cls, cluster_info): + """ + Create a new cluster using information provided in `cluster_info`. + """ + conn = Qubole.agent(version="v2") + return conn.post(cls.rest_entity_path, data=cluster_info) + + @classmethod + def update(cls, cluster_id_label, cluster_info): + """ + Update the cluster with id/label `cluster_id_label` using information provided in + `cluster_info`. + """ + conn = Qubole.agent(version="v2") + return conn.put(cls.element_path(cluster_id_label), data=cluster_info) + + @classmethod + def clone(cls, cluster_id_label, cluster_info): + """ + Update the cluster with id/label `cluster_id_label` using information provided in + `cluster_info`. + """ + conn = Qubole.agent(version="v2") + return conn.post(cls.element_path(cluster_id_label) + '/clone', data=cluster_info) + + # implementation needed + @classmethod + def list(self, state=None): + pass \ No newline at end of file diff --git a/qds_sdk/engine.py b/qds_sdk/engine.py new file mode 100644 index 00000000..65b57d8b --- /dev/null +++ b/qds_sdk/engine.py @@ -0,0 +1,196 @@ +from qds_sdk import util +class Engine: + ''' + Use this class to set engine config settings of cluster + qds_sdk.engine.Engine is the class which stores information about engine config settings. + You can use objects of this class to set engine_config settings while create/update/clone a cluster. + ''' + + def __init__(self, flavour=None): + self.flavour = flavour + self.hadoop_settings = {} + self.presto_settings = {} + self.spark_settings = {} + self.airflow_settings ={} + + def set_engine_config(self, + custom_hadoop_config=None, + use_qubole_placement_policy=None, + fairscheduler_config_xml=None, + default_pool=None, + presto_version=None, + custom_presto_config=None, + spark_version=None, + custom_spark_config=None, + dbtap_id=None, + fernet_key=None, + overrides=None, + is_ha=None): + ''' + + Args: + custom_hadoop_config: Custom Hadoop configuration overrides. + + use_qubole_placement_policy: Use Qubole Block Placement policy for + clusters with spot nodes. + + fairscheduler_config_xml: XML string with custom configuration + parameters for the fair scheduler. + + default_pool: The default pool for the fair scheduler. + + presto_version: Version of presto to be used in cluster + + custom_presto_config: Custom Presto configuration overrides. 
+ + spark_version: Version of spark to be used in cluster + + custom_spark_config: Specify the custom Spark configuration overrides + + dbtap_id: ID of the data store inside QDS + + fernet_key: Encryption key for sensitive information inside airflow database. + For example, user passwords and connections. It must be a 32 url-safe base64 encoded bytes. + + overrides: Airflow configuration to override the default settings.Use the following syntax for overrides: +
<section>.<property>=<value>\n<section>.<property>=<value>...
.=... + + is_ha: Enabling HA config for cluster + + ''' + + self.set_hadoop_settings(custom_hadoop_config, use_qubole_placement_policy, is_ha, fairscheduler_config_xml, default_pool) + self.set_presto_settings(presto_version, custom_presto_config) + self.set_spark_settings(spark_version, custom_spark_config) + self.set_airflow_settings(dbtap_id, fernet_key, overrides) + + def set_fairscheduler_settings(self, + fairscheduler_config_xml=None, + default_pool=None): + self.hadoop_settings['fairscheduler_settings'] = {} + self.hadoop_settings['fairscheduler_settings']['fairscheduler_config_xml'] = \ + fairscheduler_config_xml + self.hadoop_settings['fairscheduler_settings']['default_pool'] = default_pool + + def set_hadoop_settings(self, + custom_hadoop_config=None, + use_qubole_placement_policy=None, + is_ha=None, + fairscheduler_config_xml=None, + default_pool=None): + self.hadoop_settings['custom_hadoop_config'] = custom_hadoop_config + self.hadoop_settings['use_qubole_placement_policy'] = use_qubole_placement_policy + self.hadoop_settings['is_ha'] = is_ha + self.set_fairscheduler_settings(fairscheduler_config_xml, default_pool) + + def set_presto_settings(self, + presto_version=None, + custom_presto_config=None): + self.presto_settings['presto_version'] = presto_version + self.presto_settings['custom_presto_config'] = custom_presto_config + + def set_spark_settings(self, + spark_version=None, + custom_spark_config=None): + self.spark_settings['spark_version'] = spark_version + self.spark_settings['custom_spark_config'] = custom_spark_config + + def set_airflow_settings(self, + dbtap_id=None, + fernet_key=None, + overrides=None): + self.airflow_settings['dbtap_id'] = dbtap_id + self.airflow_settings['fernet_key'] = fernet_key + self.airflow_settings['overrides'] = overrides + + def set_engine_config_settings(self, arguments): + custom_hadoop_config = util._read_file(arguments.custom_hadoop_config_file) + fairscheduler_config_xml = util._read_file(arguments.fairscheduler_config_xml_file) + custom_presto_config = util._read_file(arguments.presto_custom_config_file) + + self.set_engine_config(custom_hadoop_config=custom_hadoop_config, + use_qubole_placement_policy=arguments.use_qubole_placement_policy, + fairscheduler_config_xml=fairscheduler_config_xml, + default_pool=arguments.default_pool, + presto_version=arguments.presto_version, + custom_presto_config=custom_presto_config, + spark_version=arguments.spark_version, + custom_spark_config=arguments.custom_spark_config, + dbtap_id=arguments.dbtap_id, + fernet_key=arguments.fernet_key, + overrides=arguments.overrides) + + @staticmethod + def engine_parser(argparser): + engine_group = argparser.add_argument_group("engine settings") + engine_group.add_argument("--flavour", + dest="flavour", + choices=["hadoop", "hadoop2", "presto", "spark", "hbase", "airflow"], + default=None, + help="Set engine flavour") + + hadoop_settings_group = argparser.add_argument_group("hadoop settings") + hadoop_settings_group.add_argument("--custom-hadoop-config", + dest="custom_hadoop_config_file", + default=None, + help="location of file containing custom" + + " hadoop configuration overrides") + qubole_placement_policy_group = hadoop_settings_group.add_mutually_exclusive_group() + qubole_placement_policy_group.add_argument("--use-qubole-placement-policy", + dest="use_qubole_placement_policy", + action="store_true", + default=None, + help="Use Qubole Block Placement policy" + + " for clusters with spot nodes", ) + 
qubole_placement_policy_group.add_argument("--no-use-qubole-placement-policy", + dest="use_qubole_placement_policy", + action="store_false", + default=None, + help="Do not use Qubole Block Placement policy" + + " for clusters with spot nodes", ) + + fairscheduler_group = argparser.add_argument_group( + "fairscheduler configuration options") + fairscheduler_group.add_argument("--fairscheduler-config-xml", + dest="fairscheduler_config_xml_file", + help="location for file containing" + + " xml with custom configuration" + + " for the fairscheduler", ) + fairscheduler_group.add_argument("--fairscheduler-default-pool", + dest="default_pool", + help="default pool for the" + + " fairscheduler", ) + + presto_settings_group = argparser.add_argument_group("presto settings") + presto_settings_group.add_argument("--presto-version", + dest="presto_version", + default=None, + help="Version of presto for this cluster", ) + presto_settings_group.add_argument("--presto-custom-config", + dest="presto_custom_config_file", + help="location of file containg custom" + + " presto configuration overrides") + + spark_settings_group = argparser.add_argument_group("spark settings") + spark_settings_group.add_argument("--spark-version", + dest="spark_version", + default=None, + help="Version of spark for the cluster", ) + spark_settings_group.add_argument("--custom-spark-config", + dest="custom_spark_config", + default=None, + help="Custom config spark for this cluster", ) + + airflow_settings_group = argparser.add_argument_group("airflow settings") + airflow_settings_group.add_argument("--dbtap-id", + dest="dbtap_id", + default=None, + help="dbtap id for airflow cluster", ) + airflow_settings_group.add_argument("--fernet-key", + dest="fernet_key", + default=None, + help="fernet key for airflow cluster", ) + airflow_settings_group.add_argument("--overrides", + dest="overrides", + default=None, + help="overrides for airflow cluster", ) diff --git a/qds_sdk/qubole.py b/qds_sdk/qubole.py index f4913ec8..ba69f075 100644 --- a/qds_sdk/qubole.py +++ b/qds_sdk/qubole.py @@ -3,7 +3,6 @@ from qds_sdk.connection import Connection from qds_sdk.exception import ConfigError - log = logging.getLogger("qds_qubole") class QuboleAuth(requests.auth.AuthBase): @@ -29,11 +28,12 @@ class Qubole: version = None poll_interval = None skip_ssl_cert_check = None + cloud_name = None @classmethod def configure(cls, api_token, api_url="https://api.qubole.com/api/", version="v1.2", - poll_interval=5, skip_ssl_cert_check=False): + poll_interval=5, skip_ssl_cert_check=False, cloud_name="AWS"): """ Set parameters governing interaction with QDS @@ -46,6 +46,7 @@ def configure(cls, api_token, `poll_interval`: interval in secs when polling QDS for events """ + cls._auth = QuboleAuth(api_token) cls.api_token = api_token cls.version = version @@ -56,8 +57,13 @@ def configure(cls, api_token, else: cls.poll_interval = poll_interval cls.skip_ssl_cert_check = skip_ssl_cert_check + cls.cloud_name = cloud_name.lower() + + cached_agent = None + cloud = None + @classmethod def agent(cls, version=None): @@ -87,3 +93,27 @@ def agent(cls, version=None): cls.cached_agent = Connection(cls._auth, cls.rest_url, cls.skip_ssl_cert_check) return cls.cached_agent + + @classmethod + def get_cloud(cls, cloud_name=None): + if cloud_name and cloud_name.lower() not in ["aws", "oracle_bmc", "azure"]: + raise Exception("cloud should be 'aws', 'oracle_bmc' or 'azure'") + + if cloud_name: + return Qubole.get_cloud_object(cloud_name) + else: + if cls.cloud is None: + cls.cloud = 
cls.get_cloud_object(cls.cloud_name) + return cls.cloud + + @classmethod + def get_cloud_object(cls, cloud_name): + if cloud_name.lower() == "aws": + import qds_sdk.cloud.aws_cloud + return qds_sdk.cloud.aws_cloud.AwsCloud() + elif cloud_name.lower() == "oracle_bmc": + import qds_sdk.cloud.oracle_bmc_cloud + return qds_sdk.cloud.oracle_bmc_cloud.OracleBmcCloud() + elif cloud_name.lower() == "azure": + import qds_sdk.cloud.azure_cloud + return qds_sdk.cloud.azure_cloud.AzureCloud() \ No newline at end of file diff --git a/qds_sdk/util.py b/qds_sdk/util.py old mode 100644 new mode 100755 index 692725c9..1efe6865 --- a/qds_sdk/util.py +++ b/qds_sdk/util.py @@ -1,6 +1,6 @@ import re import optparse - +import sys class OptionParsingError(RuntimeError): def __init__(self, msg): @@ -142,3 +142,31 @@ def underscore(word): """ return re.sub(r'\B((?<=[a-z])[A-Z]|[A-Z](?=[a-z]))', r'_\1', word).lower() + +def _make_minimal(dictionary): + """ + This function removes all the keys whose value is either None or an empty + dictionary. + """ + new_dict = {} + for key, value in dictionary.items(): + if value is not None: + if isinstance(value, dict): + new_value = _make_minimal(value) + if new_value: + new_dict[key] = new_value + else: + new_dict[key] = value + return new_dict + +def _read_file(file_path): + file_content = None + if file_path is not None: + try: + with open(file_path) as f: + file_content = f.read() + except IOError as e: + sys.stderr.write("Unable to read %s: %s\n" % (file_path, str(e))) + raise IOError("Unable to read %s: %s\n" % (file_path, str(e))) + return file_content + diff --git a/setup.py b/setup.py index 0a5df4ab..5deaad69 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ def read(fname): license="Apache License 2.0", keywords="qubole sdk api", url="https://github.com/qubole/qds-sdk-py", - packages=['qds_sdk'], + packages=['qds_sdk', 'qds_sdk/cloud'], scripts=['bin/qds.py'], install_requires=INSTALL_REQUIRES, long_description=read('README.rst'), diff --git a/tests/test_cluster.py b/tests/test_cluster.py index e31925ef..ccbea0f7 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -12,6 +12,7 @@ from qds_sdk.connection import Connection from test_base import print_command from test_base import QdsCliTestCase +from qds_sdk.cloud.cloud import Cloud class TestClusterList(QdsCliTestCase): @@ -19,7 +20,7 @@ class TestClusterList(QdsCliTestCase): def test_minimal(self): sys.argv = ['qds.py', 'cluster', 'list'] print_command() - Connection._api_call = Mock(return_value=[]) + Connection._api_call = Mock(return_value={"provider":"aws"}) qds.main() Connection._api_call.assert_called_with("GET", "clusters", params=None) @@ -33,40 +34,41 @@ def test_id(self): def test_label(self): sys.argv = ['qds.py', 'cluster', 'list', '--label', 'test_label'] print_command() - Connection._api_call = Mock(return_value=[]) + Connection._api_call = Mock(return_value={"provider": "aws"}) qds.main() Connection._api_call.assert_called_with("GET", "clusters/test_label", params=None) def test_state_up(self): sys.argv = ['qds.py', 'cluster', 'list', '--state', 'up'] print_command() - Connection._api_call = Mock(return_value=[]) + Connection._api_call = Mock(return_value=[{"cluster" : {"state" : "up"}}]) qds.main() Connection._api_call.assert_called_with("GET", "clusters", params=None) def test_state_down(self): sys.argv = ['qds.py', 'cluster', 'list', '--state', 'down'] print_command() - Connection._api_call = Mock(return_value=[]) + Connection._api_call = Mock(return_value=[{"cluster": {"state": 
"up"}}]) qds.main() Connection._api_call.assert_called_with("GET", "clusters", params=None) def test_state_pending(self): sys.argv = ['qds.py', 'cluster', 'list', '--state', 'pending'] print_command() - Connection._api_call = Mock(return_value=[]) + Connection._api_call = Mock(return_value=[{"cluster": {"state": "up"}}]) qds.main() Connection._api_call.assert_called_with("GET", "clusters", params=None) def test_state_terminating(self): sys.argv = ['qds.py', 'cluster', 'list', '--state', 'terminating'] print_command() - Connection._api_call = Mock(return_value=[]) + Connection._api_call = Mock(return_value=[{"cluster": {"state": "up"}}]) qds.main() Connection._api_call.assert_called_with("GET", "clusters", params=None) def test_state_invalid(self): sys.argv = ['qds.py', 'cluster', 'list', '--state', 'invalid'] + Connection._api_call = Mock(return_value={"provider": "aws"}) print_command() with self.assertRaises(SystemExit): qds.main() @@ -82,12 +84,14 @@ def test_success(self): def test_no_argument(self): sys.argv = ['qds.py', 'cluster', 'delete'] + Connection._api_call = Mock(return_value={"provider": "aws"}) print_command() with self.assertRaises(SystemExit): qds.main() def test_more_arguments(self): sys.argv = ['qds.py', 'cluster', 'delete', '1', '2'] + Connection._api_call = Mock(return_value={"provider": "aws"}) print_command() with self.assertRaises(SystemExit): qds.main() diff --git a/tests/test_clusterv2.py b/tests/test_clusterv2.py new file mode 100644 index 00000000..3f1c58fe --- /dev/null +++ b/tests/test_clusterv2.py @@ -0,0 +1,387 @@ +from __future__ import print_function +import sys +import os +if sys.version_info > (2, 7, 0): + import unittest +else: + import unittest2 as unittest +from mock import Mock +import tempfile +sys.path.append(os.path.join(os.path.dirname(__file__), '../bin')) +import qds +from qds_sdk.connection import Connection +from test_base import print_command +from test_base import QdsCliTestCase +from qds_sdk.cloud.cloud import Cloud +from qds_sdk.qubole import Qubole + +class TestClusterCreate(QdsCliTestCase): + def test_minimal(self): + sys.argv = ['qds.py','--version', 'v2', 'cluster', 'create', '--label', 'test_label', + '--compute-access-key', 'aki', '--compute-secret-key', 'sak'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', + {'cluster_info': + {'label': ['test_label'] + }, + 'cloud_config': { + 'compute_config': { + 'compute_secret_key': 'sak', + 'compute_access_key': 'aki'}} + }) + + def test_cluster_info(self): + sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'create', '--label', 'test_label', + '--compute-access-key', 'aki', '--compute-secret-key', 'sak', '--min-nodes', '3', + '--max-nodes', '5', '--disallow-cluster-termination', '--enable-ganglia-monitoring', + '--node-bootstrap-file', 'test_file_name', '--master-instance-type', + 'm1.xlarge','--slave-instance-type', 'm1.large', '--encrypted-ephemerals'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', + {'cloud_config': {'compute_config': {'compute_secret_key': 'sak', + 'compute_access_key': 'aki'}}, + 'monitoring': {'ganglia': True}, + 'cluster_info': {'slave_instance_type': 'm1.large', 'min_nodes': 3, + 'max_nodes': 5, 'master_instance_type': 'm1.xlarge', + 'label': ['test_label'], + 'node_bootstrap': 'test_file_name', + 'disallow_cluster_termination': True, + 
'datadisk': {'encryption': True}}}) + + def test_aws_compute_config(self): + sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'create', '--label', 'test_label', + '--enable-account-compute-creds'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', + {'cloud_config': { + 'compute_config': {'use_account_compute_creds': True}}, + 'cluster_info': {'label': ['test_label']}}) + + + def test_aws_network_config(self): + sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'create', '--label', 'test_label', + '--enable-account-compute-creds', '--vpc-id', 'vpc-12345678', '--subnet-id', 'subnet-12345678', + '--bastion-node-public-dns', 'dummydns','--persistent-security-groups', + 'foopsg','--master-elastic-ip', "10.10.10.10"] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cloud_config': {'compute_config': {'use_account_compute_creds': True}, + 'network_config': {'subnet_id': 'subnet-12345678', + 'vpc_id': 'vpc-12345678', + 'master_elastic_ip': '10.10.10.10', + 'persistent_security_groups': 'foopsg', + 'bastion_node_public_dns': 'dummydns'}}, + 'cluster_info': {'label': ['test_label']}}) + + def test_aws_location_config(self): + sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'create', '--label', 'test_label', + '--aws-region', 'us-east-1', '--aws-availability-zone', 'us-east-1a'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cloud_config': {'location': { + 'aws_availability_zone': 'us-east-1a', + 'aws_region': 'us-east-1'}}, + 'cluster_info': {'label': ['test_label']}}) + + def test_oracle_bmc_compute_config(self): + sys.argv = ['qds.py', '--version', 'v2', '--cloud', 'ORACLE_BMC', 'cluster', 'create', '--label', 'test_label', + '--compute-tenant-id', 'xxx11', '--compute-user-id', 'yyyy11', '--compute-key-finger-print', + 'zzz22', '--compute-api-private-rsa-key', 'aaa'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cloud_config': {'compute_config': + {'compute_key_finger_print': 'zzz22', + 'compute_api_private_rsa_key': 'aaa', + 'compute_user_id': 'yyyy11', + 'compute_tenant_id': 'xxx11'}}, + 'cluster_info': {'label': ['test_label']}}) + + def test_oracle_bmc_storage_config(self): + sys.argv = ['qds.py', '--version', 'v2', '--cloud', 'ORACLE_BMC', 'cluster', 'create', '--label', 'test_label', + '--storage-tenant-id', 'xxx11', '--storage-user-id', 'yyyy11', '--storage-key-finger-print', + 'zzz22', '--storage-api-private-rsa-key', 'aaa'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cloud_config': + {'storage_config': + {'storage_key_finger_print': 'zzz22', + 'storage_api_private_rsa_key': 'aaa', + 'storage_user_id': 'yyyy11', + 'storage_tenant_id': 'xxx11'}}, + 'cluster_info': {'label': ['test_label']}}) + + def test_oracle_bmc_network_config(self): + sys.argv = ['qds.py', '--version', 'v2', '--cloud', 'ORACLE_BMC', 'cluster', 'create', '--label', 'test_label', + '--compartment-id', 'abc-compartment', '--image-id', 'abc-image', '--vcn-id', 'vcn-1', + '--subnet-id', 'subnet-1' ] + Qubole.cloud = None + print_command() + 
Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cloud_config': {'network_config': + {'subnet_id': 'subnet-1', + 'vcn_id': 'vcn-1', + 'compartment_id': 'abc-compartment', + 'image_id': 'abc-image'}}, + 'cluster_info': {'label': ['test_label']}}) + + def test_oracle_bmc_location_config(self): + sys.argv = ['qds.py', '--version', 'v2', '--cloud', 'ORACLE_BMC', 'cluster', 'create', '--label', 'test_label', + '--oracle-region', 'us-phoenix-1', '--oracle-availability-zone', 'phx-ad-1'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cloud_config': {'location': + {'region': 'us-phoenix-1', + 'availability_domain': 'phx-ad-1'}}, + 'cluster_info': {'label': ['test_label']}}) + + def test_azure_compute_config(self): + sys.argv = ['qds.py', '--version', 'v2', '--cloud', 'AZURE', 'cluster', 'create', '--label', 'test_label', + '--compute-client-id', 'testclientid', '--compute-client-secret', 'testclientsecret', + '--compute-tenant-id', 'testtenantid', '--compute-subscription-id', 'testsubscriptionid', + '--disable-account-compute-creds'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', + {'cloud_config': { + 'compute_config': {'compute_subscription_id': 'testsubscriptionid', + 'compute_client_id': 'testclientid', + 'compute_client_secret': 'testclientsecret', + 'use_account_compute_creds': 'False', + 'compute_tenant_id': 'testtenantid', + 'use_account_compute_creds': False}}, + 'cluster_info': {'label': ['test_label']}}) + + def test_azure_storage_config(self): + sys.argv = ['qds.py', '--version', 'v2', '--cloud', 'AZURE', 'cluster', 'create', '--label', 'test_label', + '--storage-access-key', 'testkey', '--storage-account-name', 'test_account_name', + '--disk-storage-account-name', 'testaccname', '--disk-storage-account-resource-group-name', + 'testgrpname'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', + {'cluster_info': + {'label': ['test_label']}, + 'cloud_config': + {'storage_config': + {'storage_access_key': 'testkey', + 'storage_account_name': 'test_account_name', + 'disk_storage_account_name': 'testaccname', + 'disk_storage_account_resource_group_name': 'testgrpname'} + } + }) + + def test_azure_network_config(self): + sys.argv = ['qds.py', '--version', 'v2', '--cloud', 'AZURE', 'cluster', 'create', '--label', 'test_label', + '--vnet-name', 'testvnet', '--subnet-name', 'testsubnet', + '--vnet-resource-group-name', 'vnetresname'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', + {'cloud_config': { + 'network_config': {'vnet_resource_group_name': 'vnetresname', + 'subnet_name': 'testsubnet', + 'vnet_name': 'testvnet'}}, + 'cluster_info': {'label': ['test_label']}}) + + def test_presto_engine_config(self): + with tempfile.NamedTemporaryFile() as temp: + temp.write("config.properties:\na=1\nb=2".encode("utf8")) + temp.flush() + sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'create', '--label', 'test_label', + '--flavour', 'presto', '--presto-custom-config', temp.name] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + 
Connection._api_call.assert_called_with('POST', 'clusters', + {'engine_config': + {'flavour': 'presto', + 'presto_settings': { + 'custom_presto_config': 'config.properties:\na=1\nb=2'}}, + 'cluster_info': {'label': ['test_label']}}) + + def test_spark_engine_config(self): + with tempfile.NamedTemporaryFile() as temp: + temp.write("config.properties:\na=1\nb=2".encode("utf8")) + temp.flush() + sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'create', '--label', 'test_label', + '--flavour', 'spark', '--custom-spark-config', 'spark-overrides'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', + {'engine_config': + {'flavour': 'spark', + 'spark_settings': { + 'custom_spark_config': 'spark-overrides'}}, + 'cluster_info': {'label': ['test_label'],}}) + + + def test_persistent_security_groups_v2(self): + sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'create', '--label', 'test_label', + '--persistent-security-groups', 'sg1, sg2'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', + {'cluster_info': + {'label': ['test_label']}, + 'cloud_config': + {'network_config': + {'persistent_security_groups': 'sg1, sg2'} + } + }) + + def test_data_disk_v2(self): + sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'create', '--label', 'test_label', + '--count', '1', '--size', '100', '--disk-type', 'standard'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', + {'cluster_info': { + 'datadisk': {'count': 1, 'type': 'standard', 'size': 100}, + 'label': ['test_label']}}) + + def test_heterogeneous_config_v2(self): + sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'create', '--label', 'test_label', + '--heterogeneous-config', 'test'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', + {'cluster_info': + {'label': ['test_label'], + 'heterogeneous_config': 'test' + } + }) + + +class TestClusterUpdate(QdsCliTestCase): + def test_minimal(self): + sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'update', '123'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('PUT', 'clusters/123', {}) + + def test_aws_cloud_config(self): + sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'update', '123', + '--compute-access-key', 'aki', '--aws-region', 'us-east-1', + '--bastion-node-public-dns', 'dummydns'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('PUT', 'clusters/123', {'cloud_config': + {'compute_config': + {'compute_access_key': 'aki'}, + 'location': + {'aws_region': 'us-east-1'}, + 'network_config': + {'bastion_node_public_dns': 'dummydns'}} + }) + + + def test_azure_cloud_config(self): + sys.argv = ['qds.py', '--version', 'v2', '--cloud', 'AZURE', 'cluster', 'update', '123', + '--vnet-name', 'testvnet', '--storage-account-name', 'test_account_name', + '--compute-subscription-id', 'testsubscriptionid'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('PUT', 'clusters/123', {'cloud_config': 
{'compute_config': + {'compute_subscription_id': 'testsubscriptionid'}, + 'storage_config': {'storage_account_name': 'test_account_name'}, + 'network_config': {'vnet_name': 'testvnet'}}}) + + def test_oracle_bmc_cloud_config(self): + sys.argv = ['qds.py', '--version', 'v2', '--cloud', 'ORACLE_BMC', 'cluster', 'update', '123', + '--oracle-region', 'us-phoenix-1', '--compartment-id', 'abc-compartment', + '--storage-tenant-id', 'xxx11', '--compute-user-id', 'yyyy11'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('PUT', 'clusters/123', {'cloud_config': + {'network_config': + {'compartment_id': 'abc-compartment'}, + 'compute_config': {'compute_user_id': 'yyyy11'}, + 'storage_config': {'storage_tenant_id': 'xxx11'}, + 'location': {'region': 'us-phoenix-1'} + } + }) + + def test_engine_config(self): + with tempfile.NamedTemporaryFile() as temp: + temp.write("a=1\nb=2".encode("utf8")) + temp.flush() + sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'update', '123', + '--use-qubole-placement-policy', '--custom-hadoop-config', + temp.name] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('PUT', 'clusters/123', {'engine_config': + {'hadoop_settings': + {'use_qubole_placement_policy': True, + 'custom_hadoop_config': 'a=1\nb=2'}} + }) + + def test_cluster_info(self): + sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'update', '123', + '--slave-request-type', 'ondemand', '--max-nodes', '6', + '--disable-ganglia-monitoring'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('PUT', 'clusters/123', {'monitoring': {'ganglia': False}, + 'cluster_info': { + 'slave_request_type': 'ondemand', + 'max_nodes': 6}}) + + +class TestClusterClone(QdsCliTestCase): + + def test_minimal(self): + sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'clone', '1234', '--label', 'test_label1', 'test_label2'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters/1234/clone', {'cluster_info': + {'label': ['test_label1', 'test_label2']}}) \ No newline at end of file From 3ca44d0ab5d51d78897c68c48f910ff461539dee Mon Sep 17 00:00:00 2001 From: Tanish Gupta Date: Mon, 15 May 2017 15:22:19 +0530 Subject: [PATCH 07/10] dev: fix: SDK-203: Fixed Non-ASCII character issue --- qds_sdk/cloud/aws_cloud.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qds_sdk/cloud/aws_cloud.py b/qds_sdk/cloud/aws_cloud.py index bfae5811..b4f56156 100755 --- a/qds_sdk/cloud/aws_cloud.py +++ b/qds_sdk/cloud/aws_cloud.py @@ -32,7 +32,7 @@ def set_cloud_config(self, compute_secret_key: The secret access key for customer's aws account. This is required for creating the cluster. - use_account_compute_creds: Set it to true to use the account’s compute + use_account_compute_creds: Set it to true to use the account's compute credentials for all clusters of the account.The default value is false aws_region: The AWS region in which the cluster is created. The default value is, us-east-1. 
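Taken together, the cluster, engine and cloud changes above split a v2 cluster request into separate cluster_info, cloud_config and engine_config sections. Below is a minimal sketch of how the new objects might be combined outside the command line; the API token, label, keys and instance types are placeholders, and the import path for the cluster classes is assumed from this series rather than stated in it:

    from qds_sdk.qubole import Qubole
    from qds_sdk.engine import Engine
    from qds_sdk.cluster import ClusterCmdLine, ClusterInfoV2, ClusterV2  # import path assumed

    # cloud_name decides which Cloud object Qubole.get_cloud() returns.
    Qubole.configure(api_token='<api-token>', version='v2', cloud_name='AWS')

    cluster_info = ClusterInfoV2(['dev-cluster'])
    cluster_info.set_cluster_info(master_instance_type='m1.xlarge',
                                  slave_instance_type='m1.large',
                                  min_nodes=2,
                                  max_nodes=5,
                                  slave_request_type='spot',
                                  fallback_to_ondemand=True)

    cloud_config = Qubole.get_cloud()
    cloud_config.set_cloud_config(compute_access_key='<access-key>',
                                  compute_secret_key='<secret-key>')

    engine_config = Engine(flavour='spark')
    engine_config.set_engine_config(custom_spark_config='spark.executor.memory 4g')

    # util._make_minimal() drops unset keys, so the request carries only what was set.
    cluster_request = ClusterCmdLine.get_cluster_request_parameters(cluster_info,
                                                                    cloud_config,
                                                                    engine_config)
    ClusterV2.create(cluster_request)

An update or clone would build the same kind of request and pass it to ClusterV2.update or ClusterV2.clone along with the cluster id or label.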
From f7188ecd62b75b39e974f16800753e088267f122 Mon Sep 17 00:00:00 2001 From: Tanish Gupta Date: Tue, 16 May 2017 16:25:11 +0530 Subject: [PATCH 08/10] SDK-203-1: Fixed bad ascii character issue --- qds_sdk/cloud/azure_cloud.py | 2 +- qds_sdk/cloud/oracle_bmc_cloud.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/qds_sdk/cloud/azure_cloud.py b/qds_sdk/cloud/azure_cloud.py index 02de7a2b..9dece151 100755 --- a/qds_sdk/cloud/azure_cloud.py +++ b/qds_sdk/cloud/azure_cloud.py @@ -39,7 +39,7 @@ def set_cloud_config(self, compute_tenant_id: Tenant id for azure cluster - use_account_compute_creds: Set it to true to use the account’s compute + use_account_compute_creds: Set it to true to use the account's compute credentials for all clusters of the account.The default value is false location: Location for azure cluster diff --git a/qds_sdk/cloud/oracle_bmc_cloud.py b/qds_sdk/cloud/oracle_bmc_cloud.py index 68a7e538..b482b281 100755 --- a/qds_sdk/cloud/oracle_bmc_cloud.py +++ b/qds_sdk/cloud/oracle_bmc_cloud.py @@ -38,7 +38,7 @@ def set_cloud_config(self, compute_api_private_rsa_key: compute api private rsa key for oracle cluster - use_account_compute_creds: Set it to true to use the account’s compute + use_account_compute_creds: Set it to true to use the account's compute credentials for all clusters of the account.The default value is false subnet_id: subnet id for oracle From 6f60dbb89462808fe8cc757d649fb97fd30ccfb3 Mon Sep 17 00:00:00 2001 From: abhijitjaiswal Date: Wed, 17 May 2017 16:51:30 +0530 Subject: [PATCH 09/10] sdk-198: sdk times out while submitting query in hipaa environment --- qds_sdk/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qds_sdk/connection.py b/qds_sdk/connection.py index b389190a..469e5cfa 100644 --- a/qds_sdk/connection.py +++ b/qds_sdk/connection.py @@ -26,7 +26,7 @@ def init_poolmanager(self, connections, maxsize, self.poolmanager = PoolManager(num_pools=connections, maxsize=maxsize, block=block, - ssl_version=ssl.PROTOCOL_TLSv1) + ssl_version=ssl.PROTOCOL_SSLv23) class Connection: From b4bc114a2bcb10c9022c56cba3026dbb25d7cdd4 Mon Sep 17 00:00:00 2001 From: harsh Date: Fri, 19 May 2017 15:06:10 -0700 Subject: [PATCH 10/10] fix: usr: Bug fix for access denied error while downloading file. Pull Request #190. SDK-191: Aws throws Access denied errors in situations where the versioning on s3 files is enabled and an attempt to download the file is done by specifying the version(default beahvior by aws boto2). This is seen in boto2 and the workaround is to retry downloading again without specifying the version of the file. --- qds_sdk/commands.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/qds_sdk/commands.py b/qds_sdk/commands.py index 26b8cd88..e4ae1fa9 100644 --- a/qds_sdk/commands.py +++ b/qds_sdk/commands.py @@ -1342,7 +1342,18 @@ def _is_complete_data_available(bucket_paths, num_result_dir): raise Exception("Results file not available on s3 yet. This can be because of s3 eventual consistency issues.") log.info("Downloading file from %s" % s3_path) if delim is None: - key_instance.get_contents_to_file(fp) # cb=_callback + try: + key_instance.get_contents_to_file(fp) # cb=_callback + except boto.exception.S3ResponseError as e: + if (e.status == 403): + # SDK-191, boto gives an error while fetching the objects using versions which happens by default + # in the get_contents_to_file() api. So attempt one without specifying version. 
+ log.warn("Access denied while fetching the s3 object. Retrying without specifying the version....") + key_instance.open() + fp.write(key_instance.read()) + key_instance.close() + else: + raise else: # Get contents as string. Replace parameters and write to file. _read_iteratively(key_instance, fp, delim=delim)
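The 403 retry above sits in the SDK's result-download helper, so it typically matters when command results are large enough to be streamed from S3 rather than returned inline. A rough sketch of how that path is usually reached, assuming the SDK's existing HiveCommand and get_results interface (token, query and file name are placeholders):

    from qds_sdk.qubole import Qubole
    from qds_sdk.commands import HiveCommand

    Qubole.configure(api_token='<api-token>')

    cmd = HiveCommand.run(query='select * from default.big_table')
    with open('results.tsv', 'wb') as fp:
        # Large result sets are pulled from S3; on version-enabled buckets this is
        # where the retry without a version id can kick in.
        cmd.get_results(fp)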