diff --git a/lgsf/aws_lambda/handlers.py b/lgsf/aws_lambda/handlers.py index 8b3f522e..952a25ee 100644 --- a/lgsf/aws_lambda/handlers.py +++ b/lgsf/aws_lambda/handlers.py @@ -10,7 +10,7 @@ def scraper_worker_handler(event, context): - print(event) + print(f"EVENT: {event}") message = json.loads(event["Records"][0]["body"]) council = message["council"] command_name = message["scraper_type"] @@ -18,58 +18,11 @@ def scraper_worker_handler(event, context): options = {"council": council, "verbose": True, "aws_lambda": True} console = Console(file=sys.stdout) scraper = scraper_cls(options, console) - # scraper.run() - - codecommit_client = boto3.client("codecommit") - - main_info = codecommit_client.get_branch( - repositoryName="CouncillorsRepo", branchName="main" - ) - - now = datetime.now().strftime("%Y-%m-%d") - branch_name = f"{council}-{now}" - - codecommit_client.create_branch( - repositoryName="CouncillorsRepo", - branchName=branch_name, - commitId=main_info["branch"]["commitId"], - ) - - put_files = [] - - wards = scraper.get_councillors() - for ward in wards: - for councillor_xml in ward.find_all("councillor"): - councillor = scraper.get_single_councillor(ward, councillor_xml) - file_path = f"{council}/{councillor.as_file_name()}.json" - put_files.append( - { - "filePath": file_path, - "fileContent": bytes( - json.dumps(councillor.as_dict(), indent=4), "utf-8" - ), - } - ) - - commit_info = codecommit_client.create_commit( - repositoryName="CouncillorsRepo", - branchName=branch_name, - parentCommitId=main_info["branch"]["commitId"], - commitMessage=f"Scraped on {now}", - putFiles=put_files, - ) - try: - merge_info = codecommit_client.merge_branches_by_fast_forward( - repositoryName="CouncillorsRepo", - sourceCommitSpecifier=branch_name, - destinationCommitSpecifier="main", - ) - codecommit_client.delete_branch( - repositoryName="CouncillorsRepo", branchName=branch_name - ) - except codecommit_client.exceptions.ManualMergeRequiredException as e: - print(e) + scraper.run() + except Exception as e: + scraper.console.log(e) + scraper.delete_branch() def queue_builder_handler(event, context): diff --git a/lgsf/commands/base.py b/lgsf/commands/base.py index f1f9a601..d6427518 100644 --- a/lgsf/commands/base.py +++ b/lgsf/commands/base.py @@ -21,7 +21,6 @@ def __init__(self, argv, stdout, pretty=False): self.console = Console(file=self.stdout) self.pretty = pretty - def create_parser(self): self.parser = argparse.ArgumentParser() if hasattr(self, "add_arguments"): @@ -72,7 +71,7 @@ def create_parser(self): self.parser.add_argument( "--exclude-missing", action="store_true", - help="Don't run councils missing a scraper matching command name" + help="Don't run councils missing a scraper matching command name", ) self.parser.add_argument( "-t", @@ -203,7 +202,7 @@ def councils_to_run(self): councils.append(council) if self.options["exclude_missing"]: - councils = list(set(councils) - set(c['code'] for c in self.missing())) + councils = list(set(councils) - set(c["code"] for c in self.missing())) return councils def run_councils(self): diff --git a/lgsf/councillors/scrapers.py b/lgsf/councillors/scrapers.py index 6f833849..0842c25a 100644 --- a/lgsf/councillors/scrapers.py +++ b/lgsf/councillors/scrapers.py @@ -1,10 +1,12 @@ import abc +from datetime import datetime +import boto3 from bs4 import BeautifulSoup from dateutil.parser import parse from lgsf.scrapers import ScraperBase -from lgsf.councillors import CouncillorBase +from lgsf.councillors import CouncillorBase, json class BaseCouncillorScraper(ScraperBase): @@ -16,6 +18,13 @@ class BaseCouncillorScraper(ScraperBase): def __init__(self, options, console): super().__init__(options, console) self.councillors = set() + if self.options.get("aws_lambda"): + self.codecommit_client = boto3.client("codecommit") + self.put_files = [] + self.today = datetime.now().strftime("%Y-%m-%d") + self._branch_head = "" + self.batch = 1 + self.new_data = True @abc.abstractmethod def get_councillors(self): @@ -36,11 +45,222 @@ def get_tags(self): def run(self): + if self.options.get("aws_lambda"): + self.delete_data_if_exists() + for councillor_html in self.get_councillors(): councillor = self.get_single_councillor(councillor_html) - self.save_councillor(councillor_html, councillor) + self.process_councillor(councillor, councillor_html) + + self.aws_tidy_up() + self.report() + def delete_data_if_exists(self): + self.console.log("Deleting existing data...") + head = self.branch_head + delete_commit = self.delete_existing(head) + if head == delete_commit: + self.console.log("...no data to delete.") + else: + self.branch_head = delete_commit + self.console.log("...data deleted.") + + def aws_tidy_up(self): + if self.options.get("aws_lambda"): + # Check if there's anything left to commit... + if self.put_files: + self.process_batch() + + # check for differences + try: + differences_response = self.codecommit_client.get_differences( + repositoryName="CouncillorsRepo", + afterCommitSpecifier=self.branch, + beforeCommitSpecifier="main", + afterPath=self.options["council"], + beforePath=self.options["council"], + MaxResults=400, + ) + except self.codecommit_client.exceptions.PathDoesNotExistException: + differences_response = {"differences": True} + + if differences_response["differences"]: + # squash and merge + self.attempt_merge() + else: + self.new_data = False + self.console.log("No new councillor data found.") + + self.delete_branch() + + def process_councillor(self, councillor, councillor_raw_str): + if self.options.get("aws_lambda"): + # stage... + self.stage_councillor(councillor_raw_str, councillor) + + # Do a batch commit if needed... + if len(self.put_files) > 90: + self.process_batch() + else: + self.save_councillor(councillor_raw_str, councillor) + + def process_batch(self): + self.console.log( + f"Committing batch {self.batch} consisting of {len(self.put_files)} files" + ) + message = f"{self.options['council']} - councillors batch {self.batch} - scraped on {self.today}" + commit_info = self.commit(self.put_files, message) + self.branch_head = commit_info["commitId"] + self.batch += 1 + self.put_files = [] + + def attempt_merge(self): + self.console.log("Attempting to create merge commit...") + merge_info = self.codecommit_client.merge_branches_by_squash( + repositoryName="CouncillorsRepo", + sourceCommitSpecifier=self.branch, + destinationCommitSpecifier="main", + commitMessage=f"{self.options['council']} - councillors - scraped on {self.today}", + ) + self.console.log( + f"{self.branch} squashed and merged into main at {merge_info['commitId']}" + ) + + def delete_branch(self): + delete_info = self.codecommit_client.delete_branch( + repositoryName="CouncillorsRepo", branchName=self.branch + ) + if delete_info["deletedBranch"]: + self.console.log(f'deleted {delete_info["deletedBranch"]["branchName"]}') + + def stage_councillor(self, councillor_html, councillor): + council = self.options["council"] + json_file_path = f"{council}/json/{councillor.as_file_name()}.json" + raw_file_path = f"{council}/raw/{councillor.as_file_name()}.html" + self.put_files.extend( + [ + { + "filePath": json_file_path, + "fileContent": bytes( + json.dumps(councillor.as_dict(), indent=4), "utf-8" + ), + }, + { + "filePath": raw_file_path, + "fileContent": bytes(councillor_html.prettify(), "utf-8"), + }, + ] + ) + + def commit(self, put_files, message): + commit_info = self.codecommit_client.create_commit( + repositoryName="CouncillorsRepo", + branchName=self.branch, + parentCommitId=self.branch_head, + commitMessage=message, + putFiles=put_files, + ) + return commit_info + + @property + def branch_head(self): + """returns today's branch's HEAD commit hash""" + if not self._branch_head: + try: + branch_info = self.codecommit_client.get_branch( + repositoryName="CouncillorsRepo", branchName=self.branch + ) + self._branch_head = branch_info["branch"]["commitId"] + except self.codecommit_client.exceptions.BranchDoesNotExistException: + self._branch_head = self.create_branch(self.branch) + + return self._branch_head + + @branch_head.setter + def branch_head(self, commit_id): + self._branch_head = commit_id + + @property + def branch(self): + """returns today's branch name""" + return f"{self.options['council']}-{self.today}" + + def create_branch(self, branch_name): + """ + `$ git checkout -b branch_name main` + ...or create a branch with HEAD pointing at main + + returns commit hash of HEAD + """ + main_info = self.codecommit_client.get_branch( + repositoryName="CouncillorsRepo", branchName="main" + ) + commit_id = main_info["branch"]["commitId"] + + self.codecommit_client.create_branch( + repositoryName="CouncillorsRepo", branchName=branch_name, commitId=commit_id + ) + + return commit_id + + def get_files(self, folder_path): + subfolder_paths = [] + file_paths = [] + try: + self.console.log(f"Getting all files in {folder_path}...") + folder = self.codecommit_client.get_folder( + repositoryName="CouncillorsRepo", + commitSpecifier=self.branch, + folderPath=folder_path, + ) + for subfolder in folder["subFolders"]: + subfolder_paths.append(subfolder["absolutePath"]) + for file in folder["files"]: + file_paths.append(file["absolutePath"]) + + for subfolder_path in subfolder_paths: + sf_paths, f_paths = self.get_files(subfolder_path) + subfolder_paths.extend(sf_paths) + file_paths.extend(f_paths) + + self.console.log(f"...found {len(file_paths)} files in {folder_path}") + return subfolder_paths, file_paths + + except self.codecommit_client.exceptions.FolderDoesNotExistException: + self.console.log(f"{folder_path} Does not exist") + return subfolder_paths, file_paths + + def delete_files(self, delete_files, commit_id, batch): + message = f"Deleting batch no. {batch} consisting of {len(delete_files)} files" + self.console.log(message) + + delete_commit = self.codecommit_client.create_commit( + repositoryName="CouncillorsRepo", + branchName=self.branch, + parentCommitId=commit_id, + commitMessage=message, + deleteFiles=delete_files, + ) + return delete_commit + + def delete_existing(self, commit_id): + _, file_paths = self.get_files(f"{self.options['council']}") + batch = 1 + while len(file_paths) >= 100: + delete_files = [{"filePath": fp} for fp in file_paths[:100]] + delete_commit = self.delete_files(delete_files, commit_id, batch) + batch += 1 + file_paths = file_paths[100:] + commit_id = delete_commit["commitId"] + + if file_paths: + delete_files = [{"filePath": fp} for fp in file_paths] + delete_commit = self.delete_files(delete_files, commit_id, batch) + return delete_commit["commitId"] + else: + return commit_id + def save_councillor(self, raw_content, councillor_obj): assert ( type(councillor_obj) == CouncillorBase @@ -55,7 +275,14 @@ def report(self): raise ValueError( "Not many councillors found ({})".format(len(self.councillors)) ) - self.console.log("Found {} councillors".format(len(self.councillors))) + if self.new_data: + self.console.log( + f"Found {len(self.councillors)} councillors with some new data" + ) + else: + self.console.log( + f"Found {len(self.councillors)} councillors but no new data" + ) class HTMLCouncillorScraper(BaseCouncillorScraper): @@ -110,11 +337,16 @@ class ModGovCouncillorScraper(BaseCouncillorScraper): ext = "xml" def run(self): + + if self.options.get("aws_lambda"): + self.delete_data_if_exists() wards = self.get_councillors() for ward in wards: for councillor_xml in ward.find_all("councillor"): councillor = self.get_single_councillor(ward, councillor_xml) - self.save_councillor(councillor_xml, councillor) + self.process_councillor(councillor, councillor_xml) + + self.aws_tidy_up() self.report() def format_councillor_api_url(self): diff --git a/lgsf/scrapers/base.py b/lgsf/scrapers/base.py index d88de97b..474ec5dc 100644 --- a/lgsf/scrapers/base.py +++ b/lgsf/scrapers/base.py @@ -33,7 +33,7 @@ def get(self, url, verify=True): """ if self.options.get("verbose"): - self.console.log(url) + self.console.log(f"Scraping from {url}") headers = {"User-Agent": "Scraper/DemocracyClub", "Accept": "*/*"} return requests.get(url, headers=headers, verify=verify) diff --git a/sam-template.yaml b/sam-template.yaml index a4d74503..b3763b1b 100644 --- a/sam-template.yaml +++ b/sam-template.yaml @@ -8,7 +8,7 @@ Description: > # More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst Globals: Function: - Timeout: 10 + Timeout: 300 Resources: