-
Notifications
You must be signed in to change notification settings - Fork 0
/
SSLAuthSrv.py
177 lines (143 loc) · 5.38 KB
/
SSLAuthSrv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn as TMI
import ssl, urllib, threading
import sys, os, shutil, base64, posixpath
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
#3-2 stuff
#if python_version.startswith('3'):
#from urllib.parse import parse_qs
#from http.server import BaseHTTPRequestHandler
#Info!
#A mostly simple HTTP server, now with features!
###########################
#Features working:Webserver without serving your drive, adds Basic Auth, SSL(with OpenSSL),
#Todo: digest auth, do_POST() all the things, log request, log error
###########################
#Notes:
#Methods starting with "do_" are overrides or expanded header functions
#self.wfile is inherited way to write data to client
LPORT = 443
CERTFILE_PATH = "localhost.pem"
class TestHandler(BaseHTTPRequestHandler):
''' Main class to present webpages and authentication. '''
key = ''
SrvObject = None
EnableFileRead = False
pages = {}
#content type examples
#self.send_header('Content-type', 'text/html')
#self.send_header("Content-type", 'text/plain')
#self.send_header("Content-type", 'application/octet-stream')
def do_AuthenticateHeader(self):
self.send_response(401)
self.send_header('WWW-Authenticate', 'Basic realm=\"Test\"')
self.send_header('Content-type', 'text/html')
self.end_headers()#Sends a blank line, indicating the end of the HTTP headers in the response
def do_200(self):
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
def do_GET(self):
"""Serve a GET request."""
self.logic(head=False)
def do_HEAD(self):
"""Serve a HEAD request."""
self.logic()
def parseURI(self):
i = {}
#parse URI
print(self.path)
o = urlparse(self.path)
#Stored paged, dynamic pages and content in the future
if o.path.lower():# in self.pages and self.EnableFileRead == True:
if o.path.lower() == '/' or o.path.lower() == '/index.html':
i = self.pages['index'],0
elif o.path.lower() == '/home':
i = self.pages['home'],1
elif o.path.lower() == '/about':
i = self.pages['about'],0
elif o.path.lower() == '/news':
i = self.pages['news'],0
elif o.path.lower() == '/blog':
i = self.pages['blog'],0
elif o.path.lower() == '/contact': #complaints page with picture upload
i = self.pages['contact'],0
else:
return None,None
return i #return tuple of page,challenge
def logic(self, head=True):
page = None
page,challenge = self.parseURI()
if challenge == 1:
if self.headers.getheader('Authorization') == None:
self.do_AuthenticateHeader()
self.send_error(404, "File not found")
return
elif self.headers.getheader('Authorization') == 'Basic '+key:
pass
else:#likely failed password
self.do_AuthenticateHeader()
self.send_error(404, "File not found")
return
#Give 200 OK
#self.do_200()
#if page not None:
#if head ==
#Write data to client
if page == None:
self.send_error(404, "File not found")
return
else:
if head == False:
self.do_200()
#print(page)
self.wfile.write(page)
else:
self.do_200()
return
def copyfile(self, source, outputfile): #not used now but may be later
shutil.copyfileobj(source, outputfile)
class ThreadedHTTPServer(TMI, HTTPServer):
"""Handle requests in a separate thread."""
#Make dict from files
def BuildPageCache():
odict = {}
blobs = []
pages = []
with open("index.html","rb") as ifile:
blobs.append(ifile.read())
pages.append('index')
with open("home.html","rb") as ifile:
blobs.append(ifile.read())
pages.append('home')
with open("about.html","rb") as ifile:
blobs.append(ifile.read())
pages.append('about')
with open("news.html","rb") as ifile:
blobs.append(ifile.read())
pages.append('news')
with open("blog.html","rb") as ifile:
blobs.append(ifile.read())
pages.append('blog')
with open("contact.html","rb") as ifile:
blobs.append(ifile.read())
pages.append('contact')
odict = dict(zip(pages,blobs))
return odict
#content type examples
#self.send_header('Content-type', 'text/html')
#self.send_header("Content-type", 'text/plain')
#self.send_header("Content-type", 'application/octet-stream')
if __name__ == '__main__':
key = base64.b64encode(b"cookie:cookie")#sys.argv[1])
TH = TestHandler
TH.key = key
TH.pages = BuildPageCache()
#httpd = HTTPServer(('localhost', 80), TH)
httpd = ThreadedHTTPServer(('localhost', 80), TH)
#httpd.socket = ssl.wrap_socket (httpd.socket, keyfile='server1.key', certfile='server1.cert', server_side=True)
print("serving!")
httpd.serve_forever()