Skip to content

Commit

Permalink
Persisting all incoming events. Updating error ones.
Browse files Browse the repository at this point in the history
  • Loading branch information
volodymyrsy committed Nov 4, 2021
1 parent 18523f0 commit c1d6ad1
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,17 @@ public Response webhook(String json, @Context HttpServletRequest request) throws
// takes a while, so spin it off as a new thread
Runnable thread = () -> {
try {
log.info("Saving event with id '{}'...", event.getId());
env.webhookRequestService().persist(event.getId(), "stripe", new JSONObject(event));
processEvent(event.getType(), stripeObject, env);
env.failedRequestService().delete(event.getId());

// TODO: decide if we want to delete right away or keep it for future analysis/analytics
//env.webhookRequestService().delete(event.getId());
} catch (Exception e) {
log.error("failed to process the Stripe event", e);
// TODO: email notification?
log.info("Saving event with id '{}' as failed request...", event.getId());
env.failedRequestService().persist(event.getId(), new JSONObject(event), e.getMessage());
log.info("Updating event with id '{}' with error details...", event.getId());
env.webhookRequestService().updateWithErrorMessage(event.getId(), e.getMessage());
}
};
new Thread(thread).start();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.impactupgrade.nucleus.dao;

import com.impactupgrade.nucleus.model.FailedRequest;
import com.impactupgrade.nucleus.model.WebhookRequest;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
Expand All @@ -10,56 +10,56 @@

import java.util.Optional;

