diff --git a/index.js b/index.js index 6d7afa7..863cb9e 100644 --- a/index.js +++ b/index.js @@ -1,10 +1,11 @@ const { AsyncLocalStorage } = require("node:async_hooks"); -const PgBoss = require("pg-boss"); const util = require("util"); const AppConfig = require("./src/app_config"); const ReportProcessingContext = require("./src/report_processing_context"); const Logger = require("./src/logger"); const Processor = require("./src/processor"); +const Queue = require("./src/queue/queue"); +const ReportJobQueueMessage = require("./src/queue/report_job_queue_message"); /** * Gets an array of JSON report objects from the application confing, then runs @@ -80,9 +81,9 @@ async function _processReport(appConfig, context, reportConfig, processor) { } /** - * Gets an array of JSON report objects from the application confing, then runs - * a sequential chain of actions on each report object in the array. Some of the - * actions performed are optional based on the options passed to this function. + * Gets an array of agencies from the deploy/agencies.json file. Publishes a + * report job message to the queue for each report and agency based on the + * startup options. * * @param {object} options an object with options to be used when processing * all reports. @@ -110,49 +111,44 @@ async function runQueuePublish(options = {}) { const agencies = _initAgencies(options.agenciesFile); const appConfig = new AppConfig(options); const reportConfigs = appConfig.filteredReportConfigurations; - const appLogger = Logger.initialize(appConfig); - const queueClient = await _initQueueClient(appConfig, appLogger); - const queue = "analytics-reporter-job-queue"; + const appLogger = Logger.initialize({ + agencyName: appConfig.agencyLogName, + scriptName: appConfig.scriptName, + }); + const queue = await _initReportJobQueue(appConfig, appLogger); for (const agency of agencies) { + process.env.AGENCY_NAME = agency.agencyName || ""; for (const reportConfig of reportConfigs) { - process.env.AGENCY_NAME = agency.agencyName; - const reportLogger = Logger.initialize(appConfig, reportConfig); + const reportLogger = Logger.initialize({ + agencyName: appConfig.agencyLogName, + reportName: reportConfig.name, + scriptName: appConfig.scriptName, + }); try { - let jobId = await queueClient.send( - queue, - _createQueueMessage( - options, - agency, + let jobId = await queue.sendMessage( + new ReportJobQueueMessage({ + ...agency, + reportOptions: options, reportConfig, - appConfig.scriptName, - ), - { - priority: _messagePriority(reportConfig), - singletonKey: `${appConfig.scriptName}-${agency.agencyName}-${reportConfig.name}`, - }, + scriptName: appConfig.scriptName, + }), ); if (jobId) { reportLogger.info( - `Created job in queue: ${queue} with job ID: ${jobId}`, + `Created job in queue: ${queue.name} with job ID: ${jobId}`, ); } else { - reportLogger.info(`Found a duplicate job in queue: ${queue}`); + reportLogger.info(`Found a duplicate job in queue: ${queue.name}`); } } catch (e) { - reportLogger.error(`Error sending to queue: ${queue}`); + reportLogger.error(`Error sending to queue: ${queue.name}`); reportLogger.error(util.inspect(e)); } } } - try { - await queueClient.stop(); - appLogger.debug(`Stopping queue client`); - } catch (e) { - appLogger.error("Error stopping queue client"); - appLogger.error(util.inspect(e)); - } + _stopQueue(queue, appLogger); } function _initAgencies(agencies_file) { @@ -172,76 +168,67 @@ function _initAgencies(agencies_file) { return Array.isArray(agencies) ? agencies : legacyAgencies; } -async function _initQueueClient(appConfig, logger) { - let queueClient; +async function _initReportJobQueue(appConfig, logger) { + const queue = new Queue({ + connectionString: appConfig.messageQueueDatabaseConnection, + queueName: appConfig.messageQueueName, + messageClass: ReportJobQueueMessage, + }); try { - queueClient = new PgBoss(appConfig.messageQueueDatabaseConnection); - await queueClient.start(); - logger.debug("Starting queue client"); + await queue.start(); + logger.debug(`Starting ${queue.name} queue client`); } catch (e) { - logger.error("Error starting queue client"); + logger.error(`Error starting ${queue.name} queue client`); logger.error(util.inspect(e)); } - - return queueClient; -} - -function _createQueueMessage(options, agency, reportConfig, scriptName) { - return { - ...agency, - options, - reportConfig, - scriptName, - }; + return queue; } -function _messagePriority(reportConfig) { - if (!reportConfig.frequency) { - return 0; - } else if (reportConfig.frequency == "daily") { - return 1; - } else if (reportConfig.frequency == "hourly") { - return 2; - } else if (reportConfig.frequency == "realtime") { - return 3; +async function _stopQueue(queue, logger) { + try { + await queue.stop(); + logger.debug(`Stopping ${queue.name} queue client`); + } catch (e) { + logger.error(`Error stopping ${queue.name} queue client`); + logger.error(util.inspect(e)); } } /** - * @returns {Promise} when the process ends + * Begins a queue consumer process which receives job messages from the + * analytics report job queue and processes the report for the received job. */ async function runQueueConsume() { const appConfig = new AppConfig(); const appLogger = Logger.initialize(); - const queueClient = await _initQueueClient(appConfig, appLogger); - const queue = "analytics-reporter-job-queue"; + const queue = await _initReportJobQueue(appConfig, appLogger); try { const context = new ReportProcessingContext(new AsyncLocalStorage()); const processor = Processor.buildAnalyticsProcessor(appConfig, appLogger); - await queueClient.work( - queue, - { newJobCheckIntervalSeconds: 1 }, - async (message) => { - appLogger.info("Queue message received"); - process.env.AGENCY_NAME = message.data.agencyName; - process.env.ANALYTICS_REPORT_IDS = message.data.analyticsReportIds; - process.env.AWS_BUCKET_PATH = message.data.awsBucketPath; - process.env.ANALYTICS_SCRIPT_NAME = message.data.scriptName; - - await _processReport( - new AppConfig(message.data.options), - context, - message.data.reportConfig, - processor, - ); - }, - ); + await queue.poll(async (message) => { + appLogger.info("Queue message received"); + _setProcessEnvForMessage(message); + + await _processReport( + new AppConfig(message.options), + context, + message.reportConfig, + processor, + ); + }); } catch (e) { appLogger.error("Error polling queue for messages"); appLogger.error(util.inspect(e)); } } +function _setProcessEnvForMessage(message) { + process.env.AGENCY_NAME = message.data.agencyName || ""; + process.env.ANALYTICS_REPORT_IDS = message.data.analyticsReportIds || ""; + process.env.AWS_BUCKET_PATH = message.data.awsBucketPath || ""; + process.env.ANALYTICS_SCRIPT_NAME = message.data.scriptName || ""; +} + module.exports = { run, runQueuePublish, runQueueConsume }; diff --git a/src/app_config.js b/src/app_config.js index de80bde..73a837c 100644 --- a/src/app_config.js +++ b/src/app_config.js @@ -93,7 +93,11 @@ class AppConfig { } get agency() { - return process.env.AGENCY_NAME || "gov-wide"; + return process.env.AGENCY_NAME; + } + + get agencyLogName() { + return this.agency || "gov-wide"; } get scriptName() { diff --git a/src/logger.js b/src/logger.js index 01126b1..47c282d 100644 --- a/src/logger.js +++ b/src/logger.js @@ -1,27 +1,24 @@ const winston = require("winston"); /** - * @param {object} params the parameters for the method - * @param {string} params.agencyName the name of the agency for this logger - * instance. - * @param {string} params.reportName the name of the report being run for this - * logger instance. - * @param {string} params.scriptName the name of the script being run for this - * logger instance. + * @param {object} params the parameter object + * @param {string} params.agencyName the agency name to use for log tagging. + * @param {string} params.reportName the report name to use for log tagging. + * @param {string} params.scriptName the script name to use for log tagging. * @returns {string} a standard tag for the logger to identify the specific - * report being processed when writing logs. + * agency, report, and script being processed when writing logs. */ const tag = ({ agencyName, reportName, scriptName }) => { let tagString = ""; if (scriptName) { - tagString = tagString + `${scriptName} - `; + tagString = tagString + `${scriptName}`; } if (reportName) { - tagString = tagString + `${reportName} - `; + tagString = tagString + `${tagString ? " - " : ""}${reportName}`; } if (agencyName) { - tagString = tagString + `${agencyName}`; + tagString = tagString + `${tagString ? " - " : ""}${agencyName}`; } return tagString; @@ -44,21 +41,19 @@ const baseLogger = winston.createLogger({ /** * Creates an application logger instance. * - * @param {import('../app_config')} appConfig application config instance. Sets the log level and - * is also referenced to create a leading log tag for this logger instance. - * @param {object} reportConfig config for the report being run for this - * logger instance. Used to create a leading log tag for messages - * @param {string} reportConfig.name the name of the report being run for this - * logger instance. Used to create a leading log tag for messages + * @param {object} params the parameter object + * @param {string} params.agencyName the agency name to use for log tagging. + * @param {string} params.reportName the report name to use for log tagging. + * @param {string} params.scriptName the script name to use for log tagging. * @returns {import('winston').Logger} the configured logger instance */ -const initialize = (appConfig = {}, reportConfig = {}) => { +const initialize = ({ + agencyName = "", + reportName = "", + scriptName = "", +} = {}) => { return baseLogger.child({ - label: tag({ - agencyName: appConfig.agency, - reportName: reportConfig.name, - scriptName: appConfig.scriptName, - }), + label: tag({ agencyName, reportName, scriptName }), }); }; diff --git a/src/queue/queue.js b/src/queue/queue.js new file mode 100644 index 0000000..433aff3 --- /dev/null +++ b/src/queue/queue.js @@ -0,0 +1,98 @@ +const PgBoss = require("pg-boss"); + +/** + * Implements a message queue using the PgBoss library. + */ +class Queue { + #queueClient; + #queueName; + #messageClass; + + /** + * @param {object} params the parameter object + * @param {import('pg-boss')} params.queueClient the queue client instance to + * use for queue operations. + * @param {string} params.queueName the identifier for the queue. + * @param {*} params.messageClass a class which implements the fromMessage + * static method to return an instance of the class from a PgBoss message + * object. + */ + constructor({ queueClient, queueName, messageClass }) { + this.#queueClient = queueClient; + this.#queueName = queueName; + this.#messageClass = messageClass; + } + + /** + * @returns {string} the queue name + */ + get name() { + return this.#queueName; + } + + /** + * @returns {Promise} resolves when the PgBoss queue client has started + */ + start() { + return this.#queueClient.start(); + } + + /** + * @returns {Promise} resolves when the PgBoss queue client has stopped + */ + stop() { + return this.#queueClient.stop(); + } + + /** + * @param {import('./queue_message')} queueMessage a QueueMessage instance + * @returns {string} a message ID or null if a duplicate message exists on the + * queue. + */ + async sendMessage(queueMessage) { + const result = await this.#queueClient.send( + this.#queueName, + queueMessage.toJSON(), + queueMessage.sendOptions(), + ); + return result; + } + + /** + * @param {Function} callback the function to call for each message + * @param {object} options the options to pass to the PgBoss work function + * @returns {Promise} resolves when the queue poller process stops + */ + async poll(callback, options = { newJobCheckIntervalSeconds: 1 }) { + const result = await this.#queueClient.work( + this.#queueName, + options, + async (message) => { + await callback(this.#messageClass.fromMessage(message)); + }, + ); + return result; + } + + /** + * @param {object} params the parameter object + * @param {string} params.connectionString a Postgres database connection + * string. + * @param {string} params.queueName the name of the queue to use for the + * client. + * @param {*} params.messageClass a class which implements the fromMessage + * static method to return an instance of the class from a PgBoss message + * object. + * @returns {Queue} the queue instance configured with the PgBoss queue + * client. + */ + static buildQueue({ connectionString, queueName, messageClass }) { + return new Queue({ + queueClient: new PgBoss(connectionString), + queueName, + messageClass, + }); + } +} + +module.exports = Queue; diff --git a/src/queue/queue_message.js b/src/queue/queue_message.js new file mode 100644 index 0000000..430d853 --- /dev/null +++ b/src/queue/queue_message.js @@ -0,0 +1,28 @@ +/** + * Abstract class for a queue message to be sent to a PgBoss queue client. + */ +class QueueMessage { + /** + * @returns {object} the class converted to a JSON object. + */ + toJSON() { + return {}; + } + + /** + * @returns {object} an options object for the PgBoss send method + */ + options() { + return {}; + } + + /** + * @param {object} message a PgBoss message object from the report job queue. + * @returns {QueueMessage} the built queue message instance. + */ + static fromMessage(message) { + return new QueueMessage(message.data); + } +} + +module.exports = QueueMessage; diff --git a/src/queue/report_job_queue_message.js b/src/queue/report_job_queue_message.js new file mode 100644 index 0000000..5602d3b --- /dev/null +++ b/src/queue/report_job_queue_message.js @@ -0,0 +1,99 @@ +const QueueMessage = require("./queue_message"); + +/** + * Data object for a report job queue message to be sent to a PgBoss queue + * client. + */ +class ReportJobQueueMessage extends QueueMessage { + #agencyName; + #analyticsReportIds; + #awsBucketPath; + #reportOptions; + #reportConfig; + #scriptName; + + /** + * @param {object} params the params object. + * @param {string} params.agencyName the name of the agency. + * @param {string} params.analyticsReportIds the google analytics property ids + * for the agency to use when running reports. + * @param {string} params.awsBucketPath the folder in the S3 bucket where + * report data is stored for the agency. + * @param {object} params.reportOptions the options passed to the reporter + * executable. + * @param {object} params.reportConfig the google analytics configuration + * object for the report to run. + * @param {string} params.scriptName the name of the script which was run to + * begin the reporter process. + * @returns {ReportJobQueueMessage} the built queue message instance. + */ + constructor({ + agencyName = "", + analyticsReportIds = "", + awsBucketPath = "", + reportOptions = {}, + reportConfig = {}, + scriptName = "", + }) { + super(); + this.#agencyName = agencyName; + this.#analyticsReportIds = analyticsReportIds; + this.#awsBucketPath = awsBucketPath; + this.#reportOptions = reportOptions; + this.#reportConfig = reportConfig; + this.#scriptName = scriptName; + } + + /** + * @returns {object} the class converted to a JSON object. + */ + toJSON() { + return { + agencyName: this.#agencyName, + analyticsReportIds: this.#analyticsReportIds, + awsBucketPath: this.#awsBucketPath, + options: this.#reportOptions, + reportConfig: this.#reportConfig, + scriptName: this.#scriptName, + }; + } + + /** + * @returns {object} an options object for the PgBoss send method + */ + sendOptions() { + return { + priority: this.#messagePriority(this.#reportConfig.frequency), + singletonKey: `${this.#scriptName}-${this.#agencyName}-${this.#reportConfig.name}`, + }; + } + + #messagePriority(reportFrequency) { + if (!reportFrequency) { + return 0; + } else if (reportFrequency == "daily") { + return 1; + } else if (reportFrequency == "hourly") { + return 2; + } else if (reportFrequency == "realtime") { + return 3; + } + } + + /** + * @param {object} message a PgBoss message object from the report job queue. + * @returns {ReportJobQueueMessage} the built queue message instance. + */ + static fromMessage(message) { + return new ReportJobQueueMessage({ + agencyName: message.data.agencyName, + analyticsReportIds: message.data.analyticsReportIds, + awsBucketPath: message.data.awsBucketPath, + reportOptions: message.data.options, + reportConfig: message.data.reportConfig, + scriptName: message.data.scriptName, + }); + } +} + +module.exports = ReportJobQueueMessage; diff --git a/test/app_config.test.js b/test/app_config.test.js index f9b243b..cc93cf7 100644 --- a/test/app_config.test.js +++ b/test/app_config.test.js @@ -250,7 +250,35 @@ describe("AppConfig", () => { }); it("returns the default: gov-wide", () => { - expect(subject.agency).to.equal("gov-wide"); + expect(subject.agency).to.equal(undefined); + }); + }); + }); + + describe("agencyLogName", () => { + describe("when AGENCY_NAME is set", () => { + beforeEach(() => { + process.env.AGENCY_NAME = "HUD"; + subject = new AppConfig(); + }); + + afterEach(() => { + delete process.env.AGENCY_NAME; + }); + + it("returns the string set in the env var", () => { + expect(subject.agencyLogName).to.equal("HUD"); + }); + }); + + describe("when AGENCY_NAME is not set", () => { + beforeEach(() => { + delete process.env.AGENCY_NAME; + subject = new AppConfig(); + }); + + it("returns the default: gov-wide", () => { + expect(subject.agencyLogName).to.equal("gov-wide"); }); }); }); diff --git a/test/logger.test.js b/test/logger.test.js index 2d33203..1bf50c5 100644 --- a/test/logger.test.js +++ b/test/logger.test.js @@ -46,11 +46,9 @@ describe("logger", () => { describe(".initialize", () => { describe("when config is provided", () => { const logLevel = "warn"; - const config = { - scriptName: "foobar.sh", - agency: "gov-wide", - }; - const reportConfig = { name: "device" }; + const scriptName = "foobar.sh"; + const agencyName = "gov-wide"; + const reportName = "device"; beforeEach(() => { process.env.ANALYTICS_LOG_LEVEL = logLevel; @@ -63,12 +61,84 @@ describe("logger", () => { delete process.env.ANALYTICS_LOG_LEVEL; }); - it("creates a logger with log level set to the environment value", () => { - expect(subject.initialize(config, reportConfig)).to.eql({ - level: logLevel, - format: "printf", - label: "foobar.sh - device - gov-wide", - transports: [new WinstonConsoleMock({ level: logLevel })], + describe("and all config property are set", () => { + it("creates a logger with log level set to the environment value and tag with 3 identifiers", () => { + expect( + subject.initialize({ agencyName, reportName, scriptName }), + ).to.eql({ + level: logLevel, + format: "printf", + label: "foobar.sh - device - gov-wide", + transports: [new WinstonConsoleMock({ level: logLevel })], + }); + }); + }); + + describe("and some config properties are not set", () => { + describe("and only scriptName and agencyName are set", () => { + it("creates a logger with log level set to the environment value and tag with 2 identifiers", () => { + expect(subject.initialize({ agencyName, scriptName })).to.eql({ + level: logLevel, + format: "printf", + label: "foobar.sh - gov-wide", + transports: [new WinstonConsoleMock({ level: logLevel })], + }); + }); + }); + + describe("and only scriptName and reportName are set", () => { + it("creates a logger with log level set to the environment value and tag with 2 identifiers", () => { + expect(subject.initialize({ reportName, scriptName })).to.eql({ + level: logLevel, + format: "printf", + label: "foobar.sh - device", + transports: [new WinstonConsoleMock({ level: logLevel })], + }); + }); + }); + + describe("and only agencyName and reportName are set", () => { + it("creates a logger with log level set to the environment value and tag with 2 identifiers", () => { + expect(subject.initialize({ agencyName, reportName })).to.eql({ + level: logLevel, + format: "printf", + label: "device - gov-wide", + transports: [new WinstonConsoleMock({ level: logLevel })], + }); + }); + }); + + describe("and only scriptName is set", () => { + it("creates a logger with log level set to the environment value and tag with 1 identifier", () => { + expect(subject.initialize({ scriptName })).to.eql({ + level: logLevel, + format: "printf", + label: "foobar.sh", + transports: [new WinstonConsoleMock({ level: logLevel })], + }); + }); + }); + + describe("and only agencyName is set", () => { + it("creates a logger with log level set to the environment value and tag with 1 identifier", () => { + expect(subject.initialize({ agencyName })).to.eql({ + level: logLevel, + format: "printf", + label: "gov-wide", + transports: [new WinstonConsoleMock({ level: logLevel })], + }); + }); + }); + + describe("and only reportName is set", () => { + it("creates a logger with log level set to the environment value and tag with 1 identifier", () => { + expect(subject.initialize({ reportName })).to.eql({ + level: logLevel, + format: "printf", + label: "device", + transports: [new WinstonConsoleMock({ level: logLevel })], + }); + }); }); }); }); diff --git a/test/queue/queue.test.js b/test/queue/queue.test.js new file mode 100644 index 0000000..4cc2772 --- /dev/null +++ b/test/queue/queue.test.js @@ -0,0 +1,104 @@ +const expect = require("chai").expect; +const sinon = require("sinon"); +const Queue = require("../../src/queue/queue"); + +let messageJSON; +let messageOptions; + +class TestQueueMessage { + toJSON() { + return messageJSON; + } + + sendOptions() { + return messageOptions; + } + + static fromMessage(message) { + return message; + } +} + +describe("Queue", () => { + const queueName = "foobar-queue"; + let queueClient; + const messageClass = TestQueueMessage; + let subject; + + beforeEach(() => { + queueClient = { + start: sinon.spy(), + stop: sinon.spy(), + send: sinon.spy(), + work: sinon.spy(), + }; + subject = new Queue({ queueName, queueClient, messageClass }); + }); + + describe(".name", () => { + it("returns the queue name", () => { + expect(subject.name).to.equal(queueName); + }); + }); + + describe(".start", () => { + beforeEach(async () => { + await subject.start(); + }); + + it("starts the queue client", () => { + expect(queueClient.start.calledWith()).to.equal(true); + }); + }); + + describe(".stop", () => { + beforeEach(async () => { + await subject.stop(); + }); + + it("stops the queue client", () => { + expect(queueClient.stop.calledWith()).to.equal(true); + }); + }); + + describe(".sendMessage", () => { + let queueMessage; + + beforeEach(async () => { + messageJSON = { foo: "bar" }; + messageOptions = { test: 1 }; + queueMessage = new TestQueueMessage(); + await subject.sendMessage(queueMessage); + }); + + it("sends the message to the queue with expected JSON and options", () => { + expect( + queueClient.send.calledWith(queueName, messageJSON, messageOptions), + ).to.equal(true); + }); + }); + + describe(".poll", () => { + describe("when options are not passed", () => { + let callback = () => { + return ""; + }; + + beforeEach(async () => { + await subject.poll(callback); + }); + + it("polls the queue with expected options", () => { + expect(queueClient.work.getCalls()[0].args[0]).to.equal(queueName); + expect(queueClient.work.getCalls()[0].args[1]).to.deep.equal({ + newJobCheckIntervalSeconds: 1, + }); + expect(typeof queueClient.work.getCalls()[0].args[2]).to.equal( + "function", + ); + }); + }); + + describe("when options are passed", () => {}); + }); +});