-
Notifications
You must be signed in to change notification settings - Fork 4
/
qsplit.py
executable file
·355 lines (292 loc) · 14.5 KB
/
qsplit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
#!/usr/bin/env python3
# Copyright (c) 2013 Qumulo, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
'''
== Description:
This python sample will use the read_dir_aggregates API to build a list of paths
that can be piped to tools such as rsync in order to optimize a migration
*from* a qumulo cluster to another disk target.
Approach:
- divide a qumulo cluster into N equal partitions based on size. A partition is a list of paths.
The partitioning is based on the block count, which is obtained from fs_read_dir_aggregates
- feed each partition to an rsync client
== Typical Script Usage:
python qsplit.py --ip ip_address|hostname [options] path
If you are targeting a Windows environment and want to use robocopy as the data mover tool,
specify a -r (or --robocopy) option:
python qsplit.py -r --ip 192.168.1.88 /media --buckets 4
'''
# Import python libraries
import argparse
import datetime
import os
import re
import sys

# Import Qumulo REST libraries
# Leaving in the 'magic file path' for customers who want to run these scripts
# from cust_demo as before
import qumulo.lib.auth
import qumulo.lib.request
import qumulo.rest.auth
import qumulo.rest.fs as fs
import qumulo.rest.snapshot as snap
class Bucket:
    ''' One partition of the tree: a list of path entries whose combined
    size approximates an equal share of the total tree size. '''

    def __init__(self, size, start_time):
        # Target capacity for this bucket: bytes when splitting by
        # 'capacity', inode count when splitting by 'files'.
        self.size = size
        # Remaining capacity; decremented on every add() and may go
        # negative on the last (overflow) bucket.
        self.free_space = self.size
        self.entries = []
        self.start_time = start_time

    def add_without_duplicate(self, entry_to_add):
        '''
        Add a (directory) entry to bucket entries only if we're not
        already handling contents for that directory.

        Example 1:
            last_bucket_entry = "iTunes/TV/Pan Am/._05 One Coin in a Fountain (HD).m4v"
            entry_to_add = "iTunes/TV/Pan Am/"
        In this case we're already splitting the Pan Am directory so we don't add
        the directory itself as an entry (which just creates more work for rsync
        or robocopy).

        Example 2:
            last_bucket_entry = "iTunes/TV/Pan Am/._05 One Coin in a Fountain (HD).m4v"
            entry_to_add = "iTunes/TV/Pan Am/03 Ich Bin Ein Berliner (HD).m4v"
        In this case we add entry_to_add to the current bucket.
        '''
        # Substring test: a parent directory's path is contained in the
        # path of any entry beneath it.
        if self.entries and entry_to_add["path"] in self.entries[-1]["path"]:
            return
        self.entries.append(entry_to_add)

    def add(self, entry, current_path, size, robocopy=False):
        ''' Add an entry to the current bucket and charge its size against
        the bucket's remaining capacity.

        entry        -- directory-listing entry dict with at least 'name'
                        and 'type' keys
        current_path -- path (with trailing slash) of the containing directory
        size         -- cost of the entry (bytes, or 1 when splitting by files)
        robocopy     -- when True, emit backslash-separated paths and record
                        directories only (robocopy manifests list folders)
        '''
        path = current_path + entry['name']
        if robocopy:
            path = path.replace('/', '\\')
        bucket_entry = {"path": path, "size": size}
        # If we're creating robocopy buckets, don't add files, just folders.
        if robocopy:
            if entry['type'] == "FS_FILE_TYPE_DIRECTORY":
                self.add_without_duplicate(bucket_entry)
        else:
            self.add_without_duplicate(bucket_entry)
        # Decrement the free space regardless of whether the entry was
        # recorded, so capacity accounting stays consistent.
        self.free_space -= size

    def remaining_capacity(self):
        ''' Return how much of the bucket's target size is still unused. '''
        return self.free_space

    def print_contents(self):
        ''' Dump free space, target size and every entry to stdout. '''
        print('{}, {}'.format(self.free_space, self.size))
        for entry in self.entries:
            print(entry)
        self.print_bucket_size()

    def bucket_count(self):
        ''' Return the number of entries recorded in this bucket. '''
        return len(self.entries)

    def print_bucket_size(self):
        ''' Print the total recorded size and warn when it exceeds the
        bucket's target size. '''
        total_size = self.get_bucket_size()
        print("Total data stored in bucket: {}".format(str(total_size)))
        if total_size > self.size:
            # Fixed typo in the original message ("that" -> "than").
            print("More data stored in bucket than initial size")
            print("Overflow: " + str(total_size - self.size))

    def get_bucket_size(self):
        ''' Return the sum of the sizes of all recorded entries. '''
        return sum(int(entry['size']) for entry in self.entries)

    def save(self, filename, offset, robocopy):
        ''' Write one path per line to *filename*.

        Robocopy manifests get the full (backslash-separated) path; rsync
        manifests get the path made relative by dropping the first *offset*
        characters (the start-path prefix).

        Bug fix: the original concatenated ``bytes`` (from ``.encode``)
        with ``str`` and wrote that to a text-mode file, which raises
        TypeError on Python 3. Open the file as UTF-8 text and write plain
        strings instead; ``with`` guarantees the file is closed.
        '''
        with open(filename, 'w', encoding='utf-8') as bucket_file:
            for entry in self.entries:
                if robocopy:
                    bucket_file.write(entry['path'] + '\n')
                else:
                    relative_path = entry['path'][offset:]
                    bucket_file.write(relative_path + '\n')
