Commit

Merge pull request #315 from georgetown-cset/310-use-new-name-linkages
Use new table of CSET_id to unnormalized org name linkages
rggelles authored May 8, 2024
2 parents 4a24fe0 + 232791f commit 2fe20d8
Showing 29 changed files with 512 additions and 1,061 deletions.
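Context for the change described in the commit message: the pipeline resolves each CSET_id to the unnormalized organization name strings that appear in raw publication and patent records. The sketch below is illustrative only; the table, column, and project names are assumptions made for the example, not the schema this PR actually introduces.

# Illustrative sketch only: table, column, and project names below are
# hypothetical and do not come from this repository.
from google.cloud import bigquery

client = bigquery.Client(project="example-project")  # assumed project id
query = """
    SELECT CSET_id, unnormalized_name
    FROM `example_dataset.cset_id_to_unnormalized_names`
    WHERE CSET_id = @cset_id
"""
job = client.query(
    query,
    job_config=bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("cset_id", "INT64", 101)]
    ),
)
name_variants = [row["unnormalized_name"] for row in job.result()]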
163 changes: 1 addition & 162 deletions in company_linkage/parat_data_dag.py
@@ -144,7 +144,7 @@
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
"values": [
"default-pool",
"parat-pool",
]
}]
}]
@@ -154,7 +154,6 @@
)

# load aggregated_organizations to BigQuery

load_aggregated_orgs = GCSToBigQueryOperator(
task_id=f"load_{aggregated_table}",
bucket=DATA_BUCKET,
@@ -166,157 +165,6 @@
write_disposition="WRITE_TRUNCATE"
)

run_get_ai_counts = GKEStartPodOperator(
task_id="run_get_ai_counts",
project_id=PROJECT_ID,
location=GCP_ZONE,
cluster_name="cc2-task-pool",
name="run_get_ai_counts",
cmds=["/bin/bash"],
arguments=["-c", (f"echo 'getting AI counts!' ; rm -r ai || true ; "
f"mkdir -p ai && "
f"python3 get_ai_counts.py ai/ai_company_papers.jsonl ai/ai_company_patents.jsonl "
f"ai/ai_company_patent_grants.jsonl && "
f"gsutil -m cp -r ai gs://{DATA_BUCKET}/{tmp_dir}/ ")],
namespace="default",
image=f"us.gcr.io/{PROJECT_ID}/parat",
get_logs=True,
startup_timeout_seconds=300,
# see also https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator#affinity-config
affinity={
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [{
"matchExpressions": [{
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
"values": [
"default-pool",
]
}]
}]
}
}
}
)

load_ai_papers = GCSToBigQueryOperator(
task_id=f"load_ai_company_papers",
bucket=DATA_BUCKET,
source_objects=[f"{tmp_dir}/ai/ai_company_papers.jsonl"],
schema_object=f"{schema_dir}/ai_papers_schema.json",
destination_project_dataset_table=f"{staging_dataset}.ai_company_papers",
source_format="NEWLINE_DELIMITED_JSON",
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE"
)

load_ai_patents = GCSToBigQueryOperator(
task_id=f"load_ai_company_patents",
bucket=DATA_BUCKET,
source_objects=[f"{tmp_dir}/ai/ai_company_patents.jsonl"],
schema_object=f"{schema_dir}/ai_patents_schema.json",
destination_project_dataset_table=f"{staging_dataset}.ai_company_patents",
source_format="NEWLINE_DELIMITED_JSON",
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE"
)

load_ai_patent_grants = GCSToBigQueryOperator(
task_id=f"load_ai_company_patent_grants",
bucket=DATA_BUCKET,
source_objects=[f"{tmp_dir}/ai/ai_company_patent_grants.jsonl"],
schema_object=f"{schema_dir}/ai_patents_schema.json",
destination_project_dataset_table=f"{staging_dataset}.ai_company_patent_grants",
source_format="NEWLINE_DELIMITED_JSON",
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE"
)

run_papers = []
for paper_type in ["top_paper", "highly_cited_paper", "all_paper", "all_patent"]:

run_get_paper_counts = GKEStartPodOperator(
task_id=f"run_get_{paper_type}_counts",
project_id=PROJECT_ID,
location=GCP_ZONE,
cluster_name="cc2-task-pool",
name=f"run_get_{paper_type}_counts",
cmds=["/bin/bash"],
arguments=["-c", (f"echo 'getting {paper_type} counts!' ; rm -r {paper_type} || true ; "
f"mkdir -p {paper_type} && "
f"python3 {paper_type}s.py {paper_type}/{paper_type}_counts.jsonl && "
f"gsutil -m cp -r {paper_type} gs://{DATA_BUCKET}/{tmp_dir}/ ")],
namespace="default",
image=f"us.gcr.io/{PROJECT_ID}/parat",
get_logs=True,
startup_timeout_seconds=300,
# see also https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator#affinity-config
affinity={
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [{
"matchExpressions": [{
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
"values": [
"default-pool",
]
}]
}]
}
}
}
)
run_papers.append(run_get_paper_counts)

# even though these are near-identical we do these in sequence -- we'd have to put in a dummy operator
# otherwise anyway and they should be fast

load_top_papers = GCSToBigQueryOperator(
task_id=f"load_top_papers",
bucket=DATA_BUCKET,
source_objects=[f"{tmp_dir}/top_paper/top_paper_counts.jsonl"],
schema_object=f"{schema_dir}/top_papers_schema.json",
destination_project_dataset_table=f"{staging_dataset}.top_paper_counts",
source_format="NEWLINE_DELIMITED_JSON",
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE"
)

load_highly_cited_papers = GCSToBigQueryOperator(
task_id=f"load_highly_cited_papers",
bucket=DATA_BUCKET,
source_objects=[f"{tmp_dir}/highly_cited_paper/highly_cited_paper_counts.jsonl"],
schema_object=f"{schema_dir}/highly_cited_papers_schema.json",
destination_project_dataset_table=f"{staging_dataset}.highly_cited_paper_counts",
source_format="NEWLINE_DELIMITED_JSON",
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE"
)

load_all_papers = GCSToBigQueryOperator(
task_id=f"load_all_papers",
bucket=DATA_BUCKET,
source_objects=[f"{tmp_dir}/all_paper/all_paper_counts.jsonl"],
schema_object=f"{schema_dir}/all_papers_schema.json",
destination_project_dataset_table=f"{staging_dataset}.all_paper_counts",
source_format="NEWLINE_DELIMITED_JSON",
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE"
)

load_all_patents = GCSToBigQueryOperator(
task_id=f"load_all_patents",
bucket=DATA_BUCKET,
source_objects=[f"{tmp_dir}/all_patent/all_patent_counts.jsonl"],
schema_object=f"{schema_dir}/all_patents_schema.json",
destination_project_dataset_table=f"{staging_dataset}.all_patent_counts",
source_format="NEWLINE_DELIMITED_JSON",
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE"
)

start_visualization_tables = DummyOperator(task_id="start_visualization_tables")
wait_for_visualization_tables = DummyOperator(task_id="wait_for_visualization_tables")

@@ -406,15 +254,6 @@
wait_for_initial_tables
>> aggregate_organizations
>> load_aggregated_orgs
->> run_get_ai_counts
->> load_ai_papers
->> load_ai_patents
->> load_ai_patent_grants
->> run_papers
->> load_top_papers
->> load_highly_cited_papers
->> load_all_papers
->> load_all_patents
>> start_visualization_tables
)
(
[rest of file diff not shown]
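One of the removed comments above notes that the near-identical load tasks were chained in sequence because running them in parallel would have required an extra dummy join task. For readers unfamiliar with the pattern, here is a minimal sketch (hypothetical DAG and task names, not code from this repository) of how Airflow's >> operator and DummyOperator would express that parallel alternative:

# Minimal sketch of the fan-out/fan-in alternative; all names are hypothetical.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG("fan_out_example", start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    start = DummyOperator(task_id="start_loads")
    done = DummyOperator(task_id="wait_for_loads")
    loads = [DummyOperator(task_id=f"load_{name}") for name in ("top_papers", "all_papers")]
    # fan out from the start marker, then join again at the done marker;
    # the sequential chain used in the DAG avoids the extra join task
    start >> loads >> done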
4 changes: 2 additions & 2 deletions in company_linkage/parat_scripts/aggregate_organizations.py
@@ -428,8 +428,8 @@ def aggregate_organizations(output_file, local=False):

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("output_file", type=str, help="A jsonl file for writing output data to create new tables")
parser.add_argument("from_airflow", type=bool, default=False, action="store_true",
parser.add_argument("--output_file", required=True, help="A jsonl file for writing output data to create new tables")
parser.add_argument("--from_airflow", default=False, action="store_true",
help="If true, will upload output to GCS")
args = parser.parse_args()
if not args.output_file.endswith(".jsonl"):
[rest of file diff not shown]
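The change above switches aggregate_organizations.py from positional arguments to optional flags; the old from_airflow line combined type=bool with action="store_true", a pairing argparse rejects. A minimal standalone sketch of the flag-style interface the diff moves to, with a made-up invocation for illustration:

# Standalone sketch mirroring the new add_argument calls from the diff;
# the sample argv and file name are made up for illustration.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--output_file", required=True,
                    help="A jsonl file for writing output data to create new tables")
parser.add_argument("--from_airflow", default=False, action="store_true",
                    help="If true, will upload output to GCS")
args = parser.parse_args(["--output_file", "aggregated_organizations.jsonl", "--from_airflow"])
assert args.output_file.endswith(".jsonl")
assert args.from_airflow is True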
28 changes: 0 additions & 28 deletions in company_linkage/parat_scripts/all_papers.py

This file was deleted.

28 changes: 0 additions & 28 deletions in company_linkage/parat_scripts/all_patents.py

This file was deleted.

[diffs for the remaining changed files not shown]
