

airflow LSF executor
xunpan committed Feb 21, 2022
1 parent f710c6f commit fe9ba1d
Showing 4 changed files with 240 additions and 0 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -11,3 +11,6 @@ Scripts to use `CRIU` for LSF checkpoint/restart job

- [LSF Flux job](https://github.com/IBMSpectrumComputing/lsf-utils/tree/master/flux)
A script to provision a Flux cluster in an LSF job

- [LSF Airflow Executor](https://github.com/IBMSpectrumComputing/lsf-utils/tree/master/airflow)
  An Airflow executor plugin that runs Airflow tasks as jobs on LSF cluster resources
21 changes: 21 additions & 0 deletions airflow/README.md
@@ -0,0 +1,21 @@
# LSF Executor for Airflow

## How to use it
Copy the LSF executor plugin into the `plugins` directory:
```
$ cp lsf_executor/lsf.py $AIRFLOW_HOME/plugins/
```
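
If `$AIRFLOW_HOME/plugins` does not exist yet, create it first:
```
$ mkdir -p $AIRFLOW_HOME/plugins
```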

Configure `lsf.LSFExecutor` as the executor in the Airflow configuration:
```
$ vim $AIRFLOW_HOME/airflow.cfg
executor = lsf.LSFExecutor
```
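
To confirm the setting took effect, you can query the running configuration (this assumes the Airflow 2.x CLI):
```
$ airflow config get-value core executor
lsf.LSFExecutor
```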

Copy the example workflow into the `dags` directory:
```
$ cp examples/process_text.py $AIRFLOW_HOME/dags/
```

Now you can trigger the `process_text` workflow in Airflow and run its tasks as jobs in the `LSF` cluster.
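
For example, assuming the Airflow 2.x CLI, you can unpause and trigger the DAG from the command line, then watch the submitted jobs with LSF's `bjobs`:
```
$ airflow dags unpause process_text
$ airflow dags trigger process_text
$ bjobs
```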

54 changes: 54 additions & 0 deletions airflow/examples/process_text.py
@@ -0,0 +1,54 @@
#!/usr/bin/env python3
#
# Copyright International Business Machines Corp, 2022
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator


with DAG('process_text',
         description='say hi instead of hello from a text file',
         schedule_interval=None,
         start_date=datetime(2022, 1, 13),
         catchup=False) as dag:

    the_end = DummyOperator(task_id='the_end')

    file_name = '/tmp/airflow_lsf_demo.txt'
    generate_text_file = BashOperator(
        task_id='generate_text_file',
        bash_command=f'echo "Hello, Airflow in LSF!" > {file_name}',
    )

    say_hi = BashOperator(
        task_id='say_hi',
        bash_command=f"sed -i 's/Hello/Hi/' {file_name}",
    )

    generate_text_file >> say_hi

    wait_and_let_go = BashOperator(
        task_id='wait_and_let_go',
        bash_command='sleep 30',
    )
    wait_and_let_go >> say_hi

    say_hi >> the_end
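    # resulting graph: generate_text_file and wait_and_let_go both feed
    # say_hi, which in turn feeds the_end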

if __name__ == "__main__":
dag.cli()
162 changes: 162 additions & 0 deletions airflow/lsf_executor/lsf.py
@@ -0,0 +1,162 @@
#!/usr/bin/env python3
#
# Copyright International Business Machines Corp, 2022
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from airflow.plugins_manager import AirflowPlugin
from airflow.executors.base_executor import BaseExecutor

import subprocess
import re

class LSFExecutor(BaseExecutor):
def __init__(self):
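        # jobs: LSF jobid -> Airflow TaskInstance key for each submitted job
        # queues: LSF queue names discovered from the cluster at start()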
self.jobs = {}
self.queues = {}

super().__init__()

def start(self):
self.log.info("[ LSF ] starting LSF executor")

        q = run_cmd(['bqueues', '-o', 'QUEUE_NAME', '-noheader'], self.log)
        if not q:
            self.log.warning("[ LSF ] no queues found")
            return

        self.queues = {name: True for name in q.strip('\n').split('\n')}

        self.log.info(f"[ LSF ] there are {len(self.queues)} queues: {','.join(self.queues)}")

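    # Called by Airflow for each queued task instance: submit its command to
    # LSF as a batch job and remember the jobid so sync() can track it.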
def execute_async(self, key, command, queue=None, executor_config=None):
self.log.info("[ LSF ] executing async(). key = %s command = %s queue = %s config = %s" % (key, command, queue, executor_config))

self.validate_command(command)

options=['-J', f'{key.dag_id}-{key.task_id}-{key.run_id}']

# the queue name is setting when it is an LSF queue name
if self.queues.get(queue) != None:
options.extend(['-q', queue])

jobid = bsub(options, command, self.log)
if (int(jobid) > 0):
self.jobs[jobid] = key

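    # Called on every executor heartbeat: poll LSF for the state of each
    # tracked job and report finished ones back to Airflow.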
def sync(self):
self.log.info("LSF: executing sync()")

# TODO: bjobs job_ids instead of multiple bjobs. It may be all in one or sub set of jobs by one request
for jid in list(self.jobs.keys()):
stat = bjobs(jid, self.log)
if stat is None:
del self.jobs[jid]
continue

if stat == 'DONE':
self.success(self.jobs[jid])
del self.jobs[jid]
if stat == 'EXIT':
self.fail(self.jobs[jid])
del self.jobs[jid]

def end(self):
self.log.info("LSF: executing end()")

for jid in list(self.jobs.keys()):
bkill(jid, self.log)
del self.jobs[jid]

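        # one final heartbeat so remaining task states are synced before shutdown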
self.heartbeat()

def terminate(self):
self.log.info("LSF: executing terminate()")

for jid in self.jobs.keys():
bkill(jid, self.log)
del self.jobs[jid]

def bkill(jobid, log):
    cmd = ['bkill', '-C', 'job killed because Airflow is shutting down', jobid]
    log.info(f'[ LSF ] request: {cmd}')

    run_cmd(cmd, log)

def bjobs(jobid, log):
cmd = ['bjobs', '-o', 'stat', '-noheader', jobid]
log.info(f'[ LSF ] request: {cmd}')

reply = run_cmd(cmd, log)
log.info(f'[ LSF ] reply: {reply}')

    # return the bare status string (e.g. DONE, EXIT, RUN), or None when empty
    stat = reply.strip() if reply else None

    return stat

def bsub(options, cmd, log):
    # DEBUG: pin jobs to a fixed test host; drop the '-m' option on a real cluster
    bsub_cmd = ['bsub', '-m', 'scurvily1']
bsub_cmd.extend(options)
bsub_cmd.extend(cmd)
log.info(f'[ LSF ] request: {bsub_cmd}')

message = run_cmd(bsub_cmd, log)
log.info(f'[ LSF ] reply: {message}')

    # parse the job id from a reply like "Job <123> is submitted to queue <normal>."
    jobid = '0'
    result = re.search(r'<(\d+)>', message)
    if result:
        jobid = result.group(1)
    else:
        log.info(f'[ LSF ] error: failed to get jobid from: {message}')

return jobid


def run_cmd(cmd, log):
    # stderr is merged into stdout, so the decoded result below carries both
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    try:
        outs, errs = proc.communicate(timeout=5)
        log.info(f'[ LSF ] stdout: {outs}')
    except subprocess.TimeoutExpired:
        proc.kill()
        outs, errs = proc.communicate()

    result = outs.decode('utf-8')
    return result


# Defining the plugin class
class LSFExecutorPlugin(AirflowPlugin):
name = "LSF"
executors = [LSFExecutor]


# DEBUG: test only
if __name__ == '__main__':
e = LSFExecutor()
e.start()

cmd = ['sleep', '9527']
jid = bsub([], cmd, e.log)

stat = bjobs(jid, e.log)
e.log.info(f'status is <{stat}>')

bkill(jid, e.log)