#### Classes
class QumuloFilesCommand(object):
    ''' Class wrapper for REST API cmd so that we can new them up in tests.

    Connects to a Qumulo cluster, measures the tree under ``start_path``
    via read_dir_aggregates, and partitions it into ``num_buckets``
    roughly equal buckets of paths.
    '''

    def __init__(self, args=None):
        self.port = args.port
        self.user = args.user
        self.passwd = args.passwd
        self.host = args.host
        self.num_buckets = args.buckets
        self.agg_type = args.agg_type      # 'capacity' or 'files'
        self.robocopy = args.robocopy
        self.verbose = args.verbose
        self.snap = None
        # Add trailing slash if it doesn't exist (raw strings avoid the
        # invalid-escape warning the original "\g<1>" pattern produced).
        self.start_path = re.sub(r"([^/])$", r"\g<1>/", args.start_path)
        self.connection = qumulo.lib.request.Connection(self.host, int(self.port))
        self.credentials = qumulo.lib.auth.get_credentials(args.credentials_store)
        self.login()
        if args.snapshot_id is not None:
            self.snap = snap.get_snapshot(self.connection,
                                          self.credentials,
                                          args.snapshot_id).data
        self.total_size = self.get_directory_size(self.start_path)
        self.max_bucket_size = self.total_size / self.num_buckets
        if self.verbose:
            print("--------Total size: " + str(self.total_size) + " -------------")
            print("--------Max Bucket size: " + str(self.max_bucket_size) + " -------------")
        self.start_time = datetime.datetime.now()
        self.create_buckets()
        self.bucket_index = 0
        self.items_iterated_count = 0

    def login(self):
        ''' Authenticate against the cluster, preferring stored credentials.

        Exits the process with status 1 when interactive login fails.
        '''
        # Check to see if we have valid stored credentials before we try the
        # specified username and password.
        try:
            if qumulo.rest.auth.who_am_i(self.connection, self.credentials):
                return
        except qumulo.lib.request.RequestError:
            pass
        try:
            login_results, _ = qumulo.rest.auth.login(
                self.connection, None, self.user, self.passwd)
            self.credentials = qumulo.lib.auth.Credentials.from_login_response(
                login_results)
        except Exception as excpt:
            print("Error connecting to the REST server: {}".format(excpt))
            print(__doc__)
            sys.exit(1)

    def create_buckets(self):
        ''' Create num_buckets empty buckets.

        The original special-cased num_buckets == 1, but both branches did
        exactly the same thing, so a single comprehension suffices.
        '''
        self.buckets = [Bucket(self.max_bucket_size, self.start_time)
                        for _ in range(self.num_buckets)]

    def current_bucket(self):
        ''' Return the bucket currently being filled. '''
        return self.buckets[self.bucket_index]

    def get_next_bucket(self):
        ''' Advance to the next bucket unless we are already on the last one.

        Bug fix: the original compared against num_buckets, which let
        bucket_index advance to num_buckets and made a subsequent
        current_bucket() call raise IndexError.
        '''
        if self.bucket_index < self.num_buckets - 1:
            if self.verbose:
                self.current_bucket().print_contents()
            self.bucket_index += 1

    def process_buckets(self):
        ''' Print a summary line per bucket and save each bucket's paths
        to split_bucket_<n>.txt. '''
        bucket_num = 1
        units = "GB"
        if self.agg_type == 'files':
            units = "Inodes"
        for bucket in self.buckets:
            sz = str(round(bucket.get_bucket_size() / (1000 * 1000 * 1000), 1))
            bucket_percent = round(100.0 * bucket.get_bucket_size() / self.total_size, 1)
            filename = "split_bucket_%s.txt" % (bucket_num, )
            if self.agg_type == 'files':
                sz = str(bucket.get_bucket_size())
            print("Bucket %s size: %s %s (%s%%) - count: %s file_name: %s" % (
                    str(bucket_num).rjust(3),
                    sz.rjust(9),
                    units,
                    str(bucket_percent).rjust(5),
                    str(len(bucket.entries)).rjust(8),
                    filename
                    )
                )
            bucket.save(filename, len(self.start_path), self.robocopy)
            if self.verbose:
                # Bug fix: the original printed undefined name 'i' here,
                # raising NameError whenever --verbose was set.
                print("--------Dumping Bucket: " + str(bucket_num) + "-------------")
                bucket.print_contents()
            bucket_num += 1

    def get_directory_size(self, path):
        ''' Return the aggregate size of *path*: total capacity in bytes,
        or the total inode count when splitting by 'files'.

        Exits the process with status 1 on a REST error.
        '''
        try:
            result = fs.read_dir_aggregates(
                self.connection,
                self.credentials,
                path=path,
                snapshot=self.snap['id'] if self.snap is not None else None)
        except qumulo.lib.request.RequestError:
            print(sys.exc_info())
            sys.exit(1)
        sz = int(result.data['total_capacity'])
        if self.agg_type == 'files':
            sz = int(result.data['total_files']) \
                 + int(result.data['total_other_objects']) \
                 + int(result.data['total_symlinks']) \
                 + int(result.data['total_directories'])
        return sz

    def process_folder(self, path):
        ''' Page through the directory listing of *path* and feed each page
        of entries to process_folder_contents. '''
        try:
            response = fs.read_entire_directory(
                self.connection,
                self.credentials,
                page_size=1000,
                path=path,
                snapshot=self.snap['id'] if self.snap is not None else None)
        except Exception as excpt:
            print("Error in read_entire_directory: %s" % excpt)
            sys.exit(1)
        for r in response:
            if self.verbose:
                print("processing " + str(len(r.data['files'])) + " in path " + path)
            self.process_folder_contents(r.data['files'], path)
            self.items_iterated_count += 1

    def process_folder_contents(self, dir_contents, path):
        ''' Distribute one page of directory entries into buckets.

        Files/symlinks are costed directly; a directory that does not fit
        in the current bucket is recursed into so its contents can be
        split across buckets.
        '''
        for entry in dir_contents:
            if self.items_iterated_count > 0 and (self.items_iterated_count % 1000) == 0:
                print("Processed %s items." % (self.items_iterated_count, ))
            size = 0
            if entry['type'] == "FS_FILE_TYPE_FILE" or entry['type'] == "FS_FILE_TYPE_SYMLINK":
                if self.agg_type == 'files':
                    size = 1
                elif self.agg_type == 'capacity':
                    size = int(entry['size'])
            else:
                size = self.get_directory_size(entry['path'])
            snap_dir = ""
            if self.snap is not None:
                snap_dir = ".snapshot/" + self.snap['directory_name'] + "/"
            # File or dir fits in the current bucket or
            # we're on the last bucket already -> add it
            if (size <= self.current_bucket().remaining_capacity()) or (self.bucket_index == (self.num_buckets - 1)):
                self.current_bucket().add(entry, path + snap_dir, size, self.robocopy)
            else:
                # This item is too large to fit in the bucket.
                # Check if it is a dir and traverse it.
                # We can pick files within
                if entry['type'] == "FS_FILE_TYPE_DIRECTORY":
                    new_path = path + entry['name'] + "/"
                    if self.verbose:
                        print("Calling process_folder with " + new_path + "... ")
                    self.process_folder(new_path)
                else:
                    # It is a file that doesn't fit. Start a new bucket.
                    self.get_next_bucket()
                    print("Starting bucket " + str(self.bucket_index))
                    self.current_bucket().add(entry, path + snap_dir, size, self.robocopy)
            self.items_iterated_count += 1
