9 changes: 6 additions & 3 deletions deduplication/__main__.py
@@ -1,17 +1,20 @@
from deduplication.workflows import *
from deduplication.args import parse_args


args = parse_args()

args.sim_threshold = float(args.sim_threshold)

if args.mode == "bloom":
if args.single:
assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list"
-        dedup_single_bloom(args.input[0], args.minhash_dir[0], args.num, args.fp, args.output_file, args.name[0], args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing)
+        dedup_single_bloom(args.input[0], args.minhash_dir[0], args.num, args.fp, args.output_file, args.name[0], args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing, skip_insertion=args.skip_insertion)
elif args.multi:
-        dedup_multi_bloom(args.input, args.minhash_dir, args.num, args.fp, args.output_file, args.name, args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing)
+        dedup_multi_bloom(args.input, args.minhash_dir, args.num, args.fp, args.output_file, args.name, args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing, skip_insertion=args.skip_insertion)
else:
assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list"
-        dedup_single_file_bloom(args.input[0], args.minhash_dir[0], args.num, args.fp, args.output_file, args.name[0], args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing)
+        dedup_single_file_bloom(args.input[0], args.minhash_dir[0], args.num, args.fp, args.output_file, args.name[0], args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing, skip_insertion=args.skip_insertion)
else:
if args.single:
assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list"
7 changes: 7 additions & 0 deletions deduplication/args.py
@@ -50,11 +50,13 @@ def parse_args():
parser.add_argument(
"--sim-threshold",
help="Jaccard Similarity threshold for deduplication, should be in [0, 1]. Default is 0.8",
+        type=float,
default=0.8,
)
parser.add_argument(
"--num-perm",
help="Number of hash functions for MinHashing. Default is 128",
+        type=int,
default=128,
)
parser.add_argument(
@@ -97,5 +99,10 @@ def parse_args():
help="If set, will skip the minhashing step of each workflow (useful if minhashes have been precomputed at minhash_dir)",
action="store_true"
)
+    parser.add_argument(
+        "--skip-insertion",
+        help="If set, will skip inserting unique documents into the index (works only with LSHBloom)",
+        action="store_true"
+    )

return parser.parse_args()
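For reference, the parser changes above boil down to typed numeric options plus a store_true flag. A minimal, standalone sketch of that argparse pattern (not the repository's full parser, which takes more arguments than shown here):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--sim-threshold", type=float, default=0.8,
                    help="Jaccard similarity threshold in [0, 1]")
parser.add_argument("--num-perm", type=int, default=128,
                    help="Number of hash functions for MinHashing")
parser.add_argument("--skip-insertion", action="store_true",
                    help="Skip inserting unique documents into the index (LSHBloom only)")

# --sim-threshold arrives as a float, --num-perm keeps its int default,
# and --skip-insertion is False unless passed.
args = parser.parse_args(["--sim-threshold", "0.7", "--skip-insertion"])
print(args.sim_threshold, args.num_perm, args.skip_insertion)  # 0.7 128 True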
13 changes: 7 additions & 6 deletions deduplication/lshbloom.py
@@ -31,7 +31,7 @@ def __init__(self, minhash_dir: str, lsh_params: Dict):
self.minhash_dir = minhash_dir
self.lsh = MinHashLSHBloom(**lsh_params)

-    def deduplicate_corpus(self) -> List[Tuple[str]]:
+    def deduplicate_corpus(self, skip_insertion: bool = False) -> List[Tuple[str]]:
"""
Deduplicates documents in the given corpus and adds them to the LSH index if appropriate.
Documents without existing duplicates will be stored in the LSH index for future deduplication.
@@ -45,12 +45,12 @@ def deduplicate_corpus(self) -> List[Tuple[str]]:
if f.endswith(".pkl")
]
for minhashfile in minhash_files:
-            dups = self.deduplicate_minhash_file(minhashfile)
+            dups = self.deduplicate_minhash_file(minhashfile, skip_insertion=skip_insertion)
duplicate_list.extend(dups)

return duplicate_list

-    def deduplicate_and_insert(self, params: Tuple) -> List[Tuple[str]]:
+    def deduplicate_and_insert(self, params: Tuple, skip_insertion: bool = False) -> List[Tuple[str]]:
"""
Deduplicates a MinHash signature corresponding to a document using the provided LSH index.
If the document is not duplicated in the LSH index, it is added to the index.
@@ -67,12 +67,13 @@ def deduplicate_and_insert(self, params: Tuple) -> List[Tuple[str]]:

# insert if not duplicated in index
if not result:
-            self.lsh.insert(m_query)
+            if not skip_insertion:
+                self.lsh.insert(m_query)
return None

return [(key,)]

-    def deduplicate_minhash_file(self, minhashfile: str) -> List[Tuple[str]]:
+    def deduplicate_minhash_file(self, minhashfile: str, skip_insertion: bool = False) -> List[Tuple[str]]:
"""
Deduplicate documents in the given minhash file and adds them to the LSH index if appropriate.
Documents without existing duplicates will be stored in the LSH index for future deduplication.
@@ -91,7 +92,7 @@ def deduplicate_minhash_file(self, minhashfile: str) -> List[Tuple[str]]:
# can't multiprocess here as insertion requires C++ dependencies that are not compatible with pickle
with tqdm(total=len(minhash_list), desc=fname) as pbar:
for i in range(len(minhash_list)):
-                result = self.deduplicate_and_insert(minhash_list[i])
+                result = self.deduplicate_and_insert(minhash_list[i], skip_insertion=skip_insertion)
if result:
duplicate_list.extend(result)
pbar.update()
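The behavioral core of this file's change: deduplicate_and_insert still reports duplicates, but it only grows the index when insertion is enabled. Below is a standalone sketch of that control flow, using a plain Python set as a stand-in for the MinHashLSHBloom index (an assumption made purely for illustration).

from typing import List, Optional, Tuple

def dedup_and_maybe_insert(index: set, key: str, signature: frozenset,
                           skip_insertion: bool = False) -> Optional[List[Tuple[str]]]:
    # Duplicate found: report it; the index is never modified for duplicates.
    if signature in index:
        return [(key,)]
    # Unique document: insert only when insertion is enabled.
    if not skip_insertion:
        index.add(signature)
    return None

index = set()
print(dedup_and_maybe_insert(index, "doc-1", frozenset({"a", "b"})))                  # None (inserted)
print(dedup_and_maybe_insert(index, "doc-2", frozenset({"a", "b"})))                  # [('doc-2',)]
print(dedup_and_maybe_insert(index, "doc-3", frozenset({"c"}), skip_insertion=True))  # None (not inserted)
print(frozenset({"c"}) in index)                                                      # False

With skip_insertion=True the index is effectively read-only: incoming documents are checked against it but never added, which is useful when querying a prebuilt index that should not change.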
10 changes: 7 additions & 3 deletions deduplication/workflows.py
@@ -121,6 +121,7 @@ def dedup_single_bloom(
save_dir: str = "./",
compute_minhashes: bool = True,
clear: bool = False,
+    skip_insertion: bool = False,
):
if clear:
clear_dir(save_dir)
Expand All @@ -138,7 +139,7 @@ def dedup_single_bloom(
m.process()

index = LSHBloom(minhash_dir, lsh_params)
-    duplicates = index.deduplicate_corpus()
+    duplicates = index.deduplicate_corpus(skip_insertion=skip_insertion)
write_duplicates_to_csv(duplicates, csvfile, corpus_name, header=["dup_key"])


@@ -155,6 +156,7 @@ def dedup_multi_bloom(
save_dir: str = "./",
compute_minhashes: bool = True,
clear: bool = False,
+    skip_insertion: bool = False,
):
assert len(input_dirs) == len(minhash_dirs) == len(corpus_names), \
f"Expected len(input_dirs) == len(minhash_dirs) == len(corpus_names), got {len(input_dirs)}, {len(minhash_dirs)}, {len(corpus_names)}"
@@ -174,7 +176,8 @@ def dedup_multi_bloom(
n_hash_funcs,
save_dir,
compute_minhashes,
-            clear=False
+            clear=False,
+            skip_insertion=skip_insertion
)

def dedup_single_file_bloom(
@@ -189,6 +192,7 @@ def dedup_single_file_bloom(
save_dir: str = "./",
compute_minhashes: bool = True,
clear: bool = False,
+    skip_insertion: bool = False,
):
if clear:
clear_dir(save_dir)
@@ -208,5 +212,5 @@ def dedup_single_file_bloom(
fname = input_file.split("/")[-1]
minhash_file = f"{minhash_dir}/{fname[:-6]}.pkl"
index = LSHBloom(minhash_dir, lsh_params)
-    duplicates = index.deduplicate_minhash_file(minhash_file)
+    duplicates = index.deduplicate_minhash_file(minhash_file, skip_insertion=skip_insertion)
write_duplicates_to_csv(duplicates, csvfile, corpus_name, header=["dup_key"])
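Putting it together, a hypothetical invocation of the single-corpus bloom workflow with insertion skipped. It mirrors the positional call in __main__.py; every path and value below is a placeholder, and the interpretation of args.num and args.fp in the comments is an inference, not something this diff states.

from deduplication.workflows import dedup_single_bloom

dedup_single_bloom(
    "/data/my_corpus",    # input directory (args.input[0])
    "/data/minhashes",    # minhash_dir (args.minhash_dir[0])
    1_000_000,            # args.num (presumably the expected number of documents)
    0.001,                # args.fp (presumably the Bloom filter false-positive rate)
    "duplicates.csv",     # output_file for the duplicate report
    "my_corpus",          # corpus name (args.name[0])
    0.8,                  # sim_threshold
    128,                  # num_perm
    "./bloom_index",      # save_dir
    True,                 # compute_minhashes
    skip_insertion=True,  # query the index without adding unique documents to it
)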