Skip to content

Commit

Permalink
Merge pull request #1752 from bcgov/feat/hamed-migrate-notional-trans…
Browse files Browse the repository at this point in the history
…fer-1555

Feat: Migrate Notional Transfer Records with Version Chaining - 1555
  • Loading branch information
hamed-valiollahi authored Jan 21, 2025
2 parents 2475b91 + 22e78b3 commit 323efdf
Show file tree
Hide file tree
Showing 4 changed files with 335 additions and 0 deletions.
Binary file modified etl/database/nifi-registry-primary.mv.db
Binary file not shown.
Binary file modified etl/nifi/conf/flow.json.gz
Binary file not shown.
Binary file modified etl/nifi/conf/flow.xml.gz
Binary file not shown.
335 changes: 335 additions & 0 deletions etl/nifi_scripts/notional_transfer.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
/*
Migrate Notional Transfers from TFRS to LCFS
1. Finds all LCFS compliance reports having a TFRS legacy_id.
2. For each TFRS compliance report, determines its chain (root_report_id).
3. Retrieves schedule_a records for each version in the chain.
4. Computes a diff (CREATE / UPDATE) between consecutive versions.
5. Inserts rows in notional_transfer with a stable, random group_uuid (UUID) per schedule_a_record_id.
6. Versions these notional_transfer entries so that subsequent changes increment the version.
7. Logs the actions taken for easier traceability.
*/

import groovy.transform.Field
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.util.UUID

// Cache of schedule_a_record_id -> group_uuid built up while the script runs.
// insertVersionRow() consults it so that every row written for the same
// schedule_a record carries the same group_uuid.
@Field
Map<Integer, String> recordUuidMap = [:]

// Run against the destination (LCFS) connection: lists every compliance report
// that carries a non-null legacy_id. No bind parameters.
@Field
String SELECT_LCFS_IMPORTED_REPORTS_QUERY = """
SELECT compliance_report_id, legacy_id
FROM compliance_report
WHERE legacy_id IS NOT NULL
"""

// Run against the source (TFRS) connection: fetches root_report_id for one
// compliance report. Bind parameter 1 = TFRS compliance_report.id.
@Field
String SELECT_ROOT_REPORT_ID_QUERY = """
SELECT root_report_id
FROM compliance_report
WHERE id = ?
"""

// Run against the source (TFRS) connection: returns every report sharing the
// given root_report_id, ordered by traversal and then id.
// Bind parameter 1 = root_report_id.
@Field
String SELECT_REPORT_CHAIN_QUERY = """
SELECT
c.id AS tfrs_report_id,
c.traversal
FROM compliance_report c
WHERE c.root_report_id = ?
ORDER BY c.traversal, c.id
"""

// Run against the source (TFRS) connection: returns the schedule_a records
// attached (via compliance_report.schedule_a_id) to one report, ordered by
// record id. Bind parameter 1 = TFRS compliance_report.id.
@Field
String SELECT_SCHEDULE_A_RECORDS_QUERY = """
SELECT
sar.id AS schedule_a_record_id,
sar.quantity,
sar.trading_partner,
sar.postal_address,
sar.fuel_class_id,
sar.transfer_type_id
FROM compliance_report_schedule_a_record sar
JOIN compliance_report_schedule_a sa ON sa.id = sar.schedule_id
JOIN compliance_report c ON c.schedule_a_id = sa.id
WHERE c.id = ?
ORDER BY sar.id
"""

// Run against the destination (LCFS) connection: resolves the LCFS
// compliance_report_id for a TFRS report. Bind parameter 1 = legacy_id.
@Field
String SELECT_LCFS_COMPLIANCE_REPORT_BY_TFRSID_QUERY = """
SELECT compliance_report_id
FROM compliance_report
WHERE legacy_id = ?
"""

// Run against the destination (LCFS) connection: returns the highest version
// already stored for a group_uuid (no row when the group has no entries yet).
// Bind parameter 1 = group_uuid.
@Field
String SELECT_CURRENT_VERSION_QUERY = """
SELECT version
FROM notional_transfer
WHERE group_uuid = ?
ORDER BY version DESC
LIMIT 1
"""

