Skip to content
This repository has been archived by the owner on Jan 13, 2021. It is now read-only.

Add on-demand feature for AWS pipelines to dataduct #7

Merged
merged 3 commits into from
May 10, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dataduct/etl/etl_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,14 @@ def create_base_objects(self):
topic_arn=self.topic_arn,
pipeline_name=self.name,
)
if self.frequency == 'on-demand':
scheduleType='ONDEMAND'
else:
scheduleType='cron'
self.default = self.create_pipeline_object(
object_class=DefaultObject,
pipeline_log_uri=self.s3_log_dir,
scheduleType=scheduleType
)

@property
Expand Down
2 changes: 1 addition & 1 deletion dataduct/pipeline/data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def aws_format(self):
Returns:
result(list of dict): list of AWS-readable dict of all objects
"""
return [x.aws_format() for x in self.objects]
return [x.aws_format() for x in self.objects if hasattr(x,'fields')]

def add_object(self, pipeline_object):
"""Add an object to the datapipeline
Expand Down
52 changes: 32 additions & 20 deletions dataduct/pipeline/pipeline_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ..s3 import S3Path
from ..utils.exceptions import ETLInputError


scheduleType = ''
class PipelineObject(object):
"""DataPipeline class with steps and metadata.

Expand Down Expand Up @@ -56,12 +56,15 @@ def s3_files(self):
Returns:
result(list of S3Files): List of files to be uploaded to s3
"""
result = self.additional_s3_files
for _, values in self.fields.iteritems():
for value in values:
if isinstance(value, S3File) or isinstance(value, S3Directory):
result.append(value)
return result
if hasattr(self,'additional_s3_files'):
result = self.additional_s3_files
for _, values in self.fields.iteritems():
for value in values:
if isinstance(value, S3File) or isinstance(value, S3Directory):
result.append(value)
return result
else:
return []

def __getitem__(self, key):
"""Fetch the items associated with a key
Expand Down Expand Up @@ -130,16 +133,25 @@ def aws_format(self):
result: The AWS-readable dict format of the object
"""
fields = []
for key, values in self.fields.iteritems():
for value in values:
if isinstance(value, PipelineObject):
fields.append({'key': key, 'refValue': value.id})
elif isinstance(value, S3Path):
fields.append({'key': key, 'stringValue': value.uri})
elif isinstance(value, S3File) or \
isinstance(value, S3Directory):
fields.append({'key': key,
'stringValue': value.s3_path.uri})
else:
fields.append({'key': key, 'stringValue': str(value)})
return {'id': self._id, 'name': self._id, 'fields': fields}
global scheduleType
if hasattr(self, 'fields'):
for key, values in self.fields.iteritems():
for value in values:
if isinstance(value, PipelineObject):
if scheduleType == 'ONDEMAND'and key == 'schedule' :
pass
else:
fields.append({'key': key, 'refValue': value.id})
elif isinstance(value, S3Path):
fields.append({'key': key, 'stringValue': value.uri})
elif isinstance(value, S3File) or \
isinstance(value, S3Directory):
fields.append({'key': key,
'stringValue': value.s3_path.uri})
else:
if key == 'scheduleType' and str(value) == 'ONDEMAND':
scheduleType = 'ONDEMAND'
fields.append({'key': key, 'stringValue': str(value)})
return {'id': self._id, 'name': self._id, 'fields': fields}
else:
return None
5 changes: 5 additions & 0 deletions dataduct/pipeline/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
'8-hours': ('8 hours', None),
'12-hours': ('12 hours', None),
'one-time': ('15 minutes', 1),
'on-demand': ('ondemand',None),
Copy link

@idralyuk idralyuk May 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs a space between `,` and `None`.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

'30-min': ('30 minutes', None),
'15-min': ('15 minutes', None),
}
Expand Down Expand Up @@ -64,6 +65,10 @@ def __init__(self,
load_minutes(int): Minutes at which the pipeline should be run
**kwargs(optional): Keyword arguments directly passed to base class
"""
if frequency == 'on-demand':
logger.debug("Don't create schedule object")
return None

current_time = datetime.utcnow()
# Set the defaults for load hour and minutes
if load_minutes is None:
Expand Down