-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconn_deco.py
51 lines (40 loc) · 1.28 KB
/
conn_deco.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from datetime import timedelta
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
import pendulum
from decors import get_connection, remove, setup
# Default arguments applied to every task in this DAG: retry once after
# five minutes, and send no failure/retry e-mail notifications.
def_args = dict(
    owner="airflow",
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
@dag(
    default_args=def_args,
    schedule=None,
    start_date=pendulum.today('UTC'),
    tags=["example"],
)
def conn_decorator():
    """Example DAG: create a connection, use it over SFTP, then remove it.

    Demonstrates passing a connection id from a classical ``PythonOperator``
    into a TaskFlow task and back out to a classical cleanup operator.
    """

    @task()
    def doing_nothing(conn_id, **kwargs):
        """List ``/tmp/`` via SFTP using the connection named by *conn_id*.

        Returns the same *conn_id* so the downstream cleanup task can
        receive it through XCom.
        """
        print(f"Using connection {conn_id}")
        hook = get_connection(conn_id=conn_id, **kwargs)
        # The context manager closes the SSH session for us once the
        # directory listing has been printed.
        with hook.get_conn() as client:
            sftp = client.open_sftp()
            print("Connected")
            for entry in sftp.listdir(path="/tmp/"):
                print(entry)
        return conn_id

    # another way of mixing taskflow and classical api: a classical
    # operator creates the connection, and its XCom return value is fed
    # straight into the TaskFlow task as an argument.
    setup_task = PythonOperator(python_callable=setup, task_id="setup_connection")
    created_id = setup_task.output["return_value"]
    noop = doing_nothing(conn_id=created_id)
    cleanup_task = PythonOperator(
        python_callable=remove, op_kwargs={"conn_id": noop}, task_id="cleanup"
    )
    # Explicit ordering (redundant with the data dependencies above, but
    # kept for clarity): setup -> use -> cleanup.
    setup_task >> noop >> cleanup_task


dag = conn_decorator()