Skip to content

Commit

Permalink
HPC-9460: Allow for read-only replica DB connection
Browse files Browse the repository at this point in the history
Since Knex doesn't support read-only replicas,
we need to allow for a second connection to
the replica server. This assumes master and
replica DB are on separate servers, but if
connection to replica isn't passed, the models
behave just like before.

The goal is to use master DB for write operations
and for transactions and replica DB should be
only used for read-only operations. There are
lots of changes, but it's mostly passing around
of connections, just so that newly introduced
replica connection can be used in `find()`
method of raw model.
  • Loading branch information
Pl217 committed Jun 25, 2024
1 parent 7963527 commit fe49ad2
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 140 deletions.
218 changes: 115 additions & 103 deletions src/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,115 +110,127 @@ export const UTILS = {
Cond,
};

const initializeTables = (conn: Knex) => ({
attachment: attachment(conn),
attachmentPrototype: attachmentPrototype(conn),
attachmentVersion: attachmentVersion(conn),
authGrant: authGrant(conn),
authGrantee: authGrantee(conn),
authGrantLog: authGrantLog(conn),
authInvite: authInvite(conn),
authTarget: authTarget(conn),
authToken: authToken(conn),
blueprint: blueprint(conn),
budgetSegment: budgetSegment(conn),
budgetSegmentBreakdown: budgetSegmentBreakdown(conn),
budgetSegmentBreakdownEntity: budgetSegmentBreakdownEntity(conn),
cache: cache(conn),
category: category(conn),
categoryGroup: categoryGroup(conn),
categoryRef: categoryRef(conn),
client: client(conn),
conditionField: conditionField(conn),
conditionFieldReliesOn: conditionFieldReliesOn(conn),
conditionFieldType: conditionFieldType(conn),
currency: currency(conn),
disaggregationCategory: disaggregationCategory(conn),
disaggregationCategoryGroup: disaggregationCategoryGroup(conn),
disaggregationModel: disaggregationModel(conn),
emergency: emergency(conn),
emergencyLocation: emergencyLocation(conn),
endpointLog: endpointLog(conn),
endpointUsage: endpointUsage(conn),
entityPrototype: entityPrototype(conn),
expiredData: expiredData(conn),
externalData: externalData(conn),
externalReference: externalReference(conn),
fileAssetEntity: fileAssetEntity(conn),
fileRecord: fileRecord(conn),
flow: flow(conn),
flowLink: flowLink(conn),
flowObject: flowObject(conn),
flowObjectType: flowObjectType(conn),
form: form(conn),
globalCluster: globalCluster(conn),
globalClusterAssociation: globalClusterAssociation(conn),
globalIndicator: globalIndicator(conn),
governingEntity: governingEntity(conn),
governingEntityVersion: governingEntityVersion(conn),
highWater: highWater(conn),
iatiActivity: iatiActivity(conn),
iatiPublisher: iatiPublisher(conn),
iatiRecipientCountry: iatiRecipientCountry(conn),
iatiTransaction: iatiTransaction(conn),
job: job(conn),
jobAssociation: jobAssociation(conn),
legacy: legacy(conn),
location: location(conn),
lookup: lookup(conn),
measurement: measurement(conn),
measurementVersion: measurementVersion(conn),
operation: operation(conn),
operationCluster: operationCluster(conn),
organization: organization(conn),
organizationLocation: organizationLocation(conn),
participant: participant(conn),
participantCountry: participantCountry(conn),
participantOrganization: participantOrganization(conn),
plan: plan(conn),
planEmergency: planEmergency(conn),
planEntity: planEntity(conn),
planEntityVersion: planEntityVersion(conn),
planLocation: planLocation(conn),
planReportingPeriod: planReportingPeriod(conn),
planTag: planTag(conn),
planVersion: planVersion(conn),
planYear: planYear(conn),
procedureEntityPrototype: procedureEntityPrototype(conn),
procedureSection: procedureSection(conn),
procedureSectionField: procedureSectionField(conn),
project: project(conn),
projectContact: projectContact(conn),
projectGlobalClusters: projectGlobalClusters(conn),
projectLocations: projectLocations(conn),
projectVersion: projectVersion(conn),
projectVersionAttachment: projectVersionAttachment(conn),
projectVersionComment: projectVersionComment(conn),
projectVersionField: projectVersionField(conn),
projectVersionGoverningEntity: projectVersionGoverningEntity(conn),
projectVersionHistory: projectVersionHistory(conn),
projectVersionOrganization: projectVersionOrganization(conn),
projectVersionPlan: projectVersionPlan(conn),
projectVersionPlanEntity: projectVersionPlanEntity(conn),
reportDetail: reportDetail(conn),
reportFile: reportFile(conn),
reportingWindow: reportingWindow(conn),
reportingWindowAssignment: reportingWindowAssignment(conn),
tag: tag(conn),
task: task(conn),
unit: unit(conn),
unitType: unitType(conn),
usageYear: usageYear(conn),
workflowStatusOption: workflowStatusOption(conn),
workflowStatusOptionStep: workflowStatusOptionStep(conn),
const initializeTables = (masterConn: Knex, replicaConn?: Knex) => ({
attachment: attachment(masterConn, replicaConn),
attachmentPrototype: attachmentPrototype(masterConn, replicaConn),
attachmentVersion: attachmentVersion(masterConn, replicaConn),
authGrant: authGrant(masterConn, replicaConn),
authGrantee: authGrantee(masterConn, replicaConn),
authGrantLog: authGrantLog(masterConn, replicaConn),
authInvite: authInvite(masterConn, replicaConn),
authTarget: authTarget(masterConn, replicaConn),
authToken: authToken(masterConn, replicaConn),
blueprint: blueprint(masterConn, replicaConn),
budgetSegment: budgetSegment(masterConn, replicaConn),
budgetSegmentBreakdown: budgetSegmentBreakdown(masterConn, replicaConn),
budgetSegmentBreakdownEntity: budgetSegmentBreakdownEntity(
masterConn,
replicaConn
),
cache: cache(masterConn, replicaConn),
category: category(masterConn, replicaConn),
categoryGroup: categoryGroup(masterConn, replicaConn),
categoryRef: categoryRef(masterConn, replicaConn),
client: client(masterConn, replicaConn),
conditionField: conditionField(masterConn, replicaConn),
conditionFieldReliesOn: conditionFieldReliesOn(masterConn, replicaConn),
conditionFieldType: conditionFieldType(masterConn, replicaConn),
currency: currency(masterConn, replicaConn),
disaggregationCategory: disaggregationCategory(masterConn, replicaConn),
disaggregationCategoryGroup: disaggregationCategoryGroup(
masterConn,
replicaConn
),
disaggregationModel: disaggregationModel(masterConn, replicaConn),
emergency: emergency(masterConn, replicaConn),
emergencyLocation: emergencyLocation(masterConn, replicaConn),
endpointLog: endpointLog(masterConn, replicaConn),
endpointUsage: endpointUsage(masterConn, replicaConn),
entityPrototype: entityPrototype(masterConn, replicaConn),
expiredData: expiredData(masterConn, replicaConn),
externalData: externalData(masterConn, replicaConn),
externalReference: externalReference(masterConn, replicaConn),
fileAssetEntity: fileAssetEntity(masterConn, replicaConn),
fileRecord: fileRecord(masterConn, replicaConn),
flow: flow(masterConn, replicaConn),
flowLink: flowLink(masterConn, replicaConn),
flowObject: flowObject(masterConn, replicaConn),
flowObjectType: flowObjectType(masterConn, replicaConn),
form: form(masterConn, replicaConn),
globalCluster: globalCluster(masterConn, replicaConn),
globalClusterAssociation: globalClusterAssociation(masterConn, replicaConn),
globalIndicator: globalIndicator(masterConn, replicaConn),
governingEntity: governingEntity(masterConn, replicaConn),
governingEntityVersion: governingEntityVersion(masterConn, replicaConn),
highWater: highWater(masterConn, replicaConn),
iatiActivity: iatiActivity(masterConn, replicaConn),
iatiPublisher: iatiPublisher(masterConn, replicaConn),
iatiRecipientCountry: iatiRecipientCountry(masterConn, replicaConn),
iatiTransaction: iatiTransaction(masterConn, replicaConn),
job: job(masterConn, replicaConn),
jobAssociation: jobAssociation(masterConn, replicaConn),
legacy: legacy(masterConn, replicaConn),
location: location(masterConn, replicaConn),
lookup: lookup(masterConn, replicaConn),
measurement: measurement(masterConn, replicaConn),
measurementVersion: measurementVersion(masterConn, replicaConn),
operation: operation(masterConn, replicaConn),
operationCluster: operationCluster(masterConn, replicaConn),
organization: organization(masterConn, replicaConn),
organizationLocation: organizationLocation(masterConn, replicaConn),
participant: participant(masterConn, replicaConn),
participantCountry: participantCountry(masterConn, replicaConn),
participantOrganization: participantOrganization(masterConn, replicaConn),
plan: plan(masterConn, replicaConn),
planEmergency: planEmergency(masterConn, replicaConn),
planEntity: planEntity(masterConn, replicaConn),
planEntityVersion: planEntityVersion(masterConn, replicaConn),
planLocation: planLocation(masterConn, replicaConn),
planReportingPeriod: planReportingPeriod(masterConn, replicaConn),
planTag: planTag(masterConn, replicaConn),
planVersion: planVersion(masterConn, replicaConn),
planYear: planYear(masterConn, replicaConn),
procedureEntityPrototype: procedureEntityPrototype(masterConn, replicaConn),
procedureSection: procedureSection(masterConn, replicaConn),
procedureSectionField: procedureSectionField(masterConn, replicaConn),
project: project(masterConn, replicaConn),
projectContact: projectContact(masterConn, replicaConn),
projectGlobalClusters: projectGlobalClusters(masterConn, replicaConn),
projectLocations: projectLocations(masterConn, replicaConn),
projectVersion: projectVersion(masterConn, replicaConn),
projectVersionAttachment: projectVersionAttachment(masterConn, replicaConn),
projectVersionComment: projectVersionComment(masterConn, replicaConn),
projectVersionField: projectVersionField(masterConn, replicaConn),
projectVersionGoverningEntity: projectVersionGoverningEntity(
masterConn,
replicaConn
),
projectVersionHistory: projectVersionHistory(masterConn, replicaConn),
projectVersionOrganization: projectVersionOrganization(
masterConn,
replicaConn
),
projectVersionPlan: projectVersionPlan(masterConn, replicaConn),
projectVersionPlanEntity: projectVersionPlanEntity(masterConn, replicaConn),
reportDetail: reportDetail(masterConn, replicaConn),
reportFile: reportFile(masterConn, replicaConn),
reportingWindow: reportingWindow(masterConn, replicaConn),
reportingWindowAssignment: reportingWindowAssignment(masterConn, replicaConn),
tag: tag(masterConn, replicaConn),
task: task(masterConn, replicaConn),
unit: unit(masterConn, replicaConn),
unitType: unitType(masterConn, replicaConn),
usageYear: usageYear(masterConn, replicaConn),
workflowStatusOption: workflowStatusOption(masterConn, replicaConn),
workflowStatusOptionStep: workflowStatusOptionStep(masterConn, replicaConn),
});

export type Tables = ReturnType<typeof initializeTables>;

export type Table = Tables[keyof Tables];

const initializeRoot = (conn: Knex) => {
const _tables = initializeTables(conn);
const initializeRoot = (masterConn: Knex, replicaConn?: Knex) => {
const _tables = initializeTables(masterConn, replicaConn);
return {
...UTILS,
..._tables,
Expand Down
10 changes: 5 additions & 5 deletions src/db/models/authGrant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import { AUTH_GRANTEE_ID } from './authGrantee';
import { AUTH_TARGET_ID } from './authTarget';
import type { ParticipantId } from './participant';

const authGrantModel = (conn: Knex) => {
const authGrantLog = initAuthGrantLog(conn);
const authGrantModel = (masterConn: Knex, replicaConn?: Knex) => {
const authGrantLog = initAuthGrantLog(masterConn, replicaConn);

const model = defineSequelizeModel({
tableName: 'authGrant',
Expand All @@ -30,7 +30,7 @@ const authGrantModel = (conn: Knex) => {
},
},
softDeletionEnabled: false,
})(conn);
})(masterConn, replicaConn);

type UserData = UserDataOfModel<typeof model>;
type Instance = InstanceOfModel<typeof model>;
Expand Down Expand Up @@ -61,7 +61,7 @@ const authGrantModel = (conn: Knex) => {
return await model.create(data, { trx });
};

return trx ? createCallback(trx) : conn.transaction(createCallback);
return trx ? createCallback(trx) : masterConn.transaction(createCallback);
};

const update = (
Expand Down Expand Up @@ -104,7 +104,7 @@ const authGrantModel = (conn: Knex) => {
}
};

return trx ? updateCallback(trx) : conn.transaction(updateCallback);
return trx ? updateCallback(trx) : masterConn.transaction(updateCallback);
};

const createOrUpdate = async (
Expand Down
4 changes: 2 additions & 2 deletions src/db/models/authTarget.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const AUTH_TARGET_TYPE = {
governingEntity: null,
};

export default (conn: Knex) => {
export default (masterConn: Knex, replicaConn?: Knex) => {
const model = defineIDModel({
tableName: 'authTarget',
fields: {
Expand All @@ -47,7 +47,7 @@ export default (conn: Knex) => {
},
idField: 'id',
softDeletionEnabled: false,
})(conn);
})(masterConn, replicaConn);

return {
...model,
Expand Down
4 changes: 2 additions & 2 deletions src/db/models/reportingWindowAssignment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ export default defineVersionedModel({
},
},
},
prepare: async (data, conn) => {
prepare: async (data, masterConn, replicaConn) => {
let assigneeOperation: OperationId;
let assigneeId: number;
if (data.assignee.type === 'operation') {
assigneeOperation = data.assignee.operation;
assigneeId = data.assignee.operation;
} else if (data.assignee.type === 'operationCluster') {
const oc = operationCluster(conn);
const oc = operationCluster(masterConn, replicaConn);
const cluster = await oc.get(data.assignee.cluster);
if (!cluster) {
throw new Error(
Expand Down
6 changes: 3 additions & 3 deletions src/db/util/id-model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export type ModelWithIdInitializer<
| null
| keyof F['generated']
| keyof F['generatedCompositeKey'],
> = (conn: Knex) => ModelWithId<F, IDField>;
> = (masterConn: Knex, replicaConn?: Knex) => ModelWithId<F, IDField>;

const hasField = <F extends string>(
data: unknown,
Expand Down Expand Up @@ -90,7 +90,7 @@ export const defineIDModel =
FieldsWithSequelize<F, SoftDeletionEnabled>,
IDField
> =>
(conn) => {
(masterConn, replicaConn) => {
const { idField, tableName } = opts;
type Fields = FieldsWithSequelize<F, SoftDeletionEnabled>;
type ID = IdOf<Fields, IDField>;
Expand All @@ -102,7 +102,7 @@ export const defineIDModel =
hasField(data, idField)
? `${tableName} ${data[idField]}`
: `Unknown ${tableName}`,
})(conn);
})(masterConn, replicaConn);

const get = (id: ID): Promise<Instance | null> =>
model.findOne({
Expand Down
Loading

0 comments on commit fe49ad2

Please sign in to comment.