diff --git a/controllers/mailing.js b/controllers/mailing.js index eda1ae8..ef311da 100644 --- a/controllers/mailing.js +++ b/controllers/mailing.js @@ -11,6 +11,10 @@ const dbConn = module.parent.parent.exports.dbConn; const ObjectId = require('mongodb').ObjectId; const { Worker } = require('worker_threads'); +const { createJob } = require("../jobs/bullConfig"); + +console.log("mailing " + typeof createJob) +console.log(createJob) const _mailingState = { cancelled: "cancelled", @@ -395,8 +399,18 @@ async function mailingUpdate( mailingId, newHistoryState, options ) { async function sendMailingToSubs ( mailingId, topicId, mailingSubject, mailingBody ) { // When completed, change state to "sent" - + // Start the worker. + workerData = { + topicId: topicId, + mailingBody: mailingBody, + mailingSubject: mailingSubject, + typeMailing: "msgUpdates", + sentTo: "allSubs", + dbConn: true //dbConn + }; + createJob("getSubscribers", workerData); + /* const worker = new Worker( './controllers/workerSendEmail.js', { workerData: { topicId: topicId, @@ -408,6 +422,11 @@ async function sendMailingToSubs ( mailingId, topicId, mailingSubject, mailingBo } }); + + + + + worker.on('message', function(msg){ if ( msg.completed ) { @@ -419,12 +438,14 @@ async function sendMailingToSubs ( mailingId, topicId, mailingSubject, mailingBo } console.log( msg.msg ); + }); worker.on('error', function(msg){ console.log( "Send to subs - Worker ERRROR: " + msg ); }); - + */ + mailingUpdate( mailingId, _mailingState.sent, { historyState: _mailingState.sending } ); } // Simple worker to send mailing diff --git a/controllers/mailing_view.js b/controllers/mailing_view.js index 992fe8c..abd3e04 100644 --- a/controllers/mailing_view.js +++ b/controllers/mailing_view.js @@ -10,6 +10,8 @@ const mustache = require('mustache'); const fsPromises = require('fs').promises; const mailing = require('./mailing'); +//const subscription = require('./subscriptions'); + const _mailingState = mailing.mailingState; const _baseRedirFolder = ( process.env.baseFolder || "" ) + "/api/v1/mailing/"; diff --git a/controllers/subscriptions.js b/controllers/subscriptions.js index d4c6eac..1225201 100644 --- a/controllers/subscriptions.js +++ b/controllers/subscriptions.js @@ -1103,7 +1103,8 @@ exports.simulateAddPost = async ( req, res, next ) => { /** * This is the future REST endpoint handler function for queuing a mailing with Notify */ -/* It is commented until the mailing.js module is updated to use Bull to make the API call. Related to APPS-53 work. +//It is commented until the mailing.js module is updated to use Bull to make the API call. Related to APPS-53 work. +/* exports.sendMailing = async ( req, res, next ) => { const email = req.body.email, templateId = req.body.templateId, @@ -1126,8 +1127,8 @@ exports.sendMailing = async ( req, res, next ) => { res.json( _successJSO ); -}*/ - +} +*/ /** * This is the function for queuing a subscriber confirmation email * send via notify. diff --git a/controllers/workerSendEmail.js b/controllers/workerSendEmail.js index 6e47233..766cada 100644 --- a/controllers/workerSendEmail.js +++ b/controllers/workerSendEmail.js @@ -108,6 +108,8 @@ async function init() { * Send the mailing * */ + console.log("worker _notifyEndPoint " + _notifyEndPoint) + console.log("notifyKey " + notifyKey); let notifyClient = new NotifyClient( _notifyEndPoint, notifyKey ); //console.log( "_notifyEndPoint: " + _notifyEndPoint ); diff --git a/helpers/sendEmail.js b/helpers/sendEmail.js new file mode 100644 index 0000000..753cdfc --- /dev/null +++ b/helpers/sendEmail.js @@ -0,0 +1,279 @@ +const chalk = require('chalk'); // To color message in console log + +const MongoClient = require('mongodb').MongoClient; +const NotifyClient = require('notifications-node-client').NotifyClient; + +const ObjectId = require('mongodb').ObjectId; +//const dbConn = module.parent.parent.exports.dbConn; + +const processEnv = process.env, + _notifyEndPoint = processEnv.notifyEndPoint || "https://api.notification.alpha.canada.ca", + _unsubBaseURL = process.env.removeURL || "https://apps.canada.ca/x-notify/subs/remove/", + _subsLinkSuffix = process.env.subsLinkSuffix || "853e0212b92a127" + + +let dbConn, notifyKey; + +MongoClient.connect( processEnv.MONGODB_URI || '', {useUnifiedTopology: true} ).then( ( mongoInstance ) => { + + dbConn = mongoInstance.db( processEnv.MONGODB_NAME || 'subs' ); + +}).catch( (e) => { console.log( "%s Worker MongoDB ERRROR: %s", chalk.red('✗'), e ) } ); + + +exports.getSubscribers = async (job, done, createJob) => { + console.log("job data -----> ") + console.log(job) + + // Ensure we have received all the data + if ( !job.mailingBody || !job.mailingSubject ) { + throw new Error( "Send Email: No email body" ); + } + + if ( !job.topicId ) { + throw new Error( "Send Email: No topicId selected" ); + } + + /* + * Get mailing notify information + * + */ + console.log("module.parent.parent.exports.dbConn " + module.parent.parent.exports ) + let topic = await dbConn.collection( "topics" ).findOne( + { _id: job.topicId }, + { projection: { + nTemplateMailingId: 1, + templateId: 1, + notifyKey: 1, + } + } ).catch( (e) => { + console.log( "sendEmail-getTopic" ); + console.log( e ); + throw new Error( "sendEmail: Can't find the topic: " + job.topicId ); + }); + + let templateId; + notifyKey = topic.notifyKey; + + + if ( !topic.nTemplateMailingId ) { + throw new Error( "Worker: There is no mailing template associated with : " + topicId ); + } + + console.log("topic.nTemplateMailingId " + topic.nTemplateMailingId); + + // Get the correct notify email template + if ( job.typeMailing === "msgUpdates" ) { + templateId = topic.nTemplateMailingId; + } else if ( job.typeMailing === "confirmSubs" ) { + templateId = topic.templateId; + } else { + throw new Error( "Worker: Invalid type mailing, was : " + job.typeMailing ); + } + + /* + * Get list of confirmed subscribers + * + */ + let listEmail = []; + + if ( Array.isArray( job.sentTo ) ) { + listEmail = sentTo; + } else if ( job.sentTo === "allSubs" ) { + listEmail = await getConfirmedSubscriberAsArray( job.topicId ); + } + + // No subscribers + if ( !listEmail.length ){ + console.log( "Worker: No subscriber" ); + + //parentPort.postMessage( { msg: "No subscriber" } ); + + } + let emailData = { + listEmail : listEmail, + notifyKey : notifyKey, + emailData : job, + + } + console.log("sendM+EMAil") + console.log(typeof createJob); + createJob("sendEmail", emailData); + } + +/* + * Utilities function + */ +getConfirmedSubscriberAsArray = async ( topicId ) => { + + // Get all the emails for the given topic + let docs = await dbConn.collection( "subsConfirmed" ).find( + { + topicId: topicId + }, + { + projection: { + email: 1, + subscode: 1 + } + } + ); + + let docsItems = await docs.toArray(); + + return docsItems; +}; + +exports.sendEmail = async (job, done) => { + //console.log("emailListv " + job) + console.log("sendEmail _notifyEndPoint " + _notifyEndPoint) + console.log("notifyKey " + job.notifyKey); + + + notifyClient = new NotifyClient( _notifyEndPoint, job.notifyKey ); + let listEmail = job.listEmail; + let i, i_len = listEmail.length, i_cache; + for( i = 0; i !== i_len; i++) { + i_cache = listEmail[ i ]; + + const { email, subscode } = i_cache; + + const userCodeUrl = ( subscode.id ? subscode.toHexString() : subscode ); + + //console.log( "Worker: Send for : " + email ); + + if ( !email ) { + continue; + } + + //parentPort.postMessage( { msg: "Send for : " + email } ); + + + //console.log( "templateId: " + templateId ); + //console.log( "email: " + email ); + //console.log( "subject: " + mailingSubject ); + //console.log( "body: " + mailingBody ); + //console.log( "unsub_link: " + _unsubBaseURL + userCodeUrl + "/" + _subsLinkSuffix ); + //console.log( "reference: " + "x-notify_" + typeMailing ); + /* + notifyClient.sendEmail( templateId, email, + { + personalisation: { + body: mailingBody, + subject: mailingSubject, + unsub_link: _unsubBaseURL + userCodeUrl + "/" + _subsLinkSuffix + }, + reference: "x-notify_" + typeMailing + }).catch( ( e ) => { + // Log the Notify errors + // console.log( "Error in Notify" ); + // console.log( e ); + + //parentPort.postMessage( { msg: "worker-Error in Notify" } ); + + const currDate = new Date(), + currDateTime = currDate.getTime(), + errDetails = e.error.errors[0], + statusCode = e.error.status_code, + msg = errDetails.message; + + + + if ( statusCode === 400 && msg.indexOf( "email_address" ) !== -1 ) { + + // + // We need to remove that user and log it + // + // Removal of bad email should be done after 25 min, same delay used to the not-before + // The following task need to be quoeud and delayed. It could be addressed at the same time of APPS-26 + //dbConn.collection( "subsUnconfirmed" ).findOneAndDelete( + // { + // email: email + // } + //) + //dbConn.collection( "subsExist" ).findOneAndDelete( + // { + // e: email + // } + //) + + + // Log + dbConn.collection( "notify_badEmail_logs" ).insertOne( + { + createdAt: currDate, + code: userCodeUrl, + email: email + } + ).catch( (e2) => { + console.log( "worker-sendNotifyConfirmEmail: notify_badEmail_logs: " + userCodeUrl ); + console.log( e2 ); + console.log( e ); + }); + + } else if ( statusCode === 429 ) { + + // + // This is a rate limit error, the system should notify us + // + dbConn.collection( "notify_tooManyReq_logs" ).insertOne( + { + createdAt: currDate, + email: email, + code: userCodeUrl, + templateId: templateId, + details: msg + } + ).catch( (e2) => { + console.log( "worker-sendNotifyConfirmEmail: notify_tooManyReq_logs: " + userCodeUrl ); + console.log( e2 ); + console.log( e ); + }); + + // + // Try to email us (only with the predefined interval) + // + if ( _notifyUsNotBeforeTimeLimit <= currDateTime ) { + + letUsKnow( "429 Too Many Request error", { + type: "ratelimit", + currTime: currDateTime, + lastTime: _notifyUsNotBeforeTimeLimit + }, + true ); + + // Readjust the limit for the next period + _notifyUsNotBeforeTimeLimit = currDateTime + _notifyUsTimeLimit; + + } + + } else { + + // + // Any other kind of error - https://docs.notifications.service.gov.uk/node.html#send-an-email-error-codes + // + // notify_logs entry - this can be async + dbConn.collection( "notify_logs" ).insertOne( + { + createdAt: currDate, + templateId: templateId, + e: errDetails.error, + msg: msg, + statusCode: statusCode, + err: e.toString(), + code: userCodeUrl + } + ).catch( (e2) => { + console.log( "worker-sendNotifyConfirmEmail: notify_logs: " + userCodeUrl ); + console.log( e2 ); + console.log( e ); + }); + + } + + console.log( "worker-sendNotifyConfirmEmail: sendEmail " + userCodeUrl ); + //mailingUpdate( mailingId, _mailingState.sent, { historyState: _mailingState.sending } ); + }); */ + } +} + diff --git a/jobs/bullConfig.js b/jobs/bullConfig.js new file mode 100644 index 0000000..694e33e --- /dev/null +++ b/jobs/bullConfig.js @@ -0,0 +1,90 @@ +//const Queue = require("bull"); +const { sendEmail, getSubscribers } = require("../helpers/sendEmail"); +//const { setQueues } = require('bull-board'); + + +const Queue = require('bull'); +const { createBullBoard } = require('@bull-board/api'); +const { BullAdapter } = require('@bull-board/api/bullAdapter'); +const { ExpressAdapter } = require('@bull-board/express'); + + +const redisUri = process.env.REDIS_URI || 'x-notify-redis'; +const redisPort = process.env.REDIS_PORT || '6379'; +const redisSentinel1Uri = process.env.REDIS_SENTINEL_1_URI || '127.0.0.1'; +const redisSentinel1Port = process.env.REDIS_SENTINEL_1_PORT || '26379'; +const redisSentinel2Uri = process.env.REDIS_SENTINEL_2_URI || '127.0.0.1'; +const redisSentinel2Port = process.env.REDIS_SENTINEL_2_PORT || '26379'; +const redisMasterName = process.env.REDIS_MASTER_NAME || 'x-notify-master'; + +var maxCompletedJobs = process.env.COMPLETED_JOBS_TO_KEEP || 300; + + + +let redisConf = {}; +if (process.env.NODE_ENV === 'prod') { + redisConf = { + redis: { + sentinels: [ + { host: redisSentinel1Uri, port: redisSentinel1Port }, + { host: redisSentinel2Uri, port: redisSentinel2Port } + ], + name: redisMasterName, + host: redisUri, + port: redisPort + } + } +} else { + redisConf = { + redis: { + host: redisUri, + port: redisPort, + } + } +} + + +const queue = new Queue("Jobs", redisConf); + +exports.createJob = async (options, data) => { + console.log("job creattion" + data) + queue.add(options, data, { + removeOnComplete: true, + removeOnFail: true, + }); +}; +//exports.createJob = createJob; + +queue.process("getSubscribers", (job, done) => { + console.log("subscribers retrieved -------- " + job.data); + console.log("bullConfig") + console.log(typeof createJob); + getSubscribers(job.data, done, createJob); +}); + +queue.process("sendEmail", (job, done) => { + console.log("send email triggered========" + job.data); + sendEmail(job.data, done); +}); + + + +const serverAdapter = new ExpressAdapter(); + +createBullBoard({ + queues: [ + new BullAdapter( queue ), + ], + serverAdapter +}) + +function getRouter( basePath ) { + serverAdapter.setBasePath( basePath ); + return serverAdapter.getRouter(); +} + +//setQueues([queue]); + + +//module.exports.createJob = createJob; +module.exports.UI = getRouter; diff --git a/notifyQueue.js b/notifyQueue.js index 329d386..3b22ff5 100644 --- a/notifyQueue.js +++ b/notifyQueue.js @@ -50,4 +50,29 @@ function getRouter( basePath ) { } +exports.sendMailing = async ( req, res, next ) => { + const email = req.body.email, + templateId = req.body.templateId, + personalisation = req.body.personalisation, + notifyKey = req.body.notifyKey; + + notifyQueue.add({ + email:email, + templateId:templateId, + personalisation:personalisation, + notifyKey:notifyKey + }, + { + priority:10 + }, + { + removeOnComplete: maxCompletedJobs + } + ); + + + res.json( _successJSO ); +} + + module.exports.UI = getRouter; diff --git a/package.json b/package.json index 69c06ce..ad1b5be 100644 --- a/package.json +++ b/package.json @@ -16,11 +16,11 @@ "start": "nodemon -L server.js", "prod": "node server.js" }, - "dependencies": { + "dependencies": { "aws-sdk": "^2.659.0", "@bull-board/api": "^3.10.2", - "@bull-board/express": "^3.10.2", - "bull": "^3.15.0", + "@bull-board/express": "^3.10.4", + "bullmq": "^1.80.4", "chalk": "^3.0.0", "compression": "^1.7.4", "cors": "^2.8.5", @@ -28,9 +28,7 @@ "dotenv": "^8.2.0", "entities": "^2.0.0", "errorhandler": "^1.5.1", - "express": "^4.17.1", - "express-session": "^1.17.0", - "express-status-monitor": "^1.2.8", + "express": "^4.17.3", "jsonwebtoken": "^8.5.1", "mongodb": "^3.5.5", "morgan": "^1.9.1", diff --git a/server.js b/server.js index e348fe9..e9f9b7f 100644 --- a/server.js +++ b/server.js @@ -35,6 +35,8 @@ dotenv.config({ }); const notifyQueue = require('./notifyQueue.js'); +const sendMailQueue = require("./jobs/bullConfig.js") + const _corsSettings = JSON.parse(processEnv.cors || '{"optionSucessStatus":200}'); // Parse CORS settings const _baseFolder = process.env.baseFolder || ""; @@ -261,9 +263,12 @@ MongoClient.connect( processEnv.MONGODB_URI || '', {useUnifiedTopology: true} ). * Bull routes */ app.use( "/admin/queues", - passport.authenticate( "basic", { session: false } ), + //passport.authenticate( "basic", { session: false } ), notifyQueue.UI( _baseFolder + "/admin/queues" ) ); - + + app.use( "/admin/sendQueues", + //passport.authenticate( "basic", { session: false } ), + sendMailQueue.UI( _baseFolder + "/admin/sendQueues" ) ); /** * JWT Authentication diff --git a/views/mailingEdit.html b/views/mailingEdit.html index f4175e8..555aff2 100644 --- a/views/mailingEdit.html +++ b/views/mailingEdit.html @@ -71,6 +71,8 @@

Email message

{{#body}} +
  • Submit the email to all subscribers
  • +

    Workflow