-
Notifications
You must be signed in to change notification settings - Fork 34
/
certbotstratoapi.py
249 lines (203 loc) · 8.81 KB
/
certbotstratoapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
"""Certbot-Strato-API Class"""
import os
import re
import urllib
import urllib.parse
from typing import Optional

import pyotp
import requests
from bs4 import BeautifulSoup
class CertbotStratoApi:
    """Validate domains for Certbot's dns-01 challenge via the Strato web UI.

    The class drives the Strato customer service web application through one
    shared ``requests`` session: it logs in (optionally with TOTP based
    two-factor authentication), looks up the package ID of the domain,
    reads the domain's TXT/CNAME records and posts the modified record list
    back.

    The challenge token and the domain are read from the
    ``CERTBOT_VALIDATION`` and ``CERTBOT_DOMAIN`` environment variables.
    """

    DEFAULT_API_URL = 'https://www.strato.de/apps/CustomerService'

    # Last two dot-separated labels of a host name, e.g. "example.com".
    _SECOND_LEVEL_DOMAIN_RE = re.compile(r'([\w-]+\.[\w-]+)$')

    # One DNS record of the record form, matched in document order:
    #   1. a <select name="type"> whose *selected* <option> carries the
    #      record type in its value attribute,
    #   2. an <input name="prefix"> whose value attribute is the prefix,
    #   3. a <textarea name="value"> whose content is the record value.
    # NOTE(review): '.' does not match newlines here (no re.DOTALL), exactly
    # as in the original pattern -- confirm against the real page markup.
    _RECORD_RE = re.compile(
        r'<select [^>]*name="type"[^>]*>.*?'
        r'<option[^>]*value="(?P<type>[^"]*)"[^>]*selected[^>]*>'
        r'.*?</select>.*?'
        r'<input [^>]*value="(?P<prefix>[^"]*)"[^>]*name="prefix"[^>]*>'
        r'.*?<textarea [^>]*name="value"[^>]*>(?P<value>.*?)</textarea>'
    )

    def __init__(self, api_url: Optional[str] = None) -> None:
        """Initialize the data structure.

        :param str api_url: URL of the Strato customer service application;
            :attr:`DEFAULT_API_URL` is used when omitted.
        :raises KeyError: If ``CERTBOT_VALIDATION`` or ``CERTBOT_DOMAIN`` is
            not set in the environment.
        """
        self.api_url = self.DEFAULT_API_URL if api_url is None else api_url
        self.txt_key = '_acme-challenge'
        self.txt_value = os.environ['CERTBOT_VALIDATION']
        self.domain_name = os.environ['CERTBOT_DOMAIN']
        self.second_level_domain_name = self._second_level_domain(
            self.domain_name)

        print(f'INFO: txt_key: {self.txt_key}')
        print(f'INFO: txt_value: {self.txt_value}')
        print(f'INFO: second_level_domain_name: {self.second_level_domain_name}')
        print(f'INFO: domain_name: {self.domain_name}')

        # One session for all requests so that cookies are shared.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:126.0) '
                          'Gecko/20100101 Firefox/126.0',
        }
        self.http_session = requests.session()
        self.http_session.headers.update(headers)

        # Filled in later by login(), get_package_id() and get_txt_records().
        self.session_id = ''
        self.package_id = 0
        self.records = []

    @staticmethod
    def _second_level_domain(domain_name: str) -> str:
        """Return the last two labels of *domain_name*, e.g. ``example.com``.

        A name without a dot is returned unchanged instead of raising
        ``AttributeError`` on a failed regex match.
        """
        match = CertbotStratoApi._SECOND_LEVEL_DOMAIN_RE.search(domain_name)
        return match.group(1) if match else domain_name

    @staticmethod
    def _find_totp_device_id(html: str, username: str,
                             devicename: str) -> Optional[str]:
        """Return the ``pw_id`` of the 2FA device called *devicename*.

        Scans *html* for ``<option>`` elements whose value has the form
        ``S.<username>.<id>`` and compares their visible text (stripped)
        with *devicename* (stripped).

        :returns: The option value of the first matching device, or ``None``
            when no option matches.
        """
        # The user name is literal text, not a pattern: escape it so that
        # characters such as '+' or '.' do not change what is matched.
        pattern = (
            rf'<option value="(?P<value>S\.{re.escape(username)}\.\w*)"'
            r'( selected(="selected")?)?\s*>(?P<name>.+?)</option>'
        )
        wanted = devicename.strip()
        for option in re.finditer(pattern, html):
            if option.group('name').strip() == wanted:
                return option.group('value')
        return None

    @staticmethod
    def _package_id_from_href(href: str) -> Optional[str]:
        """Return the ``cID`` query parameter of *href*, or ``None`` if absent."""
        query = urllib.parse.urlparse(href).query
        values = urllib.parse.parse_qs(query).get('cID')
        return values[0] if values else None

    @staticmethod
    def _parse_records(html: str) -> list:
        """Extract all records of the record form from *html*.

        :returns: List of dicts with the keys ``prefix``, ``type`` and
            ``value``, in document order.
        """
        return [
            {
                'prefix': match.group('prefix'),
                'type': match.group('type'),
                'value': match.group('value'),
            }
            for match in CertbotStratoApi._RECORD_RE.finditer(html)
        ]

    def _print_records(self, heading: str) -> None:
        """Print *heading* followed by one line per entry of ``self.records``."""
        print(heading)
        for item in self.records:
            print(f'INFO: - {item["prefix"]} {item["type"]}: {item["value"]}')

    def login_2fa(
            self,
            response: "requests.Response",
            username: str,
            totp_secret: Optional[str],
            totp_devicename: Optional[str],
        ) -> "requests.Response":
        """Login with Two-factor authentication by TOTP on Strato website.

        If *response* is not the two-factor page, or if a required value
        cannot be determined, *response* is returned unchanged and a message
        is printed.

        :param requests.Response response: Response of the password login
        :param str username: Username or customer number used for the login
        :param str totp_secret: 2FA TOTP secret hash
        :param str totp_devicename: 2FA TOTP device name
        :returns: Original response or 2FA response
        :rtype: requests.Response
        """
        # The 2FA page is recognised by its <h1> heading.
        soup = BeautifulSoup(response.text, 'html.parser')
        heading = soup.find(
            'h1', string=re.compile('Zwei\\-Faktor\\-Authentifizierung'))
        if heading is None:
            print('INFO: 2FA is not used.')
            return response
        if not (totp_secret and totp_devicename):
            print('ERROR: 2FA parameter is not completely set.')
            return response

        param = {'identifier': username}

        # The hidden 'totp_token' input has to be sent back with the form.
        totp_input = soup.find(
            'input', attrs={'type': 'hidden', 'name': 'totp_token'})
        token = None if totp_input is None else totp_input.get('value')
        if token is None:
            print('ERROR: Parsing error on 2FA site by totp_token.')
            return response
        param['totp_token'] = token

        param['action_customer_login.x'] = 1

        # 'pw_id' is the option value of the device selected by its name.
        device_id = self._find_totp_device_id(
            response.text, username, totp_devicename)
        if device_id is None:
            print('ERROR: Parsing error on 2FA site by device name.')
            return response
        param['pw_id'] = device_id

        # Current one-time password derived from the shared secret.
        param['totp'] = pyotp.TOTP(totp_secret).now()
        print(f'DEBUG: totp: {param.get("totp")}')

        return self.http_session.post(self.api_url, param)

    def login(
            self,
            username: str,
            password: str,
            totp_secret: Optional[str] = None,
            totp_devicename: Optional[str] = None,
        ) -> bool:
        """Login to Strato website. Requests session ID.

        On success the session ID is stored in ``self.session_id``.

        :param str username: Username or customer number of
            'STRATO Customer Login'
        :param str password: Password of 'STRATO Customer Login'
        :param str totp_secret: 2FA TOTP secret hash
        :param str totp_devicename: 2FA TOTP device name
        :returns: Successful login
        :rtype: bool
        """
        # Request the start page first; cookies it sets stay in the session.
        self.http_session.get(self.api_url)
        data = {
            'identifier': username,
            'passwd': password,
            'action_customer_login.x': 'Login',
        }
        response = self.http_session.post(self.api_url, data=data)

        # Continue with the second factor if the site asks for it.
        response = self.login_2fa(
            response, username, totp_secret, totp_devicename)

        # A successful login ends on a URL carrying a 'sessionID' parameter.
        query = urllib.parse.urlparse(response.url).query
        session_ids = urllib.parse.parse_qs(query).get('sessionID')
        if not session_ids:
            return False
        self.session_id = session_ids[0]
        print(f'DEBUG: session_id: {self.session_id}')
        return True

    def get_package_id(self) -> None:
        """Requests package ID for the selected domain.

        Stores the ``cID`` of the package whose information mentions the
        second-level domain in ``self.package_id``. Falls back to ``1`` when
        no such package link, or no ``cID`` in that link, is found.
        """
        response = self.http_session.get(self.api_url, params={
            'sessionID': self.session_id,
            'cID': 0,
            'node': 'kds_CustomerEntryPage',
        })
        soup = BeautifulSoup(response.text, 'html.parser')
        # Link inside the package-list row that mentions the domain.
        package_anchor = soup.select_one(
            '#package_list > tbody >'
            f' tr:has(.package-information:-soup-contains("{self.second_level_domain_name}"))'
            ' .jss_with_own_packagename a'
        )
        if package_anchor and package_anchor.has_attr('href'):
            package_id = self._package_id_from_href(package_anchor['href'])
            if package_id is not None:
                self.package_id = package_id
                print(f'INFO: strato package id (cID): {self.package_id}')
                return

        print(f'ERROR: Domain {self.second_level_domain_name} not '
              'found in strato packages. Using fallback cID=1')
        self.package_id = 1

    def get_txt_records(self) -> None:
        """Requests all txt and cname records related to domain.

        The records found are appended to ``self.records``; records already
        in the list are kept.
        """
        response = self.http_session.get(self.api_url, params={
            'sessionID': self.session_id,
            'cID': self.package_id,
            'node': 'ManageDomains',
            'action_show_txt_records': '',
            'vhost': self.domain_name,
        })
        self.records.extend(self._parse_records(response.text))
        self._print_records('INFO: Current cname/txt records:')

    def add_txt_record(self, prefix: str, record_type: str, value: str) -> None:
        """Add a txt/cname record.

        The record is only appended to ``self.records``; nothing is sent to
        Strato until :meth:`push_txt_records` is called.

        :param prefix str: Prefix of record
        :param record_type str: Type of record ('TXT' or 'CNAME')
        :param value str: Value of record
        """
        self.records.append({
            'prefix': prefix,
            'type': record_type,
            'value': value,
        })

    def remove_txt_record(self, prefix: str, record_type: str) -> None:
        """Remove a txt/cname record.

        Every record with the given prefix and type is removed from
        ``self.records``; the list object itself is kept.

        :param prefix str: Prefix of record
        :param record_type str: Type of record ('TXT' or 'CNAME')
        """
        # Slice assignment keeps the same list object, as popping did.
        self.records[:] = [
            record for record in self.records
            if not (record['prefix'] == prefix
                    and record['type'] == record_type)
        ]

    def set_amce_record(self) -> None:
        """Add the ACME challenge txt record to the local record list.

        Existing records with the same prefix are kept; the new record is
        appended.
        """
        self.add_txt_record(self.txt_key, 'TXT', self.txt_value)

    def reset_amce_record(self) -> None:
        """Remove all ACME challenge txt records from the local record list."""
        self.remove_txt_record(self.txt_key, 'TXT')

    def push_txt_records(self) -> None:
        """Push modified txt records to Strato.

        Posts the complete content of ``self.records`` as parallel
        ``prefix``/``type``/``value`` lists. The response is not inspected.
        """
        self._print_records('INFO: New cname/txt records:')

        self.http_session.post(self.api_url, {
            'sessionID': self.session_id,
            'cID': self.package_id,
            'node': 'ManageDomains',
            'vhost': self.domain_name,
            'spf_type': 'NONE',
            'prefix': [r['prefix'] for r in self.records],
            'type': [r['type'] for r in self.records],
            'value': [r['value'] for r in self.records],
            'action_change_txt_records': 'Einstellung+übernehmen',
        })