Skip to content

Commit

Permalink
Prefer CamelCase for class names
Browse files Browse the repository at this point in the history
Co-authored-by: Nick <[email protected]>
  • Loading branch information
dav3r and mcdonnnj committed May 23, 2024
1 parent 47c12a9 commit 2861e2e
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions src/lambda_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@


# Define some named tuples to make the code more readable
class aws_credentials(NamedTuple):
class AwsCredentials(NamedTuple):
"""Named tuple to hold AWS credentials."""

access_key_id: str
secret_access_key: str
session_token: str


class ec2_info(NamedTuple):
class Ec2Info(NamedTuple):
"""Named tuple to hold EC2 information."""

application_tag_value: str
public_ip: str


class event_validation(NamedTuple):
class EventValidation(NamedTuple):
"""Named tuple to hold event validation information."""

errors: List[str]
Expand All @@ -69,7 +69,7 @@ class FileConfig(TypedDict):
static_ips: List[str]


def assume_role(role_arn: str, session_name: str) -> aws_credentials:
def assume_role(role_arn: str, session_name: str) -> AwsCredentials:
"""Assume the given role and return a named tuple containing the assumed role's credentials."""
# Create an STS session with current credentials
sts: boto3.client = boto3.client("sts")
Expand All @@ -79,7 +79,7 @@ def assume_role(role_arn: str, session_name: str) -> aws_credentials:
RoleArn=role_arn, RoleSessionName=session_name
)

return aws_credentials(
return AwsCredentials(
response["Credentials"]["AccessKeyId"],
response["Credentials"]["SecretAccessKey"],
response["Credentials"]["SessionToken"],
Expand All @@ -90,7 +90,7 @@ def create_assumed_aws_client(
aws_service: str, role_arn: str, session_name: str
) -> boto3.client:
"""Assume the given role and return an AWS client for the given service using that role."""
role_credentials: aws_credentials = assume_role(role_arn, session_name)
role_credentials: AwsCredentials = assume_role(role_arn, session_name)

return boto3.client(
aws_service,
Expand All @@ -104,7 +104,7 @@ def create_assumed_aws_resource(
aws_service: str, region: str, role_arn: str, session_name: str
) -> boto3.resource:
"""Assume the given role and return an AWS resource object for the given service using that role."""
role_credentials: aws_credentials = assume_role(role_arn, session_name)
role_credentials: AwsCredentials = assume_role(role_arn, session_name)

return boto3.resource(
aws_service,
Expand All @@ -127,7 +127,7 @@ def convert_tags(aws_resource: boto3.resource) -> Dict[str, str]:

def get_ec2_ips(
ec2: boto3.resource, application_tag_name: str, publish_egress_tag_name: str
) -> Iterator[ec2_info]:
) -> Iterator[Ec2Info]:
"""Create a set of public EC2 IPs.
Yields (application tag value, public_ip) tuples.
Expand All @@ -153,7 +153,7 @@ def get_ec2_ips(
# Send back a tuple associating the public IP to an application.
# If application is unset, return "", so that the IP can be included
# in a list of all IPs if desired (e.g. using app_regex=".*").
yield ec2_info(tags.get(application_tag_name, ""), instance.public_ip_address)
yield Ec2Info(tags.get(application_tag_name, ""), instance.public_ip_address)

for vpc_address in vpc_addresses:
# Convert elastic IP tags from an AWS dictionary into a Python dictionary
Expand All @@ -165,7 +165,7 @@ def get_ec2_ips(
# Send back a tuple associating the public IP to an application.
# If application is unset, return "", so that the IP can be included
# in a list of all IPs if desired (e.g. using app_regex=".*").
yield ec2_info(eip_tags.get(application_tag_name, ""), vpc_address.public_ip)
yield Ec2Info(eip_tags.get(application_tag_name, ""), vpc_address.public_ip)


def get_ec2_regions(
Expand Down Expand Up @@ -222,7 +222,7 @@ def task_default(event):
return result


def validate_event_data(event: Dict[str, Any]) -> event_validation:
def validate_event_data(event: Dict[str, Any]) -> EventValidation:
"""Validate the event data and return a tuple containing the validated event, a boolean result (True if valid, False if invalid), and a list of error message strings."""
result = True
errors = []
Expand Down Expand Up @@ -306,21 +306,21 @@ def validate_event_data(event: Dict[str, Any]) -> event_validation:
if errors:
result = False

return event_validation(errors, event, result)
return EventValidation(errors, event, result)


def task_publish(event: Dict[str, Any]) -> Dict[str, Union[Optional[str], bool]]:
"""Publish the egress IP addresses in the given AWS accounts to an S3 bucket."""
result: Dict[str, Union[Optional[str], bool]] = {"message": None, "success": True}

# Validate all event data before going any further
event_validation_info: event_validation = validate_event_data(event)
if not event_validation_info.valid:
for e in event_validation_info.errors:
EventValidation_info: EventValidation = validate_event_data(event)
if not EventValidation_info.valid:
for e in EventValidation_info.errors:
logging.error(e)
failed_task(result, " ".join(event_validation_info.errors))
failed_task(result, " ".join(EventValidation_info.errors))
return result
validated_event = event_validation_info.event
validated_event = EventValidation_info.event

# The account IDs to examine for IP addresses
account_ids: List[str] = validated_event["account_ids"]
Expand Down Expand Up @@ -407,13 +407,13 @@ def task_publish(event: Dict[str, Any]) -> Dict[str, Union[Optional[str], bool]]
)

# Get the public IPs of instances that are tagged to be published
for ec2_info in get_ec2_ips(
for Ec2Info in get_ec2_ips(
ec2, application_tag_name, publish_egress_tag_name
):
# Loop through all regexes and add IP to set if matched
for config in file_configs:
if config["app_regex"].match(ec2_info.application_tag_value):
config["ip_set"].add(ip_network(ec2_info.public_ip))
if config["app_regex"].match(Ec2Info.application_tag_value):
config["ip_set"].add(ip_network(Ec2Info.public_ip))

# Use a single timestamp for all files
now = "{:%a %b %d %H:%M:%S UTC %Y}".format(datetime.utcnow())
Expand Down

0 comments on commit 2861e2e

Please sign in to comment.