-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathduplicates_folder_bacci.py
More file actions
345 lines (255 loc) · 11.2 KB
/
duplicates_folder_bacci.py
File metadata and controls
345 lines (255 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
"""
This script was made to remove duplicates from folders that have to be sent to cold storage.
It takes as input a csv file containing the list of duplicated files given by the dsi, and a directory of interest.
It outputs a txt file containing the list of duplicated folders in the directory of interest, with the differences in files between the folders (if there are some differences).
"""
#%%
import pandas as pd
from pathlib import Path
import numpy as np
import os
import csv
from collections import defaultdict
#%% CHANGE THESE VARIABLES TO ADAPT THE SCRIPT TO OTHER FOLDERS
# CSV file containing the list of duplicated files provided by the DSI.
# The script reads its 'file', 'hash' and 'size_bytes' columns.
path_to_duplicated_files_list = Path("//iss/bacci/STORAGE_INFOS/bacci_duplicated_files_list.csv") #change here
# Server root; it is joined with the folder paths when files are listed on disk.
server_path = '//iss/bacci/' #change here
# Preset directories of interest; pick one of them for str_dir below.
jo = "raw/Joana Lourenco/2P imaging_troubleshooting"
angela = "raw/Angela De Stasi"
fani = "raw/Fani Koukouli/Fani Koukouli_in vivo"
kathleen = "users/kathleen.cho"
# Directory to clean from duplicated files. It is compared as a prefix against
# the 'file' column of the CSV, so it must be written the same way as those paths.
str_dir = fani #change here
# Same directory as a Path; used to check whether a duplicated folder lies inside it.
main_dir = Path(str_dir)
# Where the filtered CSV (duplicates that involve the directory of interest) is saved.
save_path_to_csv = Path("//iss/bacci/users/mai-an.nguyen/fani_duplicated_files_list.csv") #change here
# Name of the report file; it is written to the user's Desktop folder.
output_txt_file_name = "fani_duplicated_folders_list.txt" #change here
#%% Load the CSV file of duplicated files
table = pd.read_csv(path_to_duplicated_files_list)
# From here on 'size_bytes' holds gigabytes (the column name is kept as is).
table['size_bytes'] = table['size_bytes'] / 1e9
#%% Create a CSV file with only the duplicates that involve the folder of interest (used for the next steps)
# Rows whose path starts with the directory of interest.
file_prefixes = table['file'].str[:len(str_dir)]
in_folder = file_prefixes == str_dir
# Hashes found in that directory ...
hash_list = np.unique(table.loc[in_folder, 'hash'])
# ... and every row of the full table sharing one of those hashes, so that
# copies located outside the directory of interest are kept too.
table_folder = table.set_index('hash').loc[hash_list]
table_folder.to_csv(save_path_to_csv)
#%% Step 1: Read the CSV file and map each file path to its hash in a dictionary
# Alternative settings kept from the original script (apparently for a local test run):
#   save_path_to_csv = Path("//iss/bacci/users/mai-an.nguyen/test_duplicated_files_list.csv")
#   main_dir = Path('test_duplicates')
#   server_path = "C:/Users/mai-an.nguyen/Documents"
csv_file = Path(save_path_to_csv)
path_to_hash = {}  # filepath -> hash
# The file is produced by DataFrame.to_csv, which writes UTF-8 by default; read
# it back as UTF-8 so that non-ASCII paths are not garbled by the locale encoding.
with open(csv_file, newline="", encoding="utf-8") as f:
    reader = csv.reader(f)
    next(reader, None)  # skip header (tolerates an empty file)
    for row in reader:
        if not row:
            # Skip blank rows instead of stopping, so later rows are not lost.
            continue
        # Column 0 is the hash (the index written by to_csv).
        # NOTE(review): column 5 is assumed to be the 'file' column - confirm
        # against the layout of the CSV.
        file_hash = row[0].strip()
        path = os.path.normpath(row[5].strip())
        path_to_hash[path] = file_hash
#%% Step 2: Build incomplete directory trees from path_to_hash
# Paths that do not hold duplicates of interest are not part of the tree.
tree_files = defaultdict(dict)    # parent dir -> {filename: hash}
tree_children = defaultdict(set)  # parent dir -> set(subdirs)
all_dirs = set()                  # set of all directories
for path, file_hash in path_to_hash.items():
    folder, filename = os.path.split(path)
    tree_files[folder][filename] = file_hash
    # Build tree_children: register the full chain of parents up to the root.
    current = folder
    while True:
        all_dirs.add(current)
        parent = os.path.dirname(current)
        if parent == current:
            # dirname() no longer changes: the top of the path was reached.
            break
        tree_children[parent].add(current)
        current = parent
# Relative paths end their parent chain on '', absolute ones on the root.
# discard() drops the empty string only if present; remove() raised KeyError
# when every path was absolute.
all_dirs.discard('')
#%% Step 3: Bottom-up directory hashing
# Compute a signature for each directory based on its files and the signatures of its children.
# This way, two directories will have the same signature if they contain the same files (with same hashes) and the same subdirectories (with same signatures).
dir_signature = {}  # dir -> signature tuple (cache shared by the signature functions)


def compute_signature(folder):
    """
    Return the name-sensitive signature of `folder`, computing it if needed.

    NOT COMPLETE.
    The signature is the pair (files, children):
      - files: the (filename, hash) pairs of the folder, sorted;
      - children: one (basename, signature) pair per subfolder, in sorted path order.
    NB: two folders get the same signature only if their duplicated files and
    subfolders have exactly the same names.
    The result is cached in dir_signature.
    """
    try:
        return dir_signature[folder]
    except KeyError:
        pass
    named_files = tuple(sorted(tree_files[folder].items()))
    named_children = tuple(
        (os.path.basename(subfolder), compute_signature(subfolder))
        for subfolder in sorted(tree_children[folder])
    )
    dir_signature[folder] = (named_files, named_children)
    return dir_signature[folder]
def compute_signature_without_filenames(folder):
    """
    Return a signature of `folder` that ignores file and folder names.

    The signature is the pair (files, children):
      - files: the sorted hashes of the files directly inside the folder;
      - children: the sorted name-independent signatures of its subfolders.
    Two folders therefore share a signature when they hold the same file
    contents in the same nested layout, whatever the names are.

    The recursion uses this same function. Going through compute_signature
    instead would put name-dependent child signatures in the shared
    dir_signature cache, making results depend on the order folders are visited.
    Do not mix both signature functions on the same dir_signature cache.
    """
    if folder in dir_signature:
        return dir_signature[folder]
    files_part = tuple(sorted(tree_files[folder].values()))
    # Sort the child signatures themselves (not the child paths) so that the
    # order does not depend on how the subfolders are named.
    children_part = tuple(sorted(
        compute_signature_without_filenames(child)
        for child in tree_children[folder]
    ))
    signature = (files_part, children_part)
    dir_signature[folder] = signature
    return signature
# Fill dir_signature for every known directory.
for d in all_dirs:
    compute_signature_without_filenames(d)
#%% Step 4: Group duplicate folders (folders sharing the same signature)
signature_groups = defaultdict(list)  # signature -> folders having it
for folder, sig in dir_signature.items():
    # Ignore folders that have neither listed files nor subfolders.
    if not (tree_files[folder] or tree_children[folder]):
        continue
    signature_groups[sig].append(folder)
# Only signatures shared by at least two folders describe duplicates.
duplicate_groups = [
    same_sig_folders
    for same_sig_folders in signature_groups.values()
    if 1 < len(same_sig_folders)
]
#%% Step 5: Keep only the highest-level duplicated folders
# Every duplicated folder with its signature, shallowest paths first
# (the sort is stable, so folders of equal depth keep their order).
flat = sorted(
    (
        (member, dir_signature[member])
        for members in duplicate_groups
        for member in members
    ),
    key=lambda entry: entry[0].count(os.sep),
)
selected = []                        # folders kept so far
selected_by_sig = defaultdict(list)  # signature -> kept folders
for candidate, sig in flat:
    # A folder equal to, or located under, an already kept folder is skipped.
    covered = any(
        os.path.commonpath([candidate, kept]) == kept for kept in selected
    )
    if not covered:
        selected.append(candidate)
        selected_by_sig[sig].append(candidate)
# A kept folder is only reported if another kept folder shares its signature.
final_groups = [
    kept_folders
    for kept_folders in selected_by_sig.values()
    if len(kept_folders) > 1
]
final_groups_clean = final_groups
#%% [OPTIONAL] Step 6: Filter out some groups.
# This is optional and can be adapted based on the specific use case.
# A group is kept when it has at most 5 folders, at least one of them lies
# inside main_dir, and its folders are not recognised as different series.
_series_prefixes = ("TSeries", "log8bit", "ZSeries")
final_groups_clean = []
for group in final_groups:
    large_group = len(group) > 5
    has_parent = [main_dir in Path(path).parents for path in group]
    if any(has_parent):
        # Move the first folder located inside main_dir to the front of the
        # group; the first entry is the one used as reference in the report.
        group.insert(0, group.pop(has_parent.index(True)))
    basenames = {os.path.basename(path) for path in group}
    is_tseries = [b.startswith(_series_prefixes) for b in basenames]
    if len(basenames) > 1 and any(is_tseries):
        if all(is_tseries):
            # Compare the second '-'-separated field of the names. The slice
            # yields an empty tuple for a name without '-', instead of the
            # IndexError raised by split("-")[1]; such a name then counts as
            # different from the names that do have that field.
            series_ids = {tuple(b.split("-")[1:2]) for b in basenames}
            different_tseries = len(series_ids) > 1
        else:
            # Series folders mixed with other folders under different names.
            different_tseries = True
    else:
        different_tseries = False
    if not large_group and any(has_parent) and not different_tseries:
        final_groups_clean.append(group)
#%% Step 7: Write output (with file differences)
def list_files(folder):
    """
    Return the set of all files found under `folder`, at any depth.

    Each file is given by its path relative to `folder`.
    The set is empty when os.walk yields nothing for `folder`.
    """
    return {
        os.path.relpath(os.path.join(dirpath, name), folder)
        for dirpath, _subdirs, names in os.walk(folder)
        for name in names
    }
def add_duplicates_files(ref_files_duplicates:set, parent, ref_dir, tree_files):
    """
    Add the files registered for `parent` to `ref_files_duplicates`.

    ref_files_duplicates : set updated in place with the new entries
    parent : directory whose files (keys of tree_files[parent]) are added
    ref_dir : directory the added paths are made relative to
    tree_files : mapping parent dir -> {filename: hash}
    return the same set object, for convenience
    """
    filenames = tree_files[parent]
    if not filenames:
        # Nothing to add; as before, relpath is not evaluated in that case.
        return ref_files_duplicates
    # The relative location of `parent` is the same for all its files:
    # compute it once instead of once per file.
    relpath = os.path.relpath(parent, ref_dir)
    for filename in filenames:
        if relpath == '.':
            # parent is ref_dir itself: no folder prefix.
            ref_files_duplicates.add(filename)
        else:
            ref_files_duplicates.add(os.path.join(relpath, filename))
    return ref_files_duplicates
def collect_paths(tree, node):
    """
    Return every descendant of `node` in `tree` (a mapping node -> children).

    Descendants are listed depth-first: each child is immediately followed by
    its own descendants. `node` itself is not part of the result.
    """
    descendants = []
    # Stack of child iterators: the last one belongs to the node being explored.
    pending = [iter(tree[node])]
    while pending:
        try:
            child = next(pending[-1])
        except StopIteration:
            pending.pop()
            continue
        descendants.append(child)
        pending.append(iter(tree[child]))
    return descendants
def compare_folders(ref_files, current_files, ref_dir, current_dir):
    """
    Compare two directories and return the pair (missing, extra).

    missing : files of ref_dir that are not matched in current_dir.
    extra : files of current_dir that are not matched in ref_dir.
    A file present on both sides under the same relative path still counts as
    a difference when the other side's copy is not listed as a duplicate.
    Files listed as duplicates directly inside ref_dir (resp. current_dir) are
    removed from missing (resp. extra): they are duplicates that may simply
    have another name on the other side.

    ref_files : set of files in ref_dir (paths relative to ref_dir)
    current_files : set of files in current_dir (paths relative to current_dir)
    ref_dir : directory path of ref_dir, as used in tree_files / tree_children
    current_dir : directory path of current_dir, same convention
    """

    def listed_duplicates(root):
        # Relative paths of all the listed duplicate files under `root`,
        # the root folder first, then each of its descendants.
        found = set()
        for directory in [root] + collect_paths(tree_children, root):
            add_duplicates_files(found, directory, root, tree_files)
        return found

    # --------- Extra: in current only, or not a listed duplicate on the reference side ---------
    ref_files_duplicates = listed_duplicates(ref_dir)
    extra = (current_files - ref_files) | {
        name for name in current_files if name not in ref_files_duplicates
    }

    # --------- Missing: in reference only, or not a listed duplicate on the current side ---------
    current_files_duplicates = listed_duplicates(current_dir)
    missing = (ref_files - current_files) | {
        name for name in ref_files if name not in current_files_duplicates
    }

    # --------- Drop the duplicates that only differ by their names ---------
    # Only the files sitting directly in each top folder are looked up here.
    missing = {
        name for name in missing
        if name not in tree_files[os.path.normpath(ref_dir)]
    }
    extra = {
        name for name in extra
        if name not in tree_files[os.path.normpath(current_dir)]
    }
    return missing, extra
# The report is saved by default to the Desktop folder of the current user.
output_file = os.path.join(os.path.expanduser('~'), 'Desktop', output_txt_file_name)
# Folder and file names may contain non-ASCII characters: write the report as
# UTF-8 rather than with the locale encoding, which can raise UnicodeEncodeError.
with open(output_file, "w", encoding="utf-8") as out:
    for i, group in enumerate(final_groups_clean, 1):
        out.write(f"--- Duplicate Group {i} ---\n")
        # The first folder of the group is the reference the others are compared to.
        reference = group[0]
        ref_files = list_files(os.path.join(server_path, reference))
        out.write(f"Reference: {reference}\n")
        for folder in group:
            out.write(folder + "\n")
        out.write("\n")
        # Compare others against reference
        for folder in group[1:]:
            current_files = list_files(os.path.join(server_path, folder))
            missing, extra = compare_folders(ref_files, current_files, reference, folder)
            # Above 30 differences on either side, only the counts are reported.
            numerous_missing = len(missing) > 30
            numerous_extra = len(extra) > 30
            if missing or extra:
                out.write(f"Differences - {folder} has:\n")
                if numerous_extra or numerous_missing:
                    out.write(f" Missing files: {len(missing)}.\n")
                    out.write(f" Extra files: {len(extra)}.\n")
                    out.write(" Too many differences to list. Probably not duplicate folders.\n")
                else:
                    if missing:
                        out.write(" Missing files:\n")
                        for f in sorted(missing):
                            out.write(f" {f}\n")
                    if extra:
                        out.write(" Extra files:\n")
                        for f in sorted(extra):
                            out.write(f" {f}\n")
                out.write("\n")
        out.write("\n")
print(f"Done. Results written to {output_file}")