-
Notifications
You must be signed in to change notification settings - Fork 1
/
covid_data.py
248 lines (202 loc) · 11.1 KB
/
covid_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
import os, subprocess
import pandas as pd
import numpy as np
import json
from datasetmanager import *
class CovidData:
    """
    A wrapper class for organising all of the data for the COVID-19 project.

    It holds the confirmed/deaths/recovered dataframes, the airport routes
    dataframe, and the file locations used to build and load the border
    closure dataset.
    """

    # Columns used to bin the routes dataframe for each region granularity.
    _ROUTE_GROUP_COLUMNS = {
        'county': ['DepartCounty',
                   'DepartProvince/State',
                   'DepartCountry/Region',
                   'ArrivalCounty',
                   'ArrivalProvince/State',
                   'ArrivalCountry/Region'],
        'state': ['DepartProvince/State',
                  'DepartCountry/Region',
                  'ArrivalProvince/State',
                  'ArrivalCountry/Region'],
        'country': ['DepartCountry/Region',
                    'ArrivalCountry/Region'],
    }

    # Columns used to bin the COVID dataframes. 'county' is absent on purpose:
    # the county level is the unaggregated dataset.
    _COVID_GROUP_COLUMNS = {
        'state': ['Province/State', 'Country/Region'],
        'country': ['Country/Region'],
    }

    def __init__(self, routes_locations='dataset/airport_routes.csv',
                 border_closures_csv='dataset/border_closures.csv',
                 border_closures_json='dataset/border_closures.json',
                 eu_countries_csv='dataset/eu_countries.csv',
                 thread_num=20):
        """
        Loads the COVID datasets and the airport routes dataset.

        parameters:
            routes_locations:     the file location of the routes dataset
                                  mapping locations in the COVID dataset to
                                  airports created by the script
                                  download_route_dataset.py.
            border_closures_csv:  ':'-delimited csv read by
                                  createBorderDataset.
            border_closures_json: json file written by createBorderDataset
                                  and read by loadBorderDataset.
            eu_countries_csv:     csv with a 'Country' column listing the
                                  countries the 'EU' shorthand expands to.
            thread_num:           the number of threads to use when running
                                  download_route_dataset.py

        raises:
            FileNotFoundError: if the routes dataset is missing and
                               download_route_dataset.py did not create it.
        """
        self.border_closures_csv = border_closures_csv
        self.border_closures_json = border_closures_json
        self.eu_countries_csv = eu_countries_csv

        # CovidManager / AirportToLocation are not defined in this file;
        # presumably they come from the `datasetmanager` star import.
        covid_manager = CovidManager()
        datasets = covid_manager.getDatasets()
        self.confirmed_df = datasets['full']['confirmed']
        self.deaths_df = datasets['full']['deaths']
        # Important Note! Recovered can only be used for global data!
        self.recovered_df = datasets['covid_recovered']

        # The return value is unused; the call is made for whatever side
        # effects getDataset() has (not visible from here).
        airport_manager = AirportToLocation(datasets['full']['confirmed'])
        airport_manager.getDataset()

        if not os.path.isfile(routes_locations):
            result = subprocess.run(
                ['python3', 'download_route_dataset.py', '-t', str(thread_num)],
                capture_output=True)
            # The script's output is captured, so surface it when the file
            # still does not exist instead of failing later with a bare
            # "file not found" from pandas.
            if not os.path.isfile(routes_locations):
                raise FileNotFoundError(
                    "{} was not created by download_route_dataset.py "
                    "(exit code {}): {}".format(
                        routes_locations,
                        result.returncode,
                        result.stderr.decode(errors='replace').strip()))
        self.routes_df = pd.read_csv(routes_locations)

    @staticmethod
    def _parse_border_list(list_str, eu_countries):
        """
        Splits a '|'-separated list of countries, replacing every 'EU' entry
        with the individual countries in eu_countries.
        """
        countries = []
        for name in list_str.split('|'):
            # Expand whole entries only, so a name that merely contains the
            # letters "EU" is left untouched.
            if name.strip() == 'EU':
                countries.extend(eu_countries)
            else:
                countries.append(name)
        return countries

    @classmethod
    def _closure_from_row(cls, row, eu_countries):
        """
        Builds the closure dictionary (date, whitelist, blacklist) for one
        row of the border closures csv. 'none' marks an empty cell.
        """
        closure = {
            'date': row['ClosureDate'],
            'whitelist': [],
            'blacklist': [],
        }
        if row['WhiteList'] != 'none':
            closure['whitelist'] = cls._parse_border_list(row['WhiteList'], eu_countries)
        if row['BlackList'] != 'none':
            closure['blacklist'] = cls._parse_border_list(row['BlackList'], eu_countries)
        return closure

    def createBorderDataset(self):
        """
        Builds the border closure dataset from self.border_closures_csv and
        self.eu_countries_csv and writes it as json to
        self.border_closures_json.

        The json has two top-level keys:
            'country': maps a country name to its list of closures.
            'state':   maps '<state>:<country>' to its list of closures.
        Each closure is a dictionary with 'date', 'whitelist' and 'blacklist'.

        Every EU country (except Ireland) without an explicit country-level
        closure in the csv gets a default closure dated '3/17/20' that
        whitelists the EU countries.
        """
        # Empty cells become the string 'none' so they can be compared directly.
        border_closure_df = pd.read_csv(self.border_closures_csv, delimiter=':').fillna('none')
        eu_countries = pd.read_csv(self.eu_countries_csv)['Country'].tolist()

        border_closures = {
            'country': {},
            'state': {}
        }

        for _index, row in border_closure_df.iterrows():
            closure = self._closure_from_row(row, eu_countries)
            if row['Province/State'] == 'none':
                # No state given: the closure applies to the whole country.
                border_closures['country'].setdefault(row['Country/Region'], []).append(closure)
            else:
                location_name = '{}:{}'.format(row['Province/State'], row['Country/Region'])
                border_closures['state'].setdefault(location_name, []).append(closure)

        for eu_country in eu_countries:
            # Look in the 'country' table: the top-level dict only has the
            # keys 'country' and 'state', so checking it would never skip
            # a country that already has an explicit closure.
            # NOTE(review): Ireland is hard-coded as exempt from the default
            # EU closure; the reason is not stated in this file.
            if eu_country in border_closures['country'] or eu_country == "Ireland":
                continue
            border_closures['country'][eu_country] = [{
                'date': '3/17/20',
                'whitelist': list(eu_countries),
                'blacklist': [],
            }]

        with open(self.border_closures_json, 'w') as fp:
            json.dump(border_closures, fp)

    def loadBorderDataset(self):
        """
        Loads the border closure dataset written by createBorderDataset.

        returns:
            The parsed contents of self.border_closures_json.
        """
        with open(self.border_closures_json, 'r') as fp:
            return json.load(fp)

    def routesToWeightedEdges(self, bin_region_column, country):
        """
        Returns the number of routes going between locations based on
        bin_region_column and country parameters.

        parameters:
            bin_region_column: Is either 'county', 'state' or 'country'.
                               'county' : routes between locations should be
                                          based on county locations if possible.
                                          (Largest set)
                               'state'  : routes between locations should be
                                          based on state locations if possible.
                                          (Second largest set)
                               'country': routes between locations should be
                                          based on country locations.
                                          (Smallest set)
            country:           If None then all routes between countries are
                               returned. If not None then only looks at routes
                               within that country.

        returns:
            A dataframe with the grouping columns plus 'NumberOfRoutes', the
            number of rows of the routes dataset in each group.

        raises:
            ValueError: if bin_region_column is not one of the three options.
        """
        if bin_region_column not in self._ROUTE_GROUP_COLUMNS:
            raise ValueError(
                "Invalid region parsed to bin_region_column! "
                "Needs to be county, state or country")

        # groupby drops rows whose key is missing, so missing location levels
        # are replaced by the placeholder 'none' first. fillna returns a new
        # dataframe; self.routes_df is not modified.
        new_routes_df = self.routes_df.fillna("none")
        if country is not None:
            is_domestic = ((new_routes_df['DepartCountry/Region'] == country)
                           & (new_routes_df['ArrivalCountry/Region'] == country))
            new_routes_df = new_routes_df.loc[is_domestic]

        # Every row counts as one route; summing the column per group gives
        # the edge weight. assign() avoids writing into a slice.
        new_routes_df = new_routes_df.assign(NumberOfRoutes=1)
        group_columns = self._ROUTE_GROUP_COLUMNS[bin_region_column]
        return new_routes_df.groupby(group_columns)['NumberOfRoutes'].sum().reset_index()

    def getData(self, bin_region_column='county', country=None, specific_date=None):
        """
        Gets the COVID data and routes inbetween locations based on the
        parameters of the function.

        You can only get the recovered dataset if bin_region_column='country'.

        parameters:
            bin_region_column: Is either 'county', 'state' or 'country'.
                               'county' : gets data with locations as counties,
                                          provinces/states and countries/regions
                                          (Largest set)
                               'state'  : gets data with locations as
                                          provinces/states and countries/regions
                                          (Second largest set)
                               'country': just gets data based on countries.
                                          (Smallest set)
            country:           If None then data from all countries will be used
                               Otherwise it will only return data for that
                               specific country.
            specific_date:     If None then returns data about all dates about
                               COVID.
                               Otherwise it will return the data for that
                               specific date.
                               If set to 'latest' then returns the data from the
                               latest date of recording.

        returns:
            A tuple (data, routes). data is a dictionary with the keys
            'confirmed' and 'deaths' (plus 'recovered' for 'country') holding
            dataframes; for 'state' and 'country' these contain the grouping
            columns, the mean 'Lat'/'Long' and the summed date column(s).
            routes is the dataframe from routesToWeightedEdges.

        raises:
            AssertionError: if bin_region_column or specific_date is invalid.
        """
        # Kept as assertions so callers keep seeing AssertionError.
        # NOTE(review): these checks disappear under `python -O`.
        assert bin_region_column in ('county', 'state', 'country'), "Invalid region parsed to bin_region_column! Needs to be county, state or country"

        # fillna returns new dataframes, so the stored ones stay untouched.
        # The 'none' placeholder keeps rows with a missing state in groupby.
        data = {
            'confirmed': self.confirmed_df.fillna("none"),
            'deaths': self.deaths_df.fillna("none"),
        }
        if bin_region_column == 'country':
            data['recovered'] = self.recovered_df.fillna("none")

        if country is not None:
            data = {data_type: df.loc[df['Country/Region'] == country]
                    for data_type, df in data.items()}

        # Assumes the first five columns of the confirmed dataset are the
        # location columns and everything after them is a date column.
        dates = data['confirmed'].columns[5:].to_list()

        if specific_date is None:
            selected_dates = dates
        elif specific_date == 'latest':
            selected_dates = [dates[-1]]
        else:
            assert (specific_date in dates), "{} is not a valid date. Check the covid .csv files for what a valid dates look like!".format(specific_date)
            selected_dates = [specific_date]

        # Insertion order fixes the output column order: Lat and Long come
        # before the dates.
        agg_dict = {'Lat': 'mean', 'Long': 'mean'}
        for date in selected_dates:
            agg_dict[date] = 'sum'

        # County specific dataset is just the full COVID dataset.
        # NOTE(review): this means specific_date is validated but not applied
        # for 'county' (all date columns are returned) - confirm intended.
        group_columns = self._COVID_GROUP_COLUMNS.get(bin_region_column)
        if group_columns is not None:
            data = {data_type: df.groupby(group_columns).agg(agg_dict).reset_index()
                    for data_type, df in data.items()}

        return data, self.routesToWeightedEdges(bin_region_column, country)