topic_modeling_using_es.py
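"""Tag Elasticsearch documents with a predefined list of Bitcoin topics.

For every source in ``dev_urls``, look up documents in ``ES_INDEX`` that match
each keyword from ``btc_topics.csv`` and append that keyword to the document's
``primary_topics`` field. For each topic, a small sample CSV (topics, source id
and source url) is also written to ``es_topic_modeling_output/``.
"""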
from loguru import logger
import os
from dotenv import load_dotenv
import warnings
import tqdm
import pandas as pd
import traceback
import time
from src.config import ES_INDEX
from src.elasticsearch_utils import ElasticSearchClient
from src.utils import clean_title
warnings.filterwarnings("ignore")
load_dotenv()
if __name__ == "__main__":
    # log file rotates automatically at 23:59
    os.makedirs("es_topic_modeling_logs", exist_ok=True)
    logger.add("es_topic_modeling_logs/generate_topics_modeling.log", rotation="23:59")

    btc_topics_list = pd.read_csv("btc_topics.csv")
    btc_topics_list = btc_topics_list['Topics'].to_list()
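    # btc_topics.csv is expected to contain a 'Topics' column with one keyword/phrase per row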
    elastic_search = ElasticSearchClient()

    dev_urls = [
        "https://lists.linuxfoundation.org/pipermail/lightning-dev/",
        "https://lists.linuxfoundation.org/pipermail/bitcoin-dev/",
        "https://delvingbitcoin.org/",
        "https://gnusha.org/pi/bitcoindev/",
    ]
    for dev_url in dev_urls:
        dev_name = dev_url.split("/")[-2]
        logger.info(f"dev_url: {dev_url}")
        logger.info(f"dev_name: {dev_name}")

        for topic in btc_topics_list:
            SAVE_CSV = True
            OUTPUT_DIR = "es_topic_modeling_output"
            os.makedirs(OUTPUT_DIR, exist_ok=True)
            CSV_FILE_PATH = fr"{OUTPUT_DIR}/{clean_title(topic)}.csv"

            # fetch all docs that match the provided topic
            logger.info(f"Fetching docs for topic: {topic}")
            docs_list = elastic_search.fetch_docs_with_keywords(
                es_index=ES_INDEX, url=dev_url, keyword=topic
            )
            logger.success(f"TOTAL THREADS RECEIVED WITH A TOPIC: {str(topic)} = {len(docs_list)}")

            if docs_list:
                if os.path.exists(CSV_FILE_PATH):
                    stored_df = pd.read_csv(CSV_FILE_PATH)
                    logger.info(f"Shape of stored df: {stored_df.shape}")
                    stored_source_ids = stored_df['source_id'].to_list()
                    logger.info(f"Docs in stored df: {len(stored_source_ids)}")
                else:
                    logger.info(f"CSV file path does not exist! Creating new one: {CSV_FILE_PATH}")
                    stored_df = pd.DataFrame(columns=['primary_topics', 'source_id', 'source_url'])
                    stored_source_ids = stored_df['source_id'].to_list()

                # append the topic to each matched doc
                logger.info('updating topic on all matched docs...')
                for idx, doc in enumerate(tqdm.tqdm(docs_list)):
                    try:
                        doc_source_id = doc['_source']['id']
                        doc_source_url = doc['_source']['url']
                        doc_id = doc['_id']
                        doc_index = doc['_index']
                        logger.info(f"Doc Id: {doc_id}, Source Id: {doc_source_id}")

                        # merge this topic into the doc's existing primary topics (deduplicated)
                        primary_kw = doc['_source'].get('primary_topics', [])
                        primary_kw.append(topic)
                        primary_kw = list(set(primary_kw))

                        # push the updated topics to elasticsearch
                        elastic_search.es_client.update(
                            index=doc_index,
                            id=doc_id,
                            body={
                                'doc': {
                                    "primary_topics": primary_kw if primary_kw else []
                                }
                            }
                        )
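                        # note: the 'doc' key makes this a partial update, so only the
                        # primary_topics field is overwritten and the rest of the stored
                        # document is left untouched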
                        # for each topic, keep a small sample csv of the first few matched docs
                        if idx <= 5 and SAVE_CSV:
                            row_data = {
                                'primary_topics': primary_kw if primary_kw else [],
                                'source_id': doc_source_id if doc_source_id else None,
                                'source_url': doc_source_url if doc_source_url else None
                            }
                            row_data = pd.Series(row_data).to_frame().T
                            stored_df = pd.concat([stored_df, row_data], ignore_index=True)
                            stored_df.drop_duplicates(subset='source_id', keep='first', inplace=True)
                            stored_df.to_csv(CSV_FILE_PATH, index=False)
                            time.sleep(2)
                            logger.info(f"csv file saved at IDX: {idx}, PATH: {CSV_FILE_PATH}")
                    except Exception as ex:
                        logger.error(f"Error occurred while updating topics: {ex} \n{traceback.format_exc()}")
            else:
                logger.info(f"NO THREADS FOUND FOR A TOPIC: {str(topic).upper()}")

        logger.info(f"Process completed for dev_url: {dev_url}")