Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding fix for issue 119,124,129: Kinesis Event Source add function not working as expected, Cannot have multiple S3 event sources on same bucket for same Lambda Function, Cannot use different VPC config for different environments #120

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions docs/config_file_example.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ Here is an example config file showing all possible sections.
arn: arn:aws:kinesis:us-west-2:123456789012:stream/foo
starting_position: LATEST
batch_size: 100
vpc_config:
security_group_ids:
- sg-12345678
- sg-23456789
subnet_ids:
- subnet-12345678
- subnet-23456789
env2:
profile: profile2
region: us-west-2
Expand All @@ -51,6 +58,13 @@ Here is an example config file showing all possible sections.
arn: arn:aws:kinesis:us-west-2:234567890123:stream/foo
starting_position: LATEST
batch_size: 100
vpc_config:
security_group_ids:
- sg-34567890
- sg-34567891
subnet_ids:
- subnet-23456789
- subnet-34567890
lambda:
description: A simple Python sample
handler: simple.handler
Expand All @@ -59,13 +73,6 @@ Here is an example config file showing all possible sections.
timeout: 3
log_retention_policy: 7
excluded_dirs: default
vpc_config:
security_group_ids:
- sg-12345678
- sg-23456789
subnet_ids:
- subnet-12345678
- subnet-23456789


Explanations:
Expand Down
37 changes: 27 additions & 10 deletions kappa/event_source/kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,31 @@ def _get_uuid(self, function):

def add(self, function):
try:
response = self._lambda.call(
'create_event_source_mapping',
FunctionName=function.name,
EventSourceArn=self.arn,
BatchSize=self.batch_size,
StartingPosition=self.starting_position,
Enabled=self.enabled
)
LOG.debug(response)
existingMapping={}
try:
response = self._lambda.call(
'list_event_source_mappings',
FunctionName=function.name,
EventSourceArn=self.arn
)
LOG.debug(response)
existingMapping = self.arn in response['EventSourceMappings'][0]['EventSourceArn']
except Exception:
LOG.debug('Kinesis event source mapping not available')

if not existingMapping:
response = self._lambda.call(
'create_event_source_mapping',
FunctionName=function.name,
EventSourceArn=self.arn,
BatchSize=self.batch_size,
StartingPosition=self.starting_position,
Enabled=self.enabled
)
LOG.debug(response)
else:
self.update(function)

except Exception:
LOG.exception('Unable to add event source')

Expand Down Expand Up @@ -83,9 +99,10 @@ def update(self, function):
try:
response = self._lambda.call(
'update_event_source_mapping',
FunctionName=function.name,
BatchSize=self.batch_size,
Enabled=self.enabled,
FunctionName=function.arn)
UUID=uuid)
LOG.debug(response)
except Exception:
LOG.exception('Unable to update event source')
Expand Down
5 changes: 4 additions & 1 deletion kappa/event_source/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ def __init__(self, context, config):
self._lambda = kappa.awsclient.create_client('lambda', context.session)

def _make_notification_id(self, function_name):
return 'Kappa-%s-notification' % function_name
id_no = self._config.get('id')
if not id_no:
return 'Kappa-%s-notification' % function_name
return 'Kappa-%s-notification-%s' % (function_name,id_no)

def _get_bucket_name(self):
return self.arn.split(':')[-1]
Expand Down
11 changes: 6 additions & 5 deletions kappa/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,14 @@ def memory_size(self):

@property
def vpc_config(self):
env_cfg = self._context.config['environments'][self._context.environment]
vpc_config = {}
if 'vpc_config' in self._config:
if 'security_group_ids' in self._config['vpc_config']:
sgids = self._config['vpc_config']['security_group_ids']
if 'vpc_config' in env_cfg:
if 'security_group_ids' in env_cfg['vpc_config']:
sgids = env_cfg['vpc_config']['security_group_ids']
vpc_config['SecurityGroupIds'] = sgids
if 'subnet_ids' in self._config['vpc_config']:
snids = self._config['vpc_config']['subnet_ids']
if 'subnet_ids' in env_cfg['vpc_config']:
snids = env_cfg['vpc_config']['subnet_ids']
vpc_config['SubnetIds'] = snids
return vpc_config

Expand Down