def main():
    ''' Main entry point: parse the command line, connect to the cluster,
    traverse the tree and write out the bucket manifest files. '''
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "--ip", "--host", default="music", dest="host", required=True,
        help="Required: Specify host (cluster) for file lists")
    arg_parser.add_argument(
        "-P", "--port", type=int, dest="port", default=8000, required=False,
        help="Specify port on cluster; defaults to 8000")
    arg_parser.add_argument(
        "--credentials-store",
        default=qumulo.lib.auth.credential_store_filename(),
        help="Read qumulo_api credentials from a custom path")
    arg_parser.add_argument(
        "-u", "--user", default="admin", dest="user", required=False,
        help="Specify user credentials for login; defaults to admin")
    arg_parser.add_argument(
        "--password", default="admin", dest="passwd", required=False,
        help="Specify user pwd for login, defaults to admin")
    arg_parser.add_argument(
        "-b", "--buckets", type=int, default=1, dest="buckets", required=False,
        help="Specify number of manifest files (aka 'buckets'); defaults to 1")
    arg_parser.add_argument(
        "-v", "--verbose", default=False, required=False, dest="verbose",
        help="Echo values to console; defaults to False ", action="store_true")
    arg_parser.add_argument(
        "-r", "--robocopy", default=False, required=False, dest="robocopy",
        help="Generate Robocopy-friendly buckets", action="store_true")
    arg_parser.add_argument(
        "-a", "--aggregate_type", default='capacity', required=False,
        dest="agg_type",
        help="Split based on 'capacity' (default) or 'files'")
    arg_parser.add_argument(
        "-s", "--snapshot", default=None, required=False, dest="snapshot_id",
        help="Specify a specific snapshot by numeric id")
    arg_parser.add_argument(
        "start_path", action="store",
        help="Path on the cluster for file info; Must be the last argument")
    parsed_args = arg_parser.parse_args()
    splitter = QumuloFilesCommand(parsed_args)
    print("Begin folder and file traversal.")
    splitter.process_folder(splitter.start_path)
    print("Completed folder and file traversal. Process Buckets.")
    splitter.process_buckets()

# Main
if __name__ == '__main__':
    main()