From 2a7deceffc15e05e1fe9ab84d5da7c48d37c3d98 Mon Sep 17 00:00:00 2001 From: Paul Woods Date: Fri, 11 May 2018 18:44:59 -0400 Subject: [PATCH] Bugfix in monthly dag for gap_events --- airflow/pipe_events_dag.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/pipe_events_dag.py b/airflow/pipe_events_dag.py index 00f7160..6754ec5 100644 --- a/airflow/pipe_events_dag.py +++ b/airflow/pipe_events_dag.py @@ -24,7 +24,7 @@ def source_date_range(self): return '{{ yesterday_ds }}', '{{ yesterday_ds }}' elif self.schedule_interval == '@monthly': start_date = '{{ (execution_date.replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)).strftime("%Y-%m-%d") }}' - end_date = '{{ (execution_date.replace(day=1) + macros.dateutil.relativedelta.relativedelta(months=1, days=-2)).strftime("%Y%m%d") }}' + end_date = '{{ (execution_date.replace(day=1) + macros.dateutil.relativedelta.relativedelta(months=1, days=-2)).strftime("%Y-%m-%d") }}' return start_date, end_date else: raise ValueError('Unsupported schedule interval {}'.format(self.schedule_interval)) @@ -39,7 +39,6 @@ def build(self, dag_id): publish_events = BashOperator( task_id='publish_events', - depends_on_past=True, bash_command='{docker_run} {docker_image} gap_events ' '{date_range} ' '{project_id}:{source_dataset}.{position_messages} '