// Insert statement for one notional_transfer version row. Nine bind parameters,
// in order: compliance_report_id, quantity, legal_name, address_for_service,
// fuel_category_id, received_or_transferred (cast to receivedOrTransferredEnum),
// group_uuid, version, action_type (cast to actiontypeenum).
// user_type is always written as the literal 'SUPPLIER'.
@Field
String INSERT_NOTIONAL_TRANSFER_SQL = """
INSERT INTO notional_transfer (
compliance_report_id,
quantity,
legal_name,
address_for_service,
fuel_category_id,
received_or_transferred,
group_uuid,
version,
user_type,
action_type
) VALUES (
?,
?,
?,
?,
?,
?::receivedOrTransferredEnum,
?,
?,
'SUPPLIER',
?::actiontypeenum
)
"""

// =========================================
// NiFi Controller Services
// =========================================
// Resolve the two DBCP connection-pool services by their NiFi controller
// service ids; the first is used as the source database and the second as the
// destination database by the main section of this script.
def serviceLookup = context.getControllerServiceLookup()
def sourceDbcpService = serviceLookup.getControllerService("3245b078-0192-1000-ffff-ffffba20c1eb")
def destinationDbcpService = serviceLookup.getControllerService("3244bf63-0192-1000-ffff-ffffc8ec6d93")

// =========================================
// Helper Functions
// =========================================

/**
 * Translates a TFRS transfer_type_id into the label stored in
 * notional_transfer.received_or_transferred.
 *
 * As implemented: an id of 1 yields "Received"; every other value
 * (including null) yields "Transferred".
 *
 * NOTE(review): the TFRS convention is reportedly 1=Transferred, 2=Received,
 * which is the reverse of this mapping — confirm against the TFRS transfer
 * type lookup table before relying on either.
 *
 * @param transferTypeId TFRS transfer_type_id (may be null)
 * @return "Received" or "Transferred"
 */
def mapReceivedOrTransferred(Integer transferTypeId) {
    return (transferTypeId == 1) ? "Received" : "Transferred"
}

/**
 * Translates a TFRS fuel_class_id into an LCFS fuel_category_id.
 *
 * The two known ids are swapped between the systems: 1 -> 2 and 2 -> 1.
 *
 * @param fuelClassId TFRS fuel_class_id (may be null)
 * @return the LCFS fuel_category_id, or null when the id has no mapping
 */
def mapFuelCategoryId(Integer fuelClassId) {
    if (fuelClassId == 1) {
        return 2
    }
    if (fuelClassId == 2) {
        return 1
    }
    // Any other id (or null) has no LCFS counterpart.
    return null
}

/**
 * Decides whether a schedule_a record differs between two report versions.
 *
 * Compared fields: quantity, fuel_class_id, transfer_type_id,
 * trading_partner and postal_address.
 *
 * Quantity is compared numerically via compareTo, so values that differ only
 * in scale (10.0 vs 10.00) count as equal. The comparison is null-safe: two
 * null quantities are equal, and a null on exactly one side is a change.
 *
 * @param oldRow record as it appeared in the previous version (may be null)
 * @param newRow record as it appears in the current version (may be null)
 * @return true when either row is null or any compared field differs
 */
def isRecordChanged(Map oldRow, Map newRow) {
    if (oldRow == null || newRow == null) return true

    def oldQty = oldRow.quantity
    def newQty = newRow.quantity
    if (oldQty == null || newQty == null) {
        // Changed only when exactly one side is missing; avoids calling
        // compareTo on or with a null value.
        if (!(oldQty == null && newQty == null)) return true
    } else if (oldQty.compareTo(newQty) != 0) {
        return true
    }

    if (oldRow.fuel_class_id != newRow.fuel_class_id) return true
    if (oldRow.transfer_type_id != newRow.transfer_type_id) return true
    if (oldRow.trading_partner != newRow.trading_partner) return true
    if (oldRow.postal_address != newRow.postal_address) return true

    return false
}

/**
 * Writes one version row into notional_transfer for a schedule_a record.
 *
 * The row's group_uuid is looked up in recordUuidMap by schedule_a_record_id;
 * a random UUID is generated and cached the first time a record is seen. The
 * version is the highest version already stored for that group_uuid plus one,
 * or 0 when the group has no rows yet.
 *
 * JDBC statements and result sets are closed in finally blocks so that a
 * failing query or insert does not leak them.
 *
 * @param destConn open connection to the destination database
 * @param lcfsCRid compliance_report_id the new row is attached to
 * @param rowData  schedule_a record map (schedule_a_record_id, quantity,
 *                 trading_partner, postal_address, fuel_class_id, transfer_type_id)
 * @param action   action_type value to store ("CREATE" or "UPDATE")
 */
