Skip to content

Commit

Permalink
Asset manager host (#733)
Browse files Browse the repository at this point in the history
* updated code styles

* changed hostUrl for asset service and code quality changes

* bypass broken eslint rule resolves #732

* moved const declarations closer together
  • Loading branch information
jsnoble authored and kstaken committed Jun 14, 2018
1 parent 41c4c88 commit 4dd1d2d
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 53 deletions.
73 changes: 28 additions & 45 deletions lib/cluster/services/api.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
'use strict';

const _ = require('lodash');
const Router = require('express').Router;
const { Router } = require('express');
const Promise = require('bluebird');
const bodyParser = require('body-parser');
const request = require('request');
const makeTable = require('../../utils/api_utils').makeTable;
const sendError = require('../../utils/api_utils').sendError;
const handleError = require('../../utils/api_utils').handleError;
const { makeTable, sendError, handleError } = require('../../utils/api_utils');

module.exports = function module(context, app) {
const logger = context.apis.foundation.makeLogger({ module: 'api_service' });
const executionService = context.services.execution;
const jobsService = context.services.jobs;
const messaging = context.messaging;
const assetsUrl = `http://${context.sysconfig.teraslice.master_hostname}:${process.env.assets_port}`;
const { messaging } = context;
const assetsUrl = `http://127.0.0.1:${process.env.assets_port}`;
const v1routes = new Router();
let stateStore;

app.use(bodyParser.json({
Expand All @@ -33,15 +32,13 @@ module.exports = function module(context, app) {

app.set('json spaces', 4);

const v1routes = new Router();

v1routes.get('/cluster/state', (req, res) => {
res.status(200).json(executionService.getClusterState());
});

v1routes.route('/assets*')
.delete((req, res) => {
const assetId = req.params.asset_id;
const { asset_id: assetId } = req.params;
messaging.broadcast('assets:delete', { payload: assetId });
_redirect(req, res);
})
Expand Down Expand Up @@ -77,9 +74,7 @@ module.exports = function module(context, app) {
});

v1routes.get('/jobs', (req, res) => {
const from = req.query.from;
const size = req.query.size;
const sort = req.query.sort;
const { from, size, sort } = req.query;

logger.debug(`GET /jobs endpoint has been called, from: ${from}, size: ${size}, sort: ${sort}`);
const handleApiError = handleError(res, logger, 500, 'Could not retrieve list of jobs');
Expand All @@ -92,7 +87,7 @@ module.exports = function module(context, app) {
});

v1routes.get('/jobs/:job_id', (req, res) => {
const jobId = req.params.job_id;
const { job_id: jobId } = req.params;
logger.debug(`GET /jobs/:job_id endpoint has been called, job_id: ${jobId}`);
const handleApiError = handleError(res, logger, 500, 'Could not retrieve job');

Expand All @@ -102,7 +97,7 @@ module.exports = function module(context, app) {
});

v1routes.put('/jobs/:job_id', (req, res) => {
const jobId = req.params.job_id;
const { job_id: jobId } = req.params;
const jobSpec = req.body;
if (Object.keys(jobSpec).length === 0) {
sendError(res, 400, `no data was provided to update job ${jobId}`);
Expand All @@ -117,7 +112,7 @@ module.exports = function module(context, app) {
});

v1routes.get('/jobs/:job_id/ex', (req, res) => {
const jobId = req.params.job_id;
const { job_id: jobId } = req.params;
logger.debug(`GET /jobs/:job_id endpoint has been called, job_id: ${jobId}`);
const handleApiError = handleError(res, logger, 500, 'Could not retrieve list of execution contexts');

Expand All @@ -128,7 +123,7 @@ module.exports = function module(context, app) {
});

v1routes.post('/jobs/:job_id/_start', (req, res) => {
const jobId = req.params.job_id;
const { job_id: jobId } = req.params;
if (!jobId) {
sendError(res, 400, 'no job_id was posted');
return;
Expand All @@ -142,8 +137,7 @@ module.exports = function module(context, app) {
});

v1routes.post('/jobs/:job_id/_stop', (req, res) => {
const jobId = req.params.job_id;
const timeout = req.query.timeout;
const { query: { timeout }, params: { job_id: jobId } } = req;
logger.debug(`POST /jobs/:job_id/_stop endpoint has been called, job_id: ${jobId}, removing any pending workers for the job`);
const handleApiError = handleError(res, logger, 500, `Could not stop execution for job: ${jobId}`);

Expand All @@ -153,7 +147,7 @@ module.exports = function module(context, app) {
});

v1routes.post('/jobs/:job_id/_pause', (req, res) => {
const jobId = req.params.job_id;
const { job_id: jobId } = req.params;
logger.debug(`POST /jobs/:job_id/_pause endpoint has been called, job_id: ${jobId}`);
const handleApiError = handleError(res, logger, 500, `Could not pause execution for job: ${jobId}`);

Expand All @@ -163,7 +157,7 @@ module.exports = function module(context, app) {
});

v1routes.post('/jobs/:job_id/_resume', (req, res) => {
const jobId = req.params.job_id;
const { job_id: jobId } = req.params;
logger.debug(`POST /jobs/:job_id/_resume endpoint has been called, job_id: ${jobId}`);
const handleApiError = handleError(res, logger, 500, `Could not resume execution for job: ${jobId}`);

Expand All @@ -173,7 +167,7 @@ module.exports = function module(context, app) {
});

v1routes.post('/jobs/:job_id/_recover', (req, res) => {
const jobId = req.params.job_id;
const { job_id: jobId } = req.params;
logger.debug(`POST /jobs/:job_id/_recover endpoint has been called, job_id: ${jobId}`);
const handleApiError = handleError(res, logger, 500, `Could not recover execution for job: ${jobId}`);

Expand All @@ -183,8 +177,7 @@ module.exports = function module(context, app) {
});

v1routes.post('/jobs/:job_id/_workers', (req, res) => {
const query = req.query;
const jobId = req.params.job_id;
const { query, params: { job_id: jobId } } = req;
logger.debug('POST /jobs/:job_id/_workers endpoint has been called, query:', query);
const handleApiError = handleError(res, logger, 500, `Could not change workers for job: ${jobId}`);

Expand All @@ -194,7 +187,7 @@ module.exports = function module(context, app) {
});

v1routes.get('/jobs/:job_id/slicer', (req, res) => {
const jobId = req.params.job_id;
const { job_id: jobId } = req.params;
logger.debug(`GET /jobs/:job_id/slicer endpoint has been called, job_id: ${jobId}`);
const handleApiError = handleError(res, logger, 500, `Could not get slicer statistics for job: ${jobId}`);

Expand All @@ -205,9 +198,7 @@ module.exports = function module(context, app) {
});

v1routes.get('/jobs/:job_id/errors/', (req, res) => {
const jobId = req.params.job_id;
const size = req.query.size ? req.query.size : 10000;
const from = req.query.from;
const { query: { size = 10000, from }, params: { job_id: jobId } } = req;
const handleApiError = handleError(res, logger, 500, `Could not get errors for job: ${jobId}`);

logger.debug(`GET /jobs/:job_id/errors endpoint has been called, job_id: ${jobId}, from: ${from}, size: ${size}`);
Expand All @@ -225,10 +216,7 @@ module.exports = function module(context, app) {
});

v1routes.get('/jobs/:job_id/errors/:ex_id', (req, res) => {
const jobId = req.params.job_id;
const exId = req.params.ex_id;
const from = req.query.from;
const size = req.query.size ? req.query.size : 10000;
const { params: { job_id: jobId, ex_id: exId }, query: { from, size = 10000 } } = req;
const handleApiError = handleError(res, logger, 500, `Could not get errors for job: ${jobId}, execution: ${exId}`);

logger.debug(`GET /jobs/:job_id/errors endpoint has been called, job_id: ${jobId}, ex_id: ${exId}, from: ${from}, size: ${size}`);
Expand All @@ -243,10 +231,7 @@ module.exports = function module(context, app) {
});

v1routes.get('/ex', (req, res) => {
const status = req.query.status;
const from = req.query.from;
const size = req.query.size;
const sort = req.query.sort;
const { status, from, size, sort } = req.query; //eslint-disable-line
const handleApiError = handleError(res, logger, 500, 'Could not retrieve list of execution contexts');

logger.debug(`GET /ex endpoint has been called, status: ${status}, from: ${from}, size: ${size}, sort: ${sort}`);
Expand All @@ -259,7 +244,7 @@ module.exports = function module(context, app) {
});

v1routes.get('/ex/:ex_id', (req, res) => {
const exId = req.params.ex_id;
const { ex_id: exId } = req.params;
logger.debug(`GET /ex/:ex_id endpoint has been called, ex_id: ${exId}`);
const handleApiError = handleError(res, logger, 500, `Could not retrieve execution context ${exId}`);

Expand All @@ -269,8 +254,7 @@ module.exports = function module(context, app) {
});

v1routes.post('/ex/:ex_id/_stop', (req, res) => {
const exId = req.params.ex_id;
const timeout = req.query.timeout;
const { params: { ex_id: exId }, query: { timeout } } = req;
logger.debug(`POST /ex/:ex_id/_stop endpoint has been called, ex_id: ${exId}, removing any pending workers for the job`);
const handleApiError = handleError(res, logger, 500, `Could not stop execution: ${exId}`);
// for lifecyle events, we need to ensure that the execution is alive first
Expand All @@ -281,7 +265,7 @@ module.exports = function module(context, app) {
});

v1routes.post('/ex/:ex_id/_pause', (req, res) => {
const exId = req.params.ex_id;
const { ex_id: exId } = req.params;
logger.debug(`POST /ex_id/:id/_pause endpoint has been called, ex_id: ${exId}`);
const handleApiError = handleError(res, logger, 500, `Could not pause execution: ${exId}`);
// for lifecyle events, we need to ensure that the execution is alive first
Expand All @@ -292,7 +276,7 @@ module.exports = function module(context, app) {
});

v1routes.post('/ex/:ex_id/_resume', (req, res) => {
const exId = req.params.ex_id;
const { ex_id: exId } = req.params;
logger.debug(`POST /ex/:id/_resume endpoint has been called, ex_id: ${exId}`);
const handleApiError = handleError(res, logger, 500, `Could not resume execution: ${exId}`);
// for lifecyle events, we need to ensure that the execution is alive first
Expand All @@ -303,8 +287,7 @@ module.exports = function module(context, app) {
});

v1routes.post('/ex/:ex_id/_workers', (req, res) => {
const exId = req.params.ex_id;
const query = req.query;
const { params: { ex_id: exId }, query } = req;
logger.debug(`POST /ex/:id/_workers endpoint has been called, ex_id: ${exId} query: ${JSON.stringify(query)}`);
const handleApiError = handleError(res, logger, 500, `Could not change workers for execution: ${exId}`);

Expand All @@ -314,7 +297,7 @@ module.exports = function module(context, app) {
});

v1routes.get('/ex/:ex_id/slicer', (req, res) => {
const exId = req.params.ex_id;
const { ex_id: exId } = req.params;
logger.debug(`GET /ex/:ex_id/slicer endpoint has been called, ex_id: ${exId}`);
const handleApiError = handleError(res, logger, 500, `Could not get statistics for execution: ${exId}`);

Expand Down Expand Up @@ -375,7 +358,7 @@ module.exports = function module(context, app) {
const handleApiError = handleError(res, logger, 500, 'Could not get all jobs');

if (req.query.size && !isNaN(req.query.size) && req.query.size >= 0) {
size = req.query.size;
({ size } = req.query);
}

jobsService.getJobs(null, size, '_updated:desc')
Expand All @@ -394,7 +377,7 @@ module.exports = function module(context, app) {
const handleApiError = handleError(res, logger, 500, 'Could not get all executions');

if (req.query.size && !isNaN(req.query.size) && req.query.size >= 0) {
size = req.query.size;
({ size } = req.query);
}
executionService.searchExecutionContexts(query, null, size, '_updated:desc')
.then((jobs) => {
Expand Down
15 changes: 7 additions & 8 deletions lib/cluster/services/assets.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
'use strict';

const Promise = require('bluebird');
const makeTable = require('../../utils/api_utils').makeTable;
const handleError = require('../../utils/api_utils').handleError;
const { makeTable, handleError } = require('../../utils/api_utils');
const _ = require('lodash');

module.exports = function (context) {
module.exports = function module(context) {
const logger = context.apis.foundation.makeLogger({ module: 'assets_service' });
const messageModule = require('./messaging');
const messaging = messageModule(context, logger);
Expand Down Expand Up @@ -102,7 +101,7 @@ module.exports = function (context) {
Promise.resolve(require('../storage/assets')(context))
.then((store) => {
assetsStore = store;
const port = process.env.port;
const { port } = process.env;
logger.info(`assets_service is listening on port ${port}`);
app.listen(port);
messaging.send({ to: 'cluster_master', message: 'assets:service:available' });
Expand All @@ -124,7 +123,7 @@ module.exports = function (context) {
];

function mapping(item) {
return function (field) {
return (field) => {
if (field === 'description') {
return item[field] ? item[field].slice(0, 30) : item[field];
}
Expand All @@ -139,18 +138,18 @@ module.exports = function (context) {
});
}

function assetsSearch (query, req, res) {
function assetsSearch(query, req, res) {
const handleApiError = handleError(res, logger, 500, 'Could not get assets');

return assetsStore.search(query, null, 10000, '_created:desc', ['_created', 'name', 'version', 'description'])
.then((results) => {
return _.map(results.hits.hits, (asset) => {
const data = results.hits.hits;
return _.map(data, (asset) => {
const record = asset._source;
record.id = asset._id;
return record;
});
})
.catch(handleApiError);

}
};

0 comments on commit 4dd1d2d

Please sign in to comment.