Skip to content

Commit

Permalink
Saving sms conversations into crm task; Added temp method in crm cont…
Browse files Browse the repository at this point in the history
…roller for manual testing

Twilio Conversation webhook added (onMessageAdded)

Code review comments

pulled logic into MessagingService, convering the controller endpoint into a generic conversations webhook

Removed unsuned task config property; Setting task id in update method

Added MBT inbound/message-status webhooks; Saving inbound messages into crm activity (tasks)

Added basic processing of Mailchimp message webhooks

Code review comments

Using combinations of webhook data fields as a converstion key for MBT and Mailchimp
  • Loading branch information
VSydor authored and brmeyer committed Sep 4, 2023
1 parent 2c2891f commit 7331b7e
Show file tree
Hide file tree
Showing 19 changed files with 532 additions and 53 deletions.
3 changes: 3 additions & 0 deletions src/main/java/com/impactupgrade/nucleus/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.impactupgrade.nucleus.controller.EmailController;
import com.impactupgrade.nucleus.controller.EventsController;
import com.impactupgrade.nucleus.controller.JobController;
import com.impactupgrade.nucleus.controller.MBTController;
import com.impactupgrade.nucleus.controller.MailchimpController;
import com.impactupgrade.nucleus.controller.PaymentGatewayController;
import com.impactupgrade.nucleus.controller.ScheduledJobController;
Expand Down Expand Up @@ -100,6 +101,7 @@ public void start() throws Exception {
apiConfig.register(twilioFrontlineController());
apiConfig.register(accountingController());
apiConfig.register(jobController());
apiConfig.register(mbtController());

// Controllers that require DB connectivity -- prevent JDBC/Hikari connection errors.
if ("true".equalsIgnoreCase(System.getenv("DATABASE_CONNECTED"))) {
Expand Down Expand Up @@ -160,6 +162,7 @@ public void registerServlets(ServletContextHandler context) throws Exception {}
protected ScheduledJobController scheduledJobController() { return new ScheduledJobController(envFactory); }
protected AccountingController accountingController() { return new AccountingController(envFactory); }
protected JobController jobController() { return new JobController(envFactory); }
protected MBTController mbtController() { return new MBTController(envFactory); }
public EnvironmentFactory getEnvironmentFactory() {
return envFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,13 @@ public Optional<SObject> getUserByEmail(String email, String... extraFields) thr
return querySingle(query);
}

protected static final String TASK_FIELDS = "Id, WhoId, OwnerId, Subject, description, status, priority, activityDate";

public Optional<SObject> getActivityByExternalReference(String externalReference) throws ConnectionException, InterruptedException {
String query = "select " + TASK_FIELDS + " from task where " + env.getConfig().salesforce.fieldDefinitions.activityExternalReference + " = '" + externalReference + "'";
return querySingle(query);
}

/**
* Use with caution, it retrieves ALL active users. Unsuitable for orgs with many users.
*/
Expand Down
135 changes: 135 additions & 0 deletions src/main/java/com/impactupgrade/nucleus/controller/MBTController.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package com.impactupgrade.nucleus.controller;

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.impactupgrade.nucleus.entity.JobStatus;
import com.impactupgrade.nucleus.entity.JobType;
import com.impactupgrade.nucleus.environment.Environment;
import com.impactupgrade.nucleus.environment.EnvironmentFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import static com.impactupgrade.nucleus.service.logic.MessagingService.MessageType.SMS;

/**
* To receive webhooks from MBT as messages are sent/received.
*/
@Path("/mbt")
public class MBTController {

private static final Logger log = LogManager.getLogger(MBTController.class);

private static final String DATE_FORMAT = "yyyy-MM-dd";
private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss";

protected final EnvironmentFactory envFactory;

public MBTController(EnvironmentFactory envFactory) {
this.envFactory = envFactory;
}

/**
* The Inbound Messages webhook is triggered by receipt of a message to your MBT account.
*/
@Path("/inbound/sms/webhook")
@POST
@Consumes(MediaType.APPLICATION_JSON)
public Response inboundSmsWebhook(
InboundMessageWebhookData inboundMessageWebhookData,
@Context HttpServletRequest request
) throws Exception {
Environment env = envFactory.init(request);

String jobName = "SMS Inbound";
env.startJobLog(JobType.EVENT, null, jobName, "MBT");

// Using combination of subscriber number and today's date
// as a conversation id
// to group all user's messages for current day
String conversationId = inboundMessageWebhookData.subscriberNo + "::" + new SimpleDateFormat(DATE_FORMAT).format(new Date());
env.messagingService().upsertCrmConversation(
SMS,
conversationId, // TODO: use customParams to contain conversation id?
inboundMessageWebhookData.externalReferenceId,
inboundMessageWebhookData.message);

env.endJobLog(JobStatus.DONE);

return Response.ok().build();
}

/**
* The Message Status webhook is triggered as a message sent from an Account progresses to a Subscriber.
*/
@Path("/sms/status")
@POST
@Consumes(MediaType.APPLICATION_JSON)
public Response smsStatusWebhook(
MessageStatusWebhookData messageStatusWebhookData,
@Context HttpServletRequest request
) throws Exception {
Environment env = envFactory.init(request);

String jobName = "SMS Status";
env.startJobLog(JobType.EVENT, null, jobName, "MBT");

// Using combination of msisdn and today's date
// as a conversation id
// to group all user's messages' statuses for current day
String conversationId = messageStatusWebhookData.msisdn + "::" + new SimpleDateFormat(DATE_FORMAT).format(new Date());
env.messagingService().upsertCrmConversation(
SMS,
conversationId, // TODO: use customParams to contain conversation id?
messageStatusWebhookData.messageId,
messageStatusWebhookData.message);

env.endJobLog(JobStatus.DONE);

return Response.ok().build();
}

@JsonIgnoreProperties(ignoreUnknown = true)
public static final class InboundMessageWebhookData {
public String externalReferenceId;
public String type;
public String message;
public String subscriberNo;
public String groupName;
public String groupId;
public String communicationCode;
public String messageType;
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = DATE_TIME_FORMAT)
public Date receivedTime;
// Every message received sends the data shown in sample to the target URL.
// The customParams parameters may be specified and will be implemented by MBT.
public Map<String, String> customParams;
}

@JsonIgnoreProperties(ignoreUnknown = true)
public static final class MessageStatusWebhookData {
public String accountId;
public String message;
public String msisdn;
public String groupName;
public String groupId;
public String communicationCode;
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = DATE_TIME_FORMAT)
public Date deliveredTime;
public Map<String, String> properties;
public String statusCode;
public String statusCodeDescription;
public String messageId;
public String referenceId;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.impactupgrade.nucleus.controller;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.impactupgrade.nucleus.entity.JobStatus;
import com.impactupgrade.nucleus.entity.JobType;
import com.impactupgrade.nucleus.environment.Environment;
import com.impactupgrade.nucleus.environment.EnvironmentFactory;

Expand All @@ -11,13 +15,21 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.List;

import static com.impactupgrade.nucleus.service.logic.MessagingService.MessageType.EMAIL;

@Path("/mailchimp")
public class MailchimpController {

private static final String DATE_FORMAT = "yyyy-MM-dd";

protected final EnvironmentFactory envFactory;

public MailchimpController(EnvironmentFactory envFactory){
public MailchimpController(EnvironmentFactory envFactory) {
this.envFactory = envFactory;
}

Expand All @@ -30,7 +42,7 @@ public Response webhook(
@FormParam("email") String email,
@FormParam("list_id") String listId,
@Context HttpServletRequest request
) throws Exception{
) throws Exception {
Environment env = envFactory.init(request);

env.logJobInfo("action = {} reason = {} email = {} list_id = {}", action, reason, email, listId);
Expand All @@ -43,4 +55,107 @@ public Response webhook(

return Response.status(200).build();
}

// Message events
@Path("/webhook/message")
@POST
@Consumes(MediaType.APPLICATION_JSON)
public Response messageEvent(
MessageWebhookPayload webhookPayload,
@Context HttpServletRequest request
) throws Exception {
Environment env = envFactory.init(request);

String jobName = "Mailchimp webhook events batch";
env.startJobLog(JobType.EVENT, null, jobName, "Mailchimp");

env.logJobInfo("Mailchimp message event batch received. Batch size: {}", webhookPayload.events.size());

JobStatus jobStatus = JobStatus.DONE;
for (Event event: webhookPayload.events) {
try {
processEvent(event, env);
} catch (Exception e) {
env.logJobError("Failed to process event! Event type/email: {}/{}",
event.eventType, event.message.email, e);
}
}

env.endJobLog(jobStatus);

return Response.status(200).build();
}

private void processEvent(Event event, Environment env) throws Exception {
if (event == null) {
return;
}
if ("send".equalsIgnoreCase(event.eventType)) {
// Using sender::recipient::sent-date
// as a conversation id
Date sentAt = Date.from(Instant.ofEpochSecond(event.message.timestamp));
String conversationId = event.message.sender + "::" + event.message.email + "::" + new SimpleDateFormat(DATE_FORMAT).format(sentAt);
env.messagingService().upsertCrmConversation(
EMAIL,
conversationId,
event.message.id,
event.message.subject); // using subject instead of message body (body n\a in the webhook's payload)
} else {
env.logJobInfo("skipping event type {}...", event.eventType);
}
}

public static final class MessageWebhookPayload {
@JsonProperty("mandrill_events")
public List<Event> events;
}

@JsonIgnoreProperties(ignoreUnknown = true)
public static final class Event {
@JsonProperty("_id")
public String id;
@JsonProperty("event")
public String eventType;
@JsonProperty("msg")
public Message message;

//@JsonProperty("ts")
//public Long timestamp;
//public String url;
//public String ip;
//@JsonProperty("user_agent")
//public String userAgent;
//public Object location;
//@JsonProperty("user_agent_parsed")
//public List<Object> userAgentParsed;

}

@JsonIgnoreProperties(ignoreUnknown = true)
public static final class Message {
@JsonProperty("_id")
public String id;
public String state; // One of: sent, rejected, spam, unsub, bounced, or soft-bounced
public String email;
public String sender;
public String subject;
@JsonProperty("ts")
public Long timestamp; // the integer UTC UNIX timestamp when the message was sent
//TODO: add object definitions, if needed
//@JsonProperty("smtp_events")
//public List<Object> smtpEvents;
//public List<Object> opens;
//public List<Object> clicks;
//public List<String> tags;
//public Map<String, Object> metadata;
//@JsonProperty("subaccount")
//public String subAccount;
//public String diag; //specific SMTP response code and bounce description, if any, received from the remote server
//@JsonProperty("bounce_description")
//public String bounceDescription;
//public String template;
}

//TODO: Sync events: add/remove (to either of allowlist or denylist)
//TODO: inbound messages
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.impactupgrade.nucleus.model.ContactSearch;
import com.impactupgrade.nucleus.model.CrmContact;
import com.impactupgrade.nucleus.model.CrmOpportunity;
import com.impactupgrade.nucleus.security.SecurityUtil;
import com.impactupgrade.nucleus.service.logic.NotificationService;
import com.impactupgrade.nucleus.util.Utils;
import com.twilio.twiml.MessagingResponse;
Expand Down Expand Up @@ -43,6 +44,7 @@

import static com.impactupgrade.nucleus.entity.JobStatus.DONE;
import static com.impactupgrade.nucleus.entity.JobStatus.FAILED;
import static com.impactupgrade.nucleus.service.logic.MessagingService.MessageType.SMS;
import static com.impactupgrade.nucleus.util.Utils.noWhitespace;
import static com.impactupgrade.nucleus.util.Utils.trim;

Expand Down Expand Up @@ -291,6 +293,51 @@ public Response proxyVoice(
return Response.ok().entity(xml).build();
}

/**
* This webhook handles 'onMessageAdded' event for Conversations, creating CRM activities. However, note that
* tracking of one-off messages is instead handled by inboundWebhook!
*/
@Path("/callback/conversations")
@POST
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@Produces(MediaType.APPLICATION_JSON)
public Response conversationsWebhook(
@FormParam("EventType") String eventType,
@FormParam("ConversationSid") String conversationSid,
@FormParam("MessageSid") String messageSid,
@FormParam("MessagingServiceSid") String messagingServiceSid,
@FormParam("Index") Integer index,
@FormParam("DateCreated") String date, //ISO8601 time
@FormParam("Body") String body,
@FormParam("Author") String author,
@FormParam("ParticipantSid") String participantSid,
@FormParam("Attributes") String attributes,
@FormParam("Media") String media, // Stringified JSON array of attached media objects
@Context HttpServletRequest request
) throws Exception {
Environment env = envFactory.init(request);
SecurityUtil.verifyApiKey(env);

env.startJobLog(JobType.EVENT, null, "Conversation Webhook", "Twilio");
env.logJobInfo("eventType={} conversationSid={} messageSid={} messagingServiceSid={} index={} date={} body={} author={} participantSid={} attributes={} media={}",
eventType, conversationSid, messageSid, messagingServiceSid, index, date, body, author, participantSid, attributes, media);

switch (eventType) {
case "onMessageAdded":
env.messagingService().upsertCrmConversation(
SMS,
conversationSid,
messageSid,
body);
env.endJobLog(DONE);
return Response.ok().build();
default:
env.logJobWarn("unexpected eventType: " + eventType);
env.endJobLog(FAILED);
return Response.status(422).build();
}
}

// TODO: Temporary method to prototype an MMS replacement of the mobile app. In the future,
// this can be molded into an API...
public static void main(String[] args) {
Expand Down
Loading

0 comments on commit 7331b7e

Please sign in to comment.