def insertVersionRow(Connection destConn, Integer lcfsCRid, Map rowData, String action) {
    def recordId = rowData.schedule_a_record_id

    // Retrieve or generate the stable random group uuid for this record
    def groupUuid = recordUuidMap[recordId]
    if (!groupUuid) {
        groupUuid = UUID.randomUUID().toString()
        recordUuidMap[recordId] = groupUuid
    }

    // Find current highest version in notional_transfer for that group_uuid
    def currentVer = -1
    PreparedStatement verStmt = destConn.prepareStatement(SELECT_CURRENT_VERSION_QUERY)
    try {
        verStmt.setString(1, groupUuid)
        ResultSet verRS = verStmt.executeQuery()
        try {
            if (verRS.next()) {
                currentVer = verRS.getInt("version")
            }
        } finally {
            verRS.close()
        }
    } finally {
        verStmt.close()
    }

    def nextVer = (currentVer < 0) ? 0 : currentVer + 1

    // Map TFRS fields => LCFS fields
    def recOrXfer = mapReceivedOrTransferred(rowData.transfer_type_id)
    def fuelCatId = mapFuelCategoryId(rowData.fuel_class_id)
    // Default to a real BigDecimal so setBigDecimal never receives an Integer.
    def quantity = (rowData.quantity != null) ? rowData.quantity : BigDecimal.ZERO
    def tradePrt = rowData.trading_partner ?: ""
    def postAddr = rowData.postal_address ?: ""

    // Insert the new row
    PreparedStatement insStmt = destConn.prepareStatement(INSERT_NOTIONAL_TRANSFER_SQL)
    try {
        insStmt.setInt(1, lcfsCRid)
        insStmt.setBigDecimal(2, quantity)
        insStmt.setString(3, tradePrt)
        insStmt.setString(4, postAddr)
        if (fuelCatId != null) {
            insStmt.setInt(5, fuelCatId)
        } else {
            // Unmapped fuel class: bind SQL NULL explicitly instead of passing
            // null to setInt, which cannot accept it.
            insStmt.setNull(5, java.sql.Types.INTEGER)
        }
        insStmt.setString(6, recOrXfer)
        insStmt.setString(7, groupUuid)
        insStmt.setInt(8, nextVer)
        insStmt.setString(9, action)
        insStmt.executeUpdate()
    } finally {
        insStmt.close()
    }

    log.info(" -> notional_transfer row: recordId=${recordId}, action=${action}, groupUuid=${groupUuid}, version=${nextVer}")
}

// =========================================
// Main Execution
// =========================================

// Main flow:
//   1) read every legacy_id present in the destination compliance_report table;
//   2) resolve each one to its root_report_id in the source database;
//   3) walk that root's chain of reports once, in traversal/id order;
//   4) diff each version's schedule_a records against the previous version;
//   5) write CREATE / UPDATE rows through insertVersionRow().
// Any exception is logged and ends the run; connections are always closed.
log.warn("**** BEGIN NOTIONAL TRANSFER MIGRATION ****")

Connection sourceConn = null
Connection destinationConn = null

