[FDS-2386] Synapse entity tracking and code concurrency updates #1505

Open. Wants to merge 36 commits into base: develop

@BryanFauble BryanFauble commented Sep 17, 2024

I will be adding comments to specific areas of the code that should be looked at and carefully considered. Below covers the overall gist of what is going on here:

Problem:

  1. When an entity is retrieved from the Synapse Python client, it is not stored for further use within the context of a request hitting schematic. The entity is re-retrieved several times throughout the lifecycle of the request, causing significant bottlenecks when retrieving data from Synapse.
  2. When downloading manifests, a temporary directory is used when running on AWS, but not locally or for integration tests. As a result, concurrent requests in those contexts caused manifest files to be overwritten by new tests, which made previous tests fail.
  3. A few Great Expectations concepts (suites and checkpoints) were constantly being created and torn down, even during requests where the checkpoint/suite was not created. This leads to more concurrency issues, where a checkpoint/suite could be torn down in the middle of a request or at an unexpected point in time.

Solution:

  1. Creating a simple "pull through cache" that can be added via composition to the classes that interact with Synapse. It acts as a small (albeit hacky) shim layer between schematic and SYNPY. Using this entity tracker allows the code to function largely as it has, without the need for more in-depth refactoring. It means that an entity for a user session is pulled out of memory before it is pulled from Synapse again.
  2. Writing manifests to a temporary directory in all cases except when schematic is used as a CLI tool.
  3. Updating the teardown logic for the Great Expectations suite and checkpoints so that each run only tears down the suite and checkpoints that it created itself.

Testing:

  1. I have been stepping through the code with the integration tests that have been written to verify functionality.
  2. We will need a detailed testing plan in place to verify that schematic continues to function as expected. @linglp @andrewelamb Are you two available to help put together a plan for testing all of these changes?

pyproject.toml (outdated)
@@ -75,13 +76,14 @@ Flask-Cors = {version = "^3.0.10", optional = true}
uWSGI = {version = "^2.0.21", optional = true}
Jinja2 = {version = ">2.11.3", optional = true}
asyncio = "^3.4.3"
jaeger-client = {version = "^4.8.0", optional = true}
flask-opentracing = {version="^2.0.0", optional = true}
Contributor Author

flask/jaeger are deprecated. Swapping over to the otel-api/sdk libraries instead.

@@ -135,41 +131,3 @@ testpaths = [
filterwarnings = [
"ignore::DeprecationWarning"
]
markers = [
Contributor Author

These were moved to pytest.ini to stop a bunch of warnings from being emitted.

Comment on lines +57 to +59
FlaskInstrumentor().instrument(
request_hook=request_hook, response_hook=response_hook
)
Contributor Author

Almost all of the code in this script was simply moved, except for this FlaskInstrumentor call and the request/response hooks. It comes from this library: https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation/opentelemetry-instrumentation-flask, which allows auto-instrumentation of the Flask server that is running here.
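
As a rough illustration of what request/response hooks like these can do, here is a minimal sketch. The hook bodies, the attribute names, the `SENSITIVE_PARAMS` set, and the `RecordingSpan` stand-in are assumptions made for illustration and are not taken from this PR. The hook signatures follow the ones documented for opentelemetry-instrumentation-flask, to the best of my knowledge.

```python
from typing import Any, Dict, List, Tuple
from urllib.parse import parse_qsl

# Hypothetical set of query parameters that should never be recorded.
SENSITIVE_PARAMS = {"access_token"}


class RecordingSpan:
    """Tiny stand-in for an OpenTelemetry span, used only in this sketch."""

    def __init__(self) -> None:
        self.attributes: Dict[str, Any] = {}

    def is_recording(self) -> bool:
        return True

    def set_attribute(self, key: str, value: Any) -> None:
        self.attributes[key] = value


def request_hook(span: Any, environ: Dict[str, Any]) -> None:
    """Record each non-sensitive query-string parameter on the span."""
    if span is None or not span.is_recording():
        return
    for name, value in parse_qsl(environ.get("QUERY_STRING", "")):
        if name not in SENSITIVE_PARAMS:
            span.set_attribute(f"request.query.{name}", value)


def response_hook(
    span: Any, status: str, response_headers: List[Tuple[str, str]]
) -> None:
    """Record the WSGI status line (for example "200 OK") on the span."""
    if span is not None and span.is_recording():
        span.set_attribute("response.status", status)


# With the real library installed, registration would mirror the diff above:
# FlaskInstrumentor().instrument(
#     request_hook=request_hook, response_hook=response_hook
# )
```

Because the hooks see every request, they can take over what a per-function decorator used to do, at the cost of having to filter out anything sensitive in one central place.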

datasetId=dataset_id,
downloadFile=True,
newManifestName=new_manifest_name,
use_temporary_folder=False,
Contributor Author

When running in CLI mode, manifest files are not downloaded to a temporary folder; instead, we continue to use the manifest folder defined in the config.

Comment on lines +119 to +120
anonymous_usage_statistics = AnonymizedUsageStatisticsConfig(enabled=False)

Contributor Author

This prevents extra outbound calls (anonymous usage statistics) from leaving schematic.

Member

Should this be a comment?

Comment on lines +154 to +158
self.expectation_suite_name = f"Manifest_test_suite_{uuid.uuid4()}"
expectation_suite = self.context.add_expectation_suite(
expectation_suite_name=self.expectation_suite_name,
)
self.suite = expectation_suite
Contributor Author

I found issues when tests were running concurrently where Great Expectations test suites were stepping on each other, causing tests to break in unexpected places. With this change, the JSON file that is created for each test suite run is unique, and it is cleaned up after the test suite has run.

Another test will no longer clean these up.

Contributor Author

The same notes about concurrency also apply when this code is running as an API server. If two users sent requests at the same time, this likely would have caused issues if the timing lined up just right.
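
The idea of "unique name per run, and each run only cleans up what it created" can be sketched without Great Expectations at all. In the sketch below, `run_suite` and the `expectations_dir` argument are hypothetical names, and a plain JSON file stands in for the suite file; only the `Manifest_test_suite_{uuid}` naming comes from the diff above.

```python
import json
import os
import tempfile
import uuid


def run_suite(expectations_dir: str) -> str:
    """Write a uniquely named suite file, 'run' it, then remove only that file."""
    suite_name = f"Manifest_test_suite_{uuid.uuid4()}"
    suite_path = os.path.join(expectations_dir, f"{suite_name}.json")
    try:
        with open(suite_path, "w", encoding="utf-8") as handle:
            json.dump({"expectation_suite_name": suite_name}, handle)
        # ... the validation itself would run here ...
        return suite_name
    finally:
        # Only the file created by this run is removed. Files that belong to
        # concurrent runs are never touched.
        if os.path.exists(suite_path):
            os.remove(suite_path)


if __name__ == "__main__":
    directory = tempfile.mkdtemp()
    print(run_suite(directory) != run_suite(directory))  # True
```

Since no two runs share a name, one request can no longer delete a suite that another request is still using.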

@@ -421,7 +409,7 @@ def build_checkpoint(self):
adds checkpoint to self
"""
# create manifest checkpoint
- self.checkpoint_name = "manifest_checkpoint"
+ self.checkpoint_name = f"manifest_checkpoint_{uuid.uuid4()}"
Contributor Author

Using a unique checkpoint name to prevent name collisions with other GE runs.

Comment on lines +364 to 365
# TODO: Do we need to build data docs here during normal operation?
self.context.build_data_docs(resource_identifiers=[suite_identifier])
Contributor Author

@andrewelamb @linglp @milen-sage - Is this code needed? It builds a bunch of HTML files, but I couldn't tell whether they are actually used anywhere.

Comment on lines -276 to -279
# check if suite has been created. If so, delete it
if os.path.exists("great_expectations/expectations/Manifest_test_suite.json"):
    os.remove("great_expectations/expectations/Manifest_test_suite.json")

Contributor Author

No longer cleaning up the test suite run here. Instead relying on the cleanup to happen in the finally block after the expectation runs.

Comment on lines +163 to +164
if logger.isEnabledFor(logging.DEBUG):
    t_GE = perf_counter()
Contributor Author

Some extra code to guard against tracking extra things in memory when they aren't needed (it's super minor).
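
For context, the guard pattern looks roughly like this when written out in full. This is only a sketch: `timed_validation`, the `work` callable, and the logger name are hypothetical, and only the `isEnabledFor(logging.DEBUG)` check and `perf_counter()` come from the diff.

```python
import logging
from time import perf_counter

logger = logging.getLogger("timing_sketch")


def timed_validation(work):
    """Run work(), measuring its duration only when DEBUG logging is enabled."""
    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    if debug_enabled:
        start = perf_counter()
    result = work()
    if debug_enabled:
        # The timer value only exists when someone will actually read it.
        logger.debug("Validation took %.4f seconds", perf_counter() - start)
    return result
```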

Comment on lines +183 to +185
"batch_identifiers": {
"default_identifier_name": f"manifestID_{uuid.uuid4()}"
},
Contributor Author

Unique identifier to prevent name collisions

Comment on lines -324 to +332
for error in sorted(v.iter_errors(annotation), key=exceptions.relevance):
for sorted_error in sorted(
v.iter_errors(annotation), key=exceptions.relevance
):
errorRow = str(i + 2)
errorCol = error.path[-1] if len(error.path) > 0 else "Wrong schema"
errorColName = error.path[0] if len(error.path) > 0 else "Wrong schema"
errorMsg = error.message[0:500]
errorVal = error.instance if len(error.path) > 0 else "Wrong schema"
errorColName = (
sorted_error.path[0]
if len(sorted_error.path) > 0
else "Wrong schema"
)
errorMsg = sorted_error.message[0:500]
errorVal = (
sorted_error.instance
if len(sorted_error.path) > 0
else "Wrong schema"
)
Contributor Author

This is just automatically re-formatted code. I did not make any changes here.

@@ -0,0 +1,103 @@
"""This script is responsible for creating a 'pull through cache' class that can be
Contributor Author

This file is the main star of all these changes. Please read carefully.
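
For readers who want the shape of a pull-through cache before opening the file, here is a minimal sketch. It is not the code from this PR: `EntityTrackerSketch` and `FakeSynapse` are hypothetical, and the only things borrowed from the diff are the `get(...)` keyword names (`synapse_id`, `syn`, `download_file`, `retrieve_if_not_present`).

```python
from typing import Any, Dict, Optional, Tuple


class FakeSynapse:
    """Stand-in for the Synapse client that counts how often it is called."""

    def __init__(self) -> None:
        self.calls = 0

    def get(self, synapse_id: str, downloadFile: bool = False) -> Dict[str, Any]:
        self.calls += 1
        return {"id": synapse_id}


class EntityTrackerSketch:
    """Hypothetical pull-through cache keyed by Synapse ID."""

    def __init__(self) -> None:
        # Maps a Synapse ID to (entity, whether the file was downloaded).
        self._entities: Dict[str, Tuple[Any, bool]] = {}

    def get(
        self,
        synapse_id: str,
        syn: Any,
        download_file: bool = False,
        retrieve_if_not_present: bool = True,
    ) -> Optional[Any]:
        cached = self._entities.get(synapse_id)
        if cached is not None:
            entity, has_file = cached
            # A metadata-only entry cannot satisfy a request that needs the file.
            if has_file or not download_file:
                return entity
        if not retrieve_if_not_present:
            return None
        entity = syn.get(synapse_id, downloadFile=download_file)
        self._entities[synapse_id] = (entity, download_file)
        return entity


if __name__ == "__main__":
    syn = FakeSynapse()
    tracker = EntityTrackerSketch()
    tracker.get(synapse_id="syn123", syn=syn)
    tracker.get(synapse_id="syn123", syn=syn)
    print(syn.calls)  # 1
```

The tracker is meant to live only as long as one request or user session, so staleness is bounded by the lifetime of that request.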

Comment on lines +124 to +126
cleanup_temporary_storage(
temporary_manifest_storage, time_delta_seconds=3600
)
Contributor Author

Instead of blanket-removing previously downloaded manifest files, this waits 1 hour before cleaning them up. This prevents the removal of a manifest that might be actively in use, or that could serve a successful cache pull (i.e., not downloading the manifest file again).
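
An age-based cleanup along these lines could look like the sketch below. The function name and the `time_delta_seconds` keyword match the call in the diff, but the body, the first parameter's name, and the use of file modification time are assumptions, not the PR's implementation.

```python
import os
import shutil
import tempfile
import time


def cleanup_temporary_storage(
    temporary_storage_directory: str, time_delta_seconds: int
) -> None:
    """Remove entries that were last modified more than time_delta_seconds ago."""
    if not os.path.isdir(temporary_storage_directory):
        return
    cutoff = time.time() - time_delta_seconds
    # Materialize the listing first so that deleting does not disturb iteration.
    for entry in list(os.scandir(temporary_storage_directory)):
        if entry.stat().st_mtime >= cutoff:
            continue  # recent enough that a request may still be using it
        if entry.is_dir(follow_symlinks=False):
            shutil.rmtree(entry.path, ignore_errors=True)
        else:
            os.remove(entry.path)
```

With `time_delta_seconds=3600`, a manifest downloaded a few minutes ago by a concurrent request survives, while anything untouched for more than an hour is removed.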

Comment on lines +131 to 175
download_location = create_temp_folder(
path=temporary_manifest_storage,
prefix=f"{self.manifest_id}-{time.time()}-",
)
else:
download_location = CONFIG.manifest_folder
manifest_data = self.syn.get(
self.manifest_id,
downloadLocation=download_location,
ifcollision="overwrite.local",
if use_temporary_folder:
download_location = create_temp_folder(
path=CONFIG.manifest_folder,
prefix=f"{self.manifest_id}-{time.time()}-",
)
else:
download_location = CONFIG.manifest_folder

manifest_data = self.synapse_entity_tracker.get(
synapse_id=self.manifest_id,
syn=self.syn,
download_file=True,
retrieve_if_not_present=True,
download_location=download_location,
)

# This is doing a rename of the downloaded file. The reason this is important
# is that if we are re-using a file that was previously downloaded, but the
# file had been renamed. The file downloaded from the Synapse client is just
# a direct copy of that renamed file. This code will set the name of the file
# to the original name that was used to download the file. Note: An MD5 checksum
# of the file will still be performed so if the file has changed, it will be
# downloaded again.
filename = manifest_data._file_handle.fileName
if filename != os.path.basename(manifest_data.path):
parent_folder = os.path.dirname(manifest_data.path)
manifest_original_name_and_path = os.path.join(parent_folder, filename)

self.syn.cache.remove(
file_handle_id=manifest_data.dataFileHandleId, path=manifest_data.path
)
os.rename(manifest_data.path, manifest_original_name_and_path)
manifest_data.path = manifest_original_name_and_path
self.syn.cache.add(
file_handle_id=manifest_data.dataFileHandleId,
path=manifest_original_name_and_path,
md5=manifest_data._file_handle.contentMd5,
)

return manifest_data
Contributor Author

This is a potentially impactful change, @andrewelamb @linglp @milen-sage, that could use a good look. Basically, this is what it solves:

(When NOT running on AWS)

  1. When manifests were downloaded, the downloadAs name, or the name of the actual file, could conflict with a file that was already in the download location. This would overwrite the file and potentially cause the test, or whatever kicked off that code, to fail.

(Whether or not running on AWS)

  1. When downloading an entity for which we already have a local copy (think of a file that exists within the Synapse cachemap), the client copies that file over to the requested location. The issue, however, is that the copied file retains whatever name it has been renamed to (some tests do this, and code in schematic does this). Because of this, I have the os.rename logic in place to set the name of the copied file back to the expected file name so that any subsequent code behaves as expected.

- projectName = self.syn.get(projectId, downloadFile=False).name
- projects.append((projectId, projectName))
+ project_name_from_project_header = project_id_to_name_dict.get(projectId)
+ projects.append((projectId, project_name_from_project_header))
Contributor Author

There are a few things to note here on these changes:

  1. I have added a change to SYNPY ([FDS-2386] Add owner id to the credentials to be used later on synapsePythonClient#1130) that stores the ID of the user profile that logged into Synapse. This removes the need to call getUserProfile() and saves an extra HTTP call.
  2. I am storing the result of getting the paginated list of project headers into the synapse_entity_tracker. It is really slow to crawl this information and it occurs several times. By doing this the content is re-used in the context of the request.
  3. I removed the need to call syn.get() for each of the projects as the paginated list of project headers contained all the data we needed.
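
Point 3 boils down to a dictionary lookup in place of one network call per project. A minimal sketch, assuming each project header is a mapping with `id` and `name` keys (the helper name `projects_from_headers` is hypothetical):

```python
from typing import Dict, Iterable, List, Optional, Tuple


def projects_from_headers(
    project_ids: Iterable[str], project_headers: Iterable[Dict[str, str]]
) -> List[Tuple[str, Optional[str]]]:
    """Pair each project ID with its name using only the header listing."""
    project_id_to_name_dict = {
        header["id"]: header["name"] for header in project_headers
    }
    # dict.get returns None for IDs missing from the headers; no extra
    # request is made to look them up.
    return [
        (project_id, project_id_to_name_dict.get(project_id))
        for project_id in project_ids
    ]
```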

Comment on lines +1496 to 1499
table_parent_id = self.getDatasetProject(datasetId=datasetId)
existing_table_id = self.syn.findEntityId(
name=table_name, parent=table_parent_id
)
Contributor Author

Swapping to findEntityId removes the need to walk the project to find the applicable table ID. Instead, we can use the Synapse REST API to find the ID for us.

Contributor Author

This will return None if not found

Comment on lines 1530 to 1541
table_entity = self.synapse_entity_tracker.get(
synapse_id=existing_table_id, syn=self.syn, download_file=False
)
annos = OldAnnotations(
id=table_entity.id,
etag=table_entity.etag,
values=table_entity.annotations,
)
annos["primary_key"] = table_manifest["Component"][0] + "_id"
annos = self.syn.set_annotations(annos)
table_entity.etag = annos.etag
table_entity.annotations = annos
Contributor Author

In this file 2 sets of Annotations from SYNPY are being used:

from synapseclient.models.annotations import Annotations # This is the new set of Annotations that has Async support
from synapseclient import Annotations as OldAnnotations # This is the old set of Annotations that works with the current Entity objects

Since all of the Entities returned from syn.get() still use OldAnnotations I am using it in this context.

Comment on lines +1581 to +1607
try:
# Rename the file to file_name_new then revert
# This is to maintain the original file name in-case other code is
# expecting that the file exists with the original name
original_file_path = metadataManifestPath
new_file_path = os.path.join(
os.path.dirname(metadataManifestPath), file_name_new
)
os.rename(original_file_path, new_file_path)

manifest_synapse_file = self._store_file_for_manifest_upload(
new_file_path=new_file_path,
dataset_id=datasetId,
existing_file_name=file_name_full,
file_name_new=file_name_new,
restrict_manifest=restrict_manifest,
)
manifest_synapse_file_id = manifest_synapse_file.id

finally:
# Revert the file name back to the original
os.rename(new_file_path, original_file_path)

if manifest_synapse_file:
manifest_synapse_file.path = original_file_path

return manifest_synapse_file_id
Contributor Author

Doing this removes the need to call changeFileMetaData, as the file will be uploaded with the expected name. That function was/is super slow, so this is a big gain.
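
The rename-then-revert pattern can also be expressed as a context manager. This is a sketch of the pattern only, not the PR's code; `temporarily_renamed` is a hypothetical helper. One deliberate difference from the diff above is that the first rename happens before the `try`, so a failed rename never triggers the revert.

```python
import os
import tempfile
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def temporarily_renamed(original_path: str, new_name: str) -> Iterator[str]:
    """Rename a file for the duration of the block, then restore its name."""
    new_path = os.path.join(os.path.dirname(original_path), new_name)
    os.rename(original_path, new_path)
    try:
        yield new_path
    finally:
        # Runs even if the upload raises, so later code still finds the file
        # under its original name.
        os.rename(new_path, original_path)


if __name__ == "__main__":
    folder = tempfile.mkdtemp()
    manifest = os.path.join(folder, "manifest.csv")
    open(manifest, "w").close()
    with temporarily_renamed(manifest, "synapse_storage_manifest.csv") as path:
        print(os.path.basename(path))  # synapse_storage_manifest.csv
    print(os.listdir(folder))  # ['manifest.csv']
```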

Comment on lines 1797 to 1807
annos["annotations"]["annotations"][anno_k] = ""
elif (
isinstance(anno_v, str)
and re.fullmatch(csv_list_regex, anno_v)
and rule_in_rule_list(
"list", dmge.get_node_validation_rules(anno_k)
)
):
annos[anno_k] = anno_v.split(",")
annos["annotations"]["annotations"][anno_k] = anno_v.split(",")
else:
annos[anno_k] = anno_v
annos["annotations"]["annotations"][anno_k] = anno_v
Contributor Author

I pulled in some changes from: #1485 so I could make sure everything was working. I will handle any merge conflicts if that code goes first.

Comment on lines -2806 to +3075
- config = self.synStore.syn.getConfigFile(CONFIG.synapse_configuration_path)
+ config = get_config_file(CONFIG.synapse_configuration_path)
Contributor Author

Moved to the non-deprecated function (Same behavior)

Comment on lines -90 to -116
def trace_function_params():
    """capture all the parameters of API requests"""

    def decorator(func):
        """create a decorator"""

        @wraps(func)
        def wrapper(*args, **kwargs):
            """create a wrapper function. Any number of positional arguments and keyword arguments can be passed here."""
            tracer = trace.get_tracer(__name__)
            # Start a new span with the function's name
            with tracer.start_as_current_span(func.__name__) as span:
                # Set values of parameters as tags
                for i, arg in enumerate(args):
                    span.set_attribute(f"arg{i}", arg)

                for name, value in kwargs.items():
                    span.set_attribute(name, value)
                # Call the actual function
                result = func(*args, **kwargs)
                return result

        return wrapper

    return decorator


Contributor Author

This is no longer needed with the request hooks that I added for the FlaskInstrumentor

@@ -182,7 +103,7 @@ def convert_df_to_csv(self, df, file_name):
"""

# convert dataframe to a temporary csv file
- temp_dir = tempfile.gettempdir()
+ temp_dir = create_temp_folder(path=tempfile.gettempdir())
Contributor Author

This was failing under high concurrency because tempfile.gettempdir() only returns /tmp, so tests that uploaded a file with the same name were causing errors. By doing this, we are guaranteed to get back a unique temporary directory.
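
A helper with this behavior can be built on `tempfile.mkdtemp`, which creates a new, uniquely named directory on every call. The body below is a sketch under that assumption; the real `create_temp_folder` in schematic may differ.

```python
import os
import tempfile
from typing import Optional


def create_temp_folder(path: str, prefix: Optional[str] = None) -> str:
    """Create and return a new, uniquely named directory inside path."""
    os.makedirs(path, exist_ok=True)
    # mkdtemp picks a name that does not exist yet and creates the directory
    # in one step, so concurrent callers never receive the same folder.
    return tempfile.mkdtemp(dir=path, prefix=prefix)


if __name__ == "__main__":
    first = create_temp_folder(path=tempfile.gettempdir())
    second = create_temp_folder(path=tempfile.gettempdir())
    print(first != second)  # True
```

Two requests that both write a file with the same name now write into different folders instead of overwriting each other.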

Member

I wonder if this impacts deployment, not sure if the /tmp directory is mounted in any way (probably not)

#----------------------------------------------
# load cached venv if cache exists
#----------------------------------------------
- name: Load cached venv
  id: cached-poetry-dependencies
  uses: actions/cache@v4
  with:
    path: .venv
    key: venv-${{ runner.os }}-${{ hashFiles('**/poetry.lock') }}-${{ matrix.python-version }}


Contributor Author

I pulled in the changes from #1507 because my build started to fail. I will pull in develop once that is merged.

@BryanFauble BryanFauble marked this pull request as ready for review September 20, 2024 20:27
if project_headers:
return project_headers

all_results = syn.restGET(
Member

Nit: There is a syn._GET_Paginated if that makes it easier

Member

@thomasyu888 thomasyu888 left a comment

I did a quick initial scan but I'll leave it to @BWMac / @jaymedina / @linglp / @andrewelamb / @GiaJordan for further comments before I go deeper into it.

I found your self review very helpful!

sonarcloud bot commented Sep 21, 2024

3 participants