Merge pull request ibi-group#408 from ibi-group/add-pelias-update-job
Pelias: Add Pelias Update Job
miles-grant-ibigroup authored Sep 20, 2021
2 parents 149bc4a + 0098fce commit 7513fec
Showing 10 changed files with 462 additions and 99 deletions.
@@ -78,7 +78,8 @@ public enum JobType {
VALIDATE_ALL_FEEDS,
MONITOR_SERVER_STATUS,
MERGE_FEED_VERSIONS,
RECREATE_BUILD_IMAGE
RECREATE_BUILD_IMAGE,
UPDATE_PELIAS
}

public MonitorableJob(Auth0UserProfile owner, String name, JobType type) {
@@ -1,27 +1,36 @@
package com.conveyal.datatools.common.utils;

import com.amazonaws.AmazonServiceException;
import com.conveyal.datatools.common.utils.aws.CheckedAWSException;
import com.conveyal.datatools.common.utils.aws.S3Utils;
import com.conveyal.datatools.manager.auth.Auth0UserProfile;
import com.conveyal.datatools.manager.utils.ErrorUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.io.ByteStreams;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.eclipse.jetty.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.HaltException;
import spark.Request;
import spark.Response;

import javax.servlet.MultipartConfigElement;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequestWrapper;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.Part;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

import static com.conveyal.datatools.manager.DataManager.getConfigPropertyAsText;
@@ -75,7 +84,10 @@ public static String formatJSON (String key, String value) {
* supplied details about the exception encountered.
*/
public static ObjectNode getObjectNode(String message, int code, Exception e) {
String detail = e != null ? e.getMessage() : null;
String detail = null;
if (e != null) {
detail = e.getMessage() != null ? e.getMessage() : ExceptionUtils.getStackTrace(e);
}
return mapper.createObjectNode()
.put("result", code >= 400 ? "ERR" : "OK")
.put("message", message)
@@ -265,6 +277,50 @@ public static void copyRequestStreamIntoFile(Request req, File file) {
}
}

/**
* Copies a multi-part file upload to disk, attempts to upload it to S3, then deletes the local file.
* @param req Request object containing file to upload
* @param uploadType A string to include in the uploaded filename. It is also added to the temporary file name,
* which makes debugging easier should the upload fail.
* @param key The S3 key to upload the file to
* @return The public S3 URL of the uploaded file
*/
public static String uploadMultipartRequestBodyToS3(Request req, String uploadType, String key) {
// Get file from request
if (req.raw().getAttribute("org.eclipse.jetty.multipartConfig") == null) {
MultipartConfigElement multipartConfigElement = new MultipartConfigElement(System.getProperty("java.io.tmpdir"));
req.raw().setAttribute("org.eclipse.jetty.multipartConfig", multipartConfigElement);
}
String extension = null;
File tempFile = null;
String uploadedFileName = null;
try {
Part part = req.raw().getPart("file");
uploadedFileName = part.getSubmittedFileName();

extension = "." + part.getContentType().split("/", 0)[1];
tempFile = File.createTempFile(part.getName() + "_" + uploadType, extension);
InputStream inputStream;
inputStream = part.getInputStream();
FileOutputStream out = new FileOutputStream(tempFile);
IOUtils.copy(inputStream, out);
} catch (IOException | ServletException e) {
e.printStackTrace();
logMessageAndHalt(req, 400, "Unable to read uploaded file");
}
try {
return S3Utils.uploadObject(uploadType + "/" + key + "_" + uploadedFileName, tempFile);
} catch (AmazonServiceException | CheckedAWSException e) {
logMessageAndHalt(req, 500, "Error uploading file to S3", e);
return null;
} finally {
boolean deleted = tempFile.delete();
if (!deleted) {
LOG.error("Could not delete s3 temporary upload file");
}
}
}
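
For context, a minimal sketch of how a Spark route might call this new helper (the route path, upload type, and key below are hypothetical; assumes a static import of spark.Spark.post):

// Hypothetical route wiring for the helper above.
post("/secure/example/:id/upload", (request, response) -> {
    // Stores the multipart "file" part under <uploadType>/<key>_<original-filename> on S3
    // and returns the public URL, halting the request with a 400/500 on failure.
    return SparkUtils.uploadMultipartRequestBodyToS3(request, "example", "example_" + request.params(":id"));
});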

private static String trimLines(String str) {
if (str == null) return "";
String[] lines = str.split("\n");
18 changes: 18 additions & 0 deletions src/main/java/com/conveyal/datatools/common/utils/aws/S3Utils.java
@@ -7,7 +7,9 @@
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.conveyal.datatools.common.utils.SparkUtils;
import com.conveyal.datatools.manager.DataManager;
import com.conveyal.datatools.manager.models.OtpServer;
@@ -192,6 +194,22 @@ public static String downloadObject(
}
}

/**
* Uploads a file to S3 under a given key.
* @param keyName The S3 key to upload the file to
* @param fileToUpload The file to upload to S3
* @return A URL where the file is publicly accessible
*/
public static String uploadObject(String keyName, File fileToUpload) throws AmazonServiceException, CheckedAWSException {
String url = S3Utils.getDefaultBucketUrlForKey(keyName);
// FIXME: This may need to change during feed store refactor
getDefaultS3Client().putObject(new PutObjectRequest(
S3Utils.DEFAULT_BUCKET, keyName, fileToUpload)
// grant public read
.withCannedAcl(CannedAccessControlList.PublicRead));
return url;
}
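
A hedged usage sketch of the new helper (the key and local path are illustrative). Note the design choice: the returned URL is derived from the default bucket and key before the upload, and the PublicRead canned ACL makes the object readable without credentials.

// Illustrative call; "branding/logo.png" and the local file are made up.
File logo = new File("/tmp/logo.png");
String publicUrl = S3Utils.uploadObject("branding/logo.png", logo);
// publicUrl now points at the object in the default bucket and is world-readable.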

public static AmazonS3 getDefaultS3Client() throws CheckedAWSException {
return getS3Client (null, null);
}
@@ -1,11 +1,6 @@
package com.conveyal.datatools.editor.controllers.api;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.conveyal.datatools.common.utils.SparkUtils;
import com.conveyal.datatools.common.utils.aws.CheckedAWSException;
import com.conveyal.datatools.common.utils.aws.S3Utils;
import com.conveyal.datatools.editor.controllers.EditorLockController;
import com.conveyal.datatools.manager.auth.Auth0UserProfile;
import com.conveyal.datatools.manager.models.FeedSource;
@@ -23,22 +18,15 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.io.IOUtils;
import org.eclipse.jetty.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.HaltException;
import spark.Request;
import spark.Response;

import javax.servlet.MultipartConfigElement;
import javax.servlet.ServletException;
import javax.servlet.http.Part;
import javax.sql.DataSource;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -108,49 +96,6 @@ public abstract class EditorController<T extends Entity> {
registerRoutes();
}

public static String uploadBranding(Request req, String key) {
String url;

// Get file from request
if (req.raw().getAttribute("org.eclipse.jetty.multipartConfig") == null) {
MultipartConfigElement multipartConfigElement = new MultipartConfigElement(System.getProperty("java.io.tmpdir"));
req.raw().setAttribute("org.eclipse.jetty.multipartConfig", multipartConfigElement);
}
String extension = null;
File tempFile = null;
try {
Part part = req.raw().getPart("file");
extension = "." + part.getContentType().split("/", 0)[1];
tempFile = File.createTempFile(key + "_branding", extension);
InputStream inputStream;
inputStream = part.getInputStream();
FileOutputStream out = new FileOutputStream(tempFile);
IOUtils.copy(inputStream, out);
} catch (IOException | ServletException e) {
e.printStackTrace();
logMessageAndHalt(req, 400, "Unable to read uploaded file");
}

try {
String keyName = "branding/" + key + extension;
url = S3Utils.getDefaultBucketUrlForKey(keyName);
// FIXME: This may need to change during feed store refactor
S3Utils.getDefaultS3Client().putObject(new PutObjectRequest(
S3Utils.DEFAULT_BUCKET, keyName, tempFile)
// grant public read
.withCannedAcl(CannedAccessControlList.PublicRead));
return url;
} catch (AmazonServiceException | CheckedAWSException e) {
logMessageAndHalt(req, 500, "Error uploading file to S3", e);
return null;
} finally {
boolean deleted = tempFile.delete();
if (!deleted) {
LOG.error("Could not delete s3 upload file.");
}
}
}

/**
* Add static HTTP endpoints to Spark static instance.
*/
@@ -399,7 +344,7 @@ private String uploadEntityBranding (Request req, Response res) {
int id = getIdFromRequest(req);
String url;
try {
url = uploadBranding(req, String.format("%s_%d", classToLowercase, id));
url = SparkUtils.uploadMultipartRequestBodyToS3(req, "branding", String.format("%s_%d", classToLowercase, id));
} catch (HaltException e) {
// Do not re-catch halts thrown for exceptions that have already been caught.
throw e;
@@ -1,9 +1,11 @@
package com.conveyal.datatools.manager.controllers.api;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3URI;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.conveyal.datatools.common.status.MonitorableJob;
import com.conveyal.datatools.common.utils.aws.CheckedAWSException;
import com.conveyal.datatools.common.utils.SparkUtils;
import com.conveyal.datatools.common.utils.aws.CheckedAWSException;
import com.conveyal.datatools.common.utils.aws.EC2Utils;
import com.conveyal.datatools.common.utils.aws.S3Utils;
import com.conveyal.datatools.manager.auth.Auth0UserProfile;
@@ -32,10 +34,12 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import static com.conveyal.datatools.common.utils.SparkUtils.logMessageAndHalt;
import static com.conveyal.datatools.manager.DataManager.isExtensionEnabled;
import static com.conveyal.datatools.manager.jobs.DeployJob.bundlePrefix;
import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import static spark.Spark.delete;
@@ -311,6 +315,12 @@ private static Deployment updateDeployment (Request req, Response res) {
List<String> versionIds = versionsToInsert.stream().map(v -> v.id).collect(Collectors.toList());
Persistence.deployments.updateField(deploymentToUpdate.id, "feedVersionIds", versionIds);
}

// If the update document has removed a CSV file, also delete that CSV file from S3
if (updateDocument.containsKey("peliasCsvFiles")) {
List<String> csvUrls = (List<String>) updateDocument.get("peliasCsvFiles");
removeDeletedCsvFiles(csvUrls, deploymentToUpdate, req);
}
Deployment updatedDeployment = Persistence.deployments.update(deploymentToUpdate.id, req.body());
// TODO: Should updates to the deployment's fields trigger a notification to subscribers? This could get
// very noisy.
@@ -337,6 +347,29 @@ private static Deployment updateDeployment (Request req, Response res) {
//
// }

/**
* Helper method for deployment updates that deletes from S3 any CSV files no longer referenced by the deployment.
* @param csvUrls The updated list of CSV file URLs
* @param deploymentToUpdate The existing deployment whose CSV file list is compared against the updated list
* @param req A request object used to report failure
*/
private static void removeDeletedCsvFiles(List<String> csvUrls, Deployment deploymentToUpdate, Request req) {
// Only delete if the list of CSV files has changed
if (deploymentToUpdate.peliasCsvFiles != null && !csvUrls.equals(deploymentToUpdate.peliasCsvFiles)) {
for (String existingCsvUrl : deploymentToUpdate.peliasCsvFiles) {
// Delete only files that are absent from the updated list
if (!csvUrls.contains(existingCsvUrl)) {
try {
AmazonS3URI s3URIToDelete = new AmazonS3URI(existingCsvUrl);
S3Utils.getDefaultS3Client().deleteObject(new DeleteObjectRequest(s3URIToDelete.getBucket(), s3URIToDelete.getKey()));
} catch(Exception e) {
logMessageAndHalt(req, 500, "Failed to delete file from S3.", e);
}
}
}
}
}
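
To make the deletion rule concrete, a hypothetical example (URLs invented; assumes java.util.Arrays and java.util.List are imported):

// Suppose the stored deployment references two CSV files...
List<String> existing = Arrays.asList(
    "https://bucket.s3.amazonaws.com/csv/a.csv",
    "https://bucket.s3.amazonaws.com/csv/b.csv");
// ...and the update retains only the first.
List<String> updated = Arrays.asList("https://bucket.s3.amazonaws.com/csv/a.csv");
// With deploymentToUpdate.peliasCsvFiles equal to `existing`, calling
// removeDeletedCsvFiles(updated, deploymentToUpdate, req) deletes only b.csv from S3:
// the lists differ, a.csv is still referenced, and b.csv is absent from the new list.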

/**
* HTTP endpoint to deregister and terminate a set of instance IDs that are associated with a particular deployment.
* The intent here is to give the user a device by which they can terminate an EC2 instance that has started up, but
Expand Down Expand Up @@ -452,6 +485,48 @@ private static String deploy (Request req, Response res) {
return SparkUtils.formatJobMessage(job.jobId, "Deployment initiating.");
}

/**
* Uploads a file from the Spark request object to the S3 bucket of the deployment the Pelias update job is associated with.
* Follows https://github.com/ibi-group/datatools-server/blob/dev/src/main/java/com/conveyal/datatools/editor/controllers/api/EditorController.java#L111
* @return The updated deployment, whose peliasCsvFiles field now includes the S3 URL the file was uploaded to
*/
private static Deployment uploadToS3 (Request req, Response res) {
// Check parameters supplied in request for validity.
Deployment deployment = getDeploymentWithPermissions(req, res);

String url;
try {

String keyName = String.join(
"/",
bundlePrefix,
deployment.projectId,
deployment.id,
// Prepend a random UUID to the generated filename to prevent overwriting
UUID.randomUUID().toString()
);
url = SparkUtils.uploadMultipartRequestBodyToS3(req, "csvUpload", keyName);

// Update the deployment's list of CSV files
List<String> updatedCsvList = new ArrayList<>(deployment.peliasCsvFiles);
updatedCsvList.add(url);

// If this is set, a file is being replaced
String s3FileToRemove = req.raw().getHeader("urlToDelete");
if (s3FileToRemove != null) {
updatedCsvList.remove(s3FileToRemove);
}

// Persist changes after removing deleted CSV files from S3
removeDeletedCsvFiles(updatedCsvList, deployment, req);
return Persistence.deployments.updateField(deployment.id, "peliasCsvFiles", updatedCsvList);

} catch (Exception e) {
logMessageAndHalt(req, 500, "Failed to upload file to S3.", e);
return null;
}
}
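
A hedged sketch of the client-side contract this endpoint implies: a multipart POST with a part named "file" (matching req.raw().getPart("file") above) and an optional urlToDelete header to replace a previously uploaded CSV. The host, deployment id, token, and file names below are hypothetical; java.net.http (Java 11+) has no multipart builder, so the body is assembled by hand.

import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class DeploymentCsvUploadExample {
    public static void main(String[] args) throws Exception {
        String boundary = "----datatools" + System.nanoTime();
        // Build a single multipart part named "file" containing the CSV bytes.
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        body.writeBytes(("--" + boundary + "\r\n"
            + "Content-Disposition: form-data; name=\"file\"; filename=\"stops.csv\"\r\n"
            + "Content-Type: text/csv\r\n\r\n").getBytes(StandardCharsets.UTF_8));
        body.writeBytes(Files.readAllBytes(Path.of("stops.csv")));
        body.writeBytes(("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.UTF_8));
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("https://datatools.example.com/api/manager/secure/deployments/abc123/upload"))
            .header("Content-Type", "multipart/form-data; boundary=" + boundary)
            // Optional: ask the server to drop a previously uploaded CSV once this one is stored.
            .header("urlToDelete", "https://bucket.s3.amazonaws.com/bundles/project/deployment/old_stops.csv")
            .header("Authorization", "Bearer <token>")
            .POST(HttpRequest.BodyPublishers.ofByteArray(body.toByteArray()))
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        // The response body is the updated deployment serialized as JSON.
        System.out.println(response.body());
    }
}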

public static void register (String apiPrefix) {
// Construct JSON managers which help serialize the response. Slim JSON is the generic JSON view. Full JSON
// contains additional fields (at the moment just #ec2Instances) and should only be used when the controller
@@ -477,5 +552,7 @@ public static void register (String apiPrefix) {
post(apiPrefix + "secure/deployments", DeploymentController::createDeployment, fullJson::write);
put(apiPrefix + "secure/deployments/:id", DeploymentController::updateDeployment, fullJson::write);
post(apiPrefix + "secure/deployments/fromfeedsource/:id", DeploymentController::createDeploymentFromFeedSource, fullJson::write);
post(apiPrefix + "secure/deployments/:id/upload", DeploymentController::uploadToS3, fullJson::write);

}
}
@@ -2,8 +2,8 @@

import com.conveyal.datatools.common.utils.Scheduler;
import com.conveyal.datatools.manager.DataManager;
import com.conveyal.datatools.manager.auth.Auth0UserProfile;
import com.conveyal.datatools.manager.auth.Actions;
import com.conveyal.datatools.manager.auth.Auth0UserProfile;
import com.conveyal.datatools.manager.extensions.ExternalFeedResource;
import com.conveyal.datatools.manager.jobs.FetchSingleFeedJob;
import com.conveyal.datatools.manager.jobs.NotifyUsersForSubscriptionJob;
@@ -21,7 +21,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.bson.conversions.Bson;
import org.eclipse.jetty.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,8 +43,6 @@
import static com.conveyal.datatools.manager.models.ExternalFeedSourceProperty.constructId;
import static com.conveyal.datatools.manager.models.transform.NormalizeFieldTransformation.getInvalidSubstitutionMessage;
import static com.conveyal.datatools.manager.models.transform.NormalizeFieldTransformation.getInvalidSubstitutionPatterns;
import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Filters.in;
import static spark.Spark.delete;
import static spark.Spark.get;