diff --git a/src/main/java/com/impactupgrade/nucleus/client/MailchimpClient.java b/src/main/java/com/impactupgrade/nucleus/client/MailchimpClient.java index c9af3c6ea..185953ded 100644 --- a/src/main/java/com/impactupgrade/nucleus/client/MailchimpClient.java +++ b/src/main/java/com/impactupgrade/nucleus/client/MailchimpClient.java @@ -6,6 +6,9 @@ import com.ecwid.maleorang.MailchimpException; import com.ecwid.maleorang.MailchimpObject; +import com.ecwid.maleorang.method.v3_0.batches.BatchStatus; +import com.ecwid.maleorang.method.v3_0.batches.GetBatchStatusMethod; +import com.ecwid.maleorang.method.v3_0.batches.StartBatchMethod; import com.ecwid.maleorang.method.v3_0.lists.members.DeleteMemberMethod; import com.ecwid.maleorang.method.v3_0.lists.members.EditMemberMethod; import com.ecwid.maleorang.method.v3_0.lists.members.GetMemberMethod; @@ -14,7 +17,10 @@ import com.ecwid.maleorang.method.v3_0.lists.merge_fields.EditMergeFieldMethod; import com.ecwid.maleorang.method.v3_0.lists.merge_fields.GetMergeFieldsMethod; import com.ecwid.maleorang.method.v3_0.lists.merge_fields.MergeFieldInfo; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.impactupgrade.nucleus.environment.EnvironmentConfig; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -22,7 +28,9 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; +import java.util.Date; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -80,6 +88,28 @@ public void upsertContact(String listId, MemberInfo contact) throws IOException, } } + public String upsertContactsBatch(String listId, List contacts) throws IOException, MailchimpException { + List upsertMemberMethods = contacts.stream() + .map(contact -> { + EditMemberMethod.CreateOrUpdate upsertMemberMethod = new EditMemberMethod.CreateOrUpdate(listId, contact.email_address); + upsertMemberMethod.status_if_new = contact.status; + upsertMemberMethod.mapping.putAll(contact.mapping); + upsertMemberMethod.merge_fields.mapping.putAll(contact.merge_fields.mapping); + upsertMemberMethod.interests.mapping.putAll(contact.interests.mapping); + upsertMemberMethod.tags = contact.tags; + return upsertMemberMethod; + }) + .collect(Collectors.toList()); + + StartBatchMethod startBatchMethod = new StartBatchMethod(upsertMemberMethods); + BatchStatus batchStatus = client.execute(startBatchMethod); + return batchStatus.id; + } + + public List getListMembers(String listId) throws IOException, MailchimpException { + return getListMembers(listId, null, null); + } + // public List getListMembers(String listId, String status) throws IOException, MailchimpException { // return getListMembers(listId, status, null); // } @@ -96,7 +126,7 @@ public List getListMembers(String listId, String status, Calendar si } GetMembersMethod.Response getMemberResponse = client.execute(getMembersMethod); List members = new ArrayList<>(getMemberResponse.members); - while(getMemberResponse.total_items > members.size()) { + while (getMemberResponse.total_items > members.size()) { getMembersMethod.offset = members.size(); log.info("retrieving list {} contacts (offset {} of total {})", listId, getMembersMethod.offset, getMemberResponse.total_items); getMemberResponse = client.execute(getMembersMethod); @@ -119,6 +149,16 @@ public void archiveContact(String listId, String email) throws IOException, Mail } } + public String archiveContactsBatch(String listId, List emails) throws IOException, MailchimpException { + List deleteMemberMethods = emails.stream() + .map(email -> new DeleteMemberMethod(listId, email)) + .collect(Collectors.toList()); + + StartBatchMethod startBatchMethod = new StartBatchMethod(deleteMemberMethods); + BatchStatus batchStatus = client.execute(startBatchMethod); + return batchStatus.id; + } + // TODO: TEST THIS public Set getContactGroupIds(String listId, String contactEmail) throws IOException, MailchimpException { MemberInfo contact = getContactInfo(listId, contactEmail); @@ -131,6 +171,18 @@ public List getContactTags(String listId, String contactEmail) throws IO return tags.stream().map(t -> t.mapping.get(TAG_NAME).toString()).collect(Collectors.toList()); } + public Map> getContactsTags(String listId) throws IOException, MailchimpException { + List memberInfos = getListMembers(listId); + Map> tagsMap = memberInfos.stream() + .collect(Collectors.toMap( + memberInfo -> memberInfo.email_address, memberInfo -> { + List tags = (List) memberInfo.mapping.get(TAGS); + return tags.stream().map(t -> t.mapping.get(TAG_NAME).toString()).collect(Collectors.toList()); + } + )); + return tagsMap; + } + public void updateContactTags(String listId, String contactEmail, List activeTags, List inactiveTags) throws IOException, MailchimpException { ArrayList tags = new ArrayList<>(); for (String activeTag : activeTags) { @@ -151,6 +203,46 @@ public void updateContactTags(String listId, String contactEmail, List a client.execute(editMemberMethod); } + public String updateContactTagsBatch(String listId, List emailContacts) throws IOException, MailchimpException { + List editMemberMethods = + emailContacts.stream() + .map(emailContact -> { + List active = emailContact.activeTags; + List inactive = emailContact.inactiveTags; + ArrayList tags = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(active)) { + for (String activeTag : active) { + MailchimpObject tag = new MailchimpObject(); + tag.mapping.put(TAG_STATUS, TAG_ACTIVE); + tag.mapping.put(TAG_NAME, activeTag); + tags.add(tag); + } + } + if (CollectionUtils.isNotEmpty(inactive)) { + for (String inactiveTag : inactive) { + MailchimpObject tag = new MailchimpObject(); + tag.mapping.put(TAG_STATUS, TAG_INACTIVE); + tag.mapping.put(TAG_NAME, inactiveTag); + tags.add(tag); + } + } + + EditMemberMethod.AddorRemoveTag editMemberMethod = new EditMemberMethod.AddorRemoveTag(listId, emailContact.email); + editMemberMethod.tags = tags; + + return editMemberMethod; + }).collect(Collectors.toList()); + + StartBatchMethod startBatchMethod = new StartBatchMethod(editMemberMethods); + BatchStatus batchStatus = client.execute(startBatchMethod); + return batchStatus.id; + } + + public BatchStatus getBatchStatus(String batchStatusId) throws IOException, MailchimpException { + GetBatchStatusMethod getBatchStatusMethod = new GetBatchStatusMethod(batchStatusId); + return client.execute(getBatchStatusMethod); + } + public List getMergeFields(String listId) throws IOException, MailchimpException { GetMergeFieldsMethod getMergeFields = new GetMergeFieldsMethod(listId); getMergeFields.count = 1000; // the max @@ -174,4 +266,41 @@ public String exceptionToString(MailchimpException e) { } return description; } + + public record EmailContact(String email, List activeTags, List inactiveTags) {}; + + @JsonIgnoreProperties(ignoreUnknown = true) + public static final class BatchOperation { + @JsonProperty("status_code") + public Integer status; + @JsonProperty("operation_id") + public String operationId; + public BatchOperationResponse response; + } + + @JsonIgnoreProperties(ignoreUnknown = true) + public static final class BatchOperationResponse { + public String id; + @JsonProperty("merge_fields") + public Map mergeFields; + @JsonProperty("contact_id") + public String contactId; + @JsonProperty("email_address") + public String email; + @JsonProperty("full_name") + public String fullName; + @JsonProperty("tags_count") + public Integer tagsCount; + @JsonProperty("last_changed") + public Date lastChangedAt; + @JsonProperty("list_id") + public String listId; + + // error response fields + public String instance; + public String detail; + public String type; + public String title; + public String status; + } } diff --git a/src/main/java/com/impactupgrade/nucleus/service/segment/MailchimpEmailService.java b/src/main/java/com/impactupgrade/nucleus/service/segment/MailchimpEmailService.java index c4fbd30b1..5e218d478 100644 --- a/src/main/java/com/impactupgrade/nucleus/service/segment/MailchimpEmailService.java +++ b/src/main/java/com/impactupgrade/nucleus/service/segment/MailchimpEmailService.java @@ -2,19 +2,37 @@ import com.ecwid.maleorang.MailchimpException; import com.ecwid.maleorang.MailchimpObject; +import com.ecwid.maleorang.method.v3_0.batches.BatchStatus; import com.ecwid.maleorang.method.v3_0.lists.members.MemberInfo; import com.ecwid.maleorang.method.v3_0.lists.merge_fields.MergeFieldInfo; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.impactupgrade.nucleus.client.MailchimpClient; import com.impactupgrade.nucleus.environment.Environment; import com.impactupgrade.nucleus.environment.EnvironmentConfig; import com.impactupgrade.nucleus.model.CrmAddress; import com.impactupgrade.nucleus.model.CrmContact; +import com.impactupgrade.nucleus.util.HttpClient; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.commons.io.IOUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.json.JSONArray; +import org.json.JSONObject; +import java.io.BufferedInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Calendar; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,6 +51,10 @@ public class MailchimpEmailService extends AbstractEmailService { private static final Logger log = LogManager.getLogger(MailchimpEmailService.class); + private static final Integer BATCH_REQUEST_OPERATIONS_SIZE = 500; + private static final Integer BATCH_STATUS_RETRY_TIMEOUT_IN_SECONDS = 10; + private static final Integer BATCH_STATUS_MAX_RETRIES = 5; + private final Map mergeFieldsNameToTag = new HashMap<>(); @Override @@ -64,35 +86,158 @@ public void syncContacts(Calendar lastSync) throws Exception { List crmContacts = getCrmContacts(emailList, lastSync); Map> crmContactCampaignNames = getContactCampaignNames(crmContacts); - - int count = 0; - for (CrmContact crmContact : crmContacts) { - log.info("updating contact {} {} on list {} ({} of {})", crmContact.id, crmContact.email, emailList.id, count++, crmContacts.size()); - syncContact(crmContact, crmContactCampaignNames, mailchimpConfig, emailList); + Map> tags = new MailchimpClient(mailchimpConfig).getContactsTags(emailList.id); + + List> partitions = Lists.partition(crmContacts, BATCH_REQUEST_OPERATIONS_SIZE); + int i = 1; + for (List contactsBatch : partitions) { + log.info("Processing contacts batch {} of total {}...", i, partitions.size()); + syncContacts(contactsBatch, crmContactCampaignNames, tags, mailchimpConfig, emailList); + i++; } } } } - protected void syncContact(CrmContact crmContact, Map> crmContactCampaignNames, - EnvironmentConfig.EmailPlatform mailchimpConfig, EnvironmentConfig.EmailList emailList) throws Exception { + protected void syncContacts(List crmContacts, Map> crmContactCampaignNames, + Map> tags, EnvironmentConfig.EmailPlatform mailchimpConfig, EnvironmentConfig.EmailList emailList) throws Exception { MailchimpClient mailchimpClient = new MailchimpClient(mailchimpConfig); + List contactsToUpsert = new ArrayList<>(); + List contactsToArchive = new ArrayList<>(); + + // transactional is always subscribed + if (emailList.type == EnvironmentConfig.EmailListType.TRANSACTIONAL) { + contactsToUpsert.addAll(crmContacts); + } else { + crmContacts.forEach(crmContact -> (crmContact.canReceiveEmail() ? contactsToUpsert : contactsToArchive).add(crmContact)); + } + try { - // transactional is always subscribed - if (emailList.type == EnvironmentConfig.EmailListType.TRANSACTIONAL || crmContact.canReceiveEmail()) { - Map customFields = getCustomFields(emailList.id, crmContact, mailchimpClient, mailchimpConfig); - mailchimpClient.upsertContact(emailList.id, toMcMemberInfo(crmContact, customFields, emailList.groups)); - // if they can't, they're archived, and will be failed to be retrieved for update - updateTags(emailList.id, crmContact, crmContactCampaignNames.get(crmContact.id), mailchimpClient, mailchimpConfig); - } else if (!crmContact.canReceiveEmail()) { - mailchimpClient.archiveContact(emailList.id, crmContact.email); + Map> contactsCustomFields = new HashMap<>(); + for (CrmContact crmContact : contactsToUpsert) { + Map customFieldMap = getCustomFields(emailList.id, crmContact, mailchimpClient, mailchimpConfig); + contactsCustomFields.put(crmContact.email, customFieldMap); } + + List memberInfos = toMemberInfos(emailList, contactsToUpsert, contactsCustomFields); + + String upsertBatchId = mailchimpClient.upsertContactsBatch(emailList.id, memberInfos); + // Getting batch processing results synchronously to make sure + // all contacts were processed before updating tags + List batchOperations = getBatchOperations(mailchimpClient, mailchimpConfig, upsertBatchId, 0); + + // Logging error operations + batchOperations.stream() + .filter(batchOperation -> batchOperation.status >= 300) + .forEach(batchOperation -> + log.warn("Failed Batch Operation (status: detail): {}: {}", batchOperation.response.status, batchOperation.response.detail)); + + Map> activeTags = getActiveTags(contactsToUpsert, crmContactCampaignNames, mailchimpConfig); + List emailContacts = contactsToUpsert.stream() + .map(crmContact -> new MailchimpClient.EmailContact(crmContact.email, activeTags.get(crmContact.email), tags.get(crmContact.email))) + .collect(Collectors.toList()); + + updateTagsBatch(emailList.id, emailContacts, mailchimpClient); + + // if they can't, they're archived, and will be failed to be retrieved for update + List emailsToArchive = contactsToArchive.stream().map(crmContact -> crmContact.email).collect(Collectors.toList()); + mailchimpClient.archiveContactsBatch(emailList.id, emailsToArchive); + } catch (MailchimpException e) { - log.warn("Mailchimp syncContact failed: {}", mailchimpClient.exceptionToString(e)); + log.warn("Mailchimp syncContacts failed: {}", mailchimpClient.exceptionToString(e)); } catch (Exception e) { - log.warn("Mailchimp syncContact failed", e); + log.warn("Mailchimp syncContacts failed", e); + } + } + + protected List getBatchOperations(MailchimpClient mailchimpClient, EnvironmentConfig.EmailPlatform mailchimpConfig, String batchStatusId, Integer attemptCount) throws Exception { + List batchOperations = null; + if (attemptCount == BATCH_STATUS_MAX_RETRIES) { + log.error("exhausted retries; returning..."); + } else { + BatchStatus batchStatus = mailchimpClient.getBatchStatus(batchStatusId); + if (!"finished".equalsIgnoreCase(batchStatus.status)) { + log.info("Batch '{}' is not finished. Retrying in {} seconds...", batchStatusId, BATCH_STATUS_RETRY_TIMEOUT_IN_SECONDS); + Thread.sleep(BATCH_STATUS_RETRY_TIMEOUT_IN_SECONDS * 1000); + Integer newAttemptCount = attemptCount + 1; + batchOperations = getBatchOperations(mailchimpClient, mailchimpConfig, batchStatusId, newAttemptCount); + } else { + log.info("Batch '{}' finished! (finished/total) {}/{}", batchStatusId, batchStatus.finished_operations, batchStatus.total_operations); + if (batchStatus.errored_operations > 0) { + log.warn("Errored operations count: {}", batchStatus.errored_operations); + } else { + log.info("All operations processed OK!"); + } + String batchResponse = getBatchResponseAsString(batchStatus, mailchimpConfig); + batchOperations = deserializeBatchOperations(batchResponse); + } } + return batchOperations; + } + + protected String getBatchResponseAsString(BatchStatus batchStatus, EnvironmentConfig.EmailPlatform mailchimpConfig) throws Exception { + if (Strings.isNullOrEmpty(batchStatus.response_body_url)) { + return null; + } + + String responseString = null; + InputStream inputStream = HttpClient.get(batchStatus.response_body_url, HttpClient.HeaderBuilder.builder() + .authBearerToken(mailchimpConfig.secretKey) + .header("Accept-Encoding", "application/gzip"), InputStream.class); + + try (TarArchiveInputStream tarArchiveInputStream = new TarArchiveInputStream( + new GzipCompressorInputStream(new BufferedInputStream(inputStream)))) { + TarArchiveEntry tarArchiveEntry; + while ((tarArchiveEntry = (TarArchiveEntry) tarArchiveInputStream.getNextEntry()) != null) { + if (!tarArchiveEntry.isDirectory()) { + responseString = IOUtils.toString(tarArchiveInputStream, StandardCharsets.UTF_8); + } + } + } catch (Exception e) { + log.error("Failed to get batch response body! {}", e); + } + return responseString; + } + + protected List deserializeBatchOperations(String batchOperationsString) { + if (Strings.isNullOrEmpty(batchOperationsString)) { + return Collections.emptyList(); + } + List batchOperations = new ArrayList<>(); + try { + JSONArray jsonArray = new JSONArray(batchOperationsString); + ObjectMapper objectMapper = new ObjectMapper(); + + for (int i = 0; i< jsonArray.length(); i ++) { + JSONObject batchOperation = jsonArray.getJSONObject(i); + // Response is an escaped string - converting it to json object and back to string to unescape + String response = batchOperation.getString("response"); + JSONObject responseObject = new JSONObject(response); + batchOperation.put("response", responseObject); + + batchOperations.add(objectMapper.readValue(batchOperation.toString(), new TypeReference<>() {})); + } + } catch (JsonProcessingException e) { + log.warn("Failed to deserialize batch operations! {}", e.getMessage()); + } + return batchOperations; + } + + protected Map> getActiveTags(List crmContacts, Map> crmContactCampaignNames, EnvironmentConfig.EmailPlatform mailchimpConfig) throws Exception { + Map> activeTags = new HashMap<>(); + for (CrmContact crmContact : crmContacts) { + List tagsCleaned = getContactTagsCleaned(crmContact, crmContactCampaignNames.get(crmContact.id), mailchimpConfig); + activeTags.put(crmContact.email, tagsCleaned); + } + return activeTags; + } + + protected List toMemberInfos(EnvironmentConfig.EmailList emailList, List crmContacts, + Map> customFieldsMap) { + return crmContacts.stream() + .map(crmContact -> toMcMemberInfo(crmContact, customFieldsMap.get(crmContact.email), emailList.groups)) + .collect(Collectors.toList()); } @Override @@ -165,6 +310,27 @@ public void upsertContact(String email, @Deprecated String contactId) throws Exc } } + protected void syncContact(CrmContact crmContact, Map> crmContactCampaignNames, + EnvironmentConfig.EmailPlatform mailchimpConfig, EnvironmentConfig.EmailList emailList) throws Exception { + MailchimpClient mailchimpClient = new MailchimpClient(mailchimpConfig); + + try { + // transactional is always subscribed + if (emailList.type == EnvironmentConfig.EmailListType.TRANSACTIONAL || crmContact.canReceiveEmail()) { + Map customFields = getCustomFields(emailList.id, crmContact, mailchimpClient, mailchimpConfig); + mailchimpClient.upsertContact(emailList.id, toMcMemberInfo(crmContact, customFields, emailList.groups)); + // if they can't, they're archived, and will be failed to be retrieved for update + updateTags(emailList.id, crmContact, crmContactCampaignNames.get(crmContact.id), mailchimpClient, mailchimpConfig); + } else if (!crmContact.canReceiveEmail()) { + mailchimpClient.archiveContact(emailList.id, crmContact.email); + } + } catch (MailchimpException e) { + log.warn("Mailchimp syncContact failed: {}", mailchimpClient.exceptionToString(e)); + } catch (Exception e) { + log.warn("Mailchimp syncContact failed", e); + } + } + // @Override // public Optional getContactByEmail(String listName, String email) throws Exception { // String listId = getListIdFromName(listName); @@ -213,7 +379,7 @@ protected Map getCustomFields(String listId, CrmContact crmConta if (!mergeFieldsNameToTag.containsKey(customField.name)) { // TEXT, NUMBER, ADDRESS, PHONE, DATE, URL, IMAGEURL, RADIO, DROPDOWN, BIRTHDAY, ZIP - MergeFieldInfo.Type type = switch(customField.type) { + MergeFieldInfo.Type type = switch (customField.type) { case DATE -> MergeFieldInfo.Type.DATE; // MC doesn't support a boolean type, so we use NUMBER and map to 0/1 case BOOLEAN -> MergeFieldInfo.Type.NUMBER; @@ -256,6 +422,20 @@ protected void updateTags(String listId, CrmContact crmContact, List crm } } + protected String updateTagsBatch(String listId, + List emailContacts, + MailchimpClient mailchimpClient) { + emailContacts.stream() + .filter(emailContact -> CollectionUtils.isNotEmpty(emailContact.inactiveTags())) + .forEach(emailContact -> emailContact.inactiveTags().removeAll(emailContact.activeTags())); + try { + return mailchimpClient.updateContactTagsBatch(listId, emailContacts); + } catch (Exception e) { + log.error("updating tags failed for contacts! {}", e.getMessage()); + return null; + } + } + // protected CrmContact toCrmContact(MemberInfo member) { // if (member == null) { // return null;