Skip to content

Commit

Permalink
WIP APPS-26 -- x-notify add bull queue and Jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
Shiva Kayathi committed May 6, 2022
1 parent 42d8eaa commit f360e6f
Show file tree
Hide file tree
Showing 10 changed files with 438 additions and 13 deletions.
25 changes: 23 additions & 2 deletions controllers/mailing.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ const dbConn = module.parent.parent.exports.dbConn;
const ObjectId = require('mongodb').ObjectId;

const { Worker } = require('worker_threads');
const { createJob } = require("../jobs/bullConfig");

console.log("mailing " + typeof createJob)
console.log(createJob)

const _mailingState = {
cancelled: "cancelled",
Expand Down Expand Up @@ -395,8 +399,18 @@ async function mailingUpdate( mailingId, newHistoryState, options ) {
async function sendMailingToSubs ( mailingId, topicId, mailingSubject, mailingBody ) {

// When completed, change state to "sent"

// Start the worker.
workerData = {
topicId: topicId,
mailingBody: mailingBody,
mailingSubject: mailingSubject,
typeMailing: "msgUpdates",
sentTo: "allSubs",
dbConn: true //dbConn
};
createJob("getSubscribers", workerData);
/*
const worker = new Worker( './controllers/workerSendEmail.js', {
workerData: {
topicId: topicId,
Expand All @@ -408,6 +422,11 @@ async function sendMailingToSubs ( mailingId, topicId, mailingSubject, mailingBo
}
});
worker.on('message', function(msg){
if ( msg.completed ) {
Expand All @@ -419,12 +438,14 @@ async function sendMailingToSubs ( mailingId, topicId, mailingSubject, mailingBo
}
console.log( msg.msg );
});
worker.on('error', function(msg){
console.log( "Send to subs - Worker ERRROR: " + msg );
});

*/
mailingUpdate( mailingId, _mailingState.sent, { historyState: _mailingState.sending } );
}

// Simple worker to send mailing
Expand Down
2 changes: 2 additions & 0 deletions controllers/mailing_view.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
const mustache = require('mustache');
const fsPromises = require('fs').promises;
const mailing = require('./mailing');
//const subscription = require('./subscriptions');

const _mailingState = mailing.mailingState;
const _baseRedirFolder = ( process.env.baseFolder || "" ) + "/api/v1/mailing/";

Expand Down
7 changes: 4 additions & 3 deletions controllers/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,8 @@ exports.simulateAddPost = async ( req, res, next ) => {
/**
* This is the future REST endpoint handler function for queuing a mailing with Notify
*/
/* It is commented until the mailing.js module is updated to use Bull to make the API call. Related to APPS-53 work.
//It is commented until the mailing.js module is updated to use Bull to make the API call. Related to APPS-53 work.
/*
exports.sendMailing = async ( req, res, next ) => {
const email = req.body.email,
templateId = req.body.templateId,
Expand All @@ -1126,8 +1127,8 @@ exports.sendMailing = async ( req, res, next ) => {
res.json( _successJSO );
}*/

}
*/
/**
* This is the function for queuing a subscriber confirmation email
* send via notify.
Expand Down
2 changes: 2 additions & 0 deletions controllers/workerSendEmail.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ async function init() {
* Send the mailing
*
*/
console.log("worker _notifyEndPoint " + _notifyEndPoint)
console.log("notifyKey " + notifyKey);
let notifyClient = new NotifyClient( _notifyEndPoint, notifyKey );

//console.log( "_notifyEndPoint: " + _notifyEndPoint );
Expand Down
279 changes: 279 additions & 0 deletions helpers/sendEmail.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
const chalk = require('chalk'); // To color message in console log

const MongoClient = require('mongodb').MongoClient;
const NotifyClient = require('notifications-node-client').NotifyClient;

const ObjectId = require('mongodb').ObjectId;
//const dbConn = module.parent.parent.exports.dbConn;

const processEnv = process.env,
_notifyEndPoint = processEnv.notifyEndPoint || "https://api.notification.alpha.canada.ca",
_unsubBaseURL = process.env.removeURL || "https://apps.canada.ca/x-notify/subs/remove/",
_subsLinkSuffix = process.env.subsLinkSuffix || "853e0212b92a127"


let dbConn, notifyKey;

MongoClient.connect( processEnv.MONGODB_URI || '', {useUnifiedTopology: true} ).then( ( mongoInstance ) => {

dbConn = mongoInstance.db( processEnv.MONGODB_NAME || 'subs' );

}).catch( (e) => { console.log( "%s Worker MongoDB ERRROR: %s", chalk.red('✗'), e ) } );


exports.getSubscribers = async (job, done, createJob) => {
console.log("job data -----> ")
console.log(job)

// Ensure we have received all the data
if ( !job.mailingBody || !job.mailingSubject ) {
throw new Error( "Send Email: No email body" );
}

if ( !job.topicId ) {
throw new Error( "Send Email: No topicId selected" );
}

/*
* Get mailing notify information
*
*/
console.log("module.parent.parent.exports.dbConn " + module.parent.parent.exports )
let topic = await dbConn.collection( "topics" ).findOne(
{ _id: job.topicId },
{ projection: {
nTemplateMailingId: 1,
templateId: 1,
notifyKey: 1,
}
} ).catch( (e) => {
console.log( "sendEmail-getTopic" );
console.log( e );
throw new Error( "sendEmail: Can't find the topic: " + job.topicId );
});

let templateId;
notifyKey = topic.notifyKey;


if ( !topic.nTemplateMailingId ) {
throw new Error( "Worker: There is no mailing template associated with : " + topicId );
}

console.log("topic.nTemplateMailingId " + topic.nTemplateMailingId);

// Get the correct notify email template
if ( job.typeMailing === "msgUpdates" ) {
templateId = topic.nTemplateMailingId;
} else if ( job.typeMailing === "confirmSubs" ) {
templateId = topic.templateId;
} else {
throw new Error( "Worker: Invalid type mailing, was : " + job.typeMailing );
}

/*
* Get list of confirmed subscribers
*
*/
let listEmail = [];

if ( Array.isArray( job.sentTo ) ) {
listEmail = sentTo;
} else if ( job.sentTo === "allSubs" ) {
listEmail = await getConfirmedSubscriberAsArray( job.topicId );
}

// No subscribers
if ( !listEmail.length ){
console.log( "Worker: No subscriber" );

//parentPort.postMessage( { msg: "No subscriber" } );

}
let emailData = {
listEmail : listEmail,
notifyKey : notifyKey,
emailData : job,

}
console.log("sendM+EMAil")
console.log(typeof createJob);
createJob("sendEmail", emailData);
}

/*
* Utilities function
*/
getConfirmedSubscriberAsArray = async ( topicId ) => {

// Get all the emails for the given topic
let docs = await dbConn.collection( "subsConfirmed" ).find(
{
topicId: topicId
},
{
projection: {
email: 1,
subscode: 1
}
}
);

let docsItems = await docs.toArray();

return docsItems;
};

exports.sendEmail = async (job, done) => {
//console.log("emailListv " + job)
console.log("sendEmail _notifyEndPoint " + _notifyEndPoint)
console.log("notifyKey " + job.notifyKey);


notifyClient = new NotifyClient( _notifyEndPoint, job.notifyKey );
let listEmail = job.listEmail;
let i, i_len = listEmail.length, i_cache;
for( i = 0; i !== i_len; i++) {
i_cache = listEmail[ i ];

const { email, subscode } = i_cache;

const userCodeUrl = ( subscode.id ? subscode.toHexString() : subscode );

//console.log( "Worker: Send for : " + email );

if ( !email ) {
continue;
}

//parentPort.postMessage( { msg: "Send for : " + email } );


//console.log( "templateId: " + templateId );
//console.log( "email: " + email );
//console.log( "subject: " + mailingSubject );
//console.log( "body: " + mailingBody );
//console.log( "unsub_link: " + _unsubBaseURL + userCodeUrl + "/" + _subsLinkSuffix );
//console.log( "reference: " + "x-notify_" + typeMailing );
/*
notifyClient.sendEmail( templateId, email,
{
personalisation: {
body: mailingBody,
subject: mailingSubject,
unsub_link: _unsubBaseURL + userCodeUrl + "/" + _subsLinkSuffix
},
reference: "x-notify_" + typeMailing
}).catch( ( e ) => {
// Log the Notify errors
// console.log( "Error in Notify" );
// console.log( e );
//parentPort.postMessage( { msg: "worker-Error in Notify" } );
const currDate = new Date(),
currDateTime = currDate.getTime(),
errDetails = e.error.errors[0],
statusCode = e.error.status_code,
msg = errDetails.message;
if ( statusCode === 400 && msg.indexOf( "email_address" ) !== -1 ) {
//
// We need to remove that user and log it
//
// Removal of bad email should be done after 25 min, same delay used to the not-before
// The following task need to be quoeud and delayed. It could be addressed at the same time of APPS-26
//dbConn.collection( "subsUnconfirmed" ).findOneAndDelete(
// {
// email: email
// }
//)
//dbConn.collection( "subsExist" ).findOneAndDelete(
// {
// e: email
// }
//)
// Log
dbConn.collection( "notify_badEmail_logs" ).insertOne(
{
createdAt: currDate,
code: userCodeUrl,
email: email
}
).catch( (e2) => {
console.log( "worker-sendNotifyConfirmEmail: notify_badEmail_logs: " + userCodeUrl );
console.log( e2 );
console.log( e );
});
} else if ( statusCode === 429 ) {
//
// This is a rate limit error, the system should notify us
//
dbConn.collection( "notify_tooManyReq_logs" ).insertOne(
{
createdAt: currDate,
email: email,
code: userCodeUrl,
templateId: templateId,
details: msg
}
).catch( (e2) => {
console.log( "worker-sendNotifyConfirmEmail: notify_tooManyReq_logs: " + userCodeUrl );
console.log( e2 );
console.log( e );
});
//
// Try to email us (only with the predefined interval)
//
if ( _notifyUsNotBeforeTimeLimit <= currDateTime ) {
letUsKnow( "429 Too Many Request error", {
type: "ratelimit",
currTime: currDateTime,
lastTime: _notifyUsNotBeforeTimeLimit
},
true );
// Readjust the limit for the next period
_notifyUsNotBeforeTimeLimit = currDateTime + _notifyUsTimeLimit;
}
} else {
//
// Any other kind of error - https://docs.notifications.service.gov.uk/node.html#send-an-email-error-codes
//
// notify_logs entry - this can be async
dbConn.collection( "notify_logs" ).insertOne(
{
createdAt: currDate,
templateId: templateId,
e: errDetails.error,
msg: msg,
statusCode: statusCode,
err: e.toString(),
code: userCodeUrl
}
).catch( (e2) => {
console.log( "worker-sendNotifyConfirmEmail: notify_logs: " + userCodeUrl );
console.log( e2 );
console.log( e );
});
}
console.log( "worker-sendNotifyConfirmEmail: sendEmail " + userCodeUrl );
//mailingUpdate( mailingId, _mailingState.sent, { historyState: _mailingState.sending } );
}); */
}
}

Loading

0 comments on commit f360e6f

Please sign in to comment.