Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/refactor queue logic #905

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 64 additions & 77 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
const { AsyncLocalStorage } = require("node:async_hooks");
const PgBoss = require("pg-boss");
const util = require("util");
const AppConfig = require("./src/app_config");
const ReportProcessingContext = require("./src/report_processing_context");
const Logger = require("./src/logger");
const Processor = require("./src/processor");
const Queue = require("./src/queue/queue");
const ReportJobQueueMessage = require("./src/queue/report_job_queue_message");

/**
* Gets an array of JSON report objects from the application confing, then runs
Expand Down Expand Up @@ -80,9 +81,9 @@ async function _processReport(appConfig, context, reportConfig, processor) {
}

/**
* Gets an array of JSON report objects from the application confing, then runs
* a sequential chain of actions on each report object in the array. Some of the
* actions performed are optional based on the options passed to this function.
* Gets an array of agencies from the deploy/agencies.json file. Publishes a
* report job message to the queue for each report and agency based on the
* startup options.
*
* @param {object} options an object with options to be used when processing
* all reports.
Expand Down Expand Up @@ -110,49 +111,44 @@ async function runQueuePublish(options = {}) {
const agencies = _initAgencies(options.agenciesFile);
const appConfig = new AppConfig(options);
const reportConfigs = appConfig.filteredReportConfigurations;
const appLogger = Logger.initialize(appConfig);
const queueClient = await _initQueueClient(appConfig, appLogger);
const queue = "analytics-reporter-job-queue";
const appLogger = Logger.initialize({
agencyName: appConfig.agencyLogName,
scriptName: appConfig.scriptName,
});
const queue = await _initReportJobQueue(appConfig, appLogger);

for (const agency of agencies) {
process.env.AGENCY_NAME = agency.agencyName || "";
for (const reportConfig of reportConfigs) {
process.env.AGENCY_NAME = agency.agencyName;
const reportLogger = Logger.initialize(appConfig, reportConfig);
const reportLogger = Logger.initialize({
agencyName: appConfig.agencyLogName,
reportName: reportConfig.name,
scriptName: appConfig.scriptName,
});
try {
let jobId = await queueClient.send(
queue,
_createQueueMessage(
options,
agency,
let jobId = await queue.sendMessage(
new ReportJobQueueMessage({
...agency,
reportOptions: options,
reportConfig,
appConfig.scriptName,
),
{
priority: _messagePriority(reportConfig),
singletonKey: `${appConfig.scriptName}-${agency.agencyName}-${reportConfig.name}`,
},
scriptName: appConfig.scriptName,
}),
);
if (jobId) {
reportLogger.info(
`Created job in queue: ${queue} with job ID: ${jobId}`,
`Created job in queue: ${queue.name} with job ID: ${jobId}`,
);
} else {
reportLogger.info(`Found a duplicate job in queue: ${queue}`);
reportLogger.info(`Found a duplicate job in queue: ${queue.name}`);
}
} catch (e) {
reportLogger.error(`Error sending to queue: ${queue}`);
reportLogger.error(`Error sending to queue: ${queue.name}`);
reportLogger.error(util.inspect(e));
}
}
}

try {
await queueClient.stop();
appLogger.debug(`Stopping queue client`);
} catch (e) {
appLogger.error("Error stopping queue client");
appLogger.error(util.inspect(e));
}
_stopQueue(queue, appLogger);
}

function _initAgencies(agencies_file) {
Expand All @@ -172,76 +168,67 @@ function _initAgencies(agencies_file) {
return Array.isArray(agencies) ? agencies : legacyAgencies;
}

async function _initQueueClient(appConfig, logger) {
let queueClient;
async function _initReportJobQueue(appConfig, logger) {
const queue = new Queue({
connectionString: appConfig.messageQueueDatabaseConnection,
queueName: appConfig.messageQueueName,
messageClass: ReportJobQueueMessage,
});
try {
queueClient = new PgBoss(appConfig.messageQueueDatabaseConnection);
await queueClient.start();
logger.debug("Starting queue client");
await queue.start();
logger.debug(`Starting ${queue.name} queue client`);
} catch (e) {
logger.error("Error starting queue client");
logger.error(`Error starting ${queue.name} queue client`);
logger.error(util.inspect(e));
}

return queueClient;
}

function _createQueueMessage(options, agency, reportConfig, scriptName) {
return {
...agency,
options,
reportConfig,
scriptName,
};
return queue;
}

function _messagePriority(reportConfig) {
if (!reportConfig.frequency) {
return 0;
} else if (reportConfig.frequency == "daily") {
return 1;
} else if (reportConfig.frequency == "hourly") {
return 2;
} else if (reportConfig.frequency == "realtime") {
return 3;
async function _stopQueue(queue, logger) {
try {
await queue.stop();
logger.debug(`Stopping ${queue.name} queue client`);
} catch (e) {
logger.error(`Error stopping ${queue.name} queue client`);
logger.error(util.inspect(e));
}
}

/**
* @returns {Promise} when the process ends
* Begins a queue consumer process which receives job messages from the
* analytics report job queue and processes the report for the received job.
*/
async function runQueueConsume() {
const appConfig = new AppConfig();
const appLogger = Logger.initialize();
const queueClient = await _initQueueClient(appConfig, appLogger);
const queue = "analytics-reporter-job-queue";
const queue = await _initReportJobQueue(appConfig, appLogger);

try {
const context = new ReportProcessingContext(new AsyncLocalStorage());
const processor = Processor.buildAnalyticsProcessor(appConfig, appLogger);

await queueClient.work(
queue,
{ newJobCheckIntervalSeconds: 1 },
async (message) => {
appLogger.info("Queue message received");
process.env.AGENCY_NAME = message.data.agencyName;
process.env.ANALYTICS_REPORT_IDS = message.data.analyticsReportIds;
process.env.AWS_BUCKET_PATH = message.data.awsBucketPath;
process.env.ANALYTICS_SCRIPT_NAME = message.data.scriptName;

await _processReport(
new AppConfig(message.data.options),
context,
message.data.reportConfig,
processor,
);
},
);
await queue.poll(async (message) => {
appLogger.info("Queue message received");
_setProcessEnvForMessage(message);

await _processReport(
new AppConfig(message.options),
context,
message.reportConfig,
processor,
);
});
} catch (e) {
appLogger.error("Error polling queue for messages");
appLogger.error(util.inspect(e));
}
}

