Skip to content

Commit

Permalink
DC-958: Send email on snapshot creation complete (#1760)
Browse files Browse the repository at this point in the history
- Add general support for using the thurloe pubsub topic for email notifications from TDR
- Add an email to be sent when snapshot creation is complete
  • Loading branch information
pshapiro4broad authored Aug 20, 2024
1 parent 15319cb commit d00c467
Show file tree
Hide file tree
Showing 18 changed files with 392 additions and 114 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package bio.terra.app.configuration;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "notification")
public record NotificationConfiguration(String projectId, String topicId) {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Set;
import java.util.UUID;
import org.broadinstitute.dsde.workbench.client.sam.model.ManagedResourceGroupCoordinates;
import org.broadinstitute.dsde.workbench.client.sam.model.UserIdInfo;

/**
* This is the interface to IAM used in the main body of the repository code. Right now, the only
Expand Down Expand Up @@ -332,4 +333,6 @@ void azureCreateManagedResourceGroup(
boolean getResourceTypeAdminPermission(
AuthenticatedUserRequest userReq, IamResourceType iamResourceType, IamAction action)
throws InterruptedException;

UserIdInfo getUserIds(String accessToken, String userEmail) throws InterruptedException;
}
6 changes: 6 additions & 0 deletions src/main/java/bio/terra/service/auth/iam/IamService.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.broadinstitute.dsde.workbench.client.sam.model.UserIdInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -522,4 +523,9 @@ public String signUrlForBlob(
AuthenticatedUserRequest userReq, String project, String path, Duration duration) {
return callProvider(() -> iamProvider.signUrlForBlob(userReq, project, path, duration));
}

public UserIdInfo getUserIds(String userEmail) {
String tdrSaAccessToken = googleCredentialsService.getApplicationDefaultAccessToken(SCOPES);
return callProvider(() -> iamProvider.getUserIds(tdrSaAccessToken, userEmail));
}
}
46 changes: 19 additions & 27 deletions src/main/java/bio/terra/service/auth/iam/sam/SamIam.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.broadinstitute.dsde.workbench.client.sam.model.RolesAndActions;
import org.broadinstitute.dsde.workbench.client.sam.model.SyncReportEntry;
import org.broadinstitute.dsde.workbench.client.sam.model.SystemStatus;
import org.broadinstitute.dsde.workbench.client.sam.model.UserIdInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -840,6 +841,15 @@ AccessPolicyMembershipRequest createAccessPolicy(IamRole role, List<String> emai
return membership;
}

@Override
public UserIdInfo getUserIds(String accessToken, String userEmail) throws InterruptedException {
return SamRetry.retry(configurationService, () -> getUserIdsInner(accessToken, userEmail));
}

private UserIdInfo getUserIdsInner(String accessToken, String userEmail) throws ApiException {
return samApiService.usersApi(accessToken).getUserIds(userEmail);
}

/**
* Syncing a policy with SAM results in a Google group being created that is tied to that policy.
* The response is an object with one key that is the policy group email and a value that is a
Expand Down Expand Up @@ -867,42 +877,24 @@ public static ErrorReportException convertSamExToDataRepoEx(final ApiException s
logger.warn("Sam client exception details: {}", samEx.getResponseBody());

// Sometimes the sam message is buried several levels down inside of the error report object.
String message = null;
String message;
try {
ErrorReport errorReport = objectMapper.readValue(samEx.getResponseBody(), ErrorReport.class);
message = extractErrorMessage(errorReport);
} catch (JsonProcessingException | IllegalArgumentException ex) {
message = Objects.requireNonNullElse(samEx.getMessage(), "Sam client exception");
}

switch (samEx.getCode()) {
case HttpStatusCodes.STATUS_CODE_BAD_REQUEST:
{
return new IamBadRequestException(message, samEx);
}
case HttpStatusCodes.STATUS_CODE_UNAUTHORIZED:
{
return new IamUnauthorizedException(message, samEx);
}
case HttpStatusCodes.STATUS_CODE_FORBIDDEN:
{
return new IamForbiddenException(message, samEx);
}
case HttpStatusCodes.STATUS_CODE_NOT_FOUND:
{
return new IamNotFoundException(message, samEx);
}
case HttpStatusCodes.STATUS_CODE_CONFLICT:
{
return new IamConflictException(message, samEx);
}
return switch (samEx.getCode()) {
case HttpStatusCodes.STATUS_CODE_BAD_REQUEST -> new IamBadRequestException(message, samEx);
case HttpStatusCodes.STATUS_CODE_UNAUTHORIZED -> new IamUnauthorizedException(message, samEx);
case HttpStatusCodes.STATUS_CODE_FORBIDDEN -> new IamForbiddenException(message, samEx);
case HttpStatusCodes.STATUS_CODE_NOT_FOUND -> new IamNotFoundException(message, samEx);
case HttpStatusCodes.STATUS_CODE_CONFLICT -> new IamConflictException(message, samEx);
// SAM does not use a 501 NOT_IMPLEMENTED status code, so that case is skipped here
// A 401 error will only occur when OpenDJ is down and should be raised as a 500 error
default:
{
return new IamInternalServerErrorException(message, samEx);
}
}
default -> new IamInternalServerErrorException(message, samEx);
};
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package bio.terra.service.notification;

import bio.terra.app.configuration.NotificationConfiguration;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.PostConstruct;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class NotificationService {

private static final Logger logger = LoggerFactory.getLogger(NotificationService.class);

private final PubSubService pubSubService;
private final NotificationConfiguration notificationConfiguration;
private final ObjectMapper objectMapper;

public NotificationService(
PubSubService pubSubService,
NotificationConfiguration notificationConfiguration,
ObjectMapper objectMapper) {
this.pubSubService = pubSubService;
this.notificationConfiguration = notificationConfiguration;
this.objectMapper = objectMapper;
}

@VisibleForTesting
@PostConstruct
protected void createTopic() {
try {
pubSubService.createTopic(
notificationConfiguration.projectId(), notificationConfiguration.topicId());
} catch (IOException e) {
logger.warn("Error creating notification topic", e);
}
}

public void snapshotReady(
String subjectId, String snapshotExportLink, String snapshotName, String snapshotSummary) {
try {
pubSubService.publishMessage(
notificationConfiguration.projectId(),
notificationConfiguration.topicId(),
objectMapper.writeValueAsString(
new SnapshotReadyNotification(
subjectId, snapshotExportLink, snapshotName, snapshotSummary)));
} catch (IOException e) {
logger.warn("Error sending notification", e);
}
}
}
33 changes: 33 additions & 0 deletions src/main/java/bio/terra/service/notification/PubSubService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package bio.terra.service.notification;

import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class PubSubService {
private static final Logger logger = LoggerFactory.getLogger(PubSubService.class);

public void createTopic(String projectId, String topicId) throws IOException {
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);
if (topicAdminClient.getTopic(topicName) == null) {
Topic topic = topicAdminClient.createTopic(topicName);
logger.info("Created topic: {}", topic.getName());
}
}
}

public void publishMessage(String projectId, String topicId, String message) throws IOException {
TopicName topicName = TopicName.of(projectId, topicId);
var publisher = Publisher.newBuilder(topicName).build();
publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(message)).build());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package bio.terra.service.notification;

public record SnapshotReadyNotification(
String notificationType,
String recipientUserId,
String snapshotExportLink,
String snapshotName,
String snapshotSummary) {
public SnapshotReadyNotification(
String recipientUserId,
String snapshotExportLink,
String snapshotName,
String snapshotSummary) {
this(
"SnapshotReadyNotification",
recipientUserId,
snapshotExportLink,
snapshotName,
snapshotSummary);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package bio.terra.service.snapshot.flight.create;

import bio.terra.service.auth.iam.IamService;
import bio.terra.service.job.DefaultUndoStep;
import bio.terra.service.snapshotbuilder.SnapshotBuilderService;
import bio.terra.service.snapshotbuilder.SnapshotRequestDao;
import bio.terra.stairway.FlightContext;
import bio.terra.stairway.StepResult;
import bio.terra.stairway.exception.RetryException;
import java.util.UUID;

public class NotifyUserOfSnapshotCreationStep extends DefaultUndoStep {
private final SnapshotBuilderService snapshotBuilderService;
private final SnapshotRequestDao snapshotRequestDao;
private final IamService iamService;
private final UUID snapshotRequestId;

public NotifyUserOfSnapshotCreationStep(
SnapshotBuilderService snapshotBuilderService,
SnapshotRequestDao snapshotRequestDao,
IamService iamService,
UUID snapshotRequestId) {
this.snapshotBuilderService = snapshotBuilderService;
this.snapshotRequestDao = snapshotRequestDao;
this.iamService = iamService;
this.snapshotRequestId = snapshotRequestId;
}

@Override
public StepResult doStep(FlightContext flightContext)
throws InterruptedException, RetryException {
var request = snapshotRequestDao.getById(snapshotRequestId);
var user = iamService.getUserIds(request.createdBy());
snapshotBuilderService.notifySnapshotReady(user.getUserSubjectId(), snapshotRequestId);
return StepResult.getStepResultSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,16 @@ public SnapshotCreateFlight(FlightMap inputParameters, Object applicationContext
IamResourceType.DATASET,
"A snapshot was created from this dataset."));

// at end of flight, add created snapshot id to the snapshot request
if (mode == SnapshotRequestContentsModel.ModeEnum.BYREQUESTID) {
UUID snapshotRequestId = contents.getRequestIdSpec().getSnapshotRequestId();
// Add created snapshot id to the snapshot request.
addStep(
new AddCreatedSnapshotIdToSnapshotRequestStep(
snapshotRequestDao, contents.getRequestIdSpec().getSnapshotRequestId(), snapshotId));
snapshotRequestDao, snapshotRequestId, snapshotId));
// Notify user that snapshot is ready to use.
addStep(
new NotifyUserOfSnapshotCreationStep(
snapshotBuilderService, snapshotRequestDao, iamService, snapshotRequestId));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package bio.terra.service.snapshotbuilder;

import bio.terra.app.configuration.TerraConfiguration;
import bio.terra.common.CloudPlatformWrapper;
import bio.terra.common.exception.ApiException;
import bio.terra.common.exception.BadRequestException;
Expand All @@ -23,6 +24,7 @@
import bio.terra.service.dataset.Dataset;
import bio.terra.service.dataset.DatasetService;
import bio.terra.service.filedata.azure.AzureSynapsePdao;
import bio.terra.service.notification.NotificationService;
import bio.terra.service.snapshot.Snapshot;
import bio.terra.service.snapshot.SnapshotService;
import bio.terra.service.snapshotbuilder.query.Query;
Expand Down Expand Up @@ -61,9 +63,10 @@ public class SnapshotBuilderService {
private final IamService iamService;
private final SnapshotService snapshotService;
private final BigQuerySnapshotPdao bigQuerySnapshotPdao;

private final AzureSynapsePdao azureSynapsePdao;
private final QueryBuilderFactory queryBuilderFactory;
private final NotificationService notificationService;
private final TerraConfiguration terraConfiguration;

public SnapshotBuilderService(
SnapshotRequestDao snapshotRequestDao,
Expand All @@ -72,16 +75,20 @@ public SnapshotBuilderService(
IamService iamService,
SnapshotService snapshotService,
BigQuerySnapshotPdao bigQuerySnapshotPdao,
NotificationService notificationService,
AzureSynapsePdao azureSynapsePdao,
QueryBuilderFactory queryBuilderFactory) {
QueryBuilderFactory queryBuilderFactory,
TerraConfiguration terraConfiguration) {
this.snapshotRequestDao = snapshotRequestDao;
this.snapshotBuilderSettingsDao = snapshotBuilderSettingsDao;
this.datasetService = datasetService;
this.iamService = iamService;
this.snapshotService = snapshotService;
this.bigQuerySnapshotPdao = bigQuerySnapshotPdao;
this.notificationService = notificationService;
this.azureSynapsePdao = azureSynapsePdao;
this.queryBuilderFactory = queryBuilderFactory;
this.terraConfiguration = terraConfiguration;
}

public SnapshotAccessRequestResponse createRequest(
Expand Down Expand Up @@ -414,4 +421,20 @@ private SnapshotBuilderDomainOption getDomainOption(
String domainId = getDomainId(conceptId, snapshot, userRequest);
return getDomainOptionFromSettingsByName(domainId, snapshot.getId());
}

public String createExportSnapshotLink(UUID snapshotId) {
return "%s/import-data?snapshotId=%s&format=tdrexport&tdrSyncPermissions=false"
.formatted(terraConfiguration.basePath(), snapshotId);
}

public void notifySnapshotReady(String subjectId, UUID snapshotRequestId) {
var snapshotAccessRequest = snapshotRequestDao.getById(snapshotRequestId);
UUID snapshotId = snapshotAccessRequest.createdSnapshotId();
Snapshot snapshot = snapshotService.retrieve(snapshotId);
notificationService.snapshotReady(
subjectId,
createExportSnapshotLink(snapshotId),
snapshot.getName(),
convertModelToApiResponse(snapshotAccessRequest).getSummary());
}
}
2 changes: 2 additions & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,5 @@ duos.basePath=https://consent.dsde-dev.broadinstitute.org
tps.basePath=https://tps.dsde-dev.broadinstitute.org
sentry.dsn=
sentry.environment=undefined
notification.projectId=broad-dsde-dev
notification.topicId=workbench-notifications-dev
Loading

0 comments on commit d00c467

Please sign in to comment.