-
Notifications
You must be signed in to change notification settings - Fork 0
/
run.py
144 lines (104 loc) · 3.61 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import time
import json
from ConfigParser import ConfigParser
from threading import Thread
from collections import Counter
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from py2neo import Graph
from neo4j import tweet_to_neo4j
# Read Twitter credentials from an external config.ini so secrets are
# never committed to the repository.
config = ConfigParser()
config.read('config.ini')
consumer_key = config.get("twitter", "consumer_key")
consumer_secret = config.get("twitter", "consumer_secret")
access_token_key = config.get("twitter", "access_token_key")
access_token_secret = config.get("twitter", "access_token_secret")
# Authorize the Twitter stream via OAuth 1.0a.
auth = OAuthHandler(consumer_key, consumer_secret)
auth.secure = True
auth.set_access_token(access_token_key, access_token_secret)
# Module-level Neo4j connection, shared by TwitterListener.on_data().
# NOTE(review): the URL looks scrape-mangled ("[email protected]") — the original was
# presumably "http://neo4j:<password>@<host>:7474/db/data/"; confirm before use.
graph = Graph("http://neo4j:[email protected]:7474/db/data/")
class TwitterListener(StreamListener):
    """Tweepy stream handler.

    Persists each incoming tweet to the module-level Neo4j ``graph`` and
    accumulates its hashtags in ``hashtag_bucket``, which the
    BackgroundTimer thread periodically snapshots and resets.
    """

    def __init__(self):
        # Hashtags seen since the last flush; read and reset from another
        # thread by BackgroundTimer.
        self.hashtag_bucket = []

    def on_data(self, data):
        """Handle one raw tweet (JSON string) from the stream.

        Errors are logged and swallowed (after a short back-off) so a
        single malformed tweet does not kill the stream.
        """
        try:
            # Extract only the fields we need from the raw JSON.
            parsed = ParseTweet(data)
            # Persist the tweet to Neo4j for later analysis.
            tweet_to_neo4j(parsed, graph)
            # Accumulate this tweet's hashtags (index 4 of the parsed record).
            self.hashtag_bucket.extend(parsed[4])
        except Exception as e:
            # 'as' syntax works on Python 2.6+ and 3; catch Exception rather
            # than BaseException so KeyboardInterrupt/SystemExit propagate.
            print('Failed on_data because %s' % e)
            time.sleep(2)

    def on_error(self, status):
        # Log stream errors (e.g. HTTP 420 rate limiting) without stopping.
        print(status)
def ParseTweet(tweet):
    """Parse a raw tweet JSON string into the fields we track.

    Returns a list: [tweet_id, user_id, user_name, tweet_text,
    hash_tags (list of str), symbols, RT_id], where RT_id is the id of
    the retweeted tweet for retweets, or the string 'null' otherwise.
    Raises ValueError/KeyError on malformed input (handled by the caller).
    """
    temp = json.loads(tweet)
    tweet_id = temp['id']
    user_id = temp['user']['id']
    user_name = temp['user']['name']
    tweet_text = temp['text']
    # Flatten the hashtag entities down to their bare text values.
    hash_tags = [tag['text'] for tag in temp['entities']['hashtags']]
    symbols = temp['entities']['symbols']
    # dict.has_key() was removed in Python 3; 'in' works on both 2 and 3.
    if 'retweeted_status' in temp:
        RT_id = temp['retweeted_status']['id']
    else:
        RT_id = 'null'
    return [tweet_id,
            user_id,
            user_name,
            tweet_text,
            hash_tags,
            symbols,
            RT_id]
def write_counter(counter, file_suffix):
    """Append an iterable of (hashtag, count) tuples to 'counter<suffix>.txt',
    one tuple per line."""
    filename = 'counter%s.txt' % str(file_suffix)
    # Open once with a context manager instead of reopening the file for
    # every item, and avoid shadowing the builtin name 'file'.
    with open(filename, 'a') as out:
        for entry in counter:
            out.write(str(entry))
            out.write('\n')
class BackgroundTimer(Thread):
    """Background worker: every 5 minutes, writes the 25 most common
    hashtags collected by the module-level ``listener`` to a numbered
    file and empties the listener's hashtag bucket.
    """

    def run(self):
        self.file_suffix = 1  # incremented so each flush gets its own file
        while True:
            time.sleep(300)  # flush interval: 5 minutes
            # Snapshot and rank the hashtags gathered since the last flush.
            top_tags = Counter(listener.hashtag_bucket).most_common(25)
            write_counter(top_tags, self.file_suffix)
            print("writing counter")  # parenthesized: valid on py2 and py3
            self.file_suffix += 1
            # NOTE(review): resetting the bucket here races with on_data()
            # appends in the stream thread; hashtags arriving between the
            # snapshot above and this reset are silently dropped.
            listener.hashtag_bucket = []
if __name__ == '__main__':
    # Create the stream handler; the name 'listener' is a module global
    # that BackgroundTimer.run() reads and resets.
    listener = TwitterListener()
    # Start the background thread that flushes hashtag counts every 5 min.
    # NOTE(review): the thread is non-daemon, so interrupting the stream
    # leaves it running — confirm intended shutdown behavior.
    timer = BackgroundTimer()
    timer.start()
    # Open the Twitter stream, filtered to tweets mentioning 'basketball'.
    stream = Stream(auth, listener)
    stream.filter(track=['basketball'])