-
Notifications
You must be signed in to change notification settings - Fork 340
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DRAFT] Sync AWS accounts in parallel #1138
Draft
achantavy
wants to merge
9
commits into
master
Choose a base branch
from
threading
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+174
−38
Draft
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
1112093
Initial commit for parallel account sync
achantavy 90c9d6b
Log the number of threads
achantavy 691a42b
added neo4j session factory along with threading
harshagw d89d0f4
performed linting
harshagw c2d6ed3
Use session factory for parallel by account, revert vpc region changes
achantavy ed4586b
Unit test factory
achantavy fcd072f
Linter
achantavy cc65b3f
fix import
achantavy d061afb
linter agian
achantavy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
import logging | ||
|
||
import neo4j | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class Neo4jSessionFactory: | ||
_setup = False | ||
_driver = None | ||
_database = None | ||
|
||
def __init__(self): | ||
logger.info("neo4j_session_factory init") | ||
|
||
def initialize(self, neo4j_driver: neo4j.Driver, neo4j_database: str) -> None: | ||
if self._setup: | ||
logger.warning("Reinitializing the Neo4j session factory is not allowed; doing nothing.") | ||
return | ||
|
||
logger.info("Setting up the Neo4j session factory") | ||
|
||
self._setup = True | ||
self._driver = neo4j_driver | ||
self._database = neo4j_database | ||
|
||
def get_new_session(self) -> neo4j.Session: | ||
if not self._setup or not self._driver: | ||
raise RuntimeError( | ||
"Neo4j session factory is not initialized. " | ||
"Make sure that initialize() is called before get_new_session().", | ||
) | ||
|
||
new_session = self._driver.session(database=self._database) | ||
return new_session | ||
|
||
|
||
neo4j_session_factory = Neo4jSessionFactory() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
import unittest | ||
from unittest import mock | ||
|
||
import neo4j | ||
import pytest | ||
|
||
from cartography.neo4j_session_factory import Neo4jSessionFactory | ||
|
||
|
||
def test_initialize(): | ||
# Arrange | ||
neo4j_session_factory = Neo4jSessionFactory() | ||
neo4j_driver_mock = mock.Mock(spec=neo4j.Driver) | ||
|
||
# Act | ||
neo4j_session_factory.initialize(neo4j_driver_mock, "test_db") | ||
|
||
# Assert | ||
assert neo4j_session_factory._driver == neo4j_driver_mock | ||
assert neo4j_session_factory._database == "test_db" | ||
|
||
|
||
def test_get_new_session(): | ||
# Arrange | ||
neo4j_session_factory = Neo4jSessionFactory() | ||
neo4j_driver_mock = mock.Mock(spec=neo4j.Driver) | ||
neo4j_session_factory.initialize(neo4j_driver_mock, "test_db") | ||
neo4j_session_mock = mock.Mock() | ||
neo4j_driver_mock.session.return_value = neo4j_session_mock | ||
|
||
# Act | ||
new_session = neo4j_session_factory.get_new_session() | ||
|
||
# Assert | ||
assert new_session == neo4j_session_mock | ||
|
||
|
||
class TestNeo4jSessionFactory(unittest.TestCase): | ||
def setUp(self): | ||
self.driver_mock = mock.Mock(spec=neo4j.Driver) | ||
|
||
def test_reinitialize(self): | ||
# Arrange | ||
neo4j_session_factory = Neo4jSessionFactory() | ||
neo4j_session_factory.initialize(self.driver_mock, "test_db") | ||
|
||
# Act | ||
with self.assertLogs(level="WARNING") as log: | ||
neo4j_session_factory.initialize(self.driver_mock, "test_db") | ||
|
||
# Assert | ||
self.assertIn("Reinitializing the Neo4j session", log.output[0]) | ||
|
||
|
||
def test_neo4j_session_factory_get_new_session_not_initialized(): | ||
neo4j_session_factory = Neo4jSessionFactory() | ||
|
||
with pytest.raises(RuntimeError, match="Neo4j session factory is not initialized"): | ||
new_session = neo4j_session_factory.get_new_session() | ||
assert new_session is None |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this will wait for each execution in turn (consecutive execution) as the call to the
result()
method of the Future returned byexecutor.submit
will wait for the future to complete.To get concurrent execution I believe you would want to submit the sync functions in a loop as is done here, collect the returned Future objects in a list and then call
concurrent.futures.wait
to wait for all of the syncs to be complete (or iterate overconcurrent.futures.as_completed
if you want to report sync statuses as they complete).