From 67e386940fed64c0639f0a185d5cd2379cb5ee04 Mon Sep 17 00:00:00 2001 From: sarayourfriend Date: Fri, 6 Sep 2024 07:28:48 +1000 Subject: [PATCH] Remove retired DAGs from the codebase (#4867) * Remove retired DAGs * Remove or stabilise references to retired DAGs --- .../ISSUE_TEMPLATE/new_source_suggestion.yml | 1 - .pre-commit-config.yaml | 2 +- catalog/dags/.airflowignore | 1 - catalog/dags/retired/README.md | 6 - catalog/dags/retired/__init__.py | 0 catalog/dags/retired/cleaner_workflow.py | 72 ---- catalog/dags/retired/common/etl/__init__.py | 0 .../common/etl/bootstrap/config-py27.sh | 8 - .../common/etl/scripts/ExtractCCLinks.py | 379 ------------------ .../retired/common/etl/scripts/__init__.py | 0 .../dags/retired/common/ingestion_column.py | 47 --- catalog/dags/retired/common/loader/sql.py | 52 --- catalog/dags/retired/common/pg_cleaner.py | 251 ------------ catalog/dags/retired/common_api_workflows.py | 66 --- .../retired/commoncrawl/commoncrawl_etl.py | 245 ----------- .../SyncImageProviders.py | 102 ----- .../scripts/merge_cc_tags.py | 137 ------- .../retired/commoncrawl/commoncrawl_utils.py | 10 - .../commoncrawl/sync_commoncrawl_workflow.py | 74 ---- .../database/image_expiration_workflow.py | 42 -- .../dags/retired/database/loader_workflow.py | 196 --------- .../terminate_long_queries_workflow.py | 95 ----- .../fix_phylopic_foreign_identifier.py | 119 ------ .../provider_api_scripts/modules/__init__.py | 0 .../provider_api_scripts/modules/etlMods.py | 198 --------- .../provider_api_scripts/thingiverse.py | 268 ------------- .../providers/provider_api_scripts/walters.py | 193 --------- .../dags/retired/update_workflows/__init__.py | 0 .../europeana_sub_provider_update_workflow.py | 63 --- .../flickr_sub_provider_update_workflow.py | 63 --- ...mithsonian_sub_provider_update_workflow.py | 62 --- .../retired/update_workflows/update_sql.py | 270 ------------- .../media_props_gen/docs/media_props.md | 2 +- documentation/catalog/guides/quickstart.md | 5 +- documentation/catalog/reference/index.md | 1 - documentation/catalog/reference/retired.md | 39 -- .../meta/media_properties/catalog.md | 2 +- pyproject.toml | 1 - 38 files changed, 5 insertions(+), 3067 deletions(-) delete mode 100644 catalog/dags/retired/README.md delete mode 100644 catalog/dags/retired/__init__.py delete mode 100644 catalog/dags/retired/cleaner_workflow.py delete mode 100644 catalog/dags/retired/common/etl/__init__.py delete mode 100644 catalog/dags/retired/common/etl/bootstrap/config-py27.sh delete mode 100644 catalog/dags/retired/common/etl/scripts/ExtractCCLinks.py delete mode 100644 catalog/dags/retired/common/etl/scripts/__init__.py delete mode 100644 catalog/dags/retired/common/ingestion_column.py delete mode 100644 catalog/dags/retired/common/loader/sql.py delete mode 100644 catalog/dags/retired/common/pg_cleaner.py delete mode 100644 catalog/dags/retired/common_api_workflows.py delete mode 100644 catalog/dags/retired/commoncrawl/commoncrawl_etl.py delete mode 100644 catalog/dags/retired/commoncrawl/commoncrawl_scripts/commoncrawl_s3_syncer/SyncImageProviders.py delete mode 100644 catalog/dags/retired/commoncrawl/commoncrawl_scripts/scripts/merge_cc_tags.py delete mode 100644 catalog/dags/retired/commoncrawl/commoncrawl_utils.py delete mode 100644 catalog/dags/retired/commoncrawl/sync_commoncrawl_workflow.py delete mode 100644 catalog/dags/retired/database/image_expiration_workflow.py delete mode 100644 catalog/dags/retired/database/loader_workflow.py delete mode 100644 catalog/dags/retired/database/terminate_long_queries_workflow.py delete mode 100644 catalog/dags/retired/fix_phylopic_foreign_identifier.py delete mode 100644 catalog/dags/retired/providers/provider_api_scripts/modules/__init__.py delete mode 100644 catalog/dags/retired/providers/provider_api_scripts/modules/etlMods.py delete mode 100644 catalog/dags/retired/providers/provider_api_scripts/thingiverse.py delete mode 100644 catalog/dags/retired/providers/provider_api_scripts/walters.py delete mode 100644 catalog/dags/retired/update_workflows/__init__.py delete mode 100644 catalog/dags/retired/update_workflows/europeana_sub_provider_update_workflow.py delete mode 100644 catalog/dags/retired/update_workflows/flickr_sub_provider_update_workflow.py delete mode 100644 catalog/dags/retired/update_workflows/smithsonian_sub_provider_update_workflow.py delete mode 100644 catalog/dags/retired/update_workflows/update_sql.py delete mode 100644 documentation/catalog/reference/retired.md diff --git a/.github/ISSUE_TEMPLATE/new_source_suggestion.yml b/.github/ISSUE_TEMPLATE/new_source_suggestion.yml index 7eb2a87fa81..6ed55c3d017 100644 --- a/.github/ISSUE_TEMPLATE/new_source_suggestion.yml +++ b/.github/ISSUE_TEMPLATE/new_source_suggestion.yml @@ -68,7 +68,6 @@ body: - The script must use the ProviderDataIngester class in `catalog/dags/providers/provider_api_scripts/provider_data_ingester.py`. - The script must use the `ImageStore` or `AudioStore` class (Import this from `catalog/dags/provider_api_scripts/common/storage/image.py`). - The script should use the `DelayedRequester` class (Import this from `catalog/dags/provider_api_scripts/common/requester.py`). - - The script must not use anything from `catalog/dags/retired/providers/provider_api_scripts/modules/etlMods.py`, since that module is deprecated. - If the provider API has can be queried by 'upload date' or something similar, the script should take a `--date` parameter when run as a script, giving the date for which we should collect images. The form should be `YYYY-MM-DD` (so, the script can be run via `python my_favorite_provider.py --date 2018-01-01`). - The script must provide a main function that takes the same parameters as from the CLI. In our example from above, we'd then have a main function `my_favorite_provider.main(date)`. The main should do the same thing calling from the CLI would do. - The script *must* conform to [PEP8][pep8]. Please use `ov just lint` before committing. diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 582bb2dffb1..719d6e70682 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,4 +1,4 @@ -exclude: Pipfile\.lock|migrations|\.idea|node_modules|retired|\.snap +exclude: Pipfile\.lock|migrations|\.idea|node_modules|\.snap repos: - repo: local # More local hooks are defined at the bottom. diff --git a/catalog/dags/.airflowignore b/catalog/dags/.airflowignore index a4e1542ff45..e2177a844a1 100644 --- a/catalog/dags/.airflowignore +++ b/catalog/dags/.airflowignore @@ -1,4 +1,3 @@ # Ignore all non-DAG files common/ providers/provider_api_scripts -retired diff --git a/catalog/dags/retired/README.md b/catalog/dags/retired/README.md deleted file mode 100644 index 5563ff620e5..00000000000 --- a/catalog/dags/retired/README.md +++ /dev/null @@ -1,6 +0,0 @@ -# Retired DAGs - -This folder contains DAGs that were either one-offs or are no longer relevant to -current processes, but are useful to keep around for potential future use-cases. -DAGs in this folder are excluded from Airflow's DAG parsing and will not be -shown in the UI. diff --git a/catalog/dags/retired/__init__.py b/catalog/dags/retired/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/catalog/dags/retired/cleaner_workflow.py b/catalog/dags/retired/cleaner_workflow.py deleted file mode 100644 index bd378f1ce45..00000000000 --- a/catalog/dags/retired/cleaner_workflow.py +++ /dev/null @@ -1,72 +0,0 @@ -import logging -import os -from copy import deepcopy -from datetime import datetime - -from airflow import DAG -from airflow.operators.python import PythonOperator -from common import config -from retired.common import pg_cleaner - - -logging.basicConfig( - format="%(asctime)s - %(name)s - %(levelname)s: %(message)s", - level=logging.INFO, -) - -logger = logging.getLogger(__name__) - -DAG_ID = "postgres_image_cleaner" -DB_CONN_ID = os.getenv("OPENLEDGER_CONN_ID", "postgres_openledger_testing") -PREFIX_LENGTH = 1 -DESIRED_PREFIX_LENGTH = 3 -CONCURRENCY = 8 - - -def create_id_partitioned_cleaner_dag( - dag_id=DAG_ID, - prefix_length=PREFIX_LENGTH, - postgres_conn_id=DB_CONN_ID, - start_date=datetime(1970, 1, 1), - concurrency=CONCURRENCY, - default_args=config.DAG_DEFAULT_ARGS, -): - args = deepcopy(default_args) - args.update(start_date=start_date) - dag = DAG( - dag_id=dag_id, - default_args=args, - concurrency=concurrency, - max_active_runs=concurrency, - schedule_interval=None, - start_date=start_date, - catchup=False, - tags=["database"], - ) - hex_prefixes = pg_cleaner.hex_counter(prefix_length) - with dag: - [_get_pg_cleaner_operator(prefix, postgres_conn_id) for prefix in hex_prefixes] - - return dag - - -def _get_pg_cleaner_operator( - prefix, - postgres_conn_id, - desired_length=DESIRED_PREFIX_LENGTH, - delay=CONCURRENCY, -): - task_id = f"clean_{prefix}" - return PythonOperator( - task_id=task_id, - python_callable=pg_cleaner.clean_prefix_loop, - op_args=[postgres_conn_id, prefix], - op_kwargs={ - "desired_prefix_length": desired_length, - "delay_minutes": delay, - }, - depends_on_past=False, - ) - - -globals()[DAG_ID] = create_id_partitioned_cleaner_dag() diff --git a/catalog/dags/retired/common/etl/__init__.py b/catalog/dags/retired/common/etl/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/catalog/dags/retired/common/etl/bootstrap/config-py27.sh b/catalog/dags/retired/common/etl/bootstrap/config-py27.sh deleted file mode 100644 index c72e4e88ad3..00000000000 --- a/catalog/dags/retired/common/etl/bootstrap/config-py27.sh +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/bash - -sudo pip install \ - boto3 \ - warcio \ - ujson \ - requests \ - BeautifulSoup diff --git a/catalog/dags/retired/common/etl/scripts/ExtractCCLinks.py b/catalog/dags/retired/common/etl/scripts/ExtractCCLinks.py deleted file mode 100644 index d77822a4593..00000000000 --- a/catalog/dags/retired/common/etl/scripts/ExtractCCLinks.py +++ /dev/null @@ -1,379 +0,0 @@ -# flake8: noqa -# Common Crawl data extraction -"""Identify all links to Creative Commons in the web crawl data""" - -import gzip -import logging -import os -import re -import sys -from collections import Counter - -import boto3 -import botocore -import requests -import StringIO -import ujson as json -from pyspark import SparkContext -from pyspark.sql import SparkSession -from pyspark.sql.types import * -from urlparse import urlparse -from warcio.archiveiterator import ArchiveIterator - - -BUCKET = os.environ.get("COMMONCRAWL_BUCKET", "not_set") -COMMONSMAPPER_BUCKET = os.environ.get("COMMONSMAPPER_BUCKET", "not_set") - -class CCLinks: - def __init__(self, _index, _ptn=2500): - """ - CCLinks constructor: Validate the user-defined index based on Common Crawl's expected format. - If the pattern is valid, it generates 1) a url for the WAT path and 2) the location to output the results. - - Parameters - ------------------ - _index: string - The common crawl index name - - _ptn: integer - The number of partitions for the spark job - - Returns - ------------------ - None - - """ - - self.crawlIndex = _index - - # check index format - pattern = re.compile(r"CC-MAIN-\d{4}-\d{2}") - if not pattern.match(_index): - logging.error(f"Invalid common crawl index format => {_index}.") - sys.exit() - - self.numPartitions = _ptn - self.url = "https://{}.s3.amazonaws.com/crawl-data/{}/wat.paths.gz".format( - BUCKET, self.crawlIndex - ) - self.output = f"s3://{COMMONSMAPPER_BUCKET}/output/{self.crawlIndex}" - - def loadWATFile(self): - # load the WAT file paths - """ - Make a request for a WAT file using the url, that was defined in the constructor. - - Parameters - ------------------ - None - - Returns - ------------------ - list - A list of WAT path locations. - """ - logging.info(f"Loading file {self.url}") - - try: - response = requests.get(self.url) - - if response.status_code == requests.codes.ok: - content = StringIO.StringIO(response.content) - fh = gzip.GzipFile(fileobj=content) - watPaths = fh.read().split() - - return watPaths - else: - raise Exception - - except Exception as e: - logging.error("There was a problem loading the file.") - logging.error(f"{type(e).__name__}: {e}") - # sys.exit() - - def processFile(self, _iterator): - """ - Parse each WAT file to identify domains with a hyperlink to creativecommons.org. - - Parameters - ------------------ - _iterator: iterator object - The iterator for the RDD partition that was assigned to the current process. - - Returns - ------------------ - list - A list of domains and their respective content path and query string, the hyperlink to creative commons (which may reference a license), the location of the domain in the current warc file and a count of the number of links and images. - """ - - logging.basicConfig( - format="%(asctime)s: [%(levelname)s - ExtractCCLinks] =======> %(message)s", - level=logging.INFO, - ) - - # connect to s3 using boto3 - s3 = boto3.resource("s3") - # s3.meta.client.meta.events.register('choose-signer.s3.*', disable_signing) - - try: - # verify bucket - s3.meta.client.head_bucket(Bucket=BUCKET) - except botocore.exceptions.ClientError as e: - error = int(e.response["Error"]["Code"]) - - if error == 404: - logging.error("Bucket not found!") - sys.exit() - - else: - # iterate over the keys and load the respective wat files - for uri in _iterator: - - try: - # verify key - s3.Object(BUCKET, uri.strip()).load() - - except botocore.client.ClientError as e: - logging.warning( - "{}: {}.".format(uri.strip(), e.response["Error"]["Message"]) - ) - pass - - else: - try: - resp = requests.get( - "https://{}.s3.amazonaws.com/{}".format( - BUCKET, uri.strip() - ), - stream=True, - ) - except Exception as e: - # ConnectionError: HTTPSConnectionPool - logging.error( - "Exception type: {}, Message: {}".format( - type(e).__name__, e - ) - ) - pass - else: - - for record in ArchiveIterator(resp.raw, arc2warc=True): - if record.rec_headers["Content-Type"] == "application/json": - - try: - content = json.loads(record.content_stream().read()) - - except Exception as e: - logging.warning( - "JSON payload file: {}. Exception type: {}, Message: {}".format( - uri.strip(), type(e).__name__, e - ) - ) - pass - - else: - if ( - content["Envelope"]["WARC-Header-Metadata"][ - "WARC-Type" - ] - != "response" - ): - continue - elif ( - "HTML-Metadata" - not in content["Envelope"]["Payload-Metadata"][ - "HTTP-Response-Metadata" - ] - ): - continue - elif ( - "Links" - not in content["Envelope"]["Payload-Metadata"][ - "HTTP-Response-Metadata" - ]["HTML-Metadata"] - ): - continue - - try: - segment = uri.split("/wat/")[0].strip() - targetURI = urlparse( - content["Envelope"]["WARC-Header-Metadata"][ - "WARC-Target-URI" - ].strip() - ) - offset = int( - content["Container"]["Offset"].strip() - ) - filename = content["Container"][ - "Filename" - ].strip() - dftLength = int( - content["Container"]["Gzip-Metadata"][ - "Deflate-Length" - ].strip() - ) - - links = filter( - lambda x: "url" in x, - content["Envelope"]["Payload-Metadata"][ - "HTTP-Response-Metadata" - ]["HTML-Metadata"]["Links"], - ) - - result = map( - lambda x: ( - targetURI.netloc, - targetURI.path, - targetURI.query, - urlparse(x["url"]).netloc, - urlparse(x["url"]).path, - segment, - filename, - offset, - dftLength, - json.dumps( - { - "Images": len( - list( - set( - map( - lambda i: i[ - "url" - ], - filter( - lambda z: "IMG@/src" - in z[ - "path" - ], - links, - ), - ) - ) - ) - ), - "Links": Counter( - map( - lambda l: urlparse( - l["url"] - ).netloc, - filter( - lambda z: "A@/href" - in z["path"] - and targetURI.netloc - not in z["url"] - and urlparse( - z["url"] - ).netloc - != "", - links, - ), - ) - ), - } - ), - ), - filter( - lambda y: "creativecommons.org" - in y["url"], - links, - ), - ) - - except (KeyError, ValueError) as e: - logging.error( - "{}:{}, File:{}".format( - type(e).__name__, e, uri.strip() - ) - ) - pass - - else: - if result: - yield from result - - def generateParquet(self, _data): - """ - Create a parquet file with the extracted content. - - Parameters - ------------------ - _data: generator - A list containing the extracted domains and their associated meta-data. - - Returns - ------------------ - None - """ - - schema = StructType( - [ - StructField("provider_domain", StringType(), True), - StructField("content_path", StringType(), True), - StructField("content_query_string", StringType(), True), - StructField("cc_domain", StringType(), True), - StructField("cc_license", StringType(), True), - StructField("warc_segment", StringType(), True), - StructField("warc_filename", StringType(), True), - StructField("content_offset", LongType(), True), - StructField("deflate_length", LongType(), True), - StructField("html_metadata", StringType(), True), - ] - ) - - spk = SparkSession.builder.getOrCreate() - df = spk.createDataFrame(_data, schema=schema) - df.write.format("parquet").mode("overwrite").save(self.output) - - -def main(): - args = sys.argv[1] - crawlIndex = args.strip() - - if crawlIndex.lower() == "--default": - s3 = boto3.client("s3") - - # verify bucket - contents = [] - prefix = "cc-index/collections/CC-MAIN-" - botoArgs = {"Bucket": BUCKET, "Prefix": prefix} - - while True: - - objects = s3.list_objects_v2(**botoArgs) - - for obj in objects["Contents"]: - key = obj["Key"] - - if "indexes" in key: - cIndex = key.split("/indexes/")[0].split("/") - cIndex = cIndex[len(cIndex) - 1] - - if str(cIndex) not in contents: - contents.append(str(cIndex)) - - try: - botoArgs["ContinuationToken"] = objects["NextContinuationToken"] - except KeyError: - break - - if contents: - crawlIndex = contents[-1] - - sc = SparkContext(appName="ExtractCCLinks") - - ccLinks = CCLinks(crawlIndex.upper(), sc.defaultParallelism) - watPaths = ccLinks.loadWATFile() - - if watPaths is None: - sc.stop() - sys.exit() - - watRDD = sc.parallelize(watPaths, ccLinks.numPartitions) - result = watRDD.mapPartitions(ccLinks.processFile) - - ccLinks.generateParquet(result) - sc.stop() - - -if __name__ == "__main__": - main() diff --git a/catalog/dags/retired/common/etl/scripts/__init__.py b/catalog/dags/retired/common/etl/scripts/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/catalog/dags/retired/common/ingestion_column.py b/catalog/dags/retired/common/ingestion_column.py deleted file mode 100644 index 96599f6972f..00000000000 --- a/catalog/dags/retired/common/ingestion_column.py +++ /dev/null @@ -1,47 +0,0 @@ -""" -This module has a method necessary to add an -ingestion_type column to TSV files before uploading them to S3 or -PostgreSQL. This is used for legacy TSV versions which did not have -ingestion_type column -""" -import logging -import os - - -logger = logging.getLogger(__name__) - - -def _fix_ingestion_column(filepath: str) -> None: - """The oldest TSV files have no `ingestion_type` column""" - - logger.info(f"File to fix: {filepath}") - with open(filepath) as f: - test_line = f.readline().strip() - columns = test_line.split("\t") - column_count = len(columns) - old_cols_number = 17 - new_cols_number = old_cols_number + 1 - if column_count == new_cols_number: - return - elif column_count != old_cols_number: - logger.warning( - f"Wrong number of columns ({column_count}) " - f"for a legacy TSV file. Cannot fix the file..." - ) - raise TypeError(f"Wrong number of columns ({column_count}) for TSV") - source = columns[-1] - ingestion_type = source if source == "commoncrawl" else "provider_api" - logger.info(f"Adding ingestion type {ingestion_type} to {filepath}") - temp_tsv = filepath + ".new" - with open(filepath) as old_tsv, open(temp_tsv, "w") as new_tsv: - old_line = old_tsv.readline().strip() - while old_line: - if ingestion_type == "commoncrawl": - line_list = [word.strip() for word in old_line.split("\t")] - new_tsv.write("\t".join(line_list[:-1] + line_list[-2:]) + "\n") - else: - new_tsv.write(old_line + "\t" + ingestion_type + "\n") - old_line = old_tsv.readline().strip() - - os.rename(filepath, filepath + ".old") - os.rename(temp_tsv, filepath) diff --git a/catalog/dags/retired/common/loader/sql.py b/catalog/dags/retired/common/loader/sql.py deleted file mode 100644 index a89fe5f6822..00000000000 --- a/catalog/dags/retired/common/loader/sql.py +++ /dev/null @@ -1,52 +0,0 @@ -def expire_old_images( - postgres_conn_id, - provider, - image_table=TABLE_NAMES[IMAGE], - task: AbstractOperator = None, -): - postgres = PostgresHook( - postgres_conn_id=postgres_conn_id, - default_statement_timeout=PostgresHook.get_execution_timeout(task), - ) - - if provider not in OLDEST_PER_PROVIDER: - raise Exception( - f"Provider value {provider} not defined in the " - f"OLDEST_PER_PROVIDER dictionary" - ) - - """ - Select all records that are outdated - """ - select_query = dedent( - f""" - SELECT {col.FOREIGN_ID.db_name} - FROM {image_table} - WHERE - {col.PROVIDER.db_name} = '{provider}' - AND - {col.UPDATED_ON.db_name} < {NOW} - INTERVAL '{OLDEST_PER_PROVIDER[provider]}'; - """ - ) - - selected_records = postgres.get_records(select_query) - - """ - Set the 'removed_from_source' value of each selected row to True to - indicate that those images are outdated - """ - for row in selected_records: - foreign_id = row[0] - - postgres.run( - dedent( - f""" - UPDATE {image_table} - SET {col.REMOVED.db_name} = 't' - WHERE - {image_table}.{col.PROVIDER.db_name} = '{provider}' - AND - MD5({image_table}.{col.FOREIGN_ID.db_name}) = MD5('{foreign_id}'); - """ - ) - ) diff --git a/catalog/dags/retired/common/pg_cleaner.py b/catalog/dags/retired/common/pg_cleaner.py deleted file mode 100644 index 1a01c864870..00000000000 --- a/catalog/dags/retired/common/pg_cleaner.py +++ /dev/null @@ -1,251 +0,0 @@ -""" -This file provides the pieces to perform an after-the-fact processing -of all data in the image table of the upstream DB through the ImageStore -class. -""" -import logging -import os -import time -from collections import namedtuple -from pathlib import Path -from textwrap import dedent - -from airflow.providers.postgres.hooks.postgres import PostgresHook -from common import tsv_cleaner, urls -from common.constants import IMAGE -from common.loader.sql import TABLE_NAMES -from common.storage import columns as column -from common.storage import image - - -logger = logging.getLogger(__name__) -logging.getLogger(image.__name__).setLevel(logging.WARNING) -logging.getLogger(urls.__name__).setLevel(logging.WARNING) - -MAX_DIR_SIZE = 8 * 1024**3 -OUTPUT_DIR = os.path.realpath(os.getenv("OUTPUT_DIR", "/tmp/")) -OVERWRITE_DIR = "overwrite/" -OUTPUT_PATH = os.path.join(OUTPUT_DIR, OVERWRITE_DIR) -DELAY_MINUTES = 1 - -IMAGE_TABLE_COLS = [ - # These are not precisely the same names as in the DB. - "identifier", - "created_on", - "updated_on", - "ingestion_type", - "provider", - "source", - "foreign_identifier", - "foreign_landing_url", - "image_url", - "thumbnail_url", - "width", - "height", - "filesize", - "license_", - "license_version", - "creator", - "creator_url", - "title", - "meta_data", - "tags", - "watermarked", - "last_synced", - "removed", -] - -ImageTableRow = namedtuple("ImageTableRow", IMAGE_TABLE_COLS) - - -class ImageStoreDict(dict): - def __missing__(self, key): - ret = self[key] = self._init_image_store(key) - return ret - - def _init_image_store( - self, - key, - output_path=OUTPUT_PATH, - ): - os.makedirs(output_path, exist_ok=True) - return image.ImageStore( - provider=key[0], - output_file=f"cleaned_{key[1]}.tsv", - output_dir=output_path, - ) - - -class CleaningException(Exception): - pass - - -def clean_prefix_loop( - postgres_conn_id, - prefix, - desired_prefix_length=4, - delay_minutes=DELAY_MINUTES, -): - failure = False - if len(prefix) >= desired_prefix_length: - try: - clean_rows(postgres_conn_id, prefix) - except Exception as e: - failure = True - logger.error(f"Failed to clean rows with prefix {prefix}") - logger.error(f"Exception was {e}") - else: - interfix_length = desired_prefix_length - len(prefix) - for i in hex_counter(interfix_length): - start_time = time.time() - try: - clean_rows(postgres_conn_id, prefix + i) - except Exception as e: - failure = True - logger.error(f"Failed to clean rows with prefix {prefix}") - logger.error(f"Exception was {e}") - total_time = time.time() - start_time - logger.info(f"Total time: {total_time} seconds") - _wait_for_space() - if failure: - raise CleaningException() - - -def clean_rows(postgres_conn_id, prefix): - """ - This function runs all rows from the image table whose identifier - starts with the given prefix through the ImageStore class, and - updates them with the result. - """ - image_store_dict = ImageStoreDict() - selected_rows = _select_records(postgres_conn_id, prefix) - total_rows = len(selected_rows) - logger.info(f"Processing {total_rows} rows from prefix {prefix}.") - for record in selected_rows: - try: - _clean_single_row(record, image_store_dict, prefix) - except Exception as e: - logger.warning(f"Record {record} could not be cleaned!") - logger.warning(f"Error cleaning was: {e}") - - for image_store in image_store_dict.values(): - image_store.commit() - - _log_and_check_totals(total_rows, image_store_dict) - - -def _wait_for_space( - min_polling_frequency=5, - max_polling_frequency=120, - delay_step=5, - max_dir_size=MAX_DIR_SIZE, - output_path=OUTPUT_PATH, -): - delay = max_polling_frequency - check_dir = Path(output_path) - total_wait_time = 0 - logger.info(f"Waiting for space in {output_path}") - while True: - du = sum(f.stat().st_size for f in check_dir.glob("**/*") if f.is_file()) - if du < max_dir_size: - break - else: - logger.info( - f"{output_path} holds {du / 1024 ** 2} MB," - f" but max is {max_dir_size / 1024 ** 2} MB." - f" Waiting for {delay} seconds" - ) - time.sleep(delay) - total_wait_time += delay - delay = max(delay - delay_step, min_polling_frequency) - logger.info(f"Total wait time: {total_wait_time} seconds") - - -def hex_counter(length): - max_string = "f" * length - format_string = f"0{length}x" - for h in range(int(max_string, 16) + 1): - yield format(h, format_string) - - -def _select_records(postgres_conn_id, prefix, image_table=TABLE_NAMES[IMAGE]): - postgres = PostgresHook(postgres_conn_id=postgres_conn_id) - min_base_uuid = "00000000-0000-0000-0000-000000000000" - max_base_uuid = "ffffffff-ffff-ffff-ffff-ffffffffffff" - min_uuid = prefix + min_base_uuid[len(prefix) :] - max_uuid = prefix + max_base_uuid[len(prefix) :] - select_query = dedent( - f""" - SELECT - {column.IDENTIFIER.db_name}, {column.CREATED_ON.db_name}, - {column.UPDATED_ON.db_name}, {column.INGESTION_TYPE.db_name}, - {column.PROVIDER.db_name}, {column.SOURCE.db_name}, - {column.FOREIGN_ID.db_name}, {column.LANDING_URL.db_name}, - {column.DIRECT_URL.db_name}, {column.THUMBNAIL.db_name}, - {column.WIDTH.db_name}, {column.HEIGHT.db_name}, - {column.FILESIZE.db_name}, {column.LICENSE.db_name}, - {column.LICENSE_VERSION.db_name}, {column.CREATOR.db_name}, - {column.CREATOR_URL.db_name}, {column.TITLE.db_name}, - {column.META_DATA.db_name}, {column.TAGS.db_name}, - {column.WATERMARKED.db_name}, {column.LAST_SYNCED.db_name}, - {column.REMOVED.db_name} - FROM {image_table} - WHERE - {column.IDENTIFIER.db_name}>='{min_uuid}'::uuid - AND - {column.IDENTIFIER.db_name}<='{max_uuid}'::uuid; - """ - ) - return postgres.get_records(select_query) - - -def _clean_single_row(record, image_store_dict, prefix): - dirty_row = ImageTableRow(*record) - image_store = image_store_dict[(dirty_row.provider, prefix)] - total_images_before = image_store.total_items - license_lower = dirty_row.license_.lower() if dirty_row.license_ else None - tags_list = [t for t in dirty_row.tags if t] if dirty_row.tags else None - image_store.add_item( - foreign_landing_url=dirty_row.foreign_landing_url, - image_url=dirty_row.image_url, - thumbnail_url=dirty_row.thumbnail_url, - license_url=tsv_cleaner.get_license_url(dirty_row.meta_data), - license_=license_lower, - license_version=dirty_row.license_version, - foreign_identifier=dirty_row.foreign_identifier, - width=dirty_row.width, - height=dirty_row.height, - creator=dirty_row.creator, - creator_url=dirty_row.creator_url, - title=dirty_row.title, - meta_data=dirty_row.meta_data, - raw_tags=tags_list, - watermarked=dirty_row.watermarked, - source=dirty_row.source, - ) - if not image_store.total_items - total_images_before == 1: - logger.warning(f"Record {dirty_row} was not stored!") - _save_failure_identifier(dirty_row.identifier) - - -def _save_failure_identifier(identifier, output_path=OUTPUT_PATH): - failure_dir = os.path.join(output_path, "cleaning_failures") - failure_file = f"fails_{int(time.time()) // 100 * 100}.txt" - os.makedirs(failure_dir, exist_ok=True) - failure_full_path = os.path.join(failure_dir, failure_file) - with open(failure_full_path, "a") as f: - f.write(f"{identifier}\n") - - -def _log_and_check_totals(total_rows, image_store_dict): - image_totals = {k: v.total_items for k, v in image_store_dict.items()} - total_images_sum = sum(image_totals.values()) - logger.info(f"Total images cleaned: {total_images_sum}") - logger.info(f"Image Totals breakdown: {image_totals}") - try: - assert total_images_sum == total_rows - except Exception as e: - logger.warning("total_images_sum NOT EQUAL TO total_rows!") - logger.warning(f"total_images_sum: {total_images_sum}") - logger.warning(f"total_rows: {total_rows}") - raise e diff --git a/catalog/dags/retired/common_api_workflows.py b/catalog/dags/retired/common_api_workflows.py deleted file mode 100644 index 08277ce2297..00000000000 --- a/catalog/dags/retired/common_api_workflows.py +++ /dev/null @@ -1,66 +0,0 @@ -import logging -import os - -import common.config as conf -from airflow import DAG -from airflow.operators.bash import BashOperator -from croniter import croniter - - -logging.basicConfig( - format="%(asctime)s: [%(levelname)s - DAG Loader] %(message)s", level=logging.INFO -) - -CRONTAB_STR = conf.CRONTAB_STR -SCRIPT = conf.SCRIPT -DAG_DEFAULT_ARGS = conf.DAG_DEFAULT_ARGS -DAG_VARIABLES = conf.DAG_VARIABLES - - -def load_dag_conf(source, DAG_VARIABLES): - """Validate and load configuration variables""" - logging.info(f"Loading configuration for {source}") - logging.debug(f"DAG_VARIABLES: {DAG_VARIABLES}") - dag_id = f"{source}_workflow" - - script_location = DAG_VARIABLES[source].get(SCRIPT) - try: - assert os.path.exists(script_location) - except Exception as e: - logging.warning(f"Invalid script location: {script_location}. Error: {e}") - script_location = None - - crontab_str = DAG_VARIABLES[source].get(CRONTAB_STR) - try: - croniter(crontab_str) - except Exception as e: - logging.warning(f"Invalid crontab string: {crontab_str}. Error: {e}") - crontab_str = None - - return script_location, dag_id, crontab_str - - -def create_dag( - source, script_location, dag_id, crontab_str=None, default_args=DAG_DEFAULT_ARGS -): - - dag = DAG( - dag_id=dag_id, - default_args=default_args, - schedule_interval=crontab_str, - catchup=False, - ) - - with dag: - BashOperator( - task_id=f"get_{source}_images", - bash_command=f"python {script_location} --mode default", - ) - - return dag - - -for source in DAG_VARIABLES: - script_location, dag_id, crontab_str = load_dag_conf(source, DAG_VARIABLES) - if script_location: - globals()[dag_id] = create_dag(source, script_location, dag_id, crontab_str) diff --git a/catalog/dags/retired/commoncrawl/commoncrawl_etl.py b/catalog/dags/retired/commoncrawl/commoncrawl_etl.py deleted file mode 100644 index 6c328956959..00000000000 --- a/catalog/dags/retired/commoncrawl/commoncrawl_etl.py +++ /dev/null @@ -1,245 +0,0 @@ -import os -from datetime import datetime, timedelta - -from airflow import DAG -from airflow.operators.python import PythonOperator -from airflow.providers.amazon.aws.operators.emr import ( - EmrCreateJobFlowOperator, - EmrTerminateJobFlowOperator, -) -from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor -from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor -from airflow.utils.trigger_rule import TriggerRule -from common.constants import DAG_DEFAULT_ARGS -from commoncrawl.commoncrawl_utils import get_load_s3_task_id, load_file_to_s3 - - -FILE_DIR = os.path.abspath(os.path.dirname(__file__)) -CORE_INSTANCE_COUNT = 100 -AWS_CONN_ID = os.getenv("AWS_CONN_ID", "aws_test") -EMR_CONN_ID = os.getenv("EMR_CONN_ID", "emr_test") -COMMONCRAWL_BUCKET = os.environ.get("COMMONCRAWL_BUCKET", "not_set") -BUCKET_V2 = "ov-commonsmapper" -CONFIG_SH_KEY = "bootstrap/config-py27.sh" -CONFIG_SH = f"s3://{BUCKET_V2}/{CONFIG_SH_KEY}" -EXTRACT_SCRIPT_S3_KEY = "scripts/ExtractCCLinks.py" -EXTRACT_SCRIPT_S3 = f"s3://{BUCKET_V2}/{EXTRACT_SCRIPT_S3_KEY}" -LOCAL_FILES_DIR = os.path.join(FILE_DIR, "util", "etl") -CONFIG_SH_LOCAL = os.path.join(LOCAL_FILES_DIR, "bootstrap", "config-py27.sh") -EXTRACT_SCRIPT_LOCAL = os.path.join( - LOCAL_FILES_DIR, "dags/commoncrawl_scripts/scripts", "ExtractCCLinks.py" -) -LOG_URI = f"s3://{BUCKET_V2}/logs/airflow_pipeline" -RAW_PROCESS_JOB_FLOW_NAME = "common_crawl_etl_job_flow" - -CC_INDEX_TEMPLATE = "CC-MAIN-{{ execution_date.strftime('%Y-%V') }}" -JOB_FLOW_OVERRIDES = { - "Applications": [{"Name": "hive"}, {"Name": "spark"}, {"Name": "pig"}], - "BootstrapActions": [ - { - "Name": "pip_install_deps", - "ScriptBootstrapAction": {"Path": CONFIG_SH}, - } - ], - "Configurations": [ - { - "Classification": "spark-defaults", - "Properties": { - "spark.executor.memory": "12G", - "spark.driver.memory": "12G", - "spark.driver.cores": "8", - "spark.default.parallelism": "4784", - "spark.executor.heartbeatInterval": "10000s", - "spark.network.timeout": "12000s", - "spark.executor.cores": "8", - "spark.executor.instances": "300", - "spark.dynamicAllocation.enabled": "false", - }, - }, - { - "Classification": "emrfs-site", - "Properties": {"fs.s3.enableServerSideEncryption": "true"}, - }, - ], - "Instances": { - "Ec2KeyName": "cc-catalog", - "Ec2SubnetIds": [ - "subnet-8ffebeb0", - # "subnet-9210d39d", - # "subnet-99d0dcd2", - # "subnet-9a7145fe", - # "subnet-cf2d5692", - # "subnet-d52562fa", - ], - "EmrManagedSlaveSecurityGroup": "sg-0a7b0a7d", - "EmrManagedMasterSecurityGroup": "sg-226d1c55", - "InstanceGroups": [ - { - "BidPrice": "0.75", - "EbsConfiguration": { - "EbsBlockDeviceConfigs": [ - { - "VolumeSpecification": { - "SizeInGB": 32, - "VolumeType": "gp2", - }, - "VolumesPerInstance": 1, - } - ] - }, - "InstanceCount": CORE_INSTANCE_COUNT, - "InstanceRole": "CORE", - "InstanceType": "c4.8xlarge", - "Market": "SPOT", - "Name": "common_crawl_etl_job_flow_core", - }, - { - "EbsConfiguration": { - "EbsBlockDeviceConfigs": [ - { - "VolumeSpecification": { - "SizeInGB": 32, - "VolumeType": "gp2", - }, - "VolumesPerInstance": 1, - } - ] - }, - "InstanceCount": 1, - "InstanceRole": "MASTER", - "InstanceType": "m4.xlarge", - "Market": "ON_DEMAND", - "Name": "common_crawl_etl_job_flow_master", - }, - ], - "KeepJobFlowAliveWhenNoSteps": False, - "TerminationProtected": False, - }, - "JobFlowRole": "DataPipelineDefaultResourceRole", - "LogUri": LOG_URI, - "Name": RAW_PROCESS_JOB_FLOW_NAME, - "ReleaseLabel": "emr-5.11.0", - "ScaleDownBehavior": "TERMINATE_AT_TASK_COMPLETION", - "ServiceRole": "DataPipelineDefaultRole", - "Steps": [ - { - "ActionOnFailure": "CONTINUE", - "HadoopJarStep": { - "Args": [ - "spark-submit", - "--deploy-mode", - "cluster", - "--master", - "yarn", - EXTRACT_SCRIPT_S3, - # This was "--default" previously but a task within the DAG - # modified it on DAG parse time to be this value. - CC_INDEX_TEMPLATE, - ], - "Jar": "command-runner.jar", - }, - "Name": "extract_cc_links", - } - ], - "Tags": [ - {"Key": "cc:environment", "Value": "production"}, - { - "Key": "cc:purpose", - "Value": "Find links to CC licensed content in Common Crawl.", - }, - {"Key": "cc:product", "Value": "cccatalog"}, - {"Key": "cc:team", "Value": "cc-search"}, - {"Key": "Name", "Value": "Common Crawl Data Pipeline"}, - ], - "VisibleToAllUsers": True, -} - -dag = DAG( - dag_id="commoncrawl_etl_workflow", - default_args={ - **DAG_DEFAULT_ARGS, - "retry_delay": timedelta(minutes=60), - "execution_timeout": None, - }, - start_date=datetime(1970, 1, 1), - schedule="0 0 * * 1", - max_active_tasks=1, - catchup=False, - tags=["commoncrawl"], -) - -with dag: - check_for_cc_index = S3KeySensor( - task_id="check_for_cc_index", - retries=0, - aws_conn_id=AWS_CONN_ID, - bucket_name=COMMONCRAWL_BUCKET, - bucket_key=f"crawl-data/{CC_INDEX_TEMPLATE}*", - wildcard_match=True, - poke_interval=60, - timeout=60 * 60 * 24 * 3, - soft_fail=True, - mode="reschedule", - ) - - check_for_wat_file = S3KeySensor( - task_id="check_for_wat_file", - retries=0, - aws_conn_id=AWS_CONN_ID, - bucket_name=COMMONCRAWL_BUCKET, - bucket_key=f"crawl-data/{CC_INDEX_TEMPLATE}/wat.paths.gz", - poke_interval=60, - timeout=60 * 60 * 24 * 3, - soft_fail=True, - mode="reschedule", - ) - - cluster_bootstrap_loader = PythonOperator( - task_id=get_load_s3_task_id(CONFIG_SH_KEY), - python_callable=load_file_to_s3, - op_args=[CONFIG_SH_LOCAL, CONFIG_SH_KEY, BUCKET_V2, AWS_CONN_ID], - ) - - extract_script_loader = PythonOperator( - task_id=get_load_s3_task_id(EXTRACT_SCRIPT_S3_KEY), - python_callable=load_file_to_s3, - op_args=[ - EXTRACT_SCRIPT_LOCAL, - EXTRACT_SCRIPT_S3_KEY, - BUCKET_V2, - AWS_CONN_ID, - ], - ) - - job_flow_creator = EmrCreateJobFlowOperator( - task_id=f"create_{RAW_PROCESS_JOB_FLOW_NAME}", - job_flow_overrides=JOB_FLOW_OVERRIDES, - aws_conn_id=AWS_CONN_ID, - emr_conn_id=EMR_CONN_ID, - ) - - job_sensor = EmrJobFlowSensor( - task_id=f"check_{RAW_PROCESS_JOB_FLOW_NAME}", - timeout=60 * 60 * 7, - mode="reschedule", - retries=0, - job_flow_id=job_flow_creator.task_id, - aws_conn_id=AWS_CONN_ID, - ) - - job_flow_terminator = EmrTerminateJobFlowOperator( - task_id=f"terminate_{RAW_PROCESS_JOB_FLOW_NAME}", - job_flow_id=job_flow_creator.task_id, - aws_conn_id=AWS_CONN_ID, - trigger_rule=TriggerRule.ALL_DONE, - ) - - ( - check_for_cc_index - >> check_for_wat_file - >> [extract_script_loader, cluster_bootstrap_loader] - >> job_flow_creator - >> job_sensor - >> job_flow_terminator - ) - [job_flow_creator, job_sensor, job_flow_terminator] diff --git a/catalog/dags/retired/commoncrawl/commoncrawl_scripts/commoncrawl_s3_syncer/SyncImageProviders.py b/catalog/dags/retired/commoncrawl/commoncrawl_scripts/commoncrawl_s3_syncer/SyncImageProviders.py deleted file mode 100644 index 645c8a87fe2..00000000000 --- a/catalog/dags/retired/commoncrawl/commoncrawl_scripts/commoncrawl_s3_syncer/SyncImageProviders.py +++ /dev/null @@ -1,102 +0,0 @@ -# flake8: noqa -import argparse -import os -import re - -import boto3 -from botocore import UNSIGNED -from botocore.client import Config - - -BUCKET = os.environ["S3_BUCKET"] -COMMONCRAWL_BUCKET = os.environ.get("COMMONCRAWL_BUCKET", "not_set") -PATH = os.environ["OUTPUT_DIR"] -ACCESS_KEY = os.environ["AWS_ACCESS_KEY"] -SECRET_KEY = os.environ["AWS_SECRET_KEY"] - - -def getCrawlIndex(_param): - - if not _param: # get the most recent index from common crawl - s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED)) - - # verify bucket - contents = [] - prefix = "cc-index/collections/CC-MAIN-" - botoArgs = {"Bucket": COMMONCRAWL_BUCKET, "Prefix": prefix} - - while True: - - objects = s3.list_objects_v2(**botoArgs) - - for obj in objects["Contents"]: - key = obj["Key"] - - if "indexes" in key: - cIndex = key.split("/indexes/")[0].split("/") - cIndex = cIndex[len(cIndex) - 1] - - if str(cIndex) not in contents: - contents.append(str(cIndex)) - - try: - botoArgs["ContinuationToken"] = objects["NextContinuationToken"] - except KeyError: - break - - if contents: - _param = contents[-1] - - return _param - - -def validateIndexPattern(_index, _pattern=re.compile(r"CC-MAIN-\d{4}-\d{2}")): - if not _pattern.match(_index): - logging.error(f"Invalid common crawl index format => {_index}.") - raise argparse.ArgumentTypeError - return _index - - -def syncS3Objects(_index): - s3 = boto3.client( - "s3", aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY - ) - botoArgs = {"Bucket": BUCKET, "Prefix": f"common_crawl_image_data/{_index}"} - - objects = s3.list_objects_v2(**botoArgs) - - for obj in objects.get("Contents", []): - key = obj["Key"] - - if "_SUCCESS" not in key: - fileName = key.lstrip("common_crawl_image_data/").replace("/", "_") - fileName = f"{PATH}{fileName}" - fileName = fileName.replace(".csv", ".tsv") - - with open(fileName, "wb") as fh: - s3.download_fileobj(BUCKET, key, fh) - - # check if the file exists locally before removing it from the s3 bucket - if os.path.exists(fileName) and os.path.getsize(fileName) > 0: - s3.delete_object(Bucket=BUCKET, Key=key) - logging.info(f"Deleted object: {key}") - else: - s3.delete_object(Bucket=BUCKET, Key=key) - - -def main(): - - parser = argparse.ArgumentParser( - description="Sync Common Crawl Image Providers", add_help=True - ) - parser.add_argument("--index", type=validateIndexPattern) - args = parser.parse_args() - - ccIndex = getCrawlIndex(args.index) - syncS3Objects(ccIndex) - - logging.info("Terminated!") - - -if __name__ == "__main__": - main() diff --git a/catalog/dags/retired/commoncrawl/commoncrawl_scripts/scripts/merge_cc_tags.py b/catalog/dags/retired/commoncrawl/commoncrawl_scripts/scripts/merge_cc_tags.py deleted file mode 100644 index 274d88642c9..00000000000 --- a/catalog/dags/retired/commoncrawl/commoncrawl_scripts/scripts/merge_cc_tags.py +++ /dev/null @@ -1,137 +0,0 @@ -""" -Script to merge the tags and metadata column from Common crawl data -to the new provider API data in image table. - -Execution : python merge_cc_tags.py -c {cc_table_name} -a {api_table} - - eg : python merge_cc_tags.py -c science_museum_2020_06_02 -a image - -""" -import argparse -import logging -import os -from textwrap import dedent - -import psycopg2 - - -logging.basicConfig( - format="%(asctime)s - %(name)s - %(levelname)s: %(message)s", level=logging.INFO -) -logger = logging.getLogger(__name__) - -COL_IMAGE = "url" -COL_TAGS = "tags" -COL_PROVIDER = "provider" -COL_METADATA = "meta_data" -CONNECTION_ID = os.getenv("AIRFLOW_CONN_POSTGRES_OPENLEDGER_TESTING") - - -def _strip_url_schema(url): - return f""" - ( - CASE - WHEN {url} SIMILAR TO 'https://%' THEN LTRIM({url}, 'https://') - WHEN {url} SIMILAR TO 'http://%' THEN LTRIM({url}, 'http://') - ELSE {url} - END - ) - """ - - -def _modify_urls(url=None, provider_table=None): - if "museums_victoria" in provider_table: - sub_query = f""" - SPLIT_PART({_strip_url_schema(url)}, '-', 1) - """ - - elif "science_museum" in provider_table: - sub_query = f""" - RTRIM( - SPLIT_PART( - REVERSE( - {_strip_url_schema(url)} - ), '/', 1), 'medium|large') - """ - - elif "met" in provider_table: - sub_query = f""" - SPLIT_PART( - REVERSE( - {_strip_url_schema(url)} - ) - , '/', 1) - """ - return sub_query - - -def _merge_jsonb_objects(column): - """ - This function returns SQL that merges the top-level keys of the - a JSONB column, taking the newest available non-null value. - """ - return f""" - {column} = COALESCE( - jsonb_strip_nulls(a.{column}) - || jsonb_strip_nulls(b.{column}), - a.{column}, - b.{column} - ) - """ - - -def _merge_jsonb_arrays(column): - return f"""{column} = COALESCE( - ( - SELECT jsonb_agg(DISTINCT x) - FROM jsonb_array_elements(a.{column} || b.{column}) t(x) - ), - a.{column}, - b.{column} - )""" - - -def _merge_tags(cc_table=None, api_table=None): - try: - status = "success" - db = psycopg2.connect(CONNECTION_ID) - cursor = db.cursor() - query = dedent( - f""" - UPDATE {api_table} a - SET - {_merge_jsonb_arrays(COL_TAGS)}, - {_merge_jsonb_objects(COL_METADATA)} - FROM {cc_table} b - WHERE - a.{COL_PROVIDER} = b.{COL_PROVIDER} - AND - {_modify_urls('a.'+COL_IMAGE, - cc_table)} = {_modify_urls('b.'+COL_IMAGE, - cc_table)} - """ - ) - cursor.execute(query) - db.commit() - except Exception as e: - logger.warning(f"Merging failed due to {e}") - status = "Failure" - return status - - -def main(cc_table, api_table): - logger.info(f"Merging Common crawl tags from {cc_table} to {api_table}") - status = _merge_tags(cc_table=cc_table, api_table=api_table) - logger.info(f"Status: {status}") - - -if __name__ == "__main__": - parse = argparse.ArgumentParser() - parse.add_argument( - "-c", "--CC_table", required=True, help="Select Common crawl table" - ) - parse.add_argument( - "-a", "--API_table", required=True, help="Select table with new API data" - ) - args = parse.parse_args() - main(cc_table=args.CC_table, api_table=args.API_table) diff --git a/catalog/dags/retired/commoncrawl/commoncrawl_utils.py b/catalog/dags/retired/commoncrawl/commoncrawl_utils.py deleted file mode 100644 index e1e6952e876..00000000000 --- a/catalog/dags/retired/commoncrawl/commoncrawl_utils.py +++ /dev/null @@ -1,10 +0,0 @@ -from airflow.providers.amazon.aws.hooks.s3 import S3Hook - - -def load_file_to_s3(local_file, remote_key, bucket, aws_conn_id): - s3 = S3Hook(aws_conn_id=aws_conn_id) - s3.load_file(local_file, remote_key, replace=True, bucket_name=bucket) - - -def get_load_s3_task_id(s3_key: str) -> str: - return f"load_{s3_key.replace('/', '_').replace('.', '_')}_to_s3" diff --git a/catalog/dags/retired/commoncrawl/sync_commoncrawl_workflow.py b/catalog/dags/retired/commoncrawl/sync_commoncrawl_workflow.py deleted file mode 100644 index 9a493999fb1..00000000000 --- a/catalog/dags/retired/commoncrawl/sync_commoncrawl_workflow.py +++ /dev/null @@ -1,74 +0,0 @@ -import os -from datetime import timedelta - -from airflow import DAG -from airflow.operators.bash import BashOperator -from airflow.operators.python import PythonOperator -from common.constants import DAG_DEFAULT_ARGS -from common.tsv_cleaner import clean_tsv_directory - - -airflowHome = os.environ["AIRFLOW_HOME"] - -DAG_ID = "sync_commoncrawl_workflow" - -DEFAULT_OUTPUT_DIR = "/tmp" -TSV_SUBDIR = "common_crawl_tsvs/" - -CRAWL_OUTPUT_DIR = os.path.join( - os.environ.get("OUTPUT_DIR", DEFAULT_OUTPUT_DIR), TSV_SUBDIR -) - - -def _empty_tsv_dir(tsv_directory): - for tsv in os.listdir(tsv_directory): - os.remove(os.path.join(tsv_directory, tsv)) - - -dag = DAG( - dag_id=DAG_ID, - default_args={ - **DAG_DEFAULT_ARGS, - "retries": 3, - "retry_delay": timedelta(days=1), - "execution_timeout": None, - }, - schedule="0 16 15 * *", - catchup=False, - tags=["commoncrawl"], -) - -with dag: - create_dir_task = PythonOperator( - task_id="create_tsv_directory", - python_callable=os.makedirs, - op_args=[CRAWL_OUTPUT_DIR], - op_kwargs={"exist_ok": True}, - depends_on_past=False, - ) - sync_tsvs_task = BashOperator( - task_id="sync_commoncrawl_workflow", - bash_command=( - f"python {airflowHome}/dags/commoncrawl_s3_syncer/SyncImageProviders.py" - ), - env={ - "S3_BUCKET": os.environ["S3_BUCKET"], - "OUTPUT_DIR": CRAWL_OUTPUT_DIR, - "AWS_ACCESS_KEY": os.environ["AWS_ACCESS_KEY"], - "AWS_SECRET_KEY": os.environ["AWS_SECRET_KEY"], - }, - ) - clean_tsvs_task = PythonOperator( - task_id="clean_commoncrawl_tsvs", - python_callable=clean_tsv_directory, - op_args=[CRAWL_OUTPUT_DIR], - depends_on_past=False, - ) - empty_dir_task = PythonOperator( - task_id="empty_tsv_directory", - python_callable=_empty_tsv_dir, - op_args=[CRAWL_OUTPUT_DIR], - depends_on_past=False, - ) - - (create_dir_task >> sync_tsvs_task >> clean_tsvs_task >> empty_dir_task) diff --git a/catalog/dags/retired/database/image_expiration_workflow.py b/catalog/dags/retired/database/image_expiration_workflow.py deleted file mode 100644 index 07040cb34fb..00000000000 --- a/catalog/dags/retired/database/image_expiration_workflow.py +++ /dev/null @@ -1,42 +0,0 @@ -""" -# Image expiration - -This file configures the Apache Airflow DAG to expire the outdated images -in the image table by setting the removed_from_source column value to true -""" - -from datetime import timedelta - -from airflow import DAG -from airflow.operators.python import PythonOperator - -from common.constants import DAG_DEFAULT_ARGS, POSTGRES_CONN_ID -from common.loader import sql - - -DAG_ID = "image_expiration_workflow" -MAX_ACTIVE_TASKS = len(sql.OLDEST_PER_PROVIDER) - -dag = DAG( - dag_id=DAG_ID, - default_args={ - **DAG_DEFAULT_ARGS, - "retry_delay": timedelta(seconds=15), - "execution_timeout": timedelta( - seconds=0 - ), # Let these tasks run with no timeout - }, - max_active_tasks=MAX_ACTIVE_TASKS, - max_active_runs=MAX_ACTIVE_TASKS, - catchup=False, - schedule=None, - tags=["database"], -) - -with dag: - for provider in sql.OLDEST_PER_PROVIDER: - PythonOperator( - task_id=f"expire_outdated_images_of_{provider}", - python_callable=sql.expire_old_images, - op_args=[POSTGRES_CONN_ID, provider], - ) diff --git a/catalog/dags/retired/database/loader_workflow.py b/catalog/dags/retired/database/loader_workflow.py deleted file mode 100644 index 059cd78723e..00000000000 --- a/catalog/dags/retired/database/loader_workflow.py +++ /dev/null @@ -1,196 +0,0 @@ -"""\ -#### Database Loader DAG -**DB Loader Apache Airflow DAG** (directed acyclic graph) takes the media data saved -locally in TSV files, cleans it using an intermediate database table, and saves -the cleaned-up data into the main database (also called upstream or Openledger). - -In production,"locally" means on AWS EC2 instance that runs the Apache Airflow -webserver. Storing too much data there is dangerous, because if ingestion to the -database breaks down, the disk of this server gets full, and breaks all -Apache Airflow operations. - -As a first step, the DB Loader Apache Airflow DAG saves the data gathered by -Provider API Scripts to S3 before attempting to load it to PostgreSQL, and delete - it from disk if saving to S3 succeeds, even if loading to PostgreSQL fails. - -This way, we can delete data from the EC2 instance to open up disk space without - the possibility of losing that data altogether. This will allow us to recover if - we lose data from the DB somehow, because it will all be living in S3. -It's also a prerequisite to the long-term plan of saving data only to S3 -(since saving it to the EC2 disk is a source of concern in the first place). - -This is one step along the path to avoiding saving data on the local disk at all. -It should also be faster to load into the DB from S3, since AWS RDS instances -provide special optimized functionality to load data from S3 into tables in the DB. - -Loading the data into the Database is a two-step process: first, data is saved -to the intermediate table. Any items that don't have the required fields -(media url, license, foreign landing url and foreign id), and duplicates as -determined by combination of provider and foreign_id are deleted. -Then the data from the intermediate table is upserted into the main database. -If the same item is already present in the database, we update its information -with newest (non-null) data, and merge any metadata or tags objects to preserve all -previously downloaded data, and update any data that needs updating -(eg. popularity metrics). - -You can find more background information on the loading process in the following -issues and related PRs: - -- [[Feature] More sophisticated merging of columns in PostgreSQL when upserting]( -https://github.com/creativecommons/cccatalog/issues/378) - -- [DB Loader DAG should write to S3 as well as PostgreSQL]( -https://github.com/creativecommons/cccatalog/issues/333) - -- [DB Loader should take data from S3, rather than EC2 to load into PostgreSQL]( -https://github.com/creativecommons/cccatalog/issues/334) - -""" -import os -from datetime import timedelta -from textwrap import dedent as d - -from airflow import DAG -from airflow.operators.python import PythonOperator -from airflow.utils.trigger_rule import TriggerRule -from common.constants import DAG_DEFAULT_ARGS, POSTGRES_CONN_ID -from common.loader import loader, paths, sql - - -DAG_ID = "tsv_to_postgres_loader" -AWS_CONN_ID = os.getenv("AWS_CONN_ID", "no_aws_conn_id") -OPENVERSE_BUCKET = os.getenv("OPENVERSE_BUCKET") -MINIMUM_FILE_AGE_MINUTES = int(os.getenv("LOADER_FILE_AGE", 15)) -MAX_ACTIVE_TASKS = 5 -TIMESTAMP_TEMPLATE = "{{ ts_nodash }}" - -OUTPUT_DIR_PATH = os.path.realpath(os.getenv("OUTPUT_DIR", "/tmp/")) - - -dag = DAG( - dag_id=DAG_ID, - default_args={ - **DAG_DEFAULT_ARGS, - "retry_delay": timedelta(seconds=15), - }, - max_active_tasks=MAX_ACTIVE_TASKS, - max_active_runs=1, - schedule=None, - catchup=False, - tags=["database"], - doc_md=__doc__, -) - -with dag: - stage_oldest_tsv_file = PythonOperator( - task_id="stage_oldest_tsv_file", - python_callable=paths.stage_oldest_tsv_file, - op_kwargs={ - "output_dir": OUTPUT_DIR_PATH, - "identifier": TIMESTAMP_TEMPLATE, - "minimum_file_age_minutes": MINIMUM_FILE_AGE_MINUTES, - }, - doc_md=d( - f""" - Find the oldest TSV in the output directory ({OUTPUT_DIR_PATH}) that hasn't been - modified in {MINIMUM_FILE_AGE_MINUTES} minutes and transfer it to the staging - directory. If no files are found matching this criteria, the DAG is skipped. - """ - ), - ) - - media_type_xcom = f"{{{{ ti.xcom_pull(task_ids='{stage_oldest_tsv_file.task_id}', key='media_type') }}}}" # noqa: E501 - - create_loading_table = PythonOperator( - task_id="create_loading_table", - python_callable=sql.create_loading_table, - op_kwargs={ - "postgres_conn_id": POSTGRES_CONN_ID, - "identifier": TIMESTAMP_TEMPLATE, - "media_type": media_type_xcom, - }, - doc_md="Create a temporary loading table for ingesting data from a TSV.", - ) - copy_to_s3 = PythonOperator( - task_id="copy_to_s3", - python_callable=loader.copy_to_s3, - op_kwargs={ - "output_dir": OUTPUT_DIR_PATH, - "bucket": OPENVERSE_BUCKET, - "aws_conn_id": AWS_CONN_ID, - "identifier": TIMESTAMP_TEMPLATE, - }, - doc_md=d( - f""" - Copy the TSV from the local output directory ({OUTPUT_DIR_PATH}) into the S3 - bucket ({OPENVERSE_BUCKET}) for direct loading into the database. - """ - ), - ) - load_s3_data = PythonOperator( - task_id="load_s3_data", - python_callable=loader.load_s3_data, - op_kwargs={ - "bucket": OPENVERSE_BUCKET, - "aws_conn_id": AWS_CONN_ID, - "postgres_conn_id": POSTGRES_CONN_ID, - "identifier": TIMESTAMP_TEMPLATE, - }, - doc_md="Load the TSV from S3 into the database.", - ) - delete_staged_file = PythonOperator( - task_id="delete_staged_file", - python_callable=paths.delete_staged_file, - op_kwargs={"output_dir": OUTPUT_DIR_PATH, "identifier": TIMESTAMP_TEMPLATE}, - trigger_rule=TriggerRule.ALL_SUCCESS, - doc_md=d( - """ - Delete the staged TSV. This step will only be run if the file was _both_ copied - into S3 and loaded into the database successfully. - """ - ), - ) - drop_loading_table = PythonOperator( - task_id="drop_loading_table", - python_callable=sql.drop_load_table, - op_kwargs={ - "postgres_conn_id": POSTGRES_CONN_ID, - "identifier": TIMESTAMP_TEMPLATE, - "media_type": media_type_xcom, - }, - trigger_rule=TriggerRule.NONE_SKIPPED, - doc_md=d( - """ - Drop the temporary loading table used to store the TSV records. This step will - always occur after table creation unless the DAG has been skipped in order to - ensure tables are cleaned up in all cases. - """ - ), - ) - move_staged_failures = PythonOperator( - task_id="move_staged_failures", - python_callable=paths.move_staged_files_to_failure_directory, - op_kwargs={ - "output_dir": OUTPUT_DIR_PATH, - "identifier": TIMESTAMP_TEMPLATE, - }, - trigger_rule=TriggerRule.ONE_FAILED, - doc_md=d( - """ - Move any staged files that have failed to upload into the failed directory. - This task will run if any of the critical loader tasks fail. This allows - operators to retry the loading once the problem has been addressed. - """ - ), - ) - - # If there is a TSV to load, copy it to S3 & create the loading table, - # then perform the load into Postgres from S3 - stage_oldest_tsv_file >> [copy_to_s3, create_loading_table] >> load_s3_data - # Should any of these tasks fail, move the TSV to the failed location - [copy_to_s3, create_loading_table, load_s3_data] >> move_staged_failures - # If any of the loading steps fail, we don't need the loading table anymore - [create_loading_table, load_s3_data] >> drop_loading_table - # Once the data is loaded successfully, delete the staged TSV locally - # (since it exists in S3) - load_s3_data >> delete_staged_file diff --git a/catalog/dags/retired/database/terminate_long_queries_workflow.py b/catalog/dags/retired/database/terminate_long_queries_workflow.py deleted file mode 100644 index 402232ad0b7..00000000000 --- a/catalog/dags/retired/database/terminate_long_queries_workflow.py +++ /dev/null @@ -1,95 +0,0 @@ -""" -# Terminate long-running queries - -This DAG runs every fifteen minutes and terminates long-running -queries of a specific query string in the API DB. -""" - -import logging -from textwrap import dedent - -from airflow import DAG -from airflow.exceptions import AirflowSkipException -from airflow.operators.python import PythonOperator - -from common.constants import ( - DAG_DEFAULT_ARGS, - OPENLEDGER_API_CONN_ID, - XCOM_PULL_TEMPLATE, -) -from common.slack import send_message -from common.sql import PostgresHook, RETURN_ROW_COUNT - - -logger = logging.getLogger(__name__) - - -DAG_ID = "terminate_long_queries" -MAX_ACTIVE = 1 - -QUERY_TO_TERMINATE = 'SELECT "image"."id", "image"."created_on", "image"."updated_on", "image"."identifier", "image"."foreign_identifier", "image"."title", "image"."foreign_landing_url", "image"."creator", "image"."creator_url", "image"."thumbnail", "image"."provider", "image"."url", "image"."filesize", "image"."filetype", "image"."watermarked", "image"."license", "image"."license_version", "image"."source", "image"."last_synced_with_source", "image"."removed_from_source", "image"."view_count", "image"."tags", "image"."tags_list", "image"."category", "image"."meta_data", "image"."width", "image"."height" FROM "image" ORDER BY "image"."created_on" DESC LIMIT 1000' # noqa: E501 - - -def _terminate_queries( - postgres_conn_id, - query_to_terminate, -): - postgres = PostgresHook( - postgres_conn_id=postgres_conn_id, - default_statement_timeout=60, # 1-minute timeout - ) - - terminate_query = dedent( - f""" - select pg_terminate_backend(pid) - from pg_stat_activity - where pid in ( - select pid from pg_stat_activity - where query = '{query_to_terminate}'); - """ - ) - - return postgres.run(terminate_query, handler=RETURN_ROW_COUNT) - - -def _report_terminated_query_count(dag_id: str, count: int): - if count == 0: - raise AirflowSkipException("No queries matching query string were found.") - - send_message( - f"{count} queries were terminated.", - dag_id=dag_id, - username="Terminate long queries.", - ) - - -dag = DAG( - dag_id=DAG_ID, - default_args=DAG_DEFAULT_ARGS, - max_active_tasks=MAX_ACTIVE, - max_active_runs=MAX_ACTIVE, - catchup=False, - schedule="*/7 * * * *", # Every 7 minutes - tags=["database"], - render_template_as_native_obj=True, -) - -with dag: - terminate_queries = PythonOperator( - task_id="terminate_queries", - python_callable=_terminate_queries, - op_args=[OPENLEDGER_API_CONN_ID, QUERY_TO_TERMINATE], - ) - - report_query_count = PythonOperator( - task_id="report_query_count", - python_callable=_report_terminated_query_count, - op_kwargs={ - "dag_id": DAG_ID, - "count": XCOM_PULL_TEMPLATE.format( - terminate_queries.task_id, "return_value" - ), - }, - ) - - terminate_queries >> report_query_count diff --git a/catalog/dags/retired/fix_phylopic_foreign_identifier.py b/catalog/dags/retired/fix_phylopic_foreign_identifier.py deleted file mode 100644 index 2a42884d0cf..00000000000 --- a/catalog/dags/retired/fix_phylopic_foreign_identifier.py +++ /dev/null @@ -1,119 +0,0 @@ -""" -One-time run DAG to fix the foreign identifier for PhyloPic images. - -In order to prevent broken links, we need to update the foreign identifier from using -the image URL to using the foreign image UUID. -""" - -import logging -import re -from datetime import timedelta -from textwrap import dedent - -from airflow.decorators import dag -from airflow.models.abstractoperator import AbstractOperator -from airflow.operators.python import PythonOperator -from psycopg2.errors import UniqueViolation - -from common.constants import DAG_DEFAULT_ARGS, POSTGRES_CONN_ID, XCOM_PULL_TEMPLATE -from common.slack import send_message -from common.sql import PostgresHook - - -logger = logging.getLogger(__name__) - -DAG_ID = "update_phylopic_foreign_identifier" - - -def update_foreign_identifiers(task: AbstractOperator) -> dict[str, int]: - pg = PostgresHook( - postgres_conn_id=POSTGRES_CONN_ID, - default_statement_timeout=PostgresHook.get_execution_timeout(task), - ) - phylopic_rows = pg.get_records( - "SELECT foreign_identifier, identifier FROM image WHERE provider = 'phylopic';" - ) - - uuid_pattern = ( - "[0-9a-f]{8}-[0-9a-f]{4}-[0-5][0-9a-f]{3}-[089ab][0-9a-f]{3}-[0-9a-f]{12}" - ) - prog = re.compile(uuid_pattern) - counter = { - "updated": 0, - "skipped": 0, - "failed_none": 0, - "failed_duplicate": 0, - "failed_other_reason": 0, - } - - for foreign_identifier, identifier in phylopic_rows: - if len(foreign_identifier) == 36: - # The foreign identifier is (most likely) already the UUID - counter["skipped"] += 1 - continue - - uuid = prog.search(foreign_identifier).group() - if uuid is None: - counter["failed_none"] += 1 - continue - - # Update the foreign identifier with the UUID - update_query = dedent( - f""" - UPDATE image SET foreign_identifier = '{uuid}', thumbnail = NULL - WHERE identifier = '{identifier}' - """ - ) - - try: - pg.run(update_query) - counter["updated"] += 1 - except UniqueViolation: - logger.warning( - f"Duplicate foreign identifier {uuid} for identifier {identifier}" - ) - counter["failed_duplicate"] += 1 - except Exception as e: - counter["failed_other_reason"] += 1 - logger.error( - f"Failed to update foreign identifier for identifier {identifier}" - f" with error: {e}" - ) - - return counter - - -def final_report(counter) -> None: - message = f"{DAG_ID} DAG run completed. Update statistics:\n{counter}." - send_message(message, dag_id=DAG_ID) - - -@dag( - dag_id=DAG_ID, - default_args={ - **DAG_DEFAULT_ARGS, - "retries": 0, - "execution_timeout": timedelta(hours=5), - }, - schedule=None, - catchup=False, - doc_md=__doc__, - tags=["data_normalization"], -) -def update_phylopic(): - update = PythonOperator( - task_id="update_foreign_identifiers", - python_callable=update_foreign_identifiers, - ) - report = PythonOperator( - task_id="final_report_on_foreign_identifiers", - python_callable=final_report, - op_kwargs={ - "counter": XCOM_PULL_TEMPLATE.format(update.task_id, "return_value") - }, - ) - - update >> report - - -update_phylopic() diff --git a/catalog/dags/retired/providers/provider_api_scripts/modules/__init__.py b/catalog/dags/retired/providers/provider_api_scripts/modules/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/catalog/dags/retired/providers/provider_api_scripts/modules/etlMods.py b/catalog/dags/retired/providers/provider_api_scripts/modules/etlMods.py deleted file mode 100644 index c1878953571..00000000000 --- a/catalog/dags/retired/providers/provider_api_scripts/modules/etlMods.py +++ /dev/null @@ -1,198 +0,0 @@ -import json -import logging -import os -import re -import time - -import requests - - -PATH = os.environ.get("OUTPUT_DIR", "/tmp") - - -def _sanitize_json_values(unknown_input, recursion_limit=100): - """ - This function recursively sanitizes the non-dict values of an input - dictionary in preparation for dumping to JSON string. - """ - input_type = type(unknown_input) - if input_type not in [dict, list] or recursion_limit <= 0: - return sanitizeString(unknown_input) - elif input_type == list: - return [ - _sanitize_json_values(item, recursion_limit=recursion_limit - 1) - for item in unknown_input - ] - else: - return { - key: _sanitize_json_values(val, recursion_limit=recursion_limit - 1) - for key, val in unknown_input.items() - } - - -def _prepare_output_string(unknown_input): - if not unknown_input: - return "\\N" - elif type(unknown_input) in [dict, list]: - return json.dumps(_sanitize_json_values(unknown_input)) - else: - return sanitizeString(unknown_input) - - -def _check_all_arguments_exist(**kwargs): - all_truthy = True - for arg in kwargs: - if not kwargs[arg]: - logging.warning(f"Missing {arg}") - all_truthy = False - return all_truthy - - -def create_tsv_list_row( - foreign_identifier=None, - foreign_landing_url=None, - image_url=None, - thumbnail=None, - width=None, - height=None, - filesize=None, - license_=None, - license_version=None, - creator=None, - creator_url=None, - title=None, - meta_data=None, - tags=None, - watermarked="f", - provider=None, - source=None, -): - - raw_output_list = [ - foreign_identifier, - foreign_landing_url, - image_url, - thumbnail, - width, - height, - filesize, - license_, - license_version, - creator, - creator_url, - title, - meta_data, - tags, - watermarked, - provider, - source, - ] - - if _check_all_arguments_exist( - foreign_landing_url=foreign_landing_url, - image_url=image_url, - license_=license_, - license_version=license_version, - ): - return [_prepare_output_string(item) for item in raw_output_list] - else: - return None - - -def writeToFile(_data, _name, output_dir=PATH): - outputFile = f"{output_dir}{_name}" - - if len(_data) < 1: - return None - - logging.info(f"Writing to file => {outputFile}") - - with open(outputFile, "a") as fh: - for line in _data: - if line: - fh.write("\t".join(line) + "\n") - - -def sanitizeString(_data): - if _data is None: - return "" - else: - _data = str(_data) - - _data = _data.strip() - _data = _data.replace('"', "'") - _data = re.sub(r"\n|\r", " ", _data) - # _data = re.escape(_data) - - backspaces = re.compile("\b+") - _data = backspaces.sub("", _data) - _data = _data.replace("\\", "\\\\") - - return re.sub(r"\s+", " ", _data) - - -def delayProcessing(_startTime, _maxDelay): - minDelay = 1.0 - - # subtract time elapsed from the requested delay - elapsed = float(time.time()) - float(_startTime) - delayInterval = round(_maxDelay - elapsed, 3) - waitTime = max(minDelay, delayInterval) # time delay between requests. - - logging.info(f"Time delay: {waitTime} second(s)") - time.sleep(waitTime) - - -def requestContent(_url, _headers=None): - # TODO: pass the request headers and params in a dictionary - - logging.info(f"Processing request: {_url}") - - try: - response = requests.get(_url, headers=_headers) - - if response.status_code == requests.codes.ok: - return response.json() - else: - logging.warning( - f"Unable to request URL: {_url} Status code:" f"{response.status_code}" - ) - return None - - except Exception as e: - logging.error("There was an error with the request.") - logging.info(f"{type(e).__name__}: {e}") - return None - - -def getLicense(_domain, _path, _url): - - if "creativecommons.org" not in _domain: - logging.warning( - f"The license for the following work -> {_url} is not issued by" - f"Creative Commons." - ) - return [None, None] - - pattern = re.compile(r"/(licenses|publicdomain)/([a-z\-?]+)/(\d\.\d)/?(.*?)") - if pattern.match(_path.lower()): - result = re.search(pattern, _path.lower()) - license = result.group(2).lower().strip() - version = result.group(3).strip() - - if result.group(1) == "publicdomain": - if license == "zero": - license = "cc0" - elif license == "mark": - license = "pdm" - else: - logging.warning("License not detected!") - return [None, None] - - elif license == "": - logging.warning("License not detected!") - return [None, None] - - return [license, version] - - return [None, None] diff --git a/catalog/dags/retired/providers/provider_api_scripts/thingiverse.py b/catalog/dags/retired/providers/provider_api_scripts/thingiverse.py deleted file mode 100644 index 60c23268b3e..00000000000 --- a/catalog/dags/retired/providers/provider_api_scripts/thingiverse.py +++ /dev/null @@ -1,268 +0,0 @@ -# flake8: noqa -""" -Content Provider: Thingiverse - -ETL Process: Use the API to identify all CC0 3D Models. - -Output: TSV file containing the 3D models, their respective images and meta-data. - -Notes: - All API requests require authentication. - Rate limiting is 300 per 5 minute window. -""" - -import argparse - -from airflow.models import Variable -from modules.etlMods import * - - -MAX_THINGS = 30 -LICENSE = "pd0" -TOKEN = Variable.get("API_KEY_THINGIVERSE", default_var=None) -DELAY = 5.0 # seconds -FILE = f"thingiverse_{int(time.time())}.tsv" - - -def requestBatchThings(_page): - - url = "https://api.thingiverse.com/newest?access_token={1}&per_page={2}&page={0}".format( - _page, TOKEN, MAX_THINGS - ) - - result = requestContent(url) - if result: - return map(lambda x: x["id"], result) - - return None - - -def getMetaData(_thing, _date): - - url = f"https://api.thingiverse.com/things/{_thing}?access_token={TOKEN}" - licenseText = "Creative Commons - Public Domain Dedication" - license = None - version = None - creator = None - creatorURL = None - title = None - foreignURL = None - extracted = [] - - logging.info(f"Processing thing: {_thing}") - - result = requestContent(url) - if result: - - # verify the date - modDate = result.get("modified", "") - if modDate: - modDate = modDate.split("T")[0].strip() - if datetime.strptime(modDate, "%Y-%m-%d") < datetime.strptime( - _date, "%Y-%m-%d" - ): - return "-1" - - startTime = time.time() - - # validate CC0 license - if not ( - ("license" in result) and (licenseText.lower() in result["license"].lower()) - ): - logging.warning( - "License not detected => https://www.thingiverse.com/thing:{}".format( - _thing - ) - ) - delayProcessing(startTime, DELAY) - return None - else: - license = "CC0" - version = "1.0" - - # get meta data - - # description of the work - description = sanitizeString(result.get("description", "")) - - # title for the 3D model - title = sanitizeString(result.get("name", "")) - - # the landing page - if "public_url" in result: - foreignURL = result["public_url"].strip() - else: - foreignURL = f"https://www.thingiverse.com/thing:{_thing}" - - # creator of the 3D model - if "creator" in result: - if ("first_name" in result["creator"]) and ( - "last_name" in result["creator"] - ): - creator = "{} {}".format( - sanitizeString(result["creator"]["first_name"]), - sanitizeString(result["creator"]["last_name"]), - ) - - if (creator.strip() == "") and ("name" in result["creator"]): - creator = sanitizeString(result["creator"]["name"]) - - if "public_url" in result["creator"]: - creatorURL = result["creator"]["public_url"].strip() - - # get the tags - delayProcessing(startTime, DELAY) - logging.info(f"Requesting tags for thing: {_thing}") - startTime = time.time() - tags = requestContent(url.replace(_thing, f"{_thing}/tags")) - tagsList = None - - if tags: - tagsList = list( - map( - lambda tag: { - "name": str(tag["name"].strip()), - "provider": "thingiverse", - }, - tags, - ) - ) - - # get 3D models and their respective images - delayProcessing(startTime, DELAY) - logging.info(f"Requesting images for thing: {_thing}") - - imageList = requestContent(url.replace(_thing, f"{_thing}/files")) - if imageList is None: - logging.warning("Image Not Detected!") - delayProcessing(startTime, DELAY) - return None - - for img in imageList: - metaData = {} - thumbnail = None - imageURL = None - foreignID = None - - metaData["description"] = description - - if ("default_image" in img) and img["default_image"]: - if "url" not in img["default_image"]: - logging.warning("3D Model Not Detected!") - continue - - metaData["3d_model"] = img["default_image"]["url"] - foreignID = str(img["default_image"]["id"]) - images = img["default_image"]["sizes"] - - for imgSize in images: - - if str(imgSize["type"]).strip().lower() == "display": - - if str(imgSize["size"]).lower() == "medium": - thumbnail = imgSize["url"].strip() - - if str(imgSize["size"]).lower() == "large": - imageURL = imgSize["url"].strip() - - elif imageURL is None: - imageURL = thumbnail - - else: - continue - - if imageURL is None: - logging.warning("Image Not Detected!") - continue - - extracted.append( - [ - imageURL if not foreignID else foreignID, - foreignURL, - imageURL, - thumbnail, - "\\N", - "\\N", - "\\N", - license, - str(version), - creator, - creatorURL, - title, - "\\N" if not metaData else json.dumps(metaData), - "\\N" if not tagsList else json.dumps(tagsList), - "f", - "thingiverse", - "thingiverse", - ] - ) - - writeToFile(extracted, FILE) - - return len(extracted) - - -def execJob(_date): - page = 1 - result = 0 - isValid = True - tmpCtr = 0 - - while isValid: # temporary control flow - - batch = requestBatchThings(page) - - if batch: - batch = list(batch) - tmp = list( - filter( - None, list(map(lambda thing: getMetaData(str(thing), _date), batch)) - ) - ) - - if "-1" in tmp: - isValid = False - tmp = tmp.remove("-1") - - if tmp: - tmpCtr = sum(tmp) - - result += tmpCtr - tmpCtr = 0 - - page += 1 - - logging.info(f"Total CC0 3D Models: {result}") - - -def main(): - logging.info("Begin: Thingiverse API requests") - param = None - mode = "date: " - - parser = argparse.ArgumentParser(description="Thingiverse API Job", add_help=True) - parser.add_argument( - "--mode", - choices=["default", "newest"], - help="Identify all CC0 3D models from the previous day [default] or the current date [newest].", - ) - - args = parser.parse_args() - if args.mode: - - if str(args.mode) == "newest": - param = datetime.strftime(datetime.now(), "%Y-%m-%d") - else: - param = datetime.strftime(datetime.now() - timedelta(1), "%Y-%m-%d") - - mode += param if param is not None else "" - logging.info(f"Processing {mode}") - - if param: - execJob(param) - - logging.info("Terminated!") - - -if __name__ == "__main__": - main() diff --git a/catalog/dags/retired/providers/provider_api_scripts/walters.py b/catalog/dags/retired/providers/provider_api_scripts/walters.py deleted file mode 100644 index 94fde7f2766..00000000000 --- a/catalog/dags/retired/providers/provider_api_scripts/walters.py +++ /dev/null @@ -1,193 +0,0 @@ -""" -Content Provider: Walters Art Museum - -ETL Process: Use the API to identify all CC licensed images. - -Output: TSV file containing the images and the - respective meta-data. - -Notes: - Rate limit: 250000 Per Day Per Key -""" - -import logging - -from airflow.models import Variable -from common.loader import provider_details as prov -from common.requester import DelayedRequester -from common.storage.image import ImageStore - - -logging.basicConfig( - format="%(asctime)s - %(name)s - %(levelname)s: %(message)s", level=logging.INFO -) -logger = logging.getLogger(__name__) - -DELAY = 1 -LIMIT = 250000 -PROVIDER = prov.WALTERS_DEFAULT_PROVIDER -REQUEST_TYPE = "objects" -ENDPOINT = f"https://api.thewalters.org/v1/{REQUEST_TYPE}" -API_KEY = Variable.get("API_KEY_WALTERS_ART_MUSEUM", default_var=None) -MUSEUM_SITE = "https://art.thewalters.org" -LICENSE = "CC0 1.0" - -# The API takes api_key as a query and not as a header -DEFAULT_QUERY_PARAMS = {"accept": "json", "pageSize": 100, "orderBy": "classification"} - -QUERY_CLASSIFICATION = [ - "Miniatures", - "Stained & Painted Glass", - "Lacquer & Inlay", - "Ceramics", - "Precious Stones & Gems", - "Pearl, Horn, Coral & Shell", - "Sculpture", - "Textiles", - "Painting & Drawing", - "Prints", - "Coins & Medals", - "Arms & Armor", - "Mosaics & Cosmati", - "Niello", - "Wood", - "Enamels", - "Manuscripts & Rare Books", - "Ivory & Bone", - "Mummies & Cartonnage", - "Timepieces, Clocks & Watches", - "Glasswares", - "Metal", - "Gold, Silver & Jewelry", - "Stone", - "Resin, Wax & Composite", - "Leather", -] - -delayed_requester = DelayedRequester(DELAY) -image_store = ImageStore(provider=PROVIDER) - - -def main(): - logger.info("Begin: Walters Art Museum provider script.") - - for class_param in QUERY_CLASSIFICATION: - logger.info(f"Obtaining Images of Classification: {class_param}") - image_list = _get_image_list(class_param) - _ = _process_image_list(image_list) - - total_images = image_store.commit() - logger.info(f"Total Images: {total_images}") - logger.info("Terminating Script!") - - -def _get_image_list(class_param, endpoint=ENDPOINT, retries=5): - image_list = [] - page = 1 - cond = True - while cond: - query_params = _build_query_param(class_param=class_param, page=page) - page += 1 - json_response_inpydict_form = delayed_requester.get_response_json( - endpoint=endpoint, - retries=retries, - query_params=query_params, - ) - - items_list = _extract_items_list_from_json(json_response_inpydict_form) - if items_list is None: - break - for img in items_list: - image_list.append(img) - - if json_response_inpydict_form.get("NextPage") is not True: - cond = False - - if len(image_list) == 0: - logger.warning("No more tries remaining. Returning Nonetypes.") - return None - else: - return image_list - - -def _build_query_param( - class_param=None, default_query_params=None, apikey=API_KEY, page=1 -): - if default_query_params is None: - default_query_params = DEFAULT_QUERY_PARAMS - query_params = default_query_params.copy() - query_params.update({"classification": class_param, "apikey": apikey, "Page": page}) - - return query_params - - -def _extract_items_list_from_json(json_response): - if ( - json_response is None - or str(json_response.get("ReturnStatus")).lower() != "true" - or json_response.get("Items") is None - or len(json_response.get("Items")) == 0 - ): - items_list = None - else: - items_list = json_response.get("Items") - - return items_list - - -def _process_image_list(image_list): - total_images = 0 - if image_list is not None: - for img in image_list: - total_images = _process_image(img) - - return total_images - - -def _process_image(img): - logger.debug(f"Processing Image: {img}") - - foreign_landing_url = img.get("ResourceURL") - image_url = img.get("PrimaryImage", {}).get("Raw") - license_url = "https://creativecommons.org/publicdomain/zero/1.0/" - foreign_identifier = img.get("ObjectNumber") - creator, creator_url = _get_creator_info(img) - title = img.get("Title") - meta_data = _get_image_meta_data(img) - - return image_store.add_item( - foreign_landing_url=foreign_landing_url, - image_url=image_url, - license_url=license_url, - foreign_identifier=foreign_identifier, - creator=creator, - creator_url=creator_url, - title=title, - meta_data=meta_data, - ) - - -def _get_creator_info(img): - creator, creator_url = None, None - creator = img.get("Creator") - if creator: - creator_url = f"{MUSEUM_SITE}/browse/{creator.lower()}" - - return creator, creator_url - - -def _get_image_meta_data(img): - image_meta_data = { - "ObjectNumber": img.get("ObjectNumber"), - "PublicAccessDate": img.get("PublicAccessDate"), - "Collection": img.get("Collection"), - "Medium": img.get("Medium"), - "Classification": img.get("Classification"), - "Description": img.get("Description"), - "CreditLine": img.get("CreditLine"), - } - return {k: v for k, v in image_meta_data.items() if v is not None} - - -if __name__ == "__main__": - main() diff --git a/catalog/dags/retired/update_workflows/__init__.py b/catalog/dags/retired/update_workflows/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/catalog/dags/retired/update_workflows/europeana_sub_provider_update_workflow.py b/catalog/dags/retired/update_workflows/europeana_sub_provider_update_workflow.py deleted file mode 100644 index 203a40f9ccd..00000000000 --- a/catalog/dags/retired/update_workflows/europeana_sub_provider_update_workflow.py +++ /dev/null @@ -1,63 +0,0 @@ -""" -This file configures the Apache Airflow DAG to update the database table to -reflect appropriate Europeana sub provider/ default provider names in the -source field -""" - -import logging -import os -from datetime import datetime, timedelta - -from airflow import DAG -from airflow.operators.python import PythonOperator -from retired.update_workflows import update_sql - - -logging.basicConfig( - format="%(asctime)s - %(name)s - %(levelname)s: %(message)s", level=logging.INFO -) - -logger = logging.getLogger(__name__) - -DAG_ID = "europeana_sub_provider_update_workflow" -DB_CONN_ID = os.getenv("OPENLEDGER_CONN_ID", "postgres_openledger_testing") -CONCURRENCY = 5 - -DAG_DEFAULT_ARGS = { - "owner": "data-eng-admin", - "depends_on_past": False, - "start_date": datetime(2020, 1, 15), - "email_on_retry": False, - "retries": 2, - "retry_delay": timedelta(seconds=15), - "schedule_interval": None, -} - - -def create_dag( - dag_id=DAG_ID, - args=DAG_DEFAULT_ARGS, - concurrency=CONCURRENCY, - max_active_runs=CONCURRENCY, - postgres_conn_id=DB_CONN_ID, -): - dag = DAG( - dag_id=dag_id, - default_args=args, - concurrency=concurrency, - max_active_runs=max_active_runs, - catchup=False, - schedule_interval=None, - ) - - with dag: - PythonOperator( - task_id="update_europeana_sub_providers", - python_callable=update_sql.update_europeana_sub_providers, - op_args=[postgres_conn_id], - ) - - return dag - - -globals()[DAG_ID] = create_dag() diff --git a/catalog/dags/retired/update_workflows/flickr_sub_provider_update_workflow.py b/catalog/dags/retired/update_workflows/flickr_sub_provider_update_workflow.py deleted file mode 100644 index 968b11c4db9..00000000000 --- a/catalog/dags/retired/update_workflows/flickr_sub_provider_update_workflow.py +++ /dev/null @@ -1,63 +0,0 @@ -""" -This file configures the Apache Airflow DAG to update the database table to -reflect appropriate Flickr sub provider/ default provider names in the source -field -""" - -import logging -import os -from datetime import datetime, timedelta - -from airflow import DAG -from airflow.operators.python import PythonOperator -from retired.update_workflows import update_sql - - -logging.basicConfig( - format="%(asctime)s - %(name)s - %(levelname)s: %(message)s", level=logging.INFO -) - -logger = logging.getLogger(__name__) - -DAG_ID = "flickr_sub_provider_update_workflow" -DB_CONN_ID = os.getenv("OPENLEDGER_CONN_ID", "postgres_openledger_testing") -CONCURRENCY = 5 - -DAG_DEFAULT_ARGS = { - "owner": "data-eng-admin", - "depends_on_past": False, - "start_date": datetime(2020, 1, 15), - "email_on_retry": False, - "retries": 2, - "retry_delay": timedelta(seconds=15), - "schedule_interval": None, -} - - -def create_dag( - dag_id=DAG_ID, - args=DAG_DEFAULT_ARGS, - concurrency=CONCURRENCY, - max_active_runs=CONCURRENCY, - postgres_conn_id=DB_CONN_ID, -): - dag = DAG( - dag_id=dag_id, - default_args=args, - concurrency=concurrency, - max_active_runs=max_active_runs, - catchup=False, - schedule_interval=None, - ) - - with dag: - PythonOperator( - task_id="update_flickr_sub_providers", - python_callable=update_sql.update_flickr_sub_providers, - op_args=[postgres_conn_id], - ) - - return dag - - -globals()[DAG_ID] = create_dag() diff --git a/catalog/dags/retired/update_workflows/smithsonian_sub_provider_update_workflow.py b/catalog/dags/retired/update_workflows/smithsonian_sub_provider_update_workflow.py deleted file mode 100644 index 1f5899914b8..00000000000 --- a/catalog/dags/retired/update_workflows/smithsonian_sub_provider_update_workflow.py +++ /dev/null @@ -1,62 +0,0 @@ -""" -This file configures the Apache Airflow DAG to update the database table to -reflect appropriate Smithsonian sub provider names in the source field -""" - -import logging -import os -from datetime import datetime, timedelta - -from airflow import DAG -from airflow.operators.python import PythonOperator -from retired.update_workflows import update_sql - - -logging.basicConfig( - format="%(asctime)s - %(name)s - %(levelname)s: %(message)s", level=logging.INFO -) - -logger = logging.getLogger(__name__) - -DAG_ID = "smithsonian_sub_provider_update_workflow" -DB_CONN_ID = os.getenv("OPENLEDGER_CONN_ID", "postgres_openledger_testing") -CONCURRENCY = 5 - -DAG_DEFAULT_ARGS = { - "owner": "data-eng-admin", - "depends_on_past": False, - "start_date": datetime(2020, 1, 15), - "email_on_retry": False, - "retries": 2, - "retry_delay": timedelta(seconds=15), - "schedule_interval": None, -} - - -def create_dag( - dag_id=DAG_ID, - args=DAG_DEFAULT_ARGS, - concurrency=CONCURRENCY, - max_active_runs=CONCURRENCY, - postgres_conn_id=DB_CONN_ID, -): - dag = DAG( - dag_id=dag_id, - default_args=args, - concurrency=concurrency, - max_active_runs=max_active_runs, - catchup=False, - schedule_interval=None, - ) - - with dag: - PythonOperator( - task_id="update_smithsonian_sub_providers", - python_callable=update_sql.update_smithsonian_sub_providers, - op_args=[postgres_conn_id], - ) - - return dag - - -globals()[DAG_ID] = create_dag() diff --git a/catalog/dags/retired/update_workflows/update_sql.py b/catalog/dags/retired/update_workflows/update_sql.py deleted file mode 100644 index 4ed0a0b6c97..00000000000 --- a/catalog/dags/retired/update_workflows/update_sql.py +++ /dev/null @@ -1,270 +0,0 @@ -import json -from textwrap import dedent - -from airflow.providers.postgres.hooks.postgres import PostgresHook -from storage import columns as col -from util.constants import IMAGE -from util.loader import provider_details as prov -from util.loader.sql import DB_USER_NAME, TABLE_NAMES, logger - - -def _create_temp_flickr_sub_prov_table( - postgres_conn_id, temp_table="temp_flickr_sub_prov_table" -): - """ - Drop the temporary table if it already exists - """ - postgres = PostgresHook(postgres_conn_id=postgres_conn_id) - postgres.run(f"DROP TABLE IF EXISTS public.{temp_table};") - - """ - Create intermediary table for sub provider migration - """ - postgres.run( - dedent( - f""" - CREATE TABLE public.{temp_table} ( - {col.CREATOR_URL.db_name} character varying(2000), - sub_provider character varying(80) - ); - """ - ) - ) - - postgres.run(f"ALTER TABLE public.{temp_table} OWNER TO {DB_USER_NAME};") - - """ - Populate the intermediary table with the sub providers of interest - """ - for sub_prov, user_id_set in prov.FLICKR_SUB_PROVIDERS.items(): - for user_id in user_id_set: - creator_url = prov.FLICKR_PHOTO_URL_BASE + user_id - postgres.run( - dedent( - f""" - INSERT INTO public.{temp_table} ( - {col.CREATOR_URL.db_name}, - sub_provider - ) - VALUES ( - '{creator_url}', - '{sub_prov}' - ); - """ - ) - ) - - return temp_table - - -def update_flickr_sub_providers( - postgres_conn_id, - image_table=TABLE_NAMES[IMAGE], - default_provider=prov.FLICKR_DEFAULT_PROVIDER, -): - postgres = PostgresHook(postgres_conn_id=postgres_conn_id) - temp_table = _create_temp_flickr_sub_prov_table(postgres_conn_id) - - select_query = dedent( - f""" - SELECT - {col.FOREIGN_ID.db_name} AS foreign_id, - public.{temp_table}.sub_provider AS sub_provider - FROM {image_table} - INNER JOIN public.{temp_table} - ON - {image_table}.{col.CREATOR_URL.db_name} = public.{temp_table}.{ - col.CREATOR_URL.db_name} - AND - {image_table}.{col.PROVIDER.db_name} = '{default_provider}'; - """ - ) - - selected_records = postgres.get_records(select_query) - logger.info(f"Updating {len(selected_records)} records") - - for row in selected_records: - foreign_id = row[0] - sub_provider = row[1] - postgres.run( - dedent( - f""" - UPDATE {image_table} - SET {col.SOURCE.db_name} = '{sub_provider}' - WHERE - {image_table}.{col.PROVIDER.db_name} = '{default_provider}' - AND - MD5({image_table}.{col.FOREIGN_ID.db_name}) = MD5('{foreign_id}'); - """ - ) - ) - - """ - Drop the temporary table - """ - postgres.run(f"DROP TABLE public.{temp_table};") - - -def _create_temp_europeana_sub_prov_table( - postgres_conn_id, temp_table="temp_eur_sub_prov_table" -): - """ - Drop the temporary table if it already exists - """ - postgres = PostgresHook(postgres_conn_id=postgres_conn_id) - postgres.run(f"DROP TABLE IF EXISTS public.{temp_table};") - - """ - Create intermediary table for sub provider migration - """ - postgres.run( - dedent( - f""" - CREATE TABLE public.{temp_table} ( - data_provider character varying(120), - sub_provider character varying(80) - ); - """ - ) - ) - - postgres.run(f"ALTER TABLE public.{temp_table} OWNER TO {DB_USER_NAME};") - - """ - Populate the intermediary table with the sub providers of interest - """ - for sub_prov, data_provider in prov.EUROPEANA_SUB_PROVIDERS.items(): - postgres.run( - dedent( - f""" - INSERT INTO public.{temp_table} ( - data_provider, - sub_provider - ) - VALUES ( - '{data_provider}', - '{sub_prov}' - ); - """ - ) - ) - - return temp_table - - -def update_europeana_sub_providers( - postgres_conn_id, - image_table=TABLE_NAMES[IMAGE], - default_provider=prov.EUROPEANA_DEFAULT_PROVIDER, - sub_providers=prov.EUROPEANA_SUB_PROVIDERS, -): - postgres = PostgresHook(postgres_conn_id=postgres_conn_id) - temp_table = _create_temp_europeana_sub_prov_table(postgres_conn_id) - - select_query = dedent( - f""" - SELECT L.foreign_id, L.data_providers, R.sub_provider - FROM( - SELECT - {col.FOREIGN_ID.db_name} AS foreign_id, - {col.META_DATA.db_name} ->> 'dataProvider' AS data_providers, - {col.META_DATA.db_name} - FROM {image_table} - WHERE {col.PROVIDER.db_name} = '{default_provider}' - ) L INNER JOIN - {temp_table} R ON - L.{col.META_DATA.db_name} ->'dataProvider' ? R.data_provider; - """ - ) - - selected_records = postgres.get_records(select_query) - - """ - Update each selected row if it corresponds to only one sub-provider. - Otherwise an exception is thrown - """ - for row in selected_records: - foreign_id = row[0] - data_providers = json.loads(row[1]) - sub_provider = row[2] - - eligible_sub_providers = { - s for s in sub_providers if sub_providers[s] in data_providers - } - if len(eligible_sub_providers) > 1: - raise Exception( - f"More than one sub-provider identified for the " - f"image with foreign ID {foreign_id}" - ) - - assert len(eligible_sub_providers) == 1 - assert eligible_sub_providers.pop() == sub_provider - - postgres.run( - dedent( - f""" - UPDATE {image_table} - SET {col.SOURCE.db_name} = '{sub_provider}' - WHERE - {image_table}.{col.PROVIDER.db_name} = '{default_provider}' - AND - MD5({image_table}.{col.FOREIGN_ID.db_name}) = MD5('{foreign_id}'); - """ - ) - ) - - """ - Drop the temporary table - """ - postgres.run(f"DROP TABLE public.{temp_table};") - - -def update_smithsonian_sub_providers( - postgres_conn_id, - image_table=TABLE_NAMES[IMAGE], - default_provider=prov.SMITHSONIAN_DEFAULT_PROVIDER, - sub_providers=prov.SMITHSONIAN_SUB_PROVIDERS, -): - postgres = PostgresHook(postgres_conn_id=postgres_conn_id) - - """ - Select all records where the source value is not yet updated - """ - select_query = dedent( - f""" - SELECT {col.FOREIGN_ID.db_name}, - {col.META_DATA.db_name} ->> 'unit_code' AS unit_code - FROM {image_table} - WHERE - {col.PROVIDER.db_name} = '{default_provider}' - AND - {col.SOURCE.db_name} = '{default_provider}'; - """ - ) - - selected_records = postgres.get_records(select_query) - - """ - Set the source value of each selected row to the sub-provider value - corresponding to unit code. If the unit code is unknown, an error is thrown - """ - for row in selected_records: - foreign_id = row[0] - unit_code = row[1] - - source = next((s for s in sub_providers if unit_code in sub_providers[s]), None) - if source is None: - raise Exception(f"An unknown unit code value {unit_code} encountered ") - - postgres.run( - dedent( - f""" - UPDATE {image_table} - SET {col.SOURCE.db_name} = '{source}' - WHERE - {image_table}.{col.PROVIDER.db_name} = '{default_provider}' - AND - MD5({image_table}.{col.FOREIGN_ID.db_name}) = MD5('{foreign_id}'); - """ - ) - ) diff --git a/catalog/utilities/media_props_gen/docs/media_props.md b/catalog/utilities/media_props_gen/docs/media_props.md index c8b9a5752c6..ff78d88e32f 100644 --- a/catalog/utilities/media_props_gen/docs/media_props.md +++ b/catalog/utilities/media_props_gen/docs/media_props.md @@ -258,7 +258,7 @@ index during the data refresh process. ## Selection Criteria -[`expire_old_images`](https://github.com/WordPress/openverse/tree/main/catalog/dags/retired/common/loader/sql.py) +[`expire_old_images`](https://github.com/WordPress/openverse/tree/c517c26e2c2f785f70d36c1928343b66c0966f4b/catalog/dags/retired/common/loader/sql.py) DAG added in [Expiration of outdated images in the database](https://github.com/cc-archive/cccatalog/pull/483) was used to set `removed_from_source` to `True` for images that were updated diff --git a/documentation/catalog/guides/quickstart.md b/documentation/catalog/guides/quickstart.md index e45f460cb7d..22461e204c9 100644 --- a/documentation/catalog/guides/quickstart.md +++ b/documentation/catalog/guides/quickstart.md @@ -202,9 +202,8 @@ catalog/ # Primary code directory │ ├── oauth2/ # - DAGs & code for Oauth2 key management │ ├── providers/ # - DAGs & code for provider ingestion │ │ ├── provider_api_scripts/ # - API access code specific to providers -│ │ ├── provider_csv_load_scripts/ # - Schema initialization SQL definitions for SQL-based providers -│ │ │ └── *.py # - DAG definition files for providers -│ │ └── retired/ # - DAGs & code that is no longer needed but might be a useful guide for the future +│ │ └── provider_csv_load_scripts/ # - Schema initialization SQL definitions for SQL-based providers +│ │ └── *.py # - DAG definition files for providers │ ├── templates/ # Templates for generating new provider code └── * # Documentation, configuration files, and project requirements ``` diff --git a/documentation/catalog/reference/index.md b/documentation/catalog/reference/index.md index f4700dc117e..b98ba63ec03 100644 --- a/documentation/catalog/reference/index.md +++ b/documentation/catalog/reference/index.md @@ -4,5 +4,4 @@ :titlesonly: DAGs -retired ``` diff --git a/documentation/catalog/reference/retired.md b/documentation/catalog/reference/retired.md deleted file mode 100644 index ccd0b3a28f6..00000000000 --- a/documentation/catalog/reference/retired.md +++ /dev/null @@ -1,39 +0,0 @@ -# Retired - -## Web Crawl Data (retired) - -The Common Crawl Foundation provides an open repository of petabyte-scale web -crawl data. A new dataset is published at the end of each month comprising over -200 TiB of uncompressed data. - -The data is available in three file formats: - -- WARC (Web ARChive): the entire raw data, including HTTP response metadata, - WARC metadata, etc. -- WET: extracted plaintext from each webpage. -- WAT: extracted html metadata, e.g. HTTP headers and hyperlinks, etc. - -For more information about these formats, please see the [Common Crawl -documentation][ccrawl_doc]. - -Openverse Catalog used AWS Data Pipeline service to automatically create an -Amazon EMR cluster of 100 c4.8xlarge instances that parsed the WAT archives to -identify all domains that link to creativecommons.org. Due to the volume of -data, Apache Spark was also used to streamline the processing. The output of -this methodology was a series of parquet files that contain: - -- the domains and its respective content path and query string (i.e. the exact - webpage that links to creativecommons.org) -- the CC referenced hyperlink (which may indicate a license), -- HTML meta data in JSON format which indicates the number of images on each - webpage and other domains that they reference, -- the location of the webpage in the WARC file so that the page contents can be - found. - -The steps above were performed in [`ExtractCCLinks.py`][ex_cc_links]. - -This method was retired in 2021. - -[ccrawl_doc]: https://commoncrawl.org/the-data/get-started/ -[ex_cc_links]: - https://github.com/WordPress/openverse/blob/c20262cad8944d324b49176678b16b230bc57e2e/archive/ExtractCCLinks.py diff --git a/documentation/meta/media_properties/catalog.md b/documentation/meta/media_properties/catalog.md index e0b229a1464..62b4870b835 100644 --- a/documentation/meta/media_properties/catalog.md +++ b/documentation/meta/media_properties/catalog.md @@ -469,7 +469,7 @@ index during the data refresh process. #### Selection Criteria -[`expire_old_images`](https://github.com/WordPress/openverse/tree/main/catalog/dags/retired/common/loader/sql.py) +[`expire_old_images`](https://github.com/WordPress/openverse/tree/c517c26e2c2f785f70d36c1928343b66c0966f4b/catalog/dags/retired/common/loader/sql.py) DAG added in [Expiration of outdated images in the database](https://github.com/cc-archive/cccatalog/pull/483) was used to set `removed_from_source` to `True` for images that were updated diff --git a/pyproject.toml b/pyproject.toml index a34cc8192be..c92895f00e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,7 +53,6 @@ known-first-party = [ "maintenance", "oauth2", "providers", - "retired", "test", ]