diff --git a/modules/demo/ssr/graph/index.js b/modules/demo/ssr/graph/index.js index 4b590c58d..1e56b0bff 100755 --- a/modules/demo/ssr/graph/index.js +++ b/modules/demo/ssr/graph/index.js @@ -19,6 +19,7 @@ const fastify = require('fastify')(); fastify // .register(require('./plugins/webrtc'), require('./plugins/graph')(fastify)) .register(require('fastify-static'), {root: require('path').join(__dirname, 'public')}) - .get('/', (req, reply) => reply.sendFile('video.html')); + .register(require('./plugins/api')) + .get('/', (req, reply) => reply.sendFile('video.html')) fastify.listen(8080).then(() => console.log('server ready')); diff --git a/modules/demo/ssr/graph/package.json b/modules/demo/ssr/graph/package.json index bbf58f11a..6bc0b3f53 100644 --- a/modules/demo/ssr/graph/package.json +++ b/modules/demo/ssr/graph/package.json @@ -21,11 +21,14 @@ "fastify-socket.io": "2.0.0", "fastify-static": "4.2.3", "fastify": "3.20.2", + "fastify-multipart": "5.0.2", + "fastify-cors": "6.0.2", "nanoid": "3.1.25", "rxjs": "6.6.7", "shm-typed-array": "0.0.13", "simple-peer": "9.11.0", - "socket.io": "4.1.3" + "socket.io": "4.1.3", + "glob": "7.2.0" }, "files": [ "render", diff --git a/modules/demo/ssr/graph/plugins/api/index.js b/modules/demo/ssr/graph/plugins/api/index.js new file mode 100644 index 000000000..955a4d96a --- /dev/null +++ b/modules/demo/ssr/graph/plugins/api/index.js @@ -0,0 +1,190 @@ +const {graphs, clients} = require('../graph'); +const fs = require('fs') +const util = require('util') +const {pipeline} = require('stream') +const pump = util.promisify(pipeline) +var glob = require('glob'); +const {Float32Buffer} = require('@rapidsai/cuda'); +const {GraphCOO} = require('@rapidsai/cugraph'); +const {DataFrame, Uint32} = require('@rapidsai/cudf'); +const {loadEdges, loadNodes} = require('../graph/loader'); + +function readDataFrame(path) { + let df = new DataFrame({}); + if (path.indexOf('.csv', path.length - 4) !== -1) { + // csv file + df = DataFrame.readCSV({sources: [path], header: 0, sourceType: 'files'}); + + } else if (path.indexOf('.parquet', path.length - 8) !== -1) { + // csv file + df = DataFrame.readParquet({sources: [path]}); + } + if (df.names.includes('Unnamed: 0')) { df = df.cast({'Unnamed: 0': new Uint32}); } + return df; +} + +async function getNodesForGraph(asDeviceMemory, nodes, numNodes) { + let nodesRes = {}; + const pos = new Float32Buffer(Array.from( + {length: numNodes * 2}, + () => Math.random() * 1000 * (Math.random() < 0.5 ? -1 : 1), + )); + + if (nodes.x in nodes.dataframe.names) { + nodesRes.nodeXPositions = asDeviceMemory(nodes.dataframe.get(node.x).data); + } else { + nodesRes.nodeXPositions = pos.subarray(0, pos.length / 2); + } + if (nodes.y in nodes.dataframe.names) { + nodesRes.nodeYPositions = asDeviceMemory(nodes.dataframe.get(node.y).data); + } else { + nodesRes.nodeYPositions = pos.subarray(pos.length / 2); + } + if (nodes.dataframe.names.includes(nodes.size)) { + nodesRes.nodeRadius = asDeviceMemory(nodes.dataframe.get(nodes.size).data); + } + if (nodes.dataframe.names.includes(nodes.color)) { + nodesRes.nodeFillColors = asDeviceMemory(nodes.dataframe.get(nodes.color).data); + } + if (nodes.dataframe.names.includes(nodes.id)) { + nodesRes.nodeElementIndices = asDeviceMemory(nodes.dataframe.get(nodes.id).data); + } + return nodesRes; +} + +async function getEdgesForGraph(asDeviceMemory, edges) { + let edgesRes = {}; + + if (edges.dataframe.names.includes(edges.color)) { + edgesRes.edgeColors = asDeviceMemory(edges.dataframe.get(edges.color).data); + } + if (edges.dataframe.names.includes(edges.id)) { + edgesRes.edgeList = asDeviceMemory(edges.dataframe.get(edges.id).data); + } + if (edges.dataframe.names.includes(edges.bundle)) { + edgesRes.edgeBundles = asDeviceMemory(edges.dataframe.get(edges.bundle).data); + } + return edgesRes; +} + +module.exports = function(fastify, opts, done) { + fastify.register(require('fastify-multipart')) + fastify.register(require('fastify-cors'), + { + // put your options here + }); + + // fastify.addHook('preValidation', (request, reply, done) => { + // console.log('this is executed', request); + // done() + // }); + + async function loadGraph(id, data) { + if (!(id in fastify[graphs])) { + const asDeviceMemory = (buf) => new (buf[Symbol.species])(buf); + const src = data.edges.dataframe.get(data.edges.src); + const dst = data.edges.dataframe.get(data.edges.dst); + const graph = new GraphCOO(src._col, dst._col, {directedEdges: true}); + fastify[graphs][id] = { + refCount: 0, + nodes: await getNodesForGraph(asDeviceMemory, data.nodes, graph.numNodes()), + edges: await getEdgesForGraph(asDeviceMemory, data.edges), + graph: graph, + }; + } + + ++fastify[graphs][id].refCount; + + return { + gravity: 0.0, + linLogMode: false, + scalingRatio: 5.0, + barnesHutTheta: 0.0, + jitterTolerance: 0.05, + strongGravityMode: false, + outboundAttraction: false, + graph: fastify[graphs][id].graph, + nodes: { + ...fastify[graphs][id].nodes, + length: fastify[graphs][id].graph.numNodes(), + }, + edges: { + ...fastify[graphs][id].edges, + length: fastify[graphs][id].graph.numEdges(), + }, + }; + } + + fastify.get('/getIDValue', async (request, reply) => { + console.log(fastify[clients][request.query.id + ':video']); + reply.send(fastify[clients][request.query.id + ':video'].graph.dataframes[0].numRows); + }); + + fastify.post('/uploadFile', async function(req, reply) { + const data = await req.file(); + + const filepath = `${__dirname}/../../data/${data.filename}`; + const target = fs.createWriteStream(filepath); + try { + await pump(data.file, target); + console.log('success'); + } catch (err) { console.log(err); } + reply.send() + }); + + fastify.get('/getFileNames', async (request, reply) => { + if (`${request.query.id}:video` in fastify[clients]) { + glob(`*.{csv,parquet}`, + {cwd: `${__dirname}/../../data/`}, + (er, files) => { reply.send(JSON.stringify(files.concat(['defaultExample']))); }); + } else { + reply.code(500).send('client handshake not established'); + } + }); + + fastify.get('/loadOnGPU', async (request, reply) => { + const id = `${request.query.id}:video`; + const filePath = `${__dirname}/../../data/` + if (id in fastify[clients]) { + if (fs.existsSync(`${filePath}${request.query.nodes}`) && + fs.existsSync(`${filePath}${request.query.edges}`)) { + fastify[clients][id].data.nodes.dataframe = + await readDataFrame(`${filePath}${request.query.nodes}`); + + fastify[clients][id].data.edges.dataframe = + await readDataFrame(`${filePath}${request.query.edges}`); + } else { + fastify[clients][id].data.nodes.dataframe = await loadNodes(); + fastify[clients][id].data.edges.dataframe = await loadEdges(); + } + reply.send('successfully loaded in GPU Memory'); + } + else { + reply.code(500).send('client handshake not established'); + } + }) + + fastify.get('/fetchDFParameters', async (request, reply) => { + const id = `${request.query.id}:video`; + if (id in fastify[clients]) { + reply.send(JSON.stringify({ + nodesParams: fastify[clients][id].data.nodes.dataframe.names.concat([null]), + edgesParams: fastify[clients][id].data.edges.dataframe.names.concat([null]) + })); + } else { + reply.code(500).send('client handshake not established'); + } + }); + + fastify.post('/updateRenderColumns', async (request, reply) => { + const id = `${request.body.id}:video`; + if (id in fastify[clients]) { + Object.assign(fastify[clients][id].data.nodes, request.body.nodes); + Object.assign(fastify[clients][id].data.edges, request.body.edges); + fastify[clients][id].graph = await loadGraph('default', fastify[clients][id].data); + } else { + reply.code(500).send('client handshake not established'); + } + }); + done(); +} diff --git a/modules/demo/ssr/graph/plugins/graph/index.js b/modules/demo/ssr/graph/plugins/graph/index.js index cba183d25..f88aa5564 100644 --- a/modules/demo/ssr/graph/plugins/graph/index.js +++ b/modules/demo/ssr/graph/plugins/graph/index.js @@ -12,14 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -const wrtc = require('wrtc'); -const {MemoryView} = require('@rapidsai/cuda'); -const {Float32Buffer} = require('@rapidsai/cuda'); -const {GraphCOO} = require('@rapidsai/cugraph'); -const {Series, Int32} = require('@rapidsai/cudf'); +const wrtc = require('wrtc'); +const {MemoryView} = require('@rapidsai/cuda'); +const {Series, Int32, DataFrame} = require('@rapidsai/cudf'); -const {loadNodes, loadEdges} = require('./loader'); -const {RenderCluster} = require('../../render/cluster'); +const {RenderCluster} = require('../../render/cluster'); const {create: shmCreate, detach: shmDetach} = require('shm-typed-array'); @@ -63,24 +60,31 @@ function graphSSRClients(fastify) { }, event: {}, props: {width, height, layout}, - graph: await loadGraph(graphId), + graph: {}, // await loadGraph(graphId), + data: { + nodes: {dataframe: new DataFrame({}), color: '', size: '', id: '', x: 'x', y: 'y'}, + edges: {dataframe: new DataFrame({}), color: '', id: '', bundle: '', src: 'src', dst: 'dst'} + }, frame: shmCreate(width * height * 3 / 2), peer: peer, }; - if (clients[stream.id].graph.dataframes[0]) { - const res = getPaginatedRows(clients[stream.id].graph.dataframes[0]); - peer.send(JSON.stringify({ - type: 'data', - data: {nodes: {data: res, length: clients[stream.id].graph.dataframes[0].numRows}} - })); - } - if (clients[stream.id].graph.dataframes[1]) { - const res = getPaginatedRows(clients[stream.id].graph.dataframes[1]); - peer.send(JSON.stringify({ - type: 'data', - data: {edges: {data: res, length: clients[stream.id].graph.dataframes[1].numRows}} - })); - } + + // if (clients[stream.id].graph !== {}) { + // if (clients[stream.id].graph.dataframes[0]) { + // const res = getPaginatedRows(clients[stream.id].graph.dataframes[0]); + // peer.send(JSON.stringify({ + // type: 'data', + // data: {nodes: {data: res, length: clients[stream.id].graph.dataframes[0].numRows}} + // })); + // } + // if (clients[stream.id].graph.dataframes[1]) { + // const res = getPaginatedRows(clients[stream.id].graph.dataframes[1]); + // peer.send(JSON.stringify({ + // type: 'data', + // data: {edges: {data: res, length: clients[stream.id].graph.dataframes[1].numRows}} + // })); + // } + // } stream.addTrack(source.createTrack()); peer.streams.push(stream); @@ -110,6 +114,7 @@ function graphSSRClients(fastify) { } case 'layout': { clients[stream.id].props.layout = JSON.parse(data); + console.log(JSON.parse(data)); break; } } @@ -126,60 +131,6 @@ function graphSSRClients(fastify) { } } } - - async function loadGraph(id) { - let dataframes = []; - - if (!(id in graphs)) { - const asDeviceMemory = (buf) => new (buf[Symbol.species])(buf); - dataframes = await Promise.all([loadNodes(id), loadEdges(id)]); - const src = dataframes[1].get('src'); - const dst = dataframes[1].get('dst'); - graphs[id] = { - refCount: 0, - nodes: { - nodeRadius: asDeviceMemory(dataframes[0].get('size').data), - nodeFillColors: asDeviceMemory(dataframes[0].get('color').data), - nodeElementIndices: asDeviceMemory(dataframes[0].get('id').data), - }, - edges: { - edgeList: asDeviceMemory(dataframes[1].get('edge').data), - edgeColors: asDeviceMemory(dataframes[1].get('color').data), - edgeBundles: asDeviceMemory(dataframes[1].get('bundle').data), - }, - graph: new GraphCOO(src._col, dst._col, {directedEdges: true}), - }; - } - - ++graphs[id].refCount; - - const pos = new Float32Buffer(Array.from( - {length: graphs[id].graph.numNodes() * 2}, - () => Math.random() * 1000 * (Math.random() < 0.5 ? -1 : 1), - )); - - return { - gravity: 0.0, - linLogMode: false, - scalingRatio: 5.0, - barnesHutTheta: 0.0, - jitterTolerance: 0.05, - strongGravityMode: false, - outboundAttraction: false, - graph: graphs[id].graph, - nodes: { - ...graphs[id].nodes, - length: graphs[id].graph.numNodes(), - nodeXPositions: pos.subarray(0, pos.length / 2), - nodeYPositions: pos.subarray(pos.length / 2), - }, - edges: { - ...graphs[id].edges, - length: graphs[id].graph.numEdges(), - }, - dataframes: dataframes - }; - } } function layoutAndRenderGraphs(clients) { @@ -200,6 +151,8 @@ function layoutAndRenderGraphs(clients) { continue; } + if (client.graph == {}) { continue; } + const state = {...client.state}; const props = {...client.props}; const event = @@ -224,7 +177,7 @@ function layoutAndRenderGraphs(clients) { if (event.length === 0 && !props.layout) { continue; } if (event.length !== 0) { client.event = Object.create(null); } - if (props.layout == true) { client.graph = forceAtlas2(client.graph); } + if (props.layout == true && client.graph !== {}) { client.graph = forceAtlas2(client.graph); } const { width = client.props.width ?? 800, @@ -268,7 +221,7 @@ function layoutAndRenderGraphs(clients) { result.state.boxSelectCoordinates.rectdata = [{polygon: [[]], show: false}]; // send to client - if (client.graph.dataframes) { sendToClient(client.graph.dataframes); } + sendToClient([client.data.nodes.dataframe, client.data.edges.dataframe]); } else if (JSON.stringify(client.state.selectedInfo.selectedCoordinates) !== JSON.stringify(result.state.selectedInfo.selectedCoordinates)) { // selections updated @@ -276,12 +229,10 @@ function layoutAndRenderGraphs(clients) { Series.new({type: new Int32, data: result.state.selectedInfo.selectedNodes}); const edges = Series.new({type: new Int32, data: result.state.selectedInfo.selectedEdges}); - if (client.graph.dataframes) { - sendToClient([ - client.graph.dataframes[0].gather(nodes), - client.graph.dataframes[1].gather(edges) - ]); - } + sendToClient([ + client.data.nodes.dataframe.gather(nodes), + client.data.edges.dataframe.gather(edges) + ]); } // copy result state to client's current state result?.state && Object.assign(client.state, result.state); @@ -299,6 +250,7 @@ function getPaginatedRows(df, page = 1, rowsPerPage = 400) { } function forceAtlas2({graph, nodes, edges, ...params}) { + if (graph == undefined) { return {}; } graph.forceAtlas2({...params, positions: nodes.nodeXPositions.buffer}); return { graph, diff --git a/modules/demo/ssr/graph/public/video.html b/modules/demo/ssr/graph/public/video.html index 8b4b9df90..0dc26de56 100644 --- a/modules/demo/ssr/graph/public/video.html +++ b/modules/demo/ssr/graph/public/video.html @@ -24,6 +24,7 @@