
Handle luigi task had failed dependencies #3311

Open
alex-coch opened this issue Sep 4, 2024 · 0 comments

@alex-coch

I have some boilerplate code in which I raise an exception while a temporary file is being created. As a result, Luigi reports that 1 task "had failed dependencies". Since ProcessData defines an on_failure method, I expected both data.txt and data_tmp.txt to be removed. However, on_failure is never triggered, and data.txt remains in the folder. How can I properly handle the "had failed dependencies" case?

Code:

import luigi
import os
import subprocess

class GenerateData(luigi.Task):
    output_path = luigi.Parameter(default='data.txt')

    def output(self):
        return luigi.LocalTarget(self.output_path)

    def run(self):
        with self.output().open('w') as f:
            for i in range(10):
                f.write("{}\n".format(i))


class GenerateTmpData(luigi.Task):
    output_path = luigi.Parameter(default='data_tmp.txt')

    def output(self):
        return luigi.LocalTarget(self.output_path)

    def run(self):
        raise BaseException("Simulated error in GenerateTmpData")
        with self.output().open('w') as f:
            for i in range(10):
                f.write("{}\n".format(i))


class ProcessData(luigi.Task):
    input_path = luigi.Parameter(default='data.txt')
    input_tmp_path = luigi.Parameter(default='data_tmp.txt')
    output_path = luigi.Parameter(default='processed_data.txt')

    def requires(self):
        yield GenerateTmpData(output_path=self.input_tmp_path)
        yield GenerateData(output_path=self.input_path)

    def output(self):
        return luigi.LocalTarget(self.output_path)

    def run(self):
        with self.input()[1].open('r') as infile, self.output().open('w') as outfile:
            for line in infile:
                number = int(line.strip())
                processed_number = number * 2
                outfile.write("{}\n".format(processed_number))

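    def on_failure(self, exception):
        # Invoked by the worker only when this task fails while running (e.g. run() raises).
        for item in self.input():
            if os.path.exists(item.path):
                os.remove(item.path)
        # Return the base implementation's message so Luigi can report it as the failure explanation.
        return super(ProcessData, self).on_failure(exception)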


def run_luigi_task():
    command = (
        'python -m luigi '
        '--module test ProcessData --local-scheduler'
    )
    # Wait for the run to finish; output is not piped, so Luigi's log is printed to the terminal.
    subprocess.call(command, shell=True, executable='/bin/bash')

if __name__ == '__main__':
    run_luigi_task()

Run as:

python test.py

Output:

DEBUG: Checking if ProcessData(input_path=data.txt, input_tmp_path=data_tmp.txt, output_path=processed_data.txt) is complete
DEBUG: Checking if GenerateTmpData(output_path=data_tmp.txt) is complete
DEBUG: Checking if GenerateData(output_path=data.txt) is complete
INFO: Informed scheduler that task   ProcessData_data_txt_data_tmp_txt_processed_data_t_ec6640af99   has status   PENDING
INFO: Informed scheduler that task   GenerateData_data_txt_1287d398ef   has status   PENDING
INFO: Informed scheduler that task   GenerateTmpData_data_tmp_txt_941f24e4a7   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 763] Worker Worker(salt=208102894, workers=1, host=8b0c89a2a184, username=root, pid=763) running   GenerateData(output_path=data.txt)
INFO: [pid 763] Worker Worker(salt=208102894, workers=1, host=8b0c89a2a184, username=root, pid=763) done      GenerateData(output_path=data.txt)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   GenerateData_data_txt_1287d398ef   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 763] Worker Worker(salt=208102894, workers=1, host=8b0c89a2a184, username=root, pid=763) running   GenerateTmpData(output_path=data_tmp.txt)
ERROR: [pid 763] Worker Worker(salt=208102894, workers=1, host=8b0c89a2a184, username=root, pid=763) failed    GenerateTmpData(output_path=data_tmp.txt)
Traceback (most recent call last):
  File "/app/env/lib/python3.5/site-packages/luigi/worker.py", line 199, in run
    new_deps = self._run_get_new_deps()
  File "/app/env/lib/python3.5/site-packages/luigi/worker.py", line 141, in _run_get_new_deps
    task_gen = self.task.run()
  File "test.py", line 25, in run
    raise BaseException("Simulated error in GenerateTmpData")
BaseException: Simulated error in GenerateTmpData
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   GenerateTmpData_data_tmp_txt_941f24e4a7   has status   FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: There are 2 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=208102894, workers=1, host=8b0c89a2a184, username=root, pid=763) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 1 ran successfully:
    - 1 GenerateData(output_path=data.txt)
* 1 failed:
    - 1 GenerateTmpData(output_path=data_tmp.txt)
* 1 were left pending, among these:
    * 1 had failed dependencies:
        - 1 ProcessData(input_path=data.txt, input_tmp_path=data_tmp.txt, output_path=processed_data.txt)

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====
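As the log shows, ProcessData is never handed to a worker: the scheduler leaves it pending after GenerateTmpData fails, so its on_failure has nothing to react to. A minimal sketch of one workaround, assuming the files should be cleaned up whenever any task in the run fails: Luigi's luigi.Task.event_handler(luigi.Event.FAILURE) decorator registers a callback taking (task, exception) that fires for the task whose run() actually raised. The names remove_existing, register_cleanup_handler and ARTIFACTS are hypothetical, and Luigi is imported lazily so the file helper can be used on its own.

```python
import os
from typing import Iterable, List

# Hypothetical: the files to remove, taken from the defaults in the code above.
ARTIFACTS = ("data.txt", "data_tmp.txt")


def remove_existing(paths: Iterable[str]) -> List[str]:
    """Delete every path that exists; return the paths actually removed."""
    removed = []
    for path in paths:
        if os.path.exists(path):
            os.remove(path)
            removed.append(path)
    return removed


def register_cleanup_handler(paths: Iterable[str] = ARTIFACTS):
    """Hypothetical helper: register a Luigi FAILURE callback that removes the given files."""
    import luigi  # imported lazily so remove_existing() works without Luigi installed

    targets = tuple(paths)

    @luigi.Task.event_handler(luigi.Event.FAILURE)
    def cleanup_on_failure(task, exception):
        # Fires for the task whose run() raised (GenerateTmpData in the log),
        # not for ProcessData, which was never run.
        remove_existing(targets)

    return cleanup_on_failure
```

With the code above, register_cleanup_handler() would have to be called at module level in test.py, because python -m luigi --module test imports the module but does not execute its __main__ block. A narrower alternative is to override on_failure on GenerateTmpData itself, since that is the task that raised.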