Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offload publishing to separate jobs #5618

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package nextflow

import nextflow.processor.PublishOffloadManager

import java.nio.file.Files
import java.nio.file.Path
Expand Down Expand Up @@ -293,6 +294,12 @@ class Session implements ISession {

FilePorter getFilePorter() { filePorter }

private int publishOffloadBatchSize

private PublishOffloadManager publishOffloadManager

PublishOffloadManager getPublishOffloadManager() { publishOffloadManager }

/**
* Creates a new session with an 'empty' (default) configuration
*/
Expand Down Expand Up @@ -394,6 +401,21 @@ class Session implements ISession {
// -- file porter config
this.filePorter = new FilePorter(this)

this.publishOffloadBatchSize = config.publishOffloadBatchSize ? config.publishOffloadBatchSize as int : 0

if ( this.publishOffloadBatchSize ) {
// -- publish offload manager config
log.warn("Publish offload flag enabled. Creating Offload Manager")
this.publishOffloadManager = new PublishOffloadManager(this, publishOffloadBatchSize)
}

}

void startPublishOffloadManager() {
if ( this.publishOffloadBatchSize ) {
log.debug("Starting Publish offload manager")
this.publishOffloadManager?.init()
}
}

protected Path cloudCachePath(Map cloudcache, Path workDir) {
Expand Down Expand Up @@ -676,13 +698,19 @@ class Session implements ISession {
log.debug "Session await"
processesBarrier.awaitCompletion()
log.debug "Session await > all processes finished"
terminated = true
monitorsBarrier.awaitCompletion()
log.debug "Session await > all barriers passed"


if( !aborted ) {
joinAllOperators()
log.trace "Session > all operators finished"
}
// shutdown t
publishPoolManager?.shutdown(false)
publishOffloadManager?.close()
log.debug "Session await > Publish phase finished"
terminated = true
monitorsBarrier.awaitCompletion()
log.debug "Session await > all barriers passed"
}

void destroy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,22 @@ class ExecutorFactory {
return result
}

@CompileDynamic
static String getDefaultExecutorName(Session session) {
def result = session.config.process?.executor?.toString()
if( !result ) {
if (session.config.executor instanceof String) {
return session.config.executor
} else if (session.config.executor?.name instanceof String) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In principle else is not needed

return session.config.executor.name
} else {
return DEFAULT_EXECUTOR
}
}
return result
}


void signalExecutors() {
for( Executor exec : executors.values() )
exec.signal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,9 @@ class PublishDir {

// create target dirs if required
makeDirs(destination.parent)

def offload = false
try {
processFileImpl(source, destination)
offload = processFileImpl(source, destination)
}
catch( FileAlreadyExistsException e ) {
// don't copy source path if target is identical, but still emit the publish event
Expand All @@ -425,11 +425,12 @@ class PublishDir {

if( !sameRealPath && shouldOverwrite(source, destination) ) {
FileHelper.deletePath(destination)
processFileImpl(source, destination)
offload = processFileImpl(source, destination)
}
}

notifyFilePublish(destination, source)
//Don't notify if file publication is offloaded. It will be notified after the offloaded job is finished.
if (!offload)
notifyFilePublish(destination, source)
}

private String real0(Path p) {
Expand Down Expand Up @@ -495,7 +496,7 @@ class PublishDir {
return sourceHash != targetHash
}

protected void processFileImpl( Path source, Path destination ) {
protected boolean processFileImpl( Path source, Path destination ) {
log.trace "publishing file: $source -[$mode]-> $destination"

if( !mode || mode == Mode.SYMLINK ) {
Expand All @@ -509,17 +510,26 @@ class PublishDir {
FilesEx.mklink(source, [hard:true], destination)
}
else if( mode == Mode.MOVE ) {
FileHelper.movePath(source, destination)
if ( session.getPublishOffloadManager()?.tryMoveOffload(source, destination, retryConfig, failOnError) ){
return true
} else {
FileHelper.movePath(source, destination)
}
}
else if( mode == Mode.COPY ) {
FileHelper.copyPath(source, destination)
if ( session.getPublishOffloadManager()?.tryCopyOffload(source, destination, retryConfig, failOnError) ){
return true
} else {
FileHelper.copyPath(source, destination)
}
}
else if( mode == Mode.COPY_NO_FOLLOW ) {
FileHelper.copyPath(source, destination, LinkOption.NOFOLLOW_LINKS)
}
else {
throw new IllegalArgumentException("Unknown file publish mode: ${mode}")
}
return false
}

protected void createPublishDir() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.processor

import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.Nextflow
import nextflow.Session
import nextflow.executor.Executor
import nextflow.executor.ExecutorFactory
import nextflow.extension.FilesEx
import nextflow.fusion.FusionHelper
import nextflow.script.BaseScript
import nextflow.script.BodyDef
import nextflow.script.ProcessConfig
import nextflow.util.ArrayTuple

import java.nio.file.Path
import java.util.concurrent.atomic.AtomicInteger

@Slf4j
class PublishOffloadManager {
Map<Integer, ArrayTuple> runningPublications= new HashMap<Integer, ArrayTuple>(10)
static final Map SUPPORTED_SCHEMES = [awsbatch:['s3'], local:['file']]
static final String S5CMD_CONTAINER = 'jorgeejarquea/s5cmd_aws:0.0.1'
static final String PUBLISH_FUNCTION = 'nxf_publish'
private Session session
private PublishTaskProcessor publishProcessor
private List<String> commands = new LinkedList<String>()
private boolean closed = false
private int batchSize
/**
* Unique offloaded index number
*/
final protected AtomicInteger indexCount = new AtomicInteger()

PublishOffloadManager(Session session, int batchSize) {
this.session = session;
this.batchSize = batchSize;
}
@PackageScope
TaskProcessor getPublishProcessor(){ publishProcessor }

void init(){
//Try with template
BodyDef body = new BodyDef({
def file = PublishOffloadManager.class.getResource('copy-group-template.sh')
return file.text
},'publish file process', 'shell')
this.publishProcessor = createProcessor( "publish_process", body )

}

private boolean checkOffload(Path source, Path destination, String executor){
return this.batchSize > 0 && source.scheme in SUPPORTED_SCHEMES[executor] && destination.scheme in SUPPORTED_SCHEMES[executor];
}

private void invokeProcessor(inputValue) {
final params = new TaskStartParams(TaskId.next(), publishProcessor.indexCount.incrementAndGet())
final values = new ArrayList(1)
values[0] = inputValue
final args = new ArrayList(2)
args[0] = params
args[1] = values
publishProcessor.invokeTask(args.toArray())
}

private synchronized boolean tryOffload(String command, Path origin, Path destination, PublishRetryConfig retryConfig, boolean failOnError){
if (checkOffload(origin, destination, publishProcessor.executor.name)) {
final id = indexCount.incrementAndGet()
runningPublications.put(id, Nextflow.tuple(origin, destination, failOnError))
commands.add(generateExecutionCommand(id, command, origin, destination, retryConfig))
if (commands.size() == this.batchSize){
invokeProcessor(commands.join(";"))
commands.clear()
}
return true
}
return false
}

private isFusionEnabled(){
return FusionHelper.isFusionEnabled(session)
}

private useS5cmd(){
return ( (!isFusionEnabled()) && (ExecutorFactory.getDefaultExecutorName(session) == 'awsbatch') )
}

private String generateExecutionCommand(Integer id, String command, Path origin, Path destination, PublishRetryConfig retryConfig){
return "$PUBLISH_FUNCTION ${retryConfig.maxAttempts} ${retryConfig.delay.toMillis()} ${retryConfig.jitter} ${retryConfig.maxDelay.toMillis()} " +
"$id $command ${convertFilePath(origin)} ${convertFilePath(destination)}"
}

private String convertFilePath(Path path) {
if (isFusionEnabled()) {
return FusionHelper.toContainerMount(path)
} else {
return FilesEx.toUriString(path)
}
}

boolean tryMoveOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failOnError) {
String command = 'mv'
if ( useS5cmd() ) {
command = 's5cmd mv'
}
tryOffload(command, origin, destination, retryConfig, failOnError)
}

boolean tryCopyOffload(Path origin, Path destination, PublishRetryConfig retryConfig, boolean failOnError) {
String command = 'cp'
if ( useS5cmd() ) {
command = 's5cmd cp'
}
tryOffload(command, origin, destination, retryConfig, failOnError)
}

private PublishTaskProcessor createProcessor( String name, BodyDef body){
assert body
assert session.script
log.debug("Creating processor $name")
// -- the config object
final processConfig = new ProcessConfig(session.script, name)
// Invoke the code block which will return the script closure to the executed.
// As side effect will set all the property declarations in the 'taskConfig' object.
if (useS5cmd()) {
processConfig.put('container', S5CMD_CONTAINER);
}
processConfig._in_val('executions')
processConfig._out_stdout()
if ( !body )
throw new IllegalArgumentException("Missing script in the specified process block -- make sure it terminates with the script string to be executed")

// -- apply settings from config file to process config
processConfig.applyConfig((Map)session.config.process, name, name, name)

// -- get the executor for the given process config
final execObj = session.executorFactory.getExecutor(name, processConfig, body, session)

// -- create processor class
new PublishTaskProcessor( name, execObj, session, session.script, processConfig, body, this )
}

synchronized void close() {
closed=true
if ( commands.size() ){
invokeProcessor(commands.join(";"))
commands.clear()
}

}
}

@Slf4j
class PublishTaskProcessor extends TaskProcessor{
PublishOffloadManager manager

PublishTaskProcessor(String name, Executor executor, Session session, BaseScript baseScript, ProcessConfig processConfig, BodyDef bodyDef, PublishOffloadManager manager) {
super(name, executor, session, baseScript, processConfig, bodyDef)
this.manager = manager
}

@Override
void finalizeTask0(TaskRun task) {
if( task.outputs.size() == 1 ){
def value = task.outputs.values().first()
value = value instanceof Path ? value.text : value?.toString()
for ( String finishedCopy : value.split('\n') ){
final result = finishedCopy.split(":")
if (result.size() == 2) {
final id = result[0] as Integer
final tuple = manager.runningPublications.remove(id)
final exitCode = result[1] as Integer
if( exitCode == 0 ){
session.notifyFilePublish((Path) tuple.get(0), (Path) tuple.get(1))
} else {
if (tuple.get(2) as Boolean) {
log.error("Publication of file ${tuple.get(0)} -> ${tuple.get(1)} failed.")
} else {
log.warn("Publication of file ${tuple.get(0)} -> ${tuple.get(1)} failed.")
}
printPublishTaskError(task)
}
}
}
} else {
log.error("Incorrect number of outputs in the publish task");
}
}

private void printPublishTaskError(TaskRun task){
final List<String> message = []
final max = 50
final lines = task.dumpStderr(max)
message << "Executed publish task:"
if( lines ) {
message << "\nCommand error:"
for( String it : lines ) {
message << " ${stripWorkDir(it, task.workDir)}"
}
}
if( task?.workDir )
message << "\nWork dir:\n ${task.workDirStr}"
log.debug(message.join('\n'))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2422,7 +2422,7 @@ class TaskProcessor {
* @param task The {@code TaskRun} instance to finalize
* @param producedFiles The map of files to be bind the outputs
*/
private void finalizeTask0( TaskRun task ) {
protected void finalizeTask0( TaskRun task ) {
log.trace "Finalize process > ${safeTaskName(task)}"

// -- bind output (files)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ class ScriptRunner {
protected run() {
log.debug "> Launching execution"
assert scriptParser, "Missing script instance to run"
session.startPublishOffloadManager()
// -- launch the script execution
scriptParser.runScript()
// -- normalise output
Expand Down
Loading
Loading