diff --git a/frameworks/cassandra/src/main/dist/svc.yml b/frameworks/cassandra/src/main/dist/svc.yml index b32762dfdb8..a1aa4380319 100644 --- a/frameworks/cassandra/src/main/dist/svc.yml +++ b/frameworks/cassandra/src/main/dist/svc.yml @@ -142,7 +142,7 @@ pods: cmd: > for f in $(find container-path/snapshot/ -maxdepth 1 -mindepth 1 -type d ! -name "system_*" ! -name "system") ; do for t in $(find "$f" -maxdepth 1 -mindepth 1 -type d) ; do - ./apache-cassandra-{{CASSANDRA_VERSION}}/bin/sstableloader -d {{TASKCFG_ALL_LOCAL_SEEDS}} "$t" ; + ./apache-cassandra-{{CASSANDRA_VERSION}}/bin/sstableloader -f apache-cassandra-{{CASSANDRA_VERSION}}/conf/cassandra.yaml -d {{TASKCFG_ALL_LOCAL_SEEDS}} -p {{TASKCFG_ALL_CASSANDRA_NATIVE_TRANSPORT_PORT}} "$t" ; done done diff --git a/frameworks/cassandra/tests/config.py b/frameworks/cassandra/tests/config.py index 000013d05be..140390695c3 100644 --- a/frameworks/cassandra/tests/config.py +++ b/frameworks/cassandra/tests/config.py @@ -38,12 +38,13 @@ def qualified_job_name(job_name): return 'test.cassandra.{}'.format(job_name) +def get_jobs_folder(): + return os.path.join(os.path.dirname(os.path.realpath(__file__)), 'jobs') + + def install_cassandra_jobs(): - jobs_folder = os.path.join( - os.path.dirname(os.path.realpath(__file__)), 'jobs' - ) for job in TEST_JOBS: - install_job(job, jobs_folder) + install_job(job, get_jobs_folder()) def install_job(job_name, jobs_folder): @@ -91,4 +92,8 @@ def launch_and_verify_job(job_name): def remove_cassandra_jobs(): for job in TEST_JOBS: - cmd.run_cli('job remove {}'.format(qualified_job_name(job))) + remove_job(job) + + +def remove_job(job_name): + cmd.run_cli('job remove {}'.format(qualified_job_name(job_name))) diff --git a/frameworks/cassandra/tests/test_soak.py b/frameworks/cassandra/tests/test_soak.py index 0a9d9d038ea..b35d1a3167f 100644 --- a/frameworks/cassandra/tests/test_soak.py +++ b/frameworks/cassandra/tests/test_soak.py @@ -1,34 +1,106 @@ import json +import os +import uuid +import dcos import pytest +import shakedown from tests.config import ( DEFAULT_TASK_COUNT, DELETE_DATA_JOB, PACKAGE_NAME, + TEST_JOBS, + VERIFY_DATA_JOB, + VERIFY_DELETION_JOB, + WRITE_DATA_JOB, + get_jobs_folder, install_cassandra_jobs, + install_job, launch_and_verify_job, remove_cassandra_jobs, + remove_job, ) from tests.test_backup import run_backup_and_restore +import sdk_api as api +import sdk_plan as plan +import sdk_spin as spin import sdk_test_upgrade +import sdk_utils as utils -def setup_module(module): - install_cassandra_jobs() +class EnvironmentContext(object): + """Context manager for temporarily overriding local process envvars.""" + def __init__(self, variable_mapping=None, **variables): + self.new_variables = {} -def teardown_module(module): - remove_cassandra_jobs() + self.new_variables.update( + {} if variable_mapping is None else variable_mapping + ) + self.new_variables.update(variables) + + def __enter__(self): + self.original_variables = os.environ + for k, v in self.new_variables.items(): + os.environ[k] = v + + def __exit__(self, *args): + for k, v in self.new_variables.items(): + if k not in self.original_variables: + del os.environ[k] + else: + os.environ[k] = self.original_variables[k] + + +class JobContext(object): + """Context manager for installing and cleaning up metronome jobs.""" + + def __init__(self, job_names): + self.job_names = job_names + + def __enter__(self): + for j in self.job_names: + install_job(j, get_jobs_folder()) + + def __exit__(self, *args): + for j in self.job_names: + remove_job(j) + + +class DataContext(object): + """Context manager for temporarily installing data in a cluster.""" + + def __init__(self, init_jobs=None, cleanup_jobs=None): + self.init_jobs = init_jobs if init_jobs is not None else [] + self.cleanup_jobs = cleanup_jobs if cleanup_jobs is not None else [] + + def __enter__(self): + for j in self.init_jobs: + launch_and_verify_job(j) + + def __exit__(self, *args): + for j in self.cleanup_jobs: + launch_and_verify_job(j) + + +def get_dcos_cassandra_plan(service_name): + utils.out('Waiting for {} plan to complete...'.format(service_name)) + + def fn(): + return api.get(service_name, '/v1/plan') + return spin.time_wait_return(fn) @pytest.mark.soak_backup def test_backup_and_restore(): - run_backup_and_restore() - - # Since this is run on the soak cluster and state is retained, we have to also delete the test - # data in preparation for the next run. - launch_and_verify_job(DELETE_DATA_JOB) + # Since this is run on the soak cluster and state is retained, we have to + # also delete the test data in preparation for the next run. + data_context = DataContext( + cleanup_jobs=[DELETE_DATA_JOB, VERIFY_DELETION_JOB] + ) + with JobContext(TEST_JOBS), data_context: + run_backup_and_restore() @pytest.mark.soak_upgrade @@ -42,3 +114,74 @@ def test_soak_upgrade_downgrade(): sdk_test_upgrade.soak_upgrade_downgrade( PACKAGE_NAME, DEFAULT_TASK_COUNT, install_options ) + + +@pytest.mark.soak_migration +def test_cassandra_migration(): + backup_service_name = os.getenv('CASSANDRA_BACKUP_CLUSTER_NAME') + restore_service_name = os.getenv('CASSANDRA_RESTORE_CLUSTER_NAME') + + env = EnvironmentContext( + CASSANDRA_NODE_ADDRESS=os.getenv( + 'BACKUP_NODE_ADDRESS', 'node-0.cassandra.mesos' + ), + CASSANDRA_NODE_PORT=os.getenv('BACKUP_NODE_PORT', '9042') + ) + plan_parameters = { + 'S3_BUCKET_NAME': os.getenv( + 'AWS_BUCKET_NAME', 'infinity-framework-test' + ), + 'AWS_ACCESS_KEY_ID': os.getenv('AWS_ACCESS_KEY_ID'), + 'AWS_SECRET_ACCESS_KEY': os.getenv('AWS_SECRET_ACCESS_KEY'), + 'AWS_REGION': os.getenv('AWS_REGION', 'us-west-2'), + 'SNAPSHOT_NAME': str(uuid.uuid1()), + 'CASSANDRA_KEYSPACES': '"testspace1 testspace2"', + } + + data_context = DataContext( + init_jobs=[WRITE_DATA_JOB, VERIFY_DATA_JOB], + cleanup_jobs=[DELETE_DATA_JOB, VERIFY_DELETION_JOB] + ) + # Install and run the write/delete data jobs against backup cluster, + # running dcos-cassandra-service + with env, JobContext(TEST_JOBS), data_context: + # Back this cluster up to S3 + backup_parameters = { + 'backup_name': plan_parameters['SNAPSHOT_NAME'], + 's3_access_key': plan_parameters['AWS_ACCESS_KEY_ID'], + 's3_secret_key': plan_parameters['AWS_SECRET_ACCESS_KEY'], + 'external_location': 's3://{}'.format(plan_parameters['S3_BUCKET_NAME']), + } + dcos.http.put( + '{}v1/backup/start'.format( + shakedown.dcos_service_url(backup_service_name) + ), + json=backup_parameters + ) + spin.time_wait_noisy( + lambda: get_dcos_cassandra_plan( + backup_service_name + ).json()['status'] == 'COMPLETE' + ) + + env = EnvironmentContext( + CASSANDRA_NODE_ADDRESS=os.getenv( + 'RESTORE_NODE_ADDRESS', 'node-0-server.sdk-cassandra.mesos' + ), + CASSANDRA_NODE_PORT=os.getenv('RESTORE_NODE_PORT', '9052') + ) + + data_context = DataContext( + cleanup_jobs=[VERIFY_DATA_JOB, DELETE_DATA_JOB, VERIFY_DELETION_JOB] + ) + with env, JobContext(TEST_JOBS), data_context: + plan.start_plan( + restore_service_name, 'restore-s3', parameters=plan_parameters + ) + spin.time_wait_noisy( + lambda: ( + plan.get_plan( + restore_service_name, 'restore-s3' + ).json()['status'] == 'COMPLETE' + ) + )