Skip to content

Commit

Permalink
Merge pull request #1142 from bcgov/feature/ALCS-1239
Browse files Browse the repository at this point in the history
Application Parcels & Certificate of Title
  • Loading branch information
lstod authored Nov 8, 2023
2 parents 4ff5c72 + 3cd8f7d commit 02068f2
Show file tree
Hide file tree
Showing 15 changed files with 476 additions and 1 deletion.
8 changes: 8 additions & 0 deletions bin/migrate-oats-data/applications/migrate_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
clean_app_submissions,
process_application_statuses,
process_alcs_app_submissions,
clean_parcels,
process_application_parcels,
)
from .base_applications import process_applications, clean_applications
from .app_prep import process_alcs_application_prep_fields
Expand All @@ -15,14 +17,20 @@ def process_application_etl(batch_size):
process_alcs_application_prep_fields(batch_size)
process_alcs_app_submissions(batch_size)
process_application_statuses(batch_size)
process_application_parcels(batch_size)
process_application_submission_status_emails()


def clean_alcs_applications():
clean_application_submission_status_emails()
clean_parcels()
clean_app_submissions()
clean_applications()


def init_applications(batch_size):
process_applications(batch_size)


def clean_application_parcels():
clean_parcels()
2 changes: 2 additions & 0 deletions bin/migrate-oats-data/applications/submissions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
process_application_statuses,
clean_app_submissions,
)
from .parcels import clean_parcels
from .migrate_application_parcels import process_application_parcels
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .parcels import init_application_parcels, process_application_certificate_of_title


def process_application_parcels(batch_size):
init_application_parcels(batch_size)
process_application_certificate_of_title(batch_size)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .application_certificate_of_title import process_application_certificate_of_title
from .application_parcel_insert import init_application_parcels, clean_parcels
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from common import (
setup_and_get_logger,
BATCH_UPLOAD_SIZE,
)
from db import inject_conn_pool
from psycopg2.extras import RealDictCursor, execute_batch

etl_name = "process_application_certificate_of_title"
logger = setup_and_get_logger(etl_name)


@inject_conn_pool
def process_application_certificate_of_title(conn=None, batch_size=BATCH_UPLOAD_SIZE):
logger.info(f"Start {etl_name}")
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
with open(
"applications/submissions/sql/parcels/application_certificate_of_title_count.sql",
"r",
encoding="utf-8",
) as sql_file:
count_query = sql_file.read()
cursor.execute(count_query)
count_total = dict(cursor.fetchone())["count"]
logger.info(f"Total Application data to update: {count_total}")

failed_inserts = 0
successful_updates_count = 0
last_document_id = 0

with open(
"applications/submissions/sql/parcels/application_certificate_of_title.sql",
"r",
encoding="utf-8",
) as sql_file:
application_sql = sql_file.read()
while True:
cursor.execute(
f"{application_sql} AND od.document_id > {last_document_id} ORDER BY od.document_id;"
)

rows = cursor.fetchmany(batch_size)

if not rows:
break
try:
records_to_be_updated_count = len(rows)

_update_records(conn, batch_size, cursor, rows)

successful_updates_count = (
successful_updates_count + records_to_be_updated_count
)
last_document_id = dict(rows[-1])["document_id"]

logger.debug(
f"retrieved/updated items count: {records_to_be_updated_count}; total successfully updated applications so far {successful_updates_count}; last updated alr_application_id: {last_document_id}"
)
except Exception as err:
# this is NOT going to be caused by actual data update failure. This code is only executed when the code error appears or connection to DB is lost
logger.exception()
conn.rollback()
failed_inserts = count_total - successful_updates_count
last_document_id = last_document_id + 1

logger.info(
f"Finished {etl_name}: total amount of successful updates {successful_updates_count}, total failed updates {failed_inserts}"
)


def _update_records(conn, batch_size, cursor, rows):
# parsed_data_list = _prepare_oats_data(rows)

if len(rows) > 0:
execute_batch(
cursor,
_update_query,
rows,
page_size=batch_size,
)

conn.commit()


