-
Notifications
You must be signed in to change notification settings - Fork 0
/
follotter_broker.rb
248 lines (215 loc) · 7.09 KB
/
follotter_broker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
require 'rubygems'
require 'open-uri'
require "time"
require "json"
require "pp"
require 'date'
require 'time'
require 'parsedate'
require 'yaml'
require 'oauth'
require 'amqp'
require 'mq'
require 'carrot'
$:.unshift(File.dirname(__FILE__))
require 'follotter_database'
# Broker batch for the "follotter" Twitter crawler.
#
# Each run: loads config, measures the depths of the worker queues and the
# remaining Twitter API quota, re-dispatches leftover 'broker' messages to
# 'fetcher', publishes fresh user-lookup jobs to 'fetcher'/'streamer' via
# RabbitMQ (Carrot), and records the run as a Batch row.
class FollotterBroker < FollotterDatabase
  # Main entry point. Returns early (publishing nothing) when downstream
  # workers still have unprocessed queues or when no API quota remains.
  def self.start
    config = YAML.load_file("/home/seiryo/work/follotter/follotter_config.yml")
    @@API_USER        = config["API_USER"]
    @@API_PASSWORD    = config["API_PASSWORD"]
    @@HOST_MQ         = config["HOST_MQ"]
    @@HDB_FILE_PATH   = config["HDB_FILE_PATH"]
    @@QUEUE_FILE_PATH = config['QUEUE_COUNTER_FILE_PATH']
    @@RESET_FILE_PATH = config['QUEUE_RESETER_FILE_PATH']
    @@LOWER_LIMIT     = config['STATUSES_LOWER_LIMIT']
    @@STORE_API_LIMIT = config['STORE_API_LIMIT']
    @@ACTIVE_RATE     = config['ACTIVE_USER_FETCH_RATE']
    @@CONFIG          = config
    broke_count, fetch_count, parse_count, update_count =
      self.acquire_queue_count(@@QUEUE_FILE_PATH)
    batch = self.create_batch(broke_count, fetch_count, parse_count, update_count)
    first_api_limit = batch.api_limit
    # Keep a small reserve of API calls back for other consumers.
    batch.api_limit -= @@STORE_API_LIMIT
    # Abort while the downstream workers (fetcher/parser/updater) still have
    # queued work. BUG FIX: the original summed update_count twice and never
    # looked at parse_count ("fetch + update + update") -- almost certainly a
    # typo for the three downstream queue counts.
    return if 0 != (fetch_count + parse_count + update_count)
    #self.optimize_tch
    return unless 0 < batch.api_limit
    # Connect to RabbitMQ.
    carrot = Carrot.new(:host => @@HOST_MQ)
    # Re-publish queues emitted by the updater (lookup) step.
    if 0 < broke_count
      batch = self.enqueue_relation(carrot, batch)
    end
    # Publish queues for crawling active users.
    if 0 < batch.api_limit
      batch = self.enqueue_lookup(carrot, batch)
    end
    # Disconnect from RabbitMQ.
    carrot.stop
    # Record how many queues this run published, then persist the batch.
    batch.queue = first_api_limit - batch.api_limit
    batch.api_limit = first_api_limit
    batch.save
  end

  # Publish 'lookup' queues for the most recently updated active users.
  # Users are packed 100 per queue (the users/lookup API accepts up to 100
  # ids per call); each published queue costs one unit of api_limit.
  # Consumed ActiveUser rows are destroyed as they are read.
  # Returns the batch with its api_limit decremented per published queue.
  def self.enqueue_lookup(carrot, batch)
    crawl_users = Array.new
    active_users = ActiveUser.find(:all, :order => 'updated DESC',
                                   :limit => @@ACTIVE_RATE * batch.api_limit)
    active_users.each do |au|
      au_id = au.id
      au.destroy # consume the row whether or not a User record exists
      user = User.find_by_id(au_id)
      unless user
        # Unknown id: crawl it anyway with a stub User carrying only the id.
        user = User.new
        user.id = au_id.to_i
      end
      #next unless @@LOWER_LIMIT <= user.statuses_count
      crawl_users << user
      next if 100 > crawl_users.size # flush only on full batches of 100
      queue = self.create_queue(crawl_users)
      fq = carrot.queue('fetcher')
      fq.publish(Marshal.dump(queue))
      sq = carrot.queue('streamer')
      sq.publish(Marshal.dump(queue))
      batch.api_limit -= 1
      crawl_users = Array.new
    end
    # Publish the final partial batch, if any users remain.
    if 0 < crawl_users.size
      queue = self.create_queue(crawl_users)
      fq = carrot.queue('fetcher')
      fq.publish(Marshal.dump(queue))
      sq = carrot.queue('streamer')
      sq.publish(Marshal.dump(queue))
      batch.api_limit -= 1
    end
    return batch
  end

  # Drain the 'broker' queue back into 'fetcher' while API quota remains.
  # Messages for users below the statuses threshold are dropped: they have
  # already been popped and are deliberately not re-published.
  # Returns the batch with api_limit decremented per forwarded message.
  def self.enqueue_relation(carrot, batch)
    broke = carrot.queue('broker')
    fetch = carrot.queue('fetcher')
    while batch.api_limit > 0
      msg = broke.pop(:ack => false)
      break unless msg
      # NOTE(review): Marshal.load on queue payloads executes arbitrary
      # object deserialization -- safe only if this MQ is fully private.
      queue = Marshal.load(msg)
      user = queue[:lookup_user]
      # Skip users whose status count is below the configured floor.
      next if @@LOWER_LIMIT > user.statuses_count
      fetch.publish(msg)
      batch.api_limit -= 1
    end
    return batch
  end

  # Start the given script unless a process matching +name+ is already
  # running (checked via ps/grep). Blocks until the spawned script exits.
  def self.check_process(name)
    count = `ps aux | grep #{name} | grep -v grep | wc -l`
    count = count.chomp.to_i
    return if count > 0
    `ruby #{name}`
  end

  # Build one 'lookup' queue hash for up to 100 users: carries the users
  # keyed by id, their current friend/follower relations, and the
  # users/lookup API URL listing every id.
  def self.create_queue(crawl_users)
    queue = Hash.new
    #queue[:user_id] = false
    #queue[:target] = false
    queue[:api] = "lookup"
    queue[:lookup_users_hash] = Hash.new
    queue[:lookup_relations] = Hash.new
    crawl_users.each do |user|
      # Stub users (no screen_name yet) are omitted from the hash but their
      # relations are still attached.
      queue[:lookup_users_hash][user.id] = user if nil != user.screen_name
      queue[:lookup_relations][user.id] = self.acquire_relations(user.id)
    end
    user_ids = crawl_users.map { |u| u.id }
    queue[:url] = "http://api.twitter.com/1/users/lookup.json?user_id=#{user_ids.join(",")}"
    #queue[:new_relations] = []
    return queue
  end

  # Run the external queue-counter script and parse its four output lines.
  # Returns [broke_count, fetch_count, parse_count, update_count].
  # Raises when the script does not print exactly four lines.
  def self.acquire_queue_count(queue_counter)
    results = `ruby #{queue_counter}`
    results = results.split("\n")
    #pp results
    raise "queue counter printed #{results.size} lines, expected 4" unless 4 == results.size
    broke_count  = (results.shift).to_i
    fetch_count  = (results.shift).to_i
    parse_count  = (results.shift).to_i
    update_count = (results.shift).to_i
    return broke_count, fetch_count, parse_count, update_count
  end

  # Collect a user's stored relations, split into
  # {:normal|:remove => {:friends|:followers => [target_id, ...]}}
  # based on each row's +removed+ flag.
  def self.acquire_relations(user_id)
    relations = Hash.new
    relations[:normal] = Hash.new
    relations[:remove] = Hash.new
    relations[:normal][:friends]   = Array.new
    relations[:normal][:followers] = Array.new
    relations[:remove][:friends]   = Array.new
    relations[:remove][:followers] = Array.new
    Friend.find_all_by_user_id(user_id).each do |f|
      relations[:normal][:friends] << f.target_id if false == f.removed
      relations[:remove][:friends] << f.target_id if true == f.removed
    end
    Follower.find_all_by_user_id(user_id).each do |f|
      relations[:normal][:followers] << f.target_id if false == f.removed
      relations[:remove][:followers] << f.target_id if true == f.removed
    end
    return relations
  end

  # Create and return the Batch record for this run. When no uncrawled
  # active users remain, refreshes them via the external shell script and
  # marks the batch exception as "reset" (which also resets the streamer
  # queue afterwards).
  def self.create_batch(broke_count, fetch_count, parse_count, update_count)
    # Count uncrawled active users; treat a count failure as zero
    # (best-effort: triggers the refresh path below).
    begin
      finish_count = ActiveUser.count
    rescue
      finish_count = 0
    end
    # No active users left: fetch a fresh set.
    exception = nil
    if 0 == finish_count
      `/bin/sh /home/seiryo/work/follotter/follotter_yats.sh`
      finish_count = ActiveUser.count
      exception = "reset"
    end
    # Remaining API call quota.
    api_limit = self.acquire_api_limit
    # Record this batch run.
    batch = Batch.create(
      :exception => exception,
      :api_limit => api_limit,
      :finisher  => finish_count,
      :broker    => broke_count,
      :fetcher   => fetch_count,
      :parser    => parse_count,
      :updater   => update_count)
    if nil != exception
      # Reset the queue feeding the streamer.
      `ruby #{@@RESET_FILE_PATH}`
    end
    return batch
  end

  # Query Twitter's rate_limit_status via OAuth and return the remaining
  # number of API calls as an Integer. Raises on any HTTP status >= 300.
  def self.acquire_api_limit
    consumer = OAuth::Consumer.new(
      @@CONFIG['CONSUMER_KEY'],
      @@CONFIG['CONSUMER_SECRET'],
      :site => 'http://twitter.com'
    )
    access_token = OAuth::AccessToken.new(
      consumer,
      @@CONFIG['ACCESS_TOKEN'],
      @@CONFIG['ACCESS_TOKEN_SECRET']
    )
    url = "http://api.twitter.com/1/account/rate_limit_status.json"
    response = access_token.get(url)
    raise "rate_limit_status failed with HTTP #{response.code}" if 300 <= response.code.to_i
    res = JSON.parse(response.body)
    limit = res["remaining_hits"]
    return limit.to_i
  end
end
# Entry point: run the broker at most once at a time, guarded by a lock file.
# File::CREAT | File::EXCL makes the create atomic, closing the
# check-then-create (TOCTOU) race the previous File.exist? test allowed;
# it also guarantees the ensure-unlink only runs when we own the lock
# (the old code could unlink -- and raise ENOENT -- for a lock it never made).
lock_file_path = "/tmp/fb_lock"
begin
  lock_file = File.open(lock_file_path, File::WRONLY | File::CREAT | File::EXCL)
rescue Errno::EEXIST
  # Another instance holds the lock; skip this run silently (as before).
  lock_file = nil
end
if lock_file
  begin
    lock_file.puts Time.now.strftime("%Y-%m-%d %H:%M:%S")
    lock_file.close
    FollotterBroker.start
  rescue => e
    # Keep the old swallow-everything behavior so the lock is always
    # released, but leave a trace instead of failing completely silently.
    warn "follotter_broker: #{e.class}: #{e.message}"
  ensure
    File.unlink(lock_file_path)
  end
end