-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathoutbot_report.py
119 lines (96 loc) · 3.71 KB
/
outbot_report.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import requests
import json
from datetime import datetime
import secrets
import sys
PLUGIN_URL = "https://api.outbreak.info/resources/query?sort=-date&fields=date&q=curatedBy.name:{name}&size=1"
METADATA_URL = "https://api.outbreak.info/resources/query?aggs=curatedBy.name&facet_size=1000"
EPI_URL = "https://api.outbreak.info/covid19/query?fields=date&size=1&sort=-date"
GEN_URL = "https://api.outbreak.info/genomics/metadata"
today = datetime.now()
def get_icon(date_difference, priority=None):
ALERTS = ["🟢", "🟡", "🟠", "🔴"]
SCHEDULES = {"high": [1, 3, 7], "default": [3, 7, 30], "low": [7, 10, 30]}
schedule = SCHEDULES[priority if priority else "default"]
if date_difference < schedule[0]:
return "🟢"
elif date_difference < schedule[1]:
return "🟡"
elif date_difference < schedule[2]:
return "🟠"
else:
return "🔴"
def format_days(date_difference):
if date_difference == 0:
return "today"
elif date_difference == 1:
return "yesterday"
return f"{date_difference} days old"
class Plugin:
def __init__(self, name, url=None, priority="default", headers=None):
if ' ' in name:
name = f'"{name}"'
self.name = name
self.url = PLUGIN_URL.format(name=name)
self.priority = priority
self.total = None
self.headers = headers or {}
if url:
self.url = url
self.set_info()
def set_info(self):
plugin_request = requests.get(self.url, headers=self.headers)
try:
plugin_info = plugin_request.json()
if self.name == 'genomics':
self.total = plugin_info['stats']['total']
# truncate out HH-MM to ignore while parsing
latest_date_str = plugin_info['src']['genomics_api']['version'][:10]
else:
self.total = plugin_info['total']
latest_date_str = plugin_info['hits'][0]['date']
if latest_date_str is None:
raise Exception
except:
self.date_difference = -1
return
latest_date = datetime.strptime(latest_date_str, '%Y-%m-%d')
date_difference = (today - latest_date).days
self.date_difference = date_difference
def set_message(self):
if self.date_difference == -1:
self.message = f"⚪️ *{self.name}*: age unknown ({self.total:,})"
return
icon = get_icon(self.date_difference)
date_str = format_days(self.date_difference)
message = f"{icon} *{self.name}*: {date_str}"
if self.total:
message += f" ({self.total:,})"
self.message = message
def __lt__(self, other):
try:
return self.date_difference < other.date_difference
except:
return True
def __str__(self):
try:
self.set_message()
return self.message or ""
except Exception as e:
return f"Plugin info for {self.name} raised error {e}"
meta_request = requests.get(METADATA_URL)
metadata = meta_request.json()
plugin_names = [i['term'] for i in metadata['facets']['curatedBy.name']['terms']]
plugins = [Plugin(name) for name in plugin_names]
plugins.sort()
epi = Plugin("epidemiological data", url=EPI_URL)
epi.date_difference = epi.date_difference - 1
plugins.append(epi)
genomics = Plugin("genomics", url=GEN_URL, headers={'Authorization': secrets.GEN_AUTH})
plugins.append(genomics)
printmode = '--log' in sys.argv
for m in [str(i) for i in plugins]:
if printmode:
print(m)
else:
requests.post(secrets.SLACK_HOOK_URL, json={'text': m})