-
Notifications
You must be signed in to change notification settings - Fork 11
/
clean.py
170 lines (125 loc) · 5.85 KB
/
clean.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
__author__ = 'roman'
import sys
import os
import operator
import logging
from logger import get_logger
logger = get_logger(__name__)
class Cleaner:
def __init__(self, library):
self.library = library
self._total = len(library.collection)
self._duplicates = 0
self._playlist_entries = {}
self._removed_duplicates = []
def remove_duplicates(self):
"""
Scan library for duplicate entries and deletes them. Duplicates are detected using AUDIO ID information
stored in the library file
:return:
"""
ids = {}
print("\n")
collection = self.library.collection
for entry in collection:
audio_id = entry.get("AUDIO_ID")
if audio_id is None:
continue
if audio_id not in ids:
ids[audio_id] = []
ids[audio_id].append(entry)
#Discard unique entries
duplicates = [entries for entries in ids.values() if len(entries) > 1]
logger.debug("{} duplicates detected".format(len(duplicates)))
# Remove duplicates
for dup in duplicates:
entry_keep, remove_entries = self._choose_entry(dup)
if remove_entries is not None and entry_keep is not None:
new_path = self.library.get_full_path(entry_keep, sys.platform == "win32")
for entry in remove_entries:
old_path = self.library.get_full_path(entry, sys.platform == "win32")
logger.info(u"Removing \"{}\" in favour of \"{}\"".format(old_path, new_path))
self._removed_duplicates.append((old_path, new_path)) # store the same information for UI
collection.remove(entry) # remove from the collection
self._add_playlist_entry(entry, entry_keep) # save to the playlist dictionary for further processing
self._duplicates += len(remove_entries)
elif entry_keep is None:
artist = remove_entries[0].get("ARTIST")
title = remove_entries[0].get("TITLE")
logger.info(u"Duplicates of \"{} - {}\" not removed, because none of the source files exist"
.format(artist, title))
# And now get down to processing playlists
self.process_playlists()
logger.info("\n")
collection.set("ENTRIES", str(len(collection)))
def get_result(self):
"""
Return result of duplicates removal in form of a dictionary {"count": number_of_duplicates,
"duplicates": a list of tuples of removed duplicates (old_path, new_path)). Invoked by UI to get information
about the run.
:return:
"""
return {"count": self._duplicates, "duplicates": self._removed_duplicates}
def _choose_entry(self, entries):
"""
Check duplicate XML entries in the provided list, decide which one to keep and return one entry to keep and
entries to delete. The choice
is made whether on basis which file path exists with the missing path entry discarded. If all the entries do not
exist, then None is returned. If more than one entry exist on the hard drive, then the choice is made by the
number of cue points set for the entry.
:param entries: List of XML entry duplicates
:return: a tuple of form (entry_to_keep, [entries to delete]). None if all the entries do not exist.
"""
def getlen_cuepoints(entry):
return len(entry.findall("CUE_V2"))
exist_entries = []
for entry in entries:
path = self.library.get_full_path(entry, True)
if sys.platform == "darwin":
path = os.path.join("/Volumes", path)
if os.path.exists(path):
exist_entries.append(entry)
if not exist_entries:
return (None, entries)
# get entry with the most number of cue points
entry_keep = max([(getlen_cuepoints(e), e) for e in exist_entries], key=operator.itemgetter(0))[1]
entries.remove(entry_keep)
return (entry_keep, entries)
def _add_playlist_entry(self, old_entry, new_entry):
"""
Adds a new path to the dictionary of playlist entries to replace missing paths.
:param old_path: Path to the missing file
:param new_path: Correct path
:return:
"""
old_path = self.library.get_full_path(old_entry, True, True)
if old_path not in self._playlist_entries:
self._playlist_entries[old_path] = new_entry
def process_playlists(self):
"""
Go through playlists and replace removed paths with correct ones
:return:
"""
playlists = self.library.playlists
logger.debug("Processing playlists")
for playlist_entry in playlists.iter("PRIMARYKEY"):
path = playlist_entry.get("KEY")
if path in self._playlist_entries:
new_entry = self._playlist_entries[path]
new_path = self.library.get_full_path(new_entry, True, True)
playlist_entry.set("KEY", new_path)
logger.debug(u"Playlist entry changed from \"{}\" to \"{}\"".format(path, new_path))
# Windows version of Traktor uses UUIDs for playlist entries, so those have to be updated as well
if sys.platform == "win32":
uuid = new_entry.get("UUID")
playlist_entry.set("UUID", uuid)
def report(self):
"""
Print a short report on what was done after the run.
:return:
"""
print("\n{} entries processed in total".format(self._total))
if self._duplicates == 1:
print("{} duplicate removed".format(self._duplicates))
else:
print("{} duplicates removed".format(self._duplicates))