diff --git a/src/main/java/com/conveyal/datatools/common/status/MonitorableJob.java b/src/main/java/com/conveyal/datatools/common/status/MonitorableJob.java index 6eba99768..a3e487cbd 100644 --- a/src/main/java/com/conveyal/datatools/common/status/MonitorableJob.java +++ b/src/main/java/com/conveyal/datatools/common/status/MonitorableJob.java @@ -78,7 +78,8 @@ public enum JobType { VALIDATE_ALL_FEEDS, MONITOR_SERVER_STATUS, MERGE_FEED_VERSIONS, - RECREATE_BUILD_IMAGE + RECREATE_BUILD_IMAGE, + UPDATE_PELIAS } public MonitorableJob(Auth0UserProfile owner, String name, JobType type) { diff --git a/src/main/java/com/conveyal/datatools/common/utils/SparkUtils.java b/src/main/java/com/conveyal/datatools/common/utils/SparkUtils.java index 9dd8622d4..d93681227 100644 --- a/src/main/java/com/conveyal/datatools/common/utils/SparkUtils.java +++ b/src/main/java/com/conveyal/datatools/common/utils/SparkUtils.java @@ -1,5 +1,8 @@ package com.conveyal.datatools.common.utils; +import com.amazonaws.AmazonServiceException; +import com.conveyal.datatools.common.utils.aws.CheckedAWSException; +import com.conveyal.datatools.common.utils.aws.S3Utils; import com.conveyal.datatools.manager.auth.Auth0UserProfile; import com.conveyal.datatools.manager.utils.ErrorUtils; import com.fasterxml.jackson.core.JsonProcessingException; @@ -7,6 +10,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.io.ByteStreams; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.eclipse.jetty.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,14 +19,18 @@ import spark.Request; import spark.Response; +import javax.servlet.MultipartConfigElement; +import javax.servlet.ServletException; import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; import javax.servlet.ServletRequestWrapper; import javax.servlet.http.HttpServletResponse; +import javax.servlet.http.Part; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import static com.conveyal.datatools.manager.DataManager.getConfigPropertyAsText; @@ -75,7 +84,10 @@ public static String formatJSON (String key, String value) { * supplied details about the exception encountered. */ public static ObjectNode getObjectNode(String message, int code, Exception e) { - String detail = e != null ? e.getMessage() : null; + String detail = null; + if (e != null) { + detail = e.getMessage() != null ? e.getMessage() : ExceptionUtils.getStackTrace(e); + } return mapper.createObjectNode() .put("result", code >= 400 ? "ERR" : "OK") .put("message", message) @@ -265,6 +277,50 @@ public static void copyRequestStreamIntoFile(Request req, File file) { } } + /** + * Copies a multi-part file upload to disk, attempts to upload it to S3, then deletes the local file. + * @param req Request object containing file to upload + * @param uploadType A string to include in the uploaded filename. Will also be added to the temporary file + * which makes debugging easier should the upload fail. 
+ * @param key The S3 key to upload the file to + * @return An HTTP S3 URL containing the uploaded file + */ + public static String uploadMultipartRequestBodyToS3(Request req, String uploadType, String key) { + // Get file from request + if (req.raw().getAttribute("org.eclipse.jetty.multipartConfig") == null) { + MultipartConfigElement multipartConfigElement = new MultipartConfigElement(System.getProperty("java.io.tmpdir")); + req.raw().setAttribute("org.eclipse.jetty.multipartConfig", multipartConfigElement); + } + String extension = null; + File tempFile = null; + String uploadedFileName = null; + try { + Part part = req.raw().getPart("file"); + uploadedFileName = part.getSubmittedFileName(); + + extension = "." + part.getContentType().split("/", 0)[1]; + tempFile = File.createTempFile(part.getName() + "_" + uploadType, extension); + InputStream inputStream; + inputStream = part.getInputStream(); + FileOutputStream out = new FileOutputStream(tempFile); + IOUtils.copy(inputStream, out); + } catch (IOException | ServletException e) { + e.printStackTrace(); + logMessageAndHalt(req, 400, "Unable to read uploaded file"); + } + try { + return S3Utils.uploadObject(uploadType + "/" + key + "_" + uploadedFileName, tempFile); + } catch (AmazonServiceException | CheckedAWSException e) { + logMessageAndHalt(req, 500, "Error uploading file to S3", e); + return null; + } finally { + boolean deleted = tempFile.delete(); + if (!deleted) { + LOG.error("Could not delete S3 temporary upload file"); + } + } + } + private static String trimLines(String str) { if (str == null) return ""; String[] lines = str.split("\n"); diff --git a/src/main/java/com/conveyal/datatools/common/utils/aws/S3Utils.java b/src/main/java/com/conveyal/datatools/common/utils/aws/S3Utils.java index a5ee96bf9..078781132 100644 --- a/src/main/java/com/conveyal/datatools/common/utils/aws/S3Utils.java +++ b/src/main/java/com/conveyal/datatools/common/utils/aws/S3Utils.java @@ -7,7 +7,9 @@ import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest; +import com.amazonaws.services.s3.model.PutObjectRequest; import com.conveyal.datatools.common.utils.SparkUtils; import com.conveyal.datatools.manager.DataManager; import com.conveyal.datatools.manager.models.OtpServer; @@ -192,6 +194,22 @@ public static String downloadObject( } } + /** + * Uploads a file to S3 using a given key + * @param keyName The S3 key to upload the file to + * @param fileToUpload The file to upload to S3 + * @return A URL where the file is publicly accessible + */ + public static String uploadObject(String keyName, File fileToUpload) throws AmazonServiceException, CheckedAWSException { + String url = S3Utils.getDefaultBucketUrlForKey(keyName); + // FIXME: This may need to change during feed store refactor + getDefaultS3Client().putObject(new PutObjectRequest( + S3Utils.DEFAULT_BUCKET, keyName, fileToUpload) + // grant public read + .withCannedAcl(CannedAccessControlList.PublicRead)); + return url; + } + public static AmazonS3 getDefaultS3Client() throws CheckedAWSException { return getS3Client (null, null); } diff --git a/src/main/java/com/conveyal/datatools/editor/controllers/api/EditorController.java b/src/main/java/com/conveyal/datatools/editor/controllers/api/EditorController.java index 7400fbeaf..ea73c0cdf 100644 ---
a/src/main/java/com/conveyal/datatools/editor/controllers/api/EditorController.java +++ b/src/main/java/com/conveyal/datatools/editor/controllers/api/EditorController.java @@ -1,11 +1,6 @@ package com.conveyal.datatools.editor.controllers.api; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.services.s3.model.CannedAccessControlList; -import com.amazonaws.services.s3.model.PutObjectRequest; import com.conveyal.datatools.common.utils.SparkUtils; -import com.conveyal.datatools.common.utils.aws.CheckedAWSException; -import com.conveyal.datatools.common.utils.aws.S3Utils; import com.conveyal.datatools.editor.controllers.EditorLockController; import com.conveyal.datatools.manager.auth.Auth0UserProfile; import com.conveyal.datatools.manager.models.FeedSource; @@ -23,7 +18,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.commons.dbutils.DbUtils; -import org.apache.commons.io.IOUtils; import org.eclipse.jetty.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,14 +25,8 @@ import spark.Request; import spark.Response; -import javax.servlet.MultipartConfigElement; -import javax.servlet.ServletException; -import javax.servlet.http.Part; import javax.sql.DataSource; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -108,49 +96,6 @@ public abstract class EditorController<T extends Entity> { registerRoutes(); } - public static String uploadBranding(Request req, String key) { - String url; - - // Get file from request - if (req.raw().getAttribute("org.eclipse.jetty.multipartConfig") == null) { - MultipartConfigElement multipartConfigElement = new MultipartConfigElement(System.getProperty("java.io.tmpdir")); - req.raw().setAttribute("org.eclipse.jetty.multipartConfig", multipartConfigElement); - } - String extension = null; - File tempFile = null; - try { - Part part = req.raw().getPart("file"); - extension = "." + part.getContentType().split("/", 0)[1]; - tempFile = File.createTempFile(key + "_branding", extension); - InputStream inputStream; - inputStream = part.getInputStream(); - FileOutputStream out = new FileOutputStream(tempFile); - IOUtils.copy(inputStream, out); - } catch (IOException | ServletException e) { - e.printStackTrace(); - logMessageAndHalt(req, 400, "Unable to read uploaded file"); - } - - try { - String keyName = "branding/" + key + extension; - url = S3Utils.getDefaultBucketUrlForKey(keyName); - // FIXME: This may need to change during feed store refactor - S3Utils.getDefaultS3Client().putObject(new PutObjectRequest( - S3Utils.DEFAULT_BUCKET, keyName, tempFile) - // grant public read - .withCannedAcl(CannedAccessControlList.PublicRead)); - return url; - } catch (AmazonServiceException | CheckedAWSException e) { - logMessageAndHalt(req, 500, "Error uploading file to S3", e); - return null; - } finally { - boolean deleted = tempFile.delete(); - if (!deleted) { - LOG.error("Could not delete s3 upload file."); - } - } - } - /** * Add static HTTP endpoints to Spark static instance.
*/ @@ -399,7 +344,7 @@ private String uploadEntityBranding (Request req, Response res) { int id = getIdFromRequest(req); String url; try { - url = uploadBranding(req, String.format("%s_%d", classToLowercase, id)); + url = SparkUtils.uploadMultipartRequestBodyToS3(req, "branding", String.format("%s_%d", classToLowercase, id)); } catch (HaltException e) { // Do not re-catch halts thrown for exceptions that have already been caught. throw e; diff --git a/src/main/java/com/conveyal/datatools/manager/controllers/api/DeploymentController.java b/src/main/java/com/conveyal/datatools/manager/controllers/api/DeploymentController.java index 7184cc04c..7ef59d772 100644 --- a/src/main/java/com/conveyal/datatools/manager/controllers/api/DeploymentController.java +++ b/src/main/java/com/conveyal/datatools/manager/controllers/api/DeploymentController.java @@ -1,9 +1,11 @@ package com.conveyal.datatools.manager.controllers.api; +import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.AmazonS3URI; +import com.amazonaws.services.s3.model.DeleteObjectRequest; import com.conveyal.datatools.common.status.MonitorableJob; -import com.conveyal.datatools.common.utils.aws.CheckedAWSException; import com.conveyal.datatools.common.utils.SparkUtils; +import com.conveyal.datatools.common.utils.aws.CheckedAWSException; import com.conveyal.datatools.common.utils.aws.EC2Utils; import com.conveyal.datatools.common.utils.aws.S3Utils; import com.conveyal.datatools.manager.auth.Auth0UserProfile; @@ -32,10 +34,12 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; import static com.conveyal.datatools.common.utils.SparkUtils.logMessageAndHalt; import static com.conveyal.datatools.manager.DataManager.isExtensionEnabled; +import static com.conveyal.datatools.manager.jobs.DeployJob.bundlePrefix; import static com.mongodb.client.model.Filters.and; import static com.mongodb.client.model.Filters.eq; import static spark.Spark.delete; @@ -311,6 +315,12 @@ private static Deployment updateDeployment (Request req, Response res) { List<String> versionIds = versionsToInsert.stream().map(v -> v.id).collect(Collectors.toList()); Persistence.deployments.updateField(deploymentToUpdate.id, "feedVersionIds", versionIds); } + + // If updateDocument has deleted a CSV file, also delete that CSV file from S3 + if (updateDocument.containsKey("peliasCsvFiles")) { + List<String> csvUrls = (List<String>) updateDocument.get("peliasCsvFiles"); + removeDeletedCsvFiles(csvUrls, deploymentToUpdate, req); + } Deployment updatedDeployment = Persistence.deployments.update(deploymentToUpdate.id, req.body()); // TODO: Should updates to the deployment's fields trigger a notification to subscribers? This could get // very noisy. @@ -337,6 +347,29 @@ private static Deployment updateDeployment (Request req, Response res) { // // } + /** + * Helper method for update steps which deletes from S3 any CSV files that have been removed from the deployment.
+ * @param csvUrls The new list of CSV files + * @param deploymentToUpdate An existing deployment, which contains CSV files to check changes against + * @param req A request object used to report failure + */ + private static void removeDeletedCsvFiles(List<String> csvUrls, Deployment deploymentToUpdate, Request req) { + // Only delete if the array differs + if (deploymentToUpdate.peliasCsvFiles != null && !csvUrls.equals(deploymentToUpdate.peliasCsvFiles)) { + for (String existingCsvUrl : deploymentToUpdate.peliasCsvFiles) { + // Only delete if the file does not exist in the deployment + if (!csvUrls.contains(existingCsvUrl)) { + try { + AmazonS3URI s3URIToDelete = new AmazonS3URI(existingCsvUrl); + S3Utils.getDefaultS3Client().deleteObject(new DeleteObjectRequest(s3URIToDelete.getBucket(), s3URIToDelete.getKey())); + } catch(Exception e) { + logMessageAndHalt(req, 500, "Failed to delete file from S3.", e); + } + } + } + } + } + /** + * HTTP endpoint to deregister and terminate a set of instance IDs that are associated with a particular deployment. + * The intent here is to give the user a device by which they can terminate an EC2 instance that has started up, but @@ -452,6 +485,48 @@ private static String deploy (Request req, Response res) { return SparkUtils.formatJobMessage(job.jobId, "Deployment initiating."); } + /** + * Uploads a file from the Spark request object to the S3 bucket of the deployment the Pelias update job is associated with. + * Follows https://github.com/ibi-group/datatools-server/blob/dev/src/main/java/com/conveyal/datatools/editor/controllers/api/EditorController.java#L111 + * @return The updated Deployment object, with the uploaded file's S3 URL added to peliasCsvFiles + */ + private static Deployment uploadToS3 (Request req, Response res) { + // Check parameters supplied in request for validity. + Deployment deployment = getDeploymentWithPermissions(req, res); + + String url; + try { + + String keyName = String.join( + "/", + bundlePrefix, + deployment.projectId, + deployment.id, + // Filenames are generated here. Prepend a random UUID to prevent overwriting + UUID.randomUUID().toString() + ); + url = SparkUtils.uploadMultipartRequestBodyToS3(req, "csvUpload", keyName); + + // Update the deployment's CSV file list + List<String> updatedCsvList = new ArrayList<>(deployment.peliasCsvFiles); + updatedCsvList.add(url); + + // If this is set, a file is being replaced + String s3FileToRemove = req.raw().getHeader("urlToDelete"); + if (s3FileToRemove != null) { + updatedCsvList.remove(s3FileToRemove); + } + + // Persist changes after removing deleted CSV files from S3 + removeDeletedCsvFiles(updatedCsvList, deployment, req); + return Persistence.deployments.updateField(deployment.id, "peliasCsvFiles", updatedCsvList); + + } catch (Exception e) { + logMessageAndHalt(req, 500, "Failed to upload file to S3.", e); + return null; + } + } + public static void register (String apiPrefix) { // Construct JSON managers which help serialize the response. Slim JSON is the generic JSON view.
Full JSON // contains additional fields (at the moment just #ec2Instances) and should only be used when the controller @@ -477,5 +552,7 @@ public static void register (String apiPrefix) { post(apiPrefix + "secure/deployments", DeploymentController::createDeployment, fullJson::write); put(apiPrefix + "secure/deployments/:id", DeploymentController::updateDeployment, fullJson::write); post(apiPrefix + "secure/deployments/fromfeedsource/:id", DeploymentController::createDeploymentFromFeedSource, fullJson::write); + post(apiPrefix + "secure/deployments/:id/upload", DeploymentController::uploadToS3, fullJson::write); + } } diff --git a/src/main/java/com/conveyal/datatools/manager/controllers/api/FeedSourceController.java b/src/main/java/com/conveyal/datatools/manager/controllers/api/FeedSourceController.java index faba03200..c8420cbb6 100644 --- a/src/main/java/com/conveyal/datatools/manager/controllers/api/FeedSourceController.java +++ b/src/main/java/com/conveyal/datatools/manager/controllers/api/FeedSourceController.java @@ -2,8 +2,8 @@ import com.conveyal.datatools.common.utils.Scheduler; import com.conveyal.datatools.manager.DataManager; -import com.conveyal.datatools.manager.auth.Auth0UserProfile; import com.conveyal.datatools.manager.auth.Actions; +import com.conveyal.datatools.manager.auth.Auth0UserProfile; import com.conveyal.datatools.manager.extensions.ExternalFeedResource; import com.conveyal.datatools.manager.jobs.FetchSingleFeedJob; import com.conveyal.datatools.manager.jobs.NotifyUsersForSubscriptionJob; @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; -import org.bson.conversions.Bson; import org.eclipse.jetty.http.HttpStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,8 +43,6 @@ import static com.conveyal.datatools.manager.models.ExternalFeedSourceProperty.constructId; import static com.conveyal.datatools.manager.models.transform.NormalizeFieldTransformation.getInvalidSubstitutionMessage; import static com.conveyal.datatools.manager.models.transform.NormalizeFieldTransformation.getInvalidSubstitutionPatterns; -import static com.mongodb.client.model.Filters.and; -import static com.mongodb.client.model.Filters.eq; import static com.mongodb.client.model.Filters.in; import static spark.Spark.delete; import static spark.Spark.get; diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java index bfc7f6259..ddcb2c09b 100644 --- a/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java +++ b/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java @@ -12,43 +12,14 @@ import com.amazonaws.services.ec2.model.InstanceType; import com.amazonaws.services.ec2.model.RunInstancesRequest; import com.amazonaws.services.ec2.model.Tag; -import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancing; import com.amazonaws.services.ec2.model.TerminateInstancesResult; +import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancing; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3URI; import com.amazonaws.services.s3.model.CopyObjectRequest; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManagerBuilder; import com.amazonaws.services.s3.transfer.Upload; - -import java.io.File; -import java.io.FileInputStream; -import 
java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.Serializable; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.URL; -import java.nio.channels.Channels; -import java.nio.channels.FileChannel; -import java.nio.channels.WritableByteChannel; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Scanner; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - import com.amazonaws.waiters.Waiter; import com.amazonaws.waiters.WaiterParameters; import com.conveyal.datatools.common.status.MonitorableJob; @@ -81,6 +52,34 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.channels.WritableByteChannel; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Scanner; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + import static com.conveyal.datatools.manager.models.Deployment.DEFAULT_OTP_VERSION; /** @@ -91,7 +90,7 @@ public class DeployJob extends MonitorableJob { private static final Logger LOG = LoggerFactory.getLogger(DeployJob.class); - private static final String bundlePrefix = "bundles"; + public static final String bundlePrefix = "bundles"; // Indicates whether EC2 instances should be EBS optimized. private static final boolean EBS_OPTIMIZED = "true".equals(DataManager.getConfigPropertyAsText("modules.deployment.ec2.ebs_optimized")); // Indicates the node.js version installed by nvm to set the PATH variable to point to @@ -341,6 +340,17 @@ public void jobLogic () { // Set baseUrl after success. 
status.baseUrl = otpServer.publicUrl; } + + // Now that the main instance is updated successfully, update Pelias + if (deployment.peliasUpdate) { + // Get log upload URI from deploy job + AmazonS3URI logUploadS3URI = getS3FolderURI(); + + // Execute the Pelias update job and keep track of it + PeliasUpdateJob peliasUpdateJob = new PeliasUpdateJob(owner, "Updating Custom Geocoder Database", deployment, logUploadS3URI); + this.addNextJob(peliasUpdateJob); + } + status.completed = true; } diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/PeliasUpdateJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/PeliasUpdateJob.java new file mode 100644 index 000000000..61e3d85bf --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/jobs/PeliasUpdateJob.java @@ -0,0 +1,232 @@ +package com.conveyal.datatools.manager.jobs; + +import com.amazonaws.services.s3.AmazonS3URI; +import com.conveyal.datatools.common.status.MonitorableJob; +import com.conveyal.datatools.common.utils.aws.S3Utils; +import com.conveyal.datatools.manager.auth.Auth0UserProfile; +import com.conveyal.datatools.manager.models.Deployment; +import com.conveyal.datatools.manager.models.FeedVersion; +import com.conveyal.datatools.manager.persistence.Persistence; +import com.conveyal.datatools.manager.utils.HttpUtils; +import com.conveyal.datatools.manager.utils.SimpleHttpResponse; +import com.conveyal.datatools.manager.utils.json.JsonUtil; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.http.Header; +import org.apache.http.message.BasicHeader; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.stream.Collectors; + +import static com.mongodb.client.model.Filters.in; + +public class PeliasUpdateJob extends MonitorableJob { + /** + * The deployment to send to Pelias + */ + private final Deployment deployment; + + /** + * The workerId our request has on the webhook server. Used to get status updates + */ + private String workerId; + + + /** + * Timer used to poll the status endpoint + */ + private final Timer timer; + + /** + * The number of webhook status requests allowed to fail before considering the server down + */ + private int webhookStatusFailuresAllowed = 3; + + /** + * S3 URI to upload logs to + */ + private final AmazonS3URI logUploadS3URI; + + public PeliasUpdateJob(Auth0UserProfile owner, String name, Deployment deployment, AmazonS3URI logUploadS3URI) { + super(owner, name, JobType.UPDATE_PELIAS); + this.deployment = deployment; + this.timer = new Timer(); + this.logUploadS3URI = logUploadS3URI; + } + + /** + * Fires the initial update request at the Pelias webhook, then schedules recurring polls of the webhook's status endpoint.
*/ + @Override + public void jobLogic() throws Exception { + status.message = "Launching custom geocoder update request"; + workerId = this.makeWebhookRequest(); + status.percentComplete = 1.0; + + // Give server 1 second to create worker + Thread.sleep(1000); + // Check status every 2 seconds + timer.schedule(new StatusChecker(), 0, 2000); + + } + + private void getWebhookStatus() { + URI url = getWebhookURI(deployment.peliasWebhookUrl + "/status/" + workerId); + + // Convert raw body to JSON + PeliasWebhookStatusMessage statusResponse; + + try { + SimpleHttpResponse response = HttpUtils.httpRequestRawResponse(url, 1000, HttpUtils.REQUEST_METHOD.GET, null); + // Convert raw body to PeliasWebhookStatusMessage + String jsonResponse = response.body; + statusResponse = JsonUtil.objectMapper.readValue(jsonResponse, PeliasWebhookStatusMessage.class); + } catch (Exception ex) { + // Allow a set number of failed requests before showing the user failure message + if (--webhookStatusFailuresAllowed == 0) { + status.fail("Webhook status did not provide a valid response!", ex); + timer.cancel(); + } + return; + } + + if (!statusResponse.error.equals("false")) { + status.fail(statusResponse.error); + timer.cancel(); + return; + } + + if (statusResponse.completed) { + status.completeSuccessfully(statusResponse.message); + timer.cancel(); + return; + } + + status.message = statusResponse.message; + status.percentComplete = statusResponse.percentComplete; + status.completed = false; + } + + /** + * Make a request to the Pelias update webhook + * + * @return The workerID of the run created on the Pelias server + */ + private String makeWebhookRequest() { + URI url = getWebhookURI(deployment.peliasWebhookUrl); + + // Convert from feedVersionIds to Pelias Config objects + List<PeliasWebhookGTFSFeedFormat> gtfsFeeds = Persistence.feedVersions.getFiltered(in("_id", deployment.feedVersionIds)) + .stream() + .map(PeliasWebhookGTFSFeedFormat::new) + .collect(Collectors.toList()); + + PeliasWebhookRequestBody peliasWebhookRequestBody = new PeliasWebhookRequestBody(); + peliasWebhookRequestBody.gtfsFeeds = gtfsFeeds; + peliasWebhookRequestBody.csvFiles = deployment.peliasCsvFiles; + peliasWebhookRequestBody.logUploadUrl = logUploadS3URI.toString(); + peliasWebhookRequestBody.deploymentId = deployment.id; + + String query = JsonUtil.toJson(peliasWebhookRequestBody); + + // Create headers needed for Pelias webhook + List<Header>
headers = new ArrayList<>(); + headers.add(new BasicHeader("Accept", "application/json")); + headers.add(new BasicHeader("Content-type", "application/json")); + + // Get webhook response + SimpleHttpResponse response = HttpUtils.httpRequestRawResponse(url, 5000, HttpUtils.REQUEST_METHOD.POST, query, headers); + + // Convert raw body to JSON + String jsonResponse; + try { + jsonResponse = response.body; + } + catch (NullPointerException ex) { + status.fail("Webhook server specified did not provide a response!", ex); + return null; + } + + // Parse JSON + JsonNode webhookResponse; + try { + webhookResponse = JsonUtil.objectMapper.readTree(jsonResponse); + } catch (IOException ex) { + status.fail("The Webhook server's response was invalid! Is the server URL correct?", ex); + return null; + } + + if (webhookResponse.get("error") != null) { + status.fail("Server returned an error: " + webhookResponse.get("error").asText()); + return null; + } + + return webhookResponse.get("workerId").asText(); + } + + /** + * Helper function to convert Deployment webhook URL to URI object + * @param webhookUrlString String containing URL to parse + * @return URI object with webhook URL + */ + private URI getWebhookURI(String webhookUrlString) { + URI url; + try { + url = new URI(webhookUrlString); + } catch (URISyntaxException ex) { + status.fail("Webhook URL was not a valid URL", ex); + return null; + } + + return url; + } + + /** + * Class used to execute the status update + */ + class StatusChecker extends TimerTask { + public void run() { + getWebhookStatus(); + } + } + + /** + * The request body required by the Pelias webhook + */ + private static class PeliasWebhookRequestBody { + public List<PeliasWebhookGTFSFeedFormat> gtfsFeeds; + public List<String> csvFiles; + public String logUploadUrl; + public String deploymentId; + } + + /** + * The GTFS feed info format the Pelias webhook requires + */ + private static class PeliasWebhookGTFSFeedFormat { + public String uri; + public String name; + public String filename; + + public PeliasWebhookGTFSFeedFormat(FeedVersion feedVersion) { + uri = S3Utils.getS3FeedUri(feedVersion.id); + name = Persistence.feedSources.getById(feedVersion.feedSourceId).name; + filename = feedVersion.id; + } + } + + /** + * The status object returned by the webhook/status endpoint + */ + public static class PeliasWebhookStatusMessage { + public Boolean completed; + public String error; + public String message; + public Double percentComplete; + } +} \ No newline at end of file diff --git a/src/main/java/com/conveyal/datatools/manager/models/Deployment.java b/src/main/java/com/conveyal/datatools/manager/models/Deployment.java index 81e2bcb87..8541d5ddc 100644 --- a/src/main/java/com/conveyal/datatools/manager/models/Deployment.java +++ b/src/main/java/com/conveyal/datatools/manager/models/Deployment.java @@ -73,6 +73,11 @@ public class Deployment extends Model implements Serializable { private ObjectMapper otpConfigMapper = new ObjectMapper().setSerializationInclusion(Include.NON_NULL); + /* Pelias fields, used to determine where/if to send data to the Pelias webhook */ + public String peliasWebhookUrl; + public boolean peliasUpdate; + public List<String> peliasCsvFiles = new ArrayList<>(); + /** + * Get parent project for deployment. Note: at one point this was a JSON property of this class, but severe + * performance issues prevent this field from scaling to be fetched/assigned to a large collection of deployments.
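Note for reviewers: the private PeliasWebhookRequestBody and PeliasWebhookGTFSFeedFormat classes above define the JSON contract between PeliasUpdateJob and the webhook server. As a rough, self-contained sketch of that wire format (this example class and every sample value in it are hypothetical, not part of this change), serializing the same field names with Jackson produces the payload the webhook receives:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Arrays;
    import java.util.List;

    // Standalone mirror of the request-body classes above, for illustration only.
    public class PeliasWebhookPayloadExample {
        public static class GtfsFeed {
            public String uri;      // S3 URI of the feed version
            public String name;     // feed source name
            public String filename; // feed version id
        }

        public static class RequestBody {
            public List<GtfsFeed> gtfsFeeds;
            public List<String> csvFiles;
            public String logUploadUrl;
            public String deploymentId;
        }

        public static void main(String[] args) throws Exception {
            GtfsFeed feed = new GtfsFeed();
            feed.uri = "s3://example-bucket/gtfs/feed-version-id.zip"; // hypothetical
            feed.name = "Example Transit Agency";                      // hypothetical
            feed.filename = "feed-version-id";                         // hypothetical

            RequestBody body = new RequestBody();
            body.gtfsFeeds = Arrays.asList(feed);
            body.csvFiles = Arrays.asList(
                "https://example-bucket.s3.amazonaws.com/csvUpload/bundles/project-id/deployment-id/uuid_stops.csv"); // hypothetical
            body.logUploadUrl = "s3://example-bucket/bundles/project-id/deployment-id"; // hypothetical
            body.deploymentId = "deployment-id"; // hypothetical

            // Prints the JSON document the webhook receives: {"gtfsFeeds":[...],"csvFiles":[...],...}
            System.out.println(new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(body));
        }
    }

The real job builds this payload via JsonUtil.toJson(peliasWebhookRequestBody) in makeWebhookRequest(), so the field names, not these sample values, are the contract.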
diff --git a/src/main/java/com/conveyal/datatools/manager/utils/HttpUtils.java b/src/main/java/com/conveyal/datatools/manager/utils/HttpUtils.java index 7e4dac8a9..fdd6ed0d5 100644 --- a/src/main/java/com/conveyal/datatools/manager/utils/HttpUtils.java +++ b/src/main/java/com/conveyal/datatools/manager/utils/HttpUtils.java @@ -1,6 +1,6 @@ package com.conveyal.datatools.manager.utils; -import org.apache.http.HttpResponse; +import org.apache.http.Header; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpDelete; @@ -10,7 +10,6 @@ import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,6 +17,8 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URI; +import java.util.ArrayList; +import java.util.List; public class HttpUtils { private static final Logger LOG = LoggerFactory.getLogger(HttpUtils.class); @@ -28,14 +29,22 @@ public enum REQUEST_METHOD {GET, POST, DELETE, PUT} * @return a {@link SimpleHttpResponse} that consumes and closes the entity (verifying that the HTTP connection is * closed) */ - //TODO: Replace with java.net.http once migrated to JDK 11. See HttpUtils under otp-middleware. + public static SimpleHttpResponse httpRequestRawResponse( + URI uri, + int connectionTimeout, + REQUEST_METHOD method, + String bodyContent + ) { + return httpRequestRawResponse(uri, connectionTimeout, method, bodyContent, new ArrayList<>()); + } + public static SimpleHttpResponse httpRequestRawResponse( URI uri, int connectionTimeout, REQUEST_METHOD method, - String bodyContent + String bodyContent, + List<Header>
headers ) { - RequestConfig timeoutConfig = RequestConfig.custom() .setConnectionRequestTimeout(connectionTimeout) .setConnectTimeout(connectionTimeout) @@ -48,6 +57,9 @@ public static SimpleHttpResponse httpRequestRawResponse( switch (method) { case GET: HttpGet getRequest = new HttpGet(uri); + for (Header header : headers) { + getRequest.setHeader(header); + } getRequest.setConfig(timeoutConfig); httpUriRequest = getRequest; break; @@ -55,6 +67,10 @@ public static SimpleHttpResponse httpRequestRawResponse( try { HttpPost postRequest = new HttpPost(uri); if (bodyContent != null) postRequest.setEntity(new StringEntity(bodyContent)); + for (Header header : headers) { + postRequest.setHeader(header); + } + postRequest.setConfig(timeoutConfig); httpUriRequest = postRequest; } catch (UnsupportedEncodingException e) { @@ -65,6 +81,9 @@ public static SimpleHttpResponse httpRequestRawResponse( case PUT: try { HttpPut putRequest = new HttpPut(uri); + for (Header header : headers) { + putRequest.setHeader(header); + } putRequest.setEntity(new StringEntity(bodyContent)); putRequest.setConfig(timeoutConfig); httpUriRequest = putRequest; @@ -75,6 +94,9 @@ public static SimpleHttpResponse httpRequestRawResponse( break; case DELETE: HttpDelete deleteRequest = new HttpDelete(uri); + for (Header header : headers) { + deleteRequest.setHeader(header); + } deleteRequest.setConfig(timeoutConfig); httpUriRequest = deleteRequest; break;
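End note: the HttpUtils change above keeps existing four-argument call sites working by delegating to the new five-argument overload with an empty header list. A minimal sketch of calling the new overload directly (the URL, body, and timeout below are made-up values; Header and BasicHeader are the Apache HttpComponents types the diff imports):

    import com.conveyal.datatools.manager.utils.HttpUtils;
    import com.conveyal.datatools.manager.utils.SimpleHttpResponse;
    import org.apache.http.Header;
    import org.apache.http.message.BasicHeader;

    import java.net.URI;
    import java.util.ArrayList;
    import java.util.List;

    public class HttpUtilsHeadersExample {
        public static void main(String[] args) {
            // Build the same headers PeliasUpdateJob sends to its webhook.
            List<Header> headers = new ArrayList<>();
            headers.add(new BasicHeader("Accept", "application/json"));
            headers.add(new BasicHeader("Content-type", "application/json"));

            // POST a JSON body with explicit headers and a 5000 ms connection timeout.
            SimpleHttpResponse response = HttpUtils.httpRequestRawResponse(
                URI.create("https://pelias-webhook.example.com"), // hypothetical webhook URL
                5000,
                HttpUtils.REQUEST_METHOD.POST,
                "{\"deploymentId\": \"deployment-id\"}",          // hypothetical body
                headers
            );
            System.out.println(response.body);
        }
    }

Passing an empty list reproduces the previous behavior, which is why the delegating four-argument overload can simply supply new ArrayList<>().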