Skip to content

Commit

Permalink
WiP Tell scrapers how to commit to aws
Browse files Browse the repository at this point in the history
With added batch gymnastics
  • Loading branch information
GeoWill committed Aug 11, 2021
1 parent dcb7100 commit a625739
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 61 deletions.
57 changes: 5 additions & 52 deletions lgsf/aws_lambda/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,66 +10,19 @@


def scraper_worker_handler(event, context):
print(event)
print(f"EVENT: {event}")
message = json.loads(event["Records"][0]["body"])
council = message["council"]
command_name = message["scraper_type"]
scraper_cls = load_scraper(council, command_name)
options = {"council": council, "verbose": True, "aws_lambda": True}
console = Console(file=sys.stdout)
scraper = scraper_cls(options, console)
# scraper.run()

codecommit_client = boto3.client("codecommit")

main_info = codecommit_client.get_branch(
repositoryName="CouncillorsRepo", branchName="main"
)

now = datetime.now().strftime("%Y-%m-%d")
branch_name = f"{council}-{now}"

codecommit_client.create_branch(
repositoryName="CouncillorsRepo",
branchName=branch_name,
commitId=main_info["branch"]["commitId"],
)

put_files = []

wards = scraper.get_councillors()
for ward in wards:
for councillor_xml in ward.find_all("councillor"):
councillor = scraper.get_single_councillor(ward, councillor_xml)
file_path = f"{council}/{councillor.as_file_name()}.json"
put_files.append(
{
"filePath": file_path,
"fileContent": bytes(
json.dumps(councillor.as_dict(), indent=4), "utf-8"
),
}
)

commit_info = codecommit_client.create_commit(
repositoryName="CouncillorsRepo",
branchName=branch_name,
parentCommitId=main_info["branch"]["commitId"],
commitMessage=f"Scraped on {now}",
putFiles=put_files,
)

try:
merge_info = codecommit_client.merge_branches_by_fast_forward(
repositoryName="CouncillorsRepo",
sourceCommitSpecifier=branch_name,
destinationCommitSpecifier="main",
)
codecommit_client.delete_branch(
repositoryName="CouncillorsRepo", branchName=branch_name
)
except codecommit_client.exceptions.ManualMergeRequiredException as e:
print(e)
scraper.run()
except Exception as e:
scraper.console.log(e)
scraper.delete_branch()


def queue_builder_handler(event, context):
Expand Down
5 changes: 2 additions & 3 deletions lgsf/commands/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def __init__(self, argv, stdout, pretty=False):
self.console = Console(file=self.stdout)
self.pretty = pretty