public class FailedRequestDao implements Dao<String, FailedRequest> {
public class WebhookRequestDao implements Dao<String, WebhookRequest> {

private final SessionFactory sessionFactory;

public FailedRequestDao() {
public WebhookRequestDao() {
final Configuration configuration = new Configuration();
configuration.addAnnotatedClass(FailedRequest.class);
configuration.addAnnotatedClass(WebhookRequest.class);
this.sessionFactory = configuration.buildSessionFactory(new StandardServiceRegistryBuilder().build());
}

@Override
public FailedRequest create(FailedRequest failedRequest) {
public WebhookRequest create(WebhookRequest webhookRequest) {
final Session session = sessionFactory.openSession();
Transaction transaction = session.beginTransaction();
session.save(failedRequest);
session.save(webhookRequest);
transaction.commit();
session.close();
return failedRequest;
return webhookRequest;
}

@Override
public Optional<FailedRequest> get(String id) {
public Optional<WebhookRequest> get(String id) {
if (StringUtils.isEmpty(id)) {
return Optional.empty();
}

final Session session = sessionFactory.openSession();
FailedRequest failedRequest = session.get(FailedRequest.class, id);
WebhookRequest webhookRequest = session.get(WebhookRequest.class, id);
session.close();
return Optional.ofNullable(failedRequest);
return Optional.ofNullable(webhookRequest);
}

@Override
public FailedRequest update(FailedRequest failedRequest) {
public WebhookRequest update(WebhookRequest webhookRequest) {
final Session session = sessionFactory.openSession();
Transaction transaction = session.beginTransaction();
session.update(failedRequest);
session.update(webhookRequest);
transaction.commit();
session.close();
return failedRequest;
return webhookRequest;
}

@Override
public Optional<FailedRequest> delete(String id) {
public Optional<WebhookRequest> delete(String id) {
final Session session = sessionFactory.openSession();
Transaction transaction = session.beginTransaction();
FailedRequest failedRequest = session.get(FailedRequest.class, id);
session.delete(failedRequest);
WebhookRequest webhookRequest = session.get(WebhookRequest.class, id);
session.delete(webhookRequest);
transaction.commit();
session.close();
return Optional.ofNullable(failedRequest);
return Optional.ofNullable(webhookRequest);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import com.impactupgrade.nucleus.client.TwilioClient;
import com.impactupgrade.nucleus.service.logic.ContactService;
import com.impactupgrade.nucleus.service.logic.DonationService;
import com.impactupgrade.nucleus.service.logic.FailedRequestService;
import com.impactupgrade.nucleus.service.logic.WebhookRequestService;
import com.impactupgrade.nucleus.service.logic.MessagingService;
import com.impactupgrade.nucleus.service.segment.CrmService;
import com.impactupgrade.nucleus.service.segment.EmailPlatformService;
Expand Down Expand Up @@ -102,7 +102,7 @@ public void setOtherContext(MultivaluedMap<String, String> otherContext) {
public DonationService donationService() { return new DonationService(this); }
public ContactService contactService() { return new ContactService(this); }
public MessagingService messagingService() { return new MessagingService(this); }
public FailedRequestService failedRequestService() { return new FailedRequestService(this); }
public WebhookRequestService webhookRequestService() { return new WebhookRequestService(this); }

// segment services

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
@Getter
@Setter
@Entity
@Table(name = "failed_request", schema = "public")
public class FailedRequest {
@Table(name = "webhook_request", schema = "public")
public class WebhookRequest {
@Id
private String id;
@Column(name = "payload_type", nullable = false)
private String payloadType;
@Column(name = "payload", nullable = false)
private String payload;
@Column(name = "error_message", nullable = false)
@Column(name = "error_message")
private String errorMessage;
@Column(name = "attempt_count")
private int attemptCount;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.impactupgrade.nucleus.service.logic;

import com.google.common.base.Strings;
import com.impactupgrade.nucleus.dao.WebhookRequestDao;
import com.impactupgrade.nucleus.environment.Environment;
import com.impactupgrade.nucleus.model.WebhookRequest;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;

import java.util.Date;
import java.util.Objects;
import java.util.Optional;

public class WebhookRequestService {

private static final Logger log = LogManager.getLogger(WebhookRequestService.class);

private Environment env;
private WebhookRequestDao webhookRequestDao;

public WebhookRequestService(Environment env) {
this.env = env;
this.webhookRequestDao = new WebhookRequestDao();
}

public void persist(String id, String payloadType, JSONObject jsonObject) {
if (Strings.isNullOrEmpty(id) || Strings.isNullOrEmpty(payloadType) || Objects.isNull(jsonObject)) {
return;
}
try {
upsert(newWebhookRequest(id, payloadType, jsonObject));
} catch (Exception e) {
log.error("Failed to persist webhook request!", e);
}
}

public void updateWithErrorMessage(String id, String errorMessage) {
if (Strings.isNullOrEmpty(id)) {
return;
}
try {
Optional<WebhookRequest> existingRecord = webhookRequestDao.get(id);
if (existingRecord.isPresent()) {
WebhookRequest webhookRequest = existingRecord.get();
webhookRequest.setErrorMessage(errorMessage);
webhookRequestDao.update(webhookRequest);
}
} catch (Exception e) {
log.error("Failed to update webhook request!", e);
}
}

public void delete(String id) {
if (Strings.isNullOrEmpty(id)) {
return;
}
try {
webhookRequestDao.delete(id);
} catch (Exception e) {
log.error("Failed to delete failed request!", e);
}
}

// Utils
private WebhookRequest newWebhookRequest(String id, String payloadType, JSONObject payload) {
WebhookRequest webhookRequest = new WebhookRequest();
webhookRequest.setId(id);
webhookRequest.setPayloadType(payloadType);
webhookRequest.setPayload(payload.toString());
webhookRequest.setAttemptCount(1);
webhookRequest.setFirstAttemptTime(new Date());
webhookRequest.setLastAttemptTime(new Date());
return webhookRequest;
}

private void upsert(WebhookRequest webhookRequest) {
String id = webhookRequest.getId();
Optional<WebhookRequest> existingRecord = webhookRequestDao.get(id);
if (existingRecord.isEmpty()) {
webhookRequestDao.create(webhookRequest);
} else {
int attempts = existingRecord.get().getAttemptCount() + 1;
webhookRequest.setAttemptCount(attempts);
webhookRequest.setLastAttemptTime(new Date());
webhookRequestDao.update(webhookRequest);
}
}

}

0 comments on commit c1d6ad1

Please sign in to comment.