-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconcurrent_file_size.rb
executable file
·120 lines (99 loc) · 2.92 KB
/
concurrent_file_size.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Adapted from the Akka/Scala example in the Pragmatic Bookshelf's excellent
# "Programming Concurrency on the JVM".
#
# This is mostly a direct translation of that example to Celluloid actors.
require 'celluloid/autostart'
# Central actor that owns the work queue and the running totals.
#
# Paths to visit are queued through #add_file_to_process, and idle
# FileProcessor actors announce themselves through #register_worker. Whenever
# both a queued path and an idle worker exist, the path is handed to that
# worker. Workers report back through #add_file_size, which decrements the
# pending counter. The traversal is finished when that counter returns to zero.
class SizeCollector
  include Celluloid

  attr_reader :total_size, :pending_number_of_files_to_visit

  def initialize
    @idle_file_processors = []
    @file_names_to_process = []
    # Monotonic clock, so the reported duration is immune to wall-clock jumps.
    @start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @total_size = 0
    @pending_number_of_files_to_visit = 0
  end

  # Marks +file_processor+ as idle and hands it queued work if there is any.
  #
  # @param file_processor [FileProcessor] actor proxy that will receive
  #   #process_file calls
  def register_worker(file_processor)
    @idle_file_processors << file_processor
    send_a_file_to_process
  end

  # Queues +file_name+ for a worker. The path counts as pending until its size
  # is reported via #add_file_size.
  #
  # @param file_name [String] path to visit
  def add_file_to_process(file_name)
    puts "Add file: #{file_name}"
    @file_names_to_process << file_name
    @pending_number_of_files_to_visit += 1
    async.send_a_file_to_process
  end

  # Pairs idle workers with queued paths until either list runs out.
  #
  # The guard must be "either list empty", not "both lists empty". Otherwise
  # an idle worker with nothing queued would be popped and sent an empty path,
  # which it ignores without re-registering, silently shrinking the pool.
  def send_a_file_to_process
    until @idle_file_processors.empty? || @file_names_to_process.empty?
      worker = @idle_file_processors.pop
      worker.async.process_file(@file_names_to_process.pop)
    end
  end

  # Records the size reported for one visited path and marks it as done.
  #
  # @param file_size [Integer] bytes contributed by that path
  def add_file_size(file_size)
    puts "Add file size: #{file_size}"
    @total_size += file_size
    @pending_number_of_files_to_visit -= 1
    puts "Remaining: #{@pending_number_of_files_to_visit}"
    return unless done?

    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start
    puts "DONE! Completed in #{elapsed}"
  end

  # @return [Boolean] true once every queued path has reported its size
  def done?
    @pending_number_of_files_to_visit == 0
  end
end
# FileProcessors are the workers. SizeCollector hands each one a single path.
# For a regular file, the worker reports the file's size. For a directory, it
# sends every entry it finds back to SizeCollector to be queued for processing.
# Once it finishes that task, it calls register_worker on SizeCollector (the
# counterpart of the Akka example's RequestAFile message) to signal that it is
# ready for another path. Each worker also registers once when it is created,
# so that it receives its first path.
class FileProcessor
  include Celluloid

  # Stores the collector and registers immediately so a first path can be
  # assigned.
  #
  # @param size_collector [SizeCollector] actor proxy to report results to
  def initialize(size_collector)
    @size_collector = size_collector
    register_to_get_file
  end

  # Measures one path and reports the result back to the collector.
  #
  # Regular files contribute their byte size. Directories contribute 0, and
  # each of their entries (dotfiles included) is queued on the collector.
  # Anything else contributes 0. That covers sockets, FIFOs, broken symlinks,
  # and paths that vanished or cannot be read.
  #
  # In every case the size is reported and the worker re-registers. Skipping
  # either step would leave the collector's pending count stuck above zero
  # (the run never finishes) and would drop this worker from the pool.
  # NOTE(review): symlinks to directories are followed, as in the original.
  #
  # @param file_name [String] path to measure
  # @return [Integer] bytes counted for this path
  def process_file(file_name)
    size = measure(file_name)
    # Reported after any children were queued, so the collector counts them as
    # pending before this path is decremented. This assumes Celluloid delivers
    # messages from a single sender in order.
    @size_collector.async.add_file_size(size)
    register_to_get_file
    size
  end

  private

  # Returns the byte size of +file_name+. For a directory it returns 0 and
  # queues the directory's entries on the collector. Filesystem errors are
  # logged and count as 0 so the traversal can still complete.
  def measure(file_name)
    if File.file?(file_name)
      File.size(file_name)
    elsif File.directory?(file_name)
      # Dir.entries is immune to glob metacharacters in the path and lists
      # dotfiles. Only the "." and ".." pseudo-entries need removing.
      (Dir.entries(file_name) - %w[. ..]).each do |entry|
        @size_collector.async.add_file_to_process(File.join(file_name, entry))
      end
      0
    else
      0
    end
  rescue SystemCallError => e
    Kernel.warn "Skipping #{file_name}: #{e.message}"
    0
  end

  # Tells the collector this actor is idle and ready for another path.
  def register_to_get_file
    @size_collector.async.register_worker(Actor.current)
  end
end
# Command-line driver. It seeds a SizeCollector with a root path, starts a pool
# of FileProcessor workers, and prints progress once a second until every
# queued path has been measured.
class ConcurrentFileSize
  DEFAULT_WORKER_COUNT = 5

  # @param worker_count [Integer] number of FileProcessor actors to start
  # @raise [ArgumentError] if worker_count is below 1 (nothing would ever be
  #   processed and #run would wait forever)
  def initialize(worker_count: DEFAULT_WORKER_COUNT)
    raise ArgumentError, "worker_count must be at least 1, got #{worker_count}" if worker_count < 1

    @worker_count = worker_count
    @size_collector = SizeCollector.new
  end

  # Measures +path+ and blocks until the traversal finishes.
  # Exits with status 1 if no path is given or the path does not exist.
  #
  # @param path [String, nil] root to measure; defaults to the first CLI argument
  def run(path = ARGV[0])
    unless path
      warn "Usage: #{$0} PATH"
      exit 1
    end
    unless File.exist?(path)
      warn "No such file or directory: #{path}"
      exit 1
    end

    # Queue the root before starting workers, so the pending count is already
    # non-zero when the progress loop below first checks done?.
    @size_collector.add_file_to_process(path)
    @worker_count.times { FileProcessor.new(@size_collector) }
    until @size_collector.done?
      sleep 1
      puts
      puts "Pending: #{@size_collector.pending_number_of_files_to_visit}"
      puts "Total: #{@size_collector.total_size}"
      puts
    end
  end
end
# Entry point: run only when this file is executed directly, not when required.
ConcurrentFileSize.new.run if $PROGRAM_NAME == __FILE__