diff --git a/sdk/basic_offload.py b/sdk/basic_offload.py
index d3cb9de..0fd8c45 100644
--- a/sdk/basic_offload.py
+++ b/sdk/basic_offload.py
@@ -33,6 +33,8 @@ async def main():
             "cpu": 1,
             "memory": 256
         },
+        use_gpu=True,
+        gpu_count=1
     )
 
     await meca_api.join()
diff --git a/sdk/py/meca_api.py b/sdk/py/meca_api.py
index 2c3e364..d95420d 100644
--- a/sdk/py/meca_api.py
+++ b/sdk/py/meca_api.py
@@ -50,13 +50,17 @@ async def offload_task(
     data: str,
     callback: Callable[[str], None],
     resource: dict = None,
-    runtime: str = None
+    runtime: str = None,
+    use_gpu: bool = False,
+    gpu_count: int = 0,
 ) -> str:
     print('Offloading task...', container_ref, data)
     payload = {
         'task_id': task_id,
         'container_reference': container_ref,
-        'content': data
+        'content': data,
+        'use_gpu': use_gpu,
+        'gpu_count': gpu_count,
     }
     if resource:
         payload['resource'] = resource
diff --git a/src/worker_renderer/executor_api.js b/src/worker_renderer/executor_api.js
index 8c12037..9fca8f4 100644
--- a/src/worker_renderer/executor_api.js
+++ b/src/worker_renderer/executor_api.js
@@ -1,7 +1,14 @@
 const TASK_EXECUTOR_URL =
   process.env.TASK_EXECUTOR_URL || 'http://localhost:2591';
 
