Skip to content

Commit

Permalink
24477 Add auth logic for creating and affiliating entities to tombsto…
Browse files Browse the repository at this point in the history
…ne pipeline (bcgov#3101)
  • Loading branch information
argush3 authored Dec 2, 2024
1 parent 7bff285 commit 18539c7
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 12 deletions.
204 changes: 204 additions & 0 deletions data-tool/flows/common/auth_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import json
from http import HTTPStatus
from typing import Dict

import requests


class AuthService:
"""Wrapper to call Authentication Services.
"""

BEARER: str = 'Bearer '
CONTENT_TYPE_JSON = {'Content-Type': 'application/json'}

@classmethod
def get_time_out(cls, config):
try:
timeout = int(config.ACCOUNT_SVC_TIMEOUT, 20)
except Exception:
timeout = 20

@classmethod
def get_bearer_token(cls, config):
"""Get a valid Bearer token for the service to use."""
token_url = config.ACCOUNT_SVC_AUTH_URL
client_id = config.ACCOUNT_SVC_CLIENT_ID
client_secret = config.ACCOUNT_SVC_CLIENT_SECRET

data = 'grant_type=client_credentials'

# get service account token
res = requests.post(url=token_url,
data=data,
headers={'content-type': 'application/x-www-form-urlencoded'},
auth=(client_id, client_secret),
timeout=cls.get_time_out(config))

try:
return res.json().get('access_token')
except Exception:
return None


# pylint: disable=too-many-arguments, too-many-locals disable=invalid-name;
@classmethod
def create_affiliation(cls,
config,
account: int,
business_registration: str,
business_name: str = None,
corp_type_code: str = 'TMP',
corp_sub_type_code: str = None,
pass_code: str = '',
details: dict = None):
"""Affiliate a business to an account."""
auth_url = config.AUTH_SVC_URL
account_svc_entity_url = f'{auth_url}/entities'
account_svc_affiliate_url = f'{auth_url}/orgs/{account}/affiliations'

token = cls.get_bearer_token(config)

if not token:
return HTTPStatus.UNAUTHORIZED

# Create an entity record
entity_data = {
'businessIdentifier': business_registration,
'corpTypeCode': corp_type_code,
'name': business_name or business_registration
}
if corp_sub_type_code:
entity_data['corpSubTypeCode'] = corp_sub_type_code

entity_record = requests.post(
url=account_svc_entity_url,
headers={**cls.CONTENT_TYPE_JSON,
'Authorization': cls.BEARER + token},
data=json.dumps(entity_data),
timeout=cls.get_time_out(config)
)

# Create an account:business affiliation
affiliate_data = {
'businessIdentifier': business_registration,
'passCode': pass_code
}
if details:
affiliate_data['entityDetails'] = details
affiliate = requests.post(
url=account_svc_affiliate_url,
headers={**cls.CONTENT_TYPE_JSON,
'Authorization': cls.BEARER + token},
data=json.dumps(affiliate_data),
timeout=cls.get_time_out(config)
)

# @TODO delete affiliation and entity record next sprint when affiliation service is updated
if affiliate.status_code != HTTPStatus.CREATED or entity_record.status_code != HTTPStatus.CREATED:
return HTTPStatus.BAD_REQUEST
return HTTPStatus.OK

@classmethod
def create_entity(cls,
config,
business_registration: str,
business_name: str,
corp_type_code: str):
"""Update an entity."""
auth_url = config.AUTH_SVC_URL
account_svc_entity_url = f'{auth_url}/entities'

token = cls.get_bearer_token(config)

if not token:
return HTTPStatus.UNAUTHORIZED

# Create an entity record
entity_data = {
'businessIdentifier': business_registration,
'corpTypeCode': corp_type_code,
'name': business_name
}

entity_record = requests.post(
url=account_svc_entity_url,
headers={**cls.CONTENT_TYPE_JSON,
'Authorization': cls.BEARER + token},
data=json.dumps(entity_data),
timeout=cls.get_time_out(config)
)

if entity_record.status_code != HTTPStatus.OK:
return HTTPStatus.BAD_REQUEST
return HTTPStatus.OK


@classmethod
def update_entity(cls,
config,
business_registration: str,
business_name: str,
corp_type_code: str,
state: str = None):
"""Update an entity."""
auth_url = config.AUTH_SVC_URL
account_svc_entity_url = f'{auth_url}/entities'

token = cls.get_bearer_token(config)

if not token:
return HTTPStatus.UNAUTHORIZED

# Create an entity record
entity_data = {
'businessIdentifier': business_registration,
'corpTypeCode': corp_type_code,
'name': business_name
}
if state:
entity_data['state'] = state

entity_record = requests.patch(
url=account_svc_entity_url + '/' + business_registration,
headers={**cls.CONTENT_TYPE_JSON,
'Authorization': cls.BEARER + token},
data=json.dumps(entity_data),
timeout=cls.get_time_out(config)
)

if entity_record.status_code != HTTPStatus.OK:
return HTTPStatus.BAD_REQUEST
return HTTPStatus.OK

@classmethod
def delete_affiliation(cls, config, account: int, business_registration: str) -> Dict:
"""Affiliate a business to an account.
@TODO Update this when account affiliation is changed next sprint.
"""
auth_url = config.AUTH_SVC_URL
account_svc_entity_url = f'{auth_url}/entities'
account_svc_affiliate_url = f'{auth_url}/orgs/{account}/affiliations'

token = cls.get_bearer_token(config)

# Delete an account:business affiliation
affiliate = requests.delete(
url=account_svc_affiliate_url + '/' + business_registration,
headers={**cls.CONTENT_TYPE_JSON,
'Authorization': cls.BEARER + token},
timeout=cls.get_time_out(config)
)
# Delete an entity record
entity_record = requests.delete(
url=account_svc_entity_url + '/' + business_registration,
headers={**cls.CONTENT_TYPE_JSON,
'Authorization': cls.BEARER + token},
timeout=cls.get_time_out(config)
)

if affiliate.status_code != HTTPStatus.OK \
or entity_record.status_code not in (HTTPStatus.OK, HTTPStatus.NO_CONTENT):
return HTTPStatus.BAD_REQUEST
return HTTPStatus.OK
51 changes: 39 additions & 12 deletions data-tool/flows/corps_tombstone_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from common.init_utils import colin_init, get_config, lear_init
from common.query_utils import convert_result_set_to_dict
from common.auth_service import AuthService
from prefect import flow, task
from prefect.futures import wait
from sqlalchemy import Connection, text
Expand Down Expand Up @@ -137,7 +138,7 @@ def load_corp_snapshot(conn: Connection, tombstone_data: dict) -> int:
mailing_address_id = address_id
else:
delivery_address_id = address_id

party['parties']['mailing_address_id'] = mailing_address_id
party['parties']['delivery_address_id'] = delivery_address_id

Expand All @@ -147,19 +148,19 @@ def load_corp_snapshot(conn: Connection, tombstone_data: dict) -> int:
party_role['business_id'] = business_id
party_role['party_id'] = party_id
load_data(conn, 'party_roles', party_role)

for share_class in tombstone_data['share_classes']:
share_class['share_classes']['business_id'] = business_id
share_class_id = load_data(conn, 'share_classes', share_class['share_classes'])

for share_series in share_class['share_series']:
share_series['share_class_id'] = share_class_id
load_data(conn, 'share_series', share_series)

for alias in tombstone_data['aliases']:
alias['business_id'] = business_id
load_data(conn, 'aliases', alias)

for resolution in tombstone_data['resolutions']:
resolution['business_id'] = business_id
load_data(conn, 'resolutions', resolution)
Expand Down Expand Up @@ -196,6 +197,31 @@ def load_placeholder_filings(conn: Connection, tombstone_data: dict, business_id
update_data(conn, 'businesses', update_business_data, business_id)


@task(name='3.3-Update-Auth-Task')
def update_auth(conn: Connection, config, corp_num: str, tombstone_data: dict):
"""Create auth entity and affiliate as required."""
# TODO affiliation to an account does not need to happen. only entity creation in auth is req'd.
# used for testing purposes to see how things look in entity dashboard - remove when done testing
if config.AFFILIATE_ENTITY:
business_data = tombstone_data['businesses']
account_id = config.AFFILIATE_ENTITY_ACCOUNT_ID
AuthService.create_affiliation(
config=config,
account=account_id,
business_registration=business_data['identifier'],
business_name=business_data['legal_name'],
corp_type_code=business_data['legal_type']
)
if config.UPDATE_ENTITY:
business_data = tombstone_data['businesses']
AuthService.create_entity(
config=config,
business_registration=business_data['identifier'],
business_name=business_data['legal_name'],
corp_type_code=business_data['legal_type']
)


@task(name='1-Migrate-Corp-Users-Task')
def migrate_corp_users(colin_engine: Engine, lear_engine: Engine, corp_nums: list) -> dict:
try:
Expand All @@ -216,20 +242,20 @@ def get_tombstone_data(config, colin_engine: Engine, corp_num: str) -> tuple[str
"""Get tombstone data - corp snapshot and placeholder filings."""
try:
# TODO: get filings data
print(f'👷 Start collecting corp snapshot and filings data for {corp_num}...')
print(f'👷 Start collecting corp snapshot and filings data for {corp_num}...')
raw_data = get_snapshot_filings_data(config, colin_engine, corp_num)
# print(f'raw data: {raw_data}')
clean_data = clean_snapshot_filings_data(raw_data)
# print(f'clean data: {clean_data}')
print(f'👷 Complete collecting corp snapshot and filings data for {corp_num}!')
print(f'👷 Complete collecting corp snapshot and filings data for {corp_num}!')
return corp_num, clean_data
except Exception as e:
print(f'❌ Error collecting corp snapshot and filings data for {corp_num}: {repr(e)}')
return corp_num, None


@task(name='3-Corp-Tombstone-Migrate-Task-Aync')
def migrate_tombstone(lear_engine: Engine, corp_num: str, clean_data: dict, users_mapper: dict) -> str:
@task(name='3-Corp-Tombstone-Migrate-Task-Async')
def migrate_tombstone(config, lear_engine: Engine, corp_num: str, clean_data: dict, users_mapper: dict) -> str:
"""Migrate tombstone data - corp snapshot and placeholder filings."""
# TODO: update corp_processing status (succeeded & failed)
# TODO: determine the time to update some business values based off filing info
Expand All @@ -239,6 +265,7 @@ def migrate_tombstone(lear_engine: Engine, corp_num: str, clean_data: dict, user
try:
business_id = load_corp_snapshot(lear_conn, clean_data)
load_placeholder_filings(lear_conn, clean_data, business_id, users_mapper)
update_auth(lear_conn, config, corp_num, clean_data)
transaction.commit()
except Exception as e:
transaction.rollback()
Expand Down Expand Up @@ -271,7 +298,7 @@ def tombstone_flow():
batches = min(math.ceil(total/batch_size), config.TOMBSTONE_BATCHES)

print(f'👷 Going to migrate {total} corps with batch size of {batch_size}')

cnt = 0
migrated_cnt = 0
while cnt < batches:
Expand All @@ -285,7 +312,7 @@ def tombstone_flow():
if users_mapper is None:
print(f'❗ Skip populating user info for corps in this round due to user migration error.')
users_mapper = {}

data_futures = []
for corp_num in corp_nums:
data_futures.append(
Expand All @@ -298,7 +325,7 @@ def tombstone_flow():
corp_num, clean_data = f.result()
if clean_data:
corp_futures.append(
migrate_tombstone.submit(lear_engine, corp_num, clean_data, users_mapper)
migrate_tombstone.submit(config, lear_engine, corp_num, clean_data, users_mapper)
)
else:
skipped += 1
Expand All @@ -319,4 +346,4 @@ def tombstone_flow():


if __name__ == "__main__":
tombstone_flow()
tombstone_flow()
2 changes: 2 additions & 0 deletions data-tool/flows/tombstone/tombstone_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ def get_unprocessed_corps_query(flow_name, environment, batch_size):
-- and c.corp_num = 'BC0043406' -- lots of directors
-- and c.corp_num in ('BC0326163', 'BC0395512', 'BC0883637') -- TODO: re-migrate issue (can be solved by adding tracking)
-- and c.corp_num = 'BC0870626' -- lots of filings - IA, CoDs, ARs
-- and c.corp_num = 'BC0004969' -- lots of filings - IA, ARs, transition, alteration, COD, COA
-- and c.corp_num = 'BC0002567' -- lots of filings - IA, ARs, transition, COD
-- and c.corp_num in ('BC0068889', 'BC0441359') -- test users mapping
-- and c.corp_num in ('BC0326163', 'BC0046540', 'BC0883637', 'BC0043406', 'BC0068889', 'BC0441359')
and c.corp_type_cd in ('BC', 'C', 'ULC', 'CUL', 'CC', 'CCC', 'QA', 'QB', 'QC', 'QD', 'QE') -- TODO: update transfer script
Expand Down

0 comments on commit 18539c7

Please sign in to comment.