Add ability to run on an ec2 instance #272

Merged: 7 commits, Mar 7, 2024
164 changes: 162 additions & 2 deletions sdgym/benchmark.py
@@ -10,6 +10,7 @@
from datetime import datetime
from pathlib import Path

import boto3
import compress_pickle
import numpy as np
import pandas as pd
@@ -442,12 +443,160 @@ def _get_empty_dataframe(compute_quality_score, sdmetrics):
    return scores


def _directory_exists(bucket_name, s3_file_path):
    # Find the last occurrence of '/' in the file path to get the directory prefix
    last_slash_index = s3_file_path.rfind('/')
    directory_prefix = s3_file_path[:last_slash_index + 1]
    s3_client = boto3.client('s3')
    response = s3_client.list_objects_v2(
        Bucket=bucket_name, Prefix=directory_prefix, Delimiter='/')
    return 'Contents' in response or 'CommonPrefixes' in response


def _check_write_permissions(bucket_name):
    s3 = boto3.client('s3')
    # Check write permissions by attempting to upload an empty object to the bucket
    try:
        s3.put_object(Bucket=bucket_name, Key='__test__', Body=b'')
        write_permission = True
    except Exception:
        write_permission = False
    finally:
        # Clean up the test object
        if write_permission:
            s3.delete_object(Bucket=bucket_name, Key='__test__')
    return write_permission


def _parse_s3_path(s3_path):
    if '/' not in s3_path:
        raise ValueError("""Invalid S3 path format.
Expected '<bucket_name>/<path_to_file>'.""")

    # Split only on the first '/'
    bucket_name, s3_file_path = s3_path.split('/', 1)
    if not _directory_exists(bucket_name, s3_file_path):
        raise ValueError(f'Directories in {s3_file_path} do not exist')

    if not _check_write_permissions(bucket_name):
        raise ValueError('No write permissions allowed for the bucket.')

    return bucket_name, s3_file_path
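
For reference, a minimal sketch of the contract _parse_s3_path enforces (the bucket and key below are made-up examples; a real call also reaches S3 through the two helpers above, so run it with credentials configured or with those helpers mocked):

# Hypothetical values: 'my-bucket' must exist with a writable 'results/' prefix.
bucket, key = _parse_s3_path('my-bucket/results/output.csv')
assert (bucket, key) == ('my-bucket', 'results/output.csv')
_parse_s3_path('missing-slash')  # raises ValueError: Invalid S3 path format.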


def _create_sdgym_script(params, output_filepath):
    bucket_name, key_name = _parse_s3_path(output_filepath)
    session = boto3.session.Session()
    credentials = session.get_credentials()
    synthesizer_string = 'synthesizers=['
    for synthesizer in params['synthesizers']:
        synthesizer_string += synthesizer.__name__ + ', '
    synthesizer_string += ']'

    # The indentation of the string is important for the generated python script
    script_content = f"""import boto3
from io import StringIO
import sdgym
from sdgym.synthesizers.sdv import (CopulaGANSynthesizer, CTGANSynthesizer, FastMLPreset,
    GaussianCopulaSynthesizer, HMASynthesizer, PARSynthesizer, SDVRelationalSynthesizer,
    SDVTabularSynthesizer, TVAESynthesizer)

results = sdgym.benchmark_single_table(
    {synthesizer_string}, custom_synthesizers={params['custom_synthesizers']},
    sdv_datasets={params['sdv_datasets']},
    additional_datasets_folder={params['additional_datasets_folder']},
    limit_dataset_size={params['limit_dataset_size']},
    compute_quality_score={params['compute_quality_score']},
    sdmetrics={params['sdmetrics']}, timeout={params['timeout']},
    detailed_results_folder={params['detailed_results_folder']},
    multi_processing_config={params['multi_processing_config']}
)

# Convert DataFrame to CSV string
csv_buffer = StringIO()
results.to_csv(csv_buffer, index=False)
s3 = boto3.client('s3',
    aws_access_key_id='{credentials.access_key}',
    aws_secret_access_key='{credentials.secret_key}')

# Upload CSV to S3
response = s3.put_object(
    Bucket='{bucket_name}',
    Key='{key_name}',
    Body=csv_buffer.getvalue()
)
"""

    return script_content


def _create_instance_on_ec2(script_content):
    ec2_client = boto3.client('ec2')
    session = boto3.session.Session()
    credentials = session.get_credentials()
    print(f'This instance is being created in region: {session.region_name}')  # noqa

    # User data script to install the library, run the benchmark and self-terminate
    user_data_script = f"""#!/bin/bash
sudo apt update -y
sudo apt install python3-pip -y
echo "======== Install SDGYM ============"
sudo pip3 install sdgym
sudo pip3 install anyio
pip3 list
echo "======== Write Script ==========="
sudo touch ~/sdgym_script.py
echo "{script_content}" > ~/sdgym_script.py
echo "======== Run Script ==========="
sudo python3 ~/sdgym_script.py
echo "======== Terminate ==========="
sudo apt install awscli -y
aws configure set aws_access_key_id {credentials.access_key}
aws configure set aws_secret_access_key {credentials.secret_key}
aws configure set region {session.region_name}
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
aws ec2 terminate-instances --instance-ids $INSTANCE_ID
"""

    response = ec2_client.run_instances(
        ImageId='ami-07d9b9ddc6cd8dd30',
        InstanceType='t2.medium',
        # Review thread on lines +560 to +561 (ImageId / InstanceType):
        #   Contributor: Should these be configurable arguments on the function?
        #   Author: This was one I was wondering about. Making these configurable requires
        #       the user to supply a lot more information: the block device mappings
        #       (line 548) depend on the image, and the instance type depends on the image
        #       too. I think it's fine to hard-code them, since each VM type would require
        #       a different script, and errors on the EC2 instance (not enough memory,
        #       incorrect OS script, etc.) are pretty much on the user to debug.
        #   Contributor: Let's leave out configuration for now and add it later if we
        #       need to. (A possible shape for configurable arguments is sketched after
        #       this function.)
        MinCount=1,
        MaxCount=1,
        UserData=user_data_script,
        TagSpecifications=[
            {
                'ResourceType': 'instance',
                'Tags': [
                    {
                        'Key': 'Name',
                        'Value': 'SDGym_Temp'
                    }
                ]
            }
        ],
        BlockDeviceMappings=[
            {
                'DeviceName': '/dev/sda1',
                'Ebs': {
                    'VolumeSize': 16,  # Specify the desired size in GB
                    'VolumeType': 'gp2'  # Change the volume type as needed
                }
            }
        ],
    )

    # Wait until the instance passes its status checks before reporting the job as started
    instance_id = response['Instances'][0]['InstanceId']
    waiter = ec2_client.get_waiter('instance_status_ok')
    waiter.wait(InstanceIds=[instance_id])
    print(f'Job kicked off for SDGym on {instance_id}')  # noqa
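
Following up on the review thread above, one possible shape for configurable instance arguments is sketched below. This is a hypothetical illustration, not part of the PR: the function name _run_benchmark_instance and its parameters are made up, reusing the hard-coded values from this diff as defaults.

import boto3


def _run_benchmark_instance(user_data_script, image_id='ami-07d9b9ddc6cd8dd30',
                            instance_type='t2.medium', volume_size_gb=16):
    # Same run_instances call as above, with the hard-coded values exposed as
    # defaults. The user-data script assumes a Debian/Ubuntu AMI, so image_id
    # and instance_type must stay compatible with it.
    ec2_client = boto3.client('ec2')
    return ec2_client.run_instances(
        ImageId=image_id,
        InstanceType=instance_type,
        MinCount=1,
        MaxCount=1,
        UserData=user_data_script,
        BlockDeviceMappings=[{
            'DeviceName': '/dev/sda1',
            'Ebs': {'VolumeSize': volume_size_gb, 'VolumeType': 'gp2'},
        }],
    )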


def benchmark_single_table(synthesizers=DEFAULT_SYNTHESIZERS, custom_synthesizers=None,
                           sdv_datasets=DEFAULT_DATASETS, additional_datasets_folder=None,
                           limit_dataset_size=False, compute_quality_score=True,
                           sdmetrics=DEFAULT_METRICS, timeout=None, output_filepath=None,
                           detailed_results_folder=None, show_progress=False,
                           multi_processing_config=None, run_on_ec2=False):
"""Run the SDGym benchmark on single-table datasets.

Args:
Expand Down Expand Up @@ -487,7 +636,9 @@ def benchmark_single_table(synthesizers=DEFAULT_SYNTHESIZERS, custom_synthesizer
            timeout is enforced.
        output_filepath (str or ``None``):
            A file path for where to write the output as a csv file. If ``None``, no output
            is written. If the ``run_on_ec2`` flag is set, ``output_filepath`` must be
            provided and structured as: {s3_bucket_name}/{path_to_file}.
            Please make sure the path exists and write permissions are granted.
        detailed_results_folder (str or ``None``):
            The folder for where to store the intermediary results. If ``None``, do not store
            the intermediate results anywhere.
@@ -504,6 +655,15 @@
        pandas.DataFrame:
            A table containing one row per synthesizer + dataset + metric.
    """
    if run_on_ec2:
        print("This will create an instance for the current AWS user's account.")  # noqa
        if output_filepath is not None:
            script_content = _create_sdgym_script(dict(locals()), output_filepath)
            _create_instance_on_ec2(script_content)
        else:
            raise ValueError('In order to run on EC2, please provide an S3 folder output.')

        return None

    _validate_inputs(output_filepath, detailed_results_folder, synthesizers, custom_synthesizers)

    _create_detailed_results_directory(detailed_results_folder)
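
A minimal usage sketch of the new flag (the bucket path and synthesizer choice are illustrative, and AWS credentials must already be configured locally):

import sdgym
from sdgym.synthesizers import GaussianCopulaSynthesizer

# 'my-sdgym-bucket/results/benchmark.csv' is a made-up S3 path: the 'results/'
# prefix must already exist and the bucket must be writable. The call kicks off
# a temporary EC2 instance and returns None; results are uploaded to S3.
sdgym.benchmark_single_table(
    synthesizers=[GaussianCopulaSynthesizer],
    run_on_ec2=True,
    output_filepath='my-sdgym-bucket/results/benchmark.csv',
)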
5 changes: 5 additions & 0 deletions sdgym/cli/__main__.py
@@ -246,6 +246,11 @@ def _get_parser():
        action='store_true',
        help='Print a progress bar using tqdm.'
    )
    run.add_argument(
        '--run-on-ec2',
        action='store_true',
        help='Run the job on a newly created EC2 instance using the AWS variables '
             'set in the environment.'
    )
    run.add_argument(
        '-t',
        '--timeout',
153 changes: 153 additions & 0 deletions tests/unit/test_benchmark.py
@@ -4,6 +4,8 @@
import pytest

from sdgym import benchmark_single_table
from sdgym.benchmark import _check_write_permissions, _create_sdgym_script, _directory_exists
from sdgym.synthesizers import CTGANSynthesizer, GaussianCopulaSynthesizer


@patch('sdgym.benchmark.os.path')
@@ -81,3 +83,154 @@ def test_benchmark_single_table_with_timeout(mock_multiprocessing, mock__score):
        'error': {0: 'Synthesizer Timeout'}
    })
    pd.testing.assert_frame_equal(scores, expected_scores)


@patch('sdgym.benchmark.boto3.client')
def test__directory_exists(mock_client):
    # Setup
    mock_client.return_value.list_objects_v2.return_value = {
        'Contents': [
            {
                'Key': 'example.txt',
                'ETag': '"1234567890abcdef1234567890abcdef"',
                'Size': 1024,
                'StorageClass': 'STANDARD'
            },
            {
                'Key': 'example_folder/',
                'ETag': '"0987654321fedcba0987654321fedcba"',
                'Size': 0,
                'StorageClass': 'STANDARD'
            }
        ],
        'CommonPrefixes': [
            {
                'Prefix': 'example_folder/subfolder1/'
            },
            {
                'Prefix': 'example_folder/subfolder2/'
            }
        ]
    }

    # Run and Assert
    assert _directory_exists('bucket', 'file_path/mock.csv')

    # Setup failure
    mock_client.return_value.list_objects_v2.return_value = {}

    # Run and Assert
    assert not _directory_exists('bucket', 'file_path/mock.csv')


@patch('sdgym.benchmark.boto3.client')
def test__check_write_permissions(mock_client):
    # Setup
    mock_client.return_value.put_object.side_effect = Exception('Simulated error')

    # Run and Assert
    assert not _check_write_permissions('bucket')

    # Setup for success
    mock_client.return_value.put_object.side_effect = None

    # Run and Assert
    assert _check_write_permissions('bucket')


@patch('sdgym.benchmark._directory_exists')
@patch('sdgym.benchmark._check_write_permissions')
@patch('sdgym.benchmark.boto3.session.Session')
@patch('sdgym.benchmark._create_instance_on_ec2')
def test_run_ec2_flag(
    create_ec2_mock,
    session_mock,
    mock_write_permissions,
    mock_directory_exists
):
    """Test that the ``run_on_ec2`` flag validates its inputs and kicks off an instance."""
    # Setup
    create_ec2_mock.return_value = MagicMock()
    session_mock.get_credentials.return_value = MagicMock()
    mock_write_permissions.return_value = True
    mock_directory_exists.return_value = True

    # Run
    benchmark_single_table(run_on_ec2=True, output_filepath='BucketName/path')

    # Assert
    create_ec2_mock.assert_called_once()

    # Run
    with pytest.raises(ValueError,
                       match=r'In order to run on EC2, please provide an S3 folder output.'):
        benchmark_single_table(run_on_ec2=True)

    # Assert
    create_ec2_mock.assert_called_once()

    # Run
    with pytest.raises(ValueError, match=r"""Invalid S3 path format.
Expected '<bucket_name>/<path_to_file>'."""):
        benchmark_single_table(run_on_ec2=True, output_filepath='Wrong_Format')

    # Assert
    create_ec2_mock.assert_called_once()

    # Setup for failure in permissions
    mock_write_permissions.return_value = False

    # Run
    with pytest.raises(ValueError,
                       match=r'No write permissions allowed for the bucket.'):
        benchmark_single_table(run_on_ec2=True, output_filepath='BucketName/path')

    # Setup for failure in directory exists
    mock_write_permissions.return_value = True
    mock_directory_exists.return_value = False

    # Run
    with pytest.raises(ValueError,
                       match=r'Directories in mock/path do not exist'):
        benchmark_single_table(run_on_ec2=True, output_filepath='BucketName/mock/path')


@patch('sdgym.benchmark._directory_exists')
@patch('sdgym.benchmark._check_write_permissions')
@patch('sdgym.benchmark.boto3.session.Session')
def test__create_sdgym_script(session_mock, mock_write_permissions, mock_directory_exists):
    # Setup
    session_mock.get_credentials.return_value = MagicMock()
    test_params = {
        'synthesizers': [GaussianCopulaSynthesizer, CTGANSynthesizer],
        'custom_synthesizers': None,
        'sdv_datasets': [
            'adult', 'alarm', 'census',
            'child', 'expedia_hotel_logs',
            'insurance', 'intrusion', 'news', 'covtype'
        ],
        'additional_datasets_folder': None,
        'limit_dataset_size': True,
        'compute_quality_score': False,
        'sdmetrics': [('NewRowSynthesis', {'synthetic_sample_size': 1000})],
        'timeout': 600,
        'output_filepath': 'sdgym-results/address_comments.csv',
        'detailed_results_folder': None,
        'show_progress': False,
        'multi_processing_config': None,
        'dummy': True
    }
    mock_write_permissions.return_value = True
    mock_directory_exists.return_value = True

    # Run
    result = _create_sdgym_script(test_params, 'Bucket/Filepath')

    # Assert
    assert 'synthesizers=[GaussianCopulaSynthesizer, CTGANSynthesizer, ]' in result
    assert 'detailed_results_folder=None' in result
    assert 'multi_processing_config=None' in result
    assert "sdmetrics=[('NewRowSynthesis', {'synthetic_sample_size': 1000})]" in result
    assert 'timeout=600' in result
    assert 'compute_quality_score=False' in result
    assert 'import boto3' in result