-async function postTaskExecution(containerRef, input, resource, runtime) {
+async function postTaskExecution(
+  containerRef,
+  input,
+  resource,
+  runtime,
+  useGpu,
+  gpuCount
+) {
   const requestOptions = {
     method: 'POST',
     headers: { 'Content-Type': 'application/json' },
@@ -10,6 +17,8 @@ async function postTaskExecution(containerRef, input, resource, runtime) {
       input,
       resource,
       runtime,
+      useGpu,
+      gpuCount,
     }),
   };
   const msg = await fetch(TASK_EXECUTOR_URL, requestOptions)
@@ -28,4 +37,4 @@ async function postTaskExecution(containerRef, input, resource, runtime) {
 
   return msg;
 }
-module.exports.postTaskExecution = postTaskExecution;
+module.exports = { postTaskExecution };
diff --git a/src/worker_renderer/schema.proto b/src/worker_renderer/schema.proto
index 73cded7..6b3e85a 100644
--- a/src/worker_renderer/schema.proto
+++ b/src/worker_renderer/schema.proto
@@ -11,6 +11,8 @@ message Task {
   string content = 3;
   Resource resource = 4;
   string runtime = 5;
+  bool useGpu = 6;
+  int32 gpuCount = 7;
 }
 
 message TaskResult {
diff --git a/src/worker_renderer/task_consumer.js b/src/worker_renderer/task_consumer.js
index e59842d..64cbcf5 100644
--- a/src/worker_renderer/task_consumer.js
+++ b/src/worker_renderer/task_consumer.js
@@ -3,44 +3,10 @@ import Channels from '../common/channels.js';
 const log = require('electron-log/renderer');
 const amqp = require('amqplib');
 const { ipcRenderer } = require('electron');
-const protobuf = require('protobufjs');
-const { struct } = require('pb-util');
-const { postTaskExecution } = require('./executor_api');
+const task_processor = require('./task_processor');
 
 const MQ_URL = process.env.MQ_URL || 'amqp://localhost:5672';
 
-const Task = protobuf
-  .loadSync('src/worker_renderer/schema.proto')
-  .lookupType('Task');
-
-const TaskResult = protobuf
-  .loadSync('src/worker_renderer/schema.proto')
-  .lookupType('TaskResult');
-
-const parseTaskFromProto = (content) => {
-  let task;
-  try {
-    task = Task.decode(content);
-  } catch (err) {
-    console.log(' [con] Got decode error: %s', err.toString());
-    log.info(' [con] Got decode error: %s', err.toString());
-    return { id: '', content: err.toString() };
-  }
-
-  const typeError = Task.verify(task);
-
-  if (typeError) {
-    console.log(' [con] Got type error: %s', typeError.toString());
-    let id = '';
-    if ('id' in task) id = task.id;
-    return { id, content: typeError.toString() };
-  }
-  console.log(` [con] Received: ${JSON.stringify(task)}`);
-  log.info(` [con] Received: ${JSON.stringify(task)}`);
-
-  return task;
-};
-
 class Consumer {
   static openQueues = {};
 
@@ -69,10 +35,9 @@
       channel.consume(queueName, async (msg) => {
         channel.ack(msg);
         const { correlationId } = msg.properties;
-        const resultObject = await this.handleMsgContent(msg.content);
-        const serializedResult = TaskResult.encode(
-          TaskResult.fromObject(resultObject)
-        ).finish();
+        const serializedResult = await task_processor.handleMsgContent(
+          msg.content
+        );
 
         channel.sendToQueue(
           msg.properties.replyTo,
@@ -99,50 +64,6 @@
       delete Consumer.openQueues[queueName];
     };
-
-    this.handleMsgContent = async function handleMsgContent(content) {
-      const transactionStartDatetime = Math.floor(new Date().getTime() / 1000);
-
-      const task = parseTaskFromProto(content);
-      ipcRenderer.send(
-        Channels.JOB_RECEIVED,
-        task.id,
-        task.containerRef,
-        task.content
-        // task.resource,
-        // task.runtime
-      );
-
-      let result = '';
-      result = await postTaskExecution(
-        task.containerRef,
-        task.content,
-        task.resource,
-        task.runtime
-      );
-
-      if (task.resource == null) {
-        task.resource = { "cpu": 1, "memory": 128 };
-      }
-      console.log(` [con] Resource consumed: ${task.resource}`);
-      log.info(` [con] Resource consumed: ${task.resource}`);
-
-      const transactionEndDatetime = Math.floor(new Date().getTime() / 1000);
-      const duration = transactionEndDatetime - transactionStartDatetime;
-      const reply = {
-        id: task.id,
-        content: result,
-        resource: task.resource,
-        transactionStartDatetime,
-        transactionEndDatetime,
-        duration,
-      };
-
-      console.log(` [con] Result: ${JSON.stringify(reply)}`);
-      log.info(` [con] Result: ${JSON.stringify(reply)}`);
-
-      return reply;
-    };
   }
 }
diff --git a/src/worker_renderer/task_processor.js b/src/worker_renderer/task_processor.js
new file mode 100644
index 0000000..f7e1296
--- /dev/null
+++ b/src/worker_renderer/task_processor.js
@@ -0,0 +1,77 @@
+const log = require('electron-log/renderer');
+const protobuf = require('protobufjs');
+const executor_api = require('./executor_api');
+
+const Task = protobuf
+  .loadSync('src/worker_renderer/schema.proto')
+  .lookupType('Task');
+
+const TaskResult = protobuf
+  .loadSync('src/worker_renderer/schema.proto')
+  .lookupType('TaskResult');
+
+function parseTaskFromProto(content) {
+  let task;
+  try {
+    task = Task.decode(content);
+  } catch (err) {
+    console.log(' [con] Got decode error: %s', err.toString());
+    log.info(' [con] Got decode error: %s', err.toString());
+    return { id: '', content: err.toString() };
+  }
+
+  const typeError = Task.verify(task);
+
+  if (typeError) {
+    console.log(' [con] Got type error: %s', typeError.toString());
+    let id = '';
+    if ('id' in task) id = task.id;
+    return { id, content: typeError.toString() };
+  }
+  console.log(` [con] Received: ${JSON.stringify(task)}`);
+  log.info(` [con] Received: ${JSON.stringify(task)}`);
+
+  return task;
+}
+
+async function handleMsgContent(content) {
+  const transactionStartDatetime = Math.floor(new Date().getTime() / 1000);
+
+  const task = parseTaskFromProto(content);
+
+  let result = '';
+  result = await executor_api.postTaskExecution(
+    task.containerRef,
+    task.content,
+    task.resource,
+    task.runtime,
+    task.useGpu,
+    task.gpuCount
+  );
+
+  if (task.resource == null) {
+    task.resource = { cpu: 1, memory: 128 };
+  }
+  console.log(` [con] Resource consumed: ${task.resource}`);
+  log.info(` [con] Resource consumed: ${task.resource}`);
+
+  const transactionEndDatetime = Math.floor(new Date().getTime() / 1000);
+  const duration = transactionEndDatetime - transactionStartDatetime;
+  const reply = {
+    id: task.id,
+    content: result,
+    resource: task.resource,
+    transactionStartDatetime,
+    transactionEndDatetime,
+    duration,
+  };
+
+  console.log(` [con] Result: ${JSON.stringify(reply)}`);
+  log.info(` [con] Result: ${JSON.stringify(reply)}`);
+
+  return TaskResult.encode(TaskResult.fromObject(reply)).finish();
+}
+
+module.exports = {
+  handleMsgContent,
+};
diff --git a/task_executor/conf/conf.yaml b/task_executor/conf/conf.yaml
new file mode 100644
index 0000000..643e66b
--- /dev/null
+++ b/task_executor/conf/conf.yaml
@@ -0,0 +1,6 @@
+# now only support docker
+type: "docker"
+# timeout in minutes
+timeout: 10
+cpu: 8
+mem: 8192