-
Notifications
You must be signed in to change notification settings - Fork 2
/
riding_forecast.py
238 lines (221 loc) · 8.08 KB
/
riding_forecast.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import csv
import datetime
from scipy.stats import norm
from regional_poll_interpolator import RegionalPollInterpolator
import riding_poll_model
# Party abbreviation -> text fragment searched for (by WhichParty) inside the
# candidate field of the 2011 results file.
party_long_names = {
    'cpc': 'Conservative/Conservateur',
    'lpc': 'Liberal/Lib',
    'ndp': 'NDP-New Democratic Party/NPD-Nouveau Parti d',
    'gpc': 'Green Party/Parti Vert',
    # NOTE(review): looks deliberately cut short, presumably to stop before an
    # accented character -- confirm against the source CSV.
    'bq': 'Bloc Qu',
    'oth': 'Independent',
}
# Province/territory name -> region code handed to the poll interpolator.
province_to_region = {
    # The four Atlantic provinces share one region.
    'Newfoundland and Labrador': 'ATL',
    'Prince Edward Island': 'ATL',
    'Nova Scotia': 'ATL',
    'New Brunswick': 'ATL',
    'Quebec': 'QC',
    'Ontario': 'ON',
    # Manitoba and Saskatchewan share one region.
    'Manitoba': 'SK_MB',
    'Saskatchewan': 'SK_MB',
    'Alberta': 'AB',
    'British Columbia': 'BC',
    # The territories use the 'Canada' code -- presumably the national
    # numbers, for lack of territory-level polling; confirm.
    'Yukon': 'Canada',
    'Northwest Territories': 'Canada',
    'Nunavut': 'Canada',
}
# Province/territory name -> two-letter short form (see WhichProvince).
province_abbreviations = {
    'Newfoundland and Labrador': 'NL',
    'Prince Edward Island': 'PE',
    'Nova Scotia': 'NS',
    'New Brunswick': 'NB',
    'Quebec': 'QC',
    'Ontario': 'ON',
    'Manitoba': 'MB',
    'Saskatchewan': 'SK',
    'Alberta': 'AB',
    'British Columbia': 'BC',
    'Yukon': 'YT',
    'Northwest Territories': 'NT',
    'Nunavut': 'NU',
}
# Numeric province/territory code (as a string, the way it appears in the
# transposition CSV) -> two-letter short form.
provinces_by_numeric_code = {
    '10': 'NL',
    '11': 'PE',
    '12': 'NS',
    '13': 'NB',
    '24': 'QC',
    '35': 'ON',
    '46': 'MB',
    '47': 'SK',
    '48': 'AB',
    '59': 'BC',
    '60': 'YT',
    '61': 'NT',
    '62': 'NU',
}
# New-riding number -> parties whose projection is forced to zero for that
# riding when the forecasts are written out.
nonexistent_candidates = {
    '59014': ['gpc'],
    '10004': ['gpc'],
}
def WhichParty(s):
    """Return the abbreviation of the first known party mentioned in s.

    A party counts as mentioned when its entry in party_long_names occurs
    as a substring of s. Returns None when no party matches.
    """
    matches = (abbr for abbr, name in party_long_names.items() if name in s)
    return next(matches, None)
def WhichRegion(s):
    """Return the region code of the first province named inside s.

    Province names are taken from province_to_region and matched as
    substrings. Returns None when s names no known province.
    """
    for province_name in province_to_region:
        if province_name in s:
            return province_to_region[province_name]
    return None
def WhichProvince(s):
    """Return the short form of the first province named inside s.

    Province names are taken from province_abbreviations and matched as
    substrings. Returns None when s names no known province.
    """
    found = [short_form
             for full_name, short_form in province_abbreviations.items()
             if full_name in s]
    return found[0] if found else None
def NormalizeDictVector(d):
    """Scale the numeric values of d so that they add up to 1.

    Args:
        d: dict mapping keys to plain Python numbers.

    Returns:
        A new dict with the same keys, each value divided by the sum of all
        values. The input is not modified. An empty dict yields an empty
        dict.

    Raises:
        ZeroDivisionError: if d is non-empty and its values sum to zero.
    """
    # Force a float divisor: with integer inputs, Python 2's "/" would
    # floor-divide and collapse every share to 0.
    divisor = float(sum(d.values()))
    return {key: value / divisor for key, value in d.items()}
def KeyWithHighestValue(d, forbidden_keys=()):
    """Return the key with the highest value.

    Optionally, a collection of forbidden keys can be provided. If so, the
    function will return the key with the next-highest value, but which is
    not forbidden.

    Args:
        d: dict mapping keys to mutually comparable values.
        forbidden_keys: keys that must never be returned.

    Returns:
        The winning key, or None when d is empty or every key is forbidden.
        On ties the key encountered first while iterating d wins.
    """
    found = False
    best_key = None
    best_value = None
    for key, value in d.items():
        if key in forbidden_keys:
            continue
        # Seed the running maximum from the first eligible entry instead of
        # a fixed -1, so values at or below -1 can still be selected.
        if not found or value > best_value:
            found = True
            best_key = key
            best_value = value
    return best_key
