-
Notifications
You must be signed in to change notification settings - Fork 1
/
digest-server.py
executable file
·97 lines (79 loc) · 2.67 KB
/
digest-server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/python
from resource import *
from xmlutils import *
import tornado.ioloop
import tornado.web
from curtain import digest
from os import path as ospath
from urllib2 import HTTPError
from urllib import unquote
def chunks(l, n):
return [l[i:i+n] for i in range(0, len(l), n)]
def testPredicate(pred, errno):
def testPredicateDecorator(func):
def newFunc(self):
if pred(self):
self.send_error(errno)
else:
func(self)
return newFunc
return testPredicateDecorator
def forbidden(self):
realpath = ospath.realpath(unquote(self.request.path))
return not realpath.startswith('/' + self.params['username'])
def enclosingDirectoryNotFound(self):
return not ospath.exists('.' +
ospath.dirname(unquote(self.request.path).rstrip('/')))
def notFound(self):
return not ospath.exists('.' + unquote(self.request.path))
class Handler(digest.DigestAuthMixin, tornado.web.RequestHandler):
creds = {}
def getcreds(uname):
if uname in Handler.creds:
return Handler.creds[uname]
@digest.digest_auth('realm',getcreds)
@testPredicate(forbidden, 403)
@testPredicate(notFound, 404)
def get(self):
r = Resource()
url = self.request.full_url()
r.initFromUrl(url)
if r.category == 'directory':
rs = getResourceList(url)
xmlstr = buildResourceList(rs)
else:
r.addContent()
xmlstr = buildResourceDownload(r)
self.write(xmlstr)
@digest.digest_auth('realm',getcreds)
@testPredicate(forbidden, 403)
@testPredicate(enclosingDirectoryNotFound, 404)
def put(self):
resource = parseResourceUpload(self.request.body)
# Error if writing uploading a directory where a file exists
if resource.category == "directory" and not notFound(self):
return self.send_error(400)
resource.putContent('.'+ unquote(self.request.path))
self.set_status(200)
self.finish()
@digest.digest_auth('realm',getcreds)
@testPredicate(forbidden, 403)
@testPredicate(notFound, 404)
def delete(self):
r = Resource()
url = self.request.full_url()
r.initFromUrl(url)
r.deleteContent()
self.set_status(200)
self.finish()
passwdfile = open(".passwd", "r")
Handler.creds= {}
for [user, pw] in chunks(passwdfile.readlines(), 2):
user = user.strip('\n')
pw = pw.strip('\n')
Handler.creds[user] = {'auth_username': user, 'auth_password': pw}
application = tornado.web.Application([
(r"/.*", Handler),
], '')
application.listen(8887)
tornado.ioloop.IOLoop.instance().start()