-
Notifications
You must be signed in to change notification settings - Fork 0
/
antler.py
93 lines (73 loc) · 2.35 KB
/
antler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from re import findall
from urlparse import urlparse
from urlparse import urljoin
import urllib
import sys
import Queue
import threading
class Crawler(object):
    """
    Level-by-level web crawler that records, for every page it visits,
    how many <input.. tags the page contains.

    Attributes:
        count     -- number of pages recorded so far
        visited   -- dict mapping each visited url to its <input.. tag count
        max_depth -- link depth at which find() stops descending
        max_pages -- number of recorded pages at which find() stops fetching
    """

    # Class-level defaults, so the limits exist even on an instance that
    # was built without running __init__.
    max_depth = 3
    max_pages = 50

    def __init__(self, start_url=None, max_depth=3, max_pages=50):
        """
        Collect a dictionary of visited sites and the count of <input.. tags.

        start_url -- when given (and non-empty), crawling starts immediately
                     from this url
        max_depth -- depth at which the crawl stops (default 3)
        max_pages -- maximum number of pages to record (default 50)
        """
        self.count = 0
        self.visited = {}
        self.max_depth = max_depth
        self.max_pages = max_pages
        if start_url:
            self.find([start_url])

    def update_data(self, url, count):
        """Record `url` as visited with `count` <input.. tags."""
        self.count += 1
        self.visited[url] = count

    def next(self, urls, depth):
        """Crawl the next level of `urls` at the given `depth`."""
        self.find(urls, depth)

    def find(self, urls, depth=0):
        """
        Visit every not-yet-visited url in `urls`, then descend into the
        links collected from those pages.

        Returns False when this level was cut short by the depth or page
        limit, True otherwise. The result of the nested level is not
        propagated: True only describes the level handled by this call.
        """
        print('\nurls %d depth %d' % (len(urls), depth))
        # >= rather than ==, so a call that starts beyond the limit also stops
        if depth >= self.max_depth:
            print('stopped {0}'.format({'depth': depth, 'visited': len(self.visited)}))
            return False

        childs = []
        for url in urls:
            if self.count >= self.max_pages:
                print('\nstopped {0}'.format({'depth': depth, 'visited': len(self.visited)}))
                return False
            # membership test on the dict itself; no intermediate key list
            if url in self.visited:
                continue
            site = Site()
            # The page body is kept on the Site instance by request(); the
            # extraction methods read it from there, so no argument is passed.
            site.request(url)
            self.update_data(url, site.count_input())
            childs.extend(site.find_a())

        # Descend only after the whole level has been processed.
        if childs:
            self.next(childs, depth + 1)
        return True
class Site(threading.Thread):
    """
    A single fetched page: downloads its HTML and extracts the number of
    <input.. tags and the links found in <a ... href=...> tags.

    The class derives from threading.Thread but defines no run() method
    here, so the methods below execute in the thread that calls them.
    """
    # defined here, for test mocking
    url = None
    # scheme://netloc of the last requested url; empty until request() runs
    base = ''

    # "<input" followed by whitespace, "/" or ">", in any letter case
    _INPUT_PATTERN = r'(?i)<input[\s/>]'
    # href attribute inside an <a ...> tag; group 1 is the quote character,
    # group 2 the attribute value. [^>] keeps the match inside one tag.
    _HREF_PATTERN = r'(?i)<a\s[^>]*?href\s*=\s*(["\'])(.*?)\1'

    def __init__(self):
        super(Site, self).__init__()
        self.html = ''

    def request(self, url):
        """
        Fetch `url` and keep the response body in self.html.

        Returns the body, or '' when the url could not be opened or read.
        Also sets self.url and self.base (scheme://netloc of `url`).
        """
        self.url = url
        uri = urlparse(url)
        self.base = uri.scheme + '://' + uri.netloc
        # forget the body of any earlier request made through this instance
        self.html = ''
        # write progress output
        sys.stdout.write('.')
        sys.stdout.flush()
        try:
            response = urllib.urlopen(self.url)
            try:
                self.html = response.read()
            finally:
                # always release the connection, even when read() fails
                response.close()
        except Exception:
            # Best effort: one unreachable page must not abort the crawl.
            # `Exception` (not a bare except) lets Ctrl-C / SystemExit through.
            # NOTE(review): the message mentions a 1 second timeout, but no
            # timeout is configured in this class -- confirm where it is set.
            print('\ncould not open url %s, timeout 1 second' % url)
            return ''
        return self.html

    def count_input(self, html=None):
        """
        Return the number of <input.. tags in `html`.

        html -- markup to scan; defaults to the body stored by request()
        """
        if html is None:
            html = self.html
        return len(findall(self._INPUT_PATTERN, html))

    def find_a(self, html=None):
        """
        Return the absolute urls of the links found in `html`, in document
        order (duplicates are kept).

        html -- markup to scan; defaults to the body stored by request()

        Empty hrefs, fragment-only ('#..'), query-only ('?..') and
        'javascript:' hrefs are skipped.
        """
        if html is None:
            html = self.html
        # Relative links are relative to the page itself, not to the site
        # root; fall back to the root when no page url is known.
        page = self.url or self.base
        # filtering and mapping
        out = []
        for _quote, href in findall(self._HREF_PATTERN, html):
            href = href.strip()
            if not href:
                continue
            if href[0] in ('#', '?') or href.lower().startswith('javascript:'):
                continue
            out.append(urljoin(page, href))
        return out