From f8d42e87a172f8b79405a75da1f69c8655eb84b2 Mon Sep 17 00:00:00 2001 From: Ian Ross Date: Wed, 22 Jul 2020 10:34:38 -0500 Subject: [PATCH 1/2] add ALICE/ALLIE abbreviation expansion. TODO: better on/off toggle --- docker-compose.yml | 20 ++++++++++++++++- index_pubmed.py | 53 ++++++++++++++++++++++++++++++++++++++++++++-- init.sql | 49 ++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 2 ++ 4 files changed, 121 insertions(+), 3 deletions(-) create mode 100644 init.sql diff --git a/docker-compose.yml b/docker-compose.yml index 854691f..a87e46b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -30,12 +30,30 @@ services: build: . networks: - kmnet - command: "/bin/bash ./wait_for_it.sh -t 0 es01:9200 -- python index_pubmed.py bulk --n_min 1 --n_max 1" +# command: "tail -f /dev/null" + command: "wait-for-it -s es01:9200 -s km_postgres:5432 -- python index_pubmed.py bulk --n_min 1 --n_max 5" depends_on: - es01 environment: - PYTHONUNBUFFERED=1 + postgres: + container_name: km_postgres + restart: always + image: postgres:latest + environment: + - POSTGRES_PASSWORD=supersecretpassword + - POSTGRES_USER=kinderminer + volumes: + - ./init.sql:/docker-entrypoint-initdb.d/init.sql + - type: bind + source: ./allie_data + target: /var/lib/postgresql/data + networks: + - kmnet + ports: + - "5432:5432" + volumes: esdata01: driver: local diff --git a/index_pubmed.py b/index_pubmed.py index 4e0bc9b..9f6d5cd 100644 --- a/index_pubmed.py +++ b/index_pubmed.py @@ -8,8 +8,13 @@ from xml.etree import ElementTree as ET import re import ftplib +import psycopg2 +import psycopg2.extras +import urllib.request as urllib import pickle +DO_ABBREVIATIONS = True + es = Elasticsearch(['es01:9200']) def parse_cover_date(coverDate): @@ -157,12 +162,16 @@ def update_mapping(index_name, type_name): return 0 class Helper(): + def __init__(self): + self.conn = psycopg2.connect("dbname=%s user=%s password=%s host=%s port=%s" % \ + ("kinderminer", "kinderminer", 
"supersecretpassword", "km_postgres", "5432")) + self.cur = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) + def get_metadata_from_xml(self, filepath): """ """ metadata = {} - parser = ET.iterparse(filepath) for event, element in parser: @@ -271,6 +280,14 @@ def get_metadata_from_xml(self, filepath): temp["metadata_update"] = datetime.datetime.now() + if DO_ABBREVIATIONS: + print("Checking for abbreviations") + self.cur.execute("SELECT DISTINCT(short_form, long_form), short_form, long_form FROM alice_abbreviations WHERE pubmed_id=%(pmid)s", + {"pmid" : temp["PMID"]}) + for abbr in self.cur: + print("Abbreviation found!") + temp["abstract_long_form"] = temp["abstract"].replace(abbr['short_form'], abbr['long_form']) + temp['time'] = [datetime.datetime.now()] element.clear() @@ -344,6 +361,33 @@ def update(self): updates_applied.add(update_file) pickle.dump(updates_applied, open("pubmed_updates_applied.p", "w")) +def download_allie(): + psql_fetching_conn = psycopg2.connect("dbname=%s user=%s password=%s host=%s port=%s" % \ + ("kinderminer", "kinderminer", "supersecretpassword", "km_postgres", "5432")) + cur = psql_fetching_conn.cursor() + + update_file = 'alice_output_latest.txt.gz' + print('ftp://ftp.dbcls.jp/allie/alice_output/%s' % update_file) + urllib.urlretrieve('ftp://ftp.dbcls.jp/allie/alice_output/%s' % update_file, update_file) + subprocess.call(["gunzip", '%s' % update_file]) + print("Cleaning up text") + subprocess.call(["sed", "s/\\\\/\\\\\\\\/g", "-i", update_file.replace(".gz", "")]) + print("Copying into postgres") + + # TODO: Need to make sure the table is there... 
but that can be done at the docker level + + try: + with open(update_file.replace(".gz", "")) as fin: + cur.copy_from(fin, "alice_abbreviations") + psql_fetching_conn.commit() + #subprocess.call(["rm", update_file.replace(".gz", "")]) + except: + print("Error copying %s" % update_file) + print(sys.exc_info()) + psql_fetching_conn.commit() + #subprocess.call(["rm", update_file.replace(".gz", "")]) + return 0 + def main(): parser = argparse.ArgumentParser( description="Utility for indexing PubMed abstracts into Elasticsearch to make them full-text searchable." @@ -352,6 +396,11 @@ def main(): parser.add_argument('--n_min', default=1, type=int, help='Minimum file number to process.') parser.add_argument('--n_max', default=1, type=int, help='Maximum file number to process.') + + # TODO: pass + do in abbreviation embiggening + #if DO_ABBREVIATIONS: + #download_allie() + if not es.indices.exists("pubmed_abstracts"): es.indices.create("pubmed_abstracts") print("Waiting for ok status...") @@ -367,6 +416,6 @@ def main(): else: print("Invalid operation specified!") sys.exit(1) - +# if __name__ == '__main__': main() diff --git a/init.sql b/init.sql new file mode 100644 index 0000000..8a45b71 --- /dev/null +++ b/init.sql @@ -0,0 +1,49 @@ +-- +-- PostgreSQL database dump +-- + +-- Dumped from database version 10.13 +-- Dumped by pg_dump version 10.13 + +SET statement_timeout = 0; +SET lock_timeout = 0; +SET idle_in_transaction_session_timeout = 0; +SET client_encoding = 'UTF8'; +SET standard_conforming_strings = on; +SELECT pg_catalog.set_config('search_path', '', false); +SET check_function_bodies = false; +SET xmloption = content; +SET client_min_messages = warning; +SET row_security = off; + +SET default_tablespace = ''; + +SET default_with_oids = false; + +-- +-- Name: alice_abbreviations; Type: TABLE; Schema: public; Owner: kinderminer +-- + +CREATE TABLE public.alice_abbreviations ( + sequential_id integer, + pubmed_id text, + publication_year text, + long_form_id 
integer, + short_form_id integer, + long_form text, + short_form text +); + + +ALTER TABLE public.alice_abbreviations OWNER TO kinderminer; + +-- +-- Name: alice_abbreviations_pubmed_id_idx; Type: INDEX; Schema: public; Owner: kinderminer +-- + +CREATE INDEX alice_abbreviations_pubmed_id_idx ON public.alice_abbreviations USING btree (pubmed_id); + +-- +-- PostgreSQL database dump complete +-- + diff --git a/requirements.txt b/requirements.txt index 56b20ac..44d974b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ elasticsearch python-dateutil +psycopg2 +wait-for-it From a3469bd0d44ddb05951cf9145169c9950ff25888 Mon Sep 17 00:00:00 2001 From: Ian Ross Date: Wed, 22 Jul 2020 11:02:49 -0500 Subject: [PATCH 2/2] Add abbreviation expansion toggle --- .dockerignore | 1 + .env | 3 +++ README.md | 11 +++++++++++ docker-compose.yml | 4 ++-- index_pubmed.py | 11 +++++------ 5 files changed, 22 insertions(+), 8 deletions(-) create mode 100644 .dockerignore create mode 100644 .env diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..dc6cc59 --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +*data diff --git a/.env b/.env new file mode 100644 index 0000000..8180890 --- /dev/null +++ b/.env @@ -0,0 +1,3 @@ +# set to 1 to expand abbreviations via ALLIE (http://allie.dbcls.jp) + +EXPAND_ABBREVIATIONS=1 diff --git a/README.md b/README.md index 90cc898..18e890e 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,17 @@ It is also possible to ingest the daily update files provided by MEDLINE (`ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/`). **BY DEFAULT, ALL UPDATE FILES WILL BE APPLIED IN THIS MODE** +## Abbreviation expansion +Abbreviation expansion is done via the ALLIE (http://allie.dbcls.jp) database. +By default, abbreviations are kept as-is from PubMed, but by changing the setting in `.env` +to + +``` +EXPAND_ABBREVIATIONS=1 +``` + +The ALLIE database will be downloaded and installed into a postgres table. 
As the PubMed abstracts are ingested, this database is queried and any abbreviations found within the abstract are replaced with the long form, and the result is stored within the `abstract_long_form` field. + ## Caveats - The intended use is for testing of query logic, and the JVM options set for Elasticsearch are set with this in mind. diff --git a/docker-compose.yml b/docker-compose.yml index a87e46b..9452e45 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -30,12 +30,12 @@ services: build: . networks: - kmnet -# command: "tail -f /dev/null" - command: "wait-for-it -s es01:9200 -s km_postgres:5432 -- python index_pubmed.py bulk --n_min 1 --n_max 5" + command: "wait-for-it -s es01:9200 -s km_postgres:5432 -- python index_pubmed.py bulk --n_min 1 --n_max 1" depends_on: - es01 environment: - PYTHONUNBUFFERED=1 + - EXPAND_ABBREVIATIONS=${EXPAND_ABBREVIATIONS} postgres: container_name: km_postgres diff --git a/index_pubmed.py b/index_pubmed.py index 9f6d5cd..6c84fa6 100644 --- a/index_pubmed.py +++ b/index_pubmed.py @@ -13,7 +13,7 @@ import urllib.request as urllib import pickle -DO_ABBREVIATIONS = True +EXPAND_ABBREVIATIONS = True if os.environ['EXPAND_ABBREVIATIONS'] == '1' else False es = Elasticsearch(['es01:9200']) @@ -280,7 +280,7 @@ def get_metadata_from_xml(self, filepath): temp["metadata_update"] = datetime.datetime.now() - if DO_ABBREVIATIONS: + if EXPAND_ABBREVIATIONS: print("Checking for abbreviations") self.cur.execute("SELECT DISTINCT(short_form, long_form), short_form, long_form FROM alice_abbreviations WHERE pubmed_id=%(pmid)s", {"pmid" : temp["PMID"]}) @@ -396,10 +396,9 @@ def main(): parser.add_argument('--n_min', default=1, type=int, help='Minimum file number to process.') parser.add_argument('--n_max', default=1, type=int, help='Maximum file number to process.') - - # TODO: pass + do in abbreviation embiggening - #if DO_ABBREVIATIONS: - #download_allie() + if EXPAND_ABBREVIATIONS: + print("Downloading ALLIE abbreviation expansion 
database...") + download_allie() if not es.indices.exists("pubmed_abstracts"): es.indices.create("pubmed_abstracts")