Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New stager #281

Merged
merged 9 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docker-compose.swarm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ services:
proxynet:
aliases:
- streamer
dr-data-stager-net:
aliases:
- streamer
secrets:
- streamer-service-config.json
- streamer-mailer-config.json
Expand All @@ -29,6 +32,9 @@ services:
proxynet:
aliases:
- streamer-ui
dr-data-stager-net:
aliases:
- streamer-ui
secrets:
- streamer-service-config.json
- streamer-ui-config.json
Expand All @@ -50,6 +56,8 @@ networks:
default:
name: streamer4user-net
attachable: true
dr-data-stager-net:
external: true
proxynet:
external: true

Expand Down
9 changes: 3 additions & 6 deletions streamer-ui/packages/server/routes/stager.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const createError = require("http-errors");
const path = require("path");
const { basicAuthString, fetchOnce } = require("./utils");
const fetch = require("node-fetch");
const { fetchOnce } = require("./utils");

// location of the streamer-service-config.json file.
const fconfig = path.join(__dirname, '../config/streamer-service-config.json');
Expand All @@ -20,15 +19,13 @@ var _getDac = async function(req, res, next) {
delete require.cache[require.resolve(fconfig)];

const headers = {
'Content-Type': 'application/json',
'Authorization': basicAuthString(config.DataStager.username, config.DataStager.password),
'Content-Type': 'application/json'
};

fetchOnce(
config.DataStager.url + "/rdm/DAC/project/" + projectId,
config.DataStager.url + "/dac/project/" + projectId,
{
method: 'GET',
credentials: 'include',
headers,
},
1000 * 30, // timeout after 30 seconds
Expand Down
2 changes: 1 addition & 1 deletion streamer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM centos:7
FROM almalinux:8

# application metadata
MAINTAINER Donders Institute
Expand Down
2 changes: 1 addition & 1 deletion streamer/config/default.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"password": "bind_password"
},
"DataStager": {
"url": "http://stager.dccn.nl:3000",
"url": "http://dr-data-stager-api:8080",
"username": "admin",
"password": "xxxxxx"
},
Expand Down
40 changes: 20 additions & 20 deletions streamer/lib/modalityMEG.js
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) {

var rget_args = { headers: { 'Accept': 'application/json' } };

var myurl = sconfig.url + '/rdm/DAC/project/';
var myurl = sconfig.url + '/dac/project/';
if ( toCatchall || p == 'unknown' ) {
myurl += '_CATCHALL.MEG';
} else {
Expand All @@ -389,9 +389,13 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) {

// here we get the collection namespace for the project
var rpost_args = {
headers: { 'Accept': 'application/json',
'Content-Type': 'application/json' },
data: []
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
data: {
jobs: []
}
};

if ( src_list.length == 0 ) {
Expand All @@ -412,32 +416,28 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) {

for( var i=0; i<src_list.length; i++ ) {
// add job data to post_args
rpost_args.data.push({
'type': 'rdm',
'data': { 'clientIF': 'irods',
'stagerUser': 'root',
'rdmUser': 'irods',
'title': '[' + (new Date()).toISOString() + '] Streamer.MEG: ' + path.basename(src_list[i]),
'timeout': 3600,
'timeout_noprogress': 1800,
'srcURL': src_list[i],
'dstURL': dst_list[i] },
'options': { 'attempts': 5,
'backoff': { 'delay' : 60000,
'type' : 'fixed' } }
rpost_args.data.jobs.push({
"drUser": utility.getOuFromCollName(rdata.collName) + "[email protected]",
"dstURL": dst_list[i],
"srcURL": src_list[i],
"stagerUser": sconfig.username,
"stagerUserEmail": "",
"timeout": 3600,
"timeout_noprogress": 1800,
"title": '[' + (new Date()).toISOString() + '] Streamer.MEG: ' + path.basename(src_list[i])
});
}

// post new jobs to stager
if ( rpost_args.data.length > 0 ) {
c_stager.post(sconfig.url + '/job', rpost_args, function(rdata, resp) {
c_stager.post(sconfig.url + '/jobs', rpost_args, function(rdata, resp) {
if ( resp.statusCode >= 400 ) { //HTTP error
var errmsg = 'HTTP error: (' + resp.statusCode + ') ' + resp.statusMessage;
utility.printErr(job.id + ':MEG:execStreamerJob:submitStagerJob', errmsg);
return cb_async_stager(errmsg, false);
} else {
rdata.forEach( function(d) {
utility.printLog(job.id + ':MEG:execStreamerJob:submitStagerJob', JSON.stringify(d));
rdata.jobs.forEach( function(stagerJobData) {
utility.printLog(job.id + ':MEG:execStreamerJob:submitStagerJob', JSON.stringify(stagerJobData));
});
// everything is fine
return cb_async_stager(null, true);
Expand Down
38 changes: 17 additions & 21 deletions streamer/lib/modalityMRI.js
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) {

// construct project and RESTful endpoint for resolving RDM collection namespace
var p = (toCatchall) ? '_CATCHALL.MRI':projectNumber;
var myurl = sconfig.url + '/rdm/DAC/project/' + p;
var myurl = sconfig.url + '/dac/project/' + p;

// general function to construct destination URL for stager job
var _mkDst = function(_src, _collName) {
Expand Down Expand Up @@ -402,34 +402,30 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) {
}

var rpost_args = {
headers: { 'Accept': 'application/json',
'Content-Type': 'application/json' },
data: [{
'type': 'rdm',
'data': { 'clientIF': 'irods',
'stagerUser': 'root',
'rdmUser': 'irods',
'title': '[' + (new Date()).toISOString() + '] Streamer.MRI: ' + src,
'timeout': 3600,
'timeout_noprogress': 600,
'srcURL': src,
'dstURL': _mkDst(src, rdata.collName) },
'options': { 'attempts': 5,
'backoff': { 'delay' : 60000,
'type' : 'fixed' } }
}]
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
data: {
"drUser": utility.getOuFromCollName(rdata.collName) + "[email protected]",
"dstURL": _mkDst(src, rdata.collName),
"srcURL": src,
"stagerUser": sconfig.username,
"stagerUserEmail": "",
"timeout": 3600,
"timeout_noprogress": 600,
"title": '[' + (new Date()).toISOString() + '] Streamer.MRI: ' + src
}
};

// Submit stager job
c_stager.post(sconfig.url + '/job', rpost_args, function(rdata, resp) {
c_stager.post(sconfig.url + '/job', rpost_args, function(stagerJobData, resp) {
if ( resp.statusCode >= 400 ) { //HTTP error
var errmsg = 'HTTP error: (' + resp.statusCode + ') ' + resp.statusMessage;
utility.printErr(job.id + ':MRI:execStreamerJob:submitStagerJob', errmsg);
return cb_async(errmsg, src, projectNumber);
} else {
rdata.forEach( function(d) {
utility.printLog(job.id + ':MRI:execStreamerJob:submitStagerJob', JSON.stringify(d));
});
utility.printLog(job.id + ':MRI:execStreamerJob:submitStagerJob', JSON.stringify(stagerJobData));
// job submitted!! set to job's maxProgress for the task
job.progress(maxProgress, 100);
return cb_async(null, src, projectNumber);
Expand Down
40 changes: 20 additions & 20 deletions streamer/lib/modalityUSER.js
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) {
password: sconfig.password
});
var rget_args = { headers: { 'Accept': 'application/json' } };
var myurl = sconfig.url + '/rdm/DAC/project/';
var myurl = sconfig.url + '/dac/project/';
if ( toCatchall || p == 'unknown' ) {
// NOTE: it requires the stager to provide endpoint to get the USER catchall collection.
myurl += '_CATCHALL.USER';
Expand Down Expand Up @@ -271,9 +271,13 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) {

// here we get the collection namespace for the project
var rpost_args = {
headers: { 'Accept': 'application/json',
'Content-Type': 'application/json' },
data: []
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
data: {
jobs: []
}
};

// construct destination collection
Expand All @@ -298,30 +302,26 @@ var _execStreamerJob = function(name, config, job, cb_remove, cb_done) {
}

// compose POST data for submitting stager jobs
rpost_args.data.push({
'type': 'rdm',
'data': { 'clientIF': 'irods',
'stagerUser': 'root',
'rdmUser': 'irods',
'title': '[' + (new Date()).toISOString() + '] Streamer.USER: ' + src,
'timeout': 3600,
'timeout_noprogress': 600,
'srcURL': src,
'dstURL': dst },
'options': { 'attempts': 5,
'backoff': { 'delay' : 60000,
'type' : 'fixed' } }
rpost_args.data.jobs.push({
"drUser": utility.getOuFromCollName(rdata.collName) + "[email protected]",
"dstURL": dst,
"srcURL": src,
"stagerUser": sconfig.username,
"stagerUserEmail": "",
"timeout": 3600,
"timeout_noprogress": 600,
"title": '[' + (new Date()).toISOString() + '] Streamer.USER: ' + src
});

// submit jobs to stager
c_stager.post(sconfig.url + '/job', rpost_args, function(rdata, resp) {
c_stager.post(sconfig.url + '/jobs', rpost_args, function(rdata, resp) {
if ( resp.statusCode >= 400 ) { //HTTP error
var errmsg = 'HTTP error: (' + resp.statusCode + ') ' + resp.statusMessage;
utility.printErr(job.id + ':USER:execStreamerJob:submitStagerJob', errmsg);
return cb_async(errmsg, false);
} else {
rdata.forEach( function(d) {
utility.printLog(job.id + ':USER:execStreamerJob:submitStagerJob', JSON.stringify(d));
rdata.jobs.forEach( function(stagerJobData) {
utility.printLog(job.id + ':USER:execStreamerJob:submitStagerJob', JSON.stringify(stagerJobData));
});
// everything is fine
job.progress(maxProgress, 100);
Expand Down
14 changes: 14 additions & 0 deletions streamer/lib/utility.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const child_process = require('child_process');
const path = require('path');

// general error handler to send response to the client
var _responseOnError = function(c_type, c_data, resp) {
Expand Down Expand Up @@ -41,7 +42,20 @@ var _diskFree = function(path) {
return freespace;
}

// parses the RDR iRODS collName to get the ou name in lower case.
//
// The RDR iRODS collName is structured as follows:
//
// `/{zone}/{o}/{ou}/{collection}`
//
// This function gets the value of {ou} assuming the input `collName`
// follows the structure.
var _getOuFromCollName = function(collName) {
return path.basename(path.dirname(collName)).toLowerCase()
}

module.exports.responseOnError = _responseOnError;
module.exports.printLog = _printLog;
module.exports.printErr = _printErr;
module.exports.diskFree = _diskFree;
module.exports.getOuFromCollName = _getOuFromCollName;
2 changes: 0 additions & 2 deletions streamer/lib/utility_ad.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
var config = require('config');
var ActiveDirectory = require('activedirectory');
var utility = require('./utility');

// utility function for finding a user profile in the Active Directory.
//
// The argument `name` can be one of the following type of string:
Expand Down
Loading