def create_parser(self):
self.parser = argparse.ArgumentParser()
if hasattr(self, "add_arguments"):
Expand Down Expand Up @@ -72,7 +71,7 @@ def create_parser(self):
self.parser.add_argument(
"--exclude-missing",
action="store_true",
help="Don't run councils missing a scraper matching command name"
help="Don't run councils missing a scraper matching command name",
)
self.parser.add_argument(
"-t",
Expand Down Expand Up @@ -203,7 +202,7 @@ def councils_to_run(self):
councils.append(council)

if self.options["exclude_missing"]:
councils = list(set(councils) - set(c['code'] for c in self.missing()))
councils = list(set(councils) - set(c["code"] for c in self.missing()))
return councils

def run_councils(self):
Expand Down
240 changes: 236 additions & 4 deletions lgsf/councillors/scrapers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import abc
from datetime import datetime

import boto3
from bs4 import BeautifulSoup
from dateutil.parser import parse

from lgsf.scrapers import ScraperBase
from lgsf.councillors import CouncillorBase
from lgsf.councillors import CouncillorBase, json


class BaseCouncillorScraper(ScraperBase):
Expand All @@ -16,6 +18,13 @@ class BaseCouncillorScraper(ScraperBase):
def __init__(self, options, console):
super().__init__(options, console)
self.councillors = set()
if self.options.get("aws_lambda"):
self.codecommit_client = boto3.client("codecommit")
self.put_files = []
self.today = datetime.now().strftime("%Y-%m-%d")
self._branch_head = ""
self.batch = 1
self.new_data = True

@abc.abstractmethod
def get_councillors(self):
Expand All @@ -36,11 +45,222 @@ def get_tags(self):

def run(self):

if self.options.get("aws_lambda"):
self.delete_data_if_exists()

for councillor_html in self.get_councillors():
councillor = self.get_single_councillor(councillor_html)
self.save_councillor(councillor_html, councillor)
self.process_councillor(councillor, councillor_html)

self.aws_tidy_up()

self.report()

def delete_data_if_exists(self):
self.console.log("Deleting existing data...")
head = self.branch_head
delete_commit = self.delete_existing(head)
if head == delete_commit:
self.console.log("...no data to delete.")
else:
self.branch_head = delete_commit
self.console.log("...data deleted.")

def aws_tidy_up(self):
if self.options.get("aws_lambda"):
# Check if there's anything left to commit...
if self.put_files:
self.process_batch()

# check for differences
try:
differences_response = self.codecommit_client.get_differences(
repositoryName="CouncillorsRepo",
afterCommitSpecifier=self.branch,
beforeCommitSpecifier="main",
afterPath=self.options["council"],
beforePath=self.options["council"],
MaxResults=400,
)
except self.codecommit_client.exceptions.PathDoesNotExistException:
differences_response = {"differences": True}

if differences_response["differences"]:
# squash and merge
self.attempt_merge()
else:
self.new_data = False
self.console.log("No new councillor data found.")

self.delete_branch()

def process_councillor(self, councillor, councillor_raw_str):
if self.options.get("aws_lambda"):
# stage...
self.stage_councillor(councillor_raw_str, councillor)

# Do a batch commit if needed...
if len(self.put_files) > 90:
self.process_batch()
else:
self.save_councillor(councillor_raw_str, councillor)

def process_batch(self):
self.console.log(
f"Committing batch {self.batch} consisting of {len(self.put_files)} files"
)
message = f"{self.options['council']} - councillors batch {self.batch} - scraped on {self.today}"
commit_info = self.commit(self.put_files, message)
self.branch_head = commit_info["commitId"]
self.batch += 1
self.put_files = []

def attempt_merge(self):
self.console.log("Attempting to create merge commit...")
merge_info = self.codecommit_client.merge_branches_by_squash(
repositoryName="CouncillorsRepo",
sourceCommitSpecifier=self.branch,
destinationCommitSpecifier="main",
commitMessage=f"{self.options['council']} - councillors - scraped on {self.today}",
)
self.console.log(
f"{self.branch} squashed and merged into main at {merge_info['commitId']}"
)

def delete_branch(self):
delete_info = self.codecommit_client.delete_branch(
repositoryName="CouncillorsRepo", branchName=self.branch
)
if delete_info["deletedBranch"]:
self.console.log(f'deleted {delete_info["deletedBranch"]["branchName"]}')

def stage_councillor(self, councillor_html, councillor):
council = self.options["council"]
json_file_path = f"{council}/json/{councillor.as_file_name()}.json"
raw_file_path = f"{council}/raw/{councillor.as_file_name()}.html"
self.put_files.extend(
[
{
"filePath": json_file_path,
"fileContent": bytes(
json.dumps(councillor.as_dict(), indent=4), "utf-8"
),
},
{
"filePath": raw_file_path,
"fileContent": bytes(councillor_html.prettify(), "utf-8"),
},
]
)

def commit(self, put_files, message):
commit_info = self.codecommit_client.create_commit(
repositoryName="CouncillorsRepo",
branchName=self.branch,
parentCommitId=self.branch_head,
commitMessage=message,
putFiles=put_files,
)
return commit_info

@property
def branch_head(self):
"""returns today's branch's HEAD commit hash"""
if not self._branch_head:
try:
branch_info = self.codecommit_client.get_branch(
repositoryName="CouncillorsRepo", branchName=self.branch
)
self._branch_head = branch_info["branch"]["commitId"]
except self.codecommit_client.exceptions.BranchDoesNotExistException:
self._branch_head = self.create_branch(self.branch)

return self._branch_head

@branch_head.setter
def branch_head(self, commit_id):
self._branch_head = commit_id

@property
def branch(self):
"""returns today's branch name"""
return f"{self.options['council']}-{self.today}"

def create_branch(self, branch_name):
"""
`$ git checkout -b branch_name main`
...or create a branch with HEAD pointing at main
returns commit hash of HEAD
"""
main_info = self.codecommit_client.get_branch(
repositoryName="CouncillorsRepo", branchName="main"
)
commit_id = main_info["branch"]["commitId"]

self.codecommit_client.create_branch(
repositoryName="CouncillorsRepo", branchName=branch_name, commitId=commit_id
)

return commit_id

def get_files(self, folder_path):
subfolder_paths = []
file_paths = []
try:
self.console.log(f"Getting all files in {folder_path}...")
folder = self.codecommit_client.get_folder(
repositoryName="CouncillorsRepo",
commitSpecifier=self.branch,
folderPath=folder_path,
)
for subfolder in folder["subFolders"]:
subfolder_paths.append(subfolder["absolutePath"])
for file in folder["files"]:
file_paths.append(file["absolutePath"])

for subfolder_path in subfolder_paths:
sf_paths, f_paths = self.get_files(subfolder_path)
subfolder_paths.extend(sf_paths)
file_paths.extend(f_paths)

self.console.log(f"...found {len(file_paths)} files in {folder_path}")
return subfolder_paths, file_paths

except self.codecommit_client.exceptions.FolderDoesNotExistException:
self.console.log(f"{folder_path} Does not exist")
return subfolder_paths, file_paths

def delete_files(self, delete_files, commit_id, batch):
message = f"Deleting batch no. {batch} consisting of {len(delete_files)} files"
self.console.log(message)

delete_commit = self.codecommit_client.create_commit(
repositoryName="CouncillorsRepo",
branchName=self.branch,
parentCommitId=commit_id,
commitMessage=message,
deleteFiles=delete_files,
)
return delete_commit

def delete_existing(self, commit_id):
_, file_paths = self.get_files(f"{self.options['council']}")
batch = 1
while len(file_paths) >= 100:
delete_files = [{"filePath": fp} for fp in file_paths[:100]]
delete_commit = self.delete_files(delete_files, commit_id, batch)
batch += 1
file_paths = file_paths[100:]
commit_id = delete_commit["commitId"]

if file_paths:
delete_files = [{"filePath": fp} for fp in file_paths]
delete_commit = self.delete_files(delete_files, commit_id, batch)
return delete_commit["commitId"]
else:
return commit_id

def save_councillor(self, raw_content, councillor_obj):
assert (
type(councillor_obj) == CouncillorBase
Expand All @@ -55,7 +275,14 @@ def report(self):
raise ValueError(
"Not many councillors found ({})".format(len(self.councillors))
)
self.console.log("Found {} councillors".format(len(self.councillors)))
if self.new_data:
self.console.log(
f"Found {len(self.councillors)} councillors with some new data"
)
else:
self.console.log(
f"Found {len(self.councillors)} councillors but no new data"
)


class HTMLCouncillorScraper(BaseCouncillorScraper):
Expand Down Expand Up @@ -110,11 +337,16 @@ class ModGovCouncillorScraper(BaseCouncillorScraper):
ext = "xml"

def run(self):

if self.options.get("aws_lambda"):
self.delete_data_if_exists()
wards = self.get_councillors()
for ward in wards:
for councillor_xml in ward.find_all("councillor"):
councillor = self.get_single_councillor(ward, councillor_xml)
self.save_councillor(councillor_xml, councillor)
self.process_councillor(councillor, councillor_xml)

self.aws_tidy_up()
self.report()

def format_councillor_api_url(self):
Expand Down
2 changes: 1 addition & 1 deletion lgsf/scrapers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def get(self, url, verify=True):
"""

if self.options.get("verbose"):
self.console.log(url)
self.console.log(f"Scraping from {url}")
headers = {"User-Agent": "Scraper/DemocracyClub", "Accept": "*/*"}

return requests.get(url, headers=headers, verify=verify)
Expand Down
Loading

0 comments on commit a625739

Please sign in to comment.