# Regional polling setup.
# baseline_date is the "before" point used when interpolating regional
# support for the 2011 results.
baseline_date = datetime.datetime(year=2011, month=5, day=2)
# Both CSV files are fed into the same interpolator, averages first.
interpolator = RegionalPollInterpolator()
interpolator.LoadFromCsv('regional_poll_averages.csv')
interpolator.LoadFromCsv('regional_baseline.csv')
# Read the 2011 per-riding results and scale each party's vote share by the
# change in its regional polling since the baseline date.
# old_ridings maps riding number -> dict with the riding's name, number and
# province plus two per-party dicts: '2011' (actual vote share) and
# 'projections' (scaled vote share).
old_ridings = {}
with open('table_tableau12.csv') as csv_file:
    for row in csv.DictReader(csv_file):
        riding_name = row['Electoral District Name/Nom de circonscription']
        riding_number = row['Electoral District Number']
        popular_vote = float(row['Percentage of Votes Obtained'])
        party = WhichParty(row['Candidate/Candidat'])
        if not party:
            # Candidate does not belong to any tracked party; skip the row.
            continue
        province = WhichProvince(row['Province'])
        region = WhichRegion(row['Province'])
        assert region
        before = interpolator.Interpolate(region, party, baseline_date)
        after = interpolator.GetMostRecent(region, party)
        # Only rescale when the baseline is above 2 (as in 2%, not 200%);
        # otherwise the vote share is carried over unchanged.
        projected_gain = after / before if before > 2 else 1
        r = old_ridings.setdefault(riding_number, {
            '2011': {}, 'projections': {},
            'name': riding_name,
            'number': riding_number,
            'province': province})
        r['2011'][party] = popular_vote
        r['projections'][party] = popular_vote * projected_gain
# Build the transposition from old ridings (2003) to new ridings (2013).
# new_ridings maps new riding number -> dict holding the riding's name,
# number, province and population, running 2011 vote/elector totals, and
# 'feeders': old riding number -> share of the new riding's population that
# came from that old riding.
new_ridings = {}
with open('TRANSPOSITION_338FED.csv') as csv_file:
    # The data table starts after four preamble lines; discard them.
    for _ in range(4):
        next(csv_file)
    for row in csv.DictReader(csv_file):
        new_riding_number = row['2013 FED Number']
        if not new_riding_number:
            # Rows without a new riding number carry no transposition data.
            continue
        new_riding_name = row['2013 FED Name']
        old_riding_number = row['2003 FED Number from which the 2013 '
                                'FED Number is constituted']
        province = provinces_by_numeric_code[
            row['Province and territory numeric code']]
        assert province
        population_2013 = float(row['2013 FED - Population'])
        population_transferred = float(
            row['Population transferred to 2013 FED'])
        population_percent = population_transferred / population_2013
        all_votes = row['All votes']
        electors = row['Electors on lists']
        if new_riding_number in new_ridings:
            r = new_ridings[new_riding_number]
        else:
            # First row seen for this new riding: create its record.
            r = new_ridings[new_riding_number] = {
                'name': new_riding_name,
                'number': new_riding_number,
                'province': province,
                'feeders': {},
                'total_votes_2011': 0,
                'total_electors_2011': 0,
                'population': int(population_2013)}
        r['feeders'][old_riding_number] = population_percent
        r['total_votes_2011'] += int(all_votes)
        r['total_electors_2011'] += int(electors)
# Output final stats for each riding.
# party_order fixes the order of the per-party columns in the output file.
party_order = ['cpc', 'ndp', 'lpc', 'gpc', 'bq', 'oth']
# Party abbreviation -> label used in the output (lower-cased for the
# column headers, as-is for the winner / strategic-vote cells).
readable_party_names = {
    'cpc': 'CON',
    'lpc': 'LIB',
    'ndp': 'NDP',
    'gpc': 'GRN',
    'bq': 'BQ',
    'oth': 'OTH',
}
# Write one CSV row per new riding: normalized per-party projections, the
# projected winner, the strongest party other than 'cpc', a confidence value
# and the 2011 turnout.
# NOTE(review): binary mode ('wb') is what the Python 2 csv module expects;
# under Python 3 csv.writer needs a text-mode file opened with newline=''.
with open('riding_forecasts.csv', 'wb') as csv_file:
    csv_writer = csv.writer(csv_file)
    # The third header cell used to be 'number,' (stray comma inside the
    # string), which named the column "number," in the output.
    csv_writer.writerow(
        ['province', 'name', 'number'] +
        [readable_party_names[p].lower() for p in party_order] +
        ['projected_winner', 'strategic_vote', 'confidence', 'turnout_2011'])
    for r in new_ridings.values():
        projections = {}
        riding_name = r['name']
        riding_number = str(r['number'])
        province = r['province']
        # Project this riding by mixing old-riding projections, each weighted
        # by the share of the new riding's population it contributed.
        for feeder_number, weight in r['feeders'].items():
            feeder = old_ridings[feeder_number]
            normalized = NormalizeDictVector(feeder['projections'])
            for party, support in normalized.items():
                projections[party] = (
                    projections.get(party, 0) + support * weight)
        # Upgrade the projections for ridings that have local polling data.
        # Copy the result so that zeroing entries below never modifies the
        # dict owned by riding_poll_model.
        projections = dict(
            riding_poll_model.projections_by_riding_number.get(
                riding_number, projections))
        for party in nonexistent_candidates.get(riding_number, []):
            projections[party] = 0
        projections = NormalizeDictVector(projections)
        ordered_projections = [projections.get(p, 0) for p in party_order]
        projected_winner = KeyWithHighestValue(projections)
        runner_up = KeyWithHighestValue(projections, [projected_winner])
        # Strategic vote: the highest-projected party other than 'cpc'.
        strategic_vote = KeyWithHighestValue(projections, ['cpc'])
        gap = projections[projected_winner] - projections[runner_up]
        projected_winner = readable_party_names[projected_winner]
        strategic_vote = readable_party_names[strategic_vote]
        # Confidence: the winner's lead, scaled by 0.25, passed through the
        # standard normal CDF.
        confidence = norm.cdf(gap / 0.25)
        turnout = float(r['total_votes_2011']) / r['total_electors_2011']
        csv_writer.writerow([province, riding_name, riding_number] +
                            ordered_projections +
                            [projected_winner, strategic_vote, confidence,
                             turnout])