
Streams
- Finished work to allow more streams to be queried. Initialized the stream list by adding 'implementers' and 'snomed'.
joeflack4 committed Sep 26, 2022
1 parent 2f95dc8 commit d4da9bb
Showing 1 changed file with 130 additions and 117 deletions.
247 changes: 130 additions & 117 deletions fhir_zulip_nlp/fhir_zulip_nlp.py
@@ -48,9 +48,10 @@
# TODO: change CSV names to clarify that `zulip_raw_results.csv` is only for queried / matched messages
RAW_RESULTS_MATCHES_FILENAME = 'zulip_raw_results.csv'
RAW_RESULTS_ALL_FILENAME = 'zulip_raw_results_all.csv'
+STREAM_NAME_FIELD = 'display_recipient' # strange, but this is the 'stream name' field from Zulip API
CONFIG = {
'zuliprc_path': os.path.join(ENV_DIR, '.zuliprc'), # rc = "runtime config"
-'streams': ['terminology'],
+'streams': ['terminology', 'implementers', 'snomed'],
'num_messages_per_query': 1000,
'outpath_user_info': os.path.join(PROJECT_DIR, 'zulip_user_info.csv'),
'outpath_report_counts': os.path.join(PROJECT_DIR, 'zulip_report1_counts.csv'),
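
Downstream, each entry in the expanded 'streams' list becomes its own message query. A minimal sketch of that consumption, assuming the zulip Python client and the request shape documented at https://zulip.com/api/get-messages (the actual query loop lives elsewhere in this module):

import zulip

client = zulip.Client(config_file=CONFIG['zuliprc_path'])
for stream in CONFIG['streams']:
    # Narrow the fetch to one stream; each returned message names its stream
    # in 'display_recipient' (STREAM_NAME_FIELD above).
    result = client.get_messages({
        'anchor': 'newest',
        'num_before': CONFIG['num_messages_per_query'],
        'num_after': 0,
        'narrow': [{'operator': 'stream', 'operand': stream}],
    })
    messages = result.get('messages', [])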
@@ -176,8 +177,9 @@ def query_handler(
def format_df(df: pd.DataFrame) -> pd.DataFrame:
"""Format dataframe"""
# Reorganize columns
-cols_head = ['category', 'keyword']
-cols_tail = [x for x in list(df.columns) if x not in cols_head]
+original_cols = list(df.columns)
+cols_head = [x for x in ['stream', 'category', 'keyword'] if x in original_cols]
+cols_tail = [x for x in original_cols if x not in cols_head]
df = df[cols_head + cols_tail]
# Sort values
df = df.sort_values(cols_head + ['timestamp'] if 'timestamp' in cols_tail else [])
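
A toy illustration of the guarded reordering above (hypothetical data): columns named in cols_head move to the front only when they exist, so frames without a 'stream' column still format cleanly.

import pandas as pd

df = pd.DataFrame({
    'timestamp': [1664200000, 1664100000],
    'keyword': ['valueset', 'codesystem'],
    'stream': ['terminology', 'snomed'],
    'category': ['resources', 'resources'],
})
print(list(format_df(df).columns))
# -> ['stream', 'category', 'keyword', 'timestamp']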
@@ -212,7 +214,7 @@ def _get_messages_with_context(df: pd.DataFrame, context: str) -> pd.DataFrame:


def _get_counts_from_kw_messages(
-df: pd.DataFrame, category: str, keyword: str, spelling: str, today: str, context: str = ''
+df: pd.DataFrame, stream: str, category: str, keyword: str, spelling: str, today: str, context: str = ''
) -> Dict:
"""Get counts from keyword messages"""
# Get context info
@@ -226,6 +228,7 @@ def _get_counts_from_kw_messages(
threadlen = f'{z:.1f}'
# Create report
kw_report = {
+'stream': stream,
'category': category,
'keyword': keyword,
'keyword_spelling': spelling,
@@ -253,19 +256,23 @@ def create_report_counts(
reports: List[Dict] = []
no_results: List[Dict] = []
today = str(date.today())
-for c, keywords in category_keywords.items():
-for k, spellings in keywords.items():
-contexts = kw_contexts.get(k, [])
-# add null context: needed to capture messages where no context words appear
-contexts = contexts + [''] if '' not in contexts else contexts
-for s in spellings:
-df_i = df[df['keyword_spelling'] == s]
-for context in contexts:
-kw_report: Dict = _get_counts_from_kw_messages(
-df=df_i, category=c, keyword=k, spelling=s, today=today, context=context)
-if kw_report['num_messages_with_kw_spelling'] == 0:
-no_results.append({'keyword': k, 'spelling': s, 'context': context, 'results': 0})
-reports.append(kw_report)
+for stream in CONFIG['streams']:
+df_i = df[df[STREAM_NAME_FIELD] == stream]
+for c, keywords in category_keywords.items():
+for k, spellings in keywords.items():
+contexts = kw_contexts.get(k, [])
+# add null context: needed to capture messages where no context words appear
+contexts = contexts + [''] if '' not in contexts else contexts
+for s in spellings:
+df_i2 = df_i[df_i['keyword_spelling'] == s]
+for context in contexts:
+kw_report: Dict = _get_counts_from_kw_messages(
+df=df_i2, stream=stream, category=c, keyword=k,
+spelling=s, today=today, context=context)
+if kw_report['num_messages_with_kw_spelling'] == 0:
+no_results.append(
+{'stream': stream, 'keyword': k, 'spelling': s, 'context': context, 'results': 0})
+reports.append(kw_report)

# Report
df_report = pd.DataFrame(reports)
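
A design note on the new outer loop: iterating CONFIG['streams'] explicitly, rather than grouping the frame, preserves stream/keyword/context combinations with zero matches, which is what feeds no_results. A groupby sketch, shown only for contrast, would silently drop those empty combinations:

# Illustrative contrast, not this module's approach: groupby emits only
# combinations that actually occur, so zero-hit rows never appear.
counts = (df.groupby([STREAM_NAME_FIELD, 'keyword_spelling'])
            .size()
            .rename('num_messages_with_kw_spelling')
            .reset_index())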
@@ -304,98 +311,106 @@ def create_report_thread_length(
today = date.today()
tot_all, std_all, num_all_threads = 0, 0, 0
avg_total, std_tot = 0, 0
-for i, (category, keywords) in enumerate(category_keywords.items()):
-tot_category, var_category, avg_category, std_category, num_threads = 0, 0, 0, 0, 0
-
-for i2, (k, spellings) in enumerate(keywords.items()):
-contexts = kw_contexts.get(k, [])
-# add null context '': needed to capture messages where no context words appear
-contexts = contexts + [''] if '' not in contexts else contexts
-for spelling in spellings:
-df_i = df[df['keyword_spelling'] == spelling]
-for context in contexts:
-df_j = _get_messages_with_context(df_i, context)
-df_j = df_j.sort_values(['timestamp']) # oldest first
-threads: List[str] = list(df_j['subject'].unique())
-# Average thread length
-# TODO: Refactor to pandas?
-tot_thread_len = 0
-thread_data: Dict[str, pd.DataFrame] = {}
-for thread in threads:
-df_thread = df_j[df_j['subject'] == thread]
-num_threads += 1
-# TODO: Want to double check that timestamps are still/indeed sorted properly (i) here, and
-# (ii) everywhere else where we're doing timestamps like this
-# TODO: better: rather than get the first & last, timestamp. should be able to get max() & min()
-thread_len = (list(df_thread['timestamp'])[-1]
-- list(df_thread['timestamp'])[0]) / seconds_per_day
-tot_thread_len += float(f'{thread_len:.1f}')
-thread_data[thread] = df_thread
-num_all_threads += 1
-tot_all += thread_len
-avg_len_kw_thread = round(tot_thread_len / len(threads), 3) if threads else 0
-# Outliers
-# TODO: Refactor to pandas to reduce lines and improve performance?
-# TODO: Add cols for 1 and 2 std deviations?
-# TODO: Might need to make 3 columns for these outliers: std deviations away from (i) keyword avg,
-# (ii) category avg, (iii) avg of all of our queried category/keyword threads.
-# Calc std deviation
-sum_square_distance = 0
-for thread in threads:
-df_thread = thread_data[thread]
-thread_len = (list(df_thread['timestamp'])[-1]
-- list(df_thread['timestamp'])[0]) / seconds_per_day
-sum_square_distance += (float(thread_len) - float(avg_len_kw_thread)) ** 2
-stddev_kw_threads = math.sqrt(sum_square_distance / len(threads)) if threads else 0
-# Calc how many std deviations away per thread
-tot_category += tot_thread_len
-var_category += stddev_kw_threads ** 2
-std_all += stddev_kw_threads ** 2
-for i3, thread in enumerate(threads):
-outlier = False
-df_thread = thread_data[thread]
-thread_len = (list(df_thread['timestamp'])[-1]
-- list(df_thread['timestamp'])[0]) / seconds_per_day
-std_away = 0
-if thread_len > stddev_kw_threads + avg_len_kw_thread \
-or thread_len < avg_len_kw_thread - stddev_kw_threads:
-outlier = True
-std_away = abs(thread_len - avg_len_kw_thread) / stddev_kw_threads
-if i3 == len(threads) - 1:
-avg_category = round(tot_category / num_threads, 2)
-std_category = round(math.sqrt(var_category / num_threads), 2)
-num_threads = 0
-tot_category = 0
-var_category = 0
-if i2 == len(list(keywords.keys())) - 1:
-avg_total = round((tot_all / num_all_threads), 2)
-std_tot = round(math.sqrt(std_all / num_all_threads), 2)
-# Calc URL
-thread_df = dict(df_thread.iloc[0]) # representative row of whole df; all values should be same
-url = 'https://chat.fhir.org/#narrow/' + \
-f'{thread_df["type"]}/{thread_df["stream_id"]}-{thread_df["display_recipient"]}' + \
-f'/topic/{thread_df["subject"]}'
-# Append to report
-kw_report = {
-'category': category,
-'keyword': k,
-'thread': thread,
-'thread_url': url,
-'thread_len_days': f'{thread_len:.1f}',
-'avg_len_kw_outlier?': outlier,
-'avg_len_kw': str(avg_len_kw_thread),
-'stddev_kw': str(round(std_away, 2)),
-'query_date': today,
-}
-if include_all_columns:
-kw_report = {**kw_report, **{
-'avg_len_category': avg_category,
-'stddev_category': std_category,
-'avg_len_total': str(avg_total),
-'stddev_total': str(std_tot),
-}}
-avg_category, std_category, avg_total, std_tot = 0, 0, 0, 0
-reports.append(kw_report)
+for stream in CONFIG['streams']:
+df_i = df[df[STREAM_NAME_FIELD] == stream]
+for i, (category, keywords) in enumerate(category_keywords.items()):
+tot_category, var_category, avg_category, std_category, num_threads = 0, 0, 0, 0, 0
+
+for i2, (k, spellings) in enumerate(keywords.items()):
+contexts = kw_contexts.get(k, [])
+# add null context '': needed to capture messages where no context words appear
+contexts = contexts + [''] if '' not in contexts else contexts
+for spelling in spellings:
+df_i2 = df_i[df_i['keyword_spelling'] == spelling]
+for context in contexts:
+df_i3 = _get_messages_with_context(df_i2, context)
+df_i3 = df_i3.sort_values(['timestamp']) # oldest first
+threads: List[str] = list(df_i3['subject'].unique())
+# Average thread length
+# TODO: Refactor to pandas?
+tot_thread_len = 0
+thread_data: Dict[str, pd.DataFrame] = {}
+for thread in threads:
+df_thread = df_i3[df_i3['subject'] == thread]
+num_threads += 1
+# TODO: Want to double check that timestamps are still/indeed sorted
+# properly (i) here, and (ii) everywhere else where we're doing timestamps
+# like this
+# TODO: better: rather than get the first & last timestamp, we should be
+# able to use max() & min()
+thread_len = (list(df_thread['timestamp'])[-1]
+- list(df_thread['timestamp'])[0]) / seconds_per_day
+tot_thread_len += float(f'{thread_len:.1f}')
+thread_data[thread] = df_thread
+num_all_threads += 1
+tot_all += thread_len
+avg_len_kw_thread = round(tot_thread_len / len(threads), 3) if threads else 0
+# Outliers
+# TODO: Refactor to pandas to reduce lines and improve performance?
+# TODO: Add cols for 1 and 2 std deviations?
+# TODO: Might need to make 3 columns for these outliers: std deviations away
+# from (i) keyword avg,
+# (ii) category avg, (iii) avg of all of our queried category/keyword threads.
+# Calc std deviation
+sum_square_distance = 0
+for thread in threads:
+df_thread = thread_data[thread]
+thread_len = (list(df_thread['timestamp'])[-1]
+- list(df_thread['timestamp'])[0]) / seconds_per_day
+sum_square_distance += (float(thread_len) - float(avg_len_kw_thread)) ** 2
+stddev_kw_threads = math.sqrt(sum_square_distance / len(threads)) if threads else 0
+# Calc how many std deviations away per thread
+tot_category += tot_thread_len
+var_category += stddev_kw_threads ** 2
+std_all += stddev_kw_threads ** 2
+for i3, thread in enumerate(threads):
+outlier = False
+df_thread = thread_data[thread]
+thread_len = (list(df_thread['timestamp'])[-1]
+- list(df_thread['timestamp'])[0]) / seconds_per_day
+std_away = 0
+if thread_len > stddev_kw_threads + avg_len_kw_thread \
+or thread_len < avg_len_kw_thread - stddev_kw_threads:
+outlier = True
+std_away = abs(thread_len - avg_len_kw_thread) / stddev_kw_threads
+if i3 == len(threads) - 1:
+avg_category = round(tot_category / num_threads, 2)
+std_category = round(math.sqrt(var_category / num_threads), 2)
+num_threads = 0
+tot_category = 0
+var_category = 0
+if i2 == len(list(keywords.keys())) - 1:
+avg_total = round((tot_all / num_all_threads), 2)
+std_tot = round(math.sqrt(std_all / num_all_threads), 2)
+# Calc URL
+# representative row of whole df; all values should be same
+thread_df = dict(df_thread.iloc[0])
+url = 'https://chat.fhir.org/#narrow/' + \
+f'{thread_df["type"]}/{thread_df["stream_id"]}-{thread_df["display_recipient"]}' + \
+f'/topic/{thread_df["subject"]}'
+# Append to report
+kw_report = {
+'stream': stream,
+'category': category,
+'keyword': k,
+'thread': thread,
+'thread_url': url,
+'thread_len_days': f'{thread_len:.1f}',
+'avg_len_kw_outlier?': outlier,
+'avg_len_kw': str(avg_len_kw_thread),
+'stddev_kw': str(round(std_away, 2)),
+'query_date': today,
+}
+# TODO: prolly want to summarize for 'stream' too:
+if include_all_columns:
+kw_report = {**kw_report, **{
+'avg_len_category': avg_category,
+'stddev_category': std_category,
+'avg_len_total': str(avg_total),
+'stddev_total': str(std_tot),
+}}
+avg_category, std_category, avg_total, std_tot = 0, 0, 0, 0
+reports.append(kw_report)

df_report = pd.DataFrame(reports)
df_report = format_df(df_report)
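
The max()/min() TODO above could be resolved along these lines (a hedged sketch, not code from this commit; df_i3 and seconds_per_day as in the loop). It avoids relying on sort order and the repeated list(...)[0]/[-1] indexing:

# Per-thread span in days from min/max timestamps per 'subject'.
spans = df_i3.groupby('subject')['timestamp'].agg(['min', 'max'])
thread_len_days = (spans['max'] - spans['min']) / seconds_per_day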
@@ -410,8 +425,6 @@ def create_report_users(
"""Report: Users"""
# display_recipient: Data on the recipient of the message; either the name of a stream or a dictionary containing
# basic data on the users who received the message. See: https://zulip.com/api/get-messages
-stream_name_field = 'display_recipient'
-
# Add only messages from `df_all` that aren't in `df_matches`
# - Create common primary key
for df_i in [df_all, df_matches]:
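
The comment above describes an anti-join; the key construction itself is collapsed out of this hunk. One common pandas pattern for it, with a hypothetical 'id' column standing in for whatever composite key the module builds:

# Hedged sketch: keep rows of df_all whose key never appears in df_matches.
df_unmatched = df_all[~df_all['id'].isin(df_matches['id'])]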
@@ -450,9 +463,9 @@ def create_report_users(
# todo: This could also probably use a refactor after adding 'thread.id' logic; maybe via a thread-centric df.
user_participation: List[Dict[str, Any]] = []
df = df_matches # assign to shorter variable for more readable code
-streams = df[stream_name_field].unique()
+streams = df[STREAM_NAME_FIELD].unique()
for stream in streams:
-df_i = df[df[stream_name_field] == stream]
+df_i = df[df[STREAM_NAME_FIELD] == stream]
categories = df_i['category'].unique()
for c in categories:
df_i2 = df_i[df_i['category'] == c]
@@ -567,7 +580,7 @@ def plot_category_msg_counts_by_year():
for c in categories:
df_i = df[df['category'] == c]
keywords = df_i['keyword'].unique()
-kw_year_counts= {}
+kw_year_counts = {}
for kw in keywords:
df_i2 = df_i[df_i['keyword'] == kw]
year_counts = []
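
For context on year_counts (the accumulation itself is collapsed out of this hunk): Zulip timestamps are Unix seconds, so per-year tallies can be derived as in this sketch:

# Hedged sketch: messages per calendar year for one keyword's frame.
per_year = pd.to_datetime(df_i2['timestamp'], unit='s').dt.year.value_counts().sort_index()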
Expand Down Expand Up @@ -634,7 +647,7 @@ def plot_table_user_participation_counts(path) -> pd.DataFrame:
}
rows.append(row)
df2 = pd.DataFrame(rows)
-df2.to_csv('zulip_dataviz_table_user_activity.csv', index=False)
+df2.to_csv(os.path.join(PROJECT_DIR, 'zulip_dataviz_table_user_activity.csv'), index=False)
return df2


@@ -766,9 +779,9 @@ def _get_keyword_contexts(use_cached_keyword_inputs=False) -> Dict[str, List[str]]:
return keyword_contexts


-def run(analyze_only=False, analytical_tables_only=False, use_cached_keyword_inputs=False):
+def run(analyze_only=False, plots_only=False, use_cached_keyword_inputs=False):
"""Run program"""
-if not analytical_tables_only:
+if not plots_only:
# Get inputs
keywords: TYPE_KEYWORDS_DICT = _get_keywords(use_cached_keyword_inputs)
kw_contexts: Dict[str, List[str]] = _get_keyword_contexts()