function _setProcessEnvForMessage(message) {
process.env.AGENCY_NAME = message.data.agencyName || "";
process.env.ANALYTICS_REPORT_IDS = message.data.analyticsReportIds || "";
process.env.AWS_BUCKET_PATH = message.data.awsBucketPath || "";
process.env.ANALYTICS_SCRIPT_NAME = message.data.scriptName || "";
}

module.exports = { run, runQueuePublish, runQueueConsume };
6 changes: 5 additions & 1 deletion src/app_config.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ class AppConfig {
}

get agency() {
return process.env.AGENCY_NAME || "gov-wide";
return process.env.AGENCY_NAME;
}

get agencyLogName() {
return this.agency || "gov-wide";
}

get scriptName() {
Expand Down
41 changes: 18 additions & 23 deletions src/logger.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
const winston = require("winston");

/**
* @param {object} params the parameters for the method
* @param {string} params.agencyName the name of the agency for this logger
* instance.
* @param {string} params.reportName the name of the report being run for this
* logger instance.
* @param {string} params.scriptName the name of the script being run for this
* logger instance.
* @param {object} params the parameter object
* @param {string} params.agencyName the agency name to use for log tagging.
* @param {string} params.reportName the report name to use for log tagging.
* @param {string} params.scriptName the script name to use for log tagging.
* @returns {string} a standard tag for the logger to identify the specific
* report being processed when writing logs.
* agency, report, and script being processed when writing logs.
*/
const tag = ({ agencyName, reportName, scriptName }) => {
let tagString = "";

if (scriptName) {
tagString = tagString + `${scriptName} - `;
tagString = tagString + `${scriptName}`;
}
if (reportName) {
tagString = tagString + `${reportName} - `;
tagString = tagString + `${tagString ? " - " : ""}${reportName}`;
}
if (agencyName) {
tagString = tagString + `${agencyName}`;
tagString = tagString + `${tagString ? " - " : ""}${agencyName}`;
}

return tagString;
Expand All @@ -44,21 +41,19 @@ const baseLogger = winston.createLogger({
/**
* Creates an application logger instance.
*
* @param {import('../app_config')} appConfig application config instance. Sets the log level and
* is also referenced to create a leading log tag for this logger instance.
* @param {object} reportConfig config for the report being run for this
* logger instance. Used to create a leading log tag for messages
* @param {string} reportConfig.name the name of the report being run for this
* logger instance. Used to create a leading log tag for messages
* @param {object} params the parameter object
* @param {string} params.agencyName the agency name to use for log tagging.
* @param {string} params.reportName the report name to use for log tagging.
* @param {string} params.scriptName the script name to use for log tagging.
* @returns {import('winston').Logger} the configured logger instance
*/
const initialize = (appConfig = {}, reportConfig = {}) => {
const initialize = ({
agencyName = "",
reportName = "",
scriptName = "",
} = {}) => {
return baseLogger.child({
label: tag({
agencyName: appConfig.agency,
reportName: reportConfig.name,
scriptName: appConfig.scriptName,
}),
label: tag({ agencyName, reportName, scriptName }),
});
};

Expand Down
98 changes: 98 additions & 0 deletions src/queue/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
const PgBoss = require("pg-boss");

/**
* Implements a message queue using the PgBoss library.
*/
class Queue {
#queueClient;
#queueName;
#messageClass;

/**
* @param {object} params the parameter object
* @param {import('pg-boss')} params.queueClient the queue client instance to
* use for queue operations.
* @param {string} params.queueName the identifier for the queue.
* @param {*} params.messageClass a class which implements the fromMessage
* static method to return an instance of the class from a PgBoss message
* object.
*/
constructor({ queueClient, queueName, messageClass }) {
this.#queueClient = queueClient;
this.#queueName = queueName;
this.#messageClass = messageClass;
}

/**
* @returns {string} the queue name
*/
get name() {
return this.#queueName;
}

/**
* @returns {Promise} resolves when the PgBoss queue client has started
*/
start() {
return this.#queueClient.start();
}

/**
* @returns {Promise} resolves when the PgBoss queue client has stopped
*/
stop() {
return this.#queueClient.stop();
}

/**
* @param {import('./queue_message')} queueMessage a QueueMessage instance
* @returns {string} a message ID or null if a duplicate message exists on the
* queue.
*/
async sendMessage(queueMessage) {
const result = await this.#queueClient.send(
this.#queueName,
queueMessage.toJSON(),
queueMessage.sendOptions(),
);
return result;
}

/**
* @param {Function} callback the function to call for each message
* @param {object} options the options to pass to the PgBoss work function
* @returns {Promise} resolves when the queue poller process stops
*/
async poll(callback, options = { newJobCheckIntervalSeconds: 1 }) {
const result = await this.#queueClient.work(
this.#queueName,
options,
async (message) => {
await callback(this.#messageClass.fromMessage(message));
},
);
return result;
}

/**
* @param {object} params the parameter object
* @param {string} params.connectionString a Postgres database connection
* string.
* @param {string} params.queueName the name of the queue to use for the
* client.
* @param {*} params.messageClass a class which implements the fromMessage
* static method to return an instance of the class from a PgBoss message
* object.
* @returns {Queue} the queue instance configured with the PgBoss queue
* client.
*/
static buildQueue({ connectionString, queueName, messageClass }) {
return new Queue({
queueClient: new PgBoss(connectionString),
queueName,
messageClass,
});
}
}

module.exports = Queue;
Loading