try {
    sourceConn = sourceDbcpService.getConnection()
    destinationConn = destinationDbcpService.getConnection()

    // 1) Find all LCFS compliance reports that have TFRS legacy_id
    log.info("Retrieving LCFS compliance_report with legacy_id != null")
    def tfrsIds = []
    PreparedStatement lcfsStmt = destinationConn.prepareStatement(SELECT_LCFS_IMPORTED_REPORTS_QUERY)
    try {
        ResultSet lcfsRS = lcfsStmt.executeQuery()
        try {
            while (lcfsRS.next()) {
                tfrsIds << lcfsRS.getInt("legacy_id")
            }
        } finally {
            lcfsRS.close()
        }
    } finally {
        lcfsStmt.close()
    }

    // Several imported reports can belong to the same chain. Each chain is
    // walked in full below, so it must be handled only once; otherwise every
    // record would be inserted again for each imported member of the chain.
    def processedRootIds = [] as Set

    // For each TFRS compliance_report ID, follow the chain approach
    tfrsIds.each { tfrsId ->
        log.info("Processing TFRS compliance_report.id = ${tfrsId}")

        // 2) Find the root_report_id
        def rootId = null
        PreparedStatement rootStmt = sourceConn.prepareStatement(SELECT_ROOT_REPORT_ID_QUERY)
        try {
            rootStmt.setInt(1, tfrsId)
            def rootRS = rootStmt.executeQuery()
            try {
                if (rootRS.next()) {
                    rootId = rootRS.getInt("root_report_id")
                }
            } finally {
                rootRS.close()
            }
        } finally {
            rootStmt.close()
        }

        // getInt yields 0 for SQL NULL, so this also covers a null root.
        if (!rootId) {
            log.warn("No root_report_id found for TFRS #${tfrsId}; skipping.")
            return
        }

        if (!processedRootIds.add(rootId)) {
            log.info("Chain root=${rootId} already processed; skipping TFRS #${tfrsId}.")
            return
        }

        // 3) Gather the chain in ascending order
        def chainIds = []
        PreparedStatement chainStmt = sourceConn.prepareStatement(SELECT_REPORT_CHAIN_QUERY)
        try {
            chainStmt.setInt(1, rootId)
            def chainRS = chainStmt.executeQuery()
            try {
                while (chainRS.next()) {
                    chainIds << chainRS.getInt("tfrs_report_id")
                }
            } finally {
                chainRS.close()
            }
        } finally {
            chainStmt.close()
        }

        if (chainIds.isEmpty()) {
            log.warn("Chain empty for root=${rootId}? skipping.")
            return
        }

        // Keep the old version's schedule_a data in memory so we can do diffs
        Map<Integer,Map> previousRecords = [:]

        chainIds.eachWithIndex { chainTfrsId, idx ->
            log.info("TFRS #${chainTfrsId} (chain idx=${idx})")

            // 4) Fetch current TFRS schedule_a records
            Map<Integer,Map> currentRecords = [:]
            PreparedStatement schedStmt = sourceConn.prepareStatement(SELECT_SCHEDULE_A_RECORDS_QUERY)
            try {
                schedStmt.setInt(1, chainTfrsId)
                ResultSet sarRS = schedStmt.executeQuery()
                try {
                    while (sarRS.next()) {
                        def recId = sarRS.getInt("schedule_a_record_id")
                        currentRecords[recId] = [
                            schedule_a_record_id: recId,
                            quantity            : sarRS.getBigDecimal("quantity"),
                            trading_partner     : sarRS.getString("trading_partner"),
                            postal_address      : sarRS.getString("postal_address"),
                            fuel_class_id       : sarRS.getInt("fuel_class_id"),
                            transfer_type_id    : sarRS.getInt("transfer_type_id")
                        ]
                    }
                } finally {
                    sarRS.close()
                }
            } finally {
                schedStmt.close()
            }

            // 5) Find the matching LCFS compliance_report
            Integer lcfsCRid = null
            PreparedStatement findCRstmt = destinationConn.prepareStatement(SELECT_LCFS_COMPLIANCE_REPORT_BY_TFRSID_QUERY)
            try {
                findCRstmt.setInt(1, chainTfrsId)
                ResultSet findCRrs = findCRstmt.executeQuery()
                try {
                    if (findCRrs.next()) {
                        lcfsCRid = findCRrs.getInt("compliance_report_id")
                    }
                } finally {
                    findCRrs.close()
                }
            } finally {
                findCRstmt.close()
            }

            if (!lcfsCRid) {
                // This version was never imported: nothing to write, but it
                // still becomes the baseline for the next version's diff.
                log.warn("TFRS #${chainTfrsId} not found in LCFS? Skipping diff, just storing previousRecords.")
                previousRecords = currentRecords
                return
            }

            // Compare old vs new
            currentRecords.each { recId, newData ->
                if (!previousRecords.containsKey(recId)) {
                    // wasn't in old => CREATE
                    insertVersionRow(destinationConn, lcfsCRid, newData, "CREATE")
                } else {
                    // existed => write an UPDATE only when something changed
                    def oldData = previousRecords[recId]
                    if (isRecordChanged(oldData, newData)) {
                        insertVersionRow(destinationConn, lcfsCRid, newData, "UPDATE")
                    }
                }
            }

            // Update previousRecords for the next version
            previousRecords = currentRecords
        } // end chain loop
    } // end each tfrsId

} catch (Exception e) {
    log.error("Error running notional transfer migration", e)
} finally {
    // Close each connection independently so a failure on one does not
    // prevent the other from being returned to its pool.
    [sourceConn, destinationConn].each { conn ->
        try {
            if (conn != null) conn.close()
        } catch (Exception closeErr) {
            log.warn("Failed to close connection", closeErr)
        }
    }
}

log.warn("**** DONE: NOTIONAL TRANSFER ****")

0 comments on commit 323efdf

Please sign in to comment.