_update_query = """
UPDATE alcs.application_parcel
SET certificate_of_title_uuid = %(document_uuid)s
WHERE oats_subject_property_id = %(subject_property_id)s
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
from common import (
setup_and_get_logger,
BATCH_UPLOAD_SIZE,
OATS_ETL_USER,
add_timezone_and_keep_date_part,
OatsToAlcsOwnershipType,
)
from db import inject_conn_pool
from psycopg2.extras import RealDictCursor

etl_name = "init_application_parcels"
logger = setup_and_get_logger(etl_name)


@inject_conn_pool
def init_application_parcels(conn=None, batch_size=BATCH_UPLOAD_SIZE):
logger.info(f"Start {etl_name}")

with conn.cursor(cursor_factory=RealDictCursor) as cursor:
with open(
"applications/submissions/sql/parcels/application_parcels_insert_count.sql",
"r",
encoding="utf-8",
) as sql_file:
count_query = sql_file.read()
cursor.execute(count_query)
count_total = dict(cursor.fetchone())["count"]
logger.info(f"Total Applications Parcels to insert: {count_total}")

failed_inserts = 0
successful_inserts_count = 0
last_subject_property_id = 0

with open(
"applications/submissions/sql/parcels/application_parcels_insert.sql",
"r",
encoding="utf-8",
) as sql_file:
application_sql = sql_file.read()
while True:
cursor.execute(
f"{application_sql} WHERE osp.subject_property_id > {last_subject_property_id} ORDER BY osp.subject_property_id;"
)

rows = cursor.fetchmany(batch_size)

if not rows:
break
try:
records_to_be_inserted_count = len(rows)

_insert_records(conn, cursor, rows)

successful_inserts_count = (
successful_inserts_count + records_to_be_inserted_count
)

last_subject_property_id = dict(rows[-1])["subject_property_id"]

logger.debug(
f"inserted items count: {records_to_be_inserted_count}; total successfully inserted application parcels so far {successful_inserts_count}; last inserted subject_property_id: {last_subject_property_id}"
)
except Exception as err:
logger.exception(err)
conn.rollback()
failed_inserts = count_total - successful_inserts_count
last_subject_property_id = last_subject_property_id + 1

logger.info(
f"Finished {etl_name}: total amount of successful inserts {successful_inserts_count}, total failed inserts {failed_inserts}"
)


def _insert_records(conn, cursor, rows):
number_of_rows_to_insert = len(rows)

if number_of_rows_to_insert > 0:
insert_query = _compile_application_insert_query(number_of_rows_to_insert)
rows_to_insert = _prepare_data_to_insert(rows)
cursor.execute(insert_query, rows_to_insert)
conn.commit()


def _prepare_data_to_insert(rows):
row_without_last_element = []
for row in rows:
mapped_row = _map_data(row)
row_without_last_element.append(tuple(mapped_row.values()))

return row_without_last_element


def _map_data(row):
return {
"application_submission_uuid": row["application_submission_uuid"],
"audit_created_by": OATS_ETL_USER,
"is_confirmed_by_applicant": True,
"alr_area": row["alr_area"],
"civic_address": row["civic_address"],
"is_farm": row["farm_land_ind"],
"legal_description": row["legal_description"],
"map_area_hectares": row["area_size"],
"ownership_type_code": _map_ownership_type_code(
row["property_owner_type_code"]
),
"pid": row["pid"],
"pin": row["pin"],
"purchased_date": _map_purchased_date(row["purchase_date"]),
"oats_subject_property_id": row["subject_property_id"],
}


def _map_ownership_type_code(ownership_type_code):
if ownership_type_code == OatsToAlcsOwnershipType.FEE.name:
return OatsToAlcsOwnershipType.FEE.value
if ownership_type_code == OatsToAlcsOwnershipType.CROWN.name:
return OatsToAlcsOwnershipType.CROWN.value
if ownership_type_code == OatsToAlcsOwnershipType.FIRST.name:
return OatsToAlcsOwnershipType.FIRST.value


def _map_purchased_date(purchased_date):
date = None
if purchased_date:
date = add_timezone_and_keep_date_part(purchased_date)
return date


def _compile_application_insert_query(number_of_rows_to_insert):
parcels_to_insert = ",".join(["%s"] * number_of_rows_to_insert)
return f"""
INSERT INTO alcs.application_parcel
(
application_submission_uuid,
audit_created_by,
is_confirmed_by_applicant,
alr_area,
civic_address,
is_farm,
legal_description,
map_area_hectares,
ownership_type_code,
pid,
pin,
purchased_date,
oats_subject_property_id
)
VALUES{parcels_to_insert}
ON CONFLICT DO NOTHING
"""


@inject_conn_pool
def clean_parcels(conn=None):
logger.info("Start application parcel cleaning")
with conn.cursor() as cursor:
cursor.execute(
"DELETE FROM alcs.application_parcel appp WHERE appp.audit_created_by = 'oats_etl'"
)
logger.info(f"Deleted items count = {cursor.rowcount}")
conn.commit()
logger.info("Done application parcel cleaning")
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
SELECT od.document_id,
od.document_code,
odc.description,
osp.subject_property_id,
oaa.alr_application_id,
od.who_created,
appd.uuid AS document_uuid
FROM oats.oats_documents od
JOIN oats.oats_subject_properties osp ON osp.subject_property_id = od.subject_property_id
JOIN oats.oats_alr_applications oaa ON oaa.alr_application_id = osp.alr_application_id
JOIN oats.oats_document_codes odc ON odc.document_code = od.document_code
JOIN alcs.application_document appd ON appd.oats_document_id::bigint = od.document_id
JOIN alcs.application_parcel appp ON appp.oats_subject_property_id = osp.subject_property_id
WHERE od.document_code = 'CT'
AND oaa.application_class_code IN ('LOA', 'BLK')
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
SELECT count(*)
FROM oats.oats_documents od
JOIN oats.oats_subject_properties osp ON osp.subject_property_id = od.subject_property_id
JOIN oats.oats_alr_applications oaa ON oaa.alr_application_id = osp.alr_application_id
JOIN oats.oats_document_codes odc ON odc.document_code = od.document_code
JOIN alcs.application_document appd ON appd.oats_document_id::bigint = od.document_id
JOIN alcs.application_parcel appp ON appp.oats_subject_property_id = osp.subject_property_id
WHERE od.document_code = 'CT'
AND oaa.application_class_code IN ('LOA', 'BLK');
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
WITH parcels_to_insert AS (
SELECT apps.uuid,
osp.subject_property_id
FROM alcs.application_submission apps
JOIN oats.oats_subject_properties osp ON osp.alr_application_id = apps.file_number::INTEGER
),
grouped_oats_property_interests_ids AS (
SELECT MIN(property_owner_type_code) AS property_owner_type_code,
-- min will not affect anything since all property_owner_type_code are the same in scope of subject_property
subject_property_id
FROM oats.oats_property_interests opi
GROUP BY opi.subject_property_id
)
SELECT uuid AS application_submission_uuid,
osp.alr_area,
op.civic_address,
osp.farm_land_ind,
op.legal_description,
op.area_size,
op.pid,
op.pin,
osp.purchase_date,
gopi.property_owner_type_code,
osp.subject_property_id
FROM parcels_to_insert pti
JOIN oats.oats_subject_properties osp ON osp.subject_property_id = pti.subject_property_id
JOIN oats.oats_properties op ON op.property_id = osp.property_id
LEFT JOIN grouped_oats_property_interests_ids gopi ON gopi.subject_property_id = pti.subject_property_id
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
WITH parcels_to_insert AS (
SELECT apps.uuid,
osp.subject_property_id
FROM alcs.application_submission apps
JOIN oats.oats_subject_properties osp ON osp.alr_application_id = apps.file_number::INTEGER
),
grouped_oats_property_interests_ids AS (
SELECT MIN(property_owner_type_code) AS property_owner_type_code,
subject_property_id
FROM oats.oats_property_interests opi
GROUP BY opi.subject_property_id
)
SELECT count(*)
FROM parcels_to_insert pti
JOIN oats.oats_subject_properties osp ON osp.subject_property_id = pti.subject_property_id
JOIN oats.oats_properties op ON op.property_id = osp.property_id
LEFT JOIN grouped_oats_property_interests_ids gopi ON gopi.subject_property_id = pti.subject_property_id;
Loading

0 comments on commit 02068f2

Please sign in to comment.