diff --git a/data/templates/secure_erase.py b/data/templates/secure_erase.py
index be6ab75e7..ff964365e 100755
--- a/data/templates/secure_erase.py
+++ b/data/templates/secure_erase.py
@@ -2,6 +2,8 @@
+# -*- coding: UTF-8 -*-
+
 # Copyright 2016, EMC, Inc.
 
 """
 This script is to do Secure Erase (SE) on a compute node
 Four methods/tools are integrated in this scripts
@@ -21,6 +23,9 @@
 ARG_PARSER = argparse.ArgumentParser(description='RackHD secure-erase argument')
 
+ARG_PARSER.add_argument("-i", action="store", default='undefined', type=str,
+                        help="Secure erase taskId")
+
 ARG_PARSER.add_argument("-d", action="append", default=[], type=str,
                         help="Disks to be erased with arguments")
 
@@ -58,6 +63,147 @@
                   "==========================\n"
 
+class ProgressParser(object):
+    """
+    Secure erase job progress parser.
+    """
+
+    def __init__(self, task_id, disks, se_param, se_tool):
+        self.task_id = task_id
+        self.disk_list = disks
+        self.interval = 10
+        self.param = se_param
+        self.tool = se_tool
+        self.se_duration = 0
+
+    def scrub_parser(self, log, percent=0):
+        """
+        Parse secure erase progress from a scrub log.
+        The unused "percent" argument keeps all parsers call-compatible.
+        """
+        # Scrub version 2.5.2-2 example:
+        #     scrub /dev/sdf
+        #     scrub: using NNSA NAP-14.1-C patterns
+        #     scrub: please verify that device size below is correct!
+        #     scrub: scrubbing /dev/sdf1 1995650048 bytes (~1GB)
+        #     scrub: random  |................................................|
+        #     scrub: random  |................................................|
+        #     scrub: 0x00    |................................................|
+        #     scrub: verify  |................................................|
+
+        # Maximum dot count for each pass
+        MAX_DOT_COUNT = 48
+        # Scrub pass counts for different scrub methods; the default pass count is 1
+        pass_counts = {"nnsa": 4, "dod": 4, "gutmann": 35, "schneier": 7,
+                       "pfitzner7": 7, "pfitzner33": 33, "usarmy": 3,
+                       "random2": 2, "old": 6, "fastold": 5}
+        pass_count = pass_counts.get(self.param, 1)
+        count = 0
+        pattern = re.compile(r"^scrub: \w{4,6}\s*\|\.{" + str(MAX_DOT_COUNT) + r"}\|")
+        last_line = ''
+        for line in log.readlines():
+            line = line.strip()
+            if pattern.match(line):
+                count += 1
+            elif line.startswith("scrub: "):
+                last_line = line
+        dot_count = 0
+        if "|" in last_line:
+            dot_count = len(last_line.split("|")[1])
+        # "|" is also counted as a dot for percentage calculation
+        percentage = (100.00/pass_count)*(count + (dot_count+1.0)/(MAX_DOT_COUNT+2.0))
+        return percentage
+
+    def get_hdparm_duration(self, log):
+        """
+        Get the estimated secure erase duration (in minutes) from an hdparm log.
+        """
+        pattern = re.compile(r"(\d{1,4})min for SECURE ERASE UNIT\. "
+                             r"(\d{1,4})min for ENHANCED SECURE ERASE UNIT\.")
+        estimated_time = 0
+        for line in log.readlines():
+            match = pattern.search(line)
+            if match:
+                if self.param == "secure-erase":
+                    estimated_time = int(match.group(1))
+                else:
+                    estimated_time = int(match.group(2))
+                break
+        return estimated_time
+
+    def hdparm_parser(self, log, percent):
+        """
+        Estimate secure erase progress for the hdparm tool. hdparm logs no
+        live progress, so each poll advances the estimate by interval/duration.
+        """
+        if self.se_duration == 0:
+            self.se_duration = self.get_hdparm_duration(log)
+        else:
+            percent += 100.00*self.interval/(self.se_duration*60.00)
+            # Hold the estimate below 100%; completion is detected by process exit
+            if percent > 100:
+                percent = 99.5
+        return percent
+    def sg_format_parser(self, log, percent=0):
+        """
+        Parse secure erase progress from an sg_format log.
+        """
+        pattern_progress = re.compile(r"Format in progress, (\d{1,3}\.\d{1,3})% done", re.I)
+        pattern_complete = re.compile(r"FORMAT Completed", re.I)
+        lines = log.readlines()
+        if not lines:
+            return 0
+        match = pattern_progress.match(lines[-1])
+        if match:
+            return float(match.group(1))
+        if len(lines) > 1 and pattern_complete.match(lines[-2]):
+            return 100
+        return 0
+
+    def sg_sanitize_parser(self, log=None, percent=0):
+        """
+        sg_sanitize logs no progress output, so report 0 here; progress
+        jumps to 100 once the erase process is seen to have exited.
+        """
+        return 0
+
+    def send_progress_notification(self):
+        """
+        Poll per-disk secure erase progress and POST it to the RackHD
+        notification API once per polling interval.
+        """
+        parser_mapper = {
+            "hdparm": self.hdparm_parser,
+            "scrub": self.scrub_parser,
+            "sg_format": self.sg_format_parser,
+            "sg_sanitize": self.sg_sanitize_parser
+        }
+        disk_count = len(self.disk_list)
+        percentage_list = [0.0]*disk_count
+        erase_start_flags = [False]*disk_count
+        payload = {
+            "taskId": self.task_id,
+            "progress": {"percentage": "0%", "description": "Secure erase started"}
+        }
+        poll_counter = 0
+        finished = False
+        while True:
+            cmd = 'curl -X POST -H "Content-Type:application/json" ' \
+                  '-d \'{}\' http://172.31.128.1:8080/api/1.1/notification' \
+                  .format(json.dumps(payload))
+            subprocess.call(cmd, shell=True)
+            if finished:
+                # Return a result dict so get_process_exit_status() can treat
+                # this worker like the erase workers
+                return {"exit_code": 0, "message": "Progress notification finished"}
+            time.sleep(self.interval)
+            for (index, value) in enumerate(self.disk_list):
+                log_path = '/home/monorail/' + value + '.log'
+                if erase_start_flags[index]:
+                    # Check whether the secure erase sub-process is still alive
+                    command = 'ps aux | grep {} | grep {} | sed "/grep/d"' \
+                        .format(self.tool, value)
+                    erase_alive = subprocess.check_output(command, shell=True)
+                    if not erase_alive:
+                        percentage_list[index] = 100
+                    else:
+                        with open(log_path, 'r') as log:
+                            percentage_list[index] = \
+                                parser_mapper[self.tool](log, percentage_list[index])
+                else:
+                    erase_start_flags[index] = os.path.exists(log_path)
+            total_percentage = sum(percentage_list)/disk_count
+            poll_counter += 1
+            payload["progress"]["percentage"] = "{:.2f}%".format(total_percentage)
+            payload["progress"]["description"] = "Polling #{} at {}s intervals" \
+                .format(poll_counter, self.interval)
+            if total_percentage >= 100:
+                # Loop once more so the final 100% notification is sent
+                finished = True
+
+
 def create_jbod(disk_arg, raid_tool):
     """
     Create JBOD for each physical disk under a virtual disk.
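To make the scrub arithmetic above concrete, here is a standalone sketch of `scrub_parser` on a fabricated log. `FakeLog` is a stand-in for the per-disk log file (anything with a `readlines()` method works), and the sample lines and taskId are invented for illustration:

```python
# Worked example of the scrub percentage math (illustrative only).
class FakeLog(object):
    def __init__(self, lines):
        self._lines = lines

    def readlines(self):
        return self._lines

# An "nnsa" erase has 4 passes: here one pass has finished (a full 48-dot
# bar) and the second pass has printed 23 dots so far.
sample_log = [
    "scrub: using NNSA NAP-14.1-C patterns",
    "scrub: random  |" + "." * 48 + "|",  # completed pass
    "scrub: random  |" + "." * 23,        # pass in progress
]
parser = ProgressParser("57a86b5c36ec578876878294", ["sdf"], "nnsa", "scrub")
# (100/4) * (1 + (23+1)/(48+2)) = 37.0
print(parser.scrub_parser(FakeLog(sample_log)))
```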
@@ -101,7 +247,8 @@ def create_jbod(disk_arg, raid_tool):
     # drwxr-xr-x 2 root root 300 May 19 03:15 ./
     # drwxr-xr-x 5 root root 100 May 16 04:43 ../
     # lrwxrwxrwx 1 root root 9 May 19 03:06 pci-0000:06:00.0-scsi-0:2:0:0 -> ../../sdf
-    # lrwxrwxrwx 1 root root 10 May 19 03:06 pci-0000:06:00.0-scsi-0:2:0:0-part1 -> ../../sdf1
+    # lrwxrwxrwx 1 root root 10 May 19 03:06 pci-0000:06:00.0-scsi-0:2:0:0-part1 ->
+    #     ../../sdf1
     # lrwxrwxrwx 1 root root 10 May 19 02:31 pci-0000:06:00.0-scsi-0:2:1:0 -> ../../sda
     disk_name = ''
     for line in lines:
@@ -138,7 +285,8 @@ def convert_raid_to_jbod():
 
     # Idenfity tools used for raid operation
     raid_controller_vendor = ARG_LIST.v
-    assert raid_controller_vendor in RAID_VENDOR_LIST.keys(), "RAID controller vendor info is invalid"
+    assert raid_controller_vendor in RAID_VENDOR_LIST.keys(), \
+        "RAID controller vendor info is invalid"
     raid_tool = RAID_VENDOR_LIST[raid_controller_vendor]
     assert os.path.exists(raid_tool), "Overlay doesn't include tool path: " + raid_tool
 
@@ -370,7 +518,8 @@ def hdparm_secure_erase(disk_name, se_option):
     #
     # except for "supported" and "enabled", other items should have "not" before them
     if hdparm_option == "--security-erase":
-        pattern_se_support = re.compile(r'[\s\S]*(?!not)[\s]*supported[\s]*[\s\S]*enabled[\s]*not[\s]'
+        pattern_se_support = re.compile(r'[\s\S]*(?!not)[\s]*supported'
+                                        r'[\s]*[\s\S]*enabled[\s]*not[\s]'
                                         r'*locked[\s]*not[\s]*frozen[\s]*not[\s]*expired[\s\S]*')
     else:
         pattern_se_support = re.compile(r'[\s\S]*(?!not)[\s]*supported[\s]*[\s\S]*enabled[\s]*not'
 
@@ -441,6 +590,24 @@ def scrub_secure_erase(disk_name, se_option):
     command = ["scrub", "-f", "-p", scrub_option, disk_name]  # -f is to force erase
     return secure_erase_base(disk_name, cmd=command)
 
+def get_process_exit_status(async_result):
+    """
+    Get a pool worker's exit status
+    :param async_result: multiprocessing Pool async result object
+    :return: a dict with the process exit code and an exit status description
+    """
+    process_result = {}
+    try:
+        process_exit_result = async_result.get()
+    except AssertionError as err:
+        process_result = {"exitcode": -1, "message": err}
+    else:
+        process_result["exitcode"] = process_exit_result["exit_code"]
+        if process_exit_result["exit_code"] == 0:
+            process_result["message"] = "Secure erase completed successfully"
+        else:
+            process_result["message"] = process_exit_result["message"]
+    return process_result
 
 if __name__ == '__main__':
     TOOL_MAPPER = {
@@ -451,6 +618,7 @@ def scrub_secure_erase(disk_name, se_option):
     }
     tool = ARG_LIST.t
     option = ARG_LIST.o
+    task_id = ARG_LIST.i
 
     assert tool in ["scrub", "hdparm", "sg_format", "sg_sanitize"], \
         "Secure erase tool is not supported"
 
@@ -459,13 +627,16 @@
     # Get process count we should started
     user_count = len(disk_list)
-    cpu_thread_count = cpu_count()
+    # Reserve one CPU thread for the progress-notification process
+    cpu_thread_count = cpu_count() - 1
     if user_count > cpu_thread_count:
         process_count = cpu_thread_count
     else:
         process_count = user_count
-    pool = Pool(process_count)
+    # One extra worker so the progress notifier never starves an erase process
+    pool = Pool(process_count + 1)
 
+    # Get secure erase progress and send notification
+    progress_parser = ProgressParser(task_id, disk_list, option, tool)
+    progress_status = pool.apply_async(progress_parser.send_progress_notification, ())
     # Run multiple processes for SE
     erase_output_list = []
     for disk in disk_list:
@@ -474,30 +645,23 @@
         erase_output["poolExitStatus"] = result
         erase_output_list.append(erase_output)
 
+    progress_result = get_process_exit_status(progress_status)
     # Parse erase exit message
     # .get() is a method blocks main process
     erase_result_list = []
     for erase_output in erase_output_list:
         erase_result = {"seMethod": erase_output["seMethod"],
                         "disk": erase_output["disk"]}
-        try:
-            pool_exit_result = erase_output["poolExitStatus"].get()
-        except AssertionError as err:
-            erase_result["exitcode"] = -1
-            erase_result["message"] = err
-        else:
-            erase_result["exitcode"] = pool_exit_result["exit_code"]
-            if pool_exit_result["exit_code"] == 0:
-                erase_result["message"] = "Secure erase completed successfully"
-            else:
-                erase_result["message"] = pool_exit_result["message"]
+        erase_result.update(get_process_exit_status(erase_output["poolExitStatus"]))
         erase_result_list.append(erase_result)
 
     pool.close()
     pool.join()
 
-    print erase_result_list
+    if progress_result["exitcode"]:
+        print progress_result["message"]
 
+    print erase_result_list
     for erase_result in erase_result_list:
         if erase_result["exitcode"]:
             msg = "Drive %s failed to run secure erase with tool %s, error info are: \n %s" \
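For reference, the POST issued by `send_progress_notification` is equivalent to the following sketch. The URL and payload shape come from the script above; the use of the `requests` library (instead of shelling out to curl) and the sample values are illustrative assumptions:

```python
import json

import requests  # assumed available; the script itself shells out to curl

payload = {
    "taskId": "57a86b5c36ec578876878294",  # sample id borrowed from the specs below
    "progress": {"percentage": "10.00%", "description": "Polling #6 at 10s intervals"}
}
resp = requests.post("http://172.31.128.1:8080/api/1.1/notification",
                     headers={"Content-Type": "application/json"},
                     data=json.dumps(payload))
print(resp.status_code)
```

This payload is exactly what `postProgressEvent` below receives as `data`.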
diff --git a/lib/services/notification-api-service.js b/lib/services/notification-api-service.js
index 453ba7dbc..877dcd8cd 100644
--- a/lib/services/notification-api-service.js
+++ b/lib/services/notification-api-service.js
@@ -4,13 +4,14 @@
 var di = require('di');
 
-module.exports = notificationApiServiceFactory;
-di.annotate(notificationApiServiceFactory, new di.Provide('Http.Services.Api.Notification'));
-di.annotate(notificationApiServiceFactory,
+module.exports = NotificationApiServiceFactory;
+di.annotate(NotificationApiServiceFactory, new di.Provide('Http.Services.Api.Notification'));
+di.annotate(NotificationApiServiceFactory,
     new di.Inject(
         'Protocol.Events',
         'Protocol.Task',
         'TaskGraph.Store',
+        'TaskGraph.TaskGraph',
         'Logger',
         'Services.Waterline',
         'Errors',
@@ -19,45 +20,46 @@ di.annotate(notificationApiServiceFactory,
     )
 );
 
-function notificationApiServiceFactory(
+function NotificationApiServiceFactory(
     eventsProtocol,
     taskProtocol,
     taskGraphStore,
+    TaskGraph,
     Logger,
     waterline,
     Errors,
     Promise,
     _
 ) {
-    var logger = Logger.initialize(notificationApiServiceFactory);
+    var logger = Logger.initialize(NotificationApiServiceFactory);
 
-    function notificationApiService() {
+    function NotificationApiService() {
     }
 
-    notificationApiService.prototype.postNotification = function(message) {
+    NotificationApiService.prototype.postNotification = function(message) {
         var self = this;
 
         if (_.has(message, 'nodeId')) {
             return self.postNodeNotification(message);
+        } else if (_.has(message, 'taskId') && _.has(message, 'progress')) {
+            // This is a progress-update notification if a taskId is specified
+            return self.postProgressEvent(message);
         }
-        // Add other cases here if to support more notification types
-
-        // This will be a broadcast notification if no id (like nodeId) is specified
         else {
+            // This is a broadcast notification if no id (like nodeId) is specified
             return self.postBroadcastNotification(message);
         }
     };
 
-    notificationApiService.prototype.postNodeNotification = function(message) {
-        var self = this;
+    NotificationApiService.prototype.postNodeNotification = function(message) {
         return Promise.try(function() {
             if (!message.nodeId || !_.isString(message.nodeId)) {
                 throw new Errors.BadRequestError('Invalid node ID in query or body');
-            };
+            }
         })
         .then(function () {
-            return waterline.nodes.needByIdentifier(message.nodeId)
+            return waterline.nodes.needByIdentifier(message.nodeId);
         })
         .then(function (node) {
             if(!node) {
@@ -70,14 +72,47 @@
         });
     };
 
-    notificationApiService.prototype.postBroadcastNotification = function(message) {
-        var self = this;
-
+    NotificationApiService.prototype.postBroadcastNotification = function(message) {
         return eventsProtocol.publishBroadcastNotification(message)
         .then(function () {
             return message;
         });
     };
 
-    return new notificationApiService();
+    NotificationApiService.prototype.postProgressEvent = function(data) {
+        var progressData;
+        return waterline.taskdependencies.find({taskId: data.taskId})
+        .then(function (tasks) {
+            if (!_.isEmpty(tasks) && _.has(tasks[0], 'graphId')) {
+                return waterline.graphobjects.find({instanceId: tasks[0].graphId})
+                .then(function (graphObjects) {
+                    // TODO: overall workflow progress percentage is not yet designed
+                    progressData = {
+                        graphId: graphObjects[0].instanceId,
+                        graphName: graphObjects[0].definition.friendlyName,
+                        progress: {
+                            percentage: "na",
+                            description: data.progress.description
+                        },
+                        taskProgress: {
+                            graphId: graphObjects[0].instanceId,
+                            taskId: data.taskId,
+                            taskName: graphObjects[0].tasks[data.taskId].friendlyName,
+                            progress: data.progress
+                        }
+                    };
+                })
+                .then(function () {
+                    return TaskGraph.prototype.updateGraphProgress(progressData);
+                });
+            } else {
+                logger.error('Notification API failed to post progress', {
+                    progress: data,
+                    error: "Cannot find an active task for the given taskId"
+                });
+            }
+        });
+    };
+
+    return new NotificationApiService();
 }
diff --git a/lib/services/workflow-api-service.js b/lib/services/workflow-api-service.js
index 8523bff0d..ab1dc348c 100644
--- a/lib/services/workflow-api-service.js
+++ b/lib/services/workflow-api-service.js
@@ -92,6 +92,17 @@ function workflowApiServiceFactory(
             definition, configuration.options, context, configuration.domain, true);
         });
     })
+    .tap(function (graph) {
+        var progressData = {
+            graphId: graph.instanceId,
+            progress: {
+                percentage: "0%",
+                description: 'Graph "' + graph.definition.friendlyName + '" started'
+            },
+            taskProgress: {}
+        };
+        return TaskGraph.prototype.updateGraphProgress(progressData);
+    })
     .then(function(graph) {
         self.runTaskGraph(graph.instanceId, configuration.domain);
         return graph;
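The lookup chain in `postProgressEvent` is: taskId → `taskdependencies` → graphId → `graphobjects` → `progressData`. The sketch below replays that chain over plain in-memory dicts, mirroring the sample documents used in the specs that follow; it illustrates the data flow only and is not RackHD code:

```python
# In-memory stand-ins for the two waterline collections (illustrative).
task_dependencies = [{"taskId": "57a86b5c36ec578876878294", "graphId": "graphId"}]
graph_objects = [{
    "instanceId": "graphId",
    "definition": {"friendlyName": "Test Graph"},
    "tasks": {"57a86b5c36ec578876878294": {"friendlyName": "Test Task"}}
}]


def post_progress_event(data):
    tasks = [t for t in task_dependencies if t["taskId"] == data["taskId"]]
    if not tasks or "graphId" not in tasks[0]:
        # mirrors the logger.error branch: no active task for this taskId
        return None
    graph = next(g for g in graph_objects if g["instanceId"] == tasks[0]["graphId"])
    return {
        "graphId": graph["instanceId"],
        "graphName": graph["definition"]["friendlyName"],
        # overall workflow percentage is still "na" pending a proper design
        "progress": {"percentage": "na",
                     "description": data["progress"]["description"]},
        "taskProgress": {
            "graphId": graph["instanceId"],
            "taskId": data["taskId"],
            "taskName": graph["tasks"][data["taskId"]]["friendlyName"],
            "progress": data["progress"]
        }
    }


print(post_progress_event({
    "taskId": "57a86b5c36ec578876878294",
    "progress": {"description": "test", "percentage": "10%"}
}))
```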
diff --git a/spec/lib/services/notification-api-service-spec.js b/spec/lib/services/notification-api-service-spec.js
index 09de5834b..e44f8eb48 100644
--- a/spec/lib/services/notification-api-service-spec.js
+++ b/spec/lib/services/notification-api-service-spec.js
@@ -10,6 +10,7 @@ describe('Http.Api.Notification', function () {
     var needByIdentifier;
     var postNodeNotification;
     var postBroadcastNotification;
+    var TaskGraph;
 
     var nodeNotificationMessage = {
         nodeId: "57a86b5c36ec578876878294",
@@ -20,22 +21,32 @@ describe('Http.Api.Notification', function () {
         data: 'dummy data'
     };
 
-    var node = {_id: nodeNotificationMessage.nodeId}
+    var progressNotificationMessage = {
+        taskId: "57a86b5c36ec578876878294",
+        progress: {
+            description: "test",
+            percentage: "10%"
+        }
+    };
+
+    var node = {_id: nodeNotificationMessage.nodeId};
 
     before('Setup mocks', function () {
         helper.setupInjector([
+            onHttpContext.prerequisiteInjectables,
             helper.require("/lib/services/notification-api-service.js")
         ]);
         notificationApiService = helper.injector.get('Http.Services.Api.Notification');
         _ = helper.injector.get('_');
         eventProtocol = helper.injector.get('Protocol.Events');
         waterline = helper.injector.get('Services.Waterline');
+        TaskGraph = helper.injector.get('TaskGraph.TaskGraph');
         waterline.nodes = {
             needByIdentifier: function() {}
         };
         sinon.stub(eventProtocol, 'publishNodeNotification').resolves();
         sinon.stub(eventProtocol, 'publishBroadcastNotification').resolves();
-
+        this.sandbox = sinon.sandbox.create();
         needByIdentifier = sinon.stub(waterline.nodes, 'needByIdentifier');
         needByIdentifier.resolves(node);
         postNodeNotification = sinon.spy(notificationApiService, 'postNodeNotification');
@@ -112,5 +123,70 @@ describe('Http.Api.Notification', function () {
             expect(resp).to.deep.equal(broadcastNotificationMessage);
         });
     });
+
+    it('should update graph progress', function () {
+        var tasks = [{graphId: "graphId"}],
+            graphs = [{
+                instanceId: "graphId",
+                definition: {friendlyName: "Test Graph"},
+                tasks: {"57a86b5c36ec578876878294": {friendlyName: "Test Task"}}
+            }],
+            progressData = {
+                graphId: graphs[0].instanceId,
+                graphName: graphs[0].definition.friendlyName,
+                progress: {
+                    percentage: "na",
+                    description: progressNotificationMessage.progress.description
+                },
+                taskProgress: {
+                    graphId: graphs[0].instanceId,
+                    taskId: progressNotificationMessage.taskId,
+                    taskName: graphs[0].tasks[progressNotificationMessage.taskId].friendlyName,
+                    progress: progressNotificationMessage.progress
+                }
+            };
+        waterline.taskdependencies = {find: function() {}};
+        waterline.graphobjects = {find: function() {}};
+        this.sandbox.stub(waterline.taskdependencies, 'find').resolves(tasks);
+        this.sandbox.stub(waterline.graphobjects, 'find').resolves(graphs);
+        this.sandbox.stub(TaskGraph.prototype, 'updateGraphProgress').resolves();
+        return notificationApiService.postProgressEvent(progressNotificationMessage)
+            .then(function () {
+                expect(waterline.taskdependencies.find).to.be.calledOnce;
+                expect(waterline.taskdependencies.find).to.be.calledWith({
+                    taskId: progressNotificationMessage.taskId});
+                expect(waterline.graphobjects.find).to.be.calledOnce;
+                expect(waterline.graphobjects.find).to.be.calledWith({
+                    instanceId: tasks[0].graphId});
+                expect(TaskGraph.prototype.updateGraphProgress).to.be.calledOnce;
+                expect(TaskGraph.prototype.updateGraphProgress).to.be.calledWith(progressData);
+            });
+    });
+
+    it('should not update graph progress', function () {
+        this.sandbox.restore();
+        waterline.taskdependencies = {find: function() {}};
+        waterline.graphobjects = {find: function() {}};
+        this.sandbox.stub(waterline.taskdependencies, 'find').resolves([]);
+        this.sandbox.spy(waterline.graphobjects, 'find');
+        this.sandbox.spy(TaskGraph.prototype, 'updateGraphProgress');
+        return notificationApiService.postProgressEvent({taskId: 'aTask'})
+            .then(function () {
+                expect(waterline.taskdependencies.find).to.be.calledOnce;
+                expect(waterline.graphobjects.find).to.have.not.been.called;
+                expect(TaskGraph.prototype.updateGraphProgress).to.have.not.been.called;
+            });
+    });
+
+    it('should call postProgressEvent', function () {
+        this.sandbox.stub(notificationApiService, 'postProgressEvent').resolves();
+        return notificationApiService.postNotification(progressNotificationMessage)
+            .then(function () {
+                expect(notificationApiService.postProgressEvent).to.be.calledOnce;
+                expect(notificationApiService.postProgressEvent).to.be
+                    .calledWith(progressNotificationMessage);
+            });
+    });
 });
diff --git a/spec/lib/services/workflow-api-service-spec.js b/spec/lib/services/workflow-api-service-spec.js
index fdaaad35a..c3ff9fa45 100644
--- a/spec/lib/services/workflow-api-service-spec.js
+++ b/spec/lib/services/workflow-api-service-spec.js
@@ -49,7 +49,10 @@ describe('Http.Services.Api.Workflows', function () {
         waterline.taskdefinitions = {
             destroy: sinon.stub().resolves({ injectableName: 'test' })
         };
-        graph = { instanceId: 'testgraphid' };
+        graph = {
+            instanceId: 'testgraphid',
+            definition: { friendlyName: 'testGraph' }
+        };
         task = { instanceId: 'testtaskid' };
         workflow = { id: 'testid', _status: 'cancelled' };
         graphDefinition = { injectableName: 'Graph.Test' };
@@ -73,6 +76,7 @@ describe('Http.Services.Api.Workflows', function () {
         this.sandbox.stub(workflowApiService, 'createActiveGraph');
         this.sandbox.stub(workflowApiService, 'runTaskGraph');
         this.sandbox.stub(env, 'get');
+        this.sandbox.stub(TaskGraph.prototype, 'updateGraphProgress').resolves();
     });
 
     afterEach('Http.Services.Api.Profiles afterEach', function() {
@@ -87,7 +91,14 @@ describe('Http.Services.Api.Workflows', function () {
         workflowApiService.findGraphDefinitionByName.resolves(graphDefinition);
         workflowApiService.createActiveGraph.resolves(graph);
         workflowApiService.runTaskGraph.resolves();
-
+        var data = {
+            graphId: graph.instanceId,
+            progress: {
+                percentage: "0%",
+                description: 'Graph "' + graph.definition.friendlyName + '" started'
+            },
+            taskProgress: {}
+        };
         return workflowApiService.createAndRunGraph({
             name: 'Graph.Test',
             options: { test: 1 },
@@ -103,6 +114,8 @@ describe('Http.Services.Api.Workflows', function () {
             expect(workflowApiService.createActiveGraph).to.have.been.calledWith(
                 graphDefinition, { test: 1 }, { test: 2 }, 'test'
            );
+            expect(TaskGraph.prototype.updateGraphProgress).to.have.been.calledOnce;
+            expect(TaskGraph.prototype.updateGraphProgress).to.have.been.calledWith(data);
             expect(workflowApiService.runTaskGraph).to.have.been.calledOnce;
             expect(workflowApiService.runTaskGraph)
                 .to.have.been.calledWith(graph.instanceId, 'test');
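Taken together, a hypothetical end-to-end run of the template looks like the sketch below. Flag spellings other than `-i` and `-d` are inferred from the `ARG_LIST` usage in the script above, and all values are invented for illustration:

```python
import subprocess

# -i: taskId echoed back in every progress notification
# -d: one flag per disk ("append" action)
# -t/-o: erase tool and its option, e.g. scrub with the "nnsa" method
cmd = ["python", "secure_erase.py",
       "-i", "57a86b5c36ec578876878294",
       "-d", "sdf", "-d", "sda",
       "-t", "scrub",
       "-o", "nnsa"]
subprocess.call(cmd)
```

While the erases run, the pool's extra worker posts a notification every 10 seconds, ending with a final 100% message once every disk's erase process has exited.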