-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconcurrent_file_size_pool.rb
100 lines (82 loc) · 2.49 KB
/
concurrent_file_size_pool.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# Adapted from the Akka/Scala example in Prag Prog's excellent
# "Programming Concurrency on the JVM"
#
# This is a more idiomatic implementation using Celluloid's actor pool support.
require 'celluloid/autostart'
# Actor that keeps the running total of file sizes and the number of paths
# that have been handed out for processing but not yet reported back.
class SizeCollector
  include Celluloid

  attr_reader :total_size, :pending_number_of_files_to_visit

  # Creates the FileProcessor pool and zeroes the counters. The start time is
  # recorded so the elapsed time can be printed when the work completes.
  def initialize
    @worker_pool = FileProcessor.pool
    @file_names_to_process = []
    @start = Time.now
    @total_size = 0
    @pending_number_of_files_to_visit = 0
  end

  # Queues +file_name+, counts it as pending, and immediately hands the most
  # recently queued path to the worker pool.
  def add_file_to_process(file_name)
    puts "Add file: #{file_name}"
    @file_names_to_process.push(file_name)
    @pending_number_of_files_to_visit += 1
    dispatch_queued_file
  end

  # Adds +file_size+ to the total and marks one pending path as finished.
  # Prints a completion message with the elapsed time once nothing is pending.
  def add_file_size(file_size)
    puts "Add file size: #{file_size}"
    @total_size += file_size
    @pending_number_of_files_to_visit -= 1
    puts "Remaining: #{@pending_number_of_files_to_visit}"
    puts "DONE! Completed in #{Time.now - @start}" if done?
  end

  # True when no queued path is still waiting for its size to be reported.
  def done?
    @pending_number_of_files_to_visit.zero?
  end

  private

  # Pops the last queued path and asks the pool to process it asynchronously,
  # passing this actor so the worker can report back.
  def dispatch_queued_file
    next_path = @file_names_to_process.pop.to_s
    @worker_pool.async.process_file(next_path, Actor.current)
  end
end
# FileProcessors are the workers with the job to explore a given path. For a
# regular file they report its size to SizeCollector; for a directory they hand
# each entry back to SizeCollector so it can be queued in turn. Unlike the
# original Akka version, there is no RequestAFile message and no registration
# step: SizeCollector dispatches work to the workers through the Celluloid pool.
class FileProcessor
  include Celluloid

  # Examines +file_name+ and reports the outcome to +size_collector+.
  #
  # * Regular file: its size in bytes is reported.
  # * Directory: every entry (hidden ones included, "." and ".." excluded) is
  #   sent back via +add_file_to_process+, and a size of 0 is reported for the
  #   directory itself.
  # * Anything else (broken symlink, socket, path that vanished, ...): a size
  #   of 0 is reported.
  #
  # +add_file_size+ is sent exactly once per call in every case. The collector
  # only finishes when each queued path has been answered, so skipping the
  # report for unknown path types would leave it waiting forever.
  #
  # @param file_name [String] path to examine
  # @param size_collector [#async] collector that receives the results
  # @return [Integer] the size that was reported
  def process_file(file_name, size_collector)
    size = 0
    begin
      if File.file?(file_name)
        size = File.size(file_name)
      elsif File.directory?(file_name)
        # NOTE(review): File.directory? follows symlinks, so a symlink cycle
        # would be walked repeatedly - confirm whether that matters here.
        child_paths(file_name).each { |path| size_collector.async.add_file_to_process(path) }
      end
    rescue SystemCallError => e
      # Unreadable directory, file removed mid-scan, etc. Count it as empty
      # rather than crashing the worker and leaving the collector's pending
      # counter stuck.
      $stderr.puts "Skipping #{file_name}: #{e.message}"
      size = 0
    end
    size_collector.async.add_file_size(size)
    size
  end

  private

  # Full paths of the entries directly inside +directory+. Uses Dir.entries
  # rather than a glob so directory names containing glob metacharacters
  # (*, ?, [, {) are taken literally.
  def child_paths(directory)
    (Dir.entries(directory) - %w[. ..]).map { |entry| File.join(directory, entry) }
  end
end
# Command-line driver: seeds a SizeCollector with the path given in ARGV[0]
# and prints progress until the collector reports that it is done.
class ConcurrentFileSize
  def initialize
    @size_collector = SizeCollector.new
  end

  # Prints a usage message and exits with status 1 when no path was given.
  # Otherwise queues the path and prints a progress report about once per
  # second. A report is always printed after the collector says it is done, so
  # the final total is shown even when the work finishes before the first poll
  # or between two polls.
  def run
    unless ARGV[0]
      puts "Usage: #{$0} PATH"
      exit 1
    end

    @size_collector.add_file_to_process(ARGV[0])

    loop do
      # Sample done? BEFORE reading the counters: if it is already true, the
      # report printed below reflects the finished state and we can stop.
      finished = @size_collector.done?
      report_progress
      break if finished

      sleep 1
    end
  end

  private

  # Prints the collector's current pending count and running total.
  def report_progress
    puts
    puts "Pending: #{@size_collector.pending_number_of_files_to_visit}"
    puts "Total: #{@size_collector.total_size}"
    puts
  end
end
# Run the command-line tool only when this file is executed directly, not when
# it is loaded by another script.
ConcurrentFileSize.new.run if __FILE__ == $PROGRAM_NAME